sharetwitterlinkedIn

在个推使用 Apache Pulsar 构建基于优先级的推送通知系统

head img

背景

个推是国内最大的第三方推送通知服务商之一。 它通过利用对用户配置文件的数据驱动分析,帮助移动应用程序开发人员跨 iOS、Android 和其他平台设置并向用户发送通知。

自2010年以来,个推已成功支持了超过数十万个应用程序和数十亿用户,包括滴滴、京东、微博、网易、人民日报、新华社、中央电视台等。

作为通知服务提供商,消息队列系统在个推中发挥着极其重要的作用。

Figure 1

图 1 展示了个推推送通知服务的概览。 当一个格推客户需要向其最终用户发送推送通知时,它首先向格推的推送通知服务发送消息。 推送通知根据其优先级在服务中排队。

但是,当消息队列中等待的推送通知数量增加时,资源争用也会增加。 它推动了对基于优先级的推送通知设计的需求,因为我们需要将更多资源分配给具有高优先级的客户。

Kafka 解决方案

概述

我们的第一个基于优先级的推送通知解决方案是使用 Apache Kafka 实现的。

Kafka是LinkedIn开发的高性能分布式流媒体平台,在个推内部也广泛使用,从日志聚合到线上线下消息分发等诸多用例。

Figure 2

在本方案中,我们将消息的优先级设置为三个级别:高、正常和低。 每个优先级的消息存储在一组主题中。 推送通知任务根据其优先级发送到不同的主题。 下游消费者根据他们的优先级接收消息。 具有相同优先级的推送通知任务以循环方式轮询。 它保证了优先级较高的推送通知可以尽早发送,而低优先级的推送通知最终也可以发送。

问题

随着业务的增长和使用我们服务的应用程序数量的增加,Kafka 解决方案遇到了以下问题:

  • 对于相同优先级的客户,他们同时推送的通知任务越来越多。 后面的任务(下图中的taskN)由于较早的任务(下图中的task1、task2、task3)等待处理而延迟。 如果 task1 有大量消息,则 taskN 将一直等到 task1 完成。

Figure 3

  • 当 Topic 数量从 64 增加到 256 时,Kafka 的吞吐量急剧下降。 由于在 Kafka 中,每个主题和分区都存储为一个或几个物理文件,因此当 Topic 数量增加时,随机 IO 访问会引入大量 IO 争用并消耗大量 I/O 资源。 因此,我们不能仅仅通过增加 Topic 的数量来解决第一个问题。

为了解决前面提到的问题,我们需要评估另一个支持大量主题的消息系统,同时保持与 Kafka 一样高的吞吐量。 在做了一些调查之后,Apache Pulsar 引起了我们的注意。

为什么 Pulsar 最适用

Apache Pulsar 是 Yahoo 开发的下一代分布式消息传递系统,它从头开始开发,以解决现有开源消息传递系统的几个缺点,并已在 Yahoo 的生产环境中运行了三年,为 Mail、Finance 等关键应用程序提供支持 、Sports、Flickr、Gemini Ads Platform 和 Sherpa(雅虎的分布式键值存储)。 此外,Pulsar 于 2016 年开源,并于 2018 年 9 月作为 Apache 顶级项目(TLP)从 Apache 孵化器毕业。

在与 Pulsar 社区密切合作并深入研究 Pulsar 之后,我们决定采用 Pulsar 作为新的基于优先级的推送通知解决方案,原因如下:

  • Pulsar 可以通过高性能扩展到数百万个主题,其基于分段的架构提供了更好的可扩展性。
  • Pulsar 提供了一个简单灵活的消息传递模型,统一了队列和流,因此它可以用于工作队列和发布-订阅消息传递用例。
  • Pulsar 采用以段为中心的设计,将消息服务与消息存储分开,允许各自独立扩展。 这种分层架构提供了更好的弹性,并避免了机器崩溃或集群扩展时复杂的数据重新平衡。
  • Pulsar 提供出色的 I/O 隔离,适用于消息传递和流式工作负载。

Pulsar 解决方案

经过广泛的讨论,我们确定了一个使用 Apache Pulsar 的新解决方案。

