sharetwitterlinkedIn

Apache Pulsar 发布 2.4.0 版本

July 9, 2019
head img

Apache Pulsar 正式发布了 2.4.0 版本!

在 2.3.2 版本发布后的 1 个月,Pulsar 正式发布了 2.4.0 版本。

2.4.0 版本新增了诸多功能,并修复了大量漏洞,覆盖存储端、Broker 端、Schema、安全、客户端、Pulsar Functions、Pulsar IO、Pulsar Kafka、Pulsar Flink 和 Pulsar Storm 等多方面,一如既往地丰富和完善了 Pulsar 作为一个云原生流数据平台的能力。

2.4.0 版本总共接受了来自社区约 500 个 PR,越来越多的代码贡献来自于中国开发者,中国力量越发迅猛。

以下是 2.4.0 版本的详细信息。

存储端

优化改进

  • 当前有速率限制控制 Managed Ledger 的缓存换出,但缓存换出的频率被限制为 1 次每秒(最大),理应设置的更大。因此,2.4.0 版本允许用户配置 Manager Ledger 的缓存换出频率,并将默认频率提高到 100 次每秒。
  • 支持缓存换出频率后,通过对同一缓冲区增加引用计数的方式,避免增加对副本的拷贝。
  • 实现 Broker 缓存未确认的 MessageId 到 OpenRangeSet 中,并提供 managedLedgerUnackedRangesOpenCacheSetEnabled 选项打开或关闭该功能。
  • Ledger Handles Cache 中 ConcurrentLongHashMap 默认使用 16 并行度,这在 Ledger Handles Cache 的 context 中是不必要的,因此降低该值,减少每个 Topic 使用的内存和对象。

修复漏洞

  • 修复在 Managed Ledger 上读取 Entry 的超时任务出现的竞争条件。
  • 默认关闭粘性读(Sticky Read),粘性读可能会导致出现大量 ArrayIndexOutOfBoundException 异常。
  • 如果在配置时间内未完成 AddEntry,则会出现超时。超时任务可以通过配置超时任务或 BookKeeper 的客户端线程来完成。2.4.0 版本确保 AddEntry 的超时任务只能由一个线程来完成,避免出现竞争条件。

Broker

新增功能

PIP-26:支持延迟投递消息和定时调度消息

延迟投递消息和定时调度消息是消息系统的常见功能,生产者可以在消息中定义延迟时间或指定调度时间。当延迟时间或调度时间到达,消息将被投递。

示例

  • 延迟投递:消息将在 3 分钟后投递。
producer.newMessage()
    .deliverAfter(3L, TimeUnit.Minute)
    .value("Hello Pulsar!").send();
  • 定时调度:消息将在 2019/7/4 14:54:56 投递,需要转换成相应的 unix 时间戳。
producer.newMessage()
     .deliverAt(1562223296)
     .value("Hello Pulsar!").send()

在 Producer 端,如果开启延迟投递消息,则不能使用批量发送消息;在 Consumer 端,对于延迟投递消息,目前暂时仅支持 Shared 订阅模式。

更多信息,参阅 PIP-26

PIP-28:Pulsar Proxy 网关提升

Pulsar Proxy 几乎是 Pulsar 所有请求的网关,在这里可以记录一些关键信息,例如,每个请求的数据来源、目的地、Session ID、响应时间甚至是请求的消息体等。

2.4.0 版本对 Proxy 网关的优化包括:

  • 记录信息。例如,客户端 IP 和端口、代理 IP 和端口、broker IP 和端口,sesson_id、响应时间、Topic 名称和消息体。
  • 设置开关。记录信息对性能有损耗,用户可以开启或关闭该功能。

更多信息,参阅 PIP-28

PIP-33: 支持复制订阅

目前,Topic 在不同集群上只是一个虚拟的全局实例,对于订阅而言是在本地集群上,不同集群之间并没有联系,这就导致当某个消费者重连到一个新的地域时,消费者会创建一个名字相同但与上一个连接毫无关联的订阅。这个订阅会通过配置中消息的位置来创建。与此同时,原来的订阅将被留在上一个地域,这导致了不同地域之间消息不连贯的问题。

