Broker数据持久化

MQ中的数据模型 :Topic

Topic 实际意义上讲是 数据集合的意思。
举个例子,现在你的订单系统需要往MQ里发送订单消息,那么此时你就应该建一个Topic,他的名字可以叫做:topic_order_info,也就是一个包含了订单信息的数据集合。
然后你的订单系统投递的订单消息都是进入到这个topic_order_info里面去的,如果你的仓储系统要获取订单消息,那么他可以指定从topic_order_info这里面去获取消息,获取出来的都是他想要的订单消息了。
一句话:Topic其实就是一个数据集合的意思,不同类型的数据你得放不同的Topic里去。
要是你有一些商品数据要发送消息到MQ里,你就应该创建一个Topic叫做topic_product_info,代表里面都是商品数据,那些想要从MQ里获取商品数据的系统就可以从topic_product_info里获取了。
所以简单来说,你的系统如果要往MQ里写入消息或者获取消息,首先得创建一些Topic,作为数据集合存放不同类型的消息,比如说订单Topic,商品Topic,等等。

Topic在Broker集群里的存储方式

我们创建的那些Topic是怎么存储在Broker集群里的呢?这里就体现出来一个分布式存储的概念了。
首先我们来想一下,比如我们有一个订单Topic,可能订单系统每天都会往里面投递几百万条数据,然后这些数据在MQ集群上还得保留好多天,那么最终可能会有几千万的数据量,这还只是一个Topic。
那么如果有很多的Topic,并且里面都有大量的数据,最终加起来的总和也许是一个惊人的数字,此时这么大量的数据本身是不太可能存放在一台机器上的。 如果一台机器没法放下那么多的数据,应该怎么办呢?很简单,分布式存储。
我们可以在创建Topic的时候指定让他里面的数据分散存储在多台Broker机器上,比如一个Topic里有1000万条数据,此时有2台Broker,那么就可以让每台Broker上都放500万条数据。这样就可以把一个Topic代表的数据集合分布式存储在多台机器上了。
 
notion image
每个Broker在进行定时的心跳汇报给NameServer的时候,都会告诉NameServer自己当前的数据情况,比如有哪些Topic的哪些数据在自己这里,这些信息都是属于路由信息的一部分。

MessageQueue

MessageQueue是什么

而要明白MessageQueue是什么,就必须把他跟Topic以及Broker综合起来看,才能搞明白。
如果我们要使用RocketMQ,你先部署出来一套RocketMQ集群这个肯定是必须的,在有了集群之后,就必须根据你的业务需要去创建一些Topic。
比如之前我们看到,我们需要一个“TopicOrderPaySuccess”的Topic去存放订单支付成功的消息
像这些Topic就可以在之前我们讲过的RocketMQ可视化工作台里去创建,在里面就可以创建一个Topic出来,在创建Topic的时候需要指定一个很关键的参数,就是MessageQueue。
简单来说,就是你要指定你的这个Topic对应了多少个队列,也就是多少个MessageQueue。那么这个MessageQueue是用来干嘛的?
举一个例子
比如你现在有一个Topic,我们为他指定创建了4个MessageQueue,那么我们接着来思考一下,这个Topic的数据在Broker集群中是如何分布的?
每个Topic的数据都是分布式存储在多个Broker中的,比如下面的图里我们会看到这个示意。
notion image

Topic、MessageQueue以及Broker关系

数据分片机制

