ZIO Interop Reactive Streams

This library provides an interoperability layer between ZIO and reactive streams.

Introduction

ZIO integrates with Reactive Streams by providing conversions from zio.stream.Stream to org.reactivestreams.Publisher and from zio.stream.Sink to org.reactivestreams.Subscriber, and vice versa. Add import zio.interop.reactivestreams._ to make the conversions available.

Installation

To use this library, add the following line to your build.sbt file:

libraryDependencies += "dev.zio" %% "zio-interop-reactive-streams" % "2.0.0"

Examples

First, let's get a few imports out of the way.

import org.reactivestreams.example.unicast._
import zio._
import zio.interop.reactivestreams._
import zio.stream._

We use the following Publisher and Subscriber for the examples:

val publisher = new RangePublisher(3, 10)
val subscriber = new SyncSubscriber[Int] {
  override protected def whenNext(v: Int): Boolean = {
    print(s"$v, ")
    true
  }
}

Publisher to Stream

A Publisher used as a Stream buffers up to qSize elements. If possible, qSize should be a power of two for best performance. The default is 16.

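// Wrap the Publisher as a ZStream with a 16-element buffer
val streamFromPublisher = publisher.toZIOStream(qSize = 16)
// Drain the stream, collecting every element into a Chunk
streamFromPublisher.run(ZSink.collectAll[Integer])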
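
As a minimal sketch, the snippet above can be wrapped in a ZIOAppDefault so that it runs end to end. The object name PublisherToStreamExample and the value name collected are hypothetical. The sketch assumes ZIO 2.x and that the Reactive Streams examples artifact, which provides RangePublisher, is on the classpath.

```scala
import org.reactivestreams.example.unicast.RangePublisher
import zio._
import zio.interop.reactivestreams._
import zio.stream._

// Hypothetical example object; not part of the library.
object PublisherToStreamExample extends ZIOAppDefault {

  // RangePublisher(start, count) comes from the Reactive Streams examples.
  val publisher = new RangePublisher(3, 10)

  // Consume the Publisher as a ZStream, buffering up to 16 elements,
  // and collect everything it emits into a Chunk.
  val collected: ZIO[Any, Throwable, Chunk[Integer]] =
    publisher.toZIOStream(qSize = 16).run(ZSink.collectAll[Integer])

  // Print the collected elements on a single line.
  val run = collected.flatMap(chunk => Console.printLine(chunk.mkString(", ")))
}
```

Leaving qSize at its default of 16 would behave the same way here; it is spelled out only to show where the buffer size is configured.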

Subscriber to Sink

When running a Stream to a Subscriber, a side channel is needed for signalling failures. For this reason toZIOSink returns a tuple of a callback and a Sink. The callback must be used to signal Stream failure. The type parameter on toZIOSink is the error type of the Stream.

val asSink = subscriber.toZIOSink[Throwable]
val failingStream = ZStream.range(3, 13) ++ ZStream.fail(new RuntimeException("boom!"))
ZIO.scoped {
  asSink.flatMap { case (signalError, sink) =>
    // Forward any Stream failure to the Subscriber through the callback
    failingStream.run(sink).catchAll(signalError)
  }
}
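
To make the side channel concrete, here is a hedged sketch with a hand-written Subscriber that prints every signal it receives. PrintingSubscriber, SubscriberToSinkExample and program are hypothetical names, and requesting Long.MaxValue elements up front is a simplification for illustration rather than a recommended demand strategy. ZIO 2.x is assumed.

```scala
import org.reactivestreams.{Subscriber, Subscription}
import zio._
import zio.interop.reactivestreams._
import zio.stream._

// Hypothetical subscriber for illustration: prints every signal it receives.
final class PrintingSubscriber extends Subscriber[Int] {
  // Simplification: request effectively unbounded demand up front.
  def onSubscribe(s: Subscription): Unit = s.request(Long.MaxValue)
  def onNext(v: Int): Unit               = println(s"onNext($v)")
  def onError(t: Throwable): Unit        = println(s"onError(${t.getMessage})")
  def onComplete(): Unit                 = println("onComplete")
}

// Hypothetical example object; not part of the library.
object SubscriberToSinkExample extends ZIOAppDefault {

  val subscriber: Subscriber[Int] = new PrintingSubscriber

  // A stream that emits a few elements and then fails.
  val failingStream = ZStream.range(3, 6) ++ ZStream.fail(new RuntimeException("boom!"))

  // Elements flow through the Sink; the failure is handed to the
  // signalError callback, which is the side channel to the Subscriber.
  val program: ZIO[Any, Nothing, Unit] =
    ZIO.scoped {
      subscriber.toZIOSink[Throwable].flatMap { case (signalError, sink) =>
        failingStream.run(sink).catchAll(signalError)
      }
    }

  val run = program
}
```

Without the catchAll(signalError) step, the failure would surface only as a failed ZIO effect and the Subscriber would never be told about it.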

Stream to Publisher

val stream = ZStream.range(3, 13)
stream.toPublisher.flatMap { publisher =>
  // Subscribing is a side effect, so wrap it in an effect
  ZIO.succeed(publisher.subscribe(subscriber))
}
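
One way to check the wiring without any external Publisher or Subscriber is a round trip: convert a stream to a Publisher, then read that Publisher back as a stream. The sketch below is illustrative only. RoundTripExample and roundTrip are hypothetical names, ZIO 2.x is assumed, and toZIOStream() is called with its default buffer size.

```scala
import zio._
import zio.interop.reactivestreams._
import zio.stream._

// Hypothetical example object; not part of the library.
object RoundTripExample extends ZIOAppDefault {

  // ZStream -> Publisher -> ZStream, then collect the elements.
  val roundTrip: ZIO[Any, Throwable, Chunk[Int]] =
    for {
      publisher <- ZStream.range(3, 13).toPublisher
      elements  <- publisher.toZIOStream().runCollect
    } yield elements

  // Print the elements that survived the round trip.
  val run = roundTrip.flatMap(elements => Console.printLine(elements.mkString(", ")))
}
```

The error type widens to Throwable on the way back because a Publisher can signal any Throwable through onError.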

Sink to Subscriber

toSubscriber returns a Subscriber and an IO which completes with the result of running the Sink or the error if the Publisher fails. A Sink used as a Subscriber buffers up to qSize elements. If possible, qSize should be a power of two for best performance. The default is 16.

val sink = ZSink.collectAll[Integer]
ZIO.scoped {
  sink.toSubscriber(qSize = 16).flatMap { case (subscriber, result) =>
    // Subscribe, then wait for the result of running the Sink
    ZIO.succeed(publisher.subscribe(subscriber)) *> result
  }
}

Documentation

Learn more on the ZIO Interop Reactive Streams homepage!

Contributing

For the general guidelines, see the ZIO contributor's guide.

Code of Conduct

See the Code of Conduct

Support

Come chat with us on Discord.

License

License