Pulsar 方案与 Kafka 方案很接近,但它利用 Pulsar 的优势解决了我们在 Kafka 中遇到的问题。

  • 在 Pulsar 解决方案中,我们根据任务动态创建主题。 它保证以后的任务不会因为其他任务在队列中等待处理而等待。
  • 我们为每个具有普通和高优先级的任务创建一个 Pulsar 主题,并为具有低级优先级的任务创建固定数量的主题。
  • 具有相同优先级的任务轮询主题以读取消息,当配额被填满时,具有相同优先级的任务移动到下一个优先级读取消息。
  • 相同优先级的任务可以修改配额,保证可以收到更多消息。
  • 消费者可以使用 Pulsar 的共享订阅动态添加和删除,无需增加和重新平衡分区。
  • BookKeeper 提供了在线添加存储资源的灵活性,无需重新平衡旧分区。

Figure 4

Pulsar 最佳实践

Pulsar 已经在生产环境中成功运行了几个月,为新的基于优先级的推送通知系统提供服务。 在生产环境中采用和运行 Pulsar 的整个过程中,我们收集了一些关于如何使 Pulsar 在生产环境中顺利高效地运行的最佳实践。

  • 不同的订阅是相对独立的。 如果你想重复消费一个主题的一些消息,你需要使用不同的subscriptionName来订阅。 添加新订阅时监控您的积压工作。 Pulsar 使用基于订阅的保留机制。 如果您有未使用的订阅,请将其删除; 否则,您的积压工作将继续增长。
  • 如果没有订阅主题,则默认丢弃发送到该主题的消息。 因此,如果生产者先向主题发送消息,然后消费者稍后接收消息,则需要确保在生产者向主题发送消息之前已经创建了订阅; 否则一些消息将不会被消费。
  • 如果没有生产者向某个主题发送消息,或者没有消费者订阅某个主题,则该主题会在一段时间后被删除。 您可以通过将 brokerDeleteInactiveTopicsEnabled 设置为 false 来禁用此行为。
  • TTL 和其他策略适用于整个命名空间而不是主题。
  • 默认情况下,Pulsar 将元数据存储在 ZooKeeper 的根 znode 下。 建议配置带有前缀 zookeeper 路径的 Pulsar 集群。
  • Pulsar 的 Java API 与 Kafka 不同,即消息需要在 Pulsar 中显式确认。
  • Pulsar 仪表板中显示的存储大小与 Prometheus 中显示的存储大小不同。 Prometheus 中显示的存储大小是总的物理存储大小,包括所有副本。
  • 增加 dbStorage_rocksDB_blockCacheSize 以防止读取大量积压的速度变慢。
  • 更多的分区导致更高的吞吐量。
  • 在对生产集群中的问题进行故障排除时,使用 statsstats-internal 检索主题统计信息。
  • Pulsar 中默认的 backlogQuotaDefaultLimitGB 为 10 GB。 如果您使用 Pulsar 存储多天的消息,建议增加数量或为您的命名空间设置较大的配额。 为您的用例选择适当的 backlogQuotaDefaultRetentionPolicy,因为默认策略是 producer_request_hold,当您用完配额时拒绝生产请求。
  • 根据您的用例设置积压配额。
  • 由于 Pulsar 直接在 broker 的缓存中读取和分发消息,因此 Prometheus 中 BookKeeper 的读取时间指标大部分时间可以为空。
  • Pulsar 将消息写入日志文件并同步写入缓存,写入缓存异步刷新回日志文件和 RocksDB。 推荐使用 SSD 存储日志文件。

总结

我们已经为一些用例成功运行了基于 Pulsar 的新解决方案几个月。 Pulsar 表现出极大的稳定性。 我们会持续关注 Pulsar 社区中的新闻、更新和活动,并将新功能用于我们的用例。

2018 年从 ASF 孵化器毕业的顶级项目,Pulsar 与竞争对手相比具有许多吸引人的特性和优势,例如异地复制、多租户、无缝集群扩展、读写分离等。

Pulsar 社区还很年轻,但采用 Pulsar 替代许多遗留消息系统的趋势已经迅速增长。

在采用和运行 Pulsar 的过程中,我们遇到了一些问题,非常感谢 StreamNative 的 Jia Zhai 和 Sijie Guo 提供的优质支持。

© 北京原流科技有限公司Apache、Apache Pulsar、Apache BookKeeper、Apache Flink 及相关开源项目名称均为 Apache 软件基金会商标。条款隐私