prozess's Introduction

Prozess

Prozess is a Kafka library for Node.js.

Kafka is a persistent, efficient, distributed publish/subscribe messaging system.

There are two low-level clients: the Producer and the Consumer.

##Producer example:

var Producer = require('prozess').Producer;

var producer = new Producer('social', {host : 'localhost'});
producer.connect();
console.log("producing for ", producer.topic);
producer.on('error', function(err){
  console.log("some general error occurred: ", err);  
});
producer.on('brokerReconnectError', function(err){
  console.log("could not reconnect: ", err);  
  console.log("will retry on next send()");  
});

setInterval(function(){
  var message = { "thisisa" :  "test " + new Date()};
  producer.send(JSON.stringify(message), function(err){
    if (err){
      console.log("send error: ", err);
    } else {
      console.log("message sent");
    }
  });
}, 1000);
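
The example above only logs a failed send. If you would rather retry it, the following is a minimal sketch, assuming only that `producer.send(message, cb)` calls back with an error on failure as shown above. `sendWithRetry`, `maxAttempts` and `delayMs` are hypothetical names introduced here and are not part of prozess. Note that a retry may produce a duplicate if an earlier attempt did reach the broker.

```javascript
// Retry producer.send() up to `maxAttempts` times, waiting `delayMs`
// milliseconds between attempts. `producer` is any object that exposes
// send(message, cb), for example a connected prozess Producer.
// `done` is called with (err, attempts) once the send succeeds or the
// last attempt has failed.
function sendWithRetry(producer, message, maxAttempts, delayMs, done) {
  let attempt = 0;

  function trySend() {
    attempt += 1;
    producer.send(message, function (err) {
      if (!err) { return done(null, attempt); }
      if (attempt >= maxAttempts) { return done(err, attempt); }
      setTimeout(trySend, delayMs);
    });
  }

  trySend();
}
```

For example, `sendWithRetry(producer, JSON.stringify(message), 3, 500, function (err, attempts) { ... })` would give up after the third failed attempt.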

##Consumer example:

var Consumer = require('prozess').Consumer;

var options = {host : 'localhost', topic : 'social', partition : 0, offset : 0};
var consumer = new Consumer(options);
consumer.connect(function(err){
  if (err) {  throw err; }
  console.log("connected!!");
  setInterval(function(){
    console.log("===================================================================");
    console.log(new Date());
    console.log("consuming: " + consumer.topic);
    consumer.consume(function(err, messages){
      console.log(err, messages);
    });
  }, 7000);
});
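
The example above polls on a fixed timer, so a slow fetch can overlap with the next one. A minimal sketch of an alternative, assuming only that `consumer.consume(cb)` calls back with `(err, messages)` as shown above. `pollLoop`, `delayMs`, `onMessages` and `onError` are hypothetical names introduced here and are not part of prozess.

```javascript
// Schedule the next consume() only after the previous one has called back,
// so that at most one fetch is in flight at a time.
// `consumer` is any object with a consume(cb) method, for example a
// connected prozess Consumer. Returns a function that stops the loop.
function pollLoop(consumer, delayMs, onMessages, onError) {
  let stopped = false;
  let timer = null;

  function tick() {
    consumer.consume(function (err, messages) {
      if (stopped) { return; }
      if (err) {
        onError(err);
      } else {
        onMessages(messages);
      }
      // Wait delayMs after this fetch finished before starting the next.
      timer = setTimeout(tick, delayMs);
    });
  }

  tick();

  return function stop() {
    stopped = true;
    if (timer !== null) { clearTimeout(timer); }
  };
}
```

Inside the `connect` callback this could be started with `var stop = pollLoop(consumer, 7000, console.log, console.error);` and ended later by calling `stop()`.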

A Consumer can be constructed with the following options (default values as shown below):

var options = {
  topic: 'test',
  partition: 0,
  host: 'localhost',
  port: 9092,
  offset: null, // Number, String or BigNum
  maxMessageSize: Consumer.MAX_MESSAGE_SIZE,
  polling: Consumer.DEFAULT_POLLING_INTERVAL
};

##Documentation

var producer = new Producer(options)

type Message := String | Buffer

Producer := ({
  topic: String,
  partition?: Number,
  host?: String,
  port?: Number,
  connectionCache?: Boolean
}) => EventEmitter & {
  connect: () => void,
  send: (Array<Message> | Message, opts?: {
    partition?: Number,
    topic?: String
  }, cb: Callback<>) => void
}

To create a producer you must call Producer() with an options object. The only required option is the topic you're producing to.

var Producer = require('prozess').Producer

var producer = new Producer({ topic: 'foos' })

options.topic

options.topic must be a String and is the topic in Kafka that you will produce to.

options.partition

options.partition is an optional Number and defaults to 0. You can specify this if you want to change which partition you publish to.
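
If you want related messages to land on the same partition, one option is to derive the partition number from a key yourself. The following is a minimal sketch, not something prozess does for you. `partitionFor` and `numPartitions` are hypothetical names, the caller is assumed to know how many partitions the topic has, and the hash choice is arbitrary.

```javascript
const crypto = require('crypto');

// Map a string key to a partition number in [0, numPartitions).
// The hash (first 4 bytes of an MD5 digest) is chosen for illustration
// only; it is not the scheme used by Kafka's own clients.
function partitionFor(key, numPartitions) {
  if (!Number.isInteger(numPartitions) || numPartitions <= 0) {
    throw new RangeError('numPartitions must be a positive integer');
  }
  const digest = crypto.createHash('md5').update(String(key)).digest();
  return digest.readUInt32BE(0) % numPartitions;
}
```

The result can be used as `options.partition` when constructing the producer, for example `new Producer({ topic: 'foos', partition: partitionFor('user-42', 4) })`.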

options.host and options.port

options.host determines the host location of the kafka node you are connecting to and options.port determines the port.

These default to "localhost" and 9092, the default Kafka port.

options.connectionCache

options.connectionCache is a Boolean you can set to opt in to connection caching. By default prozess will create one TCP connection per topic.

If you set options.connectionCache to true then prozess will use one TCP connection per host & port.
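
To illustrate the idea of sharing one connection per host and port, here is a small sketch of a cache keyed by `host:port`. This is not prozess's implementation. `makeConnectionCache` and `createConnection` are hypothetical names, and the factory is supplied by the caller so the sketch does not open any real socket.

```javascript
// Return a lookup function that creates at most one connection per
// "host:port" pair and hands back the same object on later calls.
// `createConnection(host, port)` is a caller-supplied factory.
function makeConnectionCache(createConnection) {
  const cache = new Map();

  return function getConnection(host, port) {
    const key = host + ':' + port;
    if (!cache.has(key)) {
      cache.set(key, createConnection(host, port));
    }
    return cache.get(key);
  };
}
```

With such a cache, two producers for different topics on the same broker would end up with the same connection object, while a different port would get its own.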

producer.connect(Callback)

Call producer.connect(cb) to open your connection to Kafka. The cb you pass in is called once the connection is open.

You must call .connect() before calling .send()
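
One way to respect this ordering is to send from inside the connect callback. A minimal sketch follows, assuming the connect callback may receive an error as its first argument, as it does in the Consumer example. `connectThenSend` is a hypothetical helper name, not part of prozess.

```javascript
// Open the connection first, then send once the connect callback fires.
// `producer` is any object that exposes connect(cb) and send(message, cb).
// `done` receives the connect error, or whatever send() reports.
function connectThenSend(producer, message, done) {
  producer.connect(function (err) {
    if (err) { return done(err); }
    producer.send(message, done);
  });
}
```

For example: `connectThenSend(new Producer({ topic: 'foos' }), 'hello', function (err) { ... })`.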

producer.send(Message, opts?: Object, Callback)

To produce messages to Kafka, call .send() with either a single message or an array of messages. Each message is a String or a Buffer.

You can pass .send() an optional options argument to customize the topic & partition for this single send() request.

You must also supply a Callback to handle any asynchronous errors.
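
If you prefer Promises to callbacks, the call can be wrapped. This is a minimal sketch built only on the `send(Message, opts?, Callback)` signature described above. `sendAsync` is a hypothetical helper name, not part of prozess.

```javascript
// Wrap producer.send() in a Promise. `opts` is optional and, when given,
// is passed through unchanged (for example { topic: 'other', partition: 1 }).
// `producer` is any object that exposes send(messages, [opts], cb).
function sendAsync(producer, messages, opts) {
  return new Promise(function (resolve, reject) {
    function onDone(err) {
      if (err) { reject(err); } else { resolve(); }
    }
    if (opts === undefined) {
      producer.send(messages, onDone);
    } else {
      producer.send(messages, opts, onDone);
    }
  });
}
```

For example: `sendAsync(producer, ['a', 'b'], { partition: 1 }).then(function () { console.log('sent'); }, console.error);`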

##Installation:

 npm install prozess

##Check out the code and run the tests:

 $ git clone https://github.com/cainus/Prozess.git
 $ cd Prozess ; make test-cov && open coverage.html

##Kafka Compatibility matrix:

| Kafka version | Status |
| --- | --- |
| Kafka 0.8.0 Release | Not Supported |
| Kafka 0.7.2 Release | Supported |
| Kafka 0.7.1 Release | Supported |
| Kafka 0.7.0 Release | Supported |
| kafka-0.6 | Not Supported |
| kafka-0.05 | Not Supported |

Versions taken from http://incubator.apache.org/kafka/downloads.html

prozess's People

Contributors

cainus, ctavan, elee, iproctor, jwolski, mithunsatheesh, raynos, vincentp

prozess's Issues

Consumer.consume with options

Like #41 for the producer, could consume take optional parameters for partition, topic, offset and max message size?

  • makes consumer behave like producer and better resembles the apache kafka low level client
  • allows for single connection per broker with easy consumption

Example code fails to start.

I am evaluating node.js kafka libraries. While running the Prozess example code (both producer and consumer are affected) I get the following error:
module.js:485
process.dlopen(filename, module.exports);
^
Error: dlopen(/Users/bore/projects/node-playground/node_modules/Prozess/node_modules/buffermaker/node_modules/bignum/build/Release/bignum.node, 1): no suitable image found. Did find:
/Users/bore/projects/node-playground/node_modules/Prozess/node_modules/buffermaker/node_modules/bignum/build/Release/bignum.node: mach-o, but wrong architecture
at Object.Module._extensions..node (module.js:485:11)
at Module.load (module.js:356:32)
at Function.Module._load (module.js:312:12)
at Module.require (module.js:362:17)
at new require (module.js:378:17)
at Object.<anonymous> (/Users/bore/projects/node-playground/node_modules/Prozess/node_modules/buffermaker/node_modules/bignum/index.js:6:14)
at Module._compile (module.js:449:26)
at Object.Module._extensions..js (module.js:467:10)
at Module.load (module.js:356:32)
at Function.Module._load (module.js:312:12)

From reading the stack trace it seems it might be an issue with an underlying library (bignum), but I am too fresh to node to be sure.

I am running node.js v0.8.20 on a mac.

Thanks

Support for 0.8

Do you guys have a release plan for 0.8? I could not find any information on possible support for the new protocol.

Regards

socket error on connect

When running the program below, I get an error, even though I checked that my Kafka is running on the given host:port.

var Consumer = require('prozess').Consumer;

var options = {host : 'impetus-1129', port: '9093', topic : 'ajjain', partition : 0, offset : 0};
var consumer = new Consumer(options);
consumer.connect(function(err){
  if (err) { throw err; }
  console.log("connected!!");
  setInterval(function(){
    console.log("===================================================================");
    console.log(new Date());
    console.log("consuming: " + consumer.topic);
    consumer.consume(function(err, messages){
      console.log(err, messages);
    });
  }, 10);
});

Error: This socket has been ended by the other party
at Socket.writeAfterFIN [as write]
at Consumer.getOffsets (/home/nslabs/bam/node_modules/prozess/lib/Consumer.js:190:15)
at Consumer.consume (/home/nslabs/bam/node_modules/prozess/lib/Consumer.js:149:12)
at null.<anonymous> (/home/nslabs/bam/consumer.js:12:14)
at wrapper [as _onTimeout]
at Timer.listOnTimeout [as ontimeout]

Bignum 0.6.2

Would it be possible to bump up the versions for...

bignum --> 0.6.2
buffermaker --> 1.0.0 (just updated with the latest bignum too)

Many thanks in advance

unable to install prozess module

I am using kafka_2.11-0.10.2.0. The server is working properly: as a producer I am sending messages, and as a consumer I am retrieving them. But for the UI integration I am using the Express framework to connect to Kafka through the Prozess module.

ECONNREFUSED from Prozess

I'm getting the following error when trying to connect to Kafka via Prozess.

Error: connect ECONNREFUSED
at errnoException (net.js:670:11)
at Object.afterConnect [as oncomplete]

I know Kafka is listening on 9092 since I can telnet to that port and establish a connection. Kafka samples work.

I'm using Kafka 0.7.2 and nodejs v0.6.19

My code is straight from the example on README.md. Any ideas?

var Consumer = require('prozess').Consumer;

var options = {
  host : 'localhost',
  port : 9092,
  topic : 'test',
  partition : 0,
  offset : 0
};

var consumer = new Consumer(options);

consumer.connect(function(err) {
  if (err) { throw err; }
  console.log("connected!!");
  setInterval(function(){
    console.log("=======================");
    console.log(new Date());
    console.log("consuming: " + consumer.topic);
    consumer.consume(function(err, messages){
      console.log(err, messages);
    });
  }, 7000);
});

Update bignum package

npm install fails on Node.js 0.10.x with a "node-waf: command not found" message when trying to install the bignum dependency.

'This socket has been ended by the other party' error is not handled

I had an issue with Prozess not reconnecting when I take Kafka down. Debugging through the code, it seems the 'This socket has been ended by the other party' error I was getting is not handled in Producer.js:79, and the callback is called without an attempt to "_reconnect".

Am I missing anything?

Thanks,
Vladimir

production concerns

Excellent initiative to create this module.

I have some concerns regarding using this module in production. Therefore I hope you could answer these simple questions.

  1. Could you list the Zookeeper and Kafka versions for which this module is developed? Do you intend to update Prozess when new Kafka versions are released?

  2. Do you reckon this module is production ready, or will be within a number of months? Are you using it in production, or is anyone else?

  3. Do you plan to continue development of this module in the future?

Thanks a lot.

Fails to compile in node 0.12.0

I get this on mac os 10.10.2 when running npm install prozess:

                                   v8::internal::Arguments
/Users/bdonovan/.node-gyp/0.12.0/deps/v8/include/v8.h:127:7: note: 'v8::internal::Arguments' declared here
class Arguments;
      ^
../bignum.cc:206:36: error: unknown type name 'Arguments'; did you mean 'v8::internal::Arguments'?
  static Handle<Value> Upowm(const Arguments& args);
                                   ^~~~~~~~~
