intsights / sergeant
Fast, Safe & Simple Asynchronous Task Queues Written In Pure Python
Home Page: https://intsights.github.io/sergeant/
License: MIT License
The built-in log messages should start with an uppercase letter and end with a period (".").
A few examples I found:
"task has retried" --> "Task has retried."
"worker is starving" --> "Worker is starving."
level and logger_name are an inherent part of the log and shouldn't be nested under 'logging' IMO. https://github.com/Intsights/sergeant/blob/master/sergeant/logging/logstash.py
Implement a test framework for sergeant. Right now, writing tests is a hard task and we should simplify it as much as possible.
When using a threaded worker, every worker thread gets its own killer thread. This creates a large number of threads and significant overhead.
I will replace this implementation with a polling loop to monitor each thread.
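A minimal sketch of what such a polling loop could look like, assuming each worker thread reports when it starts and finishes a task. The `ThreadMonitor` class, its method names, and the `on_timeout` callback are hypothetical and are not part of sergeant.

```python
import threading
import time
import typing


class ThreadMonitor:
    """Hypothetical monitor: one polling loop instead of one killer thread per worker."""

    def __init__(
        self,
        timeout: float,
        on_timeout: typing.Callable[[int], None],
    ) -> None:
        self.timeout = timeout
        self.on_timeout = on_timeout
        # Maps a worker thread id to the time its current task started.
        self.started_at: typing.Dict[int, float] = {}
        self.lock = threading.Lock()

    def task_started(self, thread_id: int) -> None:
        with self.lock:
            self.started_at[thread_id] = time.monotonic()

    def task_finished(self, thread_id: int) -> None:
        with self.lock:
            self.started_at.pop(thread_id, None)

    def poll_once(self) -> typing.List[int]:
        """Report every thread whose task exceeded the timeout, once each."""
        now = time.monotonic()
        with self.lock:
            expired = [
                thread_id
                for thread_id, started in self.started_at.items()
                if now - started > self.timeout
            ]
            for thread_id in expired:
                del self.started_at[thread_id]
        # Callbacks run outside the lock so they can call back into the monitor.
        for thread_id in expired:
            self.on_timeout(thread_id)
        return expired

    def run(self, stop_event: threading.Event, interval: float = 0.1) -> None:
        # A single loop watches all worker threads until stop_event is set.
        while not stop_event.wait(interval):
            self.poll_once()
```

What `on_timeout` does (for example, raising an exception inside the worker thread) is left open here; the sketch only covers detection.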
The push_task method can fail when redis/mongo are not available. In that case, self.logger.exception is invoked, self being a worker instance.
However, the push_task caller should not assume that worker.logger is initialized, or that it exists at all.
If that's a requirement for each Worker implementation, it should be enforced in code.
Exception example - sergeant/worker.py in push_task at line 157
...
        consumable_from=consumable_from,
    )
    return True
except Exception as exception:
    self.logger.error(
        msg=f'could not push task: {exception}',
    )
    return False
...
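One way to enforce the requirement in code is to make the logger a lazily created property on the base class. The following is only a sketch under that assumption: `BaseWorker` and the `push` callable are hypothetical stand-ins, not sergeant's actual Worker API.

```python
import logging


class BaseWorker:
    """Hypothetical base class; not sergeant's actual Worker."""

    name = 'base_worker'

    @property
    def logger(self) -> logging.Logger:
        # Created on first access, so push_task can always log, even if the
        # caller never ran any logger initialization.
        if '_logger' not in self.__dict__:
            self.__dict__['_logger'] = logging.getLogger(self.name)
        return self.__dict__['_logger']

    def push_task(self, push) -> bool:
        # `push` stands in for the real connector call.
        try:
            push()
            return True
        except Exception as exception:
            self.logger.error(f'Could not push task: {exception}')
            return False
```

With this shape, a caller that only wants to push tasks does not need to know whether the logger was set up beforehand.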
This behavior can help workers that decided to stop serving more requests on their finalize/initialize handlers.
Sometimes a task that stays in the queue for a long time becomes obsolete.
Maybe we can add an option to set a TTL for a key, so that Redis deletes those tasks automatically.
@wavenator
Whenever a worker pops a task from the queue, if it crashes before pushing the task back to the queue, the task is lost.
Maybe we can add an option in the Mongo connector to use findAndUpdate instead of pop, and remove the task from the queue only once it is marked as Finished.
@wavenator
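The idea of removing a task only after it is marked as finished can be illustrated with a small in-memory sketch. This is not the Mongo connector; `AckQueue` and its method names are hypothetical, and a real implementation would rely on an atomic find-and-update operation in the database.

```python
import collections
import typing


class AckQueue:
    """Hypothetical in-memory queue where claimed tasks survive a worker crash."""

    def __init__(self) -> None:
        self.pending: typing.Deque[typing.Tuple[int, typing.Any]] = collections.deque()
        self.in_progress: typing.Dict[int, typing.Any] = {}
        self.next_id = 0

    def push(self, task: typing.Any) -> None:
        self.pending.append((self.next_id, task))
        self.next_id += 1

    def claim(self) -> typing.Optional[typing.Tuple[int, typing.Any]]:
        # The task moves to in_progress instead of disappearing.
        if not self.pending:
            return None
        task_id, task = self.pending.popleft()
        self.in_progress[task_id] = task
        return task_id, task

    def ack(self, task_id: int) -> None:
        # Only a finished task is really removed.
        self.in_progress.pop(task_id, None)

    def requeue_unfinished(self) -> int:
        # Called after a crash: every claimed but unfinished task is queued again.
        count = len(self.in_progress)
        for task_id, task in sorted(self.in_progress.items()):
            self.pending.append((task_id, task))
        self.in_progress.clear()
        return count
```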
Hey,
I suggest adding an HTTP server to the supervisor, so that the health status of the worker pool can be monitored.
An HTTP request will return the status of the currently running workers, and perhaps supervisor metadata.
It’s possible to incorporate query string params that define what’s considered a “healthy” supervisor, i.e., no more than N workers are silent for more than T seconds.
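A rough sketch of such an endpoint using only the standard library. The `last_seen` registry, the `max_silent` and `silent_after` query parameters, and the 503 status for an unhealthy pool are all assumptions made for illustration; the supervisor does not currently expose any of them.

```python
import http.server
import json
import threading
import time
import urllib.parse

# Hypothetical state: worker id -> monotonic timestamp of its last activity.
last_seen = {}


def health_report(max_silent: int, silent_after: float) -> dict:
    """Healthy means no more than max_silent workers were silent for silent_after seconds."""
    now = time.monotonic()
    silent = [
        worker_id
        for worker_id, seen in last_seen.items()
        if now - seen > silent_after
    ]
    return {
        'workers': len(last_seen),
        'silent_workers': sorted(silent),
        'healthy': len(silent) <= max_silent,
    }


class HealthHandler(http.server.BaseHTTPRequestHandler):
    def do_GET(self) -> None:
        query = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query)
        report = health_report(
            max_silent=int(query.get('max_silent', ['0'])[0]),
            silent_after=float(query.get('silent_after', ['60'])[0]),
        )
        body = json.dumps(report).encode()
        self.send_response(200 if report['healthy'] else 503)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args) -> None:
        pass  # Keep the sketch quiet.


def start_health_server(port: int = 0) -> http.server.ThreadingHTTPServer:
    # Port 0 lets the operating system pick a free port.
    server = http.server.ThreadingHTTPServer(('127.0.0.1', port), HealthHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server
```

A request such as `/health?max_silent=1&silent_after=60` would then answer with a JSON report, and a monitoring system could alert on the status code alone.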
Description:
According to the above issue, the problem seems to be that the redis connector uses single_connection_client, which is not thread safe.
As currently defined, it creates the redis object with the single_connection_client flag on.
class Connector(
    _connector.Connector,
):
    name: str = 'redis'

    def __init__(
        self,
        nodes: typing.List[typing.Dict[str, typing.Any]],
    ) -> None:
        self.connections = [
            QueueRedis(
                host=node['host'],
                port=node['port'],
                password=node['password'],
                db=node['database'],
                retry_on_timeout=True,
                socket_keepalive=True,
                socket_connect_timeout=10,
                socket_timeout=60,
                single_connection_client=True,
            )
That caused the CI of the agent to sporadically fail when testing integration push (which is using sergeant.worker), but only after adding a second task.
Using the provided logstash handler, extra.task is logged as follows:
extra.task: Task(kwargs={}, date=1590487633, run_count=0)
instead of the desired dict.
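The logged value looks like a dataclass repr. Assuming that is the case, a handler could convert such objects to plain dicts before serializing. The `Task` class below is a hypothetical stand-in with the fields seen in the log line, and `serialize_extra` is an illustrative helper, not part of the logstash handler.

```python
import dataclasses
import typing


@dataclasses.dataclass
class Task:
    # Hypothetical stand-in mirroring the fields in the log line above.
    kwargs: typing.Dict[str, typing.Any] = dataclasses.field(default_factory=dict)
    date: int = 0
    run_count: int = 0


def to_loggable(value: typing.Any) -> typing.Any:
    # Dataclass instances become dicts; everything else is passed through.
    if dataclasses.is_dataclass(value) and not isinstance(value, type):
        return dataclasses.asdict(value)
    return value


def serialize_extra(extra: typing.Dict[str, typing.Any]) -> typing.Dict[str, typing.Any]:
    return {key: to_loggable(value) for key, value in extra.items()}
```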
Currently, the connectors pass a dictionary that translates to the constructor's init parameters. I'd like to change this behavior to pass a URI to support customization.
Maybe we need to pass the original exception into the handle_max_retries function in worker.py and write it to the log as well, for monitoring purposes.
When a specific task reaches max retries, the task is forever gone.
I want to save failed tasks and their state in order to re-execute them when I fix the bug that causes the task to fail.
My main goal is to make sure that all of my tasks won't get lost and will be executed eventually.
Thanks, @wavenator
When I try to install the Sergeant package, the installation fails because of the hiredis package.
Full log:
intsights@barad-laptop ~> pip3 install sergeant
Defaulting to user installation because normal site-packages is not writeable
Collecting sergeant
Using cached sergeant-0.19.0-py3-none-any.whl
Collecting hiredis==1.*
Using cached hiredis-1.1.0.tar.gz (54 kB)
Requirement already satisfied: msgpack==1.* in ./.local/lib/python3.9/site-packages (from sergeant) (1.0.2)
Requirement already satisfied: redis==3.* in ./.local/lib/python3.9/site-packages (from sergeant) (3.5.3)
Requirement already satisfied: psutil==5.* in /usr/lib64/python3.9/site-packages (from sergeant) (5.7.2)
Requirement already satisfied: pymongo==3.* in ./.local/lib/python3.9/site-packages (from sergeant) (3.11.3)
Requirement already satisfied: orjson==3.* in ./.local/lib/python3.9/site-packages (from sergeant) (3.4.8)
Building wheels for collected packages: hiredis
Building wheel for hiredis (setup.py) ... error
ERROR: Command errored out with exit status 1:
command: /usr/bin/python3 -u -c 'import sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-install-izq_yfv0/hiredis_d1f5e5385e7b487ba634ee1c399d0dbb/setup.py'"'"'; __file__='"'"'/tmp/pip-install-izq_yfv0/hiredis_d1f5e5385e7b487ba634ee1c399d0dbb/setup.py'"'"';f=getattr(tokenize, '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' bdist_wheel -d /tmp/pip-wheel-n9gwh88z
cwd: /tmp/pip-install-izq_yfv0/hiredis_d1f5e5385e7b487ba634ee1c399d0dbb/
Complete output (23 lines):
/tmp/pip-install-izq_yfv0/hiredis_d1f5e5385e7b487ba634ee1c399d0dbb/setup.py:7: DeprecationWarning: the imp module is deprecated in favour of importlib; see the module's documentation for alternative uses
import sys, imp, os, glob, io
running bdist_wheel
running build
running build_py
creating build
creating build/lib.linux-x86_64-3.9
creating build/lib.linux-x86_64-3.9/hiredis
copying hiredis/version.py -> build/lib.linux-x86_64-3.9/hiredis
copying hiredis/__init__.py -> build/lib.linux-x86_64-3.9/hiredis
running build_ext
building 'hiredis.hiredis' extension
creating build/temp.linux-x86_64-3.9
creating build/temp.linux-x86_64-3.9/src
creating build/temp.linux-x86_64-3.9/vendor
creating build/temp.linux-x86_64-3.9/vendor/hiredis
gcc -pthread -Wno-unused-result -Wsign-compare -DDYNAMIC_ANNOTATIONS_ENABLED=1 -DNDEBUG -O2 -fexceptions -g -grecord-gcc-switches -pipe -Wall -Werror=format-security -Wp,-D_FORTIFY_SOURCE=2 -Wp,-D_GLIBCXX_ASSERTIONS -fstack-protector-strong -m64 -mtune=generic -fasynchronous-unwind-tables -fstack-clash-protection -fcf-protection -D_GNU_SOURCE -fPIC -fwrapv -O2 -fexceptions -g -grecord-gcc-switches -pipe -Wall -Werror=format-security -Wp,-D_FORTIFY_SOURCE=2 -Wp,-D_GLIBCXX_ASSERTIONS -fstack-protector-strong -m64 -mtune=generic -fasynchronous-unwind-tables -fstack-clash-protection -fcf-protection -D_GNU_SOURCE -fPIC -fwrapv -O2 -fexceptions -g -grecord-gcc-switches -pipe -Wall -Werror=format-security -Wp,-D_FORTIFY_SOURCE=2 -Wp,-D_GLIBCXX_ASSERTIONS -fstack-protector-strong -m64 -mtune=generic -fasynchronous-unwind-tables -fstack-clash-protection -fcf-protection -D_GNU_SOURCE -fPIC -fwrapv -fPIC -Ivendor -I/usr/include/python3.9 -c src/hiredis.c -o build/temp.linux-x86_64-3.9/src/hiredis.o
In file included from src/hiredis.c:1:
src/hiredis.h:4:10: fatal error: Python.h: No such file or directory
4 | #include <Python.h>
| ^~~~~~~~~~
compilation terminated.
error: command '/usr/bin/gcc' failed with exit code 1
----------------------------------------
ERROR: Failed building wheel for hiredis
Running setup.py clean for hiredis
Failed to build hiredis
Installing collected packages: hiredis, sergeant
Running setup.py install for hiredis ... error
ERROR: Command errored out with exit status 1:
command: /usr/bin/python3 -u -c 'import sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-install-izq_yfv0/hiredis_d1f5e5385e7b487ba634ee1c399d0dbb/setup.py'"'"'; __file__='"'"'/tmp/pip-install-izq_yfv0/hiredis_d1f5e5385e7b487ba634ee1c399d0dbb/setup.py'"'"';f=getattr(tokenize, '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' install --record /tmp/pip-record-p49fix7q/install-record.txt --single-version-externally-managed --user --prefix= --compile --install-headers /home/intsights/.local/include/python3.9/hiredis
cwd: /tmp/pip-install-izq_yfv0/hiredis_d1f5e5385e7b487ba634ee1c399d0dbb/
Complete output (23 lines):
/tmp/pip-install-izq_yfv0/hiredis_d1f5e5385e7b487ba634ee1c399d0dbb/setup.py:7: DeprecationWarning: the imp module is deprecated in favour of importlib; see the module's documentation for alternative uses
import sys, imp, os, glob, io
running install
running build
running build_py
creating build
creating build/lib.linux-x86_64-3.9
creating build/lib.linux-x86_64-3.9/hiredis
copying hiredis/version.py -> build/lib.linux-x86_64-3.9/hiredis
copying hiredis/__init__.py -> build/lib.linux-x86_64-3.9/hiredis
running build_ext
building 'hiredis.hiredis' extension
creating build/temp.linux-x86_64-3.9
creating build/temp.linux-x86_64-3.9/src
creating build/temp.linux-x86_64-3.9/vendor
creating build/temp.linux-x86_64-3.9/vendor/hiredis
gcc -pthread -Wno-unused-result -Wsign-compare -DDYNAMIC_ANNOTATIONS_ENABLED=1 -DNDEBUG -O2 -fexceptions -g -grecord-gcc-switches -pipe -Wall -Werror=format-security -Wp,-D_FORTIFY_SOURCE=2 -Wp,-D_GLIBCXX_ASSERTIONS -fstack-protector-strong -m64 -mtune=generic -fasynchronous-unwind-tables -fstack-clash-protection -fcf-protection -D_GNU_SOURCE -fPIC -fwrapv -O2 -fexceptions -g -grecord-gcc-switches -pipe -Wall -Werror=format-security -Wp,-D_FORTIFY_SOURCE=2 -Wp,-D_GLIBCXX_ASSERTIONS -fstack-protector-strong -m64 -mtune=generic -fasynchronous-unwind-tables -fstack-clash-protection -fcf-protection -D_GNU_SOURCE -fPIC -fwrapv -O2 -fexceptions -g -grecord-gcc-switches -pipe -Wall -Werror=format-security -Wp,-D_FORTIFY_SOURCE=2 -Wp,-D_GLIBCXX_ASSERTIONS -fstack-protector-strong -m64 -mtune=generic -fasynchronous-unwind-tables -fstack-clash-protection -fcf-protection -D_GNU_SOURCE -fPIC -fwrapv -fPIC -Ivendor -I/usr/include/python3.9 -c src/hiredis.c -o build/temp.linux-x86_64-3.9/src/hiredis.o
In file included from src/hiredis.c:1:
src/hiredis.h:4:10: fatal error: Python.h: No such file or directory
4 | #include <Python.h>
| ^~~~~~~~~~
compilation terminated.
error: command '/usr/bin/gcc' failed with exit code 1
----------------------------------------
ERROR: Command errored out with exit status 1: /usr/bin/python3 -u -c 'import sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-install-izq_yfv0/hiredis_d1f5e5385e7b487ba634ee1c399d0dbb/setup.py'"'"'; __file__='"'"'/tmp/pip-install-izq_yfv0/hiredis_d1f5e5385e7b487ba634ee1c399d0dbb/setup.py'"'"';f=getattr(tokenize, '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' install --record /tmp/pip-record-p49fix7q/install-record.txt --single-version-externally-managed --user --prefix= --compile --install-headers /home/intsights/.local/include/python3.9/hiredis Check the logs for full command output.
When someone puts an exception catching block inside the work method, it might catch WorkerTimedout exceptions too.
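A small sketch of the problem and one possible mitigation: if the timeout exception derived from BaseException, a broad `except Exception` in user code would no longer swallow it. The class below is a hypothetical redefinition for illustration only and says nothing about how sergeant actually defines it.

```python
class WorkerTimedout(BaseException):
    # Hypothetical redefinition: deriving from BaseException keeps a broad
    # "except Exception" in user code from catching the timeout.
    pass


def work(task):
    try:
        return task()
    except Exception:
        # Handles ordinary failures only; a timeout propagates past this block.
        return 'handled'
```

The trade-off is that user code wanting to react to a timeout has to catch it explicitly by name.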
More readable
We need to add another logstash handler, over UDP, to the sergeant worker base class to enable asynchronous logging.
In certain cases, worker.get_summary() returns None.
When that happens and the return code is 1, an exception is thrown:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/sergeant/supervisor.py", line 157, in supervise_loop
    self.supervise_worker(
  File "/usr/local/lib/python3.9/site-packages/sergeant/supervisor.py", line 234, in supervise_worker
    msg=f'worker({worker.process.pid}) execution has failed with the following exception: {extra_signature["summary"].get("exception")}',
AttributeError: 'NoneType' object has no attribute 'get'
It should return an empty dict instead.
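Until the summary is guaranteed to be a dict, the caller can guard against None. A minimal sketch, with `describe_failure` as a hypothetical helper rather than the supervisor's real code:

```python
import typing


def describe_failure(pid: int, summary: typing.Optional[dict]) -> str:
    # Treat a missing summary as an empty one so .get() is always safe.
    summary = summary or {}
    exception = summary.get('exception', 'unknown')
    return f'Worker({pid}) execution has failed with the following exception: {exception}'
```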
The concept of Executors has been found to be hard to understand from the implementer's point of view. I think in the upcoming version I will remove the "Executor" config and replace it with "number_of_threads", where the default is one and any number higher than one results in a threaded executor being used instead of the serial one. This way it will be easier to understand and less error-prone.
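The proposed selection rule could be sketched roughly as follows. `run_tasks` is a hypothetical function built on the standard library's thread pool; it is not the planned sergeant implementation and ignores timeouts and retries.

```python
import concurrent.futures
import typing


def run_tasks(
    tasks: typing.Sequence[typing.Callable[[], typing.Any]],
    number_of_threads: int = 1,
) -> list:
    if number_of_threads < 1:
        raise ValueError('number_of_threads must be at least 1')
    if number_of_threads == 1:
        # Serial execution: the default.
        return [task() for task in tasks]
    # Any number higher than one switches to threaded execution.
    with concurrent.futures.ThreadPoolExecutor(max_workers=number_of_threads) as pool:
        return list(pool.map(lambda task: task(), tasks))
```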
Currently, every time a worker enters on_failure it writes an error log with the message "task has failed".
If I set my worker config to avoid that, like this:
logging=sergeant.config.Logging(
    events=sergeant.config.LoggingEvents(
        on_failure=False,
    ),
),
then I don't get any logs at all.
The behavior I want is to be able to log a WARNING on some exceptions without always having an ERROR log alongside it.
It would improve log clarity and let us write cleaner elastalert rules.
We need a new local connector which will help with writing and testing faster. This local connector will be made with a local queue object rather than some client.
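A minimal sketch of what a local connector might look like, backed by in-process deques. The class and its method names (`queue_push`, `queue_pop`, `queue_length`) are assumptions chosen for illustration and may not match the connector interface sergeant ends up with.

```python
import collections
import typing


class LocalConnector:
    """Hypothetical in-process connector for tests; no Redis or Mongo client involved."""

    name = 'local'

    def __init__(self) -> None:
        # One FIFO queue per queue name, created on first use.
        self.queues: typing.DefaultDict[str, typing.Deque[bytes]] = collections.defaultdict(
            collections.deque,
        )

    def queue_push(self, queue_name: str, item: bytes) -> None:
        self.queues[queue_name].append(item)

    def queue_pop(self, queue_name: str) -> typing.Optional[bytes]:
        queue = self.queues[queue_name]
        return queue.popleft() if queue else None

    def queue_length(self, queue_name: str) -> int:
        return len(self.queues[queue_name])
```

Because everything lives in memory, tests built on it need no external services, at the cost of not exercising any real client behavior.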
Over the years we've seen many problems with logging inside a docker container, where the stdout/stderr buffer could not be flushed. We also dislike the fact that there is no reasonable way to disable logging to stdout. I will replace the logging implementation with loguru in the upcoming version.
Right now, the executors are informed of timeouts with SIGTERM. That leaves no room for infrastructure such as Kubernetes or other cloud services to request a graceful shutdown, which is mostly done with SIGTERM.
It would be great to have a flag that defines whether to use a regular queue or a set queue, which would make it possible to avoid duplicate tasks.
@wavenator
Following our discussion in the PR,
Redis has introduced a new count parameter for both the lpop and rpop commands. We are currently using a combination of lrange and ltrim to implement the same behavior. We should migrate to the new parameter.
https://redis.io/commands/rpop