将数据卸载到公司的数据湖中是一项常见的数据工程任务。同时,你可能还希望在导入过程中转换和丰富这些数据,以便为分析做好准备。这篇博文将向你展示如何将 Apache Pulsar 与 Google BigQuery 集成,以在这方面提供一些相关的思路。
假设你将文件存储在外部文件系统中,并且你希望将内容导入到数据湖中,你需要构建一个数据管道来获取、转换和卸载数据,如下图所示。
数据管道利用 Apache Pulsar 作为消息总线,在将数据存储为更易读的格式(如 JSON)之前对数据执行简单的转换。然后,你可以将这些 JSON 记录卸载到你的数据湖中,并使用 Apache Pulsar Cloud Storage Sink 连接器将它们转换为更易于查询的格式,例如 parquet。
在了解完需要实现的目标后,现在来看看构建此管道所需的各个步骤。
首先,你需要从文件系统中读取数据并发送到 Pulsar 主题。下面的代码片段创建了一个在原始事件主题中写入 <String>
消息的生产者。
// 1. 加载输入数据文件。
List<String> events = IngestionUtils.loadEventData();
// 2. Pulsar 客户端示例。
PulsarClient pulsarClient = ClientUtils.initPulsarClient(Optional.empty());
// 3. 创建 Pulsar 生产者。
Producer<String> eventProducer = pulsarClient.newProducer(Schema.STRING)
.topic(AppConfig.RAW_EVENTS_TOPIC)
.producerName("raw-events-producer")
.blockIfQueueFull(true)
.create();
// 4. 发送一些消息。
for (String event: events) {
eventProducer.newMessage()
.value(event)
.sendAsync()
.whenComplete(callback);
}
该 Pulsar 主题包含文件内容。
其次,你必须完成以下步骤来转换数据。
<String>
消息。Function<String, Event>
的 Pulsar Function 来读取消息。public class EventParserFunc implements Function<String, Event> {
private static Logger logger;
@Override
public Event process(String input, Context context) throws Exception {
if (logger == null) {
logger = context.getLogger();
}
logger.info("Received input: " + input);
Event event = IngestionUtils.lineToEvent(input);
logger.info("Parsed event: " + event);
return event;
}
}
className: io.streamnative.functions.EventParserFunc
tenant: public
namespace: default
name: "event_parser_func"
inputs:
- "persistent://public/default/raw_events"
output: "persistent://public/default/parsed_events"
parallelism: 1
logTopic: "persistent://public/default/parsed_events_logs"
autoAck: true
cleanupSubscription: true
subName: "parsed_events_sub"
这里重要的部分是 className
,它是解析函数、输入主题名和输出主题名的路径。
mvn clean package
命令打包代码并生成 jar 文件。bin/pulsar-admin functions create \
--function-config-file config/parser_func_config.yaml \
--jar myjars/examples.jar
--function-config-file
指向配置文件,--jar
选项指定 jar 文件的路径。
你已成功部署转换消息的 Pulsar Function。
第三步是部署 Cloud Sink 连接器,它将监听 parsed_events 主题并将传入的消息以 Avro 格式存储到 Google Cloud Storage 中。
tenant: "public"
namespace: "default"
name: "gcs-sink"
inputs:
- "persistent://public/default/parsed_events"
parallelism: 1
configs:
provider: "google-cloud-storage"
gcsServiceAccountKeyFileContent: >
{
"type": "service_account",
"project_id": "",
"private_key_id": "",
"private_key": "",
"client_email": "",
"client_id": "",
"auth_uri": "",
"token_uri": "",
"auth_provider_x509_cert_url": "",
"client_x509_cert_url": ""
}
bucket: "eventsbucket311"
region: "us-west1"
endpoint: "https://storage.googleapis.com/"
formatType: "parquet"
partitionerType: "time"
timePartitionPattern: "yyyy-MM-dd"
timePartitionDuration: "1d"
batchSize: 10000
batchTimeMs: 60000
configs
部分包括你要调整以设置连接器的不同配置。你可以在连接器仓库中找到所有可用的配置选项。让我们看一下上面的示例代码。
Let’s walk through the example code above.
首先,将连接证书指定为配置的一部分(你也可以传递一个文件),然后指定 formatType(parquet)
和 partitionerType(time)
以根据日期进行分区。通常在流式管道中,最好避免生成太多小文件,因为如果数据太大,它们会减慢查询速度。在此场景中,每 10,000 条消息会创建一个新文件。
bin/pulsar-admin sink create \
--sink-config-file config/gcs_sink.yaml \
--name gcs-sink --archive connectors/pulsar-io-cloud-storage-2.8.1.30.nar
--sink-config-file
提供配置文件的路径,--name
指定连接器名称,--archive
指定 .nar 文件位置。
随着 Pulsar Function 和连接器的启动运行,你可以执行生产者代码生成一些消息。示例数据文件包含 49,999 行(包括标题在内共 50,000 行)。
在 Google Cloud Storage 中,你应该会看到一个新文件夹,其中包含你在 Pulsar 中创建主题时指定的租户名称。
在上一节中,示例代码使用 public 租户,因此你文件夹结构应该是 public -> default -> topic name -> date
。
在进入 BigQuery 查询数据之前,你需要设置一个数据集,然后根据 Google Cloud Storage 上的 parquet 文件创建一个新表。
之前,你指定批量大小(即 batchSize)为 10,000 条记录,并且发送了 49,999 条消息。你应该看到 5 个 parquet 文件。
现在,你可以进入 BigQuery,根据数据来创建表,并开始执行查询,以验证一切是否就绪。
祝贺你,你已经成功地将 Apache Pulsar 与 BigQuery 集成!
姚余钱@深圳觉行科技有限公司,致力于医疗大数据领域。热衷开源,活跃于 Apache Pulsar 社区。