RocketMQ的消费者原理

和Broker通信原理

消费者系统其实跟生产者系统原理是类似的,他们也会跟NameServer建立长连接,然后拉取路由信息,接着找到自己要获取消息的Topic在哪几台Broker上,就可以跟Broker建立长连接,从里面拉取消息了
notion image
这里唯一要注意的一点是,消费者系统可能会从Master Broker拉取消息,也可能从Slave Broker拉取消息,都有可能,一切都看具体情况。
 
在理解了Broker数据存储机制以及Broker高可用主从同步机制之后,我们就可以来看一下消费者是如何从Broker获取消息,并且进行处理以及维护消费进度的。

消费者组

首先,我们需要了解的第一个概念,就是消费者组
消费者组的意思,就是让你给一组消费者起一个名字。比如我们有一个Topic叫“TopicOrderPaySuccess”,然后假设有库存系统、积分系统、营销系统、仓储系统他们都要去消费这个Topic中的数据。
此时我们应该给那四个系统分别起一个消费组的名字,比如说:stock_consumer_groupmarketing_consumer_groupcredie_consumer_groupwms_consumer_group
设置消费组的方式是在代码里进行的,类似下面这样:
notion image
然后比如库存系统部署了4台机器,每台机器上的消费者组的名字都是“stock_consumer_group”,那么这4台机器就同属于一个消费者组,以此类推,每个系统的几台机器都是属于各自的消费者组的。
 
看下面的图,里面示意了两个系统,每个系统都有2台机器,每个系统都有一个自己的消费组。
然后给大家先解释一下不同消费者之间的关系,假设库存系统和营销系统作为两个消费者组,都订阅了“TopicOrderPaySuccess”这个订单支付成功消息的Topic,此时假设订单系统作为生产者发送了一条消息到这个Topic,如下图所示。
notion image

消息在多个消费组中分配方式

此时这条消息是怎么被消费的呢?
正常情况下来说,这条消息进入Broker之后,库存系统和营销系统作为两个消费组,每个组都会拉取到这条消息。
也就是说这个订单支付成功的消息,库存系统会获取到一条,营销系统也会获取到一条,他们俩都会获取到这条消息。
 
但是下一个问题来了,库存系统这个消费组里有两台机器,是两台机器都获取到这条消息?还是说只有一台机器会获取到这条消息?
答案是,正常情况下来说,库存系统的两台机器中只有一台机器会获取到这条消息,营销系统也是同理。
我们看下面的图,示意了对于一条订单支付成功的消息,库存系统的一台机器获取到了,营销系统的一台机器也获取到了。
notion image
不同的系统应该设置不同的消费组,如果不同的消费组订阅了同一个Topic,对Topic里的一条消息,每个消费组都会获取到这条消息。
 

同个消费组内的消费方式

集群模式消费 vs 广播模式消费

默认情况下我们都是集群模式,也就是说,一个消费组获取到一条消息,只会交给组内的一台机器去处理,不是每台机器都可以获取到这条消息的。 但是我们可以通过如下设置来改变为广播模式: consumer.setMessageModel(MessageModel.BROADCASTING)
如果修改为广播模式,那么对于消费组获取到的一条消息,组内每台机器都可以获取到这条消息
但是相对而言广播模式其实用的很少,常见基本上都是使用集群模式来进行消费的。
 
 

一个消费组内的多台机器是如何分配MessageQueue的

Topic中的多个MessageQueue会分散在多个Broker上,在每个Broker机器上,一个MessageQueue就对应了一个ConsumeQueue,当然在物理磁盘上其实是对应了多个ConsumeQueue文件的,但是我们大致也理解为一一对应关系。
 
