Comments (3)
fix is merged but need additional testing
from peewee-async.
Well i managed to identify several problems
There is not connection releasing in aenter section of transaction.
async def __aenter__(self):
if not asyncio_current_task(loop=self.loop):
raise RuntimeError("The transaction must run within a task")
await self.db.push_transaction_async()
if self.db.transaction_depth_async() == 1:
await _run_no_result_sql(self.db, 'BEGIN') # there is not try finally section!!
return self
If an error happens during executing BEGIN query the connection will be acquired forever.
Recreating the pool when have error during cursor creating.
async def cursor_async(self):
"""Acquire async cursor.
"""
await self.connect_async(loop=self._loop)
if self.transaction_depth_async() > 0:
conn = self.transaction_conn_async()
else:
conn = None
try:
return (await self._async_conn.cursor(conn=conn))
except:
await self.close_async() # no reason to do this!!
raise
There is no reason to kill all connections from pool if one of them is broken. It leads to errors "connection/cursor already closed"
After executing of close_async method we can't set _task_data = None
async def close_async(self):
"""Close async connection.
"""
if self._async_wait:
await self._async_wait
if self._async_conn:
conn = self._async_conn
self._async_conn = None
self._async_wait = None
self._task_data = None
await conn.close()
We have to wait until all connections in _task_data will be released by transactions otherwise we will have acid problem #209
Need to improve pool management
#196 asyncio.Lock should help with this
No releasing connection in AsyncPostgresqlConnection/AsyncMySQLConnection
async def cursor(self, conn=None, *args, **kwargs):
"""Get a cursor for the specified transaction connection
or acquire from the pool.
"""
in_transaction = conn is not None
if not conn:
conn = await self.acquire()
cursor = await conn.cursor(*args, **kwargs) # if we get an error here the connection will be never released!!
cursor.release = functools.partial(
self.release_cursor, cursor,
in_transaction=in_transaction)
return cursor
from peewee-async.
Here is I tried to make a new class of database
class Transaction:
"""Asynchronous context manager (`async with`), similar to
`peewee.transaction()`. Will start new `asyncio` task for
transaction if not started already.
"""
def __init__(self, db, connection):
self.db = db
self.connection = connection
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
try:
async with self.connection.cursor() as cursor:
if exc_type:
await cursor.execute("ROLLBACK")
else:
try:
await cursor.execute("COMMIT")
except:
await cursor.execute("ROLLBACK")
raise
finally:
self.db.release_connection(self.connection)
def get_task_id():
task = asyncio_current_task()
if task:
return id(task)
else:
raise Exception("no current_task")
class PostgresqlDatabaseExperimental(peewee.PostgresqlDatabase):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._pool = None
self._async_lock = asyncio.Lock()
self._connections_in_transaction = {}
self._allow_sync = False
def set_allow_sync(self, value):
"""Allow or forbid sync queries for the database. See also
the :meth:`.allow_sync()` context manager.
"""
self._allow_sync = value
@contextlib.contextmanager
def allow_sync(self):
"""Allow sync queries within context. Close sync
connection on exit if connected.
Example::
with database.allow_sync():
PageBlock.create_table(True)
"""
old_allow_sync = self._allow_sync
self._allow_sync = True
try:
yield
except:
raise
finally:
self._allow_sync = old_allow_sync
try:
self.close()
except self.Error:
pass # already closed
def execute_sql(self, *args, **kwargs):
"""Sync execute SQL query, `allow_sync` must be set to True.
"""
assert self._allow_sync, (
"Error, sync query is not allowed! Call the `.set_allow_sync()` "
"or use the `.allow_sync()` context manager.")
if self._allow_sync in (logging.ERROR, logging.WARNING):
logging.log(self._allow_sync,
"Error, sync query is not allowed: %s %s" %
(str(args), str(kwargs)))
return super().execute_sql(*args, **kwargs)
def release_connection(self, connection):
if self._pool:
self._pool.release(connection)
task_id = get_task_id()
if task_id in self._connections_in_transaction:
del self._connections_in_transaction[task_id]
async def get_pool(self):
async with self._async_lock:
if self._pool is None:
self._pool = await aiopg.create_pool(database=self.database, **self.connect_params)
return self._pool
async def close_async(self):
"""Terminate all pool connections.
"""
async with self._async_lock:
if self._pool is not None:
self._pool.terminate()
await self._pool.wait_closed()
self._pool = None
async def fetch_results(self, cursor, query):
with peewee.__exception_wrapper__:
await cursor.execute(query.sql())
if isinstance(query, (peewee.Select, peewee.ModelCompoundSelectQuery)):
result = AsyncQueryWrapper(cursor=cursor, query=query)
return await result.fetchall()
if isinstance(query, peewee.Update):
if query._returning:
result = AsyncQueryWrapper(cursor=cursor, query=query)
return await result.fetchall()
return cursor.rowcount
def get_connection_in_transaction(self):
task_id = get_task_id()
if task_id in self._connections_in_transaction:
return self._connections_in_transaction[task_id]
async def async_get_connection(self):
connection = self.get_connection_in_transaction()
if connection:
return connection
pool = await self.get_pool()
return await pool.acquire()
def is_task_in_transaction(self):
task_id = get_task_id()
if task_id in self._connections_in_transaction:
return True
return False
async def transaction(self):
if self.is_task_in_transaction():
raise Exception("already in transaction")
connection = await self.async_get_connection()
try:
async with connection.cursor() as cursor:
await cursor.execute("BEGIN")
except:
self.release_connection(connection)
raise
return Transaction(self, connection)
async def async_execute(self, query):
connection = await self.async_get_connection()
try:
async with connection.cursor() as cursor:
return await self.fetch_results(cursor, query)
finally:
if not self.is_task_in_transaction():
self.release_connection(connection)
But i realized that it will be a lot of changes and it is hard to make them as one iteration so i decided to save the changes here and refactor the code by small parts
from peewee-async.
Related Issues (20)
- ACID violation: implicit lose transaction when the connection pool is reopened HOT 5
- Dont work atomic() in fastapi task HOT 2
- Refactor Database fetch_results method HOT 1
- Some database classes should be made deprecated
- Handle with AioPool HOT 2
- Make peewee_asyncext deprecated
- Split code by files HOT 4
- Add typing HOT 1
- Add psycopg3 pool HOT 1
- Renaming
- Add aio_peek method
- transaction refactoring
- Move manager methods to database HOT 2
- Additional loop support HOT 3
- Fix security issues
- Add RawQuery support for AioModel HOT 1
- Add compound query support for AioModel HOT 2
- Connection is not realeased if error is in BEGIN statement
- Found a get_or_none bug HOT 7
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from peewee-async.