sharetwitterlinkedIn

批流融合升级:Apache Flink 1.15.0 发布 Pulsar Flink Sink Connector

July 13, 2022
head img

背景

随着数据日益膨胀,采用事件流处理数据至关重要。Apache Pulsar 与 Apache Flink 同时支持批流,使得 Pulsar 和 Flink 先天就是契合的伙伴。把 Flink 和 Pulsar 结合使用,这两种开源技术可以创建一个统一的数据架构,为实时数据驱动企业提供最佳解决方案。

为了将 Pulsar 与 Flink 的功能进行整合,为用户提供更强大的开发能力,StreamNative 开发并开源了 Pulsar Flink Connector。Pulsar Flink Connector 基于 Apache Pulsar 和 Apache Flink 提供弹性数据处理,允许 Apache Flink 读写 Apache Pulsar 中的数据。使用 Pulsar Flink Connector,企业能够更专注于业务逻辑,无需关注存储问题。

在 2021 年 9 月,Flink 1.14.0 版本中发布了 Pulsar Flink Connector - Source 连接器。近期,Apache Flink 1.15.0 版本发布,本次 Flink 版本搭载了 Pulsar Flink Connector - Sink 连接器。Pulsar Flink Connector Source 和 Sink 连接器模块合并到 Flink 主仓库,这将方便用户推动 Pulsar 与 Flink 批流一体方案的落地。

Pulsar Sink 基于 Flink 最新的 Sink API实现。Pulsar Sink 连接器可以将经过 Flink 处理后的数据写入一个或多个 Pulsar Topic 或者 Topic 下的某些分区。

如果想要使用旧版的使用 SinkFuntion 接口实现的 Sink 连接器,可以使用 StreamNative 维护的 pulsar-flink。在介绍新的连接器之前,我们先来了解 Sink 连接器的设计思路。

设计思想简述

Pulsar Sink 遵循 FLIP-191 中定义的 Sink API 设计。

无状态的 SinkWriter

EXACTLY_ONCE 一致性下,Pulsar Sink 不会将事务相关的信息存放于检查点快照中。这意味着当 Flink 应用重启时,Pulsar Sink 会创建新的事务实例。上一次运行过程中任何未提交事务中的消息会因为超时中止而无法被下游的消费者所消费。这样的设计保证了 SinkWriter 是无状态的。

Pulsar Schema Evolution

Pulsar Schema Evolution 允许用户在一个 Flink 应用程序中使用的数据模型发生特定改变后(比如向基于 ARVO 的 POJO 类中增加或删除一个字段),仍能使用同一个 Flink 应用程序的代码。可以在 Pulsar 集群内指定哪些类型的数据模型的改变是被允许的,详情请参阅 Pulsar Schema Evolution

Pulsar Flink Sink Connector 简介

使用示例

Pulsar Sink 使用 builder 类来创建 PulsarSink 实例。下面示例展示了如何通过 Pulsar Sink 以“至少一次”的语义将字符串类型的数据发送给 topic1。

Java

DataStream<String> stream = ...

PulsarSink<String> sink = PulsarSink.builder()
    .setServiceUrl(serviceUrl)
    .setAdminUrl(adminUrl)
    .setTopics("topic1")
    .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build();

stream.sinkTo(sink);

Python

下列为创建一个 PulsarSink 实例必需的属性:

  • Pulsar 数据消费的地址,使用 setServiceUrl(String) 方法提供。
  • Pulsar HTTP 管理地址,使用 setAdminUrl(String) 方法提供。
  • 需要发送到的 Topic 或者是 Topic 下面的分区,详见指定写入的topic或者topic分区
  • 编码 Pulsar 消息的序列化器,详见序列化器

在创建 PulsarSink 时,建议使用 setProducerName(String) 来指定 PulsarSink 内部使用的 Pulsar 生产者名称。这样方便在数据监控页面找到对应的生产者监控指标。

指定写入的 Topic 或者 Topic 分区

PulsarSink 指定写入 Topic 的方式和 Pulsar Source 指定消费的 Topic 或者 Topic 分区的方式类似。PulsarSink 支持以 mixin 风格指定写入的 Topic 或分区。因此,可以指定一组 Topic 或者分区或者是两者都有。

Java

// Topic "some-topic1" 和 "some-topic2"
PulsarSink.builder().setTopics("some-topic1", "some-topic2")

// Topic "topic-a" 的分区 0 和 2
PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")

// Topic "topic-a" 以及 Topic "some-topic2" 分区 0 和 2
PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2", "some-topic2")

Python

# Topic "some-topic1" 和 "some-topic2"
PulsarSink.builder().set_topics(["some-topic1", "some-topic2"])

