Giter VIP home page Giter VIP logo

Comments (5)

barshow avatar barshow commented on September 27, 2024

I dont know any issues, my initial thought would be we're overwriting the outgoing routing key, but looking through the code I think we're protected from that. Can you run you test script again with DEBUG=* AMQP=3 that will print out all kinds of useful information. Thank you

from amqp-coffee.

barshow avatar barshow commented on September 27, 2024

actually run with AMQP=4

from amqp-coffee.

 avatar commented on September 27, 2024

@barshow So I wrote this script to isolate this interaction from the rest of my application, and recreate the issue.

The script creates a connection, creates a topic exchange, creates two queues and binds them to the exchange with unique routing keys, and then publishes to both queues.

I ran the script with the debug flags you mentioned and everything seemed normal. Unfortunately, both messages still ended up in one user's queue.

Please let me know if I'm overlooking something, because it seems that the lib is handling these publishes incorrectly.

var AMQP = require('amqp-coffee');
var async = require('async');

var users = [{
    'user_id': '123'
}, {
    'user_id': '456'
}];

var getTestConnection = function (cb) {
    console.log('creating connection');
    var opts = {
        'host': 'localhost'
    };
    var conn = new AMQP(opts, function (err, c) {
        if (err) return cb(err);
        cb(null, conn);
    });
};

var createTopicExchange = function (conn, done) {
    var name = 'inbox.events';
    if (typeof name != 'string') throw new Error('suppy topic exchange name');
    var opts = {
        'exchange': name,
        'type': 'topic',
        'passive': false,
        'durable': false
    };
    var declareExchange = function (opts, exchange, cb) {
        console.log('declaring exchange');
        exchange.declare(opts, function (err, res) {
            if (err) return cb(err);
            cb(null, exchange);
        });
    };
    var fns = [
        conn.exchange.bind(conn, opts),
        declareExchange.bind(null, opts)
    ];
    async.waterfall(fns, function (err, exchange) {
        if (err) done(err);
        done(null, conn);
    });
};

var createTwoQueuesAndBindToExchange = function (users, conn, done) {
    var queuePrefix = 'user.events';
    var createDeclareBind = function (u, cb) {
        var opts = {
            'queue': [queuePrefix, u.user_id].join('.'),
            'durable': false,
            'passive': false
        };
        var fns = [
            conn.queue.bind(conn, opts),
            function declareQueue (queue, next) {
                console.log('declaring queue')
                queue.declare(opts, function (err, res) {
                    if (err) return next(err);
                    next(null, queue);
                });
            },
            function bindQueue (queue, next) {
                console.log('binding queue');
                var routing = [queuePrefix, u.user_id, '#'].join('.');
                var name = [queuePrefix, u.user_id ].join('.');
                queue.bind('inbox.events', routing, name, function (err) {
                    if (err) return next(err);
                    next(null, queue);
                });
            }
        ];
        async.waterfall(fns, cb);
    };
    async.map(users, createDeclareBind, function (err, queues) {
        if (err) done(err);
        done(null, conn);
    });
};

function sendEvents(users, conn, done) {
    var msg = {'test': 'msg'};
    msg = new Buffer(JSON.stringify(msg));
    var publishOpts = {
        'confirm': true,
        'contentType': 'application/json'
    };
    var Publish = function (user, cb) {
        var uid = user.user_id;
        var pattern = ['user.events', uid].join('.');
        console.log('publishING to pattern >', pattern);
        conn.publish('inbox.events', pattern, msg, publishOpts, function (err) {
            if (err) return cb(err);
            console.log('publishED to pattern >', pattern);
            cb();
        });
    };
    async.map(users, Publish, function (err) {
        if (err) return done(err);
        console.log('publishing complete');
        done(null, conn);
    });
};

var fns = [
    getTestConnection,
    createTopicExchange,
    createTwoQueuesAndBindToExchange.bind(null, users),
    sendEvents.bind(null, users)
];

async.waterfall(fns, function (err, result) {
    if (err) throw err;
    console.log('script complete');
});

from amqp-coffee.

barshow avatar barshow commented on September 27, 2024

Wow great find! Thanks for taking the time and writing that test script. I would consider this a bug and its fixed in #35 or 0.1.22-rc5 The problem is the shared options object, didn't test for that and routing key was getting nested in it, and therefor clobbered in situations like this. I added a test and fixed that.

from amqp-coffee.

 avatar commented on September 27, 2024

No problem, thanks for the quick turnaround.

from amqp-coffee.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.