sharetwitterlinkedIn

使用 Apache Pulsar 为腾讯的联邦学习提供动力

head img

执行摘要

腾讯 AngelPowerFL 是一个分布式联邦学习平台,可支持万亿级并发训练。 Angel PowerFL已广泛应用于腾讯金融云、广告联合建模等业务。 该平台需要一个稳定可靠的消息传递系统,并保证高性能和数据隐私。 在研究了不同的解决方案并比较了几个消息队列之后,Angel PowerFL 采用了 Apache Pulsar 作为联邦学习 (FL) 中的数据同步解决方案。

在这篇博客中,腾讯 Angel PowerFL 团队分享了他们如何基于 Pulsar 构建联邦通信,他们在使用 Pulsar 时遇到的挑战,以及他们如何解决这些问题并为 Pulsar 社区做出贡献。 腾讯在生产中使用 Pulsar 证明它提供了机器学习平台所需的稳定性、可靠性和可扩展性。

内容

关于腾讯 Angel PowerFL

Federated learning(FL) 是一种机器学习技术,可在多个分散的边缘设备、服务器或孤立的数据中心训练统计模型,同时保持数据本地化。这些去中心化设备协同学习共享预测模型,同时将训练数据保留在设备上,而不需要将数据上传并存储在中央服务器上。因此,金融行业和医院等需要在严格的隐私约束下运营的组织也可以参与模型训练。

Angel是一个基于Parameter Servers哲学的分布式机器学习平台(类似于数据库,这是机器学习应用的核心部分,用于存储机器学习模型的参数并将它们提供给客户)。 Angel结合腾讯大数据进行性能调优,获得了广泛的适用性和稳定性,在处理高维模型方面表现出越来越大的优势。

腾讯 Angel PowerFL 基于天使机器学习平台构建。 Angel Parameter Server (Angel-PS) 可以支持数万亿个模型同时训练,所以 Angel PowerFL 将计算从 Worker(一个在不同线程上处理接收到的任务的逻辑组件,并通过回调方法给你反馈)迁移到参数服务器 (PS)。 Angel PowerFL 为联邦学习算法提供计算、加密、存储、状态同步等基本操作接口,并与进程调度器模型协调参与者。 Angel PowerFL 已广泛应用于腾讯金融云、腾讯广告联合建模等业务。  图1:腾讯 Angel PowerFL系统架构

通信服务要求

在联合培训课程期间,参与者通过通信模型传输大量加密数据。 因此,Angel PowerFL 平台需要一个稳定可靠的消息传递系统,以提供高性能并确保数据隐私。

稳定可靠

联邦学习任务持续几分钟到几小时。 学习算法需要准确的数据,不同算法的数据传输峰值不同。 所以我们需要稳定、健壮的通信模型服务来避免数据丢失。

高吞吐量和低延迟

Angel PowerFL 使用 Spark 处理计算。 Executor 的并发执行,即为您的应用程序运行计算和存储数据的进程,会生成大量中间数据。 为了有效地将加密数据传输给其他方,通信模型必须支持低延迟和高吞吐量。

数据隐私

我们的联邦学习参与者分布在不同的公司。 尽管所有数据都使用加密模型加密,但在公共网络上传输它们会带来风险。 因此,我们需要一个安全可靠的通信模型来保护数据在公共网络中不受攻击。

为什么选择 Apache Pulsar

当我们研究联合通信服务的解决方案时,我们考虑了 RPC(远程过程调用)直接连接、HDFS(Hadoop 分布式文件系统)同步和 MQ(消息队列)同步。 由于我们对安全性和性能有很高的要求,所以我们决定采用 MQ 同步方案。 Apache Pulsar、Kafka、RabbitMQ 和 TubeMQ 等多个 MQ 选项可用。 我们咨询了腾讯数据平台部的 MQ 团队,他们推荐了 Pulsar。 然后,我们对 Pulsar 进行了进一步的研究,发现 Pulsar 的内置特性完美地满足了我们对消息系统的要求。

下面,我们总结了为什么 Pulsar 最适合我们的联邦通信。

分层和以细分为中心的架构

Apache Pulsar 是一个云原生的分布式消息传递和事件流平台,采用分层架构,将计算与存储解耦。 Apache Pulsar 集群由两层组成:无状态服务层和有状态存储层。 服务层由一组接收和传递消息的代理组成,存储层由一组称为 bookie 的 Apache BookKeeper 存储节点组成,用于持久存储消息。

与 RabbitMQ 和 Kafka 等传统消息系统相比,Pulsar 具有独特且差异化的架构。 Pulsar 架构的一些独特方面包括:

  • 将 brokers 与bookies 分开,并允许独立的可扩展性和容错性,从而提高系统可用性。
  • 基于Segment的存储架构和分层存储,数据在所有bookie之间均匀分布和平衡,容量不受单个bookie节点的限制。
  • BookKeeper 安全可靠,确保不丢失数据。 此外,BookKeeper 支持批量刷写和更高的吞吐量。

 图 2:Pulsar 架构

