RocketMQ消息丢失解决方案

消息丢失的场景分析

消息丢失的场景分析

发送消息零丢失方案:RocketMQ事务消息

 

基于消息key来定位消息是否丢失

那么如果消息真的丢失了,我们是不是要排查?此时是不是要从MQ里查一下,这个消息是否丢失了? 那么怎么从MQ里查消息是否丢失呢?
可以基于消息key来实现,比如通过下面的方式设置一个消息的key为订单id:message.setKeys(orderId),这样这个消息就具备一个key了。
接着这个消息到broker上,会基于key构建hash索引,这个hash索引就存放在IndexFile索引文件里。
然后后续我们可以通过MQ提供的命令去根据key查询这个消息,类似下面这样:mqadmin queryMsgByKey -n 127.0.0.1:9876 -t SCANRECORD -k orderId
 

消息零丢失方案流程

消息零丢失方案流程

事务消息机制的底层实现原理

事务消息机制的底层实现原理

消息丢失的其他解决方案分析

MQ集群彻底故障下解决方案

在消息零丢失方案中还有一个问题,那就是MQ集群彻底故障了,此时就是不可用了,那么怎么办呢?
其实对于一些金融级的系统,或者跟钱相关的支付系统,或者是广告系统,类似这样的系统,都必须有超高级别的高可用保障机制。
一般假设MQ集群彻底崩溃了,你生产者就应该把消息写入到本地磁盘文件里去进行持久化,或者是写入数据库里去暂存起来,等待MQ恢复之后,然后再把持久化的消息继续投递到MQ里去。
 

重试机制原理

重试机制的基于MQ是消息最终写入磁盘才会返回发送成功的,因此在收不到返回结果或者返回异常的情况下不断重试,
理论上似乎在一些短暂网络异常的场景下,我们是可以通过不停的重试去保证消息到达MQ的,因为如果短时间网络异常了消息一直没法发送,我们只要不停的重试,网络一旦恢复了,消息就可以发送到MQ了。
如果要是反复重试多次发现一直没法把消息投递到MQ,此时我们就可以直接让订单系统回滚之前的流程,比如发起退款流程,判定本次订单支付交易失败了。
看起来这个简单的同步发送消息 + 反复重试的方案,也可以做到保证消息一定可以投递到MQ中,
确实如此,而且在基于Kafka作为消息中间件的消息零丢失方案中,对于发送消息这块,因为Kafka本身不具备RocketMQ这种事务消息的高级功能,所以一般我们都是对Kafka会采用同步发消息 + 反复重试多次的方案,去保证消息成功投递到Kafka的。
notion image
 

重试机制的缺点

如果是在类似我们目前这个较为复杂的订单业务场景中,仅仅采用同步发消息 + 反复重试多次的方案去确保消息绝对投递到MQ中,似乎还是不够的,接下来我们分析一下在复杂业务场景下,这里有什么问题。

先执行订单本地事务,接着再发送消息到MQ

notion image
上面那段伪代码看着似乎天衣无缝,先执行订单本地事务,接着发送消息到MQ,如果订单本地事务执行失败了,则不会继续发送消息到MQ了;
如果订单事务执行成功了,发送MQ失败了,自动进行几次重试,重试如果一直失败,就回滚订单事务。
但是这里有一个问题,假设你刚执行完成了订单本地事务了,结果还没等到你发送消息到MQ,结果你的订单系统突然崩溃了!
这就导致你的订单状态可能已经修改为了“已完成”,但是消息却没发送到MQ去!这就是这个方案最大的隐患。
notion image
如果出现这种场景,那你的多次重试发送MQ之类的代码根本没机会执行!而且订单本地事务还已经执行成功了,你的消息没发送出去,红包系统没机会派发红包,必然导致用户支付成功了,结果看不到自己的红包!

把订单本地事务和重试发送MQ消息放到一个事务

