本篇博文将深入分析 Apache Pulsar 生产者和消费者的内部细节,列举一些应用程序开发人员在使用 Pulsar 时遇到的常见问题,最后会介绍一些开发流/消息应用程序的最佳实践。
Apache Pulsar 是一个云原生分布式消息和事件流处理平台,支持发布/订阅及事件流应用场景。下文介绍了一些关键的 Pulsar 术语。
图注:
- 生产者(Producer):向主题发布消息的进程。
- 主题(Topic):在生产者和消费者传递之间传递消息的通道。
- 消息(Message):存在主题中的任意有效负载信息。
- 消费者(Consumer):订阅主题并从中接收已发布消息的进程。
- 订阅(Subscription):决定消息如何发送给消费者的规则。
- 游标(Cursor):作为偏移量记录订阅中最后一条确认消费的消息。
在 Pulsar 中,Broker 处理所有客户端的交互,负责存储和传递多个主题(Topic)中的消息(Message)。上图演示了生产者(Producer)和消费者(Consumer)在单个主题上的交互情况。
生产者连接 Broker 并向主题发送消息,这些消息按照 Broker 接收的顺序存储。消费者通过不同的订阅(Subscription)模式确定从主题中传递哪些消息,每个订阅通过主题消息跟踪消费的进度,并借助游标(Cursor)确保每个消息只传递一次。游标用于记录当前订阅的消费位置,通过游标可知主题中哪个是第一个尚未消费的信息。
图注:
- 生产者队列(Producer Queue):生产者有内部队列,它用来缓存即将发送给 Broker 并确认的消息,一旦队列满了,所有的消息会被一起分批发送给 Broker。
- 接收队列(Receiver Queue):消费者也有内部队列,它用来缓存从 Broker 接收到需要被应用代码处理的消息。
- Available Permits:当消息从接收队列中被消费时,消费者会请求足够的新消息来重新填充队列。这个值就称为 Available Permits。
名词 | 描述 |
---|---|
分区主题 | 由多个 Pulsar Broker 处理的同一个主题,这能拥有更高的吞吐量 |
命名空间 | 相关主题的分组机制 |
命名空间 Bundle | 属于相同命名空间的一组虚拟主题,命名空间 Bundle 定义为两个 32 位哈希之间的范围,比如 0x00000000 和 0xffffffff |
确认(Ack) | 消费者发送给 Pulsar Broker 的消息响应,表示消息已经成功处理。由此,Pulsar 可知消息现已可从系统中删除 |
否定应答(Nack) | 当应用程序无法处理某些消息时,它可以向 Pulsar 发送“否认 Ack”表示稍后应重发该消息 |
未确认(Unack) | 已经传递给消费者进行处理,但是还没有被消费者确认已处理的消息 |
Backlog | 订阅中未被确认的消息数 |
消费端是最有可能踩坑的地方,我们先从这里开始讲。其中还会谈到创建应用程序时的一些最佳实践,供大家参考。
当消费者启动时,它会向 Broker 发送一个“Flow”命令请求消息,然后 Broker 发送消息,数量最大为 availablePermits 的值。availablePermits 的最大值等于 receiverQueue 的大小,默认为 1000 条消息。这就好比消费者对 Broker 说:“我队列中有 1000 个位置,所以你可以向我发送最多 1000 条消息。”
Broker 收到该消息并知晓该消费者有 1000 的 availablePermits,在跟踪每个活动实例的同时及时分发数据。例如,如果有 10 条消息要发送,它会发送这 10 条消息并继续发送 990 条,直至 availablePermits 变为 0 才会停止发送。
消费者从 receiverQueue 中接收这些消息,只要你还在调用 receive() 方法,就会将这些消息从队列中弹出。当这 1000 条数据大约处理到一半时,客户端会向 Broker 发送更多的消息许可(比如,它会发送 500 个或者更多)。
消息消费的目的是保持消息的流动性。在最理想的情况下,消费者队列中应有可用的消息,以便应用程序可持续读取并处理消息。
上述流程中一个重要的机制就是消息确认。为了让消费端能增加 availablePermits 的数量并且请求更多的消息,它需要向 Broker 确认消息,验证单个消息或一组消息已被成功消费。如果发生异常,它可以做出否定应答(手动或自动,需要提供 ackTimeout,例如 consumer.receive(500, TimeUnit.MILLISECONDS))。在这种情况下,它会重新接收到消息。Pulsar 支持两种类型的确认:单条(Individual)和累积(Cumulative)。
顾名思义,单条确认会在每条消息成功处理之后向 Broker 发送确认信息。另一方面,累积确认针对一批消息发送确认信息,即在特定偏移量之前的所有消息都将被确认。
注意:共享订阅模式不支持累积确认。
Backlog 指消息积压量(等同 Kafka 中的消费者滞后),代表订阅中未确认消息的数量,描述了消费者落后生产者还有多少消息未消费。
例如,生产者刚向 Broker 发送了 1000 条消息,而消费者已成功确认了 800 条消息,这意味着 Backlog(或消费者滞后)为 200,即订阅中尚有 200 条未确认的消息。在 Pulsar 2.10 版本中,可通过内置功能检索此指标,即使用 Pulsar admin CLI 运行 pulsar-admin topics stats –etb true
命令。该指标显示 Backlog 中最早消息的发布时间到当前时间的时间戳。
根据应用程序的负载和设计,可能会出现 Broker 不发送消息或者消费者不处理消息的情况。本节将概述这类情况的常见问题并提供可能的解决方案。
如上文所述,一个成功的消费流程需要有 availablePermits 以对 Broker 请求消息、消费者的 receiverQueue、消息的成功处理,并确保给 Broker 返回确认信息。那么,哪里可能会出现问题?
如前所述,首先要检查的是确保应用程序在处理之后有确认收到消息。
除此之外,这种情况可能是因为你的消费程序无法足够快地处理消息。回到我们的消费流程中,默认情况下消费者要求在 receiverQueue 中保存 1000 条消息。根据程序的处理逻辑,这可能会导致客户端请求的消息多于可处理的消息,队列中缓冲的消息会最终超时并且 Backlog 也会增加。因此,建议根据实际情况降低这个值。
在这种情况下,availablePermits 往往等于 0 并且有未确认的消息,这表明 Pulsar 已经将消息发送给了消费者,但是消费者没有向 Broker 返回确认。
注意:如果未确认消息数超过阈值(应该在 50,000 条消息左右),消费者消费会受阻,这类似于将 blockedConsumerOnUnackedMsgs 设为 true。你可以使用 pulsar-admin stats <主题名称> 命令来检索这里描述的指标。
如果 availablePermits 大于 0,但是消息传递速度慢,那瓶颈可能出现在 Broker 或 Bookie 上。这说明应用程序处理消息的速度快于消息的发送速度。
如果消息传递速度为 0,通常是 Broker 出现了问题,这可能表示 Broker 此时的负载非常高,比如 Broker 可能由于处理太多的高负载主题而导致分发消息缓慢。
可以尝试拆分 Bundle 并卸载主题以实现更好的负载均衡。有关更多信息,请参阅卸载主题和 bundles 的文档。
Key_Shared 订阅模式的目的是保证每个 Key 的有序。如果启动一个新的消费者,但还有旧的消息没有确认,你必须等待旧的消费者处理完在此之前所有的数据,这样新的消费者才能获取新的 Key。创建新消费者后,如果它不处理任何消息,可以查一下是不是属于这类原因。
这类情况往往和 Key 的分布不均匀有关。设计应用程序时,需要考虑把 Key 尽可能地在消费者之间均匀分布(可能还要考虑每个 Key 的消息量)。否则,一些消费者可能会承担太多工作而其他消费者却处于闲置状态。
假设你只有两个 Key:key1 和 key2,并且只启动了一个消费者。之后你启动了第二个消费者,但是 Key 的分发方式并不能确保新的消费者能收到这两个 Key 中的任何一个,所以最后可能会有一个消费者处于闲置状态。
另一方面,假设 key1 分配给了消费者 consumer1,key2 分配给了消费者 consumer2。假设 key1 是一个 userId,该用户每天 24 小时都在使用系统,而 key2 是另外一个用户的 userId,该用户每周只上线一次。在这种情况下,consumer1 会处理大量消息,而 consumer2 看起来则处于闲置状态。实际上,consumer2 在一周的某一天之前,它没有任何消息需要处理。这就是为什么提供一个能使消费者处理消息更平均的 Key 是如此重要的原因。
到这里,我们已经讨论了很多关于消费端的内容,这里需要强调的一点是你应该始终确保关闭你的客户端资源,这点适用于所有的应用程序。生产者和消费者/读取者是长期存在的资源,通常创建一次后,想保存多久都行。但是,某些情况可能是按需创建生产者或消费者/读取者来执行功能并退出。在这两种情况下,你都需要确保在应用程序退出之前关闭所有资源,以免资源泄漏。
例如,如果你按需启动了一个消费者去只执行一个任务并退出,但是没有关闭它。默认情况下,消费者的 receiverQueue 大小为 1000,这意味着当消费者启动时就会预获取 1000 条消息,执行一些计算,然后退出。如果不关闭这些资源,消息将会保留在缓冲区中,并且总是积压 1000 条消息,因为该“泄漏”的消费者持有这些消息而没有消费它们。
重要提示:处理完每条消息之后,务必向 Broker 确认。否则,Backlog 量会根据可用的方法而不断增加。对于消费者,建议使用相同的消费者实例来确认消息。如果不这样做(无论出于何种原因),那你可以创建一个接收队列大小等于 1 的消费者,来模拟该消费者需要预获取的消息数量。
在生产端,有一些数据源生成消息并且想将其发送到 Pulsar。典型的数据源包括从文件中获取数据,连接一些 IoT 消息传递协议(例如 MQTT),接收来自变更数据捕获(Change Data Capture,即 CDC)系统的更新等。当每条消息到达,你创建一条新的 Pulsar 消息,并且使用 Pulsar 生产者将该消息发送到 Broker,生产者则需要提前准备好。在生产者的创建过程中,请注意以下几点。
首先,你需要确定消息的 Schema。默认情况下,所有内容都以字节形式发送,但是也支持所有的原始数据类型和更复杂的数据格式,如 Json、Avro 和 Protobufs 等。有关消息 Schema 的更多信息,请参阅 Understand schema。
其次,你需要正确配置生产者:
默认情况下,Pulsar 启用了批处理,你可以调整缓冲区可以容纳的最大消息数以及字节大小。当满足这两个阈值中的任何一个时,一批消息被视为“已满”,并准备好发送至 Pulsar。例如,如果你有大量消息并且想创建 1000 条消息的批处理,则可能需要增加 batchingMaxBytes 的限制(默认为 128 kb)。 另外,当你使用 sendAsync() 方法时,由于异步特性,生产者可能会在确认消息时不堪重负。在这种情况下,你需要启用另一个配置,即 blockIfQueueFull(true),采取背压(Backpressure)做法,向 Broker 发送“减速”的信号。以下是一个使用批处理、背压和调整批处理缓冲区大小的生产者示例。
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.producerName("test-producer")
.enableBatching(true)
.blockIfQueueFull(true)
.batchingMaxMessages(10000)
.batchingMaxBytes(10000000)
.create();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.producerName("test-producer")
.enableBatching(false)
.enableChunking(true)
.sendTimeout(120, TimeUnit.SECONDS)
.create();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.producerName("test-producer")
.blockIfQueueFull(true)
.messageRoutingMode(MessageRoutingMode.CustomPartition)
.hashingScheme(HashingScheme.Murmur3_32Hash)
.messageRouter(new MessageRouter() {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
String key = msg.getKey();
return Integer.parseInt(key) % metadata.numPartitions();
}
})
.create();
可见,生产端更为直接,没有很多隐藏的坑,大多数是进行微调以满足应用程序的要求。但是,有一个很重要并需要强调之处是你在应用程序中创建的生产者数量。例如,你可能想从一个目录中提取上百或上千个文件,又或者你有一个 Web 应用并且想为每个登录用户启动一个生产者。生产者是长期存在的程序,因此应避免创建成百上千个生产者。相反,你可以创建具有固定数量的生产者的 ProducerCache,并在应用程序中进行复用。
何城波,就职于深信服 PaaS 平台部门,从事 Kafka、Pulsar 等消息中间件基础设施建设。