Giter VIP home page Giter VIP logo

txpostgres's Introduction

This is txpostgres is a library for accessing a PostgreSQL database from the Twisted framework. It builds upon asynchronous features of the Psycopg database library, which in turn exposes the asynchronous features of libpq, the PostgreSQL C library.

It requires a version of Psycopg that includes support for asynchronous connections (versions 2.2.0 and later) and a reasonably recent Twisted (it has been tested with Twisted 10.2 onward). Alternatively, psycopg2cffi or psycopg2-ctypes can be used in lieu of Psycopg.

txpostgres tries to present an interface that will be familiar to users of both Twisted and Psycopg. It features a Cursor wrapper class that mimics the interface of a Psycopg cursor but returns Deferred objects. It also provides a Connection class that is meant to be a drop-in replacement for Twisted's adbapi.Connection with some small differences regarding connection establishing.

The main advantage of txpostgres over Twisted's built-in database support is non-blocking connection building and complete lack of thread usage.

It runs on Python 2.6, 2.7, 3.4, 3.5 and PyPy.

If you got txpostgres as a source tarball, you can run the automated test suite and install the library with:

tar xjf txpostgres-x.x.x.tar.bz2
cd txpostgres-x.x.x
trial test
sudo python setup.py install

Alternatively, just install it from PyPI:

pip install txpostgres

The library is distributed under the MIT License, see the LICENSE file for details. You can contact the author, Jan Urbański, at [email protected]. Feel free to download the source, file bugs in the issue tracker and consult the documentation

https://secure.travis-ci.org/wulczer/txpostgres.png?branch=master

txpostgres's People

Contributors

aleksi avatar graingert avatar luhn avatar mrwonko avatar wulczer avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar

txpostgres's Issues

Unhandled errors on TCP connection timeout

I've been testing txpostgres' response to connection failures/timeouts, and I've run into issues where, during a connection failure or timeout, exceptions occur that are not caught/do not fire an errback and so it's not possible to handle them by i.e. closing the connection and reopening it.

There are two slightly different failure modes. If a tcp_keepalive timeout is allowed to occur, then no errbacks are fired. If a tcp_retries or TCP_USER_TIMEOUT occurs, one errback fires but an unhandled exception still occurs. This happens both with SSL and non-SSL connections -- both are psycopg2.OperationalError exceptions, but the error messages are slightly different (see below).

The tracebacks

Unhandled Error
Traceback (most recent call last):
  File "/home/woodrow/tmp/txpostgres_noerrback/virt-python/lib/python2.7/site-packages/twisted/python/log.py", line 84, in callWithLogger
    return callWithContext({"system": lp}, func, *args, **kw)
  File "/home/woodrow/tmp/txpostgres_noerrback/virt-python/lib/python2.7/site-packages/twisted/python/log.py", line 69, in callWithContext
    return context.call({ILogContext: newCtx}, func, *args, **kw)
  File "/home/woodrow/tmp/txpostgres_noerrback/virt-python/lib/python2.7/site-packages/twisted/python/context.py", line 118, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "/home/woodrow/tmp/txpostgres_noerrback/virt-python/lib/python2.7/site-packages/twisted/python/context.py", line 81, in callWithContext
    return func(*args,**kw)
--- <exception caught here> ---
  File "/home/woodrow/tmp/txpostgres_noerrback/virt-python/lib/python2.7/site-packages/twisted/internet/posixbase.py", line 581, in _doReadOrWrite
    why = selectable.doRead()
  File "/home/woodrow/tmp/txpostgres_noerrback/virt-python/lib/python2.7/site-packages/txpostgres/txpostgres.py", line 584, in doRead
    _PollingMixin.doRead(self)
  File "/home/woodrow/tmp/txpostgres_noerrback/virt-python/lib/python2.7/site-packages/txpostgres/txpostgres.py", line 151, in doRead
    self.continuePolling()
  File "/home/woodrow/tmp/txpostgres_noerrback/virt-python/lib/python2.7/site-packages/txpostgres/txpostgres.py", line 123, in continuePolling
    state = self.pollable().poll()
psycopg2.OperationalError: SSL SYSCALL error: EOF detected
Unhandled Error
Traceback (most recent call last):
  File "/home/woodrow/tmp/txpostgres_noerrback/virt-python/lib/python2.7/site-packages/twisted/python/log.py", line 84, in callWithLogger
    return callWithContext({"system": lp}, func, *args, **kw)
  File "/home/woodrow/tmp/txpostgres_noerrback/virt-python/lib/python2.7/site-packages/twisted/python/log.py", line 69, in callWithContext
    return context.call({ILogContext: newCtx}, func, *args, **kw)
  File "/home/woodrow/tmp/txpostgres_noerrback/virt-python/lib/python2.7/site-packages/twisted/python/context.py", line 118, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "/home/woodrow/tmp/txpostgres_noerrback/virt-python/lib/python2.7/site-packages/twisted/python/context.py", line 81, in callWithContext
    return func(*args,**kw)
--- <exception caught here> ---
  File "/home/woodrow/tmp/txpostgres_noerrback/virt-python/lib/python2.7/site-packages/twisted/internet/posixbase.py", line 581, in _doReadOrWrite
    why = selectable.doRead()
  File "/home/woodrow/tmp/txpostgres_noerrback/virt-python/lib/python2.7/site-packages/txpostgres/txpostgres.py", line 584, in doRead
    _PollingMixin.doRead(self)
  File "/home/woodrow/tmp/txpostgres_noerrback/virt-python/lib/python2.7/site-packages/txpostgres/txpostgres.py", line 151, in doRead
    self.continuePolling()
  File "/home/woodrow/tmp/txpostgres_noerrback/virt-python/lib/python2.7/site-packages/txpostgres/txpostgres.py", line 123, in continuePolling
    state = self.pollable().poll()
