
aiocassandra's People

Contributors

asvetlov, fediralifirenko, hellysmile, litwisha, pcinkh, pyup-bot, webknjaz


aiocassandra's Issues

Async write/read: reading more than 20000 rows not supported

Hi,

My sample code's objective is asynchronous write and read with aiocassandra. In both cases the volume could be millions of rows, but it fails to do so, especially when I select and loop over a large result set of, say, 100,000 rows.

testCRUDasync.txt

It also gives me:
1.
cqlsh> select * from test.dummytbl;

u'key' | u'col1' | u'col2'
--------+---------+---------

(0 rows)
Failed to format value u'key' : 'unicode' object has no attribute 'formatter'
Failed to format value u'col1' : 'unicode' object has no attribute 'formatter'
1 more decoding errors suppressed.

Current time - End - session : 1551876594703
2019-03-06 18:19:55,200 [DEBUG] aiocassandra: Paginator is closed, cleared in-memory 0 records

[root@vm-9 cassandra_poc]# cqlsh
Connection error: ('Unable to connect to any servers', {'127.0.0.1': error(111, "Tried connecting to [('127.0.0.1', 9042)]. Last error: Connection refused")})

Your help is really appreciated.

I have attached the file here.

Regards,
Dwarika

Is uvloop not supported?

I use uvloop instead of asyncio.get_event_loop(), and aiocassandra does not work.
The error is "attached to a different loop".
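The "attached to a different loop" error usually means the loop passed to aiosession() is not the loop that ends up running the queries. A minimal sketch of loop wiring that avoids the mismatch, assuming uvloop is optionally installed (the aiosession call itself is commented out since it needs a live Cassandra session):

```python
import asyncio

try:
    import uvloop
    # Install uvloop's implementation for all newly created event loops.
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
    pass  # fall back to the stock asyncio loop

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Pass this exact loop to aiosession so the patched session's futures
# attach to it (hypothetical wiring; requires a live Cassandra session):
#   aiosession(session, loop=loop)
#   loop.run_until_complete(session.execute_future(query))
```

The key point is that the loop given to `aiosession` must be the same object the coroutines later run on, regardless of which loop implementation the policy provides.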

Insert queries

Could you give some examples of insert queries?

Something like this:

    executable_query = cassandra.prepare("""INSERT INTO units (date, marks, user_id) VALUES (%s, %s, %s)""", (date,marks,user_id,))
    await cassandra.execute_future(executable_query)

or this:

    executable_query = cassandra.prepare("""INSERT INTO units (date, marks, user_id) VALUES (%s, %s, %s)""")
    await cassandra.execute_future(executable_query, (date,marks,user_id,))

fails with the error:

cassandra.protocol.SyntaxException: <Error from server: code=2000 [Syntax error in CQL query] message="line 1:49 no viable alternative at character '%'">
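The SyntaxException comes from the placeholder style: prepared CQL statements use `?` markers, while `%s` only works with simple (non-prepared) statements, so `prepare()` rejects the query at the `%` character. A minimal sketch using the table and session names from the snippets above:

```python
# Prepared statements take '?' placeholders, not '%s'.
INSERT_CQL = "INSERT INTO units (date, marks, user_id) VALUES (?, ?, ?)"

# With an aiosession-patched session, usage would then follow the
# second pattern from the question (prepare once, bind at execute):
#   prepared = cassandra.prepare(INSERT_CQL)
#   await cassandra.execute_future(prepared, (date, marks, user_id))

print(INSERT_CQL)
```

Preparing once and reusing the statement with different bound values is also the more efficient pattern, since the server parses the query only once.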

Paginator __aexit__ coroutine not correctly setting finish event

Hello,

there is a problem with this line:

self._exit_event.set()

The __aexit__ coroutine is not correctly setting self._finish_event. Because of the way aiocassandra uses threading, the fix is to call self._loop.call_soon_threadsafe(self._finish_event.set) instead.

This leads to problems when a task working through results of the pagination is suddenly cancelled.

Would it be possible to fix this, or should I make a custom fork for my use?
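The fix described above can be illustrated with plain asyncio and threading (no Cassandra required): an asyncio.Event must not be set directly from a foreign thread; the set has to be scheduled onto the loop that owns the event.

```python
import asyncio
import threading

async def main():
    loop = asyncio.get_running_loop()
    finish_event = asyncio.Event()

    def driver_thread():
        # WRONG (not thread-safe):  finish_event.set()
        # RIGHT: schedule the set on the owning loop so waiters wake up.
        loop.call_soon_threadsafe(finish_event.set)

    threading.Thread(target=driver_thread).start()
    await asyncio.wait_for(finish_event.wait(), timeout=5)
    return finish_event.is_set()

print(asyncio.run(main()))  # True
```

Calling `set()` directly from another thread may leave coroutines blocked on `wait()` forever, because the loop never learns that the event's state changed.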

concurrent.futures._base.CancelledError when high throughput in aiohttp web application

Hi there,

I am new to asyncio. Recently I wrote a web application with aiohttp; during a request I need to do some reading from and writing to Cassandra. I use aiocassandra as the driver. In the test environment everything is fine, but in production, when throughput goes high (around 2k-3k QPS at peak), I get these errors very frequently (50-80 per 15 minutes):

{'status': 'failed', 'content': None, 'errorCode': 'Internal Server Error', 'errorMessage': 'Traceback (most recent call last):\n  File "/app_libs/common/web/middlewares.py", line 74, in error_middleware\n    response = await handler(request)\n  File "/app_libs/common/web/middlewares.py", line 26, in request_trace_middleware\n    response = await handler(request)\n  File "/app_libs/common/web/middlewares.py", line 59, in track_time_middleware\n    response = await handler(request)\n  File "/app/routes/collecting.py", line 40, in webcollect\n    matchresult = await integration_data(request.app, request_data, config_data, cass_flag=True, kafka_flag=True)\n  File "/app/services/integration.py", line 34, in integration_data\n    result = await integration_event(app, request_data, configs, **kwargs)\n  File "/app/services/integration.py", line 62, in integration_event\n    user_data.online_data[\'page\'], app[\'cassandra\'], profile_id, user_data.device_id)\n  File "/app/services/integration.py", line 282, in check_pt_id\n    \'event\', \'device_id\', device_id, profile_id)\n  File "/app/libs/core.py", line 179, in cassandra_read\n    result.extend(await query(table, pk_str, pk_value))\n  File "/app/libs/core.py", line 168, in query\n    res = await session.execute_future(cql, (pk_v, ))\n  File "/usr/local/lib/python3.7/site-packages/aiocassandra.py", line 153, in execute_future\n    return await asyncio_fut\nconcurrent.futures._base.CancelledError\n'}

At first I did the Cassandra reads after the request finished, running them in an asyncio task. When I got these errors, I wondered whether they were caused by the aiohttp framework, since aiohttp may cancel tasks bound to a request once it is done. So I brought in aiojobs to make sure those tasks would be awaited even after the response was returned, but the problem was still there. I then moved the Cassandra operations back into the request's lifetime, taking them out of the post-request background tasks, but that still did not solve the problem. I searched a lot on the internet but found nothing really helpful.

I notice that this driver wraps the DataStax driver's async queries. I am not sure whether I am using it correctly or there is indeed an issue. I don't know what to do: change the thread/process executor, or go back to the non-aio DataStax version? Any advice?

Thank you very much. (By the way, I also use aiokafka in the same environment and it has not produced the same errors; and again, when the throughput is low, everything works.)
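One common cause of `CancelledError` in this situation is aiohttp cancelling the request-handler task when the client disconnects, which cancels any query the handler is awaiting. A hedged, self-contained sketch of protecting an in-flight operation with `asyncio.shield` (the `db_write` coroutine here is a stand-in for an `execute_future` call):

```python
import asyncio

async def main():
    results = []

    async def db_write():
        # Stand-in for `await session.execute_future(...)`.
        await asyncio.sleep(0.05)
        results.append("written")

    async def handler():
        # shield() protects the inner task: cancelling the handler
        # (as aiohttp does on client disconnect) cancels the await,
        # but the wrapped operation keeps running to completion.
        await asyncio.shield(asyncio.ensure_future(db_write()))

    task = asyncio.ensure_future(handler())
    await asyncio.sleep(0.01)
    task.cancel()            # simulate the framework cancelling the request
    await asyncio.sleep(0.1) # give the shielded write time to finish
    return results

print(asyncio.run(main()))  # ['written']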

++++
python 3.7.4
aiohttp==3.6.0
aiocassandra==2.0.1
cassandra-driver==3.18.0

How to use Aiocassandra with Sanic using multiple workers?

I would like to use Sanic, an Async Python web server (https://github.com/channelcat/sanic), with Aiocassandra. But I cannot figure out how to successfully deploy it with multiple workers.

Here is a part of my code:

cluster = Cluster()
cassandra_session = cluster.connect()

# http://sanic.readthedocs.io/en/latest/sanic/middleware.html?highlight=listener#listeners
@app.listener('before_server_start')
def init_cassandra_session(app, loop):
    aiosession(cassandra_session, loop=loop)

@app.route('/')
def my_route(request):
    # Cassandra query and return result

if __name__ == "__main__":
  app.run(host="0.0.0.0", port=8000, debug=True, workers=4)

When I set more than one worker, queries seem to get stuck and never return any results.
But when I set the number of workers to 1, the server works fine.

Is there anything I did wrong here?
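A likely culprit is that `Cluster()` and `cluster.connect()` run at module import time, before Sanic forks its workers, so all workers share connections created in the parent process. A hedged sketch of moving the connection into the listener so each worker creates its own session on its own loop (listener and attribute names here are illustrative):

```python
from sanic import Sanic
from cassandra.cluster import Cluster
from aiocassandra import aiosession

app = Sanic("myapp")

@app.listener('before_server_start')
async def init_cassandra(app, loop):
    # Create the Cluster and Session inside the worker process, after
    # the fork, so each worker owns its own connections and binds the
    # patched session to its own event loop.
    app.ctx.cluster = Cluster()
    app.ctx.cassandra = app.ctx.cluster.connect()
    aiosession(app.ctx.cassandra, loop=loop)

@app.listener('after_server_stop')
async def close_cassandra(app, loop):
    # Shut down driver threads cleanly when the worker exits.
    app.ctx.cluster.shutdown()
```

This is framework wiring rather than runnable standalone code; the essential change versus the snippet above is that nothing touches Cassandra before `before_server_start` fires in each worker.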

Why a threadpool?

I'm considering this library for a project. Thank you for creating and maintaining it.

Is there a reason why it uses a threadpool? The Cassandra driver is already async and has a callback interface. Wouldn't it be more performant to create an interface between the Cassandra driver and asyncio futures?

I could probably write this myself, as it should only be a handful of lines of code. But your pagination context manager is something I'd love to take advantage of.
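The bridge described above can be sketched without a threadpool: the DataStax driver's `execute_async()` returns a `ResponseFuture` with an `add_callbacks()` interface, and those callbacks fire on a driver thread, so the result has to hop back to the loop via `call_soon_threadsafe`. A self-contained sketch with a fake stand-in for the driver future (the real one would come from `session.execute_async(query)`):

```python
import asyncio
import threading

class FakeResponseFuture:
    """Minimal stand-in for cassandra-driver's ResponseFuture: it fires
    its success callback from a separate thread once rows are ready."""
    def add_callbacks(self, callback, errback):
        threading.Timer(0.01, callback, args=([{"id": 1}],)).start()

def wrap_future(response_future, loop):
    # Bridge the driver's callback interface to an asyncio future.
    # The callbacks run on a driver thread, so schedule the result
    # onto the loop instead of touching the future directly.
    fut = loop.create_future()

    def on_result(rows):
        loop.call_soon_threadsafe(fut.set_result, rows)

    def on_error(exc):
        loop.call_soon_threadsafe(fut.set_exception, exc)

    response_future.add_callbacks(on_result, on_error)
    return fut

async def main():
    loop = asyncio.get_running_loop()
    # With the real driver this would be:
    #   rf = session.execute_async("SELECT ...")
    rows = await wrap_future(FakeResponseFuture(), loop)
    return rows

print(asyncio.run(main()))  # [{'id': 1}]
```

This avoids tying up an executor thread per in-flight query; the driver's own I/O threads do the work and only the completion crosses back into the loop.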

Paginator fetches pages no matter how many rows already in deque

After __aenter__ is called, the paginator starts fetching pages into memory regardless of how quickly rows are consumed. So if the consumer iterates over rows much more slowly than the paginator fetches them, the rows deque keeps growing and causes large memory consumption.
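The usual remedy for this kind of unbounded buffering is backpressure: replace the unbounded deque with a bounded queue so the fetcher blocks once it is a fixed number of pages ahead of the consumer. A minimal sketch of the principle with plain asyncio (not aiocassandra's actual internals):

```python
import asyncio

async def producer(queue):
    # With a bounded queue, put() suspends once `maxsize` items are
    # waiting, so the page fetcher cannot run ahead of a slow consumer.
    for page in range(5):
        await queue.put(page)
    await queue.put(None)  # sentinel: no more pages

async def consumer(queue):
    seen = []
    while True:
        item = await queue.get()
        if item is None:
            break
        await asyncio.sleep(0.01)  # simulate a slow consumer
        seen.append(item)
    return seen

async def main():
    queue = asyncio.Queue(maxsize=2)  # at most 2 pages buffered in memory
    prod = asyncio.ensure_future(producer(queue))
    seen = await consumer(queue)
    await prod
    return seen

print(asyncio.run(main()))  # [0, 1, 2, 3, 4]
```

Memory use is then capped at `maxsize` pages no matter how large the result set is, at the cost of the fetcher occasionally waiting on the consumer.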
