Comments (7)
I don't get how there can be a difference betwee elif and break.
ready()
waits for receive()
calls to finish. If multiple finish at the same time, ready_count
is the number of items. With each iteration, when a value is read, ready_count
is decremented. ready()
will not make further receive()
calls until ready_count
is 0.
What is the benefit of dropping messages silently (code-wise, I know a log message is printed) instead of just making available unprocessed messages to the loop again if they weren't consumed in the previous loop?
Messages are not dropped under normal operation. We drop messages only if no messages have been consumed since the last call to ready
. This means that a certain receiver mentioned in the Select
constructor is not being waited on, in the if
block.
But as discussed, I will look into approaches for not dropping messages after break
.
from frequenz-channels-python.
I created #47 for the async iterable interface for ready()
.
from frequenz-channels-python.
If one uses elif instead of if, which is not intuitive when handling something that looks like a switch/case, we can again lose events.
This is not the case. When there are multiple items ready, the Select implementation uses the variable ready_count
to not pull in more messages until all previous messages have been consumed. https://github.com/frequenz-floss/frequenz-channels-python/blob/v0.x.x/src/frequenz/channels/select.py#L96
So using elif
is just fine. The implementation probably needs better documentation.
If the break is not in the handling of the last event, then events could be missed (and dropped by ready().
This is an issue, but it just means that using a Select
is not the right approach for this problem. I'd rather reimplement it like it is in the DataSourcingActor
, creating independent tasks that act on these triggers.
Another approach for this specific problem would be to not break
, but update MergeNamed to add/remove channels.
component_data_stream = MergeNamed(...)
while await select.ready():
if msg := component_data_stream:
process message
elif msg := select.subs_recv:
component_data_stream.add_receiver(name, receiver_from_somewhere)
from frequenz-channels-python.
I don't get how there can be a difference betwee elif
and break
. Is it because you are calling __getattr__()
when using elif
? In any case I think the mechanics are too complicated if we are even having this conversation.
This is an issue, but it just means that using a Select is not the right approach for this problem.
Nevertheless, it shouldn't introduce obscure bugs. Things should be as obvious as possible. What is the benefit of dropping messages silently (code-wise, I know a log message is printed) instead of just making available unprocessed messages to the loop again if they weren't consumed in the previous loop?
If a user wants to drop messages, it should be done explicitly IMHO.
from frequenz-channels-python.
I was thinking that another alternative to this is to return a list of _Selected
objects. That would make it more clear that you have several messages to process and if you don't process the whole list it is much clearer that you are dropping messages. This is what select()
and epoll_wait()
do for example, so at least for people coming from the POSIX/Linux world it could feel more natural.
What about using an iterator for this?
receiver1 = some_chan.get_receiver()
timer = channel.Timer(30.0)
select = Select(receiver1=receiver1, timer=timer)
async for selected in select.ready():
match selected.origin:
case receiver1:
process_data(selected.value)
case timer:
# something to do once every 30 seconds
pass
case _:
raise Exception("Unknown origin")
So .ready()
should return an async iterator itself, by just yield
ing for each ready message.
With this we also don't need to keep a reference to the Select
instance around, we can just do async for selected in Select(...).ready()
, or we could just add an utility function to make it even more compact, as 99% of the cases nobody will need to even know Select
is a class: async for selected in select(...)
. This also completely removes the temptation of adding the option to dynamically change the Select
instance inside the loop (like in #24).
from frequenz-channels-python.
I gave this another try on top of #42 and like in #47 I noticed the order gets messed up.
The reason is, with the current approach, the order in which each receiver is processed is set by the user, but if we switch to an async iterator interface, then the order is in the hands of asyncio.wait()
(or as_completed()
, I tried both, and they result in different ordering patterns, as as_completed()
seems to use first the first awaitable in the list, which wait()
just returns a set with a random order).
Soooooo, once more, I think now that the interface of Select with the static, user-selected ordering is what probably makes more sense, because if you are using it, you probably want to decide which receiver to process first if more than one is ready at the same time.
This said, we should definitely also document this particular behavior and design, so we don't ask this question again in the future. I will add a comment to #29 to make sure this doesn't get lost.
@sahas-subramanian-frequenz if you agree, feel free to close this as "not planned" and add the wontfix
label.
from frequenz-channels-python.
Re-opening because we decided to go for this.
from frequenz-channels-python.
Related Issues (20)
- Improve composability of receivers HOT 4
- `FileWatcher` should expose more `awatch` configuration options
- Write introductory documentation
- Many class attributes should be private or read-only
- Consolidate and make channels `name`s (`client_id` / `service_id`) optional
- Remove `Peekable`
- Remove `Bidirectional`
- Remove `MergeNamed`
- Expose `Merge` as a `merge()` function
- `Receiver.map()` doesn't preserve the original receiver's type HOT 8
- Backport timer `reset()` fix to v0.16 HOT 2
- `Timer`: Remove `periodic()` and `timeout()`
- Filewatcher is not reporting events in CIFS mounts HOT 3
- Documentation and code cleanup for 1.0 HOT 1
- Remove uses of `BaseException`
- Find a way to make using select less likely to starve a receiver HOT 1
- Channel that doesn't remove the oldest messages when receiver buffer is full HOT 2
- Implement `SyncSender` and `AsyncSender` HOT 5
- Add a periodic timer
- Refactor event type and initialization in FileWatcher
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from frequenz-channels-python.