RocketMQ的Broker的主从架构原理

Broker主从架构以及多副本策略

Broker是有Master和Slave两种角色的
notion image
Master Broker收到消息之后会同步给Slave Broker,这样Slave Broker上就能有一模一样的一份副本数据!
这样同一条消息在RocketMQ整个集群里不就有两个副本了,一个在Master Broker里,一个在Slave Broker里!
这个时候如果任何一个Master Broker出现故障,还有一个Slave Broker上有一份数据副本,可以保证数据不丢失,还能继续对外提供服务,保证了MQ的可靠性和高可用性
为了保证MQ的数据不丢失而且具备一定的高可用性,所以一般都是得将Broker部署成Master-Slave模式的,也就是一个Master Broker对应一个Slave Broker
然后Master需要在接收到消息之后,将数据同步给Slave,这样一旦Master Broker挂了,还有Slave上有一份数据。
同时 Slave Broker也会向所有的NameServer进行注册Slave Broker也会向所有的NameServer每30s发送心跳 !
 

普通主从同步

主从同步的方式

Pull模式拉取消息

 
RocketMQ的Master-Slave模式采取的是Slave Broker不停的发送请求到Master Broker去拉取消息
RocketMQ自身的Master-Slave模式采取的是Pull模式拉取消息
 

消费者获取数据的方式

既然Master Broker主要是接收系统的消息写入,然后会同步给Slave Broker,那么其实本质上Slave Broker也应该有一份一样的数据。
所以这里提出一个疑问,作为消费者的系统在获取消息的时候,是从Master Broker获取的?还是从Slave Broker获取的?

主从都有可能

其实都不是。答案是:有可能从Master Broker获取消息,也有可能从Slave Broker获取消息
作为消费者的系统在获取消息的时候会先发送请求到Master Broker上去,请求获取一批消息,此时Master Broker是会返回一批消息 给消费者系统的
然后Master Broker在返回消息给消费者系统的时候,会根据当时Master Broker的负载情况和Slave Broker的同步情况,向消费者系 统建议下一次拉取消息的时候是从Master Broker拉取还是从Slave Broker拉取。主从延迟过高只能从Master拉取, 主从一致的情况,选负载较小的拉取
notion image
举个例子,要是这个时候Master Broker负载很重,本身要抗10万写并发了,你还要从他这里拉取消息,给他加重负担,那肯定是不合适的。
所以此时Master Broker就会建议你从Slave Broker去拉取消息。
或者举另外一个例子,本身这个时候Master Broker上都已经写入了100万条数据了,结果Slave Broke不知道啥原因,同步的特别慢,才同步了96万条数据,落后了整整4万条消息的同步,这个时候你作为消费者系统可能都获取到96万条数据了,那么下次还是只能从Master Broker去拉取消息
因为Slave Broker同步太慢了,导致你没法从他那里获取更新的消息了。
在写入消息的时候,通常来说肯定是选择Master Broker去写入的 但是在拉取消息的时候,有可能从Master Broker获取,也可能从Slave Broker去获取,一切都根据当时的情况来定

如果Slave Broke挂掉了有什么影响

如果Slave Broker挂掉了,会对整个系统有影响吗?答案是:有一点影响,但是影响不太大
因为消息写入全部是发送到Master Broker的,然后消息获取也可以走Master Broker,只不过有一些消息获取可能是从Slave Broker去走的
所以如果Slave Broker挂了,那么此时无论消息写入还是消息拉取,还是可以继续从Master Broker去走,对整体运行不影响。
只不过少了Slave Broker,会导致所有读写压力都集中在Master Broker上。

如果Master Broker挂掉了该怎么办

现在假设出现了一个故障,Master Broker突然挂了,这样会怎么样?
这个时候就对消息的写入和获取都有一定的影响了。但是其实本质上而言,Slave Broker也是跟Master Broker一样有一份数据在的,只不过Slave Broker上的数据可能有部分没来得及从Master Broker同步。