但是我们如何决定这个Topic的哪些数据放这个Broker上,哪些数据放那个Broker上?这是一个问题
所以在这里RocketMQ引入了MessageQueue的概念,本质上就是一个数据分片的机制
在这个机制中,假设你的Topic有1万条数据,然后你的Topic有4个MessageQueue,那么大致可以认为会在每个MessageQueue中放入2500条数据
当然,这个不是绝对的,有可能有的MessageQueue的数据多,有的数据少,这个要根据你的消息写入MessageQueue的策略来定。
但是我们这里先假定在每个MessageQueue中会平均分配Topic的数据吧,那么下一个问题来了,我们有4个MessageQueue平均分配了Topic的数据,这些MessageQueue放在哪里?
当然是放在Broker上了!也就是说,很有可能就是在2个Broker上,每个Broker放两个MessageQueue,我们看下面的图就是这个示意。
notion image
所以其实MessageQueue就是RocketMQ中非常关键的一个数据分片机制,
他通过MessageQueue将一个Topic的数据拆分为了很多个数据分片,然后在每个Broker机器上都存储一些MessageQueue。
通过这个方法,就可以实现Topic数据的分布式存储!
其他mq的分片机制
notion image
 

消息写入MessageQueue的机制

生产者会跟NameServer进行通信获取Topic的路由数据。
所以生产者从NameServer中就会知道,一个Topic有几个MessageQueue,哪些MessageQueue在哪台Broker机器上,哪些MesssageQueue在另外一台Broker机器上。
现在我们暂时先认为生产者会均匀的把消息写入各个MessageQueue,就是比如这个生产者发送出去了20条数据,那么4个MessageQueue就是每个都会写入5条数据。
notion image
通过这个方法,是不是就可以让生产者把写入请求分散给多个Broker?是不是也可以让每个Broker都均匀分摊到一定的写入请求压力?
这样假设单个Broker可以抗每秒7万并发,那么两个Broker就可以抗每秒14万并发!这样就可以实现RocketMQ集群抗下每秒10万+超高并发的场景了!
另外通过这个方法,是不是就可以让一个Topic中的数据分散在多个MessageQueue中,进而分散在多个Broker机器上?这样就可以实现RocketMQ集群分布式存储海量的消息数据了

数据持久化的重要性

首先我们得明确一点,为什么Broker数据存储是最重要的一个环节? 很简单,实际上类似RocketMQ、Kafka、RabbitMQ的消息中间件系统,他们不只是让你写入消息和获取消息那么简单,他们本身最重要的就是提供强大的数据存储能力,可以把亿万级的海量消息存储在自己的服务器的磁盘上。
这样的话,各种不同的系统从MQ中消费消息的时候,才可以从MQ服务器的磁盘中读取到自己需要的消息。
否则如果MQ不在机器磁盘上存储大量的消息,如果消息都放在自己的内存里,一个是内存很可能放不下,另外一个是可能你机器重启,内存里的消息就会全部丢失了。
所以大家首先要明确一点,Broker数据存储实际上才是一个MQ最核心的环节,他决定了生产者消息写入的吞吐量,决定了消息不能丢失,决定了消费者获取消息的吞吐量,这些都是由他决定的。

消息持久化过程

首先我们来思考一下,当生产者的消息发送到一个Broker上的时候,他接收到了一条消息,接着他会对这个消息做什么事情?

CommitLog数据存储机制

首先第一步,他会把这个消息直接写入磁盘上的一个日志文件,叫做CommitLog,直接顺序写入这个文件,如下图。
notion image
这个CommitLog是很多磁盘文件,每个文件限定最多1GB,Broker收到消息之后就直接追加写入这个文件的末尾,就跟上面的图里一样。如果一个CommitLog写满了1GB,就会创建一个新的CommitLog文件。

磁盘文件顺序写+OS PageCache写入

