sharetwitterlinkedIn

Apache Pulsar 集成 BigQuery 构建数据管道

February 03, 2022
head img

将数据卸载到公司的数据湖中是一项常见的数据工程任务。同时,你可能还希望在导入过程中转换和丰富这些数据,以便为分析做好准备。这篇博文将向你展示如何将 Apache Pulsar 与 Google BigQuery 集成,以在这方面提供一些相关的思路。

假设你将文件存储在外部文件系统中,并且你希望将内容导入到数据湖中,你需要构建一个数据管道来获取、转换和卸载数据,如下图所示。

数据管道利用 Apache Pulsar 作为消息总线,在将数据存储为更易读的格式(如 JSON)之前对数据执行简单的转换。然后,你可以将这些 JSON 记录卸载到你的数据湖中,并使用 Apache Pulsar Cloud Storage Sink 连接器将它们转换为更易于查询的格式,例如 parquet。

构建数据管道

在了解完需要实现的目标后,现在来看看构建此管道所需的各个步骤。

获取数据

首先,你需要从文件系统中读取数据并发送到 Pulsar 主题。下面的代码片段创建了一个在原始事件主题中写入 <String> 消息的生产者。

// 1. 加载输入数据文件。
List<String> events = IngestionUtils.loadEventData();

// 2. Pulsar 客户端示例。
PulsarClient pulsarClient = ClientUtils.initPulsarClient(Optional.empty());

// 3. 创建 Pulsar 生产者。
Producer<String> eventProducer = pulsarClient.newProducer(Schema.STRING)
                .topic(AppConfig.RAW_EVENTS_TOPIC)
                .producerName("raw-events-producer")
                .blockIfQueueFull(true)
                .create();

// 4. 发送一些消息。
for (String event: events) {
       eventProducer.newMessage()
                 .value(event)
                 .sendAsync()
                 .whenComplete(callback);
   }

该 Pulsar 主题包含文件内容。

转换数据

其次,你必须完成以下步骤来转换数据。

  • 从 raw_events 主题中读取 <String> 消息。
  • 将消息解析为 Event 对象。
  • 将消息以 JSON 格式写入下游的 parsed_events 主题。
  1. 通过使用实现以下签名 Function<String, Event>Pulsar Function 来读取消息。
public class EventParserFunc implements Function<String, Event> {
    private static Logger logger;

    @Override
    public Event process(String input, Context context) throws Exception {
        if (logger == null) {
            logger = context.getLogger();
        }
        logger.info("Received input: " + input);
        Event event = IngestionUtils.lineToEvent(input);
        logger.info("Parsed event: " + event);
        return event;
    }
}
  1. 使用以下配置文件部署 Function。
className: io.streamnative.functions.EventParserFunc
tenant: public
namespace: default
name: "event_parser_func"
inputs:
  - "persistent://public/default/raw_events"
output: "persistent://public/default/parsed_events"
parallelism: 1
logTopic: "persistent://public/default/parsed_events_logs"
autoAck: true
cleanupSubscription: true
subName: "parsed_events_sub"

这里重要的部分是 className,它是解析函数、输入主题名和输出主题名的路径。

  1. 运行 mvn clean package 命令打包代码并生成 jar 文件。
  2. 在集群上部署 Pulsar Function。
bin/pulsar-admin functions create \
 --function-config-file config/parser_func_config.yaml \
 --jar myjars/examples.jar

--function-config-file 指向配置文件,--jar 选项指定 jar 文件的路径。

你已成功部署转换消息的 Pulsar Function。

将数据卸载到 Google Cloud Storage

第三步是部署 Cloud Sink 连接器,它将监听 parsed_events 主题并将传入的消息以 Avro 格式存储到 Google Cloud Storage 中。

  1. 通过提供如下所示的配置文件来部署连接器。
tenant: "public"
namespace: "default"
name: "gcs-sink"
inputs:
  - "persistent://public/default/parsed_events"
parallelism: 1

configs:
  provider: "google-cloud-storage"
  gcsServiceAccountKeyFileContent: >
    {
      "type": "service_account",
      "project_id": "",
      "private_key_id": "",
      "private_key": "",
      "client_email": "",
      "client_id": "",
      "auth_uri": "",
      "token_uri": "",
      "auth_provider_x509_cert_url": "",
      "client_x509_cert_url": ""
    }

  bucket: "eventsbucket311"
  region: "us-west1"
  endpoint: "https://storage.googleapis.com/"
  formatType: "parquet"
  partitionerType: "time"
  timePartitionPattern: "yyyy-MM-dd"
  timePartitionDuration: "1d"
  batchSize: 10000
  batchTimeMs: 60000

configs 部分包括你要调整以设置连接器的不同配置。你可以在连接器仓库中找到所有可用的配置选项。让我们看一下上面的示例代码。

Let’s walk through the example code above.

首先,将连接证书指定为配置的一部分(你也可以传递一个文件),然后指定 formatType(parquet)partitionerType(time) 以根据日期进行分区。通常在流式管道中,最好避免生成太多小文件,因为如果数据太大,它们会减慢查询速度。在此场景中,每 10,000 条消息会创建一个新文件。

  1. 通过运行以下命令在集群上部署连接器:
bin/pulsar-admin sink create \
--sink-config-file config/gcs_sink.yaml \
--name gcs-sink --archive connectors/pulsar-io-cloud-storage-2.8.1.30.nar

--sink-config-file 提供配置文件的路径,--name 指定连接器名称,--archive 指定 .nar 文件位置。

随着 Pulsar Function 和连接器的启动运行,你可以执行生产者代码生成一些消息。示例数据文件包含 49,999 行(包括标题在内共 50,000 行)。

  1. 运行生产者,然后打开 Google Cloud Storage 控制台,你可以在其中验证是否获取了新文件,以及所有记录是否都已计入。

在 Google Cloud 上查询

在 Google Cloud Storage 中,你应该会看到一个新文件夹,其中包含你在 Pulsar 中创建主题时指定的租户名称。

在上一节中,示例代码使用 public 租户,因此你文件夹结构应该是 public -> default -> topic name -> date

在进入 BigQuery 查询数据之前,你需要设置一个数据集,然后根据 Google Cloud Storage 上的 parquet 文件创建一个新表。

之前,你指定批量大小(即 batchSize)为 10,000 条记录,并且发送了 49,999 条消息。你应该看到 5 个 parquet 文件。

现在,你可以进入 BigQuery,根据数据来创建表,并开始执行查询,以验证一切是否就绪。

  1. 创建数据集。

  1. 创建表。

  1. 通过运行 Select * 来验证你的所有数据是否就绪并可供进一步分析。

祝贺你,你已经成功地将 Apache Pulsar 与 BigQuery 集成!

相关资源

译者信息

姚余钱@深圳觉行科技有限公司,致力于医疗大数据领域。热衷开源,活跃于 Apache Pulsar 社区。

© 北京原流科技有限公司Apache、Apache Pulsar、Apache BookKeeper、Apache Flink 及相关开源项目名称均为 Apache 软件基金会商标。条款隐私