RocketMQ数据过滤机制

binlog数据同步场景

一个数据库中可能会包含很多表的数据,比如订单数据库,他里面除了订单信息表以外,可能还包含很多其他的表。
所以我们在进行数据库binlog同步的时候,很可能是把一个数据库里所有表的binlog都推送到MQ里去的!我们看下面的图
notion image
所以在MQ的某个Topic中,可能是混杂了订单数据库里几个甚至十几个表的binlog数据的,不一定仅仅包含我们想要的表的binlog数据!

处理不关注的表的binlog

假设我们的大数据系统仅仅关注订单数据库中的表A的binlog,并不关注其他表的binlog,那么大数据系统可能需要在获取到所有表的binlog之后,对每条binlog判断一下,是否是表A的binlog?
如果不是表A的binlog,那么就直接丢弃不要处理;如果是表A的binlog,才会去进行处理!
但是这样的话,必然会导致大数据系统处理很多不关注的表的binlog,也会很浪费时间,降低消息的效率,我们看下图
notion image

RocketMQ支持的数据过滤机制

在发送消息的时候,给消息设置tag和属性

我们可以采用RocketMQ支持的数据过滤机制,来让大数据系统仅仅关注他想要的表的binlog数据即可。
首先,我们在发送消息的时候,可以给消息设置tag和属性,我们看下面的代码。
notion image
上面的代码清晰的展示了我们发送消息的时候,其实是可以给消息设置tag、属性等多个附加的信息的

在消费数据的时候根据tag和属性进行过滤

接着我们可以在消费的时候根据tag和属性进行过滤,比如我们可以通过下面的代码去指定,我们只要tag=TableA和tag=TableB的数据。
notion image
或者我们也可以通过下面的语法去指定,我们要根据每条消息的属性的值进行过滤,此时可以支持一些语法,比如:
notion image
RocketMQ还是支持比较丰富的数据过滤语法的,如下所示: (1)数值比较,比如:>,>=,<,<=,BETWEEN,=; (2)字符比较,比如:=,<>,IN; (3)IS NULL 或者 IS NOT NULL; (4)逻辑符号 AND,OR,NOT; (5)数值,比如:123,3.1415; (6)字符,比如:'abc',必须用单引号包裹起来; (7)NULL,特殊的常量 (8)布尔值,TRUE 或 FALSE
 
生产项目中,合理的规划Topic和里面的tags,一个Topic代表了一类业务消息数据,然后对于这类业务消息数据,如果你希望继续划分一些类别的话,可以在发送消息的时候设置tags。
举个例子,比如我们都知道现在常见的外卖平台有美团外卖、饿了么外卖还有别的一些外卖,那么假设你现在一个系统要发送外卖订单数据到MQ里去,就可以针对性的设置tags,比如不同的外卖数据都到一个“WaimaiOrderTopic”里去。
但是不同类型的外卖可以有不同的tags:“meituan_waimai”,“eleme_waimai”,“other_waimai”,等等。
然后对你消费“WaimaiOrderTopic”的系统,可以根据tags来筛选,可能你就需要某一种类别的外卖数据罢了。