rxjavareactivestreams's People

Contributors

akarnokd, andrey-radchenko, benjchristensen, daschl, emanuelpalm, fengdai, ldaley

rxjavareactivestreams's Issues

Examples with Interop

We need some examples for the README to demonstrate usage.

Perhaps we can add unit test examples with dependencies on other libraries to demonstrate interop?

SingleAsPublisher throws NullPointerException with Single<Void>

I am trying to use RxReactiveStreams to convert from rx.Single to org.reactivestreams.Publisher, but it always throws an exception when the type is Void.

Basically, there is one interface for operating asynchronously, with a method called doSomethingAsync(). The return value of this method is converted to an rx.Single. The problem comes when one implementer of doSomethingAsync() returns type Void (because there is nothing to return).

It looks like the exception is thrown here, as if a null value were not allowed.

A workaround I'm using now is to convert the rx.Single to rx.Completable before calling RxReactiveStreams.toPublisher().

Here is a test case reproducing the issue:

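import org.junit.Test;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.test.StepVerifier;
import rx.Observable;
import rx.RxReactiveStreams;
import rx.Single;

public Void doSomethingAsync() {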
    // Processing...
    return null;
}

@Test
public void emptySingleErrorExit() {
    Observable<Void> empty = Observable.fromCallable(this::doSomethingAsync);
    Single<Void> single = empty.toSingle();
    Publisher<Void> publisher = RxReactiveStreams.toPublisher(single);
    Subscriber<? super Void> subscriber = new Subscriber<Void>() {
        @Override
        public void onSubscribe(Subscription subscription) {
            System.out.println("Unexpected subscription");
        }

        @Override
        public void onNext(Void aVoid) {
            System.out.println(String.format("Unexpected value %s", aVoid));
        }

        @Override
        public void onError(Throwable throwable) {
            System.out.println("Exit with error");
        }

        @Override
        public void onComplete() {
            System.out.println("Exit success");
        }
    };
    publisher.subscribe(subscriber);
    StepVerifier.create(publisher).verifyError(NullPointerException.class);
    StepVerifier.create(publisher).verifyErrorMessage("value");
}
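The failure mode in this issue can be sketched without RxJava. The block below is only an illustration under stated assumptions: it uses the JDK's java.util.concurrent.Flow interfaces (Java 9+) as a stand-in for org.reactivestreams, and fromSupplier, fromAction and collect are hypothetical helpers, not part of RxReactiveStreams. It shows why a single-value publisher has to reject a null item (Reactive Streams does not allow null in onNext), while a completion-only publisher, comparable to the Completable workaround, never needs to emit a value at all.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Supplier;

class NullSingleDemo {

    /** Hypothetical single-value publisher, standing in for a Single converted to a Publisher. */
    static <T> Flow.Publisher<T> fromSupplier(Supplier<T> source) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                done = true;
                T value = source.get();
                if (value == null) {
                    // null items are not allowed, so the only legal signal is an error
                    subscriber.onError(new NullPointerException("value"));
                } else {
                    subscriber.onNext(value);
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Hypothetical completion-only publisher, standing in for a Completable. */
    static Flow.Publisher<Void> fromAction(Runnable action) {
        return subscriber -> {
            boolean[] cancelled = {false};
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { /* nothing to emit */ }
                @Override public void cancel() { cancelled[0] = true; }
            });
            if (!cancelled[0]) {
                action.run();
                subscriber.onComplete(); // completion needs no onNext, so no null is involved
            }
        };
    }

    /** Subscribes, requests one item, and records every signal received. */
    static <T> List<String> collect(Flow.Publisher<T> publisher) {
        List<String> events = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
            @Override public void onNext(T item) { events.add("next:" + item); }
            @Override public void onError(Throwable t) { events.add("error:" + t.getClass().getSimpleName()); }
            @Override public void onComplete() { events.add("complete"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(collect(NullSingleDemo.<Void>fromSupplier(() -> null))); // [error:NullPointerException]
        System.out.println(collect(fromAction(() -> { })));                         // [complete]
    }
}
```

With the real library, the equivalent of fromAction would be the Completable conversion mentioned above as the workaround.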

OSGi Split Packages

Hi! First, thanks for this project; it is really helpful to be able to design for future standards.
I also really appreciate that you ship your bundles with OSGi headers, which makes them much easier to use in these environments!

The RxReactiveStreams class is in package rx, but unfortunately this causes a split package between this bundle and the main RxJava distribution (see http://wiki.osgi.org/wiki/Split_Packages). This means the bundle doesn't resolve:

Error executing command: Uses constraint violation. Unable to resolve resource io.reactivex.rxjava-reactive-streams [io.reactivex.rxjava-reactive-streams/1.0.1] because it exports package 'rx' and is also exposed to it from resource io.reactivex.rxjava [io.reactivex.rxjava/1.0.14] via the following dependency chain:

  io.reactivex.rxjava-reactive-streams [io.reactivex.rxjava-reactive-streams/1.0.1]
    import: (&(osgi.wiring.package=rx.internal.operators)(version>=1.0.0)(!(version>=2.0.0)))
     |
    export: osgi.wiring.package: rx.internal.operators; uses:=rx
    export: osgi.wiring.package=rx
  io.reactivex.rxjava [io.reactivex.rxjava/1.0.14]

Thanks, Dan.

Release 0.4

I've made some refinements, and AFAIK the implementation is now at RC status. At this early stage, I think we should just go ahead and roll out 0.4.

Before the next release, I'd like to get some more test coverage into the build by way of tests that actually exercise interop. At the moment we only have that with Ratpack.

@benjchristensen note that we've gone back to a multi-project build now, so we may have to work through those issues.

Hot Observable Handling - MissingBackpressureException

RxJava allows "hot" Observables in a different manner than Reactive Streams so we need to account for this.

In Reactive Streams, a Publisher can never emit more than has been requested. An Observable, however, can choose to just be hot and emit.

See https://speakerdeck.com/benjchristensen/reactive-programming-with-rx-at-qconsf-2014?slide=114 and https://speakerdeck.com/benjchristensen/reactive-programming-with-rx-at-qconsf-2014?slide=21

This means that when converting from an Observable to a Reactive Streams Publisher, we may have an Observable that does not obey request(n), either because it can't (it's hot) or because the Observable just doesn't support it (so it acts hot even if it is a cold source).

I think this means we need to put some logic in the bridge that throws MissingBackpressureException if we receive more onNext calls than were requested. That way the bridge complies with the RxJava contract and never emits more to the downstream Reactive Streams implementation than the contract allows.

This would then force a choice of strategy for dealing with this: https://github.com/ReactiveX/RxJava/wiki/Backpressure and https://speakerdeck.com/benjchristensen/reactive-programming-with-rx-at-qconsf-2014?slide=89
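A minimal sketch of the demand check proposed here, under stated assumptions: it uses java.util.concurrent.Flow (Java 9+) in place of org.reactivestreams, DemandGuard is a hypothetical name, and IllegalStateException stands in for RxJava's MissingBackpressureException. It is not the bridge's actual implementation; it only shows the idea of counting requested items and failing as soon as a hot source emits beyond that count.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

class DemandGuardDemo {

    /** Hypothetical bridge stage: forwards items only while downstream demand remains. */
    static final class DemandGuard<T> implements Flow.Subscription {
        private final Flow.Subscriber<? super T> downstream;
        private final AtomicLong requested = new AtomicLong();
        private volatile boolean done;

        DemandGuard(Flow.Subscriber<? super T> downstream) {
            this.downstream = downstream;
        }

        @Override
        public void request(long n) {
            if (n > 0) {
                requested.addAndGet(n); // overflow capping omitted in this sketch
            }
        }

        @Override
        public void cancel() {
            done = true;
        }

        /** Called by the hot source for every item, whether or not there is demand. */
        void emit(T item) {
            if (done) {
                return;
            }
            // Take one unit of demand if any is left; 'before' is the demand prior to this item.
            long before = requested.getAndUpdate(r -> r > 0 ? r - 1 : 0);
            if (before > 0) {
                downstream.onNext(item);
            } else {
                done = true;
                // Stand-in for MissingBackpressureException
                downstream.onError(new IllegalStateException("more items emitted than requested"));
            }
        }
    }

    static List<String> run() {
        List<String> events = new ArrayList<>();
        Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(2); }
            @Override public void onNext(Integer item) { events.add("next:" + item); }
            @Override public void onError(Throwable t) { events.add("error:" + t.getClass().getSimpleName()); }
            @Override public void onComplete() { events.add("complete"); }
        };
        DemandGuard<Integer> guard = new DemandGuard<>(subscriber);
        subscriber.onSubscribe(guard);
        for (int i = 1; i <= 4; i++) {
            guard.emit(i); // the source ignores demand and emits four items
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [next:1, next:2, error:IllegalStateException]
    }
}
```

Failing fast like this is what forces the caller to pick an explicit strategy (buffer, drop, sample) upstream of the conversion.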

Release 0.3.

@benjchristensen 0.2 had a problem that the TCK didn't catch, but the tests with Ratpack did.

Would you mind releasing again for @rkuhn's demos?

The release process might be broken though. It's now a multi-project build and I'm not sure I did it right for @quidryan's sweet sweet tooling to handle the release stuff.

Support for Processor

Do we want to provide Processor impls?

If so, I think it would look something like this…

public static <T> Processor<T, T> backpressureBufferingProcessor(Subject<T, T> subject) {}
public static <T> Processor<T, T> backpressureDroppingProcessor(Subject<T, T> subject) {}
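To make the dropping variant more concrete, here is a rough sketch of what such a Processor could do internally. The assumptions are significant: it is written against java.util.concurrent.Flow (Java 9+) rather than org.reactivestreams, it does not wrap an RxJava Subject, it supports a single downstream subscriber, and DroppingProcessor is a hypothetical name. Items that arrive while the downstream has no outstanding demand are counted and discarded instead of being delivered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

class DroppingProcessorDemo {

    /** Hypothetical single-subscriber processor that drops items arriving without demand. */
    static final class DroppingProcessor<T> implements Flow.Processor<T, T>, Flow.Subscription {
        private final AtomicLong requested = new AtomicLong();
        final AtomicLong dropped = new AtomicLong();
        private volatile Flow.Subscriber<? super T> downstream;
        private volatile Flow.Subscription upstream;

        @Override
        public void subscribe(Flow.Subscriber<? super T> subscriber) {
            downstream = subscriber;
            subscriber.onSubscribe(this); // the processor acts as the downstream's Subscription
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            upstream = subscription;
            subscription.request(Long.MAX_VALUE); // consume upstream freely; dropping handles the excess
        }

        @Override
        public void onNext(T item) {
            Flow.Subscriber<? super T> d = downstream;
            if (d != null && requested.getAndUpdate(r -> r > 0 ? r - 1 : 0) > 0) {
                d.onNext(item);
            } else {
                dropped.incrementAndGet();
            }
        }

        @Override
        public void onError(Throwable t) {
            Flow.Subscriber<? super T> d = downstream;
            if (d != null) {
                d.onError(t);
            }
        }

        @Override
        public void onComplete() {
            Flow.Subscriber<? super T> d = downstream;
            if (d != null) {
                d.onComplete();
            }
        }

        @Override
        public void request(long n) {
            if (n > 0) {
                requested.addAndGet(n); // overflow capping omitted in this sketch
            }
        }

        @Override
        public void cancel() {
            downstream = null;
            Flow.Subscription u = upstream;
            if (u != null) {
                u.cancel();
            }
        }
    }

    static List<String> run() {
        List<String> events = new ArrayList<>();
        DroppingProcessor<String> processor = new DroppingProcessor<>();
        processor.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
            @Override public void onNext(String item) { events.add("next:" + item); }
            @Override public void onError(Throwable t) { events.add("error"); }
            @Override public void onComplete() { events.add("complete"); }
        });
        processor.onNext("a"); // demand is 1: delivered
        processor.onNext("b"); // demand is 0: dropped
        processor.request(1);  // downstream asks for one more
        processor.onNext("c"); // delivered
        processor.onComplete();
        events.add("dropped:" + processor.dropped.get());
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [next:a, next:c, complete, dropped:1]
    }
}
```

A buffering variant would replace the drop branch with a queue that is drained from request(n), which needs more careful coordination than shown here.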

Add toSubscriber conversion

The article uses internal implementation code from the 0.3 version, as follows:

val sub = new Subscriber[String]() {
  ...
}
val subSink = SubscriberSink(new RxSubscriberToRsSubscriberAdapter(sub))

This use case seems reasonable, at least in the context of integration with Akka Streams.

That would require adding toSubscriber methods to RxReactiveStreams.

SingleAsPublisher hangs on subscribe(Subscriber) call

The subscribe(Subscriber s) method calls s.onSubscribe(...), which in turn calls request(n) and goes into an infinite loop waiting for the wrapped Single to complete. But the actual Single has not been subscribed to yet, so the loop never ends.

    @Override
    public void subscribe(Subscriber<? super T> s) {
        SingleAsPublisherSubscriber<T> parent = new SingleAsPublisherSubscriber<T>(s);
        s.onSubscribe(parent); // apparently, this goes into infinite loop and never returns

        single.subscribe(parent);
    }
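One way to avoid the hang is for request(n) to record the demand and return immediately, and to emit only once both the demand and the value are present, in whichever order they arrive. The sketch below illustrates that idea under assumptions: it uses java.util.concurrent.Flow (Java 9+) instead of org.reactivestreams, a CompletableFuture stands in for the Single, and DeferredValue and toPublisher are hypothetical names. It should not be read as the library's actual fix.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;

class DeferredSingleDemo {

    /** Hypothetical subscription that emits one value once it is both available and requested. */
    static final class DeferredValue<T> implements Flow.Subscription {
        private final Flow.Subscriber<? super T> downstream;
        private boolean requested; // guarded by this
        private boolean hasValue;  // guarded by this
        private boolean done;      // guarded by this
        private T value;           // guarded by this

        DeferredValue(Flow.Subscriber<? super T> downstream) {
            this.downstream = downstream;
        }

        @Override
        public void request(long n) {
            T toEmit;
            synchronized (this) {
                if (n <= 0 || requested || done) {
                    return; // validation of n omitted in this sketch
                }
                requested = true;
                if (!hasValue) {
                    return; // value not there yet: remember the demand, never wait for it
                }
                done = true;
                toEmit = value;
            }
            emit(toEmit);
        }

        @Override
        public synchronized void cancel() {
            done = true;
        }

        /** Called when the source produces its value. */
        void complete(T v) {
            synchronized (this) {
                if (done || hasValue) {
                    return;
                }
                value = v;
                hasValue = true;
                if (!requested) {
                    return; // no demand yet: request(n) will emit later
                }
                done = true;
            }
            emit(v);
        }

        /** Called when the source fails. */
        void fail(Throwable e) {
            synchronized (this) {
                if (done) {
                    return;
                }
                done = true;
            }
            downstream.onError(e);
        }

        private void emit(T v) {
            downstream.onNext(v);
            downstream.onComplete();
        }
    }

    /** Converts the stand-in source; onSubscribe returns before the source is hooked up. */
    static <T> Flow.Publisher<T> toPublisher(CompletableFuture<T> single) {
        return subscriber -> {
            DeferredValue<T> parent = new DeferredValue<>(subscriber);
            subscriber.onSubscribe(parent); // request(n) inside this call returns immediately
            single.whenComplete((v, e) -> {
                if (e != null) {
                    parent.fail(e);
                } else {
                    parent.complete(v);
                }
            });
        };
    }

    static List<String> run() {
        List<String> events = new ArrayList<>();
        CompletableFuture<String> single = new CompletableFuture<>();
        toPublisher(single).subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
            @Override public void onNext(String item) { events.add("next:" + item); }
            @Override public void onError(Throwable t) { events.add("error"); }
            @Override public void onComplete() { events.add("complete"); }
        });
        events.add("subscribed");  // reached, because request(1) did not block
        single.complete("hello");  // the value arrives after the demand
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [subscribed, next:hello, complete]
    }
}
```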

Questions/issues about the wrappers

  1. Why synchronize the source Observable all the time?
  2. Tracking subscribers is not thread safe: concurrent subscriptions may step on each other's updates. Subscribers may be in a better position to detect multiple subscriptions, since they receive multiple Subscriptions.
  3. If n < 1, then it should return and not fall through.
  4. If I understand the reactive-streams spec correctly, the request and cancel methods on the Subscription have to be called sequentially, because the spec doesn't require a Subscription implementation to make them thread safe. We may need to defensively serialize these method calls.
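Points 3 and 4 can be sketched together as a small wrapper around the upstream Subscription. This is an illustration under assumptions, not the wrappers' actual code: it uses java.util.concurrent.Flow (Java 9+) instead of org.reactivestreams, SerializedSubscription is a hypothetical name, and it serializes calls with a plain lock, which is the simplest option but holds the lock while calling upstream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class SerializedSubscriptionDemo {

    /** Hypothetical wrapper that validates n and serializes request/cancel calls. */
    static final class SerializedSubscription implements Flow.Subscription {
        private final Flow.Subscription upstream;
        private final Flow.Subscriber<?> downstream;
        private boolean cancelled; // guarded by this

        SerializedSubscription(Flow.Subscription upstream, Flow.Subscriber<?> downstream) {
            this.upstream = upstream;
            this.downstream = downstream;
        }

        @Override
        public void request(long n) {
            if (n < 1) {
                cancel();
                downstream.onError(new IllegalArgumentException("request must be positive, was " + n));
                return; // stop here instead of falling through to upstream.request(n)
            }
            synchronized (this) {
                if (!cancelled) {
                    upstream.request(n);
                }
            }
        }

        @Override
        public synchronized void cancel() {
            if (!cancelled) {
                cancelled = true;
                upstream.cancel();
            }
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Flow.Subscription upstream = new Flow.Subscription() {
            @Override public void request(long n) { log.add("request:" + n); }
            @Override public void cancel() { log.add("cancel"); }
        };
        Flow.Subscriber<Object> downstream = new Flow.Subscriber<Object>() {
            @Override public void onSubscribe(Flow.Subscription s) { }
            @Override public void onNext(Object item) { }
            @Override public void onError(Throwable t) { log.add("error:" + t.getClass().getSimpleName()); }
            @Override public void onComplete() { }
        };
        Flow.Subscription safe = new SerializedSubscription(upstream, downstream);
        safe.request(5); // forwarded
        safe.request(0); // invalid: cancels upstream and signals an error
        safe.request(3); // ignored, already cancelled
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [request:5, cancel, error:IllegalArgumentException]
    }
}
```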
