through2-concurrent

A simple way to create a Node.js Transform stream that processes chunks in parallel. You can limit the concurrency (the default is 16). Order is not preserved: chunks/objects can come out in a different order from the one they went in if the transform function takes a different amount of time for each of them.
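
To make the ordering caveat concrete, here is a minimal sketch that uses only Node.js built-ins, not through2-concurrent itself. The helper names mapConcurrentUnordered and delayedEcho are invented for this illustration and are not part of the library; the sketch only mimics the idea of a concurrency limit with results collected in completion order.

```javascript
// Hypothetical helper (not part of through2-concurrent): run `worker` over
// `items` with at most `limit` calls in flight, collecting results in the
// order the workers finish rather than the order they started.
function mapConcurrentUnordered(items, limit, worker) {
  return new Promise(function (resolve, reject) {
    var results = [];
    var next = 0;
    var inFlight = 0;

    function launch() {
      if (next === items.length && inFlight === 0) {
        return resolve(results);
      }
      while (inFlight < limit && next < items.length) {
        inFlight++;
        worker(items[next++], function (err, value) {
          if (err) return reject(err);
          results.push(value);
          inFlight--;
          launch();
        });
      }
    }

    launch();
  });
}

// Hypothetical worker: waits `ms` milliseconds, then hands back `ms`.
function delayedEcho(ms, callback) {
  setTimeout(function () {
    callback(null, ms);
  }, ms);
}

mapConcurrentUnordered([50, 10, 30], 3, delayedEcho).then(function (order) {
  console.log(order); // [ 10, 30, 50 ]: fastest first, not input order
});
```

With a limit of 1 the same call would return the values in input order, because only one worker runs at a time.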

Built using through2 and has the same API with the addition of a maxConcurrency option.

Non-objectMode streams are supported for completeness but I'm not sure they'd be useful for anything.

Written by Thomas Parslow (almostobsolete.net and tomparslow.co.uk) as part of Active Inbox (activeinboxhq.com).

Install

npm install --save through2-concurrent

Examples

Process lines from a CSV in parallel. The order in which the results end up in the all variable is not deterministic.

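var fs = require('fs');
var csv2 = require('csv2'); // assumes the csv2 package provides the CSV parser used below
var through2Concurrent = require('through2-concurrent');

var all = [];

fs.createReadStream('data.csv')
  .pipe(csv2())
  .pipe(through2Concurrent.obj(
    {maxConcurrency: 10},
    function (chunk, enc, callback) {
      var self = this;
      // someThingAsync is a placeholder for your own asynchronous work
      someThingAsync(chunk, function (newChunk) {
        self.push(newChunk);
        callback();
      });
    }))
  .on('data', function (data) {
    all.push(data);
  })
  .on('end', function () {
    // Every row has been processed; the order within `all` can vary between runs
    doSomethingSpecial(all);
  });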

Contributing

Fixed or improved stuff? Great! Send me a pull request through GitHub or get in touch on Twitter @almostobsolete or email at [email protected]

through2-concurrent's Issues

Event loop might block if callback not always async

I ran into a pretty insidious bug that has taken me days to unravel, and I wanted to share it for anyone else who runs into the same problem.

The crux of the issue is that through2-concurrent requires that its callback function (the one that you, the user, provide) always be asynchronous. The library does not strictly enforce this, but if that callback is not always asynchronous, it's possible for the event loop to block.

Let me share my code:

  return throughConcurrent.obj({ maxConcurrency: concurrency }, async function(
    domain,
    enc,
    done
  ) {
    try {
      const url = await frontierList.getNextUrlForDomain(domain);
      if (url) {
        this.push(url);
      }
    } catch (err) {
      console.error(err)
    }
    done();
  });
};

What happens is that when done() is invoked inside my callback, my callback is synchronously invoked again with the next domain. This is all fine, except that if that domain also causes frontierList.getNextUrlForDomain(domain) to return immediately without performing an async operation, invoking done causes my callback to be invoked synchronously once again. Sometimes, then, I get into a pattern where some percentage of async callbacks never finish, because every subsequent value causes frontierList.getNextUrlForDomain(domain) to return immediately without performing an async operation. The event loop is quickly blocked, and my async operations never finish.

A workaround for this was pretty simple. I was already using a wrapper around through2-concurrent, but I will only share the relevant parts here.

function createThroughConcurrent(concurrency, callback) {
  const stream = throughConcurrent.obj({ maxConcurrency: concurrency }, function(
    domain,
    enc,
    done
  ) {
    setImmediate(() => {
      callback.call(this, domain, enc, done);
    });
  });
  return stream;
};

The idea is that we want to yield to the event loop before invoking our callback, no matter what, so that pending timers and I/O callbacks get a chance to run. This code accomplishes that.
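
For anyone who wants to see the effect in isolation, here is a small sketch that uses only Node.js built-ins and not through2-concurrent. It assumes the problem described above comes from steps that complete without ever returning to the event loop (for example an await that resolves straight from a cache). The function name stepsDoneWhenTimerFires and the step count are made up for the illustration.

```javascript
// Runs `totalSteps` awaited steps and reports how many had completed by the
// time a 0 ms timer finally got to fire.
function stepsDoneWhenTimerFires(totalSteps, yieldToEventLoop) {
  var steps = 0;

  var timerFired = new Promise(function (resolve) {
    setTimeout(function () {
      resolve(steps);
    }, 0);
  });

  async function run() {
    while (steps < totalSteps) {
      if (yieldToEventLoop) {
        // Same idea as the setImmediate wrapper: go back to the event loop
        await new Promise(function (resolve) {
          setImmediate(resolve);
        });
      } else {
        // Resolves on the microtask queue only; timers and I/O cannot run
        await null;
      }
      steps++;
    }
  }

  return Promise.all([timerFired, run()]).then(function (both) {
    return both[0];
  });
}

stepsDoneWhenTimerFires(20000, false)
  .then(function (n) {
    console.log('microtasks only:', n); // 20000, the timer waited for every step
    return stepsDoneWhenTimerFires(20000, true);
  })
  .then(function (n) {
    console.log('with setImmediate:', n); // fewer than 20000, the timer ran in between
  });
```

In the first run the timer cannot fire until the whole chain of immediately resolving steps has finished, which matches the "async operations never finish" symptom when such steps keep arriving. In the second run each step passes through setImmediate, so the timer fires while work is still in progress.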

maxConcurrency variable declared globally

If one is using through2-concurrent across multiple modules at the same time in a Node application, with each module defining its own level of maximum concurrency, the last one in wins. That is, all the instances will use that level of max concurrency. This is because the maxConcurrency variable is assigned without the 'var' keyword, which causes it to be defined globally.
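
A minimal sketch of the "last one in wins" effect, independent of the library's actual source. The function names and the demoMaxConcurrency property are hypothetical. In non-strict code, assigning to an undeclared name creates a property on the global object; the sketch writes that assignment out explicitly through globalThis so it behaves the same in strict mode.

```javascript
// Leaky version: stands in for `maxConcurrency = limit` written without `var`.
// Every caller ends up sharing one global value.
function createLeakyLimiter(limit) {
  globalThis.demoMaxConcurrency = limit;
  return function currentLimit() {
    return globalThis.demoMaxConcurrency;
  };
}

// Scoped version: `var` keeps the value private to each call.
function createScopedLimiter(limit) {
  var maxConcurrency = limit;
  return function currentLimit() {
    return maxConcurrency;
  };
}

var leakyA = createLeakyLimiter(2);
var leakyB = createLeakyLimiter(10);
console.log(leakyA(), leakyB()); // 10 10: the second call overwrote the first

var scopedA = createScopedLimiter(2);
var scopedB = createScopedLimiter(10);
console.log(scopedA(), scopedB()); // 2 10: each keeps its own limit
```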

Support maintaining order of chunks

This would be an option, since preserving order is not appropriate for a lot of use cases.

It would need to have a limit on the output buffer (the list of chunks that have arrived out of order and are waiting for the one in front of them to be ready). Otherwise it would be easy for it to use up all available memory if a particular chunk took a long time to process. This limit probably wouldn't be the same as the maxConcurrency option, which should continue to mean how many things can be processed at a given time.
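
One way the idea could look, sketched with Node.js built-ins only. This is not an existing feature of through2-concurrent: the function mapConcurrentOrdered and the maxBuffered option are hypothetical names for this illustration. Finished results wait in a buffer until everything ahead of them has been emitted, and no new work starts while the number of started-but-not-emitted items exceeds maxBuffered, which keeps the buffer at or below that size.

```javascript
// Hypothetical sketch: concurrent processing that emits results in input
// order, with a separate cap on how many finished results may wait.
function mapConcurrentOrdered(items, options, worker) {
  var maxConcurrency = options.maxConcurrency;
  var maxBuffered = options.maxBuffered;

  return new Promise(function (resolve, reject) {
    var output = [];
    var buffered = new Map(); // index -> finished result waiting for earlier ones
    var nextToStart = 0;
    var nextToEmit = 0;
    var inFlight = 0;

    function pump() {
      // Emit everything that is now contiguous with what was already emitted
      while (buffered.has(nextToEmit)) {
        output.push(buffered.get(nextToEmit));
        buffered.delete(nextToEmit);
        nextToEmit++;
      }
      if (nextToEmit === items.length) {
        return resolve(output);
      }
      // Start more work only while both limits allow it
      while (
        nextToStart < items.length &&
        inFlight < maxConcurrency &&
        nextToStart - nextToEmit <= maxBuffered
      ) {
        start(nextToStart++);
      }
    }

    function start(index) {
      inFlight++;
      worker(items[index], function (err, value) {
        if (err) return reject(err);
        inFlight--;
        buffered.set(index, value);
        pump();
      });
    }

    pump();
  });
}

// Hypothetical worker: waits `ms` milliseconds, then hands back `ms`.
function waitAndEcho(ms, callback) {
  setTimeout(function () {
    callback(null, ms);
  }, ms);
}

mapConcurrentOrdered([50, 10, 30, 20], { maxConcurrency: 3, maxBuffered: 2 }, waitAndEcho)
  .then(function (results) {
    console.log(results); // [ 50, 10, 30, 20 ]: input order is kept
  });
```

The trade-off is that a single slow chunk at the head stalls new work once the buffer is full, which is the price of bounding memory.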

Add Git tags

Hi there, great project!

Would it be possible to add git tags? This would automatically create GitHub releases. GitHub releases make it easy to follow new releases with an RSS reader.

Projects like release-it automate this process.
