Giter VIP home page Giter VIP logo

paced-work-stream's Introduction

PacedWorkStream

NPM Version Node Build Status Coverage Status dependencies Status

Node.js transform stream working at constant pace and concurrent for object mode

Features

  • Work time at once can be specified. (workMS option)
  • Concurrent workers can be specified. (concurrency option)
  • Fires done event after when all workers have finished asynchrous -processes
  • Counting tag system to call this.countTag(<tag>) in _workPromise, you can get summarized results tagCounts grouped by tag.
  • Node.js 6.10 or later

Targets

  • API client that needs to handle the rate-limit
  • DB client that needs to handle the read/write capacity units like AWS DynamoDB

Install

$ npm install -save paced-work-stream

How to Use

Create a PacedWorkStream

new PacedWorkStream(options, workPromise)

  • options <Object>
    • concurrency is the number of concurrent processes.
    • workMS is milliseconds of work time at once that contains process-time and wait-time.
    • delay is enable to start concurrent process in order delay for a time that divided workMS by concurrency, default is false. workPromise must return functions wrap each promise. Refer to the following figure for detailed operation pattern.
    • highWaterMark is maximum object buffer size. If you use flow mode, you should set it at least concurrency.
  • workPromise is function(item): must return a Promise processing the item or a Function that returns a Promise.

Delay Figure

Create subclass that extends PacedWorkStream

  • super(options) must be called in the constructor.
  • _workPromise method must be overrided and return a Promise processing the item or a Function that returns a Promise.
class MyWorkStream extends PacedWorkStream {
  constructor(options) {
    super(options);
  }
  _workPromise(item) {
    return () => {
      this.countTag(item.tag);
      return Promise.resolve(item.value);
    };
  }
}

Examples

const es = require('event-stream');
const devnull = require('dev-null');
const PacedWorkStream = require('paced-work-stream');

const pwStream = new PacedWorkStream({
    concurrency: 2,
    workMS: 1000,
    highWaterMark: 5
  }, function(item) {
    console.log(new Date().toISOString(), 'Begin', item);

    return new Promise((resolve, reject) => {
        setTimeout(() => {
          this.countTag('workDone');
          console.log(new Date().toISOString(), 'End', item);
          resolve();
        }, 600); // workMS contains the time.
      })
  })
  .on('done', function() {
    console.log(this.tagCounts);
  }).on('error', (err) => {
    console.error(err);
  });

const reader = es.readArray([11, 12, 21, 22, 31])
reader.pipe(pwStream).pipe(devnull({ objectMode: true }));
  • Pay attention to handling done event to get last tagCounts because workers haven't processed items on finish event.
  • If stream need not output, the stream must pipe dev-null.

Console output

$ node example.js
2016-09-11T03:17:50.000Z Begin 11
2016-09-11T03:17:50.003Z Begin 12
2016-09-11T03:17:50.605Z End 11
2016-09-11T03:17:50.605Z End 12
2016-09-11T03:17:51.009Z Begin 21
2016-09-11T03:17:51.009Z Begin 22
2016-09-11T03:17:51.606Z End 21
2016-09-11T03:17:51.606Z End 22
2016-09-11T03:17:52.004Z Begin 31
2016-09-11T03:17:52.607Z End 31
{ workDone: 5 }

Using with Promised Lifestream

Promised Lifestream is useful for stream pipeline. The following example gets the same result as above.

'use strict';

const es = require('event-stream');
const PromisedLife = require('promised-lifestream');

const PacedWorkStream = require('paced-work-stream');

const pacedWorker = new PacedWorkStream({
    concurrency: 2,
    workMS: 1000,
    highWaterMark: 5
  }, function(item) {
    console.log(new Date().toISOString(), 'Begin', item);

    return new Promise((resolve, reject) => {
        setTimeout(() => {
          this.countTag('workDone');
          console.log(new Date().toISOString(), 'End', item);
          resolve();
        }, 600); // workMS contains the time.
      })
  })

PromisedLife([
  es.readArray([11, 12, 21, 22, 31]),
  pacedWorker
])
.then(() => {
  console.log(pacedWorker.tagCounts);
})
.catch(err => {
  console.error(err);
});

License

MIT

paced-work-stream's People

Contributors

tilfin avatar

Stargazers

 avatar

Watchers

 avatar  avatar  avatar

paced-work-stream's Issues

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.