aiokitchen / aiomisc Goto Github PK
View Code? Open in Web Editor NEWaiomisc - miscellaneous utils for asyncio
License: MIT License
aiomisc - miscellaneous utils for asyncio
License: MIT License
Доброго времени суток!
https://github.com/aiokitchen/aiomisc#quick-start
import argparse
import asyncio
import os
import logging
from aiomisc import entrypoint
....
parser.add_argument(
"--log-format", help="Log format",
default=os.getenv('LOG_FORMAT', 'color'),
**choices=aiomisc.log.LogFormat.choices(),** <----- тут используется aiomisc а сам модуль не импортировано, отсюда после запуска получаем _NameError: name 'aiomisc' is not defined_
metavar='LOG_FORMAT',
)
...
Hi ALl
would be possible to improve the example code with an UDPServer and client like the RPCServer?
Thanks in advance for your help
I'm using aiohttp-s3-client and I'm getting pkg_resource not found.
I could give in and install setuptools, or set the AIOMISC_NO_PLUGINS variables, but I consider the automatic probing for plugins to be unnecessary. It seems that PLUGINS are specifically tied to aiomisc.entrypoint() API, which aiohttp-s3-client doesn't use.
maybe aiomisc.entrypoint.entrypoint can run setup_plugins() before returning Entrypoint, that setup_plugins doesn't need to be called on import.
aiomisc==17.3.23
Python 3.11.5
import asyncio
import logging
from typing import Any
import aiomisc
class SomeService(aiomisc.Service):
async def start(self) -> Any:
self.start_event.set()
async with asyncio.TaskGroup() as tg:
tg.create_task(self.periodic_work())
async def periodic_work(self):
print('enter')
raise ValueError("123")
logging.basicConfig()
try:
with aiomisc.entrypoint(
SomeService(),
log_level="info",
log_format="color",
) as loop:
loop.run_forever()
except KeyboardInterrupt as e:
pass
finally:
logging.shutdown()
I don't use get_unused_port directly but always see this warning
Hi, everyone!
I need a set plain-text format with an ordered key sequence for my application. How I can properly change the default logging template or apply my own log-handler to asyncio.entrypoint
?
Thanks!
Hi, thank you for great library!
What is the idea of writing the same values to the log under different keys, if we have already set new names for these fields?
Maybe we should iterate over record_dict - FIELD_MAPPING.keys()
?
Добрый день!
Нам приходится работать исключительно на debian9 и astra linux 1.6 - к обоим дистрибутивам "прибита гвоздями" версия python 3.5.3 (к астре так точно прибита). В этой версии python не выполняется:
from typing import AsyncContextManager
(AsyncContextManager появился в typing вроде только с версии 3.5.4)
Сами вопрос сейчас решаем патчем такого вида:
--- a/aiomisc/pool.py
+++ b/aiomisc/pool.py
@@ -3,7 +3,7 @@
from abc import ABC, abstractmethod
from collections import defaultdict
from random import random
-from typing import AsyncContextManager
+from typing_extensions import AsyncContextManager
from .utils import cancel_tasks
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,3 @@
colorlog
prettylog~=0.3.0
+typing_extensions>=3.6.5
Забираем AsyncContextManager из стороннего пакета typing_extensions
С уважением, Александр.
Can I safely use undocumented features, such as aiomisc.log.wrap.wrap_logging_handler()
in production? Is it a stable API with just a poor documentation coverage, or is it just internal stuff that might change at any time?
Any idea why I am not seeing the ValueError?
class IndefiniteService(aiomisc.Service):
async def start(self):
self.start_event.set()
while True:
await asyncio.sleep(1)
class ExceptionService(aiomisc.Service):
async def start(self):
self.start_event.set()
await asyncio.sleep(1)
print("raise exception")
raise ValueError("123")
try:
with aiomisc.entrypoint(
IndefiniteService(),
ExceptionService(),
log_level="info",
log_format="color",
) as loop:
loop.run_forever()
except KeyboardInterrupt as e:
pass
finally:
logging.shutdown()
Output:
raise exception
<--- waited for like 5 seconds, and stoped program
stop
2023-09-11 20:08:27 [T:MainThread] WARNING:aiomisc.entrypoint: Interrupt signal received, shutting down...
Hi,
I found this library extremely useful, thank you for the awesome work.
However, when I used the @threaded(_separate)?
decorators in my typed codebase, the wrapped functions are typed as just Callable
and the original type signatures are also lost. I get no type inference in my editor and my static type checker which will make things like refactors much more manually intensive process.
I noticed that you use functools.wraps
for your decorators and this is a long standing issue for it. It does not preserve function signatures and type information. That's why libraries like wrapt
were created to try to address this issue.
Do you think it would be possible to use wrapt
instead of functools.wraps
to create a better user experience for users who rely on type information? So what I'm trying to ask is, to what degree is user experience for users with typing important to this project? Because if it is important, it would make sense to use a library like wrapt
.
Thank you
import aiomisc
import asyncio
class TestService(aiomisc.Service):
async def start(self):
# Service is ready
self.start_event.set()
while True:
await asyncio.sleep(1)
with aiomisc.entrypoint(
TestService(),
log_level="info",
log_format="color"
) as loop:
loop.run_forever()
Stoping service in Pycharm leads to:
2023-07-13 16:29:13 [T:MainThread] ERROR:asyncio.unhandled: Exception in callback <built-in method set_result of _asyncio.Future object at 0x7fa78859e860>
Traceback (most recent call last):
File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
asyncio.exceptions.InvalidStateError: invalid state
2023-07-13 16:29:13 [T:MainThread] WARNING:aiomisc.entrypoint: Interrupt signal received, shutting down...
Seems like it's not leading to some breaking behaviour, but just curious, can I fix it somehow.
Currently there is a release mismatch across the distribution channels:
It would really be appreciated if the releases are in sync. Especially the source and the GitHub tags otherwise it would be tricky if distributions need to patch something.
Thanks
Noticed something strange in the following setting (tests fail on teardown):
entrypoint
;pytest_plugins = (
'aiomisc',
)
@pytest.fixture(scope='session')
def loop():
with entrypoint() as loop:
yield loop
@pytest.fixture(scope='session')
async def fixture():
yield 123
async def test_1(fixture):
pass
async def test_2():
pass
Gives
Close <_UnixSelectorEventLoop running=False closed=False debug=True>
def finalizer(): # type: ignore
try:
> return event_loop.run_until_complete(gen.__anext__())
/opt/miniconda3/envs/impulse/lib/python3.9/site-packages/aiomisc_pytest/pytest_plugin.py:462:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/opt/miniconda3/envs/impulse/lib/python3.9/asyncio/base_events.py:617: in run_until_complete
self._check_closed()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_UnixSelectorEventLoop running=False closed=True debug=True>
def _check_closed(self):
if self._closed:
> raise RuntimeError('Event loop is closed')
E RuntimeError: Event loop is closed
/opt/miniconda3/envs/impulse/lib/python3.9/asyncio/base_events.py:510: RuntimeError
Any of the following changes makes it work without errors:
session
with any other scope;yield
with return
in the fixture;test_2
;entrypoint
with yield new_event_loop()
in the loop fixture;loop
to the fixture explicitly.Not sure it is aiomisc
specific question...
What is the proper way for me to set up the color of the logs while using aiomisc
?
Currently, I am just adding here "INFO": "white", "WARNING": "yellow,bold"
, because by default they are red for some reason, and my eyes are bleeding. But I am sure there is better way.
aiomisc/aiomisc_log/formatter/color.py
Line 43 in 34b5768
Hi All
i have the following client TCP
class MinosBaseClient:
__slots__ = 'reader', 'writer', 'futures', 'loop', 'reader_task', '_serial'
HEADER = struct.Struct(">I")
def __init__(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
loop: asyncio.AbstractEventLoop = None):
self.reader = reader
self.writer = writer
self.futures = {}
self._serial = None
self.loop = loop or asyncio.get_event_loop()
self.reader_task = self.loop.create_task(self._response_reader())
async def _response_reader(self):
try:
while True:
log.debug("MRPC Client: Response Received")
body_size = self.HEADER.unpack(
await self.reader.readexactly(self.HEADER.size)
)[0]
log.debug(f"MRPC Client: received data size of {body_size}")
response: MinosRPCResponse = MinosResponse.load(
await self.reader.readexactly(body_size),
MinosRPCResponse)
future = self.futures.pop(response.id, None)
if future is None:
continue
future.set_result(response)
finally:
while self.futures:
_, future = self.futures.popitem()
if future.done():
continue
log.debug("MRPC Client: Set error")
future.set_exception(ConnectionAbortedError)
async def close(self):
"""
close the connection with the server
"""
log.debug("Client: Closing connection")
self.writer.write(self.HEADER.pack(0))
self.reader_task.cancel()
await asyncio.gather(self.reader_task, return_exceptions=True)
self.loop.call_soon(self.writer.close)
self.writer.write_eof()
self.writer.close()
def send_headers(self, path: str):
"""
Send headers before all
"""
log.debug("MRPC Client: Send Headers")
header_request: MinosRPCHeadersRequest = MinosRequest.build(MinosRPCHeadersRequest).addAction(path)
header_bytes: bytes = header_request.binary
log.debug("MRCP Client: Send headers with ID: %d", header_request.id)
self._serial = header_request.id
self._send_bytes(header_bytes)
log.debug("MRPC Client: Headers, sent")
def send_body(self, message: t.Any = None):
log.debug("MRPC Client: Send Body")
if message:
body_request: MinosRPCBodyRequest = MinosRequest.build(MinosRPCBodyRequest).addId(self._serial) \
.addBody(message)
else:
body_request: MinosRPCBodyRequest = MinosRequest.build(MinosRPCBodyRequest).addId(self._serial)
content_bytes: bytes = body_request.binary
log.debug("MRCP Client: Send Body with ID: %d", body_request.id)
self._send_bytes(content_bytes)
log.debug("Body Sent")
def send_close(self):
...
def _send_bytes(self, data: bytes):
with io.BytesIO() as f:
f.write(self.HEADER.pack(len(data)))
f.write(data)
self.writer.write(f.getvalue())
def send(self, path: str, message: t.Any = None, **kwargs):
self.send_headers(path)
self.futures[self._serial] = self.loop.create_future()
self.send_body(message)
self.send_close()
return self.futures[self._serial]
Is a lot of code but mainly is the same code used for TCP client in the example folder.
The server service is a bit more complex but i have tested the server and the client outside the pytest environment and work well.
So, i have defined the following test
@pytest.fixture
def config():
return {
"controller": "tests.controllers.RootController"
}
@pytest.fixture
def services(config):
return [
MinosRPCServer(address='localhost', port=8900, conf=config)
]
@pytest.fixture
async def service_client() -> MinosBaseClient:
reader, writer = await asyncio.open_connection(
'localhost', 8900
)
client_connector = MinosBaseClient(reader, writer)
try:
yield client_connector
finally:
await client_connector.close()
async def test_microservice_mrpc_client(service_client):
result = await service_client.send("without_arg")
assert True == False
The assert is to have a better veiw of the logs and the error.
when i start the following code i get the following error:
async def test_microservice_mrpc_client(service_client):
> result = await service_client.send("without_arg")
E RuntimeError: Task <Task pending name='Task-61' coro=<test_microservice_mrpc_client() running at /Users/xxxx/PycharmProjects/minos_microservice_test/venv/lib/python3.9/site-packages/aiomisc_pytest/pytest_plugin.py:520> cb=[run_until_complete.<locals>.done_cb()]> got Future <Future pending> attached to a different loop
Have something that is wrong in my code, because i have reviewed everithing and seems that all is fine.
thanks in advance for your help
Currently aiomisc depends on the optional dependency raven
which however is obsoleted by sentry-sdk
. would be good to convert to convert away from the unmaintained dependency to a maintained one.
It would be very helpful if you could tag releases as well again. This would enable distributions who want to fetch the source from GitHub instead of PyPI.
Thanks
Is this an expected behavior that the program does finish?
aiomisc 17.3.2
Conda Python 3.9.17
Apple M1 Max
Example from doc
import asyncio
from random import random, randint
from aiomisc import entrypoint, get_context, Service
class LoggingService(Service):
async def start(self):
context = get_context()
wait_time = await context['wait_time']
print('Wait time is', wait_time)
self.start_event.set()
while True:
print('Hello from service', self.name)
await asyncio.sleep(wait_time)
class RemoteConfiguration(Service):
async def start(self):
# querying from remote server
await asyncio.sleep(random())
self.context['wait_time'] = randint(1, 5)
services = (
LoggingService(name='#1'),
LoggingService(name='#2'),
LoggingService(name='#3'),
RemoteConfiguration()
)
with entrypoint(*services) as loop:
pass
Console output
Intel MKL WARNING: Support of Intel(R) Streaming SIMD Extensions 4.2 (Intel(R) SSE4.2) enabled only processors has been deprecated. Intel oneAPI Math Kernel Library 2025.0 will require Intel(R) Advanced Vector Extensions (Intel(R) AVX) instructions.
Intel MKL WARNING: Support of Intel(R) Streaming SIMD Extensions 4.2 (Intel(R) SSE4.2) enabled only processors has been deprecated. Intel oneAPI Math Kernel Library 2025.0 will require Intel(R) Advanced Vector Extensions (Intel(R) AVX) instructions.
Wait time is 4
Hello from service #1
Wait time is 4
Hello from service #3
Wait time is 4
Hello from service #2
Process finished with exit code 0
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.