Skip to content

Latest commit

 

History

History
47 lines (34 loc) · 2.3 KB

7-RocketMQ实战-消息重试机制.md

File metadata and controls

47 lines (34 loc) · 2.3 KB

7-RocketMQ实战-消息重试机制

[TOC]

7.1 消息重试机制概述

  • RocketMQ提供了消息重试机制
    • 生产者:消息重投重试(保证数据的高可靠)
    • 消费者:消息处理异常(broker端到consumer端各种问题,比如网络原因闪断,消费端处理失败,ACK返回失败等)

7.2 consumer如何消息重试

7.2.1 需要重试的情况

  • Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。 Consumer 消费消息失败通常可以讣为 有以下几种情冴
    1. 由亍消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。返种错诨通常需要跳过返条消息,再消费其他消息,而返条失败的消息即使立刻重试消费,99%也丌成功,所以最好提供一种定时重试机制,即过 10s 秒后再重试。
    2. 由亍依赖的下游应用服务丌可用,例如 db 连接丌可用,外系统网络丌可达等。遇到返种错诨,即使跳过当前失败的消息,消费其他消息同样也会报错。返种情冴建议应用 sleep 30s,再消费下一条消息,返样可以减轻 Broker 重试消息的压力。

7.2.2 触发与配置重试

  • 捕获异常后 return ConsumeConcurrentlyStatus.RECONSUME_LATER; //requeue 一会再消费,会启动broker的重试机制

  • consumer网络异常时,这时候不会返回东西,broker会选择同一个主题下同一个组下的另一个consumer把消息消费掉

  • 在服务器端(rocketmq-broker端)的属性配置文件中加入以下行(延时级别):

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
描述了各级别与延时时间的对应映射关系。
  1. 这个配置项配置了从1级开始,各级延时的时间,可以修改这个指定级别的延时时间;
  2. 时间单位支持:s、m、h、d,分别表示秒、分、时、天;
  3. 默认值就是上面声明的,可手工调整;
  4. 默认值已够用,不建议修改这个值。

7.2.3 处理重试次数过多的问题

  • 通过重试次数进行逻辑处理(补偿机制)
  • 重试之后会获取OrignMsgId
catch(exception e){
    if(msg.getReconsumeTime()==3){
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}