talkiq / gcloud-aio
(Asyncio OR Threadsafe) Google Cloud Client Library for Python
Home Page: https://talkiq.github.io/gcloud-aio
Hey,
I'm creating an image/bucket viewer and need to make ~100 generate_signed_url calls per page load. It's slow, so I thought I'd just do it async, but your examples only show a single file (the advantage of using async for one file escapes me).
Isn't it just as easy to use the normal async/await Python 3 constructs? What does this library add, is what I'm getting at?
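To be concrete, this is the kind of thing I mean by the normal constructs; a minimal sketch where sign_one is a placeholder for whatever per-object signed-URL call I end up making:

```python
import asyncio

async def sign_one(storage, bucket_name: str, object_name: str) -> str:
    # Placeholder for the actual per-object signed-URL call.
    ...

async def sign_all(storage, bucket_name: str, object_names: list) -> list:
    # Fire all ~100 calls concurrently instead of one at a time.
    return await asyncio.gather(
        *(sign_one(storage, bucket_name, name) for name in object_names))
```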
_raise_for_status, added in #126 and updated in #167, results in exceptions being logged for all responses with a code >= 400. This probably isn't how all calls should be treated. For example, blob_exists() in storage calls get_blob() with an exception handler to check for 404s and 410s. My logs are now littered with exceptions. I can't find any other affected methods with some quick grep-fu, but I don't know the code base all that well; there could be other places.
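To illustrate the pattern, a rough sketch of the blob_exists()-style check (approximate, not the library's exact code):

```python
import aiohttp

async def blob_exists(bucket, blob_name: str) -> bool:
    try:
        await bucket.get_blob(blob_name)
        return True
    except aiohttp.ClientResponseError as e:
        # A 404/410 here is an expected outcome, not an error worth logging.
        if e.status in (404, 410):
            return False
        raise
```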
Are aiohttp sessions closed when they are created by the module itself? If the sessions are in fact not closed, we could perhaps add something along the lines of:

```python
def __del__(self):
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(self._close_session())

async def _close_session(self):
    await self.session.close()
```
I am unsure if this is due to my inexperience with asyncio-style code, but in my current code I end up instantiating Storage several times in quick succession, then fetching metadata/blobs afterwards, and sometimes I get this error:

Backing off acquire_access_token(...) for 0.5s (concurrent.futures._base.TimeoutError)

which refers to:
gcloud-aio/auth/gcloud/aio/auth/token.py, line 204 in d090df4
In another case, I was trying to asyncio.gather a function that ended up creating a client per item in a list:

await asyncio.gather(*[check_file(bucket, file) for file in filelist])

where check_file was a function that, given a bucket and a blob_name, would check its hash in the metadata and return. I was getting this same error. Unfortunately I don't have a simple code example right now, but I can try to come up with one if it helps. Locally I worked around it by creating fewer clients and passing them around, which avoids this error.
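Roughly, the workaround looks like this, with check_file adapted to accept the shared client (check_file and filelist are from my code, not the library):

```python
import asyncio

from aiohttp import ClientSession as Session
from gcloud.aio.storage import Storage

async def main(filelist):
    async with Session() as session:
        # One shared client (and thus one token manager) for every task.
        storage = Storage(session=session)
        bucket = storage.get_bucket('my-bucket')
        await asyncio.gather(
            *[check_file(storage, bucket, f) for f in filelist])
```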
The point is, the gcloud storage API is fairly unstable and is known for throwing random 503/500/etc. errors from time to time. Right now I have to implement retries in my app. The case is pretty similar to the one described in issue #137.
The same applies to the gcloud.aio.auth metadata calls, actually.
If you decide that this is a good thing to do, I'll prepare a PR and tag you, and so on.
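For reference, what I currently do in my app is roughly this: a minimal retry helper with exponential backoff for transient 5xx responses (with_retries is my own helper, not a proposed API):

```python
import asyncio
import aiohttp

async def with_retries(coro_factory, attempts: int = 3, base_delay: float = 0.5):
    for attempt in range(attempts):
        try:
            return await coro_factory()
        except aiohttp.ClientResponseError as e:
            # Only retry transient server-side errors, and only while we
            # still have attempts left.
            if e.status < 500 or attempt == attempts - 1:
                raise
            await asyncio.sleep(base_delay * 2 ** attempt)
```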
Can you please share a working example for Cloud Storage? I got an error when I ran the smoke test code:

aiohttp.client_exceptions.ClientResponseError: 404, message='Not Found'
Need to pin the max compatible aiohttp version to < 3.5.0, and/or migrate from read_timeout and conn_timeout to timeout.
The code on GitHub and on PyPI is out of sync (e.g. storage.py's _preprocess_data). Is that on purpose?
Exception in thread Thread-OnRpcTerminated:
Traceback (most recent call last):
File "/usr/lib64/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/usr/lib64/python3.6/threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File ".../google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 491, in close
callback(self, reason)
File ".../google/cloud/pubsub_v1/subscriber/futures.py", line 39, in _on_close_callback
self.set_exception(result)
File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 531, in set_exception
for waiter in self._waiters:
AttributeError: 'Future' object has no attribute '_waiters'
AFAIK, it doesn't seem possible to retrieve the mutation results when using insert, operate, or commit. It looks like there are some return statements missing here and there. Let me know if I can help with a PR.
Currently, list_blobs() returns a maximum of 1000 blobs from a bucket.
Desired behavior: that list_blobs() iterate through subsequent pages of blobs to return all of the blobs in a bucket, even when there are more than 1000. Maybe there is some way to paginate when I call list_blobs() that I am missing?
Here is the code that I am running to produce this issue:

```python
import settings
import asyncio
from gcloud.aio.storage import Storage
from aiohttp import ClientSession as Session

async def test():
    async with Session() as session:
        storage = Storage(session=session)
        bucket = storage.get_bucket(settings.bucket_name)
        blobs = await bucket.list_blobs()
        return len(blobs)  # 1000

asyncio.run(test())
```
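In case it helps, here is a hedged sketch of the manual pagination I would expect to work, assuming the lower-level list_objects call accepts query params and returns the raw JSON API response with items and nextPageToken (I have not verified the exact signature):

```python
async def list_all_blobs(storage, bucket_name: str) -> list:
    items, params = [], {}
    while True:
        resp = await storage.list_objects(bucket_name, params=params)
        items.extend(resp.get('items', []))
        token = resp.get('nextPageToken')
        if not token:
            return items
        # Ask for the next page of results.
        params = {'pageToken': token}
```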
Looks like the publisher side of Pub/Sub is not implemented; are there plans for this?
I have forked your project. I have a venv in the storage folder where I have run pip install -r requirements.txt. However, running a new integration test yields:
==================================================================== test session starts =====================================================================
platform linux -- Python 3.7.0, pytest-4.0.0, py-1.7.0, pluggy-0.8.0
rootdir: /home/jens/Desktop/gcloud-aio/storage, inifile:
plugins: asyncio-0.9.0
collected 0 items / 1 errors
=========================================================================== ERRORS ===========================================================================
___________________________________________________ ERROR collecting tests/integration/upload_resumable.py ___________________________________________________
tests/integration/upload_resumable.py:8: in <module>
from gcloud.aio.storage import Storage
gcloud/aio/storage/__init__.py:2: in <module>
__version__ = get_distribution('gcloud-aio-storage').version
venv/lib/python3.7/site-packages/pkg_resources/__init__.py:472: in get_distribution
dist = get_provider(dist)
venv/lib/python3.7/site-packages/pkg_resources/__init__.py:344: in get_provider
return working_set.find(moduleOrReq) or require(str(moduleOrReq))[0]
venv/lib/python3.7/site-packages/pkg_resources/__init__.py:892: in require
needed = self.resolve(parse_requirements(requirements))
venv/lib/python3.7/site-packages/pkg_resources/__init__.py:778: in resolve
raise DistributionNotFound(req, requirers)
E pkg_resources.DistributionNotFound: The 'gcloud-aio-storage' distribution was not found and is required by the application
====================================================================== warnings summary ======================================================================
venv/lib/python3.7/site-packages/aiohttp/multipart.py:8
/home/jens/Desktop/gcloud-aio/storage/venv/lib/python3.7/site-packages/aiohttp/multipart.py:8: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated, and in 3.8 it will stop working
from collections import Mapping, Sequence, deque
/home/jens/Desktop/gcloud-aio/storage/venv/lib/python3.7/site-packages/aiohttp/multipart.py:8: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated, and in 3.8 it will stop working
from collections import Mapping, Sequence, deque
-- Docs: https://docs.pytest.org/en/latest/warnings.html
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Interrupted: 1 errors during collection !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
============================================================ 2 warnings, 1 error in 0.31 seconds ============================================================
Even if I copy the smoke test into upload_resumable.py I get this error. Do you know why and can you help?
When I make a request that I know will fail, I don't receive the promised insertErrors key in the response.
In IPython:

```python
In [1]: from gcloud.aio.bigquery import Table

In [2]: existing_table = Table(dataset_name="event", table_name="unknown_source", project="a-project")

In [3]: response = await existing_table.insert(rows=[{"this will": "fail"}])

In [4]: response
Out[4]: {'kind': 'bigquery#tableDataInsertAllResponse'}
```
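For comparison, the shape I expected based on the BigQuery insertAll documentation (abridged; the error contents here are illustrative):

```python
expected = {
    'kind': 'bigquery#tableDataInsertAllResponse',
    'insertErrors': [
        # One entry per failed row, per the insertAll response docs.
        {'index': 0, 'errors': [{'reason': 'invalid', 'message': '...'}]},
    ],
}
```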
With gcloud-aio-auth==3.2.1 (#167), the following code hangs for me within my async environment (I use FastAPI):

```python
from gcloud.aio.storage import Storage

async with Storage(service_file=API_KEY) as storage:
    bucket = storage.get_bucket(bucket_name)
    blob = await bucket.get_blob(afile)
    binary_data = await blob.download()
```
(with API_KEY, bucket_name, and afile as variables I have defined).
The part that hangs is await blob.download().
If I kill the process, I find body = await resp.text(errors='replace') as the last line in my traceback. So I reverted to gcloud-aio-auth==3.2.0 and my code runs again. When I moved body = await resp.text(errors='replace') into the except aiohttp.ClientResponseError: block, my tests stopped hanging as well.
This code works if I run it in a Python terminal. I am unsure how to write a reproducible test for this, as I am unsure whether the cause is the way I have written my async code or a real bug. Any help or advice would be really good!
Related to PR #166 and issue #126.
After this update, the library works incorrectly with aiohttp 3.6.*.
When you call the raise_for_status method from the original library, the response is automatically released in the case of a "bad" status code. This means that you can't read the body after this call; you'll get an aiohttp.ClientConnectionError from here.
Code to reproduce:

```python
import asyncio
import aiohttp

# This function was just copied from your library
async def _raise_for_status(resp):
    try:
        resp.raise_for_status()
    except aiohttp.ClientResponseError:
        await resp.text()
        raise

async def main():
    async with aiohttp.ClientSession() as session:
        resp = await session.delete('http://example.com/non-existing-url')
        try:
            await _raise_for_status(resp)
        except aiohttp.ClientResponseError:
            print('Expected behaviour')
        except aiohttp.ClientConnectionError:
            print('Unexpected behaviour')

asyncio.run(main())
```
I can make a PR (if needed), but you should choose the approach. As far as I can tell, the only way to retrieve the body in this case is to read it from the response before calling resp.raise_for_status(). But reading the body inside the library only for additional logging is a bad approach, IMHO. In any case, the current functionality doesn't work properly.
Having installed gcloud-aio-auth (version 2.0.1) and using it, mypy does not find the module, logging Cannot find module named 'gcloud.aio.auth'. This is due to the missing __init__.py in gcloud and gcloud/aio (gcloud/aio/auth/__init__.py exists). Adding them to the package fixes this. Probably the files are lost in the setup.py/twine process.
I have an entity with a datetime property like this:
"createdDate":"2016-06-09T14:02:36.059360Z"
When I try to load this entity, I get this error:
File "/Users/scotts/.pyenv/versions/python38/lib/python3.8/site-packages/gcloud/aio/datastore/entity.py", line 15, in __init__
self.properties = {k: self.value_kind.from_repr(v).value
File "/Users/scotts/.pyenv/versions/python38/lib/python3.8/site-packages/gcloud/aio/datastore/entity.py", line 15, in <dictcomp>
self.properties = {k: self.value_kind.from_repr(v).value
File "/Users/scotts/.pyenv/versions/python38/lib/python3.8/site-packages/gcloud/aio/datastore/value.py", line 38, in from_repr
value = datetime.strptime(data[json_key],
File "/Users/scotts/.pyenv/versions/3.8.0/lib/python3.8/_strptime.py", line 568, in _strptime_datetime
tt, fraction, gmtoff_fraction = _strptime(data_string, format)
File "/Users/scotts/.pyenv/versions/3.8.0/lib/python3.8/_strptime.py", line 349, in _strptime
raise ValueError("time data %r does not match format %r" %
ValueError: time data '2016-06-09T14:02:36.059360Z' does not match format '%Y-%m-%dT%H:%M:%S.%f000Z'
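For what it's worth, the mismatch is easy to demonstrate: %f greedily consumes all six fractional digits, so the literal '000Z' suffix in the library's format string can never match:

```python
from datetime import datetime

s = '2016-06-09T14:02:36.059360Z'
datetime.strptime(s, '%Y-%m-%dT%H:%M:%S.%fZ')     # parses fine
datetime.strptime(s, '%Y-%m-%dT%H:%M:%S.%f000Z')  # raises ValueError
```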
In my case the JSON credentials are not in a file; they are in memory. It would be ideal if the token object also accepted credentials directly. Right now I have to write them to a temporary file.
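For context, my current workaround looks roughly like this (service_account_info is my in-memory dict; the Token arguments reflect my usage and may not match the exact signature):

```python
import json
import tempfile

from gcloud.aio.auth import Token

# Dump the in-memory credentials to a temp file just so Token can read them.
with tempfile.NamedTemporaryFile('w', suffix='.json', delete=False) as f:
    json.dump(service_account_info, f)

token = Token(service_file=f.name,
              scopes=['https://www.googleapis.com/auth/devstorage.read_write'])
```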
auth.py:168
If the user does not provide a session, one is created here, but as far as I can see that session is never closed? I think this causes errors in my code. My use case is to instantiate a storage object without a session and then call download later on, passing a session. I can fix this by passing the constructor a session, but in any case :-)
Is there an example of how to combine two of the libraries, e.g. a Pub/Sub subscription consumed into a GCS file (or multiple, in batch)? On the latter (GCS), is there some setting for the chunk size to use? We use gcsfs today, but it's not threadsafe.
This should be a memory-efficient approach: we should not need to load large files into memory. Inspiration here: https://github.com/GoogleCloudPlatform/google-resumable-media-python/blob/master/google/resumable_media/_upload.py
Also, consider supporting uploads in chunks for very large files (> 100 MB?).
As of today, looks like the v2beta2 Cloud Tasks API has been disabled (and with it, all support for pull task queues). We should nuke our code accordingly!
See talkiq/gcloud-rest#37
d6d3b0f introduced using the ssl keyword with aiohttp.TCPConnector. The requirements here currently state that aiohttp must be at least 2.0.0, but the ssl keyword was not available until aio-libs/aiohttp#2628, which is listed under milestone 3.0. I haven't checked thoroughly, but it looks like the ssl keyword was added in version 3, and as such the requirement here should be at least 3.0.
I was trying to run the asyncio storage client in a Google Cloud Kubernetes cluster with default auth, without a service file. Is there any way to authenticate the storage client without a service file?
The actual request should be made asynchronous, either by porting it to the REST API with aiohttp rather than the upstream google-cloud-python library, or through some other method. Note that we are currently calling this method synchronously.
Anywhere we assign self.session = aiohttp.ClientSession(), we should probably close the session on cleanup with something like def __del__(self): self.session.close().
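Note that in aiohttp 3.x, ClientSession.close() is a coroutine, so a __del__ along those lines would need to schedule it rather than call it synchronously; a rough sketch:

```python
import asyncio

def __del__(self):
    # ClientSession.close() is a coroutine in aiohttp 3.x, so schedule it
    # on the event loop rather than calling it directly.
    if self.session and not self.session.closed:
        try:
            asyncio.get_event_loop().create_task(self.session.close())
        except RuntimeError:
            pass  # no usable event loop at interpreter shutdown
```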
According to the Google documentation, the "resumable upload" must be used for files larger than 5 MB. Suggest either:
- adding an upload_resumable method to storage.py, or
- adding logic to upload which determines which upload type to use: in case the file size is larger than 5 MB, automatically use upload_resumable (see the sketch below).
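A minimal sketch of the second option, for in-memory file_data whose size we can measure (upload_resumable and _upload_simple are hypothetical method names here):

```python
MAX_SIMPLE_UPLOAD = 5 * 1024 * 1024  # 5 MB, per the Google documentation

async def upload(self, bucket: str, object_name: str, file_data: bytes, **kwargs):
    # Dispatch on payload size: small uploads use the simple protocol,
    # large ones fall through to the resumable protocol.
    if len(file_data) > MAX_SIMPLE_UPLOAD:
        return await self.upload_resumable(bucket, object_name, file_data, **kwargs)
    return await self._upload_simple(bucket, object_name, file_data, **kwargs)
```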
Hello,
I'm trying to write to a bucket. I ran gcloud auth application-default login and granted access and everything. The default credentials also work, because I am able to write to the bucket with https://cloud.google.com/storage/docs/reference/libraries#using_the_client_library
When I'm using the example from gcloud-aio, however:
```python
import aiohttp
from gcloud.aio.storage import Storage

async with aiohttp.ClientSession() as session:
    client = Storage(session=session)
    status = await client.upload(bucket='my-bucket',
                                 object_name='test',
                                 content_type='application/xml',
                                 file_data=body)  # body is just some example xml string
    print(status)
```
all I'm getting is:

aiohttp.client_exceptions.ClientConnectorError: Cannot connect to host metadata.google.internal:80 ssl:default [getaddrinfo failed]

Can somebody help / point me in the right direction?
Gzipping files before upload could lead to a significant performance boost; see:
https://cloud.google.com/storage/docs/json_api/v1/how-tos/performance
Content-Encoding: gzip must be set in the headers when gzip = True is used, and the data must be compressed before upload (https://docs.python.org/3/library/gzip.html, or use the built-in gzip support of aiohttp?). Not sure how this would work with streams?
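A minimal sketch of the suggested behaviour for in-memory data, assuming a hypothetical gzip=True flag (stdlib gzip; streams are left as an open question):

```python
import gzip

def prepare_upload(data: bytes, headers: dict, use_gzip: bool = False) -> bytes:
    if use_gzip:
        # The server needs this header to know the payload is compressed.
        headers['Content-Encoding'] = 'gzip'
        data = gzip.compress(data)
    return data
```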
When something is wrong with our Datastore query, gcloud only returns a 400 with no meaningful logs. When we try to execute the same query in the try window at https://cloud.google.com/datastore/docs/reference/data/rest/v1/projects/runQuery, it tells us what is wrong with the query. It is not a bug, but it would be better if we had more information in the response we get.
When v4 of aiohttp gets released, it will drop support for read_timeout and conn_timeout in favor of timeout. timeout was introduced in (as far as I can tell from the git history) v3.3 and never backported to 2.x.
We currently support aiohttp >= 2.0.0, < 4.0.0. If we want to support 4.x, we'll need to update our range to aiohttp >= 3.3.0, < 5.0.0. When should we make this switch? aiohttp v3.3 was released on June 1st, 2018. Should we be including a deprecation period of some sort?
Another option, of course, is to support both, with work on our side to handle differing library versions. That's the most compatible, but we'll want to be careful about doing this too frequently lest our code become an unmaintainable mess of library compatibility.
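For illustration, a rough sketch of what the dual-support option could look like, gating on the installed aiohttp version (the 3.3+ branch uses aiohttp.ClientTimeout; a sketch, not a tested implementation):

```python
import aiohttp

# Choose timeout kwargs based on the installed aiohttp version.
_major, _minor = (int(x) for x in aiohttp.__version__.split('.')[:2])

if (_major, _minor) >= (3, 3):
    timeout_kwargs = {'timeout': aiohttp.ClientTimeout(total=60)}
else:
    timeout_kwargs = {'read_timeout': 60, 'conn_timeout': 60}

session = aiohttp.ClientSession(**timeout_kwargs)
```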
If you try to use .download or .download_as_string with a binary file, the request will fail, because the download method calls resp.text(), which tries to decode the raw data.
gcloud-aio/storage/gcloud/aio/storage/storage.py
Lines 66 to 71 in 15001d7
TODO: comment in 15001d7.
I've been using gcloud-aio-storage in a Cloud Function to handle parallel uploads with great success, but recently tried deploying and found the Cloud Function wouldn't deploy with gcloud-aio-storage as a dependency in requirements.txt.
You can verify this by attempting to deploy a hello-world example with gcloud-aio-storage in the requirements file.
In my project I've vendored gcloud-aio-storage and gcloud-aio-auth and hardcoded the versions in both of their __init__.py files.
gcloud-aio/taskqueue/gcloud/aio/taskqueue/taskmanager.py
Lines 27 to 32 in 1192dcf
TODO: comment in 1192dcf.
According to the Datastore docs:
blobValue | string (bytes format) | A blob value. May have at most 1,000,000 bytes. When excludeFromIndexes is false, may have at most 1500 bytes. In JSON requests, must be base64-encoded (a base64-encoded string).
Value.from_repr naively attempts to cast the base64 value to bytes without decoding it first, which results in an error.
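To illustrate the difference (the repr dict here is hypothetical):

```python
import base64

data = {'blobValue': 'aGVsbG8gd29ybGQ='}  # as returned by the JSON API

raw = base64.b64decode(data['blobValue'])  # correct: b'hello world'
bad = data['blobValue'].encode('utf-8')    # naive cast: still base64 text
```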
When a wrong path to a credentials file is given, no error is thrown in auth.py:45-51. I argue that in the case where the user has provided a specific path to the credentials file and the path is not correct, we should throw an error instead of silently "hoping" that the right credentials are stored in an env var :-) I can do the fix if you like, i.e. catch and raise a FileNotFoundError?
The Blob constructor sets blob.bucket = storage.Bucket(...), but this value is overridden on a call to Blob.upload_from_string, which sets it to the metadata returned by the upload command, e.g. blob.bucket = storage.Bucket(...).name.
gcloud-aio/datastore/tests/integration/smoke_test.py
Lines 9 to 14 in e2ab3ac
TODO: comment in e2ab3ac in #8.
gcloud-aio-storage v5.1.0 (latest). I'm calling Storage.upload() with a file object, and after a while it fails with the following traceback:
~/.local/share/virtualenvs/backend/lib/python3.8/site-packages/gcloud/aio/storage/storage.py in upload(self, bucket, object_name, file_data, content_type, parameters, headers, metadata, session, timeout, force_resumable_upload)
207 if metadata:
208 log.warning('metadata will be ignored for upload_type=Simple')
--> 209 return await self._upload_simple(url, object_name, stream,
210 parameters, headers,
211 session=session, timeout=timeout)
~/.local/share/virtualenvs/backend/lib/python3.8/site-packages/gcloud/aio/storage/storage.py in _upload_simple(self, url, object_name, stream, params, headers, session, timeout)
306
307 s = AioSession(session) if session else self.session
--> 308 resp = await s.post(url, data=stream, headers=headers, params=params,
309 timeout=timeout)
310 data: dict = await resp.json()
~/.local/share/virtualenvs/backend/lib/python3.8/site-packages/gcloud/aio/auth/session.py in post(self, url, headers, data, timeout, params)
63 params: Dict[str, str] = None
64 ) -> aiohttp.ClientResponse:
---> 65 resp = await self.session.post(url, data=data, headers=headers,
66 timeout=timeout, params=params)
67 resp.raise_for_status()
~/.local/share/virtualenvs/backend/lib/python3.8/site-packages/aiohttp/client.py in _request(self, method, str_or_url, params, data, json, cookies, headers, skip_auto_headers, auth, allow_redirects, max_redirects, compress, chunked, expect100, raise_for_status, read_until_eof, proxy, proxy_auth, timeout, verify_ssl, fingerprint, ssl_context, ssl, proxy_headers, trace_request_ctx)
502 resp = await req.send(conn)
503 try:
--> 504 await resp.start(conn)
505 except BaseException:
506 resp.close()
~/.local/share/virtualenvs/backend/lib/python3.8/site-packages/aiohttp/client_reqrep.py in start(self, connection)
858 if self._continue is not None:
859 set_result(self._continue, True)
--> 860 self._continue = None
861
862 # payload eof handler
~/.local/share/virtualenvs/backend/lib/python3.8/site-packages/aiohttp/helpers.py in __exit__(self, exc_type, exc_val, exc_tb)
594
595 if exc_type is asyncio.CancelledError and self._cancelled:
--> 596 raise asyncio.TimeoutError from None
597 return None
598
TimeoutError:
The service account exists and has the permissions required.
We should default to loading creds from $GOOGLE_APPLICATION_CREDENTIALS if they are not explicitly passed in. We could also parse the project name out of those creds, by default.
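A minimal sketch of both defaults, assuming the standard service-account JSON layout with a project_id key (load_default_creds is a hypothetical helper name):

```python
import json
import os

def load_default_creds(service_file: str = None):
    # Fall back to the well-known env var when no path is passed in.
    service_file = service_file or os.environ.get('GOOGLE_APPLICATION_CREDENTIALS')
    with open(service_file) as f:
        creds = json.load(f)
    # Service-account JSON files carry the project under 'project_id'.
    return creds, creds.get('project_id')
```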
i.e. blob.download_to_file(filename) and blob.upload_from_file(filename)
These should check for None instead:
gcloud-aio/datastore/gcloud/aio/datastore/query.py
Lines 87 to 90 in abe2ea9
Is there anybody looking into the Vision API with this?
It's the last type missing.
gcloud-aio/datastore/gcloud/aio/datastore/constants.py
Lines 74 to 84 in ed062be
We need to implement the ArrayValue type - also see #88 for a similar task
E.g. nox.py -> noxfile.py, nox-automation -> nox, etc.
If this ticket gets grabbed by a TalkIQ employee, you can see us doing this for a non-OSS repo in this PR.