I'm not sure we need to do anything here. The RS contract effectively means that back pressure has to be supported somehow. So, trying to convert an Observable that doesn't support back pressure (in some way) should fail, which really means that it can't be subscribed to, because we can't know ahead of time whether back pressure is supported.
This puts it on the caller of toPublisher() to ensure they've set up an observable that supports back pressure one way or another.
This hinges on .subscribe() failing the subscribe deterministically and immediately though. I don't know enough to know whether this holds in all, or enough, cases.
from rxjavareactivestreams.
@benjchristensen could observables that can't respect back pressure throw as soon as `rx.Subscriber.request()` is called? If so, that looks like it would solve this problem, as these adapters always call `request(…)` in `onStart()`.
An `Observable` that doesn't respect backpressure will never emit a `Producer` (`Subscription` in Reactive Streams), and thus there will never be anything to call `request` on. On an `Observable` like this the call to `request(n)` is a no-op. This is supported by design to allow hot sources. Reactive Streams, by contrast, requires that hot sources not be supported without a pre-defined strategy, so handling this disconnect between the two contracts must live in the bridge.
If a consumer is fast it may never have an issue, but if the consumer is slower than the producer, then the producer will not respect the `request(n)`. It is for these cases that we need to throw a `MissingBackpressureException` and force the developer to choose a strategy such as `onBackpressureDrop` or `onBackpressureBuffer`. This is exactly how RxJava itself behaves if an async operator like `observeOn` is used with a hot source that doesn't obey the `request(n)` requests.
Even if they are using temporal operators like `throttleFirst`, `sample`, `debounce`, etc., a developer will still need to use `onBackpressureDrop` or `onBackpressureBuffer` in case even the temporally slowed producer is faster than the consumer is requesting.
One way to know whether the operator immediately above supports reactive pull is whether it calls `setProducer` on you. That by itself, though, only tells you about the operator immediately above you; there is no way to know whether the entire stream above you supports it.
This is why I suggest the RxReactiveStreams bridge needs to maintain the count and throw a `MissingBackpressureException` if the requested amount is exceeded, so that it acts as the intermediary between the looser RxJava contract and the stricter Reactive Streams contract.
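The counting the bridge would need can be sketched in a few lines of plain Java. This is an illustrative sketch only, not the actual rxjava-reactive-streams implementation; `DemandCounter` and the local exception class are made-up names:

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative stand-in for rx.exceptions.MissingBackpressureException.
class MissingBackpressureException extends RuntimeException {
    MissingBackpressureException(String msg) { super(msg); }
}

// Tracks outstanding downstream demand; fails when upstream overruns it.
class DemandCounter {
    private final AtomicLong requested = new AtomicLong();

    // Called when the Reactive Streams Subscription receives request(n).
    void request(long n) {
        requested.addAndGet(n);
    }

    // Called once per onNext; throws if the producer exceeded the demand.
    void onEmit() {
        if (requested.getAndDecrement() <= 0) {
            throw new MissingBackpressureException(
                "producer emitted more than the subscriber requested");
        }
    }
}
```

A bridge holding one of these per subscription could call `onEmit()` on every element and propagate the exception as an `onError`, which is the behavior being proposed above.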
Understood. It's a rather unfortunate situation though. I guess we'll have to see how much of a problem it is in practice that the stream configuration can fail under some circumstances.
Well, the alternative is what Reactive Streams requires: predefining a strategy upfront. That can always be adopted in this bridge.
For example: `RxReactiveStreams.toPublisher(stream, DefaultStrategy.DROP)`
This is effectively what is always required by Reactive Streams, since a hot source MUST obey the `request(n)` and thus must have a default strategy.
Any idea what the overhead is when defining an unnecessary strategy? i.e. the ideal case where the consumer is fast enough so nothing needs to be dropped/buffered.
`onBackpressureDrop` is quite cheap: it's just the cost of reading/decrementing an `AtomicLong`: https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/operators/OperatorOnBackpressureDrop.java#L56
`onBackpressureBuffer` is more costly (and dangerous, as it would cause buffer bloat, defeating the point of backpressure) since it puts everything through a queue: https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java#L70
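The "just an AtomicLong" point can be illustrated with a minimal sketch of the drop idea (illustrative only; `DropGate` is a made-up name, not the linked RxJava operator):

```java
import java.util.concurrent.atomic.AtomicLong;

// Why drop is cheap: a single counter of outstanding requests.
// If there is demand, decrement and deliver; otherwise drop the item.
class DropGate {
    private final AtomicLong requested = new AtomicLong();

    void request(long n) {
        requested.addAndGet(n);
    }

    // Returns true if the item should be delivered, false if it is dropped.
    boolean tryEmit() {
        if (requested.get() > 0) {
            requested.decrementAndGet();
            return true;
        }
        return false; // no demand: item is silently discarded, no queue
    }
}
```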
We should not ever choose one by default for a developer, but we could force the developer to choose one of these, or others such as (off the top of my head):
- fail (if the source is considered slow, or should support backpressure, then failing is appropriate ... and it is the only choice we could make on behalf of a user)
- buffer then drop
- drop oldest (instead of the simple drop, which drops new messages ... but more costly, as it involves queueing and popping items off the front, whereas normal drop is just a simple counter)
- throttleLast ... while back-pressured we keep remembering only the last value, so when a request comes in we emit that last value but don't enqueue anything else.
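The throttleLast-style option above amounts to a one-slot holder, which can be sketched in plain Java (illustrative only; `LatestHolder` is a made-up name, not an RxJava class):

```java
import java.util.concurrent.atomic.AtomicReference;

// While back-pressured, overwrite a single slot instead of queueing;
// when a request arrives, emit whatever the latest value is.
class LatestHolder<T> {
    private final AtomicReference<T> latest = new AtomicReference<>();

    void onNext(T value) {
        latest.set(value); // overwrite; nothing is ever enqueued
    }

    // Called when downstream requests: returns the latest value (or null
    // if nothing arrived since the last take) and clears the slot.
    T takeLatest() {
        return latest.getAndSet(null);
    }
}
```

Memory use is constant regardless of how far the producer outruns the consumer, which is the appeal of this strategy over buffering.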
There's no good option here. So, I think we should go with your original suggestion and put it on the user.
I'll add the counting, the throwing of `MissingBackpressureException`, and docs on `toPublisher()` to make it clear that callers should set up a backpressure strategy.
I don't like the idea of any kind of enum/switch on `toPublisher()`, as it shuts the door, to some extent, on other, more sophisticated strategies that may come along in the future.
Agreed. I don't like using an enum for this either.
The use of Reactive Streams is still new enough that I haven't seen a pattern emerge for handling user-provided strategies for hot sources. Have you seen anything elsewhere?
I have not, but the extent of my experience with this is Rx and Ratpack.
/cc @rkuhn @viktorklang @smaldini @jbrisbin Have any of you established patterns yet for how a Reactive Stream publisher gets a strategy defined by a user when it is a hot source and the user needs to choose what to do when it produces faster than the consumer?
@benjchristensen We define overflow handling on the `Stream` with methods like these:
- `onOverflowBuffer()`
- `onOverflowBuffer(Supplier<CompletableQueue<O>>)`
- `onOverflowDrop()`
The `CompletableQueue<O>` is actually our `PersistentQueue<O>`, which uses Chronicle Queue to store items on disk.
@jbrisbin What happens if someone doesn't choose one of those? Will a hot source be capable of overflowing the requested amount? In other words, if someone defines a `Stream` from a hot source, can it ignore the `request(n)` even when passed as a Reactive Streams `Publisher`?
@benjchristensen I think that's a really good (and frequent!) question. One of the most pleasing parts of Reactive Streams is that it pushes the knowledge of how much demand for data exists downstream right to the top, so any and all "unstoppable" Publishers know when there's nobody able to receive their data. The good thing about that is that the Publisher (or the source) of that data is the most qualified to know what to do if nobody wants its data.
A sensor, for instance, with very limited memory and a variable frequency of new output, may decide to only keep the latest reading, or perhaps keep an internal buffer of significant/recent samples of data to offer downstream.
TL;DR: Passing in an OverflowStrategy when creating the "unstoppable" Publisher seems to be the most frequent, and best, solution I've seen so far.
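The "strategy at construction time" pattern can be sketched as follows. All names here are illustrative assumptions, not an API from Reactive Streams or any of the libraries discussed:

```java
// The overflow behavior is fixed when the unstoppable source is built,
// so it is known before anyone subscribes.
enum OverflowStrategy { DROP, BUFFER, KEEP_LATEST, FAIL }

// Hypothetical sensor-backed source: the caller must pick a strategy
// upfront, mirroring the approach described in the comment above.
class SensorSource {
    private final OverflowStrategy strategy;

    SensorSource(OverflowStrategy strategy) {
        this.strategy = strategy;
    }

    // What this source will do when it produces faster than requested.
    OverflowStrategy onOverflow() {
        return strategy;
    }
}
```

The trade-off noted later in the thread applies: the developer must know the right strategy before constructing the object, rather than choosing it with an operator later.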
In other words, if someone defines a Stream from a hot source can it ignore the request(n) even when passed as a Reactive Streams Publisher?
That, by definition, is not a Publisher (it does not obey its contract).
That, per definition, is not a Publisher (it does not obey its contract)
Yup, understood. I'm trying to clarify how the `Stream` behaves in this regard, since it offers operators for choosing how to do overflow rather than defining them as part of the `Stream` at construction time (if I understand correctly).
Passing in an OverflowStrategy when creating the "unstoppable" Publisher seems to be the most frequent, and best, solution I've seen so far.
Great, thanks @viktorklang for the confirmation of this approach. The unfortunate side-effect of this is that it requires knowledge of what to do upfront when constructing the object.
Yup, understood. I'm trying to clarify how the Stream behaves in this regard, since it offers operators for choosing how to do overflow rather than defining them as part of the Stream at construction time (if I understand correctly).
Sorry, I didn't mean to answer on @jbrisbin's behalf—I too would like to know how they do it in Reactor :)
Great, thanks @viktorklang for the confirmation of this approach. The unfortunate side-effect of this is that it requires knowledge of what to do upfront when constructing the object.
I guess there's also the case where the publisher implementation "knows" what's the most appropriate way of dealing with the situation.
I think you're raising a great topic, one that would make total sense to have as part of the "Implementor's Handbook" section of the Reactive Streams documentation.
To clarify: a Reactor `Stream` is an aggregation of Processors, so the overflow characteristics are declared for downstream publishers at creation time, but a given `Stream` is not a monolithic `Publisher`; it is just an action that has a reference to an object that exposes methods for attaching other Processors.
Every stage in a Reactor `Stream` is an essentially independent `Publisher` and `Subscriber` and can have publishing characteristics different from both upstream and downstream actions. In fact, many actions create new streams internally. This allows us to introduce parallel operations, buffering, and other operations that shield downstream actions from what's happening upstream. A single `Stream` could have actions that are preceded by buffering overflow and could themselves drop items if downstream actions create a bottleneck.
Overflow is handled in the `ReactiveSubscription`, which buffers in a queue if the `Dispatcher` has no capacity or if downstream has requested less than is being signaled from upstream.
To complete @jbrisbin's answer, these are some of the rules that currently lead us to create a buffering-capable queue (tracking requests):
- Is the upstream/Publisher dispatcher different from the downstream/Subscriber dispatcher (a Reactor async boundary)? → Buffering `ReactiveSubscription`
- Is the Publisher a combining / non-linear action (zip, merge, ...)? → Buffering `ReactiveSubscription`
- Is it a hot Stream without synchronous dispatching (currently being reviewed)? → Buffering `ReactiveSubscription`
- Is the Publisher not a Reactor implementation? → Defer to the Publisher's `Subscription` handling, and just request from it
- Is the Subscriber not a Reactor implementation? → Buffering `ReactiveSubscription`
- Anything else (e.g. same dispatcher between publisher and subscriber, or a synchronous dispatcher)? → Non-buffering `PushSubscription`
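The rules above can be summarized as a decision function. This is a plain-Java paraphrase of the list for readability, not Reactor's actual code; all names are illustrative:

```java
// Each boolean flag corresponds to one bullet in the list above;
// the returned string names the kind of subscription chosen.
class SubscriptionRules {
    static String pickSubscription(boolean asyncBoundary,
                                   boolean combiningAction,
                                   boolean hotWithoutSyncDispatch,
                                   boolean publisherIsReactor,
                                   boolean subscriberIsReactor) {
        if (!publisherIsReactor) {
            // Defer to the foreign Publisher's own Subscription handling.
            return "DeferToUpstreamSubscription";
        }
        if (asyncBoundary || combiningAction || hotWithoutSyncDispatch
                || !subscriberIsReactor) {
            return "BufferingReactiveSubscription";
        }
        // Same dispatcher on both sides, or a synchronous dispatcher.
        return "NonBufferingPushSubscription";
    }
}
```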
I'm closing this as being very old. RxJava 1.x now supports backpressure, except for a few sources and operators. In those cases, I'd expect the user to apply the `onBackpressureX` operators on the RxJava side before the conversion, or to apply the `onBackpressureX` operator after wrapping the resulting `Publisher` (`Flowable.onBackpressureX`, `Flux.onBackpressureX`).