golang监听rabbitmq消息队列任务断线自动重连接

博客 动态
0 181
羽尘
羽尘 2022-03-03 14:56:03
悬赏:0 积分 收藏
golang监听rabbitmq消息队列任务断线自动重连接
golang监听消息队列rabbitmq任务脚本,当rabbimq消息队列断开连接后自动重试,重新唤起协程执行任务

需求背景:

goalng常驻内存任务脚本监听rbmq执行任务

任务脚本由supervisor来管理

  当rabbitmq长时间断开连接会出现如下图 进程处于fatal状态

 

 假如因为不可抗拒因素,rabbitmq服务器内存满了或者其它原因导致rabbitmq消息队列服务停止了

如果是短时间的停止重启,supervisor是可以即时唤醒该程序。如果服务器长时间没有恢复正常运行,程序就会出现fatal进程启动失败的状态,此时可以通过告警来提醒开发人员

 

 如果以上告警能时时通知运维人员此问题可以略过了。今天讨论的是如果在长时间断开连接还能在服务器恢复正常情况下自动实现重连。

 

 

 

 代码实现一:

消费者:

package mainimport (    "fmt"    "github.com/ichunt2019/golang-rbmq-sl/utils/rabbitmq")type RecvPro struct {}//// 实现消费者 消费消息失败 自动进入延时尝试  尝试3次之后入库db/*返回值 error 为nil  则表示该消息消费成功否则消息会进入ttl延时队列  重复尝试消费3次3次后消息如果还是失败 消息就执行失败  进入告警 FailAction */func (t *RecvPro) Consumer(dataByte []byte) error {    //time.Sleep(500*time.Microsecond)    //return errors.New("顶顶顶顶")    fmt.Println(string(dataByte))    //time.Sleep(1*time.Second)    //return errors.New("顶顶顶顶")    return nil}//消息已经消费3次 失败了 请进行处理/*如果消息 消费3次后 仍然失败  此处可以根据情况 对消息进行告警提醒 或者 补偿  入库db  钉钉告警等等 */func (t *RecvPro) FailAction(err error,dataByte []byte) error {    fmt.Println(string(dataByte))    fmt.Println(err)    fmt.Println("任务处理失败了,我要进入db日志库了")    fmt.Println("任务处理失败了,发送钉钉消息通知主人")    return nil}func main() {    t := &RecvPro{}    //rabbitmq.Recv(rabbitmq.QueueExchange{    //    "a_test_0001",    //    "a_test_0001",    //    "",    //    "",    //    "amqp://guest:guest@192.168.2.232:5672/",    //},t,5)    /*        runNums: 表示任务并发处理数量  一般建议 普通任务1-3    就可以了     */    err := rabbitmq.Recv(rabbitmq.QueueExchange{        "a_test_0001",        "a_test_0001",        "hello_go",        "direct",        "amqp://guest:guest@192.168.1.169:5672/",    },t,4)    if(err != nil){        fmt.Println(err)    }}
View Code

rabbitmq代码

