Giter VIP home page Giter VIP logo

Comments (69)

benlesh avatar benlesh commented on June 26, 2024 6

@gre: The signal also serves as a mechanism to check whether or not cancellation is closed during synchronous emission. Consider the following:

const source = new Observable((next, error, complete) => {
  let i = 0;
  while (true) { next(i++); }
  return () => { /* doesn't matter what you do here, it's never reached */ };
});

const controller = new AbortController();

source.subscribe(
  value => {
    if (value > 4) controller.abort();
  }
),
null,
null,
controller.signal
);

In the above, there's really no way to signal that we want to stop the while loop in the Observable. A token or signal solves that cleanly:

const source = new Observable((next, error, complete, signal) => {
  let i = 0;
  while (!signal.aborted) { next(i++); }
});

@zenparsing ... I'm game to explore other cancellation primitives, (AbortController isn't the best, but it's a good start), using a subclassed Observable might be nice. The main thing is that is needs to be something we can:

  1. synchronously check to see if it's "closed" or "aborted" or whatever, for the firehose case.
  2. Create "children" of, such that operators like mergeMap, etc. can be implement by the community.

from es-observable.

runarberg avatar runarberg commented on June 26, 2024 6

I decided to take this proposal for a test so I created a bunch of operators based on the simplified API. These include race, chunks, and even catchError.

https://github.com/runarberg/tc39-observable-proposal/tree/operators/src/extras

Overall I must say it was quite pleasant working with this API, I found it beneficial not having to have manage individual subscriptions in combination operators (such as flatMap).

Some minor downsides include creating a controller hierarchy to clean up upstream in take and having to have to manage a set of controllers in race. But overall I found the benefits outweigh the downsides.

I do think this proposal can do without some of the included methods (Observer.prototype.{first,last,forEach}) as they can all easily live in userland libraries (particularly Observer.prototype.last). Maybe keep forEach since most collections come with a forEach defined.

from es-observable.

benjamingr avatar benjamingr commented on June 26, 2024 6

I don’t really know though when you would prefer one over the other in real life.

I can literally talk for 12 hours on the advantages of push streams (like observables) vs. pull streams (like async iterators) but to name a few:

  • observables guarantee they never buffer so you have a much stronger memory guarantee.

  • observables are super-strict (read: eager) which means you get data "as soon as possible" from the producer.

  • observables provide a much simpler interface since all they do is push data - the protocol is just "you subscribe to me and get updates".

  • observables (mostly) have to be "functions to actions" since because they are so eager - they require the extra step from observable to observer.

  • observables give all the rate control to the producer.

  • async iterators can buffer - if I have an iterator for click events and I do not consume it - it will "buffer" the click events for me and when I iterate it I can get events "from the past".

  • async iterators are lazy, which means they can support backpressure very easily (that is, controlling how much data you ask from the producer to even-load chained iterators).

  • async iterators are a pretty complicated interface making them harder to implement. They build on promises so their timing guarantees are different though that's just a detail and not push vs. pull.

  • If you have an async iterator - you already have a thing for consuming the resources (like an observer and unlike an observable) which you can only get away with because iteration is itself lazy.

  • with async iterators all the control is at the consumer side.

Note RxJS already supports Symbol.asyncIterator and since it's the most popular implementation anyway I recommend you experiment with that.

from es-observable.

appsforartists avatar appsforartists commented on June 26, 2024 5

I'm not a TC39 member, so I don't have enough context to know what makes this proposal easier/harder than the previous one.

As a user, I prefer Observer because named parameters have better usability than positional ones.

new Observable(
  // That's a lot to remember
  (next, error, complete, abort) => {}
)
// vs
new Observable(
  // I can use only the parts I need, and order doesn't matter
  ({ complete, next }) => {}
)

To your point, Subject is a very useful tool, and it's trivial when the shape of Observer is well-defined.

from es-observable.

benlesh avatar benlesh commented on June 26, 2024 5

@staltz, I think the real place Observables will be most useful will be as a common API for pushed events, such as DOM events, SSE, WebSockets, Animations, etc. Generally, something better to use than the current EventTarget API.

The point of this issue was to show that we might be able to not have to standardize quite as much, and maybe try to make this look at little more "JavaScripty", ala Promises, as concessions to get the primitive into the language.

Having a uniform API for event handling that can also be leveraged to create other streams of events, such that resource teardown from all of those things is also uniform, could have a very positive effect on developer ergonomics and reducing the likelihood of memory leaks in apps. Right now you have a situation where you need to keep N function references so you can removeEventListener with them, and you have to remember to call close on some types, or abort on others, maybe hold onto a number so you can clearWhatever with it later. It's a bit of madness. Not to mention there are different ways to subscribe to events from all of these things, and virtually no way to compose them in declarative ways. Getting observable into the standard would be a solid step in that direction. Any observable type, really. Just so long as it's lazy and it doesn't force multicasting.

I think for Node, async iterators will solve a solid number of usecases around file reads, and network IO. However for the frontend, allocating a promise-per-turn for things like mouse movements, or even web sockets, is a bit of madness IMO.

Callbacks, of course, will always work, but they're unfortunately not at all prescriptive, so it results in a Wild West of APIs, no uniformity, and a larger pit of failure for releasing resources.

from es-observable.

gre avatar gre commented on June 26, 2024 4

That's a great simplification. One downside I see is the need to remember the ordering of params especially for the subscribe.

Also another suggestion is that maybe you don't need a signal but just a function:

  • On create side I find it really great to return a unsubscribe func (like react effect or in rxjs)
  • On subscribe side, why not returning just a unsubscribe func?
