在众多关于MQ的面试八股文中有这么一道题,“如何保证MQ消息消费的幂等性”。
为什么需要保证幂等性呢?是因为消息会重复消费。
为什么消息会重复消费?
明明已经消费了,为什么消息会被再次被消费呢?
不同的MQ产生的原因可能不一样
本文就以RocketMQ为例,来扒一扒RocketMQ中会导致消息重复消息的原因,最终你会发现,其实消息重复消费算是RocketMQ无奈的“bug”。

消息发送异常时重复发送
首先,我们来瞅瞅RocketMQ发送消息和消费消息的基本原理。

如图,简单说一下上图中的概念:
- Broker,就是RocketMQ的服务端,如上图就有两个服务实例
- Topic就是一类消息集合的名字
- Queue就是Topic的对应的队列,消息都存在Queue上,每个Topic都会有自己的几个Queue
所以,整个消息发送和消费过程大致如下:
- 生产者在发送消息之前根据负载均衡策略(默认是轮询)选择一个Queue,然后跟这个Queue所在的机器建立连接,把消息发送到这个Queue上
- 消费者只要消费这个Queue,那么就能消费到消息
在正常情况下,生产者的确是按照这个方式来发送消息的
但是当出现了异常时,这种异常包括消息发送超时、响应超时等等,RocketMQ为了保证消息成功发送,会进行消息发送的重试操作,默认情况下会最多会重试两次

重试操作比较简单,就是选择另一台机器的Queue来发送。
虽然重试操作可以很大程度保证消息能够发送成功,但是同时也会带来消息重复发送的问题。
举个例子,假设生产者向A机器发送消息,发生了异常,响应超时了,但是就一定代表消息没发成功么?
不一定,有可能会出现服务端的确接受到并处理了消息,但是由于网络波动等等,导致生产者接收不到服务端响应的情况,此时消息处理成功了,但是生成者还是以为发生了异常
此时如果发生重试操作,那么势必会导致消息被发送了两次甚至更多次,导致服务端存了多条相同的消息,那么就一定会导致消费者重复消费消息。
消费消息抛出异常
在RocketMQ的并发消费消息的模式下,需要用户实现MessageListenerConcurrently接口来处理消息

当消费者获取到消息之后会调用MessageListenerConcurrently的实现,传入需要消费的消息集合 msgs,这里提到的msgs很重要

如上代码,当消息消费出现异常的时候,status就会为null,后面就会将status设置成为RECONSUME_LATER。
RECONSUME_LATER翻译成功中文就是稍后重新消费的意思
所以从这可以看出,一旦抛出异常,那么消息之后就可以被重复消息。
到这其实可能有小伙伴觉得消息消费失败重新消费很正常,保证消息尽可能消费成功。
对,这句话不错,的确可以在一定程度上保证消费异常的消息可以消费成功。
但是坑不在这,而是前面提到的消费时传入的整个集合中的消息都需要被重新消费。
具体的原因我们接着往下看
当消息处理之后,不论是成功还是异常,都需要对结果进行处理,代码如下

当处理结果为RECONSUME_LATER的时候(异常会设置为RECONSUME_LATER),此时ackIndex会设置成-1,后面循环遍历的时候就会遍历到所有这次消费的消息,然后调用sendMessageBack方法,sendMessageBack方式是用来实现消息重新消费的逻辑,这里就不展开说了。
所以,一旦被消费的一批消息中出现一个消费异常的情况,那么就会导致整批消息被重新消费,从而会导致在出现异常之前的成功处理的消息都会被重复消费,非常坑。
不过好在消费时传入的消息集合中的消息数量是可以设置的,并且默认就是1