PIP-33 通过创建快照解决了这个问题。有了该功能后,应用程序将消费者从一个有故障的数据中心转移到另一个正常的数据中心时会非常方便。在故障转移期间,消费者可以从之前消费停止的地方重启消费。

示例

启用复制订阅功能。

Consumer<String> consumer = client.newConsumer(Schema.STRING)            
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .replicateSubscriptionState(true)
            .subscribe();

更多信息,参阅 PIP-33

PIP-34:新增 Key_Shared 订阅模式

此前,Pulsar 有三种订阅模式,分别是 Exclusive、Failover 和 Shared。消费者可以通过 Shared 订阅模式并行消费同一分区的消息,该模式被大量使用。

但在 Shared 模式下,相同 Key 下的消息有可能会丢失顺序。用户希望既能使用 Shared 模式并发地消费消息,又能保证同一 Key 下的消息顺序。

在该需求的推动下,2.4.0 版本新增了 Key_Shared 订阅模式,它扩展了 Shared 订阅模式,使一个分区可以有几个消费者并行地消费消息,但是具有相同 Key 的消息将只被路由给一个消费者。

更多信息,参阅 PIP-34

PIP-36:支持在 Broker 端设置最大消息大小

MaxMessageSize 限制了每条消息的大小,该参数被硬编码进 Pulsar,不能通过服务器端的配置修改,但某些场景可能会出现消息超出阈值,例如,在数据库中,当一条记录超过了该配置的大小,将导致消息不能被正确地发送。

2.4.0 版本支持修改 MaxMessageSize参数,用户能自定义最大消息大小。

更多信息,参阅 PIP-36

修复漏洞

  • 修复了使用 peer-cluster 功能并改变从 peer1 集群复制至 peer2 集群时出现数据丢失的问题。
  • 修复了在 Pulsar standalone 下不能从配置文件读取 ZooKeeper 端口号的问题。
  • 修复了卸载不存在的 Topic 时导致的空指针异常的问题。
  • 修复了删除全局 Topic 出现竞争条件的问题。

客户端从复制集群列表移除一个本地集群时,Broker 会删除该集群下的 Topic。然而, Broker 收到多个策略更新的 zk-watch 时,Broker 不能安全地隔离 Topic,从而允许多个线程删除相同的 Topic。在这种情况下,第一个线程成功删除 Topic,第二个线程会失败并且重试,导致错误。通过安全地隔离 Topic 并处理 Topic 已经被删除的异常,2.4.0 版本修复了该问题。

  • 修复了在调用 skip-message 时可能会出现的死锁问题。
  • 修复了在初始化 ZkIsolatedBookieEnsemblePlacementPolicy 时出现的类型强制转化异常以及在调用该类时出现的反序列化失败的问题。

优化改进

  • 此前,由于消费者未正确地确认消息,造成大量积压和随机未确认的消息重新投递速度慢。当 Broker 尝试重新投递被积压的消息时,会导致在 Bookie 上的跨多个 Ledger 的随机读取,使消息发送速度变慢,影响对消费者的发送速度,最终成为整个服务的瓶颈。2.4.0 版本尝试优化消息的重新投递,减少 Broker 随机读取的次数,尽量减少跨多个 Ledger 读取,尽可能地使用 Bookie 的读缓存。
  • 如果磁盘容量较小,但又需要在系统中保留更多数据,很容易写满磁盘,此时需要手动处理。2.4.0 版本增加了 ttlDurationDefaultInSeconds 参数,当命名空间策略未配置 ttl 时,Broker 将应用该参数使消息过期,从而防止磁盘被写满。设置 ttlDurationDefaultInSeconds=0 将关闭该功能。
  • 使用封装在 BookKeeper 4.9 中引入的 ByteBufAllocatorBuilder 处理可能出现的 OOM 的问题。
  • 支持自动刷新 Jetty Webserver 的 TLS 证书。
  • 允许通过 Rest API 创建一个非分区的 Topic。
  • 支持在租户级别对 Bookie 进行隔离,有利于提高性能和分配专用资源。
  • 对 Broker 而言,当主要的 Bookie 隔离组没有足够的 Bookie 时,可以使用备份的 Bookie,因此增加了一个用于备份的 Bookie 隔离策略。
  • 为命名空间增加备份 Bookie 隔离组,如果主要的隔离组没有足够的 Bookie 时,可以使用备份 Bookie 隔离组。
  • 为集群之间的复制增加一个速率限制,从而更好地控制集群之间的复制速率。 跨地域复制时,当集群 A 下线一段时间后再上线,会导致其他集群的同步消息将集群的网络带宽耗尽,2.4.0 版本通过增加速率限制解决了这一问题。
  • 默认关闭检查 backlog 配额。
  • 支持多主机访问。

