reason, ok := <-connection.Connection.NotifyClose(make(chan *amqp.Error))
// exit this goroutine if closed by developer
if !ok {
log.Println("connection closed")
break
}
以上代码在创建了多个 connection 的情况下,关闭其中任意一个 connection 都会触发 if 内的代码。将判断条件改为如下形式可以解决该问题:
if !ok && connection.Connection.IsClosed()
测试代码:
package main
import (
"flag"
"log"
"net/http"
"time"
"sync/atomic"
"github.com/gin-gonic/gin"
"github.com/streadway/amqp"
)
// delay is the pause, in seconds, between connection reconnect, channel
// re-create and consume retry attempts; each use multiplies it by time.Second.
const delay = 3 // seconds
var (
	// amqpUri is the broker URI, overridable with the -amqp command-line flag.
	// The default targets a broker on localhost using the stock guest account.
	// It must not contain whitespace, which the AMQP URI parser rejects.
	amqpUri = flag.String("amqp", "amqp://guest:guest@localhost:5672/", "amqp uri")
	// conn and channel are package-level handles. Nothing in this file assigns
	// them: main declares local variables with the same names.
	conn    *Connection
	channel *Channel
)
// Connection wraps *amqp.Connection. Dial starts a goroutine that replaces the
// embedded connection with a freshly dialed one after an unexpected close.
// NOTE(review): that goroutine assigns the embedded pointer without any
// synchronization, so concurrent users of the wrapper race with a reconnect.
type Connection struct {
*amqp.Connection
}
// Channel wraps amqp.Connection.Channel and returns a Channel that is
// re-created automatically after an unexpected close.
//
// A background goroutine waits on the channel's close notification. If the
// notification channel is closed without an error, or the wrapper's closed
// flag is already set, the goroutine stops. Otherwise it retries
// c.Connection.Channel() every delay seconds until it succeeds, and swaps the
// new channel into the wrapper.
//
// It returns an error only when the initial channel cannot be opened.
func (c *Connection) Channel() (*Channel, error) {
	ch, err := c.Connection.Channel()
	if err != nil {
		return nil, err
	}

	channel := &Channel{
		Channel: ch,
	}

	go func() {
		for {
			reason, ok := <-channel.Channel.NotifyClose(make(chan *amqp.Error))
			// Exit this goroutine if the channel was closed by the developer.
			if !ok || channel.IsClosed() {
				log.Println("channel closed")
				// Close again so the closed flag is set even when the close
				// came from the connection; the returned error is irrelevant here.
				channel.Close()
				break
			}
			log.Printf("channel closed, reason: %v\n", reason)

			// Re-create the channel if it was not closed by the developer.
			for {
				// Give the connection time to reconnect before retrying.
				time.Sleep(delay * time.Second)

				// The developer may have called Close while we were waiting.
				// Opening a new channel then would leak it and leave this
				// goroutine blocked forever on its close notification.
				if channel.IsClosed() {
					log.Println("channel closed")
					return
				}

				ch, err := c.Connection.Channel()
				if err == nil {
					log.Println("channel recreate success")
					channel.Channel = ch
					break
				}
				log.Printf("channel recreate failed, err: %v\n", err)
			}
		}
	}()

	return channel, nil
}
// Dial wraps amqp.Dial and returns a Connection that redials by itself.
//
// A background goroutine waits on the connection's close notification. When
// the notification channel is closed without an error and the connection
// reports IsClosed, the goroutine stops. In every other case it redials url
// every delay seconds until it succeeds and stores the new connection in the
// wrapper.
//
// It returns an error only when the initial dial fails.
func Dial(url string) (*Connection, error) {
	raw, err := amqp.Dial(url)
	if err != nil {
		return nil, err
	}

	wrapped := &Connection{Connection: raw}

	go func() {
		for {
			closeErr, delivered := <-wrapped.Connection.NotifyClose(make(chan *amqp.Error))
			// Nothing delivered on a closed connection: treat it as a
			// deliberate close and stop watching.
			if !delivered && wrapped.Connection.IsClosed() {
				log.Println("connection closed")
				return
			}
			log.Printf("connection closed, reason: %v\n", closeErr)

			// Keep redialing until one attempt succeeds.
			for redialed := false; !redialed; {
				time.Sleep(delay * time.Second)

				fresh, dialErr := amqp.Dial(url)
				if dialErr != nil {
					log.Printf("connection reconnect failed, err: %v", dialErr)
					continue
				}
				wrapped.Connection = fresh
				log.Println("connection reconnect success")
				redialed = true
			}
		}
	}()

	return wrapped, nil
}
// Channel wraps *amqp.Channel and records whether Close was called on the
// wrapper, so that the reconnect logic can tell a deliberate close apart from
// an unexpected one.
type Channel struct {
*amqp.Channel
// closed is set to 1 by Close and read by IsClosed; both use sync/atomic.
closed int32
}
// IsClosed reports whether the wrapper's closed flag has been set, which
// happens when Close is called on the wrapper.
func (ch *Channel) IsClosed() bool {
	state := atomic.LoadInt32(&ch.closed)
	return state == 1
}
// Close sets the wrapper's closed flag and closes the underlying AMQP channel.
//
// Only the first call reaches the underlying channel; every later call
// returns amqp.ErrClosed. The flag is flipped with a single compare-and-swap,
// so concurrent callers (user code and the watcher goroutine started by
// Connection.Channel) cannot both pass the check.
func (ch *Channel) Close() error {
	if !atomic.CompareAndSwapInt32(&ch.closed, 0, 1) {
		return amqp.ErrClosed
	}
	return ch.Channel.Close()
}
// Consume wraps amqp.Channel.Consume. All arguments are passed through
// unchanged to the underlying channel.
//
// The returned delivery channel survives channel re-creation: a background
// goroutine re-issues Consume whenever the underlying delivery stream ends or
// Consume fails, waiting delay seconds between attempts. Once the wrapper has
// been closed with Close, the goroutine stops and the returned channel is
// closed, so a range loop over it terminates.
//
// The returned error is always nil; consume failures are logged and retried.
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) {
	deliveries := make(chan amqp.Delivery)

	go func() {
		// Signal end-of-stream to the caller once we stop for good.
		defer close(deliveries)

		for {
			d, err := ch.Channel.Consume(queue, consumer, autoAck, exclusive, noLocal, noWait, args)
			if err != nil {
				log.Printf("consume failed, err: %v\n", err)
			} else {
				for msg := range d {
					deliveries <- msg
				}
			}

			// Sleep before the IsClosed check: the closed flag may not be set
			// yet at the moment the delivery stream ends.
			time.Sleep(delay * time.Second)

			// Checked on the failure path too, so a closed channel does not
			// cause endless consume retries.
			if ch.IsClosed() {
				break
			}
		}
	}()

	return deliveries, nil
}
// main connects to the broker, starts five consumers on the "test" queue that
// log and acknowledge every message, and serves the /send and /close test
// endpoints on :8080.
func main() {
	// Without this the -amqp flag is never read and the default is always used.
	flag.Parse()

	conn, err := Dial(*amqpUri)
	if err != nil {
		log.Panic(err)
	}

	channel, err := conn.Channel()
	if err != nil {
		log.Panic(err)
	}

	queueName := "test"
	for i := 0; i < 5; i++ {
		go func() {
			d, err := channel.Consume(queueName, "", false, false, false, false, nil)
			if err != nil {
				log.Panic(err)
			}
			for msg := range d {
				log.Printf("msg: %s", string(msg.Body))
				if err := msg.Ack(true); err != nil {
					log.Printf("ack failed, err: %v\n", err)
				}
			}
		}()
	}

	router := gin.Default()
	router.GET("/send", publishMessage)
	router.GET("/close", closeConn)
	// Run blocks until the server stops; report why instead of exiting silently.
	if err := router.Run(":8080"); err != nil {
		log.Panic(err)
	}
}
// closeConn handles GET /close: it dials a new broker connection and closes
// it straight away. On a dial failure the error text is written to the
// response; on success nothing is written.
func closeConn(c *gin.Context) {
	tmpConn, dialErr := Dial(*amqpUri)
	if dialErr == nil {
		// The connection exists only to be closed again.
		tmpConn.Close()
		return
	}
	c.String(http.StatusOK, "MQ连接失败"+dialErr.Error())
}
// publishMessage handles GET /send: it dials a connection, opens a channel and
// publishes one timestamped text message to the "amq.direct" exchange with
// routing key "key1". The outcome is written to the response body; every
// response uses status 200.
//
// The connection and channel are closed before returning. Otherwise each
// request would leave behind an open broker connection, an open channel and
// the watcher goroutines started by Dial and Channel.
func publishMessage(c *gin.Context) {
	publishConn, err := Dial(*amqpUri)
	if err != nil {
		c.String(http.StatusOK, "MQ连接失败"+err.Error())
		return
	}
	defer publishConn.Close()

	publishChannel, err := publishConn.Channel()
	if err != nil {
		c.String(http.StatusOK, "Channel创建失败"+err.Error())
		return
	}
	// Deferred after the connection's Close, so the channel is closed first.
	defer publishChannel.Close()

	err = publishChannel.Publish("amq.direct", "key1", false, false, amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte("message" + time.Now().String()),
	})
	if err != nil {
		c.String(http.StatusOK, "发送信息到MQ失败"+err.Error())
		return
	}
	c.String(http.StatusOK, "发送信息到MQ成功")
}