sharetwitterlinkedIn

Kafka-on-Pulsar 突破性进展:2.8.0 及更高版本的连续偏移量实现

December 29, 2021
head img

协议处理器是 2020 年一月份发布的 Pulsar 2.5.0 所引入的新功能,目的是将 Pulsar 的能力扩展到其他消息领域。默认情况下 Pulsar Broker 仅支持 Pulsar 协议。而通过协议处理器,Pulsar Broker 就可以支持其他消息协议,包括 Kafka、AMQP 以及 MQTT(现已新增 RocketMQ)。这使得 Pulsar 可以与基于其他消息技术的应用进行交互,从而扩展 Pulsar 生态系统。

Kafka-on-Pulsar (KoP) 就是一种协议处理协议,它将原生 Kafka 协议引入了 Pulsar,使得开发人员能够使用现有的 Kafka 应用将数据发布到 Pulsar 或从 Pulsar 读取数据,而无需更改代码。KoP 极大降低了 Kafka 用户使用 Pulsar 的壁垒,这让 KoP 成为最受欢迎的协议处理器之一。

KoP 解析 Kafka 协议,并通过 Pulsar 提供的流式存储抽象接口直接访问 BookKeeper。虽然 Kafka 和 Pulsar 有许多通用的概念,例如主题和分区,但在 Pulsar 中没有对应 Kafka 偏移量的概念。KoP 的早期版本通过一种简单的转换来应对这个问题,但这种转换不支持连续偏移量,同时也容易出现问题。

为了解决这个痛点,KoP 2.8.0 引入了 Broker Entry Metadata,以实现连续偏移量。这个更新使得 KoP 可用并且生产就绪。需要特别注意的是,这个更新破坏了向后兼容性。本文将深入探讨 KoP 2.8.0 之前和之后分别是如何实现偏移量的,并解释该突破性变化背后的基本原理。

版本兼容性说明

Pulsar 2.6.2 版本之后,KoP 版本即随着相应的 Pulsar 版本而更新。KoP x.y.z.m 版本对应 Pulsar x.y.z 版本,其中 m 是补丁版本号。例如,最新的 KoP 2.8.1.22 版本与 Pulsar 2.8.1 版本兼容。本文中 2.8.0 同时指代 Pulsar 2.8.0 和 KoP 2.8.0。

Kafka 和 Pulsar 的消息标识符

Kafka 偏移量

在 Kafka 中,偏移量是一个 64 位整数,表示消息在特定分区中的位置。Kafka 消费者可以向分区提交偏移量。如果偏移量提交成功,那么消费者重启后就能够从已提交的偏移量位置继续消费。

Kafka 偏移量是连续的,遵守如下约束:

  1. 第一条消息的偏移量为 0。
  2. 如果最后一条消息的偏移量为 N,那么下一条消息的偏移量将会是 N + 1。

Kafka 将消息存储在每个 broker 的文件系统中:

  • 每个分区切分为分片
  • 每个分片是一个文件,存储特定偏移量范围内的一组消息
  • 每个偏移量有一个位置,即消息的起始文件偏移量(文件偏移量是指字符在文件中的位置,而 Kafka 偏移量是消息在一个分区中的索引。)

由于每条消息的头部都记录了消息大小,所以对于给定偏移量,Kafka 可以很容易地找到其分片文件以及位置。

Pulsar 消息 ID

Kafka 将消息存储到每个 Broker 上的文件系统,而 Pulsar 则不同,它使用 BookKeeper 作为其存储系统。在 BookKeeper 中:

  • 每个日志单元称为一个 *Entry**
  • 日志 Entry 流称为 Ledger
  • 存储 Entry Ledger 的单独的服务器称为 Bookies

Bookie 可以通过 64 位 Ledger ID 和 64 位 Entry ID 找到任何 Entry。Pulsar 可以在一个 Entry 中存储单条消息或一批消息。因此,Pulsar 的消息 ID 由 Ledger ID、Entry ID、 批索引(如果不是批量消息则为 -1)以及分区编号组成,Pulsar 可通过这种消息 ID 找到一条消息。

就像 Kafka 消费者可以提交偏移量来记录消费位置一样,Pulsar 消费者可以确认消息 ID 来记录消费位置。

KoP 如何处理 Kafka 偏移量

KoP 需要如下 Kafka 请求来处理 Kafka 偏移量:

  • PRODUCE:当 Kafka 生产者生产的消息被持久化之后,KoP 需要告诉 Kafka 生产者第一条消息的偏移量。然而 BookKeeper 客户端只返回一个消息 ID。
  • FETCH:当 Kafka 消费者想要从指定偏移量开始获取消息时,KoP 需要找到对应的消息 ID 并从 Ledger 中读取相应的 Entry。
  • LIST_OFFSET:查找最早或最新的可用消息,或者按时间戳查找消息。

