
Comments (3)

kalombos commented on August 21, 2024

The fix is merged but needs additional testing.

from peewee-async.

kalombos commented on August 21, 2024

Well, I managed to identify several problems.

The connection is not released in the __aenter__ section of the transaction.

    async def __aenter__(self):
        if not asyncio_current_task(loop=self.loop):
            raise RuntimeError("The transaction must run within a task")
        await self.db.push_transaction_async()
        if self.db.transaction_depth_async() == 1:
            await _run_no_result_sql(self.db, 'BEGIN')  # no try/finally here!!
        return self

If an error occurs while executing the BEGIN query, the connection stays acquired forever.
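
A minimal sketch of one way to close this gap. FakeDB, run_sql and SafeTransaction are hypothetical stand-ins for illustration, not the peewee-async API: if BEGIN fails, the transaction is popped again so the connection goes back before the error propagates.

```python
import asyncio


class FakeDB:
    """Hypothetical stand-in for the database object; not peewee-async itself."""

    def __init__(self, fail_begin=False):
        self.fail_begin = fail_begin
        self.acquired = 0  # connections currently held by transactions

    async def push_transaction_async(self):
        self.acquired += 1

    async def pop_transaction_async(self):
        self.acquired -= 1

    async def run_sql(self, sql):
        if self.fail_begin and sql == "BEGIN":
            raise ConnectionError("BEGIN failed")


class SafeTransaction:
    def __init__(self, db):
        self.db = db

    async def __aenter__(self):
        await self.db.push_transaction_async()
        try:
            await self.db.run_sql("BEGIN")
        except BaseException:
            # __aexit__ is not called when __aenter__ raises,
            # so give the connection back here before re-raising.
            await self.db.pop_transaction_async()
            raise
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        try:
            await self.db.run_sql("ROLLBACK" if exc_type else "COMMIT")
        finally:
            await self.db.pop_transaction_async()


async def demo():
    db = FakeDB(fail_begin=True)
    try:
        async with SafeTransaction(db):
            pass
    except ConnectionError:
        pass
    return db.acquired


leaked = asyncio.run(demo())
```

With the guard in place, leaked ends up as 0 even though BEGIN raised.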

The pool is recreated when an error occurs during cursor creation.

    async def cursor_async(self):
        """Acquire async cursor.
        """
        await self.connect_async(loop=self._loop)

        if self.transaction_depth_async() > 0:
            conn = self.transaction_conn_async()
        else:
            conn = None

        try:
            return (await self._async_conn.cursor(conn=conn))
        except:
            await self.close_async() # no reason to do this!!
            raise

There is no reason to kill all connections in the pool if one of them is broken. Doing so leads to "connection/cursor already closed" errors.

We can't simply set _task_data = None in the close_async method.

    async def close_async(self):
        """Close async connection.
        """
        if self._async_wait:
            await self._async_wait
        if self._async_conn:
            conn = self._async_conn
            self._async_conn = None
            self._async_wait = None
            self._task_data = None 
            await conn.close()

We have to wait until every connection in _task_data has been released by its transaction; otherwise we get the ACID problem described in #209.
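
One possible shape for that wait, as a sketch only. TransactionRegistry and its methods are hypothetical names, and asyncio.Condition is just one way to do it: close_async blocks until the registry of in-flight transaction connections is empty.

```python
import asyncio


class TransactionRegistry:
    """Hypothetical tracker of connections held by open transactions."""

    def __init__(self):
        self._task_data = {}
        self._cond = asyncio.Condition()

    async def add(self, task_id, conn):
        async with self._cond:
            self._task_data[task_id] = conn

    async def remove(self, task_id):
        async with self._cond:
            self._task_data.pop(task_id, None)
            self._cond.notify_all()  # wake anyone waiting for "empty"

    async def wait_empty(self):
        async with self._cond:
            await self._cond.wait_for(lambda: not self._task_data)


async def demo():
    registry = TransactionRegistry()
    events = []

    async def transaction():
        await registry.add("t1", object())
        await asyncio.sleep(0.01)  # pretend to run queries
        events.append("transaction finished")
        await registry.remove("t1")

    async def close_async():
        await registry.wait_empty()  # do not drop state while it is in use
        events.append("pool closed")

    worker = asyncio.create_task(transaction())
    await asyncio.sleep(0)  # let the transaction register itself
    await close_async()
    await worker
    return events


events = asyncio.run(demo())
```

Here the close only proceeds after the running transaction has handed its connection back.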

Pool management needs to be improved.

#196: an asyncio.Lock should help with this.
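
To illustrate why a lock helps, here is a small sketch with a hypothetical LazyPool class (the sleep stands in for a real connection handshake). Without the lock, several tasks that hit get_pool at the same time could each see None and build their own pool.

```python
import asyncio


class LazyPool:
    """Hypothetical holder that creates its pool once, on first use."""

    def __init__(self):
        self._pool = None
        self._lock = asyncio.Lock()
        self.created = 0  # how many times a pool was built

    async def _create_pool(self):
        await asyncio.sleep(0.01)  # stands in for the real connection handshake
        self.created += 1
        return object()

    async def get_pool(self):
        # Only one task at a time may check and create the pool.
        async with self._lock:
            if self._pool is None:
                self._pool = await self._create_pool()
            return self._pool


async def demo():
    holder = LazyPool()
    pools = await asyncio.gather(*(holder.get_pool() for _ in range(5)))
    return holder.created, len(set(map(id, pools)))


created, distinct = asyncio.run(demo())
```

Five concurrent callers end up sharing a single pool that was created once.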

The connection is not released in AsyncPostgresqlConnection/AsyncMySQLConnection.

    async def cursor(self, conn=None, *args, **kwargs):
        """Get a cursor for the specified transaction connection
        or acquire from the pool.
        """
        in_transaction = conn is not None
        if not conn:
            conn = await self.acquire()
        cursor = await conn.cursor(*args, **kwargs)  # if this raises, the connection is never released!!
        cursor.release = functools.partial(
            self.release_cursor, cursor,
            in_transaction=in_transaction)
        return cursor
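
A sketch of how the leak could be avoided, using hypothetical FakeConn, FakeCursor and FakePool classes rather than the real aiopg/aiomysql objects: if cursor creation fails, release the connection this call acquired, but leave a transaction's connection alone.

```python
import asyncio
import functools


class FakeCursor:
    """Hypothetical cursor; only needs to accept a release attribute."""


class FakeConn:
    """Hypothetical connection whose cursor() can be made to fail."""

    def __init__(self, broken=False):
        self.broken = broken

    async def cursor(self):
        if self.broken:
            raise ConnectionError("cursor creation failed")
        return FakeCursor()


class FakePool:
    """Hypothetical pool that only tracks which connections are checked out."""

    def __init__(self, conn):
        self._conn = conn
        self.in_use = set()

    async def acquire(self):
        self.in_use.add(self._conn)
        return self._conn

    def release(self, conn):
        self.in_use.discard(conn)


async def cursor(pool, conn=None):
    in_transaction = conn is not None
    if not in_transaction:
        conn = await pool.acquire()
    try:
        cur = await conn.cursor()
    except BaseException:
        # Only release what this call acquired; a transaction owns its connection.
        if not in_transaction:
            pool.release(conn)
        raise
    if in_transaction:
        cur.release = lambda: None  # the transaction releases the connection later
    else:
        cur.release = functools.partial(pool.release, conn)
    return cur


async def demo():
    pool = FakePool(FakeConn(broken=True))
    try:
        await cursor(pool)
    except ConnectionError:
        pass
    return len(pool.in_use)


still_checked_out = asyncio.run(demo())
```

After the failed call nothing remains checked out of the pool.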


kalombos commented on August 21, 2024

Here is my attempt at a new database class:

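import asyncio
import contextlib
import logging

import aiopg
import peewee

# asyncio_current_task and AsyncQueryWrapper are assumed to come from
# the existing peewee_async module.


class Transaction:
    """Asynchronous context manager (`async with`), similar to
    `peewee.transaction()`. Commits on a clean exit, rolls back on
    an exception, and always releases the connection.
    """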
    def __init__(self, db, connection):
        self.db = db
        self.connection = connection

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        try:
            async with self.connection.cursor() as cursor:
                if exc_type:
                    await cursor.execute("ROLLBACK")
                else:
                    try:
                        await cursor.execute("COMMIT")
                    except:
                        await cursor.execute("ROLLBACK")
                        raise
        finally:
            self.db.release_connection(self.connection)


def get_task_id():
    task = asyncio_current_task()
    if task:
        return id(task)
    else:
        raise Exception("no current_task")


class PostgresqlDatabaseExperimental(peewee.PostgresqlDatabase):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._pool = None
        self._async_lock = asyncio.Lock()
        self._connections_in_transaction = {}
        self._allow_sync = False

    def set_allow_sync(self, value):
        """Allow or forbid sync queries for the database. See also
        the :meth:`.allow_sync()` context manager.
        """
        self._allow_sync = value

    @contextlib.contextmanager
    def allow_sync(self):
        """Allow sync queries within context. Close sync
        connection on exit if connected.

        Example::

            with database.allow_sync():
                PageBlock.create_table(True)
        """
        old_allow_sync = self._allow_sync
        self._allow_sync = True

        try:
            yield
        finally:
            self._allow_sync = old_allow_sync
            try:
                self.close()
            except self.Error:
                pass  # already closed

    def execute_sql(self, *args, **kwargs):
        """Sync execute SQL query, `allow_sync` must be set to True.
        """
        assert self._allow_sync, (
            "Error, sync query is not allowed! Call the `.set_allow_sync()` "
            "or use the `.allow_sync()` context manager.")
        if self._allow_sync in (logging.ERROR, logging.WARNING):
            logging.log(self._allow_sync,
                        "Error, sync query is not allowed: %s %s" %
                        (str(args), str(kwargs)))
        return super().execute_sql(*args, **kwargs)


    def release_connection(self, connection):
        if self._pool:
            self._pool.release(connection)
        task_id = get_task_id()
        if task_id in self._connections_in_transaction:
            del self._connections_in_transaction[task_id]

    async def get_pool(self):
        async with self._async_lock:
            if self._pool is None:
                self._pool = await aiopg.create_pool(database=self.database, **self.connect_params)
            return self._pool

    async def close_async(self):
        """Terminate all pool connections.
        """
        async with self._async_lock:
            if self._pool is not None:
                self._pool.terminate()
                await self._pool.wait_closed()
            self._pool = None

    async def fetch_results(self, cursor, query):
        with peewee.__exception_wrapper__:
            # query.sql() returns a (sql, params) tuple, so unpack it.
            await cursor.execute(*query.sql())
            if isinstance(query, (peewee.Select, peewee.ModelCompoundSelectQuery)):
                result = AsyncQueryWrapper(cursor=cursor, query=query)
                return await result.fetchall()
            if isinstance(query, peewee.Update):
                if query._returning:
                    result = AsyncQueryWrapper(cursor=cursor, query=query)
                    return await result.fetchall()

                return cursor.rowcount

    def get_connection_in_transaction(self):
        task_id = get_task_id()
        if task_id in self._connections_in_transaction:
            return self._connections_in_transaction[task_id]

    async def async_get_connection(self):
        connection = self.get_connection_in_transaction()
        if connection:
            return connection
        pool = await self.get_pool()
        return await pool.acquire()

    def is_task_in_transaction(self):
        task_id = get_task_id()
        if task_id in self._connections_in_transaction:
            return True
        return False

    async def transaction(self):
        if self.is_task_in_transaction():
            raise Exception("already in transaction")

        connection = await self.async_get_connection()
        try:
            async with connection.cursor() as cursor:
                await cursor.execute("BEGIN")
        except:
            self.release_connection(connection)
            raise

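        # Register the connection so later queries in this task reuse it.
        self._connections_in_transaction[get_task_id()] = connection
        return Transaction(self, connection)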

    async def async_execute(self, query):
        connection = await self.async_get_connection()
        try:
            async with connection.cursor() as cursor:
                return await self.fetch_results(cursor, query)
        finally:
            if not self.is_task_in_transaction():
                self.release_connection(connection)
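
The core idea of the class above is a registry keyed by task id, so that queries issued inside a transaction reuse that transaction's connection. A self-contained sketch of just that idea, with hypothetical FakePool and TaskConnections classes in place of aiopg and the real database class:

```python
import asyncio


class FakePool:
    """Hypothetical pool that hands out numbered connection labels."""

    def __init__(self):
        self.count = 0

    async def acquire(self):
        self.count += 1
        return f"conn-{self.count}"


class TaskConnections:
    """Hypothetical per-task registry, keyed by id() of the running task."""

    def __init__(self, pool):
        self._pool = pool
        self._in_transaction = {}

    async def get(self):
        task_id = id(asyncio.current_task())
        if task_id in self._in_transaction:
            return self._in_transaction[task_id]
        return await self._pool.acquire()

    async def begin(self):
        conn = await self.get()
        self._in_transaction[id(asyncio.current_task())] = conn
        return conn

    def end(self):
        self._in_transaction.pop(id(asyncio.current_task()), None)


async def worker(registry):
    first = await registry.begin()
    await asyncio.sleep(0)  # let the other task interleave
    second = await registry.get()  # a query inside the transaction
    registry.end()
    return first, second


async def demo():
    registry = TaskConnections(FakePool())
    return await asyncio.gather(worker(registry), worker(registry))


results = asyncio.run(demo())
```

Each task sees the same connection for the whole transaction, and the two tasks never share one.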

But I realized that this would be a lot of changes and hard to land in one iteration, so I decided to save the changes here and refactor the code in small parts.
