Giter VIP home page Giter VIP logo

es-observable's Introduction

ECMAScript Observable

This proposal introduces an Observable type to the ECMAScript standard library. The Observable type can be used to model push-based data sources such as DOM events, timer intervals, and sockets. In addition, observables are:

  • Compositional: Observables can be composed with higher-order combinators.
  • Lazy: Observables do not start emitting data until an observer has subscribed.

Example: Observing Keyboard Events

Using the Observable constructor, we can create a function which returns an observable stream of events for an arbitrary DOM element and event type.

function listen(element, eventName) {
    return new Observable(observer => {
        // Create an event handler which sends data to the sink
        let handler = event => observer.next(event);

        // Attach the event handler
        element.addEventListener(eventName, handler, true);

        // Return a cleanup function which will cancel the event stream
        return () => {
            // Detach the event handler from the element
            element.removeEventListener(eventName, handler, true);
        };
    });
}

We can then use standard combinators to filter and map the events in the stream, just like we would with an array.

// Return an observable of special key down commands
function commandKeys(element) {
    let keyCommands = { "38": "up", "40": "down" };

    return listen(element, "keydown")
        .filter(event => event.keyCode in keyCommands)
        .map(event => keyCommands[event.keyCode])
}

Note: The "filter" and "map" methods are not included in this proposal. They may be added in a future version of this specification.

When we want to consume the event stream, we subscribe with an observer.

let subscription = commandKeys(inputElement).subscribe({
    next(val) { console.log("Received key command: " + val) },
    error(err) { console.log("Received an error: " + err) },
    complete() { console.log("Stream complete") },
});

The object returned by subscribe will allow us to cancel the subscription at any time. Upon cancelation, the Observable's cleanup function will be executed.

// After calling this function, no more events will be sent
subscription.unsubscribe();

Motivation

The Observable type represents one of the fundamental protocols for processing asynchronous streams of data. It is particularly effective at modeling streams of data which originate from the environment and are pushed into the application, such as user interface events. By offering Observable as a component of the ECMAScript standard library, we allow platforms and applications to share a common push-based stream protocol.

Implementations

Running Tests

To run the unit tests, install the es-observable-tests package into your project.

npm install es-observable-tests

Then call the exported runTests function with the constructor you want to test.

require("es-observable-tests").runTests(MyObservable);

API

Observable

An Observable represents a sequence of values which may be observed.

interface Observable {

    constructor(subscriber : SubscriberFunction);

    // Subscribes to the sequence with an observer
    subscribe(observer : Observer) : Subscription;

    // Subscribes to the sequence with callbacks
    subscribe(onNext : Function,
              onError? : Function,
              onComplete? : Function) : Subscription;

    // Returns itself
    [Symbol.observable]() : Observable;

    // Converts items to an Observable
    static of(...items) : Observable;

    // Converts an observable or iterable to an Observable
    static from(observable) : Observable;

}

interface Subscription {

    // Cancels the subscription
    unsubscribe() : void;

    // A boolean value indicating whether the subscription is closed
    get closed() : Boolean;
}

function SubscriberFunction(observer: SubscriptionObserver) : (void => void)|Subscription;

Observable.of

Observable.of creates an Observable of the values provided as arguments. The values are delivered synchronously when subscribe is called.

Observable.of("red", "green", "blue").subscribe({
    next(color) {
        console.log(color);
    }
});

/*
 > "red"
 > "green"
 > "blue"
*/

Observable.from

Observable.from converts its argument to an Observable.

  • If the argument has a Symbol.observable method, then it returns the result of invoking that method. If the resulting object is not an instance of Observable, then it is wrapped in an Observable which will delegate subscription.
  • Otherwise, the argument is assumed to be an iterable and the iteration values are delivered synchronously when subscribe is called.

Converting from an object which supports Symbol.observable to an Observable:

Observable.from({
    [Symbol.observable]() {
        return new Observable(observer => {
            setTimeout(() => {
                observer.next("hello");
                observer.next("world");
                observer.complete();
            }, 2000);
        });
    }
}).subscribe({
    next(value) {
        console.log(value);
    }
});

/*
 > "hello"
 > "world"
*/

let observable = new Observable(observer => {});
Observable.from(observable) === observable; // true

Converting from an iterable to an Observable:

Observable.from(["mercury", "venus", "earth"]).subscribe({
    next(value) {
        console.log(value);
    }
});

/*
 > "mercury"
 > "venus"
 > "earth"
*/

Observer

An Observer is used to receive data from an Observable, and is supplied as an argument to subscribe.

