Comments (7)
You're right that flat() needs to propagate the concurrent parameter down in its recursive call. But I believe the underlying behavior you're seeing is working as intended.
Here's a more complete example to illustrate:
const { from, interval } = require('ix/asynciterable/index');
const { map, flat, take } = require('ix/asynciterable/operators/index');

(async () => {
  // Three inner sequences, each emitting 0, 1, 2 at 100ms intervals
  const asynciterable = from([
    interval(100).pipe(take(3)),
    interval(100).pipe(take(3)),
    interval(100).pipe(take(3)),
  ]);
  const result = asynciterable.pipe(
    map((_, i) => {
      console.log('outer:', i);
      return _.pipe(map((y) => `${i}-${y}`));
    }),
    flat(1, 1),
  );
  for await (const x of result) {
    console.log('inner:', x);
  }
})();
// outer: 0
// outer: 1
// outer: 2
// inner: 0-0
// inner: 0-1
// inner: 0-2
// inner: 1-0
// inner: 1-1
// inner: 1-2
// inner: 2-0
// inner: 2-1
// inner: 2-2
As we can see, the inner values are enumerated sequentially.
The outer async iterables are pulled immediately because we must use Promise.race() to enumerate both the outer and concurrent inner sequence(s) simultaneously. Without this, the FlattenConcurrentAsyncIterable wouldn't be notified that the outer sequence has yielded a new sequence that potentially needs to be flattened concurrently with the existing inner sequences. The outer sequence does yield new inner sequences immediately, but flat doesn't exhaust the next inner sequence until the previous inner sequence has completed.
This may not be desirable in the flat(1, 1) case, so we've kept the previous behavior alive in concatMap(), which you can use to implement flattenSequential like this:
const { isAsyncIterable } = require('./targets/ix/util/isiterable');
const { from, interval } = require('./targets/ix/asynciterable/index');
const { map, flat, take, concatMap } = require('./targets/ix/asynciterable/operators/index');

(async () => {
  const asynciterable = from([
    interval(100).pipe(take(3)),
    interval(100).pipe(take(3)),
    interval(100).pipe(take(3)),
  ]);
  const result = asynciterable.pipe(
    map((_, i) => {
      console.log('outer:', i);
      return _.pipe(map((y) => `${i}-${y}`));
    }),
    flattenSequential(1),
  );
  for await (const x of result) {
    console.log('inner:', x);
  }
})();

// Function declarations are hoisted, so this can be used above
function flattenSequential(depth = -1) {
  // A negative depth means "flatten all the way down"
  depth = (depth < 0 ? Infinity : depth);
  return function flattenOperatorFunction(source) {
    return concatMap((item) => {
      if (isAsyncIterable(item)) {
        return depth > 0 ? flat(depth - 1)(item) : item;
      }
      return [item];
    })(source);
  };
}
// outer: 0
// inner: 0-0
// inner: 0-1
// inner: 0-2
// outer: 1
// inner: 1-0
// inner: 1-1
// inner: 1-2
// outer: 2
// inner: 2-0
// inner: 2-1
// inner: 2-2
from ixjs.
@trxcllnt Thank you for the explanation. I'm trying to use lazy pipelines for data streaming to keep memory footprint and CPU profile low. My current use case:
from(
  dbCursor({ pageSize })
).pipe(
  map(async (page) => join(page, await apiCall(page))),
  flat(1, 1),
  map(transform),
  ...
);
By the time of the first transform, the cursor may be exhausted, the full dataset is in memory, and a dozen API calls have been made.
Will infinite concurrency flood Node's event loop? I tried an example with non-uniform timing and it produces results out of order: https://codesandbox.io/s/lingering-shape-kf1nui?file=/src/index.js
from ixjs.
The concurrent parameter refers to the number of inner sequences that are flattened at the same time. So if you have an outer source AsyncIterable that immediately yields 10 inner sequences, then concurrent=Infinity will flatten all 10 inner sequences and yield their values in the order they arrive. This aligns with the behavior of Rx's flatMap().
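To illustrate what "in the order they arrive" means, here is a minimal sketch in plain async generators. It does not use ix, and it is not how flat() is implemented; mergeSketch, ticker, and sleep are hypothetical helpers written for this example. It keeps one pending next() per source and uses Promise.race() to yield whichever value settles first, so a slower source's values end up interleaved with a faster one's.

```javascript
// Hypothetical helpers for illustration only; none of this is the ix API.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Emits `${label}-0`, `${label}-1`, ... with a fixed delay before each value
async function* ticker(label, delayMs, count) {
  for (let i = 0; i < count; i++) {
    await sleep(delayMs);
    yield `${label}-${i}`;
  }
}

// Yields values from every source as soon as each one arrives
async function* mergeSketch(sources) {
  const iterators = sources.map((s) => s[Symbol.asyncIterator]());
  // One pending next() per live iterator, tagged with the iterator's index
  const pending = new Map();
  const pull = (index) =>
    pending.set(
      index,
      iterators[index].next().then((result) => ({ index, result })),
    );
  iterators.forEach((_, index) => pull(index));

  while (pending.size > 0) {
    const { index, result } = await Promise.race(pending.values());
    if (result.done) {
      pending.delete(index); // this source is finished
    } else {
      pull(index); // request the next value before yielding this one
      yield result.value;
    }
  }
}

async function run() {
  const values = [];
  const merged = mergeSketch([ticker('fast', 50, 3), ticker('slow', 125, 2)]);
  for await (const value of merged) values.push(value);
  return values;
}

run().then((values) => console.log(values.join('\n')));
```

The output order depends on timing rather than on the position of each source in the input array, which is the same reason results can appear out of order when the inner sequences have non-uniform delays.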
I can't tell exactly what you're trying to achieve from your example, but it seems that if you know you only need to flatten a single depth, you may want to use concatMap instead of map and flat?
from(
  dbCursor({ pageSize })
).pipe(
  concatMap(async (page) => join(page, await apiCall(page))),
  map(transform),
  ...
);
from ixjs.
I was saying that the readme mentions iterators, so it is not RxJS (observables). The next operation in the chain drives the pipeline. Suppose the outer data source could produce 1000 items, each of which could be flattened by flat() into 1000 more. By the time of the second map(transform), how many items will be in memory? 1 million - 1?
I will try concatMap tomorrow, thank you!
from ixjs.
There will only be 1 element in memory at any given moment (assuming extremely aggressive GC behavior, obviously), and transform will have been called 1M times. If you introduce an unbounded buffer before transform (or use an operator that buffers internally, like join()), then yes there will be 1M elements in the buffer.
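As a rough illustration of a pull-driven pipeline, the sketch below uses plain async generators instead of the ix operators. It models strictly sequential flattening (closer to the concatMap behavior described earlier than to concurrent flat), and pages, flattenSequentialSketch, and mapSketch are hypothetical names made up for this example. The log shows that the second page is only loaded after every item of the first page has passed through transform.

```javascript
// Hypothetical stand-ins for the pipeline stages; none of this is the ix API.

// Pretends to load pages from a cursor, logging each load
async function* pages(pageCount, log) {
  for (let p = 0; p < pageCount; p++) {
    log.push(`load page ${p}`);
    yield [`${p}-a`, `${p}-b`];
  }
}

// Sequential flatten: the next page is requested only after the
// current page has been fully consumed downstream
async function* flattenSequentialSketch(source) {
  for await (const page of source) {
    yield* page;
  }
}

// Applies fn to each item as the consumer pulls it
async function* mapSketch(source, fn) {
  for await (const item of source) {
    yield fn(item);
  }
}

async function run() {
  const log = [];
  const pipeline = mapSketch(
    flattenSequentialSketch(pages(2, log)),
    (item) => {
      log.push(`transform ${item}`);
      return item;
    },
  );
  // The for-await loop is what drives every stage above it
  for await (const _ of pipeline) {
    // consume and discard
  }
  return log;
}

run().then((log) => console.log(log.join('\n')));
// load page 0
// transform 0-a
// transform 0-b
// load page 1
// transform 1-a
// transform 1-b
```

Nothing here buffers, so at most one page and one item are referenced by the pipeline at a time; adding a buffering stage between the flatten and the transform would change that.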
from ixjs.
From your comment above:
// outer: 0
// outer: 1
// outer: 2
// inner: 0-0
When you see outer, in my case a DB query is executed. So, by the time of inner 0-0, 3 queries have been executed. Each produces 1K items. The query-0 result is in memory, and the 2 others may be too (depending on network, etc.). Then flat() calls itself recursively if the depth wasn't set, which means that right after query-0 is fulfilled, the items of that query become the outer for the internal flat, which does the same thing again. All of this stays in memory until it can be freed after the map.
If my explanation is confusing, then please count how many loaded you see before the first inner in the console here: https://codesandbox.io/s/ixjs-flat-concurrency-kf1nui?file=/src/index.js
from ixjs.
Could you wrap each outer sequence in AsyncIterable defer, so the DB query is only executed when the AsyncIterable is enumerated?
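The idea behind deferring can be sketched without ix. In the example below, deferSketch is a hypothetical plain-JavaScript stand-in (not the ix defer operator itself) and runQuery is a made-up placeholder for a DB query. The point is only that the factory, and therefore the query, runs when iteration begins rather than when the sequence is created.

```javascript
// Hypothetical placeholder for a DB query: logs when it is executed and
// returns its rows as an async iterable
function runQuery(id, log) {
  log.push(`query ${id} executed`);
  return (async function* () {
    yield `${id}-row0`;
    yield `${id}-row1`;
  })();
}

// Hypothetical stand-in for a defer operator: the factory is not called
// until something starts enumerating the returned async iterable
function deferSketch(factory) {
  return {
    async *[Symbol.asyncIterator]() {
      yield* await factory();
    },
  };
}

async function run() {
  const log = [];

  // Eager: the query runs as soon as the sequence is created
  const eager = runQuery('eager', log);

  // Deferred: nothing runs yet
  const lazy = deferSketch(() => runQuery('lazy', log));

  log.push('sequences created');

  // The deferred query executes here, when enumeration starts
  for await (const row of lazy) {
    log.push(`row ${row}`);
  }
  return log;
}

run().then((log) => console.log(log.join('\n')));
// query eager executed
// sequences created
// query lazy executed
// row lazy-row0
// row lazy-row1
```

With this shape, an outer sequence can yield many inner sequences up front without triggering their queries; each query starts only when the flattening stage begins enumerating that inner sequence.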
from ixjs.