Comments (7)

trxcllnt commented on May 25, 2024

You're right that flat() needs to propagate the concurrent parameter down in its recursive call. But I believe the underlying behavior you're seeing is working as intended.

Here's a more complete example to illustrate:

const { from, interval } = require('ix/asynciterable/index');
const { map, flat, take } = require('ix/asynciterable/operators/index');

(async () => {
  const asynciterable = from([
    interval(100).pipe(take(3)),
    interval(100).pipe(take(3)),
    interval(100).pipe(take(3)),
  ]);
  const result = asynciterable.pipe(
    map((_, i) => {
      console.log('outer:', i);
      return _.pipe(map((y) => `${i}-${y}`));
    }),
    flat(1, 1),
  );

  for await (const x of result) {
    console.log('inner:', x);
  }
})();

// outer: 0
// outer: 1
// outer: 2
// inner: 0-0
// inner: 0-1
// inner: 0-2
// inner: 1-0
// inner: 1-1
// inner: 1-2
// inner: 2-0
// inner: 2-1
// inner: 2-2

As we can see, the inner values are enumerated sequentially.

The outer sequence is pulled immediately because the implementation must use Promise.race() to enumerate both the outer sequence and the concurrent inner sequence(s) simultaneously.

Without this, FlattenConcurrentAsyncIterable wouldn't be notified that the outer sequence has yielded a new sequence that may need to be flattened concurrently with the existing inner sequences. So the outer sequence does yield new inner sequences immediately, but with concurrent = 1, flat doesn't start enumerating the next inner sequence until the previous inner sequence has completed.
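To make the Promise.race() point concrete, here is a simplified, dependency-free sketch of a concurrent flatten written with plain async generators. It is not IxJS's FlattenConcurrentAsyncIterable: the names flattenConcurrent, ticks, outerOf and collect are hypothetical, it assumes concurrent >= 1, and it skips cancellation and cleanup of inner iterators. The outer iterator is raced against the active inner iterators, so it keeps being pulled even when the concurrency limit has been reached.

```javascript
// Simplified sketch of a concurrent flatten built on Promise.race(). This is
// NOT IxJS's FlattenConcurrentAsyncIterable: it assumes concurrent >= 1 and
// does not cancel or clean up inner iterators if the consumer stops early.
async function* flattenConcurrent(outer, concurrent = Infinity) {
  const outerIt = outer[Symbol.asyncIterator]();
  const active = new Map(); // inner iterator -> its pending next() promise
  const waiting = []; // inner sequences parked until a slot frees up
  let outerPending = null;
  let outerDone = false;

  const pullOuter = () => {
    outerPending = outerIt.next().then((r) => ({ from: 'outer', r }));
  };
  const pullInner = (it) => {
    active.set(it, it.next().then((r) => ({ from: 'inner', it, r })));
  };
  const startInner = (seq) => pullInner(seq[Symbol.asyncIterator]());

  pullOuter();
  while (!outerDone || active.size > 0) {
    // Race the outer iterator against every active inner iterator, so a new
    // inner sequence is noticed while the others are still producing values.
    const racers = [...active.values()];
    if (!outerDone) racers.push(outerPending);
    const ev = await Promise.race(racers);

    if (ev.from === 'outer') {
      if (ev.r.done) {
        outerDone = true;
      } else {
        if (active.size < concurrent) startInner(ev.r.value);
        else waiting.push(ev.r.value);
        pullOuter(); // the outer keeps being pulled, even at the limit
      }
    } else if (ev.r.done) {
      active.delete(ev.it);
      if (waiting.length > 0) startInner(waiting.shift());
    } else {
      yield ev.r.value;
      pullInner(ev.it); // request the next value only after this one is consumed
    }
  }
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Inner sequence: yields `${label}-${i}` after each delay in `delays`.
async function* ticks(label, delays) {
  for (let i = 0; i < delays.length; i++) {
    await sleep(delays[i]);
    yield `${label}-${i}`;
  }
}

// Outer sequence: records every time it is pulled.
async function* outerOf(specs, log) {
  for (const [label, delays] of specs) {
    log.push(`outer: ${label}`);
    yield ticks(label, delays);
  }
}

async function collect(concurrent) {
  const log = [];
  const specs = [['a', [100, 100]], ['b', [10, 10]]];
  for await (const v of flattenConcurrent(outerOf(specs, log), concurrent)) {
    log.push(`inner: ${v}`);
  }
  return log;
}

collect(1).then((log) => console.log('concurrent=1:', log));
collect(Infinity).then((log) => console.log('concurrent=Infinity:', log));
```

With concurrent = 1, both outer sequences are pulled before the first inner value arrives, yet the values still come out in order (a-0, a-1, b-0, b-1). With concurrent = Infinity, the faster inner sequence b finishes first (b-0, b-1, a-0, a-1).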

This may not be desirable in the flat(1, 1) case, so we've kept the previous behavior alive in concatMap(), which you can use to implement flattenSequential like this:

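// These paths point at a local IxJS build (targets/ix); adjust them for the
// published ix package.
const { isAsyncIterable } = require('./targets/ix/util/isiterable');
const { from, interval } = require('./targets/ix/asynciterable/index');
const { map, take, concatMap } = require('./targets/ix/asynciterable/operators/index');

(async () => {
  const asynciterable = from([
    interval(100).pipe(take(3)),
    interval(100).pipe(take(3)),
    interval(100).pipe(take(3)),
  ]);
  const result = asynciterable.pipe(
    map((_, i) => {
      console.log('outer:', i);
      return _.pipe(map((y) => `${i}-${y}`));
    }),
    flattenSequential(1),
  );

  for await (const x of result) {
    console.log('inner:', x);
  }
})();

// Sequential counterpart of flat(): concatMap only pulls the next outer value
// after the current inner sequence has completed.
function flattenSequential(depth = -1) {
  depth = (depth < 0 ? Infinity : depth);
  return function flattenOperatorFunction(source) {
    return concatMap((item) => {
      if (isAsyncIterable(item)) {
        // Recurse with flattenSequential (not flat, whose default concurrency
        // is Infinity) so deeper levels are also flattened one at a time.
        return depth > 0 ? flattenSequential(depth - 1)(item) : item;
      }
      // Non-iterable values are wrapped so concatMap yields them as-is.
      return [item];
    })(source);
  };
}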

// outer: 0
// inner: 0-0
// inner: 0-1
// inner: 0-2
// outer: 1
// inner: 1-0
// inner: 1-1
// inner: 1-2
// outer: 2
// inner: 2-0
// inner: 2-1
// inner: 2-2
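The same sequential behavior can be sketched without IxJS at all, assuming a depth of 1: a for await loop over the outer sequence with yield* on each inner sequence only advances the outer iterator once the current inner sequence is exhausted. concatAll, outerSequences and run are hypothetical names, and this is not how concatMap() is implemented internally.

```javascript
// Sketch assuming a depth of 1: a sequential ("concat") flatten written with
// plain async generators. Not IxJS's concatMap(); names are hypothetical.
async function* concatAll(source) {
  for await (const inner of source) {
    // The outer iterator is only advanced again once `inner` is exhausted.
    yield* inner;
  }
}

// Outer sequence of three inner sequences; it records each time it is pulled.
async function* outerSequences(log) {
  for (let i = 0; i < 3; i++) {
    log.push(`outer: ${i}`);
    yield (async function* () {
      for (let y = 0; y < 3; y++) yield `${i}-${y}`;
    })();
  }
}

async function run() {
  const log = [];
  for await (const x of concatAll(outerSequences(log))) {
    log.push(`inner: ${x}`);
  }
  return log;
}

run().then((log) => log.forEach((line) => console.log(line)));
```

run() resolves to the same twelve lines as the output above, with each "outer:" line appearing only after the previous inner sequence has finished.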

from ixjs.

the-spyke commented on May 25, 2024

@trxcllnt Thank you for the explanation. I'm trying to use lazy pipelines for data streaming to keep memory footprint and CPU profile low. My current use case:

from(
  dbCursor({ pageSize })
).pipe(
  map(async (page) => join(page, await apiCall(page))),
  flat(1, 1),
  map(transform),
  ...
);

By the time the first transform runs, the cursor may already be exhausted, the full dataset may be in memory, and a dozen API calls may have been made.

Will the Infinity concurrency flood Node's event loop? I tried an example with non-uniform timing, and it produces results out of order: https://codesandbox.io/s/lingering-shape-kf1nui?file=/src/index.js

trxcllnt commented on May 25, 2024

The concurrent parameter refers to the number of inner sequences that are flattened at the same time. So if you have an outer source AsyncIterable that immediately yields 10 inner sequences, then concurrent=Infinity will flatten all 10 inner sequences and yield their values in the order they arrive. This aligns with the behavior of Rx's flatMap().

I can't tell exactly what you're trying to achieve from your example, but if you know you only need to flatten a single level, it seems you may want to use concatMap instead of map and flat?

from(
  dbCursor({ pageSize })
).pipe(
  concatMap(async (page) => join(page, await apiCall(page))),
  map(transform),
  ...
);
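A rough, dependency-free model of this suggestion, assuming that concatMap awaits an async selector and yields all of the resulting items before pulling the next page. dbCursor, apiCall, join and transform here are hypothetical stubs standing in for the names in the snippet; concatMapAsync and mapAsync are sketch helpers, not IxJS operators.

```javascript
// Hypothetical stubs standing in for dbCursor, apiCall, join and transform
// from the snippet above; every call is recorded in `events`.
const events = [];

async function* dbCursor({ pageSize, pages = 3 }) {
  for (let p = 0; p < pages; p++) {
    events.push(`query ${p}`);
    yield Array.from({ length: pageSize }, (_, row) => ({ page: p, row }));
  }
}
async function apiCall(page) {
  events.push(`api ${page[0].page}`);
  return { enriched: true };
}
const join = (page, extra) => page.map((row) => ({ ...row, ...extra }));
function transform(row) {
  events.push(`transform ${row.page}-${row.row}`);
  return row;
}

// Sketch of concatMap with an async selector (an assumption about its
// semantics): await the selector, yield all of its items, then pull again.
async function* concatMapAsync(source, selector) {
  let index = 0;
  for await (const value of source) {
    const items = await selector(value, index++);
    yield* items;
  }
}

async function* mapAsync(source, fn) {
  for await (const value of source) yield fn(value);
}

async function main() {
  const pipeline = mapAsync(
    concatMapAsync(dbCursor({ pageSize: 2 }), async (page) => join(page, await apiCall(page))),
    transform,
  );
  const rows = [];
  for await (const row of pipeline) rows.push(row); // consume like the rest of the chain
  return events;
}

const done = main();
done.then((log) => console.log(log.join('\n')));
```

The recorded events interleave as query 0, api 0, transform 0-0, transform 0-1, query 1, api 1, and so on, so in this model only one page and one API call are in flight before that page's rows reach transform.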

the-spyke commented on May 25, 2024

I was saying that the README talks about iterators, so this is not RxJS (observables): the next operation in the chain drives the pipeline. Suppose the outer data source can produce 1000 items, each of which flat() expands into 1000 more. By the time map(transform) runs for the second time, how many items will be in memory? 1 million - 1?

I will try concatMap tomorrow, thank you!

trxcllnt commented on May 25, 2024

There will only be 1 element in memory at any given moment (assuming extremely aggressive GC behavior, obviously), and by the end transform will have been called 1M times. If you introduce an unbounded buffer before transform (or use an operator that buffers internally, like join()), then yes, there will be 1M elements in the buffer.
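A small experiment that illustrates this claim with plain async generators rather than IxJS (all names are hypothetical, and it uses 100 x 100 items instead of 1000 x 1000 to stay fast). live counts items that have been produced but not yet consumed by the last stage; it is a rough bookkeeping proxy for memory and does not model GC. Without a buffer, live never exceeds 1; inserting the unbounded bufferAll() stage pushes it to the full item count.

```javascript
// Rough model, not a heap measurement: `live` counts items that have been
// produced but not yet consumed by the final stage of the pipeline.
async function* nested(outerCount, innerCount, stats) {
  for (let i = 0; i < outerCount; i++) {
    yield (async function* () {
      for (let j = 0; j < innerCount; j++) {
        stats.live++;
        stats.maxLive = Math.max(stats.maxLive, stats.live);
        yield `${i}-${j}`;
      }
    })();
  }
}

// Sequential flatten of one level (pull-based, like concatMap).
async function* concatAll(source) {
  for await (const inner of source) yield* inner;
}

// Hypothetical unbounded buffer: drains its whole input before yielding.
async function* bufferAll(source) {
  const buffer = [];
  for await (const item of source) buffer.push(item);
  yield* buffer;
}

async function measure(withBuffer, outerCount = 100, innerCount = 100) {
  const stats = { live: 0, maxLive: 0, transformed: 0 };
  let stream = concatAll(nested(outerCount, innerCount, stats));
  if (withBuffer) stream = bufferAll(stream);
  for await (const item of stream) {
    stats.transformed++; // stands in for transform(item)
    stats.live--; // the item can be released after the last stage
  }
  return stats;
}

Promise.all([measure(false), measure(true)]).then(([plain, buffered]) => {
  console.log('no buffer:', plain);
  console.log('unbounded buffer:', buffered);
});
```

In both runs transform is called 10,000 times; only the buffered run ever holds all 10,000 items at once.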

the-spyke commented on May 25, 2024

From your comment above:

// outer: 0
// outer: 1
// outer: 2
// inner: 0-0

In my case, every time you see "outer", a DB query is executed. So by the time "inner: 0-0" is printed, 3 queries have already been executed, each producing 1K items. Query 0's results are in memory, and the other 2 may be as well (depending on network timing, etc.). Then, if depth wasn't set, flat() calls itself recursively, which means that as soon as query 0 resolves, its items become the outer sequence for the nested flat(), which does the same thing again. All of this stays in memory until it can be freed after the map.

If my explanation is confusing, please count how many "loaded" lines you see in the console before the first "inner" here: https://codesandbox.io/s/ixjs-flat-concurrency-kf1nui?file=/src/index.js

trxcllnt commented on May 25, 2024

Could you wrap each sequence yielded by the outer source in AsyncIterable defer(), so that the DB query is only executed when that AsyncIterable is enumerated?
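A minimal hand-rolled version of the defer idea, assuming only that the factory returns an async iterable; in real code IxJS's own defer(), which the comment refers to, would be used instead. runQuery is a hypothetical stand-in for the DB query. Creating the deferred sequences, as an eager outer pull would, runs no query; a query only runs once its sequence is enumerated.

```javascript
// Minimal hand-rolled `defer`: assumes `factory` returns an async iterable.
// IxJS provides its own defer(); this only illustrates the laziness.
function defer(factory) {
  return {
    [Symbol.asyncIterator]() {
      // The factory (e.g. the DB query) runs only when enumeration starts.
      return factory()[Symbol.asyncIterator]();
    },
  };
}

let queries = 0;

// Hypothetical stand-in for a DB query returning one page of rows.
function runQuery(page) {
  queries++;
  return (async function* () {
    yield `row ${page}-0`;
    yield `row ${page}-1`;
  })();
}

// An eager outer pull would create all three deferred pages up front...
const lazyPages = [0, 1, 2].map((page) => defer(() => runQuery(page)));

async function demo() {
  const before = queries; // ...but no query has run yet
  const rows = [];
  for await (const row of lazyPages[0]) rows.push(row);
  return { before, after: queries, rows };
}

const result = demo();
result.then((r) => console.log(r));
```

Enumerating only the first deferred page runs exactly one query, even though all three deferred sequences already exist.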
