近期,Apache Pulsar 社区发布了 Pulsar 2.8.1 版本!新版本涵盖 49 位贡献者提供的改进和错误修复,并提交了 213 次变更。
本博客介绍了 2.8.1 版本最值得关注的进展,如需了解所有性能升级和 bug 修复的完整列表,请查阅 Pulsar 2.8.1 发布注记。
问题:在此 PR 之前,在 topic 上设置精确的发布速率限制无效。
解决方案:通过 LeakingBucket
和 FixedWindow
算法实现了新的 RateLimiter
。
问题:当在 Key-shared 订阅上发生消息重新传递时,具有相同 key 的消息是乱序的。
解决方案:当向 messagesToRedeliver
发送消息时,broker 保存了 key 的哈希值。如果调度程序尝试将更新的消息发送给有与任何一个保存的哈希值对应的 key 的消费者,它们将被添加到 messagesToRedeliver
而不是被发送,防止具有相同 key 的消息出现乱序。
问题:在此 PR 之前,即使旧的生产者仍在写入 topic,同名的生产者会触发错误并删除旧生产者。
解决方案:基于连接 ID(本地和远程地址以及独有的 ID)和该连接中的生产者 ID 而非生产者名称验证生产者。
问题:在此 PR 之前,当生产者继续重新连接到 broker 时,topic 的隔离状态始终设置为 true,这导致 topic 无法恢复。
解决方案:当轮询操作不等于当前操作时,向 ManagedLedgerException
添加一条 entry。
问题:在此 PR 之前,当订阅位置最早的 topic 时会丢失数据,因为 ManagedLedger
使用了错误的位置来初始化游标。
解决方案:订阅最早位置的 topic 时,添加检查游标位置的测试。
hasMessageAvailableAsync
和 readNextAsync
时不再发生死锁。PR-11183问题:在此 PR 之前,将消息添加到传入队列时,在两种情况下可能会发生死锁:首先,在读取消息之前将消息添加到队列中。其次,readNextAsync
在调用 future.whenComplete
之前完成。
解决方案:使用内部线程处理 hasMessageAvailableAsync
的回调。
getLastMessageId
API 时不会发生内存泄漏。PR-10977问题:在此 PR 之前,调用 getLastMessageId
API 时 broker 会耗尽内存。
解决方案:将 entry.release()
调用添加到 PersistentTopic.getLastMessageId
。
问题:在此 PR 之前,当一个 topic 只有非持久订阅时,不会触发压缩,因为其估测 backlog 大小为 0。
解决方案:使用总 backlog 大小来触发压缩。在没有持久订阅的情况下更改行为以使用总 backlog 大小。
问题:使用 Key-Shared 订阅反复打开和关闭消费者偶尔会导致停止向所有消费者发送消息
解决方案:在调用 removeConsumer()
之前移动了标记删除位置,并从选择器中移除消费者。
问题:在此 PR 之前,未检查请求 ledger 是否属于消费者连接的 topic,使得消费者读取不属于连接 topic 的数据。
解决方案:在执行读取操作之前,添加对 ManagedLedger
级别的检查。
问题:在此 PR 之前,因未在 managedLedgerconfiguration
中设置,保留策略不起作用。
解决方案:将 managedLedger
配置中的保留策略设置为 onUpdate
监听 listner 方法。
问题:在此 PR 之前,如果未持久订阅 topic,可能会丢失数据。
解决方案:利用 topic 压缩游标来保留数据。
问题:在此 PR 之前,Pulsar proxy 传出 TCP 连接存在内存泄漏的问题。因为 ProxyConnectionPool
实例是在 PulsarClientImpl
实例之外创建的,并且在客户端关闭时并未关闭。
解决方案:正确关闭 ConnectionPool。
问题:在此 PR 之前,使用 Protobuf schema 时抛出的异常 GeneratedMessageV3
不可分配。
解决方案:向 Pulsar 实例添加相关依赖。
问题:在此 PR 之前,分区 topic 消费者在创建消费者失败时不清理资源。如果此故障与不可恢复的错误一起发生,则会引发内存泄漏,从而使应用程序不稳定。
解决方案:关闭和清理计时器任务引用。
问题:在此 PR 之前,当有独立消费者处于“暂停”状态且共享队列已满时,2 个线程之间存在竞争条件。
解决方案:在将消费者标记为“暂停”后验证共享队列的状态。如果另一个线程在此期间清空队列,则消费者不会被阻塞。
batchReceive
上被阻止。 PR-11691问题:在此 PR 之前,由于 ConsumerBase.java
中的竞争条件,当不同线程同时调用 Consumer.batchReceive()
时,消费者被阻塞。
解决方案:将 pinnedInternalExecutor
置于 ConsumerBase
中,以允许批处理计时器、ConsumerImpl
和 MultiTopicsConsumerImpl
在单个线程中提交工作。
问题:在此 PR 之前,在 Python 客户端中启用自定义日志记录时可能会发生死锁。
解决方案:分离工作线程并降低日志级别。
欢迎大家下载并使用新版本!
更多内容,可访问 Pulsar 官网,关注 Twitter @apache_pulsar,加入到 Pulsar Slack!