Giter VIP home page Giter VIP logo

amqp-coffee's Introduction

amqp-coffee

Build Status Node.JS AMQP 0.9.1 Client

Sample

AMQP = require('amqp-coffee') # path to this

testData = "the data to be published..  I am a string but could be anything"

amqpConnection = new AMQP {host:'localhost'}, (e, r)->
  if e?
    console.error "Error", e

  amqpConnection.queue {queue: "queueName"}, (e,q)->
    q.declare ()->
      q.bind "amq.direct", "queueName", ()->

      amqpConnection.publish "amq.direct", "queueName", testData, {confirm: true}, (err, res)->
        console.log "Message published"

      consumer = amqpConnection.consume "queueName", {prefetchCount: 2}, (message)->
        console.log message.data.toString()
        message.ack()

      , (e,r)->
        console.log "Consumer setup"
        amqpConnection.publish "amqp.direct", "queueName", "message contents", {deliveryMode:2, confirm:true}, (e, r)->
          if !e? then console.log "Message Sent"

Methods

new amqp-coffee([connectionOptions],[callback])

Creates a new amqp Connection. The connection is returned directly and in the callback. The connection extends EventEmitter.

The callback is called if there is a sucessful connection OR a unsucessful connection and connectionOptions.reconnect is false. If connectionOptions.reconnect is false, you will get a error back in the callback. If no callback is specified it will be emitted.