我们必须支持计算特定消息的偏移量,或通过给定的偏移量定位消息。

KoP 2.8.0 之前版本如何实现偏移量

实现细节

如前文所述,Kafka 通过分区编号和偏移量来定位消息,而 Pulsar 通过消息 ID来定位消息。在 Pulsar 2.8.0 之前,KoP 简单地在 Kafka 偏移量和 Pulsar 消息 ID 之间进行一个转换。将 64 位的偏移量映射为 20 位 Ledger ID、32 位 Entry ID 以及 12 位批索引。如下是一个简单的 Java 实现。

    public static long getOffset(long ledgerId, long entryId, int batchIndex) {
        return (ledgerId << (32 + 12) | (entryId << 12)) + batchIndex;
    }

    public static PositionImpl getPosition(long offset) {
        long ledgerId = offset >>> (32 + 12);
        long entryId = (offset & 0x0F_FF_FF_FF_FF_FFL) >>> BATCH_BITS;
        // BookKeeper only needs a ledger id and an entry id to locate an entry
        return new PositionImpl(ledgerId, entryId);
    }

在本文中,我们使用 (ledger id, entry id, batch index) 来表示一个消息 ID。例如,假设一个消息的 ID 是 (10, 0, 0),则转换后的偏移量为 175921860444160。在一些情况下这样的数值能正常工作,因为偏移量是单调递增的。然而当发生 Ledger 翻转,或应用程序想要手动管理偏移量时,就会出现问题。下面详细介绍这种简单转换方法存在的问题。

简单转换存在的问题

转换后的偏移量不连续,这会导致许多严重问题。

例如,假设当前消息 ID 是 (10, 5, 100)。如果发生 Ledger 翻转,则下一条消息的 ID 可能是 (11, 0, 0)。在这种情况下,两条消息的偏移量分别为 175921860464740 和 193514046488576,两者差了 17,592,186,023,836。

KoP 利用 Kafka 的 MemoryRecordBuilder 将多条消息合并为一个批量消息。 MemoryRecordBuilder 必须确保批量大小小于 32 位整数的最大值 (4,294,967,296)。在上文示例中,两个连续偏移量的差值远大于 4,294,967,296。这将导致抛出 Maximum offset delta exceeded 异常。

为了避免该异常,在使用 KoP 2.8.0 之前版本时,我们必须配置 maxReadEntriesNum 为 1 (此配置限制 BookKeeper 客户端读取的最大 Entry 条数)。如此一来,每个 FETCH 请求只读取一个 Entry,会显著降低性能。

然而,即使使用 maxReadEntriesNum=1 这种变通方法,这种转换实现在某些场景下也不能正常工作。例如,Kafka 与 Spark 的集成依赖于 Kafka 偏移量的连续性。当消费偏移量为 N 的消息后,Spark 会寻找下一个偏移量 (N + 1)。但是偏移量 N + 1 可能无法转换为有效的消息 ID。

转换方法还存在其他问题。而在 2.8.0 之前版本,没有好办法实现连续偏移量。

自 KoP 2.8.0 版本的连续偏移量实现

实现连续偏移量的解决方案是将偏移量记录到消息的元数据中。然而,偏移量是由 Broker 端在将消息发布到 Bookie 之前决定的,而消息的元数据则是在客户端构建的。为了解决这个问题,我们需要在 Broker 端做一些额外的工作:

  1. 反序列化元数据
  2. 设置元数据的“偏移量”属性
  3. 再次序列化元数据,包括重新计算校验和值

这会导致 Broker 端的 CPU 开销显著增加。

轻量级 Broker Entry 元数据

PIP 70 引入了轻量级 Broker Entry 元数据。它是 BookKeeper Entry 的元数据,并且只在 Broker 内部可见。

默认的消息流如下图所示:

如果配置了 brokerEntryMetadataInterceptors,即配置一组 Broker Entry 元数据拦截器,那么消息流将会是:

可以看到 Broker Entry 元数据存储在 Bookie 上,但对 Pulsar 消费者不可见。

2.9.0 版本之后,可以将 Pulsar 消费者配置为可以读取 Broker Entry 元数据。

每个 Broker Entry 元数据拦截器都在消息元数据前面加上特定的元数据(称之为 “Broker Entry 元数据”)。由于 Broker Entry 元数据和消息元数据是独立的,所以 Broker 无需反序列化消息元数据。此外,BookKeeper 客户端支持发送包含多个 ByteBuf 的 Netty CompositeByteBuf,而无需任何复制操作。从 BookKeeper 客户端角度看,只是将一些额外字节发送到套接字缓冲区。因此,额外的开销会很低且可接受。

索引元数据

我们需要配置 AppendIndexMetadataInterceptor (即索引元数据截器) 来支持 Kafka 偏移量。

brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor

