
build npm stability-wip

rx-flowable

observables with lossless back-pressure

RxJS is a library for reactive programming using Observables, to make it easier to compose asynchronous or callback-based code.

rx-flowable makes it possible to apply lossless back-pressure to observables in RxJS 7. This is useful when an observable produces values faster than a subscriber can consume them.

Please note that a number of solutions for lossless, back-pressured streaming of values already exist for the JavaScript platform, as listed in the bibliography below. This module is designed for projects that already make extensive use of RxJS and prefer not to adopt another library, with its associated learning overhead.

⚠️ Use of this library requires a solid understanding of RxJS, especially when authoring observable pipelines. This is because native RxJS operators are not designed for back-pressure and may behave unexpectedly if used naively.

Feedback and contributions welcome!

principle

The basic building block of rx-flowable is an Observable of Bites, called a Consumable. A bite is an object containing:

  • a value
  • a function, next(), which invites the source to produce the next value

A consumable is therefore simply an observable which does not produce a value until it is invited to. This is often called a "pull" model – we "pull" values from the stream source one at a time – in contrast to the native "push" model of RxJS, in which the source produces values as fast as it can.
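
Expressed as TypeScript types, this looks roughly as follows (a sketch for orientation only; the library's actual declarations may differ):

import { Observable } from 'rxjs';

// A bite couples a value with an invitation to produce the next one
interface Bite<T> {
  readonly value: T;
  // Invites the source to produce the next value
  next(): void;
}

// A consumable is an ordinary RxJS observable whose items are bites
type Consumable<T> = Observable<Bite<T>>;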

Here is an example of subscribing to a consumable:

function consume(values: Consumable<number>) {
  // We use the Observable.subscribe method as normal
  values.subscribe(async nextBite => {
    // Each value supplied to the subscriber is a bite
    try {
      // Hard work takes time, but the consumable will wait for us...
      await hardWork(nextBite.value);
    } finally {
      // ... until we ask for the next bite
      nextBite.next();
      // This subscriber will be called again if there are any more values
    }
  });
}

(Note that Observable.subscribe is already a "pull"-style method: many observables will not produce anything until subscribed to, irrespective of their subsequent speed. rx-flowable extends this pull behaviour to individual values.)

The principle of consuming bites by calling next is simple and powerful, but it has an important downside: if next is not called, the next value will not be produced, and the source may hold on to some underlying resource indefinitely. This means that consumables are generally more prone to resource leaks than plain observables. In the example above, we wrap the hard work in a try-finally block to mitigate this – assuming, for simplicity, that any error is not catastrophic and we can continue processing.

Once subscribed, consumables are "hot". If multiple subscribers are attached to a consumable, late subscribers will only receive values that prior subscribers have invited from the source. Further, the consumable will produce values at the pace of the slowest subscriber. That is, a value is not produced until every subscriber has invited it.

To stop processing altogether, instead of calling next we can unsubscribe from the consumable. Once the last subscriber has unsubscribed, the consumable is able to release its held resources.
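
For example, a subscriber might take only a fixed number of values and then unsubscribe instead of calling next (a minimal sketch; the import path for Consumable is an assumption, and the value limit is arbitrary):

import { Subscription } from 'rxjs';
import { Consumable } from 'rx-flowable'; // assumed export path

function consumeSome(values: Consumable<number>, limit = 10): Subscription {
  // A parent subscription keeps this safe even if values arrive synchronously
  const subscription = new Subscription();
  let count = 0;
  subscription.add(values.subscribe(({ value, next }) => {
    console.log(value);
    if (++count < limit)
      next(); // Invite the next value
    else
      subscription.unsubscribe(); // Stop early; the source can release its resources
  }));
  return subscription;
}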

flowable

If the speed of the subscriber is unknown at design time, for example if the streaming is part of a library interface, then a consumable can be wrapped as a Flowable. A flowable is also an observable, but of the values themselves (so it is straightforward to use in pipelines), with a consume property to re-enter the back-pressured world:

// Database library code:
function readFromDatabase(query) {
  const stream = this.db.readStream(query);
  const consumable = consume(stream); // See sources, below
  return flowable(consumable);
}
// ... client 1:
{
  const cursor = readFromDatabase('SELECT * FROM DATA');
  // Using the flowable directly as an observable without calling next()
  cursor.pipe(filter(isPrintable)).subscribe(console.log);
}
// ... client 2:
{
  const cursor = readFromDatabase('SELECT * FROM DATA');
  // OR using it as a consumable to back-pressure from an expensive downstream
  cursor.consume.subscribe(({ value, next }) =>
    transformAndLoad(value).finally(next));
}

A subscriber via Observable.subscribe() always receives all data, but it may be delayed by any subscribed consumers.
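
The shape implied here is roughly the following (again a sketch, not necessarily the library's exact declaration):

import { Observable } from 'rxjs';
import { Consumable } from 'rx-flowable'; // assumed export path

// A flowable emits raw values like any observable, but also exposes a
// back-pressured view of the same data via its consume property
interface Flowable<T> extends Observable<T> {
  readonly consume: Consumable<T>;
}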

sources

This library provides implementations of Consumable for the following common sources of values:

  • JavaScript Iterables
  • Promises
  • Readables (such as NodeJS Readables)
  • Other observables (non-consumable observables will buffer values)

These can be constructed using the consume function in the consume module.
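
For example, an in-memory array and a NodeJS Readable might be consumed like this (a sketch: the consume import path follows the module layout described above but is not guaranteed, and writeSomewhereSlow is a hypothetical slow consumer):

import { createReadStream } from 'fs';
import { consume } from 'rx-flowable/consume'; // assumed module path

declare function writeSomewhereSlow(chunk: unknown): Promise<void>; // hypothetical

// From an iterable: each value is delivered as a bite
consume([1, 2, 3]).subscribe(({ value, next }) => {
  console.log(value);
  next();
});

// From a NodeJS Readable: chunks are only read as fast as next() is called
consume(createReadStream('data.txt')).subscribe(({ value, next }) =>
  writeSomewhereSlow(value).finally(next));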

operators

Consumables can be pipelined using RxJS operators; however, these native operators will see bites instead of raw values, and care must be taken to ensure that next is called correctly for every input bite if the pipeline is to complete successfully. Since this can require non-obvious boilerplate code, this library provides specialised operators which can be used in place of native ones to handle calling next correctly in common situations (a sketch follows the list below).

  • flatMap is a specialisation of concatMap
  • ignoreIf is like an inverse of filter
  • batch is a specialisation of bufferCount
  • Please suggest or Pull Request!
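
As an illustration, a pipeline combining these operators might look like the following (a sketch only: the operator import paths and exact signatures are assumptions, and ids, fetchRecord and loadRecords are hypothetical placeholders):

import { consume } from 'rx-flowable/consume'; // assumed module path
import { flatMap, ignoreIf, batch } from 'rx-flowable/operators'; // assumed module path

declare const ids: Iterable<string>; // hypothetical input
declare function fetchRecord(id: string): Promise<Record<string, unknown>>; // hypothetical
declare function loadRecords(records: Record<string, unknown>[]): Promise<void>; // hypothetical

consume(ids)
  .pipe(
    flatMap(id => fetchRecord(id)),   // like concatMap: one inner source at a time
    ignoreIf(record => record == null), // like an inverse of filter
    batch(10)                         // like bufferCount: arrays of 10 records
  )
  .subscribe(({ value, next }) =>
    loadRecords(value).finally(next)); // only invite more input once loaded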

biblio

background

alternatives


rx-flowable's Issues

Discussion: allow flowable to process multiple values simultaneously

Hi there,

I just stumbled upon your trail of thoughts regarding RxJS and backpressure.

I built some kind of ETL using RxJS and I wanted to process "as many values as possible at the same time".

At the moment I have a pipeline (Observable) taking work units as input, doing lots of complex stuff and yielding success/failure status as output.
Obviously, I find myself in a position where I can't ask the pipeline to process all work units at once, so I need to do some kind of backpressure handling.

My idea would be to use a threshold on process.memoryUsage() and process.cpuUsage() to know if the server can handle more work, and in this case, send more work units in the observable without waiting for the current work units to be done.

I'm not sure that would fit the scope of this library, but if it does, I would be happy to contribute.

Thank you for your work <3
