assembla / cony
Simple AMQP wrapper around github.com/streadway/amqp
License: BSD 2-Clause "Simplified" License
When testing the blocking/unblocking feature of cony, the publisher receives only the blocking notification, not the unblocking (back to normal) one.
To test the issue, rabbitmqctl set_vm_memory_high_watermark 0 puts RabbitMQ into alarm and rabbitmqctl set_vm_memory_high_watermark 0.4 brings it back to normal.
PR to be provided soon.
Steps to reproduce:
Start a RabbitMQ server.
Start an instance of the basic example consumer that connects to the server.
Stop the RabbitMQ server in a way that doesn't close the connection but leaves the server unresponsive. For example, send SIGSTOP to the process, or issue docker pause if the server is running in a docker container.
After about 30 seconds the consumer will output a timeout error:
Client error: Exception (501) Reason: "read tcp 33.33.33.1:61434->33.33.33.60:5672: i/o timeout"
Wait some more to get a second error.
Resume RabbitMQ, e.g. by sending SIGCONT to the process or issuing docker unpause.
Try sending a message to the consumer using the basic example producer.
Expected result:
The consumer should reconnect and receive the message.
Actual result:
The consumer doesn't reconnect and will not receive the message.
It looks like (*Client).Loop stores the connection even if amqp.Dial fails, and the broken connection is then used on further iterations. The fix should be to not store the connection until after the error has been checked.
Hi,
I was just wondering whether any of you have experienced messages being received twice, or some messages going missing from time to time. Each RabbitMQ message is received and processed in a separate goroutine, and I added message numbering. I can see from the RabbitMQ logs and from my software's logs that some strange behavior occurs between reception and handling of a message. I will dig further, but in any case, feel free to share your experience.
All the declarations (Queue, Exchange and bindings) pass nil as the arguments Table. It would be nice to add an arguments table to the structures and pass it to the declarations, to allow, for example, dead-lettering queue headers.
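A rough sketch of what the request amounts to, under stated assumptions: Table, Queue and declareFunc below are hypothetical stand-ins (for amqp.Table, cony's Queue and the channel's queue-declare call), and the Args field is the proposed addition, not something the library is known to have. The argument names x-dead-letter-exchange and x-expires are standard RabbitMQ queue arguments; the values are made up.

```go
package main

import "fmt"

// Table is a hypothetical stand-in for amqp.Table.
type Table map[string]interface{}

// Queue is a hypothetical declaration struct; Args is the proposed field.
type Queue struct {
	Name    string
	Durable bool
	Args    Table
}

// declareFunc stands in for the underlying queue-declare call.
type declareFunc func(name string, durable bool, args Table) error

// declare forwards the stored arguments instead of a hard-coded nil.
func (q *Queue) declare(fn declareFunc) error {
	return fn(q.Name, q.Durable, q.Args)
}

func main() {
	q := &Queue{
		Name:    "orders", // illustrative name
		Durable: true,
		Args: Table{
			"x-dead-letter-exchange": "orders.dlx", // illustrative exchange name
			"x-expires":              int32(60000), // queue TTL in milliseconds
		},
	}

	// A fake declare call that only prints what it was given.
	err := q.declare(func(name string, durable bool, args Table) error {
		fmt.Println(name, durable, len(args))
		return nil
	})
	fmt.Println(err)
}
```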
If I understand the design of cony correctly, the Publishers of a Client must be declared (complete with exchange and routing key) upfront, before the Client's loop is started? That is, there is no way to change the exchange or routing key of a Publisher, or to add Publishers after the client has been started?
If so, I see some inflexibility in this design. It prevents routing keys and exchanges from being chosen at runtime, even though the rest of the message can be dynamic. One could work around it by creating a new Client+Publisher for each publish, but creating a new connection on every publish is costly.
When using an auto-named queue, the queue is created fine the first time, but on a reconnect the RabbitMQ server issues the following errors.
Client error: Exception (403) Reason: "ACCESS_REFUSED - queue name 'amq.gen-zOW_J2f1kRwrKFY-DvZBGw' contains reserved prefix 'amq.*'"
Client error: Exception (504) Reason: "channel/connection is not open"
Client error: Exception (504) Reason: "channel/connection is not open"
Consumer error: Exception (404) Reason: "NOT_FOUND - no previously declared queue"
After that, no messages are received and it won't work again until a restart. I presume it is reusing the name it was given on the first connection and trying to recreate that queue, which is not allowed because of the reserved prefix.
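One way to avoid this, assuming the presumption above is right, is to keep the name the user asked for separate from the name the broker assigned, and always redeclare with the former. The sketch below is hypothetical throughout: queue, declareFunc and fakeBroker are illustrative stand-ins and do not mirror cony's or the AMQP library's types.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// queue keeps the requested and the broker-assigned names apart.
type queue struct {
	requested string // name asked for by the user; "" means server-generated
	assigned  string // name returned by the broker on the last declare
}

// declareFunc stands in for a queue-declare call returning the final name.
type declareFunc func(name string) (string, error)

// declare always sends the requested name, never the previously assigned one.
func (q *queue) declare(fn declareFunc) error {
	name, err := fn(q.requested)
	if err != nil {
		return err
	}
	q.assigned = name
	return nil
}

// fakeBroker imitates the behaviour seen in the error log: it generates
// amq.gen-* names itself but refuses clients that ask for the amq. prefix.
type fakeBroker struct{ n int }

func (b *fakeBroker) declare(name string) (string, error) {
	if strings.HasPrefix(name, "amq.") {
		return "", errors.New("ACCESS_REFUSED - reserved prefix 'amq.*'")
	}
	if name == "" {
		b.n++
		return fmt.Sprintf("amq.gen-%d", b.n), nil
	}
	return name, nil
}

func main() {
	b := &fakeBroker{}
	q := &queue{} // auto-named queue

	fmt.Println(q.declare(b.declare), q.assigned) // first connection
	fmt.Println(q.declare(b.declare), q.assigned) // redeclare after reconnect

	// Reusing the assigned name, as the report suspects, is rejected.
	_, err := b.declare(q.assigned)
	fmt.Println(err)
}
```

Consumers would then need to read the assigned name after each declare, since it changes on every reconnect.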
Awesome package, this helps us a lot. Just one question that's come up while testing various edge cases:
If prefetch is set to > 1, then you could conceivably have the following scenario: the amqp.Message values in the deliveries channel reference a channel that is now CLOSED.
How would you suggest handling this case? If we remake the deliveries channel, would that imply a NACK on the RabbitMQ server?
Hello,
When running on docker with a bridged network, amqp.Dial in the client reconnection loop returns a connection but also an error. This causes the user's client to start listening for deliveries on a faulty connection.
Here are some logs I added in client.Loop for debugging, showing the content of the connection and the errors.
I'm preparing a PR that sets the connection to nil when dialing returns an error, to fix the issue.
`
Dialing new connection
Storing new connection &{{{%!V(int32=+0) %!V(uint32=+0)} %!V(uint32=+1)} {%!V(int32=+0) %!V(uint32=+0)} {%!V(int32=+0) %!V(uint32=+0)} %!V(_net.TCPConn=&{{+0xc8201d6540}}) %!V(chan amqp.message=+0xc8201c8d20) %!V(_amqp.writer=&{+0xc8201afb80}) %!V(chan time.Time=+0xc8201c8d80) %!V(chan amqp.readDeadliner=+0xc8201c8e40) %!V(_amqp.allocator=) map[] %!V(bool=true) [] [] %!V(chan *amqp.Error=+0xc8201c8de0) {[] %!V(string=) %!V(int=+0) %!V(int=+0) %!V(time.Duration=+0) %!V(_tls.Config=) map[] %!V(func(string, string) (net.Conn, error)=)} %!V(int=+0) %!V(int=+0) map[]}
Reporting error dialing &{%!V(int=+501) %!V(string=EOF) %!V(bool=false) %!V(bool=false)}
Entering connection loop
Loading connection
Connection not nil
time="2016-02-22T16:05:23Z" level=warning msg="client error" error="Exception (501) Reason: "EOF""
`
Line 76 in dd62697
I find I can't set the queue TTL, which is set via the 'x-expires' argument.
I start an instance of the web producer that connects to the server.
Sometimes it outputs an error:
Exception (501) Reason: "read tcp 10.20.5.111:48947->10.20.5.160:5672: i/o timeout
The RabbitMQ server error log is:
closing AMQP connection <0.3780.1140>( 10.20.5.111:48947->10.20.5.160:5672); client unexpectedly closed TCP connection
https://github.com/rabbitmq/amqp091-go is maintained by the RabbitMQ core team and should replace streadway/amqp.
I'll provide a PR that supports an amqp.Blocking channel in the client, so that it is notified when the RabbitMQ server blocks a connection, as described in the doc:
Blocking notifies the server's TCP flow control of the Connection. When a
server hits a memory or disk alarm it will block all connections until the
resources are reclaimed. Use NotifyBlock on the Connection to receive these
events.
I would like to add the ability to declare a queue passive and to declare an exchange passive.
Randomly, after about 10 days, my subscriber stops consuming messages. RabbitMQ logs an error saying that it has not received the heartbeat and closes the connection. On the consumer side the connection stays in CLOSE_WAIT. This means the subscriber did not close the connection, and thus the reconnection loop has not been activated. Has anyone encountered such an issue? Maybe my subscriber is stuck, so it does not send the heartbeat or cannot close the connection? I'll check the status of the goroutines. Last point: everything is running in separate docker containers.
Hi,
I got multiple errors when trying to close a Consumer using Cancel.
The c.deliveries channel is closed in the Cancel method, but then the serve method, because select picks randomly among ready cases, sometimes tries to write to this channel before running the c.stop case. The program crashes because of this.
Here is an extract of my program (sigs is closed when I receive a signal):
for rmqConnector.Loop() {
	select {
	case msg := <-rmqConsumer.Deliveries():
		messageChan <- msg.Body
	case err := <-rmqConsumer.Errors():
		fmt.Printf("Consumer error: %v\n", err)
	case err := <-rmqConnector.Errors():
		fmt.Printf("Client error: %v\n", err)
	case _, open := <-sigs:
		if !open {
			rmqConsumer.Cancel()
			return
		}
	}
}
When canceling, I get a "panic: send on closed channel" at consumer.go line 92, called from client.go line 188.
Am I using this wrong?
Maybe moving close(c.deliveries) from

if !c.dead {
	close(c.deliveries)
	close(c.stop)
	c.dead = true
}

to

select {
case <-c.stop:
	client.deleteConsumer(c)
	ch.Close()
	return

would fix it?
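The general pattern behind that suggestion is that only the goroutine that sends on a channel should close it. A minimal, self-contained sketch is below; the consumer type, newConsumer and the in channel are hypothetical and only loosely modelled on the fields named above (deliveries, stop), with sync.Once standing in for the dead flag. It is not cony's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// consumer is a hypothetical, stripped-down model of a consumer.
type consumer struct {
	deliveries chan string
	stop       chan struct{}
	once       sync.Once
}

func newConsumer() *consumer {
	return &consumer{
		deliveries: make(chan string),
		stop:       make(chan struct{}),
	}
}

// Cancel only signals the stop; it never closes deliveries, so a concurrent
// send in serve cannot hit a closed channel.
func (c *consumer) Cancel() {
	c.once.Do(func() { close(c.stop) })
}

// serve is the only sender on deliveries, so it alone closes that channel,
// and only once it has stopped sending.
func (c *consumer) serve(in <-chan string) {
	defer close(c.deliveries)
	for {
		select {
		case <-c.stop:
			return
		case msg, ok := <-in:
			if !ok {
				return
			}
			// Guard the send as well, so a Cancel during a blocked
			// send still lets serve exit.
			select {
			case c.deliveries <- msg:
			case <-c.stop:
				return
			}
		}
	}
}

func main() {
	in := make(chan string)
	c := newConsumer()
	go c.serve(in)

	go func() { in <- "hello" }()
	fmt.Println(<-c.deliveries)

	c.Cancel()
	c.Cancel() // calling Cancel twice is safe
	_, open := <-c.deliveries
	fmt.Println("deliveries open:", open)
}
```

Readers of deliveries see the channel close after cancellation, which also gives them a clean way to leave their own receive loop.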
The latest commit was on Dec 21, 2016.
Should we assume this project is no longer active?