MQ系列1:消息中间件执行原理
MQ系列2:消息中间件的技术选型
MQ系列3:RocketMQ 架构分析
上一节的 MQ系列3:RocketMQ 架构分析,我们大致介绍了 RocketMQ的基本组件构成,包括 NameServer、Broker、Producer以及Consumer四部分。
NameServer,指的是服务可以根据给定的名字来进行资源或对象的地址定位,并获取有关的属性信息。在Rocket中也一样,NameServer是 RocketMQ 的服务注册中心(类似于 Kafka 集群 后面的 Zookeeper 集群一样, 对集群元数据进行管理),根据元数据(ip、port和router信息)来唯一定位服务。RocketMQ 需要先启动 NameServer ,再启动 Rocket 中的 Broker。
注册发生在Broker启动之后,启动后快速与NameServer建立长连接,并每30s对NameService发送一次心跳包,Broker会将自己的IP Address、Port、Router 等信息随着心跳一并注册到 NameServer中。
这里的RouterInfo 主要指Broker下包含哪些Topic信息,这种映射关系方便后面消息的生产和消费的时候进行寻址。
注册使用到的核心数据结构如下:HashMap<String BrokerName, BrokerData> brokerAddrTable
| 字段 | 类型 | 说明 |
|---|---|---|
| cluster | String | 所属的集群名称 |
| broker | String | broker的名称 |
| brokerAddress | HashMap | Broker的IP地址列表,包含一个Master IP地址列表 和 多个Slave IP地址列表 |
" Broker-A":{ "cluster":"Broker-Cluster", "brokerName":"Broker-A", "cluster":{ // 1主2从 "0":"192.168.0.1:1234", "1":"192.168.0.2:1234", "2":"192.168.0.3:1234" }}当你对你的Broker中的Topic信息进行更新了(增、删、改)怎么办,你才需要重新将信息注册到NameServer中。

使用到的核心数据结构如下:HashMap<String topic, List<QueueData>> topicQueueTable
| 字段 | 类型 | 说明 |
|---|---|---|
| brokerName | String | broker名称 |
| readQueueNums | Long | 读Queue的数量 |
| writeQueueNums | Long | 写Queue的数量 |
| perm | Integer | 权限 PRIORITY = 3, READ = 2, WRITE = 1 , INHERIT = 0 |
| topicSyncFlag | Long | 同步的位置标识 |
{ "topic-test":[ // topic名称,注意下面会用到 { "brokerName":"Broker-A", "readQueueNums":37, "writeQueueNums":37, "perm":6, // 读写权限 "topicSynFlag":12 }, { "brokerName":"Broker-B", "readQueueNums":37, "writeQueueNums":37, "perm":6, // 读写权限 "topicSynFlag":12 } ]}参考RocketMQ源码如下,这边加了注释,方便理解:
/** * 创建或者更新 MessageQueue 的数据 * @param brokerName * @param topicConfig */ private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) { QueueData queueData = new QueueData(); queueData.setBrokerName(brokerName); // broker 名称 queueData.setWriteQueueNums(topicConfig.getWriteQueueNums()); // 读Queue的数量 queueData.setReadQueueNums(topicConfig.getReadQueueNums()); // 写Queue的数量 queueData.setPerm(topicConfig.getPerm()); // 权限: PRIORITY = 3, READ = 2, WRITE = 1 , INHERIT = 0 queueData.setTopicSynFlag(topicConfig.getTopicSysFlag()); List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName()); if (null == queueDataList) { // 新增 queueDataList = new LinkedList<QueueData>(); queueDataList.add(queueData); this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList); log.info("new topic registerd, {} {}", topicConfig.getTopicName(), queueData); } else { // 更新 boolean addNewOne = true; Iterator<QueueData> it = queueDataList.iterator(); while (it.hasNext()) { QueueData qd = it.next(); if (qd.getBrokerName().equals(brokerName)) { if (qd.equals(queueData)) { addNewOne = false; } else { log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd, queueData); it.remove(); // 先删除 } } } if (addNewOne) { queueDataList.add(queueData); // 再添加 } } }如果Broker挂掉,那么再被消息的生产者和消费者使用就会有问题了。这时候需要对已经宕掉的Broker进行清理,确保NamServer中注册的Broker服务信息都是Alive的。它的做法是这样的:
brokerLiveTable 表的 lastUpdateTimestamp 字段中。brokerLiveTable 表lastUpdateTimestamp字段,如果时间戳与当前时间相隔超过 120s(即两分钟),则认为 Broker 已经宕了,并会将broker清除出NameServer的注册表。
使用到的核心数据结构如下:HashMap<String BrokerAddr, BrokerLiveInfo> brokerLiveTable
| 字段 | 类型 | 说明 |
|---|---|---|
| lastUpdateTimestamp | Long | 最后一次收到心跳包的时间戳 |
| dataVersion | DataVersion | 数据版本号对象 |
| channel | Channel | netty的Channel,IO数据交互媒介 |
| haServerAddr | String | master地址,初次请求的时候值为空,slave向NameServer注册之后返回 |
上面的步骤都完成之后,NameServer这个 "中央大脑" 正式开始投入使用。这时候 ,消息的生产和消费具体是怎么做的呢?
hello-brand 到 topic (topic-test) 中topic-test 的 topic 存在于多个 broker中,所以需要如下几个步骤,才能找到具体的地址:topic-test 查询 topicQueueTable , 选择一个并获取它的broker信息(包含brokerName)brokerAddressTable 获取具体的Broker IP地址(一般包含1个Master和n个Slave的IP地址)

上述的流程图比较清晰的描述如下运转流程:
brokerLiveTable表 , 如果检测到某个Broker 宕机(因为使用心跳机制, 如果检测超120s(两分钟)无上报心跳),则从路由注册表中将其移除。topicQueueTable获得broker名称,通过broker名称查询brokerAddressTable获取具体的Broker IP地址),然后根据负载均衡算法从列表中选择1台Broker ,建立连接通道,进行消息发送。参考:
https://zhuanlan.zhihu.com/p/388807516
