RocketMQ生产者原理

首先我们之前说过,在发送消息之前,得先有一个Topic,然后在发送消息的时候你得指定你要发送到哪个Topic里面去。

寻址发送的broker

拉取路由信息

接着既然你知道你要发送的Topic,那么就可以跟NameServer建立一个TCP长连接,然后定时从他那里拉取到最新的路由信息,包括集群里有哪些Broker,集群里有哪些Topic,每个Topic都存储在哪些Broker上。
notion image

负载均衡选择Broker

然后生产者系统自然就可以通过路由信息找到自己要投递消息的Topic分布在哪几台Broker上,此时可以根据负载均衡算法,从里面选择一台Broker机器出来,比如round robine轮询算法,或者是hash算法,都可以。

建立TCP长连接发消息

选择一台Broker之后,就可以跟那个Broker也建立一个TCP长连接,然后通过长连接向Broker发送消息即可。
Broker收到消息之后就会存储在自己本地磁盘里去。
notion image
这里唯一要注意的一点,就是生产者一定是投递消息到Master Broker的,然后Master Broker会同步数据给他的Slave Brokers,实现一份数据多份副本,保证Master故障的时候数据不丢失,而且可以自动把Slave切换为Master提供服务。

发送模式

同步发送

notion image
可以看到上面的代码片段就是我们目前发送消息到RocketMQ里去的代码,实际上这种方式就是所谓的同步发送消息到MQ
那么什么叫同步发送消息到MQ里去?
所谓同步,意思就是你通过这行代码发送消息到MQ去,SendResult sendResult = producer.send(msg),然后阻塞在这里,代码不能往下走了
要一直等待MQ返回一个结果给你,你拿到了SendResult之后,接着你的代码才会继续往下走。这个就是所谓的同步发送模式。

异步发送

首先在构造Producer的时候加入下面红框中的代码:
notion image
接着把发送消息的代码改成如下所示:
notion image
这个意思就是说,你把消息发送出去,然后上面的代码就直接往下走了,不会卡在这里等待MQ返回结果给你!
然后当MQ返回结果给你的时候,Producer会回调你的SendCallback里的函数,如果发送成功了就回调onSuccess函数,如果发送失败了就回调onException函数。
这个就是所谓的异步发送,异步的意思就是你发送消息的时候不会卡在上面那行代码等待MQ返回结果给你,会继续执行下面的别的代码,当MQ返回结果给你的时候,会回调你的函数!

单向发送

还有一种发送消息的方法,叫做发送单向消息,就是用下面的代码来发送消息:
notion image
这个sendOneway的意思,就是你发送一个消息给MQ,然后代码就往下走了,根本不会关注MQ有没有返回结果给你,你也不需要MQ返回的结果,无论发送的消息是成功还是失败,都不关你的事。
 

发送模式使用场景

消息发送模式:
  1. 同步发送:生产者发送消息,同步阻塞等待返回结果;适合可靠性高,不丢消息的场景,但是吞吐量低
  1. 异步发送:生产者发送消息,不阻塞,异步回调获取发送结果;适合吞吐量高,允许消息丢失的场景
  1. 单向发送:生产者发送消息,直接返回,不阻塞,没有返回结果;适合高吞吐量,不在意发送结果,只发送一次的场景。
RocketMQ和Kafka都有同步,异步,单向这三种发送模式;RabbitMQ有普通Confirm模式,批量Confirm模式,异步Confirm模式
 

自动容错机制

如果某个Broker临时出现故障了,比如Master Broker挂了,此时正在等待的其他Slave Broker自动热切换为Master Broker,那么这个时候对这一组Broker就没有Master Broker可以写入了
如果还是均匀把数据写入各个Broker上的MessageQueue,那么会导致在一段时间内,每次访问到这个挂掉的Master Broker都会访问失败,
对于这个问题,通常来说建议大家在Producer中开启一个开关,就是sendLatencyFaultEnable
一旦打开了这个开关,那么他会有一个自动容错机制,比如如果某次访问一个Broker发现网络延迟有500ms,然后还无法访问,那么就会自动回避访问这个Broker一段时间,比如接下来3000ms内,就不会访问这个Broker了。
这样的话,就可以避免一个Broker故障之后,短时间内生产者频繁的发送消息到这个故障的Broker上去,出现较多次数的异常。而是在一个Broker故障之后,自动回避一段时间不要访问这个Broker,过段时间再去访问他。
那么这样过一段时间之后,可能这个Master Broker就已经恢复好了,比如他的Slave Broker切换为了Master可以让别人访问了