但是对于一个Broker机器而言,存储在他上面的所有Topic以及MessageQueue的消息数据都是写入一个统一的CommitLog的
对于Topic的各个MessageQueue而言,就是通过各个ConsumeQueue文件来存储属于MessageQueue的消息在CommitLog文件中的物理地址,就是一个offset偏移量,
我在下面的图中标识出来了这个地址应用的关系
notion image
接着我们来想一个问题,对于一个Topic上的多个MessageQueue,是如何由一个消费组中的多台机器来进行消费的呢?
其实这里的源码实现细节是较为复杂的,但我们可以简单的理解为,他会均匀的将MessageQueue分配给消费组的多台机器来消费
举个例子,假设我们的TopicOrderPaySuccess有4个MessageQueue,这4个MessageQueue分布在两个Master Broker上,每个Master Broker上有2个MessageQueue
然后库存系统作为一个消费组里有两台机器,那么正常情况下,当然最好的就是让这两台机器每个都负责2个MessageQueue的消费了
比如库存系统的机器01从Master Broker01上消费2个MessageQueue,然后库存系统的机器02从Master Broker02上消费2个MessageQueue,这样不就把消费的负载均摊到两台Master Broker上去了?
所以你大致可以认为一个Topic的多个MessageQueue会均匀分摊给消费组内的多个机器去消费,这里的一个原则就是,一个MessageQueue只能被一个消费机器去处理,但是一台消费者机器可以负责多个MessageQueue的消息处理。
 

消费者机器是如何从broker拉取消息的

Push消费模式 vs Pull消费模式

现在我们已经知道了一个消费组内的多台机器是分别负责一部分MessageQueue的消费的,那么既然如此,每台机器都必须去连接到对应的Broker,尝试消费里面的MessageQueue对应的消息了。
此时就要涉及到两种消费模式了,之前我们也提到过,一个是Push,一个是Pull
实际上,这两个消费模式本质是一样的,都是消费者机器主动发送请求到Broker机器去拉取一批消息下来。
Push消费模式本质底层也是基于这种消费者主动拉取的模式来实现的,只不过他的名字叫做Push而已,意思是Broker会尽可能实时的把新消息交给消费者机器来进行处理,他的消息时效性会更好。
一般我们使用RocketMQ的时候,消费模式通常都是基于他的Push模式来做的,因为Pull模式的代码写起来更加的复杂和繁琐,而且Push模式底层本身就是基于消息拉取的方式来做的,只不过时效性更好而已。

Push模式的实现原理

Push模式的实现思路:当消费者发送请求到Broker去拉取消息的时候,如果有新的消息可以消费那么就会立马返回一批消息到消费机器去处理,处理完之后会接着立刻发送请求到Broker机器去拉取下一批消息。
所以消费机器在Push模式下会处理完一批消息,立马发起请求拉取下一批消息,消息处理的时效性非常好,看起来就跟Broker一直不停的推送消息到消费机器一样。 另外Push模式下有一个请求挂起和长轮询的机制,也要给大家简单介绍一下。
当你的请求发送到Broker,结果他发现没有新的消息给你处理的时候,就会让请求线程挂起,默认是挂起15秒,然后这个期间他会有后台线程每隔一会儿就去检查一下是否有的新的消息给你,另外如果在这个挂起过程中,如果有新的消息到达了会主动唤醒挂起的线程,然后把消息返回给你。
当然其实消费者进行消息拉取的底层源码是非常复杂的,涉及到大量的细节,但是他的核心思路大致就是如此,我们只要知道,哪怕是用常见的Push模式消费,本质也是消费者不停的发送请求到broker去拉取一批一批的消息就行了
 

具体代码

notion image
注意里面Consumer的类名:DefaultMQPushConsumer
从类名中我们可以提取出来一个关键的信息:Push。其实从这里我们就能看出来,当前我们使用的消息消费实际上是Push模式。
那么什么是Push消费模式呢?
其实很简单,就是Broker会主动把消息发送给你的消费者,你的消费者是被动的接收Broker推送给过来的消息,然后进行处理。这个就是所谓的Push模式,意思就是Broker主动推送消息给消费者。
 
notion image
在上述代码中,我们可以看到使用的Consumer类是DefaultMQPullConsumer,从名字里就可以看到使用了Pull消费模式。也就是说,Broker不会主动推送消息给Consumer,而是消费者主动发送请求到Broker去拉取消息过来。
 
 

Broker读取消息返回消费者的流程