也就说默认情况下那个集合中就一条消息,所以默认情况下不会出现消费成功的消息被重复消费的情况。
所以这个参数不要轻易设置,一旦设置大了,就可能导致消息被重新消费。
除了并发消费消息的模式以外,RocketMQ还支持顺序消费消息的模式,也会造成重复消费,逻辑其实差不多,但是在实现消息重新消费的逻辑不一样。
消费者提交offset失败
首先来讲一讲什么是offset。
前面说过,消息在发送的时候需要指定发送到,消息最后会被放到Queue中,其实真正的消息不是在Queue中,Queue存的是每个消息的位置,但是你可以理解为Queue存的是消息。
而消息在Queue中是有序号的,这个序号就被称为offset,从0开始,单调递增1。

比如说,如上图,消息1的offset就是0,消息2的offset就是1,依次类推。
这个offset的一个作用就是用来管理消费者的消费进度。
当消费者在成功消费消息之后,需要将所消费的消息的offset提交给RocketMQ服务端,告诉RocketMQ,这个Queue的消息我已经消费到了这个位置了。
提交offset的代码就在上述第二节提到的处理结果的后面

这样有一个好处,那么一旦消费者重启了或者其它啥的要从这个Queue拉取消息的时候,此时他只需要问问RocketMQ服务端上次这个Queue消息消费到哪个位置了,之后消费者只需要从这个位置开始消费消息就行了,这样就解决了接着消费的问题。

但是RocketMQ在设计的时候,当消费完消息的时候并不是同步告诉RocketMQ服务端offset,而是定时发送。

如图,当消费者消费完消息的时候,会将offset保存到内存中的一个Map数据结构中,所以上面截图的那段代码其实是更新内存中的offset

而在消费者启动的时候会开启一个定时任务,默认是5s一次,会通过网络请求将内存中的每个Queue的消费进度offset发送给RocketMQ服务端。

由于是定时任务,所以就可能出现服务器一旦宕机,导致最新消费的offset没有成功告诉RocketMQ服务端的情况
此时,消费进度offset就丢了,那么消费者重启的时候只能从RocketMQ中获取到上一次提交的offset,从这里开始消费,而不是最新的offset,出现明明消费到了第8个消息,RocketMQ却告诉他只消费到了第5个消息的情况,此时必然会导致消息又出现重复消费的情况。
服务端持久化offset失败
上一节说到,消费者会有一个每隔5s钟的定时任务将每个队列的消费进度offset提交到RocketMQ服务端
当RocketMQ服务端接收到提交请求之后,会将这个消费进度offset保存到内存中

同时为了保证RocketMQ服务端重启消费进度不会丢失,也会开启一个定时任务,默认也是5s一次,将内存中的消费进度持久化到磁盘文件中

所以,整个消费进度offset的数据流转过程如下

当RocketMQ服务端重启之后,会从磁盘中读取文件的数据加载到内存中。
跟消费者产生的问题一样,一旦RocketMQ发生宕机,那么offset就有可能丢失5s钟的数据,RocketMQ服务端一旦重启,消费者从RocketMQ服务端获取到的消息消费进度就比实际消费的进度低,同样也会导致消息重复消费。
主从同步offset失败
在RocketMQ的高可用模式中,有一种名叫主从同步的模式,当主节点挂了之后,从节点可以手动升级为主节点对外提供访问,保证高可用。
在主从同步模式下,从节点默认每隔10s会向主节点发送请求,同步一些元数据,这些元数据就包括消费进度

当从节点获取到主节点的消费进度之后,会将主节点的消费进度设置到自己的内存中,同时也会持久化到磁盘。
所以整个消费进度offset的数据的流转过程就会变成如下

同样,由于也是定时任务,那么一旦主节点挂了,从节点就会丢10s钟的消费进度,此时如果从节点升级为主节点对外提供访问,就会出现跟上面提到的一样的情况,消费者从这个新的主节点中拿到的消费进度比实际的低,自然而然就会重复消费消息。
所以,总的来说,在消费进度数据流转的过程中,只要某个环节出现了问题,都有很有可能会导致消息重复消费。
重平衡
先来讲一讲什么是重平衡,其实重平衡很好理解,我说一下你就明白了。
前面说到,消费者是从队列中获取消息的

