Comments (7)

adamgfraser commented on May 28, 2024

@eyalfa This is expected behavior. When you use broadcast you are responsible for consuming each of the broadcasted streams. Otherwise one consumer may not be able to receive more elements because the maximum number of elements allowed by the lag setting is already being buffered. The broadcasted streams try to help you out with that by building unsubscribing into their finalization logic. However, you still have to actually run the stream for its finalization logic to be executed.

For example, this will also not terminate:

test("broadcast") {
  ZStream.range(0, 100, 10).broadcast(2, 1).flatMap { streams =>
    ZIO.succeed(Chunk.empty).zipWithPar(streams(1).runCollect) { (left, right) =>
      assertTrue(left == Chunk.empty) &&
        assertTrue(right == Chunk.range(0, 100))
    }
  }
}

Your code is the equivalent of this. stream.take(0).runCollect is equivalent to ZIO.unit. You have said you want to take zero elements from the stream, so there is no need to, and it would not be correct to, even begin executing the stream or acquire or release any resources associated with it. This is a basic consequence of streams being lazy and pull based. This can be illustrated with the following much simpler example:

import zio._
import zio.stream._

object Example extends ZIOAppDefault {

  val resource: ZIO[Scope, Nothing, Unit] =
    ZIO.acquireRelease(ZIO.debug("acquiring"))(_ => ZIO.debug("releasing"))

  val stream: ZStream[Any, Nothing, Unit] =
    ZStream.scoped(resource)

  val run =
    stream.take(0).runCollect
}

In the above code acquiring and releasing will never be printed because there is no need to execute the stream, since it can return zero elements without doing so. Of course if you want to fully consume the stream you can use an operator like runDrain to do that.
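To make the difference observable without reading console output, here is a minimal sketch that records the acquire and release events in a Ref and compares runDrain with take(0).runCollect. The names LazinessDemo and events are illustrative only and are not part of ZIO.

```scala
import zio._
import zio.stream._

object LazinessDemo {
  // Builds the same scoped stream as in the example above, but logs into a Ref
  // (an assumption made for this sketch) so the events can be inspected afterwards.
  def events(consume: ZStream[Any, Nothing, Unit] => UIO[Any]): UIO[Chunk[String]] =
    for {
      log     <- Ref.make(Chunk.empty[String])
      resource = ZIO.acquireRelease(log.update(_ :+ "acquiring"))(_ => log.update(_ :+ "releasing"))
      _       <- consume(ZStream.scoped(resource))
      out     <- log.get
    } yield out

  // Fully consuming the stream runs both the acquire and the release action.
  val drained: UIO[Chunk[String]] = events(_.runDrain)

  // Taking zero elements never starts the stream, so nothing is recorded.
  val takeZero: UIO[Chunk[String]] = events(_.take(0).runCollect)
}
```

Running drained should yield both events in order, while takeZero should yield an empty chunk, which matches the behavior described above.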

In your case, if you want to be able to unsubscribe from the broadcast without consuming the stream at all you can use broadcastedQueues instead of broadcast, which will give you a chunk of queues that you can manually unsubscribe from without ever having to consume them.
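A rough sketch of that approach, assuming that shutting down a subscription queue is what unsubscribes it from the underlying hub (as the comment above suggests). The object name BroadcastedQueuesDemo is hypothetical, and this is one possible way to wire it up rather than a recommended pattern.

```scala
import zio._
import zio.stream._

object BroadcastedQueuesDemo {
  val consumed: UIO[Chunk[Int]] =
    ZIO.scoped {
      ZStream.range(0, 100, 10).broadcastedQueues(2, 1).flatMap { queues =>
        // Shut down the subscription we do not intend to read from,
        // so it cannot hold back the producer ...
        queues(0).shutdown *>
          // ... and read the other subscription as an ordinary stream.
          ZStream.fromQueue(queues(1)).flattenTake.runCollect
      }
    }
}
```

The trade-off raised later in the thread still applies: the caller now handles queues of Take values directly instead of working purely with streams.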

eyalfa commented on May 28, 2024

Thanks @adamgfraser. While I agree with most of your response, I think there's a slight difference in the broadcast use case (and generally with the ensuring use case): the resource has already been acquired.
In the broadcast case the resource is implicitly acquired and the client code has no way of ensuring its release. I claim this is a bug; using take(0) just exposes it.
Regarding broadcastedQueues, I'm aware of this option, however I find it breaks the stream abstraction, making it very hard to write general code that can 'mix and match' streams, pipelines and sinks.

I think this also brings out a difference between ZIO and ZStream: ZIO ALWAYS respects the ensuring statement, even when the effect is skipped, i.e.
ZIO.never.when(false).ensuring(ZIO.debug("post")) will always print "post".

adamgfraser commented on May 28, 2024

@eyalfa No resource was acquired by the stream. You can see that with the example above, where neither acquire nor release is printed. The guarantee of acquireRelease with ZStream is precisely the same as the guarantee with ZIO: if the acquire action is executed, the release action will be executed when the stream or workflow completes execution. Here neither the acquire nor the release action is executed because stream.take(0).runCollect is equivalent to ZIO.unit, which is a necessary consequence of streams being lazy and pull based.

The correct analogy with ZIO would be ZIO.never.ensuring(ZIO.debug("releasing")).when(false). This will also not print releasing for exactly the same reason that zio.when(false) is equivalent to ZIO.unit. The stream equivalent of your example would be ZStream.never.take(0).ensuring(ZIO.debug("releasing")).runCollect which would print releasing.
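The three expressions discussed here can be compared side by side with a small sketch that swaps ZIO.debug for a flag in a Ref. The helper finalizerRan and the object name EnsuringOrder are made up for illustration.

```scala
import zio._
import zio.stream._

object EnsuringOrder {
  // Runs `build` with a finalizer that flips a flag, then reports whether it ran.
  def finalizerRan(build: UIO[Any] => UIO[Any]): UIO[Boolean] =
    for {
      flag <- Ref.make(false)
      _    <- build(flag.set(true))
      ran  <- flag.get
    } yield ran

  // ensuring wraps the already-skipped effect, so the finalizer runs.
  val ensuringOutsideWhen: UIO[Boolean] =
    finalizerRan(fin => ZIO.never.when(false).ensuring(fin))

  // The whole effect, including its finalizer, is skipped by when(false).
  val ensuringInsideWhen: UIO[Boolean] =
    finalizerRan(fin => ZIO.never.ensuring(fin).when(false))

  // Stream counterpart of the first case: ensuring is applied after take(0).
  val streamEnsuringAfterTake: UIO[Boolean] =
    finalizerRan(fin => ZStream.never.take(0).ensuring(fin).runCollect)
}
```

Based on the explanation above, the first and third values should evaluate to true and the second to false; what matters is whether the finalizer sits inside or outside the part that is skipped.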

eyalfa commented on May 28, 2024

Thanks @adamgfraser, I accept my ZIO analogy is not 100% equivalent to the stream use case,
but I disagree with the claim that no resource has been acquired: broadcast did allocate an internal hub and subscribed to it multiple times. These subscriptions are resources, and broadcast does not provide any means to close them properly regardless of downstream's behavior.

adamgfraser commented on May 28, 2024

@eyalfa There is no resource acquired by the stream. The outer ZIO workflow described the broadcast and the broadcast will be shut down when its scope is closed. However, within the scope of the broadcast you are responsible for consuming the broadcasted streams or a subscriber may suspend if the maximum number of elements are already being buffered because another subscriber is not being consumed.

Hopefully the comparison to ZIO helps you understand that stream.take(0).runCollect is exactly equivalent to zio.when(false) and ZIO.unit. So your original example is identical to what I posted above:

test("broadcast") {
  ZStream.range(0, 100, 10).broadcast(2, 1).flatMap { streams =>
    ZIO.succeed(Chunk.empty).zipWithPar(streams(1).runCollect) { (left, right) =>
      assertTrue(left == Chunk.empty) &&
        assertTrue(right == Chunk.range(0, 100))
    }
  }
}

You are specifying that you want to broadcast to two streams, but then you literally never consume one of the streams, so of course the other one will suspend indefinitely once the maximum buffer size is reached. The solution is simple. Either don't specify that you want to broadcast to two streams if you only want to broadcast to one, or actually consume the second stream.
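For comparison, a minimal sketch of the second option, where both broadcasted streams are consumed: the first is simply drained while the second is collected. The object name ConsumeBoth is illustrative, and ZIO.scoped stands in for the scope that the test framework would otherwise provide.

```scala
import zio._
import zio.stream._

object ConsumeBoth {
  val result: UIO[Chunk[Int]] =
    ZIO.scoped {
      ZStream.range(0, 100, 10).broadcast(2, 1).flatMap { streams =>
        // Drain the first stream so it never holds back the second one,
        // and keep only the elements collected from the second.
        streams(0).runDrain.zipWithPar(streams(1).runCollect)((_, right) => right)
      }
    }
}
```

Because both subscribers keep pulling, neither one is left waiting on a full buffer, and the broadcast is shut down when the enclosing scope closes.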

eyalfa commented on May 28, 2024

@adamgfraser There are scenarios where one knows there are two consumers but not how they're going to consume the stream. It seems like the ZStream abstraction just isn't enough in these scenarios, as you must somehow specify an additional 'canceller' alongside the stream.
Perhaps I'm a bit spoiled after so many years of Akka Streams 😎

adamgfraser commented on May 28, 2024

@eyalfa The stream has a finalizer as part of it but you have to actually consume the stream.