All methods are optional.

interface Observer {

    // Receives the subscription object when `subscribe` is called
    start(subscription : Subscription);

    // Receives the next value in the sequence
    next(value);

    // Receives the sequence error
    error(errorValue);

    // Receives a completion notification
    complete();
}

SubscriptionObserver

A SubscriptionObserver is a normalized Observer which wraps the observer object supplied to subscribe.

interface SubscriptionObserver {

    // Sends the next value in the sequence
    next(value);

    // Sends the sequence error
    error(errorValue);

    // Sends the completion notification
    complete();

    // A boolean value indicating whether the subscription is closed
    get closed() : Boolean;
}

es-observable's People

Contributors

aicest avatar appsforartists avatar bterlson avatar dimovelev avatar disnet avatar haroenv avatar jhusain avatar ljharb avatar lrowe avatar markbennett avatar shanewholloway avatar srijs avatar stefanpenner avatar tetsuharuohzeki avatar valscion avatar zenparsing avatar zerkms avatar zloirock avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

es-observable's Issues

Use cases for synchronous subscription.

It would be great if we had a list of use cases for synchronous subscription in order to better understand if/when [Symbol.observer] is needed.

Please include:

  • Description - Why is this use case important and who are the users.
  • Code - a minimal example demonstrating the use case and showcasing it. preferably codepen/jsbin/jsfiddle but a gist is also fine, demonstrating the issue.
  • Why synchronous subscription? - why would asynchronous subscription not work here, what benefit does synchronous subscription bring to the table?

Pinging people who want synchronous inspection or have expressed interest in it before: @jhusain @Blesh @tgriesser

Bonus points for weakness of async subscription compared to existing DOM or Node/io APIs. If you use "performance" as a reason make sure you understand what an async trampoline means (batching asynchronous function calls), how async tagging works (only defer actions that were not already async and only once per chain) and so on.

Generators are not observers

We've been struggling for quite a while now with the issue described and discussed in #14. The original question was: should canceling the subscription result in calling return on the observer?

RxJS does not invoke onComplete on cancelation. From the Rx point of view, "canceling the subscription" means that the consumer is telling the producer that it doesn't want any more notifications, so it wouldn't make sense to send it a completion signal.

The desire to call observer.return on cancelation was motivated by the fact that observers can be implemented using generator functions. A generator is state-full and may require explicit cleanup of resources upon termination. When the subscription is canceled, we need to signal the generator that it should execute its finally blocks and perform cleanup. Therefore, we need to invoke return on the generator.

There have been a couple of different proposals for solving this dilemma:

  1. Add a .NET-like dispose protocol to JavaScript and add a dispose method to generators. The dispose method would be executed on cancelation. For generators, it would simply call this.return(undefined).
  2. Add additional methods to the subscription object which would allow the subscription to be terminated with arbitrary abrupt completions. The holder of the subscription would choose the appropriate cancelation behavior.

Both solutions require adding complexity to the system. They only differ in where that complexity is located. Where is this complexity coming from?

The complexity arises from the fact that we are attempting to express Rx observers with generators, and there is an impedance mismatch between them. The complexity creep arises when we attempt to resolve this mismatch.

Some of the differences include:

Generator Rx Observer
Methods return IteratorResult Callbacks return void
Returning {done: true} signals cancelation Calling subscription.dispose signals cancelation
Requires cleanup with return Does not require explicit cleanup

I believe that pursuing the current strategy of conflating Rx observers with generators is going to result in a confusing mess. I think we need to choose between them. Either we take the observer-callback approach:

let observable = new Observable((onNext, onError, onComplete) => {
    // ...
});

observable.subscribe(
    val => console.log("item: " + val),
    err => console.log("error: " + error),
    ret => console.log("complete: " + ret)
);

Or we take the generator approach:

let observable = new Observable(sink => {
    // ...
});

observable.subscribe(function*() {
    try {
        // Yield loop here
    } finally {
        // Cleanup
    }
});

// Or with a manually written generator:
observable.subscribe({
    next(val) { },
    throw(err) { },
    return(val) { },
});

If we can choose one of these models, I think the rest of the design will fall into place.

Should we use an anonymous function for CancelSubscription?

An alternative to the current [[Cancel]] internal slot usage would be to introduce a new built-in anonymous function (CancelSubscription functions), which would have the following internal slots:

  • [[AlreadyCancelled]]
  • [[CancelFunction]]

It would first test [[AlreadyCancelled]] and then call [[CancelFunction]]. If there was no [[CancelFunction]] defined, then it would somehow need perform the default action (subscriptionObserver.return).