Pulsar 可以通过 Pulsar Proxy 进行多主机访问,同时也能通过提供多地址访问多个 broker,并实现了自动重试。2.4.0 版本支持对 Pulsar Admin 的多地址重试,用户需要提供多地址的访问路径。

SCHEMA

优化改进

  • 为 Schema 增加多版本支持。
  • 由于当前 AutoConsume Schema 仅支持使用最新的 Schema 版本来反序列化消息,未使用其他的 Schema 版本,从而使 AutoConsume Schema 未完全发挥它的作用,尤其是在 CDC 场景下,因此加入一个在 AutoConsume Schema 下的多版本 Schema 的实现机制的介绍,后续会基于该机制实现在 AutoConsume Schema 的多版本支持。
  • 为 KeyValue Schema 增加版本支持。
  • AutoConsume Schema 支持使用与消息相关联的 Schema 来反序列化消息。
  • 存储消息的 Key 作为 KeyValue Schema 的一部分,用户能通过选项,决定是否存储。
  • 将 KeyValue Schema 的序列化与反序列化拆分成多部分,方便后续支持 AutoConsume,使 KeyValue Schema 的显示更加友好。
  • 支持对所有已存在的 Schema 版本做兼容性检查,增加向前、向后以及全兼容的兼容性策略。
  • 当前 Schema 仅在兼容性检查时被验证,通过 pulsar-admin schemas 上传的 Schema 不被验证,因此,增加通过 Rest API 上传 Schema 时对 Schema 的验证。
  • 支持在删除 Topic 的同时删除绑定在该 Topic 的 Schema。
  • 提供一种机制来动态地构建一个 Schema,当前支持对 Avro Schema 的动态构建。

安全

新增功能

PIP-30:改变认证 API ,支持双向认证。

Pulsar 支持可插拔的认证机制和若干认证方式,但这些认证是单向身份认证。在当前 API 下,它不支持客户端和服务器之间的双向身份验证(例如,SASL)。因此,该功能改变接口,支持双向认证。

更多信息,参阅 PIP-30

  • 目前,在 ZooKeeper 和 BookKeeper 中都已支持通过 Kerberos 认证,因此,2.4.0 版本对 Pulsar 的 Broker 和 Client 之间也支持通过 Kerberos 认证。
  • 支持 Pulsar Proxy 的 Kerberos 认证。
  • 支持 Pulsar Web Resource 的 Kerberos 认证,使 Pulsar Admin Java Client 和 Pulsar Admin CLI 支持 Kerberos。
  • 当前只能使用与 RSA 兼容的 JWT 签名算法,这限制了 ECDSA 系列 JWT 密钥的使用。2.4.0 版本增加了对其他签名算法的支持。

修复漏洞

在 Pulsar Proxy 上支持 Kerberos 认证。

监控

优化改进

在Prometheus的统计中,增加 backlog size、backlog quota 和 offloaded size 三个指标。

客户端

JAVA

新增功能

PIP-29: 由于用户同时使用 pulsar-client 和 pulsar-client-admin 的需求较大,2.4.0 版本将它们打成一个包,方便用户使用。

更多信息,参阅 PIP-29

  • 在命令行中为 Websocket 增加 Produce/Consume 的支持,用户可在命令行中测试 Websocket 。
  • 根据消费者的优先级决定故障转移。

当前 Broker 根据消费者的名称排序进行故障转移,但也有用户希望通过确定的方式(例如,优先级)进行故障转移,该功能实现了这种方式,Broker 在选择活跃的消费者时会考虑消费者的优先级。

  • 在 Java 客户端支持 Negative Acknowledge。

许多情况下,应用程序很难处理特定消息的失败,目前有一些解决方案:

(1)ackTimeout