psycopg2.OperationalError: could not receive data from server: Connection timed out

Reproducing this error

I created a program to reproduce this: https://gist.github.com/1661012. Apologies if it's not very idiomatic Twisted, but hopefully it does the job.

This must be run on a Linux machine as it requires iptables to simulate connection failures. Having TCP fail quickly using the TCP_USER_TIMEOUT socket option requires running this with kernel >= 2.6.37. Further, the python modules twisted, txpostgres and psycopg2 are required. Finally, a PostgreSQL database listening on 127.0.0.1 with no password needed to access the 'postgres' database from localhost is required.

The program will provide prompts to add and remove an iptables rule -- you can just start the program and follow along.

You can run the program with or without an 'ssl' argument to enable or disable SSL on the connection.

If you don't want to run the program yourself, both failure modes are shown below (which include the tracebacks already shown above):

$ python demo_conn.py ssl (sslmode=require)

An Errback is called (ERRBACK: (<class 'psycopg2.OperationalError'>, SSL SYSCALL error: Connection timed out)) but an unhandled error still occurs:

Unhandled Error
Traceback (most recent call last):
  File "/home/woodrow/tmp/txpostgres_noerrback/virt-python/lib/python2.7/site-packages/twisted/python/log.py", line 84, in callWithLogger
    return callWithContext({"system": lp}, func, *args, **kw)
  File "/home/woodrow/tmp/txpostgres_noerrback/virt-python/lib/python2.7/site-packages/twisted/python/log.py", line 69, in callWithContext
    return context.call({ILogContext: newCtx}, func, *args, **kw)
  File "/home/woodrow/tmp/txpostgres_noerrback/virt-python/lib/python2.7/site-packages/twisted/python/context.py", line 118, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "/home/woodrow/tmp/txpostgres_noerrback/virt-python/lib/python2.7/site-packages/twisted/python/context.py", line 81, in callWithContext
    return func(*args,**kw)
--- <exception caught here> ---
  File "/home/woodrow/tmp/txpostgres_noerrback/virt-python/lib/python2.7/site-packages/twisted/internet/posixbase.py", line 581, in _doReadOrWrite
    why = selectable.doRead()
  File "/home/woodrow/tmp/txpostgres_noerrback/virt-python/lib/python2.7/site-packages/txpostgres/txpostgres.py", line 584, in doRead
    _PollingMixin.doRead(self)
  File "/home/woodrow/tmp/txpostgres_noerrback/virt-python/lib/python2.7/site-packages/txpostgres/txpostgres.py", line 151, in doRead
    self.continuePolling()
  File "/home/woodrow/tmp/txpostgres_noerrback/virt-python/lib/python2.7/site-packages/txpostgres/txpostgres.py", line 123, in continuePolling
    state = self.pollable().poll()
psycopg2.OperationalError: SSL SYSCALL error: EOF detected

$ python demo_conn.py (sslmode=disable)

An Errback is called (ERRBACK: (<class 'psycopg2.OperationalError'>, could not receive data from server: Connection timed out) but an unhandled error still occurs:

Unhandled Error
Traceback (most recent call last):
  File "/home/woodrow/tmp/txpostgres_noerrback/virt-python/lib/python2.7/site-packages/twisted/python/log.py", line 84, in callWithLogger
    return callWithContext({"system": lp}, func, *args, **kw)
  File "/home/woodrow/tmp/txpostgres_noerrback/virt-python/lib/python2.7/site-packages/twisted/python/log.py", line 69, in callWithContext
    return context.call({ILogContext: newCtx}, func, *args, **kw)
  File "/home/woodrow/tmp/txpostgres_noerrback/virt-python/lib/python2.7/site-packages/twisted/python/context.py", line 118, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "/home/woodrow/tmp/txpostgres_noerrback/virt-python/lib/python2.7/site-packages/twisted/python/context.py", line 81, in callWithContext
    return func(*args,**kw)
--- <exception caught here> ---
  File "/home/woodrow/tmp/txpostgres_noerrback/virt-python/lib/python2.7/site-packages/twisted/internet/posixbase.py", line 581, in _doReadOrWrite
    why = selectable.doRead()
  File "/home/woodrow/tmp/txpostgres_noerrback/virt-python/lib/python2.7/site-packages/txpostgres/txpostgres.py", line 584, in doRead
    _PollingMixin.doRead(self)
  File "/home/woodrow/tmp/txpostgres_noerrback/virt-python/lib/python2.7/site-packages/txpostgres/txpostgres.py", line 151, in doRead
    self.continuePolling()
  File "/home/woodrow/tmp/txpostgres_noerrback/virt-python/lib/python2.7/site-packages/txpostgres/txpostgres.py", line 123, in continuePolling
    state = self.pollable().poll()
psycopg2.OperationalError: could not receive data from server: Connection timed out

My Environment:

Python 2.7.2 (default, Nov 21 2011, 17:25:27) 
>>> import txpostgres
>>> print(txpostgres.__version__)
(1, 0, 0)
>>> import twisted
>>> print(twisted.__version__)
11.1.0
>>> import psycopg2
>>> print(psycopg2.__version__)
2.4.4 (dt dec pq3 ext)

Add some kind of self-healing to the pool class

Currently, when a postgres server is restarted, any application using txpostgres's ConnectionPool with it has either to detect it (and replace the connection on its own) or be restarted manually.

I believe the handling of such errors should be the (transparent) task of the pool. I thought about it poking a bit in the source code and came up with three options, sorted by decreasing complexity (and convenience):

Write a Pool that re-connects and re-queries if a connection got lost

There would be a lot of refactoring necessary in order to have all the information it needs to retry (runOperation or runQuery and the query itself). Auto-Retry is also kind of dangerous when I think about it.

Replace all connections in the pool once a connection error occurs

A bit tricky to achieve, as connections that aren't in the pool when the first error is triggered might cause reconnections all over again.

A solution might be tagging the connections with a "generation tag" (a simple increasing int would be enough) that's also saved as the current tag for the pool. Once a connection is lost, connections with older tags would be reconnected, tagged with the new tag and a specific "ConnectionLost" exception would be raised. If the tag of the lost connection shows that the connection belongs to an older generation, it would just refresh itself – leaving the pool alone.

The application would still have to handle disconnect errors, but only once and wouldn't have to reconnect on it's own. This would be my favorite solution I think.

Don't re-add broken connections

And finally the both most simple and robust solution (that in fact could be added independent of the above): If _putBackAndPassthrough() encounters a connections error, it doesn't add it back into the pool but creates a new instead.

The drawback would be, that it would take min errors (ie the number of connections in the pool), till the pool is healthy again. I'd add this as a simple boolean kwargs to the ConnectionPool.

I hope it's halfway understandable what I mean. :) Let me know what you think!

error handling with nonexistent table names/wrong connection password

From a comment by https://github.com/oberstet in #5 (comment)

I did some testing with

https://github.com/oberstet/txpostgres/blob/testing/testing/txpostgres_error.py

2 cases:

a) correct database password, should fail on "select * from nonexistent"
b) incorrect password

