Giter VIP home page Giter VIP logo

go-amqp-reconnect's Introduction

streadway/amqp Conneciton/Channel auto reconnect wrap

streadway/amqp Connection/Channel does not reconnect if rabbitmq server restart/down.

To simply developers, here is auto reconnect wrap with detail comments.

How to change existing code

  1. add import import "github.com/isayme/go-amqp-reconnect/rabbitmq"
  2. Replace amqp.Connection/amqp.Channel with rabbitmq.Connection/rabbitmq.Channel!

Example

Close by developer

go run example/close/demo.go

Auto reconnect

go run example/reconnect/demo.go

go-amqp-reconnect's People

Contributors

andygrunwald avatar isayme avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar

go-amqp-reconnect's Issues

Implementing a rabbitmq transport object

So I tried to create a rabbitmq transport element containing the connection, channel for sending, channel for consuming and  declaring the queues and exchange.
After losing connection it reconnects well, but after rabbitmq resets the queues and exchange get deleted and have to be recreated.
eventhough the connection gets replaced with the fixed one in the package, it still gives me an error of channel/connection are closed.
Also I didn't succeed checking if connection.IsClosed as it always returns false.

rabbitmq version: 3.8.9
environment: local k3s cluster

Thanks

Reuse connection instead of reusing channel

The implementation of channel is very cpu intensive if we create channel per go routine. It would be more cpu friendy if we implement the reconnect over a connection and not over the channel itself, possible provide an config option to not auto reconnect for channel.

connection判断连接是否关闭存在错误

reason, ok := <-connection.Connection.NotifyClose(make(chan *amqp.Error))
// exit this goroutine if closed by developer
if !ok {
log.Println("connection closed")
break
}

以上代码在创建多个connection时,当任意关闭一个connection都会触发if内的代码,改为如下:

if !ok && connection.Connection.IsClosed() 判断可解决问题

测试代码:

package main

import (
"flag"
"log"
"net/http"
"time"

"sync/atomic"

"github.com/gin-gonic/gin"
"github.com/streadway/amqp"

)

const delay = 3 // reconnect after delay seconds
var (
amqpUri = flag.String("amqp", "amqp://guest:[email protected]:5672/", "amqp uri")
conn *Connection
channel *Channel
)

// Connection amqp.Connection wrapper
type Connection struct {
*amqp.Connection
}

// Channel wrap amqp.Connection.Channel, get a auto reconnect channel
func (c *Connection) Channel() (*Channel, error) {
ch, err := c.Connection.Channel()
if err != nil {
return nil, err
}

channel := &Channel{
	Channel: ch,
}

go func() {
	for {
		reason, ok := <-channel.Channel.NotifyClose(make(chan *amqp.Error))
		// exit this goroutine if closed by developer
		if !ok || channel.IsClosed() {
			log.Println("channel closed")
			channel.Close() // close again, ensure closed flag set when connection closed
			break
		}
		log.Printf("channel closed, reason: %v\n", reason)

		// reconnect if not closed by developer
		for {
			// wait 1s for connection reconnect
			time.Sleep(delay * time.Second)

			ch, err := c.Connection.Channel()
			if err == nil {
				log.Println("channel recreate success")
				channel.Channel = ch
				break
			}

			log.Printf("channel recreate failed, err: %v\n", err)
		}
	}

}()

return channel, nil

}

// Dial wrap amqp.Dial, dial and get a reconnect connection
func Dial(url string) (*Connection, error) {
conn, err := amqp.Dial(url)
if err != nil {
return nil, err
}

connection := &Connection{
	Connection: conn,
}

go func() {
	for {
		reason, ok := <-connection.Connection.NotifyClose(make(chan *amqp.Error))
		// exit this goroutine if closed by developer
		if !ok && connection.Connection.IsClosed() {
			log.Println("connection closed")
			break
		}
		log.Printf("connection closed, reason: %v\n", reason)

		// reconnect if not closed by developer
		for {
			// wait 1s for reconnect
			time.Sleep(delay * time.Second)

			conn, err := amqp.Dial(url)
			if err == nil {
				connection.Connection = conn
				log.Println("connection reconnect success")
				break
			}

			log.Printf("connection reconnect failed, err: %v", err)
		}
	}
}()

return connection, nil

}

// Channel amqp.Channel wapper
type Channel struct {
*amqp.Channel
closed int32
}

// IsClosed indicate closed by developer
func (ch *Channel) IsClosed() bool {
return (atomic.LoadInt32(&ch.closed) == 1)
}

// Close ensure closed flag set
func (ch *Channel) Close() error {
if ch.IsClosed() {
return amqp.ErrClosed
}

atomic.StoreInt32(&ch.closed, 1)

return ch.Channel.Close()

}

// Consume wrap amqp.Channel.Consume, the returned delivery will end only when channel closed by developer
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) {
deliveries := make(chan amqp.Delivery)

go func() {
	for {
		d, err := ch.Channel.Consume(queue, consumer, autoAck, exclusive, noLocal, noWait, args)
		if err != nil {
			log.Printf("consume failed, err: %v\n", err)
			time.Sleep(delay * time.Second)
			continue
		}

		for msg := range d {
			deliveries <- msg
		}

		// sleep before IsClose call. closed flag may not set before sleep.
		time.Sleep(delay * time.Second)

		if ch.IsClosed() {
			break
		}
	}
}()

return deliveries, nil

}

func main() {

conn, err := Dial(*amqpUri)
if err != nil {
	log.Panic(err)
}

channel, err := conn.Channel()
if err != nil {
	log.Panic(err)
}

queueName := "test"
for i := 0; i < 5; i++ {
	go func() {
		d, err := channel.Consume(queueName, "", false, false, false, false, nil)
		if err != nil {
			log.Panic(err)
		}

		for msg := range d {
			log.Printf("msg: %s", string(msg.Body))
			msg.Ack(true)
		}
	}()
}

router := gin.Default()
router.GET("/send", publishMessage)
router.GET("/close", closeConn)

router.Run(":8080")

}

func closeConn(c *gin.Context) {
publishConn, err := Dial(*amqpUri)
if err != nil {
c.String(http.StatusOK, "MQ连接失败"+err.Error())
return
}
defer publishConn.Close()
}

func publishMessage(c *gin.Context) {
publishConn, err := Dial(*amqpUri)
if err != nil {
c.String(http.StatusOK, "MQ连接失败"+err.Error())
return
}
//defer publishConn.Close()

publishChannel, err := publishConn.Channel()
if err != nil {
	c.String(http.StatusOK, "Channel创建失败"+err.Error())
	return
}
//defer publishChannel.Close()

err = publishChannel.Publish("amq.direct", "key1", false, false, amqp.Publishing{
	ContentType: "text/plain",
	Body:        []byte("message" + time.Now().String()),
})
if err != nil {
	c.String(http.StatusOK, "发送信息到MQ失败"+err.Error())
} else {
	c.String(http.StatusOK, "发送信息到MQ成功")
}

}

Queues creates

QueueDeclare with autoDelete true is not recreated after reconnection

NotifyClose触发条件问题

reason, ok := <-connection.Connection.NotifyClose(make(chan *amqp.Error))

本地创建多个connection,任意关闭其中一个连接都会触该条件,仅判断!ok,而退出for循环不合理,是否有方法可以知道退出的connection与当前是否为同一个连接,如果是再break

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.