在RocketMQ中,有个消费者组的概念,一个消费者组中可以有多个消费者,不同消费者组之间消费消息是互不干扰的,所以前面提到的消费者其实都在消费组下

在同一个消费者组中,消息消费有两种模式:
- 集群消费模式
- 广播消费模式
由于RocketMQ默认是集群消费模式,并且绝大多数业务场景都是使用集群消费模式,所以这里就不讨论广播消费模式了。
集群消费模式是指同一条消息只能被这个消费者组消费一次,这就叫集群消费。
并且前面提到提交消费进度给RocketMQ服务端的情况只会集群消费模式下才会有,在广播消费模式不会提给到RocketMQ服务端,仅仅持久化到本地磁盘
同时前面说的消费者提交消费进度真正提交的是消费者组对于这个Queue的消费进度,而不是指具体的某个消费者对于Queue消费进度。
虽然说这里将前面提到的一些含义更深一步,但是并不妨碍前面的理解。
集群消费的实现就是将队列按照一定的算法分配给消费者,默认是按照平均分配的。

如图所示,假设某个topic有4个Queue,有个消费者组订阅了这个topic,这个消费者组有两个消费者1和消费者2,此时每个消费者就可以被分配两个队列,这样就能保证消息正常情况下只会被消费一次。如果只有一个消费者,那么这个消费者就会消费所有队列,很好理解。
接着后面又启动了一个消费者3,此时为了保证刚上线的消费者3能够消费消息,就要进行重平衡操作,重新分配每个消费者消费的队列。
在重平衡之后就可能会出现下面这种情况

如上图,原本被消费者2消费的Queue4被分配给消费者3,此时消费者3就能消费到消息了,这就是重平衡。
除了新增消费者会导致重平衡之外,消费者数量减少,队列的数量增加或者减少都会触发重平衡。
在了解了重平衡概念之后,接下来分析一下为什么重平衡会导致消息的重复消费。
假设在进行重平衡时,还未重平衡完之前,消费者2此时还是会按照上面第二节提到的消费消息的逻辑来消费Queue4的消息
当消费者2已经重平衡完成了,发现Queue4自己已经不能消费了,那么此时就会把这个Queue4设置为dropped,就是丢弃的意思

但是由于重平衡进行时消费者2仍然在消费Queue4的消息,但是当消费完之后,发现队列被设置成dropped,那么此时被消费者2消费消息的offset就不会被提交,原因如下代码

这段代码前面已经出现过,一旦dropped被设置成true,这个if条件就通不过,消费进度就不会被提交。
成功消费消息了,但是却不提交消费进度,这就非常坑了。。
于是当消费者3开始消费Queue4的消息的时候,他就会问问RocketMQ服务端,我消费者3所在的消费者组对于Queue4这个队列消费到哪了,我接着消费就行了。
此时由于没有提交消费进度,RocketMQ服务端告诉消费者3的消费进度就会比实际的低,这就造成了消息重复消费的情况。
清理长时间消费的消息
在RocketMQ中有这么一个机制,会定时清理长时间正在消费的消息。

如图,假设有5条消息现在正在被消费者处理,这5条消息会被存在一个集合中,并且是按照offset的大小排序,消息1的offset最小,消息5的offset最大。
RocketMQ消费者启动时会开启一个默认15分钟执行一次的定时任务

这个定时任务会去检查正在处理的消息的第一条消息,也就是图中的消息1,一旦发现消息1已经处理了超过15分钟了,那么此时就会将消息1从集合中移除,之后会隔一定时间再次消费消息1。
这也会有坑,虽然消息1从集合中被移除了,但是消息1并没有消失,仍然被消费者继续处理,但是消息1隔一定时间就会再次被消费,就会出现消息1被重复消费的情况。
这就是清理长时间消费的消息导致重复消费的原因。
但此时又会引出一个新的疑问,为什么要移除这个处理超过15分钟的消息呢?
这就又跟前面提到的消费进度提交有关!
前面说过消息被消费完成之后会提交消费进度,提交的消费进度实际会有两种情况:
第一种就是某个线程消费了所有的消息,当把所有的消息都消费完成之后,就会把消息从集合中全部移除,此时提交的消费进度offset就是图中消息5的offset+1
加1的操作是为了保证如果发生重启,那么消费者下次消费的起始位置就是消息5后面的消息,保证消息5不被重复消费
第二种情况就不太一样了
假设现在有两个线程来处理这5条消息,线程1处理前2条,线程2处理后3条,如图

