aio-libs-abandoned / aioredis-py
asyncio (PEP 3156) Redis support
Home Page: https://aioredis.readthedocs.io/
License: MIT License
I want to help this project, but I don't know what features you have in mind. Is there a way for us to talk, or a list describing what needs to be done?
and docstrings
Right now it is a bit cumbersome to create a pool of reconnecting connections, this is due to
https://github.com/aio-libs/aioredis/blob/3a69dc876338207c4605439319703427c2614738/aioredis/pool.py#L169
This could be improved
In the functions create_connection, create_pool and create_redis, the db and password arguments should be made keyword-only.
There should be only one positional argument -- address.
This will protect against specifying the host/port address as separate arguments (not as a tuple):

# protect from this
yield from create_connection(host, port)
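A minimal sketch of what such a signature could look like (illustrative only, not the real aioredis code; the bare `*` makes the following parameters keyword-only):

```python
# Hypothetical signature sketch: `address` is the only positional
# parameter; `db` and `password` are keyword-only thanks to the bare `*`.
def create_connection(address, *, db=0, password=None):
    host, port = address        # callers must pass a (host, port) tuple
    return (host, port, db, password)

# create_connection(('localhost', 6379))   # OK
# create_connection('localhost', 6379)     # raises TypeError
```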
When I use channel.wait_message() I'm losing pub/sub messages.
It's easy to reproduce this bug. Run the subscribing script in one terminal window:
import aioredis
import asyncio

@asyncio.coroutine
def main():
    red = yield from aioredis.create_redis(('localhost', 6379))
    channel, = yield from red.subscribe('foo')
    num = 0
    while (yield from channel.wait_message()):
        msg = yield from channel.get()
        print(num, msg)
        num += 1

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
In a second terminal window, run the publishing script:
import aioredis
import asyncio

@asyncio.coroutine
def main():
    red = yield from aioredis.create_redis(('localhost', 6379))
    for i in range(100000):
        yield from red.publish('foo', i)
        print(i)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
The publishing script will finish at 99999, but the subscribing script finishes at a much smaller number (e.g. 80340).
If you change while (yield from channel.wait_message()): to while True: in the subscribing script, it finishes at 99999 without any problem.
Python 3.4.3
OS X 10.10.3
Decoding a given array is a useful feature. What do you think about it?
I get the following exception:
  File "/work/grinder/web/query.py", line 45, in set_task
    yield from self.redis.set(key, value)
  File "/nix/store/a46yqr9lzi4ggk3iijm819c0a9hv8hb7-python3-3.4.1/lib/python3.4/asyncio/futures.py", line 348, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/nix/store/a46yqr9lzi4ggk3iijm819c0a9hv8hb7-python3-3.4.1/lib/python3.4/asyncio/tasks.py", line 370, in _wakeup
    value = future.result()
  File "/nix/store/a46yqr9lzi4ggk3iijm819c0a9hv8hb7-python3-3.4.1/lib/python3.4/asyncio/futures.py", line 243, in result
    raise self._exception
aioredis.errors.ReplyError: ERR Protocol error: invalid bulk length
This happens when the value is None. This is what is going over the network:
"*3\r\n$3\r\nSET\r\n$53\r\nsqltask-d54bd60ee28f207c22d032452a51ad48094aa510-task\r\n$-1\r\n"
And it clobbers the connection.
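For illustration, a minimal sketch of the kind of guard that could fail fast on unencodable values instead of emitting an invalid `$-1` bulk length (`encode_bulk` is a hypothetical helper, not the real aioredis serializer):

```python
# Hypothetical guard: reject values that cannot be encoded, rather than
# sending a malformed bulk string over the wire and clobbering the
# connection.
def encode_bulk(value):
    if isinstance(value, bytes):
        data = value
    elif isinstance(value, str):
        data = value.encode('utf-8')
    elif isinstance(value, int):
        data = str(value).encode('ascii')
    else:
        raise TypeError('cannot encode %r as a bulk string' % type(value))
    return b'$%d\r\n%s\r\n' % (len(data), data)
```

With such a check, `SET key None` would raise TypeError in the client instead of producing a server-side protocol error.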
Exposing the RedisConnection.execute method to the end user (as a low-level interface, of course) makes it possible to change the db index manually, bypassing the select method, which stores the db index in the connection.
The problem becomes even more dramatic in the case of RedisPool, e.g.:

with (yield from pool) as conn:
    conn.select(other_db)
# connection released and points to `other_db`; not reusable.

In the case of a pool, the connection can be closed and dropped;
in the case of .execute -- TBD (the only solution so far is to hard-code a check for the 'select' command).
For connection, pool and commands.Redis, .wait_closed() should wait for RedisConnection._reader_task to finish.
conn._reader_task.cancel() only schedules task cancellation; asyncio needs an event loop iteration for the actual cancellation.
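A minimal demo of that point, in modern async/await syntax (names are illustrative): cancel() only schedules cancellation, and the task actually finishes one event loop iteration later, which is why wait_closed() must await the reader task rather than just cancel it.

```python
import asyncio

async def demo():
    reader = asyncio.ensure_future(asyncio.sleep(3600))
    await asyncio.sleep(0)            # let the reader task start
    reader.cancel()                   # schedules cancellation...
    done_immediately = reader.done()  # ...but the task is not finished yet
    try:
        await reader                  # one loop iteration later...
    except asyncio.CancelledError:
        pass
    return done_immediately, reader.cancelled()

done_immediately, cancelled_after_await = asyncio.run(demo())
print(done_immediately, cancelled_after_await)   # False True
```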
Under Python 3.5, asyncio.async is deprecated. If any code uses asyncio.async, a DeprecationWarning is printed:

/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py:509: DeprecationWarning: asyncio.async() function is deprecated, use ensure_future()

Python 3.5 suggests replacing asyncio.async with asyncio.ensure_future.
One possible solution:

try:
    from asyncio import ensure_future as asyncio_ensure_future
except ImportError:  # Python < 3.4.4 has no ensure_future
    from asyncio import async as asyncio_ensure_future

Then use asyncio_ensure_future to replace all the asyncio.async calls in the code.
The following traceback appears on each request to redis:

    if (yield from self.redis.exists(self._result_key())):
  File "/lib/python3.4/site-packages/aioredis/util.py", line 50, in wait_convert
    result = yield from fut
  File "/lib/python3.4/site-packages/aioredis/commands/__init__.py", line 35, in execute
    return (yield from conn.execute(*args, **kwargs))
  File "/nix/store/ymsrnz9fkcj4lcmyln5by9mab5r0h3zl-python3-3.4.2/lib/python3.4/asyncio/futures.py", line 388, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/nix/store/ymsrnz9fkcj4lcmyln5by9mab5r0h3zl-python3-3.4.2/lib/python3.4/asyncio/tasks.py", line 285, in _wakeup
    value = future.result()
  File "/nix/store/ymsrnz9fkcj4lcmyln5by9mab5r0h3zl-python3-3.4.2/lib/python3.4/asyncio/futures.py", line 269, in result
    raise CancelledError
concurrent.futures._base.CancelledError
Version: v0.1.4-18-ga5b75d4
The Redis instance is created using create_reconnecting_redis.
strace shows that the reports are sent and replied to correctly, and no reconnections happen in between.
A connection should be created with a create_connection call.
Make the Connection.__init__ method accept reader & writer streams (or set them in some other method).
The idea behind Connection.connect was to be able to reconnect if the connection is lost, but the code looks messy to me and the reconnection process as a whole is still unclear.
The reconnection mechanism will be implemented in Pool.
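A rough sketch of the proposed split, with illustrative names: `__init__` only stores already-opened streams, while a module-level coroutine does the actual connecting, so the class itself never deals with (re)connection logic.

```python
import asyncio

class Connection:
    # __init__ takes already-opened streams; it never connects by itself.
    def __init__(self, reader, writer):
        self._reader = reader
        self._writer = writer

async def create_connection(address):
    # The connecting happens here, outside the class.
    reader, writer = await asyncio.open_connection(*address)
    return Connection(reader, writer)
```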
Warning
Pub/Sub mode currently cannot be used with Pool.
http://aioredis.readthedocs.org/en/v0.2.4/start.html#pub-sub-mode
Please let us know if you have plans to implement this functionality in near future.
Thanks.
Get rid of _writer.drain and make execute return a Future.
Currently, RedisConnection.execute asserts a live connection: https://github.com/aio-libs/aioredis/blob/master/aioredis/connection.py#L211
As a result, if the connection to the server is lost (server restarted, or network down), an AssertionError is raised.
This forces clients that want to reconnect to catch AssertionError, which is not the correct way to handle it; a more appropriate approach would be to raise a custom exception here and catch that specific exception in the client.
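A sketch of the suggested fix with a hypothetical `ConnectionClosedError` (the exception name and `FakeConnection` are illustrative, not aioredis API):

```python
class ConnectionClosedError(Exception):
    """Raised instead of a bare AssertionError when the connection is gone."""

class FakeConnection:
    def __init__(self, closed=False):
        self.closed = closed

    def execute(self, *args):
        # Instead of `assert not self.closed`, raise a specific error
        # that callers can catch to trigger a reconnect.
        if self.closed:
            raise ConnectionClosedError('connection is closed')
        return b'OK'
```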
You don't need to check self._reader_task.done(); .cancel() does that itself and returns True if the task has been scheduled for cancellation, False if the task is already done.
So just

self._reader_task.cancel()
self._reader_task = None

is enough.
Moreover, IIRC you can end up in a situation where the task has been scheduled for cancellation but has not yet had a chance to run -- it is, say, scheduled for execution on the next event loop iteration.
In that case the task is still not done -- only scheduled to be done.
Do you have any plan to support Redis Sentinel?
Assume there are three tasks T1, T2, T3 that use RedisPool, and the pool has minsize=1, maxsize=2 (freesize=1, size=1).
While T1 holds a connection, T2 requests a connection from the pool (freesize=0, size=1):
# Pool.acquire() -> Pool._fill_free()
@asyncio.coroutine
def _fill_free(self):
    while self.freesize < self.minsize and self.size < self.maxsize:
        conn = yield from self._create_new_connection()  # <--- (1) T2
        yield from self._pool.put(conn)
While T2 is pending at (1), T3 requests a connection and also suspends at (1), and T1 releases its connection (freesize=1, size=1).
Then T2 resumes, acquires and releases a connection (freesize=2, size=2):
# Pool.acquire() -> Pool._fill_free()
@asyncio.coroutine
def _fill_free(self):
    while self.freesize < self.minsize and self.size < self.maxsize:
        conn = yield from self._create_new_connection()  # <--- (1) T3
        yield from self._pool.put(conn)
When T3 resumes, it tries to put a new connection into the pool. However, T3 hangs until some task acquires a connection, because the pool has exceeded its maxsize.
>>> __import__('sys').version
'3.4.3 (default, Oct 30 2015, 20:31:57) \n[GCC 5.2.0]'
>>> loop = asyncio.get_event_loop()
>>> redis = aioredis.create_redis(('localhost', 6379), db=1, encoding='utf8', loop=loop)
>>> redis = loop.run_until_complete(redis)
>>> loop.run_until_complete(redis.lrange('test', 0, -1))
['ab', 'cd'] # strings
>>> tr = redis.multi_exec()
>>> tr.lrange('test', 0, -1)
>>> loop.run_until_complete(tr.execute())[0]
[b'ab', b'cd'] # bytes!
>>> tr = redis.multi_exec()
>>> tr.lrange('test', 0, -1, encoding='utf8') # note the encoding
>>> loop.run_until_complete(tr.execute())[0]
[b'ab', b'cd'] # bytes!
If a timeout is specified, it is ignored and, as a result, the coroutine may never complete.
Example:
await redis.blpop('non existent list', 10)
What should happen:
the coroutine should be unblocked after 10 seconds if the list is empty or does not exist.
What really happens:
the coroutine continues to wait for the list to be created or for its first element.
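Until the timeout argument is honoured, a possible client-side workaround is to enforce it with asyncio.wait_for (sketch in modern async/await syntax; `FakeRedis` is an illustrative stand-in that simulates the never-returning call):

```python
import asyncio

async def blpop_with_timeout(redis, key, timeout):
    try:
        return await asyncio.wait_for(redis.blpop(key, timeout), timeout)
    except asyncio.TimeoutError:
        return None        # mimics a server-side BLPOP timeout

class FakeRedis:
    async def blpop(self, key, timeout):
        await asyncio.sleep(3600)   # simulates the never-ending wait

result = asyncio.run(blpop_with_timeout(FakeRedis(), 'mylist', 0.01))
print(result)   # None
```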
Like aiomysql and aiopg. Some examples in bf6acf7 use the async def style.
From Wikipedia
Gitter is an instant messaging and chat room system for developers and users of GitHub repositories.
I first encountered it at https://github.com/biosustain/potion; see how the readme on GitHub has a Gitter link. I think this project could benefit from a Gitter. What do you guys think?
I'm not sure you need to use converters in the low-level API -- that may clash with the high-level one.
I'm +0 for removing conversions, but the final decision is up to you, of course.
Trying to connect to non-existing host I get the following traceback:
Traceback (most recent call last):
  File "/usr/local/bin/controller", line 9, in <module>
    load_entry_point('job-router==2.7.0', 'console_scripts', 'controller')()
  File "/usr/local/lib/python3.4/site-packages/job_router/run_controller.py", line 44, in main
    loop=loop))
  File "/usr/local/lib/python3.4/asyncio/base_events.py", line 316, in run_until_complete
    return future.result()
  File "/usr/local/lib/python3.4/asyncio/futures.py", line 275, in result
    raise self._exception
  File "/usr/local/lib/python3.4/asyncio/tasks.py", line 238, in _step
    result = next(coro)
  File "/usr/local/lib/python3.4/site-packages/aioredis/pool.py", line 26, in create_pool
    yield from pool._fill_free(override_min=False)
  File "/usr/local/lib/python3.4/site-packages/aioredis/pool.py", line 154, in _fill_free
    conn = yield from self._create_new_connection()
  File "/usr/local/lib/python3.4/site-packages/aioredis/commands/__init__.py", line 139, in create_redis
    loop=loop)
  File "/usr/local/lib/python3.4/site-packages/aioredis/connection.py", line 53, in create_connection
    host, port, loop=loop)
  File "/usr/local/lib/python3.4/asyncio/streams.py", line 63, in open_connection
    lambda: protocol, host, port, **kwds)
  File "/usr/local/lib/python3.4/asyncio/base_events.py", line 581, in create_connection
    infos = f1.result()
  File "/usr/local/lib/python3.4/asyncio/futures.py", line 275, in result
    raise self._exception
  File "/usr/local/lib/python3.4/concurrent/futures/thread.py", line 54, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/usr/local/lib/python3.4/socket.py", line 533, in getaddrinfo
    for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
socket.gaierror: [Errno -2] Name or service not known
A bare socket.gaierror: [Errno -2] Name or service not known is not very handy; adding the host/port pair to the error message (like https://github.com/KeepSafe/aiohttp/blob/master/aiohttp/connector.py#L295-L299) would be useful.
Currently, on redis server restart the pool is full of closed connections.
That causes an assertion error on acquiring (https://github.com/aio-libs/aioredis/blob/master/aioredis/pool.py#L117).
I suggest dropping closed connections in the .fill_free coroutine.
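A sketch of the suggestion (the list-based pool and `DummyConn` are illustrative stand-ins, not the real Pool internals):

```python
class DummyConn:
    def __init__(self, closed):
        self.closed = closed

def drop_closed(pool):
    # Keep only live connections; closed ones are discarded before the
    # pool is topped up with fresh connections.
    pool[:] = [conn for conn in pool if not conn.closed]
    return pool
```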
On a highly loaded process with a single connection I got an AssertionError from MultiExec._do_execute:

assert len(results) == len(waiters), (results, waiters)
# results, waiters looked like:
([('OK', 0)], [])
It seems that when I use multi like this:

with (yield from self.tr_lock):  # asyncio.Lock
    tr = self.redis.multi_exec()
    tr.hmset(self.data_key, *chunk_data)
    tr.zadd(self.chunks_key, *items)
    yield from tr.execute()

other green threads can somehow add other commands to this connection.
Is this possible with a reconnecting redis?
self.redis = yield from aioredis.create_reconnecting_redis(...)
Should I always use a separate redis connection for transaction-only interactions?
By the way, the whole application freezes until Ctrl+C is pressed.
There are a few problems with the pub/sub docs:
asyncio.async is deprecated in favour of asyncio.ensure_future;
async_reader2 is defined but never used -- I guess the last line should be using it.

On one connection, the following causes ERR MULTI calls can not be nested:

redis = yield from create_redis()

@asyncio.coroutine
def task():
    tr = redis.multi_exec()
    tr.set('key', 'abc')
    yield from tr.execute()

yield from asyncio.gather(task(), task())
_testutil.py

class RedisEncodingTest(BaseTest):
    def setUp(self):
        super().setUp()
        self.redis = self.loop.run_until_complete(self.create_redis(
            ('localhost', self.redis_port), loop=self.loop, encoding='utf-8'))

    def tearDown(self):
        del self.redis
        super().tearDown()

string_commands_test.py

class StringCommandsEncodingTest(RedisEncodingTest):
    @run_until_complete
    def test_set(self):
        ok = yield from self.redis.set('my-key', 'value')
        self.assertTrue(ok)
        with self.assertRaises(TypeError):
            yield from self.redis.set(None, 'value')

util.py

@asyncio.coroutine
def wait_ok(fut):
    res = yield from fut
    if res == b'QUEUED':
        return res
    return res == b'OK'
In wait_ok, with a connection-level encoding set, fut's result is 'OK' (str), not b'OK', so res == b'OK' is always False;
similarly, res == b'QUEUED' may always be False.
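A sketch of a possible fix that accepts both raw and decoded replies (`ok_from_reply` is a hypothetical helper, not the real wait_ok):

```python
def ok_from_reply(res):
    # With a connection-level encoding the reply arrives as str, without
    # one it arrives as bytes; compare against both forms.
    if res in (b'QUEUED', 'QUEUED'):
        return res
    return res in (b'OK', 'OK')
```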
Asyncio-redis is a fairly complete and mature asynchronous Redis implementation. I'm curious what sets aioredis apart from it?
Hello,
Can you please show me how to use one of the SCAN commands with aioredis? I understand how to use it with plain redis and without asyncio, but here I can't figure out how to iterate over the elements.
It would be very helpful.
Thank you!
Here are my thoughts regarding a cursor for those commands:
wrap the response of these commands in a namedtuple subclass providing the raw cursor value and the data itself, but also implementing the context manager interface to allow iterating over it. Something like:

with (yield from redis.scan(0)) as cursor_context:
    while cursor_context:
        raw_cursor, response = yield from cursor_context

and simple usage:

cur, resp = yield from redis.scan(0)
while cur != 0:
    cur, resp = yield from redis.scan(cur)
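A possible helper wrapping that loop, written in modern async/await syntax (`scan_all` and `FakeRedis` are illustrative; only the (cursor, keys) return shape from the snippets above is assumed):

```python
import asyncio

async def scan_all(redis):
    # Follow the SCAN cursor until the server returns 0, collecting all
    # keys along the way.
    keys, cur = [], 0
    while True:
        cur, batch = await redis.scan(cur)
        keys.extend(batch)
        if cur == 0:
            return keys

class FakeRedis:
    # Emulates a two-page scan: cursor 0 -> 5 -> 0.
    pages = {0: (5, ['a', 'b']), 5: (0, ['c'])}
    async def scan(self, cur):
        return self.pages[cur]

keys = asyncio.run(scan_all(FakeRedis()))
print(keys)   # ['a', 'b', 'c']
```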
It's a descriptor, and 'MULTI' is issued on attribute access, not on call.
This may lead to unexpected bugs.
in branch redis_on_streams
Would be nice to have.
Its absence decreases performance on old Linux kernels (aiohttp/448).
E.g. why use aioredis, and why asyncio-redis doesn't satisfy all requirements.
encoding parameter (in RedisConnection) by-passed to the hiredis parser;
encoding parameter (basically needed if working without connection-level encoding).
Must check pros & cons for both points.
Recently I've updated my Python to 3.4.1 (Debian testing switched to this version); as a result, every single test in aioredis causes the following warning (however, no warning with the previous Python 3.4):
Task was destroyed but it is pending!
task: <Task pending _read_data() at /home/nick/sources/python/aioredis/aioredis/connection.py:78 wait_for=<Future pending cb=[Task._wakeup()]>>
I tried to run the tests with PYTHONASYNCIODEBUG=1 and got the following output:

Task was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
  File "runtests.py", line 290, in <module>
    runtests()
  File "runtests.py", line 273, in runtests
    failfast=failfast).run(tests)
  File "/usr/lib/python3.4/unittest/runner.py", line 168, in run
    test(result)
  File "/usr/lib/python3.4/unittest/suite.py", line 87, in __call__
    return self.run(*args, **kwds)
  File "/usr/lib/python3.4/unittest/suite.py", line 125, in run
    test(result)
  File "/usr/lib/python3.4/unittest/case.py", line 625, in __call__
    return self.run(*args, **kwds)
  File "/usr/lib/python3.4/unittest/case.py", line 577, in run
    testMethod()
  File "tests/connection_test.py", line 35, in test_global_loop
    ('localhost', self.redis_port), db=0))
  File "/usr/lib/python3.4/asyncio/base_events.py", line 239, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.4/asyncio/base_events.py", line 212, in run_forever
    self._run_once()
  File "/usr/lib/python3.4/asyncio/base_events.py", line 912, in _run_once
    handle._run()
  File "/usr/lib/python3.4/asyncio/events.py", line 96, in _run
    self._callback(*self._args)
  File "/usr/lib/python3.4/asyncio/tasks.py", line 298, in _wakeup
    self._step(value, None)
  File "/usr/lib/python3.4/asyncio/tasks.py", line 244, in _step
    result = next(coro)
  File "/usr/lib/python3.4/asyncio/coroutines.py", line 78, in __next__
    return next(self.gen)
  File "/home/nick/sources/python/aioredis/aioredis/connection.py", line 42, in create_connection
    conn = RedisConnection(reader, writer, encoding=encoding, loop=loop)
  File "/home/nick/sources/python/aioredis/aioredis/connection.py", line 63, in __init__
    self._reader_task = asyncio.Task(self._read_data(), loop=self._loop)
task: <Task pending _read_data() at /home/nick/sources/python/aioredis/aioredis/connection.py:78 wait_for=<Future pending cb=[Task._wakeup()]>>
make: *** [test] Error
Any idea how to mute this warning?
RedisPool must be a pool of connections, and Redis must hide from the end user what it uses inside.
It must be possible to instantiate Redis with a connection and with a connection pool, thus providing a single consistent high-level interface.
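A sketch of that idea with illustrative names: Redis only requires an object exposing an `execute()` coroutine, so a single connection or a pool can be supplied interchangeably.

```python
import asyncio

class Redis:
    # Accepts anything with an execute() coroutine method; whether it is
    # a single connection or a pool stays hidden from the end user.
    def __init__(self, conn_or_pool):
        self._conn = conn_or_pool

    async def get(self, key):
        return await self._conn.execute('GET', key)

class FakeConnection:
    # Stand-in executor that just echoes the command it received.
    async def execute(self, command, *args):
        return (command, args)

value = asyncio.run(Redis(FakeConnection()).get('mykey'))
print(value)   # ('GET', ('mykey',))
```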
Implementation status: client kill & monitor.
Redis 3.0 introduced new Cluster commands.
Need to implement them.
See http://redis.io/commands#cluster
For the case when hiredis is not available.
This must be implemented in the connection.
TODO:
- check in the execute method that the connection is not in "subscribe" mode;
- stop the _read_data method and start another reader task;
- asyncio.as_completed();
- the result must be something like:

ok = yield from conn.subscribe('channel:1', 'channel:2')
for waiter in conn.wait_messages():  # (this is the "other reader task" that needs a good name)
    data = yield from waiter
    # do something
As for the high-level part, the Redis docs have a good hint: http://redis.io/topics/pubsub (Client library implementation hints).