对于生产者把消息写入到Broker时,Broker会直接把消息写入磁盘上的CommitLog文件,那么Broker是如何提升整个过程的性能的呢?
因为这个部分的性能提升会直接提升Broker处理消息写入的吞吐量,比如你写入一条消息到CommitLog磁盘文件假设需要10ms,那么每个线程每秒可以处理100个写入消息,假设有100个线程,每秒只能处理1万个写入消息请求。
但是如果你把消息写入CommitLog磁盘文件的性能优化为只需要1ms,那么每个线程每秒可以处理1000个消息写入,此时100个线程每秒可以处理10万个写入消息请求。所以大家可以明显看到,Broker把接收到的消息写入CommitLog磁盘文件的性能,对他的TPS有很大的影响。
Broker是基于OS操作系统的PageCache和顺序写两个机制,来提升写入CommitLog文件的性能的。
首先Broker是以顺序的方式将消息写入CommitLog磁盘文件的,也就是每次写入就是在文件末尾追加一条数据就可以了,对文件进行顺序写的性能要比对文件随机写的性能提升很多
我们看下面图里的红圈,就是示意数据是顺序写入的。
notion image
数据写入CommitLog文件的时候,其实不是直接写入底层的物理磁盘文件的,而是先进入OS的PageCache内存缓存中,后续由OS的后台线程选一个时间,异步化的将OS PageCache内存缓冲中的数据刷入底层的磁盘文件。
我们看下面的图,图里示意出了,数据先写入OS的PageCache缓存中,然后后续由OS自己的线程将缓存里的数据刷入磁盘中。
notion image
所以在这样的优化之下,采用磁盘文件顺序写+OS PageCache写入+OS异步刷盘的策略,基本上可以让消息写入CommitLog的性能 跟你直接写入内存里是差不多的,所以正是如此,才可以让Broker高吞吐的处理每秒大量的消息写入。

异步刷盘的风险

在上述的异步刷盘模式下,生产者把消息发送给Broker,Broker将消息写入OS PageCache中,就直接返回ACK给生产者了。此时生产者就认为消息写入成功了,那么会有什么问题吗? 问题肯定是有的,如果生产者认为消息写入成功了,但是实际上那条消息此时是在Broker机器上的os cache中的,如果此时Broker直接宕机,那么是不是os cache中的这条数据就会丢失了? 我们看下面的图,红圈圈出来了数据早os cache里的情况,如果此时broker宕机,那么必然导致这里的数据丢失,而producer还以为数据已经写入成功了,以为不会丢失,所以肯定是有问题的。
所以异步刷盘的的策略下,可以让消息写入吞吐量非常高,但是可能会有数据丢失的风险。

同步刷盘

另外一种模式叫做同步刷盘,如果你使用同步刷盘模式的话,那么生产者发送一条消息出去,broker收到了消息,必须直接强制把这个消息刷入底层的物理磁盘文件中,然后才会返回ack给producer,此时你才知道消息写入成功了。
只要消息进入了物理磁盘上,那么除非是你的物理磁盘坏了导致数据丢失,否则正常来说数据就不会丢失了,我们看下面的图,就是示意了同步刷盘的效果。
notion image
如果broker还没有来得及把数据同步刷入磁盘,然后他自己挂了,那么此时对producer来说会感知到消息发送失败了,然后你只要不停的重试发送就可以了,直到有slave broker切换成master broker重新让你可以写入消息,此时可以保证数据是不会丢的。
但是如果你强制每次消息写入都要直接进入磁盘中,必然导致每条消息写入性能急剧下降,导致消息写入吞吐量急剧下降,但是可以保证数据不会丢失。

适用场景

