打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
RocketMQ - 如何用死信队列解决消费者异常

收藏小程序 IT藏经楼 大量资源免费分享

如果消费者系统的数据库宕机,会怎么样?

假设我们的MQ使用都没有问题,但是如果消费者系统的数据库挂了呢?因为我们一直都是假设了一个场景,就是生产者在处理完自己的逻辑之后会推消息到MQ,然后下游消费者系统从MQ里获取消息去执行后续的处理。

那么如果这个时候,消费者系统的数据库宕机了,同样会使消费者从MQ里获取到消息之后,消费线程就会挂掉,没办法继续进行处理。

所以针对这样的场景,消费者系统要怎么处理?应该如何重试?

数据库宕机的时候,你还可以返回CONSUME_SUCCESS吗?

在下面代码片段中,我们注册了一个监听器回调函数,当consumer获取到消息之后,就会调用这个函数进行处理

consumer.registerMessageListener(
new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context)
{
// 在这里对获取到的msgs订单消息进行处理
// 比如增加积分、发送优惠券、通知发货,等等
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
);

我们可以在这个回调函数中对消息进行处理,处理完之后,就可以告诉RocketMQ Consumer这批消息的处理结果。

比如,如果返回的是CONSUME_SUCCESS,那么Consumer就知道这批消息处理完成了,就会提交这批消息的offset到broker上去,然后下次就会继续从broker上获取下一批消息来处理。

但是如果此时我们在上面的回调函数中,对一批消息进行处理的时候,因为数据库宕机了,导致处理逻辑无法完成,此时我们还能返回CONSUME_SUCCESS吗?如果你返回的话,下次就会处理下一批消息,但是这批消息其实没有处理成功,此时必然就导致这批消息丢失了。

如果对消息的处理有异常,返回RECONSUME_LATER状态

如果因为数据库宕机,导致对这批消息处理是异常的,就应该返回一个RECONSUME_LATER状态。

告诉RocketMQ这批消息处理有异常,过段时间再次给我这批消息让我重新试一下。

所以我们的代码应该改成下面这样:

consumer.registerMessageListener(
new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context)
{
try {
// 在这里对获取到的msgs订单消息进行处理
// 比如增加积分、发送优惠券、通知发货,等等
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 如果因为数据库宕机等问题,对消息处理失败了
// 此时返回一个稍后重试的状态
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}

}
}
);

RocketMQ是如何让你进行消费重试的

那么RocketMQ在收到你返回的RECONSUME_LATER状态之后,是如何让你进行消费重试的呢?

RocketMQ有一个针对这个ConsumerGroup的重试队列,如果返回了RECONSUME_LATER状态,他会把你这批消息放到这个消费组的重试队列中去。

比如你的消费者组的名称是"VoucherConsumerGroup",那么他会有一个“
%RETRY%VoucherConsumerGroup”这个名字的重试队列。

重试队列中的消息会按照配置的时间再次给消费者,让消费者进行处理,如果再次失败,那么会再过一段时间让消费者进行处理,默认最多是重试16次,每次重试之间的间隔时间是可以配置的:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

如果连续重试16次还是无法处理消息,然后怎么办?

那么如果在16次重试范围内消息处理成功了,自然就没问题了,但是如果你对一批消息重试了16次还是无法处理成功呢?这个时候会把消息放到死信队列中。

其实就是一批消息交给消费者去处理,消费者重试了16次还一直没有处理成功,就不要继续重试这批消息了,就可以认为他们死掉了就可以了,然后这批消息会自动进入死信队列。

死信队列的名字是"%DLQ%VoucherConsumerGroup"

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
探索RocketMQ的重复消费和乱序问题
如何避免消息丢失
RocketMQ介绍
10分钟搞懂!消息队列选型全方位对比
Kafka 实现延迟队列、死信队列、重试队列
消息中间件应用的常见问题及优化方案
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服