new Observable((next, error, complete) => {
  next(1);
  next(2);
  next(3);
  complete();
  return () => {};
});
const unsub = source.subscribe();
unsub();

from es-observable.

benlesh avatar benlesh commented on June 26, 2024 4

FWIW: RxJS observables support interop with async iterables, and there's also a 3rd party library for converting observables to async iterables: https://github.com/benlesh/rxjs-for-await. In that library I identified the four most common ways developers might want to convert pushed events to async iterables.

from es-observable.

benjamingr avatar benjamingr commented on June 26, 2024 3

@benlesh

@benjamingr the "fluff" is actually what makes Observable an important primitive.

Yeah I agree. I am just not convinced that the "fluff" is bad or that the original proposal was blocked on the fluff rather than the heavy lifting. I am perfectly content with a battle tested API over a more minimal one to be honest :]

(I am enjoying the mental exercise though!)

from es-observable.

FireyFly avatar FireyFly commented on June 26, 2024 3

Wrt web and supplementing callback-based APIs with Observable ones, it seems to be the direction argued in the WHATWG issue on Observables. I would guess that if TC39 specifies a standard Observable interface, it would (hopefully) end up being used in Web specs and APIs in the future too.

from es-observable.

gre avatar gre commented on June 26, 2024 3

@acutmore the fear I have with some of this pattern as well as the one exposed with AbortController is what is going on with the left behind listeners. they are leaking aren't they?, it can be a big deal if you use a lot of observable (I can source.subscribe() many times).

const source = new Observable((next, error, complete, signal) => {
  const interval = setInterval(next, 1000);
  return signal.addEventListener(() => clearInterval(interval) /* here */);
});

Also, there should be a guarantee that this won't be called twice.

The only way I anticipate this can be solved is that if that signal param that gets send in the Observable function is NOT the one that comes from subscribe but a new proxy object that is created individually per subscription to bring these guarantee and "release" on end of the observable? This would need to be specified if this was the idea.

Similarly, let's say you want to implement the race(...observable) function, you would have to create an intermediary signal for each?

I understand that this doesn't solve the "firehose sync data" case but this is so much more straightforward.

const source = new Observable((next, error, complete) => {
  const interval = setInterval(next, 1000);
  return () => clearInterval(interval);
});

to avoid the confusion that subscribe's signal is not the same as one in observable, wouldn't this work with a simple callback?

const source = new Observable((next, error, complete, onUnsubscribe) => {
  const interval = setInterval(next, 1000);
  onUnsubscribe(() => clearInterval(interval));
});

from es-observable.

spaceribs avatar spaceribs commented on June 26, 2024 3

Jumping on to what @benjamingr said, ReactiveX has already built an interface around async iterators, and can be seen here: https://github.com/ReactiveX/IxJS

These technologies are not in opposition, they are different tools for different jobs.

from es-observable.

gre avatar gre commented on June 26, 2024 2

@benlesh I don't find this necessarily too verbose:

const source = new Observable((next, error, complete) => {
  let i = 0;
  let aborted;
  while (!aborted) { next(i++); }
  return () => {
    aborted = true;
  }
});

If the idea is to remove the types like Observer and Subscription, why not simplifying this too.

Moreover, your example is to me the most uncommon one, most of the time you would "clean up" things like timeout or event unlistening.
And for these, this is general much simple to me:

const source = new Observable((next, error, complete, signal) => {
  const interval = setInterval(next, 1000);
  return () => clearInterval(interval);
});

Now try to solve this with signal: it sounds like you're going to need an event listener. Who is cleaning up the listener on the signal? Is it getting called even when the observable was unsubscribe? Is it leaking if you reuse a signal at many places? what if I consume my observable more than once?

from es-observable.

benjamingr avatar benjamingr commented on June 26, 2024 2

Would it make sense for RxJS to migrate to this API (experimentally)?


An observable is just a setter for a setter (basically why RxJS and things like MobX or Vue are fundamentally the same idea :] ). so if we want a much more minimal API the following could work:

(By the way, if anyone other than Ben (whom is probably familiar with the material) wants a good intro I recommend @headinthebox who authored Rx's https://channel9.msdn.com/Events/Lang-NEXT/Lang-NEXT-2014/Keynote-Duality )

type Iterable<T> = () => (() => T);
type Observable<T> = (observer: (arg: T) => void) => void;
// would have preffered being able to write `T => () => ()`

Then things like error handling and completion can be build on top of that. Your example could be:

  test("should at least work", () => {
    const source = (next) => { next(1); next(2); next(3); }

    let results = [];

    source(value => results.push(value));

    expect(results).toEqual([1, 2, 3]);
  });

After all in regular RxJS subscribe (without the fluff) is just:

class Observable {
  constructor(subscribe) { this.subscribe = subscribe; }
}

Although to be fair I am fine with either API and I don't think that's why this proposal has stalled :]

from es-observable.

benlesh avatar benlesh commented on June 26, 2024 2

@benjamingr the "fluff" is actually what makes Observable an important primitive.

  1. Guarantees that "teardown" occurs on error, completion, and unsubscription (consumer disinterest).
  2. Guarantees that calling next, complete and error handlers can't be called out of order or otherwise unreasonably.
  3. Guarantees that unhandled errors are propagated in a way that makes sense and doesn't cause strange, hard to debug errors.

If I thought we could do this reliably and safely with ordinary functions, I wouldn't be arguing it belonged in the language, as we already have functions.

from es-observable.

staltz avatar staltz commented on June 26, 2024 2

What is actually the goal of the proposal? I've seen a lot of discussion and bikeshed around the API details, but what will this actually achieve? This repo's readme says:

By offering Observable as a component of the ECMAScript standard library, we allow platforms and applications to share a common push-based stream protocol.