# Topic "topic-a" 的分区 0 和 2
PulsarSink.builder().set_topics(["topic-a-partition-0", "topic-a-partition-2"])

# Topic "topic-a" 以及 Topic "some-topic2" 分区 0 和 2
PulsarSink.builder().set_topics(["topic-a-partition-0", "topic-a-partition-2", "some-topic2"])

动态分区发现默认处于开启状态,这意味着 PulsarSink 将会周期性地从 Pulsar 集群中查询 Topic 的元数据来获取可能有的分区数量变更信息。使用 PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL 配置项来指定查询的间隔时间。

可以选择实现 TopicRouter 接口来自定义消息路由策略。此外,阅读 Topic 名称简写将有助于理解 Pulsar 的分区在 Pulsar 连接器中的配置方式。

如果在 PulsarSink 中同时指定了某个 Topic 和其下属的分区,那么 PulsarSink 将会自动将两者合并,仅使用外层的 Topic。

举个例子,如果通过 PulsarSink.builder().setTopics("some-topic1", "some-topic1-partition-0") 来指定写入的 Topic,那么其结果等价于 PulsarSink.builder().setTopics("some-topic1")

序列化器

序列化器(PulsarSerializationSchema)负责将 Flink 中的每条记录序列化成 byte 数组,并通过网络发送至指定的写入 Topic。和 Pulsar Source 类似的是,序列化器同时支持使用基于 Flink 的 SerializationSchema 接口实现序列化器和使用 Pulsar 原生的 Schema 类型实现的序列化器。不过序列化器并不支持 Pulsar 的 Schema.AUTO_PRODUCE_BYTES()。

如果不需要指定 Message 接口中提供的 key 或者其他的消息属性,可以从上述 2 种预定义的 PulsarSerializationSchema 实现中选择适合需求的一种使用。

  • 使用 Pulsar 的 Schema 来序列化 Flink 中的数据。
// 原始数据类型
PulsarSerializationSchema.pulsarSchema(Schema)

// 有结构数据类型(JSON、Protobuf、Avro 等)
PulsarSerializationSchema.pulsarSchema(Schema, Class)

// 键值对类型
PulsarSerializationSchema.pulsarSchema(Schema, Class, Class)
  • 使用 Flink 的 SerializationSchema 来序列化数据。

Java

PulsarSerializationSchema.flinkSchema(SerializationSchema)

Python

PulsarSerializationSchema.flink_schema(SimpleStringSchema())

同时使用 PulsarSerializationSchema.pulsarSchema() 以及在 builder 中指定 PulsarSinkBuilder.enableSchemaEvolution() 可以启用 Schema evolution 特性。该特性会使用 Pulsar Broker 端提供的 Schema 版本兼容性检测以及 Schema 版本演进。下列示例展示了如何启用 Schema Evolution。

Schema<SomePojo> schema = Schema.AVRO(SomePojo.class);
PulsarSerializationSchema<SomePojo> pulsarSchema = PulsarSerializationSchema.pulsarSchema(schema, SomePojo.class);

PulsarSink<String> sink = PulsarSink.builder()
    ...
    .setSerializationSchema(pulsarSchema)
    .enableSchemaEvolution()
    .build();

如果想要使用 Pulsar 原生的 Schema 序列化消息而不需要 Schema Evolution 特性,那么写入的 Topic 会使用 Schema.BYTES 作为消息的 Schema,对应 Topic 的消费者需要自己负责反序列化的工作。

例如,如果使用 PulsarSerializationSchema.pulsarSchema(Schema.STRING) 而不使用 PulsarSinkBuilder.enableSchemaEvolution()。那么在写入 Topic 中所记录的消息 Schema 将会是 Schema.BYTES。

消息路由策略

在 Pulsar Sink 中,消息路由发生在于分区之间,而非上层 Topic。对于给定 Topic 的情况,路由算法会首先会查询出 Topic 之上所有的分区信息,并在这些分区上实现消息的路由。Pulsar Sink 默认提供 2 种路由策略的实现。

  • KeyHashTopicRouter:使用消息的 key 对应的哈希值来取模计算出消息对应的 Topic 分区。使用此路由可以将具有相同 key 的消息发送至同一个 Topic 分区。消息的 key 可以在自定义 PulsarSerializationSchema 时,在 serialize() 方法内使用 PulsarMessageBuilder.key(String key) 来予以指定。如果消息没有包含 key,此路由策略将从 Topic 分区中随机选择一个发送。可以使用 MessageKeyHash.JAVA_HASH 或者 MessageKeyHash.MURMUR3_32_HASH 两种不同的哈希算法来计算消息 key 的哈希值。使用 PulsarSinkOptions.PULSAR_MESSAGE_KEY_HASH 配置项来指定想要的哈希算法。
  • RoundRobinRouter:轮换使用用户给定的 Topic 分区。消息将会轮替地选取 Topic 分区,当往某个 Topic 分区里写入指定数量的消息后,将会轮换至下一个 Topic 分区。使用 PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES 指定向一个 Topic 分区中写入的消息数量。

