SpringBoot整合RabbitMQ实战附加死信交换机

博客 分享
0 197
张三
张三 2022-06-10 20:00:20
悬赏:0 积分 收藏

SpringBoot整合RabbitMQ实战附加死信交换机

SpringBoot整合RabbitMQ实战附加死信交换机

前言

使用springboot,实现以下功能,有两个队列1、2,往里面发送消息,如果处理失败发生异常,可以重试3次,重试3次均失败,那么就将消息发送到死信队列进行统一处理,例如记录数据库、报警等
完整demo项目代码https://gitee.com/daenmax/rabbit-mq-demo

环境

Windows10,IDEA,otp_win64_25.0,rabbitmq-server-3.10.4
1.双击C:\Program Files\RabbitMQ Server\rabbitmq_server-3.10.4\sbin\rabbitmq-server.bat启动MQ服务
2.然后访问http://localhost:15672/,默认账号密码均为guest,
3.手动添加一个虚拟主机为admin_host,手动创建一个用户账号密码均为admin

pom.xml

        <!-- RabbitMQ -->        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-amqp</artifactId>            <version>2.7.0</version>        </dependency>

配置

spring:  rabbitmq:    host: 127.0.0.1    port: 5672    username: admin    password: admin    virtual-host: admin_host    publisher-confirm-type: correlated    publisher-returns: true    listener:      simple:        acknowledge-mode: manual        retry:          enabled: true    #开启失败重试          max-attempts: 3    #最大重试次数          initial-interval: 1000  #重试间隔时间 毫秒

配置文件

RabbitConfig