如果把订单本地事务代码和发送MQ消息的代码放到一个事务代码中呢? 我们看下面的伪代码示例:
notion image
上面这个代码看起来似乎解决了我们的问题,就是在这个方法上加入事务,在这个事务方法中,我们哪怕执行了orderService.finishOrderPay(),但是其实也仅仅执行了一些增删改SQL语句,还没提交订单本地事务。
如果发送MQ消息失败了,而且多次重试还不奏效,则我们抛出异常会自动回滚订单本地事务;如果你刚执行了orderService.finishOrderPay(),结果订单系统直接崩溃了,此时订单本地事务会回滚,因为根本没提交过。
但是对于这个方案,还是非常的不理想,原因就出在那个MQ多次重试的地方
假设用户支付成功了,然后支付系统回调通知你的订单系统说,有一笔订单已经支付成功了,这个时候你的订单系统卡在多次重试MQ的代码那里,可能耗时了好几秒种,此时回调通知你的系统早就等不及可能都超时异常了
而且你把重试MQ的代码放在这个逻辑里,可能会导致订单系统的这个接口性能很差
notion image
除了我们上面说的那个问题之外,我可能还不得不给很多寄希望于订单事务和发送MQ消息包裹在一个事务代码中的朋友,泼一盆冷水,大家觉得我们一定可以依靠本地事务回滚吗?这还真的未必,我们看下面的代码:
notion image
虽然在方法上加了事务注解,但是代码里还有更新Redis缓存和Elasticsearch数据的代码逻辑,如果你要是已经完成了订单数据库更新、Redis缓存更新、ES数据更新了,结果没法送MQ呢订单系统崩溃了。
虽然订单数据库的操作会回滚,但是Redis、Elasticsearch中的数据更新会自动回滚吗?不会的,因为他们根本没法自动回滚,此时数据还是会不一致的。所以说,完全寄希望于本地事务自动回滚是不现实的。
 

案例场景代码实现

RocketMQ事务消息:案例场景代码实现

消息事务局限性

消息事务仅保证生产者投递消息到MQ的这一过程是最终一致的,但是MQ自己丢失数据这种情况无法避免

MQ自己丢失消息的情况

假设咱们现在订单系统已经通过事务消息的机制,通过half消息 + commit的方式,把消息在MQ里提交了 也就是说,现在对于MQ而言,那条消息已经进入到他的存储层了,可以被红包系统看到了 我们看下图
notion image
这条消息在commit之后,会从half topic里进入OrderPaySuccessTopic中,但是此时仅仅是消息进入了这个你预定的Topic而已,仅仅是可以被红包系统看到而已,此时可能你的红包系统还没来得及去获取这条消息。
然后恰巧在此时,你的这条消息又仅仅停留在os cache中,还没进入到ConsumeQueue磁盘文件里去,然后此时这台机器突然宕机了,os cache中的数据全部丢失,此时必然会导致你的消息丢失,红包系统再没机会读到这条消息了。
notion image
那我们接着看,就算我们很走运,比如你的消息已经进入了OrderPaySuccessTopic的ConsumeQueue磁盘文件了,不是停留在os cache里了,此时消息就一定不会丢失了吗? 这也未必即使消息已经进入磁盘文件了,但是这个时候红包系统还没来得及消费这条消息,然后此时这台机器的磁盘突然就坏了,就会一样导致 消息丢失,而且可能消息再也找不回来了,同样会丢失数据。 我们看下图的示意。
notion image
所以看到这里,我们需要明确一个前提,我们无论是通过比较简单的同步发送消息 + 反复多次重试的方案,还是事务消息的方案,哪怕我们确保消息已经写入MQ成功了,此时也未必消息就不会丢失了。 因为即使你写入MQ成功了,这条消息也大概率是仅仅停留在MQ机器的os cache中,一旦机器宕机内存里的数据都会丢失,或者哪怕消息已经进入了MQ机器的磁盘文件里,但是磁盘一旦坏了,消息也会丢失。

异步刷盘 vs 同步刷盘