主从切换

4.5之前无法自动切换,需要手工运维

在RocketMQ 4.5版本之前,都是用Slave Broker同步数据,尽量保证数据不丢失,但是一旦Master故障了,Slave是没法自动切换成Master的
所以在这种情况下,如果Master Broker宕机了,这时就得手动做一些运维操作,把Slave Broker重新修改一些配置,重启机器给调整为Master Broker,这是有点麻烦的,而且会导致中间一段时间不可用。
notion image
所以这种Master-Slave模式不是彻底的高可用模式,他没法实现自动把Slave切换为Master

4.5之后基于Dledger实现RocketMQ高可用自动切换

在RocketMQ 4.5之后,这种情况得到了改变,因为RocketMQ支持了一种新的机制,叫做Dledger是基于Raft协议实现的一个机制
把Dledger融入RocketMQ之后,就可以让一个Master Broker对应多个Slave Broker,也就是说一份数据可以有多份副本,比如一个Master Broker对应两个Slave Broker。
然后依然会在Master和Slave之间进行数据同步
notion image
此时一旦Master Broker宕机了,就可以在多个副本,也就是多个Slave中,通过Dledger技术和Raft协议算法进行leader选举,直接将一个Slave Broker选举为新的Master Broker,然后这个新的Master Broker就可以对外提供服务了。
整个过程也许只要10秒或者几十秒的时间就可以完成,这样的话,就可以实现Master Broker挂掉之后,自动从多个Slave Broker中选举出来一个新的Master Broker,继续对外服务,一切都是自动的。
notion image
 

基于DLedger技术的Broker主从同步原理

producer写入消息到broker之后,broker会将消息写入本地CommitLog磁盘文件里去,然后还有一些ConsumeQueue会存储Topic下各个MessageQueue的消息的物理位置。
notion image
如果要让Broker实现高可用,那么必须有一个Broker组,里面有一个是Leader Broker可以写入数据,然后让Leader Broker接收到数据之后,直接把数据同步给其他的Follower Broker
我们看下面的图
notion image
这样的话,一条数据就会在三个Broker上有三份副本,此时如果Leader Broker宕机,那么就直接让其他的Follower Broker自动切换为新的Leader Broker,继续接受客户端的数据写入就可以了。

基于DLedger技术管理CommitLog

 
DLedger技术实际上首先他自己就有一个CommitLog机制,你把数据交给他,他会写入CommitLog磁盘文件里去,这是他能干的第一件事情。
如果基于DLedger技术来实现Broker高可用架构,实际上就是用DLedger先替换掉原来Broker自己管理的CommitLog,由DLedger来管理CommitLog
notion image
所以首先第一步大家要知道的是,我们需要使用DLedger来管理CommitLog,然后Broker还是可以基于DLedger管理的CommitLog去构建出来机器上的各个ConsumeQueue磁盘文件。

基于Raft协议完成Leader选举

既然我们现在知道首先基于DLedger替换各个Broker上的CommitLog管理组件了,那么就是每个Broker上都有一个DLedger组件了
接着我们思考一下,如果我们配置了一组Broker,比如有3台机器,DLedger是如何从3台机器里选举出来一个Leader的?
 
实际上DLedger是基于Raft协议来进行Leader Broker选举的,那么Raft协议中是如何进行多台机器的Leader选举的呢?
这需要发起一轮一轮的投票,通过三台机器互相投票选出来一个人作为Leader。

第一轮Leader选举

简单来说,三台Broker机器启动的时候,他们都会投票自己作为Leader,然后把这个投票发送给其他Broker。
我们举一个例子,Broker01是投票给自己的,Broker02是投票给自己的,Broker03是投票给自己的,他们都把自己的投票发送给了别人。
此时在第一轮选举中,Broker01会收到别人的投票,他发现自己是投票给自己,但是Broker02投票给Broker02自己,Broker03投票给Broker03自己,似乎每个人都很自私,都在投票给自己,所以第一轮选举是失败的。