其实这里要涉及到两个概念,分别是ConsumeQueueCommitLog
假设一个消费者机器发送了拉取请求到Broker了,他说我这次要拉取MessageQueue0中的消息,然后我之前都没拉取过消息,所以就从这个MessageQueue0中的第一条消息开始拉取好了。
于是,Broker就会找到MessageQueue0对应的ConsumeQueue0,从里面找到第一条消息的offset,如下图所示。
notion image
接着Broker就需要根据ConsumeQueue0中找到的第一条消息的地址,去CommitLog中根据这个offset地址去读取出来这条消息的数据,然后把这条消息的数据返回给消费者机器,如下图所示。
notion image
所以其实消费消息的时候,本质就是根据你要消费的MessageQueue以及开始消费的位置,
去找到对应的ConsumeQueue读取里面对应位置的消息在CommitLog中的物理offset偏移量,然后到CommitLog中根据offset读取消息数据,返回给消费者机器

费者机器如何处理消息、进行ACK以及提交消费进度

接着消费者机器拉取到一批消息之后,就会将这批消息回调我们注册的一个函数,如下面这样子:
notion image
当我们处理完这批消息之后,消费者机器就会提交我们目前的一个消费进度到Broker上去,
然后Broker就会存储我们的消费进度比如我们现在对ConsumeQueue0的消费进度假设就是在offset=1的位置,那么他会记录下来一个ConsumeOffset的东
西去标记我们的消费进度,如下图。
notion image
那么下次这个消费组只要再次拉取这个ConsumeQueue的消息,就可以从Broker记录的消费位置开始继续拉取,不用重头开始拉取了。

消费者cash或者扩容的处理

最后我们来看一下,如果消费组中出现机器宕机或者扩容加机器的情况,他会怎么处理?
这个时候其实会进入一个rabalance的环节,也就是说重新给各个消费机器分配他们要处理的MessageQueue
给大家举个例子,比如现在机器01负责MessageQueue0Message1,机器02负责MessageQueue2MessageQueue3,现在机器02宕机了,那么机器01就会接管机器02之前负责的MessageQueue2MessageQueue3
或者如果此时消费组加入了一台机器03,此时就可以把机器02之前负责的MessageQueue3转移给机器03,然后机器01就仅仅负责一个MessageQueue2的消费了,这就是负载重平衡的概念。
 

消费者到底是根据什么策略从Master或Slave上拉取消息的

消息消费,可以从Master Broker拉取,也可以从Slave Broker拉取,具体是要看机器负载来定。所以很多人都会有一个疑问,那到底什么时候从Master Broker拉取,什么时候从Slave Broker拉取。
所以我们先来简单回顾一下,之前我们对Broker的读写分离架构是怎么描述的。
RocketMQ的Broker的主从架构原理
消费者都是连接到Master Broker机器去拉取消息的,然后如果Master Broker机器觉得自己负载比较高,就会告诉消费者机器,下次可以从Slave Broker机器去拉取。
我们看下面的图,图里示意是说Master BrokerSlave Broker都可以拉取消息
notion image

读取ConsumeQueue文件优化

ConsumeQueue会被大量的消费者发送的请求给高并发的读取,所以ConsumeQueue文件的读操作是非常频繁的,而且同时会极大的影响到消费者进行消息拉取的性能和消费吞吐量。
所以实际上broker对ConsumeQueue文件同样也是基于page cache来进行优化的
也就是说,对于Broker机器的磁盘上的大量的ConsumeQueue文件,在写入的时候也都是优先进入page cache中的
而且os自己有一个优化机制,就是读取一个磁盘文件的时候,他会自动把磁盘文件的一些数据缓存到page cache中。
道ConsumeQueue文件主要是存放消息的offset,所以每个文件很小,30万条消息的offset就只有5.72MB而已。所以实际上ConsumeQueue文件们是不占用多少磁盘空间的,他们整体数据量很小,几乎可以完全被os缓存在内存cache里。
大家看下面的图,我们示意了ConsumeQueue文件几乎都是放在os cache里的。
notion image
所以实际上在消费者机器拉取消息的时候,第一步大量的频繁读取ConsumeQueue文件,几乎可以说就是跟读内存里的数据的性能是一样的,通过这个就可以保证数据消费的高性能以及高吞吐
 