如果消息被正常投递至应用程序,但在 timeout 的时间内并未收到 ack,此时将触发重新传递消息。ackTimeout 的主要问题在于它和应用程序是强绑定的,在大多数情况下,1 分钟的 ack timeout 是一个非常好的值,但如果处理消息需要 3 分钟,则会多次触发每条消息的重新发送。

(2)阻塞消费并且处理重试

可能只有一条消息失败(可能是临时的),而其余的消息都正常。在这种情况下,需要阻塞 Consumer 来处理所有消息,直到这个出错的消息通过。可能只是其中一个 Consumer 出现故障,其它 Consumer 处于正常状态。

基于上述问题,理想情况是当消息出错需要处理时,用户可以将该条消息 nack,并触发重传机制。所以,在启用死信队列之后,用户可以通过 ack-timeout 或 negative acknowledge 两种方式将消息路由到死信队列中。negative acknowledge 和 ack-timeout 的重传机制相同,二者可以结合使用。需要注意的是,negative acknowledge 的操作需要发生在 ack-timeout 之前。

  • 支持用户配置回退策略。
  • 与 ProducerBuilder 和 ConsumerBuilder 类似,允许用户在使用 TypedMessageBuilder 时传入一个 Map 结构参数,这个在 Pulsar Function 中使用较多。
  • 为 negative acknowledge 增加拦截器。
  • 将 seek 操作添加至 Reader。
  • 为由于 ack 超时的消息添加合适的拦截器,用户可以自定义在消息确认超时情况下的处理逻辑。

引入批消息处理框架,支持通过多种方式实现批处理。

该版本引入了一个批消息处理框架,根据该框架可以快速实现其他类型的批消息处理,当前也可以定制自己的批处理方案。添加了一个叫做 BatchMessageKeyBasedContainer 的批处理实现来支持在 Key_Shared 模式下的批消息处理。

  • 为 Java Client 支持 Snappy 压缩类型。

修复漏洞

修复重连时由于关闭 batch 而导致的空指针异常问题。

优化改进

通过使用信号量和 ConcurrentLinkedQueue 减少 ClientCnx 在 lookup 请求时的内存使用量。

Python

优化改进

  • 支持 negative acknowledge。
  • 支持对消息的 Snappy 压缩模式。

C++

优化改进

  • 支持 negative acknowledge。
  • 支持 Key_Shared 订阅模式。

修复漏洞

修复在订阅正则表达式匹配的 Topic ack 超时后没有重新投递消息的问题,这是因为 unack message tracker 没有被调用。

GO

优化改进

  • 支持 negative acknowledge。
  • 在 Go client 中增加对 Schema 的支持,即能在 Go 中使用 Json、Avro、Protobuf 和一些原生 Schema。
  • 增加 Snappy 压缩模式。
  • 支持 Key_Shared 订阅模式。

Functions

新增功能

  • PIP-32:增加对 Go Function 的支持。

此前,Pulsar Functions 支持使用 Java 和 Python 开 发,但随着 Go 语言自身优势与 Go 社区的不断壮大,用户对 Go Function 的需求也在不断增加。因此,2.4.0 版本新增对 Go Function 的支持。

Pulsar Functions 的架构设计中采用了 protobuf 协议解耦了客户端与服务端,这为 Pulsar Functions 实现多语言支持提供了可能。Go Function 在设计上结合 Go 语言本身的特性,采用了 SDK 的形式。为了减少用户的学习成本,在部署 Go Function 时,使用方式与 Java Function 和 Python Function 相同;在编写 Go Function 时,为了保证 Pulsar Functions 设计的理念,用户同样只需关注计算逻辑本身,而无需重新学习新的 SDK 接口。

更多信息,参阅 PIP-32

示例

开发一个 Go Function。

package main
import (
    "fmt"
    pulsarfunc "github.com/apache/pulsar/pulsar-function-go"
)
func hello() {
    fmt.Println("hello pulsar function")
}
func main(){
    pulsarfunc.Start(hello)
}
  • 在 Function 级别对认证功能进行扩充。

当前 Function 的认证使用静态凭证,并且每个 Function 都有相同的凭证。 2.4.0 版本支持在不同用户提交的 Function 中有不同的凭证,这暂时是一个比较初始的实现。

工作流程大致如下:

