本文来自社区用户投稿,作者侯盛鑫,来自伴鱼。
在很多在线的业务系统中,由于业务逻辑处理出现异常,一条消息没有被确认,我们需要尽可能准备好优雅地处理故障。重试是我们的常用做法,一般我们从以下三方面入手进行重试:
那么,难道我们不能简单地让这种默认行为接管一切,然后重试消息直到成功吗?问题是这条消息可能永远不会成功。至少没有某种形式的手动干预它是不会成功的。于是乎,消费者就永远不会继续处理后续的任何消息,并且我们的消息处理将陷入困境,所以在重试一定次数后将采取死信队列的方法存储为确认成功消息。
如上图,Pulsar 采用非阻塞请求重试队列和死信队列(DLQ)来扩展现有事件驱动架构作用,通过这样处理我们就可以在不中断实时流量的情况下实现解耦、可观察的错误处理。
但是 Pulsar 默认情况下,自动重试这个选项是关闭的,我们可以设置 enableRetry 选项为 true,这样可以在这个消费者中进行重试。如下例子所示,消费者会从重试主题消费消息:
package main
import (
"context"
"fmt"
"github.com/apache/pulsar-client-go/pulsar"
"time"
)
func main() {
cp := pulsar.ClientOptions{
URL: "pulsar://xxx.xxx.xxx.xxx:6650",
OperationTimeout: 30 * time.Second,
}
client, err := pulsar.NewClient(cp)
if err != nil {
return
}
defer client.Close()
d := &pulsar.DLQPolicy{
MaxDeliveries: 3,
RetryLetterTopic: "persistent://group/server/xxx-RETRY",
DeadLetterTopic: "persistent://group/server/xxx-DLQ",
}
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "persistent://group/server/xxx",
SubscriptionName: "test",
Type: pulsar.Failover,
RetryEnable: true,
DLQ: d,
NackRedeliveryDelay: time.Second * 3,
})
if err != nil {
return
}
ctx := context.Background()
for {
msg, err := consumer.Receive(ctx)
if err != nil {
return
}
if msg.Key() == 0 {
// 确认的处理
consumer.Ack(msg)
} else {
// 不确认,等 NackRedeliveryDelay 后将被重新投递到主队列进行消费
consumer.Nack(msg)
// 稍后处理,等 xx 秒后将被重新投递到重试队列
consumer.ReconsumeLater(msg, time.Second * 5)
// 以上方法二选其一
}
}
}
首先,如上样例自动创建了一个重试队列,产生重试消息需要两个条件其中一个:
NackRedeliveryDelay
控制。重试行为中的重试队列的重试行为是和时间相关的。目前主要通过 consumer.ReconsumeLater()
方法触发,一旦触发到重试队列,重试次数会相应在重试中减少,这里的 DLQPolicy 结构中的 RetryLetterTopic
是 Pulsar 为了进行重试在原本基础上新建的 topic,默认情况下是:{TopicName}-{Subscription}-RETRY
,这是为了最大程度不干扰主 topic 的数据的做法。
Golang 的 sdk 并没有完成 java sdk 中那样丰富多样的重试机制,但是却简单粗暴直接开放了 NackRedeliveryDelay
原始延迟时间的参数,这样方便了各种策略的定制化开发。
其中 DLQPolicy.MaxDeliveries
这个参数在消息出错时,将决定最多继续尝试发送多少次,如到用户设置的最大值,消息还没有成功发送,此时 Pulsar 会将消息推送到死信队列中,也就是 DLQPolicy.DeadLetterTopic
。
注意:RLQ 是一个延迟队列,消费用 shared 模式!
当重试次数用完时,信息将被路由到死信队列中,注意:此时消息状态会变成已确认。死信队列是一个不分区的持久化队列,用户可以根据自己的需求对信息消息做相应的处理。sdk 提供 DLQPolicy.DeadLetterTopic
参数来设置 “死信队列” 的名字。默认情况下死信队列名称是 :{TopicName}-{Subscription}-DLQ 。
到此为止,我们梳理一下流程:
重试队列实际上是一个延迟队列,未确认消息将维护一个时间相关的优先级队列;
我叫侯盛鑫,也可以我叫大云,目前就职于伴鱼基础架构,负责消息队列的维护与相关开发,Rust 日报小组中的菜鸡成员,喜欢研究存储,服务治理等方向。初次接触 Pulsar 就对存储和计算分离的结构所吸引,顺滑的生产者消费者接入和高吞吐让我好奇这个项目的实现,期望之后能在 Pulsar 的相关功能中做些贡献。