crochet's Introduction

Crochet: Use Twisted anywhere!

Crochet is an MIT-licensed library that makes it easier to use Twisted from regular blocking code. Some use cases include:

  • Easily use Twisted from a blocking framework like Django or Flask.
  • Write a library that provides a blocking API, but uses Twisted for its implementation.
  • Port blocking code to Twisted more easily, by keeping a backwards compatibility layer.
  • Allow normal Twisted programs that use threads to interact with Twisted more cleanly from their threaded parts. For example, this can be useful when using Twisted as a WSGI container.

Crochet is maintained by Itamar Turner-Trauring.

Note: Crochet development is pretty slow these days because mostly it Just Works. PyPI shows about 30,000 downloads a month, so existing users seem happy: https://pypistats.org/packages/crochet

You can install Crochet by running:

$ pip install crochet

Downloads are available on PyPI.

Documentation can be found on Read The Docs.

Bugs and feature requests should be filed at the project GitHub page.

API and features

Crochet supports Python 3.8, 3.9, 3.10, and 3.11 as well as PyPy3.

Crochet provides the following basic APIs:

  • Allow blocking code to call into Twisted and block until results are available or a timeout is hit, using the crochet.wait_for decorator.
  • A lower-level API (crochet.run_in_reactor) allows blocking code to run code "in the background" in the Twisted thread, with the ability to repeatedly check if it's done.

Crochet will do the following on your behalf in order to enable these APIs:

  • Transparently start Twisted's reactor in a thread it manages.
  • Shut down the reactor automatically when the process' main thread finishes.
  • Hook up Twisted's log system to the Python standard library logging framework. Unlike Twisted's built-in logging bridge, this includes support for blocking Handler instances.
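The wait_for and run_in_reactor APIs described earlier share one underlying pattern: work runs on a single background thread, while the calling thread either blocks with a timeout or holds a handle it can poll. The following is a minimal stdlib-only sketch of that pattern, not Crochet's implementation. BackgroundLoop, wait_for_sketch and run_in_background_sketch are hypothetical names, and a plain worker thread stands in for the Twisted reactor.

```python
import functools
import queue
import threading
from concurrent.futures import Future


class BackgroundLoop:
    """Toy stand-in for a reactor thread: runs submitted callables in order."""

    def __init__(self):
        self._tasks = queue.Queue()
        # Daemon thread, so it never keeps the process alive on its own.
        threading.Thread(target=self._run, daemon=True).start()

    def _run(self):
        while True:
            func, args, kwargs, future = self._tasks.get()
            try:
                future.set_result(func(*args, **kwargs))
            except Exception as exc:
                future.set_exception(exc)

    def submit(self, func, *args, **kwargs):
        future = Future()
        self._tasks.put((func, args, kwargs, future))
        return future


_loop = BackgroundLoop()


def wait_for_sketch(timeout):
    """Block the caller until the result arrives or `timeout` seconds pass."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return _loop.submit(func, *args, **kwargs).result(timeout)
        return wrapper
    return decorator


def run_in_background_sketch(func):
    """Return a handle immediately; the caller can poll it with done()."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return _loop.submit(func, *args, **kwargs)
    return wrapper


@wait_for_sketch(timeout=1.0)
def add(a, b):
    return a + b


@run_in_background_sketch
def multiply(a, b):
    return a * b
```

Calling add(2, 3) blocks until the background thread has produced the sum, while multiply(4, 5) returns a handle right away whose result can be collected later.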

crochet's People

Contributors

andremiras, cscutcher, graingert, itamarst, jamwonro, jeremycline, kouk, mdavis-ciena, pythonspeed, rmax, struys, tirkarthi, tomprince

crochet's Issues

twisted initialization at import times breaks when used in daemons

Hello,

We are trying out crochet with a daemonized process. We ran into this issue:

Traceback (most recent call last):
  File "something.py", line 33, in run
    crochet.setup()
  File ".../lib/python2.6/site-packages/crochet/_util.py", line 15, in synced
    return method(self, *args, **kwargs)
  File ".../lib/python2.6/site-packages/crochet/_eventloop.py", line 341, in setup
    self._reactor.callFromThread(self._startReapingProcesses)
  File ".../lib/python2.6/site-packages/twisted/internet/base.py", line 929, in callFromThread
    self.wakeUp()
  File ".../lib/python2.6/site-packages/twisted/internet/base.py", line 521, in wakeUp
    self.waker.wakeUp()
  File ".../lib/python2.6/site-packages/twisted/internet/posixbase.py", line 181, in wakeUp
    util.untilConcludes(os.write, self.o, b'x')
  File ".../lib/python2.6/site-packages/twisted/python/_utilpy3.py", line 97, in untilConcludes
    return f(*a, **kw)
OSError: [Errno 9] Bad file descriptor

I believe this is due to crochet/__init__.py importing twisted.internet.reactor, which initializes the reactor at import time. The import usually happens before a process daemonizes. When the process daemonizes, the file descriptor used by the reactor's waker is closed, which causes this error.

We were able to work around it by importing crochet after daemonize, but this is maybe not ideal. A possible solution would be to allow crochet to be imported without having it initialize the twisted reactor.

We are using twisted 12.3.0 and crochet 1.0.0.
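The failure mode in the traceback above can be reproduced in isolation. This is a small sketch, assuming only that daemonization closes a file descriptor that some object still intends to write to; the pipe is a stand-in for the reactor's wake-up pipe and no Twisted code is involved.

```python
import errno
import os

# Create a pipe, as a stand-in for the reactor's wake-up pipe.
read_fd, write_fd = os.pipe()

# Stand-in for daemonization closing inherited file descriptors.
os.close(write_fd)

try:
    # Writing to the stale descriptor fails, as waker.wakeUp() did.
    os.write(write_fd, b"x")
    error_code = None
except OSError as exc:
    error_code = exc.errno

os.close(read_fd)
```

Here error_code ends up equal to errno.EBADF, the "Bad file descriptor" error shown in the report.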

ResultRegistry has its stop method called before shutdown

That is,

        self._reactor.addSystemEventTrigger(
            "before", "shutdown", self._registry.stop)

I'm running into an issue now where I'm trying to process HTTP requests during reactor shutdown (a deferred was returned from a 'before shutdown' callback that fires when all HTTP requests are done), and some of the HTTP request processing uses crochet eventual results. Because the registry is stopped before shutdown, some of the HTTP requests die with a ReactorStopped error during processing.

Decorator providing timeouts

The idiom described in the documentation for timing out and then cancelling the operation could be easily implemented as a decorator.

Further, this should be the default idiom, rather than wait_for_reactor, which should be de-emphasized or possibly even deprecated.
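A decorator of the kind requested here might look roughly like the following. This is a stdlib-only sketch under stated assumptions, not Crochet code: with_timeout is a hypothetical name, a thread pool stands in for the reactor, and cancellation is cooperative through a threading.Event passed as the first argument.

```python
import functools
import threading
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError

_executor = ThreadPoolExecutor(max_workers=2)


def with_timeout(timeout):
    """Run func in a worker thread; on timeout, signal cancellation and re-raise."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            cancelled = threading.Event()
            future = _executor.submit(func, cancelled, *args, **kwargs)
            try:
                return future.result(timeout)
            except FutureTimeoutError:
                cancelled.set()  # ask the operation to stop
                raise
        return wrapper
    return decorator


@with_timeout(1.0)
def quick(cancelled, value):
    return value * 2


@with_timeout(0.05)
def slow(cancelled):
    # Cooperative cancellation: returns as soon as the event is set.
    cancelled.wait(5.0)
    return "finished"
```

The point of the design is that the caller never has to write the wait, catch, cancel sequence by hand; the decorator does it on every call.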

don't import twisted.internet.process on non-POSIX platforms

As long as this issue is still present in Twisted, Crochet should not import the twisted.internet.process module on non-POSIX platforms (like Windows) as it leads to the following error on import:

...
  File "c:\users\konstantinos\documents\github\twisted\twisted\internet\process.py", line 103, in <module>
    detectLinuxBrokenPipeBehavior()
  File "c:\users\konstantinos\documents\github\twisted\twisted\internet\process.py", line 93, in detectLinuxBrokenPipeBehavior
    reads, writes, exes = select.select([w], [], [], 0)
select.error: (10038, 'An operation was attempted on something that is not a socket')

This problem was introduced in this commit as far as I can tell
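One general way to avoid importing a POSIX-only module elsewhere is to guard the import on the platform. The sketch below shows the idea with the standard library only; import_if_posix is a hypothetical helper, and pwd is used purely as a stand-in for a module that should not be loaded on Windows.

```python
import importlib
import os


def import_if_posix(module_name):
    """Import module_name only on POSIX; return None elsewhere."""
    if os.name != "posix":
        return None
    return importlib.import_module(module_name)


# `pwd` is a POSIX-only stdlib module, used here purely as a stand-in.
posix_only = import_if_posix("pwd")
```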

Race condition in EventualResult._result

from threading import (
    active_count,
    Condition,
    Thread,
    )

from crochet import EventualResult


# Prepare an already-fired EventualResult.
result = EventualResult(None)
result._set_result('foo')

# This is used to synchronise the start.
start = Condition()


def get_result():
    # This is so that the test in the loop below isn't false the first
    # time round; it's not necessary to demonstrating the race.
    with start:
        start.wait()
    # Keep going until there's only this thread and the main thread
    # remaining; after that there's no chance of a race.
    while active_count() > 2:
        # There's a brief window in _result when the result queue is
        # empty, causing this to raise TimeoutError even though there is
        # a result.
        result._result(0.0)


# Create X threads, where X >= 2.
for _ in range(5):
    Thread(target=get_result).start()

# Set them off. You should see (X - 1) of the threads exiting with a
# TimeoutError, illustrating a race in EventualResult._result.
with start:
    start.notify_all()
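One way to avoid a window like this, assuming it comes from taking the result out of a queue and putting it back, is to leave the value in place and only signal that it is ready. The sketch below is not Crochet's fix; OneShotResult is a hypothetical class built on threading.Event.

```python
import threading


class OneShotResult:
    """Holds a single result that any number of threads may read."""

    def __init__(self):
        self._ready = threading.Event()
        self._value = None

    def set(self, value):
        self._value = value
        self._ready.set()  # publish only after the value is stored

    def get(self, timeout=None):
        if not self._ready.wait(timeout):
            raise TimeoutError("no result yet")
        return self._value


result = OneShotResult()
result.set("foo")
errors = []


def reader():
    # Hammer get() with a zero timeout, as the reproduction script does.
    for _ in range(1000):
        try:
            assert result.get(0.0) == "foo"
        except Exception as exc:
            errors.append(exc)


threads = [threading.Thread(target=reader) for _ in range(5)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

Because readers never remove the value, there is no moment at which an already-fired result looks empty to another thread.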

Registry stop trigger is after threadpool shutdown, resulting in shutdown hang

Registry stop trigger is after threadpool shutdown, resulting in shutdown hang if an "in flight" _reader method is pending when main thread exits and exit hooks fire.

Here is a summary of what I see:

Thread 1: exithook, trying to shutdown TNG
Thread 2: waiting on crochet _reader
Thread 4: waiting on crochet _reader
Thread 3: TNG shutdown (twisted thread) - waiting on threadpool.stop(), waiting on join of threadpool members

In my case I have a subprocess in a defunct state (at least one of the crochet _reader blocked threads is hung waiting on its output), and with the Twisted thread blocked by the threadpool join it can never finish.

More data points:

  • In twisted.internet.base.py line 959, it registers a threadPool stop during shutdown which joins all the known threadpool threads.
  • The crochet hook shutdown is after shutdown.
  • Therefore the threadPool stop hangs waiting on crochet to deliver a result which doesn't seem to be coming soon.

In this case I think it is waiting on a subprocess that is currently in a defunct state; the signal has not been delivered, and won't be, since the Twisted thread is hung waiting on the threadpool shutdown.

So the primary issue I see is that shutting down the threadpool in the "during" phase is risky while the crochet stop/unblock runs in the "after" phase. Regardless, we want the crochet unblock to happen before the attempted threadpool shutdown, or there will be deadlocks.

This arrangement fixes the issue for me:

import crochet
from twisted.internet import reactor

reactor.addSystemEventTrigger('during', 'shutdown', crochet._main._registry.stop)
if reactor.threadpoolShutdownID:
    # The threadpool shutdown trigger was already registered. Triggers run
    # in FIFO order, so remove and re-add it to move it to the end of the
    # 'during' triggers; otherwise it may hang waiting on threads that
    # crochet has blocked.
    reactor.removeSystemEventTrigger(reactor.threadpoolShutdownID)
    reactor.threadpoolShutdownID = reactor.addSystemEventTrigger(
        'during', 'shutdown', reactor._stopThreadPool)
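The ordering argument in this report can be checked with a toy model. The sketch below is not Twisted code: ShutdownTriggers is a hypothetical class that assumes only what the report states, namely that phases run in the order before, during, after and that triggers within a phase run FIFO.

```python
class ShutdownTriggers:
    """Toy model: phases run in a fixed order, triggers within a phase FIFO."""

    PHASES = ("before", "during", "after")

    def __init__(self):
        self._triggers = {phase: [] for phase in self.PHASES}

    def add(self, phase, func):
        self._triggers[phase].append(func)

    def fire(self):
        for phase in self.PHASES:
            for func in self._triggers[phase]:
                func()


def run(registrations):
    """Register (phase, label) pairs in order and return the firing order."""
    order = []
    triggers = ShutdownTriggers()
    for phase, label in registrations:
        triggers.add(phase, lambda label=label: order.append(label))
    triggers.fire()
    return order


# Reported arrangement: threadpool stop in "during", registry stop in "after".
reported = run([("during", "stop threadpool"), ("after", "stop registry")])

# Workaround arrangement: both in "during", threadpool stop re-added last.
workaround = run([("during", "stop registry"), ("during", "stop threadpool")])
```

In the reported arrangement the threadpool stop fires first and can wait forever on threads that only the registry stop would unblock; in the workaround the registry stop fires first.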

Cannot pip install Crochet if Twisted is not already installed

Crochet lists Twisted>=11.1 in its setup.py, but importing crochet in that file means that Twisted is already required for pip to use setup.py in the first place. pip install crochet gives me the following traceback:

Downloading/unpacking crochet
  Downloading crochet-0.9.0.tar.gz
  Running setup.py egg_info for package crochet
    Traceback (most recent call last):
      File "<string>", line 16, in <module>
      File "/home/pauweave/.virtualenvs/temp/build/crochet/setup.py", line 14, in <module>
        import crochet
      File "/home/pauweave/.virtualenvs/temp/build/crochet/crochet/__init__.py", line 9, in <module>
        from twisted.internet import reactor
    ImportError: No module named twisted.internet
    Complete output from command python setup.py egg_info:
    Traceback (most recent call last):

  File "<string>", line 16, in <module>

  File "/home/pauweave/.virtualenvs/temp/build/crochet/setup.py", line 14, in <module>

    import crochet

  File "/home/pauweave/.virtualenvs/temp/build/crochet/crochet/__init__.py", line 9, in <module>

    from twisted.internet import reactor

ImportError: No module named twisted.internet
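A common way around this kind of problem, assuming setup.py imports the package only to read metadata such as a version string, is to parse that metadata from the file's text instead. The sketch below is illustrative only; read_version and the __version__ convention are assumptions, not something taken from Crochet's setup.py.

```python
import os
import re
import tempfile


def read_version(init_path):
    """Extract __version__ from a file's text without importing it."""
    with open(init_path, encoding="utf-8") as f:
        match = re.search(
            r"^__version__\s*=\s*['\"]([^'\"]+)['\"]", f.read(), re.M)
    if match is None:
        raise RuntimeError("no __version__ found in %s" % init_path)
    return match.group(1)


# Demonstration with a throwaway file that would fail if it were imported.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "__init__.py")
    with open(path, "w", encoding="utf-8") as f:
        f.write("from missing_dependency import something\n"
                "__version__ = '0.9.0'\n")
    version = read_version(path)
```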

Import lock bug

Given sample.py:

from crochet import setup, wait_for_reactor
setup()

from twisted.names.client import lookupAddress

answers, _, _ = wait_for_reactor(lookupAddress)('google.com')

print [
    record.payload.dottedQuad()
    for record in answers
    ]

Running python sample.py finishes instantly, but python -c 'import sample' fails.

I suspect this might have something to do with the import lock.

Functions registered with @wait_for can trigger attribute error

I have this demo:

http://0bin.net/paste/xgIgxIo4NCtE8eQc#uCoxEeBHoyiNtPWg1NC313ccmhklegpL7N8Wgi7NMVI

When I run it, I get :

INFO:werkzeug: * Running on http://127.0.0.1:8000/
INFO:twisted:Starting factory <autobahn.twisted.websocket.WampWebSocketClientFactory instance at 0x7f968917ab48>
INFO:twisted:Stopping factory <autobahn.twisted.websocket.WampWebSocketClientFactory instance at 0x7f968917ab48>
ERROR:__main__:Exception on / [GET]
Traceback (most recent call last):
  File "/home/sam/.local/share/virtualenvs/test/local/lib/python2.7/site-packages/flask/app.py", line 1817, in wsgi_app
    response = self.full_dispatch_request()
  File "/home/sam/.local/share/virtualenvs/test/local/lib/python2.7/site-packages/flask/app.py", line 1477, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/home/sam/.local/share/virtualenvs/test/local/lib/python2.7/site-packages/flask/app.py", line 1381, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/home/sam/.local/share/virtualenvs/test/local/lib/python2.7/site-packages/flask/app.py", line 1475, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/sam/.local/share/virtualenvs/test/local/lib/python2.7/site-packages/flask/app.py", line 1461, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "server.py", line 49, in index
    publish('com.example.on_visit', app._visits, msg = "hello from flask")
  File "/home/sam/.local/share/virtualenvs/test/local/lib/python2.7/site-packages/crochet/_eventloop.py", line 461, in wrapper
    return eventual_result.wait(timeout)
  File "/home/sam/.local/share/virtualenvs/test/local/lib/python2.7/site-packages/crochet/_eventloop.py", line 231, in wait
    result.raiseException()
  File "<string>", line 2, in raiseException
AttributeError: 'NoneType' object has no attribute 'publish'

But publish is a function attached to nothing but the current module object, so I don't understand this error.

To run the code:

pip install crossbar flask crochet
crossbar init
crossbar run

Then run in another terminal:

python server.py

Deadlock due to __del__() and ThreadLogObserver queue

A Crochet deadlock can happen when an overridden __del__() method logs while the current thread already holds the ThreadLogObserver queue lock.

Analysis: __del__() methods can be called from any thread at any time due to garbage collection timing. If the __del__() method attempts to acquire a non-recursive lock that the current thread already holds, deadlock will occur. In Twisted, classes like Deferred override __del__() and can log from it.
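The difference between a non-recursive and a recursive lock can be shown without actually deadlocking by using non-blocking acquires. This is a minimal stdlib illustration of the mechanism described above, not of Crochet's or Queue's internals.

```python
import threading

plain = threading.Lock()
reentrant = threading.RLock()

with plain:
    # The same thread cannot take a plain Lock twice; a blocking
    # acquire here would hang forever.
    plain_reacquired = plain.acquire(blocking=False)

with reentrant:
    # An RLock may be re-acquired by the thread that already holds it.
    reentrant_reacquired = reentrant.acquire(blocking=False)
    if reentrant_reacquired:
        reentrant.release()
```

A __del__() that runs in the middle of a locked section and tries to take the same plain lock is in the position of the first case, except that its acquire is blocking.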

The key part of the backtrace is as follows:

File: '/home/rmore/work/rmorehea-rcdn-testtng/virtualenv/local/lib/python2.7/site-packages/twisted/python/threadable.py', line 53, in sync
  return function(self, *args, **kwargs)
File: '/home/rmore/work/rmorehea-rcdn-testtng/virtualenv/local/lib/python2.7/site-packages/twisted/python/log.py', line 191, in msg
  self.observers[i](actualEventDict)
File: '/home/rmore/work/rmorehea-rcdn-testtng/virtualenv/local/lib/python2.7/site-packages/crochet/_eventloop.py', line 288, in __call__
  self._queue.put(msg)
File: '/usr/lib/python2.7/Queue.py', line 138, in put
  self.not_empty.notify()
File: '/usr/lib/python2.7/threading.py', line 392, in notify
  for waiter in waiters:
File: '/home/rmore/work/rmorehea-rcdn-testtng/virtualenv/local/lib/python2.7/site-packages/twisted/internet/defer.py', line 683, in __del__
  log.msg("Unhandled error in Deferred:", isError=True)
File: '/home/rmore/work/rmorehea-rcdn-testtng/virtualenv/local/lib/python2.7/site-packages/twisted/python/threadable.py', line 53, in sync
  return function(self, *args, **kwargs)
File: '/home/rmore/work/rmorehea-rcdn-testtng/virtualenv/local/lib/python2.7/site-packages/twisted/python/log.py', line 191, in msg
  self.observers[i](actualEventDict)
File: '/home/rmore/work/rmorehea-rcdn-testtng/virtualenv/local/lib/python2.7/site-packages/crochet/_eventloop.py', line 288, in __call__
  self._queue.put(msg)
File: '/usr/lib/python2.7/Queue.py', line 118, in put
  self.not_full.acquire()

Some way of enforcing that a function is called in the reactor

@crochet.run_in_reactor lets you easily call into the reactor from another thread. You may, however, have a method that is meant to be async twisted code. It would be nice if you could decorate it in such a way that it raises an exception if you call it in another thread. e.g.

@crochet.run_in_reactor
def meth_to_call_from_threads(self):
    d = self.async_code()
    d.addCallback(...)

# Proposed decorator; it does not exist in Crochet.
@crochet.ensure_in_reactor
def async_code(self):
    # Please raise an error if I am called from outside the reactor.
    ...

This would catch a case where, e.g. you forget to add the @run_in_reactor decorator to the first method.
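The requested check could be sketched along these lines. This is a stdlib-only illustration, not an existing Crochet API: ensure_in_owner_thread and WrongThreadError are hypothetical names, and the thread that loads the code stands in for the reactor thread.

```python
import functools
import threading


class WrongThreadError(RuntimeError):
    pass


# Stand-in for the reactor thread: here, whichever thread loads this code.
OWNER_THREAD_ID = threading.get_ident()


def ensure_in_owner_thread(func):
    """Raise WrongThreadError if func is called from any other thread."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if threading.get_ident() != OWNER_THREAD_ID:
            raise WrongThreadError(
                "%s called outside the owner thread" % func.__name__)
        return func(*args, **kwargs)
    return wrapper


@ensure_in_owner_thread
def async_code():
    return "ok"


def call_from_other_thread():
    """Call async_code from a new thread and return any errors it raised."""
    caught = []

    def target():
        try:
            async_code()
        except WrongThreadError as exc:
            caught.append(exc)

    thread = threading.Thread(target=target)
    thread.start()
    thread.join()
    return caught
```

With Twisted itself, the comparison could presumably be replaced by twisted.python.threadable.isInIOThread().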

@wait_for couldn't be applied above @classmethod

I modified Crochet's example to show the case:

#!/usr/bin/python
"""
Do a DNS lookup using Twisted's APIs.
"""
from __future__ import print_function

# The Twisted code we'll be using:
from twisted.names import client

from crochet import setup, wait_for
setup()


# Crochet layer, wrapping Twisted's DNS library in a blocking call.
class C(object):
    @wait_for(timeout=5.0)
    @classmethod
    def gethostbyname(cls, name):
        """Lookup the IP of a given hostname.

        Unlike socket.gethostbyname() which can take an arbitrary amount of time
        to finish, this function will raise crochet.TimeoutError if more than 5
        seconds elapse without an answer being received.
        """
        d = client.lookupAddress(name)
        d.addCallback(lambda result: result[0][0].payload.dottedQuad())
        return d


if __name__ == '__main__':
    # Application code using the public API - notice it works in a normal
    # blocking manner, with no event loop visible:
    import sys
    name = sys.argv[1]
    ip = C().gethostbyname(name)
    print(name, "->", ip)

Error on run with python 2.7:

Traceback (most recent call last):
  File "/tmp/test.py", line 15, in <module>
    class C(object):
  File "/tmp/test.py", line 17, in C
    @classmethod
  File "/usr/local/lib/python2.7/dist-packages/crochet-1.2.0-py2.7.egg/crochet/_eventloop.py", line 447, in decorator
    @wraps(function)
  File "/usr/lib/python2.7/functools.py", line 33, in update_wrapper
    setattr(wrapper, attr, getattr(wrapped, attr))
AttributeError: 'classmethod' object has no attribute '__module__'

Error on run with pypy:

Traceback (most recent call last):
  File "app_main.py", line 72, in run_toplevel
  File "/tmp/test.py", line 15, in <module>
    class C(object):
  File "/tmp/test.py", line 17, in C
    @classmethod
  File "/usr/local/lib/pypy2.7/dist-packages/crochet/_eventloop.py", line 447, in decorator
    @wraps(function)
  File "/usr/lib/pypy/lib_pypy/_functools.py", line 22, in __call__
    return self.func(*(self.args + fargs), **fkeywords)
  File "/usr/lib/pypy/lib-python/2.7/functools.py", line 33, in update_wrapper
    setattr(wrapper, attr, getattr(wrapped, attr))
AttributeError: 'classmethod' object has no attribute '__module__'
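A likely workaround, assuming the decorator only needs a plain function to wrap, is to swap the order so that @classmethod is outermost. The sketch below uses a hypothetical pass-through decorator named blocking in place of wait_for, so it shows the decorator ordering only, not Crochet's behaviour.

```python
import functools


def blocking(func):
    """Hypothetical stand-in for a function-wrapping decorator."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper


class Resolver(object):
    # classmethod goes on the outside, so `blocking` sees a plain function.
    @classmethod
    @blocking
    def lookup(cls, name):
        return (cls.__name__, name)
```

With this order the wrapper is bound as a class method, so Resolver.lookup("example.com") receives the class as its first argument.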

wait_for and run_in_reactor swallow exceptions

Consider the following code:

from __future__ import absolute_import, division, print_function

from crochet import no_setup, wait_for, run_in_reactor

no_setup()

from twisted.trial import unittest as unittest_twisted
import unittest as unittest_stdlib


class C(object):
    @wait_for(1)
    def wait_for(self):
        raise Exception

    @run_in_reactor
    def run_in_reactor(self):
        raise Exception

    def vanilla(self):
        raise Exception


class TestStdLib(unittest_stdlib.TestCase):
    def test_wait_for(self):
        C().wait_for()

    def test_run_in_reactor(self):
        C().run_in_reactor()

    def test_vanilla(self):
        C().vanilla()


class TestTwistedDefault(unittest_twisted.TestCase):
    def test_wait_for(self):
        C().wait_for()

    def test_run_in_reactor(self):
        C().run_in_reactor()

    def test_vanilla(self):
        C().vanilla()


class TestTwistedSync(unittest_twisted.SynchronousTestCase):
    def test_wait_for(self):
        C().wait_for()

    def test_run_in_reactor(self):
        C().run_in_reactor()

    def test_vanilla(self):
        C().vanilla()

The output of running it with trial on Python 2.7.8 (OS X) is as follows:

test_t
  TestStdLib
    test_run_in_reactor ...                                                [OK]
    test_vanilla ...                                                    [ERROR]
    test_wait_for ...                                                   [ERROR]
  TestTwistedDefault
    test_run_in_reactor ...                                             [ERROR]
                                            [ERROR]
    test_vanilla ...                                                    [ERROR]
    test_wait_for ...                                                   [ERROR]
                                                  [ERROR]
  TestTwistedSync
    test_run_in_reactor ...                                                [OK]
    test_vanilla ...                                                    [ERROR]
    test_wait_for ...                                                   [ERROR]

===============================================================================
[ERROR]
Traceback (most recent call last):
  File "/Users/hynek/.pyenv/versions/2.7.8/lib/python2.7/unittest/case.py", line 329, in run
    testMethod()
  File "/Users/hynek/Projects/ssce/test_t.py", line 32, in test_vanilla
    C().vanilla()
  File "/Users/hynek/Projects/ssce/test_t.py", line 21, in vanilla
    raise Exception
exceptions.Exception: 

test_t.TestStdLib.test_vanilla
===============================================================================
[ERROR]
Traceback (most recent call last):
  File "/Users/hynek/.pyenv/versions/2.7.8/lib/python2.7/unittest/case.py", line 329, in run
    testMethod()
  File "/Users/hynek/Projects/ssce/test_t.py", line 26, in test_wait_for
    C().wait_for()
  File "/Users/hynek/.virtualenvs/sscce/lib/python2.7/site-packages/crochet/_eventloop.py", line 461, in wrapper
    return eventual_result.wait(timeout)
  File "/Users/hynek/.virtualenvs/sscce/lib/python2.7/site-packages/crochet/_eventloop.py", line 229, in wait
    result = self._result(timeout)
  File "/Users/hynek/.virtualenvs/sscce/lib/python2.7/site-packages/crochet/_eventloop.py", line 193, in _result
    raise TimeoutError()
crochet._eventloop.TimeoutError: 

test_t.TestStdLib.test_wait_for
===============================================================================
[ERROR]
Traceback (most recent call last):
  File "/Users/hynek/.virtualenvs/sscce/lib/python2.7/site-packages/twisted/internet/defer.py", line 140, in maybeDeferred
    result = f(*args, **kw)
  File "/Users/hynek/Projects/ssce/test_t.py", line 18, in run_in_reactor
    raise Exception
exceptions.Exception: 

test_t.TestTwistedDefault.test_run_in_reactor
test_t.TestTwistedDefault.test_run_in_reactor
===============================================================================
[ERROR]
Traceback (most recent call last):
  File "/Users/hynek/Projects/ssce/test_t.py", line 43, in test_vanilla
    C().vanilla()
  File "/Users/hynek/Projects/ssce/test_t.py", line 21, in vanilla
    raise Exception
exceptions.Exception: 

test_t.TestTwistedDefault.test_vanilla
===============================================================================
[ERROR]
Traceback (most recent call last):
  File "/Users/hynek/Projects/ssce/test_t.py", line 37, in test_wait_for
    C().wait_for()
  File "/Users/hynek/.virtualenvs/sscce/lib/python2.7/site-packages/crochet/_eventloop.py", line 461, in wrapper
    return eventual_result.wait(timeout)
  File "/Users/hynek/.virtualenvs/sscce/lib/python2.7/site-packages/crochet/_eventloop.py", line 229, in wait
    result = self._result(timeout)
  File "/Users/hynek/.virtualenvs/sscce/lib/python2.7/site-packages/crochet/_eventloop.py", line 193, in _result
    raise TimeoutError()
crochet._eventloop.TimeoutError: 

test_t.TestTwistedDefault.test_wait_for
===============================================================================
[ERROR]
Traceback (most recent call last):
  File "/Users/hynek/.virtualenvs/sscce/lib/python2.7/site-packages/twisted/internet/defer.py", line 140, in maybeDeferred
    result = f(*args, **kw)
  File "/Users/hynek/.virtualenvs/sscce/lib/python2.7/site-packages/crochet/_eventloop.py", line 458, in run
    return function(*args, **kwargs)
  File "/Users/hynek/Projects/ssce/test_t.py", line 14, in wait_for
    raise Exception
exceptions.Exception: 

test_t.TestTwistedDefault.test_wait_for
===============================================================================
[ERROR]
Traceback (most recent call last):
  File "/Users/hynek/Projects/ssce/test_t.py", line 54, in test_vanilla
    C().vanilla()
  File "/Users/hynek/Projects/ssce/test_t.py", line 21, in vanilla
    raise Exception
exceptions.Exception: 

test_t.TestTwistedSync.test_vanilla
===============================================================================
[ERROR]
Traceback (most recent call last):
  File "/Users/hynek/Projects/ssce/test_t.py", line 48, in test_wait_for
    C().wait_for()
  File "/Users/hynek/.virtualenvs/sscce/lib/python2.7/site-packages/crochet/_eventloop.py", line 461, in wrapper
    return eventual_result.wait(timeout)
  File "/Users/hynek/.virtualenvs/sscce/lib/python2.7/site-packages/crochet/_eventloop.py", line 229, in wait
    result = self._result(timeout)
  File "/Users/hynek/.virtualenvs/sscce/lib/python2.7/site-packages/crochet/_eventloop.py", line 193, in _result
    raise TimeoutError()
crochet._eventloop.TimeoutError: 

test_t.TestTwistedSync.test_wait_for
-------------------------------------------------------------------------------
Ran 9 tests in 3.020s

FAILED (errors=9, successes=2)

The output is the same on pypy.

I hope this is helpful.

Import lock exception thrown in multi threaded applications

Discussion from #twisted irc

11:36:26 AM struys: itamarst1: i'm running into the exception introduced with this change #34
11:36:46 AM struys: i have multiple threads running not just an application thread and a reactor thread
11:37:23 AM struys: i think an import is happening in another thread causing the imp.lock_held() call to return True
11:38:10 AM itamarst1: facepalms
11:38:15 AM itamarst1: file a ticket 
11:38:24 AM struys: that change was introduced to stop people from deadlocking at import time in the same thread
11:38:33 AM itamarst1: yeah, that's a bug
11:39:08 AM itamarst1: file an issue, I'll try to put out new release soon
11:39:28 AM struys: cool
11:39:34 AM itamarst1: (just as soon as I figure out a fix)
11:39:36 AM struys: i'd like to help fixing it but i'm not sure what to do
11:40:00 AM struys: i'll file the ticket and see where it goes
11:40:03 AM itamarst1: either I remove this feature and just document "don't do that", or I'd need a way to figure a way to show this thread is holding import lock

Non-main thread blocked by Crochet call is never unblocked after reactor shutdown.

When I have a blocking call into Crochet from a non-main thread and the twisted reactor shuts down, that call never exits.

This is also a problem in that control-c only unblocks queue.get() on the main thread, so control-c won't unblock other blocked threads.

In my case I worked around it by overriding the queue.get method and monkey patching things in:

import logging
import time

import crochet._eventloop
from crochet import TimeoutError
from crochet._eventloop import EventualResult
try:
    from queue import Empty
except ImportError:
    from Queue import Empty

# Assumed to be a standard library logger; the original snippet did not
# show where `log` came from.
log = logging.getLogger(__name__)


class EventualResultAutoUnblock(EventualResult):
    def __init__(self, *args, **kwargs):
        super(EventualResultAutoUnblock, self).__init__(*args, **kwargs)
        orig_get = self._queue.get

        def queue_get_reactor_aware(block=True, timeout=None):
            if not block:
                # Use the original get(); self._queue.get is patched below.
                return orig_get(block=block, timeout=timeout)
            start_time = time.time()
            while self._reactor.running:
                # timeout=None means wait until the reactor stops.
                if timeout is not None and time.time() >= start_time + timeout:
                    raise TimeoutError()
                try:
                    # Checking once per second for reactor shutdown
                    # should be good enough.
                    return orig_get(timeout=1)
                except Empty:
                    pass
            msg = 'Twisted reactor shutdown from under blocked thread'
            log.debug(msg)
            raise GeneratorExit(msg)
        # Monkey patch.
        self._queue.get = queue_get_reactor_aware

crochet._eventloop.EventualResult = EventualResultAutoUnblock

crochet.setup() interacts badly with fork()-without-exec()

crochet.setup() appears not to set up the reactor in certain cases. Consider the following code:

# tasks.py
...
import os
# set up celery...
from celery import group
from twisted.internet import reactor
from twisted.internet.protocol import ProcessProtocol
from crochet import setup, wait_for
setup()
...

@app.task(name='tasks.enqueue')
def enqueue(script_name, command):
    try:
        proc = ProcessProtocol()
        proc.deferred.addCallback(lambda x: x)
        env = os.environ.copy()
        print "About to run async process"
        run_async_process(proc, script_name, command, env)
        return proc.exitcode
    except Exception as e:
        print e


@wait_for(timeout=100.0)
def run_async_process(proc, script_name, command, env):
    pid = reactor.spawnProcess(proc, script_name, command, env)
    print "Process Spawned with PID {}".format(pid)
    # return the deferred when computation ends
    return proc.deferred

@app.task(name='tasks.fails')
def fails(script_name, command):
    group(enqueue.si(script_name, command))()

# test.py
from tasks import enqueue, fails

print "Prints expected results"
enqueue('/usr/bin/echo', ['/usr/bin/echo', 'hello'])

print "Prints nothing"
fails('/usr/bin/echo', ['/usr/bin/echo', 'hello'])

The first task returns what is expected. The second task hangs just after printing "About to run async process," and the body of run_async_process() never executes. In the second case, enqueue() is wrapped by celery.group(). The issue was solved by writing the code like so:

# tasks.py
...
import os
# set up celery...
from celery import group
from twisted.internet import reactor
from twisted.internet.protocol import ProcessProtocol
from crochet import setup, wait_for
...

@app.task(name='tasks.enqueue')
def enqueue(script_name, command):
    setup()
    try:
        proc = ProcessProtocol()
        proc.deferred.addCallback(lambda x: x)
        env = os.environ.copy()
        print "About to run async process"
        run_async_process(proc, script_name, command, env)
        return proc.exitcode
    except Exception as e:
        print e

After discussing the issue with itamarst and implementing the fix, we believe the issue is caused by forked Celery processes failing to start the reactor loop. Moving the setup() call into the caller of the function that uses the reactor API, and ONLY calling setup() there, circumvents the problem.

Be aware that this may have broader implications for code that forks processes and utilizes crochet.setup().
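The general idea behind the workaround, running setup inside the process that will actually use it, can be sketched as a per-process guard. This is a generic stdlib pattern, not a Crochet feature: PerProcessSetup is a hypothetical class, and it assumes that nothing has already run the setup function before the fork. Whether crochet.setup() itself can safely be re-run in a child after the parent called it is not established here.

```python
import os
import threading


class PerProcessSetup:
    """Run setup_func at most once per process ID."""

    def __init__(self, setup_func):
        self._setup_func = setup_func
        self._pid = None
        self._lock = threading.Lock()

    def ensure(self):
        with self._lock:
            # A forked child has a different PID, so setup runs again there.
            if self._pid != os.getpid():
                self._setup_func()
                self._pid = os.getpid()


calls = []
guard = PerProcessSetup(lambda: calls.append(os.getpid()))
guard.ensure()
guard.ensure()  # same process: no second call
```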

Don't drop errors on the floor if user never gets it out of wait()

If you have a DeferredResult with an error, and wait() is never called, the error is never logged or seen anywhere. This can make debugging difficult. Another case is where wait() was called, but there wasn't a result yet, and then wait() is never called again.
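One possible shape for this, sketched with the standard library only, is to remember whether the error was ever retrieved and log it when the result object is garbage collected. TrackedResult and ListHandler are hypothetical names and this is not Crochet's implementation. Note that logging from __del__() has hazards of its own, as the deadlock report elsewhere in this list describes.

```python
import logging

logger = logging.getLogger("unhandled_results")


class TrackedResult:
    """Result holder that logs an error nobody ever retrieved."""

    def __init__(self):
        self._error = None
        self._retrieved = False

    def set_error(self, error):
        self._error = error

    def wait(self):
        self._retrieved = True
        if self._error is not None:
            raise self._error

    def __del__(self):
        if self._error is not None and not self._retrieved:
            logger.error("Unhandled error in result: %r", self._error)


class ListHandler(logging.Handler):
    """Collects formatted messages so the demonstration can inspect them."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


handler = ListHandler()
logger.addHandler(handler)

dropped = TrackedResult()
dropped.set_error(ValueError("boom"))
del dropped  # on CPython this runs __del__ right away
```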

control-c tests failure on windows

The two test_control_c_is_possible tests fail on Windows 8 with Python 2.7:

...
===============================================================================
[FAIL]
Traceback (most recent call last):
  File "C:\Users\Konstantinos\Documents\GitHub\crochet\crochet\tests\test_api.py", line 330, in test_control_c_is_possible
    self.assertEqual(process.wait(), 23)
  File "C:\Users\Konstantinos\PyEnv\pyinstall\lib\site-packages\twisted\trial\_synctest.py", line 356, in assertEqual
    % (msg, pformat(first), pformat(second)))
twisted.trial.unittest.FailTest: not equal:
a = 2
b = 23

crochet.tests.test_api.EventualResultTests.test_control_c_is_possible
===============================================================================
[FAIL]
Traceback (most recent call last):
  File "C:\Users\Konstantinos\Documents\GitHub\crochet\crochet\tests\test_api.py", line 753, in test_control_c_is_possible
    self.assertEqual(process.wait(), 23)
  File "C:\Users\Konstantinos\PyEnv\pyinstall\lib\site-packages\twisted\trial\_synctest.py", line 356, in assertEqual
    % (msg, pformat(first), pformat(second)))
twisted.trial.unittest.FailTest: not equal:
a = 2
b = 23


crochet.tests.test_api.WaitForReactorTests.test_control_c_is_possible
-------------------------------------------------------------------------------
Ran 81 tests in 11.607s

FAILED (skips=3, failures=2, successes=76)

Support for multiple processes sharing single Twisted instance

Many web apps use forking WSGI containers, which means normal Crochet operation starts one reactor for each. This makes it hard to e.g. have shared server that listens to incoming requests or some such, since you can't have all processes listening on same port. It would be nice if Crochet had a mode where it started a single Twisted reactor and it was shared between all these processes.

Side-effects on import of Crochet

Importing Crochet creates a ResultRegistry, which adds a reactor shutdown event. This should really be done in setup() or no_setup().

Dynamic Twisted Thread Pool

If you defer a number of things to threads, each of which calls EventualResult.wait(), it is possible to fill the threadpool with blocked threads, causing hangs or timeouts.

It'd be great if EventualResult flagged the pool worker threads as blocked, allowing the pool to create new threads so that the nested deferred-to-thread calls can complete.
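The starvation described here can be reproduced in miniature with a one-thread pool. This sketch uses concurrent.futures rather than Twisted's threadpool, so it shows the general hazard only: a task that blocks on work queued to its own pool can never be served while it holds the last worker.

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError


def outer(pool):
    # The only worker is busy running this function, so the inner task
    # stays queued until outer() returns.
    inner = pool.submit(lambda: "done")
    try:
        return inner.result(timeout=0.2)
    except FutureTimeoutError:
        return "starved"


with ThreadPoolExecutor(max_workers=1) as pool:
    outcome = pool.submit(outer, pool).result()
```

The timeout is what keeps this example from hanging; without it, outer() would wait forever for a worker that it is itself occupying.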

EventualResult.wait() Possible Issue

EventualResult.wait() does not respect Ctrl-C if wait() is called in the main thread.

I'm unsure of the real reason for this, but the assumption is that on some platforms Queue.get(VERYLONGTIMEOUT) performs that wait in C, rather than in a while loop in Python.
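If that assumption is right, one workaround is to wait in short slices so that control returns to Python between them and a pending KeyboardInterrupt can be raised. The following is a minimal sketch of that idea, not Crochet's code; interruptible_get and poll_interval are hypothetical names.

```python
import queue
import time


def interruptible_get(q, timeout, poll_interval=0.1):
    """Like q.get(timeout=...), but waits in slices of poll_interval."""
    deadline = time.monotonic() + timeout
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise queue.Empty
        try:
            # Each short wait returns to Python code, giving the
            # interpreter a chance to deliver Ctrl-C.
            return q.get(timeout=min(poll_interval, remaining))
        except queue.Empty:
            continue


q = queue.Queue()
q.put("result")
value = interruptible_get(q, timeout=1.0)
```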