Pulsar Broker 中有个名为 “Managed Ledger” 的组件,它管理分区中的所有 Ledger。索引元数据拦截器维护了一个从 0 开始的索引。Pulsar 使用术语“索引”而不是“偏移量”。

每次将 Entry 写入 Bookie 之前,都会发生如下两件事:

  1. 将索引序列化到 Broker Entry 元数据中。
  2. 将索引自增 Entry 中的消息数目。

之后,每个 Entry 记录第一条消息的索引,相当于 Kafka 中的“基础偏移量”概念。

现在,我们需要保证即使分区的 owner Broker 宕机,索引元数据拦截器也能从某个地方恢复索引。

在某些场景下,Managed Ledger 需要将其元数据存储起来(通常存储到 ZooKeeper)。例如,当一个 Ledger 发生翻转,Managed Ledger 需要将所有 Ledger ID 归档到一个 z-node。这里我们不深入研究元数据的格式,只需要知道在 Managed Ledger 元数据中有一个属性映射。

在将元数据存储到 ZooKeeper (或其他元数据存储) 之前:

  1. 从索引元数据拦截器中检索索引,该索引代表了最新消息的索引。
  2. 向属性映射中添加一条属性,属性名为 “index”,属性值为索引值。

每次初始 Managed Ledger 时,都会从元数据存储中恢复元数据。那时,我们可以将索引元数据拦截器中的索引设置为“index”键关联值。

KoP 如何实现连续偏移量

让我们回顾一下KoP 如何处理 Kafka 偏移量一节,看看在如下 Kafka 请求中如何处理偏移量。

  • PRODUCE

    当 KoP 处理 PRODUCE 请求时,它利用 Managed Ledger 将消息写入 Bookie。相关 API 有一个回调可以访问 Entry 数据。

        @Override
        public void addComplete(Position pos, ByteBuf entryData, Object ctx) {
    

    我们只需要从 entryData 中解析出 Broker Entry 元数据,然后检索索引即可。该索引就是返回给 Kafka 生产者的基础偏移量。

  • FETCH

    FETCH 是通过给定偏移量找到消息位置 (Ledger ID 和 Entry ID)。KoP 实现了一个回调,从 Entry 中读取索引并与给定的偏移量进行比较。然后将回调传给 OpFindNewest 类,该类使用二分查找算法来查找 Entry。

    二分查找可能要花一些时间。但它仅发生在初始搜索中,除非 Kafka 消费者断开连接。当找到位置后,会创建一个非持久化的游标来记录该位置。随着 fetch 偏移量的增加,游标会移动到更新的位置。

  • LIST_OFFSET

    • 最早:获得 Managed Ledger 中的第一个有效位置,然后读取该位置的 Entry,并解析索引。
    • 最新:从索引元数据拦截器中检索索引,并加一。需要注意的是,Kafka 中的最新偏移量(也被称为 LEO)是下一个将要分配给消息的偏移量,而索引元数据拦截器中的索引则是分配给最新消息的偏移量。
    • 按时间戳:首先利用 Broker 的基于时间戳的二分查找找到目标 Entry,然后从 Entry 中解析出索引。

从 KoP 2.8.0 之前的版本升级到 2.8.0 或更高版本

KoP 2.8.0 实现的连续偏移量是有折衷的 —— 向后兼容性被破坏。KoP 2.8.0 之前版本存储的偏移量无法被 KoP 2.8.0 或更高版本识别。

如果在此之前你还没有使用过 KoP,需将 Pulsar 升级到 2.8.0 或更高版本后使用相应版本的 KoP。

如果你在此之前已经使用过 2.8.0 之前版本的 KoP,则需要知道从低于 2.8.0 版本到 2.8.0 或更高版本有突破性变化。使用新版本前,你必须删除 __consumer_offsets 主题以及 KoP 之前使用过的所有主题。

KoP 中有一个最新的功能,可以通过启用配置来跳过这些旧消息。这个功能将包含在 2.8.1.23 或更高版本。注意:旧消息仍将无法访问,这个功能只是节省了删除旧主题的工作量。

总结

本文首先解释了 Kafka 偏移量的概念,以及 Pulsar 类似的消息 ID 概念。然后讲了 KoP 在 2.8.0 版本之前是如何实现 Kafka 偏移量的及其带来的相关问题。

为了解决这些问题,Pulsar 2.8.0 引入了 Broker Entry 元数据。基于此特性,通过相应的拦截器实现了索引元数据。之后,KoP 可以利用索引元数据拦截器来实现连续偏移量。

最后,由于这是一个突破性的变化,我们谈了从 2.8.0 之前版本到 2.8.0 或更高版本的升级。强烈建议直接尝试 KoP 2.8.0 或更高版本。

更多资源

© StreamNative, Inc. 2022Apache, Apache Pulsar, Apache BookKeeper, Apache Flink, and associated open source project names are trademarks of the Apache Software Foundation.TermsPrivacy