package com.example.rabitmqdemo.mydemo.config;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.stereotype.Component;import java.util.HashMap;import java.util.Map;/** * Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输, * Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。 * Queue:消息的载体,每个消息都会被投到一个或多个队列。 * Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来. * Routing Key:路由关键字,exchange根据这个关键字进行消息投递。 * vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。 * Producer:消息生产者,就是投递消息的程序. * Consumer:消息消费者,就是接受消息的程序. * Channel:消息通道,在客户端的每个连接里,可建立多个channel. */@Slf4j@Componentpublic class RabbitConfig {    //业务交换机    public static final String EXCHANGE_PHCP = "phcp";    //业务队列1    public static final String QUEUE_COMPANY = "company";    //业务队列1的key    public static final String ROUTINGKEY_COMPANY = "companyKey";    //业务队列2    public static final String QUEUE_PROJECT = "project";    //业务队列2的key    public static final String ROUTINGKEY_PROJECT = "projectKey";    //死信交换机    public static final String EXCHANGE_PHCP_DEAD = "phcp_dead";    //死信队列1    public static final String QUEUE_COMPANY_DEAD = "company_dead";    //死信队列2    public static final String QUEUE_PROJECT_DEAD = "project_dead";    //死信队列1的key    public static final String ROUTINGKEY_COMPANY_DEAD = "companyKey_dead";    //死信队列2的key    public static final String ROUTINGKEY_PROJECT_DEAD = "projectKey_dead";//    /**//     * 解决重复确认报错问题,如果没有报错的话,就不用启用这个//     *//     * @param connectionFactory//     * @return//     *///    @Bean//    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {//        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();//        factory.setConnectionFactory(connectionFactory);//        factory.setMessageConverter(new Jackson2JsonMessageConverter());//        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//        return factory;//    }    /**     * 声明业务交换机     * 1. 设置交换机类型     * 2. 将队列绑定到交换机     * FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念     * HeadersExchange :通过添加属性key-value匹配     * DirectExchange:按照routingkey分发到指定队列     * TopicExchange:多关键字匹配     */    @Bean("exchangePhcp")    public DirectExchange exchangePhcp() {        return new DirectExchange(EXCHANGE_PHCP);    }    /**     * 声明死信交换机     * 1. 设置交换机类型     * 2. 将队列绑定到交换机     * FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念     * HeadersExchange :通过添加属性key-value匹配     * DirectExchange:按照routingkey分发到指定队列     * TopicExchange:多关键字匹配     */    @Bean("exchangePhcpDead")    public DirectExchange exchangePhcpDead() {        return new DirectExchange(EXCHANGE_PHCP_DEAD);    }    /**     * 声明业务队列1     *     * @return     */    @Bean("queueCompany")    public Queue queueCompany() {        Map<String,Object> arguments = new HashMap<>(2);        arguments.put("x-dead-letter-exchange",EXCHANGE_PHCP_DEAD);        //绑定该队列到死信交换机的队列1        arguments.put("x-dead-letter-routing-key",ROUTINGKEY_COMPANY_DEAD);        return QueueBuilder.durable(QUEUE_COMPANY).withArguments(arguments).build();    }    /**     * 声明业务队列2     *     * @return     */    @Bean("queueProject")    public Queue queueProject() {        Map<String,Object> arguments = new HashMap<>(2);        arguments.put("x-dead-letter-exchange",EXCHANGE_PHCP_DEAD);        //绑定该队列到死信交换机的队列2        arguments.put("x-dead-letter-routing-key",ROUTINGKEY_PROJECT_DEAD);        return QueueBuilder.durable(QUEUE_PROJECT).withArguments(arguments).build();    }    /**     * 声明死信队列1     *     * @return     */    @Bean("queueCompanyDead")    public Queue queueCompanyDead() {        return new Queue(QUEUE_COMPANY_DEAD);    }    /**     * 声明死信队列2     *     * @return     */    @Bean("queueProjectDead")    public Queue queueProjectDead() {        return new Queue(QUEUE_PROJECT_DEAD);    }    /**     * 绑定业务队列1和业务交换机     * @param queue     * @param directExchange     * @return     */    @Bean    public Binding bindingQueueCompany(@Qualifier("queueCompany") Queue queue, @Qualifier("exchangePhcp") DirectExchange directExchange) {        return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_COMPANY);    }    /**     * 绑定业务队列2和业务交换机     * @param queue     * @param directExchange     * @return     */    @Bean    public Binding bindingQueueProject(@Qualifier("queueProject") Queue queue, @Qualifier("exchangePhcp") DirectExchange directExchange) {        return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_PROJECT);    }    /**     * 绑定死信队列1和死信交换机     * @param queue     * @param directExchange     * @return     */    @Bean    public Binding bindingQueueCompanyDead(@Qualifier("queueCompanyDead") Queue queue, @Qualifier("exchangePhcpDead") DirectExchange directExchange) {        return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_COMPANY_DEAD);    }    /**     * 绑定死信队列2和死信交换机     * @param queue     * @param directExchange     * @return     */    @Bean    public Binding bindingQueueProjectDead(@Qualifier("queueProjectDead") Queue queue, @Qualifier("exchangePhcpDead") DirectExchange directExchange) {        return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_PROJECT_DEAD);    }}

生产者

RabbltProducer

package com.example.rabitmqdemo.mydemo.producer;import com.example.rabitmqdemo.mydemo.config.RabbitConfig;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.*;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import javax.annotation.Resource;import java.nio.charset.StandardCharsets;import java.util.UUID;@Component@Slf4jpublic class RabbltProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{    @Resource    private RabbitTemplate rabbitTemplate;    /**     * 初始化消息确认函数     */    @PostConstruct    public void init() {        rabbitTemplate.setConfirmCallback(this);        rabbitTemplate.setReturnsCallback(this);        rabbitTemplate.setMandatory(true);    }    /**     * 发送消息服务器确认函数     * @param correlationData     * @param ack     * @param cause     */    @Override    public void confirm(CorrelationData correlationData, boolean ack, String cause) {        if (ack) {            System.out.println("消息发送成功" + correlationData);        } else {            System.out.println("消息发送失败:" + cause);        }    }    /**     * 消息发送失败,消息回调函数     * @param returnedMessage     */    @Override    public void returnedMessage(ReturnedMessage returnedMessage) {        String str = new String(returnedMessage.getMessage().getBody());        System.out.println("消息发送失败:" + str);    }    /**     * 处理消息发送到队列1     * @param str     */    public void sendCompany(String str){        MessageProperties messageProperties = new MessageProperties();        messageProperties.setContentType("application/json");        Message message = new Message(str.getBytes(StandardCharsets.UTF_8),messageProperties);        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());        this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_COMPANY,message,correlationData);        //也可以用下面的方式        //CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());        //this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_COMPANY,str,correlationData);    }    /**     * 处理消息发送到队列2     * @param str     */    public void sendProject(String str){        MessageProperties messageProperties = new MessageProperties();        messageProperties.setContentType("application/json");        Message message = new Message(str.getBytes(StandardCharsets.UTF_8),messageProperties);        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());        this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_PROJECT,message,correlationData);        //也可以用下面的方式        //CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());        //this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_PROJECT,str,correlationData);    }}

业务消费者

RabbitConsumer

package com.example.rabitmqdemo.mydemo.consumer;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.io.IOException;/** * 监听业务交换机 * @author JeWang */@Component@Slf4jpublic class RabbitConsumer {    /**     * 监听业务队列1     * @param message     * @param channel     * @throws IOException     */    @RabbitListener(queues = "company")    public void company(Message message, Channel channel) throws IOException {        try{            System.out.println("次数" + message.getMessageProperties().getDeliveryTag());            channel.basicQos(1);            Thread.sleep(2000);            String s = new String(message.getBody());            log.info("处理消息"+s);            //下面两行是尝试手动抛出异常,用来测试重试次数和发送到死信交换机            //String str = null;            //str.split("1");            //处理成功,确认应答            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);        }catch (Exception e){            log.error("处理消息时发生异常:"+e.getMessage());            Boolean redelivered = message.getMessageProperties().getRedelivered();            if(redelivered){                log.error("异常重试次数已到达设置次数,将发送到死信交换机");                channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);            }else {                log.error("消息即将返回队列处理重试");                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);            }        }    }    /**     * 监听业务队列2     * @param message     * @param channel     * @throws IOException     */    @RabbitListener(queues = "project")    public void project(Message message, Channel channel) throws IOException {        try{            System.out.println("次数" + message.getMessageProperties().getDeliveryTag());            channel.basicQos(1);            Thread.sleep(2000);            String s = new String(message.getBody());            log.info("处理消息"+s);            //下面两行是尝试手动抛出异常,用来测试重试次数和发送到死信交换机            //String str = null;            //str.split("1");            //处理成功,确认应答            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);        }catch (Exception e){            log.error("处理消息时发生异常:"+e.getMessage());            Boolean redelivered = message.getMessageProperties().getRedelivered();            if(redelivered){                log.error("异常重试次数已到达设置次数,将发送到死信交换机");                channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);            }else {                log.error("消息即将返回队列处理重试");                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);            }        }    }}

死信消费者

RabbitConsumer

package com.example.rabitmqdemo.mydemo.consumer;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.io.IOException;/** * 监听死信交换机 * @author JeWang */@Component@Slf4jpublic class RabbitConsumerDead {    /**     * 处理死信队列1     * @param message     * @param channel     * @throws IOException     */    @RabbitListener(queues = "company_dead")    public void company_dead(Message message, Channel channel) throws IOException {        try{            channel.basicQos(1);            String s = new String(message.getBody());            log.info("处理死信"+s);            //在此处记录到数据库、报警之类的操作            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);        }catch (Exception e){            log.error("接收异常:"+e.getMessage());        }    }    /**     * 处理死信队列2     * @param message     * @param channel     * @throws IOException     */    @RabbitListener(queues = "project_dead")    public void project_dead(Message message, Channel channel) throws IOException {        try{            channel.basicQos(1);            String s = new String(message.getBody());            log.info("处理死信"+s);            //在此处记录到数据库、报警之类的操作            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);        }catch (Exception e){            log.error("接收异常:"+e.getMessage());        }    }}

测试

MqController

package com.example.rabitmqdemo.mydemo.controller;import com.example.rabitmqdemo.mydemo.producer.RabbltProducer;import lombok.extern.slf4j.Slf4j;import org.springframework.web.bind.annotation.RequestBody;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;@RequestMapping("/def")@RestController@Slf4jpublic class MsgController {    @Resource    private RabbltProducer rabbltProducer;        @RequestMapping("/handleCompany")    public void handleCompany(@RequestBody String jsonStr){        rabbltProducer.sendCompany(jsonStr);    }}
posted @ 2022-06-10 19:12 DaenMax 阅读(15) 评论(0) 编辑 收藏 举报
回帖
    张三

    张三 (王者 段位)

    821 积分 (2)粉丝 (41)源码

     

    温馨提示

    亦奇源码

    最新会员