Comments (7)
@eyalfa This is expected behavior. When you use broadcast
you are responsible for consuming each of the broadcasted streams. Otherwise one consumer may not be able to receive more elements because the maximum lag number of elements are already being buffered. The broadcasted streams try to help you out with that by building in unsubscribing as part of their finalization logic. However, you still have to actually run the stream for its finalization logic to be executed.
For example, this will also not terminate:
test("broadcast") {
ZStream.range(0, 100, 10).broadcast(2, 1).flatMap { streams =>
ZIO.succeed(Chunk.empty).zipWithPar(streams(1).runCollect) { (left, right) =>
assertTrue(left == Chunk.empty) &&
assertTrue(right == Chunk.range(0, 100))
}
}
}
Your code is the equivalent of this. stream.take(0).runCollect
is equivalent to ZIO.unit
. You have said you want to take zero elements from the stream, so there is no need to, and it would not be correct to, even begin executing the stream or acquire or release any resources associated with it. This is a basic consequence of streams being lazy and pull based. This can be illustrated with the following much more simple example:
object Example extends ZIOAppDefault {
val resource: ZIO[Scope, Nothing, Unit] =
ZIO.acquireRelease(ZIO.debug("acquiring"))(_ => ZIO.debug("releasing"))
val stream: ZStream[Any, Nothing, Unit] =
ZStream.scoped(resource)
val run =
stream.take(0).runCollect
}
In the above code acquiring
and releasing
will never be printed because there is no need to execute the stream, since it can return zero elements without doing so. Of course if you want to fully consume the stream you can use an operator like runDrain
to do that.
In your case, if you want to be able to unsubscribe from the broadcast without consuming the stream at all you can use broadcastedQueues
instead of broadcast
, which will give you a chunk of queues that you can manually unsubscribe from without ever having to consume them.
from zio.
thanks @adamgfraser , while I agree with most of your response I on think there's a slight difference in the broadcast
use case (and generally with the ensuring
use case), the resource has already been acquired.
in the broadcast
case the resource is implicitly acquired and the client code has no way ensuring it's release, I claim this is a bug, using take(0)
just exposes it.
regarding using broadcastedQueues
I'm aware of this option, however I find it breaks the stream abstraction making it very hard to write general code that 'mix and match' streams, pipelines and sinks.
I think this also brings out a difference between ZIO
and ZStream
, ZIO
ALWAYS respects the ensuring
statement, even when the effect is skipped, i.e.
ZIO.never.when(false).ensuring(ZIO.debug("post")
will always print "post"
from zio.
@eyalfa No resource was acquired by the stream. You can see that with the example above, where neither acquire
nor release
is printed. The guarantee of acquireRelease
with ZStream
is precisely the same as the guarantee with ZIO
, that if the acquire
action is executed the release
action will be executed when the stream or workflow completes execution. Here neither the acquire
nor the release
actions are executed because stream.take(0).run
is equivalent to ZIO.unit
which is a necessary consequence of streams being lazy and pull based.
The correct analogy with ZIO
would be ZIO.never.ensuring(ZIO.debug("releasing")).when(false)
. This will also not print releasing
for exactly the same reason that zio.when(false)
is equivalent to ZIO.unit
. The stream equivalent of your example would be ZStream.never.take(0).ensuring(ZIO.debug("releasing")).runCollect
which would print releasing
.
from zio.
thanks @adamgfraser , I accept my zio analogy is not 100% equivalent to the stream use case,
but I disagree with the claim no resource has been acquired as broadcast
did allocate an internal hub and subscribed it multiple times, these subscriptions are resources and broadcast does not provide any means to close it properly regardless of downstream's behavior.
from zio.
@eyalfa There is no resource acquired by the stream. The outer ZIO
workflow described the broadcast and the broadcast will be shut down when its scope is closed. However, within the scope of the broadcast you are responsible for consuming the broadcasted streams or a subscriber may suspend if the maximum number of elements are already being buffered because another subscriber is not being consumed.
Hopefully the comparison to ZIO
helps you understand that stream.take(0).runCollect
is exactly equivalent to zio.when(false)
and ZIO.unit
. So your original example is identical to what I posted above:
test("broadcast") {
ZStream.range(0, 100, 10).broadcast(2, 1).flatMap { streams =>
ZIO.succeed(Chunk.empty).zipWithPar(streams(1).runCollect) { (left, right) =>
assertTrue(left == Chunk.empty) &&
assertTrue(right == Chunk.range(0, 100))
}
}
}
You are specifying that you want to broadcast to two streams, but then you literally never consume one of the streams, so of course the other one will suspend indefinitely once the maximum buffer size is reached. The solution is simple. Either don't specify that you want to broadcast to two streams if you only want to broadcast to one, or actually consume the second stream.
from zio.
@adamgfraser there are scenarios where one knows there are two consumers but not how they're going to consume the stream, seems like the ZStream
abstraction just isn't enough in these scenarios as you must somehow specify an additional 'canceller' alongside the stream.
perhaps I'm a bit spoiled after so many years of akka streams 😎
from zio.
@eyalfa The stream has a finalizer as part of it but you have to actually consume the stream.
from zio.
Related Issues (20)
- ZTestRunnerJS failure message is unhelpful
- assertTrue on zio-http Response - Exception occurred while executing macro expansion HOT 2
- zio-test: provide something like `assertAll(tests: Seq[TestResult])` HOT 3
- ZIO Metrics: Unexpected `ZStream.tagged` behavior
- ZPool incorrectly propagates uninterruptible flags to `get` Zio, for dynamically created members
- zio-test: compile error on assertion on zio.Cause HOT 4
- ZStream.ensuring, finalizer may be deferred to channel executor's close and as a result may cause deadlocks HOT 1
- ZLayer scoped doesn't work with multiple type constructors HOT 1
- No test found when an exception is raised in the construction of a provided layer HOT 4
- TestClock manipulation doesn't seem to work right for shared environment HOT 5
- Debounce on stream does not work properly HOT 3
- ZStream flatMapPar doesn't respect scope HOT 1
- zio-test + zio-prelude: Need way to "fuzzy compare" with Prelude HOT 2
- ZIO TEST: Using .nn feature breaks reporting on failing assertions with java.lang.StringIndexOutOfBoundsException HOT 5
- Trying to interrupt a fiber in test hangs forever? HOT 2
- zio test 2.0.20 - contains in assertTrue failing compilation HOT 2
- Too big zio-test output when assertion failed on case class instances HOT 10
- 404 after clicking on the ZIO Newsletter button HOT 1
- Layers shared between different test runs when using `zio.test.check` HOT 5
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from zio.