Apache Pulsar 2.7.0 支持事务、Topic 级别策略配置、Azure Blob 二级存储、支持原生 protobuf schema、支持 Pulsar Functions 端到端加密等,修复了 2.6.2 版本中的诸多问题,改进了一些功能,进一步丰富了 Pulsar 作为云原生流数据平台的功能。
Pulsar 2.7.0 版本中合并了来自社区的 450+ commit,下面一起来看看 2.7.0 版本有哪些更新吧!
Pulsar 事务支持事件流应用程序实现原子操作,同时实现消费、处理、生产消息。采用事务语义,Pulsar 的单分区和多分区都可以实现 exactly-once 语义。因此 Pulsar 可以应用于新的使用场景,即客户端(作为 producer 或 consumer)处理多个 Topic 和分区中的消息,并保证将这些消息作为一个消息单元来处理。事务的引入不仅增强了 Pulsar 的消息语义,也会进一步增强 Pulsar Functions 的处理保证。
目前,Pulsar 事务还在开发阶段。Pulsar 社区将会继续开发并完善事务的特性,推进事务在生产环境中的使用。
在 Pulsar 中启用事务,需要在 broker.conf
文件中配置以下参数。
transactionCoordinatorEnabled=true
初始化事务协调组件的元数据后,事务协调组件可以利用分区 topic 的优势,例如负载平衡,可以是事务协调组件分布在多个 broker 中。
bin/pulsar initialize-transaction-coordinator-metadata -cs 127.0.0.1:2181 -c standalone
对于 Pulsar 客户端,也需要启用事务特性。
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.enableTransaction(true)
.build();
Pulsar 事务的示例如下:
// Open a transaction
Transaction txn = pulsarClient
.newTransaction()
.withTransactionTimeout(5, TimeUnit.MINUTES)
.build()
.get();
// Publish messages with the transaction
producer.newMessage(txn).value("Hello Pulsar Transaction".getBytes()).send();
// Consume and acknowledge messages with the transaction
Message<byte[]> message = consumer.receive();
consumer.acknowledgeAsync(message.getMessageId(), txn);
// Commit the transaction
txn.commit()
关于 Pulsar 事务的更多信息,参阅 transaction 文档。关于 Pulsar 事务设计的更多信息,参阅 Pulsar PIP31。
Pulsar 2.7.0 引入了系统 Topic,用于维护所有策略更改事件,进而实现 Topic 级别的策略。目前,命名空间级别的策略都可以在 Topic 级别使用,因此用户可以在 Topic 级别自定义策略,而无需使用大量的元数据服务资源。使用 Topic 级别策略,用户可以更灵活地管理 Topic,同时不会给 ZooKeeper 增加负担。
在 Pulsar 中启用 Topic 级别策略,需要在 broker.conf
文件中配置以下参数。
systemTopicEnabled=true
topicLevelPoliciesEnabled=true
启用 Topic 级别策略后,可以通过 Pulsar Admin 更新 Topic 策略。为特定 Topic 设置数据保留的代码示例如下。
bin/pulsar-admin topics set-retention -s 10G -t 7d persistent://public/default/my-topic
关于 Topic 系统和 Topic 级别策略的更多信息,参阅 Pulsar PIP39。
Pulsar 2.7.0 支持 Azure Blob 二级存储。使用此特性,用户可以将历史数据卸载到 Azure Blob 存储,不仅为 Azure 云用户提供极大便利,还可以降低在 BookKeeper 中管理大量历史数据的成本。Pulsar 将在后续版本中为 Azure 云添加更多支持。
在 Pulsar 中使用 Azure Blob 存储,需要在 broker.conf
文件中配置以下参数。
managedLedgerOffloadDriver=azureblob
更多详细信息,参阅 PR-8436。
Pulsar 2.7.0 支持原生 protobuf schema,简化了 protobuf 与 Pulsar 集成的操作,能让 protobuf 用户使用 Pulsar 的更多功能。下面的代码展示了如何在 Java 客户端中使用原生 protobuf schema。
Consumer<PBMessage> consumer = client.newConsumer(Schema.PROTOBUFNATIVE(PBMessage.class))
.topic(topic)
.subscriptionName("my-subscription-name")
.subscribe();
更多详细信息,参阅 PR-8372。
在 Pulsar 中,租户、命名空间和 Topic 是集群的核心资源。Pulsar 2.7.0 支持用户限制集群中租户的数量、租户中命名空间的数量、命名空间中 Topic 的数量,以及 Topic 中的订阅数量。
在 Pulsar 中配置资源限制,需要在 broker.conf
文件中配置以下参数。
maxTenants=0
maxNamespacesPerTenant=0
maxTopicsPerNamespace=0
maxSubscriptionsPerTopic=0
配置资源限制可以为 Pulsar 管理员管理资源提供极大便利。
Pulsar 2.7.0 支持为 Pulsar Functions 添加端到端(End-to-End,e2e)加密。用户可以使用配置应用程序加密的公钥和私钥对。只有当密钥有效时,用户才可以解密加密的消息。
通过在命令行指定 --producer-config 即可在 Functions Worker 上启用端到端加密。更多详细信息,参阅 Pulsar Encryption。
更多详细信息,参阅 PR-8432。
2.7.0 版本发布前,Pulsar 没有用于重平衡 worker 上 function 调度程序的机制。Function 之间的工作负载可能相差较多。Pulsar 2.7.0 支持手动触发 function 重平衡,也可以周期性进行自动重平衡。
如果您有任何关于 Pulsar 的疑问或建议,欢迎通过邮件或 Slack 与我们联系。