The connectionOptions argument should be an object which specifies:

  • host: a string of the hostname OR an array of hostname strings OR an array of hostname objects {host, port}
  • port: a integer of the port to connect to. Not used if host is an object.
  • login: "guest"
  • password: "guest"
  • vhost: '/'
  • port: 5672
  • heartbeat: 10000 # in ms
  • reconnect: true
  • reconnectDelayTime: 1000 # in ms
  • hostRandom: false
  • `connectTimeout: 30000 # in ms, this is only used if reconnect is false
  • clientProperties : {version: clientVersion, platform, product}
  • ssl: false
  • sslOptions : {} # tls options like cert, key, ca, secureProtocol, passphrase
  • temporaryChannelTimeout: 2000 # in ms, temporary channels are used to setup queues, bindings, and exchanges. If you are frequently tearing down and setting up new queues it could make sense to make this longer.
  • noDelay: true # disable Nagle's algorithm by default

Host Examples

host: 'localhost'
host: {host: 'localhost', port: 15672}
host: ['localhost','yourhost']
host: [{host: 'localhost', port:15672}, {host: 'localhost', port:15673}]

Sample Connection

amqp-coffee = require('amqp-coffee')

amqp = new amqp-coffee {host: 'localhost'}, (error, amqpConnection)->
   assert(amqp == amqpConnection)

Reconnect Flow

On a connection close, we start the reconnect process if reconnect is true. After the reconnectDelayTime the hosts are rotated if more than one host is specified. A new connection is atempted, if the connection is not sucessful this process repeats. After a connection is re-establed, all of the channels are reset, this atempts to reopen that channel. Different channel types re-establish there channels differently.

  • Publisher channels, will only reconnect when a publish is atempted.
  • Consumer channels will reconnect and resume consuming. If it was a autoDelete queue, this could fail. Make sure you listen to the ready even on the connection to re-set up and consume any autoDelete queues.
  • Queue / Exchange channels are recreated on demand.

Event: 'ready'

Emitted when the connection is open successfully. This will be called after each successful reconnect.

Event: 'close'

Emitted when a open connection leaves the ready state and is closed.

Event: 'error'

Very rare, only emitted when there's a server version mismatch

connection.queue([queueOptions],[callback])

This returns a channel that can be used to declare, bind, unbind, or delete queus. This on its own does NOT declare a queue. When creating a queue class using connection.queue, you can specify options that will be used in all the child methods.

The queueOptions argument should be an object which specifies:

  • queue: a string repensenting the queue name, can also be empty to use a autogenerated queue name
  • autoDelete: default: true
  • noWait: default: false
  • exclusive: default: false. The queue can only be used by the current connection.
  • durable: default: false
  • passive: default: false. The queue creation will not fail if the queue already exists.
  • arguments: default: {}. Pass queue configuartion arguments, e.g. 'x-dead-letter-exchange'.

Both queues and exchanges use "temporary" channels, which are channels amqp-coffee manages specifically for declaring, binding, unbinding, and deleting queues and exchanges. After 2 seconds of inactivity these channels are closed, and reopened on demand.

queue.declare([queueOptions],[callback])

Will take a new set of queueOptions, or use the default. Issues a queueDeclare and waits on queueDeclareOk if a callback is specified.

amqp = new AMQP, ()->
  amqp.queue({queue:'queueToCreate'}, (err, Queue)->
    Queue.declare (err, res)->
      # the queue is now declared

To use a auto-generated queue name

amqp = new AMQP, ()->
  amqp.queue({queue:''}, (err, Queue)->
    Queue.declare (err, res)->
      queueName = res.queue

queue.delete([queueDeleteOptions],[callback])

The queueDeleteOptions argument should be an object which specifies:

  • queue: name of the queue
  • ifUnused: default: false
  • ifEmpty: default: true
  • noWait: default: false

queue.bind(exchange, routingkey, [queueName], [callback])

Sets up bindings from an already existing exchange to an already existing queue

queue.unbind(exchange, routingKey, [queueName], [callback])

Tears down an already existing binding

queue.messageCount(queueOptions, callback)

Rabbitmq specific, re-declares the queue and returns the messageCount from the response

queue.consumerCount(queueOptions, callback)

Rabbitmq specific, re-declares the queue and returns the consumerCount from the response

connection.exchange([exchangeArgs],[callback])

This returns a channel that can be used to declare, bind, unbind, or delete exchanges. This on its own does NOT declare a exchange. When creating an exchange class using connection.exchange, you can specify options that will be used in all the child methods.

The exchangeArgs argument should be an object which specifies:

  • exchange: a string representing the exchange name
  • type: "direct"
  • passive: false
  • durable: false
  • noWait: false
  • autoDelete: true
  • internal: false

Both queues and exchanges use "temporary" channels, which are channels amqp-coffee manages specifically for declaring, binding, unbinding, and deleting queues and exchanges. After 2 seconds of inactivity these channels are closed, and reopened on demand.

exchange.declare([exchangeArgs],[callback])

exchange.delete([exchangeDeleteOptions], [callback])

The exchangeDeleteOptions argument should be an object which specifies:

  • exchange: the name of the exchange
  • ifUnused: false
  • noWait: false

exchange.bind(destinationExchange, routingKey, [sourceExchange], [callback])

Rabbitmq Extension, to bind between exchanges, sourceExchange if omitted will be defaulted to the exchange it's being called on.

exchange.unbind(exchange.unbind(destinationExchange, routingKey, [sourceExchange], [callback])

Rabbitmq Extension, to bind between exchanges, sourceExchange if omitted will be defaulted to the exchange it's being called on.

connection.publish(exchange, routingKey, data, [publishOptions], [callback])

amqp-coffee manages publisher channels and sets them up on the first publish. Confirming is a state a channel must be put in, so a channel is needed for confimed publishes and one for non confimed publishes. They are only created on demand. So you should have a maximum of 2 channels publishing for a single connection.

New in 0.1.20 if you set the mandatory or immediate flag with the confirm flag we add a tracking header on that message headers.x-seq which is a numeric representation of that message just like the sequence number. That flag is used to re-connect a messages that has failed publishing and come back as a "basicReturn" to a already existing callback. This allows you to publish to a queue that may not exist and get a bounce if it doesnt. Or if a queue is in a bad state the message will fail routing and come back.

  • exchange: string of the exchange to publish to
  • routingKey: string to use to route the message
  • data: any type of data, if it is an object it will be converted into json automatically and unconverted on consume. Strings are converted into buffers.
  • publishOptions: All parameters are passed through as arguments to the publisher.
    • confirm: false
    • mandatory: false
    • immediate: false
    • contentType: 'application/octet-stream'

connection.consume(queueName, options, messageListener, [callback])

consumers use their own channels and are re-subscribed to on reconnect. Returns a consumer object.

  • queueName: string of the queue to subscribe to
  • options:
    • noLocal: false
    • noAck: true
    • exclusive: false
    • noWait: false
    • prefetchCount : integer. If specified the consumer will enter qos mode and you will have to ack messages. If specified noAck will be set to false
    • consumerTag: optional string. If not specified one will be generated for you.
  • messageListener: a function (message)
  • callback: a function that is called once the consume is setup

messageListener is a function that gets a message object which has the following attributes:

  • data: a getter that returns the data in its parsed form, eg a parsed json object, a string, or the raw buffer
  • raw: the raw buffer that was returned
  • properties: headers specified for the message
  • size: message body size
  • ack(): function : only used when prefetchCount is specified
  • reject(): function: only used when prefetchCount is specified
  • retry(): function: only used when prefetchCount is specified
listener = (message)->
  # we will only get 1 message at a time because prefetchCount is set to 1
  console.log "Message Data", message.data
  message.ack()

amqp = new AMQP ()->
  amqp.queue {queue: 'testing'}, (e, queue)->
    queue.declare ()->
      queue.bind 'amq.direct', 'testing', ()->
        amqp.publish 'amq.direct', 'testing', 'here is one message 1'
        amqp.publish 'amq.direct', 'testing', 'here is one message 2'

      amqp.consume 'testing', {prefetchCount: 1}, listener, ()->
        console.log "Consumer Ready"

consumer Event: error

Errors will be emitted from the consumer if we can not consumer from that queue anymore. For example if you're consuming a autoDelete queue and you reconnect that queue will be gone. It will return the raw error message with code as the message.

consumer Event: cancel

The cancel event will be emitted from the consumer if we receive a server initiated "basic.cancel". For this to happen you must let the server know you are expecting a cancel, you do this by specifying clientProperties on connect. clientProperties: { capabilities: { consumer_cancel_notify: true }} https://www.rabbitmq.com/consumer-cancel.html

consumer.setQos(prefetchCount, [callback])

Will update the prefetch count of an already existing consumer; can be used to dynamically tune a consumer.

consumer.cancel([callback])

Sends basicCancel and waits on basicCancelOk

consumer.pause([callback])

consumer.cancel

consumer.close([callback])

Calls consumer.cancel, if we're currently consuming. Then calls channel.close and calls the callback as soon as the channel close is sent, NOT when channelCloseOk is returned.

consumer.resume([callback])

consumer.consume, sets up the consumer with a new consumer tag

consumer.flow(active, [callback])

An alias for consumer.pause (active == false) and consome.resume (active == true)

connection.close()

More documentation to come. The tests are a good place to reference.

Differences between amqp-coffee and node-amqp

First of all this was heavily inspired by https://github.com/postwait/node-amqp

Changes from node-amqp

  • the ability to share channels intelligently. ( if you are declaring multiple queues and exchanges there is no need to use multiple channels )
  • auto channel closing for transient channels ( channels used for declaring and binding if they are inactive )
  • consumer reconnecting
  • fixed out-of-order channel operations by ensuring things are writing in order and not overwriting buffers that may not have been pushed to the network.
  • switch away from event emitters for consumer acks
  • everything that can be async is async
  • native bson support for messages with contentType application/bson
  • ability to delete, bind, and unbind a queue without having to know everything about the queue like auto delete etc...
  • can get the message and consumer count of a queue
  • can turn flow control on and off for a consumer (pause, resume) receiving messages.
  • rabbitmq master queue connection preference. When you connect to an array of hosts that have queues that are highly available (HA) it can talk to the rabbit api and make sure it talks to the master node for that queue. You can get way better performance with consumer acks.

amqp-coffee's People

Contributors

avvs avatar barshow avatar beppu avatar donkeyhighway avatar jdubie avatar markherhold avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

amqp-coffee's Issues

Publish hangs forever after connection loss (reconnect = false)

Hi,

I'm using amqp-coffee with reconnect = false option. Below is an example that reproduces the issue I faced recently. The example is a simple producer that publishes new message every second:

const AMQP = require('amqp-coffee');

const amqp = new AMQP({host: 'localhost', reconnect: false});

setInterval(() => {
  amqp.publish('coffee:test', '', 'hello', {}, (err) => {
    if (err) return console.log('Error: ' + err);
    console.log('Published');
  });
}, 1000);

The scenario is following:

  1. run the example script
  2. wait until at least one message is published
  3. shut down message broker server

Expected result:
Option 1. For all next publishes the callback is invoked with an error
Option 2. The process crashes because of unhandled exception

Actual result:
Next publishes hang forever. Callbacks are never called. As I understand, publish waits until connection is back, but since reconnect is disabled it never happens.

What do you think?

nodejs 4.x and 5.x support.

I've been working with these engines for a while now and it works pretty well, imo, npm constraints should include these version, since it's safe to use

tls support

I've been trying to integrate one, but with no success so far, main culprit is how reconnect is handled. basically what's done is called socket.connect() on a severed net socket, which reuses one, problem is that it doesnt work with tls (or I just dont know how to make it work). I wondered if it's better to abstract (re)connecting to reconnect-core module and setup proper handler there?

consumerCount on disabled connection

if i will call queue.consumerCount or queue.messageCount durning connection problems - it stops fire callbacks, even if connection will back later, is something remain closed?

idea to fix:

  • set flag to true on amqp ready event, and set to false on amqp close event
  • return specific error in consumerCount/messageCount callback if flag is not active

Performing a Connection Healthcheck

Hi, I'd like to determine if a/all of the currently established connections to RabbitMQ are healthy to help assess the overall health of my application periodically. Can you give a recommendation to doing this?

One thought I had would be to use the existing client, kick off a heartbeat and listen for the response. However, I don't know how this would affect the amqp-coffee internals given that amqp-coffee would be performing its own healthchecks. Thoughts?

Awesome module by the way! ๐Ÿ˜

direct reply-to & bug in the fields serialization

Hey,

I've been playing around with RPC modes & priority queues and noticed that there is a bug:

https://github.com/dropbox/amqp-coffee/blob/master/src/lib/Connection.coffee#L438

Specifically if value of the field is 0

  if args[field.name]

that check wont pass and there would be inconsistency between headers payload and actual payload (example, priority: 0). That being said we should simply check for hasOwnProperty to fix that issue

On the second note - https://www.rabbitmq.com/direct-reply-to.html - this is a nice feature to have, but require that publisher & consumer uses the same channel, which is currently not possible as they are always on the separate channels. I looked at how to do it myself, but didn't come to any conclusion - could be a decent idea to write an RPC channel with capabilities of both, but then I wouldnt want to duplicate a lot of code from Publisher/Consumer channels

What do you think is the best way to implement it?

working with activeMQ?

Has anyone tried to make this work with activeMQ instead of rabbit?
according to docs it should click, and my activeMQ instance is configured with:
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>

yet, if i try to run the basic.coffee from the examples (with port 5672 added), it just hangs around and i donยดt see it appear as a consumer in my activemq or do anything useful until i kill it again.

amazon gift code or pizza vouchers on offer for any helpful pointers!

best regards
Andreas

Error: Unmatched field {"name":"queue","domain":"shortstr"}

I'm getting an error when I try to call q.declare()

Error: Unmatched field {"name":"queue","domain":"shortstr"} at serializeFields (node_modules/amqp-coffee/bin/src/lib/serializationHelpers.js:261:19)

does this look familiar to anyone?

Message handler not fired for blank message in QOS mode

Hi, I have an issue where a blank message (0 size, empty body) published to a queue I'm consuming with amqp-coffee stops the consumer.
I'm using QOS mode to validate each message and then reject it or process and ack it. When a blank message gets published to the queue, my message handler is not fired and the message can be neither acked nor rejected โ€“ the consumer just stops working. When I disable prefetchCount, the empty messages are discarded silently.
My guess is that somewhere a callback is not called and everything just stops. I tried debugging it but couldn't get anywhere. Maybe it's something you can look into? I'll gladly help.

CHANNEL_ERROR - unexpected method in connection state running

Any idea what this means and how it can be fixed? Happens on reconnect attempt

12:31:58 arkmail-backend: connected to rabbit@ubuntu72 v3.5.3
12:31:58 arkmail-backend: connection is closed. Had error: { [Error: CHANNEL_ERROR - unexpected method in connection state running] replyCode: undefined }

And this would go on infinitely until I restart the process. Thanks for the help

Attempt to allocate Buffer larger than maximum size: 0x3fffffff bytes

iojs v2.4.0

mservice-arkmail-api-0 (err): RangeError: Attempt to allocate Buffer larger than maximum size: 0x3fffffff bytes
mservice-arkmail-api-0 (err):     at checked (buffer.js:174:11)
mservice-arkmail-api-0 (err):     at fromNumber (buffer.js:57:51)
mservice-arkmail-api-0 (err):     at new Buffer (buffer.js:42:5)
mservice-arkmail-api-0 (err):     at AMQPParser.header (/opt/arkmail-backend/node_modules/arkmail-backend/node_modules/mservice-utils/node_modules/amqp-coffee/bin/src/lib/AMQPParser.js:47:28)
mservice-arkmail-api-0 (err):     at AMQPParser.execute (/opt/arkmail-backend/node_modules/arkmail-backend/node_modules/mservice-utils/node_modules/amqp-coffee/bin/src/lib/AMQPParser.js:30:33)
mservice-arkmail-api-0 (err):     at Socket.<anonymous> (/opt/arkmail-backend/node_modules/arkmail-backend/node_modules/mservice-utils/node_modules/amqp-coffee/bin/src/lib/Connection.js:419:31)
mservice-arkmail-api-0 (err):     at emitOne (events.js:77:13)
mservice-arkmail-api-0 (err):     at Socket.emit (events.js:169:7)
mservice-arkmail-api-0 (err):     at readableAddChunk (_stream_readable.js:146:16)
mservice-arkmail-api-0 (err):     at Socket.Readable.push (_stream_readable.js:110:10)
mservice-arkmail-api-0 (err):     at TCP.onread (net.js:521:20)

heartbeat troubles

I have same bug described in #23

Connection is closing after [heartbeat * 3] ms

Here is code which can reproduce bug on my localhost rabbit (mac) and server rabbit (ubuntu:
https://gist.github.com/vkfont/65d77cf461c075ebba60

Andreys-MacBook-Air:mservice-utils drago$ node debug/test
connected to rabbit@Andreys-Air v3.5.0
queue "amq.gen-xm_fKnPv_bzjKuguX8KsQQ" created
connection is closed: 30000ms
connected to rabbit@Andreys-Air v3.5.0
events.js:87
      throw Error('Uncaught, unspecified "error" event.');
            ^
Error: Uncaught, unspecified "error" event.
    at Error (native)
    at Consumer.emit (events.js:87:13)
    at Consumer._channelClosed (/Users/drago/Documents/ark/mservice-utils/node_modules/amqp-coffee/bin/src/lib/Consumer.js:183:14)
    at Consumer._channelClosed (/Users/drago/Documents/ark/mservice-utils/node_modules/amqp-coffee/bin/src/lib/Consumer.js:4:61)
    at Consumer.Channel._onChannelMethod (/Users/drago/Documents/ark/mservice-utils/node_modules/amqp-coffee/bin/src/lib/Channel.js:326:16)
    at Connection._onMethod (/Users/drago/Documents/ark/mservice-utils/node_modules/amqp-coffee/bin/src/lib/Connection.js:571:39)
    at AMQPParser.<anonymous> (/Users/drago/Documents/ark/mservice-utils/node_modules/amqp-coffee/bin/src/lib/Connection.js:4:61)
    at AMQPParser.emit (events.js:118:17)
    at AMQPParser.parseMethodFrame (/Users/drago/Documents/ark/mservice-utils/node_modules/amqp-coffee/bin/src/lib/AMQPParser.js:110:19)
    at AMQPParser.frameEnd (/Users/drago/Documents/ark/mservice-utils/node_modules/amqp-coffee/bin/src/lib/AMQPParser.js:80:16)

high latency on VM machines

This isn't issue of this library, but I was wondering if you might have had similar issues and solved them already. I've already spent 6 hours researching and tuning anything that is tunable, but with no luck at all.

Issue is described here: rabbitmq/rabbitmq-server#564

TL;DR:
publisher - broker - consumer chain for 1 message takes 20ms on a VM and 0.5ms on my laptop

node-amqp differences

CoffeeScript aside, it would be awesome if you could document in the README (or maybe blog post) why you decided to write this driver vs use the node-amqp driver. Including what issues this driver addresses that may have been over looked or in node-amqp.

(node) warning: possible EventEmitter memory leak detected. 11 listeners added. Use emitter.setMaxListeners() to increase limit.

Is this a sign something is wrong, or expected? The line in the stack trace that refers to my code points to empty space, which seems a little weird. The line right before it does message.ack(); in a consume callback.

(node) warning: possible EventEmitter memory leak detected. 11 listeners added. Use emitter.setMaxListeners() to increase limit.
Trace
    at Connection.EventEmitter.addListener (events.js:160:15)
    at Consumer.Channel (/home/sgb/ui_client_adapter/node_modules/amqp-coffee/bin/src/lib/Channel.js:61:23)
    at new Consumer (/home/sgb/ui_client_adapter/node_modules/amqp-coffee/bin/src/lib/Consumer.js:49:38)
    at ChannelManager.consumerChannel (/home/sgb/ui_client_adapter/node_modules/amqp-coffee/bin/src/lib/ChannelManager.js:89:11)
    at Connection.consume (/home/sgb/ui_client_adapter/node_modules/amqp-coffee/bin/src/lib/Connection.js:242:34)
    at /home/sgb/ui_client_adapter/lib/livestreamer.js:167:47
    at doneFn (/home/sgb/ui_client_adapter/node_modules/amqp-coffee/bin/src/lib/Channel.js:240:11)
    at TemporaryChannel.Channel._onChannelMethod (/home/sgb/ui_client_adapter/node_modules/amqp-coffee/bin/src/lib/Channel.js:315:37)
    at Connection._onMethod (/home/sgb/ui_client_adapter/node_modules/amqp-coffee/bin/src/lib/Connection.js:538:39)
    at AMQPParser.<anonymous> (/home/sgb/ui_client_adapter/node_modules/amqp-coffee/bin/src/lib/Connection.js:4:61)

Exclusive consumers throw errors

Hey. Great job on the wrapper. I'm facing an issue with exclusive consumers, however. When I set a consumer to exclusive, and try to run my workers from more than 1 machine, the second (and on) machine(s) throw errors on queue init. This makes sense, considering Rabbit throws an error when you try to connect to an already-consumed exclusive queue.

What's the right procedure here?

Not able to connect to RabbitMQ cluster setup on Play with docker

I have setup Cluster on DinD PWD which creates and run cluster i am able to use management ui and see members in cluster but I am not able to connect to it there is no response for connection and client hangs.

version: "3"
services:
  rabbit1:
    image: lucifer8591/rabbitmq-server:3.7.17
    hostname: rabbit1
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      - RABBITMQ_DEFAULT_USER=${RABBITMQ_DEFAULT_USER:-admin}
      - RABBITMQ_DEFAULT_PASS=${RABBITMQ_DEFAULT_PASS:-admin}
  rabbit2:
    image: lucifer8591/rabbitmq-server:3.7.17
    hostname: rabbit2
    links:
      - rabbit1
    environment:
      - CLUSTERED=true
      - CLUSTER_WITH=rabbit1
      - RAM_NODE=true
    ports:
      - "5673:5672"
      - "15673:15672"
  rabbit3:
    image: lucifer8591/rabbitmq-server:3.7.17
    hostname: rabbit3
    links:
      - rabbit1
      - rabbit2
    environment:
      - CLUSTERED=true
      - CLUSTER_WITH=rabbit1
    ports:
      - "5674:5672"
  const connectOptions = {
    host: [
      { host: 'ip172-18-0-40-bnf0r3vad2eg00bttg0g-5672.direct.labs.play-with-docker.com', port:5672 },
      { host: 'ip172-18-0-42-bnf0r3vad2eg00bttg0g-5673.direct.labs.play-with-docker.com', port: '5673' },
{ host: 'ip172-18-0-42-bnf0r3vad2eg00bttg0g-5674.direct.labs.play-with-docker.com', port: '5674' },

    ],
    login: "admin",
    password: "admin",
    heartbeat: 60,
    reconnect: true,
  }

  const connection = new AMQP(connectOptions);

connection.on('ready', () => console.log('Connected))
connection.on('error', () => console.log('error'))

Handle method.basicCancel

When we have the following options and #51 is merged in consumer gets new events

{
      host: 'localhost',
      port: 5672,
      login: 'guest',
      password: 'guest',
      vhost: '/',
      temporaryChannelTimeout: 6000,
      clientProperties: {
        capabilities: {
          consumer_cancel_notify: true,
        },
      },
},

Example log:

  amqp:AMQPParser 2 > method basicCancel {"consumerTag":"Vitalys-iMac.local-30767-1452811088079","noWait":1} +20s
  amqp:Consumer onMethod basicCancel, {"consumerTag":"Vitalys-iMac.local-30767-1452811088079","noWait":1} +1ms

It would be nice to have https://github.com/dropbox/amqp-coffee/blob/master/src/lib/Consumer.coffee#L196 extended with a handler for such an event (https://www.rabbitmq.com/consumer-cancel.html). It could simply emit an event, which a user must handle, or automatically try to reconnect. To synthetically generate such an event - create a consumer and then purge queue/exchange in the rabbitmq management console

Problem with contentType of published messages

Hi, I have a little program which publishes messages using amqp-coffee. I need to specify contentType of messages, because on a side of consumers there is a serializer which checks MIME type of a message and it should be 'plain\text'.

So, I tried to specify contentType: 'plain\text' on a conncetion.publish function using options argument but it always becomes 'string/utf8'. After reading code I found that in publish method of Publisher class it checks type of data and rewrites options.contentType.

It would be really cool, if this method skips checking type of data in cases when contentType already specified.

Some feedback

Hello, I finally got a chance to try this out, and I've got some small feedback and a few larger issues i've found.

I'll just note them here for now, I can write up with more detail in separate issues if you want

  • Error handling: Socket errors after the initial connection are silently discarded
  • Timeouts: There is currently no built-in way to time out a connection attempt
  • Client Info: This is hard-coded, it would be good to allow callers to override these
  • AMQP URI: It would be nice to support the amqp uri format for connecting: amqp://user:pass@host:port/vhost
  • I can see why you would want to separate the Exchange object from the exchange.declare call, but as both take callbacks it ends up being a little fiddly. I would suggest that either the object creation has no side effects, or providing a convenience method for the common case of declaring an exchange
  • Exchange and queue objects do not expose their name property, which makes it a bit awkward to refer to them in other areas, eg connection.publish(exchange.name, ..)
  • Queue name is a required parameter, meaning auto generated queue names don't seem to be supported

Hope that's helpful - most of these are quite minor apart from the error handling one.

UncaughtException: TypeError: Cannot call method 'reset' of undefined

"version": "0.1.22"

One of our servers crashed recently due to a type error, perhaps caused by a reimplementation of a topology upon reconnection.

Location: https://github.com/dropbox/amqp-coffee/blob/master/src/lib/Connection.coffee#L238

Stack "uncaughtException: TypeError: Cannot call method 'reset' of undefined\n at /srv/www/inboxmessenger/releases/20150722234551/node_modules/queue-api/node_modules/amqp-coffee/bin/src/lib/Connection.js:349:42\n at iterate (/srv/www/inboxmessenger/releases/20150722234551/node_modules/queue-api/node_modules/async/lib/async.js:146:13)\n at /srv/www/inboxmessenger/releases/20150722234551/node_modules/queue-api/node_modules/async/lib/async.js:157:25\n at /srv/www/inboxmessenger/releases/20150722234551/node_modules/queue-api/node_modules/async/lib/async.js:251:17\n at /srv/www/inboxmessenger/releases/20150722234551/node_modules/queue-api/node_modules/async/lib/async.js:154:25\n at /srv/www/inboxmessenger/releases/20150722234551/node_modules/queue-api/node_modules/async/lib/async.js:248:21\n at /srv/www/inboxmessenger/releases/20150722234551/node_modules/queue-api/node_modules/async/lib/async.js:612:34\n at Consumer.Channel._onChannelReconnect (/srv/www/inboxmessenger/releases/20150722234551/node_modules/queue-api/node_modules/amqp-coffee/bin/src/lib/Channel.js:190:14)\n at /srv/www/inboxmessenger/releases/20150722234551/node_modules/queue-api/node_modules/amqp-coffee/bin/src/lib/Channel.js:107:26\n at /srv/www/inboxmessenger/releases/20150722234551/node_modules/queue-api/node_modules/async/lib/async.js:607:21\n at /srv/www/inboxmessenger/releases/20150722234551/node_modules/queue-api/node_modules/async/lib/async.js:246:17\n at iterate (/srv/www/inboxmessenger/releases/20150722234551/node_modules/queue-api/node_modules/async/lib/async.js:146:13)\n at /srv/www/inboxmessenger/releases/20150722234551/node_modules/queue-api/node_modules/async/lib/async.js:157:25\n at /srv/www/inboxmessenger/releases/20150722234551/node_modules/queue-api/node_modules/async/lib/async.js:248:21\n at /srv/www/inboxmessenger/releases/20150722234551/node_modules/queue-api/node_modules/async/lib/async.js:612:34\n at Consumer.Channel._onChannelMethod (/srv/www/inboxmessenger/releases/20150722234551/node_modules/queue-api/node_modules/amqp-coffee/bin/src/lib/Channel.js:330:37)\n at Connection._onMethod (/srv/www/inboxmessenger/releases/20150722234551/node_modules/queue-api/node_modules/amqp-coffee/bin/src/lib/Connection.js:602:39)\n at AMQPParser. (/srv/www/inboxmessenger/releases/20150722234551/node_modules/queue-api/node_modules/amqp-coffee/bin/src/lib/Connection.js:4:61)\n at AMQPParser.emit (events.js:106:17)\n at AMQPParser.parseMethodFrame (/srv/www/inboxmessenger/releases/20150722234551/node_modules/queue-api/node_modules/amqp-coffee/bin/src/lib/AMQPParser.js:111:19)\n at AMQPParser.frameEnd (/srv/www/inboxmessenger/releases/20150722234551/node_modules/queue-api/node_modules/amqp-coffee/bin/src/lib/AMQPParser.js:81:16)\n at AMQPParser.frame (/srv/www/inboxmessenger/releases/20150722234551/node_modules/queue-api/node_modules/amqp-coffee/bin/src/lib/AMQPParser.js:64:21)\n at AMQPParser.header (/srv/www/inboxmessenger/releases/20150722234551/node_modules/queue-api/node_modules/amqp-coffee/bin/src/lib/AMQPParser.js:51:21)\n at AMQPParser.execute (/srv/www/inboxmessenger/releases/20150722234551/node_modules/queue-api/node_modules/amqp-coffee/bin/src/lib/AMQPParser.js:31:33)\n at Socket. (/srv/www/inboxmessenger/releases/20150722234551/node_modules/queue-api/node_modules/amqp-coffee/bin/src/lib/Connection.js:426:29)\n at Socket.emit (events.js:95:17)"

Publishing has unexpected outcome

lib-version: 0.1.22rc4
lang: JS

I am publishing a message to user queues through a topic exchange named "inbox.events".
The user queues are bound to the exchanges with routing keys following this scheme: "user.events.user_id.#".

Here is my publishing code:

screen shot 2015-06-16 at 2 27 43 pm

This is the console output:

screen shot 2015-06-16 at 2 29 50 pm

The problem is that although I am publishing to a different routing_key based on the current iteration of the users array, one user queue receives both messages. Are there any known issues with how published messages make their way out of the client lib and through to Rabbit?

Many thanks.

API docs

Hello,

As we all know, node-amqp has plenty of shortcomings - mostly around the API quirks and general cruftiness due to age and back-compat - I'm interested in trying out alternatives like this.

However, there's no docs beyond the sample and the tests - are you guys working on some basic API docs at all?

Channels not closing

I have a problem with channels not closing. When using node-amqp I had the same issue, but resolved it with this queue option:

closeChannelOnUnsubscribe: true

Then when my socket.io client disconnected I would unsubscribe his consumerTag from the queue, and the channel would close:

socket.on('disconnect', function () {
    if (consumerTag) {
        my_queue.unsubscribe(consumerTag);
    }
});

Would there be an equivalent in amqp-coffee?

I'm trying all sorts of things like queue.channel.close(), consumer.cancel() and queue.delete(), but none seem to close the channels. I can see that they remain in rabbitmq dashboard with no consumer.

Possible memory leak at connection's close event

Hello,

After creating several consumers I'm getting the node's EventEmitter leak warning:

(node) warning: possible EventEmitter memory leak detected. 11 listeners added. Use emitter.setMaxListeners() to increase limit.
Trace
    at Connection.EventEmitter.addListener (events.js:160:15)
    at Consumer.Channel (/home/likewise-open/CORTEX/tsouza/workspaces/nodejs/stratos/node_modules/amqp-coffee/bin/src/lib/Channel.js:61:23)
    at new Consumer (/home/likewise-open/CORTEX/tsouza/workspaces/nodejs/stratos/node_modules/amqp-coffee/bin/src/lib/Consumer.js:47:38)
    at ChannelManager.consumerChannel (/home/likewise-open/CORTEX/tsouza/workspaces/nodejs/stratos/node_modules/amqp-coffee/bin/src/lib/ChannelManager.js:90:11)
    at Connection.consume (/home/likewise-open/CORTEX/tsouza/workspaces/nodejs/stratos/node_modules/amqp-coffee/bin/src/lib/Connection.js:259:34)

As a workaround, is it safe to call connection.setMaxListeners(Infinity) or the listeners will actually leak (i.e. on reconnections) ?

Regards

AMQP channels question

We've recently gone live to production using your library to bridge RabbitMQ queues with clients who go on/off line quite frequently (think mobile). Whenever a client comes online we send a consume to the rabbitmq server and then when the client goes offline, we fire a consume.close(). During our beta stage things seemed to have gone just fine, however, recently, we've been seeing many open channels on our rabbitmq server that have no consumers. I was under the impression that a single channel would suffice for consuming for multiple queues where each queue ties to one client and one client only. Is my mental model incorrect?

Any thoughts?
Code looks somehting like this:

  var Conn = Rabbit.get();
   client.consumer = Conn.consume(/* params*/);
   ...
   client.on('disconnected', function(){
     client.consumer.close();
   });

Unspecified "error" event thrown

Hey there.

I've been using the wrapper in prod for almost a week now, and for the most part, it's been working great.

I noticed today that every few hours, an unspecified error event is thrown from Connection.js. I've included the stack trace, and screenshots from my logs. Please, let me know if there's anything else I can provide...

Stack Trace Gist: https://gist.githubusercontent.com/schonfeld/271cf723bed2385aa02d/raw/65a18c13552ad0053482383751e44ede1185d683/gistfile1.txt

Screenshot: https://www.dropbox.com/s/gm5geyrf6pyp1mg/Screenshot%202014-08-11%2012.43.29.png

Thanks,

  • Michael.

Getting an error on npm install on linux server

Here is the output from NPM install after all of the module downloads. This is from installing v0.1.10. NPM install does work on a Mac.

[email protected] install /node_modules/amqp-coffee/node_modules/bson
(node-gyp rebuild 2> builderror.log) || (exit 0)

make: Entering directory /node_modules/amqp-coffee/node_modules/bson/build' CXX(target) Release/obj.target/bson/ext/bson.o SOLINK_MODULE(target) Release/obj.target/bson.node SOLINK_MODULE(target) Release/obj.target/bson.node: Finished COPY Release/bson.node make: Leaving directory/node_modules/amqp-coffee/node_modules/bson/build'

[email protected] install /node_modules/amqp-coffee
./scripts/compile.sh

amqp-coffee Compiling coffeescript to bin/
coffee> rm: missing operand
Try rm --help' for more information. npm ERR! [email protected] install:./scripts/compile.sh`
npm ERR! Exit status 123
npm ERR!
npm ERR! Failed at the [email protected] install script.
npm ERR! This is most likely a problem with the amqp-coffee package,
npm ERR! not with npm itself.
npm ERR! Tell the author that this fails on your system:
npm ERR! ./scripts/compile.sh
npm ERR! You can get their info via:
npm ERR! npm owner ls amqp-coffee
npm ERR! There is likely additional logging output above.