The latest
d13f8ff

give me tracebacks on both:

https://gist.github.com/1353252

The latest
https://github.com/smira/txpostgres/commit/e8571b02dced034a5289ef8cc4f4ea3e8d02d814

gives me traceback only on b), and works for a):

https://gist.github.com/1353259

Connection.runOperation() and runQuery() may not close the cursor, either

Prompted by #35 I took a look at runQuery() and runOperation(), and they seem to have the same problem as runInteraction() in case of failure. I tried fixing it the same way you fixed runInteraction() but that broke the disconnectWhileRunning test - cursor.close() mustn't be called while the query is still running, but connectionLost() calls the errback (I think?).

How to create a reconnection pool?

I have a ConnectionPoolwhere i want the connections to reconnect if something goes wrong, thus I set the connectionFactory attribute as shown below.

from txpostgres import txpostgres, reconnection
from twisted.internet import defer
from .sql import register_pgsql_types
from functools import partial
from ocmg_common import log
from psycopg2.extras import NamedTupleConnection


class LoggingDetector(reconnection.DeadConnectionDetector):

    def startReconnecting(self, f):
        log.info('Database connection is down, reconnection sequence '
                 'initiated, error {}'.format(f.value))
        return reconnection.DeadConnectionDetector.startReconnecting(self, f)

    def reconnect(self):
        log.info('Attempting to reestablish connecting...')
        return reconnection.DeadConnectionDetector.reconnect(self)

    def connectionRecovered(self):
        log.info('Yehaa, Connection to database has been recovered')
        return reconnection.DeadConnectionDetector.connectionRecovered(self)


class ConnectionPool(txpostgres.ConnectionPool):

    connectionFactory = partial(
        txpostgres.Connection,
        detector=LoggingDetector())

    def __init__(self, **connkw):
        self.min = connkw.get('min', 1)
        connkw['connection_factory'] = NamedTupleConnection
        txpostgres.ConnectionPool.__init__(self, None, **connkw)

@defer.inlineCallbacks
def build_connectionpool(register_types=True, **connkw):
    pool = ConnectionPool(**connkw)
    yield pool.start()
    if register_types:
        yield register_pgsql_types(pool)
    defer.returnValue(pool)

After I close the database I get a DeadConnection error as expectet, when I start the the database again I mostly get. psycopg2.InterfaceError("connection already closed") on calls to the pool. This only happens when the number of connections is greater that 1.

Am I doing anything wrong or can anybody explain this behavior. Maybe one can add a Connection pool with reconnecting connections example. I am on Twisted 15.0.0 and txpostgres 1.2.0 from pipy.

All help is greatly appreciated.

Connection.connectionFactory and cursorFactory are treated as methods of Connection

in txpostgres.py:248 you have:

class Connection(_PollingMixin):
    ...
    connectionFactory = psycopg2.connect
    cursorFactory = Cursor

now later in that file in Connection.connect you have:

            self._connection = self.connectionFactory(*args, **kwargs)

The problem there is that, on Python 2.7 at least, self.connectionFactory is treated as a bound method of Connection, so self is passed in implicitly, causing psycopg2.connect to error out:

Traceback (most recent call last):
Failure: twisted.internet.defer.FirstError: FirstError[#0, [Failure instance: Traceback: <type 'exceptions.TypeError'>: argument 1 must be string, not Connection
    [snip]
--- <exception caught here> ---
/Users/erik.allik/.virtualenvs/dts-agent/lib/python2.7/site-packages/txpostgres/txpostgres.py:293:connect
/Users/erik.allik/.virtualenvs/dts-agent/lib/python2.7/site-packages/psycopg2/__init__.py:179:connect
]]