But callbacks are legitimately push-based stream primitives, and are widely used already. They just don't have a contract for next/error/complete semantics. I don't see a need for an Observable primitive in the language if that primitive won't be used everywhere where it makes most sense, e.g. HTTP or for a sequence of push messages (WebWorker postMessage/onmessage) or DOM events. We know the web can't afford a breaking change, so at best we would be adding alternative APIs to supplement the callback-based APIs. But libraries already work as supplements.

But this is a JS spec, not a Web spec. Perhaps Observable could be more useful as a primitive in Node.js? I don't think so, they are making good use of AsyncIterable over there for readable streams.

So – honestly – what's the point of this spec?

from es-observable.

benjamingr avatar benjamingr commented on June 26, 2024 2

things get unsubscribed when observable is garbage-collected

I don't think that's an approach any committee would like and I think deterministic cleanup is important.

Could you elaborate?

Sure, 90% of the time when writing code that uses both observables and promises I have something like:

async function doFoo() {
  // setup page
  await observable.forEach(() => {
    // update page
  });
  // do more work
}

Though I guess this can be by implementing Symbol.asyncIterator (which is probably more spec work?)


I'd like to emphasize the observable proposal isn't actually blocked on either of those things though. It's not that cleanup or forEach went to the committee and the proposal got blocked on them - the proposal just doesn't have an active champion and hasn't been presented to tc39 or actively worked on in a long while.

from es-observable.

benjamingr avatar benjamingr commented on June 26, 2024 2

If we ignore how important resource cleanup and safe error handling semantics are - observables are very simple - you can see callbags for a take on it. I don't think there is any interest in standardizing semantics without cancellation and error handling semantics.

An observable with that stripped is just:

// (T => ()) => ()     or in TS syntax something like (T => void) => void
const observable = (fn) => {
  let i = 0;
  setInterval(() => fn(i++), 1000);
};

observable((data) => console.log('got data from observable', data));

Erik goes in-depth into it in his talks about Rx (About getters to getters and setters to setters).

As for why do we need everything around it? For the same reason we need clear defined semantics for iterators and why an iterator isn't just a getter to a getter the same way this is a setter to a setter:

// Note this has the exact same signature with all the arrows inverted, this is 
// what people refer to by duality in the repo or when talking about observables
// () => (() => T) or in TS syntax something like () => () => T
function iterator() {
  let i = 0;
  return () => i++;
}

iterator(); // get next value

The reason we need an iteration protocol rather than just tell people to pass functions around is twofold:

  • First because conventions are important for libraries to interact with eachother with clear error handling and resource management semantics
  • Second because it means the language and platform itself can use observables (or iterators or promises) in the language and specifications itself. For example in my case it would enable a better story for observables in Node.

from es-observable.

benlesh avatar benlesh commented on June 26, 2024 1

@benjamingr I've expressed my concerns and opinions about the emitter proposal here: tc39/proposal-emitter#26 (comment)

from es-observable.

benlesh avatar benlesh commented on June 26, 2024 1

@gre that cannot possibly work, as it will enter an infinite loop before returning that cancellation function.

It's also not as uncommon as you might think. Converting array to observable, or creating a range observable, or an iterable (the iterable could even have side effects on each turn). All of these things require that we have the ability to check and existing flag of some sort to make sure we aren't looping unnecessarily.

from es-observable.

benlesh avatar benlesh commented on June 26, 2024 1

@gre The reason it works is because the signal provided was created before the for loop was ever entered, If any action that occurs on next (called in the body of the for loop) aborts the signal, the signal will be flagged as aborted synchronously. Which means on the next pass through the loop, it will see it has been aborted, and stop the loop.

from es-observable.

zenparsing avatar zenparsing commented on June 26, 2024 1

@benlesh In the interest of taking a fresh look at things, let me offer my perspective on the firehose issue.

I've never liked it, although I realize that users typically don't complain about it, given typical Observable usage scenarios. But in my experience with implementing combinators, it's a reoccuring footgun. The naive solution is typically wrong due to it.

In zen-observable (which has some usage, although certainly not anywhere near as much as RxJS), I ended up disallowing firehosing completely. Instead, the "firehosed" items get queued up, and "of" and "from" don't firehose (they wait a tick before sending). The change went in almost two years ago, and although I've gotten (awesome) bug reports over that time, I've never seen a single complaint about the queuing behavior.

If you avoid the firehose problem, then you can just return a cleanup function from the Observable init function. Then you can have a clean layering: Observables don't need to assume a cancellation primitive (other than functions of course), and cancellation tokens can be built on top of Observable.

from es-observable.

acutmore avatar acutmore commented on June 26, 2024 1

Personally I quite like the AbortController-AbortSignal, though can see how that makes this proposal a little more difficult to digest on the Node side, not having those APIs already.

