Comments (20)

ldaley avatar ldaley commented on May 15, 2024

I'm not sure we need to do anything here. The RS contract effectively means that back pressure has to be supported somehow. So, trying to convert an Observable that doesn't support back pressure (in some way) should fail, which really means that it can't be subscribed to because we can't know ahead of time.
This puts it on the caller of toPublisher() to ensure they've set up an observable that supports back pressure one way or another.

This hinges on .subscribe() failing deterministically and immediately though. I don't know enough to know whether this holds in all, or enough, cases.

from rxjavareactivestreams.

ldaley avatar ldaley commented on May 15, 2024

@benjchristensen could observables that can't respect back pressure throw as soon as rx.Subscriber.request() is called?

If so, that looks like it would solve this problem, as these adapters always call request(«») in onStart().

benjchristensen avatar benjchristensen commented on May 15, 2024

An Observable that doesn't respect backpressure will never emit a Producer (Subscription in Reactive Streams) and thus there will never be anything to call request on. On an Observable like this the call to request(n) is a no-op. This is supported by design to allow hot sources. Reactive Streams, by contrast, does not permit hot sources without a predefined strategy, so handling this disconnect between the two must live in the bridge.

If a consumer is fast it may never have an issue, but if the consumer is slower than the producer then the producer will not respect the request(n). It is for these cases that we need to throw a MissingBackpressureException and force the developer to choose a strategy such as onBackpressureDrop or onBackpressureBuffer. This is exactly how RxJava itself behaves if an async operator like observeOn is used with a hot source that doesn't obey the request(n) requests.

Even if they are using temporal operators like throttleFirst, sample, debounce, etc. a developer will still need to use onBackpressureDrop or onBackpressureBuffer in case even the temporally slowed producer is faster than the consumer is requesting.

One way to know if the operator immediately above supports reactive pull is whether it calls setProducer on you. That by itself though only tells you about the operator immediately above you, there is no way to know if the entire stream above you supports it.

This is why I suggest the RxReactiveStreams bridge needs to maintain the count and throw a MissingBackpressureException if the requested amount is exceeded so it acts as the intermediate between the looser RxJava contract and stricter Reactive Stream contract.
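The counting idea can be sketched with plain JDK types. This is only an illustration, not the bridge's actual code: the class name DemandGuard, the two callbacks, and the use of IllegalStateException as a stand-in for MissingBackpressureException are all assumptions, and it assumes onNext calls arrive serialized, as the Rx contract expects.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Hypothetical guard between a push-style source and a demand-driven consumer. */
final class DemandGuard<T> {
    private final AtomicLong outstanding = new AtomicLong(); // demand not yet fulfilled
    private final Consumer<T> downstream;
    private final Consumer<Throwable> onError;
    private boolean failed; // assumes serialized onNext calls

    DemandGuard(Consumer<T> downstream, Consumer<Throwable> onError) {
        this.downstream = downstream;
        this.onError = onError;
    }

    /** Called when the downstream subscriber signals request(n). */
    void request(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        // Cap at Long.MAX_VALUE so accumulated demand cannot overflow.
        outstanding.getAndUpdate(cur -> (cur + n < 0) ? Long.MAX_VALUE : cur + n);
    }

    /** Called by the upstream source for every item it pushes. */
    void onNext(T item) {
        if (failed) {
            return; // already terminated with an error; ignore further items
        }
        while (true) {
            long cur = outstanding.get();
            if (cur == 0) {
                // The source emitted more than was requested: fail instead of overrunning.
                failed = true;
                onError.accept(new IllegalStateException(
                        "source emitted more items than were requested"));
                return;
            }
            // Long.MAX_VALUE is treated as unbounded demand and is never decremented.
            if (cur == Long.MAX_VALUE || outstanding.compareAndSet(cur, cur - 1)) {
                downstream.accept(item);
                return;
            }
        }
    }
}
```

With request(2) followed by four onNext calls, the first two items reach the consumer and the third triggers a single error, after which further items are ignored.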

ldaley avatar ldaley commented on May 15, 2024

Understood. It's a rather unfortunate situation though. I guess we'll have to see how much of a problem it is in practice that the stream configuration can fail under some circumstances.

benjchristensen avatar benjchristensen commented on May 15, 2024

Well the alternative is what Reactive Streams requires ... predefining a strategy upfront. This can always be adopted in this bridge, as that's what Reactive Streams requires.

For example: RxReactiveStreams.toPublisher(stream, DefaultStrategy.DROP)

That is effectively what is always required by Reactive Streams since a hot source MUST obey the request(n) thus must have a default strategy.

ldaley avatar ldaley commented on May 15, 2024

Any idea what the overhead is when defining an unnecessary strategy? i.e. the ideal case where the consumer is fast enough so nothing needs to be dropped/buffered.

benjchristensen avatar benjchristensen commented on May 15, 2024

The overhead of onBackpressureDrop is quite low, it's just the cost of reading/decrementing an AtomicLong: https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/operators/OperatorOnBackpressureDrop.java#L56
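To make the "read and decrement an AtomicLong" cost concrete, here is a rough sketch of a drop strategy written against plain JDK types. It is loosely modeled on the description above rather than copied from the linked operator; the class name DropOnOverflow and the droppedCount() helper are hypothetical, and a single emitting thread is assumed.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Hypothetical drop-on-overflow stage: forwards items only while demand exists. */
final class DropOnOverflow<T> {
    private final AtomicLong requested = new AtomicLong();
    private final Consumer<T> downstream;
    private long dropped; // illustration only; touched by the emitting thread alone

    DropOnOverflow(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    /** Downstream demand; capped so the counter cannot overflow. */
    void request(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        requested.getAndUpdate(cur -> (cur + n < 0) ? Long.MAX_VALUE : cur + n);
    }

    /** Per-item cost is one atomic read plus, when forwarding, one atomic decrement. */
    void onNext(T item) {
        if (requested.get() > 0) {
            downstream.accept(item);
            requested.decrementAndGet();
        } else {
            dropped++; // no demand: the new item is discarded
        }
    }

    long droppedCount() {
        return dropped;
    }
}
```

In the ideal case where the consumer always has demand outstanding, the else branch is never taken, so the only extra work per item is the counter access.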

onBackpressureBuffer is more costly (and dangerous as it would cause buffer bloat defeating the point of backpressure) as it puts everything through a queue: https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java#L70

We should not ever choose one by default for a developer, but we could force the developer to choose one of these, or others such as (off the top of my head):

  • fail (if the source is considered slow, or should support backpressure, then failing is appropriate ... and is the only choice we could make on behalf of a user)
  • buffer then drop
  • drop oldest (instead of the simple drop which drops new messages ... but more costly as it involves queueing and popping items off the front whereas normal drop is just a simple counter)
  • throttleLast ... while back pressured we keep remembering the last value so when a request comes in we emit the last value but don't enqueue anything else.
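Two of the alternatives listed above can be sketched as small holders that a bridge could fill while there is no demand and drain when a request arrives. Both classes (DropOldestBuffer and KeepLatest) are hypothetical names used for illustration, not RxJava types.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical "drop oldest" buffer: bounded queue that evicts from the front. */
final class DropOldestBuffer<T> {
    private final ArrayDeque<T> queue = new ArrayDeque<>();
    private final int capacity;

    DropOldestBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /** Stores the item, discarding the oldest one if the buffer is full. */
    synchronized void offer(T item) {
        if (queue.size() == capacity) {
            queue.pollFirst(); // costlier than a counter: a queue is maintained
        }
        queue.addLast(item);
    }

    /** Returns the oldest retained item, or null if nothing is buffered. */
    synchronized T poll() {
        return queue.pollFirst();
    }
}

/** Hypothetical "throttleLast"-style holder: remembers only the most recent value. */
final class KeepLatest<T> {
    private final AtomicReference<T> latest = new AtomicReference<>();

    /** Overwrites whatever was remembered before. */
    void offer(T item) {
        latest.set(item);
    }

    /** Hands out the remembered value once, or null if there is none. */
    T poll() {
        return latest.getAndSet(null);
    }
}
```

The difference in cost mentioned in the list shows up directly: KeepLatest is a single reference swap, while DropOldestBuffer pays for queue bookkeeping on every item.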

ldaley avatar ldaley commented on May 15, 2024

There's no good option here. So, I think we should go with your original suggestion and put it on the user.

I'll add the counting, throwing of MissingBackpressureException and docs on toPublisher() to make it clear that callers should set up a backpressure strategy.

I don't like the idea of any kind of enum/switch to toPublisher() as it shuts the door to some extent on other, more sophisticated, strategies that may come along in the future.

benjchristensen avatar benjchristensen commented on May 15, 2024

Agreed. I don't like using an enum for this either.

The use of Reactive Streams is still new enough that I haven't seen a pattern emerge for handling user-provided strategies for hot sources. Have you seen anything elsewhere?

ldaley avatar ldaley commented on May 15, 2024

I have not, but the extent of my experience with this is Rx and Ratpack.

benjchristensen avatar benjchristensen commented on May 15, 2024

/cc @rkuhn @viktorklang @smaldini @jbrisbin Have any of you established patterns yet for how a Reactive Stream publisher gets a strategy defined by a user when it is a hot source and the user needs to choose what to do when it produces faster than the consumer?

jbrisbin avatar jbrisbin commented on May 15, 2024

@benjchristensen We define overflow handling on the Stream like these methods:

onOverflowBuffer()
onOverflowBuffer(Supplier<CompletableQueue<O>>)
onOverflowDrop()

The CompletableQueue<O> is actually our PersistentQueue<O> which uses Chronicle Queue to store items on disk.

benjchristensen avatar benjchristensen commented on May 15, 2024

@jbrisbin What happens if someone doesn't choose one of those? Will a hot source be capable of overflowing the requested amount? In other words, if someone defines a Stream from a hot source can it ignore the request(n) even when passed as a Reactive Streams Publisher?

viktorklang avatar viktorklang commented on May 15, 2024

@benjchristensen I think that's a really good (and frequent!) question I hear. One of the most pleasing parts of Reactive Streams is that it pushes the knowledge of how much demand for data exists downstream right to the top, so any and all "unstoppable" Publishers know that there's nobody who's able to receive their data. The good thing about that is that it is the Publisher (or the source) of that data that is the most qualified to know what to do if nobody wants its data.

A sensor for instance, with very limited memory and a variable frequency of new output, may decide to only keep the latest reading, or perhaps keep an internal buffer of significant/recent samples of data to offer downstream.

TL;DR: Passing in an OverflowStrategy when creating the "unstoppable" Publisher seems to be the most frequent, and best, solution I've seen so far.
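One possible shape for "pass the strategy in when creating the source" is sketched below. Everything here is hypothetical: the OverflowStrategy interface, its factory methods, and the HotSource class are illustrative names, not an API from Reactive Streams, RxJava, or Reactor. An interface is used rather than an enum so that callers can supply their own strategies. For brevity the subscriber is invoked while holding the lock, which a real implementation would likely avoid.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

/** Hypothetical strategy: decides what happens to an item that arrives without demand. */
interface OverflowStrategy<T> {
    void onOverflow(Deque<T> pending, T item);

    /** Discards the new item. */
    static <T> OverflowStrategy<T> dropNewest() {
        return (pending, item) -> { };
    }

    /** Keeps only the most recent item. */
    static <T> OverflowStrategy<T> keepLatest() {
        return (pending, item) -> {
            pending.clear();
            pending.add(item);
        };
    }

    /** Treats overflow as an error. */
    static <T> OverflowStrategy<T> fail() {
        return (pending, item) -> {
            throw new IllegalStateException("item arrived without demand");
        };
    }
}

/** Hypothetical "unstoppable" source whose overflow behavior is fixed at construction. */
final class HotSource<T> {
    private final OverflowStrategy<T> strategy;
    private final Consumer<T> subscriber;
    private final Deque<T> pending = new ArrayDeque<>();
    private long demand;

    HotSource(OverflowStrategy<T> strategy, Consumer<T> subscriber) {
        this.strategy = strategy;
        this.subscriber = subscriber;
    }

    /** Downstream demand: first hand out whatever the strategy retained. */
    synchronized void request(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n;
        while (demand > 0 && !pending.isEmpty()) {
            demand--;
            subscriber.accept(pending.poll());
        }
    }

    /** Called by the hot producer whenever it has a new value. */
    synchronized void emit(T item) {
        if (demand > 0) {
            demand--;
            subscriber.accept(item);
        } else {
            strategy.onOverflow(pending, item); // no demand: the strategy decides
        }
    }
}
```

A sensor-like source would then be created as, for example, new HotSource<>(OverflowStrategy.keepLatest(), consumer), so the decision about unrequested data is made where the source is built.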

viktorklang avatar viktorklang commented on May 15, 2024

@benjchristensen:

In other words, if someone defines a Stream from a hot source can it ignore the request(n) even when passed as a Reactive Streams Publisher?

That, per definition, is not a Publisher (it does not obey its contract)

benjchristensen avatar benjchristensen commented on May 15, 2024

That, per definition, is not a Publisher (it does not obey its contract)

Yup, understood. I'm trying to clarify how the Stream behaves in this regard since it offers operators for choosing how to do overflow rather than defining them as part of the Stream at construction time (if I understand correctly).

Passing in an OverflowStrategy when creating the "unstoppable" Publisher seems to be the most frequent, and best, solution I've seen so far.

Great, thanks @viktorklang for the confirmation of this approach. The unfortunate side-effect of this is that it requires knowledge of what to do upfront when constructing the object.

viktorklang avatar viktorklang commented on May 15, 2024

Yup, understood. I'm trying to clarify how the Stream behaves in this regard since it offers operators for choosing how to do overflow rather than defining them as part of the Stream at construction time (if I understand correctly).

Sorry, I didn't mean to answer on @jbrisbin's behalf—I too would like to know how they do it in Reactor :)

Great, thanks @viktorklang for the confirmation of this approach. The unfortunate side-effect of this is that it requires knowledge of what to do upfront when constructing the object.

I guess there's also the case where the publisher implementation "knows" what's the most appropriate way of dealing with the situation.

I think you're raising a great topic, one that I think would make total sense to have as a part of the "Implementor's Handbook"—section of the Reactive Streams documentation.

jbrisbin avatar jbrisbin commented on May 15, 2024

To clarify: A Reactor Stream is an aggregation of Processors so the overflow characteristics are declared for downstream publishers at creation time but a given Stream is not a monolithic Publisher, it is just an action that has a reference to an object that exposes methods for attaching other Processors.

Every stage in a Reactor Stream is an essentially independent Publisher and Subscriber and can have publishing characteristics different from both upstream and downstream actions. In fact many actions create new streams internally. This allows us to introduce parallel operations, buffering, and other operations that shield downstream actions from what's happening upstream. A single Stream could have actions that are preceded by buffering overflow and could themselves drop items if downstream actions create a bottleneck.

Overflow is handled in the ReactiveSubscription which buffers in a queue if the Dispatcher has no capacity or downstream has requested less than is being signaled from upstream.

smaldini avatar smaldini commented on May 15, 2024

To complete @jbrisbin's answer, here are a few of the rules that currently lead us to create a buffering-capable queue (tracking requests):

  • Is upstream/Publisher dispatcher different from downstream/Subscriber dispatcher, a Reactor async boundary
    • Buffering ReactiveSubscription
  • Is Publisher a Combining action / Non Linear (zip, merge...)
    • Buffering ReactiveSubscription
  • Is Hot Stream without Synchronous Dispatching (currently reviewed)
    • Buffering ReactiveSubscription
  • Is Publisher not a Reactor implementation
    • Defer to Publisher Subscription handling, and just request it
  • Is Subscriber not a Reactor implementation
    • Buffering ReactiveSubscription
  • Anything else (e.g. same dispatcher between publisher subscriber, or synchronous dispatcher)
    • No-Buffering PushSubscription

akarnokd avatar akarnokd commented on May 15, 2024

I'm closing this as being very old. RxJava 1.x now supports backpressure, except a few sources and operators. In those cases, I'd expect the user to apply the onBackpressureX operators on the RxJava side before the conversion or apply the onBackpressureX operator after wrapping the resulting Publisher (Flowable.onBackpressureX, Flux.onBackpressureX).