第二轮Leader选举

因为大家都投票给自己,怎么选举出来一个Leader呢?接着每个人会进入一个随机时间的休眠,比如说Broker01休眠3秒,Broker02休眠5秒,Broker03休眠4秒。
此时Broker01必然是先苏醒过来的,他苏醒过来之后,直接会继续尝试投票给自己,并且发送自己的选票给别人。
接着Broker03休眠4秒后苏醒过来,他发现Broker01已经发送来了一个选票是投给Broker01自己的,此时他自己因为没投票,所以会尊重别人的选择,就直接把票投给Broker01了,同时把自己的投票发送给别人。
接着Broker02苏醒了,他收到了Broker01投票给Broker01自己,收到了Broker03也投票给了Broker01,那么他此时自己是没投票的,直接就会尊重别人的选择,直接就投票给Broker01,并且把自己的投票发送给别人。
此时所有人都会收到三张投票,都是投给Broker01的,那么Broker01就会当选为Leader。
其实只要有(3台机器 / 2) + 1个人投票给某个人,就会选举他当Leader,这个(机器数量 / 2) + 1就是大多数的意思
这就是Raft协议中选举leader算法的简单描述,简单来说,他确保有人可以成为Leader的核心机制就是一轮选举不出来Leader的话,就让大家随机休眠一下,先苏醒过来的人会投票给自己,其他人苏醒过后发现自己收到选票了,就会直接投票给那个人。
依靠这个随机休眠的机制,基本上几轮投票过后,一般都是可以快速选举出来一个Leader。
因此我们看下图,在三台Broker机器刚刚启动的时候,就是靠这个DLedger基于Raft协议实现的leader选举机制,互相投票选举出来一个Leader,其他人就是Follower,然后只有Leader可以接收数据写入,Follower只能接收Leader同步过来的数据。

基于Raft协议进行多副本同步

接着我们来说一下,Leader Broker收到消息之后,是如何基于DLedger把数据同步给其他Broker的。
DLedger在进行同步的时候是采用Raft协议进行多副本同步的,我们接下来聊一下Raft协议中的多副本同步机制。

两阶段提交

简单来说,数据同步会分为两个阶段,一个是uncommitted阶段,一个是commited阶段
首先Leader Broker上的DLedger收到一条数据之后,会标记为uncommitted状态,然后他会通过自己的DLedgerServer组件把这个uncommitted数据发送给Follower Broker的DLedgerServer。 我们看下面的图,就显示了这个过程。
notion image
接着Follower BrokerDLedgerServer收到uncommitted消息之后,必须返回一个ack给Leader BrokerDLedgerServer如果Leader Broker收到超过半数的Follower Broker返回ack之后,就会将消息标记为committed状态。
然后Leader Broker上的DLedgerServer就会发送commited消息给Follower Broker机器的DLedgerServer,让他们也把消息标记为comitted状态。
这个就是基于Raft协议实现的两阶段完成的数据同步机制。
 

日志一致性如何保证

 
假设 现在是1leader,4follower. 按照raft协议需要5/2+1节点ack,就认为消息投递成功,也就是3个follower需要ack就认为成功。
那么极端情况下,比如消息1被f1,f2,f3 ack,消息2被f2,f3,f4 ack,消息3被f1,f3,f4 ack,消息2被f1,f2,f4 ack,这样每个follower都没有完整的数据,这时候leader挂了,是不是数据就有部分丢失?
当主节点给从节点发送rpc请求的时候,从节点发现自己的日志没有leader的全会把leader发送过来的这部分数据进行补齐不会存在丢失的问题
具体过程:
DLedger 的解决思路是,DLedger 会按照日志序号向从节点源源不断的转发日志,从节点接收后将这些待追加的数据放入一个待写队列中。
关键中的关键:从节点并不是从挂起队列中处理一个一个的追加请求,而是首先查阅从节点当前已追加的最大日志序号,用 ledgerEndIndex 表示,然后尝试追加(ledgerEndIndex + 1)的日志,用该序号从代写队列中查找,如果该队列不为空,并且没有 (ledgerEndIndex + 1)的日志条目,说明从节点未接收到这条日志,发生了数据缺失。
然后从节点在响应主节点 append 的请求时会告知数据不一致,然后主节点的日志转发线程其状态会变更为COMPARE,将向该从节点发送COMPARE命令,用来比较主从节点的数据差异,根据比较的差异重新从主节点同步数据或删除从节点上多余的数据,最终达到一致。
于此同时,主节点也会对PUSH超时推送的消息发起重推,尽最大可能帮助从节点及时更新到主节点的数据。