同步刷盘和异步刷盘各自的优缺点:高吞吐写入+丢失数据风险,写入吞吐量下降+数据不丢失
同步日志数据时候,可以允许高吞吐量,可以允许一定的数据丢失,库存,金钱这种数据,不允许丢失,牺牲一些性能,保证准确。
其他的消息队列默认都是采用异步的方式,如果想要不丢失可以设置持久化参数。刷盘方式参数:flushDiskType 主从复制参数:brokerRole
rokerRole参数进行设置的,这个参数可以被设置成 ASYNC_MASTERSYNC_MASTERSLAVE三个值中的一个。
实际应用中要结合业务场景,合理设置刷盘方式和主从复制方式,尤其是SYNC_FLUSH方式,由于频繁 的触发写磁盘动作,会明显降低性能。
通常情况下,应该把Master和Slave设置成ASYNC_FLUSH的刷盘方式, 主从之间配置成SYNC_MASTER的复制方式,这样即使有一台机器出故障,仍然可以保证数据不丢。
Kafka的刷盘
  • kafka 也是采用page cache 加顺序写加零拷贝,异步刷盘等,kafka 设计初衷就是为了高吞吐量用来处理日志的。
  • Kafka是为了提升吞吐量采用批量异步刷盘的方式来做的,没有提供同步刷盘的配置。如果Kafka想要做到Broker这块的消息的零丢失,则需要通过 acks 的配置。将 acks 设置为 -1 ,则消息发送成功的标准就是 ISR 列表中所有 Replica 写消息成功,而 follower replica 同步 leader replica 的消息的前提,必须是 leader replica 的消息 从 os cache 中写入到 disk 中,不然的话 follower replica 是无法同步到 leader 数据的
rabbitmq
rabbitmq 使用顺序写到.rdq文件,通过缓存一份数据到ets表中,msg_store_persistent,msg_store_transient两个进程,一个用于持久消息的存储,一个用于内存不够时,将存储在内存中的非持久化数据转存到磁盘中。ets 表中记录id以及文件中映射,rabbitmq中对文件的操作封转到了file_handle_cache模块,以写模式打开文件时,默认有1M大小的缓存,即在进行文件的写操作时,是先写入到这个缓存中,当缓存超过大小或者显式刷新,才将缓存中的内容刷入磁盘中。

ConsumeQueue

物理位置存储机制

在Broker中,对Topic下的每个MessageQueue都会有一系列的ConsumeQueue文件。就是在Broker的磁盘上,会有下面这种格式的一系列文件$HOME/store/consumequeue/{topic}/{queueId}/{fileName}
我们之前说过,对每个Topic你不是在这台Broker上都会有一些MessageQueue吗?所以你会看到,{topic}指代的就是某个Topic,{queueId}指代的就是某个MessageQueue
然后对存储在这台Broker机器上的Topic下的一个MessageQueue,他有很多的ConsumeQueue文件,这个ConsumeQueue文件里存储的是一条消息对应在CommitLog文件中的offset偏移量。
 
假设有一个Topic,他有4个MessageQueue,然后在两台Broker机器上,每台Broker机器会存储两个MessageQueue。
那么此时假设生产者选择对其中一个MessageQueue写入了一条消息,此时消息会发送到Broker上。
然后Broker必然会把这个消息写入自己的CommitLog文件中
Topic下的MessageQueue0和MessageQueue1就放在这个Broker机器上,而且他们每个MessageQueue目前在磁盘上就对应了一个ConsumeQueue,所以就是MessageQueue0对应着Broker磁盘上的ConsumeQueue0,MessageQueue1对应着磁盘上的ConsumeQueue1。
notion image
假设Queue的名字叫做:TopicOrderPaySuccess,那么此时在Broker磁盘上应该有如下两个路径的文件
  1. $HOME/store/consumequeue/TopicOrderPaySuccess/MessageQueue0/ConsumeQueue0磁盘文件
  1. $HOME/store/consumequeue/TopicOrderPaySuccess/MessageQueue1/ConsumeQueue1磁盘文件
 