现在线程1出现了长时间处理消息的情况。
此时线程2处理完消息之后,移除后面三条消息,准备提交offset的时候发现集合中还有元素,就是线程1正在处理的前两条消息,此时线程2提交的offset并不是消息5对应的offset,而是消息1的offset,代码如下

这么做的主要原因就是保证消息1和消息2至少被消费一次。
因为一旦提交了消息5对应的offset,如果消费者重启了,下次消费就会接着从消息5的后面开始消费,而对于消息1和消息2来说,并不知道有没有被消费成功,就有可能出现消息丢失的情况。
所以,一旦集合中最前面的消息长时间处理,那么就会导致后面被消费的消息进度无法提交,那么重启之后就会导致大量消息被重复消费。
为了解决这个问题,RocketMQ引入了定时清理的机制,定时清理长时间消费的消息,这样消费进度就可以提交了。
简单的消息去重解决方案
例如:假设我们业务的消息消费逻辑是:插入某张订单表的数据,然后更新库存:
insert into t_order values .....
update t_inv set count = count-1 where good_id = 'good123';
要实现消息的幂等,我们可能会采取这样的方案:
select * from t_order where order_no = 'order123'
if(order != null) {
return ;//消息重复,直接返回
}
这对于很多情况下,的确能起到不错的效果,但是在并发场景下,还是会有问题。
并发重复消息
假设这个消费的所有代码加起来需要1秒,有重复的消息在这1秒内(假设100毫秒)内到达(例如生产者快速重发,Broker重启等),那么很可能,上面去重代码里面会发现,数据依然是空的(因为上一条消息还没消费完,还没成功更新订单状态),
那么就会穿透掉检查的挡板,最后导致重复的消息消费逻辑进入到非幂等安全的业务代码中,从而引发重复消费的问题(如主键冲突抛出异常、库存被重复扣减而没释放等)
并发去重的解决方案之一
要解决上面并发场景下的消息幂等问题,一个可取的方案是开启事务把select 改成 select for update语句,把记录进行锁定。
select * from t_order where order_no = 'THIS_ORDER_NO' for update //开启事务
if(order.status != null) {
return ;//消息重复,直接返回
}
但这样消费的逻辑会因为引入了事务包裹而导致整个消息消费可能变长,并发度下降。
当然还有其他更高级的解决方案,例如更新订单状态采取乐观锁,更新失败则消息重新消费之类的。但这需要针对具体业务场景做更复杂和细致的代码开发、库表设计,不在本文讨论的范围。
但无论是select for update, 还是乐观锁这种解决方案,实际上都是基于业务表本身做去重,这无疑增加了业务开发的复杂度, 一个业务系统里面很大部分的请求处理都是依赖MQ的,如果每个消费逻辑本身都需要基于业务本身而做去重/幂等的开发的话,这是繁琐的工作量。本文希望探索出一个通用的消息幂等处理的方法,从而抽象出一定的工具类用以适用各个业务场景。
Exactly Once
在消息中间件里,有一个投递语义的概念,而这个语义里有一个叫”Exactly Once”,即消息肯定会被成功消费,并且只会被消费一次。以下是阿里云里对Exactly Once的解释:
Exactly-Once 是指发送到消息系统的消息只能被消费端处理且仅处理一次,即使生产端重试消息发送导致某消息重复投递,该消息在消费端也只被消费一次。
在我们业务消息幂等处理的领域内,可以认为业务消息的代码肯定会被执行,并且只被执行一次,那么我们可以认为是Exactly Once。
但这在分布式的场景下想找一个通用的方案几乎是不可能的。不过如果是针对基于数据库事务的消费逻辑,实际上是可行的。
基于关系数据库事务插入消息表
假设我们业务的消息消费逻辑是:更新MySQL数据库的某张订单表的状态:
update t_order set status = 'SUCCESS' where order_no= 'order123';
要实现Exaclty Once即这个消息只被消费一次(并且肯定要保证能消费一次),我们可以这样做:在这个数据库中增加一个消息消费记录表,把消息插入到这个表,并且把原来的订单更新和这个插入的动作放到同一个事务中一起提交,就能保证消息只会被消费一遍了。
开启事务
插入消息表(处理好主键冲突的问题)
更新订单表(原消费逻辑)
提交事务
说明:
这时候如果消息消费成功并且事务提交了,那么消息表就插入成功了,这时候就算RocketMQ还没有收到消费位点的更新再次投递,也会插入消息失败而视为已经消费过,后续就直接更新消费位点了。这保证我们消费代码只会执行一次。
如果事务提交之前服务挂了(例如重启),对于本地事务并没有执行所以订单没有更新,消息表也没插入成功;而对于RocketMQ服务端来说,消费位点也没更新,所以消息还会继续投递下来,投递下来发现这个消息插入消息表也是成功的,所以可以继续消费。这保证了消息不丢失。
事实上,阿里云ONS的EXACTLY-ONCE语义的实现上,就是类似这个方案基于数据库的事务特性实现的。更多详情可参考:https://help.aliyun.com/document_detail/102777.html
基于这种方式,的确这是有能力拓展到不同的应用场景,因为他的实现方案与具体业务本身无关——而是依赖一个消息表。
但是这里有它的局限性
消息的消费逻辑必须是依赖于关系型数据库事务。如果消费的消费过程中还涉及其他数据的修改,例如Redis这种不支持事务特性的数据源,则这些数据是不可回滚的。
数据库的数据必须是在一个库,跨库无法解决
注:业务上,消息表的设计不应该以消息ID作为标识,而应该以业务的业务主键作为标识更为合理,以应对生产者的重发。阿里云上的消息去重只是RocketMQ的messageId,在生产者因为某些原因手动重发(例如上游针对一个交易重复请求了)的场景下起不到去重/幂等的效果(因消息id不同)。
更复杂的业务场景
如上所述,这种方式Exactly Once语义的实现,实际上有很多局限性,这种局限性使得这个方案基本不具备广泛应用的价值。并且由于基于事务,可能导致锁表时间过长等性能问题。
例如我们以一个比较常见的一个订单申请的消息来举例,可能有以下几步(以下统称为步骤X):
检查库存(RPC)
锁库存(RPC)
开启事务,插入订单表(MySQL)
调用某些其他下游服务(RPC)
更新订单状态
commit 事务(MySQL)
这种情况下,我们如果采取消息表+本地事务的实现方式,消息消费过程中很多子过程是不支持回滚的,也就是说就算我们加了事务,实际上这背后的操作并不是原子性的。怎么说呢,就是说有可能第一条小在经历了第二步锁库存的时候,服务重启了,这时候实际上库存是已经在另外的服务里被锁定了,这并不能被回滚。当然消息还会再次投递下来,要保证消息能至少消费一遍,换句话说,锁库存的这个RPC接口本身依旧要支持“幂等”。
再者,如果在这个比较耗时的长链条场景下加入事务的包裹,将大大的降低系统的并发。所以通常情况下,我们处理这种场景的消息去重的方法还是会使用一开始说的业务自己实现去重逻辑的方式,如前面加select for update,或者使用乐观锁。
那我们有没有方法抽取出一个公共的解决方案,能兼顾去重、通用、高性能呢?
拆解消息执行过程
其中一个思路是把上面的几步,拆解成几个不同的子消息,例如:
库存系统消费A:检查库存并做锁库存,发送消息B给订单服务
订单系统消费消息B:插入订单表(MySQL),发送消息C给自己(下游系统)消费
下游系统消费消息C:处理部分逻辑,发送消息D给订单系统
订单系统消费消息D:更新订单状态
注:上述步骤需要保证本地事务和消息是一个事务的(至少是最终一致性的),这其中涉及到分布式事务消息相关的话题,不在本文论述。
可以看到这样的处理方法会使得每一步的操作都比较原子,而原子则意味着是小事务,小事务则意味着使用消息表+事务的方案显得可行。
然而,这太复杂了!这把一个本来连续的代码逻辑割裂成多个系统多次消息交互!那还不如业务代码层面上加锁实现呢。
更通用的解决方案
上面消息表+本地事务的方案之所以有其局限性和并发的短板,究其根本是因为它依赖于关系型数据库的事务,且必须要把事务包裹于整个消息消费的环节。
如果我们能不依赖事务而实现消息的去重,那么方案就能推广到更复杂的场景例如:RPC、跨库等。
例如,我们依旧使用消息表,但是不依赖事务,而是针对消息表增加消费状态,是否可以解决问题呢?
基于消息幂等表的非事务方案