异地复制

Pulsar 提供内置的异地复制,用于在多个数据中心之间同步或异步复制数据,允许我们有选择地限制复制。 默认情况下,消息会复制到为命名空间配置的所有集群。 如果我们想将消息复制到一些指定的集群,我们可以指定一个复制列表。

 图 3:Pulsar 消息架构

在上图中,每当 P1、P2 和 P3 生产者分别在 Cluster-A、Cluster-B 和 Cluster-C 集群中向 T1 主题发布消息时,这些消息都会立即跨集群复制。 一旦 Pulsar 复制了消息,C1 和 C2 消费者就可以从各自的集群中消费这些消息。

可扩展性

借助基于分段的存储架构,Pulsar 将主题分区划分为更小的块,称为片段。 每个段将数据存储为 Apache BookKeeper 分类帐,构成分区的段集分布在 Apache BookKeeper 集群中。 这种设计使管理容量和可扩展性变得更加容易,并且满足了我们对高吞吐量的需求。 让我们仔细看看这些元素:

  • Easy to manage capacity: 主题分区的容量可以扩展到整个BookKeeper集群,不受单个节点容量的限制。
  • Easy to scale out: 我们不需要重新平衡或复制数据以进行扩展。 添加新的 bookie 节点时,它仅用于新的 segment 或其副本。 此外,Pulsar 重新平衡了段分布和集群中的流量
  • High throughput: 写入流量分布在存储层,没有分区写入竞争单个节点的资源。 Apache Pulsar 的多层架构以及计算和存储层的解耦提供了稳定性、可靠性、可扩展性和高性能。 此外,其内置的异地复制使我们能够在不同公司的各方之间同步消息队列。 最后,Pulsar 的身份验证和授权有助于确保传输中的数据隐私。 这些都是 Angel PowerFL 所需的功能,也是我们决定在 Angel PowerFL 平台中采用 Apache Pulsar 的原因。

基于 Apache Pulsar 的联合通信解决方案

在 Angel PowerFL 中,我们将每个业务都标识为一个 Party,每个 Party 都有一个唯一的 ID,例如 10000/20000。 这些缔约方分布在同一公司的不同部门(没有网络隔离)或不同的公司(跨公共网络)。 来自各方的数据通过 Pulsar 异地复制同步。 以下是我们基于 Apache Pulsar 的通信服务设计。  图 4:基于 Pulsar 的 Angel PowerFL 通信服务

FL 训练任务通过消息的生产者和消费者连接到 Party 的 Pulsar 集群。 集群名称遵循 fl-pulsar-[partyID] 模式。 训练任务生成中间数据后,生产者将数据发送到本地 Pulsar 集群,然后 Pulsar 集群通过 Pulsar 代理同步复制网络将数据发送到消费方。 消费方的消费者监控训练主题,消费数据,并对其进行处理。  图 5:Angel PowerFL 联合通信数据流

在训练期间,驱动程序和每个分区创建一个“通道”变量,它映射 Pulsar 中的特定主题。 生产者将所有交换数据发送到主题。

Angel PowerFL 支持多方联合,因此数据将在两个以上的集群中同步复制。 每个 FL 任务在任务参数中指定参与者,生产者通过调用 setReplicationClusters 接口确保数据仅在参与方之间传输。

我们在 Angel PowerFL 通信模型中充分利用了 Pulsar 地理复制、主题限制和令牌认证。 接

移除对 Global ZooKeeper 的依赖

在 Angel PowerFL 平台中,我们依靠 Local ZooKeeper 和 Global ZooKeeper 来部署 Pulsar 集群。 Local ZooKeeper 用于存储元数据,类似于 Kafka 中使用的方法。 Global ZooKeeper 在多个 Pulsar 集群之间共享配置信息。  图 6:脉冲星集群

每次我们向 Angel PowerFL 添加一个新的 Party 时,我们都必须为 Global ZooKeeper 部署一个子节点,或者在不同的公司或区域之间共享公共 ZooKeeper。 因此,添加新的 Party 会使部署集群和保护数据免受攻击变得更加困难。

Global ZooKeeper 中存储的元数据包括集群名称、服务地址、命名空间权限等。 Pulsar 支持创建和添加新集群。 我们在以下步骤中将联邦 Pulsar 集群注册到本地 ZooKeeper,从而消除对 Global ZooKeeper 的依赖。

  • Step 1: 为新加入的Party注册Pulsar集群
# OTHER_CLUSTER_NAME is the Pulsar cluster name of the Party to be registered
# OTHER_CLUSTER_BROKER_URL is the broker address of the Pulsar cluster

