假设我们的MQ使用都没有问题,但是如果消费者系统的数据库挂了呢?因为我们一直都是假设了一个场景,就是生产者在处理完自己的逻辑之后会推消息到MQ,然后下游消费者系统从MQ里获取消息去执行后续的处理。
那么如果这个时候,消费者系统的数据库宕机了,同样会使消费者从MQ里获取到消息之后,消费线程就会挂掉,没办法继续进行处理。
所以针对这样的场景,消费者系统要怎么处理?应该如何重试?
在下面代码片段中,我们注册了一个监听器回调函数,当consumer获取到消息之后,就会调用这个函数进行处理
consumer.registerMessageListener(
new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 在这里对获取到的msgs订单消息进行处理
// 比如增加积分、发送优惠券、通知发货,等等
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
);
我们可以在这个回调函数中对消息进行处理,处理完之后,就可以告诉RocketMQ Consumer这批消息的处理结果。
比如,如果返回的是CONSUME_SUCCESS,那么Consumer就知道这批消息处理完成了,就会提交这批消息的offset到broker上去,然后下次就会继续从broker上获取下一批消息来处理。
但是如果此时我们在上面的回调函数中,对一批消息进行处理的时候,因为数据库宕机了,导致处理逻辑无法完成,此时我们还能返回CONSUME_SUCCESS吗?如果你返回的话,下次就会处理下一批消息,但是这批消息其实没有处理成功,此时必然就导致这批消息丢失了。
如果因为数据库宕机,导致对这批消息处理是异常的,就应该返回一个RECONSUME_LATER状态。
告诉RocketMQ这批消息处理有异常,过段时间再次给我这批消息让我重新试一下。
所以我们的代码应该改成下面这样:
consumer.registerMessageListener(
new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
// 在这里对获取到的msgs订单消息进行处理
// 比如增加积分、发送优惠券、通知发货,等等
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 如果因为数据库宕机等问题,对消息处理失败了
// 此时返回一个稍后重试的状态
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
);
那么RocketMQ在收到你返回的RECONSUME_LATER状态之后,是如何让你进行消费重试的呢?
RocketMQ有一个针对这个ConsumerGroup的重试队列,如果返回了RECONSUME_LATER状态,他会把你这批消息放到这个消费组的重试队列中去。
比如你的消费者组的名称是"VoucherConsumerGroup",那么他会有一个“
%RETRY%VoucherConsumerGroup”这个名字的重试队列。
重试队列中的消息会按照配置的时间再次给消费者,让消费者进行处理,如果再次失败,那么会再过一段时间让消费者进行处理,默认最多是重试16次,每次重试之间的间隔时间是可以配置的:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
那么如果在16次重试范围内消息处理成功了,自然就没问题了,但是如果你对一批消息重试了16次还是无法处理成功呢?这个时候会把消息放到死信队列中。
其实就是一批消息交给消费者去处理,消费者重试了16次还一直没有处理成功,就不要继续重试这批消息了,就可以认为他们死掉了就可以了,然后这批消息会自动进入死信队列。
死信队列的名字是"%DLQ%VoucherConsumerGroup"
联系客服