Comments (6)
Hi, thanks for pointing this out!
Currently, the library does not implement TCP stream reconnection and does not provide a callback interface for TCP stream failures. ConnectionCallback/ChannelCallback are intended for AMQP protocol layer messages.
I will need to figure out how to handle this properly within the library.
Until that is available, using a connection pool such as bb8 or deadpool may be a solution.
I also just pushed a fix in version v1.0.9 that sets is_open to false on TCP stream failure. I think a simple solution is to use is_open to determine whether the connection was closed unexpectedly, because is_open is never false as long as the user keeps the connection object from being dropped and never calls close explicitly.
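A minimal sketch of that polling idea, using only the standard library so it runs on its own. The `OpenState` trait, `wait_until_closed`, and `FlakyConn` are hypothetical names made up for illustration and are not part of amqprs; the only thing assumed about the real connection is that it exposes `is_open()` as described above.

```rust
use std::cell::Cell;
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for the one method this sketch needs.
/// The trait is NOT part of amqprs; it only models `is_open()`.
pub trait OpenState {
    fn is_open(&self) -> bool;
}

/// Blocks until `conn.is_open()` reports `false`, checking once per `interval`.
/// Returns how many checks saw the connection still open.
pub fn wait_until_closed<C: OpenState>(conn: &C, interval: Duration) -> u32 {
    let mut open_checks = 0;
    while conn.is_open() {
        open_checks += 1;
        thread::sleep(interval);
    }
    open_checks
}

/// Test double (hypothetical): reports open for a fixed number of checks,
/// then closed, imitating a TCP stream failure.
pub struct FlakyConn {
    remaining: Cell<u32>,
}

impl FlakyConn {
    pub fn new(open_checks: u32) -> Self {
        FlakyConn { remaining: Cell::new(open_checks) }
    }
}

impl OpenState for FlakyConn {
    fn is_open(&self) -> bool {
        let left = self.remaining.get();
        if left == 0 {
            false
        } else {
            self.remaining.set(left - 1);
            true
        }
    }
}
```

As noted above, a `false` result only signals an unexpected close if the application keeps the connection alive and never calls close itself. In async code the runtime's sleep would replace `thread::sleep`.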
from amqprs.
@cdbennett I just found out that the API has a flaw: when the connection is closed normally, the function never returns.
New candidate version to test: v1.1.0-rc.2
I had to change the API as shown below:
// Wait for an I/O failure.
// If this returns `true`, we were notified of a network failure, so we try to recreate the connection.
// If this returns `false`, the connection may already be closed, or was closed later by the client or server.
if connection.listen_network_io_failure().await {
    loop {
        time::sleep(Duration::from_secs(10)).await;
        // ... do some recovery...
        if let Ok(recover_conn) = Connection::open(&conn_args).await {
            // ..
            break;
        }
    }
}
from amqprs.
@cdbennett, reconnection and automatic recovery are not trivial to implement. PR #59 now adds a new API that waits for an underlying I/O failure and calls a user-provided handler. You can look at the new integration test case as an example: amqprs/tests/test_io_error_handling.rs
Feel free to comment on whether this API is a good fit for your needs.
You can use v1.1.0-rc.1 to try out the API.
from amqprs.
@gftea Fantastic! That's exactly what I was looking for. In my proof of concept implementation I don't even do anything inside the callback closure (except log a message for testing). Instead, the simple fact that the blocking method () returns means that the connection was lost.
A simple and seemingly robust way to handle reconnection is to start over with creating a new amqprs Connection and setting up channel, queue, exchange binding etc., especially in the case that the RabbitMQ server was restarted and the queues weren't durable etc.
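The start-over approach can be sketched as a small supervisor loop. This is an illustration under assumptions, not the code used in the proof of concept: `supervise` and its `session` closure are hypothetical names, and the closure stands in for everything that would be rebuilt on each attempt (connection, channel, queue, exchange binding, consumer). It uses a blocking sleep from the standard library so that it runs on its own; an async version would use the runtime's sleep instead.

```rust
use std::fmt::Display;
use std::thread;
use std::time::Duration;

/// Runs `session` repeatedly (hypothetical helper, not part of amqprs).
/// Each call is expected to perform the full setup from scratch and to return
/// only when the session ends: `Ok(())` for a deliberate shutdown, `Err` when
/// the connection was lost or could not be established.
/// Returns the number of restarts that were needed.
pub fn supervise<F, E>(mut session: F, restart_delay: Duration) -> u32
where
    F: FnMut() -> Result<(), E>,
    E: Display,
{
    let mut restarts = 0;
    loop {
        match session() {
            // Clean exit: stop supervising.
            Ok(()) => return restarts,
            // Failure: log, wait, then start over with a fresh session.
            Err(err) => {
                eprintln!("session ended with error: {}", err);
                restarts += 1;
                thread::sleep(restart_delay);
            }
        }
    }
}
```

Because every attempt starts from nothing, this also covers a server restart in which non-durable queues were lost, at the cost of redoing the full setup each time.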
Example of recovery behavior demonstrated now:
<< running fine here >>
2023-02-15T00:36:45.771509Z INFO amqprs::api::channel::dispatcher: register consumer amqprs_reconnect
<< unplugged the network connection >>
2023-02-15T00:36:52.632756Z ERROR amqprs_reconnect: RabbitMQ network I/O failure
2023-02-15T00:36:52.632749Z ERROR amqprs::net::reader_handler: socket will be closed due to failure of reading frame, cause: peer shutdown
2023-02-15T00:36:52.632836Z WARN amqprs_reconnect: terminating RabbitMQ connection process due to connection loss
2023-02-15T00:36:52.632928Z DEBUG amqprs::api::channel::dispatcher: dispatcher mpsc channel closed, channel 1 [closed] of connection 'AMQPRS000@localhost:7771den [closed]'
2023-02-15T00:36:52.632954Z ERROR amqprs_reconnect: RabbitMQ connection returned error: RabbitMQ server connection lost
2023-02-15T00:36:52.632954Z INFO amqprs::net::writer_handler: received shutdown notification for connection 'AMQPRS000@localhost:7771den [closed]'
2023-02-15T00:36:52.632963Z INFO amqprs::api::channel::dispatcher: exit dispatcher of channel 1 [closed] of connection 'AMQPRS000@localhost:7771den [closed]'
2023-02-15T00:36:52.633019Z INFO amqprs::api::channel: try to close channel 1 [closed] of connection 'AMQPRS000@localhost:7771den [closed]' at drop
2023-02-15T00:36:52.633088Z ERROR amqprs::api::channel: failed to gracefully close channel 1 [closed] of connection 'AMQPRS000@localhost:7771den [closed]' at drop, cause: 'internal communication error: channel closed'
2023-02-15T00:36:52.633145Z DEBUG amqprs::api::channel::basic: exit task of async consumer amqprs_reconnect
2023-02-15T00:36:53.634773Z INFO amqprs_reconnect: ready to restart RabbitMQ task
2023-02-15T00:36:53.634819Z DEBUG amqprs_reconnect: starting RabbitMQ task
2023-02-15T00:36:53.635586Z ERROR amqprs_reconnect: RabbitMQ connection returned error: can't connect to RabbitMQ server at localhost:7771
Caused by:
AMQP network error: network io error: Connection refused (os error 61)
2023-02-15T00:36:54.636616Z INFO amqprs_reconnect: ready to restart RabbitMQ task
2023-02-15T00:36:54.636681Z DEBUG amqprs_reconnect: starting RabbitMQ task
2023-02-15T00:36:54.638034Z ERROR amqprs_reconnect: RabbitMQ connection returned error: can't connect to RabbitMQ server at localhost:7771
...
Caused by:
AMQP network error: network io error: Connection refused (os error 61)
<< plugged back in the network connection here >>
2023-02-15T00:36:59.652533Z INFO amqprs_reconnect: ready to restart RabbitMQ task
2023-02-15T00:36:59.652579Z DEBUG amqprs_reconnect: starting RabbitMQ task
2023-02-15T00:37:00.369965Z DEBUG amqprs::net::reader_handler: register channel resource on connection 'AMQPRS001@localhost:7771den [open]'
<< nice!! we reconnected >>
2023-02-15T00:37:00.369992Z INFO amqprs::api::connection: open connection AMQPRS001@localhost:7771den
2023-02-15T00:37:00.370078Z DEBUG amqprs::net::reader_handler: callback registered on connection 'AMQPRS001@localhost:7771den [open]'
2023-02-15T00:37:00.370118Z DEBUG amqprs::net::reader_handler: register channel resource on connection 'AMQPRS001@localhost:7771den [open]'
2023-02-15T00:37:00.550221Z INFO amqprs::api::connection: open channel 1 [open] of connection 'AMQPRS001@localhost:7771den [open]'
2023-02-15T00:37:00.551715Z DEBUG amqprs::api::channel::dispatcher: callback registered on channel 1 [open] of connection 'AMQPRS001@localhost:7771den [open]'
2023-02-15T00:37:00.739970Z DEBUG amqprs_reconnect: declared queue 'amq.gen-cXcKSyrZ_DoeFNXaOB9TkA'
2023-02-15T00:37:00.740022Z DEBUG amqprs_reconnect: binding exchange amq.topic -> queue amq.gen-cXcKSyrZ_DoeFNXaOB9TkA
2023-02-15T00:37:01.516925Z INFO amqprs::api::channel::dispatcher: register consumer amqprs_reconnect
<< and all is good >>
I will keep playing with it but it is looking good! Thanks for the quick response, it is great to see such active enthusiastic development!
from amqprs.
Works perfectly, thanks.
from amqprs.
I will close the issue if you are happy with it so far.
from amqprs.