Comments (19)
Dave Syer commented
Is this anything to do with rollbacks in a transactional listener container? What is the re-connection algorithm that you had in mind?
from spring-amqp.
Shane Witbeck commented
I just wanted to comment that this will be very helpful from a network and/or broker outage standpoint. I'd also propose introducing an interface (ConnectionRetryStrategy?) which would easily allow users to define their own reconnect algorithm.
Dave Syer commented
I'm still not really sure how it fits into the existing code. What would a default strategy look like? Would a generic retry of method calls on Channel work? I'm not sure the exceptions you get from the AMQP client are fine-grained enough to tell the difference between a retryable exception and a fatal one.
Shane Witbeck commented
For the default strategy it could just retry every n seconds. Another example would be a doubling back-off (retry after 5s, 10s, 20s, etc.). This reminds me of the "delay" and "period" properties available on Spring's ScheduledTimerTask.
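A minimal sketch of the proposed strategy interface with the two defaults described above, in plain Java so it stands alone. The names `ConnectionRetryStrategy`, `FixedIntervalRetryStrategy`, `ExponentialBackoffRetryStrategy` and `delayBeforeAttempt` are hypothetical and not part of Spring AMQP; the attempt cap and the maximum delay are assumptions added so a retry loop can terminate.

```java
import java.util.OptionalLong;

// Hypothetical SPI: ConnectionRetryStrategy is only a proposal in this thread, not a Spring AMQP type.
interface ConnectionRetryStrategy {
    /** Delay before the given 1-based reconnect attempt, or empty to give up. */
    OptionalLong delayBeforeAttempt(int attempt);
}

// "Retry every n seconds", bounded by an assumed maximum number of attempts.
final class FixedIntervalRetryStrategy implements ConnectionRetryStrategy {
    private final long intervalMillis;
    private final int maxAttempts;

    FixedIntervalRetryStrategy(long intervalMillis, int maxAttempts) {
        this.intervalMillis = intervalMillis;
        this.maxAttempts = maxAttempts;
    }

    @Override
    public OptionalLong delayBeforeAttempt(int attempt) {
        return attempt <= maxAttempts ? OptionalLong.of(intervalMillis) : OptionalLong.empty();
    }
}

// Doubling back-off (5s, 10s, 20s, ...) capped at maxDelayMillis.
final class ExponentialBackoffRetryStrategy implements ConnectionRetryStrategy {
    private final long initialMillis;
    private final long maxDelayMillis;
    private final int maxAttempts;

    ExponentialBackoffRetryStrategy(long initialMillis, long maxDelayMillis, int maxAttempts) {
        this.initialMillis = initialMillis;
        this.maxDelayMillis = maxDelayMillis;
        this.maxAttempts = maxAttempts;
    }

    @Override
    public OptionalLong delayBeforeAttempt(int attempt) {
        if (attempt > maxAttempts) {
            return OptionalLong.empty();
        }
        long delay = initialMillis;
        // double once per previous attempt, stopping early once the cap is reached (avoids overflow)
        for (int i = 1; i < attempt && delay < maxDelayMillis; i++) {
            delay *= 2;
        }
        return OptionalLong.of(Math.min(delay, maxDelayMillis));
    }
}
```

Returning an empty value instead of throwing leaves it to the caller to decide what giving up means (fail the send, stop the container, and so on).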
In my early testing, I've seen a "connection refused" exception which occurs when I stop a broker while a client is connected to it. I think this could serve as the basis for a retry.
In the larger scope of failover I'm looking for a solution to be able to reconnect to a known list of available brokers. This negates the need for a TCP load balancer from the perspective of the clients. See the "Getting an IP address to move with Rabbit" section of the Pacemaker guide on the RabbitMQ site for more about this. Although this kind of functionality is a bit much for this stage of development of Spring AMQP, it would be nice to have sometime in the future.
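A minimal sketch of client-side failover over a known list of brokers, assuming round-robin order is acceptable. `BrokerFailoverList` and the `Connector` hook are hypothetical; `Connector` stands in for whatever actually opens a connection to a `host:port`.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical connect hook: opens a connection to "host:port" or throws.
@FunctionalInterface
interface Connector<C> {
    C connect(String address) throws Exception;
}

// Hypothetical helper: cycles through a fixed list of brokers, so no TCP load balancer is needed.
final class BrokerFailoverList {
    private final List<String> addresses;
    private final AtomicInteger next = new AtomicInteger();

    BrokerFailoverList(List<String> addresses) {
        if (addresses.isEmpty()) {
            throw new IllegalArgumentException("at least one broker address is required");
        }
        this.addresses = List.copyOf(addresses);
    }

    /** Next address in round-robin order, wrapping at the end of the list. */
    String nextAddress() {
        return addresses.get(Math.floorMod(next.getAndIncrement(), addresses.size()));
    }

    /** Tries each broker at most once, starting where the previous call left off. */
    <C> C connectToFirstAvailable(Connector<C> connector) {
        Exception last = null;
        for (int i = 0; i < addresses.size(); i++) {
            String address = nextAddress();
            try {
                return connector.connect(address);
            } catch (Exception e) {
                last = e; // remember the failure and move on to the next broker
            }
        }
        throw new IllegalStateException("no broker reachable in " + addresses, last);
    }
}
```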
Dave Syer commented
Can you pinpoint the exception a bit more? Where was it thrown and what was its type / message / other distinguishing features?
The strategy you are looking for has an existing implementation in http://git.springsource.org/spring-commons/spring-commons-retry/blobs/master/src/main/java/org/springframework/commons/retry; see RetryPolicy.java and backoff/BackoffPolicy.java plus implementations. I was hoping we could just use that if we could find out where to put the interceptor and how to configure the policy.
Shane Witbeck commented
Here's a full stack trace from a simple app using Spring AMQP snapshot and RabbitMQ 2.0.0 client/broker:
org.springframework.amqp.AmqpIOException: java.net.ConnectException: Connection refused
at org.springframework.amqp.rabbit.support.RabbitUtils.convertRabbitAccessException(RabbitUtils.java:116)
at org.springframework.amqp.rabbit.support.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:107)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:336)
at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:196)
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:213)
at com.digitalsanctum.rabid.Producer.sendMsg(Producer.java:33)
at com.digitalsanctum.rabid.Producer.start(Producer.java:61)
at com.digitalsanctum.rabid.Producer.start(Producer.java:66)
at com.digitalsanctum.rabid.Producer.start(Producer.java:66)
at com.digitalsanctum.rabid.Producer.start(Producer.java:66)
at com.digitalsanctum.rabid.Producer.start(Producer.java:66)
at com.digitalsanctum.rabid.Producer.start(Producer.java:66)
at com.digitalsanctum.rabid.Producer.start(Producer.java:66)
at com.digitalsanctum.rabid.Producer.start(Producer.java:66)
at com.digitalsanctum.rabid.Producer.start(Producer.java:66)
at com.digitalsanctum.rabid.Producer.start(Producer.java:66)
at com.digitalsanctum.rabid.Producer.start(Producer.java:66)
at com.digitalsanctum.rabid.Producer.start(Producer.java:66)
at com.digitalsanctum.rabid.Producer.start(Producer.java:66)
at com.digitalsanctum.rabid.Producer.start(Producer.java:66)
at com.digitalsanctum.rabid.Producer.start(Producer.java:66)
at com.digitalsanctum.rabid.Producer.start(Producer.java:66)
at com.digitalsanctum.rabid.Producer.main(Producer.java:25)
Caused by: java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.PlainSocketImpl.doConnect(PlainSocketImpl.java:333)
at java.net.PlainSocketImpl.connectToAddress(PlainSocketImpl.java:195)
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:182)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:432)
at java.net.Socket.connect(Socket.java:529)
at java.net.Socket.connect(Socket.java:478)
at com.rabbitmq.client.ConnectionFactory.createFrameHandler(ConnectionFactory.java:338)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:376)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:399)
at org.springframework.amqp.rabbit.connection.RabidConnectionFactory.doCreateConnection(RabidConnectionFactory.java:198)
at org.springframework.amqp.rabbit.connection.RabidConnectionFactory.initConnection(RabidConnectionFactory.java:143)
at org.springframework.amqp.rabbit.connection.RabidConnectionFactory.createConnection(RabidConnectionFactory.java:132)
at org.springframework.amqp.rabbit.support.RabbitAccessor.createConnection(RabbitAccessor.java:88)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:326)
... 20 more
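The trace shows that the useful signal is not the top-level AmqpIOException but the java.net.ConnectException in its cause chain. A minimal sketch of a classifier that treats only that case as retryable, assuming everything else is fatal (a deliberately conservative assumption, given the concern that the client's exceptions are coarse-grained). `ConnectFailureClassifier` is a hypothetical name.

```java
import java.net.ConnectException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

final class ConnectFailureClassifier {
    private ConnectFailureClassifier() {
    }

    /** True if a java.net.ConnectException appears anywhere in the cause chain. */
    static boolean isConnectionRefused(Throwable failure) {
        // identity set guards against (unusual) cyclic cause chains
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable t = failure; t != null && seen.add(t); t = t.getCause()) {
            if (t instanceof ConnectException) {
                return true;
            }
        }
        return false;
    }
}
```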
Dave Syer commented
Thanks, that's useful. I think we need to understand the cause of the failures we are expecting to deal with in this way as well though.
If the cause is a network glitch or a broker cluster topology change, then I can believe that maybe all we need to do is try the send again (or, more precisely, the execute call). I would still be slightly surprised if it just worked, because the Channel would probably be closed after the exception, so we'd have to make sure that the connection factory can discard dead connections and get us a new one.
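A minimal sketch of "try the execute call again" combined with discarding the dead connection. `ReconnectingExecutor` and its `Supplier`/`Function` hooks are hypothetical stand-ins for the connection factory and the template callback. It retries immediately with no back-off and does nothing about queues or bindings lost in a broker restart, which is the harder problem raised in the next paragraph.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical helper: caches one connection and throws it away whenever an action on it fails.
final class ReconnectingExecutor<C> {
    private final Supplier<C> connect;
    private final int maxAttempts;
    private C current;

    ReconnectingExecutor(Supplier<C> connect, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.connect = connect;
        this.maxAttempts = maxAttempts;
    }

    synchronized <T> T execute(Function<C, T> action) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (current == null) {
                    current = connect.get(); // lazily (re)connect
                }
                return action.apply(current);
            } catch (RuntimeException e) {
                current = null; // assume the connection/channel is dead; fetch a fresh one next time
                last = e;
            }
        }
        throw last;
    }
}
```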
What if the broker went down and was restarted? The bindings and queues relating to the send (or receive) that we are attempting could have been deleted in the broker. Is it even possible to deal with that automatically? We would need to bootstrap any broker meta data that our client was responsible for (and which we have no direct knowledge of in the send method of the template). It's not impossible but it starts to get very complicated.
Any suggestions (from anyone) about where to draw the line?
Shane Witbeck commented
There's an addShutdownListener method on the underlying Rabbit Connection interface. I'm dabbling with this now and it appears that you can get more descriptive exception messages from the ShutdownSignalException. Below is an example of an exception that's thrown when I manually shut down the broker:
com.rabbitmq.client.ShutdownSignalException: connection error; reason: {#method<connection.close>(reply-code=320,reply-text=CONNECTION_FORCED - broker forced connection closure with reason 'shutdown',class-id=0,method-id=0),null,""}
at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:601)
at com.rabbitmq.client.impl.AMQConnection.handleConnectionClose(AMQConnection.java:554)
at com.rabbitmq.client.impl.AMQConnection.processControlCommand(AMQConnection.java:522)
at com.rabbitmq.client.impl.AMQConnection$1.processAsync(AMQConnection.java:97)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:165)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:110)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:426)
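The RabbitMQ Java client's ShutdownSignalException exposes isHardError() (connection-level versus channel-level error) and isInitiatedByApplication(), which may be enough to pick a recovery action without parsing the reason text. A minimal sketch of such a policy, modeled with a hypothetical `ShutdownCause` class so it runs without the client on the classpath; `RecoveryAction` and `ShutdownPolicy` are hypothetical too.

```java
// Hypothetical stand-in for the two flags a ShutdownSignalException carries.
final class ShutdownCause {
    final boolean hardError;              // true: the whole connection died; false: only a channel
    final boolean initiatedByApplication; // true: our own code called close()

    ShutdownCause(boolean hardError, boolean initiatedByApplication) {
        this.hardError = hardError;
        this.initiatedByApplication = initiatedByApplication;
    }
}

enum RecoveryAction { NONE, REOPEN_CHANNEL, RECONNECT }

final class ShutdownPolicy {
    private ShutdownPolicy() {
    }

    static RecoveryAction decide(ShutdownCause cause) {
        if (cause.initiatedByApplication) {
            return RecoveryAction.NONE;           // deliberate close: do not fight it
        }
        if (!cause.hardError) {
            return RecoveryAction.REOPEN_CHANNEL; // channel-level error; the connection is still open
        }
        return RecoveryAction.RECONNECT;          // e.g. reply-code 320 CONNECTION_FORCED on broker shutdown
    }
}
```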
Dave Syer commented
Yeah, I was looking at that too. There's also the question of what to do if a Channel craps out. In the case of a message listener (basicConsume) there is a callback for that as well in the consumer. I think we might have to add something for the synchronous cases - we just need to figure out which exceptions correspond to a dead channel.
I am also thinking we can only re-bind all the configuration-level artifacts (queues, exchanges, routing keys) that were added as Spring components. If the user is creating his own queues (especially exclusive, volatile ones) and bindings at runtime, it will be his responsibility to re-create them after an exception.
Does that make sense?
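A minimal sketch of where that line could be drawn: only declarations registered up front (standing in for Spring-managed queue, exchange and binding components) are replayed on a fresh channel, while anything created ad hoc at runtime is never registered and stays the application's job. `DeclarationRegistry` and its `Consumer<C>` callbacks are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical registry of declarations to re-apply after a reconnect.
final class DeclarationRegistry<C> {
    // insertion order matters: exchanges and queues must exist before bindings that reference them
    private final Map<String, Consumer<C>> declarations = new LinkedHashMap<>();

    void register(String name, Consumer<C> declare) {
        declarations.put(name, declare);
    }

    /** Re-applies every registered declaration on a newly opened channel; returns the names applied. */
    List<String> redeclareAll(C channel) {
        List<String> applied = new ArrayList<>();
        declarations.forEach((name, declare) -> {
            declare.accept(channel);
            applied.add(name);
        });
        return applied;
    }
}
```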
Shane Witbeck commented
Just circling back to the use of the addShutdownListener method. I used this method and added an implementation class derived from SingleConnectionFactory called AutoFailoverConnectionFactory to achieve a crude retry and failover strategy. The listener basically hooks back to the main class where the Spring context is initialized and uses AutoFailoverConnectionFactory to cycle through a known list of secondary brokers to connect to. So at a high level this works ok.
I look forward to a finer-grained approach from the Spring AMQP devs :)
Dave Syer commented
What do you do about Bindings, Queues and Exchanges? Assume that they are durable and already declared?
Shane Witbeck commented
In this case, they're all defined via Spring so they get re-created when the Spring context is initialized again.
Mark Lui commented
We have been overriding the method prepareConnection(Connection con) in the SingleConnectionFactory to add a ShutdownListener that supports reconnection logic based on this thread. The listener attempts to reset the connectionFactory at a periodic rate, as well as bouncing any connection listeners, until it can hook back up to the message broker. This works quite well for the 1.0.0.M1 release. The move to 1.0.0.M2 changed the prepareConnection method signature from taking a com.rabbitmq.client.Connection to an org.springframework.amqp.rabbit.connection.Connection, so we lost the ability to add the ShutdownListener. Of course we can still add this logic using reflection, but we want to make sure we are in sync with the project.
Will the reconnection logic be supported sometime in the future? What is the planned approach? Knowing it would let us do our own approach for now and move to the next version without any major issues. We need reconnection logic to go to production and would be willing to help if needed. I could donate our code if that would help (it is based on M1, however).
Mark (Shopzilla)
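A minimal sketch of the shutdown-listener-driven loop described above: once the connection is lost, try to reconnect at a fixed period until one attempt succeeds. `PeriodicReconnector` and its `BooleanSupplier` reconnect hook are hypothetical; in a real setup the hook would reset the connection factory and bounce the connection listeners, which is not shown.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

final class PeriodicReconnector {
    // daemon thread so a pending retry loop never keeps the JVM alive on its own
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "amqp-reconnector");
        t.setDaemon(true);
        return t;
    });
    private final AtomicBoolean reconnecting = new AtomicBoolean();
    private final CountDownLatch firstReconnect = new CountDownLatch(1);

    /** Call from a shutdown listener; tryReconnect returns true once the broker is reachable again. */
    void onConnectionLost(BooleanSupplier tryReconnect, long periodMillis) {
        if (reconnecting.compareAndSet(false, true)) { // ignore duplicate signals while a loop is running
            schedule(tryReconnect, 0, periodMillis);
        }
    }

    private void schedule(BooleanSupplier tryReconnect, long delayMillis, long periodMillis) {
        scheduler.schedule(() -> {
            boolean ok;
            try {
                ok = tryReconnect.getAsBoolean();
            } catch (RuntimeException e) {
                ok = false; // treat any error as "broker still unavailable"
            }
            if (ok) {
                reconnecting.set(false);
                firstReconnect.countDown();
            } else {
                schedule(tryReconnect, periodMillis, periodMillis); // try again after one period
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
    }

    boolean awaitFirstReconnect(long timeoutMillis) {
        try {
            return firstReconnect.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

Rescheduling a one-shot task after each failure, rather than using a fixed-rate task, means nothing has to be cancelled once the broker comes back.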
Dave Syer commented
Thanks for the query. We had to introduce the Spring Connection interface to support segregation of transactional and non-transactional Channels in the CachingConnectionFactory. There's no reason not to add other methods if they are useful, so we can definitely look at supporting the listener use case.
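Why keep the two kinds of Channel apart: once tx.select has been issued, an AMQP 0-9-1 channel stays in transactional mode, so a cached transactional channel must never be handed to a non-transactional caller. A minimal sketch of a cache with one idle pool per mode; `SegregatedChannelCache` is hypothetical and is not CachingConnectionFactory's actual code.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// Hypothetical cache: one idle pool per channel mode.
final class SegregatedChannelCache<CH> {
    private final Deque<CH> idlePlain = new ArrayDeque<>();
    private final Deque<CH> idleTransactional = new ArrayDeque<>();
    private final Supplier<CH> openPlain;         // opens an ordinary channel
    private final Supplier<CH> openTransactional; // opens a channel and puts it into tx mode

    SegregatedChannelCache(Supplier<CH> openPlain, Supplier<CH> openTransactional) {
        this.openPlain = openPlain;
        this.openTransactional = openTransactional;
    }

    synchronized CH acquire(boolean transactional) {
        CH idle = (transactional ? idleTransactional : idlePlain).pollFirst();
        if (idle != null) {
            return idle; // reuse a cached channel of the requested mode
        }
        return transactional ? openTransactional.get() : openPlain.get();
    }

    // A channel goes back only to the pool of its own mode, so tx and non-tx callers never share one.
    synchronized void release(CH channel, boolean transactional) {
        (transactional ? idleTransactional : idlePlain).addFirst(channel);
    }
}
```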
We have a pull request already open with a different approach to re-connection (which you can take a look at), but I like the sound of yours as well. Is it browseable (e.g. on github)?
Mark Lui commented
Thanks for the info. I will take a look at the current pull request. The code is currently in an internal Git repository. I can put it up on GitHub if that would help.
Mark Lui commented
Here is my reconnect code. https://gist.github.com/817505
Dave Syer commented
Thanks, that was helpful. I think I would refactor it to break the direct (cyclic) coupling between listener container and connection factory, but it looks like a useful approach. It doesn't deal with Channel death, only Connections, which makes it slightly distinct from some of the other suggestions.
Do you have any specific scenarios in which you see the Connection dying in practice? Is the connection closed accidentally? By an environment glitch? By the application?
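One way to break the cyclic coupling is a small callback interface: the container depends on the connection factory, and the factory only knows about listeners. A minimal sketch under that assumption; `ConnectionLifecycleListener`, `ObservableConnectionFactory` and `RestartingContainer` are hypothetical names, and connections are modeled as string ids.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical callback: the factory only knows this interface, never the container class.
interface ConnectionLifecycleListener {
    void onCreate(String connectionId);

    void onClose(String connectionId);
}

final class ObservableConnectionFactory {
    private final List<ConnectionLifecycleListener> listeners = new CopyOnWriteArrayList<>();
    private int created;

    void addListener(ConnectionLifecycleListener listener) {
        listeners.add(listener);
    }

    synchronized String createConnection() {
        String id = "conn-" + (++created); // stand-in for opening a real connection
        listeners.forEach(l -> l.onCreate(id));
        return id;
    }

    // Called by whatever detects the closure (e.g. a shutdown listener on the native connection).
    void connectionClosed(String id) {
        listeners.forEach(l -> l.onClose(id));
    }
}

// Dependency runs one way: container -> factory. The factory calls back only through the interface.
final class RestartingContainer implements ConnectionLifecycleListener {
    private final ObservableConnectionFactory factory;
    private volatile String current;

    RestartingContainer(ObservableConnectionFactory factory) {
        this.factory = factory;
    }

    void start() {
        factory.addListener(this);
        current = factory.createConnection();
    }

    @Override
    public void onCreate(String connectionId) {
        // nothing to do here; a real container might reset its consumers
    }

    @Override
    public void onClose(String connectionId) {
        if (connectionId.equals(current)) {
            current = factory.createConnection(); // reconnect and carry on consuming
        }
    }

    String currentConnection() {
        return current;
    }
}
```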
Mark Lui commented
We have not yet gone to production with RabbitMQ but have seen other messaging brokers lose their connection. This can happen if the message broker goes down for whatever reason and we need to restart. Or there can be a network glitch between colos that can cause the connection to go down.
Dave Syer commented
The SimpleMessageListenerContainer together with CachingConnectionFactory now gives us:
- automatic support for re-connection if the listener fails and the broker is still available
- the option to inject advice around the listener, so declarative retry can be applied
I will add a test for the declarative retry (#1665) and Mark has said he might provide some namespace support for it (or at least for the advice chain).
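In Spring the advice would be an AOP interceptor in the container's advice chain; a plain-Java decorator shows the same idea without the AOP machinery. A minimal sketch, assuming a listener is just a `Consumer<M>`; `RetryAdvice.withRetry` and the recoverer callback are hypothetical, and there is no back-off between attempts.

```java
import java.util.function.Consumer;

// Hypothetical advice: wraps a listener so failures are retried in-process before giving up.
final class RetryAdvice {
    private RetryAdvice() {
    }

    static <M> Consumer<M> withRetry(Consumer<M> listener, int maxAttempts, Consumer<M> recoverer) {
        return message -> {
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    listener.accept(message);
                    return; // success: stop retrying
                } catch (RuntimeException e) {
                    // swallow and try again; a real policy would also back off here
                }
            }
            recoverer.accept(message); // all attempts failed: e.g. log it or route it elsewhere
        };
    }
}
```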