Giter VIP home page Giter VIP logo

Comments (25)

benlesh avatar benlesh commented on August 18, 2024

/cc @trxcllnt @benjchristensen

from rxjs.

trxcllnt avatar trxcllnt commented on August 18, 2024

@Blesh heh, this is identical to the Observer interface from my experiments this weekend. I approve!

from rxjs.

benlesh avatar benlesh commented on August 18, 2024

👍

from rxjs.

trxcllnt avatar trxcllnt commented on August 18, 2024

@Blesh one slight difference -- instead of a setter for subscription, my interface defined a public subscription property, which allows us to read the subscription from the "destination" observer and share it with the "source" observer. Check it:

interface IDisposable {
    isDisposed: boolean;
    dispose(): void;
}

interface ICompositeDisposable extends IDisposable {
    length: number;
    add(disposable: IDisposable | (() => void) | void): ICompositeDisposable;
    remove(disposable: IDisposable): ICompositeDisposable;
}

interface IObserver<T> extends IDisposable {
    disposable: ICompositeDisposable;
    next<T>(value: T): void;
    throw<E>(error: E): void;
    return(): void;
}

from rxjs.

benlesh avatar benlesh commented on August 18, 2024

@trxcllnt to further flush this out, the there is a Subscriber type that Rx Java uses that we're currently calling Observer or "safe" Observer:

class Subscriber implements Observer {
  subscribed((subscription: Subscription) => void): void
  next((value: any) => void): void
  error((err: any) => void): void
  complete(() => void): void

  // adds these
  add(subscription: Subscription): void
  unsubscribe(): void
  isUnsubscribed: boolean;
}

Then technically Observable ctor would get a Subscriber:

class Observable {
  constructor((subscriber: Subscriber) => Subscription|Function|void ): Subscription

  /** other stuff here **/
}

from rxjs.

headinthebox avatar headinthebox commented on August 18, 2024

By far the easiest thing is to merge observer and subscriber (i.e. collapse both into observer). You never need to distinguish between them.

from rxjs.

benjchristensen avatar benjchristensen commented on August 18, 2024

This approach is where RxJava is heading with v2 as discussed here: ReactiveX/RxJava#2450

In particular, it will adopt the Reactive Streams API which are the result of 1+ year of collaboration with various companies. We explored many different improvements to the core Observable/Observer interface while adding backpressure, and the resulting onSubscribe(Subscription s) works elegantly, while also being efficient, which many alternatives (such as next returning a Future) were not. We have considered a Javascript definition: https://github.com/reactive-streams/reactive-streams-js.

The backpressure piece is not required for this to be a good design for RxJS, though I suggest it also does future-proof RxJS to allow that being added to support Node.js apps, which will end up needing it or all the same issues as we had with RxJava will result.

@trxcllnt the property model is great for a concrete implementation of IObserver. I suggest however that the interface be defined with onSubscribe(IDisposable) so that concrete implementations can behave however they wish.

// use term Subscription instead of Disposable so it represents the subscription lifecycle, and can be used for `request` semantics if that is added to RxJS
interface Subscription {
    isUnsubscribed: boolean;
    cancel(): void; // cancel/unsubscribe/whatever
    request(n: number); // optional support for micro-leasing to allow async flow-control, backpressure
}

interface Observer<T> {
    subscribed(subscription: Subscription); // always called when Observable is subscribed to, unless onError is called instead
    next<T>(value: T): void;
    throw<E>(error: E): void;
    return(): void;
}

from rxjs.

benlesh avatar benlesh commented on August 18, 2024

By far the easiest thing is to merge observer and subscriber (i.e. collapse both into observer). You never need to distinguish between them.

@headinthebox I agree. I guess what I'm trying to do is linguistically distinguish between "Observer The Interface" (as handed to subscribe) and "Observer: the safe thing we give to people building Observables" (as given in the Observable constructor) ?

from rxjs.

headinthebox avatar headinthebox commented on August 18, 2024

There should only be one (and only one type of subscription)

trait Subscription {
  def unsubscribe(): Unit
  def isUnsubscribed: Boolean
  def +=(child: Subscription): Unit
  def -=(child: Subscription): Unit
}

trait Observer[-T] extends Subscription {
  def onNext(value: T): Unit 
  def onError(error: Throwable): Unit 
  def onCompleted(): Unit 
 }

from rxjs.

headinthebox avatar headinthebox commented on August 18, 2024

@benjchristensen Why have two separate interfaces?

from rxjs.

benlesh avatar benlesh commented on August 18, 2024

@headinthebox I think we're all saying the same thing. The Subscriber implements both interfaces.

from rxjs.

headinthebox avatar headinthebox commented on August 18, 2024

What I am saying is that we do not need a separate type for observer. Less abstractions == better abstractions.

