ppat / storm-rabbitmq
A library of tools for interacting with RabbitMQ from Storm.
License: MIT License
This could be used for ConnectionConfig so that it relies on those defaults, which would spare users from having to specify the default values explicitly.
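Assuming "those" refers to the defaults the RabbitMQ Java client already defines (com.rabbitmq.client.ConnectionFactory exposes constants such as DEFAULT_HOST, DEFAULT_AMQP_PORT, DEFAULT_USER, DEFAULT_PASS and DEFAULT_VHOST), the fallback logic might look like the minimal sketch below. ConnectionDefaults, its helper methods and any configuration keys are hypothetical; the constants only mirror the client's stock values so the sketch compiles without the client jar.

```java
import java.util.Map;

// Hypothetical sketch: fall back to stock AMQP defaults whenever a key is absent.
final class ConnectionDefaults {
    // Values mirror the RabbitMQ Java client's defaults (localhost, 5672, guest/guest, "/").
    static final String DEFAULT_HOST = "localhost";
    static final int DEFAULT_PORT = 5672;
    static final String DEFAULT_USER = "guest";
    static final String DEFAULT_PASS = "guest";
    static final String DEFAULT_VHOST = "/";

    private ConnectionDefaults() {}

    /** Returns the configured value, or the fallback when the key is missing or null. */
    static String stringOr(Map<String, Object> conf, String key, String fallback) {
        Object value = conf.get(key);
        return value == null ? fallback : value.toString();
    }

    /** Same as stringOr, but yields an int (accepts Number or numeric String values). */
    static int intOr(Map<String, Object> conf, String key, int fallback) {
        Object value = conf.get(key);
        if (value == null) {
            return fallback;
        }
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        return Integer.parseInt(value.toString());
    }
}
```

With this approach a user only sets the keys that differ from the broker defaults; everything else resolves to the client's own values.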
Hi ppat, I like your library, but I'm seeing speed issues. My application is continually bombarded with messages, so there is always a very large number of messages on the queue, on the order of thousands. I used your basic connection code from the README and noticed that my app starts off well, processing messages very fast, but gradually slows to a crawl. I looked through your code and couldn't see anything immediately obvious. Have you experienced this before?
In class RabbitMQMessageScheme, in the function private Map<String, Object> serialiazableHeaders(Map<String, Object> headers), the header values are instances of com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString, but I expect them to be String. As a result, after the loop with the instanceof checks, serializableHeaders comes back as an empty Map.
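One possible fix, sketched under assumptions: convert AMQP long strings to String instead of dropping them. LongStringLike is a hypothetical stand-in for com.rabbitmq.client.LongString (which also exposes byte[] getBytes()), used so the example compiles without the RabbitMQ client; HeaderSanitizer is likewise illustrative, not the library's RabbitMQMessageScheme code. Decoding as UTF-8 is an assumption about how the header text was encoded.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for com.rabbitmq.client.LongString.
// Real code would test "value instanceof LongString" instead.
interface LongStringLike {
    byte[] getBytes();
}

final class HeaderSanitizer {
    private HeaderSanitizer() {}

    /** Copies headers, converting long-string values to String and skipping unsupported types. */
    static Map<String, Object> serializableHeaders(Map<String, Object> headers) {
        Map<String, Object> result = new HashMap<>();
        if (headers == null) {
            return result;
        }
        for (Map.Entry<String, Object> entry : headers.entrySet()) {
            Object value = entry.getValue();
            if (value instanceof LongStringLike) {
                // AMQP long strings carry raw bytes; decode them as UTF-8 text (assumption).
                byte[] bytes = ((LongStringLike) value).getBytes();
                result.put(entry.getKey(), new String(bytes, StandardCharsets.UTF_8));
            } else if (value instanceof String || value instanceof Number || value instanceof Boolean) {
                result.put(entry.getKey(), value);
            }
            // Any other type (nested tables, arrays, ...) is skipped in this sketch.
        }
        return result;
    }
}
```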
My only rabbit dependency is:
<dependency>
  <groupId>io.latent</groupId>
  <artifactId>storm-rabbitmq</artifactId>
  <version>0.6.0</version>
</dependency>
It appears that the latest version of storm-rabbitmq in Maven Central is 0.6.0. Any chance 0.6.1 could be published there?
How do I add configuration for RabbitMQSpout when using Trident?
Hello,
When using the rabbitmq spout in a storm cluster with multiple nodes, I am running into this exception.
java.lang.RuntimeException: java.lang.RuntimeException: java.io.NotSerializableException: com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90) ~[storm-core-0.9.0.1.jar:na]
...
Caused by: java.io.NotSerializableException: com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) ~[na:1.7.0_51]
It looks like this is due to the DefaultClientProperties of the AMQP connection factory. The snapshot below shows one of the flows where this is getting added.
LongStringHelper$ByteArrayLongString.<init>(byte[]) line: 44
LongStringHelper.asLongString(String) line: 101
AMQConnection.defaultClientProperties() line: 73
ConnectionFactory.<init>() line: 88
ConnectionConfig.asConnectionFactory() line: 68
RabbitMQConsumer.<init>(ConnectionConfig, int, String, boolean, Declarator, ErrorReporter) line: 38
RabbitMQSpout.loadConsumer(Declarator, ErrorReporter, ConsumerConfig) line: 64
RabbitMQSpout.open(Map, TopologyContext, SpoutOutputCollector) line: 54
I tried to configure Storm to use the Kryo serializer for ByteArrayLongString with a config like this:
topology.kryo.register:
  - com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString
But it did not work. Also, I cannot write my own custom serializer because the ByteArrayLongString class is private.
Any idea how to resolve this? Anything we can do in the storm-rabbitmq library to address this issue?
Thanks!
Vinay
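The trace shows the failure surfacing in java.io.ObjectOutputStream, so one way to narrow it down is to check which header values Java serialization rejects before they are emitted. SerializationCheck below is a hypothetical, JDK-only diagnostic sketch; it does not change how Storm itself serializes tuples.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class SerializationCheck {
    private SerializationCheck() {}

    /** Returns true if Java serialization can write the value without a NotSerializableException. */
    static boolean isJavaSerializable(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            // Writing to an in-memory buffer should not fail for other reasons.
            throw new IllegalStateException(e);
        }
    }

    /** Lists the header keys whose values would fail to serialize. */
    static List<String> offendingKeys(Map<String, Object> headers) {
        List<String> bad = new ArrayList<>();
        for (Map.Entry<String, Object> e : headers.entrySet()) {
            if (!isJavaSerializable(e.getValue())) {
                bad.add(e.getKey());
            }
        }
        return bad;
    }
}
```

Logging offendingKeys(headers) in the spout before emitting would show which header (for example a client property stored as a ByteArrayLongString) is responsible.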
Following the guide for SSL/TLS configuration on the RabbitMQ site requires passing a TrustManager to the sslConfigure() method. In addition to the existing SSL configuration, the configuration therefore needs to accept keystore and truststore paths, along with their (optional) passphrases.
I created a pull request to address this.
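A minimal sketch of turning keystore/truststore paths and optional passphrases into an SSLContext with the standard javax.net.ssl APIs. SslContexts.build is a hypothetical helper; the resulting context could then be handed to the RabbitMQ Java client via ConnectionFactory.useSslProtocol(SSLContext). The TLSv1.2 protocol name, the JVM default keystore type, and reusing the keystore passphrase as the key passphrase are assumptions to adjust for your setup.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

final class SslContexts {
    private SslContexts() {}

    /**
     * Builds a TLS context from optional keystore/truststore files.
     * A null keystore path means no client certificate; a null truststore
     * path means the JVM's default trusted CAs.
     */
    static SSLContext build(String keyStorePath, char[] keyStorePass,
                            String trustStorePath, char[] trustStorePass)
            throws IOException, GeneralSecurityException {
        KeyManager[] keyManagers = null;
        if (keyStorePath != null) {
            KeyStore keyStore = load(keyStorePath, keyStorePass);
            KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
            // Assumes the private key uses the same passphrase as the keystore.
            kmf.init(keyStore, keyStorePass);
            keyManagers = kmf.getKeyManagers();
        }
        TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        // init with a null KeyStore falls back to the JVM's default trust store.
        KeyStore trustStore = trustStorePath == null ? null : load(trustStorePath, trustStorePass);
        tmf.init(trustStore);
        SSLContext context = SSLContext.getInstance("TLSv1.2");
        context.init(keyManagers, tmf.getTrustManagers(), null);
        return context;
    }

    /** Loads a keystore file of the JVM's default type with the given passphrase. */
    private static KeyStore load(String path, char[] pass) throws IOException, GeneralSecurityException {
        KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
        try (InputStream in = new FileInputStream(path)) {
            ks.load(in, pass);
        }
        return ks;
    }
}
```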
I would like to emit tuples on multiple streams for the same incoming message, instead of only one stream or the other via the split in MultiStreamSpout.
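One way to express "several streams per message" is a selector that returns every matching stream id rather than a single one. FanOutSelector and its route/selectStreams methods are hypothetical names, not part of MultiStreamSpout; a spout using it would call emit once per returned stream id.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical fan-out selector: every stream whose predicate matches gets the message,
// instead of a splitter choosing exactly one stream.
final class FanOutSelector<M> {
    // LinkedHashMap keeps streams in registration order.
    private final Map<String, Predicate<M>> routes = new LinkedHashMap<>();

    FanOutSelector<M> route(String streamId, Predicate<M> condition) {
        routes.put(streamId, condition);
        return this;
    }

    /** Returns every matching stream id, in registration order. */
    List<String> selectStreams(M message) {
        List<String> selected = new ArrayList<>();
        for (Map.Entry<String, Predicate<M>> r : routes.entrySet()) {
            if (r.getValue().test(message)) {
                selected.add(r.getKey());
            }
        }
        return selected;
    }
}
```

How acking should work when one RabbitMQ message produces several tuples is a design question this sketch leaves open.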
Hi,
I have just started using Storm and want to integrate it with RabbitMQ. Can anyone provide sample code implementing backtype.storm.spout.Scheme to deserialize a RabbitMQ message payload?
A simple example would be of great help; I am confused about this part. My RabbitMQ message payloads are JSON.
Thanks.
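A minimal sketch, assuming UTF-8 JSON payloads and Storm 0.9's Scheme contract (List<Object> deserialize(byte[] ser) plus Fields getOutputFields()). To stay compilable without Storm on the classpath, the hypothetical JsonStringScheme below only mirrors that shape; a real version would declare implements backtype.storm.spout.Scheme and return new Fields("json") from getOutputFields(). It emits the raw JSON text as one field; parsing it into separate fields with a JSON library could happen here or in a downstream bolt.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

// Shaped like backtype.storm.spout.Scheme (which extends Serializable);
// a real version would "implements Scheme" and return new Fields("json").
final class JsonStringScheme implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Decodes the raw RabbitMQ payload into a single-field tuple holding the JSON text. */
    List<Object> deserialize(byte[] payload) {
        String json = new String(payload, StandardCharsets.UTF_8);
        return Collections.<Object>singletonList(json);
    }

    /** Name of the single output field. */
    List<String> outputFieldNames() {
        return Collections.singletonList("json");
    }
}
```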
Currently, if I specify the Rabbit host parts via the topology config, I get an NPE when it tries to configure the connection. It looks up the HA hosts and fails because that value is null (not specified). If I use "" for the hosts, it fails again, trying to use the empty value as a host. Config.getFromMap() needs to check for null before calling .toString().
I fixed this in a pull request.
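The null check described above might look like the sketch below. ConfigLookup, its method names and the configuration keys are hypothetical, not the library's actual Config class; getNonBlank additionally treats "" like a missing value, which covers the second failure mode.

```java
import java.util.Map;

final class ConfigLookup {
    private ConfigLookup() {}

    /** Returns the value as a String, or null when the key is absent or mapped to null. */
    static String getFromMap(String key, Map<String, Object> map) {
        Object value = map.get(key);
        return value == null ? null : value.toString();
    }

    /** Treats null and blank strings alike, so "" is never used as a host name. */
    static String getNonBlank(String key, Map<String, Object> map) {
        String value = getFromMap(key, map);
        return (value == null || value.trim().isEmpty()) ? null : value;
    }
}
```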
If I'm not mistaken, RabbitMQBolt by default calls the producer with an empty routing key, producer.send(scheme.produceMessage(tuple)), even when using the constructor variant that takes a Declarator, which includes a routing key among other things. I find this a bit confusing: I would expect the send(Message, routingKey) method to be used when a Declarator is supplied. It might be a matter of personal taste, though.
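A sketch of the behaviour this issue expects, using hypothetical MessageProducer and RoutingAwareSender types (a byte[] stands in for the library's Message): use send(message, routingKey) when a routing key was configured, and fall back to the default send(message) otherwise.

```java
// Hypothetical producer abstraction with the two send variants discussed above.
interface MessageProducer {
    void send(byte[] message);

    void send(byte[] message, String routingKey);
}

final class RoutingAwareSender {
    private final MessageProducer producer;
    private final String routingKey; // null when no routing key was configured

    RoutingAwareSender(MessageProducer producer, String routingKey) {
        this.producer = producer;
        this.routingKey = routingKey;
    }

    /** Uses the configured routing key when present, otherwise the default (empty) one. */
    void publish(byte[] message) {
        if (routingKey != null && !routingKey.isEmpty()) {
            producer.send(message, routingKey);
        } else {
            producer.send(message);
        }
    }
}
```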
We have built a RabbitMQ cluster and would like to use its HA feature. This needs only a small code change to match the RabbitMQ Java client API: replace the (host, port) arguments with an Address[] and call the corresponding newConnection overload. It would be great if the storm-rabbitmq component supported this feature as well.
Regards.
Please refer to the client API below:
http://www.rabbitmq.com/javadoc/com/rabbitmq/client/ConnectionFactory.html#newConnection(com.rabbitmq.client.Address[])
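The RabbitMQ Java client already parses address lists with Address.parseAddresses(String) and connects to them via ConnectionFactory.newConnection(Address[]), so real code should prefer those. Purely to illustrate what a comma-separated host list in the topology config would turn into, here is a JDK-only sketch; HostPort is a hypothetical stand-in for com.rabbitmq.client.Address.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for com.rabbitmq.client.Address: a host plus a port.
final class HostPort {
    static final int DEFAULT_AMQP_PORT = 5672;
    final String host;
    final int port;

    HostPort(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /** Parses "host1:5672,host2" into host/port pairs; a missing port means 5672. IPv6 literals are not handled. */
    static List<HostPort> parseList(String spec) {
        List<HostPort> result = new ArrayList<>();
        for (String part : spec.split(",")) {
            String trimmed = part.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                result.add(new HostPort(trimmed, DEFAULT_AMQP_PORT));
            } else {
                result.add(new HostPort(trimmed.substring(0, colon),
                        Integer.parseInt(trimmed.substring(colon + 1))));
            }
        }
        return result;
    }

    @Override
    public String toString() {
        return host + ":" + port;
    }
}
```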
The io.latent.storm.rabbitmq.RabbitMQSpout class doesn't override the BaseRichSpout activate and deactivate methods.
Here's my proposal. The activate method is as follows:
@Override
public void activate() {
    logger.info("activating spout!");
    // Recreate the consumer from the configuration cached in open()
    consumer = loadConsumer(declarator, reporter, consConfig);
    scheme.open(topoConfig, topoContext);
    consumer.open();
    super.activate();
}
We will cache topoConfig and topoContext from the open method.
The deactivate method is as follows:
@Override
public void deactivate() {
    logger.info("deactivating spout!");
    // Release the broker connection and scheme resources while inactive
    consumer.close();
    scheme.close();
    consumer = null;
    super.deactivate();
}
I'll update this issue with the code from my development branch once it's finished.
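The activate/deactivate proposal could be made safe against repeated calls with simple guards. The sketch below uses hypothetical StubConsumer and LifecycleSketch classes in place of the real consumer and spout so it runs without Storm; it only demonstrates the open/activate/deactivate/close state handling, not the RabbitMQ calls.

```java
// Hypothetical stand-in for the spout's consumer: only tracks whether it is open.
final class StubConsumer {
    boolean isOpen;

    void open() { isOpen = true; }

    void close() { isOpen = false; }
}

// Sketch of the proposed lifecycle with guards so repeated calls are harmless.
final class LifecycleSketch {
    private StubConsumer consumer;
    private boolean opened;

    void open() {
        // The real spout would cache topoConfig and topoContext here.
        opened = true;
        activate();
    }

    void activate() {
        if (!opened || consumer != null) {
            return; // not opened yet, or already active
        }
        consumer = new StubConsumer();
        consumer.open();
    }

    void deactivate() {
        if (consumer == null) {
            return; // already inactive
        }
        consumer.close();
        consumer = null;
    }

    void close() {
        deactivate();
        opened = false;
    }

    boolean isConsuming() {
        return consumer != null && consumer.isOpen;
    }
}
```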
I am using the RabbitMQSpout successfully but cannot get the RabbitMQBolt (sink) to work. I am using exactly the same ConnectionConfig for both.
The RabbitMQ management console shows the RabbitMQBolt connecting successfully, but the target queue is not filling up, and the RabbitMQ tracer does not detect the bolt sending any publish requests.
I have verified that the bolt is receiving tuples by debugging the extractBody method of my TupleToMessageNonDynamic implementation.
Dependencies:
org.apache.storm, storm-core, 0.9.4
io.latent, storm-rabbitmq, 0.6.1
com.typesafe, config, 1.2.1
Could you add support for a Trident spout?
I've scoured the internet, but I cannot find a repository that hosts the storm-rabbitmq module, and you don't mention one in your documentation. Would it be possible to publish it to Clojars or Maven Central?
Hi @ppat, could you release version 1.0.0 to Maven Central? Thanks
(Wasn't sure you'd see my comment on the commit log)
This prevents subclasses from overriding the methods unless they're in the same package.
The storm dependency gives me problems when uploading a topology that uses storm-rabbitmq to a Storm cluster. The error that Storm gives me is:
Exception in thread "main" java.lang.RuntimeException: Found multiple
defaults.yaml resources. You're probably bundling the Storm
jars with your topology jar.
Shouldn't the storm dependency be marked with <scope>provided</scope>?
For now, I'm excluding Storm from my topology with this:
<!-- Reading from RabbitMQ with storm-rabbitmq -->
<dependency>
  <groupId>io.latent</groupId>
  <artifactId>storm-rabbitmq</artifactId>
  <version>0.3.2-SNAPSHOT</version>
  <exclusions>
    <exclusion>
      <groupId>storm</groupId>
      <artifactId>storm</artifactId>
    </exclusion>
  </exclusions>
</dependency>