
cony's Introduction

Cony

High-level AMQP 0.9.1 client library. It is a wrapper around the low-level streadway/amqp library.

Goals

Provide a way to work with AMQP declaratively

Requirements

The library uses atomic.Value, so Go 1.4+ is needed.

Documentation

API documentation is available on GoDoc.

Thread-safety

Cony is thread-safe as long as streadway/amqp is thread-safe. It is recommended to open one AMQP channel per thread, so with cony that means one Consumer/Producer per goroutine.

License

BSD 2-Clause. See LICENSE for more details.

cony's People

Contributors

alexdeefuse, kron4eg, onlinehead, pavel-d, sebastienfr, speedywizard, williambailey

cony's Issues

Support of amqp.Blocking

I'll provide a PR to support the amqp.Blocking channel in the client, so that it is notified when the RabbitMQ server blocks a connection, as described in the doc:

Blocking notifies the server's TCP flow control of the Connection. When a
server hits a memory or disk alarm it will block all connections until the
resources are reclaimed. Use NotifyBlock on the Connection to receive these
events.

Client won't reconnect after second amqp.Dial timeout

Steps to reproduce:

  1. Start a RabbitMQ server.

  2. Start an instance of the basic example consumer that connects to the server.

  3. Stop the RabbitMQ server in a way that doesn't close the connection but leaves it unresponsive. For example, send SIGSTOP to the process, or issue docker pause if the server is running in a Docker container.

    After about 30 seconds the consumer will output a timeout error:

    Client error: Exception (501) Reason: "read tcp 33.33.33.1:61434->33.33.33.60:5672: i/o timeout"
    

    Wait some more to get a second error.

  4. Resume the RabbitMQ server, e.g. by sending SIGCONT to the process or issuing docker unpause.

  5. Try sending a message to the consumer using the basic example producer.

Expected result:
The consumer should reconnect and receive the message.

Actual result:
The consumer doesn't reconnect and will not receive the message.

It looks like (*Client).Loop stores the connection even if amqp.Dial fails, and the broken connection is then used on further iterations. The fix should be to not store the connection until after the error has been checked.

Support of headers in queue and exchange

All declarations (Queue, Exchange and bindings) pass nil as the Table argument. It would be nice to add an argument table to these structures and pass it to the declaration calls, to allow, for example, a dead-lettering header on a queue.
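One way the request could look, as a sketch only. The Table type below mirrors the shape of amqp.Table; the Queue struct, its Args field, and the declareArgs helper are hypothetical and are not cony's API. The x-dead-letter-exchange key is the RabbitMQ queue argument for dead-lettering; the exchange name "work.dlx" is made up.

```go
package main

import "fmt"

// Table mirrors the shape of amqp.Table.
type Table map[string]interface{}

// Queue is a hypothetical declaration struct with an added Args field.
type Queue struct {
	Name    string
	Durable bool
	Args    Table
}

// declareArgs returns the table to hand to the low-level declare call.
// It returns nil when no arguments were set, matching today's behaviour.
func (q Queue) declareArgs() Table {
	if len(q.Args) == 0 {
		return nil
	}
	return q.Args
}

func main() {
	q := Queue{
		Name:    "work",
		Durable: true,
		Args:    Table{"x-dead-letter-exchange": "work.dlx"},
	}
	fmt.Println(q.declareArgs()["x-dead-letter-exchange"]) // work.dlx
	fmt.Println(Queue{Name: "plain"}.declareArgs() == nil) // true
}
```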

Consumer send on closed channel after Cancel

Hi,

I got multiple errors when trying to close a Consumer using Cancel:
The c.deliveries channel is closed in the Cancel method, but then the serve method, because select picks randomly among ready cases, sometimes tries to write to this channel before running the c.close case. The program crashes because of this.

Here is an extract of my program (sigs is closed when I receive a signal):

for rmqConnector.Loop() {
	select {
	case msg := <-rmqConsumer.Deliveries():
		messageChan <- msg.Body
	case err := <-rmqConsumer.Errors():
		fmt.Printf("Consumer error: %v\n", err)
	case err := <-rmqConnector.Errors():
		fmt.Printf("Client error: %v\n", err)
	case _, open := <-sigs:
		if !open {
			rmqConsumer.Cancel()
			return
		}
	}
}

When canceling, I get a "panic: send on closed channel" at consumer.go line 92, called from client.go line 188.

Am I using this wrong?
Maybe moving

close(c.deliveries)

from

if !c.dead {
  close(c.deliveries)
  close(c.stop)
  c.dead = true
}

to

select {
  case <-c.stop:
    client.deleteConsumer(c)
    ch.Close()
    return

would fix it?
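The idea behind that suggestion can be sketched in isolation: Cancel only signals, and the goroutine that sends on the deliveries channel is also the only one that closes it. This is an illustration under assumptions, not cony's consumer; the consumer struct, newConsumer, and the string message type are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// consumer is a hypothetical, simplified stand-in for a cony Consumer.
type consumer struct {
	deliveries chan string
	stop       chan struct{}
	once       sync.Once
}

func newConsumer() *consumer {
	return &consumer{deliveries: make(chan string), stop: make(chan struct{})}
}

// Cancel only signals shutdown; it never closes deliveries.
func (c *consumer) Cancel() { c.once.Do(func() { close(c.stop) }) }

// serve forwards messages until Cancel is called or in is closed.
// It is the only sender on deliveries and the only goroutine that closes it,
// so a send on a closed channel cannot happen.
func (c *consumer) serve(in <-chan string) {
	defer close(c.deliveries)
	for {
		select {
		case <-c.stop:
			return
		case m, ok := <-in:
			if !ok {
				return
			}
			select {
			case c.deliveries <- m:
			case <-c.stop:
				return
			}
		}
	}
}

func main() {
	c := newConsumer()
	in := make(chan string)
	done := make(chan struct{})
	go func() { c.serve(in); close(done) }()

	in <- "hello"
	fmt.Println(<-c.deliveries) // hello
	c.Cancel()
	c.Cancel() // calling Cancel twice is safe
	<-done
	_, open := <-c.deliveries
	fmt.Println(open) // false
}
```

The inner select matters as much as the moved close: without it, serve could block forever on a send that nobody reads after Cancel.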

Using blank queue name results in error on reconnection

When using an auto-named queue, the queue is created fine the first time, but on a reconnect the RabbitMQ server issues the following errors.

Client error: Exception (403) Reason: "ACCESS_REFUSED - queue name 'amq.gen-zOW_J2f1kRwrKFY-DvZBGw' contains reserved prefix 'amq.*'"
Client error: Exception (504) Reason: "channel/connection is not open"
Client error: Exception (504) Reason: "channel/connection is not open"
Consumer error: Exception (404) Reason: "NOT_FOUND - no previously declared queue"

After that no messages are received, and it won't work again until a restart. I presume it tries to recreate the queue under the name it received on the first connection, which the server rejects because of the reserved prefix.
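If that presumption is right, one possible remedy is to keep the name the user requested separate from the name the server assigned, and always redeclare with the requested one. The sketch below is an assumption-laden illustration: the queue struct, its declare method, and newFakeServer (a toy that imitates the server's naming rules) are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// queue keeps the requested name apart from the server-assigned one.
type queue struct {
	requested string // "" means "let the server generate a name"
	assigned  string // name returned by the last successful declare
}

// declare always sends the requested name, never the generated one.
// declareFn stands in for the low-level queue declare call.
func (q *queue) declare(declareFn func(name string) (string, error)) error {
	name, err := declareFn(q.requested)
	if err != nil {
		return err
	}
	q.assigned = name
	return nil
}

// newFakeServer imitates a server that generates names for "" and
// refuses client-supplied names with the reserved "amq." prefix.
func newFakeServer() func(string) (string, error) {
	n := 0
	return func(name string) (string, error) {
		if name == "" {
			n++
			return fmt.Sprintf("amq.gen-%d", n), nil
		}
		if strings.HasPrefix(name, "amq.") {
			return "", errors.New("ACCESS_REFUSED")
		}
		return name, nil
	}
}

func main() {
	srv := newFakeServer()
	q := &queue{}
	fmt.Println(q.declare(srv), q.assigned) // first connection
	fmt.Println(q.declare(srv), q.assigned) // reconnect gets a fresh name
}
```

Bindings and consumers would then need to read the assigned name after each declare, since it changes across reconnects.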

Reconnection error Exception (501) Reason: "EOF"

Hello,

When running on Docker with a bridged network, amqp.Dial in the client reconnection loop returns a connection but also an error. This causes the user's client to start listening for deliveries on a faulty connection.

Here are some logs I added in the client.Loop for debugging, giving the content of the connection and errors.

I'm preparing a PR to set the connection to nil in case an error is returned when dialing to fix the issue.

Dialing new connection
Storing new connection &{{{%!V(int32=+0) %!V(uint32=+0)} %!V(uint32=+1)} {%!V(int32=+0) %!V(uint32=+0)} {%!V(int32=+0) %!V(uint32=+0)} %!V(*net.TCPConn=&{{+0xc8201d6540}}) %!V(chan amqp.message=+0xc8201c8d20) %!V(*amqp.writer=&{+0xc8201afb80}) %!V(chan time.Time=+0xc8201c8d80) %!V(chan amqp.readDeadliner=+0xc8201c8e40) %!V(*amqp.allocator=) map[] %!V(bool=true) [] [] %!V(chan *amqp.Error=+0xc8201c8de0) {[] %!V(string=) %!V(int=+0) %!V(int=+0) %!V(time.Duration=+0) %!V(*tls.Config=) map[] %!V(func(string, string) (net.Conn, error)=)} %!V(int=+0) %!V(int=+0) map[]}

Reporting error dialing &{%!V(int=+501) %!V(string=EOF) %!V(bool=false) %!V(bool=false)}

Entering connection loop

Loading connection

Connection not nil

time="2016-02-22T16:05:23Z" level=warning msg="client error" error="Exception (501) Reason: "EOF""

Subscriber keeps connection to CLOSE_WAIT

Randomly, after about 10 days, my subscriber stops consuming messages. RabbitMQ reports that it has not received the heartbeat and closes the connection. On the consumer side the connection stays in CLOSE_WAIT. This means the subscriber did not close the connection, and thus the reconnection loop was not activated. Has anyone encountered such an issue? Maybe my subscriber is stuck, so it does not send the heartbeat or cannot close the connection? I'll check the status of the goroutines. Last point: everything is running in separate Docker containers.

Blocked publisher does not forward unblock message

When testing the blocking/unblocking feature of cony, the publisher receives only the blocking message but not the unblocking (back to normal) one.

To reproduce the issue, run rabbitmqctl set_vm_memory_high_watermark 0 to put RabbitMQ into alarm, then rabbitmqctl set_vm_memory_high_watermark 0.4 to bring it back to normal.

PR to be provided soon.
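The report does not state the cause, so the following only illustrates the expected behaviour: every notification, whether it sets or clears the block, reaches the publisher. The blocking struct mirrors the Active and Reason fields of amqp.Blocking; the forward helper and the channel wiring are hypothetical.

```go
package main

import "fmt"

// blocking mirrors the fields of amqp.Blocking.
type blocking struct {
	Active bool   // true when the server blocks, false when it unblocks
	Reason string // set only when Active is true
}

// forward relays every notification, both block and unblock,
// until in is closed, and then closes out.
func forward(in <-chan blocking, out chan<- blocking) {
	defer close(out)
	for b := range in {
		out <- b
	}
}

func main() {
	in := make(chan blocking)
	out := make(chan blocking)
	go forward(in, out)
	go func() {
		in <- blocking{Active: true, Reason: "low on memory"}
		in <- blocking{Active: false}
		close(in)
	}()
	for b := range out {
		fmt.Println("blocked:", b.Active, "reason:", b.Reason)
	}
}
```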

Design question

If I understand the design of cony correctly, the Publishers of a Client must be declared (complete with exchange and routing key) upfront, before the Client's loop is started? That is, there is no way to change the exchange or routing key of a Publisher, or to add Publishers after the client has been started?

If so, I see some inflexibility in this design. It prevents routing keys and exchanges from being chosen at runtime, even though the rest of the message can be dynamic. One could work around it by creating a new Client+Publisher per publish, but creating a new connection on each publish is costly.

Distribution issue

Hi,

I was just wondering if any of you have experienced receiving messages twice, or missing some messages from time to time. Each RabbitMQ message I receive is processed in a separate goroutine, and I added message numbering. I can see from the logs in RabbitMQ and in my software that some strange behavior occurs between reception and handling of the message. I will dig further, but in any case, feel free to share your experience.

Handling prefetched messages upon lost connection?

Awesome package, this helps us a lot. Just one question that's come up while testing various edge cases:

If prefetch is set to > 1, then you could conceivably have the following scenario:

  1. Multiple messages are published and delivered to the consumer, pending processing
  2. Connection is lost (server blip, let's say). Cony gracefully handles reconnection, BUT
  3. The amqp.Messages in the deliveries channel reference a channel that is now CLOSED.

How would you suggest handling this case? If we remake the deliveries channel would that imply a NACK on the rabbit server?

Exception (501) Reason i/o timeout

I start an instance of the web producer that connects to the server.
Sometimes it outputs an error:
Exception (501) Reason: "read tcp 10.20.5.111:48947->10.20.5.160:5672: i/o timeout

The RabbitMQ server error log is:
closing AMQP connection <0.3780.1140>( 10.20.5.111:48947->10.20.5.160:5672); client unexpectedly closed TCP connection
