随着数据日益膨胀,采用事件流处理数据至关重要。Apache Flink 将批流处理统一到计算引擎中。Apache Pulsar(与 Apache BookKeeper 一起)以 "流 "的方式统一数据。在 Pulsar 中,数据存储成一个副本,以流(streaming)(通过 pub-sub 接口)和 segment(用于批处理)的方式进行访问。 Pulsar 解决了企业在使用不同的存储和消息技术解决方案时遇到的数据孤岛问题。把 Flink 和 Pulsar 结合使用,这两种开源技术可以创建一个统一的数据架构,为实时数据驱动企业提供最佳解决方案。
Pulsar Flink connector 基于 Apache Pulsar 和 Apache Flink 提供弹性数据处理,允许 Apache Flink 读写 Apache Pulsar 中的数据。使用 Pulsar Flink Connector,企业能够更专注于业务逻辑,无需关注存储问题。
我们决定开发 Pulsar Flink Connector(连接器)时,Flink 和 Pulsar 社区都为之振奋,很多用户纷纷开始使用。借助 Pulsar Flink 连接器,惠普(HPE)构建了实时计算平台,BIGO 建了实时消息处理系统,知乎的团队也在评估连接器是否满足其内部需求。
随着越来越多的用户在使用 Pulsar Flink Connector,我们从社区中听到一个共同的问题:数据序列化的设计不恰当。Pulsar 消息是结构化的,且有数据序列化能力(Schema)。旧版本 Pulsar Flink 连接器的序列化机制侧重于 Pulsar 的序列化能力,对 Flink 序列化机制支持的较差。因此用户必须进行大量的配置才能使用连接器进行实时计算。
为了让 Pulsar Flink 连接器更方便使用,我们决定构建完全支持 Flink 序列化机制的功能,节省用户的配置工作量。
Pulsar Flink Connector 2.7.0 支持 Apache Pulsar 2.7.0 和 Apache Flink 1.12 版本中的所有功能,完全兼容 Flink 连接器和 Flink 消息格式。现在,您可以使用 Flink 中的重要功能,如 exactly-once Sink、upsert Pulsar、数据定义语言(DDL)计算列、水印和元数据;还可以使用 Pulsar 中的 Key-Shared 订阅模式,保证消息的处理顺序,而无需太多的配置。此外,您还可以根据自己的业务轻松定制配置。
下面,我们详细介绍Pulsar Flink Connector 2.7.0 的主要功能。
在有些场景下,用户需要消息严格保证消息顺序,才能保证业务处理正确。通常在消息严格保序的情况下,只能同时有一个消费者消费消息,才能保证顺序。这样会导致消息的吞吐量大幅度降低。Pulsar 为这样的场景设计了 Key-Shared 订阅模式,通过对消息增加 Key,将相同 Key Hash 的消息路由到同一个消息者上,这样既保证了消息的消费顺序,又提高了吞吐量。
我们在 Pulsar Flink 连接器中也添加了对该功能的支持。可以通过配置参数 enable-key-hash-range=true
启用这个功能。开启功能后,会根据任务的并行度划分每个消费者处理的 Key Hash 范围。
Pulsar 2.7 版本支持事务,极大增强了 Flink Sink 的容错能力。之前版本中,Sink 算子最多支持at-least-once 语义,这不能完全适配一些对 End-to-End 一致性要求比较高的场景,用户必须自己想办法维护去重的逻辑,这增加了用户编写代码的复杂程度。
在新版本的连接器中,得益于 Pulsar 强大的事务功能,我们设计了 exactly-once 的 sink 算子。Flink 框架早就已经对这种场景有了抽象地支持,其使用两阶段的提交协议来实现 Sink 算子的 Exactly-once 语义,该协议的主要生命周期方法包括 beginTransaction(), preCommit(), commit(), abort(), recoverAndCommit(), recoverAndAbort()。
我们的核心想法在于,用户在创建 sink 时可以自由选择算子的语义,内部的逻辑变化对用户代码是透明的。由于 Pulsar 的事务和 Flink 中两阶段提交协议的生命周期有诸多类似之处,我们可以用比较优雅地方式实现这项工作,而不要对 Pulsar 做过多侵入。
beginTransaction 以及 preCommit 的实现比较简单。只需要开启一个 Pulsar 事务,在 checkpoint 到来后持久化该事务的 TID。在 preCommit 阶段保证消息已经全部被 Flush 到 Pulsar 中,并且确保preCommit 成功的消息最终一定会被 Commit。
实现上需要思考的地方在于 recoverAndCommit 以及 recoverAndAbort 的情景。在 Kafka 连接器中,受限于 Kafka 的一些特性,使用了一些相对 hack 的方式来用于 recoverAndCommit 的工作。而 Pulsar 的事务由于不依赖于具体的 Producer,可以很容易的根据 TID 来接管事务的提交和撤回。
Pulsar 事务的灵活和高效与 Flink 优秀的架构抽象碰撞之后,Pulsar Flink 连接器具备了更大的潜力。同时,由于 Pulsar 的事务正处于活跃迭代状态,我们也会不断优化 Connector 中 Transactional Sink。
Flink 社区用户对 Upsert 模式消息队列有很高的需求,主要原因有三个:
基于这些需求,我们也实现了对 Upsert Pulsar 的支持。使用这个功能,用户能够以 upsert 的方式从 Pulsar 主题中读取数据和向 Pulsar 主题写入数据。
该功能统一了批处理流的来源,优化了任务发现和数据读取的机制。这也是我们实现 Pulsar 批处理和流统一的基石。另外新的 Table API 优化了设计,增加了 DDL 计算列、元数据定义等特性。
FLIP-107 使用户能够在表定义中以元数据列的形式访问连接器元数据。用户在实时计算场景下,通常会有读写消息正文外的一些拓展信息,比如 eventTime、自定义字段等。Pulsar Flink 连接器在支持 metadata 描述符后,用户可以灵活方便地读写 Pulsar 消息的元数据。
atomic
来支持 Pulsar 原生类型当 Flink 需要处理 Pulsar 原生类型时,可以使用 atomic
作为连接器格式。
如果想从 Pulsar Flink Connector 之前的版本迁移到 2.7.0,需要对 SQL 和 API 参数做相应调整。下面我们提供了每个参数的详细信息。
在 SQL 中,DDL 声明中 Pulsar 的配置参数发生了一些变化,主要是参数名发生了变化,值未发生改变。
connector.
前缀全部被移除。connector.type
参数名改为 connector
。connector.startup-mode
改为 scan.startup.mode
。properties.pulsar.reader.readername=testReaderName
。如果您在 Pulsar Flink Connector 中使用了 SQL,在迁移到 Pulsar Flink Connector 2.7.0 时需要相应调整 SQL 配置。以下为 SQL 配置调整示例。
旧版本SQL:
create table topic1(
`rip` VARCHAR,
`rtime` VARCHAR,
`uid` bigint,
`client_ip` VARCHAR,
`day` as TO_DATE(rtime),
`hour` as date_format(rtime,'HH')
) with (
'connector.type' ='pulsar',
'connector.version' = '1',
'connector.topic' ='persistent://public/default/test_flink_sql',
'connector.service-url' ='pulsar://xxx',
'connector.admin-url' ='http://xxx',
'connector.startup-mode' ='earliest',
'connector.properties.0.key' ='pulsar.reader.readerName',
'connector.properties.0.value' ='testReaderName',
'format.type' ='json',
'update-mode' ='append'
);
变更后的新版本 SQL:
create table topic1(
`rip` VARCHAR,
`rtime` VARCHAR,
`uid` bigint,
`client_ip` VARCHAR,
`day` as TO_DATE(rtime),
`hour` as date_format(rtime,'HH')
) with (
'connector' ='pulsar',
'topic' ='persistent://public/default/test_flink_sql',
'service-url' ='pulsar://xxx',
'admin-url' ='http://xxx',
'scan.startup.mode' ='earliest',
'properties.pulsar.reader.readername' = 'testReaderName',
'format' ='json');
从 API 的角度来看,我们调整了 一些类,更方便定制化。
FlinkPulsarSink
构造方法的签名,增加 PulsarSerializationSchema
。FlinkPulsarRowSink
、FlinkPulsarRowSource
。如果您需要处理 Row 格式,可以使用 Flink Row 相关的序列化组件。PulsarSerializationSchema
可以使用 PulsarSerializationSchemaWrapper.Builder
去创建。TopicKeyExtractor 的功能移动到了 PulsarSerializationSchemaWrapper,参考代码如下
new PulsarSerializationSchemaWrapper.Builder<>(new SimpleStringSchema())
.setTopicExtractor(str -> getTopic(str))
.build();
我们基于新的 Flink Source API (FLIP-27),设计了一个集成 Pulsar Source 的批处理和流处理方案。Pulsar 采用分层架构,将数据分为流、批、冷三部分,从而实现无限容量。这让 Pulsar 成为统一批处理和流处理的理想解决方案。
基于 Flink Source API 的解决方案可以简单分为两个部分:SplitEnumerator 和 Reader。SplitEnumerator 发现并分配分区,Reader 从分区中读取数据。
Pulsar 将消息存储在 leader 块中,可以通过 Pulsar admin 获取 leader 信息,然后通过不同的分区策略提供 broker 分区、BookKeeper 分区、Offloader 分区等信息。更多详情可参考 issue #187。
Pulsar Flink Connector 2.7.0 已经发布,欢迎大家试用。新版本易用性更好,并支持 Pulsar 2.7 和 Flink 1.12 中的各种功能。我们把 Pulsar Flink Connector 2.7.0 贡献到 Flink 主仓库。如果你有任何疑问,欢迎提 issue。