npm ERR! System Linux 2.6.32-279.19.1.el6.x86_64
npm ERR! command "/.nvm/v0.10.25/bin/node" "/.nvm/v0.10.25/bin/npm" "install" "amqp-coffee"
npm ERR! cwd
npm ERR! node -v v0.10.25
npm ERR! npm -v 1.3.24
npm ERR! code ELIFECYCLE
npm ERR!
npm ERR! Additional logging details can be found in:
npm ERR! /npm-debug.log
npm ERR! not ok code 0

AMQPParser throw an error: Oversize frame 131318

I got the following error after having a lot of rejects.
It may be that putting a message in a dead-letter queue adds things to the message header, and doing this repeatedly makes the message too large.

I need to be able to handle this kind of exception, but connection 'error' event is not firing on this kind of error.

2016-11-22T18:10:34+0000[ID=1] events.js:72
2016-11-22T18:10:34+0000[ID=1] throw er; // Unhandled 'error' event
2016-11-22T18:10:34+0000[ID=1] ^
2016-11-22T18:10:34+0000[ID=1] Error: Oversize frame 131318
2016-11-22T18:10:34+0000[ID=1] at AMQPParser.error (/usr/local/logdog/card-guard-processor/node_
modules/amqp-coffee/bin/src/lib/AMQPParser.js:150:21)
2016-11-22T18:10:34+0000[ID=1] at AMQPParser.header (/usr/local/logdog/card-
guard-processor/node_modules/amqp-coffee/bin/src/lib/AMQPParser.js:46:23)
2016-11-22T18:10:34+0000[ID=1] at AMQPParser.frameEnd (/usr/local/logdog/card-guard-processor/node_modules/amqp-coffee/bin/src/lib/AMQPParser.js:95:19)
2016-11-22T18:10:34+0000[ID=1] at AMQPParser.frame (/usr/local/logdog/card-guard-processor/node_modules/amqp-coffee/bin/src/lib/AMQPParser.js:64:21)
2016-11-22T18:10:34+0000[ID=1] at AMQPParser.header (/usr/local/logdog/card-guard-processor/node_modules/amqp-coffee/bin/src/lib/AMQPParser.js:51:21)
2016-11-22T18:10:34+0000[ID=1] at AMQPParser.frameEnd (/usr/local/logdog/card-guard-processor/node_modules/amqp-coffee/bin/src/lib/AMQPParser.js:95:19)
2016-11-22T18:10:34+0000[ID=1] at AMQPParser.frame (/usr/local/logdog/card-guard-processor/node_modules/amqp-coffee/bin/src/lib/AMQPParser.js:64:21)
2016-11-22T18:10:34+0000[ID=1] at AMQPParser.header (/usr/local/logdog/card-guard-processor/node_modules/amqp-coffee/bin/src/lib/AMQPParser.js:51:21)
2016-11-22T18:10:34+0000[ID=1] at AMQPParser.execute (/usr/local/logdog/card-guard-processor/node_modules/amqp-coffee/bin/src/lib/AMQPParser.js:31:33)
2016-11-22T18:10:34+0000[ID=1] at Socket. (/usr/local/logdog/card-guard-processor/node_modules/amqp-coffee/bin/src/lib/Connection.js:497:29)

Connection continuously emits 'error' after the underlying socket died (reconnect = false)

Hi,

I'm using amqp-coffee with reconnect=false in one of my projects. During testing of some negative scenarios I faced a little bit odd behaviour. Here's a code example:

const amqp = new AMQP({host: 'localhost', reconnect: false});
amqp.consume('coffee:test:q', {}, (msg) => { /* ... */ });
amqp
  .on('error', (e) => console.log('ERROR: ' + e))
  .on('close', () => console.log('CLOSE'))

I run the program above and then manually take the local RabbitMQ server down. What I get in stdout is:

CLOSE
ERROR: Error: This socket has been ended by the other party
ERROR: Error: This socket has been ended by the other party
ERROR: Error: This socket has been ended by the other party
...

The Connection doesn't emit error only once, but continues emitting it every 10 seconds. As far as I see, it happens because of heartbeat timer which keeps triggering after the error. What I would rather expect is that once an error is emitted, the connection cancels all its internal timers.

What are your thoughts?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.