node-hole's Introduction

Hole

Async friendly, stream-based task consuming utility for Node.js.

Concept

Over the years, writing code with less state has become more important to me, and my style has naturally drifted toward data-driven, functional programming. That style and async programming are not always a good match, though. On one hand, when you run too many async tasks at once, await Promise.all(promises100000) for example, you end up with errors such as process out of memory, socket hang up, ETIMEDOUT or ECONNRESET. On the other hand, processing the whole data set one item at a time in sequence is not efficient at all. Reactive Extensions might be a solution, but I didn't want to tune timer functions for this problem; all I want is to cap the number of parallel executions and finish the task as fast as possible.
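The trade-off above can be sketched without hole. The helper below is a minimal illustration, assuming plain promises only; mapWithLimit is a hypothetical name and not part of hole's API. It keeps at most `limit` tasks in flight, which sits between firing everything at once and running strictly in sequence.

```javascript
// Hypothetical helper (not part of hole): run `worker` over `items`
// with at most `limit` calls in flight, preserving result order.
async function mapWithLimit(items, limit, worker) {
  const results = new Array(items.length);
  let next = 0; // index of the next item to claim

  // Each runner claims items one at a time until none are left.
  async function runner() {
    while (next < items.length) {
      const index = next++;
      results[index] = await worker(items[index], index);
    }
  }

  const runnerCount = Math.min(limit, items.length);
  await Promise.all(Array.from({ length: runnerCount }, () => runner()));
  return results;
}
```

Calling mapWithLimit(pages, 3, getPosts) would never have more than three getPosts calls pending at once, where getPosts stands for any async function of your own.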

This is where Node Stream object mode, with its beautiful backpressure mechanism, comes in. Object mode lets you pass JavaScript objects through a stream, and the highWaterMark option caps the number of objects a stream buffers. With the native backpressure implementation, a busy writable stream pauses the flow of data and takes its time before requesting more from upstream. And thanks to parallel-stream, a transform can fan out to consume its buffer in parallel while keeping the order of the data.
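The backpressure signal itself can be observed with Node's built-in stream module alone. The sketch below is an illustration, not hole's code; countAcceptedBeforeBackpressure is a hypothetical name. It writes objects to a slow object-mode Writable and counts how many are accepted before write() returns false, which is the point where a well-behaved producer should wait for the 'drain' event.

```javascript
const { Writable } = require('stream');

// Hypothetical helper: count how many objects a slow object-mode sink
// accepts before write() reports that its buffer is full.
function countAcceptedBeforeBackpressure(highWaterMark, total) {
  const sink = new Writable({
    objectMode: true,
    highWaterMark, // in object mode this is a number of objects, not bytes
    write(_object, _encoding, callback) {
      setImmediate(callback); // simulate a busy consumer
    },
  });

  let accepted = 0;
  for (let i = 0; i < total; i++) {
    accepted++;
    // false means "buffer full": a producer should pause until 'drain'
    if (!sink.write({ id: i })) break;
  }
  return accepted;
}
```

With a highWaterMark of 3, the third write is the first one to report a full buffer.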

Node Hole offers a fun, easy and efficient way to consume data in parallel by wrapping the solid Node Stream implementation in an async/promise-friendly API.

Usage

To install hole in your project, run:

$ npm install -S hole

... then use it as shown below. Suppose there is a blog post API where GET /posts/:id returns the post detail as JSON and GET /posts?page=1 returns a list of post summaries. Let's say we want to store the post details from the latest 3 pages in a local DB.

import {fromArray} from 'hole';
// getPosts, getPostDetail and saveToLocalDB are your own async functions
await fromArray([1, 2, 3])                      // Start with an array of page numbers
    .pipe(page => getPosts(page))               // (parallel async) Get the list of posts for a page
    .split()                                    // Split the array into individual posts
    .pipe(post => getPostDetail(post.id), 3)    // (parallel async) Get the detail of a post, with at most 3 parallel requests
    .pipe(detail => saveToLocalDB(detail))      // (parallel async) Save the detail to the local DB
    .catch(err => console.log(err));            // If an error occurs anywhere along the way, the whole process stops and the error is caught here
console.log('All done :)');

API

All methods are typed by TypeScript.

hole(object)

Typed as hole<T>(object: T): Hole<T>.

Starts a stream from a single JavaScript object of any kind.

Example:

import hole from 'hole';
//...
await hole(998)
    .pipe(n => n + 1)
    .pipe(n => n + 1)
    .pipe(console.log)  // 1000

fromArray(array)

Typed as fromArray<T>(Array<T>): Hole<T>.

Starts a stream from a fixed set of objects given as an array.

Example:

import {fromArray} from 'hole';
// ...
await fromArray([1, 2, 3, 4, 5])
    .pipe(n => n * 10)
    .pipe(console.log); // 10
                        // 20
                        // 30
                        // 40
                        // 50

fromStream(readable)

Starts a stream from a native Node readable stream.

Example:

import {fromStream} from 'hole';
import fs from 'fs';
import csv2 from 'csv2';
// ...
  const nameColumnIndex = 3;
  await fromStream(fs.createReadStream('./data.csv'))
      .pipe(csv2())
      .pipe(record => record[nameColumnIndex])
      .pipe(console.log); // James
                          // John
                          // Robert
                          // Michael
                          // ...

.pipe(processor, opts)

Typed as pipe<U>(T => U, PipeOption?): Hole<U>.

When the processor is a synchronous function, it simply receives a value from the previous step and passes the processed value to the next one.

If it returns null or undefined, the value is filtered out and is not passed any further.

Example:

await fromArray([1, 2, 3, 4, 5])
  .pipe(n => {
    if (n > 2) return n;
  })
  .pipe(console.log); // 3
                      // 4
                      // 5

Note that the processor function is called with the transform as its this context, so you can also call this.push(data) just as you would inside a transform() function.

Example:

await hole(5)
    .pipe(function decrementAndPush(n) {
      if (n <= 0) return;
      this.push(n);
      decrementAndPush.call(this, n - 1);
    })
    .pipe(console.log); // 5
                        // 4
                        // 3
                        // 2
                        // 1

When the processor returns a promise, the behavior is similar, but the resolved value is passed to the next step.

Typed as pipe<U>(T => Promise<U>, PipeOption): Hole<U>.

Example:

await fromArray([1, 2, 3, 4])
  .pipe(async page => {
    const posts = await getPosts(page);
    return posts.filter(post => post.author !== 'anonymous');
  }, 2) // Limit maxParallel to 2
  .split()
  .pipe(post => console.log(post.title))  // Lorem ipsum ...
                                          // Ut enim ad...
                                          // Duis aute irure...
                                          // Excepteur sint...

.pipe(transformer, opts)

Typed as pipe<U>(Writable, PipeOption): Hole<U>.

.pipe() also accepts a native Node Transform stream, so you can use modules such as csv2.

Example
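As a minimal sketch of a transformer of this kind, the block below builds an object-mode Transform with Node's built-in stream module and checks it with plain Node streams. createUpperCase and demo are hypothetical names, and the hole usage shown in the comment is an assumption modeled on the csv2 example above; it is not executed here.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical factory: an object-mode Transform that upper-cases strings.
function createUpperCase() {
  return new Transform({
    objectMode: true,
    transform(text, _encoding, callback) {
      callback(null, String(text).toUpperCase());
    },
  });
}

// Assumed usage with hole (not run in this sketch):
//   await fromArray(['james', 'john']).pipe(createUpperCase()).pipe(console.log);

// Stand-alone check with plain Node streams, no hole involved.
async function demo() {
  const output = [];
  const upper = Readable.from(['james', 'john']).pipe(createUpperCase());
  for await (const item of upper) {
    output.push(item);
  }
  return output;
}
```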

.split()

Typed as split(): Hole<$ElementType<T, number>>.

The previous value must be an array. .split() emits each element of the array separately, so the next step handles one element at a time.

Example
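To illustrate the behavior described above, the sketch below implements the same idea as a plain Node object-mode Transform. This is an assumption about the semantics, not hole's actual implementation; createSplitter and splitAll are hypothetical names.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical splitter: receives arrays, emits their elements one by one.
function createSplitter() {
  return new Transform({
    objectMode: true,
    transform(items, _encoding, callback) {
      if (!Array.isArray(items)) {
        callback(new TypeError('split expects an array'));
        return;
      }
      for (const item of items) this.push(item);
      callback();
    },
  });
}

// Collect everything the splitter emits for a list of arrays.
async function splitAll(arrays) {
  const output = [];
  const pieces = Readable.from(arrays).pipe(createSplitter());
  for await (const item of pieces) {
    output.push(item);
  }
  return output;
}
```

Here splitAll([[1, 2], [3]]) resolves to the flat sequence 1, 2, 3, which is what a step following .split() would receive one element at a time.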

.concat(size)

Typed as concat(number): Hole<Array<T>>.

It groups sequential data into arrays of the specified size. This is useful when you post data in batches, the way the Elasticsearch Bulk API expects.

Example:

await fromArray([1, 2, 3, 4, 5])
    .concat(2)
    .pipe(console.log); // [ 1, 2 ]
                        // [ 3, 4 ]
                        // [ 5 ]

.collect()

Typed as collect(): Promise<Array<T>>.

It collects all data returned by the previous process and gives you an array. Note that when the amount of data is enormous, the collected array may exhaust your memory.

Example:

const results = await fromArray([1, 2, 3, 4, 5])
    .pipe(n => n * 10)
    .collect();
console.log(results); // [10, 20, 30, 40, 50]

Hole

The core class that this package exports.

Hole extends LazyPromise, which starts streaming only when .then() or .catch() is called. The additional methods it provides are .pipe(), .split(), .concat() and .collect(), described above.
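The lazy start can be pictured with a small thenable. This is a sketch of the general idea only, assuming nothing about hole's real LazyPromise; LazySketch is a hypothetical class. The work passed to the constructor does not run until .then() or .catch() is first called, and since await calls .then() on a thenable, awaiting it also triggers the start.

```javascript
// Hypothetical sketch of a lazy promise (not hole's actual LazyPromise):
// the executor runs only when .then() or .catch() is first called.
class LazySketch {
  constructor(executor) {
    this.executor = executor;
    this.promise = null;
  }

  // Create the underlying promise once, on first use.
  start() {
    if (!this.promise) this.promise = new Promise(this.executor);
    return this.promise;
  }

  then(onFulfilled, onRejected) {
    return this.start().then(onFulfilled, onRejected);
  }

  catch(onRejected) {
    return this.start().catch(onRejected);
  }
}
```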

PipeOption

type PipeOption =
  | {
      maxParallel: number,
      highWaterMark: number,
    }
  | number;

If it's a number, it is treated as a maxParallel value.

| Option property | Default value | Valid when first argument of .pipe() is | Meaning |
|---|---|---|---|
| maxParallel | 5 | Async function | Maximum number of parallel executions of the processor |
| highWaterMark | 16 | Transformer, sync and async function | Maximum number of objects buffered for the processor to consume |
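The number shorthand and the defaults listed above can be expressed as a small normalizing function. This is a sketch under the assumption that omitted properties fall back to the documented defaults; normalizePipeOption is a hypothetical helper, not an export of hole.

```javascript
// Defaults taken from the option list above.
const DEFAULTS = { maxParallel: 5, highWaterMark: 16 };

// Hypothetical helper: a bare number is shorthand for { maxParallel: number }.
function normalizePipeOption(opts) {
  if (typeof opts === 'number') {
    return { ...DEFAULTS, maxParallel: opts };
  }
  return { ...DEFAULTS, ...(opts || {}) };
}
```

Under these assumptions, .pipe(fn, 3) and .pipe(fn, {maxParallel: 3}) would describe the same configuration.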

License

MIT

node-hole's People

Contributors

piglovesyou

node-hole's Issues

feature: Typing arguments of each process

Currently:

hole(any)
    .pipe(any => any)
    .pipe(any => any)
    .pipe(any => any)

But it would be better for users if we could do:

hole(T)
    .pipe(T => U)
    .pipe(U => V)
    .pipe(V => W)

One problem: Hole accepts null/undefined to filter out data, but the argument of the next process is never null or undefined. How can we express that in the types?

hole('a')
    .pipe(s => {
        if (s === 'a') return s;
        return null;
    })
    .pipe(s => {
        // s is always a string, not null
    })

Implement .splitReverse()

When we do

const size = await getUsersPageSize();
const pages = [...new Array(size)].map((_, i) => i + 1);
await holeWithArray(pages)
    .pipe(page => getUsers(page))
    .split()
    .pipe(user => removeUser(user.id))

, since users are removed from the top of the list, paging breaks for the later pages.

It would be nice if we could do

const size = await getUsersPageSize();
const pages = [...new Array(size)].map((_, i) => i + 1);
await holeWithArray(pages.reverse())
    .pipe(page => getUsers(page))
    .splitReverse()
    .pipe(user => removeUser(user.id))

, since the users would then be removed from the bottom of the list.
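The paging problem can be reproduced with a small in-memory simulation. This is a simplified, sequential sketch with a hypothetical function name (deleteAllByPage); it models only the page order, not hole's parallel streaming or the order within a page.

```javascript
// Hypothetical in-memory "API": pages are computed from the live list,
// so deleting users shifts everyone after them to earlier pages.
function deleteAllByPage(userCount, pageSize, reverse) {
  let users = Array.from({ length: userCount }, (_, i) => i + 1);
  const getUsers = page =>
    users.slice((page - 1) * pageSize, page * pageSize);

  const pageCount = Math.ceil(userCount / pageSize);
  const pages = Array.from({ length: pageCount }, (_, i) => i + 1);
  if (reverse) pages.reverse();

  for (const page of pages) {
    const batch = getUsers(page);
    users = users.filter(id => !batch.includes(id)); // "removeUser" for the batch
  }
  return users; // users that were never deleted
}
```

With 6 users and a page size of 2, walking the pages forward leaves users 3 and 4 behind because they slide onto page 1 after the first deletion, while walking the pages in reverse deletes everyone.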