Leader Broker 崩溃处理

如果Leader Broker挂了,此时剩下的两个Follower Broker就会重新发起选举,他们会基于DLedger还是采用Raft协议的算法,去选举出来一个新的Leader Broker继续对外提供服务,而且会对没有完成的数据同步进行一些恢复性的操作,保证数据不会丢失。
我们看下面的图,就是示意了Leader Broker挂了之后,Follower Broker成为了新的Leader Broker,然后生产者写入新的LeaderBroker的一个过程。
新选举出来的Leader会把数据通过DLedger同步给剩下的一个Follower Broker
notion image
 

DLedger技术的主从同步和普通主从同步差别

基于DLedger技术管理CommitLog之后,可以自动在一组Broker中选举出来一个Leader然后在Leader接收消息写入的时候,基于DLedger技术写入本地CommitLog中,这个其实跟之前让Broker自己直接写入CommitLog是没什么区别的。
但是有区别的一点在于,Leader Broker上的DLedger在收到一个消息,将uncommitted消息写入自己本地存储之后,还需要基于Raft协议的算法,去采用两阶段的方式把uncommitted消息同步给其他Follower Broker必须要超过一半的Follower Broker的DLedger对uncommitted消息返回ack,此时Leader Broker才能返回ACK给生产者,说这次写 入成功了。
当然很多人会有疑问,那么不需要等他们执行了commit操作之后再返回给生产者吗?
实际上在这里只要有超过半数的Follower Broker都写入uncommitted消息之后,就可以返回给生产者了。
因此哪怕此时Leader Broker宕机了,超过半数的Follower Broker上也是有这个消息的,只不过是uncommitted状态,但是新选举的Leader Broker可以根据剩余Follower Broker上这个消息的状态去进行数据恢复,比如把消息状态调整为committed。
也就是说,这样的一个架构对每次写入都平添了一个成本,每次写入都必须有超过半数的Follower Broker都写入消息才可以算做一次 写入成功
基于DLedger技术管理CommitLog之后,可以自动在一组Broker中选举出来一个Leader然后在Leader接收消息写入的时候,基于DLedger技术写入本地CommitLog中,这个其实跟之前让Broker自己直接写入CommitLog是没什么区别的。
但是有区别的一点在于,Leader Broker上的DLedger在收到一个消息,将uncommitted消息写入自己本地存储之后,还需要基于Raft协议的算法,去采用两阶段的方式把uncommitted消息同步给其他Follower Broker必须要超过一半的Follower Broker的DLedger对uncommitted消息返回ack,此时Leader Broker才能返回ACK给生产者,说这次写 入成功了。
当然很多人会有疑问,那么不需要等他们执行了commit操作之后再返回给生产者吗?实际上在这里只要有超过半数的Follower Broker都写入uncommitted消息之后,就可以返回给生产者了。
因此哪怕此时Leader Broker宕机了,超过半数的Follower Broker上也是有这个消息的,只不过是uncommitted状态,但是新选举的Leader Broker可以根据剩余Follower Broker上这个消息的状态去进行数据恢复,比如把消息状态调整为committed。
 