到底怎么去确保消息写入MQ之后,MQ自己不要随便丢失数据呢?解决这个问题的第一个关键点,就是将异步刷盘调整为同步刷盘。 所谓的异步刷盘,就是之前我们一直说的那种模式。 也就是说,你的消息即使成功写入了MQ,他也就在机器的os cache中,没有进入磁盘里,要过一会儿等操作系统自己把os cache里的数据实际刷入磁盘文件中去
notion image
所以在异步刷盘的模式下,我们的写入消息的吞吐量肯定是极高的,毕竟消息只要进入os cache这个内存就可以了,写消息的性能就是写内存的性能,那每秒钟可以写入的消息数量肯定更多了,但是这个情况下,可能就会导致数据的丢失。
所以如果一定要确保数据零丢失的话,可以调整MQ的刷盘策略,我们需要调整broker的配置文件,将其中的flushDiskType配置设置为:SYNC_FLUSH,默认他的值是ASYNC_FLUSH,即默认是异步刷盘的。
如果调整为同步刷盘之后,我们写入MQ的每条消息,只要MQ告诉我们写入成功了,那么他们就是已经进入了磁盘文件了! 比如我们发送half消息的时候,只要MQ返回响应是half消息发送成功了,那么就说明消息已经进入磁盘文件了,不会停留在os cache 里。 我们看下图的示意,如果我们使用同步刷盘的策略,那么可以确保写入MQ的消息一定是已经进入磁盘文件了
notion image

通过主从架构模式避免磁盘故障导致的数据丢失

如何避免磁盘故障导致的数据丢失?其实道理也很简单,我们必须要对Broker使用主从架构的模式
也就是说,必须让一个Master Broker有一个Slave Broker去同步他的数据,而且你一条消息写入成功,必须是让Slave Broker也写入成功,保证数据有多个副本的冗余。
notion image
这样一来,你一条消息但凡写入成功了,此时主从两个Broker上都有这条数据了,此时如果你的Master Broker的磁盘坏了,但是Slave Broker上至少还是有数据的,数据是不会因为磁盘故障而丢失的。
对于主从同步的架构,我们本来就是讲解了基于DLedger技术和Raft协议的主从同步架构,你如果采用了这套架构,对于你所有的消息写入,只要他写入成功,那就一定会通过Raft协议同步给其他的Broker机器
 

Consumer消息零丢失方案

现在已经知道了如何确保订单系统发送出去的消息一定会到达MQ中,而且也能确保了如果消息到达了MQ如何确保一定不会丢失
如下这个场景
notion image
只要我们能做到这一点,那么我们必然可以保证红包系统可以获取到一条订单支付成功的消息,然后一定可以去尝试把红包派发出去。

消费者丢失消息场景

但是现在的问题在于,即使红包系统拿到了这条消息,就一定可以成功的派发红包吗?
答案是未必
我们之前也给大家分析过这个问题,如果红包系统已经拿到了这条消息,但是消息目前还在他的内存里,还没执行派发红包的逻辑,此时他就直接提交了这条消息的offset到broker去说自己已经处理过了,我们看下图。
如果红包系统已经拿到了这条消息,但是消息目前还在他的内存里,还没执行派发红包的逻辑,此时他就直接提交了这条消息的offset到broker去说自己已经处理过了,我们看下图。
notion image
接着红包系统在上图这个状态的时候就直接崩溃了,内存里的消息就没了,红包也没派发出去,结果Broker已经收到他提交的消息offset了,还以为他已经处理完这条消息了。
等红包系统重启的时候,就不会再次消费这条消息了。
首先要明确一点,那就是即使你保证发送消息到MQ的时候绝对不会丢失,而且MQ收到消息之后一定不会把消息搞丢失,但是你的消费者在获取到消息之后还是可能会搞丢。

Kafka消费者的数据丢失问题

因为Kafka的消费者采用的消费的方式跟RocketMQ是有些不一样的,如果按照Kafka的消费模式,就是会产生数据丢失的风险。
也就是说Kafka消费者可能会出现上图说的,拿到一批消息,还没来得及处理呢,结果就提交offset到broker去了,完了消费者系统就挂掉了,这批消息就再也没机会处理了,因为他重启之后已经不会再次获取提交过offset的消息了。

RocketMQ消费者的与众不同的地方

