frequenz-floss / frequenz-channels-python
Channel implementations for Python
Home Page: https://frequenz-floss.github.io/frequenz-channels-python/
License: MIT License
Right now, the `send` method doesn't prevent `None` values from being sent. It should instead raise an exception when a `None` value gets sent.
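A minimal sketch of the proposed guard, assuming a hypothetical `SendError` exception (the issue does not name the actual exception type):

```python
import asyncio
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


class SendError(Exception):
    """Raised when an invalid message is sent (hypothetical name)."""


class Sender(Generic[T]):
    """A toy sender that rejects None instead of silently accepting it."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    async def send(self, msg: Optional[T]) -> None:
        if msg is None:
            # Previously None slipped through; now we fail loudly.
            raise SendError("cannot send None through a channel")
        await self._queue.put(msg)
```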
There is no unicast lightweight channel at the moment.
Add a `Unicast` channel that can have only one sender and one receiver.
A `Broadcast` channel can be used instead, with just one receiver. We did some tests in the past and the performance gains weren't as good as expected compared to `BroadcastChannel`. It needs more thought (and maybe compelling use cases to dig further).
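For reference, a minimal unicast channel along these lines could look like this: a sketch on top of `asyncio.Queue`, not the library's actual API (the class and method names are illustrative):

```python
import asyncio
from typing import Generic, TypeVar

T = TypeVar("T")


class Unicast(Generic[T]):
    """A toy one-sender/one-receiver channel backed by a single queue."""

    def __init__(self, maxsize: int = 1) -> None:
        self._queue: asyncio.Queue = asyncio.Queue(maxsize)

    async def send(self, msg: T) -> None:
        # With exactly one receiver there is no fan-out bookkeeping,
        # which is where any performance gain over Broadcast would come from.
        await self._queue.put(msg)

    async def receive(self) -> T:
        return await self._queue.get()
```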
There might be cases when one wants to do a `Select` loop with some channels that are optional; if they are not present, the handling of those channels will just not be used. Right now there is no trivial way to do this.

Create a `Select.with_optional()` method like this (maybe incomplete and untested):
```python
def with_optional(self, **optional_receivers: Optional[AsyncIterator[Any]]) -> Select:
    # TODO: check name clashing with self._receivers
    for name, recv in optional_receivers.items():
        self._result[name] = None
        if recv is None:
            continue
        self._receivers[name] = recv
        # __anext__() can be replaced with anext() (Python >= 3.10 only)
        msg = recv.__anext__()  # pylint: disable=unnecessary-dunder-call
        self._pending.add(asyncio.create_task(msg, name=name))  # type: ignore
    return self
```
With the intended use:

```python
# chan1 can't be None, chan_none can
select = Select(chan1=chan1).with_optional(chan_none=None)
while await select.ready():
    if select.chan1:
        pass
    elif select.chan_none:  # will always be None if chan_none is None
        pass
```
Yet to be seen.

One alternative is to build the receiver mapping conditionally:

```python
chan_none = None
optional_channels = {}
if chan_none is not None:
    optional_channels["chan_none"] = chan_none
select = Select(chan1=chan1, **optional_channels)
while await select.ready():
    if select.chan1:  # OK
        pass
    elif chan_none and select.chan_none:  # OK too
        pass
```
Another option is to pass a dummy receiver to `Select` and never call send on the sender of the channel:

```python
chan_none = Broadcast[int]("chan_none").get_receiver()
select = Select(chan1=chan1, chan_none=chan_none)
while await select.ready():
    if select.chan1:  # OK
        pass
    elif select.chan_none:  # OK too, will never be True
        pass
```
We need to properly document class/instance and module attributes/properties so they appear in the generated docs. For now, how to best do this is documented in this issue:
A clean and consistent interface to create channels, and one that doesn't require passing strings used for debugging only.

Join both `client_id` and `service_id` from the `Anycast` constructor into a `name` argument to match `Broadcast`. Make `name` optional. If `name` is `None` I would just generate a `uuid` or just use `id()` (in a distributed context, `uuid` will make any unlikely collision even more unlikely).
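The defaulting could be as simple as the following sketch (the `Anycast:` prefix and helper name are made up for illustration):

```python
import uuid
from typing import Optional


def resolve_name(name: Optional[str] = None) -> str:
    """Return the given name, or generate a unique one.

    A uuid4 keeps names unique even across processes, which plain id()
    would not guarantee in a distributed context.
    """
    return name if name is not None else f"Anycast:{uuid.uuid4()}"
```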
The `Sender` class provides an async `send()` method, but most (if not all) of our senders could currently send synchronously.[^1] Being `async` means that the control flow/CPU could be taken away from the caller, which is very inconvenient for cases where sync is guaranteed.

Split `Sender` into `SyncSender` and `AsyncSender` (probably make them protocols instead of classes). `SyncSender` would provide a sync `send()` and `AsyncSender` would provide an async `asend()`.

The resampler (`Resampler` class) can also benefit from this; right now the resampling functions need to be `async` because `send()` is `async`: https://github.com/frequenz-floss/frequenz-sdk-python/blob/ce6d19b869ed26c4df1e08a332f4e4fa4cbfc199/src/frequenz/sdk/timeseries/_resampling.py#L711-L716

`send()` would drop like now and `asend()` would wait until the queue has room for the message. Currently the Data sourcing actor is just spawning tasks for sending and assuming they will eventually finish, and successfully; there is no error handling for the spawned tasks, and if they don't end, we'll be leaking dangling tasks forever. Even if we don't need `AsyncSender`s now, we will probably need them in the future, at least for this issue: #20

A sync `send()` in the Data sourcing actor:

[^1]: Actually, with the removal of `Bidirectional`, we now only have 2 types of channels: one is sync (broadcast) and the other is async (anycast). There is also an issue about having an option to make `Broadcast` senders block too.
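The split as protocols could be sketched like this (the method names `send`/`asend` come from the proposal; everything else, including the example sender, is illustrative):

```python
from typing import Protocol, TypeVar, runtime_checkable

T_contra = TypeVar("T_contra", contravariant=True)


@runtime_checkable
class SyncSender(Protocol[T_contra]):
    """A sender that is guaranteed not to yield control to the event loop."""

    def send(self, msg: T_contra) -> bool:
        """Send a message, returning False if it was dropped."""
        ...


@runtime_checkable
class AsyncSender(Protocol[T_contra]):
    """A sender that may suspend (e.g. waiting for queue room)."""

    async def asend(self, msg: T_contra) -> None:
        ...


class ListSender:
    """A trivial sync sender: appending to a list never blocks."""

    def __init__(self) -> None:
        self.sent: list = []

    def send(self, msg) -> bool:
        self.sent.append(msg)
        return True
```

With `runtime_checkable` protocols, a class satisfies the interface structurally, without inheriting from it.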
We need to have a cleaner public interface in terms of modules and where each class is exposed. Right now we have everything exposed in some internal modules (like `frequenz.channels.broadcast.Broadcast`), but what is really meant to be used is also exposed via `frequenz.channels` directly (for example `frequenz.channels.Broadcast`).

This ends up in a lot of duplication and a lot of irrelevant symbols being shown in the API documentation (https://frequenz-floss.github.io/frequenz-channels-python/next/reference/frequenz/channels/).

Hide all internal modules by prepending a `_` to the module name. I would also expose anything that is not a channel in a `util` package. So:

`frequenz.channels.`
- `Anycast`
- `Bidirectional.Handle` (declare `Handle` inside `Bidirectional`)
- `Broadcast`
- `Peekable`
- `Receiver`
- `Sender`

`frequenz.channels.util.`
- `FileWatcher`
- `Merge`
- `MergeNamed`
- `Select`
- `Timer`

Don't expose `BufferedReceiver` anymore, and maybe move it to `Broadcast`, as it doesn't look like it's being used as an interface.
The obvious alternative is to keep all symbols exposed directly from `frequenz.channels`.
Originally posted by leandro-lucarella-frequenz, November 3, 2022

Now that we have API documentation, we are exposing every module in the documentation, when the actual classes that are intended to be used by users are all imported publicly in `frequenz.channels`.

For a library this small I think it makes sense to only consider that module our public API; this will also allow us to restructure the project in a non-breaking way.

The de facto way to mark modules and packages internal is to prefix them with a `_`, so all modules and packages will have to be prefixed this way if we want to go this route.
When calling `map()` on a receiver, the returned type is `_Map`, and typing-wise it is an abstract `Receiver`. This means that when mapping a receiver that has more methods, like `.close()` for merging or `.reset()` for `Timer`, these methods are not available anymore after `.map()` is used.

I expect the type to be preserved, so the mapped receiver can be used the same way as the original receiver.

The only way to fix this is probably to have some mapping logic inside `Receiver` itself instead of returning a new type. That, or removing `.map()` and just letting users do the transformation themselves when consuming from the receiver.
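One way the "mapping logic inside `Receiver` itself" idea could look, as a sketch with made-up method names (note this simplification stores the transform on the instance, so it can no longer change the message type):

```python
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class Receiver(Generic[T]):
    def __init__(self) -> None:
        self._transform: Optional[Callable[[T], T]] = None

    def map(self, fn: Callable[[T], T]) -> "Receiver[T]":
        # Store the transform in-place instead of wrapping in _Map,
        # so the concrete receiver type (and its extra methods) survives.
        self._transform = fn
        return self

    def _apply(self, raw: T) -> T:
        return self._transform(raw) if self._transform else raw


class Timer(Receiver[float]):
    def __init__(self) -> None:
        super().__init__()
        self.resets = 0

    def reset(self) -> None:
        self.resets += 1
```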
The current timer fires `interval` time after the `ready()` method was called, but this is not suitable for periodic timers, as it will accumulate timer drift over time. We need a timer that will periodically fire every `interval` time.

Extend `Timer` to have a `periodic` option, or create a new `PeriodicTimer` that will start when the timer is instantiated and will fire periodically every `interval` time, no matter if `ready()` was called or not. If the timer fired several times before `ready()`/`consume()` was called, those fires should be queued and returned eventually.

The resampler needs this. One could use the current `Timer` and account for the time drift oneself, but for that we would need to create a new timer for each period, which is wasteful in terms of resources (allocations) and inconvenient in terms of programming effort.
When a broadcast receiver is not getting consumed fast enough and its buffer has filled up, older messages in the receiver's buffer start to get dropped as new messages arrive. In some cases we might not want messages to get discarded, and instead want the sender to wait for the receiver to drain before it can send further messages to the channel.

Add an optional `block_on_full` boolean parameter to the `get_sender` method of the `Broadcast` channel, defaulting to `False`. When true, calls to the `send` async method will not complete until the value has been sent to all the receivers.
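The two policies can be contrasted with a plain `asyncio.Queue` (a sketch; the real broadcast sender fans out to many receiver buffers):

```python
import asyncio
from typing import Any


async def send(queue: asyncio.Queue, msg: Any, block_on_full: bool = False) -> None:
    if block_on_full:
        # Back-pressure: wait until the receiver drains the queue.
        await queue.put(msg)
        return
    if queue.full():
        # Current behaviour: drop the oldest message to make room.
        queue.get_nowait()
    queue.put_nowait(msg)
```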
The names are just too confusing, and we'll never find names that can convey all the intricacies of timers in the async world. I'll probably leave the docs about both methods as examples of how to use the timer to achieve different goals.

Remove `periodic()` and `timeout()` and force users to pass the missing-ticks policies manually.
There is not much use in having `Merge` as a class from a user point of view; users normally only care about merging some channels, so having a class just makes the syntax weirder for the most common use case, which is `async for msg in merge(...)`.

Make `Merge` private (`_Merge`) and create a new function that just does `return _Merge(...)`.
Generate docs for the v0.10.0 release so they are available. Backport the changes in #25 to the v0.10.x branch and make a v0.10.1 release.
When we iterate over `select` without consuming a previously selected value, select raises a `SelectError` that derives from `BaseException`.

The use of `BaseException`, combined with tasks consuming exceptions, makes it hard to identify that anything happened, because a broad `except Exception:` block doesn't catch `BaseException`s. The use of `BaseException` in user code is not recommended according to the Python docs: https://docs.python.org/3/library/exceptions.html#BaseException

An `except Exception:` block should catch `select` errors, i.e. `SelectError`s should derive from `Exception` and not `BaseException`.

Synchronization of multiple sources (`select()`, `merge()`, etc.) (part:synchronization)
We need to have more flexibility to configure a `FileWatcher`. At the moment all watching is recursive and there is no way to set one's own filter and other options that are very useful to have.

I see mainly 2 options that we might need to explore in more depth:

1. Make `FileWatcher` just glue to use `awatch` with channels. This means we take an `awatch` instance in the `FileWatcher` configuration, and we also use `awatch`'s own event types, etc. This allows for full flexibility and zero(ish) maintenance cost for updating to new versions of `awatch` that could provide even more options. The downside is that stopping can't be taken care of by the `FileWatcher`; users will have to provide an event themselves to stop, if they need to stop the receiver. Or maybe we can just use `Task.cancel()` to stop it.

2. Have `FileWatcher` expose more `awatch` options in the `__init__`. This means that we take in the `__init__` options like `watch_filter`, `debounce`, `recursive`, and others that will just be forwarded to `awatch`. The downside is that we always need to update `FileWatcher` if more `awatch` features are added, and we need to create/maintain wrappers for other `awatch` types if we want to keep the implementation hidden.

Since we are coupling `FileWatcher` with `awatch` anyway, and using a file watcher is not something that needs to be done very often (it is still a lowish-level construct) and has many things to consider, I lean more towards the first approach.

The main use case for now is the `ConfigManagingActor` in the SDK. With the current `FileWatcher` implementation, the config manager needs to watch a whole directory recursively just to watch one individual file, and do the filtering manually, which is not ideal (and made it difficult to understand what was going on).

Use `awatch` directly or do some of the work manually.

We are considering changing the underlying library, so we should have this in mind before addressing this issue:
`Select` is supposed to be used in very specific cases; it is not really intended to be used everywhere as a general event loop, since when using `async` there is already an event loop. Especially for people not used to `async`, it might be tempting to use `Select` everywhere, leading to many issues: some of them can really be improved upon, but some only come from the fact that `Select` is being misused.

Example issues that came from misusing `Select`:

Improve the `Select` documentation to mention it should only be used in very niche situations. When receiving from multiple channels, normally it should be enough to create one task to handle each receiver, or to use `Merge` or `MergeNamed` when multiple channels transport the same message type.
For example, we can find this code in the SDK:
```python
while await select.ready():
    if _ := select.resampling_timer:
        awaitables = [
            self._output_senders[channel_name].send(sample)
            for channel_name, sample in self._resampler.resample()
        ]
        await asyncio.gather(*awaitables)
    if msg := select.component_data_receiver:
        if msg.inner is None:
            # When this happens, then DataSourcingActor has closed the channel
            # for sending data for a specific `ComponentMetricRequest`,
            # which may need to be handled properly here, e.g. unsubscribe
            continue
        channel_name, sample = msg.inner
        if self.is_sample_valid(sample=sample):
            self._resampler.add_sample(
                time_series_id=channel_name,
                sample=sample,
            )
    if msg := select.subscription_receiver:
        if msg.inner is None:
            raise ConnectionError(
                "Subscription channel connection has been closed!"
            )
        await self.subscribe(request=msg.inner)
        # Breaking out from the loop is required to regenerate
        # component_data_receivers to be able to fulfil this
        # subscription (later can be optimized by checking if
        # an output channel already existed in the `subscribe()` method)
        break
```
Here `Select` should not be used at all. There should be a task for the `resampling_timer` like:
```python
async def resample_periodically(resampling_timer):
    async for timestamp in resampling_timer:
        awaitables = [
            self._output_senders[channel_name].send(sample)
            for channel_name, sample in self._resampler.resample()
        ]
        await asyncio.gather(*awaitables)
```
Then similarly another task for `component_data_receiver`, or one task per resampler (and the resampler could even handle that task itself). Finally, the main loop should only handle the `subscription_receiver`; an async iterator is enough:

```python
async for request in subscription_receiver:
    await self.subscribe(request)
```

Then we don't need a `break` to rebuild the `component_data_receiver`, and the code becomes much more sequential and simpler to read and understand.
So as can be seen here, misunderstanding `Select` can even lead to wrong design decisions.

`Select` should be used only in cases where there are interactions between different channels (and they are not all of the same type, so it is not enough to group them using `Merge` or `MergeNamed`). For example, say we want to call `set_bounds()` each time a `bounds` message arrives, but if no message arrives in 3 seconds, we want to call it too:
```python
timer = Timer(3.0)
select = Select(bounds=bounds_recv, timer=timer)
latest_bounds = {}
while await select.ready():
    if msg := select.bounds:
        latest_bounds[msg.inner.name] = msg.inner.value
        set_bounds(latest_bounds)
        timer.reset()
    elif select.timer:
        set_bounds(latest_bounds)
```
Here `bounds` and `timer` are interacting.

We should also document that giving the user control over the order in which multiple ready receivers are processed is part of the design of the interface. We also considered using an async iterator interface, but then the order in which multiple ready receivers are processed is in the hands of `asyncio.wait()` (or `as_completed()`; we tried both, and they result in different ordering patterns: `as_completed()` seems to favor the first awaitable in the list, and `wait()` just returns a set with a random order). This can also be bad for processing fairness, as we might end up with a very active receiver that always ends up being the first ready receiver in the async iterator interface, leading to starvation of all other receivers.

The alternative should probably be documented only in the code, but the user documentation should include and make very clear that `Select` should be used when this ordering should be controlled by the user.

See #27 for more details.
Currently, if `elif` or `break` is used when handling ready receivers, it could starve the receivers that come after it, which is not a good thing. Ideally we should find a way to either forbid the use of `elif` and `break`, or revisit #27.
Originally posted by @leandro-lucarella-frequenz in #62 (comment)
It would be nice for `select()` to be more flexible (add and remove receivers, have optional receivers) without losing its static-ness.

Improve the composability of existing receivers and create a new one that can group receivers (maybe reusing `_Merge` to do that), can dynamically add and remove receivers, and just waits forever (it's never ready) when there are no receivers.

I think many cases where users need to handle lots of tasks manually to cope with more dynamic situations, like the resampler, could be greatly simplified if they could be handled automatically by using a more flexible select.
Break the select loop, use tasks manually.
This would fix a few open issues / discussions:
While we are introducing our own exceptions, I would also consider adding a specific exception for this (maybe `ReceiverInactiveError`?); `EOFError` is only used by `input()`, so it feels a bit weird to raise that exception here.
Originally posted by @leandro-lucarella-frequenz in #42 (comment)
Now that it is released, we should use the final version instead of `-dev`. It might also be a good idea to add an environment variable with the Python version to use for steps that don't use a matrix and need Python.
Right now the `FileWatcher` only returns the path of the changed file; it doesn't say whether the change was a creation, modification, or deletion, which might be useful information to have.
Originally posted by @shsms in #51 (comment)
Users should be able to send any value through a channel (or at least any serializable value, if at some point channels can go over the wire). This includes `None`.

Allow sending `None` by raising an exception instead of returning `None` if a channel is closed.
Wrapping the `None` in another type (like `(None,)`).
Examples here: https://github.com/frequenz-floss/frequenz-api-microgrid
When using `Select.ready()` it is a bit unexpected that more than one event could be received. For example:
```python
while await select.ready():
    if _ := select.resampling_timer:
        awaitables = [
            self._output_senders[channel_name].send(sample)
            for channel_name, sample in self._resampler.resample()
        ]
        await asyncio.gather(*awaitables)
    if msg := select.component_data_receiver:
        if msg.inner is None:
            # When this happens, then DataSourcingActor has closed the channel
            # for sending data for a specific `ComponentMetricRequest`,
            # which may need to be handled properly here, e.g. unsubscribe
            continue
        channel_name, sample = msg.inner
        if self.is_sample_valid(sample=sample):
            self._resampler.add_sample(
                time_series_id=channel_name,
                sample=sample,
            )
    if msg := select.subscription_receiver:
        if msg.inner is None:
            raise ConnectionError(
                "Subscription channel connection has been closed!"
            )
        await self.subscribe(request=msg.inner)
        # Breaking out from the loop is required to regenerate
        # component_data_receivers to be able to fulfil this
        # subscription (later can be optimized by checking if
        # an output channel already existed in the `subscribe()` method)
        break
```
This is a real example from the SDK: https://github.com/frequenz-floss/frequenz-sdk-python/blob/1e01c2120cb96d703d3c3f6bb1fe1b738fa439d8/src/frequenz/sdk/data_ingestion/resampling/component_metrics_resampling_actor.py#L211-L240
This code is correct now, but there are 2 bugs that are very easy to introduce when handling a `ready()` result, and they cause data loss:

1. If the `break` is not in the handling of the last event, then events could be missed (and dropped by `ready()`).
2. If `elif` is used instead of `if` (which is not intuitive when handling something that looks like a `switch`/`case`), we can again lose events.

The most common pattern to handle event loops (which `Select.ready()` looks a lot like) is to receive one event at a time. Also, if we do this, we can enable the use of structural pattern matching via `match`.

Return only one ready event in `.ready()`.
I've seen these 2 mistakes made very often, and even found one in the channels documentation itself:

frequenz-channels-python/src/frequenz/channels/utils/timer.py, lines 26 to 34 in d4a3793

This shows how easy these mistakes are to make: even developers of the library itself make them. The only workaround I see is being super careful, and it is proven that that doesn't work :)

We can see if we can move back the pending tasks that were returned by `asyncio.wait()`, and if that doesn't work, we can keep an internal list of done tasks and return those before actually calling `asyncio.wait()` again. Not sure how complicated `asyncio.wait()` is, but unless it is just a few lines when there is a done task already, the latter should be more efficient CPU-wise (and less efficient memory-wise).
We already have 2 ways to join multiple receivers: `Merge` and `select`. `MergeNamed` is a mixture of both, so it is redundant.

Remove `MergeNamed`.
The code unnecessarily checks the type of the `changes` returned by `watchfiles` (whether it's `None` and then whether it has 2 elements):

frequenz-channels-python/src/frequenz/channels/utils/file_watcher.py, lines 75 to 76 in 4cceb39

But looking more closely at the watchfiles documentation, the function yields a `Set[FileChange]`, `FileChange` being a `Tuple[Change, str]`. This check is unnecessary and should be removed (and if we were to keep it, it should be an `assert`ion, as the code would be really broken if the type happened to be different).
Utility receivers, `Merge`, `Timer`, etc. (part:receivers)
Now, when the receiver buffer is full and a new message comes in, the oldest message is removed. But in some cases it is important that each request is received and processed. One such use case is when we request a subscription to metric data: if we have many components/metrics and we subscribe to all data at startup, then the default receiver buffer size is not enough. The oldest messages are removed and the application doesn't work properly because we don't receive data.

There are two possible solutions:

1. We could have a special kind of blocking channel. If a sender tries to send a message but the receiver buffer is full, then we should block the sender until the receiver buffer has free space. For implementation simplicity it is enough if the channel has many senders and only one receiver (this covers all our current use cases).
2. We could have an option to dynamically increase the buffer size, but in that case there might be more and more requests, and if the receiver is unable to process them in time, memory use might grow too much.

For now the only use case is subscribing to component metric data.
`Peekable` is a very niche and confusing interface, so it should be avoided.

Remove `Peekable`.
Depends on:
A lot of cloud use cases for `FileWatcher` need it to monitor NFS/CIFS mounts, which don't emit inotify events on remote updates. So our `FileWatcher`, which uses the `watchfiles` library, doesn't emit events either. An alternative is for the `FileWatcher` to use the watchdog library, which also has a polling observer.

FS events triggered remotely should also produce a message from the `FileWatcher`.
Utility receivers, `Event`, `FileWatcher`, `Timer`, etc. (part:receivers)
If we do this, the usage can be much more idiomatic:
```python
receiver1 = some_chan.get_receiver()
timer = channel.Timer(30.0)
select = Select(receiver1=receiver1, timer=timer)
async for selected in select.ready():
    match selected.origin:
        case r if r is receiver1:  # a bare `case receiver1:` would only capture
            process_data(selected.value)
        case t if t is timer:
            # something to do once every 30 seconds
            pass
        case _:
            raise Exception("Unknown origin")
```
So `.ready()` should return an async iterator itself, by just `yield`ing for each ready message.

With this we also don't need to keep a reference to the `Select` instance around; we can just do `async for selected in Select(...).ready()`, or we could add a utility function to make it even more compact, as in 99% of the cases nobody will need to even know `Select` is a class: `async for selected in select(...)`. This also completely removes the temptation of adding the option to dynamically change the `Select` instance inside the loop (like in #24).

This also sort of depends on #27, as breaking from the loop would feel even more natural, and would cause data loss in that case.
Originally posted by @leandro-lucarella-frequenz in #27 (comment)
This issue:
The timer resetting properly.
v0.15+
Utility receivers, `Event`, `FileWatcher`, `Timer`, etc. (part:receivers)
We need this backported to v0.16 because the SDK v0.25 uses it.
Change the `event_type` type hint to remove `None` and use the default-init `frozenset(FileWatcher.EventType)` instead. Then the conditional check for `None` on `event_type` can be removed.

See review comment #87 (comment) for further details.
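The pattern works because a `frozenset` is immutable, so it is safe as a default value. A sketch using the `FileWatcher.EventType` names from the issue, redeclared here to keep the example self-contained:

```python
from enum import Enum
from typing import FrozenSet


class EventType(Enum):
    CREATE = "create"
    MODIFY = "modify"
    DELETE = "delete"


def watch(
    event_types: FrozenSet[EventType] = frozenset(EventType),
) -> FrozenSet[EventType]:
    """No None sentinel needed: the default already means "all event types"."""
    return event_types
```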
We need documentation to introduce new users to channels. Write a document that explains the basic channel concepts:

It should also mention common pitfalls, like that messages should be small and serializable even though there is no networking support yet.
We have a lot of class attributes that are public, so users can mess with them even when it is not safe to do so. Just as an example, the `Anycast` channel has:

```python
self.limit: int = maxsize  # if changed, the deque is not updated
self.deque: Deque[T] = deque(maxlen=maxsize)  # a user could add or remove items without using the condition variables
self.send_cv: Condition = Condition()  # users could trigger sends when the deque is full, dropping messages unexpectedly
self.recv_cv: Condition = Condition()
self.closed: bool = False  # users could mark the channel as open after it was closed
```
Review all class attributes and make them private or read-only as appropriate.
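The usual remedy is a `_`-prefixed attribute plus a read-only `property`. A sketch for `limit` (not the library's actual code):

```python
from collections import deque
from typing import Deque


class Anycast:
    def __init__(self, maxsize: int) -> None:
        self._limit = maxsize
        self._deque: Deque = deque(maxlen=maxsize)

    @property
    def limit(self) -> int:
        """The channel's buffer size (read-only from the outside)."""
        return self._limit
```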
Using bidirectional channels is not a recommended practice and should not be encouraged by this library.

Remove `Bidirectional`.
`Receiver`s are not getting destroyed as soon as they go out of scope, because channel implementations hold a reference to the `Receiver`s so that they can put new messages into them. So when the task that created a receiver dies, the receiver doesn't; nobody is reading from it, and the logs get flooded with:

    Broadcast receiver [chan_name] is full. Oldest message was dropped.

`Receiver`s that go out of scope should get cleaned up.
Channels, `Broadcast`, `Bidirectional`, etc. (part:channels)
The solution appears to be to use a weakref to hold on to the receivers in the channel implementations, and the `send` methods should check and clean up any weakrefs to destroyed receivers.
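A sketch of the weakref approach (illustrative; the real channel is async and more careful):

```python
import weakref
from typing import Any, List


class Receiver:
    def __init__(self) -> None:
        self.messages: list = []


class Broadcast:
    def __init__(self) -> None:
        self._receivers: List[weakref.ref] = []

    def new_receiver(self) -> Receiver:
        recv = Receiver()
        # Hold only a weak reference so an out-of-scope receiver can be
        # garbage collected instead of accumulating dropped messages forever.
        self._receivers.append(weakref.ref(recv))
        return recv

    def send(self, msg: Any) -> None:
        alive = []
        for ref in self._receivers:
            recv = ref()
            if recv is None:
                continue  # receiver was destroyed; drop the stale ref
            recv.messages.append(msg)
            alive.append(ref)
        self._receivers = alive
```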
It looks like ordering is important here, and I don't think `set` preserves order. Also, why do we want to compress `FileChange`s that are the same? Suppose we have this sequence:

1. `/a` is added
2. `/a` is deleted
3. `/a` is added again

With this implementation we could get only one addition and one deletion, so we would think the file doesn't exist, but it does. If this introduced a bug that wasn't detected by tests, we should probably add a test for this too :)
Originally posted by @leandro-lucarella-frequenz in #42 (comment)
A way to pass errors via channels.

One way to pass exceptions through channels would be to make the channels aware of exceptions: if an exception instance is received via a channel, just raise it. One problem with this approach is that using `Select()` then becomes less clear, as we would need to wrap `select.ready()` in a `try`/`except` block, which dissociates the error from the channel it appeared on. It might be nicer to have a way to keep the handling of a particular channel (both good results and errors) in the same block. But to get the data, we seem to always call `.inner` (not sure why), so that could be a good place to plug in a check for the result sent to the channel and `raise` if it was an `Exception`.

`Bidirectional` channels often need to report errors, for example if a request couldn't be fulfilled. Any error that comes up in a sender, for which it might not be able to produce a proper message, could also be sent to the receiver to let them know about the failure.

Define a message that is a `Union` of two types: the success message and an error message.
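That union could look like the following sketch (all names are made up for illustration):

```python
from dataclasses import dataclass
from typing import Union


@dataclass
class Success:
    value: int


@dataclass
class Failure:
    reason: str


# The message type carried by the channel: either outcome, explicitly.
Message = Union[Success, Failure]


def unwrap(msg: Message) -> int:
    """Raise on the receiving side if the sender reported an error."""
    if isinstance(msg, Failure):
        raise RuntimeError(msg.reason)
    return msg.value
```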