也就是说,这样的一个架构对每次写入都平添了一个成本,每次写入都必须有超过半数的Follower Broker都写入消息才可以算做一次写入成功
这样做是不是会对Leader Broker的写入性能产生影响?是不是会降低TPS?
这样的方式会带来性能影响,因为这里的操作是同步的,但是为了保证数据的高可用和不丢失,这里是有必要的。
是不是必须要在所有的场景都这么做?为什么?
对于允许部分数据丢失的业务场景,可以不用这种方式

金融级的系统如何针对RocketMQ集群崩溃设计高可用方案?

金融级的系统中如果依赖了RocketMQ集群,那么在RocketMQ集群彻底崩溃的时候,我们应该如何设计他的高可用方案呢?
比如跟金钱相关的一些系统,他可能需要依赖MQ去传递消息,如果你MQ突然崩溃了,可能导致很多跟钱相关的东西就会出问题。
针对这种场景,我们通常都会在你发送消息到MQ的那个系统中设计高可用的降级方案,这个降级方案通常的思路是,你需要在你发送消息到MQ代码里去try catch捕获异常,如果你发现发送消息到MQ有异常,此时你需要进行重试。
如果你发现连续重试了比如超过3次还是失败,说明此时可能就是你的MQ集群彻底崩溃了,此时你必须把这条重要的消息写入到本地存储中去,可以是写入数据库里,也可以是写入到机器的本地磁盘文件里去,或者是NoSQL存储中去,几种方式我们都做过,具体要根据你们的具体情况来决定。
之后你要不停的尝试发送消息到MQ去,一旦发现MQ集群恢复了,你必须有一个后台线程可以把之前持久化存储的消息都查询出来,然后依次按照顺序发送到MQ集群里去,这样才能保证你的消息不会因为MQ彻底崩溃会丢失。
这里要有一个很关键的注意点,就是你把消息写入存储中暂存时,一定要保证他的顺序,比如按照顺序一条一条的写入本地磁盘文件去暂存消息。
而且一旦MQ集群故障了,你后续的所有写消息的代码必须严格的按照顺序把消息写入到本地磁盘文件里去暂存,这个顺序性是要严格保证的。
只要有这个方案在,那么哪怕你的MQ集群突然崩溃了,你的系统也是不会丢失消息的,对于一些跟金钱相关的金融系统、广告系统来说,这种高可用的方案设计,是非常的有必要的。

为什么要给RocketMQ增加消息限流功能保证其高可用性?

notion image
上面是简化以后的代码,实际上当时那段代码里混杂了很多的业务逻辑,但是当时出现的一个问题,就是业务代码报错了然后进入了catch代码块,结果那个工程师居然在catch代码块里写了一个while死循环,不停的发送消息。
而且上述系统当时是部署在10多台机器上的一个系统,所以相当于10多台机器都频繁的开足CPU的马力,拼命的往MQ里写消息,瞬间就导致MQ集群的TPS飙升,里面混入了大量的重复消息,而且MQ集群都快挂了。
所以针对这种场景,其实站在MQ的角度而言,你是没有办法去避免各种系统用上述的白痴方法来使用你的,毕竟公司大了,什么样的人都有,什么样的情况都可能出现,所以对MQ而言,你就必须去改造一下开源MQ的内核源码。
在接收消息这块,必须引入一个限流机制,也就是说要限制好,你这台机器每秒钟最多就只能处理比如3万条消息,根据你的MQ集群的压测结果来,你可以通过压测看看你的MQ最多可以抗多少QPS,然后就做好限流。
一般来说,限流算法可以采取令牌桶算法,也就是说你每秒钟就发放多少个令牌,然后只能允许多少个请求通过。关于限流算法的实现,不在我们的讨论范围内,大家可以自己查阅一下资料,也并不是很难。
我们这里主要是给大家讲一下,很多互联网大厂其实都会改造开源MQ的内核源码,引入限流机制,然后只能允许指定范围内的消息被在一秒内被处理,避免因为一些异常的情况,导致MQ集群挂掉。