但是对于RocketMQ的消费者而言,他是有一些与众不同的地方的,至少跟Kafka的消费者是有较大不同的
我们再来回顾一下之前我们写过的RocketMQ消费者的代码,如下所示。
notion image
RocketMQ的消费者中会注册一个监听器,就是上面小块代码中的MessageListenerConcurrently这个东西,当你的消费者获取到一批消息之后,就会回调你的这个监听器函数,让你来处理这一批消息。
然后当你处理完毕之后,你才会返ConsumeConcurrentlyStatus.CONSUME_SUCCESS作为消费成功的示意,告诉RocketMQ,这批消息我已经处理完毕了。
所以对于RocketMQ而言,其实只要你的红包系统是在这个监听器的函数中先处理一批消息,基于这批消息都派发完了红包,然后返回了那个消费成功的状态,接着才会去提交这批消息的offset到broker去。
所以在这个情况下,如果你对一批消息都处理完毕了,然后再提交消息的offset给broker,接着红包系统崩溃了,此时是不会丢失消息的
notion image
那么如果是红包系统获取到一批消息之后,还没处理完,也就没返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS这个状态呢,自然没提交这批消息的offset给broker呢,此时红包系统突然挂了,会怎么样?
我们看下图示意的这个场景。
notion image
其实在这种情况下,你对一批消息都没提交他的offset给broker的话,broker不会认为你已经处理完了这批消息,此时你突然红包系统的一台机器宕机了,他其实会感知到你的红包系统的一台机器作为一个Consumer挂了。
接着他会把你没处理完的那批消息交给红包系统的其他机器去进行处理,所以在这种情况下,消息也绝对是不会丢失的 我们看下图的示意
notion image

需要警惕的地方:不能异步消费消息

所以大家也看到了,在默认的Consumer的消费模式之下,必须是你处理完一批消息了,才会返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS这个状态标识消息都处理结束了,去提交offset到broker去。
在这种情况下,正常来说是不会丢失消息的,即使你一个Consumer宕机了,他会把你没处理完的消息交给其他Consumer去处理。
但是这里我们要警惕一点,就是我们不能在代码中对消息进行异步的处理,如下错误的示范,我们开启了一个子线程去处理这批消息,然后启动线程之后,就直接返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS状态了
notion image
如果要是用这种方式来处理消息的话,那可能就会出现你开启的子线程还没处理完消息呢,你已经返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS状态了,就可能提交这批消息的offset给broker了,认为已经处理结束了。
然后此时你红包系统突然宕机,必然会导致你的消息丢失了!因此在RocketMQ的场景下,我们如果要保证消费数据的时候别丢消息,你就老老实实的在回调函数里处理消息,处理完了你再返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS状态表明你处理完毕了!

基于 RocketMQ 设计的全链路消息零丢失方案总结

发送消息到MQ的零丢失

方案一(同步发送消息 + 反复多次重试) 方案二(事务消息机制),两者都有保证消息发送零丢失的效果,但是经过分析,事务消息方案整体会更好一些

MQ收到消息之后的零丢失

开启同步刷盘策略 + 主从架构同步机制,只要让一个Broker收到消息之后同步写入磁盘,同时同步复制给其他Broker,然后再返回响应给生产者说写入成功,此时就可以保证MQ自己不会弄丢消息

消费消息的零丢失

采用RocketMQ的消费者天然就可以保证你处理完消息之后,才会提交消息的offset到broker去,只要记住别采用多线程异步处理消息的方式即可
如果大家想要保证在一个消息基于MQ流转的时候绝对不会无缘无故的丢失,那么可以采取上述一整套的方案,

消息零丢失方案的优势与劣势

如果在系统中落地一套消息零丢失方案,不管是哪个系统,不管是哪个场景,都可以确保消息流转的过程中不会丢失,看起来似乎很有吸引力,这也是消息零丢失方案的优势所在,可以让系统的数据都是正确的,不会有丢失的。
 
但是他的劣势在哪里呢?显而易见的是,你用了这套方案之后,会让你整个从头到尾的消息流转链路的性能大幅度下降,让你的MQ的吞吐量大幅度的下降比如本身你的系统和MQ配合起来,每秒可以处理几万条消息的,结果当你落地消息零丢失方案之后,可能每秒只能处理几千条消息了。

消息零丢失方案会导致吞吐量大幅度下降的原因