当你的Broker收到一条消息写入了CommitLog之后,其实他同时会将这条消息在CommitLog中的物理位置,也就是一个文件偏移量,就是一个offset,写入到这条消息所属的MessageQueue对应的ConsumeQueue文件中去。
比如现在这条消息在生产者发送的时候是发送给MessageQueue0的,那么此时Broker就会将这条消息在CommitLog中的offset偏移量,写入到MessageQueue0对应的ConsumeQueue0中去,如下图所示
notion image
所以实际上,ConsumeQueue0中存储的是一个一个消息在CommitLog文件中的物理位置,也就是offset
ConsumeQueue中的一个物理位置其实是对CommitLog文件中一个消息的引用。
实际上在ConsumeQueue中存储的每条数据不只是消息在CommitLog中的offset偏移量,还包含了消息的长度,以及taghashcode,一条数据是20个字节,每个ConsumeQueue文件保存30万条数据,大概每个文件是5.72MB。
所以实际上Topic的每个MessageQueue都对应了Broker机器上的多个ConsumeQueue文件,保存了这个MessageQueue的所有消息在CommitLog文件中的物理位置,也就是offset偏移量。

page cache优化

ConsumeQueue会被大量的消费者发送的请求给高并发的读取,所以ConsumeQueue文件的读操作是非常频繁的,而且同时会极大的影响到消费者进行消息拉取的性能和消费吞吐量。
所以实际上broker对ConsumeQueue文件同样也是基于page cache来进行优化的
也就是说,对于Broker机器的磁盘上的大量的ConsumeQueue文件,在写入的时候也都是优先进入page cache中的
而且os自己有一个优化机制,就是读取一个磁盘文件的时候,他会自动把磁盘文件的一些数据缓存到page cache中。
道ConsumeQueue文件主要是存放消息的offset,所以每个文件很小,30万条消息的offset就只有5.72MB而已。所以实际上ConsumeQueue文件们是不占用多少磁盘空间的,他们整体数据量很小,几乎可以完全被os缓存在内存cache里。
大家看下面的图,我们示意了ConsumeQueue文件几乎都是放在os cache里的。
notion image
所以实际上在消费者机器拉取消息的时候,第一步大量的频繁读取ConsumeQueue文件,几乎可以说就是跟读内存里的数据的性能是一样的,通过这个就可以保证数据消费的高性能以及高吞吐
 
 
Broker中就是大量的使用mmap技术去实现CommitLog这种大磁盘文件的高性能读写优化的。
 

基于mmap内存映射实现磁盘文件的高性能读写

传统文件IO操作的多次数据拷贝问题

首先我们先来给大家分析一下,假设RocketMQ没有使用mmap技术,就是使用最传统和基本的普通文件IO操作去进行磁盘文件的读写,那么会存在什么样的性能问题?答案是:多次数据拷贝问题
首先,假设我们有一个程序,这个程序需要对磁盘文件发起IO操作读取他里面的数据到自己这儿来,那么会经过以下一个顺序:
首先从磁盘上把数据读取到内核IO缓冲区里去,然后再从内核IO缓存区里读取到用户进程私有空间里去,然后我们才能拿到这个文件里的数据
为了读取磁盘文件里的数据,是不是发生了两次数据拷贝?
没错,所以这个就是普通的IO操作的一个弊端,必然涉及到两次数据拷贝操作,对磁盘读写性能是有影响的
那么如果我们要将一些数据写入到磁盘文件里去呢?
那这个就是一样的过程了,必须先把数据写入到用户进程私有空间里去,然后从这里再进入内核IO缓冲区,最后进入磁盘文件里去
notion image
在数据进入磁盘文件的过程中,是不是再一次发生了两次数据拷贝?没错,所以这就是传统普通IO的问题,有两次数据拷贝问题。
 

RocketMQ是如何基于mmap技术+page cache技术优化的?

