leporo / tornado-redis
This project forked from evilkost/brukva
Asynchronous Redis client that works within Tornado IO loop.
Hi,
I'm writing a small application that lets users publish and subscribe to multiple channels at a time, and I chose tornado-redis to implement this feature. In my scenario, one message may be sent to several channels, but a user who is subscribed to all of those channels should receive it only once. Right now, users keep receiving duplicate messages from the Pub/Sub handler.
[Solutions]:
My idea is to override the SockJSSubscriber class and use a Bloom filter to avoid sending duplicate messages through this object. If you have a better idea, please let me know; I would highly appreciate it.
Thanks,
-Canh
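One alternative to a Bloom filter is to remember the IDs of recently delivered messages for each subscriber and skip repeats. The sketch below is only an illustration and assumes that the publisher attaches a unique ID to every message; `RecentMessageFilter` and its `capacity` parameter are hypothetical names, not part of tornado-redis.

```python
from collections import OrderedDict


class RecentMessageFilter(object):
    """Remembers the last `capacity` message IDs seen by one subscriber."""

    def __init__(self, capacity=1000):
        self.capacity = capacity
        self._seen = OrderedDict()

    def should_deliver(self, message_id):
        """Return True the first time an ID is seen, False for repeats."""
        if message_id in self._seen:
            return False
        self._seen[message_id] = True
        # Evict the oldest ID once the window is full to bound memory use.
        if len(self._seen) > self.capacity:
            self._seen.popitem(last=False)
        return True


# The same message ID arrives once per subscribed channel.
dedup = RecentMessageFilter(capacity=2)
deliveries = [dedup.should_deliver(mid) for mid in ('m1', 'm1', 'm2', 'm1')]
```

Unlike a Bloom filter, this approach never suppresses a new message because of a false positive, at the cost of storing the IDs themselves.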
Let's start with an example:
from tornado import gen
import tornadoredis
POOL = tornadoredis.ConnectionPool(...)
@gen.coroutine
def func():
    client = tornadoredis.Client(connection_pool=POOL, selected_db=0)
    with client.pipeline() as pipe:
        pipe.get('xxx')
        # ....
        yield gen.Task(pipe.execute)
    yield gen.Task(client.disconnect)
This generally works, but the connection is destroyed unnecessarily. The next operation takes a connection from the pool and fails with the exception "Tried to read from non-existent connection".
This happens because Client.pipeline() shares its connection with a new Pipeline object, which is also a Client, so two clients end up using the same Connection object. At the end of the scope, the original Client releases the connection to the pool (via .__del__()). The Pipeline object also runs Client.__del__() but has no information about the pool, so it calls disconnect().
To solve this, you can simply add an empty implementation of .__del__() to the Pipeline class and rely on the creator of the Pipeline to take care of the connection.
Regards,
Tom
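The ownership problem described above can be reproduced with toy classes. The sketch below is not the tornado-redis code: `FakeConnection`, `FakePool`, `FakeClient` and `FakePipeline` are hypothetical stand-ins that only mimic the behaviour reported here, and it relies on CPython running `__del__` as soon as the last reference is dropped.

```python
class FakeConnection(object):
    def __init__(self):
        self.connected = True

    def disconnect(self):
        self.connected = False


class FakePool(object):
    def __init__(self):
        self.free = []

    def release(self, connection):
        self.free.append(connection)


class FakeClient(object):
    def __init__(self, connection, pool=None):
        self.connection = connection
        self.pool = pool

    def __del__(self):
        # With a pool, hand the connection back; without one, close it.
        if self.pool is not None:
            self.pool.release(self.connection)
        else:
            self.connection.disconnect()


class FakePipeline(FakeClient):
    def __del__(self):
        # Proposed fix: the creator of the pipeline owns the connection.
        pass


def pooled_connection_survives(pipeline_cls):
    pool = FakePool()
    connection = FakeConnection()
    client = FakeClient(connection, pool)
    pipe = pipeline_cls(connection)  # shares the client's connection, no pool
    del pipe
    del client
    return pool.free[0].connected


broken = pooled_connection_survives(FakeClient)    # pipeline closes the socket
fixed = pooled_connection_survives(FakePipeline)   # connection stays usable
```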
When the client is waiting on a blocking command and a disconnection occurs, the caller is never notified.
Sample code:
import tornado.gen
import tornado.ioloop
import tornadoredis
@tornado.gen.engine
def run():
    r = tornadoredis.Client()
    r.connect()
    while True:
        # blpop blocks until an item is available on 'key'
        result = yield tornado.gen.Task(r.blpop, 'key')
        print(result)
if __name__ == '__main__':
    run()
    tornado.ioloop.IOLoop.instance().start()
Run the code above, then stop redis-server. This exception will be thrown by the library.
Ideally the client should be notified about the exception so that it can handle it (reconnecting, for instance), but currently it just fails silently.
I pushed a solution to my personal tornado-redis fork that simply calls the client callback passing the exception to it: https://github.com/lessandro/tornado-redis/commit/1795b8f8064fdf4c6098ee0ba8b88fb7471a8a50
This is how I handle disconnections: https://github.com/lessandro/ircd/blob/09df12a6bce4f60ee57b90a3f8d31f52e25ee7ee/servers/redisutil.py#L48
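If the callback can receive either a reply or an exception, the caller has to tell the two apart. The following is a minimal sketch of that pattern, assuming the exception is passed as an instance in place of the result; `dispatch_result`, `on_item` and `on_error` are hypothetical names and not taken from the linked fork.

```python
def dispatch_result(result, on_item, on_error):
    """Route a callback argument that may be a reply or an exception."""
    if isinstance(result, Exception):
        on_error(result)   # e.g. schedule a reconnect here
        return False
    on_item(result)
    return True


items, errors = [], []
dispatch_result(['key', 'value'], items.append, errors.append)
dispatch_result(IOError('connection lost'), items.append, errors.append)
```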
For example, is 'LPUSH mylist a b c' (pushing multiple values to a list in a single command) supported in tornado-redis?
From https://github.com/leporo/tornado-redis/blob/master/tornadoredis/tests/server_commands.py, it seems not.
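At the protocol level, a multi-value LPUSH is a single request with one extra argument per value (Redis accepts multiple values for LPUSH since version 2.4). The sketch below shows only the wire format; `encode_command` is a hypothetical helper and does not indicate whether tornado-redis exposes this form.

```python
def encode_command(*args):
    """Encode a command and its arguments as a Redis multi-bulk request."""
    parts = [b'*' + str(len(args)).encode('ascii') + b'\r\n']
    for arg in args:
        data = arg if isinstance(arg, bytes) else str(arg).encode('utf-8')
        # Each argument is sent as "$<byte length>\r\n<data>\r\n".
        parts.append(b'$' + str(len(data)).encode('ascii') + b'\r\n')
        parts.append(data + b'\r\n')
    return b''.join(parts)


request = encode_command('LPUSH', 'mylist', 'a', 'b', 'c')
```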
Add the serialization support using pickle, json and msgpack (http://msgpack.org/).
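One possible shape for such a feature is a small registry of encode/decode pairs applied before values are written and after they are read. This is only a sketch of the idea using the standard library; `SERIALIZERS`, `dumps` and `loads` are hypothetical names, and msgpack is left out because it is a third-party package (its `packb`/`unpackb` functions could be registered the same way).

```python
import json
import pickle

# Each entry maps a format name to an (encode, decode) pair working on bytes.
SERIALIZERS = {
    'json': (lambda obj: json.dumps(obj).encode('utf-8'),
             lambda data: json.loads(data.decode('utf-8'))),
    'pickle': (pickle.dumps, pickle.loads),
}


def dumps(obj, fmt='json'):
    return SERIALIZERS[fmt][0](obj)


def loads(data, fmt='json'):
    return SERIALIZERS[fmt][1](data)


payload = {'name': 'example', 'channels': ['a', 'b']}
round_trips = [loads(dumps(payload, fmt), fmt) for fmt in ('json', 'pickle')]
```

Note that pickle should only be used for data from trusted sources, since unpickling can execute arbitrary code.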
Issue:
Tornado server no longer responds to HTTP requests.
Backtracking:
I debugged the Tornado process with gdb and traced the problem to a mutex deadlock in IOLoop's add_callback. It happens when a Redis command (in my case a BLPOP) receives its response and adds the callback to the IOLoop. While add_callback is wrapping the callback in a partial, the tornado-redis client gets deleted (__del__ is called), which disconnects the Connection object. The disconnected Connection object then tries to add another callback to the IOLoop. Since add_callback is already executing, the mutex is locked, and the second call to add_callback blocks indefinitely on it.
I have provided the following gdb backtrace:
#5 Frame 0x17d4e50, for file /usr/local/lib/python2.7/dist-packages/tornado/ioloop.py, line 714, in add_callback (self=<ZMQIOLoop(_impl=<ZMQPoller(_poller=<Poller(sockets=[(5, 5), (<Socket at remote 0x15f1598>, 4), (3, 5), (4, 5), (15, 5), (16, 5), (13, 5), (26, 5), (27, 5)], _map={3: 2, 4: 3, 5: 0, 13: 6, 15: 4, 16: 5, <...>: 1, 26: 7, 27: 8}) at remote 0x15eaa10>) at remote 0x15ea310>, _handlers={3: <function at remote 0x169cc80>, 4: <function at remote 0x169cd70>, 5: <function at remote 0x15ebc80>, 13: <function at remote 0x19ad7d0>, 15: <function at remote 0x16a2938>, 16: <function at remote 0x16a21b8>, <...>: <function at remote 0x15ebd70>, 26: <function at remote 0x19b20c8>, 27: <function at remote 0x19adc80>}, _running=True, _callbacks=[], _thread_ident=140448435238656, _callback_lock=<thread.lock at remote 0x13ad890>, _cancellations=3, _events={}, time_func=<built-in function time>, _waker=<Waker(writer=<file at remote 0x15d2420>, reader=<file at remote 0x15d2390>) at remote 0x15ead90>, _stopped=False, _...(truncated)
with self._callback_lock:
#9 Frame 0x17f73f0, for file /usr/local/lib/python2.7/dist-packages/tornado/iostream.py, line 369, in _run_callback (self=<IOStream(_write_buffer=<collections.deque at remote 0x19bee50>, _close_callback=None, _pending_callbacks=1, _read_bytes=None, _closed=True, _write_callback=None, _state=None, max_buffer_size=104857600, io_loop=<ZMQIOLoop(_impl=<ZMQPoller(_poller=<Poller(sockets=[(5, 5), (<Socket at remote 0x15f1598>, 4), (3, 5), (4, 5), (15, 5), (16, 5), (13, 5), (26, 5), (27, 5)], _map={3: 2, 4: 3, 5: 0, 13: 6, 15: 4, 16: 5, <...>: 1, 26: 7, 27: 8}) at remote 0x15eaa10>) at remote 0x15ea310>, _handlers={3: <function at remote 0x169cc80>, 4: <function at remote 0x169cd70>, 5: <function at remote 0x15ebc80>, 13: <function at remote 0x19ad7d0>, 15: <function at remote 0x16a2938>, 16: <function at remote 0x16a21b8>, <...>: <function at remote 0x15ebd70>, 26: <function at remote 0x19b20c8>, 27: <function at remote 0x19adc80>}, _running=True, _callbacks=[], _thread_ident=140448435238656, _callback_lock=<thread.lo...(truncated)
self.io_loop.add_callback(wrapper)
#13 Frame 0x16e1360, for file /usr/local/lib/python2.7/dist-packages/tornado/iostream.py, line 266, in _maybe_run_close_callback (self=<IOStream(_write_buffer=<collections.deque at remote 0x19bee50>, _close_callback=None, _pending_callbacks=1, _read_bytes=None, _closed=True, _write_callback=None, _state=None, max_buffer_size=104857600, io_loop=<ZMQIOLoop(_impl=<ZMQPoller(_poller=<Poller(sockets=[(5, 5), (<Socket at remote 0x15f1598>, 4), (3, 5), (4, 5), (15, 5), (16, 5), (13, 5), (26, 5), (27, 5)], _map={3: 2, 4: 3, 5: 0, 13: 6, 15: 4, 16: 5, <...>: 1, 26: 7, 27: 8}) at remote 0x15eaa10>) at remote 0x15ea310>, _handlers={3: <function at remote 0x169cc80>, 4: <function at remote 0x169cd70>, 5: <function at remote 0x15ebc80>, 13: <function at remote 0x19ad7d0>, 15: <function at remote 0x16a2938>, 16: <function at remote 0x16a21b8>, <...>: <function at remote 0x15ebd70>, 26: <function at remote 0x19b20c8>, 27: <function at remote 0x19adc80>}, _running=True, _callbacks=[], _thread_ident=140448435238656, _callback_loc...(truncated)
self._run_callback(cb)
#17 Frame 0x16e0810, for file /usr/local/lib/python2.7/dist-packages/tornado/iostream.py, line 257, in close (self=<IOStream(_write_buffer=<collections.deque at remote 0x19bee50>, _close_callback=None, _pending_callbacks=1, _read_bytes=None, _closed=True, _write_callback=None, _state=None, max_buffer_size=104857600, io_loop=<ZMQIOLoop(_impl=<ZMQPoller(_poller=<Poller(sockets=[(5, 5), (<Socket at remote 0x15f1598>, 4), (3, 5), (4, 5), (15, 5), (16, 5), (13, 5), (26, 5), (27, 5)], _map={3: 2, 4: 3, 5: 0, 13: 6, 15: 4, 16: 5, <...>: 1, 26: 7, 27: 8}) at remote 0x15eaa10>) at remote 0x15ea310>, _handlers={3: <function at remote 0x169cc80>, 4: <function at remote 0x169cd70>, 5: <function at remote 0x15ebc80>, 13: <function at remote 0x19ad7d0>, 15: <function at remote 0x16a2938>, 16: <function at remote 0x16a21b8>, <...>: <function at remote 0x15ebd70>, 26: <function at remote 0x19b20c8>, 27: <function at remote 0x19adc80>}, _running=True, _callbacks=[], _thread_ident=140448435238656, _callback_lock=<thread.lock at re...(truncated)
self._maybe_run_close_callback()
#21 (frame information optimized out)
#24 Frame 0x181ef80, for file /usr/local/lib/python2.7/dist-packages/tornadoredis/client.py, line 243, in __del__ (self=<Client(_pipeline=None, selected_db=0, subscribed=set([]), _io_loop=<ZMQIOLoop(_impl=<ZMQPoller(_poller=<Poller(sockets=[(5, 5), (<Socket at remote 0x15f1598>, 4), (3, 5), (4, 5), (15, 5), (16, 5), (13, 5), (26, 5), (27, 5)], _map={3: 2, 4: 3, 5: 0, 13: 6, 15: 4, 16: 5, <...>: 1, 26: 7, 27: 8}) at remote 0x15eaa10>) at remote 0x15ea310>, _handlers={3: <function at remote 0x169cc80>, 4: <function at remote 0x169cd70>, 5: <function at remote 0x15ebc80>, 13: <function at remote 0x19ad7d0>, 15: <function at remote 0x16a2938>, 16: <function at remote 0x16a21b8>, <...>: <function at remote 0x15ebd70>, 26: <function at remote 0x19b20c8>, 27: <function at remote 0x19adc80>}, _running=True, _callbacks=[], _thread_ident=140448435238656, _callback_lock=<thread.lock at remote 0x13ad890>, _cancellations=3, _events={}, time_func=<built-in function time>, _waker=<Waker(writer=<file at remote 0x15d2420>, reader...(truncated)
connection.disconnect()
#44 Frame 0x16473f0, for file /usr/local/lib/python2.7/dist-packages/tornado/ioloop.py, line 719, in add_callback (self=<ZMQIOLoop(_impl=<ZMQPoller(_poller=<Poller(sockets=[(5, 5), (<Socket at remote 0x15f1598>, 4), (3, 5), (4, 5), (15, 5), (16, 5), (13, 5), (26, 5), (27, 5)], _map={3: 2, 4: 3, 5: 0, 13: 6, 15: 4, 16: 5, <...>: 1, 26: 7, 27: 8}) at remote 0x15eaa10>) at remote 0x15ea310>, _handlers={3: <function at remote 0x169cc80>, 4: <function at remote 0x169cd70>, 5: <function at remote 0x15ebc80>, 13: <function at remote 0x19ad7d0>, 15: <function at remote 0x16a2938>, 16: <function at remote 0x16a21b8>, <...>: <function at remote 0x15ebd70>, 26: <function at remote 0x19b20c8>, 27: <function at remote 0x19adc80>}, _running=True, _callbacks=[], _thread_ident=140448435238656, _callback_lock=<thread.lock at remote 0x13ad890>, _cancellations=3, _events={}, time_func=<built-in function time>, _waker=<Waker(writer=<file at remote 0x15d2420>, reader=<file at remote 0x15d2390>) at remote 0x15ead90>, _stopped=False, _...(truncated)
stack_context.wrap(callback), *args, **kwargs))
#48 Frame 0x16470b0, for file /usr/local/lib/python2.7/dist-packages/tornado/iostream.py, line 369, in _run_callback (self=<IOStream(_write_buffer=<collections.deque at remote 0x19b8280>, _close_callback=<function at remote 0x19adb18>, _pending_callbacks=1, _read_bytes=None, _closed=False, _write_callback=None, _state=25, max_buffer_size=104857600, io_loop=<ZMQIOLoop(_impl=<ZMQPoller(_poller=<Poller(sockets=[(5, 5), (<Socket at remote 0x15f1598>, 4), (3, 5), (4, 5), (15, 5), (16, 5), (13, 5), (26, 5), (27, 5)], _map={3: 2, 4: 3, 5: 0, 13: 6, 15: 4, 16: 5, <...>: 1, 26: 7, 27: 8}) at remote 0x15eaa10>) at remote 0x15ea310>, _handlers={3: <function at remote 0x169cc80>, 4: <function at remote 0x169cd70>, 5: <function at remote 0x15ebc80>, 13: <function at remote 0x19ad7d0>, 15: <function at remote 0x16a2938>, 16: <function at remote 0x16a21b8>, <...>: <function at remote 0x15ebd70>, 26: <function at remote 0x19b20c8>, 27: <function at remote 0x19adc80>}, _running=True, _callbacks=[], _thread_ident=140448435238656, ...(truncated)
self.io_loop.add_callback(wrapper)
#52 Frame 0x16d60a0, for file /usr/local/lib/python2.7/dist-packages/tornado/iostream.py, line 569, in _handle_write (self=<IOStream(_write_buffer=<collections.deque at remote 0x19b8280>, _close_callback=<function at remote 0x19adb18>, _pending_callbacks=1, _read_bytes=None, _closed=False, _write_callback=None, _state=25, max_buffer_size=104857600, io_loop=<ZMQIOLoop(_impl=<ZMQPoller(_poller=<Poller(sockets=[(5, 5), (<Socket at remote 0x15f1598>, 4), (3, 5), (4, 5), (15, 5), (16, 5), (13, 5), (26, 5), (27, 5)], _map={3: 2, 4: 3, 5: 0, 13: 6, 15: 4, 16: 5, <...>: 1, 26: 7, 27: 8}) at remote 0x15eaa10>) at remote 0x15ea310>, _handlers={3: <function at remote 0x169cc80>, 4: <function at remote 0x169cd70>, 5: <function at remote 0x15ebc80>, 13: <function at remote 0x19ad7d0>, 15: <function at remote 0x16a2938>, 16: <function at remote 0x16a21b8>, <...>: <function at remote 0x15ebd70>, 26: <function at remote 0x19b20c8>, 27: <function at remote 0x19adc80>}, _running=True, _callbacks=[], _thread_ident=140448435238656, ...(truncated)
self._run_callback(callback)
#56 Frame 0x16d5b70, for file /usr/local/lib/python2.7/dist-packages/tornado/iostream.py, line 220, in write (self=<IOStream(_write_buffer=<collections.deque at remote 0x19b8280>, _close_callback=<function at remote 0x19adb18>, _pending_callbacks=1, _read_bytes=None, _closed=False, _write_callback=None, _state=25, max_buffer_size=104857600, io_loop=<ZMQIOLoop(_impl=<ZMQPoller(_poller=<Poller(sockets=[(5, 5), (<Socket at remote 0x15f1598>, 4), (3, 5), (4, 5), (15, 5), (16, 5), (13, 5), (26, 5), (27, 5)], _map={3: 2, 4: 3, 5: 0, 13: 6, 15: 4, 16: 5, <...>: 1, 26: 7, 27: 8}) at remote 0x15eaa10>) at remote 0x15ea310>, _handlers={3: <function at remote 0x169cc80>, 4: <function at remote 0x169cd70>, 5: <function at remote 0x15ebc80>, 13: <function at remote 0x19ad7d0>, 15: <function at remote 0x16a2938>, 16: <function at remote 0x16a21b8>, <...>: <function at remote 0x15ebd70>, 26: <function at remote 0x19b20c8>, 27: <function at remote 0x19adc80>}, _running=True, _callbacks=[], _thread_ident=140448435238656, _callbac...(truncated)
self._handle_write()
#60 Frame 0x16d5e90, for file /usr/local/lib/python2.7/dist-packages/tornadoredis/connection.py, line 124, in write (self=<Connection(info={'db': 0, 'pass': None}, _event_handler=<weakproxy at remote 0x1a546d8>, _stream=<IOStream(_write_buffer=<collections.deque at remote 0x19b8280>, _close_callback=<function at remote 0x19adb18>, _pending_callbacks=1, _read_bytes=None, _closed=False, _write_callback=None, _state=25, max_buffer_size=104857600, io_loop=<ZMQIOLoop(_impl=<ZMQPoller(_poller=<Poller(sockets=[(5, 5), (<Socket at remote 0x15f1598>, 4), (3, 5), (4, 5), (15, 5), (16, 5), (13, 5), (26, 5), (27, 5)], _map={3: 2, 4: 3, 5: 0, 13: 6, 15: 4, 16: 5, <...>: 1, 26: 7, 27: 8}) at remote 0x15eaa10>) at remote 0x15ea310>, _handlers={3: <function at remote 0x169cc80>, 4: <function at remote 0x169cd70>, 5: <function at remote 0x15ebc80>, 13: <function at remote 0x19ad7d0>, 15: <function at remote 0x16a2938>, 16: <function at remote 0x16a21b8>, <...>: <function at remote 0x15ebd70>, 26: <function at remote 0x19b20c8>, 2...(truncated)
self._stream.write(data, callback=cb)
#65 Frame 0x1a0aa70, for file /usr/local/lib/python2.7/dist-packages/tornado/gen.py, line 371, in start (self=<Task(runner=<Runner(pending_callbacks=set([<object at remote 0x7fbcb309f9c0>]), had_exception=False, finished=False, results={}, running=True, exc_info=None, final_callback=<function at remote 0x19a1cf8>, gen=<generator at remote 0x1a55140>, yield_point=<...>) at remote 0x19b6bd0>, args=('*3\r\n$5\r\nBLPOP\r\n$24\r\na_redis_key\r\n$1\r\n0\r\n',), key=<object at remote 0x7fbcb309f9c0>, func=<instancemethod at remote 0x19abf50>, kwargs={'callback': <function at remote 0x1a561b8>}) at remote 0x1a50190>, runner=<...>)
self.func(*self.args, **self.kwargs)
#69 Frame 0x19e4d20, for file /usr/local/lib/python2.7/dist-packages/tornado/gen.py, line 533, in run (self=<Runner(pending_callbacks=set([<object at remote 0x7fbcb309f9c0>]), had_exception=False, finished=False, results={}, running=True, exc_info=None, final_callback=<function at remote 0x19a1cf8>, gen=<generator at remote 0x1a55140>, yield_point=<Task(runner=<...>, args=('*3\r\n$5\r\nBLPOP\r\n$24\r\na_redis_key\r\n$1\r\n0\r\n',), key=<object at remote 0x7fbcb309f9c0>, func=<instancemethod at remote 0x19abf50>, kwargs={'callback': <function at remote 0x1a561b8>}) at remote 0x1a50190>) at remote 0x19b6bd0>, next=None, yielded=<...>)
self.yield_point.start(self)
#73 Frame 0x1907e50, for file /usr/local/lib/python2.7/dist-packages/tornado/gen.py, line 153, in wrapper (args=(<weakproxy at remote 0x1a546d8>, 'BLPOP', 'a_redis_key', 0), kwargs={'callback': <function at remote 0x19a1f50>}, handle_exception=<function at remote 0x19a1488>, result=<generator at remote 0x1a55140>, final_callback=<function at remote 0x19a1cf8>)
runner.run()
#80 Frame 0x16d2400, for file /usr/local/lib/python2.7/dist-packages/tornadoredis/client.py, line 734, in blpop (self=<weakproxy at remote 0x1a546d8>, keys='a_redis_key', timeout=0, callback=<function at remote 0x19a1f50>, tokens=['a_redis_key', 0])
self.execute_command('BLPOP', *tokens, callback=callback)
#87 Frame 0x17aae50, for file /usr/local/lib/python2.7/dist-packages/tornado/gen.py, line 371, in start (self=<Task(runner=<Runner(pending_callbacks=set([<object at remote 0x7fbcb309f970>]), had_exception=False, finished=False, results={}, running=True, exc_info=None, final_callback=<function at remote 0x19add70>, gen=<generator at remote 0x19abd70>, yield_point=<...>) at remote 0x19b6190>, args=('a_redis_key',), key=<object at remote 0x7fbcb309f970>, func=<functools.partial at remote 0x1a54628>, kwargs={'callback': <function at remote 0x19a1f50>}) at remote 0x19b6510>, runner=<...>)
self.func(*self.args, **self.kwargs)
#91 Frame 0x195cb80, for file /usr/local/lib/python2.7/dist-packages/tornado/gen.py, line 533, in run (self=<Runner(pending_callbacks=set([<object at remote 0x7fbcb309f970>]), had_exception=False, finished=False, results={}, running=True, exc_info=None, final_callback=<function at remote 0x19add70>, gen=<generator at remote 0x19abd70>, yield_point=<Task(runner=<...>, args=('a_redis_key',), key=<object at remote 0x7fbcb309f970>, func=<functools.partial at remote 0x1a54628>, kwargs={'callback': <function at remote 0x19a1f50>}) at remote 0x19b6510>) at remote 0x19b6190>, next={u'a_redis_key': u'redis_response_data'}, yielded=<...>)
self.yield_point.start(self)
#95 Frame 0x196ef50, for file /usr/local/lib/python2.7/dist-packages/tornado/gen.py, line 476, in set_result (self=<Runner(pending_callbacks=set([<object at remote 0x7fbcb309f970>]), had_exception=False, finished=False, results={}, running=True, exc_info=None, final_callback=<function at remote 0x19add70>, gen=<generator at remote 0x19abd70>, yield_point=<Task(runner=<...>, args=('a_redis_key',), key=<object at remote 0x7fbcb309f970>, func=<functools.partial at remote 0x1a54628>, kwargs={'callback': <function at remote 0x19a1f50>}) at remote 0x19b6510>) at remote 0x19b6190>, key=<object at remote 0x7fbcb309fa40>, result={u'a_redis_key': u'redis_response_data'})
self.run()
#99 Frame 0x19ddb30, for file /usr/local/lib/python2.7/dist-packages/tornado/gen.py, line 550, in inner (args=({u'a_redis_key': u'redis_response_data'},), kwargs={}, result={...})
self.set_result(key, result)
#104 Frame 0x192bfb0, for file /usr/local/lib/python2.7/dist-packages/tornado/stack_context.py, line 302, in wrapped (args=({u'a_redis_key': u'redis_response_data'},), kwargs={}, ret=None, current_state=((), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), None), exception_handler=<function at remote 0x19ad398>) at remote 0x19b6610>), exception_handler=<function at remote 0x19a10c8>) at remote 0x19b6a10>), contexts=((...), <...>), exc=(None, None, None), top=None, last_ctx=0, stack=(...))
ret = fn(*args, **kwargs)
#108 Frame 0x1a18370, for file /usr/local/lib/python2.7/dist-packages/tornadoredis/client.py, line 433, in execute_command (self=<weakproxy at remote 0x1a546d8>, cmd='BLPOP', args=('a_redis_key', 0), kwargs={}, result={u'a_redis_key': u'redis_response_data'}, execute_pending=True, callback=<function at remote 0x19a1500>, cmd_line=<CmdLine(cmd='BLPOP', args=('a_redis_key', 0), kwargs={}) at remote 0x19b6e50>, n_tries=1, command='*3\r\n$5\r\nBLPOP\r\n$24\r\na_redis_key\r\n$1\r\n0\r\n', listening=set([]), data='*2\r\n', resp=[u'a_redis_key', u'redis_response_data'])
callback(result)
#111 Frame 0x1a216f0, for file /usr/local/lib/python2.7/dist-packages/tornado/gen.py, line 507, in run (self=<Runner(pending_callbacks=set([]), had_exception=False, finished=False, results={}, running=True, exc_info=None, final_callback=<function at remote 0x19a1758>, gen=<generator at remote 0x19abe60>, yield_point=None) at remote 0x19b6990>, next=[u'a_redis_key', u'redis_response_data'])
yielded = self.gen.send(next)
#115 Frame 0x18ab510, for file /usr/local/lib/python2.7/dist-packages/tornado/gen.py, line 476, in set_result (self=<Runner(pending_callbacks=set([]), had_exception=False, finished=False, results={}, running=True, exc_info=None, final_callback=<function at remote 0x19a1758>, gen=<generator at remote 0x19abe60>, yield_point=None) at remote 0x19b6990>, key=<object at remote 0x7fbcb309fa30>, result=[u'a_redis_key', u'redis_response_data'])
self.run()
#119 Frame 0x1a9e970, for file /usr/local/lib/python2.7/dist-packages/tornado/gen.py, line 550, in inner (args=([u'a_redis_key', u'redis_response_data'],), kwargs={}, result=[...])
self.set_result(key, result)
#124 Frame 0x196f1c0, for file /usr/local/lib/python2.7/dist-packages/tornado/stack_context.py, line 302, in wrapped (args=([u'a_redis_key', u'redis_response_data'],), kwargs={}, ret=None, current_state=((), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), None), exception_handler=<function at remote 0x19ad398>) at remote 0x19b6610>), exception_handler=<function at remote 0x19a10c8>) at remote 0x19b6a10>), exception_handler=<function at remote 0x19ad1b8>) at remote 0x19b6c10>), contexts=((...), <...>), exc=(None, None, None), top=None, last_ctx=0, stack=(...))
ret = fn(*args, **kwargs)
#128 Frame 0x18fc450, for file /usr/local/lib/python2.7/dist-packages/tornadoredis/client.py, line 490, in consume_multibulk (self=<weakproxy at remote 0x1a546d8>, length=2, cmd_line=<CmdLine(cmd='BLPOP', args=('a_redis_key', 0), kwargs={}) at remote 0x19b6e50>, callback=<function at remote 0x19ad578>, tokens=[u'a_redis_key', u'redis_response_data'], data='$62\r\n', token=u'redis_response_data')
callback(tokens)
#131 Frame 0x1914da0, for file /usr/local/lib/python2.7/dist-packages/tornado/gen.py, line 507, in run (self=<Runner(pending_callbacks=set([]), had_exception=False, finished=False, results={}, running=True, exc_info=None, final_callback=<function at remote 0x19a1ed8>, gen=<generator at remote 0x1a550a0>, yield_point=None) at remote 0x19b65d0>, next=u'redis_response_data')
yielded = self.gen.send(next)
#135 Frame 0x1915010, for file /usr/local/lib/python2.7/dist-packages/tornado/gen.py, line 476, in set_result (self=<Runner(pending_callbacks=set([]), had_exception=False, finished=False, results={}, running=True, exc_info=None, final_callback=<function at remote 0x19a1ed8>, gen=<generator at remote 0x1a550a0>, yield_point=None) at remote 0x19b65d0>, key=<object at remote 0x7fbcb309f9f0>, result=u'redis_response_data')
self.run()
#139 Frame 0x178f8e0, for file /usr/local/lib/python2.7/dist-packages/tornado/gen.py, line 550, in inner (args=(u'redis_response_data',), kwargs={}, result=u'redis_response_data')
self.set_result(key, result)
#144 Frame 0x19468b0, for file /usr/local/lib/python2.7/dist-packages/tornado/stack_context.py, line 302, in wrapped (args=(u'redis_response_data',), kwargs={}, ret=None, current_state=((), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), None), exception_handler=<function at remote 0x19ad398>) at remote 0x19b6610>), exception_handler=<function at remote 0x19a10c8>) at remote 0x19b6a10>), exception_handler=<function at remote 0x19ad1b8>) at remote 0x19b6c10>), exception_handler=<function at remote 0x19a1d70>) at remote 0x19b6c90>), contexts=((...), <...>), exc=(None, None, None), top=None, last_ctx=0, stack=(...))
ret = fn(*args, **kwargs)
#148 Frame 0x16f5950, for file /usr/local/lib/python2.7/dist-packages/tornadoredis/client.py, line 445, in _consume_bulk (self=<weakproxy at remote 0x1a546d8>, tail='62', callback=<function at remote 0x19a1e60>, response=u'redis_response_data')
callback(response)
#151 Frame 0x195de10, for file /usr/local/lib/python2.7/dist-packages/tornado/gen.py, line 507, in run (self=<Runner(pending_callbacks=set([]), had_exception=False, finished=False, results={}, running=True, exc_info=None, final_callback=<function at remote 0x19a1410>, gen=<generator at remote 0x1a55190>, yield_point=None) at remote 0x19b6dd0>, next='redis_response_data\r\n')
yielded = self.gen.send(next)
#155 Frame 0x1a06c60, for file /usr/local/lib/python2.7/dist-packages/tornado/gen.py, line 476, in set_result (self=<Runner(pending_callbacks=set([]), had_exception=False, finished=False, results={}, running=True, exc_info=None, final_callback=<function at remote 0x19a1410>, gen=<generator at remote 0x1a55190>, yield_point=None) at remote 0x19b6dd0>, key=<object at remote 0x7fbcb309f9e0>, result='redis_response_data\r\n')
self.run()
#159 Frame 0x17dc140, for file /usr/local/lib/python2.7/dist-packages/tornado/gen.py, line 550, in inner (args=('redis_response_data\r\n',), kwargs={}, result='redis_response_data\r\n')
self.set_result(key, result)
#164 Frame 0x19cf540, for file /usr/local/lib/python2.7/dist-packages/tornado/stack_context.py, line 302, in wrapped (args=('redis_response_data\r\n',), kwargs={}, ret=None, current_state=((), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), None), exception_handler=<function at remote 0x19ad398>) at remote 0x19b6610>), exception_handler=<function at remote 0x19a10c8>) at remote 0x19b6a10>), exception_handler=<function at remote 0x19ad1b8>) at remote 0x19b6c10>), exception_handler=<function at remote 0x19a1d70>) at remote 0x19b6c90>), contexts=((...), <...>), exc=(None, None, None), top=None, last_ctx=0, stack=(...))
ret = fn(*args, **kwargs)
#169 Frame 0x195e080, for file /usr/local/lib/python2.7/dist-packages/tornadoredis/connection.py, line 148, in read_callback (self=<Connection(info={'db': 0, 'pass': None}, _event_handler=<weakproxy at remote 0x1a546d8>, _stream=<IOStream(_write_buffer=<collections.deque at remote 0x19b8280>, _close_callback=<function at remote 0x19adb18>, _pending_callbacks=1, _read_bytes=None, _closed=False, _write_callback=None, _state=25, max_buffer_size=104857600, io_loop=<ZMQIOLoop(_impl=<ZMQPoller(_poller=<Poller(sockets=[(5, 5), (<Socket at remote 0x15f1598>, 4), (3, 5), (4, 5), (15, 5), (16, 5), (13, 5), (26, 5), (27, 5)], _map={3: 2, 4: 3, 5: 0, 13: 6, 15: 4, 16: 5, <...>: 1, 26: 7, 27: 8}) at remote 0x15eaa10>) at remote 0x15ea310>, _handlers={3: <function at remote 0x169cc80>, 4: <function at remote 0x169cd70>, 5: <function at remote 0x15ebc80>, 13: <function at remote 0x19ad7d0>, 15: <function at remote 0x16a2938>, 16: <function at remote 0x16a21b8>, <...>: <function at remote 0x15ebd70>, 26: <function at remote 0x19b...(truncated)
callback(*args, **kwargs)
#178 Frame 0x1a10280, for file /usr/local/lib/python2.7/dist-packages/tornado/stack_context.py, line 302, in wrapped (args=('some_response_data',), kwargs={}, ret=None, current_state=((), None), contexts=((...), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), None), exception_handler=<function at remote 0x19ad398>) at remote 0x19b6610>), exception_handler=<function at remote 0x19a10c8>) at remote 0x19b6a10>), exception_handler=<function at remote 0x19ad1b8>) at remote 0x19b6c10>), exception_handler=<function at remote 0x19a1d70>) at remote 0x19b6c90>), exc=(None, None, None), top=None, last_ctx=0, stack=(...))
ret = fn(*args, **kwargs)
#183 Frame 0x1a0fc30, for file /usr/local/lib/python2.7/dist-packages/tornado/iostream.py, line 341, in wrapper ()
callback(*args)
#188 Frame 0x1a108c0, for file /usr/local/lib/python2.7/dist-packages/tornado/stack_context.py, line 302, in wrapped (args=(), kwargs={}, ret=None, current_state=((...), None), contexts=((...), None), exc=(None, None, None), top=None, last_ctx=0, stack=(...))
ret = fn(*args, **kwargs)
#196 Frame 0x17c5ae0, for file /usr/local/lib/python2.7/dist-packages/tornado/ioloop.py, line 458, in _run_callback (self=<ZMQIOLoop(_impl=<ZMQPoller(_poller=<Poller(sockets=[(5, 5), (<Socket at remote 0x15f1598>, 4), (3, 5), (4, 5), (15, 5), (16, 5), (13, 5), (26, 5), (27, 5)], _map={3: 2, 4: 3, 5: 0, 13: 6, 15: 4, 16: 5, <...>: 1, 26: 7, 27: 8}) at remote 0x15eaa10>) at remote 0x15ea310>, _handlers={3: <function at remote 0x169cc80>, 4: <function at remote 0x169cd70>, 5: <function at remote 0x15ebc80>, 13: <function at remote 0x19ad7d0>, 15: <function at remote 0x16a2938>, 16: <function at remote 0x16a21b8>, <...>: <function at remote 0x15ebd70>, 26: <function at remote 0x19b20c8>, 27: <function at remote 0x19adc80>}, _running=True, _callbacks=[], _thread_ident=140448435238656, _callback_lock=<thread.lock at remote 0x13ad890>, _cancellations=3, _events={}, time_func=<built-in function time>, _waker=<Waker(writer=<file at remote 0x15d2420>, reader=<file at remote 0x15d2390>) at remote 0x15ead90>, _stopped=False, ...(truncated)
callback()
#200 Frame 0x1648130, for file /usr/local/lib/python2.7/dist-packages/tornado/ioloop.py, line 607, in start (self=<ZMQIOLoop(_impl=<ZMQPoller(_poller=<Poller(sockets=[(5, 5), (<Socket at remote 0x15f1598>, 4), (3, 5), (4, 5), (15, 5), (16, 5), (13, 5), (26, 5), (27, 5)], _map={3: 2, 4: 3, 5: 0, 13: 6, 15: 4, 16: 5, <...>: 1, 26: 7, 27: 8}) at remote 0x15eaa10>) at remote 0x15ea310>, _handlers={3: <function at remote 0x169cc80>, 4: <function at remote 0x169cd70>, 5: <function at remote 0x15ebc80>, 13: <function at remote 0x19ad7d0>, 15: <function at remote 0x16a2938>, 16: <function at remote 0x16a21b8>, <...>: <function at remote 0x15ebd70>, 26: <function at remote 0x19b20c8>, 27: <function at remote 0x19adc80>}, _running=True, _callbacks=[], _thread_ident=140448435238656, _callback_lock=<thread.lock at remote 0x13ad890>, _cancellations=3, _events={}, time_func=<built-in function time>, _waker=<Waker(writer=<file at remote 0x15d2420>, reader=<file at remote 0x15d2390>) at remote 0x15ead90>, _stopped=False, _closing...(truncated)
self._run_callback(callback)
#204 Frame 0x1647f50, for file /usr/local/lib/python2.7/dist-packages/zmq/eventloop/ioloop.py, line 160, in start (self=<ZMQIOLoop(_impl=<ZMQPoller(_poller=<Poller(sockets=[(5, 5), (<Socket at remote 0x15f1598>, 4), (3, 5), (4, 5), (15, 5), (16, 5), (13, 5), (26, 5), (27, 5)], _map={3: 2, 4: 3, 5: 0, 13: 6, 15: 4, 16: 5, <...>: 1, 26: 7, 27: 8}) at remote 0x15eaa10>) at remote 0x15ea310>, _handlers={3: <function at remote 0x169cc80>, 4: <function at remote 0x169cd70>, 5: <function at remote 0x15ebc80>, 13: <function at remote 0x19ad7d0>, 15: <function at remote 0x16a2938>, 16: <function at remote 0x16a21b8>, <...>: <function at remote 0x15ebd70>, 26: <function at remote 0x19b20c8>, 27: <function at remote 0x19adc80>}, _running=True, _callbacks=[], _thread_ident=140448435238656, _callback_lock=<thread.lock at remote 0x13ad890>, _cancellations=3, _events={}, time_func=<built-in function time>, _waker=<Waker(writer=<file at remote 0x15d2420>, reader=<file at remote 0x15d2390>) at remote 0x15ead90>, _stopped=False, _c...(truncated)
super(ZMQIOLoop, self).start()
#208 Frame 0x10f4270, for file /root/server.py, line 81, in <module> ()
tornado.ioloop.IOLoop.instance().start()
Hi,
I am writing a realtime web application with SockJSSubscriber.
When I send a unicode string containing characters outside the ASCII range, such as Chinese characters, a UnicodeEncodeError is raised in tornadoredis/pubsub.py, line 146, in on_message.
[Solutions]
I subclassed SockJSSubscriber and overrode on_message to encode msg.body to str, and it works well.
Hi.
I have a tornado application that uses tornado-redis. I ran into a problem: reads and writes go to the wrong database. Does anyone have an idea why this happens?
Is there a plan to support Python 3? Importing the package currently fails:
Python 3.3.0 (default, Oct 9 2012, 16:21:00)
[GCC 4.2.1 Compatible Apple Clang 4.1 ((tags/Apple/clang-421.11.65))] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import tornadoredis
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/akh/dev/pyvenv3/lib/python3.3/site-packages/tornadoredis/__init__.py", line 1, in <module>
from .client import Connection, Client
File "/Users/akh/dev/pyvenv3/lib/python3.3/site-packages/tornadoredis/client.py", line 345
except Exception, e:
^
SyntaxError: invalid syntax
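For reference, the SyntaxError comes from the Python 2-only form `except Exception, e:`. The `as` form is accepted by both Python 2.6+ and Python 3. A minimal sketch follows; `safe_divide` is a hypothetical function used only for illustration and is not part of tornado-redis:

```python
def safe_divide(a, b):
    """Divide a by b, returning an error string instead of raising."""
    try:
        return a / b
    except Exception as e:  # 'as' works on Python 2.6+ and on Python 3
        return "error: %s" % e


result_ok = safe_divide(6, 3)
result_err = safe_divide(1, 0)
```

The change is mechanical, but a full port would likely also need to deal with the bytes/str distinction when talking to the socket.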
Hello!
Consider this test case:
class SequentialPubSubTestCase(RedisTestCase):

    def setUp(self):
        super(SequentialPubSubTestCase, self).setUp()
        self.subscriber = self._new_client()

    def tearDown(self):
        try:
            self.subscriber.connection.disconnect()
            del self.subscriber
        except AttributeError:
            pass
        super(SequentialPubSubTestCase, self).tearDown()

    @async_test
    @gen.engine
    def test_sequential_subscribe(self):
        yield gen.Task(self.subscriber.subscribe, 'test.channel')
        yield gen.Task(self.subscriber.subscribe, 'test.channel2')
        self.assertTrue(bool(self.subscriber.subscribed))
When I run it, the second subscription never yields back to the caller and the test case hangs until the timeout:
(env)MacAir:tornado-redis fz$ python -m tornado.testing tornadoredis.tests.test_pubsub.SequentialPubSubTestCase
F
======================================================================
FAIL: test_sequential_subscribe (tornadoredis.tests.test_pubsub.SequentialPubSubTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/private/var/www/different/python/centrifuge/env/lib/python2.7/site-packages/tornado/testing.py", line 118, in __call__
result = self.orig_method(*args, **kwargs)
File "tornadoredis/tests/redistest.py", line 21, in _runner
return self.wait(timeout=timeout)
File "/private/var/www/different/python/centrifuge/env/lib/python2.7/site-packages/tornado/testing.py", line 312, in wait
self.__rethrow()
File "/private/var/www/different/python/centrifuge/env/lib/python2.7/site-packages/tornado/testing.py", line 248, in __rethrow
raise_exc_info(failure)
File "/private/var/www/different/python/centrifuge/env/lib/python2.7/site-packages/tornado/testing.py", line 296, in timeout_func
timeout)
AssertionError: Async operation timed out after 5 seconds
----------------------------------------------------------------------
Ran 1 test in 5.011s
FAILED (failures=1)
[E 141025 13:00:12 testing:687] FAIL
Return Future instances instead of calling callbacks directly.
Add support for the simplified syntax:
@gen.coroutine
def some_method(self):
    ...
    res = yield self.redis_client.get('mykey')
    ...
instead of
@gen.engine
def some_method(self):
    ...
    res = yield gen.Task(self.redis_client.get, 'mykey')
    ...
Benchmark the performance change before merging changes to master.
SyncMset is much quicker than AsyncMset. Is that right?
I ran demos/benchmark/app.py and found that SyncMset is much quicker than AsyncMset. Is it working correctly?
My logging:
INFO:tornado.access:200 GET /redis-py/mset (192.168.1.100) 83.39ms
WARNING:tornado.access:404 GET /favicon.ico (192.168.1.100) 0.41ms
INFO:tornado.access:200 GET /mset (192.168.1.100) 1736.85ms
WARNING:tornado.access:404 GET /favicon.ico (192.168.1.100) 0.40ms
I installed tornado, redis, redis-py and tornado-redis today.
tornado version is 3.0.1.
redis-py version is 2.7.5.
tornado-redis version 2.4.2.
Redis server v=2.6.13 sha=00000000:0 malloc=jemalloc-3.2.0 bits=32
By the way, my Linux version is CentOS 6.2 (Final), kernel 2.6.32-220.el6.i686.
Hello,
I've updated to the new version and now getting errors on:
File "/Library/Python/2.7/site-packages/tornadoredis/client.py", line 351, in format_command
e_t = self.encode(t)
File "/Library/Python/2.7/site-packages/tornadoredis/client.py", line 340, in encode
value = str(value)
UnicodeEncodeError: 'ascii' codec can't encode characters in position 0-12: ordinal not in range(128)
There is a bug in the code. Instead of this:
if not isinstance(value, str):
    value = str(value)
this should be used:
if isinstance(value, str):
    value = str(value)
Please fix this.
Thank you
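The traceback above shows `str(value)` being applied to text that cannot be represented in ASCII. One way to avoid an implicit ASCII conversion is to encode text explicitly. The following is a minimal sketch, assuming Python 3 semantics (`str` is text, `bytes` is binary) and UTF-8 as the wire encoding; `encode_value` is a hypothetical helper, not the library's `encode` method:

```python
def encode_value(value, encoding="utf-8"):
    """Return value as bytes suitable for sending over a socket."""
    if isinstance(value, bytes):
        # Already binary: pass through untouched.
        return value
    if not isinstance(value, str):
        # Numbers and other objects: take their text representation first.
        value = str(value)
    # Text: encode explicitly instead of relying on a default codec.
    return value.encode(encoding)
```

On Python 2 the same idea would test for `unicode` rather than `str`, which is an assumption about how the library would want to handle it.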
This application never prints 'subscribed'.
Here is the main application code:
import json
import os.path
import tornado.httpserver
import tornado.gen
import tornado.ioloop
import tornado.web  # RequestHandler, Application and StaticFileHandler live here
import sockjs.tornado
import tornado.options
import tornadoredis

c = tornadoredis.Client()
c.connect()
pool = tornadoredis.ConnectionPool(max_connections=1, wait_for_available=True)


class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.render("template.html", title="PubSub + WebSocket Demo")


class NewMessageHandler(tornado.web.RequestHandler):
    def post(self):
        message = self.get_argument('message')
        print('Publishing a message "%s"' % message)
        c.publish('test_channel', message)
        self.set_header('Content-Type', 'text/plain')


class RealtimeHandler(sockjs.tornado.SockJSConnection):
    def __init__(self, session):
        super(RealtimeHandler, self).__init__(session)
        self.listen()

    @tornado.gen.engine
    def listen(self):
        self.client = tornadoredis.Client(connection_pool=pool)
        self.client.connect()
        print('subscribing')
        yield tornado.gen.Task(self.client.subscribe, 'invalidation')
        print('subscribed')

    def on_open(self, request):
        print("User connected to realtime socket")
        self.client.listen(self.on_message)

    def on_event(self, name, *args, **kwargs):
        print('on_event', name)
        if name == 'invalidation':
            self.client.publish('invalidation', json.dumps(kwargs['args']))

    def on_message(self, msg):
        print('on_message', msg)
        if msg.kind == 'message':
            message = [json.loads(msg.body)]
            self.send(message)
        elif msg.kind == 'disconnect':
            self.close()
        elif msg.kind == 'subscribe':
            pass

    def on_close(self):
        print("User disconnected from realtime socket")
        self.client.unsubscribe('invalidation')
        self.client.disconnect()


def rel(path):
    return os.path.join(os.path.dirname(os.path.abspath(__file__)), path)


application = tornado.web.Application(
    [(r'/', MainHandler),
     (r'/msg', NewMessageHandler),
     (r"/static/(.*)", tornado.web.StaticFileHandler,
      {"path": rel("static")})] +
    sockjs.tornado.SockJSRouter(RealtimeHandler, '/sock').urls)

if __name__ == '__main__':
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(8888)
    print('Demo is running at 0.0.0.0:8888\n'
          'Quit the demo with CONTROL-C')
    tornado.ioloop.IOLoop.instance().start()
template.html:
<!doctype html>
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
<title>{{ title }}</title>
<script src="http://cdn.sockjs.org/sockjs-0.3.min.js"></script>
<script>
function open_websocket(){
function show_message(message){
var el = document.createElement('div');
el.innerHTML = message;
document.body.appendChild(el);
}
var sock = new SockJS('http://127.0.0.1:8888/sock');
sock.onopen = function() {
show_message('Connected.');
};
sock.onmessage = function(e) {
show_message(e.data);
};
sock.onclose = function() {
show_message("Closed.");
};
}
</script>
</head>
<body onload="open_websocket()">
<h1>{{ title }}</h1>
<p>Enter your message and press the 'Send' button. You may open another browser window and send messages from here.</p>
<form method="POST" action="/msg" target="_hidden">
<input name="message" style="width: 500px;border: 1px solid #CCC;padding: 3px;" value="" placeholder="Enter your message here and press the 'Send' button."/>
<input type="submit" value="Post"/>
</form>
<iframe id="_hidden" style="display:none"></iframe>
<h2>Messages</h2>
<hr/>
</body>
</html>
It should be possible to pool app connections to redis.
The redis-py implementation is pretty straightforward: https://github.com/andymccurdy/redis-py/blob/master/redis/connection.py#L369
When the remote redis goes down, the client receives an on_disconnect event and throws
ConnectionError("Socket closed on remote end")
but when redis comes back up and we retry our previous actions, we block at
client.py 1290: yield gen.Task(self.connection.wait_until_ready)
The callback is never called, so we can't automatically recover from failures.
I fixed this issue by adding two lines (321, 322):
318 def on_disconnect(self):
319     if self.subscribed:
320         self.subscribed = set()
321     if self.connection:
322         self.connection.disconnect()
323     raise ConnectionError("Socket closed on remote end")
When we are disconnected from the remote server, we also need to disconnect locally, so that we are not blocked by the previous unfinished request.
(by Nicolas de Bari Embriz Garcia Rojas)
I noticed that if I restart the redis server, the program that was subscribed to redis throws an exception saying that there is no callback on "on_stream_close". The script then hangs and doesn't exit, leaving my app idle until I restart it manually.
Altering on_stream_close on line 78 here: https://github.com/leporo/tornado-redis/blob/master/tornadoredis/connection.py#L78 helped me avoid the exception, or at least exit the program in case of an error (I just replaced line 79 with an exit(0)), but since I am not an expert in Python (still learning) I prefer to ask.
This is how I am using tornadoredis:
if __name__ == '__main__':
    # Logging
    logging.getLogger().setLevel(logging.DEBUG)
    # Initialize tornado-redis and subscribe to key callback (pubsub)
    rclient = tornadoredis.Client(redisHost, redisPort)
    rclient.connect()
    rclient.subscribe(redisChannel, lambda s: rclient.listen(BrokerConnection.pubsub))
    # Initialize sockjs-tornado and start IOLoop
    BrokerRouter = sockjs.tornado.SockJSRouter(BrokerConnection, '/wow')
    app = tornado.web.Application(BrokerRouter.urls)
    app.listen(port)
    logging.info('Listening on port %d for redis key %s', port, redisChannel)
If the redis server goes down and comes back up, there is no subscription to redis, however the SockJSSubscriber still retains the list of subscribers.
These subscribers will never receive any messages as it's no longer subscribed to the redis channel.
We use the following sample code and JMeter with 100 concurrent connections (non-stop):
import tornado.ioloop
import tornado.web
import tornadoredis
import tornado.gen

pool = tornadoredis.ConnectionPool(host='192.168.68.234', port=6379, max_connections=10, wait_for_available=True)


class MainHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    @tornado.gen.engine
    def get(self):
        c = tornadoredis.Client(connection_pool=pool)
        foo = yield tornado.gen.Task(c.get, 'myKey')
        c.disconnect()
        self.finish(str(foo))


application = tornado.web.Application([
    (r"/", MainHandler),
])

if __name__ == "__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()
But we got errors:
ERROR:tornado.application:Uncaught exception GET / (192.168.68.22)
HTTPRequest(protocol='http', host='192.168.68.235:8888', method='GET', uri='/', version='HTTP/1.1', remote_ip='192.168.68.22', headers={'Connection': 'close', 'Host': '192.168.68.235:8888', 'Authorization': 'Bearer andre', 'User-Agent': 'Apache-HttpClient/4.2.1 (java 1.5)'})
Traceback (most recent call last):
File "/opt/andre/tor-redis-test/lib/python2.7/site-packages/tornado/web.py", line 1115, in _stack_context_handle_exception
raise_exc_info((type, value, traceback))
File "/opt/andre/tor-redis-test/lib/python2.7/site-packages/tornado/stack_context.py", line 302, in wrapped
ret = fn(*args, **kwargs)
File "/opt/andre/tor-redis-test/lib/python2.7/site-packages/tornado/gen.py", line 550, in inner
self.set_result(key, result)
File "/opt/andre/tor-redis-test/lib/python2.7/site-packages/tornado/gen.py", line 476, in set_result
self.run()
File "/opt/andre/tor-redis-test/lib/python2.7/site-packages/tornado/gen.py", line 507, in run
yielded = self.gen.send(next)
File "/opt/andre/tor-redis-test/lib/python2.7/site-packages/tornadoredis/client.py", line 323, in disconnect
proxy = pool.make_proxy(client_proxy=self._weak,
ReferenceError: weakly-referenced object no longer exists
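The final line of that traceback is the error Python raises when a `weakref.proxy` is used after its referent has been garbage collected. The snippet below is a standalone reproduction of that general mechanism only; `Resource` is a hypothetical class and nothing here reflects how tornado-redis manages its clients internally:

```python
import gc
import weakref


class Resource(object):
    """Hypothetical object standing in for something held only weakly."""
    name = "demo"


resource = Resource()
proxy = weakref.proxy(resource)
alive_name = proxy.name          # works while a strong reference exists

del resource                     # drop the only strong reference
gc.collect()                     # make collection deterministic across runtimes

try:
    proxy.name
    outcome = "still alive"
except ReferenceError as exc:
    outcome = str(exc)
```

If the same thing happens in the handler above, a plausible reading is that the local client `c` is no longer referenced by the time `disconnect` touches the weak proxy, but that is an assumption rather than something the report confirms.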
I've found a bug with ConnectionProxy when the number of created connections reaches max_connections and the wait_for_available option is True:
File "env/lib/python2.7/site-packages/tornado/gen.py", line 367, in run
self.yield_point.start(self)
File "env/lib/python2.7/site-packages/tornado/gen.py", line 241, in start
self.func(*self.args, **self.kwargs)
File "env/lib/python2.7/site-packages/tornado/gen.py", line 120, in wrapper
runner.run()
File "env/lib/python2.7/site-packages/tornado/gen.py", line 345, in run
yielded = self.gen.send(next)
File "env/lib/python2.7/site-packages/tornadoredis/client.py", line 1146, in execute
self.connection.info.get('db', None) != self.selected_db):
AttributeError: 'ConnectionProxy' object has no attribute 'info'
I can add a "self.info = {'db': 0}" attribute to ConnectionProxy, but I don't fully understand how connections are released or how the wait_until_ready function works.
When I use 'subscribe' and 'psubscribe' with Python 2.7, something goes wrong.
My code looks like this:
def handle_message(msg):
    print 'hello world'

yield gen.Task(client.subscribe, 'channel_one')
yield gen.Task(client.psubscribe, 'channel_two'+'/*')
client.listen(handle_message)
But this code doesn't trigger the callback function when a message is received.
Then I deleted the line
callback = None
in 'client._subscribe', and it works well.
I consistently get an error on multi get (mget) when using tornado-redis with gen.Task to retrieve keys (>100 keys).
The code that produces this looks like this:
class FooHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self, *args, **kwargs):
        data = yield tornado.gen.Task(Foo.multi_get, keys)
        self.finish()
system details:
ubuntu 11.04
tornado 2.4 , on tornado 3 as well
Traceback
File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornado/web.py", line 1115, in _stack_context_handle_exception
raise_exc_info((type, value, traceback))
File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornado/stack_context.py", line 343, in _handle_exception
if tail.exit(*exc):
File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornado/stack_context.py", line 186, in exit
return self.exception_handler(type, value, traceback)
File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornado/gen.py", line 136, in handle_exception
return runner.handle_exception(typ, value, tb)
File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornado/gen.py", line 556, in handle_exception
self.run()
File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornado/gen.py", line 505, in run
yielded = self.gen.throw(*exc_info)
File "supportServices/services.py", line 434, in get_video_status_brightcove
data = yield tornado.gen.Task(Foo.multi_get,keys)
File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornado/stack_context.py", line 343, in _handle_exception
if tail.exit(*exc):
File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornado/stack_context.py", line 186, in exit
return self.exception_handler(type, value, traceback)
File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornado/gen.py", line 136, in handle_exception
return runner.handle_exception(typ, value, tb)
File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornado/gen.py", line 556, in handle_exception
self.run()
File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornado/gen.py", line 505, in run
yielded = self.gen.throw(*exc_info)
File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornadoredis/client.py", line 425, in execute_command
resp = yield gen.Task(resp)
File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornado/stack_context.py", line 302, in wrapped
ret = fn(*args, **kwargs)
File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornado/gen.py", line 550, in inner
self.set_result(key, result)
File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornado/gen.py", line 476, in set_result
self.run()
File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornado/gen.py", line 507, in run
yielded = self.gen.send(next)
File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornadoredis/client.py", line 485, in consume_multibulk
token = self.process_data(data, cmd_line)
File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornadoredis/client.py", line 461, in process_data
return partial(self._consume_bulk, tail)
I started to experiment with tornado-redis and stumbled on some strange behaviour.
Look at this code:
from __future__ import print_function
import tornado.ioloop
import tornado.gen
import tornadoredis
import time

subscriber = tornadoredis.Client()
subscriber.connect()
publisher = tornadoredis.Client()
publisher.connect()


@tornado.gen.engine
def on_message(message):
    print(message)
    if message.body == 'subscribe extra channels':
        try:
            yield tornado.gen.Task(subscriber.subscribe, 'second')
        except Exception as e:
            print(e)
        try:
            print('yielding subscribing on third')
            yield tornado.gen.Task(subscriber.subscribe, 'third')
        except Exception as e:
            print(e)


@tornado.gen.engine
def subscribe():
    yield tornado.gen.Task(subscriber.subscribe, 'first')
    subscriber.listen(on_message)
    tornado.ioloop.IOLoop.instance().add_timeout(time.time() + 2, publish)


@tornado.gen.engine
def publish():
    print('publishing message...')
    try:
        yield tornado.gen.Task(publisher.publish, 'first', 'subscribe extra channels')
    except Exception as e:
        print(e)


if __name__ == '__main__':
    tornado.ioloop.IOLoop.instance().add_callback(subscribe)
    tornado.ioloop.IOLoop.instance().start()
In this example I subscribe to channel 'first', then after 2 seconds publish a message into that channel. In the message handler, when that message is received, I try to subscribe to 2 new channels. Subscribing to channel 'second' seems to succeed, but subscribing to channel 'third' never happens.
I investigated a little. It seems that subscribing to channel 'third' stops on this line of the client's execute_command method:
yield gen.Task(self.connection.wait_until_ready)
The connection is not ready and read_callbacks contains one wrapped function, and the yield never seems to return.
It'd be really helpful if a psubscribe method was available on BaseSubscriber, functioning similarly to subscribe.
Could you provide an example? I would like to use the library, but can't find a way to dynamically add and remove channels I'd like my tornado-redis instance to listen to.
Thanks,
Armin
We fell into a trap when using unsubscribe: the callback does not fire when the subscription listen loop has exited or is not running yet.
So if an app does:
@gen.coroutine
def cleanup(self):
    yield gen.Task(self.client.unsubscribe, channels=self.subscriptions)
    yield gen.Task(self.client.disconnect)
there is a good chance that in the real world disconnect is never called.
I suggest making unsubscribe an idempotent operation that always fires the callback, even if there is no subscription or no subscription loop running.
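The suggested contract can be illustrated with a toy model. This is a minimal sketch only: `ToySubscriber` is a hypothetical stand-in that talks to no server and is not tornado-redis's Client. It shows the proposed behaviour, in which the callback fires on every call, including when nothing is subscribed:

```python
class ToySubscriber(object):
    """In-memory model of a subscriber with an idempotent unsubscribe."""

    def __init__(self, channels=()):
        self.subscribed = set(channels)

    def unsubscribe(self, channels, callback=None):
        if isinstance(channels, str):
            channels = [channels]
        # Only channels we actually hold are removed; the rest are ignored.
        removed = [c for c in channels if c in self.subscribed]
        self.subscribed.difference_update(removed)
        # The callback always fires, even when 'removed' is empty, so a
        # caller waiting on it can proceed to the next cleanup step.
        if callback is not None:
            callback(removed)
        return removed
```

With this contract a cleanup routine that waits on unsubscribe before calling disconnect cannot stall, whatever state the subscriber was in.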
Test cases below:
@tornado.testing.gen_test(timeout=60)
def test_rainy_day(self):
    subscriptions = ["one", "two"]
    self.client = tornadoredis.Client(host=options.redis_host)
    self.client.connect()
    result = yield gen.Task(self.client.execute_command, 'CLIENT', 'SETNAME', 'foo')
    log.debug(result)
    yield gen.Task(self.client.subscribe, subscriptions)
    self.client.listen(self.on_redis_msg, exit_callback=self.on_redis_pub_sub_exit)
    log.debug("subscribed to: %s", subscriptions)
    yield gen.Task(self.client.unsubscribe, channels=subscriptions)
    log.debug("unsub 1")
    yield gen.Task(self.client.unsubscribe, channels=subscriptions)
    log.debug("unsub 2")
    result = yield gen.Task(self.client.disconnect)

@tornado.testing.gen_test(timeout=60)
def test_rainy_day_two(self):
    subscriptions = ["one", "two"]
    self.client = tornadoredis.Client(host=options.redis_host)
    self.client.connect()
    result = yield gen.Task(self.client.execute_command, 'CLIENT', 'SETNAME', 'foo')
    log.debug(result)
    yield gen.Task(self.client.unsubscribe, channels=subscriptions)
    log.debug("unsub 2")
    result = yield gen.Task(self.client.disconnect)
The following error is raised on the second subscription to multiple channels (works first time on a connection, fails on the second subscription on the same connection):
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/web.py", line 1115, in _stack_context_handle_exception
raise_exc_info((type, value, traceback))
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/stack_context.py", line 343, in _handle_exception
if tail.exit(*exc):
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/stack_context.py", line 186, in exit
return self.exception_handler(type, value, traceback)
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/gen.py", line 136, in handle_exception
return runner.handle_exception(typ, value, tb)
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/gen.py", line 556, in handle_exception
self.run()
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/gen.py", line 505, in run
yielded = self.gen.throw(*exc_info)
File "/Users/adam/BUD/budlist/app/backend/handlers/realtime_updates/realtime_updates.py", line 23, in get
yield gen.Task(self.controller.subscribe_and_listen, app, user)
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/stack_context.py", line 343, in _handle_exception
if tail.exit(*exc):
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/stack_context.py", line 186, in exit
return self.exception_handler(type, value, traceback)
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/gen.py", line 136, in handle_exception
return runner.handle_exception(typ, value, tb)
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/gen.py", line 556, in handle_exception
self.run()
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/gen.py", line 505, in run
yielded = self.gen.throw(*exc_info)
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornadoredis/client.py", line 1147, in listen
response = yield gen.Task(response)
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/stack_context.py", line 302, in wrapped
ret = fn(*args, **kwargs)
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/gen.py", line 550, in inner
self.set_result(key, result)
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/gen.py", line 476, in set_result
self.run()
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/gen.py", line 507, in run
yielded = self.gen.send(next)
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornadoredis/client.py", line 504, in consume_multibulk
token = self.process_data(data, cmd_line)
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornadoredis/client.py", line 491, in process_data
cmd_line)
ResponseError: ResponseError (on LISTEN [(), {}]): Unknown response type s
Here is some additional info (logging of "data" in process_data):
[E 140815 15:50:48 realtime_updates:36] got a message: Message(kind=u'subscribe', channel=u'b', body=2, pattern=u'b')
[E 140815 15:51:08 client:468] *3
[E 140815 15:51:08 client:468] $9
[E 140815 15:51:08 client:468] $1
[E 140815 15:51:08 client:468] :2
[E 140815 15:51:08 realtime_updates:36] got a message: Message(kind=u'subscribe', channel=u'a', body=2, pattern=u'a')
[E 140815 15:51:08 client:468] *3
[E 140815 15:51:08 client:468] $9
[E 140815 15:51:08 client:468] subscribe
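For context on the logged lines: Redis replies use a line-oriented wire format (RESP) in which the first byte of each header line marks the type: `*` for an array, `$` for a bulk string, `:` for an integer, `+` for a status and `-` for an error. The sketch below is a minimal blocking parser written for illustration; it is not tornado-redis's parser, and `read_reply` is a hypothetical name. It shows how a subscribe confirmation decodes, and how an "Unknown response type s" error could plausibly arise if a reader gets out of step and treats the payload line `subscribe` as a header. Whether that is what happens in this issue is an assumption.

```python
import io


def read_reply(stream):
    """Read one RESP reply from a binary file-like object."""
    line = stream.readline()
    if not line.endswith(b"\r\n"):
        raise ValueError("incomplete reply")
    kind, payload = line[:1], line[1:-2]
    if kind == b"+":                      # status reply
        return payload
    if kind == b"-":                      # error reply
        raise RuntimeError(payload.decode())
    if kind == b":":                      # integer reply
        return int(payload)
    if kind == b"$":                      # bulk string: length, then data
        length = int(payload)
        if length == -1:
            return None
        return stream.read(length + 2)[:-2]   # drop the trailing CRLF
    if kind == b"*":                      # array: count, then nested replies
        count = int(payload)
        if count == -1:
            return None
        return [read_reply(stream) for _ in range(count)]
    raise ValueError("Unknown response type %s" % kind.decode())


# A subscribe confirmation for channel 'a' as it appears on the wire.
wire = b"*3\r\n$9\r\nsubscribe\r\n$1\r\na\r\n:2\r\n"
reply = read_reply(io.BytesIO(wire))
```

Feeding the parser a stream that starts at the payload line, for example `io.BytesIO(b"subscribe\r\n")`, raises `ValueError` with the same wording as the error in the traceback.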
Hi,
I am building a new chat-like application with tornado websocket and tornado-redis. It is great to work with both of them. Thanks for that.
When a user connects to the websocket, they subscribe to a channel named after their user id, so every message someone sends to them can be received. A user may connect to and disconnect from the websocket frequently.
The problem is that after a user establishes a successful socket connection, it cannot always subscribe to the respective channel. Most of the time it works, but sometimes it fails.
If anyone has any idea, please help me solve this issue. I also wonder whether anyone else is experiencing the same problem.
Thanks in advance!
Issue:
The code is similar to the Websocket pub/sub demo. The problem occurs when the websocket is disconnected and close() is called:
@tornado.gen.engine
def cleanup(self):
    print "-- Connection %s to client %s closing" % (self.identifier, self.client_identifier)
    if self.client_pubsub.subscribed:
        yield tornado.gen.Task(self.client_pubsub.unsubscribe, ["ngs:fc:chat:global", self.disconnect_channel])
    self.client_pubsub.disconnect()
The yield from unsubscribing returns when the channel is supposed to be unsubscribed, and disconnect is then called. The problem occurs when the pub/sub client receives a published message at the exact moment unsubscribe is called: the yield returns, disconnect is called, and after that the server throws an exception:
ERROR:tornado.application:Exception in callback <functools.partial object at 0x1694e10>
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/tornado/ioloop.py", line 458, in _run_callback
callback()
File "/usr/local/lib/python2.7/dist-packages/tornado/stack_context.py", line 331, in wrapped
raise_exc_info(exc)
File "/usr/local/lib/python2.7/dist-packages/tornado/stack_context.py", line 302, in wrapped
ret = fn(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/tornado/iostream.py", line 341, in wrapper
callback(*args)
File "/usr/local/lib/python2.7/dist-packages/tornado/stack_context.py", line 331, in wrapped
raise_exc_info(exc)
File "/usr/local/lib/python2.7/dist-packages/tornado/stack_context.py", line 302, in wrapped
ret = fn(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/tornadoredis/connection.py", line 148, in read_callback
callback(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/tornado/stack_context.py", line 331, in wrapped
raise_exc_info(exc)
File "/usr/local/lib/python2.7/dist-packages/tornado/stack_context.py", line 302, in wrapped
ret = fn(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 550, in inner
self.set_result(key, result)
File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 476, in set_result
self.run()
File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 505, in run
yielded = self.gen.throw(*exc_info)
File "/usr/local/lib/python2.7/dist-packages/tornadoredis/client.py", line 1087, in listen
response = yield gen.Task(response)
File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 533, in run
self.yield_point.start(self)
File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 371, in start
self.func(*self.args, **self.kwargs)
File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 153, in wrapper
runner.run()
File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 505, in run
yielded = self.gen.throw(*exc_info)
File "/usr/local/lib/python2.7/dist-packages/tornadoredis/client.py", line 479, in consume_multibulk
data = yield gen.Task(self.connection.readline)
File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 533, in run
self.yield_point.start(self)
File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 371, in start
self.func(*self.args, **self.kwargs)
File "/usr/local/lib/python2.7/dist-packages/tornadoredis/connection.py", line 154, in readline
raise ConnectionError('Tried to read from '
ConnectionError: Tried to read from non-existent connection
When checking the "subscribed" attribute of the Client object, its value is still a non-empty set for a millisecond or so after the unsubscribe has finished. The workaround is the following code:
@tornado.gen.engine
def cleanup(self):
    print "-- Connection %s to client %s closing" % (self.identifier, self.client_identifier)
    if self.client_pubsub.subscribed:
        yield tornado.gen.Task(self.client_pubsub.unsubscribe, "ngs:fc:chat:global")
        yield tornado.gen.Task(self.client_pubsub.unsubscribe, self.disconnect_channel)

    def check():
        if self.client_pubsub.subscribed:
            tornado.ioloop.IOLoop.instance().add_timeout(datetime.timedelta(milliseconds=1), check)
        else:
            self.client_pubsub.disconnect()
    tornado.ioloop.IOLoop.instance().add_timeout(datetime.timedelta(milliseconds=1), check)
The unsubscribe task should yield back only when the Client has really finished unsubscribing.
In tornado.concurrent there is a nice decorator, return_future, which returns a Future if the callback argument is None. With this decorator it is possible to use the modern API, yield f(a, b), while keeping yield gen.Task(f, a, b) for backward compatibility.
I am not sure about pipelining, but for simple commands it works perfectly fine:
@return_future
def delete(self, *keys, **kwargs):
    self.execute_command('DEL', *keys, callback=kwargs.get('callback'))

@return_future
def set(self, key, value, expire=None, pexpire=None,
        only_if_not_exists=False, only_if_exists=False, callback=None):
    args = []
    if expire is not None:
        args.extend(("EX", expire))
    if pexpire is not None:
        args.extend(("PX", pexpire))
    if only_if_not_exists and only_if_exists:
        raise ValueError("only_if_not_exists and only_if_exists "
                         "cannot be true simultaneously")
    if only_if_not_exists:
        args.append("NX")
    if only_if_exists:
        args.append("XX")

@return_future
def get(self, key, callback=None):
    self.execute_command('GET', key, callback=callback)
and here is a simple test:
@async_test
@gen.engine
def test_get_set_coroutine(self):
    res = yield self.client.set('foo', 'bar')
    self.assertEqual(res, True)
    res = yield self.client.get('foo')
    self.assertEqual(res, 'bar')
    res = yield self.client.delete('foo')
    self.assertTrue(res)
    self.stop()
What do you think about decorating all commands with return_future?
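To make the idea concrete without depending on Tornado, here is a minimal sketch of a decorator that supports both calling styles, built on the standard library's `concurrent.futures.Future`. It is not Tornado's `return_future` implementation; `future_or_callback` and `fake_get` are hypothetical names, and the wrapped function is assumed to report its result by calling `callback` exactly once:

```python
import functools
from concurrent.futures import Future


def future_or_callback(func):
    """Let a callback-style function also be used in a Future style."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        user_callback = kwargs.pop("callback", None)
        future = Future()
        # The wrapped function always reports through the future.
        func(*args, callback=future.set_result, **kwargs)
        if user_callback is not None:
            # Old style: forward the result to the caller's callback.
            future.add_done_callback(lambda f: user_callback(f.result()))
            return None
        # New style: hand the future back so the caller can wait on it.
        return future
    return wrapper


@future_or_callback
def fake_get(key, callback=None):
    # Stand-in for a command that would normally talk to a server.
    callback("value-for-" + key)


new_style = fake_get("foo").result()
old_style = []
fake_get("foo", callback=old_style.append)
```

The design choice is that existing callers passing `callback=` keep working unchanged, while new code can wait on the returned future instead.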
(Not sure if this is a bug or I am just using it wrong; my apologies in advance.)
I've tried to use pub/sub to wrap long polling, and my handler looks pretty much the same as the WebSocket example you have. But right after the first message is handled (after unsubscribe and disconnect are called) it still tries to communicate with redis and fails because the connection was closed. BTW, the WebSocket example is not working either.
To reproduce the issue, the conditions are:
In this case, the second request or following requests will go into connection.wait_until_ready block and wait the connection to be ready, and then callback to the request handler until the connection is ready. Otherwise, wait_until_ready makes the callback lose StackContext, and the callback is actually invoked in the previous StackContext associate with the previous connection.read() operation. So, when a error raised, the exception handling in web.py will use the previous StackContext associated to process exception, that cause the "Cannot send error response after headers written" issue.
To avoid this issue, quoted from stack_context.py "Returns a callable object that will restore the current StackContext when executed. Use this whenever saving a callback to be executed later in a different execution context (either in a different thread or asynchronously in the same thread)", before append the callback to ready_callbacks, it should be wrapped with stack_context.wrap, such as:
def wait_until_ready(self, callback=None):
    if callback:
        if not self.ready():
            callback = stack_context.wrap(callback)  # wrap so the StackContext is restored
            self.ready_callbacks.append(callback)
        else:
            callback()
    return self
The same applies to ConnectionPool.
Sample code to reproduce the issue is below:
[Server side]
import logging
import json
import tornadoredis
import tornado.httpserver
import tornado.ioloop
import tornado.web
import tornado.gen as gen
from tornado.options import define, options
rds = tornadoredis.Client(host="127.0.0.1",
                          port=6379,
                          selected_db=0)
@gen.engine
def get_data(key, callback=None):
    try:
        result = yield tornado.gen.Task(rds.get, key)
    except Exception, e:
        callback(None)
        return
    callback(result)
class TestHandler(tornado.web.RequestHandler):
    def initialize(self, **kwargs):
        super(TestHandler, self).initialize(**kwargs)
        self.set_header("Content-Type", "text/json")
        self.set_header("Content-Type", "application/json")
    @tornado.web.asynchronous
    @gen.engine
    def get(self):
        # make sure there is a key 0 in redis
        value = yield gen.Task(get_data, 0)
        raise tornado.web.HTTPError(400, "always return this error")
        self.finish(json.dumps({"key":key}))
handlers = [
    tornado.web.URLSpec(r"/test", TestHandler, name="Test"),
]
settings = {
    "debug": True,
}
define("port", default=8888, help="run on the given port", type=int)
if __name__ == "__main__":
    application = tornado.web.Application(handlers, **settings)
    http_server = tornado.httpserver.HTTPServer(application, xheaders=True)
    http_server.listen(options.port)
    logging.info('Listening on port %s' % options.port)
    tornado.ioloop.IOLoop.instance().start()
[Client side]
from tornado.httpclient import AsyncHTTPClient
from tornado.ioloop import IOLoop
def debug_case():
    http_client = AsyncHTTPClient()
    def handle_request(response):
        # mock up concurrent requests
        http_client.fetch("http://localhost:8888/test",
                          handle_request,
                          method="GET")
        if response.error:
            print "Error:", response.error
        else:
            print response.body
        IOLoop.instance().stop()
    http_client.fetch("http://localhost:8888/test",
                      handle_request,
                      method="GET")
    IOLoop.instance().start()
if __name__ == "__main__":
    for i in range(1, 100):
        debug_case()
The issue seems to be the way it parses output from the INFO command such as this:
slave0:10.176.0.235,51657,online
I think it tries to split each item of the value into k, v on "=", but there is no "=" in this type of output for slaveXXX, so it crashes with an error like 'need more than 1 value to unpack'. Possibly wrapping the split in a try/except and just using the raw value if it cannot be unpacked would be a solution (around line 112 in client.py)?
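The try/except fallback suggested above could look roughly like the sketch below. The names parse_info and parse_info_value are hypothetical helpers for illustration, not the functions in tornado-redis' client.py, and the db0 and role sample lines are assumed; only the slave0 line comes from the report.

```python
def parse_info_value(value):
    """Parse one INFO value; fall back to the raw string if it is not k=v pairs."""
    parsed = {}
    for item in value.split(','):
        try:
            k, v = item.split('=', 1)
        except ValueError:
            # No '=' in this item (e.g. "10.176.0.235,51657,online"):
            # keep the raw value instead of crashing on the unpack.
            return value
        parsed[k] = v
    return parsed


def parse_info(text):
    """Hypothetical INFO parser: one "key:value" entry per line."""
    info = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith('#'):
            continue  # skip blank lines and section headers
        key, _, value = line.partition(':')
        info[key] = parse_info_value(value)
    return info


sample = "slave0:10.176.0.235,51657,online\ndb0:keys=1,expires=0\nrole:master"
info = parse_info(sample)
```

With this fallback the slave0 entry stays a plain string, while values made of key=value pairs are still turned into a dict.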
I guess implementation is very similar to pub/sub and listen commands
I used the following code:
import tornadoredis
from tornadoredis.pubsub import SockJSSubscriber
subscriber = SockJSSubscriber(tornadoredis.Client())
def on_open(self, info):
    subscriber.subscribe('users-status', self)
    subscriber.subscribe('scores', self)
def on_close(self):
    subscriber.unsubscribe('users-status', self)
    subscriber.unsubscribe('scores', self)
but it worked for only one channel of the SockJSSubscriber subscriber.
I found that the check "if not self.redis.subscribed:" allows listening to Redis Pub/Sub for only one channel:
https://github.com/leporo/tornado-redis/blob/master/tornadoredis/pubsub.py:25-52
def subscribe(self, channel_name, subscriber, callback=None):
    """
    Subscribes a given subscriber object to a redis channel.
    Does nothing if subscribe calls are nested for the
    same subscriber object.
    The broadcast method of the subscriber object will be called
    for each message received from specified channel.
    Override the on_message method to change this behaviour.
    """
    self.subscribers[channel_name][subscriber] += 1
    self.subscriber_count[channel_name] += 1
    if self.subscriber_count[channel_name] == 1:
        if not self.redis.subscribed:
            if callback:
                callback = stack_context.wrap(callback)
            def _cb(*args, **kwargs):
                self.redis.listen(self.on_message)
                if callback:
                    callback(*args, **kwargs)
            cb = _cb
        else:
            cb = callback
        self.redis.subscribe(channel_name, callback=cb)
    elif callback:
        callback(True)
How can I deal with two or more channels at the same time?
If I make a query to redis that needs to open a connection, and that connection blocks (eg. slow network, network partition), then the whole tornado process blocks. No requests can be handled, even requests that do not depend on redis.
Example: here is a simple tornado HTTP service that handles two URLs, /r1 and /r2. /r1 does not depend on redis at all: it immediately returns a hardcoded string. /r2 fetches a value from redis and returns it.
import tornadoredis
import tornado.httpserver
import tornado.web
import tornado.ioloop
import tornado.gen
class R1Handler(tornado.web.RequestHandler):
    '''handle a request which does not depend on redis'''
    @tornado.web.asynchronous
    @tornado.gen.engine
    def get(self):
        self.write('all is well\n')
        self.finish()
class R2Handler(tornado.web.RequestHandler):
    '''handle a request which does depend on redis'''
    @tornado.web.asynchronous
    @tornado.gen.engine
    def get(self):
        c = tornadoredis.Client()
        foo = yield tornado.gen.Task(c.get, 'foo')
        self.set_header('Content-Type', 'text/plain')
        self.write('foo = %s\n' % (foo,))
        self.finish()
application = tornado.web.Application([
    (r'/r1', R1Handler),
    (r'/r2', R2Handler),
])
if __name__ == '__main__':
    # Start the data initialization routine
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(8888)
    print 'Demo is runing at 0.0.0.0:8888\nQuit the demo with CONTROL-C'
    tornado.ioloop.IOLoop.instance().start()
(I put that script in demos/mixed/app.py in the tornado-redis source tree.)
Expected outcome: if my tornado process is unable to connect to redis, then I expect queries to /r2 to fail or block, but queries to /r1 should continue to work fine.
Actual outcome: as soon as a request for /r2 blocks, requests for /r1 also block. It appears that the entire process is blocked connecting to redis.
Here's how I reproduced it. First, in one terminal run the tornado server:
$ PYTHONPATH=. python demos/mixed/app.py
Demo is runing at 0.0.0.0:8888
Quit the demo with CONTROL-C
In another window, start polling /r1:
$ while true ; do echo -n `date`": " ; curl http://localhost:8888/r1 ; sleep 1 ; done
Mon Jan 12 13:25:01 EST 2015: all is well
Mon Jan 12 13:25:02 EST 2015: all is well
Mon Jan 12 13:25:03 EST 2015: all is well
[...]
In a third window, ensure that /r2 works:
$ redis-cli
127.0.0.1:6379> set foo "hello world"
OK
$ curl http://localhost:8888/r2
foo = hello world
Now simulate a network partition: make all packets to redis disappear (this assumes Linux):
$ sudo iptables -F # wipe all existing firewall rules
$ sudo iptables -A INPUT --proto tcp --dport 6379 -j DROP # drop packets to redis
Hop over to the window that is polling /r1; it should still be working fine:
[...]
Mon Jan 12 13:27:50 EST 2015: all is well
Mon Jan 12 13:27:51 EST 2015: all is well
Mon Jan 12 13:27:52 EST 2015: all is well
[...]
Now request /r2, which blocks because redis is on the other side of a network partition:
$ curl http://localhost:8888/r2
This request blocks, which is entirely OK. (I'd like it to fail with a timeout eventually, but whatever. Not important.)
However, the /r1 poll is now blocked:
[...]
Mon Jan 12 13:28:28 EST 2015: all is well
Mon Jan 12 13:28:29 EST 2015: all is well
Mon Jan 12 13:28:30 EST 2015: [blocked here]
We can see where it's blocked by hitting Ctrl-C on the tornado process:
^CTraceback (most recent call last):
File "demos/mixed/app.py", line 40, in <module>
tornado.ioloop.IOLoop.instance().start()
File "/usr/lib/python2.7/dist-packages/tornado/ioloop.py", line 607, in start
self._run_callback(callback)
File "/usr/lib/python2.7/dist-packages/tornado/ioloop.py", line 458, in _run_callback
callback()
File "/usr/lib/python2.7/dist-packages/tornado/stack_context.py", line 331, in wrapped
raise_exc_info(exc)
File "/usr/lib/python2.7/dist-packages/tornado/stack_context.py", line 302, in wrapped
ret = fn(*args, **kwargs)
File "/usr/lib/python2.7/dist-packages/tornado/iostream.py", line 341, in wrapper
callback(*args)
File "/usr/lib/python2.7/dist-packages/tornado/stack_context.py", line 331, in wrapped
raise_exc_info(exc)
File "/usr/lib/python2.7/dist-packages/tornado/stack_context.py", line 302, in wrapped
ret = fn(*args, **kwargs)
File "/usr/lib/python2.7/dist-packages/tornado/httpserver.py", line 327, in _on_headers
self.request_callback(self._request)
File "/usr/lib/python2.7/dist-packages/tornado/web.py", line 1600, in __call__
handler._execute(transforms, *args, **kwargs)
File "/usr/lib/python2.7/dist-packages/tornado/web.py", line 1134, in _execute
self._when_complete(self.prepare(), self._execute_method)
File "/usr/lib/python2.7/dist-packages/tornado/web.py", line 1141, in _when_complete
callback()
File "/usr/lib/python2.7/dist-packages/tornado/web.py", line 1162, in _execute_method
self._when_complete(method(*self.path_args, **self.path_kwargs),
File "/usr/lib/python2.7/dist-packages/tornado/web.py", line 1311, in wrapper
return result
File "/usr/lib/python2.7/dist-packages/tornado/stack_context.py", line 198, in __exit__
return self.exception_handler(type, value, traceback)
File "/usr/lib/python2.7/dist-packages/tornado/web.py", line 1115, in _stack_context_handle_exception
raise_exc_info((type, value, traceback))
File "/usr/lib/python2.7/dist-packages/tornado/web.py", line 1298, in wrapper
result = method(self, *args, **kwargs)
File "/usr/lib/python2.7/dist-packages/tornado/gen.py", line 159, in wrapper
deactivate()
File "/usr/lib/python2.7/dist-packages/tornado/stack_context.py", line 198, in __exit__
return self.exception_handler(type, value, traceback)
File "/usr/lib/python2.7/dist-packages/tornado/gen.py", line 136, in handle_exception
return runner.handle_exception(typ, value, tb)
File "/usr/lib/python2.7/dist-packages/tornado/gen.py", line 556, in handle_exception
self.run()
File "/usr/lib/python2.7/dist-packages/tornado/gen.py", line 505, in run
yielded = self.gen.throw(*exc_info)
File "demos/mixed/app.py", line 23, in get
foo = yield tornado.gen.Task(c.get, 'foo')
File "/usr/lib/python2.7/dist-packages/tornado/gen.py", line 153, in wrapper
runner.run()
File "/usr/lib/python2.7/dist-packages/tornado/gen.py", line 533, in run
self.yield_point.start(self)
File "/usr/lib/python2.7/dist-packages/tornado/gen.py", line 371, in start
self.func(*self.args, **self.kwargs)
File "/data/src/tornado-redis/tornadoredis/client.py", line 690, in get
self.execute_command('GET', key, callback=callback)
File "/usr/lib/python2.7/dist-packages/tornado/gen.py", line 159, in wrapper
deactivate()
File "/usr/lib/python2.7/dist-packages/tornado/stack_context.py", line 198, in __exit__
return self.exception_handler(type, value, traceback)
File "/usr/lib/python2.7/dist-packages/tornado/gen.py", line 136, in handle_exception
return runner.handle_exception(typ, value, tb)
File "/usr/lib/python2.7/dist-packages/tornado/gen.py", line 556, in handle_exception
self.run()
File "/usr/lib/python2.7/dist-packages/tornado/gen.py", line 505, in run
yielded = self.gen.throw(*exc_info)
File "/usr/lib/python2.7/dist-packages/tornado/gen.py", line 153, in wrapper
runner.run()
File "/usr/lib/python2.7/dist-packages/tornado/gen.py", line 507, in run
yielded = self.gen.send(next)
File "/data/src/tornado-redis/tornadoredis/client.py", line 407, in execute_command
self.connection.connect()
File "/data/src/tornado-redis/tornadoredis/connection.py", line 72, in connect
timeout=self.timeout
File "/usr/lib/python2.7/socket.py", line 562, in create_connection
sock.connect(sa)
File "/usr/lib/python2.7/socket.py", line 224, in meth
return getattr(self._sock,name)(*args)
KeyboardInterrupt
The problem appears to be that tornado-redis is using the blocking socket calls to connect to redis. I suspect it should probably use tornado.tcpclient instead. I am not a tornado expert though! I just spotted that module in the docs.
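To illustrate the difference between a blocking and a non-blocking connect, here is a minimal sketch that uses the standard-library asyncio as a stand-in for the Tornado IOLoop (written for Python 3, unlike the report's Python 2 code). It is not tornado-redis code and does not use tornado.tcpclient; connect_with_timeout, its opener parameter, and the demo are hypothetical names made up for this example.

```python
import asyncio


async def connect_with_timeout(host, port, timeout,
                               opener=asyncio.open_connection):
    """Open a TCP connection without blocking the event loop.

    `opener` is injectable only so the sketch can be exercised without a
    real server; by default it is asyncio.open_connection.
    """
    try:
        return await asyncio.wait_for(opener(host, port), timeout)
    except asyncio.TimeoutError:
        raise ConnectionError('timed out connecting to %s:%s' % (host, port))


async def demo():
    """Show that a stalled connect does not stall unrelated work."""
    async def stalled_opener(host, port):
        # Stand-in for a connect whose packets are silently dropped.
        await asyncio.sleep(3600)

    ticks = []

    async def unrelated_work():
        # Plays the role of the /r1 handler: it keeps running meanwhile.
        for i in range(3):
            ticks.append(i)
            await asyncio.sleep(0.01)

    worker = asyncio.ensure_future(unrelated_work())
    try:
        await connect_with_timeout('127.0.0.1', 6379, 0.1,
                                   opener=stalled_opener)
        outcome = 'connected'
    except ConnectionError:
        outcome = 'timed out'
    await worker
    return outcome, ticks
```

Because the connect attempt is awaited rather than made with a blocking socket call, the unrelated task keeps making progress, and the stalled attempt ends with an error once the timeout expires.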
Hi,
I've been using tornado-redis with tornado-2.3 and have noticed that when a Client object with an open connection leaves scope and is reclaimed by the garbage collector, the file descriptor for the socket object is not closed.
I think this is caused by a reference to a connection object that is saved in a tornado ExceptionStackContext that should have been removed. The reference in the ExceptionStackContext stops the garbage collector from reclaiming the IOStream connected to redis and closing the socket.
I've attached a link to code for the test server I use to recreate the bug. I've also attached a link to a graph showing the references to the IOStream object.
To run the server you will need the objgraph module: http://mg.pov.lt/objgraph/
Server code: http://dl.dropbox.com/u/107369108/test_server.py
Reference Graph: http://dl.dropbox.com/u/107369108/tmpEQOP5_.png
Thanks!!!
It should return after execution of SELECT command:
class Pipeline(Client):
    def __init__(self, transactional, *args, **kwargs):
        super(Pipeline, self).__init__(*args, **kwargs)
        self.transactional = transactional
        self.command_stack = []
    def execute_command(self, cmd, *args, **kwargs):
        if cmd in ('AUTH', 'SELECT'):
            # NOTE: do not append AUTH & SELECT commands to self.command_stack
            return super(Pipeline, self).execute_command(cmd, *args, **kwargs)
        elif cmd in PUB_SUB_COMMANDS:
            raise RequestError(
                'Client is not supposed to issue command %s in pipeline' % cmd)
        self.command_stack.append(CmdLine(cmd, *args, **kwargs))
I have a Tornado app with a Redis connection:
class Application(tornado.web.Application):
    def __init__(self):
        self.client = tornadoredis.Client(selected_db=self.config.get('redis', 'db'),
                                          password=self.config.get('redis', 'password'),)
        self.client.connect()
And I have a request handler that uses a pipeline:
pipe = self.application.client.pipeline()
pipe.smembers('%s_unmoderated' % self.current_user)
pipe.smembers('%s_moderated' % self.current_user)
pipe.smembers('%s_templates' % self.current_user)
print pipe.command_stack
res = [a for a in (yield tornado.gen.Task(pipe.execute))]
print pipe.command_stack
First print: [SMEMBERS(('test_unmoderated',),{'callback': None}), SMEMBERS(('test_moderated',),{'callback': None}), SMEMBERS(('test_templates',),{'callback': None})]
Second print: [SELECT(('4',),{'callback': <tornado.stack_context._StackContextWrapper object at 0x1eafaf8>})]
Thus, on the second function call, my res contains 4 elements instead of 3.
When tornado-redis is used as a shared client, it sometimes crashes in process_data.
Test code
from tornado.ioloop import IOLoop
from tornado.web import RequestHandler, Application, url, asynchronous
from tornado.gen import engine, Task
from tornadoredis import Client
rds = Client()
class RedisTestHandler(RequestHandler):
    @asynchronous
    @engine
    def get(self):
        yield Task(rds.get, 'foo')
        self.finish()
if __name__ == '__main__':
    from redis import Redis
    Redis().set('foo', 'bar')
    app = Application([
        url(r'/', RedisTestHandler),
    ], debug=True)
    app.listen(9000)
    IOLoop.instance().start()
Test it with apache-bench
ab -c2 -n2 http://localhost:9000/
the crash log:
[I 2015-01-09 14:14:34,317 web:1811] 200 GET / (127.0.0.1) 6.19ms
[I 2015-01-09 14:14:34,319 web:1811] 200 GET / (127.0.0.1) 4.72ms
[I 2015-01-09 14:14:34,823 web:1811] 200 GET / (127.0.0.1) 3.03ms
[I 2015-01-09 14:14:34,825 web:1811] 200 GET / (127.0.0.1) 4.30ms
[I 2015-01-09 14:14:35,775 web:1811] 200 GET / (127.0.0.1) 2.74ms
[I 2015-01-09 14:14:35,777 web:1811] 200 GET / (127.0.0.1) 3.84ms
[I 2015-01-09 14:14:36,365 web:1811] 200 GET / (127.0.0.1) 2.38ms
[I 2015-01-09 14:14:36,367 web:1811] 200 GET / (127.0.0.1) 3.17ms
[I 2015-01-09 14:14:36,924 web:1811] 200 GET / (127.0.0.1) 3.16ms
[I 2015-01-09 14:14:36,926 web:1811] 200 GET / (127.0.0.1) 4.37ms
[I 2015-01-09 14:14:37,409 web:1811] 200 GET / (127.0.0.1) 2.62ms
[I 2015-01-09 14:14:37,412 web:1811] 200 GET / (127.0.0.1) 4.56ms
[I 2015-01-09 14:14:37,891 web:1811] 200 GET / (127.0.0.1) 2.56ms
[I 2015-01-09 14:14:37,893 web:1811] 200 GET / (127.0.0.1) 3.69ms
[E 2015-01-09 14:14:38,368 web:1407] Uncaught exception GET / (127.0.0.1)
HTTPServerRequest(protocol='http', host='localhost:9000', method='GET', uri='/', version='HTTP/1.0', remote_ip='127.0.0.1', headers={'Host': 'localhost:9000', 'Accept': '*/*', 'User-Agent': 'ApacheBench/2.3'})
Traceback (most recent call last):
File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/web.py", line 1288, in _stack_context_handle_exception
raise_exc_info((type, value, traceback))
File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/stack_context.py", line 314, in wrapped
ret = fn(*args, **kwargs)
File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/gen.py", line 708, in <lambda>
self.future, lambda f: self.run())
File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/gen.py", line 657, in run
self.result_future.set_exc_info(sys.exc_info())
File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/concurrent.py", line 167, in set_exc_info
self.set_exception(exc_info[1])
File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/concurrent.py", line 150, in set_exception
self._set_done()
File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/concurrent.py", line 177, in _set_done
cb(self)
File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/gen.py", line 108, in final_callback
if future.result() is not None:
File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/concurrent.py", line 109, in result
raise_exc_info(self._exc_info)
File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/gen.py", line 631, in run
yielded = self.gen.throw(*sys.exc_info())
File "redis_test.py", line 22, in get
yield Task(rds.get, 'foo')
File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/gen.py", line 628, in run
value = future.result()
File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/concurrent.py", line 109, in result
raise_exc_info(self._exc_info)
File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/stack_context.py", line 314, in wrapped
ret = fn(*args, **kwargs)
File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/gen.py", line 708, in <lambda>
self.future, lambda f: self.run())
File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/gen.py", line 657, in run
self.result_future.set_exc_info(sys.exc_info())
File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/concurrent.py", line 167, in set_exc_info
self.set_exception(exc_info[1])
File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/concurrent.py", line 150, in set_exception
self._set_done()
File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/concurrent.py", line 177, in _set_done
cb(self)
File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/gen.py", line 108, in final_callback
if future.result() is not None:
File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/concurrent.py", line 109, in result
raise_exc_info(self._exc_info)
File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/gen.py", line 633, in run
yielded = self.gen.send(value)
File "/srv/venv/dev/local/lib/python2.7/site-packages/tornadoredis/client.py", line 441, in execute_command
resp = self.process_data(data, cmd_line)
File "/srv/venv/dev/local/lib/python2.7/site-packages/tornadoredis/client.py", line 490, in process_data
cmd_line)
ResponseError: ResponseError (on GET [('foo',), {}]): Unknown response type b
[E 2015-01-09 14:14:38,377 web:1811] 500 GET / (127.0.0.1) 10.02ms
[I 2015-01-09 14:14:38,378 web:1811] 200 GET / (127.0.0.1) 11.67ms
Environment: tornado==4.0.2 tornado-redis==2.4.18
I think the problem is that, when parsing a Redis response, especially a Bulk String, execute_command splits the read into two separate tasks: one for the response type and length (e.g. "$5\r\n") and a second one for the rest of the data. But Tornado's IOLoop may switch tasks between the two reads. In that case the shared connection may be read twice in a row for the response type, and the second read may consume the wrong bytes as a response type.
Generally, a get command executes like this:
write command (W) -> get response type and length (RC) -> get rest of the response data (RD)
Under concurrency, with 2 concurrent requests for example, a connection may be accessed in this order:
W1 -> W2 -> RC1 -> RC2 (crash, bad response type) ...
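The interleaving described above can be reproduced with a small simulation. The sketch below uses the standard-library asyncio (Python 3) as a stand-in for the Tornado IOLoop and an in-memory FakeConnection instead of a real socket, so none of it is tornado-redis code. All class and function names are made up for illustration, and holding a lock for the whole command is just one possible remedy, not necessarily the one the library should adopt.

```python
import asyncio


class FakeConnection(object):
    """In-memory stand-in for one shared Redis connection."""

    def __init__(self):
        self.buffer = b''
        self.lock = asyncio.Lock()

    def write_get(self, stored_value):
        # Pretend the server replies to GET at once with a bulk string.
        header = b'$' + str(len(stored_value)).encode('ascii') + b'\r\n'
        self.buffer += header + stored_value + b'\r\n'

    async def read_line(self):
        await asyncio.sleep(0)  # let other tasks run, as a socket read would
        line, _, self.buffer = self.buffer.partition(b'\r\n')
        return line

    async def read_bytes(self, length):
        await asyncio.sleep(0)
        data = self.buffer[:length]
        self.buffer = self.buffer[length + 2:]  # also drop the trailing CRLF
        return data


async def get(conn, stored_value):
    conn.write_get(stored_value)                    # W
    header = await conn.read_line()                 # RC
    if header[:1] != b'$':
        raise ValueError('Unknown response type %r' % header[:1])
    return await conn.read_bytes(int(header[1:]))   # RD


async def locked_get(conn, stored_value):
    # Hold the lock for the whole W -> RC -> RD sequence.
    async with conn.lock:
        return await get(conn, stored_value)


async def run_two(command):
    conn = FakeConnection()  # created inside the running loop
    return await asyncio.gather(command(conn, b'bar'), command(conn, b'bar'),
                                return_exceptions=True)


unsafe = asyncio.run(run_two(get))         # one request fails on a bad type byte
safe = asyncio.run(run_two(locked_get))    # both requests get b'bar'
```

In the unlocked run the second request reads the first request's payload as if it were a response header, which is the same kind of failure as the "Unknown response type b" error in the log above.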
I'm currently trying to implement connection retrying for the client. I'm doing this by subclassing the Client class and adding a wrapper around Client.connect() that retries a number of times.
It works well for me because I always connect manually before I attempt any commands, but there are many places where the Client checks the connection and, instead of using Client.connect(), directly calls Client.connection.connect().
https://github.com/leporo/tornado-redis/blob/master/tornadoredis/client.py#L1305-1306
if not self.connection.connected():
    self.connection.connect()
The same goes for many disconnects.
https://github.com/leporo/tornado-redis/blob/master/tornadoredis/client.py#L1317-L1320
except Exception as e:
    self.command_stack = []
    self.connection.disconnect()
    raise e
I see no reason for this, and changing all of these to use the methods on Client instead of Client.connection would make adding reconnects and retries much easier. I will happily submit a pull request, but I wanted to make sure there was no reason for the current approach before I did.
Thanks
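A minimal sketch of why routing everything through Client.connect() helps is shown below. The classes are simplified stand-ins, not the real tornado-redis Client or Connection; FlakyConnection, RetryingClient, and max_attempts are hypothetical names, and the retries happen immediately, with no delay between attempts.

```python
class FlakyConnection(object):
    """Stand-in connection whose first `failures` connect attempts fail."""

    def __init__(self, failures):
        self.failures = failures
        self.is_connected = False

    def connected(self):
        return self.is_connected

    def connect(self):
        if self.failures > 0:
            self.failures -= 1
            raise IOError('connection refused')
        self.is_connected = True


class Client(object):
    """Minimal stand-in for the client; only the connect path is modelled."""

    def __init__(self, connection):
        self.connection = connection

    def connect(self):
        if not self.connection.connected():
            self.connection.connect()

    def execute_command(self, cmd):
        # Going through self.connect() rather than
        # self.connection.connect() lets subclasses hook in.
        self.connect()
        return 'sent %s' % cmd


class RetryingClient(Client):
    """Hypothetical subclass that retries connect() a few times."""

    max_attempts = 3

    def connect(self):
        for attempt in range(1, self.max_attempts + 1):
            try:
                return super(RetryingClient, self).connect()
            except IOError:
                if attempt == self.max_attempts:
                    raise


client = RetryingClient(FlakyConnection(failures=2))
result = client.execute_command('GET')
```

Because execute_command calls self.connect(), the subclass override applies to every command. In a real IOLoop application the delay between attempts would need to be scheduled on the loop rather than slept through.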
Running demos/sockjs/app.py works fine when the chat window is first loaded, but if the page is reloaded, the chat page fails to send messages.
The logs show that the SockJSSubscriber(BaseSubscriber) class did not get an unsubscribe message for broadcast_channel, only for the private channel.
logs:
Class SockJSSubscriber(BaseSubscriber)
on_message:
First time when started:
Message(kind=u'subscribe', channel=u'broadcast_channel', body=1, pattern=u'broadcast_channel')
Message(kind=u'subscribe', channel=u'private.10ba6', body=2, pattern=u'private.10ba6')
After refresh:
Message(kind=u'unsubscribe', channel=u'private.10ba6', body=1, pattern=u'private.10ba6')
Message(kind=u'unsubscribe', channel=u'broadcast_channel', body=0, pattern=u'broadcast_channel')
and when one more tab is opened:
Message(kind=u'subscribe', channel=u'broadcast_channel', body=1, pattern=u'broadcast_channel')
Message(kind=u'subscribe', channel=u'private.301ee', body=2, pattern=u'private.301ee')
Message(kind=u'subscribe', channel=u'private.4c759', body=3, pattern=u'private.4c759')
So after opening the second tab, chat works fine, but it should also work in a single tab.
I'm trying to implement a simple long-polling chat and have run into a strange situation. When a new message is added, its id is pushed to a channel, and in the updates handler I subscribe to that channel. But I can't execute any commands, because I get this exception:
RequestError (on ZRANGEBYSCORE [('messages', '22', '22'), {}]): Calling not pub/sub command during subscribed state
This is what I'm doing:
@tornado.web.asynchronous
@tornado.gen.engine
def on_pubsub_message(self, message):
    if message.kind == 'message':
        yield tornado.gen.Task(self.client.unsubscribe, 'chat_messages')
        messages = yield tornado.gen.Task(self.client.zrangebyscore, 'messages', message.body, message.body)
        result = []
        for m in messages:
            result.append(json.loads(m[0]))
        response = {
            'm': result[::-1]
        }
        self.set_header('Content-Type', 'application/json')
        self.finish(json.dumps(response))
As you can see, I'm unsubscribing and still get the error. What am I missing?
When the client is not on the same machine as the server, we need to replace the WebSocket URL with the public IP of the server, but then the WebSocket refuses to open.
In Firebug I get a 400 Bad Request.
With another app using tornado-redis (https://github.com/Orange-OpenSource/Real-Time-Nodes-Statistics) I get this output: AttributeError: 'NoneType' object has no attribute 'write_message'
Thanks
See:
http://redis.io/commands/pubsub
In my use case I would use it to count the number of active connections to a channel
PUBSUB NUMSUB $channels
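For the counting use case, Redis answers PUBSUB NUMSUB with a flat list of the form channel, count, channel, count. The sketch below only shows how such a reply could be turned into a dictionary. parse_numsub_reply is a hypothetical helper and the sample reply is made up; how the reply would be obtained from tornado-redis is deliberately left out.

```python
def parse_numsub_reply(reply):
    """Turn the flat [channel, count, channel, count, ...] reply into a dict."""
    if len(reply) % 2:
        raise ValueError('expected channel/count pairs, got %d items'
                         % len(reply))
    return dict((reply[i], int(reply[i + 1]))
                for i in range(0, len(reply), 2))


# Made-up reply for two channels; counts may arrive as ints or strings.
counts = parse_numsub_reply(['chat', 3, 'news', '0'])
```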
I am attempting to implement a simple chat client with connection pooling. Working from the demo, I have created the following ChatHandler class.
https://gist.github.com/gregory80/300e377b91ceac2a3e04
While attempting to use this class in a basic call, an attribute error is generated (see the stack trace below). It's not clear why "self" here is NoneType, or why it would generate an attribute error for write_message.
This exception manifests itself running tornado under gunicorn, and only after at least one connection has been already opened.
22:38:51 web.1 | File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/gunicorn/app/wsgiapp.py", line 34, in run
22:38:51 web.1 | WSGIApplication("%(prog)s [OPTIONS] APP_MODULE").run()
22:38:51 web.1 | File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/gunicorn/app/base.py", line 131, in run
22:38:51 web.1 | Arbiter(self).run()
22:38:51 web.1 | File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/gunicorn/arbiter.py", line 173, in run
22:38:51 web.1 | self.manage_workers()
22:38:51 web.1 | File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/gunicorn/arbiter.py", line 460, in manage_workers
22:38:51 web.1 | self.spawn_workers()
22:38:51 web.1 | File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/gunicorn/arbiter.py", line 512, in spawn_workers
22:38:51 web.1 | self.spawn_worker()
22:38:51 web.1 | File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/gunicorn/arbiter.py", line 485, in spawn_worker
22:38:51 web.1 | worker.init_process()
22:38:51 web.1 | File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/gunicorn/workers/base.py", line 104, in init_process
22:38:51 web.1 | self.run()
22:38:51 web.1 | File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/gunicorn/workers/gtornado.py", line 96, in run
22:38:51 web.1 | self.ioloop.start()
22:38:51 web.1 | File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornado/ioloop.py", line 271, in start
22:38:51 web.1 | self._run_callback(callback)
22:38:51 web.1 | File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornado/ioloop.py", line 421, in _run_callback
22:38:51 web.1 | callback()
22:38:51 web.1 | File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornado/iostream.py", line 311, in wrapper
22:38:51 web.1 | callback(*args)
22:38:51 web.1 | File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornado/stack_context.py", line 229, in wrapped
22:38:51 web.1 | callback(*args, **kwargs)
22:38:51 web.1 | File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornadoredis/connection.py", line 125, in read_callback
22:38:51 web.1 | callback(*args, **kwargs)
22:38:51 web.1 | File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornado/gen.py", line 383, in inner
22:38:51 web.1 | self.set_result(key, result)
22:38:51 web.1 | File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornado/gen.py", line 315, in set_result
22:38:51 web.1 | self.run()
22:38:51 web.1 | File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornado/gen.py", line 345, in run
22:38:51 web.1 | yielded = self.gen.send(next)
22:38:51 web.1 | File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornadoredis/client.py", line 447, in process_data
22:38:51 web.1 | callback(response)
22:38:51 web.1 | File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornado/gen.py", line 383, in inner
22:38:51 web.1 | self.set_result(key, result)
22:38:51 web.1 | File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornado/gen.py", line 315, in set_result
22:38:51 web.1 | self.run()
22:38:51 web.1 | File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornado/gen.py", line 345, in run
22:38:51 web.1 | yielded = self.gen.send(next)
22:38:51 web.1 | File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornadoredis/client.py", line 462, in consume_multibulk
22:38:51 web.1 | callback(tokens)
22:38:51 web.1 | File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornado/gen.py", line 383, in inner
22:38:51 web.1 | self.set_result(key, result)
22:38:51 web.1 | File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornado/gen.py", line 315, in set_result
22:38:51 web.1 | self.run()
22:38:51 web.1 | File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornado/gen.py", line 345, in run
22:38:51 web.1 | yielded = self.gen.send(next)
22:38:51 web.1 | File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornadoredis/client.py", line 447, in process_data
22:38:51 web.1 | callback(response)
22:38:51 web.1 | File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornado/gen.py", line 383, in inner
22:38:51 web.1 | self.set_result(key, result)
22:38:51 web.1 | File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornado/gen.py", line 315, in set_result
22:38:51 web.1 | self.run()
22:38:51 web.1 | File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornado/gen.py", line 345, in run
22:38:51 web.1 | yielded = self.gen.send(next)
22:38:51 web.1 | File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornadoredis/client.py", line 1014, in listen
22:38:51 web.1 | callback(result)
22:38:51 web.1 | File "PATH/app/webapp.py", line 86, in pubsub_message
22:38:51 web.1 | traceback.print_stack()
Hi,
I'm in the early stages of using tornado/redis/sockjs, so maybe the error is mine or I'm reporting it in the wrong place, sorry.
I implemented the example at https://github.com/leporo/tornado-redis/blob/master/demos/sockjs/app.py
I always get the following traceback when I disconnect. I added a trace of the function calls from the MessageHandler and SockJSSubscriber classes.
MessageHandler:__init__()
SockJSSubscriber:on_message()
MessageHandler:on_close
SockJSSubscriber:unsubscribe()
ERROR:tornado.application:Uncaught exception POST /subscribe/782/5es5waak/xhr_streaming (127.0.0.1)
HTTPRequest(protocol='http', host='127.0.0.1:8889', method='POST', uri='/subscribe/782/5es5waak/xhr_streaming', version='HTTP/1.0', remote_ip='127.0.0.1', headers={'Origin': 'http://testtornado.oonair.net', 'Content-Length': '0', 'Accept-Language': 'en-US,en;q=0.8,es;q=0.6,ca;q=0.4', 'Accept-Encoding': 'gzip,deflate,sdch', 'X-Forwarded-Host': 'testtornado.oonair.net', 'Host': '127.0.0.1:8889', 'Accept': '*/*', 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1678.0 Safari/537.36', 'Connection': 'close', 'Referer': 'http://testtornado.oonair.net/index.html', 'X-Real-Ip': '192.168.0.135', 'Cookie': '__utma=65992624.1844703960.1382547828.1382547828.1382547828.1; __utmc=65992624; __utmz=65992624.1382547828.1.1.utmcsr=(direct)|utmccn=(direct)|utmcmd=(none); JSESSIONID=dummy'})
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/tornado/web.py", line 1115, in _stack_context_handle_exception
raise_exc_info((type, value, traceback))
File "/usr/local/lib/python2.7/dist-packages/tornado/stack_context.py", line 302, in wrapped
ret = fn(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 550, in inner
self.set_result(key, result)
File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 476, in set_result
self.run()
File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 507, in run
yielded = self.gen.send(next)
File "/usr/local/lib/python2.7/dist-packages/tornadoredis/client.py", line 1106, in listen
self.on_unsubscribed([result.channel])
File "/usr/local/lib/python2.7/dist-packages/tornadoredis/client.py", line 1013, in on_unsubscribed
self.subscribed.remove(channel)
KeyError: u'test_channel'
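The KeyError in this traceback is what a set's remove() raises for a missing member, so the sketch below assumes that subscribed is a set. It shows how discard() tolerates a channel that has already been removed. SubscriptionState is a hypothetical stand-in, not the tornado-redis Client, and this only hides the symptom: the traceback does not establish why the channel is unsubscribed a second time.

```python
class SubscriptionState(object):
    """Stand-in for the part of the client that tracks subscribed channels."""

    def __init__(self):
        self.subscribed = set()

    def on_subscribed(self, channels):
        self.subscribed.update(channels)

    def on_unsubscribed(self, channels):
        for channel in channels:
            # discard() is a no-op for a missing member,
            # whereas remove() raises KeyError.
            self.subscribed.discard(channel)


state = SubscriptionState()
state.on_subscribed(['test_channel'])
state.on_unsubscribed(['test_channel'])
state.on_unsubscribed(['test_channel'])  # second unsubscribe is tolerated
```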