![NPM version](https://camo.githubusercontent.com/2d72c6115c2f3bf13d10402cde51d6ec68e517da40fc821e12da0d260b026e98/68747470733a2f2f62616467652e667572792e696f2f6a732f7261626269746d712d71756575652d73747265616d2e706e67)
$ npm i rabbitmq-queue-stream
var RabbitMQStream = require("rabbitmq-queue-stream");
var stream = require("stream");
var options = {
connection: {
url: "amqp://user:[email protected]"
},
queueStream: {
name: "myQueue",
prefetchCount: 100
}
};
/*
* Initialize two consumer channels to our queue.
*/
RabbitMQStream.init(2, options, function(err, streamifiedQueues) {
if(err) {
return console.error(err);
}
/*
* Each consumer channel comes with a .source and .sink property.
*
* .source is a Readable stream that gives us a stream of objects
* from the specified queue
*
* Every job written to .sink is deleted from the queue. Only object
* originating from .source should be written to .sink
*
*/
streamifiedQueues.channels.forEach(function(channel) {
var myProcessingStream = new stream.Transform({objectMode: true});
myProcessingStream._transform(function(data, enc, next) {
console.log("Doing something with", data);
this.push(data);
/*
* Messages are successfully acked and removed from the queue by default.
* RabbitMQStream provides methods to requeue and delete messages too.
*
* Requeue:
* this.push(RabbitMQStream.RequeueMessage(data));
*
* Delete:
* this.push(RabbitMQStream.DeleteMessage(data));
*/
next();
});
channel.source
.pipe(myProcessingStream)
.pipe(channel.sink);
});
/* example graceful shutdown routine */
var gracefulShutdown = function() {
//stop fetching messages
streamifiedQueues.unsubscribeConsumers(function(err) {
if(err) {
//handle error
}
//Wait some time for queues to flush out before closing consumers.
streamifiedQueues.closeConsumers(function(err) {
if(err) {
//handle error
}
streamifiedQueues.disconnect(function(err) {
if(err) {
//handle error
}
process.exit(0);
});
});
});
};
});
- parseError - Emitted when a job cannot be json parsed. Passes in malform
myQueueStream.source.on("parseError", function(err, message) {
console.error("Problem JSON parsing message", message);
});
- deleted - Emitted everytime a job is deleted from the queue
var totalDeleted = 0;
myQueueStream.source.on("deleted", function() {
console.log("Deleted", totalDeleted++);
});
- formatError - Sink received a job that does not have the necessary information to be deleted from the queue.
Most likely emitted when objects not originating from .source are written to sink.
myQueueStream.sink.on("formatError", function(err, message) {
console.error("Malformatted message written to .sink. Please check your pipeline configuration", message);
});
- Add a jsonMode to make automatic parsing of source data optional