还可以通过实现 TopicRouter 接口来自定义消息路由策略,请注意 TopicRouter 的实现需要能被序列化。

TopicRouter 内可以指定任意的 Topic 分区(即使这个 Topic 分区不在 setTopics() 指定的列表中)。因此,当使用自定义的 TopicRouter 时,PulsarSinkBuilder.setTopics 选项是可选的。

@PublicEvolving
public interface TopicRouter<IN> extends Serializable {

    String route(IN in, List<String> partitions, PulsarSinkContext context);

    default void open(SinkConfiguration sinkConfiguration) {
        // 默认无操作
    }
}

如前文所述,Pulsar 分区的内部被实现为一个无分区的 Topic,一般情况下 Pulsar 客户端会隐藏这个实现,并且提供内置的消息路由策略。Pulsar Sink 并没有使用 Pulsar 客户端提供的路由策略和封装,而是使用了 Pulsar 客户端更底层的 API 自行实现了消息路由逻辑。这样做的主要目的是能够在属于不同 Topic 的分区之间定义更灵活的消息路由策略。

详情请参考 Pulsar 的 partitioned topics

发送一致性

PulsarSink 支持三种发送一致性。

  • NONE:Flink 应用运行时可能出现数据丢失的情况。在这种模式下,Pulsar Sink 发送消息后并不会检查消息是否发送成功。此模式具有最高的吞吐量,可用于一致性没有要求的场景。
  • AT_LEAST_ONCE:每条消息至少有一条对应消息发送至 Pulsar,发送至 Pulsar 的消息可能会因为 Flink 应用重启而出现重复。
  • EXACTLY_ONCE:每条消息有且仅有一条对应消息发送至 Pulsar。发送至 Pulsar 的消息不会有重复也不会丢失。Pulsar Sink 内部依赖 Pulsar 事务和两阶段提交协议来保证每条记录都能正确发往 Pulsar。

消息延时发送

消息延时发送特性可以让指定发送的每一条消息需要延时一段时间后才能被下游的消费者所消费。当延时消息发送特性启用时,Pulsar Sink 会立刻将消息发送至 Pulsar Broker。但该消息在指定的延迟时间到达前将会保持对下游消费者不可见。

消息延时发送仅在 Shared 订阅模式下有效,在 ExclusiveFailover 模式下该特性无效。可以使用 MessageDelayer.fixed(Duration) 创建一个 MessageDelayer 来为所有消息指定恒定的接收时延,或者实现 MessageDelayer 接口来为不同的消息指定不同的接收时延。消息对下游消费者的可见时间应当基于 PulsarSinkContext.processTime() 计算得到。

Sink 配置项

可以在 builder 类里通过 setConfig(ConfigOption<T>, T)setConfig(Configuration) 方法给定下述的全部配置。

PulsarClient 和 PulsarAdmin 配置项

Pulsar Sink 和 Pulsar Source 公用的配置选项可参考

Pulsar 生产者 API 配置项

Pulsar Sink 使用生产者 API 来发送消息。Pulsar 的 ProducerConfigurationData 中大部分的配置项被映射为 PulsarSinkOptions 里的选项。详情参见文档 Pulsar 生产者 API 配置项

Pulsar Sink 配置项

下述配置主要用于性能调优或者是控制消息确认的行为。如非必要,可以不用考虑配置。详情参见文档 Pulsar Sink 配置项Sink 监控指标.

升级至最新的连接器

常见的升级步骤,请参阅升级应用程序和 Flink 版本。Pulsar 连接器没有在 Flink 端存储消费的状态,所有的消费信息都推送到了 Pulsar。所以需要注意下面的事项:io

  • 不要同时升级 Pulsar 连接器和 Pulsar 服务端的版本。
  • 使用最新版本的 Pulsar 客户端来消费消息。

问题诊断

使用 Flink 和 Pulsar 交互时如果遇到问题,由于 Flink 内部实现只是基于 Pulsar 的 Java 客户端管理 API 而开发的。用户遇到的问题可能与 Flink 无关,请先升级 Pulsar 的版本、Pulsar 客户端的版本,或者修改 Pulsar 的配置、Pulsar 连接器的配置来尝试解决问题。更多相关内容请参考 Flink 文档 Pulsar Sink

© 北京原流科技有限公司Apache、Apache Pulsar、Apache BookKeeper、Apache Flink 及相关开源项目名称均为 Apache 软件基金会商标。条款隐私