接着我们来看一下,RocketMQ如何利用mmap技术配合page cache技术进行文件读写优化的?
首先,RocketMQ底层对CommitLog、ConsumeQueue之类的磁盘文件的读写操作,基本上都会采用mmap技术来实现。
如果具体到代码层面,就是基于JDK NIO包下的MappedByteBuffermap()函数,来先将一个磁盘文件(比如一个CommitLog文件,或者是一个ConsumeQueue文件)映射到
内存里来这里
我必须给大家解释一下,这个所谓的内存映射是什么意思其实有的人可能会误以为是直接把那些磁盘文件里的数据给读取到内存里来了,类似这个意思,但是并不完全是对的。
因为刚开始你建立映射的时候,并没有任何的数据拷贝操作,其实磁盘文件还是停留在那里,只不过他把物理上的磁盘文件的一些地址和用户进程私有空间的一些虚拟内存地址进行了一个映射
notion image
这个地址映射的过程,就是JDK NIO包下的MappedByteBuffer.map()函数干的事情,底层就是基于mmap技术实现的。
另外这里给大家说明白的一点是,这个mmap技术在进行文件映射的时候,一般有大小限制,在1.5GB~2GB之间
所以RocketMQ才让CommitLog单个文件在1GB,ConsumeQueue文件在5.72MB,不会太大
这样限制了RocketMQ底层文件的大小,就可以在进行文件读写的时候,很方便的进行内存映射了。
然后接下来要给大家讲的一个概念,就是之前给大家说的PageCache,实际上在这里就是对应于虚拟内存
notion image

基于mmap技术+pagecache技术实现高性能的文件读写

接下来就可以对这个已经映射到内存里的磁盘文件进行读写操作了,比如要写入消息到CommitLog文件,你先把一个CommitLog文件通过MappedByteBuffer的map()函数映射其地址到你的虚拟内存地址。 接着就可以对这个MappedByteBuffer执行写入操作了,写入的时候他会直接进入PageCache中,然后过一段时间之后,由os的线程异步刷入磁盘中,如下图我们可以看到这个示意。
notion image
对了!就是上面的图里,似乎只有一次数据拷贝的过程,他就是从PageCache里拷贝到磁盘文件里而已!这个就是你使用mmap技术之后,相比于传统磁盘IO的一个性能优化。
接着如果我们要从磁盘文件里读取数据呢?
那么此时就会判断一下,当前你要读取的数据是否在PageCache里?如果在的话,就可以直接从PageCache里读取了!
比如刚写入CommitLog的数据还在PageCache里,此时你Consumer来消费肯定是从PageCache里读取数据的。
但是如果PageCache里没有你要的数据,那么此时就会从磁盘文件里加载数据到PageCache中去,
而且PageCache技术在加载数据的时候,还会将你加载的数据块的临近的其他数据块也一起加载到PageCache里去
notion image
在你读取数据的时候,其实也仅仅发生了一次拷贝,而不是两次拷贝,所以这个性能相较于传统IO来说,肯定又是提高了。

预映射机制 + 文件预热机制

接着给大家说几个Broker针对上述的磁盘文件高性能读写机制做的一些优化:
  1. 内存预映射机制
    1. Broker会针对磁盘上的各种CommitLogConsumeQueue文件预先分配好MappedFile,也就是提前对一些可能接下来要读写的磁盘文件,提前使用MappedByteBuffer执行map()函数完成映射,这样后续读写文件的时候,就可以直接执行了。
  1. 文件预热
    1. 在提前对一些文件完成映射之后,因为映射不会直接将数据加载到内存里来,那么后续在读取尤其是CommitLogConsumeQueue的时候,其实有可能会频繁的从磁盘里加载数据到内存中去。
所以其实在执行完map()函数之后,会进行madvise系统调用,就是提前尽可能多的把磁盘文件加载到内存里去。
通过上述优化,才真正能实现一个效果,就是写磁盘文件的时候都是进入PageCache的,保证写入高性能;同时尽可能多的通过map + madvise的映射后预热机制,把磁盘文件里的数据尽可能多的加载到PageCache里来,后续对CosumeQueue、CommitLog进行读取的时候,才能尽可能从内存里读取数据。
实际上在Broker读写磁盘的时候,是大量把mmap技术和pagecache技术结合起来使用的,通过mmap技术减少数据拷贝次数,然后 利用pagecache技术实现尽可能优先读写内存,而不是物理磁盘。