disconnecting while a query is underway hangs when using epoll

This test case hangs when using the epoll reactor:

def test_disconnectWhileRunning(self):
    """
    Disconnecting from the server when there is a query underway causes the
    query to fail with ConnectionDone.
    """
    d = self.conn.runQuery("select pg_sleep(5)")
    reactor.callLater(0, self.conn.close)
     def restoreConnection(res):
        self.conn = txpostgres.Connection()
        d = self.conn.connect(user=DB_USER, password=DB_PASS,
                              host=DB_HOST, database=DB_NAME)
        return d.addCallback(lambda _: res)

    d = self.assertFailure(d, error.ConnectionDone)
    # restore the connection, otherwise all the other tests will fail
    return d.addBoth(restoreConnection)

The cause is almost certainly a quirk in Linux epoll, where if a file descriptor is closed, it gets removed from the watched set automatically (see Q6 in man epoll(2)). The result is that when psycopg2 closes the connection FD, Twisted never see that and the cursor's connectionLost is never called.

What probably will have to happen is that the connection will have to track all its cursors and call connectionLost() manually on all of them when its close() method is called.

No way to specify cursor_factory=DictCursor

I have my own patched version of txpostgres to allow me to do the following:

return txpostgres.ConnectionPool('pyPgSQL.PgSQL', self._connstr, cursor_factory=DictCursor, min=1)

Can we merge this functionality to txpostgres? It seems like a justified request given that psycopg2 has this.

`ProgrammingError` after reconnecting.

I'm using ConnectionPool with DeadConnectionDetector. Here's my connection function, if it helps.

This morning, my primary database failed, which happened to be during a period of high-frequency querying. So of course, psycopg2.OperationalError appears several times, before it was replaced with ConnectionDead. Shortly thereafter, my standby server was promoted to master. txpostgres was then able to reconnect, which [I assume] it did. But then, immediately, the following error starts: psycopg2.ProgrammingError: execute cannot be used while an asynchronous query is underway. As far as I can tell, no queries were successfully executed until I restarted the app.

I've been digging around the txpostgres code to try to find how this could happen, but I haven't had any enlightenment so far. I was wondering if you might have any ideas.

Here are some tracebacks, but—as is usual for Twisted—they aren't very helpful.

reactor errors upon backend terminating connection

Seen on d13f8ff

2011-11-10 15:18:30,171 ERROR twisted.internet.epollreactor : Unhandled Error
Traceback (most recent call last):
  File "/usr/lib64/python2.6/site-packages/twisted/python/log.py", line 84, in callWithLogger
    return callWithContext({"system": lp}, func, *args, **kw)
  File "/usr/lib64/python2.6/site-packages/twisted/python/log.py", line 69, in callWithContext
    return context.call({ILogContext: newCtx}, func, *args, **kw)
  File "/usr/lib64/python2.6/site-packages/twisted/python/context.py", line 118, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "/usr/lib64/python2.6/site-packages/twisted/python/context.py", line 81, in callWithContext
    return func(*args,**kw)
---  ---
  File "/usr/lib64/python2.6/site-packages/twisted/internet/epollreactor.py", line 217, in _doReadOrWrite
    why = selectable.doRead()
  File "/srv/code/products/rbuilder/x/include/txpostgres/txpostgres.py", line 556, in doRead
    _PollingMixin.doRead(self)
  File "/srv/code/products/rbuilder/x/include/txpostgres/txpostgres.py", line 136, in doRead
    self.continuePolling()
  File "/srv/code/products/rbuilder/x/include/txpostgres/txpostgres.py", line 108, in continuePolling
    state = self.pollable().poll()
psycopg2.OperationalError: server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.


2011-11-10 15:18:30,184 ERROR twisted.internet.epollreactor : Unhandled Error
Traceback (most recent call last):
  File "/srv/code/products/rbuilder/x/include/rmake3/lib/daemon.py", line 357, in doWork
    self.reactor.run()
  File "/usr/lib64/python2.6/site-packages/twisted/internet/base.py", line 1162, in run
    self.mainLoop()
  File "/usr/lib64/python2.6/site-packages/twisted/internet/base.py", line 1174, in mainLoop
    self.doIteration(t)
  File "/usr/lib64/python2.6/site-packages/twisted/internet/epollreactor.py", line 197, in doPoll
    log.callWithLogger(selectable, _drdw, selectable, fd, event)
---  ---
  File "/usr/lib64/python2.6/site-packages/twisted/python/log.py", line 84, in callWithLogger
    return callWithContext({"system": lp}, func, *args, **kw)
  File "/usr/lib64/python2.6/site-packages/twisted/python/log.py", line 69, in callWithContext
    return context.call({ILogContext: newCtx}, func, *args, **kw)
  File "/usr/lib64/python2.6/site-packages/twisted/python/context.py", line 118, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "/usr/lib64/python2.6/site-packages/twisted/python/context.py", line 81, in callWithContext
    return func(*args,**kw)
  File "/usr/lib64/python2.6/site-packages/twisted/internet/epollreactor.py", line 230, in _doReadOrWrite
    self._disconnectSelectable(selectable, why, inRead)
  File "/usr/lib64/python2.6/site-packages/twisted/internet/posixbase.py", line 256, in _disconnectSelectable
    selectable.connectionLost(failure.Failure(why))
  File "/srv/code/products/rbuilder/x/include/txpostgres/txpostgres.py", line 170, in connectionLost
    self.continuePolling()
  File "/srv/code/products/rbuilder/x/include/txpostgres/txpostgres.py", line 108, in continuePolling
    state = self.pollable().poll()