With the current setup we'll need to create a new bound function or anonymous function when no cancel function is provided (like the polyfill does).

Should unsubscribe invoke return on the observer?

In Rx, observables created with Observable.create have the following unsubscribe (dispose) semantics:

When dispose is called:

  • Close the outer observer so that no more notifications will be sent to the inner observer.
  • Call the cleanup function and drop the reference to it so that it won't be called again.
  • The default cleanup function is a no-op.

The current spec allows the user to call return from the cleanup function in order to shut down the observer's generator. If the user does not provide a cleanup function, then the default behavior will be to invoke "return" on the generator.

The rationale is that, when using a generator function's generator as the observer, we want to give the generator function a chance to clean up resources before we drop the reference to it.

For example:

function* generatorFunction() {
    let resource = getResource();
    try {
        while (true) resource.use(yield);
    } finally { resource.release() }
}

let generator = generatorFunction();
generator.next(); // Prime the generator

let subscription = sequenceOfLines.subscribe(generator);

// Unsubscribe after subscription has started
Promise.resolve().then(_=> subscription.unsubscribe());

// We would expect resources held by the generator to be released

@erights seemed to agree with this intuition.

The other point of view (the "Rx" view) is that calling "unsubscribe" is a way of notifying the producer that no more notifications should be sent (even "return"). Under this interpretation, it wouldn't make much sense to invoke "return", or anything else, after the consumer has explicitly given instruction not to.

What arguments can we come up with to resolve this apparent dilemma?

Discussion: Subscribe returns a cancellation function

In the current version, subscribe returns a cancellation function. Previously, subscribe returned a "subscription" object with an "unsubscribe" method.

Rationale:

  1. Since working on this project, I've typed in "subscription.unsubscribe" many, many times. It's ugly and a real pain.
  2. We currently haven't identified any essential capabilities that need to be returned from "subscribe" other than the cancellation function. It's nice to avoid useless wrapper objects.
  3. Any additional capabilities returned from "subscribe" are probably going to be POLA-problematic.
  4. The return types of "subscribe" and the subscriber function align nicely.

Regarding 4:

We frequently want to "chain" observables like this:

