
zio / zio


ZIO — A type-safe, composable library for async and concurrent programming in Scala

Home Page: https://zio.dev

License: Apache License 2.0

Languages: Scala 96.99%, Shell 0.40%, CSS 1.69%, Java 0.09%, JavaScript 0.74%, TypeScript 0.09%, Nix 0.01%
Topics: effects, concurrency, asynchronicity, functional-programming, queues, streams, promises, stm, asynchronous, concurrent

zio's Introduction

ZIO Logo


Welcome to ZIO

ZIO is a zero-dependency Scala library for asynchronous and concurrent programming.

Powered by highly-scalable, non-blocking fibers that never waste or leak resources, ZIO lets you build scalable, resilient, and reactive applications that meet the needs of your business.

  • High-performance. Build scalable applications with minimal runtime overhead.
  • Type-safe. Use the full power of the Scala compiler to catch bugs at compile time.
  • Concurrent. Easily build concurrent apps without deadlocks, race conditions, or complexity.
  • Asynchronous. Write sequential code that looks the same whether it's asynchronous or synchronous.
  • Resource-safe. Build apps that never leak resources (including threads!), even when they fail.
  • Testable. Inject test services into your app for fast, deterministic, and type-safe testing.
  • Resilient. Build apps that never lose errors, and which respond to failure locally and flexibly.
  • Functional. Rapidly compose solutions to complex problems from simple building blocks.
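
For a taste of what this looks like in practice, here is a minimal program sketch; it assumes the current ZIO 2.x entry point (ZIOAppDefault) and Console service, which may differ in older releases:

import zio._

object HelloZIO extends ZIOAppDefault {
  // A tiny effect: print a line using the built-in Console service.
  // Possible failures (here, an IOException) are tracked in the effect's type.
  val run = Console.printLine("Hello from ZIO!")
}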

To learn more about ZIO, see the documentation at https://zio.dev.


Adopters

The following is a partial list of companies happily using ZIO in production to craft concurrent applications.

Want to see your company here? Submit a PR!

Sponsors

Ziverge

Ziverge is a leading contributor to ZIO.

Scalac

Scalac sponsors ZIO Hackathons and contributes work to multiple projects in the ZIO ecosystem.

Septimal Mind

Septimal Mind sponsors work on ZIO Tracing and continuous maintenance.

YourKit

YourKit generously provides use of their monitoring and profiling tools to maximize the performance of ZIO applications.



Code of Conduct

See the Code of Conduct


Support

Come chat with us on Discord.


Legal

Copyright 2017 - 2024 John A. De Goes and the ZIO Contributors. All rights reserved.


zio's Issues

Inspiration from Concurrent ML / Hopac

Moved from scalaz/scalaz#1868

Hello,
I've watched with great excitement the recent developments in effectful/asynchronous programming, like the IO monad in scalaz by @jdegoes (and other Scala libraries). The need to work around the lack of green threads is also pressing in other languages. I've been reading @wingo's path to implementing concurrency in Guile (and Lua); he drew inspiration from Concurrent ML. Indeed it seems that, although it has a branding problem, Concurrent ML is what smart people recommend and use.
So my question is: is there something we could learn from CML or, more specifically, from Hopac (a CML-inspired library for F#)?

There's this hierarchy like

                           Job<_>
                            |
                           Alt<_>
                            |
      +-------+--------+----+-----+--------+--------+
      |       |        |          |        |        |
     Ch<_>  Latch   Mailbox<_>  MVar<_>  Proc   Promise<_>
                                                    |
                                                  IVar<_>

The most interesting types there are MVar, IVar and Channel. In Scala(z) it could be described by a typeclass hierarchy like

trait AutoCloseableAsync[C, M[+_, _]] {
  def close(c:C): M[Exception, Unit]
}

trait CloseableAsync[C, M[+_, _]] extends AutoCloseableAsync[C, M] {
  def close(c:C): M[IOException, Unit]
}

trait Job[Jb[_, _]] extends Monad[Jb] {
  def delay[E, A](j: Unit => Jb[E, A]): Jb[E, A]
  def thunk[E, A](j: Unit => A): Jb[E, A]
  def raises[E, A](e: E): Jb[E, A]
  def tryIn[E1, E2, A, B](j: Jb[E1, A])(fa: A => Jb[E2, B])(fe: E1 => Jb[E2, B]): Jb[E2, B]
  def catches[E1, E2, A](j: Jb[E1, A]): Jb[E2, E1 \/ A]
  def using[E, A : AutoCloseableAsync, B](x: A)(fa: A => Jb[E, B]): Jb[Exception \/ E, B]
  def using[E, A : CloseableAsync, B](x: A)(fa: A => Jb[E, B]): Jb[IOException \/ E, B]
  def using[E, A, B](x: A)(fa: A => Jb[E, B])(dispose: A => Jb[E, Unit]): Jb[E, B]
}

trait Event[Evnt[_, _]] extends Job[Evnt] {
  def never[E, A]: Evnt[E, A]
  def choose[E, A](e1:Evnt[E, A], e2:Evnt[E, A]): Evnt[E, A]
  def choose[E, A](es:Evnt[E, A]*): Evnt[E, A]
  def choose[E, A](es:Iterable[Evnt[E, A]]): Evnt[E, A]

  def withNackJob[E, A, Prms[_, _]:Promise_, Jb[_, _]:Job](f: Prms[E, Unit] => Jb[E, Evnt[E, A]]): Evnt[E, A]
}

trait Promise_[Prms[_, _]] extends Event[Prms] {
  def read[E, A, Evnt[_, _]:Event](p:Prms[E, A]): Evnt[E, A]
  def start[E, A, Jb[_, _]](j:Jb[E, A]): Jb[Void, Prms[E, A]]
}

trait IVar[Ivr[_, _]] extends Promise_[Ivr] {
  def empty[E, A]: Ivr[E, A]
  def fill[E, A, Jb[_, _]](i:Ivr[E, A])(x:A): Jb[E, Unit]
}

trait Channel[Ch[_, _]] extends Event[Ch] {
  def give[E, A, Evnt[_, _]:Event](ch:Ch[E, A])(x:A): Evnt[E, Unit]
  def send[E, A, Jb[_, _]:Job](ch:Ch[E, A])(x:A): Jb[E, Unit]
  def take[E, A, Evnt[_, _]:Event](ch:Ch[E, A]): Evnt[E, A]

  def tryGive[E, A, Jb[_, _]:Job](ch:Ch[E, A])(x:A): Jb[E, Boolean]
  def tryTake[E, A, Jb[_, _]:Job](ch:Ch[E, A]): Jb[E, Maybe[A]]
}

trait Latch[Ltch[_, Unit]] extends Event[Ltch] {
  def create[E](count:Int): Ltch[E, Unit]
  def await[E, Evnt[_, _]:Event](l: Ltch[E, Unit]): Evnt[E, Unit]
  def decrement[E, Jb[_, _]:Job](l: Ltch[E, Unit]): Jb[E, Unit]
  def increment[E, Jb[_, _]:Job](l: Ltch[E, Unit]): Jb[E, Unit]
}

trait MVar[Mvr[_, _]] extends Event[Mvr] {
  def empty[E, A]: Mvr[E, A]
  def read[E, A, Evnt[_, _]:Event](m:Mvr[E, A]): Evnt[E, A]
  def fill[E, A, Jb[_, _]](m:Mvr[E, A])(x:A): Jb[E, Unit]
  def take[E, A, Evnt[_, _]:Event](m:Mvr[E, A]): Evnt[E, A]
}

trait Mailbox[Mlbx[_, _]] extends Event[Mlbx] {
  def empty[E, A]: Mlbx[E, A]
  def send[E, A, Jb[_, _]](m:Mlbx[E, A])(x:A): Jb[E, Unit]
  def take[E, A, Evnt[_, _]:Event](m:Mlbx[E, A]): Evnt[E, A]
}

trait Proc[Prc[_, Unit]] extends Event[Prc] {
  def start[E, A, Jb[_, _]:Job](j:Jb[E, Unit]): Jb[Void, Prc[E, Unit]]
  def bind[E, A, Jb[_, _]:Job](f: Prc[E, Unit] => Jb[E, A]): Jb[E, A]
  def self[E, Jb[_, _]:Job]: Jb[E, Prc[E, Unit]]
  def join[E, Evnt[_, _]:Event](p:Prc[E, Unit]): Evnt[E, Unit]
}

Then there's also a Stream implementation -- kind of like

sealed trait Cons[A]
case object Nil extends Cons[Void]
case class ConsCase[A](value: A, next: Promise[Cons[A]]) extends Cons[A]

type Stream[A] = Promise[Cons[A]]

What do you guys think of this? Can Scalaz 8 take something from CML/Hopac? This is not a deeply thought-through proposal, just a very rough (not even complete) sketch to kick off a discussion.

Church-encoded Maybe

@jdegoes, in scalaz/scalaz#1798 you specify that replacements for Maybe and \/ should be Church-encoded. Why exactly, as opposed to sum types?

So far I've got this, but I don't know how to get rid of the B type parameter; it needs some form of Haskell's forall, but I can't figure out how to express that in Scala.

final class Perhaps[A, B] private (val perhaps: (B, A => B) => B) extends AnyVal {
  def map[C](g: A => C): Perhaps[C, B] = new Perhaps((b, f) => perhaps(b, f.compose(g)))
  def toOption: Option[A] = perhaps(None.asInstanceOf[B], a => Some(a).asInstanceOf[B]).asInstanceOf[Option[A]]
}

object Perhaps {
  def present[A, B](a: A): Perhaps[A, B] = new Perhaps((_, f) => f(a))
  def absent[A, B]: Perhaps[A, B] = new Perhaps((b, _) => b)
}
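
For comparison, one common way to hide the B parameter in Scala 2 is to move the quantifier into a method, which is how rank-2 polymorphism is usually encoded; this is only an illustrative sketch (with a hypothetical name, Perhaps2), not necessarily the encoding the thread intends:

trait Perhaps2[A] { self =>
  // The "forall B" lives on the method, so the type itself only mentions A.
  def fold[B](ifAbsent: => B)(ifPresent: A => B): B

  def map[C](g: A => C): Perhaps2[C] = new Perhaps2[C] {
    def fold[B](ifAbsent: => B)(ifPresent: C => B): B =
      self.fold(ifAbsent)(a => ifPresent(g(a)))
  }

  def toOption: Option[A] = fold(Option.empty[A])(Some(_))
}

object Perhaps2 {
  def present[A](a: A): Perhaps2[A] = new Perhaps2[A] {
    def fold[B](ifAbsent: => B)(ifPresent: A => B): B = ifPresent(a)
  }
  def absent[A]: Perhaps2[A] = new Perhaps2[A] {
    def fold[B](ifAbsent: => B)(ifPresent: A => B): B = ifAbsent
  }
}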

Simplify and unify naming conventions

We have inconsistent naming conventions, and some names are redundant given that everything is scoped to the zio package. Since we are quickly approaching 1.0 of the library, we should make all changes now, before the names are locked in for some time.

  • IOApp => App
  • IOQueue => Queue
  • IORef => Ref

Opportunities for other clear wins should also be investigated.

Implement `shift` combinator

I think one of the useful features we need to add to ZIO is thread shifting, like io.executeOn(ioScheduler) in Monix.
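
A rough sketch of what such a combinator could look like, assuming an IO.async-style constructor that takes a callback of ExitResult (the names below are illustrative, not the actual API of the time):

import scala.concurrent.ExecutionContext

// Hypothetical sketch: completing the callback from the target execution context
// means the rest of the fiber resumes on that pool.
def shift(ec: ExecutionContext): IO[Nothing, Unit] =
  IO.async[Nothing, Unit] { callback =>
    ec.execute(() => callback(ExitResult.Completed(())))
  }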

`Error` typeclass

In the same vein as Haskell's Exception

In addition to being just a marker typeclass, it could have functions such as defaultError, which could be useful for defining a meaningful MonadPlus for IO.
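
A minimal sketch of the shape such a typeclass might have (hypothetical names, just to make the idea concrete):

// Hypothetical sketch: a typeclass that singles out a canonical "empty" error,
// which an IO-based MonadPlus could use for its empty case.
trait Error[E] {
  def defaultError: E
}

object Error {
  implicit val throwableError: Error[Throwable] = new Error[Throwable] {
    def defaultError: Throwable = new NoSuchElementException("mzero")
  }
}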

IO.forkAll should return a composite fiber

Right now we have:

def forkAll[E, A](ios: List[IO[E, Unit]]): IO[E, Unit]

Instead we should have:

def forkAll[E, E2, A](as: List[IO[E, A]]): IO[E2, Fiber[E, List[A]]]

Capture all throwables in ExitResult

ExitResult should have a List of throwables, which collectively represent all reasons for termination in order—including defects in finalizers.

object ExitResult {
  final case class Completed[E, A](value: A)          extends ExitResult[E, A]
  final case class Failed[E, A](error: E)             extends ExitResult[E, A]
  final case class Terminated[E, A](errors: List[Throwable]) extends ExitResult[E, A]
}

Various changes are possible, including changing interrupt:

def interrupt[E](causes: Throwable*): IO[E, Unit]

This way it is not necessary to specify a cause, and in fact multiple causes can be specified.

All causes for a termination will have to be collected, but much of the work is already done, since defects in finalizers are already aggregated into a List[Throwable].

Feature: Async Socket IO

So I wanted to get someone's thoughts on implementing async socket IO using ZIO's fibers. I think this is doable in a clean way. Essentially, I could use an AsynchronousSocketChannel and initiate it with the RTS thread pool, then proceed to implement support for async IO using fibers. My concern would be stealing threads from the RTS; would this be a problem (I have not evaluated the fiber implementation in detail)? My idea behind using this thread pool is to keep the application on one thread pool to prevent contention for resources with other pools. I assume this would not be a problem as long as we never block threads (preventing starvation).

I could add this implementation as a separate open-source library, or add it to this one if you feel it's appropriate.
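
As a rough illustration of the idea (not the proposed API), an AsynchronousSocketChannel read can be adapted to a callback-based effect constructor; IO.async and ExitResult are used below with assumed signatures:

import java.nio.ByteBuffer
import java.nio.channels.{ AsynchronousSocketChannel, CompletionHandler }

// Hypothetical sketch: adapt NIO.2 completion handlers to a callback-based IO,
// so the fiber suspends without blocking a thread while the read is in flight.
def readChunk(channel: AsynchronousSocketChannel, buffer: ByteBuffer): IO[Throwable, Int] =
  IO.async[Throwable, Int] { callback =>
    channel.read(buffer, (), new CompletionHandler[Integer, Unit] {
      def completed(bytesRead: Integer, attachment: Unit): Unit =
        callback(ExitResult.Completed(bytesRead.toInt))
      def failed(cause: Throwable, attachment: Unit): Unit =
        callback(ExitResult.Failed(cause))
    })
  }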

WIP: Improve definition of supervisor (DO NOT IMPLEMENT)

We now have Fiber.onComplete, which looks like this:

def onComplete(k: ExitResult[E, A] => IO[Nothing, Unit]): IO[Nothing, Unit]

This looks like a type of "supervisor", because anyone who calls this method can now see if and how the fiber exits (including all errors).

Currently, when a fiber is forked, an unhandled error handler may be specified, in the form List[Throwable] => IO[Nothing, Unit]. If this is not specified, then the uncaught error handler is used from the fiber that forks the fiber (all the way up to the root uncaught error handler).

These two methods of "supervising" a fiber look very similar, and suggest the two notions of "supervising" could be unified in some fashion.

Meanwhile, we have a separate supervise combinator:

IO.supervise {
  for {
    ...
  } ...
}

The supervise combinator allows collection of forked fibers into a weak set, so that cleanup may be performed upon exit of the specified IO.

This notion is similar to the others in the sense that it's a way of supervising the activity of child fibers.

It's possible all three notions of "supervising" can be unified somehow.

Add `IO.mergeAll`

Uses par for parallel composition.

object IO {
  def mergeAll[E, A, B](in: TraversableOnce[IO[E, A]])(zero: B, f: (B, A) => B): IO[E, B]
}

Remove for comprehensions in RTS.scala

For comprehensions in RTS.scala add a useless final map operation, which slows down performance. They can all be eliminated by rewriting in terms of flatMap.
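
For reference, a sketch of the desugaring in question and the rewrite that removes the extra node:

// A two-step for comprehension:
def viaFor[E, A, B](fa: IO[E, A])(f: A => IO[E, B]): IO[E, B] =
  for {
    a <- fa
    b <- f(a)
  } yield b
// desugars to fa.flatMap(a => f(a).map(b => b)), which allocates an extra
// IO node for the trailing identity map. Written directly in terms of flatMap:
def viaFlatMap[E, A, B](fa: IO[E, A])(f: A => IO[E, B]): IO[E, B] =
  fa.flatMap(f)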

Fix issues in finalization

Add Test Cases

Add test cases from #7 and #15.

Uninterruptible Finalization

During finalization, there cannot be any possibility of interruption.

Take this:

// Do not interrupt finalization:
this.noInterrupt += 1

curIo = ensuringUninterruptibleExit(finalization[E] *> completer)

Change to:

// Do not interrupt finalization (FOREVER, until the end of the fiber's life):
this.noInterrupt += 1

curIo = finalization[E] *> completer


Volatile Single-Fiber State

Take this:

private[this] var noInterrupt                               = 0
private[this] var supervised: List[Set[FiberContext[_, _]]] = Nil
private[this] var supervising

Change to:

@volatile private[this] var noInterrupt                               = 0
@volatile private[this] var supervised: List[Set[FiberContext[_, _]]] = Nil
@volatile private[this] var supervising

Fix Enter Async Logic

enterAsyncStart does not handle Interrupting state. The method can be changed to return -1 if the state is interrupting.

Something like:

def enterAsyncStart(): Int = {
  ...
    case Interrupting(_, _, _) => -1
  ...
}

The return value can be checked:

val id = enterAsyncStart()

if (id < 0) {
  // The fiber is being interrupted, we do not want to continue
  // Do not interrupt interruption (FOREVER, until the end of the fiber's life):
  noInterrupt += 1

  curIo = IO.terminate(interruptingError)
} else {
  ...
}

where interruptingError is a new method that extracts the error from an Interrupting state.

def interruptingError: Throwable = state.get match {
  case Interrupting(error, _, _) => error
  case v => new Error("Defect in Fiber: There is no interrupting error because state is " + v)
}

Question on behavior

Hi all,

While working to ensure that Cats-Effect 1.0.0 supports the auto-cancelable behavior of ZIO and Monix's Task, I may have discovered a problem with an assumption about start (fork) that I need to verify with you.

In this piece of code, is the specified release guaranteed to run?

val task = IO(1).bracket(release)(use)

task.fork.flatMap(f => f.cancel *> f.join)

The answer is probably no, but I want to double-check with you.

Thanks,

Implement support for Scala.js

  • Extract Platform from RTS to encapsulate platform-specific functionality.
  • Create PlatformJs and PlatformJvm
  • Create RTS using PlatformJs, and RTS using PlatformJvm
  • Make all tests pass, or split up tests if some require multithreading
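
A hedged sketch of what the extracted Platform seam could look like; the names and members below are illustrative, not the eventual design:

// Hypothetical sketch: everything the runtime needs from the host platform,
// so RTS itself stays platform-agnostic.
trait Platform {
  def executeAsync(task: Runnable): Unit    // schedule work on the platform's executor
  def reportFailure(cause: Throwable): Unit // last-resort handler for uncaught defects
}

object PlatformJvm extends Platform {
  private val pool = java.util.concurrent.Executors.newWorkStealingPool()
  def executeAsync(task: Runnable): Unit    = { pool.submit(task); () }
  def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
}

// PlatformJs would instead schedule work on the JavaScript event loop.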

`Fiber.interrupt` sometimes returns before fiber is actually interrupted

By spec, Fiber.interrupt should not return until the fiber is interrupted and all finalizers have been successfully run. Yet this test and at least one other in IOQueueSpec demonstrate that interrupt is returning prematurely. This may be as simple as a @volatile bug somewhere or something more complex. It may be related to #7.

 def e9 = unsafePerformIO(
    for {
      queue <- IOQueue.make[Void, Int](100)
      f     <- queue.take[Void].fork
      _     <- f.interrupt(new Exception("interrupt fiber in e9"))
      _     <- waitForSize(queue, 0)
      size  <- queue.size[Void]
    } yield size must_=== 0
  )

Introduce Void

Using Nothing is too painful. We should introduce a simple Void:

abstract final class Void {
  def absurd[A]: A
}

And use it wherever Nothing is currently used.
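
For context, absurd is what makes the type practical: since no value of Void can ever be constructed, any handler that receives one can conjure any result. A small sketch using the definition above:

// Any function out of Void is trivially total, because it can never be called.
def fromVoid[A](v: Void): A = v.absurd[A]

// Example: an Either whose left side is Void provably only has a right value.
def unright[A](e: Either[Void, A]): A = e.fold(v => v.absurd[A], identity)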

Add `IO.stream`?

The following (from J. Gibbons):

  final def stream[E, A, B, C](consume: B => Option[IO[E, (C, B)]])(produce: (B, A) => IO[E, B])(seed: B)(in: List[A]): IO[E, List[C]] =
    consume(seed) match {
      case Some(io) => for {
        r <- io
        (c, b) = r
        cs <- stream(consume)(produce)(b)(in)
      } yield c :: cs

      case None => in match {
        case Nil => IO.now(Nil)
        case x :: xs => produce(seed, x).flatMap(stream(consume)(produce)(_)(xs))
      }
    }

This models a (potentially infinite) interaction between processes. Other primitives (merging, forking, zipping, chunking, etc.) can be built on top of it. It is not reactive-streams grade, but it is principled.

List is not lazy; however, one could imagine a type IOStream[E, A], schematically:

abstract class IOStream[E, A] {
  def head: IO[E, A]
  def tail: IO[E, IOStream[E, A]]
}

I was wondering whether this is of interest, or completely beyond the scope of IO?

Add `IO.reduceAll`

Uses par for parallel composition.

object IO {
  def reduceAll[E, A](a: IO[E, A], as: TraversableOnce[IO[E, A]])(f: (A, A) => A): IO[E, A]
}

Remove bracket as a primitive, express using other more core operations

General sketch of idea:

def bracket[E, A, B](acquire: IO[E, A])(release: ExitResult[E, A] => IO[Void, Unit])(use: A => IO[E, B]): IO[E, B] = 
  for {
    ref <- IORef[Option[ExitResult[E, A]]](None)
    b   <- (for {
             e <- acquire.run.flatMap(e => ref.write(Some(e)) *> IO.now(e)).uninterruptibly
             b <- e match {
                    case ExitResult.Completed(b)  => IO.now(b)
                    case ExitResult.Terminated(t) => IO.terminate(t)
                    case ExitResult.Failed(e)     => IO.fail(e)
                  }
           } yield b).ensuring(ref.read.flatMap(_.fold(IO.unit)(release)).uninterruptibly)
  } yield b

  • Introduce derived bracket
  • Introduce primitive Ensuring
  • Remove primitive Bracket, replace with derived functionality

Add `IO.raceAll`

raceAll will race any collection of IO actions and return the first one to succeed, or, if all of them fail, the last error.

object IO {
  def raceAll[E, A](t: TraversableOnce[IO[E, A]]): IO[E, A]
}

Add `IO.sequence`

Uses par for parallel composition.

object IO {
  def sequence[E, A, M[X] <: TraversableOnce[X]](in: M[IO[E, A]])(
    implicit cbf: CanBuildFrom[M[IO[E, A]], A, M[A]]): IO[E, M[A]]
}

Interrupt blocking IO

Extend the fiber interruption framework to interrupt blocking IO via Thread.interrupt.
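
A rough sketch of the usual technique, independent of the eventual ZIO API: remember which thread is executing the blocking region, and deliver Thread.interrupt to it when the fiber is interrupted (the class and method names here are hypothetical):

import java.util.concurrent.atomic.AtomicReference

final class BlockingRegion {
  private val worker = new AtomicReference[Thread](null)

  // Run a blocking thunk, recording the executing thread for the duration.
  def runBlocking[A](thunk: => A): A = {
    worker.set(Thread.currentThread())
    try thunk
    finally worker.set(null)
  }

  // Called by the interruption machinery: interrupt the blocked thread, if any.
  def cancel(): Unit = {
    val t = worker.get()
    if (t ne null) t.interrupt()
  }
}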

Add `IO.traverse`

Uses par for parallel composition.

object IO {
  def traverse[E, A, B, M[X] <: TraversableOnce[X]](in: M[A])(fn: A => IO[E, B])(
    implicit cbf: CanBuildFrom[M[A], B, M[B]]): IO[E, M[B]]
}

Fix issues with non-interruption after failure state

Following #53, interruption can be made more precise, which may address #64.

Currently uninterruptibly is implemented like so:

                    curIo = for {
                      _ <- enterUninterruptible
                      v <- io.io
                      _ <- exitUninterruptible
                    } yield v

This is not actually precise, because if io.io fails, then the fiber will be in a permanently non-interruptible state. In theory, this can be solved by:

                    curIo = for {
                      _ <- enterUninterruptible
                      e <- io.io.attempt
                      _ <- exitUninterruptible
                      v <- e.fold(IO.fail)(IO.now)
                    } yield v

But this makes it much slower. With Ensuring becoming a fast operation thanks to #53, we can do better:

                    curIo = enterUninterruptible.flatMap(_ => io.io.ensuring(exitUninterruptible))

Because enterUninterruptible cannot fail, and because once in such a state, the fiber cannot be interrupted during the flatMap, the only place for the computation to fail is io.io, which is now attached with a finalizer that exits the uninterruptible section.

Now some care is needed elsewhere. In particular, when a Finalizer is being run.

def apply(a: Any): IO[E, Any] = finalizer.widenError[E] *> IO.now(a)

In the failure scenario, if the finalizer is popped off the stack during error or interruption, then its uninterruptible status must be guaranteed (and it seems like this is true!).

But in the success scenario, if the Finalizer is popped off the stack via nextInstr, then it's quite possible that the action will be interrupted before anything inside the Finalizer.finalizer is actually run.

In order to guard against this, one option is to modify nextInstr so that it checks whether the next instruction is a Finalizer, and if so, enters an uninterruptible section (noInterrupt += 1) and pushes another action onto the stack (_ => exitUninterruptible) to exit the uninterruptible section after the finalizer has been executed.

Note that there is no need to use ensuring here (and of course it would create a circular loop). The reason is that if the finalizer fails (which is the only way the above composite io can fail), then the fiber will be terminated, so failing to exit the uninterruptible section has no effect.

Other logic around interruption / finalizers should be checked and confirmed in this ticket.

Promise example looks weird

This is the current example for using Promise in Promise.scala:

for {
  promise <- Promise.make[Nothing, Int]
  _ <- IO.sleep(1.second).promise.complete(42).fork
  value   <- promise.get // Resumes when forked fiber completes promise
} yield value

If I understand correctly, it should be

_ <- (IO.sleep(1.second) *> promise.complete(42)).fork
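
With that change applied, the full example would presumably read:

for {
  promise <- Promise.make[Nothing, Int]
  _       <- (IO.sleep(1.second) *> promise.complete(42)).fork
  value   <- promise.get // Resumes when the forked fiber completes the promise
} yield value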

Motivation for Void

I believe we made a mistake by introducing Void as a newtype. Could someone provide some motivation for it?

Name clash with java.lang.Void

It is really annoying.

Dead code warnings

Dead code warnings are not real warnings in pure code. The only case where they make sense is if you "accidentally" create a "value" of type Nothing, e.g.

def foo: Unit = {
  doStuff
  val x = throw new Foo // A "value" of type Nothing
  doMoreStuff(x) // Dead code
}

It does not make any sense in pure code, because the only way we can obtain a value of type Nothing is if someone gives it to us:

def thisWillNeverRun(v: Nothing): Int = {
  (v : List[List[Int]]).head.head
}

This code is indeed dead, but how is it a useful warning? What should we be worried about? That we accidentally put Nothing as an argument?

Resolution issues

The original motivation for Void (at least in leibniz) was to work around implicit resolution bugs that apparently existed back in the day. There is no indication that they still exist in 2.12.6 (I asked around and no one knew of any examples), and regardless of that,

type Bottom <: Nothing

would work equally well w.r.t. implicit resolution.

Scalaz depending on ZIO for Void

It's not a problem, but it is confusing IMHO.

Implement a high-performance ring buffer for RTS built using CAS primitives.

The ring buffer must provide support for a limited number of readers (equal to the thread count), and potentially a higher number of writers. That is, many places in the code will add work to the queue, but few will read from it.

In addition, metrics are important for good preemption: metrics on operations per second, work units per queue, and so forth can all feed into an algorithm that decides whether the current thread should cooperatively yield to another fiber.
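
Not the proposed buffer itself, but a minimal illustration of the CAS-claim pattern such a queue is typically built from; a real MPMC ring buffer additionally needs per-slot sequence numbers and padding against false sharing:

import java.util.concurrent.atomic.AtomicLong

// Minimal sketch: many writers race to claim the next write slot with compareAndSet.
final class SlotClaimer(capacity: Int) {
  private val head = new AtomicLong(0L) // next slot to be consumed
  private val tail = new AtomicLong(0L) // next slot to be claimed by a writer

  // Returns the claimed slot index, or -1 if the ring is currently full.
  def tryClaimWrite(): Long = {
    var result = Long.MinValue
    while (result == Long.MinValue) {
      val t = tail.get()
      if (t - head.get() >= capacity) result = -1L      // full, give up
      else if (tail.compareAndSet(t, t + 1)) result = t // won the race for slot t
      // else another writer claimed t first; retry
    }
    result
  }
}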

Create IOQueue.bounded, IOQueue.unbounded

The documentation should state that bounded queues do not lose elements when they reach maximum capacity—rather, they suspend new offers.

The unbounded implementation can simply create a bounded queue with Int.MaxValue. Because there is no pre-allocation, this should work fine.
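
In other words, something along these lines, with the bounded constructor's signature assumed for illustration:

// Hypothetical sketch: an unbounded queue delegates to a bounded one with the
// largest representable capacity, which is safe because nothing is pre-allocated.
def unbounded[E, A]: IO[E, IOQueue[A]] =
  IOQueue.bounded[E, A](Int.MaxValue)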

/cc @adamw

Bug in finalization?

Reported by @alexandru here. This may be the same bug identified by @wi101.

Ah, you're right, so you are sequencing the execution of finalisers. Except that I found a bug, some sort of race condition.
Sample 1 — in which the inner finalizer never gets executed:

    val task = IO.unit[Void].bracket[Unit] { _ =>
      IO.sync[Void, Unit](println("start 1")) *> IO.sleep(1.second) *> IO.sync(println("release 1"))
    } { _ =>
      IO.unit[Void].bracket[Unit] { _ =>
        IO.sync[Void, Unit](println("start 2")) *> IO.sleep(1.second) *> IO.sync(println("release 2"))
      } { _ =>
        IO.never[Void, Unit]
      }
    }
    task.fork.flatMap(f => f.interrupt(new RuntimeException("cancel")) *> IO.never)

This fails with:

java.lang.Error: Defect: Fiber is not in executing or async state
	at scalaz.ioeffect.RTS$FiberContext.enterAsyncStart(RTS.scala:931)
	at scalaz.ioeffect.RTS$FiberContext.evaluate(RTS.scala:514)
	at scalaz.ioeffect.RTS$FiberContext.$anonfun$fork$1(RTS.scala:761)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
	at scalaz.ioeffect.RTS$$anon$1.run(RTS.scala:117)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
start 1
release 1

Sample 2 — the first specified finalizer gets executed twice somehow, which is really odd:

    IO.unit[Void]
      .bracket[Unit](_ => IO.sync[Void, Unit](println("start 1")) *> IO.sleep(1.second) *> IO.sync(println("release 1")))(_ => IO.unit[Void])
      .bracket[ExitStatus](_ => IO.sync[Void, Unit](println("start 2")) *> IO.sleep(1.second) *> IO.sync(println("release 2")))(_ => IO.sync(ExitStatus.ExitNow(0)))
      .fork.flatMap(f => f.interrupt(new RuntimeException("cancel")) *> IO.never)

Output:

start 1
start 1
java.lang.Error: Defect: Fiber is not in executing or async state
	at scalaz.ioeffect.RTS$FiberContext.enterAsyncStart(RTS.scala:931)
	at scalaz.ioeffect.RTS$FiberContext.evaluate(RTS.scala:514)
	at scalaz.ioeffect.RTS$FiberContext.$anonfun$fork$1(RTS.scala:761)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
	at scalaz.ioeffect.RTS$$anon$1.run(RTS.scala:117)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
release 1

And this behavior is what made me jump to conclusions.

Implement composable RTS

sealed abstract class ThreadPool extends Serializable
object ThreadPool {
  def fromExecutorService(e: ExecutorService): ThreadPool = ???
  def fromExecutionContext(e: ExecutionContext): ThreadPool = ???
}

private[zio] trait ZioThreadPool extends ThreadPool {
  def concurrency: Int
  def capacity: Int 
  
  // Metrics
  def size: Int
  def enqueuedCount: Int 
  def dequeuedCount: Int

  def submit(runnable: Runnable): Boolean
  def shutdown(wait: Duration): Unit
  def shutdownNow: Unit
}

case class Environment(
  unhandled: Cause[Any] => IO[Nothing, Unit], 
  threadPool: ThreadPool)

trait RTS {
  def unsafeRun[E, A](env: Environment)(io: IO[E, A]): A
  def unsafeRunSync[E, A](env: Environment)(io: IO[E, A]): ExitResult[E, A]
  def unsafeRunAsync[E, A](env: Environment)(io: IO[E, A], f: ExitResult[E, A] => Unit): Unit
}

Add IO.done

object IO {
  ...
  def done[E, A](r: ExitResult[E, A]): IO[E, A] = r match {
    case ExitResult.Completed(b)  => IO.now(b)
    case ExitResult.Terminated(t) => IO.terminate(t)
    case ExitResult.Failed(e)     => IO.fail(e)
  }
  ...
}

Interruption doesn't work on fibers which have used bracket

It's not that easy to reproduce but here's the minimal example I arrived at:

object X extends App with RTS {
  import scala.concurrent.duration._

  val c = new AtomicInteger(0)

  val test: IO[Throwable, Unit] = IO
    .syncThrowable {
      println(System.currentTimeMillis() + " working")
      Thread.sleep(100)
      if (c.incrementAndGet() <= 1) {
        throw new RuntimeException("x")
      } else {
        ()
      }
    }
    .forever
    .ensuring(IO.sync(println("CLEANUP; RESTARTING")))
    .attempt
    .forever

  unsafePerformIO[Void, Unit](
    test
      .fork[Void]
      .flatMap(f => IO.sleep[Void](1.second) *> f.interrupt(new RuntimeException("y"))))
}

Even though the fiber is interrupted after 1 second, it ends up printing "working" forever. Interruption works correctly if the bracket (ensuring) never runs (so if throw new RuntimeException("x") is never executed), or if there's no bracket at all.

The double forever loop is there because the original concept was to acquire a resource, use it in a loop, and, if there's an error, clean up and acquire again (hence the second loop).

fs2 `join` doesn't work with zio `IO` via `Effect` instance ported from `ioeffect`

The simplest join, such as this one, hangs forever, while with cats IO and Monix Task it works perfectly:

val one: F[Int] = Effect[F].delay(1)
val s: Stream[F, Int] = Stream.eval(one)
val ss: Stream[F, Stream[F, Int]] = Stream.emits(List(s, s))
ss.join(2).compile.toList

I've already got the fix for this; I'm raising a PR in a few minutes.