psycopg2.OperationalError: connection not open

Failures gitting reactor when trying to connect to nonexisting server/with wrong credentials

Hey there,
I was trying to use txpostgres to implement the writer smart enough to handle connection problems. The following code is my setup of the connection:

        self._db = self._txpostgres.ConnectionPool(
            "postgres", min=1, **self._credentials)

        d = self._db.start()
        d.addCallbacks(self._connection_established, self._connection_failed)
        return d

When the credentials are bad I get to _connection_failed as expected (hurray!) with defer.FirstError wrapping OperationError (this is correct by all means). The problem is that the same error is hitting reactor even though I handle it. I can see two types of tracebacks in the logs:

Traceback (most recent call last):
  File "/usr/local/lib/python2.6/dist-packages/Twisted-10.1.0-py2.6-linux-i686.egg/twisted/internet/selectreactor.py", line 146, in _doReadOrWrite
    why = getattr(selectable, method)()
  File "/home/kowal/projects/txpostgres/txpostgres/txpostgres.py", line 564, in doRead
    _PollingMixin.doRead(self)
  File "/home/kowal/projects/txpostgres/txpostgres/txpostgres.py", line 141, in doRead
    self.continuePolling()
  File "/home/kowal/projects/txpostgres/txpostgres/txpostgres.py", line 113, in continuePolling
    state = self.pollable().poll()
psycopg2.OperationalError: asynchronous connection failed

and

Traceback (most recent call last):
  File "/usr/local/lib/python2.6/dist-packages/Twisted-10.1.0-py2.6-linux-i686.egg/twisted/python/log.py", line 84, in callWithLogger
    return callWithContext({"system": lp}, func, *args, **kw)
  File "/usr/local/lib/python2.6/dist-packages/Twisted-10.1.0-py2.6-linux-i686.egg/twisted/python/log.py", line 69, in callWithContext
    return context.call({ILogContext: newCtx}, func, *args, **kw)
  File "/usr/local/lib/python2.6/dist-packages/Twisted-10.1.0-py2.6-linux-i686.egg/twisted/python/context.py", line 59, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "/usr/local/lib/python2.6/dist-packages/Twisted-10.1.0-py2.6-linux-i686.egg/twisted/python/context.py", line 37, in callWithContext
    return func(*args,**kw)
  File "/usr/local/lib/python2.6/dist-packages/Twisted-10.1.0-py2.6-linux-i686.egg/twisted/internet/selectreactor.py", line 156, in _doReadOrWrite
    self._disconnectSelectable(selectable, why, method=="doRead")
  File "/usr/local/lib/python2.6/dist-packages/Twisted-10.1.0-py2.6-linux-i686.egg/twisted/internet/posixbase.py", line 256, in _disconnectSelectable
    selectable.connectionLost(failure.Failure(why))
  File "/home/kowal/projects/txpostgres/txpostgres/txpostgres.py", line 175, in connectionLost
    self.continuePolling()
  File "/home/kowal/projects/txpostgres/txpostgres/txpostgres.py", line 113, in continuePolling
    state = self.pollable().poll()
psycopg2.OperationalError: asynchronous connection failed

So far my best idea to get around this problem is to use psycopg2 to connect synchronously and after, when I know the server is tehre, use txpostgres. But it's sooo dirty, has timing uses, etc... imo there should be a way to do this cleanly. Any advice?

Python 2.11 with pip 23.2.1 refuses to install txpostgres

I'm a bit unclear on the details but python-3.11 with pip-23.2.1 on CentOS7 is refusing to install txpostgres for me:

pip install  txpostgres
ERROR: Could not find a version that satisfies the requirement txpostgres (from versions: none)
ERROR: No matching distribution found for txpostgres

Maybe related to:
pypa/pip#8559

It installs correctly with python-3.9 and pip-23.2.1 on the same machine.

Implementation of ReconnectingConnectionPool ?

Hi,

I'm a newbie with txpostgres, but a feature is missing for me, it's an automatic reconnection pool, as described here for adbapi:
http://www.gelens.org/2009/09/13/twisted-connectionpool-revisited/

From my point of view, it's an important feature in a daemon, I don't understand isn't standard in adbapi.

With txpostgres, I've read some discussions about this in #18 and #20 but nothing to implement this directly inside txpostgres.

Why it isn't implemented in txpostgres ? Lack of time to do this, or it's a mistake to implement a generic reconnecting connection pool ?

Thanks for your answer.

Regards.

python setup.py install failing with import error

Installing the package from source fails with the following error:

Traceback (most recent call last):
  File "setup.py", line 28, in <module>
    from txpostgres import __versionstr__
ImportError: cannot import name __versionstr__

The setup.py script, at line 25, inserts the txpostgres directory in the path, but it's the parent directory which should be added instead:

sys.path.insert(0, os.path.dirname(__file__))

Connection.runInteraction() may not close cursor?

I just read through the source for runInteraction() since the documentation left me wondering whether I was supposed to manually close the cursor in the supplied function. (Apparently not.)

If I'm reading this right, rollbackAndPassthrough() will always return a Failure, causing the callback that closes the cursor to never be called since the errback will be called instead? Shouldn't it be addBoth() instead of addCallback() for the c.close() lambda? Or am I missing something?

