node-rabbit-wrap's Introduction

rabbit-wrapper

This is a wrapper for amqp.node with support for node-amqp API options.

See https://github.com/postwait/node-amqp for details.
See also http://www.squaremobius.net/amqp.node/doc/channel_api.html

Installation

To install in your project, run the following command:

npm install --save git://github.com/tagular/node-rabbit-wrap.git

Usage

var rabbit = require('rabbit-wrapper');

var connectOpts = {};
var connection = rabbit('amqp://localhost:5672', connectOpts).connect();

//publish a message
connection.exchange('this.is.my.exchange', {type: 'direct', autoDelete: true})
.send('this.is.my.routing.key', {my: 'message', goes: 'here'})

//consume messages
connection.exchange('this.is.my.exchange', {type: 'direct', autoDelete: true})
.queue('this.is.a.queue')
//bind queue to routing key
.bindQueue('this.is.my.exchange', 'this.is.my.routing.key')
.listen({ack: true}, function (msg, ack, headers, fields) {
	//`doSomeStuffWithMyMessage` is a placeholder for your own handler
	doSomeStuffWithMyMessage(msg);
	
	//want to acknowledge the message? just call `ack`
	ack();
	
	//rejecting the message? just pass false to `ack`
	ack(false);
	
	//want to reject and requeue? pass true as the second param to `ack`
	ack(false, true);
})

API

rabbit(uri, options), new Connection(uri, options)

You must pass this an amqp URI, documentation for which is available on the rabbitmq website. Your URIs will look like, e.g., amqp://my.rabbit.server.com:5672.
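
As a rough illustration of the URI shape, the helper below assembles an amqp:// URI from its parts. `buildAmqpUri` and its option names are hypothetical and not part of rabbit-wrapper. The percent-encoding of the virtual host follows the RabbitMQ URI convention, where the default vhost `/` is written as `%2F`.

```javascript
// Hypothetical helper, not part of rabbit-wrapper: builds an amqp:// URI.
function buildAmqpUri(opts) {
  var auth = '';
  if (opts.user) {
    auth = encodeURIComponent(opts.user);
    if (opts.password) {
      auth += ':' + encodeURIComponent(opts.password);
    }
    auth += '@';
  }
  // 5672 is the conventional AMQP port, as in the examples in this README.
  var uri = 'amqp://' + auth + opts.host + ':' + (opts.port || 5672);
  if (opts.vhost !== undefined) {
    uri += '/' + encodeURIComponent(opts.vhost);
  }
  return uri;
}

buildAmqpUri({ host: 'my.rabbit.server.com' });
// 'amqp://my.rabbit.server.com:5672'
```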

options

  • heartbeat (default 0) – interval of the connection heartbeat in seconds
  • noDelay (default false) – if true, turns on TCP_NODELAY (i.e. disables Nagle's algorithm) for the underlying socket
  • reconnect (default true) – if true, the wrapper will attempt to reconnect to rabbit on connection failures (i.e. connection error events)

For more options, see the amqp.node api docs.
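
One way to picture the defaults listed above is as an object that caller options are layered over. This is only a sketch of the documented values. `connectionOptions` is a hypothetical helper, and the wrapper may apply its defaults differently internally.

```javascript
// Defaults as listed in this README; the wrapper's internals may differ.
var DOCUMENTED_DEFAULTS = { heartbeat: 0, noDelay: false, reconnect: true };

// Hypothetical helper: overlays caller options on the documented defaults.
function connectionOptions(overrides) {
  return Object.assign({}, DOCUMENTED_DEFAULTS, overrides || {});
}

var heartbeatOpts = connectionOptions({ heartbeat: 30 });
// heartbeatOpts is { heartbeat: 30, noDelay: false, reconnect: true }

// It could then be passed along as in the Usage section:
// rabbit('amqp://localhost:5672', heartbeatOpts).connect();
```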

Connection.connect(cb)

Opens the connection and calls the callback provided, if any.

Connection.exchange(name, options[, cb])

Declares a new exchange. Calls your cb when the operation has completed (or failed).

Returns a new Exchange object.

options

  • type (required) [String] – sets your exchange type (i.e. direct, topic, fanout, or headers)
  • confirm (default false) [Boolean] – whether to open the exchange on a confirming channel, which will cause rabbit to confirm publishes
  • durable (default true) [Boolean] – whether the exchange will survive a rabbit restart
  • autoDelete (default false) [Boolean] – whether rabbit will destroy the exchange after its number of bindings reaches zero

For more options, see the amqp.node api docs.
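
Because type is the only required option, it can be worth checking before the exchange is declared. The sketch below is a hypothetical pre-flight check built from the list above. `exchangeOptions` is not part of rabbit-wrapper, and whether the wrapper itself validates type is not stated here.

```javascript
// The four exchange types named in this README.
var EXCHANGE_TYPES = ['direct', 'topic', 'fanout', 'headers'];

// Hypothetical helper: rejects a missing or unknown type, then fills in
// the documented defaults for the remaining options.
function exchangeOptions(opts) {
  if (!opts || EXCHANGE_TYPES.indexOf(opts.type) === -1) {
    throw new TypeError('exchange "type" must be one of: ' + EXCHANGE_TYPES.join(', '));
  }
  return Object.assign({ confirm: false, durable: true, autoDelete: false }, opts);
}

var directOpts = exchangeOptions({ type: 'direct', autoDelete: true });
// directOpts is { confirm: false, durable: true, autoDelete: true, type: 'direct' }

// Possible use, following the Usage section:
// connection.exchange('this.is.my.exchange', directOpts);
```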

Connection.queue(name, options[, cb])

Declares a queue.

Returns a new Queue object.

options

  • exclusive (default false) [Boolean] – if true, makes the queue available only to the connection that created it
  • durable (default true) [Boolean] – whether the queue will survive a rabbit restart
  • autoDelete (default false) [Boolean] – whether rabbit will destroy the queue after its number of consumers reaches zero
  • arguments [Object] – additional arguments (e.g. x-expires) that are specific to rabbitmq

Queue.bindQueue(exchange, binding[, cb])

Binds the queue to the given exchange, using the binding param as the routing key.
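
For a direct exchange, a binding means that a message reaches the queue only when its routing key equals the binding key. The in-memory model below is a conceptual sketch of that rule in general AMQP terms. `DirectExchangeModel` is hypothetical and does not talk to rabbit or reflect the wrapper's code.

```javascript
// Conceptual model only: a direct exchange keeps a list of queues per binding key.
function DirectExchangeModel() {
  this.bindings = Object.create(null); // binding key -> array of queue names
}

DirectExchangeModel.prototype.bind = function (queueName, bindingKey) {
  if (!this.bindings[bindingKey]) {
    this.bindings[bindingKey] = [];
  }
  this.bindings[bindingKey].push(queueName);
};

// Returns the queues that would receive a message with this routing key.
DirectExchangeModel.prototype.route = function (routingKey) {
  return (this.bindings[routingKey] || []).slice();
};

var model = new DirectExchangeModel();
model.bind('this.is.a.queue', 'this.is.my.routing.key');
model.route('this.is.my.routing.key'); // ['this.is.a.queue']
model.route('some.other.key');         // []
```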

Queue.listen([options, ] listener[, cb])

Adds a consumer for the queue. The listener argument is the function that will be called on new messages. The cb param is an optional callback that will run after the underlying consume operation has completed.

options

  • ack [Boolean] – Sets whether your listener must ack or nack messages
  • consumerTag [String] – Sets a custom consumer tag, which can be helpful for identifying consumers in rabbitMQ's management API
  • priority [Number] – Higher priority consumers will receive messages before lower priority consumers
  • arguments [Object] – Arbitrary arguments (see the rabbitmq documentation for details)

listener

This parameter must be a function, and it will be given four parameters.

queue.listen({ack: true}, function (msg, ack, headers, fields) {
	/* do stuff! */
})
  • msg [Object, Buffer] – If the content type of the message is application/json, this will be the parsed JSON value. If the content type is anything else, this will be a Buffer holding the message body
  • ack [Function]
    • ack() – acknowledges message
    • ack(false) – rejects message
    • ack(false, true) – rejects and requeues message
  • headers [Object] – Message headers
  • fields [Object] – Message fields, which have protocol info like the deliveryTag, exchange, and routing key

For more options, see the amqp.node api docs.
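
A listener can be exercised without a broker by handing it a decoded message and a stand-in `ack`. The sketch below mirrors the behaviour described above (JSON bodies parsed, everything else left as a Buffer, and the three `ack` call shapes). `decodeBody`, `recordingAck`, and `onOrder` are hypothetical names, and treating any first argument other than `false` as an acknowledgement is an assumption.

```javascript
// Decodes a body the way the listener docs describe: parsed JSON for
// application/json, otherwise the untouched Buffer. Illustrative only.
function decodeBody(body, contentType) {
  return contentType === 'application/json' ? JSON.parse(body.toString('utf8')) : body;
}

// Hypothetical stand-in for the `ack` callback that records the outcome.
function recordingAck() {
  function ack(ok, requeue) {
    if (ok === false) {
      ack.outcome = requeue ? 'rejected and requeued' : 'rejected';
    } else {
      ack.outcome = 'acknowledged'; // assumption: anything but `false` acks
    }
  }
  ack.outcome = null;
  return ack;
}

// Example listener: acknowledge well-formed orders, reject everything else.
function onOrder(msg, ack, headers, fields) {
  if (!msg || Buffer.isBuffer(msg) || typeof msg.orderId !== 'number') {
    return ack(false);
  }
  ack();
}

var fakeAck = recordingAck();
onOrder(decodeBody(Buffer.from('{"orderId": 7}'), 'application/json'), fakeAck, {}, {});
// fakeAck.outcome is 'acknowledged'
```

Each delivery should end in exactly one `ack` call, which is why the listener returns as soon as it rejects.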

test

$ make test

node-rabbit-wrap's People

Contributors

colinmutter, hopkinsth, james-huston, rfink, rv-jhuston

node-rabbit-wrap's Issues

Disk WAL?

Maybe add support for an optional write-ahead log in case of catastrophic failures? Not sure.

Make the API less dumb

Opening an issue for (eventually) making some API changes, like, for listeners...

queue.listen({ack: true}, function (msg, ack) {
    doStuff(msg);

    this.headers; //message headers
    this.info; //other delivery information

    //want to acknowledge the message? just call `ack`
    ack();

    //rejecting the message? just pass false to `ack`
    ack(false);

    //want to reject and requeue? pass true as the second param to `ack`
    ack(false, true);
})
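
Until the API changes, the proposed shape could probably be approximated with a small adapter over the current four-argument listener. This is a sketch only. `contextListener` is a hypothetical name, and mapping `fields` to `this.info` is a guess at what the proposal intends.

```javascript
// Hypothetical adapter: wraps a proposed-style listener (msg, ack) so it can be
// passed where the current (msg, ack, headers, fields) signature is expected.
function contextListener(fn) {
  return function (msg, ack, headers, fields) {
    return fn.call({ headers: headers, info: fields }, msg, ack);
  };
}

var adapted = contextListener(function (msg, ack) {
  // this.headers and this.info are available here
  ack();
});

// Possible use with the current API:
// queue.listen({ack: true}, adapted);
```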

Remove temporary queues from connection fixture

Temporary queues would be redeclared on an error in the setup we have now. That is, we need to make sure that temporary queues are removed from the channel's Fixture object when something calls ignore or destroy on a queue.

...which means that Fixtures need to have a way of removing calls quickly.
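
One possible direction is to key each recorded call by the thing it declares, so that dropping a temporary queue is a single lookup rather than a scan. The sketch below is hypothetical. `KeyedFixture` and its methods are invented for illustration and do not describe the existing Fixture object.

```javascript
// Hypothetical sketch: records replayable calls under a key for fast removal.
function KeyedFixture() {
  this.calls = new Map(); // key (e.g. 'queue:name') -> function that redoes the call
}

KeyedFixture.prototype.add = function (key, replay) {
  this.calls.set(key, replay);
};

// Returns true if a call was recorded under the key and has now been removed.
KeyedFixture.prototype.remove = function (key) {
  return this.calls.delete(key);
};

// Re-runs every remaining call in insertion order, e.g. after a reconnect.
KeyedFixture.prototype.replayAll = function () {
  this.calls.forEach(function (replay) {
    replay();
  });
};

var redeclared = [];
var fixture = new KeyedFixture();
fixture.add('queue:durable.work', function () { redeclared.push('durable.work'); });
fixture.add('queue:temp.reply', function () { redeclared.push('temp.reply'); });

fixture.remove('queue:temp.reply'); // what ignore or destroy would trigger
fixture.replayAll();
// redeclared is ['durable.work']
```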