I have forked the original CodeSandBox with an example of how cancelation could be done using Observables to cancel Observables as @zenparsing has already mentioned (though I didn't add a subclass). I also added a few tests to cover the modifications.

https://codesandbox.io/s/tc39-observable-proposal-proposed-change-dqkqd

const source = new Observable((next, err, complete, takeUntil) => {
    let abort = false;
    takeUntil.subscribe(() => (abort = true));
    for (let i = 0; i < 100 && !abort; i++) {
        next(i);
    }
    complete();
});

let abort;
const abortWhen = new Observable(n => {
    abort = n;
});

source.subscribe(
    () => results.length === 3 && abort(),
    null,
    null,
    abortWhen
);

or a more real-life example:

const timeout = (time) => new Observable((next, err, complete, takeUntil) => {
    const id = setTimeout(() => {
        next();
        complete();
    }, time);
   takeUntil.subscribe(() => clearTimeout(id));
});

const onEvent = (elm, event) => new Observable((next, err, complete, takeUntil) => {
    elm.addEventListener(event, next);
    takeUntil.subscribe(() => elm.removeEventListener(event, next));
});

timeout(10 * 1000).subscribe(
    () => launchSatalite(),
    null,
    null,
    onEvent(document.getElementById('cancel'), 'click')
);

To make it work I did also need to change subscribe to return a unsubscribe function.

from es-observable.

yyx990803 avatar yyx990803 commented on June 26, 2024 1

@SerkanSipahi thanks for tagging me, but Vue's "observable" is focusing on a rather different problem domain, which is why we renamed the API to "reactive" in Vue 3. In particular, modeling the stream of changes over time and dealing with cancellation etc. is not part of the concern in Vue's reactivity system. I see the two systems with slight overlap but can be easily used in a complementary fashion.

from es-observable.

benjamingr avatar benjamingr commented on June 26, 2024 1

@yyx990803 it's actually exactly the same problem doing the exact same thing although I understand why it might not seem that way. That's the point the inventor of Rx makes in the talk I linked to above.

I know it might seem orthogonal but vue.observable and Rx are two sides of the same coin with observables just being setters for setters.

from es-observable.

yyx990803 avatar yyx990803 commented on June 26, 2024 1

@benjamingr I agree they share the same root at the theory level. But the intention behind their specific designs are aimed at different end user scenarios. So my point is I'm not interested in expanding Vue's reactivity system to be more observable-like. I'd rather it stay minimal and serve its current purpose. Nor am I interested in making observable the default in the Vue system, due to the extra mental overhead it incurs on users. Vue's goal would be making sure its own reactivity system can play nicely with 3rd party observable systems.

from es-observable.

benjamingr avatar benjamingr commented on June 26, 2024 1

@yyx990803 I think I am communicating poorly because I really don't think that it needs to be expanded.

Vue's reactivity system (like Knockout, MobX and any other getter/setter or proxy system) is very much already the same thing (not in the root or theory level but in practice) as RxJS observables just with better ergonomics and different goals.

That's what the talk I linked to by Erik shows - an observable is just a setter for a setter with error handling and completion semantics - it's the same thing :]

Regardless I commend your commitment towards a simple and user friendly API :] (which is why I use Vue in my own code)

from es-observable.

acutmore avatar acutmore commented on June 26, 2024 1

@gre in both the signal and observable implementations the value passed through is not the original but a 'child'. Doesn't create a leak.

I like the onUnSubscribe you suggest. As there is only one valid way to use the value, so making that the only way it can be used is quite nice.

from es-observable.

benjamingr avatar benjamingr commented on June 26, 2024 1

@staltz

But this is a JS spec, not a Web spec. Perhaps Observable could be more useful as a primitive in Node.js? I don't think so, they are making good use of AsyncIterable over there for readable streams.

If you have interesting ideas for Observables in Node.js we are definitely interested in learning and considering but observables have no straightforward backpressure story and we prefer pull to push for most things at the moment.

I don't speak for the whole project but based on the sessions from the last few collaborator summits our consensus is that AsyncIterables are planned to have a more prominent role in the Node.js async story.

But callbacks are legitimately push-based stream primitives, and are widely used already. They just don't have a contract for next/error/complete semantics.

Precisely, and that's the point I was trying to make here. The issue is that next/error/complete semantics are really important.

Our EventEmitter in Node.js has error semantics and I think EventEmitter can be really confusing and if we could replace that and unify EventEmitter and EventTarget we (Node) would be interested in that and interoping with it.

So – honestly – what's the point of this spec?

Well, ideally the same way AsyncIterables are being used as "unified streams" in universal JavaScript allowing easy access to browser/node code with syntax support - I would love observables (or the events proposal - I also like that one) to be that for event emitters.

I am personally fine with either this proposal, the rx proposal or the events proposal for that purpose.

