MQ系列1:消息中间件执行原理
MQ系列2:消息中间件的技术选型
MQ系列3:RocketMQ 架构分析
MQ系列4:NameServer 原理解析
在之前的篇章中,我们学习了RocketMQ的原理,以及RocketMQ中 命名服务 ServiceName 的运行流程,本篇从消息的生产、消费来理解一条消息的生命周期。
在RocketMQ中,消息生产指的是 消息生产者往消息队列中写入数据的过程。因为业务场景的复杂性,RocketMQ架构设计了多种不同的发送策略。下面先讨论几种常见的场景:
-** 同步发送:** 整个过程业务是阻塞等待的,消息发送之后等待 Broker 响应,得到响应结果之后再传递给业务线程。

一般情况下,我们发送消息,会使用默认的DefaultMQProducer类,经过以下几个步骤实现:
消息发送之后,会相应的拿到回执。返回对象中的状态(SendResult.SendStatus)有4种,如下:
SYNC_FLUSH,那么没有在规定的时间完成刷盘则会报该错误。SYNC_MASTER模式,如果没有在设定时间内完成主从同步,则会报该错误。SYNC_MASTER,如果未找到Slave的Broker,则会报该错误。实时同步消息是一种对可靠性、实时性要求比较高的场景,使用的也比较广泛,比如:
public class SyncProducerApplication { public static void main(String[] args) throws Exception { // 1、创建生产者producer,并指定生产者组名为 testSyncGroup DefaultMQProducer producer = new DefaultMQProducer("testSyncGroup"); // 2、指定NameServer的地址,以获取Broker路由地址 producer.setNamesrvAddr("192.168.139.1:9876"); // 3、启动producer producer.start(); // 4、创建消息,并指定Topic,Tag和消息体 Message msg = new Message("testTopic","sync", "测试同步消息".getBytes("UTF-8")); // 5、发送消息到一个Broker SendResult sendResult = producer.send(msg); // 6、通过sendResult返回消息是否成功送达 System.out.printf("%s%n", sendResult); // 7、如果不再发送消息,关闭生产者Producer producer.shutdown(); }}
我们知道,异步主要用于那些对实时响应不敏感的业务,可以容忍一定时间的等待,只要能达到最终一致性即可。
有时候为了在流量高峰期进行削峰和分流,缓解压力,我们经常采用异步消息的发送模式。这种业务场景也很常见,比如:
public class AsyncProducerApplication { public static void main(String[] args) throws Exception { // 1、创建生产者producer,并指定生产者组名为 testAsyncGroup DefaultMQProducer producer = new DefaultMQProducer("testAsyncGroup"); // 2、指定NameServer的地址,以获取Broker路由地址 producer.setNamesrvAddr("192.168.139.1:9876"); // 3、启动producer producer.start(); // 4、创建消息,并指定Topic,Tag和消息体 Message msg = new Message("testTopic","async", "测试异步消息".getBytes("UTF-8")); // 5、发送消息到一个Broker SendResult sendResult = producer.send(msg); // 6. 发送异步消息,SendCallback是处理异步回调的方法 producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // 成功回调 System.out.println("success: " + sendResult); } @Override public void onException(Throwable throwable) { // 失败回调 System.out.println("fail: " + throwable); } }); // 7、如果不再发送消息,关闭生产者Producer producer.shutdown(); }}
OneWay的模式主要用在Care发送结果的场景,只要消息发送出去即完成任务,不需要对发送的状态、结果负责。常见的使用场景如
public class OneWayProducerApplication { public static void main(String[] args) throws Exception { // 1、创建生产者producer,并指定生产者组名为 testOneWayGroup DefaultMQProducer producer = new DefaultMQProducer("testOneWayGroup"); // 2、指定NameServer的地址,以获取Broker路由地址 producer.setNamesrvAddr("192.168.139.1:9876"); // 3、启动producer producer.start(); // 4、创建消息,并指定Topic,Tag和消息体 Message msg = new Message("testTopic","oneway", "测试单向发送消息".getBytes("UTF-8")); // 5、发送消息到一个Broker producer.sendOneway(msg); // 6、如果不再发送消息,关闭生产者Producer producer.shutdown(); }}
指定延迟的时间,在延迟时间到达之后再进行消息的发送。这种的使用场景也很多:
延时时间并不是随意指定的,Rocket源码中指定了18种等级,分别代表不同的时间时长,如下:
// org/apache/rocketmq/store/config/MessageStoreConfig.java 的第198行private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";通过下面的代码,可以得到的结果是消费的时间点比信息记录的时间点延迟了1分钟,这是因为我们在send的时候做了delay。
public class DelayProducerApplication { public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException , UnsupportedEncodingException { // 1、创建生产者producer,并指定生产者组名为 testDelayGroup DefaultMQProducer producer = new DefaultMQProducer("testDelayGroup"); // 2、指定NameServer的地址,以获取Broker路由地址 producer.setNamesrvAddr("192.168.139.1:9876"); // 3、启动producer producer.start(); // 4、创建消息,并指定Topic,Tag和消息体 Message msg = new Message("testTopic","delay", "测试延迟发送消息".getBytes("UTF-8")); // 5、设置延时等级4,对应1m,所以这个消息在一分钟之后发送 msg.setDelayTimeLevel(4); // 6、发送消息到一个Broker SendResult sendResult = producer.send(msg); // 7、通过sendResult返回消息是否成功送达 System.out.printf("%s%n", sendResult); // 8、如果不再发送消息,关闭生产者Producer producer.shutdown(); }}
waitStoreMsgOK: 消息发送时是否等消息存储完成后再返回。
public class BatchProducerApplication { public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException, UnsupportedEncodingException { // 1、创建生产者producer,并指定生产者组名为 testBatchGroup DefaultMQProducer producer = new DefaultMQProducer("testBatchGroup"); // 2、指定NameServer的地址,以获取Broker路由地址 producer.setNamesrvAddr("192.168.139.1:9876"); // 3、启动producer producer.start(); // 4、创建消息列表,并指定Topic,Tag和消息体 List<Message> messages = new ArrayList<>(); String topic = "testTopic"; messages.add(new Message(topic, "batch", "测试批量发送消息 0".getBytes("UTF-8"))); messages.add(new Message(topic, "batch", "测试批量发送消息 1".getBytes("UTF-8"))); messages.add(new Message(topic, "batch", "测试批量发送消息 2".getBytes("UTF-8"))); // 5、发送消息到一个Broker SendResult sendResult = producer.send(messages); // 6、通过sendResult返回消息是否成功送达 System.out.printf("%s%n", sendResult); // 7、如果不再发送消息,关闭生产者Producer producer.shutdown(); }}消息的发送一般是经过 client发送、Broker服务器接收并处理、Broker服务器返回应答 三个步骤。
如果我们想要提高消息生产的效率,一般有如下方法:
根据阿里内部调优后的性能测试报告,消息的写入性能达到90万+的TPS,我们可以朝着这个指标进行优化。
本篇介绍了RocketMQ 消息生产与发送的几种模式:
