aiotools's People

Contributors

achimnol, banzaiman, bdowning, greenyant, pre-commit-ci[bot], samskiter, srevinsaju, youknowone


aiotools's Issues

Update test cases to work with multiprocessing start_method = "spawn"

Currently, the following four cases have been failing on macOS since Python 3.8, as multiprocessing's default start method has changed to "spawn":

  • tests/test_server.py:414 test_server_user_main_custom_stop_signals
  • tests/test_server.py:450 test_server_user_main_tuple
  • tests/test_server.py:508 test_server_extra_proc
  • tests/test_server.py:543 test_server_extra_proc_custom_stop_signal

To make them work, we need to allow passing extra arguments to the user-provided main functions and rewrite the tests to use module-level main functions so that they are picklable.
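For context, the picklability constraint can be sketched as follows: under the "spawn" start method, multiprocessing must pickle the target callable to send it to a freshly started child interpreter, and only module-level functions are picklable by reference. (The function names below are illustrative, not the actual test code.)

```python
import pickle

# Module-level functions are picklable by reference, so "spawn" can
# send them to a freshly started child interpreter.
def module_level_main():
    return 42

def make_nested_main():
    # Functions defined inside another function are NOT picklable,
    # which is why tests using inner main functions break once
    # multiprocessing defaults to "spawn".
    def nested_main():
        return 42
    return nested_main

pickle.dumps(module_level_main)  # succeeds

try:
    pickle.dumps(make_nested_main())
except (pickle.PicklingError, AttributeError):
    print("nested function is not picklable")
```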

test_fork_signal_fallback fails on armv7l

=================================== FAILURES ===================================
__________________________ test_fork_signal_fallback ___________________________

    @pytest.mark.asyncio
    async def test_fork_signal_fallback():
        with mock.patch.object(
            fork_mod, '_has_pidfd', False,
        ):
>           await _do_test_fork_signal()

tests/test_fork.py:147: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    async def _do_test_fork_signal():
    
        def child():
            try:
                time.sleep(10)
            except KeyboardInterrupt:
                return 101
            return 100
    
        os.setpgrp()
        proc = await afork(child)
        assert proc._pid > 0
        if isinstance(proc, PidfdChildProcess):
            assert proc._pidfd > 0
        proc.send_signal(signal.SIGINT)
        ret = await proc.wait()
>       assert ret == 101
E       assert 254 == 101
E         +254
E         -101

Rare deadlock upon shutdown

Here is a sample gdb stack trace.

Thread 1:

Traceback (most recent call first):
  <built-in method acquire of _thread.lock object at remote 0x7f0e5dc63198>
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "/home/joongi/.pyenv/versions/venv-m4dm81nc-agent/lib/python3.6/site-packages/aiotools/server.py", line 618, in start_server
    child.join()
  File "/home/joongi/backend.ai-dev/agent-1909/src/ai/backend/agent/server.py", line 674, in main
    use_threading=True, args=(cfg, ))
  File "/home/joongi/.pyenv/versions/venv-m4dm81nc-agent/lib/python3.6/site-packages/click/decorators.py", line 17, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "/home/joongi/.pyenv/versions/venv-m4dm81nc-agent/lib/python3.6/site-packages/click/core.py", line 555, in invoke
    return callback(*args, **kwargs)
  File "/home/joongi/.pyenv/versions/venv-m4dm81nc-agent/lib/python3.6/site-packages/click/core.py", line 956, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/joongi/.pyenv/versions/venv-m4dm81nc-agent/lib/python3.6/site-packages/click/core.py", line 1114, in invoke
    return Command.invoke(self, ctx)
  File "/home/joongi/.pyenv/versions/venv-m4dm81nc-agent/lib/python3.6/site-packages/click/core.py", line 717, in main
    rv = self.invoke(ctx)
  File "/home/joongi/.pyenv/versions/venv-m4dm81nc-agent/lib/python3.6/site-packages/click/core.py", line 764, in __call__
    return self.main(*args, **kwargs)
  File "/home/joongi/backend.ai-dev/agent-1909/src/ai/backend/agent/server.py", line 687, in <module>
    sys.exit(main())
  <built-in method exec of module object at remote 0x7f0e6b435638>
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)

Threads 2 to 7:

Traceback (most recent call first):
  <built-in method close of Loop object at remote 0x7f0e580015e8>
  File "/home/joongi/.pyenv/versions/venv-m4dm81nc-agent/lib/python3.6/site-packages/aiotools/server.py", line 307, in _worker_main
    loop.close()
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 884, in _bootstrap
    self._bootstrap_inner()

Thread 8:

Traceback (most recent call first):
  Waiting for the GIL

Extra logger subprocess:

Traceback (most recent call first):
  <built-in method read of module object at remote 0x7f0e6b39ce08>
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/multiprocessing/queues.py", line 94, in get
    res = self._recv_bytes()
  File "/home/joongi/backend.ai-dev/common-1909/src/ai/backend/common/logging.py", line 188, in log_worker
    rec = log_queue.get()
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/multiprocessing/popen_fork.py", line 73, in _launch
    code = process_obj._bootstrap()
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/multiprocessing/context.py", line 277, in _Popen
    return Popen(process_obj)
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/multiprocessing/context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/multiprocessing/process.py", line 105, in start
    self._popen = self._Popen(self)
  File "/home/joongi/backend.ai-dev/common-1909/src/ai/backend/common/logging.py", line 302, in __enter__
    self.proc.start()
  File "/home/joongi/backend.ai-dev/agent-1909/src/ai/backend/agent/server.py", line 660, in main
    with logger:
  File "/home/joongi/.pyenv/versions/venv-m4dm81nc-agent/lib/python3.6/site-packages/click/decorators.py", line 17, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "/home/joongi/.pyenv/versions/venv-m4dm81nc-agent/lib/python3.6/site-packages/click/core.py", line 555, in invoke
    return callback(*args, **kwargs)
  File "/home/joongi/.pyenv/versions/venv-m4dm81nc-agent/lib/python3.6/site-packages/click/core.py", line 956, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/joongi/.pyenv/versions/venv-m4dm81nc-agent/lib/python3.6/site-packages/click/core.py", line 1114, in invoke
    return Command.invoke(self, ctx)
  File "/home/joongi/.pyenv/versions/venv-m4dm81nc-agent/lib/python3.6/site-packages/click/core.py", line 717, in main
    rv = self.invoke(ctx)
  File "/home/joongi/.pyenv/versions/venv-m4dm81nc-agent/lib/python3.6/site-packages/click/core.py", line 764, in __call__
    return self.main(*args, **kwargs)
  File "/home/joongi/backend.ai-dev/agent-1909/src/ai/backend/agent/server.py", line 687, in <module>
    sys.exit(main())
  <built-in method exec of module object at remote 0x7f0e6b435638>
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/home/joongi/.pyenv/versions/3.6.7/lib/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)

Event loop gets stuck when VirtualClock is used with real timestamps

When using real timestamps such as 1709591365, the Python event loop gets stuck forever in the await:

import pytest
import asyncio
from aiotools import VirtualClock

@pytest.mark.asyncio
async def test_stuck_forever():
    clock = VirtualClock()
    clock.vtime = 1709591365
    with clock.patch_loop():
        await asyncio.sleep(1)

This is due to a behavior (bug?) in BaseEventLoop; see python/cpython#116342.

The way I was able to solve it was to override the event loop's _clock_resolution to 0.1. Perhaps this should be added to the patch_loop implementation?
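The workaround described above can be sketched as follows. Note that _clock_resolution is a private attribute of BaseEventLoop, so this relies on CPython implementation details:

```python
import asyncio

async def main():
    # Force a coarser clock resolution on the running loop so that
    # BaseEventLoop's timer scheduling does not misbehave with huge
    # virtual timestamps. 0.1 is the value suggested in this issue.
    loop = asyncio.get_running_loop()
    loop._clock_resolution = 0.1  # private attribute: implementation detail
    await asyncio.sleep(0.01)
    return loop._clock_resolution

asyncio.run(main())
```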

`TaskGroup` propagates `CancelledError` rather than a `MultiError` on exception

See the following snippet:

import asyncio
import aiotools

async def error():
    await asyncio.sleep(1)
    raise Exception("bad stuff")

async def main():
    try:
        async with aiotools.TaskGroup() as tg:
            tg.create_task(error())
            await asyncio.sleep(5)
    except Exception as e:
        print(f"Got inner {repr(e)}")
        raise

try:
    asyncio.run(main())
except BaseException as e:
    print(f"Got outer {repr(e)}")

This prints:

Got outer CancelledError()

Is this expected behavior? It feels wrong, since (a) it ate an exception and (b) the task wasn't actually cancelled.

This is Python 3.9 and aiotools 1.4.0.

Fix regression in timer test cases with latest pytest-asyncio

pytest-dev/pytest-asyncio#54

The timer test cases began to fail with pytest-asyncio 0.6.0 since commit 7ec75cf (see the Travis job log).
They work fine with pytest-asyncio 0.5.0, so we need to check what is happening there.

There seem to be some side effects from using the default event loop via asyncio.get_event_loop() and recreating it once closed, as I didn't touch anything in the timer-related code. The interesting part is that the timer tests fail alternately instead of all at once.

New module: sync

Add a new module called "sync" which provides a set of high-level synchronization primitives not offered in the standard library.

My first idea is AsyncBarrier: when coroutines sharing the same barrier object call wait(), each one blocks until all of them have called wait(); once the last one arrives, they all resume execution. Preferably, the API should look like threading.Barrier.
Let's also write a test suite for it.

Ideas for other synchronization primitives are welcome.
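A minimal single-use sketch of the proposed primitive might look like this, built on asyncio.Event. The names here are illustrative, not aiotools' actual API, and a real implementation would need to be reusable and handle aborts like threading.Barrier:

```python
import asyncio

class AsyncBarrier:
    """Single-use barrier sketch: all parties block in wait() until
    the last party arrives, mirroring the threading.Barrier idea."""

    def __init__(self, num_parties: int) -> None:
        self.num_parties = num_parties
        self._count = 0
        self._event = asyncio.Event()

    async def wait(self) -> int:
        arrival_index = self._count
        self._count += 1
        if self._count == self.num_parties:
            self._event.set()  # the last arrival releases everyone
        else:
            await self._event.wait()
        return arrival_index

async def demo():
    barrier = AsyncBarrier(3)
    results = await asyncio.gather(*(barrier.wait() for _ in range(3)))
    return sorted(results)

print(asyncio.run(demo()))  # → [0, 1, 2]
```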

server: Add uvloop/tokio to test matrix

The current test suite only uses the default event loop.
Let's make the test suite run with uvloop and tokio as well, by adding a build matrix to the existing Travis configuration and some bootstrap code for the test suite.

server: graceful reload and restart via SIGHUP/SIGUSR1 signals

  • Graceful reload: when the main program receives SIGUSR1, let it shut down all workers and extra processes, call a user-written restart callback, and re-spawn all workers and extra processes. The restart callback is intended to re-read the (changed) configuration.
  • Graceful restart: when the main program receives SIGHUP, let it terminate completely and re-execute the whole process again, preferably with the same sys.argv and environment variables. Add an optional restart callback which is called after terminating workers but before calling the user-defined main exit handler.

Several design decisions may be required about which arguments to pass to the callbacks and when to call them. Let's discuss and implement.
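The re-exec half of "graceful restart" could be sketched like this, under the assumption that the main program simply replaces itself with the original sys.argv and environment once workers are torn down. The handler name is hypothetical:

```python
import os
import signal
import sys

def handle_sighup(signum, frame):
    # ... terminate workers and run the user's exit handler here ...
    # Replace the current process image, preserving argv and environ.
    os.execv(sys.executable, [sys.executable] + sys.argv)

# Install the handler in the main program (main thread only).
signal.signal(signal.SIGHUP, handle_sighup)
```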

Persistent Task Group

Let's add a persistent task group in addition to the current aiotools.TaskGroup.

While the existing TaskGroup targets grouping and ensuring termination of a set of short-lived tasks (e.g., a single operation composed of many async tasks), PersistentTaskGroup targets grouping and ensuring termination of a continuously changing set of tasks that are created and terminated over a very long time (e.g., the entire process lifecycle), with proper cancellation and exception handling upon shutdown.

One key difference from TaskGroup is that it has its own fallback exception handler, because gathering the results (and exceptions) of all tasks at once at the point of "exit" as a MultiError is meaningless due to their sparsity across time. Custom fallback exception handlers could be implemented using decorators (with functools.wraps() to keep the function signature consistent with the original coroutine).

To automatically detach terminated tasks from the tracking set, we could use weakref.WeakSet.

In fact, the above pattern is widely used in most Backend.AI components, and adding PersistentTaskGroup will reduce boilerplates for Backend.AI.
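The pattern described above could be sketched as follows. This is an illustrative sketch, not aiotools' actual implementation; the class and method names are hypothetical:

```python
import asyncio
import weakref

class PersistentTaskGroupSketch:
    """Tracks long-lived tasks in a WeakSet so finished tasks detach
    automatically, and applies a per-task fallback exception handler
    instead of aggregating all exceptions at exit."""

    def __init__(self, exception_handler=None):
        self._tasks = weakref.WeakSet()
        self._exception_handler = exception_handler or self._default_handler

    @staticmethod
    def _default_handler(exc: BaseException) -> None:
        print(f"unhandled task exception: {exc!r}")

    def create_task(self, coro):
        task = asyncio.ensure_future(coro)
        task.add_done_callback(self._on_done)
        self._tasks.add(task)
        return task

    def _on_done(self, task):
        # Route failures to the fallback handler; ignore cancellations.
        if not task.cancelled() and task.exception() is not None:
            self._exception_handler(task.exception())

    async def shutdown(self):
        pending = [t for t in self._tasks if not t.done()]
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)

async def demo():
    caught = []
    tg = PersistentTaskGroupSketch(exception_handler=caught.append)
    async def boom():
        raise ValueError("boom")
    tg.create_task(boom())
    await asyncio.sleep(0.01)  # let the task run and its callback fire
    await tg.shutdown()
    return [type(e).__name__ for e in caught]

print(asyncio.run(demo()))  # → ['ValueError']
```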

Two test_actxgroup failures on ARM aarch64

I've updated https://build.opensuse.org/package/show/home:jayvdb:py-submit/python-aiotools to v1.5.3. I'm still waiting for the armv7l build to start in order to check the status of #24, but have results back for aarch64, where it fails on Python 3.9 but not Python 3.6.

=================================== FAILURES ===================================
________________________________ test_actxgroup ________________________________

event_loop = <_UnixSelectorEventLoop running=False closed=False debug=False>

    @pytest.mark.asyncio
    async def test_actxgroup(event_loop):
    
        # Test basic function.
        exit_count = 0
    
        @aiotools.actxmgr
        async def ctx(a):
            nonlocal exit_count
            await asyncio.sleep(0)
            yield a + 10
            await asyncio.sleep(0)
            exit_count += 1
    
        ctxgrp = aiotools.actxgroup()
    
        for i in range(3):
            ctxgrp.add(ctx(i))
    
        async with ctxgrp as values:
            assert values[0] == 10
            assert values[1] == 11
            assert values[2] == 12
            assert len(ctxgrp._cm_yields) == 3
    
        assert exit_count == 3
        assert len(ctxgrp._cm_yields) == 0
        returns = ctxgrp.exit_states()
>       assert returns[0] is None
E       assert False is None

tests/test_context.py:591: AssertionError
______________________ test_actxgroup_exception_from_body ______________________

event_loop = <_UnixSelectorEventLoop running=False closed=False debug=False>

    @pytest.mark.asyncio
    async def test_actxgroup_exception_from_body(event_loop):
    
        exit_count = 0
    
        @aiotools.actxmgr
        async def ctx(a):
            nonlocal exit_count
            await asyncio.sleep(0)
            yield a
            await asyncio.sleep(0)
            # yield raises the exception from the context body.
            # If not handled, finalization will not be executed.
            exit_count += 1
    
        ctxgrp = aiotools.actxgroup([ctx(1), ctx(2)])
    
        try:
            async with ctxgrp as values:
                assert values[0] == 1
                assert values[1] == 2
                raise ZeroDivisionError
        except Exception as e:
            assert isinstance(e, ZeroDivisionError)
    
        exits = ctxgrp.exit_states()
>       assert exits[0] is None  # __aexit__ are called successfully
E       assert False is None

tests/test_context.py:683: AssertionError

server: Write test for spawning subprocesses in workers

Spawning additional subprocesses in the child workers is supported when use_threading=True, because only the main thread can receive signals from child processes.
This line instantiates a child watcher before launching workers, as described in the Python docs.

Currently there is no explicit test case that tests if this actually works. Let's add one.

Add shutdown timeout to PersistentTaskGroup

Currently PersistentTaskGroup immediately cancels all non-finished tasks upon shutdown.
Let's add an option for configurable timeout here, so that the individual tasks may have chances to finish up before being cancelled.
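The intended behavior could be sketched as follows; the function and parameter names are hypothetical, not aiotools' API:

```python
import asyncio

async def shutdown_with_timeout(tasks, shutdown_timeout: float) -> None:
    # Give unfinished tasks a grace period, then cancel the rest.
    if not tasks:
        return
    done, pending = await asyncio.wait(tasks, timeout=shutdown_timeout)
    for task in pending:
        task.cancel()
    # Wait for cancellations to propagate, swallowing the errors.
    await asyncio.gather(*pending, return_exceptions=True)

async def demo():
    quick = asyncio.ensure_future(asyncio.sleep(0.01))
    slow = asyncio.ensure_future(asyncio.sleep(60))
    await shutdown_with_timeout([quick, slow], shutdown_timeout=0.1)
    # quick finished within the grace period; slow got cancelled.
    return quick.done() and not quick.cancelled(), slow.cancelled()

print(asyncio.run(demo()))  # → (True, True)
```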

Implement an async exit stack for composing async context managers

The standard library has contextlib.ExitStack, which allows multiple context managers to be nested one inside the other such that a failure of an __enter__ method calls the __exit__ methods of the already-entered ones.

It would be useful to implement this in a way that allows both sync and async context managers to be entered, exiting them from the inside out with appropriate error handling.
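For reference, the standard library has provided contextlib.AsyncExitStack since Python 3.7, which appears to cover this: it accepts both sync and async context managers and unwinds them from the inside out on failure. A small demonstration:

```python
import asyncio
import contextlib

events = []

@contextlib.contextmanager
def sync_cm(name):
    events.append(f"enter {name}")
    try:
        yield
    finally:
        events.append(f"exit {name}")

@contextlib.asynccontextmanager
async def async_cm(name):
    events.append(f"enter {name}")
    try:
        yield
    finally:
        events.append(f"exit {name}")

async def main():
    async with contextlib.AsyncExitStack() as stack:
        stack.enter_context(sync_cm("a"))           # sync CM
        await stack.enter_async_context(async_cm("b"))  # async CM
    # Exits run in reverse order of entry: b first, then a.

asyncio.run(main())
print(events)  # → ['enter a', 'enter b', 'exit b', 'exit a']
```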

Are the notes at the top of README.md still accurate?

At the top of the README.md file it says:

NOTE: This project is under early stage of development. The public APIs may break version by version.
...
trio: An alternative implementation of asynchronous IO stack for Python, with focus on cancellation scopes and task groups called "nursery".

Am I right in thinking that this project is now pretty mature and stable? And with task groups now in asyncio in Python 3.11, where do you see things standing in the trio / asyncio debate?

Thanks!
Tim
