RocketMQ的可靠性传输

博客 动态
0 251
羽尘
羽尘 2022-03-10 16:57:04
悬赏:0 积分 收藏

RocketMQ的可靠性传输

引用消息队列时,如何保证消息的可靠传输是一大难点,本文推荐了一个尽最大可能去避免消息丢失的解决方案。

整体

分析:

需确保一发一存一消费这些过程均无消息丢失

利用ACK机制保证每个阶段需要执行的操作成功后,再往下一个阶段推动(放行)

实现:

Producer——>Broker

  • 发送方式

    • 同步发送

      • Producer向broker发送消息,会阻塞当前线程等待broker响应结果
      public class SyncProducer {	public static void main(String[] args) throws Exception {    	// 实例化消息生产者Producer        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");    	// 设置NameServer的地址	    	producer.setNamesrvAddr("localhost:9876");    	// 启动Producer实例        producer.start();    	for (int i = 0; i < 100; i++) {    	    // 创建消息,并指定Topic,Tag和消息体    	    Message msg = new Message("TopicTest" /* Topic */,        	"TagA" /* Tag */,        	("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */        	);        	// 发送消息到一个Broker            SendResult sendResult = producer.send(msg);            // 通过sendResult返回消息是否成功送达            System.out.printf("%s%n", sendResult);    	}    	// 如果不再发送消息,关闭Producer实例。    	producer.shutdown();    }}
    • 异步发送

      • Producer首先构建一个向broker发送消息的任务,把该任务提交给线程池,等执行完该任务时,回调用户自定义的回调函数,执行处理结果
      public class AsyncProducer {	public static void main(String[] args) throws Exception {    	// 实例化消息生产者Producer        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");    	// 设置NameServer的地址        producer.setNamesrvAddr("localhost:9876");    	// 启动Producer实例        producer.start();        producer.setRetryTimesWhenSendAsyncFailed(0);		int messageCount = 100;        // 根据消息数量实例化倒计时计算器	final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);    	for (int i = 0; i < messageCount; i++) {                final int index = i;            	// 创建消息,并指定Topic,Tag和消息体                Message msg = new Message("TopicTest",                    "TagA",                    "OrderID188",                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));                // SendCallback接收异步返回结果的回调                producer.send(msg, new SendCallback() {                    @Override                    public void onSuccess(SendResult sendResult) {                        countDownLatch.countDown();                        System.out.printf("%-10d OK %s %n", index,                            sendResult.getMsgId());                    }                    @Override                    public void onException(Throwable e) {                        countDownLatch.countDown();      	                System.out.printf("%-10d Exception %s %n", index, e);      	                e.printStackTrace();                    }            	});    	}	// 等待5s	countDownLatch.await(5, TimeUnit.SECONDS);    	// 如果不再发送消息,关闭Producer实例。    	producer.shutdown();    }}
    • Oneway

      • Oneway方式只负责发送请求,不等待应答,Producer只负责把请求发出去,不会处理响应结果
      public class OnewayProducer {	public static void main(String[] args) throws Exception{    	// 实例化消息生产者Producer        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");    	// 设置NameServer的地址        producer.setNamesrvAddr("localhost:9876");    	// 启动Producer实例        producer.start();    	for (int i = 0; i < 100; i++) {        	// 创建消息,并指定Topic,Tag和消息体        	Message msg = new Message("TopicTest" /* Topic */,                "TagA" /* Tag */,                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */        	);        	// 发送单向消息,没有任何返回结果        	producer.sendOneway(msg);    	}    	// 如果不再发送消息,关闭Producer实例。    	producer.shutdown();    }}
    ?? 使用producer.send()方法时,不指定回调函数,则默认使用的发送消息方式为同步发送
  • 推荐

    同步发送:

    • 同步发送会返回四个状态码
      • SEND_OK:消息发送成功
      • FLUSH_DISK_TIMEOUT:消息发送成功但是消息刷盘超时
      • FLUSH_SLAVE_TIMEOUT:消息发送成功但是消息同步到 slave 节点时超时
      • SLAVE_NOT_AVAILABLE:消息发送成功但是 broker 的 slave 节点不可用
    • 处理
      • 根据返回的状态码,进行消息重试,默认设置为3次,可以通过设置调整

        producer.setRetryTimesWhenSendFailed(重试次数);

    异步发送:

    • 在onException()方法中处理,如果发送失败,则在这里执行重试

    额外问题:

    • 如果Broker收到消息后,就因为某些原因宕机了,就算Producer再怎么重试都是无法解决消息丢失的问题,该如何处理?

    ?? 利用多主模式,挂了一个,就换一个master继续消息发送

总结:

保证Producer——>Broker消息不丢失的方案

?? 同步发送+重试机制+多主(master)模式

Broker存储及备份

  • 刷盘

    • 同步刷盘

      • 消息写入内存后,立刻调用刷盘线程进行刷盘
      • 如果消息在约定的时间内未刷盘成功(默认5s),则返回FLUSH_DISK_TIMEOUT,Producer收到后进行重试
      ?? 同步刷盘可以保证消息不丢失,但降低了吞吐量,还增加了系统延迟
    • 异步刷盘(默认

      • 消息写入CommitLog时,不会直接写入磁盘,而是先写到PageCache缓存后返回成功
      • 启用后台线程异步将消息刷入磁盘
      ?? 异步刷盘提高了消息吞吐量,提升了请求处理能力,却带来了消息可能会丢失问题
?? 配置:flushDiskType=SYNC_FLUSH————SYNC_FLUSH(同步) ASYNC_FLUSH(异步)
  • 高可用
    • 多主
      • 多个Master节点,防止单主宕机,丢失消息问题
    • 主从+双写
      • 主从的情况下(写入master成功后立即ACK给Producer),会发生,master——>slave时,主节点Broker宕机,同步失败,从而导致消息丢失
      • 开启双写,只有等master和slave都写入成功,即双写成功后才会ACK给Producer,否则,会触发Producer的重试机制

总结

保证Broker存储及备份阶段,消息不丢失

?? 多主从+双写+同步刷盘

Broker——>Consumer

  • 消息确认

    • 消费者从Broker中拉去消息后,不是立马给Broker返回ack确认消息,而是等待业务代码顺利执行完成之后,再给Broker返回ack确认消息
  • 消息重试

    • 消息消费失败后,需提供重试消息的能力,RocketMQ本身提供了重新消费的能力

    总结

    保证Broker——>Consumer阶段,消息不丢失

    ?? 消息确认+重新消费

最终方案:

?? 同步发送+重试机制+多主从+主从双写+同步刷盘+消息确认+重新消费
posted @ 2022-03-10 16:54 Carson-Zhao 阅读(0) 评论(0) 编辑 收藏 举报
回帖
    羽尘

    羽尘 (王者 段位)

    2335 积分 (2)粉丝 (11)源码

     

    温馨提示

    亦奇源码

    最新会员