
rabbitmq-consistent-hash-exchange's People

Contributors

acogoluegnes, camelpunch, dcorbacho, dumbbell, fenollp, gerhard, gmr, kjnilsson, lukebakken, michaelklishin, mordyovits, rade, spring-operator, videlalvaro


rabbitmq-consistent-hash-exchange's Issues

Use retries when waiting for Mnesia tables to load

This plugin currently uses mnesia:wait_for_tables/2 with a single fixed timeout. RabbitMQ itself has moved to a system of periodic retries instead of a single attempt (as of 3.6.7, IIRC).

This plugin should do the same; otherwise deployments that use it will be plagued by the same cluster restart unpredictability we have seen in the core with a single attempt and a single timeout.

Highly non-uniform distribution of messages over queue when using UUID as routing key

We are using the consistent hash exchange to route messages from about 350 sources, each with a distinct UUID (generated with the v4 algorithm), to about 5-20 consumers, using the UUID as the routing key. We are seeing a highly non-uniform distribution of messages over the consumer queues: quite a few of the queues (>50%) receive no messages at all.

After playing around with the jump_consistent_hash code, it seems that erlang:phash2 output might not be uniformly distributed when fed string data (tried UUIDs and random numbers in string form).

See the test code below, which prints the distribution of 350 random integers (0-10000) over 20 buckets. The distribution is reasonably uniform when jump_consistent_hash is fed integers, but quite non-uniform when fed strings.

%% Standalone test: distributes 350 random keys over 20 buckets using the
%% plugin's jump_consistent_hash and prints the per-bucket counts.
-module(helloworld).
-export([start/0]).

-define(SEED_ALGORITHM, exs1024).

jump_consistent_hash(_Key, 1) ->
    0;
%% NB: an Erlang string is a list, so a string key matches this clause and
%% only its first element (the first character) is used as the hash key.
jump_consistent_hash(KeyList, NumberOfBuckets) when is_list(KeyList) ->
    jump_consistent_hash(hd(KeyList), NumberOfBuckets);
jump_consistent_hash(Key, NumberOfBuckets) when is_integer(Key) ->
    SeedState = rand:seed_s(?SEED_ALGORITHM, {Key, Key, Key}),
    jump_consistent_hash_value(-1, 0, NumberOfBuckets, SeedState);
%% Any other term (e.g. a binary) is first reduced to an integer with phash2.
jump_consistent_hash(Key, NumberOfBuckets) ->
    jump_consistent_hash(erlang:phash2(Key), NumberOfBuckets).

jump_consistent_hash_value(B, J, NumberOfBuckets, _SeedState) when J >= NumberOfBuckets ->
    B;

jump_consistent_hash_value(_B0, J0, NumberOfBuckets, SeedState0) ->
    B = J0,
    {R, SeedState} = rand:uniform_s(SeedState0),
    J = trunc((B + 1) / R),
    jump_consistent_hash_value(B, J, NumberOfBuckets, SeedState).

count(Needle, Haystack) -> count(Needle, Haystack, 0).
count(_, [], Count) -> Count;
count(X, [X|Rest], Count) -> count(X, Rest, Count+1);
count(X, [_|Rest], Count) -> count(X, Rest, Count).

countall(Stop, L) -> countall(0, Stop, L).
countall(Start, Stop, _) when Start == Stop -> [];
countall(Start, Stop, L) -> [{Start, count(Start, L)} | countall(Start+1, Stop, L)].

test(NBuckets, Clusters) ->
    Hashes = [erlang:phash2(C) || C<-Clusters],
    Buckets = [jump_consistent_hash(C, NBuckets) || C<-Clusters],
    Zipped = lists:zip3(Buckets, Hashes, Clusters),
    Sorted = lists:keysort(1, Zipped),
    io:fwrite("~p\n", [Sorted]),
    io:fwrite("~p\n", [countall(NBuckets, Buckets)]).

start() ->
    ClusterNums = [rand:uniform(10000) || _ <- lists:seq(1, 350)],
    ClusterNumStrs = [integer_to_list(N) || N <- ClusterNums],
    NBuckets = 20,
    test(NBuckets, ClusterNums),
    test(NBuckets, ClusterNumStrs).

{Bucket, NSources} when fed integers:

[{0,23},
 {1,15},
 {2,21},
 {3,27},
 {4,26},
 {5,14},
 {6,16},
 {7,18},
 {8,14},
 {9,9},
 {10,23},
 {11,16},
 {12,18},
 {13,21},
 {14,11},
 {15,12},
 {16,8},
 {17,19},
 {18,26},
 {19,13}]

{Bucket, NSources} when fed strings:

[{0,86},
 {1,0},
 {2,36},
 {3,0},
 {4,84},
 {5,81},
 {6,0},
 {7,0},
 {8,0},
 {9,0},
 {10,0},
 {11,63},
 {12,0},
 {13,0},
 {14,0},
 {15,0},
 {16,0},
 {17,0},
 {18,0},
 {19,0}]

rabbitmqctl status extract:

 {running_applications,
     [{rabbitmq_management,"RabbitMQ Management Console","3.7.7"},
      {amqp_client,"RabbitMQ AMQP Client","3.7.7"},
      {rabbitmq_management_agent,"RabbitMQ Management Agent","3.7.7"},
      {rabbitmq_consistent_hash_exchange,"Consistent Hash Exchange Type",
          "3.7.7"},
      {rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.7.7"},
      {rabbit,"RabbitMQ","3.7.7"},
      {rabbit_common,
          "Modules shared by rabbitmq-server and rabbitmq-erlang-client",
          "3.7.7"},
      {cowboy,"Small, fast, modern HTTP server.","2.2.2"},
      {ranch_proxy_protocol,"Ranch Proxy Protocol Transport","1.5.0"},
      {ranch,"Socket acceptor pool for TCP protocols.","1.5.0"},
      {ssl,"Erlang/OTP SSL application","9.0"},
      {public_key,"Public key infrastructure","1.6"},
      {cowlib,"Support library for manipulating Web protocols.","2.1.0"},
      {crypto,"CRYPTO","4.3"},
      {asn1,"The Erlang ASN1 compiler version 5.0.6","5.0.6"},
      {os_mon,"CPO  CXC 138 46","2.4.5"},
      {jsx,"a streaming, evented json parsing toolkit","2.8.2"},
      {mnesia,"MNESIA  CXC 138 12","4.15.4"},
      {inets,"INETS  CXC 138 49","7.0"},
      {xmerl,"XML parser","1.3.17"},
      {recon,"Diagnostic tools for production use","2.3.2"},
      {lager,"Erlang logging framework","3.6.3"},
      {goldrush,"Erlang event stream processor","0.1.9"},
      {compiler,"ERTS  CXC 138 10","7.2"},
      {syntax_tools,"Syntax tools","2.1.5"},
      {syslog,"An RFC 3164 and RFC 5424 compliant logging framework.","3.4.2"},
      {sasl,"SASL  CXC 138 11","3.2"},
      {stdlib,"ERTS  CXC 138 10","3.5"},
      {kernel,"ERTS  CXC 138 10","6.0"}]},
 {os,{unix,linux}},
 {erlang_version,
     "Erlang/OTP 21 [erts-10.0] [source] [64-bit] [smp:2:2] [ds:2:2:10] [async-threads:64] [hipe]\n"},

Documentation Lacking around Routing Key Implications

In my opinion, some relevant information about the significance of the routing key is missing. The questions I would like to see answered are:

Given a single consumer, what should my binding key be? Based on the current documentation I think it should be less than thousands but as close to thousands as possible. So is 1000 a good number?

Given the addition of a new consumer, should I adjust my bindings to reflect the additional consumer? I.e., with 1 consumer we have a binding key of 1000, but with 2 we have a binding key of 500 on each queue, because we expect the same throughput but spread across multiple queues.

Also, if the previous statement is true, why is this not simply a percentage of the total workload? I.e., the routing key would specify what percentage of messages goes to each queue. (This is why I don't think the previous statement is true.)

Also, I will just throw it out there that I think you should handle a # being passed as the routing key. I started out without fully understanding the plugin and assumed that # would give me everything. That caused some real headaches.

Finally, I don't want to sound negative, so I will commend you on this excellent plugin. I will probably be sending a pull request in the near future. Our routing keys are of the form ID.Hash.MessageType, which causes slight issues when the message type differs: a message of type Foo may not go to the same queue as a message of type Bar, even though they have the same identifier. Therefore, the feature I will (hopefully) add is the ability to specify, with some pattern matching, which segment of the routing key to hash; in our scenario something like #.#.* where # segments are hashed and * segments are ignored.
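For context, here is a minimal sketch of how these weights are expressed, using the Python pika client against a local broker (the exchange and queue names are illustrative, not taken from the plugin's documentation). The routing key given at binding time is the queue's weight, so q2 below should receive roughly twice as many messages as q1 over many distinct routing keys:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='cx', exchange_type='x-consistent-hash')

# The binding "routing key" is a weight: q2 is bound with twice the weight
# of q1, so it should get roughly twice the share of messages.
channel.queue_declare(queue='q1')
channel.queue_declare(queue='q2')
channel.queue_bind(exchange='cx', queue='q1', routing_key='1')
channel.queue_bind(exchange='cx', queue='q2', routing_key='2')

# The publishing routing keys must vary for the hashing to spread messages out.
for i in range(100):
    channel.basic_publish(exchange='cx', routing_key=str(i), body='hello')

connection.close()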

Handling orphan queues

(This is not an issue, but a general AMQP question that arises when using this exchange type)

This exchange is aimed at scaling not by having n consumers on one queue, but by having one queue per consumer, and n queues.

This requires flexibility in adding or removing queues. For instance, if I add a new worker to my worker set, I will create a new queue, bind it to the consistent-hash exchange, and it will start to get its share of messages. Likewise, if I remove a worker from my worker set (for maintenance, because it crashed, downscaling...), I need its queue to be removed as well, otherwise it will fill up with messages that won't get consumed.

As you can imagine, ignoring the race condition in the routing mentioned above, I must guarantee that "at most" all messages are processed by my workers. Do you have any suggestion on how to deal with the messages that might still be inside the queue that I need deleted?

Thanks.

same message with same routing key goes to different consumer

Hi again ;-)

What am I doing wrong?

I've written a little node.js sample code, can you help me?

var amqp = require('amqp');

var connection = amqp.createConnection();

connection.on('error', function (err) {
  console.log(err);
});

connection.on('ready', function () {
  console.log('connected to ' + connection.serverProperties.product);

  // exchange
  connection.exchange('e', {
    type: 'x-consistent-hash',
    autoDelete: true,
    durable: false
  }, function(exchange) {

    function makeQueue(name, points, callback) {
      connection.queue(name, {
        durable: false
      }, function(q) {

        q.bind(exchange, points);

        q.on('queueBindOk', function() {

          q.subscribe({
            ack: false
          }, function (msg) {
            console.log(name + ': ' + msg.data.toString());
          });

          q.on('basicConsumeOk', callback);
        });
      });
    }

    var done = 0;

    function check() {
      done++;
      if (done === 4) {
        startPublishing();
      }
    }

    makeQueue('q0', '10', check);
    makeQueue('q1', '10', check);
    makeQueue('q2', '20', check);
    makeQueue('q3', '20', check);

    function startPublishing() {
      for (var i = 0, len = 10; i < len; i++) {
        exchange.publish(i.toString(), 'message(orig) with routingKey: ' + i);
        exchange.publish(i.toString(), 'message(copy) with routingKey: ' + i);
      }
    }

  });
});

Choosing the seed for generated points in the hash space

At present, the points occupied by a consumer in the hash space are generated randomly. I have a use case where I would like those points to be produced consistently from a given seed value; this would allow me to bind more than one queue and have messages (with the same routing key) on those queues routed consistently to the same consumer.

One common way of doing this is to provide a seed value (often a string) and then repeatedly hashing to produce a sequence of hashes:

h1 = hash(seed)
h2 = hash(h1)
h3 = hash(h2)
...
hN = hash(hN-1)

Would this be possible, perhaps by appending an optional suffix to the binding key, e.g. routing_key = <<"20,seedValue">>?
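A minimal sketch of the repeated-hashing idea in Python (the hash function, point count, and hash-space size are illustrative; the plugin itself generates its points in Erlang):

import hashlib

def points_from_seed(seed, n, hash_space=2 ** 32):
    """Derive n deterministic points in [0, hash_space) by repeatedly
    hashing the previous digest, starting from the seed string."""
    points = []
    digest = seed.encode()
    for _ in range(n):
        digest = hashlib.sha256(digest).digest()
        points.append(int.from_bytes(digest[:4], 'big') % hash_space)
    return points

# Two bindings created with the same seed would get the same points, so
# messages with the same routing key would land on the same consumer.
print(points_from_seed('seedValue', 5))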

Exchange Continues To Return Routes To Removed Queue

Hi @emile,

I'm presently working on a feature addition, rabbitmq/rabbitmq-server#29. The feature behavior has been changed (not yet pushed to GitHub) so that all bindings are removed from the queue before its messages are dead-lettered during queue expiration.

Here's the code being called to remove bindings from the expiring queue:

rabbit_misc:execute_mnesia_transaction(
    fun() -> rabbit_binding:remove_for_destination(Name) end),

Here's code checking the bindings remaining on the exchange (in this case a consistent hash exchange)

Bindings = rabbit_binding:list_for_source(X#exchange.name),

The list of bindings that is returned excludes the expiring queue (as expected), but during the dead-lettering publish call, when I check the result of rabbit_exchange:route(X, Delivery), it gives me back only the old route pointing to the expired queue.

I am still new to working in the rabbit code base, so perhaps I'm doing something dense, but it seems like the exchange should not continue returning a route for a queue that is no longer bound to it. Is there something additional that needs to be done to make the consistent hash exchange update its hash ring so that it re-assigns virtual nodes to the remaining bound queues?

Thanks for your help and for this plugin.
Alex

Hash on AMQP Message Properties

I'd like to add support for hash distribution on message properties such as message_id or correlation_id.

Before I write the code, I'd like validation that my proposed enhancement is what the team would like to see/merge. I'd be adding a new exchange argument, hash-property, with which you would specify the AMQP property that should be hashed on:

{<<"hash-property">>, longstr, <<"message_id">>}

To achieve this I would:

  • Implement validate/1 to check that if hash-property is set, the value is a valid AMQP property name. Additionally it would check to ensure that hash-header and hash-property are not both sent, failing if they are.
  • Modify route/2 to support hash-property
  • Add a new hash/2 to support hash-property

Does this make sense?
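For illustration, here is how the proposed interface might look from a Python (pika) client. This reflects the proposal in this issue only, not a shipped feature; the exchange name and message_id value are made up:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Declare the exchange with the proposed 'hash-property' argument.
channel.exchange_declare(exchange='cx.props',
                         exchange_type='x-consistent-hash',
                         arguments={'hash-property': 'message_id'})

# Messages carrying the same message_id would then hash to the same queue,
# regardless of their routing key.
channel.basic_publish(exchange='cx.props',
                      routing_key='',
                      body='hello',
                      properties=pika.BasicProperties(message_id='id-123'))

connection.close()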

Message routing distribution among queues is not uniformly random

Overview

Hi RabbitMQ team. Before getting to the problem, I wanted to say that this exchange has worked well in our production environment. We use it to execute distributed caching in our applications, and it has been very reliable.

In our environment, we have 48 queues bound to a consistent hash exchange and each queue is bound with routing key "1". i.e. All have equal weighting. We are consistently pushing 1000 messages per second through this exchange. I noticed that the message distribution wasn't equal because some queues were receiving over 100 messages per second, and others were receiving close to 0 messages per second. These queues should all be receiving close to 20 messages per second. The 48 queues come from having 12 application instances create dedicated queues for themselves on every node of a 4-node cluster (4 * 12 = 48).

That is the problem "as seen in the wild". This negatively impacts us as the applications need to be provisioned for the worst case. i.e. Since one application instance has to handle hundreds of messages per second, they all have to. This is the reality of our deployment environment and we would rather provision our apps like this than put in the engineering to have per-instance app sizes based on queue publish rates.

Experiment

This problem description is a little vague though. I created an experiment to help diagnose the issue. This Gist has the final form of the experiment. The script has three modes: load, analyze and reset. The load mode creates the exchange, binds some number of queues, and publishes a large number of messages to the exchange. The analyze mode gets the number of messages in each queue and outputs some stats. The reset mode deletes the queues and exchanges. Here is example usage:

./consistent_hash_exchange_experiment.py user password hostname port vhost load
./consistent_hash_exchange_experiment.py user password hostname port vhost analyze
./consistent_hash_exchange_experiment.py user password hostname port vhost reset

Other parameters are hard coded inside the script.

Wrong Guesses

There were a few theories I had which turned out to be wrong. I'll explain them and indicate how this is handled in the experiment now.

  1. Repeated routing key
    My first guess was that certain routing keys were occurring more often than others. Since messages with the same routing key go to the same queue under consistent hashing, that would skew the distribution. I verified this is not the case in our production environment. The experiment script uses a unique routing key for each message to account for this.

  2. Bad hash function
    After some googling, I came across this Erlang mailing list thread. This made me think that the use of phash2 was the issue. I don't think this is the case anymore since I modified our production environment as well as the experiment to set the routing key to a cryptographic hash of the previous routing key. The issue still persisted after the change.

  3. Maybe this is actually from a uniform distribution
    Just because message distribution is supposed to be uniform across queues doesn't mean it will come out exactly uniform in practice. The experiment publishes the messages to the exchange, the messages are routed to the queues and sit there, and the experiment then queries the number of messages in each queue. The Chi-Squared goodness-of-fit test gives an indication of how likely it is that a set of data came from a given distribution. scipy.stats.chisquare performs this analysis on a list of bucket counts against the uniform distribution over the number of buckets. It outputs a p-value which can be interpreted as "the probability that these counts came from a uniform distribution". The p-values generated during most experiments (I'll get to the exceptions later) are extremely small. (A minimal sketch of this check follows this list.)
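For reference, a minimal sketch of the chi-squared check described above; the per-queue counts here are made up, whereas in the experiment they come from the analyze mode:

import scipy.stats

# Hypothetical per-queue message counts gathered by the analyze mode.
counts = [102, 95, 250, 3, 87, 110, 1, 152]

# With no expected frequencies given, chisquare tests the counts against a
# uniform distribution over the buckets; a tiny p-value means the counts are
# very unlikely to have come from a uniform distribution.
statistic, p_value = scipy.stats.chisquare(counts)
print(statistic, p_value)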

Current Working Theory

The above incorrect theories pointed to the issue being with how buckets are assigned. This was somewhat confirmed by the source code. What follows is my understanding of how buckets are assigned. When a queue is bound to the exchange, a number of points in the hash space are generated and assigned to that queue (I call them "queue points"). Then when a message comes in, the routing key is hashed and the message is routed to the queue corresponding to the first queue point that the message point is greater than. I think this routing mechanism is fine. The problem lies in how the queue points are generated.

The number of queue points is equal to the number put in the routing key during binding. The queue points are randomly chosen. They are chosen according to a uniform distribution over [0, HASH_MAX] using phash. Since the points are generated randomly, there is no guarantee that "bucket sizes" will be equal. I believe the discrepancy in bucket size using this method is the cause of the non-uniform distribution for message routing.

Playing around with the routing key used for queue binding indicates this is the case. For example, binding all queues with "10" for the routing key gives each queue 10 buckets. Since each queue now has more randomly sized buckets, their collective shares of messages should be more equal. This has been confirmed in our production environment, where using "10" for the routing key makes the situation a little better but does not solve the problem. Additionally, running the routing experiment with higher and higher binding keys produces higher p-values. There is a limit to this trick since performance drastically degrades: running the experiment with the binding key set to "10000" and 100 queues produced a p-value of about 0.6, which is a good indication that the message distribution is close to uniform. However, only 5 messages per second could be published to the exchange (using a binding key of "10" permitted 2500 messages per second). I suspect this is due to a prohibitively large mnesia table that stores queue points.

Suggested Fix

For anyone reading this with the same problem, I suggest bumping up the binding keys you are using as a workaround. Like I said, this won't fix the situation, but it will make it a little better. Also, don't make the values too large or you'll incur a performance hit.

I suggest the examples for this exchange be changed to use slightly higher binding keys in the short term. Maybe change some of the documentation language too? I'm not sure about this one.

In order to fix the issue, I think a better allocation strategy is needed for queue points. The easiest example to think of is that binding N queues with a binding routing key of "1" should result in N equally sized buckets. However, the general case of unequal weightings needs to be accommodated, and I'm not sure how to accomplish this in a smart manner like I'm suggesting.
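To make the "bucket size" reasoning concrete, here is a small simulation (not the plugin's code; the ring size and queue count are arbitrary) comparing the arc sizes produced by randomly placed points with those produced by evenly spaced points:

import random

HASH_MAX = 2 ** 32
N_QUEUES = 48

def bucket_sizes(points):
    # Size of the arc "owned" by each point on the ring [0, HASH_MAX).
    pts = sorted(points)
    return [(pts[(i + 1) % len(pts)] - p) % HASH_MAX
            for i, p in enumerate(pts)]

random_points = [random.randrange(HASH_MAX) for _ in range(N_QUEUES)]
even_points = [i * HASH_MAX // N_QUEUES for i in range(N_QUEUES)]

# Randomly placed points give wildly uneven arcs (some queues own huge
# stretches of the ring, others almost nothing); evenly spaced points do not.
print(max(bucket_sizes(random_points)) / min(bucket_sizes(random_points)))
print(max(bucket_sizes(even_points)) / min(bucket_sizes(even_points)))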

Thanks for reading this issue, which seems more like a blog post now that I've written it! I'd be thrilled to discuss potential solutions or work on a PR if there is appetite.

Relevant Version Information

RabbitMQ Version: 3.7.6
Erlang Version: 20.3
Consistent Hash Exchange Version: 3.7.6
Server OS: Ubuntu 16.04 Server with 4.4.0-97-generic Kernel
Python Version: 3.5.2
Pika Version: 0.10.0
Client OS: Ubuntu 16.04 Server with 4.4.0-130-generic Kernel

Is header hashing supported?

I am pretty sure I read about header hashing in the readme, but now I cannot find the reference. It seems to work with consistent hashing, but not with modulus hash.

Steps to reproduce:

  1. Create a modulus-hash exchange with a hash-header argument.
  2. Apply a sharding policy:
     rabbitmqctl set_policy images-shard "^shard.images$" '{"shards-per-node": 2, "routing-key": "1234"}'
  3. Verify that the queues and bindings are created.
  4. Publish a message to the exchange using the specified hash-header.
  5. Repeat the previous step 10 times, increasing the value of hash-me by one each time.

Expected result: one queue has 6 messages, the other queue has 5.

Observed result: one queue has 11 messages, the other is empty.
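For reference, a minimal sketch of the publish step in Python with pika, assuming the exchange is named shard.images (the name the policy above matches) and the configured hash header is hash-me, as in the steps above; everything else is illustrative:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Publish with the header the exchange is configured to hash on,
# incrementing its value each time as in the reproduction steps.
for n in range(10):
    channel.basic_publish(exchange='shard.images',
                          routing_key='',
                          body='payload %d' % n,
                          properties=pika.BasicProperties(headers={'hash-me': str(n)}))

connection.close()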

Message Distribution is not uniform (at all) with weights less than 4

I'm using RabbitMQ 3.6.9 including the rabbitmq_consistent_hash_exchange 3.6.9.

Running the Python example as-is does not produce anything close to a uniform message distribution between q1 and q2 (each with weight 1) or q3 and q4 (each with weight 2).

There is no discussion on why you would want to have some queues weighted higher than others. I'm assuming you would bind more powerful consumers to the higher weighted queues? A word or two on that would help.

In any event, all of my consumers are similar machines, so I changed all weights to the value of 1 with the expectation of a uniform distribution.

Here are the results: q4 received roughly 62% of the messages while q2 received roughly 9%. Obviously this is not a uniform distribution.

q1: 16,849 q2: 9,423 q3: 11,531 q4: 62,197

Happily, changing the weight to 8 buckets for each queue (instead of 1) produces a very smooth distribution of about 25K messages each! Using random GUIDs as the routing key (instead of integers) also resulted in a nice uniform distribution.

I repeated the experiment with 4 buckets (weight) per queue and the distribution started showing signs of being lumpy. 3 was worse. 2 was worse still. 1 is all but unusable.

Queue deletion and binding removal can affect hash ring state consistency

In RabbitMQ 3.7.9, when I use certain binding weights (e.g. 34, 100) when binding a queue, routing of messages can fail, and I get the following type of error in the RabbitMQ log:

2019-01-07 10:22:38.768 [warning] <0.699.0> Bucket 6 not found

How to reproduce

The problem can be reproduced with a slightly modified version of Python tutorial 5 (https://www.rabbitmq.com/tutorials/tutorial-five-python.html). The only change is that the exchange_type is changed to 'x-consistent-hash' in both emit and receive code. I will add the full code at the bottom of this post.

To reproduce, do the following in the 'receive' shell:

python receive_logs_topic.py 34
<Ctrl-C>
python receive_logs_topic.py 34

So, on the receive side, we start the receive program (which binds an exclusive queue), stop the program, and restart (which binds a new exclusive queue). Notice that everything works fine if the receive program is not restarted. The deletion of the queue is what causes the problem.
Now, on the 'emit' shell:

python emit_logs_topic.py key value

The receive shell does not receive the message, and a warning is produced in the log.

I have used a binding weight of 100 successfully in the past on older versions of RMQ (e.g. 3.5.7). I suspect the problem is related to the changes made for #37. The weight of 34 is the lowest weight that seems to give this problem.

Additional information

  • RabbitMQ 3.7.9
  • Erlang 21.0.5
  • OS: CentOS Linux release 7.6.1810 (Core)
  • Plugin information:
 Configured: E = explicitly enabled; e = implicitly enabled
 | Status: * = running on rabbit@sutoldev11686-2
 |/
[  ] rabbitmq_amqp1_0                  3.7.9
[  ] rabbitmq_auth_backend_cache       3.7.9
[  ] rabbitmq_auth_backend_http        3.7.9
[  ] rabbitmq_auth_backend_ldap        3.7.9
[  ] rabbitmq_auth_mechanism_ssl       3.7.9
[E*] rabbitmq_consistent_hash_exchange 3.7.9
[  ] rabbitmq_event_exchange           3.7.9
[E*] rabbitmq_federation               3.7.9
[E*] rabbitmq_federation_management    3.7.9
[  ] rabbitmq_jms_topic_exchange       3.7.9
[E*] rabbitmq_management               3.7.9
[e*] rabbitmq_management_agent         3.7.9
[  ] rabbitmq_mqtt                     3.7.9
[  ] rabbitmq_peer_discovery_aws       3.7.9
[  ] rabbitmq_peer_discovery_common    3.7.9
[  ] rabbitmq_peer_discovery_consul    3.7.9
[  ] rabbitmq_peer_discovery_etcd      3.7.9
[  ] rabbitmq_peer_discovery_k8s       3.7.9
[  ] rabbitmq_random_exchange          3.7.9
[  ] rabbitmq_recent_history_exchange  3.7.9
[  ] rabbitmq_sharding                 3.7.9
[E*] rabbitmq_shovel                   3.7.9
[E*] rabbitmq_shovel_management        3.7.9
[  ] rabbitmq_stomp                    3.7.9
[E*] rabbitmq_top                      3.7.9
[  ] rabbitmq_tracing                  3.7.9
[  ] rabbitmq_trust_store              3.7.9
[e*] rabbitmq_web_dispatch             3.7.9
[  ] rabbitmq_web_mqtt                 3.7.9
[  ] rabbitmq_web_mqtt_examples        3.7.9
[  ] rabbitmq_web_stomp                3.7.9
[  ] rabbitmq_web_stomp_examples       3.7.9

emit_logs_topic.py:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         exchange_type='x-consistent-hash')

routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

receive_logs_topic.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         exchange_type='x-consistent-hash')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

Exchange performance

Hi. If I understand correctly, will adding more nodes to my cluster give me more performance on my consistent hash exchange? Thanks.
