reactivex / rxjavareactivestreams
Adapter between RxJava and ReactiveStreams
License: Apache License 2.0
We need some examples for the README to demonstrate usage.
Perhaps we can add unit test examples with dependencies on other libraries to demonstrate interop?
I am trying to use RxReactiveStreams to convert from rx.Single to org.reactivestreams.Publisher, but it always throws an exception when the type is Void.
Basically, there is one interface for operating asynchronously, with a method called doSomethingAsync(). The return value of this method is converted to an rx.Single. The problem comes when one implementor of doSomethingAsync() returns type Void (because there is nothing to return).
It looks like the exception is thrown here, as if a null value were not allowed.
A workaround I'm using now is to convert the rx.Single to rx.Completable before calling RxReactiveStreams.toPublisher().
Here is a test case reproducing the issue:
public Void doSomethingAsync() {
    // Processing...
    return null;
}

@Test
public void emptySingleErrorExit() {
    Observable<Void> empty = Observable.fromCallable(this::doSomethingAsync);
    Single<Void> single = empty.toSingle();
    Publisher<Void> publisher = RxReactiveStreams.toPublisher(single);
    Subscriber<? super Void> subscriber = new Subscriber<Void>() {
        @Override
        public void onSubscribe(Subscription subscription) {
            System.out.println("Unexpected subscription");
        }

        @Override
        public void onNext(Void aVoid) {
            System.out.println(String.format("Unexpected value %s", aVoid));
        }

        @Override
        public void onError(Throwable throwable) {
            System.out.println("Exit with error");
        }

        @Override
        public void onComplete() {
            System.out.println("Exit success");
        }
    };
    publisher.subscribe(subscriber);
    StepVerifier.create(publisher).verifyError(NullPointerException.class);
    StepVerifier.create(publisher).verifyErrorMessage("value");
}
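For illustration only, here is a minimal sketch of the idea behind the Completable workaround: a source whose only result is null can be surfaced as a bare completion, so that no null ever reaches onNext (which Reactive Streams does not permit). This is not the library's implementation. It assumes the JDK's java.util.concurrent.Flow interfaces as a stand-in for org.reactivestreams, and the class name NullSafeCallablePublisher and the helper run are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch: publishes a Callable's result; a null result becomes a bare onComplete. */
final class NullSafeCallablePublisher<T> implements Flow.Publisher<T> {
    private final Callable<? extends T> task;

    NullSafeCallablePublisher(Callable<? extends T> task) {
        this.task = task;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private final AtomicBoolean finished = new AtomicBoolean();

            @Override
            public void request(long n) {
                if (!finished.compareAndSet(false, true)) {
                    return; // the task runs at most once
                }
                if (n <= 0) {
                    subscriber.onError(new IllegalArgumentException("n must be positive"));
                    return;
                }
                T value;
                try {
                    value = task.call();
                } catch (Exception e) {
                    subscriber.onError(e);
                    return;
                }
                // Never pass null downstream: "nothing to return" is just completion.
                if (value != null) {
                    subscriber.onNext(value);
                }
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                finished.set(true);
            }
        });
    }

    /** Demo helper: subscribes, requests one item, and records the signals received. */
    static <V> List<String> run(Callable<V> task) {
        List<String> events = new ArrayList<>();
        new NullSafeCallablePublisher<V>(task).subscribe(new Flow.Subscriber<V>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
            @Override public void onNext(V item) { events.add("next:" + item); }
            @Override public void onError(Throwable t) { events.add("error"); }
            @Override public void onComplete() { events.add("complete"); }
        });
        return events;
    }
}
```

With this sketch, a task returning null yields only a completion signal, while a task returning a value yields that value followed by completion.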
Hi! And firstly, thanks for this project, really helpful being able to design for future standards.
Also, I really appreciate that you ship your bundles with OSGi headers, it makes it much easier to use in these environments!
The RxReactiveStreams class is in package rx, but unfortunately this causes a split package between this bundle and the main RxJava distribution. http://wiki.osgi.org/wiki/Split_Packages. This means the bundle doesn't resolve:
Error executing command: Uses constraint violation. Unable to resolve resource io.reactivex.rxjava-reactive-streams [io.reactivex.rxjava-reactive-streams/1.0.1] because it exports package 'rx' and is also exposed to it from resource io.reactivex.rxjava [io.reactivex.rxjava/1.0.14] via the following dependency chain:
io.reactivex.rxjava-reactive-streams [io.reactivex.rxjava-reactive-streams/1.0.1]
import: (&(osgi.wiring.package=rx.internal.operators)(version>=1.0.0)(!(version>=2.0.0)))
|
export: osgi.wiring.package: rx.internal.operators; uses:=rx
export: osgi.wiring.package=rx
io.reactivex.rxjava [io.reactivex.rxjava/1.0.14]
Thanks, Dan.
I've made some refinements, and AFAIK the implementation is now at RC status. At this early stage, I think we should just go ahead and roll out 0.4.
Before the next release, I'd like to get some more test coverage into the build by way of tests that actually exercise interop. At the moment we only have that with Ratpack.
@benjchristensen note that we've gone back to a multi project build now, so we may have to work through those issues.
I have released to Maven Central rxjava-reactive-streams 0.1.0
/cc @alkemist
RxJava allows "hot" Observables in a different manner than Reactive Streams so we need to account for this.
In Reactive Streams a Publisher can never emit more than has been requested. An Observable however can choose to just be hot and emit.
See https://speakerdeck.com/benjchristensen/reactive-programming-with-rx-at-qconsf-2014?slide=114 and https://speakerdeck.com/benjchristensen/reactive-programming-with-rx-at-qconsf-2014?slide=21
This means that when converting from Observable to Reactive Stream Publisher we may have an Observable that does not obey request(n), either because it can't (it's hot) or because the Observable just doesn't support it (thus it acts hot even if it is a cold source).
I think this means we need to put some logic in the bridge that throws MissingBackpressureException if we receive more onNext than are requested, so that it complies with the RxJava contract and doesn't emit more to the downstream Reactive Stream implementation out of contract.
This would then force a choice of strategy for dealing with this: https://github.com/ReactiveX/RxJava/wiki/Backpressure and https://speakerdeck.com/benjchristensen/reactive-programming-with-rx-at-qconsf-2014?slide=89
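The check proposed above could look roughly like the following sketch. It is only an illustration under stated assumptions: it uses the JDK's java.util.concurrent.Flow interfaces in place of org.reactivestreams, a plain IllegalStateException in place of RxJava's MissingBackpressureException, and the class name OverflowGuard is hypothetical. It also assumes the source calls onNext from one thread at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: forwards items only while the downstream has outstanding demand. */
final class OverflowGuard<T> implements Flow.Subscription {
    private final Flow.Subscriber<? super T> downstream;
    private final AtomicLong requested = new AtomicLong();
    private final AtomicBoolean done = new AtomicBoolean();

    OverflowGuard(Flow.Subscriber<? super T> downstream) {
        this.downstream = downstream;
    }

    @Override
    public void request(long n) {
        if (n <= 0) {
            fail(new IllegalArgumentException("request must be positive, got " + n));
            return;
        }
        // Saturating add: an overflowing sum is treated as Long.MAX_VALUE ("unbounded").
        requested.accumulateAndGet(n, (cur, add) -> cur + add < 0 ? Long.MAX_VALUE : cur + add);
    }

    @Override
    public void cancel() {
        done.set(true);
    }

    /** Called by the (possibly hot) source for every item it produces. */
    void onNext(T item) {
        if (done.get()) {
            return;
        }
        // Claim one unit of demand; a previous value of 0 means the source overran the request.
        long before = requested.getAndUpdate(
                cur -> cur == 0 ? 0 : (cur == Long.MAX_VALUE ? cur : cur - 1));
        if (before == 0) {
            fail(new IllegalStateException("source emitted more items than requested"));
            return;
        }
        downstream.onNext(item);
    }

    void onComplete() {
        if (done.compareAndSet(false, true)) {
            downstream.onComplete();
        }
    }

    private void fail(Throwable t) {
        if (done.compareAndSet(false, true)) {
            downstream.onError(t);
        }
    }

    /** Demo: the downstream requests 2 items, the source pushes 3. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        Flow.Subscriber<Integer> downstream = new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(2); }
            @Override public void onNext(Integer item) { events.add("next:" + item); }
            @Override public void onError(Throwable t) { events.add("error:" + t.getClass().getSimpleName()); }
            @Override public void onComplete() { events.add("complete"); }
        };
        OverflowGuard<Integer> guard = new OverflowGuard<>(downstream);
        downstream.onSubscribe(guard);
        guard.onNext(1);
        guard.onNext(2);
        guard.onNext(3);    // no demand left: the guard signals an error instead
        guard.onComplete(); // ignored, the stream has already terminated
        return events;
    }
}
```

In the demo the third item is never delivered; the downstream sees two items followed by a single error. A real bridge would also need to cancel the upstream subscription at that point and let the user pick a strategy such as buffering or dropping.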
@benjchristensen 0.2 had a problem that the TCK didn't catch, but the tests with Ratpack did.
Would you mind releasing again for @rkuhn's demos?
The release process might be broken though. It's now a multiproject build and I'm not sure I did it right for @quidryan 's sweet sweet tooling to handle the release stuff.
Hi,
Would you release a new version for RxJava 2 (soon)?
Thanks
Do we want to provide Processor impls?
If so, I think it would look something like this…
public static <T> Processor<T, T> backpressureBufferingProcessor(Subject<T, T> subject) {}
public static <T> Processor<T, T> backpressureDroppingProcessor(Subject<T, T> subject) {}
The article uses internal implementation code from the 0.3 version, as follows:
val sub = new Subscriber[String]() {
...
}
val subSink = SubscriberSink(new RxSubscriberToRsSubscriberAdapter(sub));
This use case seems reasonable, at least in the context of integration with Akka Streams.
That would require that toSubscriber methods be added to RxReactiveStreams.
This isn't released yet. We can't use M2 because of reactive-streams/reactive-streams-jvm#165.
We need to upgrade to RC4 and make sure everything is good before Reactive Streams goes to 1.0.0.Final.
The subscribe(Subscriber s) method calls s.onSubscribe(...), which in turn calls request(n) and goes into an infinite loop waiting for the wrapped Single to complete. But the actual Single has not been subscribed to yet, and therefore the infinite loop will never end.
@Override
public void subscribe(Subscriber<? super T> s) {
    SingleAsPublisherSubscriber<T> parent = new SingleAsPublisherSubscriber<T>(s);
    s.onSubscribe(parent); // apparently, this goes into infinite loop and never returns
    single.subscribe(parent);
}
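One way to avoid waiting inside request(n) is to record demand and the result independently and emit from whichever arrives second. The sketch below illustrates that idea only; it is not the project's fix. It assumes java.util.concurrent.Flow in place of org.reactivestreams and a CompletableFuture standing in for rx.Single, and the names FuturePublisher and Emitter are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: adapts a single-valued source to a Publisher without blocking in request(). */
final class FuturePublisher<T> implements Flow.Publisher<T> {
    private final CompletableFuture<T> source;

    FuturePublisher(CompletableFuture<T> source) {
        this.source = source;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> s) {
        Emitter<T> parent = new Emitter<>(s);
        s.onSubscribe(parent);                 // may call request(n) re-entrantly; returns promptly
        source.whenComplete(parent::onResult); // the source is attached only afterwards
    }

    private static final class Emitter<T> implements Flow.Subscription {
        // Bit flags: demand seen, value seen, terminated or cancelled.
        private static final int REQUESTED = 1, HAS_VALUE = 2, DONE = 4;
        private final Flow.Subscriber<? super T> downstream;
        private final AtomicInteger state = new AtomicInteger();
        private volatile T value;

        Emitter(Flow.Subscriber<? super T> downstream) {
            this.downstream = downstream;
        }

        @Override
        public void request(long n) {
            if (n <= 0) {
                fail(new IllegalArgumentException("n must be positive"));
                return;
            }
            int prev = state.getAndUpdate(st -> st | REQUESTED);
            if (prev == HAS_VALUE) {
                emit(); // the value was already waiting for demand
            }
        }

        @Override
        public void cancel() {
            state.getAndUpdate(st -> st | DONE);
        }

        void onResult(T v, Throwable error) {
            if (error != null) {
                fail(error);
                return;
            }
            value = v;
            int prev = state.getAndUpdate(st -> st | HAS_VALUE);
            if (prev == REQUESTED) {
                emit(); // demand was already waiting for the value
            }
        }

        private void emit() {
            if ((state.getAndUpdate(st -> st | DONE) & DONE) != 0) {
                return; // cancelled or already terminated
            }
            T v = value;
            if (v != null) {
                downstream.onNext(v);
            }
            downstream.onComplete();
        }

        private void fail(Throwable t) {
            if ((state.getAndUpdate(st -> st | DONE) & DONE) == 0) {
                downstream.onError(t);
            }
        }
    }

    /** Demo: demand arrives inside onSubscribe, the value arrives later. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        CompletableFuture<String> future = new CompletableFuture<>();
        new FuturePublisher<String>(future).subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(1); events.add("subscribed"); }
            @Override public void onNext(String item) { events.add("next:" + item); }
            @Override public void onError(Throwable t) { events.add("error"); }
            @Override public void onComplete() { events.add("complete"); }
        });
        events.add("subscribe returned");
        future.complete("hello");
        return events;
    }
}
```

In the demo, subscribe returns before any value exists, and the item is delivered later when the future completes, so calling request(n) from within onSubscribe cannot stall the subscription.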
... Observable all the time? Subscribers may be in a better position to detect multiple subscriptions by the fact that they receive multiple Subscriptions.
... n < 1, then it should return and not fall through.
... request and cancel methods on the Subscription have to be called sequentially, because it doesn't mandate such calls to be threadsafe when implementing a Subscription. We may need to defensively serialize these method calls.
The tests for the Reactive-Streams project need to be implemented.
See reactive-streams/reactive-streams-jvm#91 (comment) for discussion.
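As a rough illustration of the two Subscription points raised earlier (returning early on a non-positive request, and defensively serializing request and cancel), a wrapper along these lines is one option. It is a sketch under assumptions: java.util.concurrent.Flow stands in for org.reactivestreams, the class name SerializedSubscription is hypothetical, and a plain intrinsic lock is used for simplicity even though calling foreign code under a lock has its own risks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Hypothetical sketch: validates request(n) and never runs request/cancel concurrently. */
final class SerializedSubscription implements Flow.Subscription {
    private final Flow.Subscription actual;
    private final Flow.Subscriber<?> subscriber;
    private boolean terminated; // guarded by "this"

    SerializedSubscription(Flow.Subscription actual, Flow.Subscriber<?> subscriber) {
        this.actual = actual;
        this.subscriber = subscriber;
    }

    @Override
    public void request(long n) {
        synchronized (this) {
            if (terminated) {
                return;
            }
            if (n < 1) {
                terminated = true;
                actual.cancel();
                subscriber.onError(new IllegalArgumentException("non-positive request: " + n));
                return; // do not fall through to actual.request(n)
            }
            actual.request(n);
        }
    }

    @Override
    public void cancel() {
        synchronized (this) {
            if (terminated) {
                return;
            }
            terminated = true;
            actual.cancel();
        }
    }

    /** Demo: records what reaches the wrapped subscription and the subscriber. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        Flow.Subscription upstream = new Flow.Subscription() {
            @Override public void request(long n) { events.add("request:" + n); }
            @Override public void cancel() { events.add("cancel"); }
        };
        Flow.Subscriber<Object> subscriber = new Flow.Subscriber<Object>() {
            @Override public void onSubscribe(Flow.Subscription s) { }
            @Override public void onNext(Object item) { }
            @Override public void onError(Throwable t) { events.add("error:" + t.getClass().getSimpleName()); }
            @Override public void onComplete() { }
        };
        SerializedSubscription s = new SerializedSubscription(upstream, subscriber);
        s.request(3);
        s.request(0); // rejected: upstream is cancelled and the subscriber gets an error
        s.request(5); // ignored after termination
        s.cancel();   // ignored after termination
        return events;
    }
}
```

Only the first request reaches the wrapped subscription in the demo; the invalid request cancels upstream and reports an error, and later calls are ignored.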