Shutting down postgres with a valid connection throws ValueError on doRead

Using:
Twisted-12.0.0
pyscopg2-2.5.1

By manually stopping postgresql with an active connection open, the connection is not being properly closed and the doRead function tries to add itself back into the reader list when it has a non-existent file descriptor (-1)

Traceback (most recent call last):
  File "/home/max/nxt/magdb/magdb/connection.py", line 25, in doRead
    txpostgres.Connection.doRead(self)
  File "/home/max/nxt/storesrv/venv/local/lib/python2.7/site-packages/txpostgres/txpostgres.py", line 689, in doRead
    self.reactor.addReader(self)
  File "/home/max/nxt/storesrv/venv/local/lib/python2.7/site-packages/twisted/internet/pollreactor.py", line 117, in addReader
    self._updateRegistration(fd)
  File "/home/max/nxt/storesrv/venv/local/lib/python2.7/site-packages/twisted/internet/pollreactor.py", line 74, in _updateRegistration
    self._poller.unregister(fd)
ValueError: file descriptor cannot be a negative integer (-1)
CONNECTION: <connection object at 0x2ac86b0; dsn: 'sslmode=require connect_timeout=5 keepalives_idle=5 keepalives_interval=5 keepalives_count=4 dbname=storage host=127.0.0.1 password=xxxxxxxxxx', closed: 0>

Allow access to rowcount for operations

Occasionally I need to know how many rows where affected by a DELETE or UPDATE. This is currently not possible, since runOperation and runQuery both discard the cursor.

Calling reactor.stop() while queries are underway raises multiple psycopg2.ProgrammingError

When I have a number of txpostgres.txpostgres.ConnectionPool queries open and I call reactor.stop(), the terminal gets flooded with this traceback:

2011-05-14 15:57:00+0000 [cursor] Unhandled Error
    Traceback (most recent call last):
      File "/home/peter/bzr/env/lib/python2.6/site-packages/Twisted-11.0.0-py2.6-linux-i686.egg/twisted/internet/defer.py", line 542, in _runCallbacks
        current.result = callback(current.result, *args, **kw)
      File "/home/peter/bzr/env/lib/python2.6/site-packages/txpostgres/txpostgres.py", line 407, in rollbackAndPassthrough
        e = defer.maybeDeferred(cursor.execute, "rollback")
      File "/home/peter/bzr/env/lib/python2.6/site-packages/Twisted-11.0.0-py2.6-linux-i686.egg/twisted/internet/defer.py", line 133, in maybeDeferred
        result = f(*args, **kw)
      File "/home/peter/bzr/env/lib/python2.6/site-packages/txpostgres/txpostgres.py", line 178, in execute
        return self._doit('execute', query, params)
    --- <exception caught here> ---
      File "/home/peter/bzr/env/lib/python2.6/site-packages/txpostgres/txpostgres.py", line 196, in _doit
        getattr(self._cursor, name)(*args, **kwargs)
    psycopg2.ProgrammingError: execute cannot be used while an asynchronous query is underway

I never get my shell prompt back without killing the process, so I assume it's stuck in a loop somewhere.

I've written some code to reproduce this:

git clone git://gist.github.com/972617.git gist-972617

psycopg2==2.4.4 has a problem with txpostgres

I have a quick hack to show the problem. I have not spent much time looking into it, but I have simple test to show the issue:

Example Script:

(txbaaa)[test]% cat pgsql2.py

from txpostgres import txpostgres

import sys
from twisted.internet import defer, reactor
from twisted.python import log

log.startLogging(sys.stdout)

@defer.inlineCallbacks
def example():
    conn = txpostgres.Connection()
    yield conn.connect(host='localhost', user='txbaaa', database='txbaaa')
    r = yield conn.runQuery("Select * from bsession")
    print r
    defer.returnValue(r)

if __name__ == "__main__":
    reactor.callWhenRunning(example)
    reactor.run()

Error with version 2.4.4:

(txbaaa)[test]% pip freeze
CorePost==0.0.8
FormEncode==1.2.4
Jinja2==2.6
PyYAML==3.10
Twisted==11.1.0
cyclone==0.4
ldaptor==0.0.0
psycopg2==2.4.4
pyOpenSSL==0.13
pyparsing==1.5.6
pyrad==2.0
six==1.1.0
txpostgres==0.7.0
txredis==2.2
txrestapi==0.1
wsgiref==0.1.2
zope.interface==3.8.0

(txbaaa)[test]% python pgsql2.py                                                          
2011-12-28 11:51:01-0500 [-] Log opened.
2011-12-28 11:51:01-0500 [-] Unhandled error in Deferred:
2011-12-28 11:51:01-0500 [-] Unhandled Error
        Traceback (most recent call last):
          File "/Users/jeremyrossi/src/bb/txbaaa/lib/python2.7/site-packages/twisted/internet/base.py", line 426, in _continueFiring
            callable(*args, **kwargs)
          File "/Users/jeremyrossi/src/bb/txbaaa/lib/python2.7/site-packages/twisted/internet/defer.py", line 1181, in unwindGenerator
            return _inlineCallbacks(None, gen, Deferred())
          File "/Users/jeremyrossi/src/bb/txbaaa/lib/python2.7/site-packages/twisted/internet/defer.py", line 1039, in _inlineCallbacks
            result = g.send(result)
          File "pgsql2.py", line 14, in example
            yield conn.connect(host='localhost', user='txbaaa', database='txbaaa')
        --- <exception caught here> ---
          File "/Users/jeremyrossi/src/bb/txbaaa/lib/python2.7/site-packages/txpostgres/txpostgres.py", line 293, in connect
            self._connection = self.connectionFactory(*args, **kwargs)
          File "/Users/jeremyrossi/src/bb/txbaaa/lib/python2.7/site-packages/psycopg2/__init__.py", line 179, in connect
            connection_factory=connection_factory, async=async)
        exceptions.TypeError: argument 1 must be string, not Connection