package rabbitmqimport (    "errors"    "strconv"    "time"    //"errors"    "fmt"    "github.com/streadway/amqp"    "log")// 定义全局变量,指针类型var mqConn *amqp.Connectionvar mqChan *amqp.Channel// 定义生产者接口type Producer interface {    MsgContent() string}// 定义生产者接口type RetryProducer interface {    MsgContent() string}// 定义接收者接口type Receiver interface {    Consumer([]byte)    error    FailAction(error , []byte)  error}// 定义RabbitMQ对象type RabbitMQ struct {    connection *amqp.Connection    Channel *amqp.Channel    dns string    QueueName   string            // 队列名称    RoutingKey  string            // key名称    ExchangeName string           // 交换机名称    ExchangeType string           // 交换机类型    producerList []Producer    retryProducerList []RetryProducer    receiverList []Receiver}// 定义队列交换机对象type QueueExchange struct {    QuName  string           // 队列名称    RtKey   string           // key值    ExName  string           // 交换机名称    ExType  string           // 交换机类型    Dns     string              //链接地址}// 链接rabbitMQfunc (r *RabbitMQ)MqConnect() (err error){    mqConn, err = amqp.Dial(r.dns)    r.connection = mqConn   // 赋值给RabbitMQ对象    if err != nil {        fmt.Printf("rbmq链接失败  :%s \n", err)    }    return}// 关闭mq链接func (r *RabbitMQ)CloseMqConnect() (err error){    err = r.connection.Close()    if err != nil{        fmt.Printf("关闭mq链接失败  :%s \n", err)    }    return}// 链接rabbitMQfunc (r *RabbitMQ)MqOpenChannel() (err error){    mqConn := r.connection    r.Channel, err = mqConn.Channel()    //defer mqChan.Close()    if err != nil {        fmt.Printf("MQ打开管道失败:%s \n", err)    }    return err}// 链接rabbitMQfunc (r *RabbitMQ)CloseMqChannel() (err error){    r.Channel.Close()    if err != nil {        fmt.Printf("关闭mq链接失败  :%s \n", err)    }    return err}// 创建一个新的操作对象func NewMq(q QueueExchange) RabbitMQ {    return RabbitMQ{        QueueName:q.QuName,        RoutingKey:q.RtKey,        ExchangeName: q.ExName,        ExchangeType: q.ExType,        dns:q.Dns,    }}func (mq *RabbitMQ) sendMsg (body string) (err error)  {    err = mq.MqOpenChannel()    ch := mq.Channel    if err != nil{        log.Printf("Channel err  :%s \n", err)    }    defer mq.Channel.Close()    if mq.ExchangeName != "" {        if mq.ExchangeType == ""{            mq.ExchangeType = "direct"        }        err =  ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)        if err != nil {            log.Printf("ExchangeDeclare err  :%s \n", err)        }    }    // 用于检查队列是否存在,已经存在不需要重复声明    _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil)    if err != nil {        log.Printf("QueueDeclare err :%s \n", err)    }    // 绑定任务    if mq.RoutingKey != "" && mq.ExchangeName != "" {        err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)        if err != nil {            log.Printf("QueueBind err :%s \n", err)        }    }    if mq.ExchangeName != "" && mq.RoutingKey != ""{        err = mq.Channel.Publish(            mq.ExchangeName,     // exchange            mq.RoutingKey, // routing key            false,  // mandatory            false,  // immediate            amqp.Publishing {                ContentType: "text/plain",                Body:        []byte(body),            })    }else{        err = mq.Channel.Publish(            "",     // exchange            mq.QueueName, // routing key            false,  // mandatory            false,  // immediate            amqp.Publishing {                ContentType: "text/plain",                Body:        []byte(body),            })    }    return}/*发送延时消息 */func (mq *RabbitMQ)sendDelayMsg(body string,ttl int64) (err error){    err =mq.MqOpenChannel()    ch := mq.Channel    if err != nil{        log.Printf("Channel err  :%s \n", err)    }    defer mq.Channel.Close()    if mq.ExchangeName != "" {        if mq.ExchangeType == ""{            mq.ExchangeType = "direct"        }        err =  ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)        if err != nil {            return        }    }    if ttl <= 0{        return errors.New("发送延时消息,ttl参数是必须的")    }    table := make(map[string]interface{},3)    table["x-dead-letter-routing-key"] = mq.RoutingKey    table["x-dead-letter-exchange"] = mq.ExchangeName    table["x-message-ttl"] = ttl*1000    //fmt.Printf("%+v",table)    //fmt.Printf("%+v",mq)    // 用于检查队列是否存在,已经存在不需要重复声明    ttlstring := strconv.FormatInt(ttl,10)    queueName := fmt.Sprintf("%s_delay_%s",mq.QueueName ,ttlstring)    routingKey := fmt.Sprintf("%s_delay_%s",mq.QueueName ,ttlstring)    _, err = ch.QueueDeclare(queueName, true, false, false, false, table)    if err != nil {        return    }    // 绑定任务    if routingKey != "" && mq.ExchangeName != "" {        err = ch.QueueBind(queueName, routingKey, mq.ExchangeName, false, nil)        if err != nil {            return        }    }    header := make(map[string]interface{},1)    header["retry_nums"] = 0    var ttl_exchange string    var ttl_routkey string    if(mq.ExchangeName != "" ){        ttl_exchange = mq.ExchangeName    }else{        ttl_exchange = ""    }    if mq.RoutingKey != "" && mq.ExchangeName != ""{        ttl_routkey = routingKey    }else{        ttl_routkey = queueName    }    err = mq.Channel.Publish(        ttl_exchange,     // exchange        ttl_routkey, // routing key        false,  // mandatory        false,  // immediate        amqp.Publishing {            ContentType: "text/plain",            Body:        []byte(body),            Headers:header,        })    if err != nil {        return    }    return}func (mq *RabbitMQ) sendRetryMsg (body string,retry_nums int32,args ...string)  {    err :=mq.MqOpenChannel()    ch := mq.Channel    if err != nil{        log.Printf("Channel err  :%s \n", err)    }    defer mq.Channel.Close()    if mq.ExchangeName != "" {        if mq.ExchangeType == ""{            mq.ExchangeType = "direct"        }        err =  ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)        if err != nil {            log.Printf("ExchangeDeclare err  :%s \n", err)        }    }    //原始路由key    oldRoutingKey := args[0]    //原始交换机名    oldExchangeName := args[1]    table := make(map[string]interface{},3)    table["x-dead-letter-routing-key"] = oldRoutingKey    if oldExchangeName != "" {        table["x-dead-letter-exchange"] = oldExchangeName    }else{        mq.ExchangeName = ""        table["x-dead-letter-exchange"] = ""    }    table["x-message-ttl"] = int64(20000)    //fmt.Printf("%+v",table)    //fmt.Printf("%+v",mq)    // 用于检查队列是否存在,已经存在不需要重复声明    _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, table)    if err != nil {        log.Printf("QueueDeclare err :%s \n", err)    }    // 绑定任务    if mq.RoutingKey != "" && mq.ExchangeName != "" {        err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)        if err != nil {            log.Printf("QueueBind err :%s \n", err)        }    }    header := make(map[string]interface{},1)    header["retry_nums"] = retry_nums + int32(1)    var ttl_exchange string    var ttl_routkey string    if(mq.ExchangeName != "" ){        ttl_exchange = mq.ExchangeName    }else{        ttl_exchange = ""    }    if mq.RoutingKey != "" && mq.ExchangeName != ""{        ttl_routkey = mq.RoutingKey    }else{        ttl_routkey = mq.QueueName    }    //fmt.Printf("ttl_exchange:%s,ttl_routkey:%s \n",ttl_exchange,ttl_routkey)    err = mq.Channel.Publish(        ttl_exchange,     // exchange        ttl_routkey, // routing key        false,  // mandatory        false,  // immediate        amqp.Publishing {            ContentType: "text/plain",            Body:        []byte(body),            Headers:header,        })    if err != nil {        fmt.Printf("MQ任务发送失败:%s \n", err)    }}// 监听接收者接收任务 消费者func (mq *RabbitMQ) ListenReceiver(receiver Receiver) {    err :=mq.MqOpenChannel()    ch := mq.Channel    if err != nil{        log.Printf("Channel err  :%s \n", err)    }    defer mq.Channel.Close()    if mq.ExchangeName != "" {        if mq.ExchangeType == ""{            mq.ExchangeType = "direct"        }        err =  ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)        if err != nil {            log.Printf("ExchangeDeclare err  :%s \n", err)        }    }    // 用于检查队列是否存在,已经存在不需要重复声明    _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil)    if err != nil {        log.Printf("QueueDeclare err :%s \n", err)    }    // 绑定任务    if mq.RoutingKey != "" && mq.ExchangeName != "" {        err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)        if err != nil {            log.Printf("QueueBind err :%s \n", err)        }    }    // 获取消费通道,确保rabbitMQ一个一个发送消息    err =  ch.Qos(1, 0, false)    msgList, err :=  ch.Consume(mq.QueueName, "", false, false, false, false, nil)    if err != nil {        log.Printf("Consume err :%s \n", err)    }    for msg := range msgList {        retry_nums,ok := msg.Headers["retry_nums"].(int32)        if(!ok){            retry_nums = int32(0)        }        // 处理数据        err := receiver.Consumer(msg.Body)        if err!=nil {            //消息处理失败 进入延时尝试机制            if retry_nums < 3{                fmt.Println(string(msg.Body))                fmt.Printf("消息处理失败 消息开始进入尝试  ttl延时队列 \n")                retry_msg(msg.Body,retry_nums,QueueExchange{                        mq.QueueName,                        mq.RoutingKey,                        mq.ExchangeName,                        mq.ExchangeType,                        mq.dns,                    })            }else{                //消息失败 入库db                fmt.Printf("消息处理3次后还是失败了 入库db 钉钉告警 \n")                receiver.FailAction(err,msg.Body)            }            err = msg.Ack(true)            if err != nil {                fmt.Printf("确认消息未完成异常:%s \n", err)            }        }else {            // 确认消息,必须为false            err = msg.Ack(true)            if err != nil {                fmt.Printf("消息消费ack失败 err :%s \n", err)            }        }    }}//消息处理失败之后 延时尝试func retry_msg(msg []byte,retry_nums int32,queueExchange QueueExchange){    //原始队列名称 交换机名称    oldQName := queueExchange.QuName    oldExchangeName := queueExchange.ExName    oldRoutingKey := queueExchange.RtKey    if oldRoutingKey == "" || oldExchangeName == ""{        oldRoutingKey = oldQName    }    if queueExchange.QuName != "" {        queueExchange.QuName = queueExchange.QuName + "_retry_3";    }    if queueExchange.RtKey != "" {        queueExchange.RtKey = queueExchange.RtKey + "_retry_3";    }else{        queueExchange.RtKey = queueExchange.QuName + "_retry_3";    }//fmt.Printf("%+v",queueExchange)    mq := NewMq(queueExchange)    _ = mq.MqConnect()    defer func(){        _ = mq.CloseMqConnect()    }()    //fmt.Printf("%+v",queueExchange)    mq.sendRetryMsg(string(msg),retry_nums,oldRoutingKey,oldExchangeName)}func Send(queueExchange QueueExchange,msg string) (err error){    mq := NewMq(queueExchange)    err = mq.MqConnect()    if err != nil{        return    }    defer func(){        mq.CloseMqConnect()    }()    err = mq.sendMsg(msg)    return}//发送延时消息func SendDelay(queueExchange QueueExchange,msg string,ttl int64)(err error){    mq := NewMq(queueExchange)    err = mq.MqConnect()    if err != nil{        return    }    defer func(){        _ = mq.CloseMqConnect()    }()    err = mq.sendDelayMsg(msg,ttl)    return}/*runNums  开启并发执行任务数量 */func Recv(queueExchange QueueExchange,receiver Receiver,runNums int) (err error){    mq := NewMq(queueExchange)    //链接rabbitMQ    err = mq.MqConnect()    if(err != nil){        return    }    //rbmq断开链接后 协程退出释放信号    taskQuit:= make(chan struct{}, 1)    //尝试链接rbmq    tryToLinkC := make(chan struct{}, 1)    //开始执行任务    for i:=1;i<=runNums;i++{        go Recv2(mq,receiver,taskQuit);    }    //如果rbmq断开连接后 尝试重新建立链接    var tryToLink = func() {        for {            err = mq.MqConnect()            if(err == nil){                tryToLinkC <- struct{}{}                break            }            time.Sleep(time.Second * 10)        }    }    for{        select {        case <- taskQuit ://rbmq断开连接后 开始尝试重新建立链接             go tryToLink()            <-tryToLinkC //建立链接成功后 重新开启协程执行任务            fmt.Println("重新开启新的协程执行任务")            go Recv2(mq,receiver,taskQuit);        }        time.Sleep(time.Millisecond*100)    }}func Recv2(mq RabbitMQ,receiver Receiver,taskQuit chan<- struct{}){        defer func() {            fmt.Println("rbmq链接失败,协程任务退出~~~~~~~~~~~~~~~~~~~~")            taskQuit <- struct{}{}            return        }()        // 验证链接是否正常        err := mq.MqOpenChannel()        if(err != nil){            return        }        mq.ListenReceiver(receiver)}type retryPro struct {    msgContent   string}
View Code

 

实现重连方式很多,下面实现方式比较简单

  1. Recv方法创建ampq链接
  2. 启动协程开始执行任务
    1.   MqOpenChannel 打开一个channel通道处理amqp消息
    2.        拿到消息 处理任务

   3,协程中捕获异常发送消息到 taskQuit <- struct{}{}

  4,主进程监听taskQuit管道 开始尝试重新链接amqp  直到链接成功

  5,重新链接成功后启动新的协程处理任务

主要代码分析:

/*runNums  开启并发执行任务数量 */func Recv(queueExchange QueueExchange,receiver Receiver,runNums int) (err error){    mq := NewMq(queueExchange)    //链接rabbitMQ    err = mq.MqConnect()    if(err != nil){        return    }    //rbmq断开链接后 协程退出释放信号    taskQuit:= make(chan struct{}, 1)    //尝试链接rbmq    tryToLinkC := make(chan struct{}, 1)    //开始执行任务    for i:=1;i<=runNums;i++{        go Recv2(mq,receiver,taskQuit);    }    //如果rbmq断开连接后 尝试重新建立链接    var tryToLink = func() {        for {            err = mq.MqConnect()            if(err == nil){                tryToLinkC <- struct{}{}                break            }            time.Sleep(time.Second * 10)        }    }    for{        select {        case <- taskQuit ://rbmq断开连接后 开始尝试重新建立链接             go tryToLink()            <-tryToLinkC //建立链接成功后 重新开启协程执行任务            fmt.Println("重新开启新的协程执行任务")            go Recv2(mq,receiver,taskQuit);        }        time.Sleep(time.Millisecond*100)    }}func Recv2(mq RabbitMQ,receiver Receiver,taskQuit chan<- struct{}){        defer func() {            fmt.Println("rbmq链接失败,协程任务退出~~~~~~~~~~~~~~~~~~~~")            taskQuit <- struct{}{}            return        }()        // 验证链接是否正常        err := mq.MqOpenChannel()        if(err != nil){            return        }        mq.ListenReceiver(receiver)}

 

posted on 2022-03-03 14:50 孙龙-程序员 阅读(1) 评论(0) 编辑 收藏 举报
回帖
    羽尘

    羽尘 (王者 段位)

    2335 积分 (2)粉丝 (11)源码

     

    温馨提示

    亦奇源码

    最新会员