rabbitmq-queue-stream

Tests

$ make test

Usage

$ npm i rabbitmq-queue-stream

var RabbitMQStream = require("rabbitmq-queue-stream");
var stream         = require("stream");

var options = {
  connection: {
    url: "amqp://user:[email protected]"
  },
  queueStream: {
    name: "myQueue",
    prefetchCount: 100
  }
};

/*
 * Initialize two consumer channels to our queue.
*/
RabbitMQStream.init(2, options, function(err, streamifiedQueues) {
  if(err) {
    return console.error(err);
  }

  /*
   * Each consumer channel comes with a .source and .sink property.
   * 
   * .source is a Readable stream that gives us a stream of objects
   * from the specified queue
   *
   * Every job written to .sink is deleted from the queue. Only objects
   * originating from .source should be written to .sink
   *
  */

  streamifiedQueues.channels.forEach(function(channel) {

    var myProcessingStream = new stream.Transform({objectMode: true});
    // _transform must be assigned, not called; the stream invokes it per message.
    myProcessingStream._transform = function(data, enc, next) {
      console.log("Doing something with", data);
      this.push(data);
      /*
       * Messages are successfully acked and removed from the queue by default.
       * RabbitMQStream provides methods to requeue and delete messages too.
       *
       * Requeue:
       *     this.push(RabbitMQStream.RequeueMessage(data));
       *
       * Delete:
       *     this.push(RabbitMQStream.DeleteMessage(data));
      */
      next();
    };

    channel.source
      .pipe(myProcessingStream)
      .pipe(channel.sink);
  });

  /* example graceful shutdown routine */
  var gracefulShutdown = function() {
    //stop fetching messages
    streamifiedQueues.unsubscribeConsumers(function(err) {
      if(err) {
        //handle error
      }
      //Wait some time for queues to flush out before closing consumers.
      streamifiedQueues.closeConsumers(function(err) {
        if(err) {
          //handle error
        }
        streamifiedQueues.disconnect(function(err) {
          if(err) {
            //handle error
          }
          process.exit(0);
        });
      });
    });
  };
});
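
The shutdown routine above leaves its error branches empty and does not actually pause between unsubscribing and closing. The following is a minimal sketch of one way to fill those gaps, assuming only the three methods shown above (unsubscribeConsumers, closeConsumers, disconnect). The names shutdownInOrder and flushDelayMs are hypothetical and not part of the library.

```javascript
// Sketch only: `shutdownInOrder` and `flushDelayMs` are hypothetical names.
// `queues` is the streamifiedQueues object handed to the init callback.
function shutdownInOrder(queues, flushDelayMs, done) {
  // 1. Stop fetching new messages.
  queues.unsubscribeConsumers(function (err) {
    if (err) return done(err);
    // 2. Give in-flight messages time to reach .sink before closing.
    setTimeout(function () {
      queues.closeConsumers(function (err) {
        if (err) return done(err);
        // 3. Drop the connection last.
        queues.disconnect(function (err) {
          done(err || null);
        });
      });
    }, flushDelayMs);
  });
}

// Possible wiring inside the init callback (the 1000 ms delay is an
// arbitrary, illustrative value):
//
//   process.on("SIGTERM", function () {
//     shutdownInOrder(streamifiedQueues, 1000, function (err) {
//       if (err) console.error(err);
//       process.exit(err ? 1 : 0);
//     });
//   });
```

Stopping at the first error keeps a failed unsubscribe from being followed by a close on channels that may still be receiving messages; whether that is the right policy depends on the application.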

Emitted Events

.source

  • parseError - Emitted when a job cannot be JSON parsed. Passes in the malformed message.
myQueueStream.source.on("parseError", function(err, message) {
  console.error("Problem JSON parsing message", message);
});

.sink

  • deleted - Emitted every time a job is deleted from the queue
var totalDeleted = 0;
myQueueStream.sink.on("deleted", function() {
  console.log("Deleted", ++totalDeleted);
});
  • formatError - Sink received a job that does not have the necessary information to be deleted from the queue.
    Most likely emitted when objects not originating from .source are written to sink.
myQueueStream.sink.on("formatError", function(err, message) {
  console.error("Malformatted message written to .sink. Please check your pipeline configuration", message);
});
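
Since formatError is most likely caused by writing objects that did not come from .source, one way to avoid it is a processing stage that only ever pushes the original message, or that message wrapped with the RequeueMessage / DeleteMessage helpers from the Usage section. The sketch below assumes exactly that; makeProcessor, handler, and the "ack" / "requeue" / "delete" return values are hypothetical names chosen for illustration, and requeueing on a thrown error is an assumed policy, not library behaviour.

```javascript
var stream = require("stream");

// Sketch only: `makeProcessor` and `handler` are hypothetical names.
// `lib` is the module returned by require("rabbitmq-queue-stream").
function makeProcessor(lib, handler) {
  return new stream.Transform({
    objectMode: true,
    transform: function (message, enc, next) {
      var outcome;
      try {
        // handler is expected to return "ack", "requeue" or "delete".
        outcome = handler(message);
      } catch (e) {
        // Assumption: retry the message if the handler throws.
        outcome = "requeue";
      }
      if (outcome === "requeue") {
        this.push(lib.RequeueMessage(message));
      } else if (outcome === "delete") {
        this.push(lib.DeleteMessage(message));
      } else {
        // Default: pass the untouched source object on to .sink.
        this.push(message);
      }
      next();
    }
  });
}

// Possible wiring for one channel:
//
//   channel.source
//     .pipe(makeProcessor(RabbitMQStream, myHandler))
//     .pipe(channel.sink);
```

Taking the library as an argument keeps the stage easy to exercise with a stub, without a running broker.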

TODO

  • Add a jsonMode to make automatic parsing of source data optional