^C2011-12-28 11:51:03-0500 [-] Received SIGINT, shutting down.
2011-12-28 11:51:03-0500 [-] Main loop terminated.

Working with 2.2:

(txbaaa)[test]% pip freeze
CorePost==0.0.8
FormEncode==1.2.4
Jinja2==2.6
PyYAML==3.10
Twisted==11.1.0
cyclone==0.4
ldaptor==0.0.0
psycopg2==2.2.0
pyOpenSSL==0.13
pyparsing==1.5.6
pyrad==2.0
six==1.1.0
txpostgres==0.7.0
txredis==2.2
txrestapi==0.1
wsgiref==0.1.2
zope.interface==3.8.0

(txbaaa)[test]% python pgsql2.py
2011-12-28 11:54:05-0500 [-] Log opened.
2011-12-28 11:54:05-0500 [cursor] [('jrossi26', 'gateway', '10.1.1.1/32', datetime.datetime(2011, 12, 28, 10, 15, 50, 741080))]

Large amounts of notifications can block reactor

Due to the current implementation of checkForNotifies, the reactor thread could spent quite a lot of time in the while self._connection.notifies loop if the amount of notifications is high. This will cause some unexpected results in program behavior.

It would be nice if notifications are dispatched to the observer more gracefully, eg. making connection.notifies a DeferredQueue and having a t.i.t.Cooperator consume that queue and notify observers accordingly.

The same applies if the reactor spends a lot of time in for observer loop.

No results to fetch on insert with runQuery

I'm inserting a row into a table:

    creator_id = int(puppet.creator_dbref[1:])

    conn = yield get_vanilla_db_connection()
    query_str = (
        'INSERT INTO arena_match'
        '  (arena_id, creator_id, game_mode, game_state, difficulty_level,'
        '   highest_wave_completed, mode_specific_data, created_time)'
        '  VALUES (%s, %s, %s, %s, %s, %s, %s, %s)'
    )
    value_tuple = (
        puppet.id,
        creator_id,
        'wave',
        'staging',
        puppet.difficulty_level,
        0,
        Json({}),
        datetime.datetime.now()
    )
    # TODO: Get the inserted value back.
    result = yield conn.runQuery(query_str, value_tuple)
    print "RESULT', result"

get_vanilla_db_connection() creates and connects to a txpostgres connection that is defined as such:

txpostgres.Connection(detector=reconnection.DeadConnectionDetector())

Running this, I get:

    --- <exception caught here> ---
      File "/Users/gtaylor/.virtualenvs/battlesnake/lib/python2.7/site-packages/twisted/internet/defer.py", line 1097, in _inlineCallbacks
        result = result.throwExceptionIntoGenerator(g)
      File "/Users/gtaylor/.virtualenvs/battlesnake/lib/python2.7/site-packages/twisted/python/failure.py", line 389, in throwExceptionIntoGenerator
        return g.throw(self.type, self.value, self.tb)
      File "/Users/gtaylor/workspace/testbs/battlesnake/plugins/contrib/arena_master/inbound_commands.py", line 222, in run
        puppet = yield create_arena(p, invoker_dbref)
      File "/Users/gtaylor/.virtualenvs/battlesnake/lib/python2.7/site-packages/twisted/internet/defer.py", line 1097, in _inlineCallbacks
        result = result.throwExceptionIntoGenerator(g)
      File "/Users/gtaylor/.virtualenvs/battlesnake/lib/python2.7/site-packages/twisted/python/failure.py", line 389, in throwExceptionIntoGenerator
        return g.throw(self.type, self.value, self.tb)
      File "/Users/gtaylor/workspace/testbs/battlesnake/plugins/contrib/arena_master/arena_crud/creation.py", line 42, in create_arena
        yield insert_match_in_db(puppet)
      File "/Users/gtaylor/.virtualenvs/battlesnake/lib/python2.7/site-packages/twisted/internet/defer.py", line 1097, in _inlineCallbacks
        result = result.throwExceptionIntoGenerator(g)
      File "/Users/gtaylor/.virtualenvs/battlesnake/lib/python2.7/site-packages/twisted/python/failure.py", line 389, in throwExceptionIntoGenerator
        return g.throw(self.type, self.value, self.tb)
      File "/Users/gtaylor/workspace/testbs/battlesnake/plugins/contrib/arena_master/db_api.py", line 40, in insert_match_in_db
        result = yield conn.runQuery(query_str, value_tuple)
      File "/Users/gtaylor/.virtualenvs/battlesnake/lib/python2.7/site-packages/twisted/internet/defer.py", line 577, in _runCallbacks
        current.result = callback(current.result, *args, **kw)
      File "/Users/gtaylor/.virtualenvs/battlesnake/lib/python2.7/site-packages/txpostgres/txpostgres.py", line 522, in <lambda>
        d.addCallback(lambda c: c.fetchall())
    psycopg2.ProgrammingError: no results to fetch

Shouldn't the first (and only) result be the inserted row's ID?