(1)用户提交具备 Auth Data 的 Function Worker。
(2)Worker 基于 Auth Data 缓存这些信息,将 Function Meta Topic 分发给每个 Worker。
(3)Worker 用来运行一个 Function 的实例,可以从 Function Meta Topic 中获取认证信息来设置 Function 实例以支持认证。
  • 在对 Function 的认证功能实现后,实现了对 Function 的授权。
  • 允许用户更新认证信息。
  • 当前对 Function 的状态操作只有一个同步操作,2.4.0 版本添加对函数状态的异步操作方法。
  • 在 Function 启动时,用户能对 Function 打标签,这些标签有利于与 Kubernetes 结合使用。
  • 支持在使用 Function publish 时为 message 设置 key。
  • 在 Pulsar Functions 中支持 negative acknowledge。

Pulsar Functions 中设置了 ack 超时,这可能导致无法正确处理耗时较长的消息。2.4.0 版本支持 negative acknowledge,出现异常时,发送 negative acknowledge 能重新投递和处理消息。

修复漏洞

  • 修复 stats manager 未被初始化导致的空指针异常问题。
  • 修复在 effectively-once 模式下更新 Function 失败的问题。
  • 修复认证功能向后兼容不工作的问题。

Connector

新增功能

  • Flume Source 和 Sink Connector,同步 Flume 的数据到 Pulsar,同步 Pulsar 的数据到 Flume。
  • Redis Sink Connector 支持将 Pulsar 中的数据同步到 Redis 中。
  • Solr Sink Connector 支持将 Pulsar 中的数据同步到 Solr 中。
  • RabbitMQ Sink Connector 支持将 Pulsar 中的数据同步到 RabbitMQ 中。
  • InfluxDB Sink Connector 支持将 Pulsar 中的数据同步到 InfluxDB 中。

优化改进

  • 增加 RabbitMQ Source Connector 配置,支持设置 basicQos 和 ack。

CDC

优化改进

  • 隐藏 kafka-connector 的实现细节,方便用户使用 Debezium。
  • 在 Pulsar Debezium 中增加对 Postgresql 的支持。

Pulsar SQL

优化改进

增加对查询层级存储的数据的支持。

生态系统

Pulsar Kafka Client

优化改进

  • 支持 Kafka 的 ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG 配置。
  • 支持 Kafka 的 ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG 配置。
  • 支持 Kafka 的 ProducerConfig.INTERCEPTOR_CLASSES_CONFIG 配置。
  • 支持 Kafka 的 ConsumerConfig.MAX_POLL_RECORDS_CONFIG 配置,能配置消息数量。
  • 支持 Kafka 的 ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG 配置。

新增功能

  • 允许用户设置自定义的 Pulsar Producer。
  • 支持 puslar-flink 连接器使用基于 token 的认证。
  • 对于 Flink Connector,增加支持接收 ClientConfigurationData、ProducerConfigurationData 和 ConsumerConfigurationData 的配置。

Pulsar Storm

新增功能

  • 新增两个指标,用于调试用户的拓扑,messagesFailed 统计失败的消息数量,messageNotAvailableCount 统计 spout 线程没能成功接收到的消息的次数。
  • 在 spout 以较高的速率处理消息时,允许配置 pulsar-spout-consumer 的大小。
  • 在 pulsar-spout 中增加一个选项,允许用户自动取消订阅。
  • 在 pulsar-spout 中增加对 reader 的支持。
  • 在 pulsar-bolt 添加对 client 配置和 producer 配置的初始化,传递额外的元数据,例如,加密方式、批处理等。
  • 在 puslar-bolt 中添加 consumer 配置的初始化,获取加密配置、队列大小以及优先级等信息。

修复漏洞

在 Pulsar Storm 连接器中,修复了使用 PulsarSpout 发送下一个元组时抛出空指针异常的问题。

  • Pulsar 2.4.0 版本发行注记,请点击查看详情.
  • 如果你对 Pulsar 示例、Demo、工具和扩展等感兴趣,欢迎了解 StreamNative GitHub.
© StreamNative, Inc. 2022Apache, Apache Pulsar, Apache BookKeeper, Apache Flink, and associated open source project names are trademarks of the Apache Software Foundation.TermsPrivacy