我们先来看这个发送消息到MQ的环节,如果我们仅仅只是简单的把消息发送到MQ,那么不过就是一次普通的网络请求罢了,我们就是发送请求到MQ然后接收响应回来,这个性能自然很高,吞吐量也是很高的
我们看下面的图示
notion image
但是如果你改成了基于事务消息的机制之后呢?
那么此时这里的实现原理图如下所示,这里涉及到half消息、commit or rollback、写入内部topic、回调机制,等诸多复杂的环节不说别的,光是你成功发送一条消息,都至少要half + commit两次请求。
notion image
所以当你一旦上了如此复杂的方案之后,势必会导致你的发送消息的性能大幅度下降,同时发送消息到MQ的吞吐量大幅度下降。
接着我们再看MQ收到消息之后的行为,在MQ收到消息之后,一样会让性能大幅度下降。
首先MQ的一台broker机器收到了消息之后,必然直接把消息刷入磁盘里,这个性能就远远低于你写入os cache了,完全不是一个数量级的,比如你写入os cache相当于是内存,可能仅仅需要0.1ms,但是你写入磁盘文件可能就需要10ms!如下图。
notion image
接着你的这台broker机器还必须直接把消息复制给其他的broker,完成多副本的冗余,这个过程涉及到两台broker机器之间的网络通信,另外一台broker机器写数据到自己本地磁盘去,同样会比较慢,如下图。
notion image
在broker完成了上述两个步骤之后,接着才能返回响应告诉你说这次消息写入已经成功了,大家试想一下,写入一条消息需要强制同步刷磁盘,而且还需要同步复制消息给其他的broker机器这两个步骤一加入,可能原本10ms的事儿就会变成100ms了!所以这里也势必会导致性能大幅度下降,MQ的broker的吞吐量会大幅度下降。
最后看你的消费者,当你的消费者拿到消息之后,比如他直接开启一个子线程去处理这批消息,然后他就直接返回CONSUME_SUCCESS状态了,接着他就可以去处理下一批消息了!如果这样的话,你消费消息的速度会很快,吞吐量会很高!
但是如果为了保证数据不丢失,你必须是处理完一批消息再返回CONSUME_SUCCESS状态,那么此时你消费者处理消息的速度会降低,吞吐量 自然也会下降了!
notion image

消息零丢失方案适用场景

所以简单一句话,如果你一定要上消息零丢失方案,那么必然导致从头到尾的性能下降以及MQ的吞吐量下降。
所以一般大家不要轻易在随便一个业务里就上如此重的一套方案,要明白这背后的成本!那么消息零丢失方案到底适用于什么场景呢?一般我们建议,对于跟金钱、交易以及核心数据相关的系统和核心链路,可以上这套消息零丢失方案。
比如支付系统,他是绝对不能丢失任何一条消息的,你的性能可以低一些,但是不能有任何一笔支付记录丢失。
比如订单系统,公司一般是不能轻易丢失一个订单的,毕竟一个订单就对应一笔交易,如果订单丢失,用户还支付成功了,你轻则要给
用户赔付损失,重则弄不好要经受官司,特别是一些B2B领域的电商,一笔线上交易可能多大几万几十万。
所以对这种非常非常核心的场景和少数几条核心链路,才会建议大家上这套复杂的消息0丢失方案。而对于其他大部分没那么核心的场景和系统,其实即使丢失一些数据,也不会导致太大的问题,此时可以不采取这些方案,或者说你可以在其他的场景里做一些简化。
比如你可以把事务消息方案退化成“同步发送消息 + 反复重试几次”的方案,如果发送消息失败,就重试几次,但是大部分时候可能不需要重试,那么也不会轻易的丢失消息的!最多在这个方案里,可能会出现一些数据不一致的问题。
或者你把broker的刷盘策略改为异步刷盘,但是上一套主从架构,即使一台机器挂了,os cache里的数据丢失了,但是其他机器上还有数据。但是大部分时候broker不会随便宕机,那么异步刷盘策略下性能还是很高的。
所以说,对于非核心的链路,非金钱交易的链路,大家可以适当简化这套方案,用一些方法避免数据轻易丢失,但是同时性能整体很高,即使有极个别的数据丢失,对非核心的场景,也不会有太大的影响。