function x(observable) {
    return new Observable(observer => observable.subscribe({
        // ... whatever
    });
}

If "subscribe" returns something other than a () => void function, then we have to bake-in some messy overloading in order to make that chaining nice and pleasant. RxJS makes use of overloading in several places (also "subscribe") which I would like to avoid.

Thoughts?

Unifying `forEach` and `subscribe` return value - Subscriptions

I propose the following changes to subscription and the API.

Both forEach and subscribe return a Subscription.

  • A subscription is a thenable (a promise, basically), this means we await subscribe, or forEach, it runs when the observable is done. The returned promise resolves with either the completion or the error of the sequence. The .then method is a getter and it creates the underlying promise only when it is accessed so there is no overhead for then unless we use it. It is a promise for undefined so no value needs to be cached and there is no mismatch with observable semantics
  • A subscription has a dispose method which is _unrelated to promise cancellation. If we choose in the future to also support the cancellable promises API that's nice, but we don't want to tie the two proposals.

In addition, I'm open to any form of sync/async disposal semantics you guys need. This is mainly about unifying the return value of subscribe and forEachwithout conflating the proposal with cancellable promises. I've also found myself wanting (needing) toawait` the result of subscribe in combinators (merge two observables in order for example).

Let me know what you think.

Need to clarify proposed API surface area

Got complaint that it's difficult to ascertain exactly what APIs are being proposed because of the additional combinators in the library. May make sense to remove everything but the polyfill and put combinators in a separate repo. Otherwise just clearly listing proposed API in README would do: (System.observer, subscribe, forEach, Observable.from).

Idea for sync subscription api

Creating a separate ticket for discussion from #25, as the ticket was originally about the naming of Symbol.observer but shifted to discussion of the pros/cons of providing both sync/async subscription apis.

The only question is whether subscription is always async, or if we additionally provide a back-door way to synchronously subscribe.

I agree that there is confusion in presenting multiple entry points for creating a subscription, but a fully async api seems to potentially limit any observable use cases where sync may be desired / beneficial.

What if the subscription object returned from subscribe containing two methods: unsubscribe and connect

var subscription = observable.subscribe(generator)
subscription.connect()

The connect would allow for the subscription consumer to signal to the observable it is ready to receive values (synchronously or otherwise). Calls to connect after subscription or unsubscription have taken place would have no effect.

This would maintain a unified subscribe api, give the consumer more control over both the subscription and the unsubscription behavior (defaulting to the potentially less confusing), and if connect is not called the default behavior would be as currently defined - the scheduledSubscription would take place as it is enqueue'd normally.

Observable.from semantics

The idea of Observable.from is that it takes something that is "observable" and it returns an Observable proper. If the input is actually an Observable, then it simply returns the input.

Currently, it does this by looking for a method named Symbol.observer, which directly implements the core subscription protocol. If you want to make an abstraction "observable" (and compatible with Observable.from) then you would provide a Symbol.observer function:

Promise.prototype[Symbol.observer] = function(sink) {
    this.then(value => {
        sink.next(value);
        sink.return();
    }, error => {
        sink.throw(error);
    });
    return { unsubscribe() { sink.return() } };
};

Notice how the method above isn't really directly usable: we can't just provide any old observer to it. It expects an observer which (at the very least) has "next", "throw", and "return" implemented. Notice also how we have to roll our own subscription object which may or may not uphold any invariants.

Another option would be to create a level of indirection and introduce a new Symbol.observable which would give you back an observable. It would work much like Symbol.iterator.

The example above would become:

Promise.prototype[Symbol.observable] = function() {
    return new Observable(sink => {
        this.then(value => {
            sink.next(value);
            sink.return();
        }, error => {
            sink.throw(error);
        });
    });
};

This seems like a much easier way for programmers to make their abstractions observable, rather than requiring them to implement all of the subscription semantics from scratch.

Of course, one could always choose to implement the raw interface like this:

class C {
    [Symbol.observable]() {
        return {
            [Symbol.observer](sink) { /*...*/ }
        };
    }
}

What do you think?

[Symbol.observer] is unergonomic for people that know what they're doing.

Observable is a functional programming thing. In pure functional programming, there should be no "Zalgo". So anyone that is using Observable properly will want to always use [Symbol.observer]() to subscribe to the observable. I think it's safe to say that I know exactly zero heavy users of Observable, at Netflix at least, that want to do this. (I'll go so far as to say that list probably includes @jhusain. haha) Just playing with it hurts and I find myself wanting to override the whole thing like: Observable.prototype.subscribe = Observable.prototype[Symbol.observer] just to enjoy using the type.

I understand that Zalgo is a footgun for people who are coding imperatively, so we need to build libraries around this defensively. I guess I feel hiding it under [Symbol.observer] is just a little too much.

I'd like to propose that [Symbol.observer] change to either:

  • subscribe - back to the good old days of RxJS, maybe make the "safe" version be named when.
  • forEach - because the name alone makes it sound like sync behavior, honestly.
  • observe - because it's less annoying to type over and over than [Symbol.observer].

At some point people are going to have to put on their big-boy pants about Zalgo. If you're programming functionally, it's really all or nothing, or you better know what you're doing. Observables are not promises. They're not a stateful construct for async imperative programming, they're a functional programming paradigm.

Discussion: Add back an async subscription method?

Continuation of #38

From @jhusain

Reopening because as I'm putting together a summary of changes I'm somewhat confused by conclusion. It's clear we need sync subscription for EventTarget and for certain operators like GroupBy. It seems the original proposal of both an async and sync subscription option still works.

Symbol.observer name

I'm not a fan of the "Symbol.observer" name, because nouns should only be used for function names if it names the thing being returned. In this case the "observer" is the argument and the "subscription" is the return value.

Are there any other options here? Throwing some things out:

  • Symbol.observe : Seems nice, but doesn't leave us with the option of using observe to name functions that return observables. Also, there was overlap with Object.observe, but it's unclear to me that there is still a conflict.
  • Symbol.subscribe : This would be confusing since we also have the "subscribe" method.
  • Symbol.subscribeSync : That's what it does, but bleh!
  • Symbol.startSubscription : Getting longer and longer...

Any thoughts?

Notes on future syntax proposals

Dherman mentioned it would be worthwhile to talk about possible planned syntactical support, to emphasize that we have put some thought into the space. Obviously we want to emphasize that we are not proposing this now, but it's worthwhile to point out that adding the nominal could good enable us to add syntax later on similar to async/await.

Why is `.subscribe`'s subscriber a generator and not an async function?

Wouldn't it make more sense if an async function was passed into Observable.prototype.subscribe(observer)?

Current:

function consumer() {
    let generator = function*() {
        while (true) {
            console.log("Recieved key command: " + (yield));
        }
    }();

    // "Prime" the generator so that it can receive the first value from the producer
    generator.next();
    return generator;
}

commandKeys(inputElement).subscribe(consumer());

Asking about:

async function consumer() {
        while (true) {
            console.log("Recieved key command: " + (await));
        }
}
commandKeys(inputElement).subscribe(consumer);

Interop and Symbols

In speaking with @stefanpenner a few weeks ago, he was saying that Ember might like to use Symbol.observer to test for the "Observable-iness" of a value and subscribe to it when binding templates (I think that was what we discussed).

Anyhow, it occurs to me that Symbols are useful for interoperability because they can be tested for and called cleanly.

Symbol.observer was taken away, AFAIK. What are the planned interop points in both directions? I know from and of have been mentioned in prior threads. What about to read from? Also, how do you feel about using symbols?

Problems I see with the callback style

  1. Polyfill and library implementations will have to create a closure or use Function.prototype.bind to create the next, error and completed callbacks. This is because in order to create those callbacks and have them operate safely, they'll need to share some state. So either they have to live on the same instance of an object, or they need to close over shared variables.
  2. There's no straightforward way to short-circuit an unsubscription. If something down your observable chain asynchronously unsubscribes your observable, there's no boolean to check in your subscriber function (like an observer.isUnsubscribed) to know that you need to abort whatever loop you're in

Should this spec route errors occurring in the next handler to the error handler?

I was talking with @trxcllnt today about the behavior the occurs when you throw in an next handler...

For example:

myObservable.subscribe((x) => {
  // ReferenceError:  let's say x.foo is undefined here.
  someSideEffect(x.foo.blah);
}, err => {
   reportAnError(err);
});

Currently, it will just throw, and will not go to the error handler.

This results in people using RxJS like this as a "best practice", which isn't at all a "best practice", because do and doAction will route thrown errors down the error path:

myObservable.doAction((x) => {
  someSideEffect(x.foo.blah);
})
.subscribe(noop, (err) => {
  reportAnError(err);
});

Apparently teams at Netflix even have their own forks of RxJS that have patched the the onNext handler call to route to the onError handler if it throws.

Show how this could underly EventTarget in the DOM

I think interest in this proposal would be significantly bolstered if you can show how it can serve as a foundational layer upon which EventTarget can be built. Similarly, it's important to make sure there aren't any mismatches here, between e.g. the cancellation and bubbling semantics of EventTarget and the cancellation semantics of this.

/cc @slightlyoff

Bikeshedding: Observer interface

Let's suppose that we go back to accepting/passing observers instead of multiple callbacks. What should the interface for Observer be?

In Rx, it is:

interface Observer {
    onNext(value : any) : void;
    onError(error : any) : void;
    onComplete() : void;
}

I think that the camel-cased "on*" names look a little out-of-place in JS and the DOM. Also, I would like it if the callbacks could return values as well; that would support use cases like async observers without introducing an additional type.

Also, since iterators can technically "return" values, I think we should allow "complete" to accept values as well.

Furthermore, (and more experimentally) I think it might be useful to also provide a reference to the subscription along with the value, so that the method can call unsubscribe at any time.

Proposal:

interface Observer {
    next(value : any, subscription : Subscription) : any;
    error(value: any) : any;
    complete(value: any) : any;
}

Thoughts? Any other proposals?

Add API documentation

At this point, we don't need detailed algorithm descriptions. We just need to create documentation for each of the core API pieces:

  • The Observable constructor
  • The subscribe method

That should do it for now.

Nits regarding internal slots

You use O.[[Done]] in a few places, but dot notation is only for record fields, not object internal slots. (We may move toward O@[[Done]] in the future, so I will use that for brevity, but probably for the spec draft you should stick to "the [[Done]] internal slot of O".)

When you set O@[[Done]] to true, I believe you can reset O@[[Subscription]] and O@[[Observer]] to undefined to release references to them. Technically implementations could do this without you needing to specify, but I think it's clearer to make the strong references more explicit, and when I was researching this for promises I found a few other places in the spec that also clear out references.

Async subscriber functions

For some time, I've been wanting to somehow provide an ergonomic way to use async functions as subscriber functions:

let obs = new Observable(async observer => {
    observer.next("Hello");
    await new Promise(resolve => setTimeout(resolve, 2000));
    observer.next("World");
    await new Promise(resolve => setTimeout(resolve, 2000));
    return "Sweet!";
});

obs.forEach(::console.log).then(
    x => console.log("All done: " + x),
    err => console.log("Error: " + err));

Outputs:

Hello
World
All done: Sweet!

Proposal:

If the subscriber function returns a promise, then listen for that promise's fulfillment value and call observer.error or observer.complete accordingly.

https://github.com/zenparsing/es-observable/blob/403c3084b9d28dec23646e6a37f36d1c0c74a84e/src/Observable.js#L201-L208

Thoughts?

Consider the more general "function class" pattern

A continuation of #11, with more focus, since that was cleaned up. As discussed there, observables are an instance of the "function class" pattern, specifically for functions of the signature { next, return, throw } -> { unsubscribe }.

Function class is to me a more compelling general pattern to consider introducing into the language than the specific instance of observables. For example, a function class built around Promise -> void would allow lazy transformations on single asynchronous values.

I'd like this proposal to give more consideration to how it fits with the general function class idea. For example, it seems likely that Observable should be a subclass of Function. That would also help avoid any bikeshedding over [Symbol.observe] vs. subscribe or similar.

Sanity check on return() and throw()

I have a few things I wanted to check regarding how return() and throw() match up with their generator counterparts, and affect the observable behavior in general. I might not be reading or understanding the spec fully---and that applies to both this spec and the ES2015 generator spec. So please correct me if I get these wrong.

  • %SubscriptionObserverPrototype%.return(v) returns { value: undefined, done: true } if the un-normalized sink has no return method. But { value: v, done: true } seems more correct, based on how generators behave?
  • Both generators and %SubscriptionObserverPrototype% allow multiple calls to return() or throw(), which after the first do nothing but the default action---i.e. they don't affect the state in any way. Did I get this right?
  • For both generators and %SubscriptionObserverPrototype%, calling throw() will then cause next() to always return { value: undefined, done: true }. This is surprising to me---I would have expected it to re-throw the error?
  • I am also curious how the cancellation function maps to generators. It seems like a pretty ad-hoc addition to me and I can't really make the mapping back.

Bike-shedding: `last()` should be `getLast()`

Because it already exists in RxJS and it's popularly used as an operator, having a method named last doesn't make sense.

  1. It doesn't imply that's taking any action to do anything, such as subscribe to the observable.
  2. It seems to imply it's going to give you an Observable of the last value (especially because of prior art, and because of how it's worded: last, first, map, filter, etc.)

I think getLast() is slightly more explanatory about what it's doing. the "get" part implies some action will be taking to get the last value from the Observable and return it to you. And the only way anyone would get one value asynchronously would be with a Promise, so that's implied.

... I actually like getLast(): Promise<T> better than RxJS's toPromise(): Promise<T>... which I think sounds too much like toArray(): Observable<Array<T>>.

What should the return type of subscribe be?

The value returned from "subscribe" should carry with it the capability to unsubscribe. The simplest way to represent that capability is with a function:

// Subscribe returns a function...
let cancel = observable.subscribe({
    next(value) { console.log(value) }
});

// ...which allows you to unsubscribe
cancel();

Do we need to be able to return other capabilities along with unsubscription? The only thing that I can think of is a way to test whether a subscription is currently alive or not. There may also be use-case-specific capabilities that a user might want to expose.

If so, then I suppose we should return an object.

I don't think we should return a generator for the following reasons:

  1. Having the return value be a generator confuses the contract. It's really not clear what "next" and "throw" mean in this context. Nor is the relationship between the observer and the returned generator clear.
  2. There are three actors here: the producer, the consumer, and the subscriber. The subscriber should only have the capability to request the start of a subscription and request the termination of a subscription. It should not have the capability to transmit arbitrary data to the producer.

If we must return an object, then I don't believe we should use "dispose" as Rx does. "Dispose" is a .NET architectural artifact and doesn't really translate to JavaScript. Instead, we should choose something like "cancel" or "unsubscribe".

// Subscribe returns an object...
let subscription = observable.subscribe({
    next(value) { console.log(value) }
});

// ...which allows you to unsubscribe
subscription.unsubscribe();

Remove generators as subscribers.

Per the consensus on #35 .I think the spec should be updated to reflect that agreement.

(Assuming we have synchronous unsubscription regardless of whether or not subscription is sync or async)

Problems with Observable without Observer: multicasting/Subjects

I realize Subject is not a party of this spec, but it is a part of the Observable ecosystem.

If there are no Observers, what is a Subject? How can a Subject subscribe to an Observable? I suppose it can be done with closures, but that sort of breaks Subject.

Why does forEach throw away the return value while map/filter pass it through unchanged?

I realize map/filter are provisional. But in general it's unclear to me how the return value/exception is supposed to integrate with all this. Another related question is why any thrown exception gets passed through as a promise rejection while the return value gets ignored.

Can we use iterator map/filter/forEach to inform the design here? I'm not sure how exactly, but it would be nice, because right now it feels like there's a real tension between observables-as-sequences and observables-as-generator-duals.

Is there precedent for this observables-as-generator-duals in other languages or libraries?

Design assuming cancellable promises

This page is a mental playground assuming we get cancellable promises in JavaScript. It is not suggested for the API of this proposal - it's only here to explore some semantics in that case. The content here is not a suggestion for the API.


So, assuming they eventually land. I have some things I'd like to discuss.

  • I'm not sure that .forEach is a good name for something that returns a thenable, especially if we want to allow it to run without arguments. Dart uses forEach for something similar though. They also have .listen and their subscriptions have .asFuture values.
  • Cancellation should hook to disposing the stream.
  • Generally, the mapping between observables and promises should primarily focus on async/await semantics.

Optimally, assuming we've already had cancellable promises with the proposed semantics, as @jhusain pointed out we won't want to duplicate the done handler but we do need a way to handle errors in the stream on a case by case basis.

So I think .forEach (assuming the name sticks) should allow handling errors - while the converted-to promise can signal being done or an error on the stream as a whole, with current .forEach there is no way to handle a single error in a stream and recover from it.

It should allow running it without passing a handler in - this way you can hook on it and await it easily.

The promise should be for the last element of the stream - this can be useful for single valued streams - while it's not the most correct approach theoretically it's the most pragmatic.

Cancellation translation itself should be pretty straightforward.

Strawman: New subscription interface (cancel, return, throw)

This is a strawman alternative proposal to adding a "terminate" method to observers. It may have fatal problems.

The primary issue with adding "terminate" to observers is the fact that we need to extend the ES generator interface with additional complexity which is only really beneficial for Observable.

Proposal

The subscribe method of observables returns a Subscription object. Subscription objects have the following interface:

interface Subscription {
    return(value);
    throw(error);
    cancel();
}

A Subscription object may be used to early-terminate the communication between an observable and an observer. Each method invokes the cleanup action specified by the observable. In addition:

  • subscription.return calls the return method of the observer with the supplied argument. It terminates the data stream with a completion value.
  • subscription.throw calls the throw method of the observer with the supplied argument. It terminates the data stream with an error.
  • subscription.cancel does not invoke any methods on the observer.

The advantage of this interface is that it lets the holder of the subscription object determine how the observer should be shut down, if at all. This allows a wider range of use cases to be expressed without adding additional complexity to the generator type or adding state variables to keep track of which kind of completion was intended.

Interop: Observable.resolve?

I think [Symbol.observer] is a good interop point for reading from an "Observable-esque" type, but we're lacking for interop is a method akin to Promise's resolve.

Use Case

I'm a library author and I've been given an arbitrary object I want to ensure is an Observable for my library's use. It's likely async or a collection of some type, but it could be a simple value type.

Thoughts

The basic principal is that it would be a method that examined the input for a [Symbol.observer], if it found it, it would subscribe to that and return a new Observable, next it would look for [Symbol.iterator], otherwise it would create an observable from whatever value is passed. I'm really not sure how this would work, just wanted to start discussion around it:

Different scenarios:

  1. Observable.resolve(observablesque); -> subscribe to observablesque with Symbol.observer and output Observable.
  2. Observable.resolve(valueType); -> An observable of one item
  3. Observable.resolve(iterable); -> An observable of N items from the iterable
  4. Observable.resolve(object); -> An observable of one item
  5. Observable.resolve(promise); -> An observable of one item

Pain points

  • I can see Arrays and generators mapping to Observables just fine, but Strings are iterable, and I think most of the time people will want those to be mapped into an observable of item.
  • For promises and single values, do we just make an observable that nexts out one value? Should it complete/return with that one value also?
  • The name resolve doesn't make much sense to me... is there a better name for it?

Proposal: Add a "do" method, like "forEach"

I'm happy with the decision to use an observer object instead of callbacks for "subscribe". However, this can make things a little cumbersome to type:

fromEvent(element, "click").subscribe({
    next(event) { console.log(event.x + ":" + event.y); }
});

RxJS overloads "subscribe" such that it allows a list of callbacks, in addition to an observer. I think this overload is a little questionable for the ES spec though. In ES, all "callables" are objects, so it's entirely possible for a callable object to also implement "next", "error", and "complete". I would like to avoid any heuristic overloading.

Previously, we had a "forEach" method which accepted a single callback and returned a Promise for the completion of the stream. I'd like to bring that back, with some minor adjustments.

Proposal: add a "do" method which accepts a callback, subscribes to the observable and returns a promise for the completion value of the stream.

element.on("click").do(event => {
    console.log(event.x + ":" + event.y);
});

Observable.from(something).do(x => {
    // Process each item in the stream
}).then(_=> {
    // Do something after the stream is complete
}).catch(err => {
    // Catch errors
});

The "do" method would subscribe synchronously, and would provide no built in cancelation mechanism. Instead, a combinator could be used for early completion. For example:

let cancel, cancelToken = new Promise(resolve => cancel = resolve);

element.on("click").takeUntil(cancelToken).do(event => { 
    // Process event
});

cancel();

Compliance test suite

Would be really cool if this could be published to NPM with a full compliance test suite so that framework authors and others could develop some more userland implementations besides RXJS and Bacon. I think this is important to converge on the a shared understanding of the primitive. The Promises A+ compliance suite is a good example of this.

Publish demos

We need to somehow publish the demos. Here's what I'm thinking:

  • Publish to zenparsing's github pages.
  • Use "esdown" to translate and bring in the Observable constructor.
  • Author the code in an unparsed script block.
  • Display the script in a left-hand pane.
  • Use a right-hand pane for DOM manipulation or displaying results.
  • Provide a button to translating the script and executing.

That should be good enough.

Observable.prototype be sealed

I think we want to avoid the hazard of the prototype being modified before we have a chance to add combinators. Don't want to break the web.

Don't use the generator interface

Continuing the discussion from #14 I think that while the idea of using the generator interface as a dual to iterator is novel, there is really no justification to do so.

We see that using it as the basis for the observer does not really maintain duality particularly well (as seen in #14) nor it provides the semantics required for useful observers (as seen in #32 as well as several other issues). Now that I understand the proposal a lot better:

Wouldn't it be better if we just dropped using the ES2015 generator interface for the observer?

On the same note - wouldn't it be better to use more descriptive names for observers (like the names used in Rx, I'm quite fond of those) and not stay bound to the interface allowing us to add cancellation semantics as we please easily?

I think duality is important, but leveraging the experience with the API of people like @jhusain and @Blesh who actually used observables for a great extent is just as important if not more. I'm certain we can prove duality with a better named API too.


I'm well aware that a lot of people put a lot of work into the proposal, work by people who probably have more experience and shinier PhDs than me, you have discussed and uncovered a lot of interesting issues and I don't like delaying the proposal as much as anyone else - I definitely think though that this is worth discussing.

Chat

It would be nice to have a chat channel (IRC, slack or gitter) to toss up non-formal ideas about this spec in.

Currently I talk to Erik in emails, Domenic in IRC, spion in IRC and Facebook and Jafar Ben and Kevin only in GH.

It would be nice if interested parties could discuss ideas (without opening issues first) somewhere. I don't really mind where as long as it's public.

Add Observable.from, Symbol.observer key

We want to support both synchronous and asynchronous subscription, but only synchronous notification. To guard against the dangers of synchronous subscription for casual users, we would like to introduce an easily-discoverable method ("subscribe") which automatically schedules on the micro task scheduler. However to enable those writing combinator libraries to do composition efficiently, we would like to add another method:

Symbol.observer

This method is not expected to Introduce any asynchrony. It is a new contract we are proposing to go along with the nominal type: observable. Objects can implement the observable contract, but are not expected to inject any asynchrony. The nominal static method Observable.from can be used to adapt an object that implements the observable contract to a nominal Observable type. Attempts to call the discoverable subscribe method will inject asynchrony using the micro task scheduler.

Please add an Observable.from and a Symbol.observer key. Have subscribe delegate to the Symbol.observer on the microtask scheduler (you can use a promise to get this behavior today).

chainability of custom operators

How will it be possible to chain custom operators?

var observable = listen('button', 'click')
   .pipe(myCustomOperator()) // should we have a pipe operator?
   .filter();

Is it possible to have a pipe operator instead of extending the prototype?

Lazy semantics are a mismatch with JavaScript

I understand they fit better with C#, but I am not comfortable with how they deviate from JavaScript's iterator and promise designs.

This seems easily fixable, however. Simply execute the executor immediately. To get back laziness, simply use an observable-returning function, as is done today with promise and iterators.

Observable should be a thenable

This was an oversight. We should definitely spec this so that async await works on Observables out of the box.

the return value of the Observable will be come the resolution of the promise. Likewise errors become rejections.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.