./bin/pulsar-admin clusters create ${OTHER_CLUSTER_NAME} \
    --url http://${OTHER_CLUSTER_HTTP_URL} \
    --broker-url pulsar://${OTHER_CLUSTER_BROKER_URL}
  • Step 2: 授权用于训练的命名空间访问集群
./bin/pulsar-admin namespaces set-clusters fl-tenant/${namespace} \
     -clusters ${LOCAL_CLUSTR_NAME},${OTHER_CLUSTER_NAME}

我们使用其 Pulsar 集群名称/服务地址注册新添加的 Party,并通过 geo-replication 与注册信息同步复制数据。

为客户端添加令牌认证

作为 Angel PowerFL 的通信模型,Pulsar 没有用户级别的权限控制。 为了保证客户端安全生产和消费数据,我们根据【Pulsar Client authentication using tokens based on JSON Web Tokens】添加了token认证(https://pulsar.apache.org/docs/en/security-jwt/#token- 身份验证概述)。 然后,我们需要为训练任务配置当前 Party 的服务地址和 admin 令牌。 由于 Angel PowerFL 部署在 Kubernetes 上,我们在容器中生成 Pulsar 集群所需的 Public/Private 密钥,然后将它们注册到 K8S secret。

# generate fl-private.key and fl-public.key
docker run --rm -v "$(pwd)":/tmp \
     apachepulsar/pulsar-all:2.5.2 \
     /pulsar/bin/pulsar tokens create-key-pair --output-private-key \
     /tmp/fl-private.key --output-public-key /tmp/fl-public.key 

# generate `admin-token.txt token` file
echo -n `docker run --rm -v \
     "$(pwd)":/tmp apachepulsar/pulsar-all:2.5.2 \
     /pulsar/bin/pulsar tokens create --private-key \
     file:///tmp/fl-private.key --subject admin`
# register authentication to K8S
kubectl create secret generic token-symmetric-key \
     --from-file=TOKEN=admin-token.txt \
     --from-file=PUBLICKEY=fl-public.key -n ${PARTY_NAME}

在多集群中启用 Topic 自动回收

为 Pulsar 集群启用异地复制后,我们无法直接删除与命令一起使用的主题。 Angel PowerFL 训练任务是一次性的,所以我们需要在使用后及时回收这些主题并释放空间。 因此,我们配置 brokerDeleteInactivetopicsEnabled 参数以回收通过异地复制复制的主题,并确保:

  • 该 Topic 未连接到任何生产者或消费者。
  • 该 topic 未订阅。
  • 该 topic 没有消息保留。 我们通过配置 brokerDeleteInactivetopicsEnabledbrokerDeleteInactivetopicsFrequencySeconds 参数每三个小时在 Pulsar 集群中自动回收主题。

Enable topic throttling

在联合训练期间,数据流量峰值因数据集、算法和执行而异。 生产环境任务的最大数据量超过200G/h。 如果 Pulsar 断开连接或者生产或消费过程中出现异常,我们必须重新启动整个训练过程。

为了降低这种风险,我们采用了 Pulsar 节流。 Pulsar 在生产者端支持 message-ratebyte-rate 节流策略。 消息速率限制限制每秒产生的消息数量,字节速率限制限制每秒产生的消息大小。 在 Angel PowerFL 中,我们将消息大小设置为 4M,并通过消息速率限制(低于 30*4 = 120 M/s)将命名空间的消息数量限制为 30。

./bin/pulsar-admin namespaces set-publish-rate fl-tenant/${namespace} -m 30

当我们最初测试消息速率限制时,它并没有很好地工作。 与腾讯数据平台部 MQ 团队调试后发现,如果配置了 topicPublisherThrottlingTickTimeMillis 参数,限流没有生效。 然后,我们在代理端启用了精确的主题发布率限制,并将这一改进贡献给了 Pulsar 社区。 详情参考【PR-7078:引入精准主题发布速率限制】(https://github.com/apache/pulsar/pull/7078)。

Configure topic unloading

Pulsar 根据集群中 broker 的负载动态地将 topic 分配给 broker。 如果拥有该主题的代理崩溃或过载,该主题会立即重新分配给另一个代理; 这个过程被称为主题卸载。 主题卸载意味着关闭主题,释放所有权,并将主题重新分配给负载较小的代理。 主题卸载通过负载均衡调整,客户端会遇到轻微抖动,一般持续10ms左右。 但是,我们在前期开始训练的时候,由于主题卸载,出现了很多连接异常。 以下是部分日志信息。

[sub] Could not get connection to broker: topic is temporarily unavailable -- Will try again in 0.1 s

为了解决这个问题,我们进一步探索了代理、命名空间、捆绑包和主题。 Bundle 是 Pulsar 命名空间的一种分片机制。 命名空间被分割成一个捆绑列表,每个捆绑包含命名空间的一部分。 主题不直接分配给代理。 取而代之的是,每个主题都通过主题的安静分配给特定的捆绑包。 捆绑包相互独立,并分配给不同的代理。

我们没有在早期重用培训主题。 为了训练 LR 算法,创建了 2,000 多个主题,每个主题产生的数据负载各不相同。 我们怀疑在短时间内创建和使用多个主题会导致负载不平衡和频繁的主题卸载。 为了减少主题卸载,我们调整了 Pulsar 包的以下参数。

# increase the maximum number of topics that can be distributed by the broker
loadBalancerBrokerMaxTopics=500000
# enable automatic namespace bundle split
loadBalancerAutoBundleSplitEnabled=true
# increase the maximum number of topics that triggers bundle split
loadBalancerNamespaceBundleMaxTopics=10000
# increase the maximum number of messages that triggers bundle split
loadBalancerNamespaceBundleMaxMsgRate=10000 

同时,我们在创建命名空间时将包的默认数量设置为 64。

./bin/pulsar-admin namespaces create fl-tenant/${namespace} --bundles 64

调整配置后,我们完美解决了频繁卸载主题的问题。

Pulsar on Kubernetes

Angel PowerFL 的所有服务都通过 Helm 部署在 Kubernetes 上。 作为图表之一,Pulsar 利用了 K8S 的资源隔离、可扩展性和其他优势。 在使用 Helm 部署 Pulsar 时,我们使用 Local Persistent Volume 作为存储,在 geo-replication 中使用 NodeSelector,在 bookies 中配置 useHostNameAsBookieID

Use Local Persistent Volume as storage

Pulsar 对 IO 很敏感,尤其是 bookie。 建议在生产环境中使用 SSD 或单独的磁盘。 当我们在 Angel PowerFL 中使用大数据集运行任务时,由于高 IO 实用性,经常出现“No Bookies Available”异常。 使用 Local Persistent Volume,我们将 bookie、ZooKeeper 和其他组件安装到单独的磁盘上并减少了 IO 竞争。 我们尝试用 Ceph 和 NFS 替换 Pulsar PV 存储,我们发现使用 Local Persistent Volume 时性能最好。

Use NodeSelector

代理需要访问对方的 Pulsar 代理容器,同时与异地复制同步复制数据。 在 Angel PowerFL 中,我们将网关机器单独标记,并在网关机器上安装代理,通过 NodeSelector 访问外部网络。

Configure useHostNameAsBookieID

Bookie 是有状态的。 我们在重建 bookie pod 后配置useHostNameAsBookieID,确保在 ZooKeeper 上注册的 ID 是 pod 的主机名。

未来的规划

我们已经在 Angel PowerFL 中使用 Apache Pulsar 一年了,我们在生产环境中运行 Pulsar 集群已经超过 8 个月了。 它稳定可靠,我们希望升级我们的 Pulsar 集群并改进 K8S 上的 Pulsar。

升级 Pulsar 到 2.6.x

目前,我们正在使用 Pulsar 2.5.2,并希望使用 Pulsar Key_Shared 订阅模式备份 Angel-PS 故障转移恢复。 Key_Shared 订阅模式在 Pulsar 2.6.0 (https://github.com/apache/pulsar/pull/5928) 中得到了增强,所以我们希望将 Pulsar 升级到 2.6.x。

支持 K8S 上 Pulsar 的多磁盘挂载

除了 YARN 计算资源,所有 Angel PowerFL 服务都在 Kubernetes 上运行。 作为图表之一,Pulsar 与其他服务一起部署,并使用 Local Persistent Volume 作为存储。 目前,bookie 上只能挂载一个磁盘(目录),所以我们无法充分利用多磁盘的机器。 为了解决这个需求,我们计划在 bookie 上挂载多个磁盘。

结论

我已经介绍了我们如何在 Angel PowerFL 平台中采用 Pulsar。 我们利用 Pulsar 的特性并根据我们的需求改进 Pulsar 的功能和性能。

作为云原生的分布式消息和事件流平台,Pulsar 具有许多突出的特性,已广泛应用于直播和短视频平台、零售和电子商务业务、媒体、金融等行业。 我们相信 Pulsar 的采用和社区将继续扩大。

特别感谢

感谢腾讯数据平台部MQ团队的支持和指导。 MQ 团队在 Apache Pulsar 和 TubeMQ 方面经验丰富,为 Apache Pulsar 社区做出了巨大贡献。 Apache Pulsar 是一个年轻、活跃、发展迅速的社区。 我们愿意与 Pulsar 社区合作,做出贡献,并建立一个更加繁荣的社区。

© 北京原流科技有限公司Apache、Apache Pulsar、Apache BookKeeper、Apache Flink 及相关开源项目名称均为 Apache 软件基金会商标。条款隐私