from rxjs.

benjchristensen avatar benjchristensen commented on August 18, 2024

Why have two separate interfaces?

@headinthebox What are the separate interfaces you're referring to?

from rxjs.

benlesh avatar benlesh commented on August 18, 2024

What I am saying is that we do not need a separate type for observer. Less abstractions == better abstractions.

Yes... 100% agreed. I think that's where we're all heading.

from rxjs.

headinthebox avatar headinthebox commented on August 18, 2024

@benjchristensen Observer and Subscription. Why doesn't Observer extend Subscription.

from rxjs.

benjchristensen avatar benjchristensen commented on August 18, 2024

@headinthebox The types I would envision are:

interface Observable {
    subscribe(o: Observer<T>) : void; // or could return Subscription
    lift(operator): Observable<R>;
}

interface Subscription {
    unsubscribe(): void;
    isUnsubscribed(): boolean;
    request(n: number): void // optional if backpressure is added
}

interface Observer<T> {
   onSubscribe(s: Subscription): void;
   onNext(t: T): void;
   onError(e: any): void;
   onComplete(): void;
}

A concrete implementation of the Observer interface could be a Subscriber like this:

class Subscriber<T> extends Subscription {
... the implementation here ...
}

The reason for Observer not extending Subscription is that we need onSubscribe being passed from the Observable if we ever want to support backpressure, which will be needed if RxJS wants to work in Node.js environments, for the same reasons Java needed it. In other words, it's not just a "cancellation token" any longer, but represents the subscription lifecycle. It also eliminates the need for Subscription to have concrete add/remove functionality, as the Observable can now emit a Subscription with callbacks as needed for both async and sync cases, and lift can decorate it as needed.

RxJava has chosen to go this route after long debate about alternative designs in the Reactive Streams collaboration: https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.0/README.md#specification.

This approach allows us to do the following things:

  • support efficient unsubscription for both asynchronous and synchronous Observables
    • synchronous works in the "cancellation token" approach, checking "isUnsubscribed"
    • asynchronous works by callbacks registered with the Subscription
  • support composition through lift so that onSubscribed can either pass-thru, or decorate or replace the Subscription as needed at each level of composition
  • support optional flow control (so-called "reactive-pull" backpressure) via a request(n: number) method on the Subscription without adding object allocations (as a Promise next(t) approach does)
  • it treats the subscription asynchronously, so it works for both synchronous and asynchronous subscription models

Even if backpressure is not being pursued immediately for RxJS (which I suggest would be a mistake as it will end up being needed), this model is very clean as it allows Observer to be just an interface rather than a concrete type, while still allowing all the needed use cases. A concrete type of Subscriber can still be used if wanted that implements Observer and extends Subscription.

from rxjs.

benjchristensen avatar benjchristensen commented on August 18, 2024

Of course, the Observer implements Subscription approach works fine if backpressure is never going to be wanted. I suggest the two key points of discussion are:

  • support composition through lift so that onSubscribed can either pass-thru, or decorate or replace the Subscription as needed at each level of composition (without Subscription.add/remove)
  • support optional flow control (so-called "reactive-pull" backpressure) via a request(n: number) method on the Subscription without adding object allocations (as a Promise next(t) approach does)

from rxjs.

benlesh avatar benlesh commented on August 18, 2024

I feel like I have 3-4 designs conflated in my head now.

from rxjs.

headinthebox avatar headinthebox commented on August 18, 2024

The reason for Observer not extending Subscription is that we need onSubscribe
being passed from the Observable if we ever want to support backpressure

I need to chew on that, not sure that I understand that is a necessary condition (intuition: back pressure is like an asynchronous version of iterable, and iterator extends subscription, in .NET).

from rxjs.

benjchristensen avatar benjchristensen commented on August 18, 2024

necessary condition

You'll end up with a different object then ... such as RxJava which had to add the Producer type in addition to Subscription.

from rxjs.

benlesh avatar benlesh commented on August 18, 2024

It seems to me that both things can be supported. Observer extending Subscription and having an onSubscribe-type handler aren't mutually exclusive.

from rxjs.

benjchristensen avatar benjchristensen commented on August 18, 2024

It seems to me that both things can be supported

Correct, though it could be confusing. For example, the Observable should probably not receive the Subscription in "through the top" and then also be emitting via onSubscribed.

from rxjs.

benlesh avatar benlesh commented on August 18, 2024

Closing as this has gone stale, and I think it's been decided this is the approach we're going for.

from rxjs.

jachenry avatar jachenry commented on August 18, 2024

@Blesh What was the final decision for matching the onSubscribe functionality of RxJava? I tried searching through the source but don't see a way to get notified if someone subscribes.

from rxjs.

lock avatar lock commented on August 18, 2024

This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.

from rxjs.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.