achimnol / aiotools
Idiomatic asyncio utilities
Home Page: https://aiotools.readthedocs.io
License: MIT License
At the moment, when using this project, mypy complains that there is no typing information available.
In my test file I did
from aiotools import VirtualClock
And got this when running mypy:
Skipping analyzing "aiotools": module is installed, but missing library stubs or py.typed marker [import]
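A common interim workaround, until the package ships a `py.typed` marker, is a per-module override in the mypy configuration. This is only a sketch of the standard mypy override syntax, assuming a `mypy.ini` file is used:

```ini
# mypy.ini -- suppress the missing-stubs error for aiotools only,
# instead of globally disabling import checking
[mypy-aiotools.*]
ignore_missing_imports = True
```

The proper fix is for aiotools itself to include a `py.typed` marker file in its package data so that its inline annotations are used directly.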
It seems there is an issue related to a common concurrency problem (e.g., a deadlock?).
Mostly it succeeds, but it sometimes fails on my local development environment in two ways:
- test_timer_leak_nocancel: it returns a wrong result in the line assert spawn_count == done_count  # e.g. 9 == 8
- test_timer_cancel: it sometimes does not halt.
Currently, the following 4 cases are failing on macOS since Python 3.8, as multiprocessing's default start method has changed to "spawn":
To make them work, we need to allow passing extra arguments to the user-provided main functions and rewrite the tests to use module-defined main functions so that they are picklable.
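The pickling constraint behind this can be sketched with the stdlib alone (function names below are illustrative, not from the test suite): under "spawn", multiprocessing serializes the target callable by reference, which only works for module-level functions.

```python
import pickle

# Under the "spawn" start method, multiprocessing pickles the target
# callable by reference (module + qualified name), so only functions
# defined at module level can be used as worker mains.
def module_level_main():
    return "ok"

def make_nested_main():
    # A function defined inside another function (as the current tests
    # do) has no importable qualified name and cannot be pickled.
    def nested_main():
        return "ok"
    return nested_main

# A module-level function round-trips through pickle fine.
roundtripped = pickle.loads(pickle.dumps(module_level_main))
print(roundtripped())  # → ok

# A nested function does not.
try:
    pickle.dumps(make_nested_main())
    nested_picklable = True
except (pickle.PicklingError, AttributeError):
    nested_picklable = False
print(nested_picklable)  # → False
```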
=================================== FAILURES ===================================
__________________________ test_fork_signal_fallback ___________________________

    @pytest.mark.asyncio
    async def test_fork_signal_fallback():
        with mock.patch.object(
            fork_mod, '_has_pidfd', False,
        ):
>           await _do_test_fork_signal()

tests/test_fork.py:147:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

    async def _do_test_fork_signal():
        def child():
            try:
                time.sleep(10)
            except KeyboardInterrupt:
                return 101
            return 100

        os.setpgrp()
        proc = await afork(child)
        assert proc._pid > 0
        if isinstance(proc, PidfdChildProcess):
            assert proc._pidfd > 0
        proc.send_signal(signal.SIGINT)
        ret = await proc.wait()
>       assert ret == 101
E       assert 254 == 101
E         +254
E         -101
Here is a sample gdb stack trace.
Thread 1:
Traceback (most recent call first):
  <built-in method acquire of _thread.lock object at remote 0x7f0e5dc63198>
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "/home/joongi/.pyenv/versions/venv-m4dm81nc-agent/lib/python3.6/site-packages/aiotools/server.py", line 618, in start_server
    child.join()
  File "/home/joongi/backend.ai-dev/agent-1909/src/ai/backend/agent/server.py", line 674, in main
    use_threading=True, args=(cfg, ))
  File "/home/joongi/.pyenv/versions/venv-m4dm81nc-agent/lib/python3.6/site-packages/click/decorators.py", line 17, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "/home/joongi/.pyenv/versions/venv-m4dm81nc-agent/lib/python3.6/site-packages/click/core.py", line 555, in invoke
    return callback(*args, **kwargs)
  File "/home/joongi/.pyenv/versions/venv-m4dm81nc-agent/lib/python3.6/site-packages/click/core.py", line 956, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/joongi/.pyenv/versions/venv-m4dm81nc-agent/lib/python3.6/site-packages/click/core.py", line 1114, in invoke
    return Command.invoke(self, ctx)
  File "/home/joongi/.pyenv/versions/venv-m4dm81nc-agent/lib/python3.6/site-packages/click/core.py", line 717, in main
    rv = self.invoke(ctx)
  File "/home/joongi/.pyenv/versions/venv-m4dm81nc-agent/lib/python3.6/site-packages/click/core.py", line 764, in __call__
    return self.main(*args, **kwargs)
  File "/home/joongi/backend.ai-dev/agent-1909/src/ai/backend/agent/server.py", line 687, in <module>
    sys.exit(main())
  <built-in method exec of module object at remote 0x7f0e6b435638>
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
Thread 2 to 7:
Traceback (most recent call first):
  <built-in method close of Loop object at remote 0x7f0e580015e8>
  File "/home/joongi/.pyenv/versions/venv-m4dm81nc-agent/lib/python3.6/site-packages/aiotools/server.py", line 307, in _worker_main
    loop.close()
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 884, in _bootstrap
    self._bootstrap_inner()
Thread 8:
Traceback (most recent call first):
  Waiting for the GIL
Extra logger subprocess:
Traceback (most recent call first):
  <built-in method read of module object at remote 0x7f0e6b39ce08>
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/multiprocessing/queues.py", line 94, in get
    res = self._recv_bytes()
  File "/home/joongi/backend.ai-dev/common-1909/src/ai/backend/common/logging.py", line 188, in log_worker
    rec = log_queue.get()
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/multiprocessing/popen_fork.py", line 73, in _launch
    code = process_obj._bootstrap()
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/multiprocessing/context.py", line 277, in _Popen
    return Popen(process_obj)
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/multiprocessing/context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/multiprocessing/process.py", line 105, in start
    self._popen = self._Popen(self)
  File "/home/joongi/backend.ai-dev/common-1909/src/ai/backend/common/logging.py", line 302, in __enter__
    self.proc.start()
  File "/home/joongi/backend.ai-dev/agent-1909/src/ai/backend/agent/server.py", line 660, in main
    with logger:
  File "/home/joongi/.pyenv/versions/venv-m4dm81nc-agent/lib/python3.6/site-packages/click/decorators.py", line 17, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "/home/joongi/.pyenv/versions/venv-m4dm81nc-agent/lib/python3.6/site-packages/click/core.py", line 555, in invoke
    return callback(*args, **kwargs)
  File "/home/joongi/.pyenv/versions/venv-m4dm81nc-agent/lib/python3.6/site-packages/click/core.py", line 956, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/joongi/.pyenv/versions/venv-m4dm81nc-agent/lib/python3.6/site-packages/click/core.py", line 1114, in invoke
    return Command.invoke(self, ctx)
  File "/home/joongi/.pyenv/versions/venv-m4dm81nc-agent/lib/python3.6/site-packages/click/core.py", line 717, in main
    rv = self.invoke(ctx)
  File "/home/joongi/.pyenv/versions/venv-m4dm81nc-agent/lib/python3.6/site-packages/click/core.py", line 764, in __call__
    return self.main(*args, **kwargs)
  File "/home/joongi/backend.ai-dev/agent-1909/src/ai/backend/agent/server.py", line 687, in <module>
    sys.exit(main())
  <built-in method exec of module object at remote 0x7f0e6b435638>
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
When using a real timestamp like 1709591365, the Python event loop gets stuck forever in await:
import pytest
import asyncio
from aiotools import VirtualClock

@pytest.mark.asyncio
async def test_stuck_forever():
    clock = VirtualClock()
    clock.vtime = 1709591365
    with clock.patch_loop():
        await asyncio.sleep(1)
This is due to a behavior (bug?) in the BaseEventLoop: python/cpython#116342.
The way I was able to solve it was to override the event loop's _clock_resolution to 0.1. Perhaps this should be added to the patch_loop implementation?
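The workaround can be sketched in isolation with the stdlib alone. Note that `_clock_resolution` is a private asyncio attribute set in `BaseEventLoop.__init__`, so this relies on a CPython implementation detail and may break across versions:

```python
import asyncio

# Sketch of the suggested workaround: coarsen the loop's (private)
# timer resolution so BaseEventLoop's early-firing window covers the
# rounding error introduced by very large (virtual) timestamps.
async def main():
    loop = asyncio.get_running_loop()
    loop._clock_resolution = 0.1  # implementation detail, as noted above
    await asyncio.sleep(0.01)     # still completes; timers may fire early
    return "done"

result = asyncio.run(main())
print(result)  # → done
```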
See the following snippet:
import asyncio
import aiotools

async def error():
    await asyncio.sleep(1)
    raise Exception("bad stuff")

async def main():
    try:
        async with aiotools.TaskGroup() as tg:
            tg.create_task(error())
            await asyncio.sleep(5)
    except Exception as e:
        print(f"Got inner {repr(e)}")
        raise

try:
    asyncio.run(main())
except BaseException as e:
    print(f"Got outer {repr(e)}")
Got outer CancelledError()
Is this expected behavior? It feels wrong, since a) it ate an exception and b) the task wasn't actually cancelled.
This is Python 3.9 and aiotools 1.4.0.
The timer test cases using pytest-asyncio 0.6.0 began to fail since 7ec75cf (the Travis job log).
It works fine with pytest-asyncio 0.5.0, but we need to check what's happening there.
There seem to be some side effects of using the default event loop via asyncio.get_event_loop() and recreating it once closed, as I didn't touch anything in the timer-related code. The interesting part is that the timer tests fail alternately instead of all at once.
Add a new module called "sync" which provides a set of high-level synchronization primitives not offered in the standard library.
My first idea is AsyncBarrier: when the coroutines sharing the same barrier object call wait(), each of them blocks until all of them have called wait(); once all have called wait(), they all resume execution. Preferably, the API should look like threading.Barrier.
Let's also write test suites for it.
Ideas for other synchronization primitives are welcome.
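A minimal single-use sketch of the proposed AsyncBarrier, mirroring the threading.Barrier API shape (the class and method names below are illustrative, not an aiotools API):

```python
import asyncio

class AsyncBarrier:
    """Single-use barrier sketch: wait() blocks until `parties`
    coroutines have called it, then releases them all at once."""

    def __init__(self, parties: int) -> None:
        self.parties = parties
        self._count = 0
        self._cond = asyncio.Condition()

    async def wait(self) -> int:
        async with self._cond:
            index = self._count
            self._count += 1
            if self._count == self.parties:
                self._count = 0          # last arrival releases everyone
                self._cond.notify_all()
            else:
                await self._cond.wait_for(lambda: self._count == 0)
            return index                 # arrival index, like threading.Barrier

async def demo():
    barrier = AsyncBarrier(3)
    results = []

    async def worker(n):
        await barrier.wait()             # no worker proceeds until all arrive
        results.append(n)

    await asyncio.gather(*(worker(i) for i in range(3)))
    return sorted(results)

result = asyncio.run(demo())
print(result)  # → [0, 1, 2]
```

(Since this issue was filed, Python 3.11 added asyncio.Barrier to the stdlib with essentially this API, including reuse across generations, which this single-use sketch omits.)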
The current test suite only uses the default event loop.
Let's make the test suite run with uvloop and tokio as well, by adding a "build matrix" to the existing Travis configuration and some bootstrap code for the test suite.
The PyPI sdist v1.1.0 doesn't include the LICENSE file.
sys.argv and environment variables. Add an optional restart callback which is called after terminating workers but before calling the user-defined main exit handler. Several design decisions may be required on what arguments to pass to the callbacks and when to call them. Let's discuss and implement.
Let's add a persistent task group in addition to the current aiotools.TaskGroup.
While the existing TaskGroup aims to group and ensure termination of a set of tasks with a short lifetime (e.g., a single operation composed of many async tasks), PersistentTaskGroup aims to group and ensure termination of a continuously changing set of tasks that are created and terminated over a very long time (e.g., the entire process lifecycle), with proper cancellation and exception handling upon shutdown.
One key difference from TaskGroup is that it has its own fallback exception handler, because gathering the results (and exceptions) of all tasks at once at the point of "exit" as a MultiError is meaningless due to their sparsity across time. Custom fallback exception handlers could be implemented using decorators (with functools.wraps() to keep the function signature consistent with the original coroutine).
To automatically detach terminated tasks from the tracking set, we could use weakref.WeakSet.
In fact, the above pattern is widely used in most Backend.AI components, and adding PersistentTaskGroup will reduce boilerplate for Backend.AI.
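The WeakSet-plus-fallback-handler pattern described above can be sketched as follows; this is an illustration of the proposal, not the eventual aiotools API, and all names are hypothetical:

```python
import asyncio
import weakref

class PersistentTaskGroup:
    """Sketch: track long-lived tasks in a WeakSet (finished tasks
    detach automatically) and route unhandled exceptions to a
    per-group fallback handler as they occur."""

    def __init__(self, exception_handler=None):
        self._tasks = weakref.WeakSet()
        self._handled = []
        self._exc_handler = exception_handler or self._handled.append

    def create_task(self, coro):
        task = asyncio.get_running_loop().create_task(coro)
        task.add_done_callback(self._on_done)
        self._tasks.add(task)            # WeakSet: no manual detach needed
        return task

    def _on_done(self, task):
        # Fallback handler fires per task, at the time it fails,
        # instead of gathering everything at a single exit point.
        if not task.cancelled() and task.exception() is not None:
            self._exc_handler(task.exception())

    async def shutdown(self):
        # Cancel whatever is still running and wait for all to settle.
        pending = [t for t in self._tasks if not t.done()]
        for t in pending:
            t.cancel()
        await asyncio.gather(*pending, return_exceptions=True)

async def demo():
    tg = PersistentTaskGroup()

    async def boom():
        raise RuntimeError("oops")

    async def forever():
        await asyncio.sleep(3600)

    tg.create_task(boom())
    long_task = tg.create_task(forever())
    await asyncio.sleep(0.01)            # boom() fails -> handler fires now
    await tg.shutdown()                  # forever() is cancelled here
    return [repr(e) for e in tg._handled], long_task.cancelled()

errors, cancelled = asyncio.run(demo())
print(errors, cancelled)  # → ["RuntimeError('oops')"] True
```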
This will improve stability of test runs.
I've updated https://build.opensuse.org/package/show/home:jayvdb:py-submit/python-aiotools to v1.5.3. I'm still waiting for the armv7l build to start in order to check the status of #24, but I have results back for aarch64, where it fails on Python 3.9 but not Python 3.6.
=================================== FAILURES ===================================
________________________________ test_actxgroup ________________________________

event_loop = <_UnixSelectorEventLoop running=False closed=False debug=False>

    @pytest.mark.asyncio
    async def test_actxgroup(event_loop):
        # Test basic function.
        exit_count = 0

        @aiotools.actxmgr
        async def ctx(a):
            nonlocal exit_count
            await asyncio.sleep(0)
            yield a + 10
            await asyncio.sleep(0)
            exit_count += 1

        ctxgrp = aiotools.actxgroup()
        for i in range(3):
            ctxgrp.add(ctx(i))

        async with ctxgrp as values:
            assert values[0] == 10
            assert values[1] == 11
            assert values[2] == 12
            assert len(ctxgrp._cm_yields) == 3

        assert exit_count == 3
        assert len(ctxgrp._cm_yields) == 0
        returns = ctxgrp.exit_states()
>       assert returns[0] is None
E       assert False is None

tests/test_context.py:591: AssertionError
______________________ test_actxgroup_exception_from_body ______________________

event_loop = <_UnixSelectorEventLoop running=False closed=False debug=False>

    @pytest.mark.asyncio
    async def test_actxgroup_exception_from_body(event_loop):
        exit_count = 0

        @aiotools.actxmgr
        async def ctx(a):
            nonlocal exit_count
            await asyncio.sleep(0)
            yield a
            await asyncio.sleep(0)
            # yield raises the exception from the context body.
            # If not handled, finalization will not be executed.
            exit_count += 1

        ctxgrp = aiotools.actxgroup([ctx(1), ctx(2)])
        try:
            async with ctxgrp as values:
                assert values[0] == 1
                assert values[1] == 2
                raise ZeroDivisionError
        except Exception as e:
            assert isinstance(e, ZeroDivisionError)

        exits = ctxgrp.exit_states()
>       assert exits[0] is None  # __aexit__ are called successfully
E       assert False is None

tests/test_context.py:683: AssertionError
We spawn additional subprocesses in the child workers when use_threading=True, because only the main thread can receive signals from child processes.
This line instantiates a child watcher before launching workers, as described in the Python docs.
Currently there is no explicit test case verifying that this actually works. Let's add one.
Let's add aiotools.PersistentTaskGroup.all(). (TODO: write up current discussions)
Currently, PersistentTaskGroup immediately cancels all unfinished tasks upon shutdown.
Let's add an option for a configurable timeout here, so that individual tasks have a chance to finish up before being cancelled.
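The shutdown-with-grace-period idea can be sketched with the stdlib alone (the helper name and shape are illustrative, not a proposed aiotools signature):

```python
import asyncio

async def shutdown_with_timeout(tasks, timeout: float):
    """Give pending tasks a grace period; cancel only the stragglers."""
    done, pending = await asyncio.wait(tasks, timeout=timeout)
    for t in pending:
        t.cancel()  # only tasks that missed the deadline get cancelled
    await asyncio.gather(*pending, return_exceptions=True)
    return len(done), len(pending)

async def demo():
    fast = asyncio.create_task(asyncio.sleep(0.01))   # finishes in time
    slow = asyncio.create_task(asyncio.sleep(3600))   # gets cancelled
    return await shutdown_with_timeout({fast, slow}, timeout=0.1)

result = asyncio.run(demo())
print(result)  # → (1, 1)
```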
In the standard library there is contextlib.ExitStack, which allows multiple context managers to be applied one inside the other such that a failure of an __enter__ method calls the already-entered __exit__ methods.
It would be great if this could be implemented in a way that allows both sync and async context managers to be entered, unwinding from the inside out with appropriate error handling.
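Worth noting: the stdlib already provides contextlib.AsyncExitStack (Python 3.7+), which covers this request: enter_context() for sync context managers, enter_async_context() for async ones, with LIFO (inside-out) unwinding. A minimal demonstration:

```python
import asyncio
import contextlib

events = []

@contextlib.contextmanager
def sync_cm(name):
    events.append(f"enter {name}")
    try:
        yield
    finally:
        events.append(f"exit {name}")

@contextlib.asynccontextmanager
async def async_cm(name):
    events.append(f"enter {name}")
    try:
        yield
    finally:
        events.append(f"exit {name}")

async def main():
    async with contextlib.AsyncExitStack() as stack:
        stack.enter_context(sync_cm("a"))               # sync CM
        await stack.enter_async_context(async_cm("b"))  # async CM
    return events

order = asyncio.run(main())
print(order)  # → ['enter a', 'enter b', 'exit b', 'exit a']
```

The exits run in reverse order of entry, and a failure partway through entering still unwinds whatever was already entered.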
At the top of the README.md file it says:
NOTE: This project is under early stage of development. The public APIs may break version by version.
...
trio: An alternative implementation of the asynchronous I/O stack for Python, with a focus on cancellation scopes and task groups called "nurseries".
Am I right in thinking that this project is now pretty mature and stable? And with task groups now in asyncio for python 3.11, where do you see things standing on the trio / asyncio debate?
Thanks!
Tim