Giter VIP home page Giter VIP logo

Comments (6)

gftea avatar gftea commented on July 28, 2024 2

Hi, thanks for pointing out this!

Currently, the library does not implement TCPstream reconnect and does not provide callback interface for TCP stream failure.

ConnectionCallback/ChannelCallback are intenteded to AMQP protocol layer messages.

I will need to figure out how to handle this properly within library.

But before that is available, using connection pool may be a solution, such as. bb8, deadpool.

I also just push a fix in version v1.0.9 which will set is_open to false on TCP stream failure. I think a simple solution can use is_open to determine if connection is closed unexpectedly, because is_open never be false if user keep the connection object from drop and never call close explicitly.

from amqprs.

gftea avatar gftea commented on July 28, 2024 2

@cdbennett I just found out the API has flaw - when the connection is closed normally, the function will never return.

New candiate version to test v1.1.0-rc.2
I have to change the API like below

    // wait on io failure
    // if return `true`, it means got notification due to network failure, we try to recreate connection
    // if return `false`, the connection may already be closed, or closed later by client or server.
    if connection.listen_network_io_failure().await {
        loop {
            time::sleep(Duration::from_secs(10)).await;

             // ... do some recovery...
            if let Ok(recover_conn) = Connection::open(&conn_args).await {
                 // ..
                break;
            };
        }
    };

from amqprs.

gftea avatar gftea commented on July 28, 2024 1

@cdbennett , reconnect and auto recovery is not a trival implementation. now in PR #59 , a new API is added for waiting on underlying io failure and call the user-provided handler. You can look at new integration test case as an example amqprs/tests/test_io_error_handling.rs

Feel free to comment if this API is goodm fit for your needs.
You can use v1.1.0-rc.1 to try out the API.

from amqprs.

cdbennett avatar cdbennett commented on July 28, 2024 1

@gftea Fantastic! That's exactly what I was looking for. In my proof of concept implementation I don't even do anything inside the callback closure (except log a message for testing). Instead, the simple fact that the blocking method () returns means that the connection was lost.

A simple and seemingly robust way to handle reconnection is to start over with creating a new amqprs Connection and setting up channel, queue, exchange binding etc., especially in the case that the RabbitMQ server was restarted and the queues weren't durable etc.

Example of recovery behavior demonstrated now:

<< running fine here >>
2023-02-15T00:36:45.771509Z  INFO amqprs::api::channel::dispatcher: register consumer amqprs_reconnect

<< unplugged the network connection >>

2023-02-15T00:36:52.632756Z ERROR amqprs_reconnect: RabbitMQ network I/O failure
2023-02-15T00:36:52.632749Z ERROR amqprs::net::reader_handler: socket will be closed due to failure of reading frame, cause: peer shutdown
2023-02-15T00:36:52.632836Z  WARN amqprs_reconnect: terminating RabbitMQ connection process due to connection loss
2023-02-15T00:36:52.632928Z DEBUG amqprs::api::channel::dispatcher: dispatcher mpsc channel closed, channel 1 [closed] of connection 'AMQPRS000@localhost:7771den [closed]'
2023-02-15T00:36:52.632954Z ERROR amqprs_reconnect: RabbitMQ connection returned error: RabbitMQ server connection lost
2023-02-15T00:36:52.632954Z  INFO amqprs::net::writer_handler: received shutdown notification for connection 'AMQPRS000@localhost:7771den [closed]'
2023-02-15T00:36:52.632963Z  INFO amqprs::api::channel::dispatcher: exit dispatcher of channel 1 [closed] of connection 'AMQPRS000@localhost:7771den [closed]'
2023-02-15T00:36:52.633019Z  INFO amqprs::api::channel: try to close channel 1 [closed] of connection 'AMQPRS000@localhost:7771den [closed]' at drop
2023-02-15T00:36:52.633088Z ERROR amqprs::api::channel: failed to gracefully close channel 1 [closed] of connection 'AMQPRS000@localhost:7771den [closed]' at drop, cause: 'internal communication error: channel closed'
2023-02-15T00:36:52.633145Z DEBUG amqprs::api::channel::basic: exit task of async consumer amqprs_reconnect
2023-02-15T00:36:53.634773Z  INFO amqprs_reconnect: ready to restart RabbitMQ task
2023-02-15T00:36:53.634819Z DEBUG amqprs_reconnect: starting RabbitMQ task
2023-02-15T00:36:53.635586Z ERROR amqprs_reconnect: RabbitMQ connection returned error: can't connect to RabbitMQ server at localhost:7771

Caused by:
    AMQP network error: network io error: Connection refused (os error 61)
2023-02-15T00:36:54.636616Z  INFO amqprs_reconnect: ready to restart RabbitMQ task
2023-02-15T00:36:54.636681Z DEBUG amqprs_reconnect: starting RabbitMQ task
2023-02-15T00:36:54.638034Z ERROR amqprs_reconnect: RabbitMQ connection returned error: can't connect to RabbitMQ server at localhost:7771
...
Caused by:
    AMQP network error: network io error: Connection refused (os error 61)

<< plugged back in the network connection here >>

2023-02-15T00:36:59.652533Z  INFO amqprs_reconnect: ready to restart RabbitMQ task
2023-02-15T00:36:59.652579Z DEBUG amqprs_reconnect: starting RabbitMQ task
2023-02-15T00:37:00.369965Z DEBUG amqprs::net::reader_handler: register channel resource on connection 'AMQPRS001@localhost:7771den [open]'

<< nice!! we reconnected >>

2023-02-15T00:37:00.369992Z  INFO amqprs::api::connection: open connection AMQPRS001@localhost:7771den
2023-02-15T00:37:00.370078Z DEBUG amqprs::net::reader_handler: callback registered on connection 'AMQPRS001@localhost:7771den [open]'
2023-02-15T00:37:00.370118Z DEBUG amqprs::net::reader_handler: register channel resource on connection 'AMQPRS001@localhost:7771den [open]'
2023-02-15T00:37:00.550221Z  INFO amqprs::api::connection: open channel 1 [open] of connection 'AMQPRS001@localhost:7771den [open]'
2023-02-15T00:37:00.551715Z DEBUG amqprs::api::channel::dispatcher: callback registered on channel 1 [open] of connection 'AMQPRS001@localhost:7771den [open]'
2023-02-15T00:37:00.739970Z DEBUG amqprs_reconnect: declared queue 'amq.gen-cXcKSyrZ_DoeFNXaOB9TkA'
2023-02-15T00:37:00.740022Z DEBUG amqprs_reconnect: binding exchange amq.topic -> queue amq.gen-cXcKSyrZ_DoeFNXaOB9TkA
2023-02-15T00:37:01.516925Z  INFO amqprs::api::channel::dispatcher: register consumer amqprs_reconnect

<< and all is good >>

Demo in https://github.com/cdbennett/amqprs-reconnect-example/blob/f4a460d3862b407e8005023c9e0410936c604ec3/src/main.rs#L166

I will keep playing with it but it is looking good! Thanks for the quick response, it is great to see such active enthusiastic development!

from amqprs.

cdbennett avatar cdbennett commented on July 28, 2024

Works perfectly, thanks.

from amqprs.

gftea avatar gftea commented on July 28, 2024

I would close the issue if you are so far happy with it.

from amqprs.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.