读取commitlog优化

在进行消息拉取的时候,先读os cache里的少量ConsumeQueue的数据,这个性能是极高的,然后第二步就是要根据你读取到的offset去CommitLog里读取消息的完整数据了。
那么大家可以思考一下,这个从Commit Log里读取消息完整数据是如何读取的?是从page cache里读取?还是从磁盘里读取?
答案是:两者都有
因为CommitLog是用来存放消息的完整数据的,所以内容量是很大的,毕竟他一个文件就要1GB,所以整体完全有可能多达几个TB。
这么多的数据,可能都放在os cache里吗?
明显是不可能的,因为page cache用的也是机器的内存,一般多也就几十个GB而已,何况Broker自身的JVM也要用一些内存,留个page cache的内存只是一部分罢了,比如10GB~20GB的内存,所以page cache对于Commit Log而言,是无法把他全部数据都放在里面给你读取的!
也就是说,page cache对于CommitLog而言,主要是提升文件写入性能,当你不停的写入的时候,很多最新写入的数据都会先停留在page cache里,比如这可能有10GB~20GB的数据。
之后os会自动把cache里的比较旧的一些数据刷入磁盘里,腾出来空间给更新写入的数据放在page cache里,所以大部分数据可能多达几个TB都是在磁盘上的
我们看下面图里的示意,就是这个意思。
notion image
所以最终结论来了,当你拉取消息的时候,可以轻松从page cache里读取少量的ConsumeQueue文件里的offset,这个性能是极高的,但是当你去CommitLog文件里读取完整消息数据的时候,会有两种可能。
  1. 如果你读取的是那种刚刚写入Commit Log的数据,那么大概率他们还停留在page cache中,此时你可以顺利的直接从oscache里读取CommitLog中的数据,这个就是内存读取,性能是很高的。
  1. 你也许读取的是比较早之前写入Commit Log的数据,那些数据早就被刷入磁盘了,已经不在page cache里了,那么此时你就只能从磁盘上的文件里读取了,这个性能是比较差一些的。
如果你的消费者机器一直快速的在拉取和消费处理,紧紧的跟上了生产者写入broker的消息速率,那么你每次拉取几乎都是在拉取最近人家刚写入CommitLog的数据,那几乎都在os cache里。
但是如果broker的负载很高,导致你拉取消息的速度很慢,或者是你自己的消费者机器拉取到一批消息之后处理的时候性能很低,处理的速度很慢,这都会导致你跟不上生产者写入的速率。 比如人家都写入10万条数据了,结果你才拉取了2万条数据,此时有5万条最新的数据是在os cache里,有3万条你还没拉取的数据是在磁盘里,那么当后续你再拉取的时候,必然很大概率是从磁盘里读取早就刷入磁盘的3万条数据。
接着之前在os cache里的5万条数据可能又被刷入磁盘了,取而代之的是更新的几万条数据在os cache里,然后你再次拉取的时候,又会从磁盘里读取刷入磁盘里的5万条数据,相当于你每次都在从磁盘里读取数据了!

Master Broker什么时候会让你从Slave Broker拉取数据

假设此时你的broker里已经写入了10万条数据,但是你仅仅拉取了2万条数据,下次你拉取的时候,是从第2万零1条数据开始继续往后拉取的,是不是? 也就是说,此时你有8万条数据是没有拉取的!
然后broker自己是知道机器上当前的整体物理内存有多大的,而且他也知道自己可用的最大空间占里面的比例,他是知道自己的消息最多可以在内存里放多少的!比如他心知肚明,他最多也就在内存里放5万条消息而已!
因为他知道,他最多只能利用10GB的os cache去放消息,这么多内存最多也就放5万左右的消息。
然后这个时候你过来拉取消息,他发现你还有8万条消息没有拉取,这个8万条消息他发现是大于10GB内存最多存放的5万条消息的,那么此时就说明,肯定有3万条消息目前是在磁盘上的,不在os cache内存里!
我们看下面的图,就分别示意了os cache里和磁盘上你没拉取的消息数量。
notion image
所以他经过上述判断,会发现此时你很大概率会从磁盘里加载3万条消息出来!他会认为,出现这种情况,很可能是因为自己作为master broker负载太高了,导致没法及时的把消息给你,所以你落后的进度比较多。
这个时候,他就会告诉你,我这次给你从磁盘里读取3万条消息,但是下次你还是从slave broker去拉取吧!
以上就是这个关键问题的解答,本质是对比你当前没有拉取消息的数量和大小,以及最多可以存放在os cache内存里的消息的大小,如果你没拉取的消息超过了最大能使用的内存的量,那么说明你后续会频繁从磁盘加载数据,此时就让你从slave broker去加载数据了!
 

