ZIO — A type-safe, composable library for async and concurrent programming in Scala
Home Page: https://zio.dev
License: Apache License 2.0
raceAll will race any collection of IO actions and return the first one to succeed, or if all of them fail, it will return the last error.
object IO {
def raceAll[E, A](t: TraversableOnce[IO[E, A]]): IO[E, A]
}
We now have Fiber.onComplete, which looks like this:
def onComplete(k: ExitResult[E, A] => IO[Nothing, Unit]): IO[Nothing, Unit]
This looks like a type of "supervisor", because anyone who calls this method can now see if and how the fiber exits (including all errors).
Currently, when a fiber is forked, an unhandled error handler may be specified, in the form List[Throwable] => IO[Nothing, Unit]. If this is not specified, then the uncaught error handler is used from the fiber that forks the fiber (all the way up to the root uncaught error handler).
These two methods of "supervising" a fiber look very similar, and suggest the two notions of "supervising" could be unified in some fashion.
Meanwhile, we have a separate supervise combinator:
IO.supervise {
for {
...
} ...
}
The supervise combinator allows collection of forked fibers into a weak set, so that cleanup may be performed upon exit of the specified IO.
This notion is similar to the other in the sense that it's a way of supervising the activity of child fibers.
It's possible all three notions of "supervising" can be unified somehow.
Uses par for parallel composition.
object IO {
def mergeAll[E, A, B](in: TraversableOnce[IO[E, A]])(zero: B, f: (B, A) => B): IO[E, B]
}
It requires cats testkit to be updated to latest scalacheck
In the same vein as Haskell's Exception.
In addition to being just a marker type class, it could have functions such as defaultError, which could be useful for defining a meaningful MonadPlus for IO.
Uses par for parallel composition.
object IO {
def reduceAll[E, A](a: IO[E, A], as: TraversableOnce[IO[E, A]])(f: (A, A) => A): IO[E, A]
}
This is the current example for using Promise in Promise.scala:
for {
promise <- Promise.make[Nothing, Int]
_ <- IO.sleep(1.second).promise.complete(42).fork
value <- promise.get // Resumes when forked fiber completes promise
} yield value
If I understand correctly, it should be
_ <- (IO.sleep(1.second) *> promise.complete(42)).fork
The simplest join, such as this one, hangs forever, while with cats IO and monix Task it works perfectly:
val one: F[Int] = Effect[F].delay(1)
val s: Stream[F, Int] = Stream.eval(one)
val ss: Stream[F, Stream[F, Int]] = Stream.emits(List(s, s))
ss.join(2).compile.toList
I've already got a fix for this and will raise a PR in a few minutes.
This ticket is to create machinery to give us a microsite at http://scalaz.github/scalaz-zio/.
Copy and examples can be added later, and are not considered part of this ticket.
Edit by @NeQuissimus:
The microsite contains various example snippets in scala code blocks that do not currently compile.
We should replace those with tut blocks that are type-checked and executed as part of the microsite generation.
Moved from scalaz/scalaz#1868
Hello,
I've witnessed with great excitement the recent developments in effectful/asynchronous programming, like the IO monad in scalaz by @jdegoes (and other Scala libs). Also, the need to work around the lack of green threads is pressing in other languages. I've been reading about @wingo's path to implementing concurrency in Guile (and Lua); he drew inspiration from Concurrent ML. Indeed it seems that, although it has a branding problem, Concurrent ML is what smart people recommend/use.
So my question is: is there something we could learn from CML or, more specifically, Hopac (a CML-inspired library for F#)?
There's this hierarchy like
                     Job<_>
                        |
                     Alt<_>
                        |
  +-------+--------+----+-----+--------+--------+
  |       |        |          |        |        |
Ch<_>   Latch Mailbox<_>   MVar<_>   Proc  Promise<_>
                                                |
                                             IVar<_>
The most interesting types there are MVar, IVar and Ch (channel). In Scala(z) it could be described by a type class hierarchy like
trait AutoCloseableAsync[C, M[+_, _]] {
def close(c:C): M[Exception, Unit]
}
trait CloseableAsync[C, M[+_, _]] extends AutoCloseableAsync[C, M] {
def close(c:C): M[IOException, Unit]
}
trait Job[Jb[_, _]] extends Monad[Jb] {
def delay[E, A](j: Unit => Jb[E, A]): Jb[E, A]
def thunk[E, A](j: Unit => A): Jb[E, A]
def raises[E, A](e: E): Jb[E, A]
def tryIn[E1, E2, A, B](j: Jb[E1, A])(fa: A => Jb[E2, B])(fe: E1 => Jb[E2, B]): Jb[E2, B]
def catches[E1, E2, A](j: Jb[E1, A]): Jb[E2, E1 \/ A]
def using[E, A : AutoCloseableAsync, B](x: A)(fa: A => Jb[E, B]): Jb[Exception \/ E, B]
def using[E, A : CloseableAsync, B](x: A)(fa: A => Jb[E, B]): Jb[IOException \/ E, B]
def using[E, A, B](x: A)(fa: A => Jb[E, B])(dispose: A => Jb[E, Unit]): Jb[E, B]
}
trait Event[Evnt[_, _]] extends Job[Evnt] {
def never[E, A]: Evnt[E, A]
def choose[E, A](e1:Evnt[E, A], e2:Evnt[E, A]): Evnt[E, A]
def choose[E, A](es:Evnt[E, A]*): Evnt[E, A]
def choose[E, A](es:Iterable[Evnt[E, A]]): Evnt[E, A]
def withNackJob[E, A, Prms[_, _]:Promise_, Jb[_, _]:Job](f: Prms[E, Unit] => Jb[E, Evnt[E, A]]): Evnt[E, A]
}
trait Promise_[Prms[_, _]] extends Event[Prms] {
def read[E, A, Evnt[_, _]:Event](p:Prms[E, A]): Evnt[E, A]
def start[E, A, Jb[_, _]](j:Jb[E, A]): Jb[Void, Prms[E, A]]
}
trait IVar[Ivr[_, _]] extends Promise_[Ivr] {
def empty[E, A]: Ivr[E, A]
def fill[E, A, Jb[_, _]](i:Ivr[E, A])(x:A): Jb[E, Unit]
}
trait Channel[Ch[_, _]] extends Event[Ch] {
def give[E, A, Evnt[_, _]:Event](ch:Ch[E, A])(x:A): Evnt[E, Unit]
def send[E, A, Jb[_, _]:Job](ch:Ch[E, A])(x:A): Jb[E, Unit]
def take[E, A, Evnt[_, _]:Event](ch:Ch[E, A]): Evnt[E, A]
def tryGive[E, A, Jb[_, _]:Job](ch:Ch[E, A])(x:A): Jb[E, Boolean]
def tryTake[E, A, Jb[_, _]:Job](ch:Ch[E, A]): Jb[E, Maybe[A]]
}
trait Latch[Ltch[_, Unit]] extends Event[Ltch] {
def create[E](count:Int): Ltch[E, Unit]
def await[E, Evnt[_, _]:Event](l: Ltch[E, Unit]): Evnt[E, Unit]
def decrement[E, Jb[_, _]:Job](l: Ltch[E, Unit]): Jb[E, Unit]
def increment[E, Jb[_, _]:Job](l: Ltch[E, Unit]): Jb[E, Unit]
}
trait MVar[Mvr[_, _]] extends Event[Mvr] {
def empty[E, A]: Mvr[E, A]
def read[E, A, Evnt[_, _]:Event](m:Mvr[E, A]): Evnt[E, A]
def fill[E, A, Jb[_, _]](m:Mvr[E, A])(x:A): Jb[E, Unit]
def take[E, A, Evnt[_, _]:Event](m:Mvr[E, A]): Evnt[E, A]
}
trait Mailbox[Mlbx[_, _]] extends Event[Mlbx] {
def empty[E, A]: Mlbx[E, A]
def send[E, A, Jb[_, _]](m:Mlbx[E, A])(x:A): Jb[E, Unit]
def take[E, A, Evnt[_, _]:Event](m:Mlbx[E, A]): Evnt[E, A]
}
trait Proc[Prc[_, Unit]] extends Event[Prc] {
def start[E, A, Jb[_, _]:Job](j:Jb[E, Unit]): Jb[Void, Prc[E, Unit]]
def bind[E, A, Jb[_, _]:Job](f: Prc[E, Unit] => Jb[E, A]): Jb[E, A]
def self[E, Jb[_, _]:Job]: Jb[E, Prc[E, Unit]]
def join[E, Evnt[_, _]:Event](p:Prc[E, Unit]): Evnt[E, Unit]
}
Then there's also a Stream implementation, kind of like
sealed trait Cons[A]
case object Nil extends Cons[Void]
case class ConsCase[A](value: A, next: Promise[Cons[A]]) extends Cons[A]
type Stream[A] = Promise[Cons[A]]
What do you guys think of this? Can Scalaz 8 take something from CML/Hopac? This is not a deeply thought-through proposal, just a very rough (not even complete) sketch to kick off a discussion.
When it's close to being released and the main libraries have adopted it.
Probably should be done after #22 so the framework is already in place.
@jdegoes, in scalaz/scalaz#1798 you specify that replacements for Maybe and \/ should be Church-encoded. Why exactly, as opposed to sum types?
So far I've got this, but I don't know how to get rid of the B type parameter. It should use something like Haskell's forall, but I can't figure out how to express that in Scala.
final class Perhaps[A, B] private (val perhaps: (B, A => B) => B) extends AnyVal {
def map[C](g: A => C): Perhaps[C, B] = new Perhaps((b, f) => perhaps(b, f.compose(g)))
def toOption: Option[A] = perhaps(None.asInstanceOf[B], a => Some(a).asInstanceOf[B]).asInstanceOf[Option[A]]
}
object Perhaps {
def present[A, B](a: A): Perhaps[A, B] = new Perhaps((_, f) => f(a))
def absent[A, B]: Perhaps[A, B] = new Perhaps((b, _) => b)
}
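One possible way to approximate Haskell's forall here, offered only as a sketch and not as what the library adopted, is to move B from the class onto a polymorphic method. The names fold, ifAbsent and ifPresent below are assumptions introduced for illustration, and this version gives up the AnyVal wrapper used above.

```scala
// Hypothetical sketch: the polymorphic method `fold` plays the role of
// Haskell's `forall b. b -> (a -> b) -> b`, so B no longer leaks into the type.
trait Perhaps[A] { self =>
  def fold[B](ifAbsent: B)(ifPresent: A => B): B

  // map post-composes g before handing the value to the continuation
  def map[C](g: A => C): Perhaps[C] = new Perhaps[C] {
    def fold[B](ifAbsent: B)(ifPresent: C => B): B =
      self.fold(ifAbsent)(a => ifPresent(g(a)))
  }

  // No casts needed: B is chosen as Option[A] at the call site
  def toOption: Option[A] = fold[Option[A]](None)(a => Some(a))
}

object Perhaps {
  def present[A](a: A): Perhaps[A] = new Perhaps[A] {
    def fold[B](ifAbsent: B)(ifPresent: A => B): B = ifPresent(a)
  }
  def absent[A]: Perhaps[A] = new Perhaps[A] {
    def fold[B](ifAbsent: B)(ifPresent: A => B): B = ifAbsent
  }
}
```

With this shape, Perhaps.present(20).map(_ + 1).toOption evaluates without any asInstanceOf calls, at the cost of allocating an object per value.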
I believe we made a mistake by introducing Void as a newtype. Could someone provide some motivation for it?
It is really annoying.
Dead code warnings are not real warnings in pure code. The only case where they make sense is if you "accidentally" create a "value" of type Nothing, e.g.
def foo: Unit = {
doStuff
val x = throw new Foo // A "value" of type Nothing
doMoreStuff(x) // Dead code
}
It does not make any sense in pure code, because the only way we can obtain a value of type Nothing is if someone gives it to us:
def thisWillNeverRun(v: Nothing): Int = {
(v : List[List[Int]]).head.head
}
This code is indeed dead, but how is it a useful warning? What should we be worried about? That we accidentally put Nothing as an argument?
The original motivation for Void (at least in leibniz) was to work around the implicit resolution bugs that apparently existed back in the day. There is no indication that they still exist in 2.12.6 (I asked around and no one knew of any examples), and regardless of that,
type Bottom <: Nothing
would work equally well w.r.t. implicit resolution.
It's not a problem, but it is confusing IMHO.
sealed abstract class ThreadPool extends Serializable
object ThreadPool {
def fromExecutorService(e: ExecutorService): ThreadPool = ???
def fromExecutionContext(e: ExecutionContext): ThreadPool = ???
}
private[zio] trait ZioThreadPool extends ThreadPool {
def concurrency: Int
def capacity: Int
// Metrics
def size: Int
def enqueuedCount: Int
def dequeuedCount: Int
def submit(runnable: Runnable): Boolean
def shutdown(wait: Duration): Unit
def shutdownNow: Unit
}
case class Environment(
unhandled: Cause[Any] => IO[Nothing, Unit],
threadPool: ThreadPool)
trait RTS {
def unsafeRun[E, A](env: Environment)(io: IO[E, A]): A
def unsafeRunSync[E, A](env: Environment)(io: IO[E, A]): ExitResult[E, A]
def unsafeRunAsync[E, A](env: Environment)(io: IO[E, A], f: ExitResult[E, A] => Unit): Unit
}
For comprehensions in RTS.scala add a useless final map operation, which slows down performance. They can all be eliminated by rewriting in terms of flatMap.
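To make the cost concrete, here is a small self-contained sketch. Box is a hypothetical stand-in type, not anything from RTS.scala, and the counter exists only to observe how often map is invoked. In Scala 2, the for comprehension below desugars to Box(1).flatMap(a => Box(a + 1).map(b => b)), so it pays for an identity map that the hand-written flatMap version avoids.

```scala
object ForDesugar {
  // Counts how many times map is invoked (illustration only)
  var mapCalls = 0

  // Hypothetical minimal monad, standing in for IO
  final case class Box[A](value: A) {
    def flatMap[B](f: A => Box[B]): Box[B] = f(value)
    def map[B](f: A => B): Box[B] = { mapCalls += 1; Box(f(value)) }
  }

  // The trailing `yield b` becomes a final map call under Scala 2 desugaring
  def withFor: Box[Int] =
    for {
      a <- Box(1)
      b <- Box(a + 1)
    } yield b

  // Same result, written directly with flatMap: no map call at all
  def withFlatMap: Box[Int] =
    Box(1).flatMap(a => Box(a + 1))
}
```

Both definitions produce the same Box; only the flatMap version is guaranteed to leave mapCalls untouched.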
Uses par for parallel composition.
object IO {
def traverse[E, A, B, M[X] <: TraversableOnce[X]](in: M[A])(fn: A => IO[E, B])(
implicit cbf: CanBuildFrom[M[A], B, M[B]]): IO[E, M[B]]
}
For consistency with other IO abstractions.
Rename the following three methods in RTS.scala:
unsafePerformIO => unsafeRun
unsafePerformIOAsync => unsafeRunAsync
tryUnsafePerformIO => unsafeRunSync
This includes but is not limited to:
IOQueue
IOQueueSpec
IORefSpec
KleisliIO
KleisliIOSpec
PromiseSpec
And miscellaneous bug fixes and helpers added to IO, RTS, etc.
Add test cases from #7 and #15.
During finalization, there cannot be any possibility of interruption.
Take this:
// Do not interrupt finalization:
this.noInterrupt += 1
curIo = ensuringUninterruptibleExit(finalization[E] *> completer)
Change to:
// Do not interrupt finalization (FOREVER, until the end of the fiber's life):
this.noInterrupt += 1
curIo = finalization[E] *> completer
Take this:
private[this] var noInterrupt = 0
private[this] var supervised: List[Set[FiberContext[_, _]]] = Nil
private[this] var supervising
Change to:
@volatile private[this] var noInterrupt = 0
@volatile private[this] var supervised: List[Set[FiberContext[_, _]]] = Nil
@volatile private[this] var supervising
enterAsyncStart does not handle the Interrupting state. The method can be changed to return -1 if the state is interrupting.
Something like:
def enterAsyncStart(): Int = {
...
case Interrupting(_, _, _) => -1
...
}
The return value can be checked:
val id = enterAsyncStart()
if (id < 0) {
// The fiber is being interrupted, we do not want to continue
// Do not interrupt interruption (FOREVER, until the end of the fiber's life):
noInterrupt += 1
curIo = IO.terminate(interruptingError)
} else {
...
}
where interruptingError is a new method that extracts the error from an Interrupting state.
def interruptingError: Throwable = state.get match {
case Interrupting(error, _, _) => error
case v => new Error("Defect in Fiber: There is no interrupting error because state is " + v)
}
Extend the fiber interruption framework to interrupt blocking IO via Thread.interrupt.
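As a reminder of the JVM mechanism this would build on, the following sketch uses only java.lang.Thread and java.util.concurrent and has nothing ZIO-specific in it. BlockingInterrupt and demo are names made up for the example. A thread blocked in Thread.sleep is woken with an InterruptedException when another thread calls interrupt on it.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

object BlockingInterrupt {
  // Returns true if the blocked worker observed the interruption
  def demo(): Boolean = {
    val started     = new CountDownLatch(1)
    val interrupted = new AtomicBoolean(false)

    val worker = new Thread(new Runnable {
      def run(): Unit = {
        started.countDown()
        try Thread.sleep(60000) // stands in for a blocking IO call
        catch { case _: InterruptedException => interrupted.set(true) }
      }
    })

    worker.start()
    started.await()    // make sure the worker is running
    worker.interrupt() // wakes the sleep with InterruptedException
    worker.join()
    interrupted.get()
  }
}
```

Note that not every blocking call responds to Thread.interrupt, so a fiber-level design would still need to decide what to do with operations that ignore it.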
The ring buffer must provide support for a limited number of readers (equal to thread count), and potentially a higher number of writers. That is, many places in the code will add work to the queue, but few will read from it.
In addition, metrics are important to create good preemption. Metrics on how many ops per second, metrics on how many work units per queue, and so forth, can all be good to feed into an algorithm that decides whether or not a current thread should cooperatively yield to another fiber.
object IO {
...
def done[E, A](r: ExitResult[E, A]): IO[E, A] = r match {
case ExitResult.Completed(b) => IO.now(b)
case ExitResult.Terminated(t) => IO.terminate(t)
case ExitResult.Failed(e) => IO.fail(e)
}
...
}
Using Nothing is too painful. We should introduce a simple Void:
abstract final class Void {
def absurd[A]: A
}
And use it wherever Nothing is currently used.
Shouldn't the main class be called ZIO? 😄
Then IO[a] could be an alias for ZIO[Throwable, a].
Interrupting fiber join or fiber interrupt should not leak resources.
fiber.interrupt.fork(_.interrupt)
fiber.join.fork(_.interrupt)
This requires modifying the implementation of join and interrupt to return Async.maybeLater values, with cancelers that do the right thing when invoked.
Right now we have:
def forkAll[E, A](ios: List[IO[E, Unit]]): IO[E, Unit]
Instead we should have:
def forkAll[E, E2, A](as: List[IO[E, A]]): IO[E2, Fiber[E, List[A]]]
Hi all,
While working to ensure that Cats-Effect 1.0.0 supports the auto-cancelable behavior of ZIO and Monix's Task, I may have discovered a problem with an assumption about start (fork) that I need to verify with you.
In this piece of code, is the specified release guaranteed to run?
val task = IO(1).bracket(release)(use)
task.fork.flatMap(f => f.cancel *> f.join)
The answer is probably no, but I want to double-check with you.
Thanks,
The following (from J. Gibbons):
final def stream[E, A, B, C](consume: B => Option[IO[E, (C, B)]])(produce: (B, A) => IO[E, B])(seed: B)(in: List[A]): IO[E, List[C]] =
consume(seed) match {
case Some(io) => for {
r <- io
(c, b) = r
cs <- stream(consume)(produce)(b)(in)
} yield c :: cs
case None => in match {
case Nil => IO.now(Nil)
case x :: xs => produce(seed, x).flatMap(stream(consume)(produce)(_)(xs))
}
}
models (potentially infinite) interaction between processes. Other primitives (merging, forking, zipping, chunking, etc.) can be built on top of it. Not reactive stream grade but principled.
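To see the consume/produce interplay without any effects, here is a pure, IO-free analogue of the function above, written as a sketch for illustration. PureStream and the pair-summing example are assumptions added here, not part of the proposal. The state B is a small buffer; consume emits a value whenever two inputs are buffered, and produce feeds one more input into the buffer.

```scala
object PureStream {
  // Pure analogue of `stream`: try to consume from the state first,
  // and only feed in the next input when nothing can be emitted.
  def stream[A, B, C](consume: B => Option[(C, B)])(produce: (B, A) => B)(seed: B)(in: List[A]): List[C] =
    consume(seed) match {
      case Some((c, b)) => c :: stream(consume)(produce)(b)(in)
      case None =>
        in match {
          case Nil     => Nil
          case x :: xs => stream(consume)(produce)(produce(seed, x))(xs)
        }
    }

  // Hypothetical example: buffer inputs and emit the sum of each pair
  val pairSums: List[Int] =
    stream[Int, Vector[Int], Int](buf =>
      if (buf.size >= 2) Some((buf(0) + buf(1), buf.drop(2))) else None
    )((buf, a) => buf :+ a)(Vector.empty)(List(1, 2, 3, 4, 5))
}
```

Here pairSums is List(3, 7); the trailing 5 stays in the buffer because this formulation has no flush step once the input runs out.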
List is not lazy; however, one could imagine a type IOStream[E, A], schematically:
abstract class IOStream[E, A] {
def head: IO[E, A]
def tail: IO[E, IOStream[E, A]]
}
I was wondering if it was of interest, or completely beyond the scope of IO?
I wanted to get someone's thoughts on building an async IO implementation using ZIO's fibers. I think this is doable in a clean way. Essentially, I could use an AsynchronousSocketChannel and initialize it with the RTS thread pool, then implement support for async IO using fibers. My concern is stealing threads from the RTS: would this be a problem? (I have not evaluated the fiber implementation in detail.) The idea behind using this thread pool is to keep the application on a single thread pool and avoid contention for resources with other pools. I assume this would not be a problem as long as we never block threads (preventing starvation).
I could publish this implementation as another open source library or add it to this one if you feel it's appropriate.
General sketch of idea:
def bracket[E, A, B](acquire: IO[E, A])(release: ExitResult[E, A] => IO[Void, Unit])(use: A => IO[E, B]): IO[E, B] =
for {
ref <- IORef[Option[ExitResult[E, A]]](None)
b <- (for {
e <- acquire.run.flatMap(e => ref.write(Some(e)) *> IO.now(e)).uninterruptibly
b <- e match {
case ExitResult.Completed(b) => IO.now(b)
case ExitResult.Terminated(t) => IO.terminate(t)
case ExitResult.Failed(e) => IO.fail(e)
}
} yield b).ensuring(ref.read.flatMap(_.fold(IO.unit)(release)).uninterruptibly)
} yield b
Platform from RTS to encapsulate platform-specific functionality.
PlatformJs and PlatformJvm
RTS using PlatformJs, and RTS using PlatformJvm
Our naming conventions are inconsistent, and somewhat redundant given that everything is scoped to the zio package. Since we are quickly approaching 1.0 of the library, we should make all changes now, before the names are locked in for some time.
IOApp => App
IOQueue => Queue
IORef => Ref
Opportunities for other clear wins should also be investigated.
ExitResult should have a List of throwables, which collectively represent all reasons for termination in order, including defects in finalizers.
object ExitResult {
final case class Completed[E, A](value: A) extends ExitResult[E, A]
final case class Failed[E, A](error: E) extends ExitResult[E, A]
final case class Terminated[E, A](errors: List[Throwable]) extends ExitResult[E, A]
}
Various changes are possible, including changing interrupt:
def interrupt[E](causes: Throwable*): IO[E, Unit]
This way it is not necessary to specify a cause, or in fact, multiple causes can be specified.
All causes for a termination will have to be collected, but much of the work is already done, since defects in finalizers are already aggregated into a List[Throwable].
Instead of being a function A => IO[E, B], it should have a field called run which is a function A => IO[E, B].
IOApp can be modified to add a shutdown hook that interrupts the main fiber.
This will allow clean termination of main programs.
Runtime.getRuntime().addShutdownHook(new Thread...)
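A minimal sketch of the registration step, using only java.lang.Runtime. ShutdownHookSketch, install and the interruptMain callback are hypothetical names standing in for whatever IOApp would use to interrupt the main fiber and wait for its finalizers.

```scala
object ShutdownHookSketch {
  // `interruptMain` is an assumed callback: "interrupt the main fiber"
  def install(interruptMain: () => Unit): Thread = {
    val hook = new Thread(new Runnable {
      def run(): Unit = interruptMain()
    })
    // The JVM starts this thread when it begins shutting down
    Runtime.getRuntime.addShutdownHook(hook)
    hook
  }
}
```

Returning the Thread lets the caller deregister the hook with Runtime.getRuntime.removeShutdownHook if the program finishes normally before the JVM shuts down.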
Uses par for parallel composition.
object IO {
def sequence[E, A, M[X] <: TraversableOnce[X]](in: M[IO[E, A]])(
implicit cbf: CanBuildFrom[M[IO[E, A]], A, M[A]]): IO[E, M[A]]
}
Following #53, interruption can be made more precise, which may address #64.
Currently uninterruptibly is implemented like so:
curIo = for {
_ <- enterUninterruptible
v <- io.io
_ <- exitUninterruptible
} yield v
This is not actually precise, because if io.io fails, then the fiber will be left in a permanently non-interruptible state. In theory, this can be solved by:
curIo = for {
_ <- enterUninterruptible
e <- io.io.attempt
_ <- exitUninterruptible
v <- e.fold(IO.fail)(IO.now)
} yield v
But this makes it much slower. With Ensuring becoming a fast operation thanks to #53, we can do better:
curIo = enterUninterruptible.flatMap(_ => io.io.ensuring(exitUninterruptible))
Because enterUninterruptible cannot fail, and because once in such a state the fiber cannot be interrupted during the flatMap, the only place for the computation to fail is io.io, which now has a finalizer attached that exits the uninterruptible section.
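The difference between the first and the last variant can be shown with a plain-Scala analogy. This is a sketch, not the RTS code: a mutable counter stands in for the fiber's noInterrupt field, a thrown exception stands in for a failing io.io, and try/finally stands in for ensuring.

```scala
object UninterruptibleSketch {
  // Stand-in for the fiber's noInterrupt counter
  var noInterrupt = 0

  // Mirrors the sequential version: the exit step is skipped if `body` fails
  def leaky[A](body: => A): A = {
    noInterrupt += 1
    val v = body
    noInterrupt -= 1
    v
  }

  // Mirrors `io.io.ensuring(exitUninterruptible)`: the exit step always runs
  def safe[A](body: => A): A = {
    noInterrupt += 1
    try body
    finally noInterrupt -= 1
  }
}
```

After a failing body, leaky leaves the counter incremented (the "permanently non-interruptible" state described above), while safe restores it.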
Now some care is needed elsewhere, in particular when a Finalizer is being run.
def apply(a: Any): IO[E, Any] = finalizer.widenError[E] *> IO.now(a)
In the failure scenario, if the finalizer is popped off the stack during error or interruption, then its uninterruptible status must be guaranteed (and it seems like this is true!).
But in the success scenario, if the Finalizer is popped off the stack via nextInstr, then it's quite possible that the action will be interrupted before anything inside the Finalizer.finalizer is actually run.
In order to guard against this, one option is to modify nextInstr so that it checks whether the next instruction is a Finalizer, and if so, enters an uninterruptible section (noInterrupt += 1) and pushes another action on the stack (_ => exitUninterruptible) to exit the uninterruptible section after the finalizer has been executed.
Note that there is no need for ensuring to be used here (and of course it would create a circular loop). The reason is that if the finalizer fails (which is the only way the above composite io can fail), then the fiber will be terminated, so failing to exit the uninterruptible section has no effect.
Other logic around interruption / finalizers should be checked and confirmed in this ticket.
By spec, Fiber.interrupt should not return until the fiber is interrupted and all finalizers have been successfully run. Yet this test and at least one other in IOQueueSpec demonstrate that interrupt is returning prematurely. This may be as simple as a @volatile bug somewhere or something more complex. It may be related to #7.
def e9 = unsafePerformIO(
for {
queue <- IOQueue.make[Void, Int](100)
f <- queue.take[Void].fork
_ <- f.interrupt(new Exception("interrupt fiber in e9"))
_ <- waitForSize(queue, 0)
size <- queue.size[Void]
} yield size must_=== 0
)
I think one of the useful features that we need to add to ZIO is thread shifting, like io.executeOn(ioScheduler) in Monix.
The documentation should state that bounded queues do not lose elements when they reach maximum capacity; rather, they suspend new offers.
The unbounded implementation can simply create a bounded queue with Int.MaxValue. Because there is no pre-allocation, this should work fine.
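The same two ideas exist in java.util.concurrent, which can serve as an analogy (this sketch uses LinkedBlockingQueue, not IOQueue, and QueueSketch is a made-up name). A full bounded queue makes put wait rather than dropping the element, and the no-argument LinkedBlockingQueue constructor is a bounded queue with capacity Integer.MAX_VALUE and no pre-allocation.

```scala
import java.util.concurrent.LinkedBlockingQueue

object QueueSketch {
  // Capacity 1: the producer must wait on each put until the consumer takes,
  // yet every element arrives, in order.
  def noLoss(): List[Int] = {
    val q = new LinkedBlockingQueue[Int](1)
    val producer = new Thread(new Runnable {
      def run(): Unit = { q.put(1); q.put(2); q.put(3) }
    })
    producer.start()
    val out = List(q.take(), q.take(), q.take())
    producer.join()
    out
  }

  // "Unbounded" as a bounded queue with the maximum Int capacity
  def unboundedCapacity: Int = new LinkedBlockingQueue[Int]().remainingCapacity()
}
```

The difference for a fiber-based queue is that the waiting would suspend the fiber instead of blocking a thread, but the no-loss guarantee reads the same.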
/cc @adamw
Reported by @alexandru here. This may be the same bug identified by @wi101.
Ah, you're right, so you are sequencing the execution of finalisers. Except that I found a bug, some sort of race condition.
Sample 1 — in which the inner finalizer never gets executed:
val task = IO.unit[Void].bracket[Unit] { _ =>
IO.sync[Void, Unit](println("start 1")) *> IO.sleep(1.second) *> IO.sync(println("release 1"))
} { _ =>
IO.unit[Void].bracket[Unit] { _ =>
IO.sync[Void, Unit](println("start 2")) *> IO.sleep(1.second) *> IO.sync(println("release 2"))
} { _ =>
IO.never[Void, Unit]
}
}
task.fork.flatMap(f => f.interrupt(new RuntimeException("cancel")) *> IO.never)
This fails with:
java.lang.Error: Defect: Fiber is not in executing or async state
at scalaz.ioeffect.RTS$FiberContext.enterAsyncStart(RTS.scala:931)
at scalaz.ioeffect.RTS$FiberContext.evaluate(RTS.scala:514)
at scalaz.ioeffect.RTS$FiberContext.$anonfun$fork$1(RTS.scala:761)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at scalaz.ioeffect.RTS$$anon$1.run(RTS.scala:117)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
start 1
release 1
Sample 2 — the first specified finalizer gets executed twice somehow, which is really odd:
IO.unit[Void]
.bracket[Unit](_ => IO.sync[Void, Unit](println("start 1")) *> IO.sleep(1.second) *> IO.sync(println("release 1")))(_ => IO.unit[Void])
.bracket[ExitStatus](_ => IO.sync[Void, Unit](println("start 2")) *> IO.sleep(1.second) *> IO.sync(println("release 2")))(_ => IO.sync(ExitStatus.ExitNow(0)))
.fork.flatMap(f => f.interrupt(new RuntimeException("cancel")) *> IO.never)
Output:
start 1
start 1
java.lang.Error: Defect: Fiber is not in executing or async state
at scalaz.ioeffect.RTS$FiberContext.enterAsyncStart(RTS.scala:931)
at scalaz.ioeffect.RTS$FiberContext.evaluate(RTS.scala:514)
at scalaz.ioeffect.RTS$FiberContext.$anonfun$fork$1(RTS.scala:761)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at scalaz.ioeffect.RTS$$anon$1.run(RTS.scala:117)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
release 1
And this behavior is what made me jump to conclusions.
It's not that easy to reproduce but here's the minimal example I arrived at:
object X extends App with RTS {
import scala.concurrent.duration._
val c = new AtomicInteger(0)
val test: IO[Throwable, Unit] = IO
.syncThrowable {
println(System.currentTimeMillis() + " working")
Thread.sleep(100)
if (c.incrementAndGet() <= 1) {
throw new RuntimeException("x")
} else {
()
}
}
.forever
.ensuring(IO.sync(println("CLEANUP; RESTARTING")))
.attempt
.forever
unsafePerformIO[Void, Unit](
test
.fork[Void]
.flatMap(f => IO.sleep[Void](1.second) *> f.interrupt(new RuntimeException("y"))))
}
Even though the fiber is interrupted after 1 second, it ends up printing working infinitely. Interruption works correctly if the bracket (ensuring) never runs (so if throw new RuntimeException("x") is never executed) or if there's no bracket at all.
The double-forever-loop is there as the original concept was to acquire a resource, use it in a loop, and if there's an error clean up and acquire again (hence the second loop).