Unhandled error after `connect()` fails

I suspect that my initial comments on the issue might not be valid anymore; my most recent comment below should provide a clearer and more correct description of the issue

I have some DB wrapper code that is handling DB restarts (i.e. disconnects) by continuously trying to reestablish the connection. Everything is more or less working, with one exception.

It seems that right after the existing connection fails, and after a new attempt also fails (it takes our PostgreSQL 10-15 seconds to accepts new connections after a restart), if you don't explicitly call db.close(); db.del (db is the ConnectionPool instance), the Twisted reactor keeps calling ConnectionPool.doRead(), which in turn eventually reaches the point where psycopg2.connection.poll() (self.pollable().poll()) is called which in turn causes en error (because the connection has already failed) which is then not pushed to the self._pollingD errback as that deferred has been previously "consumed" by the previous error, thus resulting in an Unhandled error being printed—and there's no way to catch this error as self._pollingD was the only way to do it.

So it seems that txpostgres is not cancelling some of the recurring calls coming from the Twisted reactor.

With an explicit db.close(); del db right after a connection error, everything is working smoothly—but it's surprising behaviour and really counterintuitive to manually have to do db.close(); del db right after a failed attempt to db.start().

Here's the code that I'm using (slightly simplified):

(below the code there's also the unhandled error traceback coming from the reactor, after the connection has already failed, i.e. an error has already been passed into self._pollingD.errback and self._pollingD has been set to None, so the next time an error occurs, it's just re-raised, and then the reactor does print "Unhandled error")

class DbUtil(object):

    _db = None
    _connecting_d = None

    @inlineCallbacks
    def _ensure_connected(self, reconnect=False):
        if not self._db or reconnect:
            if self._connecting_d:
                log.msg("already connecting, blocking")
                yield self._connecting_d
                log.msg("...unblocked")
                return

            log.msg('connecting')
            self._connecting_d = Deferred()

            if reconnect:
                log.msg('closing defunct DB')
                self.close()
                yield sleep(5.0)
                log.msg('woke')

            while True:
                try:
                    log.msg('starting ConnectionPool...')
                    self._db = txpostgres.ConnectionPool('pyPgSQL.PgSQL', 'dbname=txpostgres_test')
                    yield self._db.start()
                except Exception as e:
                    log.err("error while connecting: %s" % type(e).__name__)
                    self.close()
                    yield sleep(2.0)
                    log.msg("woke")
                else:
                    log.msg("...DONE")
                    break
            d, self._connecting_d = self._connecting_d, None
            d.callback(None)

    @inlineCallbacks
    def _run_with_retry(self, f):
        while True:
            try:
                ret = yield f()
            except (DatabaseError, OperationalError) as e:
                log.err("database error %s; attempting to reconnect" % type(e).__name__)
                yield self._ensure_connected(reconnect=True)
            else:
                returnValue(ret)

    @inlineCallbacks
    def query(self, sql, *args, **kwargs):
        yield self._ensure_connected()
        ret = yield self._run_with_retry(lambda: self._db.runQuery(sql, *args, **kwargs))
        returnValue(ret)

    @inlineCallbacks
    def query_one(self, sql, *args, **kwargs):
        yield self._ensure_connected()
        res = yield self._run_with_retry(lambda: self.query(sql, *args, **kwargs))
        returnValue(res[0] if res else None)

    @inlineCallbacks
    def query_none(self, sql, *args, **kwargs):
        yield self._ensure_connected()
        yield self._run_with_retry(lambda: self._db.runOperation(sql, *args, **kwargs))

    def close(self):
        if self._db:
            db, self._db = self._db, None
            db.close()

The unhandled error traceback:

2012-02-02 17:35:12+0200 [connection] Unhandled Error
    Traceback (most recent call last):
      File "/Users/erik.allik/.virtualenvs/dts-agent/lib/python2.7/site-packages/twisted/python/log.py", line 84, in callWithLogger
        return callWithContext({"system": lp}, func, *args, **kw)
      File "/Users/erik.allik/.virtualenvs/dts-agent/lib/python2.7/site-packages/twisted/python/log.py", line 69, in callWithContext
        return context.call({ILogContext: newCtx}, func, *args, **kw)
      File "/Users/erik.allik/.virtualenvs/dts-agent/lib/python2.7/site-packages/twisted/python/context.py", line 118, in callWithContext
        return self.currentContext().callWithContext(ctx, func, *args, **kw)
      File "/Users/erik.allik/.virtualenvs/dts-agent/lib/python2.7/site-packages/twisted/python/context.py", line 81, in callWithContext
        return func(*args,**kw)
    --- <exception caught here> ---
      File "/Users/erik.allik/.virtualenvs/dts-agent/lib/python2.7/site-packages/twisted/internet/selectreactor.py", line 150, in _doReadOrWrite
        why = getattr(selectable, method)()
      File "/Users/erik.allik/.virtualenvs/dts-agent/lib/python2.7/site-packages/txpostgres/txpostgres.py", line 595, in doRead
        _PollingMixin.doRead(self)
      File "/Users/erik.allik/.virtualenvs/dts-agent/lib/python2.7/site-packages/txpostgres/txpostgres.py", line 158, in doRead
        self.continuePolling()
      File "/Users/erik.allik/.virtualenvs/dts-agent/lib/python2.7/site-packages/txpostgres/txpostgres.py", line 127, in continuePolling
        state = self.pollable().poll()
    psycopg2.OperationalError: asynchronous connection failed

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.