提高消费者的吞吐量

如果消费的时候发现消费的比较慢,那么可以提高消费者的并行度,常见的就是部署更多的consumer机器但是这里要注意,你的Topic的MessageQueue得是有对应的增加,因为如果你的consumer机器有5台,然后MessageQueue只有4个,那么意味着有一个consumer机器是获取不到消息的。
然后就是可以增加consumer的线程数量,可以设置consumer端的参数:consumeThreadMin、consumeThreadMax,这样一台consumer机器上的消费线程越多,消费的速度就越快。
此外,还可以开启消费者的批量消费功能,就是设置consumeMessageBatchMaxSize参数,他默认是1,但是你可以设置的多一些,那么一次就会交给你的回调函数一批消息给你来处理了,此时你可以通过SQL语句一次性批量处理一些数据,比如:update xxx setxxx where id in (xx,xx,xx)。
通过批量处理消息的方式,也可以大幅度提升消息消费的速度。

消费历史消息

consumer是支持设置从哪里开始消费消息的,常见的有两种:一个是从Topic的第一条数据开始消费,一个是从最后一次消费过的消息之后开始消费。对应的是:CONSUME_FROM_FIRST_OFFSETCONSUME_FROM_LAST_OFFSET
一般来说,我们都会选择CONSUME_FROM_FIRST_OFFSET,这样你刚开始就从Topic的第一条消息开始消费,但是以后每次重启,你都是从上一次消费到的位置继续往后进行消费的。

消费者常见问题汇总

  • 一般我们获取到一批消息之后,什么时候才可以认为是处理完这批消息了?是刚拿到这批消息就算处理完吗?还是说要对这批消息执行完一大堆的数据库之类的操作,才算是处理完了?
    • 只有提交了消费进度,才会被认为消费完成。
  • 如果获取到了一批消息,还没处理完呢,结果机器就宕机了,此时会怎么样?这些消息会丢失,再也无法处理了吗?
    • 没有处理完,宕机了,因为没有提交消费进度, 此时mq会重新投递消息,消息不会丢失
  • 如果获取到了一批消息,已经处理完了,还没来得及提交消费进度,此时机器宕机了,会怎么样呢?
    • 没有处理完,宕机了,此时mq会重新投递消息,消息不会丢失,但是存在重复消费的问题,业务上要做幂等设计
  • 消费者机器到底是跟少数几台Broker建立连接,还是跟所有Broker都建立连接?
    • 消费者订阅的Topic中的MessageQueue分布在几个Broker上,就跟几个Broker连接
  • Kafka、RabbitMQ他们支持主从架构下的读写分离吗?支持Slave Broker的读取吗?
    • kafka不支持,因为kafka根据leader patition分布到不同的机器的方式来实现负载均衡的,而且考虑到从节点负载均衡读会有数据不一致和延时的问题
  • 消费吞吐量似乎是跟你的处理速度有很大关系,如果你消费到一批数据,处理太慢了,会导致你严重跟不上数据写入的速度,这会导致你后续几乎每次拉取数据都会从磁盘上读取,而不是os cache里读取,所以你觉得你在拉取到一批消息处理的时候,应该有哪些要点需要注意的?
      1. 尽量保证消费者和生产者的速率一致。
      1. 消费速度太慢,可以使用多线程处理
      1. 多开几个消费者,假如说消费者和生产者的速率为1:4,那么可以使用4个消费者,一个生产者,来达到平衡