The goal (in my opinion - I don't speak for Ben obviously) is to have a unified push primitive that interops through all JavaScript with a contact to guide semantics.

from es-observable.

benlesh avatar benlesh commented on June 26, 2024 1

@gre If we want to go the route of having a returned teardown function, we only need a way to check to see if the subscription has been closed. (Whatever that way is, before it was on subscriber.closed, and I'm okay with that):

new Observable((next, error, complete, isClosed) => {
   let i = 0;
   while (!isClosed()) {
     next(i++);
   }
});

// or

new Observable(subscriber => {
  let i = 0;
  while (!subscriber.closed) {
    subscriber.next(i++); 
  }
});

(I actually prefer the latter of the two, honestly, but it might not be "javascripty" enough for the committee)

Another thing is cancellation tokens would be a nicer fit for other parts of javascript, like promise-based APIs. Cancellation tokens also provide a way of cancelling the forEach means of subscription, which exists for compat with async/await

from es-observable.

mAAdhaTTah avatar mAAdhaTTah commented on June 26, 2024 1

@acutmore It's a function, not a boolean.

from es-observable.

acutmore avatar acutmore commented on June 26, 2024 1

@mAAdhaTTah whoops! Completely missed that somehow. Thanks for correcting me 🙏

from es-observable.

cedmandocdoc avatar cedmandocdoc commented on June 26, 2024 1

@benlesh I think AbortController and AbortSignal is just an Observable with the context of cancellation. In code form, it looks like this:

const cancellationObservable = new Observable(next => next('CANCEL'));

This simply shows that for an Observable to know when to cancel, it should listen to another Observable that emits a cancellation.
For instance, take a look this pseudo-code:

interval(1000).subscribe(
  value => console.log(value),
  error => console.log(error),
  () => console.log('done'),
  fromEvent(button, 'click').mapTo('CANCEL') // cancellation observable
)

The subscription function accepts an Observable that emits a context of cancellation and then it passed it to interval Observable to listen for that cancellation.

This idea suggests that a reactive program requires a reactive Observable. It also shows, that cancellation has never been a fundamental property of an Observable but an emergent property of some programs. Simply because there could be other types of data that an Observable can react with.
For instance, a timer Observable, it emits data in the form of time but also listens to an external entity when to propagate or when to pause or when to completely stop. Pausable Observable will be easily achieved using this pattern. Another type of Observable I can think of is Pull Observable, with this pattern a producer can listen for a request when to emit data.

For more information, I've written an article that attempts to redefine Observable at its core. It has examples for synchronous propagation and a speculated specification of Observable. Check out the link below.
https://github.com/cedmandocdoc/redefining-observable

from es-observable.

benjamingr avatar benjamingr commented on June 26, 2024 1

Why cancellation cannot be left up to userland libs to implement (like RxJS) and need to be part of spec? Promises don't have cancellation mechanism, and there's cancelable-promise (and others) for that.

Cleanup is fundamental to observables and without cancellation their expressiveness has significantly diminished. Though node and browsers (and others) settled on AbortSignal and that Chrome is blocking any non AbortSignal primitive suggestions in TC39.

What's the point of forEach method here? Isn't that the same as subscribe(next)?

You can await it which helps ergonomically.

from es-observable.

runarberg avatar runarberg commented on June 26, 2024 1

For the record, I experimented with Symbol.asyncIterator back when I was trying out the simplified API for fun a couple of years ago:

https://github.com/runarberg/tc39-observable-proposal/blob/operators/src/Observable.js#L107-L156

I think there are some subtle differences between that particular implementation of Symbol.asyncIterator and the forEach. Unlike forEach the async iterator will not yield a new item until .next() is called. I don’t really know though when you would prefer one over the other in real life. However there has been a situation where I wanted my items chunked by the ticks they were pushed, so I also implemented a chunks operator for that. There could be an argument made that this is how Symbol.asyncIterator should behave.

from es-observable.

cedmandocdoc avatar cedmandocdoc commented on June 26, 2024 1

Not cancellation semantics but interaction semantics is what is needed.

On the code you have shown:

// (T => ()) => ()     or in TS syntax something like (T => void) => void
const observable = (fn) => {
  let i = 0;
  setInterval(() => fn(i++), 1000);
};

observable((data) => console.log('got data from observable', data));

It is not just observable that takes place, it is also an observation. When we called the observable that is when the observation takes place. In an observation, interaction is inevitable. That is why we need a structure for interactive observable.

Also I don't think that's entirely true. The inventor of Observables (as we know them) presented them with resource management semantics and error handling

I am not establishing the idea from the current definition of Observable. I am more like to rediscover and find proof of why cancellation is fundamental. To me, it is not because it is simply doesn't exist to all kinds of observable. If you can point me onto where on the study the proof why the cancellation is the fundamental and not the interaction that would be great.

from es-observable.

benlesh avatar benlesh commented on June 26, 2024

NOTE: AbortSignal could be replaced with any other cancellation standard that lands, provided it's a token-type cancellation.

from es-observable.

acutmore avatar acutmore commented on June 26, 2024

Can see that this implementation does not have the semantics of the 'safe' SubscriptionObserver i.e. guaranteed that calling nextHandler, errorHandler or completeHandler would never throw.

Is that because this issue should just focus on discussing just the API design and not other semantics, or because this new design would not come with that guarantee? Didn't want to presume either way.

from es-observable.

zenparsing avatar zenparsing commented on June 26, 2024

Cool - we probably should have went down this "function-first" road from the start, but we had a "no closure" bias, if I recall...

Arguably, the cancel token in this API would best be a "subclass" of observable (with a subscribe method, etc.).

from es-observable.

benjamingr avatar benjamingr commented on June 26, 2024

It's alive!🥳

So what about https://github.com/tc39/proposal-emitter ?

from es-observable.

gre avatar gre commented on June 26, 2024

Good point sorry I missed half of the code point. Please ignore the first part of my answer. Second part remain. @benlesh

from es-observable.

gre avatar gre commented on June 26, 2024

Yet I'm curious why in your code you are not prone to the same infinite loop. No way it can works in monothreaded JS 🤔

from es-observable.

benlesh avatar benlesh commented on June 26, 2024

@gre Sorry, I edited my other response. I guess I wasn't expecting you to respond so quickly.

from es-observable.

gre avatar gre commented on June 26, 2024

Ok that's a great usecase for the sync loop then.
So now I'm curious about how the second problem I mentioned would be solved with signal (cleaning up listeners and timeouts) and so it doesn't leak listeners and call listeners once when observable is done (as it's the case in rxjs). The fact signal comes from outside makes it hard to solve.

from es-observable.

SerkanSipahi avatar SerkanSipahi commented on June 26, 2024

Hello @yyx990803 (Vuejs Creator), would you like to get involved in the discussion? As far as I know, vuejs has its own Observable implementation. It would be good if we could create a standard here with RxJx, Mobx, Vue, others (I don't know the others).

The discussion/proposals should not only arise from the RxJs perspective!

from es-observable.

benlesh avatar benlesh commented on June 26, 2024

Cool - we probably should have went down this "function-first" road from the start, but we had a "no closure" bias, if I recall...

@zenparsing, yeah, I recall reading something about a function-first approach before, but I think one of the problems there was we also needed a way to stop the synchronous "firehose", and subscriber.closed provided that. With a cancellation token, we no longer are hindered by that. (And honestly, from a maintainer's perspective, cancellation tokens would clean up a reactive library considerably, there's a lot of dancing around the fact that subscriptions need to exist up front)

from es-observable.

acutmore avatar acutmore commented on June 26, 2024

The Fetch API returns a Promise that rejects on signal.abort.

Should the promise returning methods in this proposal mirror that behaviour? Right now the promise remains in a pending state.

Perhaps the single Fetch example is not enough of a precedent to follow, but can see myself being asked to explain the difference when training developers

from es-observable.

acutmore avatar acutmore commented on June 26, 2024
new Observable((next, error, complete, isClosed) => {
   let i = 0;
   while (!isClosed()) {
     next(i++);
   }
});

Could never work. All false values are immutable.

Edit: I am wrong. It’s a function not a boolean

from es-observable.

acutmore avatar acutmore commented on June 26, 2024

Adding another alternative that avoids currently non-standard Disposable and AbortSignal, but doesn't preclude extending Observable to include passing in a signal parameter in the future if they are added to the language.

Should using new Observable for the firehose case be considered a 'bug', as IMO the primary use case is for async push. Instead a static Observable.from(iterable) would be how to author that code.

Another observation is that events that trigger 'disposal'/'cancellation' can be split into two categories. Events that happen outside of the observable, and events that originate from the observable. There is no need to dispose on 'error', or 'complete' as the source has already stopped, only 'next' needs access to the disposable. These two categories can have two different ways of accessing dispose. The 'next' callback is passed dispose as an additional argument. External events can use the function returned from subscribe.

Con: forEach would not be disposable from an external event.

interface Iterable<T> {
    [Symbol.iterator](): Iterator<T>;
}

interface Observable<T> {
  static from<T>(iterable: Iterable<T>): Observable<T>;

  new (
    initialization: (
      nextHandler: (value: T) => void,
      errorHandler: (err: any) => void,
      completeHandler: () => void
     ) => () => void
  ): Observable<T>

   subscribe(
      nextHandler?: (value: T, dispose: () => void) => void,
      errorHandler?: (err: any) => void,
      completeHandler?: () => void, 
  ): () => void;
}
const fireHose = Observable.from(function* () {
  for (let i = 0; i < 1000000000; i++) {
    yield i;
  }
}());

fireHose.subscribe((v, dispose) => {
  console.log(v);
  if (v === 3) {
    dispose();
  }
});
const interval = new Observable(next => {
 const id = setInterval(next, 1000);
 return () => clearInterval(id); 
});

const dispose = interval.subscribe(v => {
  console.log(v);
});
setTimeout(dispose, 3 * 1000);

If AbortSignal was added to the language:

interface Iterable<T> {
    [Symbol.iterator](): Iterator<T>;
}

interface Observable<T> {
  static from<T>(iterable: Iterable<T>): Observable<T>;

  new (
    initialization: (
      nextHandler: (value: T) => void,
      errorHandler: (err: any) => void,
      completeHandler: () => void,
      signal: AbortSignal
     ) => () => void
  ): Observable<T>

   subscribe(
      nextHandler?: (value: T, dispose: () => void) => void,
      errorHandler?: (err: any) => void,
      completeHandler?: () => void,
      signal?: AbortSignal
  ): () => void;
}

from es-observable.

runarberg avatar runarberg commented on June 26, 2024

A case for keeping the Observer and Subscriber objects—for passing into the constructor and the subscribe method respectively—is for future extensions. Say we want to add a finally handler in the subscription that will fire on any value (error or otherwise; similar to Promise.prototype.finally) we are kind of out of luck if we stick with positional arguments.

from es-observable.

acutmore avatar acutmore commented on June 26, 2024

@cedmandocdoc That API becomes less clean when you want to 'cancel' the observable based on the values from 'next'. e.g. I want to take the first 10 values only.

from es-observable.

matthewwithanm avatar matthewwithanm commented on June 26, 2024

@cedmandocdoc Have you seen the rxjs takeUntil operator? It allows you to use another observable basically the same way as an AbortSignal. With a replay subject you can basically get the same result (an imperative cancelation API via subject.next()) and it's pretty ergonomic!

I think @benlesh's proposal here is more about simplifying the core Observable API by aligning with the de facto standard cancelation mechanism (AbortSignal).

from es-observable.

cedmandocdoc avatar cedmandocdoc commented on June 26, 2024

@acutmore The idea of cancellation being an Observable is supported by the definition of an Observable itself, that is, basically an object that emits values.
Cancellation is an action that comes from an entity and represents data in the context of canceling something. Whoever the actor is or even where or when the action takes place doesn't matter, if it represents a cancellation that is fine.

With all that in mind, we can implement different forms of cancellation. To list a few we have:

  • A cancellation that comes from DOM event (previous example). This form of cancellation shows that the actor is a button, and the event takes place from the moment the button was clicked.
interval(1000).subscribe(
  value => console.log(value),
  fromEvent(button, 'click').mapTo('CANCEL') // cancellation observable
)

  • An AbortController-like cancellation (could be an answer, to your feedback). A cancellation where the actor is a function and takes place upon the calling of that function e.g. an Observer.
class Teardown extends Observable {
  constructor(producer) {
    super(producer);
    this.run = () => {};
  }

  subscribe(next, observable) {
    this.run = () => next('CANCEL');
    this.producer(next, observable);
  }
}

const teardown = new Teardown(() => {}); // pass an empty producer

fromArray([1, 2, 3]).subscribe(
  value => value === 2 && teardown.run() // fire a cancellation from the observer
  teardown // pass the Teardown Observable
);

  • An RxJS Subject Cancellation (could be an answer, to your feedback). This implementation of cancellation is generic, Subject could be used for other contexts as well. This may not be a good solution for a one-to-one relation of Observable and Observer but on the other hand it fits with one-to-many relation. One invoke cancellation could cancel many Observables.
const subject = new Subject();

interval(100).subscribe(
  value => value === 2 && subject.next('CANCEL') // fire a cancellation
  subject // pass the subject
);

All examples show that cancellation could come from anywhere and could take place anytime. This could prove that cancellation is indeed just an Observable with the context of canceling something.

from es-observable.

cedmandocdoc avatar cedmandocdoc commented on June 26, 2024

@matthewwithanm The operator takeUntil shows that cancellation takes place when an Observable emits something.
I think this commend the idea of cancellation is just another form of Observable.

  • It could happen anywhere or anytime, since takeUntil knows when to emit a cancellation
  • It is a data-cancellation representation that comes from another Observable, takeUntil waits for an emission that represents a cancellation.
  • It could wrap by a Subject to add the ability to cancel imperatively

And as you have said it is pretty ergonomic and yes I agree with that.

But I think the difference from the pattern I've showed compared to takeUntil operator is implementation. As far as I know the operator takeUntil relies on the returned function of the producer to cancel things but with the pattern I've showed it cancel the Observable through an Observable (with a specific context) by default.


I think @benlesh's proposal here is more about simplifying the core Observable API by aligning with the de facto standard cancelation mechanism (AbortSignal).

What do you mean by the de facto standard cancelation mechanism? Does this mean for the whole Javascript API? For example setInterval:

const controller = new AbortController();
const callback = () => {
  // logic
  controller.abort();
};
setInterval(callback, 1000, controller.signal)

I'm not sure about that, but if Javascript will embrace AbortController as a standard to cancel things for all the callback base or async API, I would say there will be contradictions. Different types of async API have different forms of cancellations. For instance if we abort a fetch request it resolves to an error this contradicts to others like setTimeout which doesn't have an error callback.

But from the pattern I've showed we can generalized those inconsistencies. For example we can create an AbortObservable that truly aborts an ObservableFetch.

class AbortObservable extends Observable {
  constructor(producer) {
    super(producer);
    this.run = () => {};
  }

  subscribe(next, observable) {
    this.run => () => next('ABORT'); // this emission will be observed by an ObservableFetch which then aborts the request to resolve to an error
    this.producer(next, observable);
  }
}

const abort = new AbortObservable(() => {}); // pass an empty producer

fromFetch(options)
  .listen(
    value => console.log(value),
    abort
  );

abort.run(); // abort the fetch

This not just limited to just abort fetch you can pass a merged Observable and merges an AbortObservable or plain CancelObservable where just cancel an Observable.

merge(AbortObservable, CancelObservable)

I think the idea of cancellation being an Observable is more simple than the use of AbortController and AbortSignal. Because of the ability to generalized cancellation. Lastly, I think it is more primitive than AbortController because you can create an AbortController-like using an Observable. It just all depends on how Observable should communicate with each other.

from es-observable.

adamf92 avatar adamf92 commented on June 26, 2024

@benlesh Are you planning that changes in some future RxJS release ?
From my perspective, one of the most important features in current Subscription implementation is abitity to create and manage aggregated/nested subscriptions - is it possible with signal/controller or will need additional implementation?
I'm creating a framework (Atomi-Q, I think it could be interesting for you, as it demonstrates the potential of RxJS as a base for the fastest DOM updating solution) with concept called "Reactive Virtual DOM" - it's a tree of objects like Virtual DOM, but every dynamic/changeable node is Observable - thanks to it every state change is causing updates only in connected nodes - it's many times faster than Virtual DOM diffing. So, when elements are created, I can easily add their subscriptions to the parent subscription with add() method and when some element is removed, unsubscribing its subscription,
automatically unsubscribes all children subscriptions, so it works great with my concept. From examples I suppose that passing same signal to multiple subscribe calls will allow unsubscribing multiple streams at once, but it will be nice to have it working same way as currently in RxJS

from es-observable.

dy avatar dy commented on June 26, 2024

Sorry for reiterating on the subject - can someone please explain (again) or give ref to:

  1. Why cancellation cannot be left up to userland libs to implement (like RxJS) and need to be part of spec? Promises don't have cancellation mechanism, and there's cancelable-promise (and other mechanisms) for that. Afaik some committee members are not much fond of AbortController.
  2. What's the point of forEach method here? Isn't that the same as subscribe(next)?

Also a point to add on leaking listeners. Idk was FinalizationRegistry a thing for the time of discussion, but as tested in sube - it's possible to subscribe leak-free without necessary unsubscription.

Feels like cramming too much stuff into spec is the main reason it's stalling.
Just want to ref #210 as minimal meaningful proposal.

from es-observable.

dy avatar dy commented on June 26, 2024

Cleanup is fundamental to observables and without cancellation their expressiveness has significantly diminished.

Maybe I haven't met use-cases for that yet. It's just for UI purposes (subscribable-things, templize, observable-value/value-ref) it's not apparent where that's useful - things get unsubscribed when observable is garbage-collected.
@zenparsing also mentioned that as footgun. Do you possibly have more elaborate example where it is a must?

You can await forEach it which helps ergonomically.

Could you elaborate? Just want to understand where it helps and why it's unavoidable for spec. Recently I witnessed the opposite example: node collections can be detected by presence of forEach method and iterated over immediately (HTMLCollection, NodeList, Array), whereas Observable is expected to be subscribed to, but pretends to be a collection.

from es-observable.

dy avatar dy commented on June 26, 2024
Naive Symbol.asyncIterator:
async *[Symbol.asyncIterator]() {
  let resolve,
      buffer = [],
      unsubscribe,
      promise = new Promise(r => resolve = r)

  { unsubscribe } = this.subscribe(value => {
    buffer.push(value)
    resolve()
    promise = new Promise(r => resolve = r)
  })

  try {
    while (1) yield*  buffer.splice(0), promise
  }
  catch {}

  unsubscribe()
}

from es-observable.

dy avatar dy commented on June 26, 2024

Great, that seems like exhaustive solution to the 2nd point from #201 (comment), which adds to #210 - leave .forEach / [Symbol.asyncIterable] to userland.

Remains cancellation mechanism and firehose case. Maybe there is argumentation to avoid that, like mentioned by @zenparsing? To clearly understand what purpose it properly serves, maybe some good examples (not theoretical purity).

from es-observable.

cedmandocdoc avatar cedmandocdoc commented on June 26, 2024

Sorry to slide in, but I don't think cancellation is fundamental to Observable. By itself, it just propagates data and nothing else.

We don't need cancellation as long as there is no observation that will occur. Observation is not equal to observable and it is the observation that creates the structure between the producer and the consumer to interact with one another.

Interaction is inevitable in observation and in this structure that cancellation emerges. It is not the cancellation that needs to be simplified in the API but should be the structure of interaction between the producer and consumer.

Cancellation is just an emergent property like anything else that an application would demand probably like a demand for the next data like iterable. These emergent properties act like just another observable, it propagates data but in reverse, that is, from the consumer to the producer.

In my opinion a much cleaner API:

new Observable((next, error, complete, observable) => {
  // producer pushes data
  // ...

  // the producer listens to external observable
  // which the observation takes place forming
  // a complete interaction
  observable.subscribe(data => {
    // listen for data cancellation?
    // listen for data pulling?
    // or any other emergent properties
  })
});

from es-observable.

benjamingr avatar benjamingr commented on June 26, 2024

Sorry to slide in, but I don't think cancellation is fundamental to Observable. By itself, it just propagates data and nothing else.

Observable without clear cleanup and error handling semantics is greatly diminished IMO and is effectively just a function passed around (literally, on an object). You need to specify how to do stuff like "close the websocket" in order for observables to enable most use cases they are used for today.

from es-observable.

cedmandocdoc avatar cedmandocdoc commented on June 26, 2024

@benjamingr

Cancellation is still there is just another Observable that is passed around.

from es-observable.

cedmandocdoc avatar cedmandocdoc commented on June 26, 2024

@benjamingr

Observable without clear cleanup and error handling semantics is greatly diminished

With the idea that I suggest, what does it make to have not a clear cancellation? It doesn't have explicit cleanup but that doesn't mean it is not clear for the semantics to greatly diminish.

from es-observable.

benjamingr avatar benjamingr commented on June 26, 2024

Like, to be honest these discussions are interesting but they happened before (in this repo and elsewhere, the idea of cancellation through an observable was talked about like ±3 times here I think?) and they are not the blocker.

In any case Chrome will block any cancellation scheme that isn't based on the web cancellation primitive (AbortSignal) and other parties will likely block any effort to do disposing of subscriptions that does not work with await using.

So we can discuss (and it's interesting!) why cancellation semantics are important and the issues with using an observable to signal cancellation and what happened the last attempt but I don't want to make any suggestion that it will help progress observables in JS in any way.

from es-observable.

cedmandocdoc avatar cedmandocdoc commented on June 26, 2024

Observable doesn't need cancellation semantics. What it needs is a structure for interaction so observation can happen. And in that structure, cancellation can happen but it is not fundamentally part of it.

In observation, interaction is the only inevitable.

Cancellation is application-dependent, observable and observation can happen even without it. It's just we have been accustomed that cancellation is a fundamental part of it. For this reason, we need to ship a cancellation semantics to support most implementation in the wild.

So the idea that I suggest is blocked because of the definition that cancellation in Observable is fundamental, which is not, and it has been widely used. But if we think fundamentally, cancellation has never been in the picture of Observable.

from es-observable.

benjamingr avatar benjamingr commented on June 26, 2024

But if we think fundamentally, cancellation has never been in the picture of Observable.

Also I don't think that's entirely true. The inventor of Observables (as we know them) presented them with resource management semantics and error handling from day 1 : http://csl.stanford.edu/~christos/pldi2010.fit/meijer.duality.pdf https://dl.acm.org/doi/pdf/10.1145/2168796.2169076

from es-observable.

benjamingr avatar benjamingr commented on June 26, 2024

To me, it is not because it is simply doesn't exist to all kinds of observable.

You are right and in particular it doesn't exist for the type above (though it is very easy to add by returning a function from it). I warmly recommend that if you are interested in the theoretical bits and types to read on you check my link above in this comment #201 (comment)

from es-observable.

Jopie64 avatar Jopie64 commented on June 26, 2024

I think cancellation should at least be standardized.
If cancellation is not standardized, and up to the user,
how would you implement an operator like switchMap?

from es-observable.

cedmandocdoc avatar cedmandocdoc commented on June 26, 2024

@benjamingr, sadly the link on #201 (comment) is redirecting to something else but I would love to read that.

from es-observable.

cedmandocdoc avatar cedmandocdoc commented on June 26, 2024

@Jopie64

On this comment I've explained that cancellation is still there. It is just another observable that is passed around. So you can implement cancellation or even standardized cancellation if needed, but it should not be directly part of the Observable. Observable and observation just manages data flow and nothing else.

How we will implement or standardize cancellation? The answer is, cancellation is observable. But this is not easy to implement primitively in JS now as have mentioned that anything that does not use the current primitive for cancellation is being blocked.

If you are interested in the implementation I've created a library, it is very simple.

from es-observable.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.