以上是去事务化后的消息幂等方案的流程,可以看到,此方案是无事务的,而是针对消息表本身做了状态的区分:消费中、消费完成。只有消费完成的消息才会被幂等处理掉。而对于已有消费中的消息,后面重复的消息会触发延迟消费(在RocketMQ的场景下即发送到RETRY TOPIC),之所以触发延迟消费是为了控制并发场景下,第二条消息在第一条消息没完成的过程中,去控制消息不丢(如果直接幂等,那么会丢失消息(同一个消息id的话),因为上一条消息如果没有消费完成的时候,第二条消息你已经告诉broker成功了,那么第一条消息这时候失败broker也不会重新投递了)
上面的流程不再细说,后文有github源码的地址,读者可以参考源码的实现,这里我们回头看看我们一开始想解决的问题是否解决了:
消息已经消费成功了,第二条消息将被直接幂等处理掉(消费成功)。
并发场景下的消息,依旧能满足不会出现消息重复,即穿透幂等挡板的问题。
支持上游业务生产者重发的业务重复的消息幂等问题。
关于第一个问题已经很明显已经解决了,在此就不讨论了。
关于第二个问题是如何解决的?主要是依靠插入消息表的这个动作做控制的,假设我们用MySQL作为消息表的存储媒介(设置消息的唯一ID为主键),那么插入的动作只有一条消息会成功,后面的消息插入会由于主键冲突而失败,走向延迟消费的分支,然后后面延迟消费的时候就会变成上面第一个场景的问题。
关于第三个问题,只要我们设计去重的消息键让其支持业务的主键(例如订单号、请求流水号等),而不仅仅是messageId即可。所以也不是问题。
此方案是否有消息丢失的风险?
如果细心的读者可能会发现这里实际上是有逻辑漏洞的,问题出在上面聊到的个三问题中的第2个问题(并发场景),在并发场景下我们依赖于消息状态是做并发控制使得第2条消息重复的消息会不断延迟消费(重试)。但如果这时候第1条消息也由于一些异常原因(例如机器重启了、外部异常导致消费失败)没有成功消费成功呢?也就是说这时候延迟消费实际上每次下来看到的都是消费中的状态,最后消费就会被视为消费失败而被投递到死信Topic中(RocketMQ默认可以重复消费16次)。
有这种顾虑是正确的!对于此,我们解决的方法是,插入的消息表必须要带一个最长消费过期时间,例如10分钟,意思是如果一个消息处于消费中超过10分钟,就需要从消息表中删除(需要程序自行实现)。所以最后这个消息的流程会是这样的:
更灵活的消息表存储媒介
我们这个方案实际上没有事务的,只需要一个存储的中心媒介,那么自然我们可以选择更灵活的存储媒介,例如Redis。使用Redis有两个好处:
性能上损耗更低
上面我们讲到的超时时间可以直接利用Redis本身的ttl实现
当然Redis存储的数据可靠性、一致性等方面是不如MySQL的,需要用户自己取舍。
源码:RocketMQDedupListener
以上方案针对RocketMQ的Java实现已经开源放到Github中,具体的使用文档可以参考https://github.com/Jaskey/RocketMQDedupListener ,
以下仅贴一个Readme中利用Redis去重的使用样例,用以意业务中如果使用此工具加入消息去重幂等的是多么简单:
//利用Redis做幂等表
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TEST-APP1");
consumer.subscribe("TEST-TOPIC", "*");
String appName = consumer.getConsumerGroup();// 大部分情况下可直接使用consumer group名
StringRedisTemplate stringRedisTemplate = null;// 这里省略获取StringRedisTemplate的过程
DedupConfig dedupConfig = DedupConfig.enableDedupConsumeConfig(appName, stringRedisTemplate);
DedupConcurrentListener messageListener = new SampleListener(dedupConfig);
consumer.registerMessageListener(messageListener);
consumer.start();
以上代码大部分是原始RocketMQ的必须代码,唯一需要修改的仅仅是创建一个DedupConcurrentListener示例,在这个示例中指明你的消费逻辑和去重的业务键(默认是messageId)。
更多使用详情请参考Github上的说明。
这种实现是否一劳永逸?
实现到这里,似乎方案挺完美的,所有的消息都能快速的接入去重,且与具体业务实现也完全解耦。那么这样是否就完美的完成去重的所有任务呢?
很可惜,其实不是的。原因很简单:因为要保证消息至少被成功消费一遍,那么消息就有机会消费到一半的时候失败触发消息重试的可能。还是以上面的订单流程X:
检查库存(RPC)
锁库存(RPC)
开启事务,插入订单表(MySQL)
调用某些其他下游服务(RPC)
更新订单状态
commit 事务(MySQL)
当消息消费到步骤3的时候,我们假设MySQL异常导致失败了,触发消息重试。因为在重试前我们会删除幂等表的记录,所以消息重试的时候就会重新进入消费代码,那么步骤1和步骤2就会重新再执行一遍。如果步骤2本身不是幂等的,那么这个业务消息消费依旧没有做好完整的幂等处理。
本实现方式的价值?
那么既然这个并不能完整的完成消息幂等,还有什么价值呢?价值可就大了!虽然这不是解决消息幂等的银弹(事实上,软件工程领域里基本没有银弹),但是他能以便捷的手段解决:
1.各种由于Broker、负载均衡等原因导致的消息重投递的重复问题
2.各种上游生产者导致的业务级别消息重复问题
3.重复消息并发消费的控制窗口问题,就算重复,重复也不可能同一时间进入消费逻辑
一些其他的消息去重的建议
也就是说,使用这个方法能保证正常的消费逻辑场景下(无异常,无异常退出),消息的幂等工作全部都能解决,无论是业务重复,还是rocketmq特性带来的重复。
事实上,这已经能解决99%的消息重复问题了,毕竟异常的场景肯定是少数的。那么如果希望异常场景下也能处理好幂等的问题,可以做以下工作降低问题率:
消息消费失败做好回滚处理。如果消息消费失败本身是带回滚机制的,那么消息重试自然就没有副作用了。
消费者做好优雅退出处理。这是为了尽可能避免消息消费到一半程序退出导致的消息重试。
一些无法做到幂等的操作,至少要做到终止消费并告警。例如锁库存的操作,如果统一的业务流水锁成功了一次库存,再触发锁库存,如果做不到幂等的处理,至少要做到消息消费触发异常(例如主键冲突导致消费异常等)
在#3做好的前提下,做好消息的消费监控,发现消息重试不断失败的时候,手动做好#1的回滚,使得下次重试消费成功。

最后
总得来说,RocketMQ中还是存在很多种导致消息重读消费的情况,并且官方也说了,只是在大多数情况下消息不会重复

所以如果你的业务场景中需要保证消息不能重复消费,那么就需要根据业务场景合理的设计幂等技术方案。