Comments (64)
RxJava 2 Observer interface
Changed for JavaScript
interface Observer {
  subscribed(subscription: Subscription): void
  next(value: any): void
  error(err: any): void
  complete(): void
}
This solves a few things I've run into while working on RxJS.
- Actually being able to tell when a subscription happens. Even though `subscribe` is synchronous, there are use cases for observables, like `subscribeOn`, where subscription is delayed.
- Giving access to the subscription in the observer means it's trivial for the developer-user to do `this.subscription = subscription` in the `subscribed` handler. This makes it available in all other handlers (with no closure).
- Later on, this gives us the ability to more easily support backpressure (via a request method tacked onto the subscription), which I know isn't a concern for this spec, but will be a concern for RxJS Next, which will be used in Node servers.
(@trxcllnt and @jhusain: discussion of this as it relates to RxJS Next and the backpressure story can be left on the RxJS repo)
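A minimal sketch of the `subscribed` idea described above, assuming a hypothetical array-backed source (`fromArray` is illustrative only, not part of es-observable or RxJS). It shows the observer storing the subscription on itself and using it from `next` without a closure:

```javascript
// Hypothetical source for illustration; not the es-observable implementation.
function fromArray(values) {
  return {
    subscribe(observer) {
      const subscription = {
        isUnsubscribed: false,
        unsubscribe() { this.isUnsubscribed = true; },
      };
      // Hand the subscription over before any value is delivered.
      if (observer.subscribed) observer.subscribed(subscription);
      for (const value of values) {
        if (subscription.isUnsubscribed) return subscription;
        observer.next(value);
      }
      if (!subscription.isUnsubscribed && observer.complete) observer.complete();
      return subscription;
    },
  };
}

const received = [];
fromArray([1, 2, 3]).subscribe({
  subscribed(subscription) { this.subscription = subscription; },
  next(value) {
    received.push(value);
    // Stop the synchronous source from inside the handler.
    if (value === 2) this.subscription.unsubscribe();
  },
  complete() { received.push('done'); },
});
console.log(received);
```

Because the source checks `isUnsubscribed` before each delivery, the third value and the completion are never delivered in this sketch.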
from es-observable.
I like those names because they make it clear there's no real relation to generators. I'm less sure about the return types, about complete accepting a value, and about next taking the subscription argument. But I don't have any concrete objections and am willing to be convinced.
from es-observable.
Let's suppose that we go back to accepting/passing observers instead of multiple callbacks. What should the interface for Observer be?
To clarify, this means we still have `.forEach` (or if we rename it to something else - that) for a single `next` callback API that returns promises. Right?
Proposal:
interface Observer {
next(value : any, subscription : Subscription) : any;
error(value: any) : any;
complete() : any;
}
Basically, I agree with @domenic. I think the names are good, and the fact we can keep `next` is also nice. Passing in a reference to the subscription also sounds reasonable.
from es-observable.
To clarify, this means we still have `.forEach` (or if we rename it to something else - that) for a single `next` callback API that returns promises. Right?
Yes.
from es-observable.
The second proposal is almost identical to what I've done in ReactiveX/RxJS over the weekend. So no complaints here
from es-observable.
@Blesh what about the return value from `complete` though?
from es-observable.
I'm unsure about the subscription passing on `next` though. What is the value there?
(I missed it because my phone cut it off)
from es-observable.
@Blesh this is a convenience for aborting a subscription from inside the observer. It's just useful; we can think of it as an optional parameter, like how `Array#map` passes the index and the array itself as second and third arguments, but in the vast majority of cases these aren't used. Also:
@Blesh what about the return value from complete though?
:)
from es-observable.
@Blesh We've identified a need for the "next" method to synchronously short-circuit the subscription. @jhusain suggested that returning `true` would accomplish that, but I would prefer it if the return value of `next` were left open as a general communication channel from observer to observable (for example to pass promises). Passing the subscription into `next` would allow the observer to call `unsubscribe` on it.
There are a couple of different ways to address the problem. I'm just throwing this possibility out there.
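For illustration, here is a rough sketch of the "pass the subscription into `next`" option, assuming a hypothetical synchronous `range` source (the name and shape are made up for this example). The return value of `next` is ignored, which leaves it free for other uses:

```javascript
// Hypothetical source: delivers each value together with the subscription.
function range(start, count) {
  return {
    subscribe(observer) {
      let closed = false;
      const subscription = { unsubscribe() { closed = true; } };
      for (let i = 0; i < count && !closed; i++) {
        // Second argument is optional for the observer, like the index in Array#map.
        observer.next(start + i, subscription);
      }
      if (!closed && observer.complete) observer.complete();
      return subscription;
    },
  };
}

const seen = [];
range(1, 5).subscribe({
  next(value, subscription) {
    seen.push(value);
    // Synchronously short-circuit once enough values have arrived.
    if (value >= 3) subscription.unsubscribe();
  },
});
console.log(seen);
```

Observers that don't care about unsubscribing can simply declare `next(value)` and ignore the extra argument.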
from es-observable.
Only passing `subscription` to `next` is problematic. There are situations where you'd want to synchronously unsubscribe during `error` or `complete` as well: cases where, under some condition, you don't want to continue down either a "happy" or "sad" path, but rather abort.
from es-observable.
/cc @benjchristensen ... as an RxJava implementor, it might be helpful if you could point out other reasons why they went with this design.
from es-observable.
@Blesh Right, that would be the other way to go about it.
I can't really think of an objection to a "subscribed" callback, other than vague general reluctance to add another method to the core/minimal proposal. : )
Although, I still think that `forEach` wants to get the subscription as an argument, and it would be somewhat nice if the function type for `next` and the `forEach` argument were the same.
from es-observable.
Only passing subscription to next is problematic. There are situations where you'd want to synchronously unsubscribe during error or complete as well.
Originally I was going to add it to all of them, but it didn't seem useful for "error" and "complete". For those, the observer will never get called again anyway, and the observable's cleanup will automatically run, so what's the point of unsubscribing?
from es-observable.
the observer will never get called again anyway
That observer, yes... but if you wanted to prevent subsequent observers in a chain from firing `complete` or `error`, you'll need to have access to the subscription.
from es-observable.
I still think that forEach wants to get the subscription as an argument, ...
I agree with this.
...and it would be somewhat nice if the function type for next and the forEach argument were the same.
I don't mind this, but it seems unnecessary if you have a `subscribed` event.
from es-observable.
We have chosen to go that route after long debate about alternative designs in the Reactive Streams collaboration: https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.0/README.md#specification.
This approach allows us to do the following things:
- support efficient unsubscription for both asynchronous and synchronous Observables
  - synchronous works in the "cancellation token" approach, checking "isUnsubscribed"
  - asynchronous works by callbacks registered with the Subscription
- support composition through `lift` so that `onSubscribed` can either pass-thru, or decorate or replace the `Subscription` as needed at each level of composition
- support optional flow control (so-called "reactive-pull" backpressure) via a `request(n: number)` method on the `Subscription` without adding object allocations (as a `Promise next(t)` approach does)
- it treats the subscription asynchronously, so it works for both synchronous and asynchronous subscription models
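To make the "reactive-pull" bullet concrete, here is a small sketch of a `request(n)` method on the subscription. It assumes a hypothetical array-backed source called `pullable` and does not handle re-entrant `request` calls; it is not how RxJava or Reactive Streams implement it:

```javascript
// Hypothetical pull-capable source; names are illustrative only.
function pullable(values) {
  return {
    subscribe(observer) {
      let index = 0;
      let cancelled = false;
      let done = false;
      const subscription = {
        get isUnsubscribed() { return cancelled; },
        cancel() { cancelled = true; },
        // Emit at most n more values, then wait for the next request.
        request(n) {
          let remaining = n;
          while (remaining > 0 && !cancelled && index < values.length) {
            remaining--;
            observer.next(values[index++]);
          }
          if (!cancelled && !done && index >= values.length) {
            done = true;
            observer.complete();
          }
        },
      };
      // Nothing is emitted until the observer asks for it.
      observer.subscribed(subscription);
      return subscription;
    },
  };
}

const log = [];
let sub;
pullable(['a', 'b', 'c']).subscribe({
  subscribed(subscription) { sub = subscription; subscription.request(2); },
  next(value) { log.push(value); },
  complete() { log.push('complete'); },
});
const afterFirstRequest = log.slice(); // only the two requested values so far
sub.request(5);                        // drains the rest and completes
console.log(afterFirstRequest, log);
```

No promise or other per-value object is allocated; the consumer controls the pace purely through calls on the subscription it received in `subscribed`.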
from es-observable.
That observer, yes... but if you wanted to prevent subsequent observers in a chain from firing complete or error, you'll need to have access to the subscription.
Not sure I follow - maybe a short code example will help?
from es-observable.
Here is the conceptual Javascript definition I would advocate for:
// use term Subscription instead of Disposable so it represents the subscription lifecycle, and can be used for `request` semantics if that is added to RxJS
interface Subscription {
isUnsubscribed: boolean; // property or method
cancel(): void; // cancel/unsubscribe/whatever
request(n: number); // optional support for micro-leasing to allow async flow-control, backpressure
}
interface Observer<T> {
subscribed(subscription: Subscription); // onSubscribe - always called when Observable is subscribed to, unless onError is called instead
next<T>(value: T): void; // onNext
throw<E>(error: E): void; // onError
return(): void; // onComplete
}
from es-observable.
support composition through lift so that onSubscribed can either pass-thru, or decorate or replace the Subscription as needed at each level of composition
Oooo... that's nice, actually. I didn't consider that.
from es-observable.
Not sure I follow - maybe a short code example will help?
Here's a contrived example. This is obviously accomplishable with a closure, but arguably so is using subscription from within the next handler.
myObservable.map(x => { throw 'sad'; })::disposeIfError(err => err === 'sad').subscribe(doStuff, neverWillBeHit, doCompletion);
function disposeIfError(predicate) {
  return new Observable(observer => {
    return this.subscribe({
      next(x) { observer.next(x); },
      error(err, subscription) {
        if (predicate(err)) {
          subscription.dispose();
        } else {
          observer.error(err);
        }
      },
      complete() {
        observer.complete();
      }
    });
  });
}
from es-observable.
Cool - thanks! So when we hit the line:
subscription.dispose();
The subscription is already in a completed state, and the observable's cleanup function will run. If we changed that line to an empty statement,
if(predicate(err)) {
;
} else {
observer.error(err);
}
then after returning from the `error` method, the observable's cleanup function will be run (in a finally block, essentially).
So it seems to work out either way; the unsubscription walks all the way down the chain. No?
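A rough sketch of the "finally block" behaviour described above, assuming a hypothetical `createObservable` helper (not the spec text): the cleanup function runs after the observer's `error` handler returns, whether or not the handler unsubscribes.

```javascript
// Hypothetical helper; illustrates cleanup-after-termination only.
function createObservable(subscriber) {
  return {
    subscribe(observer) {
      let cleanup;
      let closed = false;
      const close = () => {
        if (closed) return;
        closed = true;
        if (cleanup) cleanup();
      };
      const wrapped = {
        next(value) { if (!closed) observer.next(value); },
        error(err) {
          if (closed) return;
          // Cleanup is attempted even if the handler throws.
          try { observer.error(err); } finally { close(); }
        },
        complete() {
          if (closed) return;
          try { observer.complete(); } finally { close(); }
        },
      };
      cleanup = subscriber(wrapped);
      // If the source terminated synchronously, cleanup was not known yet; run it now.
      if (closed && cleanup) cleanup();
      return { unsubscribe: close };
    },
  };
}

const events = [];
createObservable((observer) => {
  observer.error('sad');
  return () => events.push('cleanup');
}).subscribe({
  next() {},
  // Deliberately does nothing with the subscription.
  error(err) { events.push('error handler saw ' + err); },
  complete() {},
});
console.log(events);
```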
from es-observable.
return this.subscribe({
@Blesh it may be good to start getting examples that use `lift` instead, as it is far more efficient than subscribing at each composition step
from es-observable.
@zenparsing ah, you're right. My observation of this was based off of comments @benjchristensen made while I was chatting with him this morning. @benjchristensen did I miss the mark for the use case for `subscription` in the errorHandler and completionHandler? It seems I did.
@Blesh it may be good to start getting examples that use lift instead, as it is far more efficient than subscribing at each composition step
Fair... but it's not part of this spec. (Maybe it should be :P haha)
from es-observable.
but it's not part of this spec.
Ahh, sorry! Forgot that context for this thread. Was mixing our conversations.
from es-observable.
`disposeIfError` should not be necessary, at least with Rx semantics, since `dispose` should always contractually be invoked after an `onError` is emitted.
Perhaps more interesting is `onErrorResume`/`catch` to do error handling. In that case the `Subscription` upstream needs to be different than downstream so it can `dispose` upstream `onError` but continue emitting `onNext` downstream.
I can help with sample code if that is of interest.
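As a very rough sketch of that `catch`-style case, assume observables are modelled as plain functions `(observer) => unsubscribe` (a simplification for this example; `catchWith` is a made-up name). The upstream terminates with an error, while the downstream observer stays open and is fed by a fallback source. Upstream cleanup on error is not modelled here:

```javascript
// Hypothetical operator: observables are functions (observer) => unsubscribe.
function catchWith(source, fallback) {
  return (observer) => {
    let fallbackUnsubscribe = null;
    const upstreamUnsubscribe = source({
      next: (value) => observer.next(value),
      error: () => {
        // Upstream is finished; downstream continues with the fallback source.
        fallbackUnsubscribe = fallback(observer);
      },
      complete: () => observer.complete(),
    });
    // The downstream subscription covers both the upstream and the fallback.
    return () => {
      upstreamUnsubscribe();
      if (fallbackUnsubscribe) fallbackUnsubscribe();
    };
  };
}

const failing = (observer) => {
  observer.next(1);
  observer.error(new Error('sad'));
  return () => {};
};
const recovery = (observer) => {
  observer.next(2);
  observer.complete();
  return () => {};
};

const out = [];
catchWith(failing, recovery)({
  next: (value) => out.push(value),
  error: () => out.push('error'),
  complete: () => out.push('complete'),
});
console.log(out);
```

The downstream `error` handler is never called in this run, because the error is absorbed at the boundary between the two subscriptions.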
from es-observable.
@Blesh isn't that just: "source.catch(Observable.empty()).subscribe(next, neverWillBeHit)"?
from es-observable.
@Blesh isn't that just: "source.catch(Observable.empty()).subscribe(next, neverWillBeHit)"?
Yes, yes, it was a dumb example. haha. You and @benjchristensen pointed that out. :P
from es-observable.
IIRC we have agreed backpressure is deferred to a future proposal since backpressure can be built over an observer interface (I think @jhusain said that).
@benjchristensen - why in your interface did you use the names `throw` and `return`? Just curious.
from es-observable.
why in your interface did you use the names throw and return? Just curious.
I was just aligning with the names @jhusain has been using. Otherwise I prefer `onError` and `onComplete`.
from es-observable.
Very big fan of the proposal to pass the subscription to the observer. In my opinion, RxJava got this right.
The main benefit of this approach is the fusion between the observer and subscription, which allows the observer to asynchronously unsubscribe. Back pressure is being deferred.
Think we should stick with next, throw, completed.
from es-observable.
@jhusain What about the onSubscribe event?
from es-observable.
No strong opinion on this. Does feel like "on" is appropriate, but that could be applied to all of them. "subscription"?
from es-observable.
I'm not keen on `on`, myself. But whatever works. `subscribe` is fine.
from es-observable.
In similar systems I've found onSubscribed analogues to be used for hacky purposes that undermine the more functional nature. Is that a concern here, or is my experience elsewhere misleading me?
next, throw, completed are not consistent in tense. You could do
- next, thrown, completed
- next, errored, completed
- next, throw, complete
I personally think using "throw" is a misnomer when there is only an analogy to thrown exceptions, and no actual throwing is going on. That is why promises use "rejected". Which brings up another possibility... not sure how I feel about it...
- next, fulfilled, rejected
from es-observable.
+1 on not naming it `throw` - that confused me initially. The 'it's just method calls proxied' argument for the generator result interface is very cool from a theoretical PoV but I also think users will find it confusing.
from es-observable.
This is JS, so we don't need a method to set a property. We can just set the "subscription" property of the observer.
interface Observer {
subscription? : Subscription;
next(value : any) : any;
error(value: any) : any;
complete(value: any) : any;
}
from es-observable.
Why is the additional property better than the extra argument?
Wouldn't I have to make a closure each time I want to access it?
from es-observable.
@zenparsing I would still like to see a `subscribe` event, honestly. In the event of a delayed subscription (like a `subscribeOn`), it's nice to have a hook to know when a subscription has actually started.
interface Observer {
subscription:Subscription;
subscribe(subscription: Subscription): void;
next(value: any): any;
error(value: any): any;
complete(value: any): any;
}
Wouldn't I have to make a closure each time I want to access it?
@benjamingr I'm not sure I follow. It's a property on the same instance, so why would you have to create a closure?
{
complete(value) {
console.log(this.subscription);
}
}
from es-observable.
@Blesh If you want that, you can just use a getter/setter:
observable.subscribe({
_sub: null,
get subscription() { return this._sub },
set subscription(value) { this._sub = value; /* More logic here */ },
next(x) {},
error(x) {},
complete(x) {},
})
from es-observable.
In similar systems I've found onSubscribed analogues to be used for hacky purposes that undermine the more functional nature
@domenic do you have examples? The only things I've seen this used for are 1. backpressure setup in RxJava, and 2. notifying when a delayed subscription has started. (Rather than forcing a synchronous initial value to be nexted out and doing some take/skip or filter magic)
from es-observable.
Yeah, I'm particularly thinking about the abuses of `.on('pipe', ...)` and `.on('unpipe', ...)` in Node.js, see e.g. the `request` library. Even your 2 seems pretty side-effecty.
from es-observable.
@domenic Yeah, it could be used for side-effects, I suppose. It certainly isn't something compositional in terms of creating operators that would be like "oh, the previous observer has subscribed, I better tell someone". That wouldn't make sense.
The RxJava v2 use case is different because it's more of a "I'm ready, tell me how many you want" because of built-in backpressure. RxJS won't have backpressure support of this type, but the API won't change for an extended Rx library that does support backpressure, because both observers will have a `subscribe` event handler.
I'd write it in as a "nice to have" because it has some usefulness, but it's not a deal breaker.
from es-observable.
After experimenting with setting a "subscription" property, I found that it introduced "statefulness" issues, in particular between the target observer and the normalized observer. For example, what happens when you pass a normalized observer to subscribe?
I really like the simplicity of just having "next/error/complete".
from es-observable.
interface Observer {
next(value : any, subscription : Subscription) : any;
error(value: any) : any;
complete(value: any) : any;
}
I would prefer it if the return value of next were left open as a general communication channel from observer to observable (for example to pass promises).
An `any` return from the Observer breaks the Observer vs Observable semantics IMHO. An Observer should be just a sink/consumer, but with a "general communication channel from observer to observable", this means the observer is also a producer, in other words, it's "observable". Which means also an Observable should be a consumer.
Can you provide practical use cases for these, and also for the `value` parameter in `complete(value: any)`? I don't understand "to match iterators" as a valid argument. As a heavy RxJS user I have never needed those features.
I really like the simplicity of just having "next/error/complete".
👍
from es-observable.
Currently, we're experimenting with combining subscription and the "normalized" observer in RxJS. Thus far it looks promising.
from es-observable.
Proposal has been updated with "next/error/complete"
from es-observable.
Couldn't find an issue for this already, but I think it's relevant. What's the main reasoning for defining an Observer object rather than just passing the three functions as callbacks to `subscribe` like Promises have with `then`?
from es-observable.
Rx overloads `subscribe` so that it can take an object or the three functions, but I believe that TC39 would prefer to see different methods instead. I'm trying to keep the proposal simple, so I've elected to leave out any additional 3-callback method for now.
The ability for `subscribe` to take an object is important for ergonomics in some situations, for example when you want to pass an observer that you've received somehow on to another observable.
If you want to use callbacks, I think the best approach is to use `forEach` and use the returned promise object for error handling and completion chaining.
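For readers unfamiliar with the overload mentioned above, here is a small sketch of how an "object or three functions" `subscribe` might normalize its arguments. The `toObserver` helper and the `(next, error, complete)` argument order are assumptions for illustration, not the proposal's API:

```javascript
// Hypothetical normalizer: accepts an observer object or up to three callbacks.
function toObserver(nextOrObserver, error, complete) {
  // An observer object is passed through untouched, so it can be forwarded as-is.
  if (nextOrObserver !== null && typeof nextOrObserver === 'object') {
    return nextOrObserver;
  }
  const noop = () => {};
  return {
    next: typeof nextOrObserver === 'function' ? nextOrObserver : noop,
    error: typeof error === 'function' ? error : noop,
    complete: typeof complete === 'function' ? complete : noop,
  };
}

const calls = [];
const normalized = toObserver((value) => calls.push(value));
normalized.next(1);
normalized.complete(); // omitted callbacks become no-ops
console.log(calls);
```

Passing an existing observer object straight through is what makes it cheap to hand a received observer on to another observable.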
from es-observable.
Ah, I hadn't thought about passing the object around. I was more thinking about the callbacks because like Promises one could potentially just omit the callbacks which aren't relevant to their code.
However your explanation of ergonomics makes sense as to why it is the way it is.
Thanks for clarifying.
from es-observable.
`subscribe(duck)` vs `subscribe(function, function, function)`
QUACK QUACK
from es-observable.
@Blesh I was thinking it'd be something like `subscribe(next, complete, error)` which would allow for something like
observable.subscribe(doStuff);
// or
observable.subscribe(doStuff, undefined, displayError);
The reasoning is that it'd be familiar to people working with Promises.
However, I suppose that one key difference that might actually make this more confusing is that the return of subscribe isn't another observable like it is with Promises.
from es-observable.
FWIW, I almost never pass an observer object to `subscribe` when developing with Observables as a user, @zenparsing. It's relatively inconvenient to do so.
Now, internal to the RxJS library, it's observers all the way down, but that's for other reasons.
from es-observable.
@Blesh I agree, for user code providing an object is less ergonomic than providing 1 or 2 callbacks. Three callbacks, eh...
I'm thinking that user code will prefer `forEach` for easy, ergonomic consumption, since the returned promise fits right in with other promise-y APIs, and it makes chaining on the result (error or otherwise) natural.
from es-observable.
@RangerMauve: However, I suppose that one key difference that might actually make this more confusing is that the return of subscribe isn't another observable like it is with Promises.
One (cough @puffnfresh cough) might argue this is a deficiency in the Promise API.
from es-observable.
@trxcllnt Really? I thought that Futures, which is what Promises are more or less equivalent to, were supposed to be Monads which are all about chaining.
from es-observable.
I'm thinking that user code will prefer forEach for easy, ergonomic consumption, since the returned promise fits right in with other promise-y APIs,
In my experience, once people get their hands on Observable combinator libraries they generally end up not wanting to return promises for anything. Perhaps `await` will change that, but I have my doubts.
from es-observable.
Also, as far as I understand, the various events in Observable are synchronous so using `forEach(fn).then()` would mean that you would be forced to go async (not that I think that's an issue at all)
from es-observable.
using forEach(fn).then() would mean that you would be forced to go async
@RangerMauve is correct here. That might be an issue if a user cares about performance and is composing around that for some reason. It could add extra scheduling for no reason.
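A small sketch of the point being made, assuming a hypothetical `forEach` built on top of `subscribe` (both helpers below are illustrative, not the proposal's implementation). The values are delivered synchronously, but anything chained with `.then()` only runs in a later microtask:

```javascript
// Hypothetical synchronous source.
function fromArray(values) {
  return {
    subscribe(observer) {
      values.forEach((value) => observer.next(value));
      observer.complete();
    },
  };
}

// Hypothetical forEach: resolves on completion, rejects on error.
function forEach(observable, fn) {
  return new Promise((resolve, reject) => {
    observable.subscribe({
      next: fn,
      error: reject,
      complete: () => resolve(),
    });
  });
}

const order = [];
forEach(fromArray([1, 2]), (value) => order.push('next ' + value))
  .then(() => order.push('then'));
order.push('after forEach call');

// Snapshot taken synchronously: the 'then' callback has not run yet.
const syncSnapshot = order.slice();
console.log(syncSnapshot);
```

Even though the source finished before `forEach` returned, the completion handler is scheduled rather than run inline, which is the extra hop being discussed.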
from es-observable.
@RangerMauve I was being facetious, but my statement is true: since Promise's `then` doesn't adhere to the signature of monadic `>>=` (a.k.a. `bind`, a.k.a. `flatMap`), Promises (as specified) aren't monads. More on that later.
the return of subscribe isn't another observable like it is with Promises
There's two points I'd like to stress here and hopefully clear up some confusion.
1. `subscribe` doesn't enable monadic composition, it invokes the behavior of the Observable. Observables are conceptually similar to Functions. In fact, they make a lot of sense when you think of them as functions that return zero or more values between now and infinity until the routine encounters an error or terminates. Both Observables and Functions are lazy. You can create and pass them around, but they don't do any work until you ask them to (by invoking `subscribe` or `call/apply` respectively). And invoking `subscribe` or `call/apply` always runs the Observable or Function's computation again.
2. `flatMap` is the operator that enables monadic composition (monad tutorials call this function `bind`). The signature of `flatMap` is: `flatMap(source: Obs<T>, (value: T) => Obs<R>) : Obs<R>`. In other words, `flatMap` takes:
   - a source Observable (Rx uses `this` as the Observable instead of forcing people to pass it in as an argument)
   - a function that accepts each value from the source and returns a new Observable
Promises seem to have conflated the two behaviors (invocation + composition), or maybe it's been left up to the library author in most cases, I can't really keep up. Either way, the signature of the `onFulfilled` function doesn't match the signature of the `flatMap` selector. The `flatMap` selector must return an Observable, but the `onFulfilled` function can return nothing, a value, or another Promise. So since there's no Promise function that matches the signature of `flatMap`, Promises aren't monads.
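To illustrate the `flatMap` signature described above, here is a bare-bones sketch in which an observable is modelled as a plain function taking an observer (an assumption made for brevity; `of` and this `flatMap` are not RxJS code, and unsubscription is not modelled). The selector must return another observable, and the results are flattened into one stream:

```javascript
// Hypothetical model: an observable is a function that accepts an observer.
const of = (...values) => (observer) => {
  values.forEach((value) => observer.next(value));
  observer.complete();
};

// flatMap(source: Obs<T>, project: (value: T) => Obs<R>): Obs<R>
function flatMap(source, project) {
  return (observer) => {
    let pending = 1; // the source itself, plus one per running inner observable
    const finishOne = () => {
      if (--pending === 0) observer.complete();
    };
    source({
      next(value) {
        pending++;
        // The selector returns an observable, which is subscribed to immediately.
        project(value)({
          next: (inner) => observer.next(inner),
          error: (err) => observer.error(err),
          complete: finishOne,
        });
      },
      error: (err) => observer.error(err),
      complete: finishOne,
    });
  };
}

const flattened = [];
flatMap(of(1, 2), (x) => of(x, x * 10))({
  next: (value) => flattened.push(value),
  error: () => {},
  complete: () => flattened.push('done'),
});
console.log(flattened);
```

Note that building the `flatMap` result does no work by itself; nothing runs until the returned function is invoked with an observer.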
from es-observable.
@trxcllnt Ah, yes I totally agree with what you're saying. I did find it kind of odd that Promises weren't using the function signatures that were defined for the type of thing they were trying to represent, however I'd assume that's just a result of the way the community has evolved to create Promises as we know them now.
Also, Promises aren't lazy since they get invoked right away rather than waiting for something to call `.then()`.
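A quick sketch of the eager-versus-lazy difference, using a standard `Promise` and a hypothetical hand-rolled observable-like object (the `lazy` object is an assumption for illustration):

```javascript
// A Promise executor runs immediately, exactly once, regardless of then() calls.
let promiseRuns = 0;
const promise = new Promise((resolve) => {
  promiseRuns++;
  resolve('value');
});
const runsAfterConstruction = promiseRuns; // already executed
promise.then(() => {});
promise.then(() => {});

// A hypothetical observable-like object does nothing until subscribed,
// and repeats its computation for every subscriber.
let observableRuns = 0;
const lazy = {
  subscribe(observer) {
    observableRuns++;
    observer.next('value');
    observer.complete();
  },
};
const runsBeforeSubscribe = observableRuns; // nothing has happened yet
lazy.subscribe({ next() {}, complete() {} });
lazy.subscribe({ next() {}, complete() {} });

console.log({ runsAfterConstruction, promiseRuns, runsBeforeSubscribe, observableRuns });
```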
from es-observable.
I did find it kind of odd that Promises weren't using the function signatures that were defined for the type of thing they were trying to represent, however I'd assume that's just a result of the way the community has evolved to create Promises as we know them now.
I know there was a ton of debate around this behavior (which is why I cc'd @puffnfresh) but it seems the community decided against monadic Promises (and whether that decision was made out of ignorance of the concept is up for debate).
Also, Promises aren't lazy since they get invoked right away rather than waiting for something to call `.then()`.
Yeah, I don't really know what's up with that.
from es-observable.
@trxcllnt I'd assume that the decision to not make them lazy was related to the fact that you'd have situations where you'd expect xhr.post("/example") to execute even if you don't care to attach anything else to the chain. (Obviously a `done()` method would have helped that situation)
from es-observable.
FYI, here is my implementation of an observable-like interface using generators.
it('iterator', async () => {
let cnt = 0;
const co = new Coroutine<number, number>(async function* () {
assert(++cnt === 1);
assert(undefined === (yield Promise.resolve(2)));
assert(undefined === (yield Promise.resolve(3)));
return Promise.resolve(4);
});
assert(cnt === 1);
for await (const n of co) {
assert(n === ++cnt);
}
for await (const _ of co) {
assert(false);
}
assert(await co === ++cnt);
assert(++cnt === 5);
});
it('basic', async () => {
const co = new Colistener<Event>(listener => {
document.addEventListener('click', listener);
return () => void document.removeEventListener('click', listener);
});
setTimeout(() => document.body.click(), 100);
setTimeout(() => document.body.click(), 200);
let cnt = 0;
for await (const ev of co) {
assert(ev instanceof Event);
assert(ev.type === 'click');
++cnt === 2 && co.close();
}
assert(await co === undefined);
});
it('basic', async () => {
const co = cofetch('');
const types = new Set<string>();
for await (const ev of co) {
assert(ev instanceof ProgressEvent);
assert(['loadstart', 'progress', 'loadend'].includes(ev.type));
types.add(ev.type);
if (ev.type !== 'loadend') continue;
for await (const _ of co) throw 1;
}
assert.deepStrictEqual([...types], ['loadstart', 'progress', 'loadend']);
assert(await co instanceof XMLHttpRequest);
});
https://github.com/falsandtru/spica/blob/master/src/coroutine.test.ts
from es-observable.