revolution1 / etcd3-py
Pure python client for etcd v3 (Using gRPC-JSON-Gateway)
License: Other
from etcd3 import Client
etcd_cert="/etcd/ansible_etcd_certs/etcd-client-ca.pem"
etcd_cert_key="/etcd/ansible_etcd_certs/etcd-client-ca-key.pem"
etcd_ca_cert="/etcd/ansible_etcd_certs/etcd-root-ca.pem"
client = Client(host='172.16.27.4',port=2379,protocol='https',cert=(etcd_cert, etcd_cert_key), verify=etcd_ca_cert, server_version='3.4.7', cluster_version='3.4.0')
print(client.version())
print(client.put('foo','bar'))
error
EtcdVersion(etcdserver='3.4.7', etcdcluster='3.4.0')
Traceback (most recent call last):
File "test_etcd.py", line 9, in <module>
print(client.put('foo','bar'))
File "/data/apps/opt/etcd3-client/lib64/python3.6/site-packages/etcd3/apis/kv.py", line 106, in put
return self.call_rpc(method, data=data)
File "/data/apps/opt/etcd3-client/lib64/python3.6/site-packages/etcd3/client.py", line 202, in call_rpc
self._raise_for_status(resp)
File "/data/apps/opt/etcd3-client/lib64/python3.6/site-packages/etcd3/client.py", line 150, in _raise_for_status
raise get_client_error(error, code, status, resp)
etcd3.errors.go_etcd_rpctypes_error.ErrUnknownError: <ErrUnknownError error:'all SubConns are in TransientFailure, latest connection error: connection error: desc = "transport: authentication handshake failed: remote error: tls: bad certificate"', code:14>
Authentication seems to be not working
>>> from etcd3 import Client
>>> client = Client(host='my_etcd_host',protocol='https', verify='/etc/ssl/certs/ca-certificates.crt', username='myuser', password='mypassword')
>>> client.version()
EtcdVersion(etcdserver='3.3.10', etcdcluster='3.3.0')
>>> client.put('foo', 'bar')
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/jsfrerot/code/python_envs/etcd_client/lib/python3.6/site-packages/etcd3/apis/kv.py", line 106, in put
return self.call_rpc(method, data=data)
File "/home/jsfrerot/code/python_envs/etcd_client/lib/python3.6/site-packages/etcd3/client.py", line 199, in call_rpc
self._raise_for_status(resp)
File "/home/jsfrerot/code/python_envs/etcd_client/lib/python3.6/site-packages/etcd3/client.py", line 148, in _raise_for_status
raise get_client_error(error, code, status, resp)
etcd3.errors.go_etcd_rpctypes_error.ErrUserEmpty: <ErrUserEmpty error:'etcdserver: user name is empty', code:3>
ensure enough time waited
First of all, thank you for solving my previous problem. However, I have now run into a new exception, as follows:
import asyncio
from etcd3 import AioClient
client = AioClient('127.0.0.1', 2379)
async def AsyncFunc():
    await client.range("foo")
loop = asyncio.get_event_loop()
loop.run_until_complete(AsyncFunc())
===========
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x1089fa358>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x108956d48>, 349800.935111066)]']
connector: <aiohttp.connector.TCPConnector object at 0x1089fa320>
I've got a loop that does a Transaction that does Test-and-set. There's a lot of different things in the 'If' section, but only one of them is likely to change between loops (failure of the others is cause to exit the loop).
success = False
while not success:
    cur_foo = self.client.range(foo).kvs[0].value
    t = Txn(self.client)
    t.compare(t.key(foo_bar).mod == 0) # foo_bar doesn't exist
    t.compare(...) # other conditions
    t.compare(t.key(foo) == cur_foo) # make sure we're in the state we think we are
    t.success(t.put(foo, cur_foo + bar))
    t.fail(...) # consistent fail action
    t.commit()
    success = t.succeeded
It'd be nice if there were a way to extract the common bits outside the loop, to make it run faster:
base = Txn(self.client)
base.compare(base.key(foo_bar).mod == 0) # foo_bar doesn't exist
base.compare( .. ) # other conditions
base.fail(...) # consistent fail action
success = False
while not success:
    cur_foo = self.client.range(foo).kvs[0].value # re-read the current state on every attempt
    t = base.clone()
    t.compare(t.key(foo) == cur_foo) # make sure we're in the state we think we are
    t.success(t.put(foo, cur_foo + bar))
    t.commit()
    success = t.succeeded
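The clone() idea above can be sketched independently of etcd3-py. The following is a minimal illustration, assuming a transaction is just a recorded list of compares and success/fail operations; `TxnTemplate` and its methods are hypothetical names and not part of the library.

```python
import copy


class TxnTemplate:
    """Hypothetical builder that records transaction parts (not an etcd3-py class)."""

    def __init__(self):
        self.compares = []
        self.on_success = []
        self.on_failure = []

    def compare(self, cond):
        self.compares.append(cond)
        return self

    def success(self, op):
        self.on_success.append(op)
        return self

    def fail(self, op):
        self.on_failure.append(op)
        return self

    def clone(self):
        # Deep copy so per-iteration parts never leak back into the base.
        return copy.deepcopy(self)


# Common parts are built once, outside the loop.
base = TxnTemplate()
base.compare(("foo_bar", "mod", "==", 0)).fail(("range", "foo"))

# Each attempt clones the base and adds only what changes.
t1 = base.clone().compare(("foo", "value", "==", "a")).success(("put", "foo", "ab"))
t2 = base.clone().compare(("foo", "value", "==", "ab")).success(("put", "foo", "abb"))
```

Whether this actually runs faster depends on how expensive building the common parts is compared with copying them.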
Hi 👊
This is my first visit to this fine repo, but it seems you have been working hard to keep all dependencies updated so far.
Once you have closed this issue, I'll create separate pull requests for every update as soon as I find one.
That's it for now!
Happy merging! 🤖
PyPI requires packages to have unique names. Once installed, however, this project's package name is etcd3 rather than etcd3-py, which makes it incompatible with any other package that installs under the same name. Unfortunately, PyPI has plenty of other projects that install an "etcd3" folder. One example is python-etcd3, which implements a gRPC client. It is quite possible that a user needs both of these APIs in order to choose which protocol to use.
I strongly advise renaming the main project folder and, consequently, the import name.
Thanks!
I put a str into etcd and got out bytes.
k, v = "foo", "bar"
...
client.put(k, v)
...
r = client.range(k).kvs[0].key
print(f"{r:<10}")
gets me
TypeError: unsupported format string passed to bytes.__format__
I expected to be able to round-trip data through etcd without issue. As it is now, I have to explicitly convert everything back to str if I want to use it in a format string like the one above.
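A caller-side workaround can be sketched as follows, assuming the keys and values were written as UTF-8 text. The helper name `to_text` is illustrative and not part of etcd3-py; `raw_key` stands in for a key returned by a range call.

```python
def to_text(raw, encoding="utf-8"):
    """Return str for bytes input; pass str through unchanged."""
    if isinstance(raw, bytes):
        return raw.decode(encoding)
    return raw


# Stand-in for a key as returned from a range call (bytes).
raw_key = b"foo"
formatted = f"{to_text(raw_key):<10}"
print(repr(formatted))
```

Note that calling str() on a bytes object yields "b'foo'" rather than "foo", so decode() is the conversion to use here.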
I'd like to see the global revision of a response to my Watch of a prefix.
Reading stateful/watch.py shows that while the header is received, and the revision is even used for tracking, it's not attached to the Events that are dispatched.
Just adding a .header to the Event object and setting it to the WatchResponse header would be great.
Alternatively, if you wish to simplify things, an API closer to etcd's that dispatches multiple events at a time (really, passes them through directly from etcd), so that watch callbacks directly get a WatchResponse much as calls to .range() get a RangeResponse, would also solve my issue.
I cannot see any retries for HTTP calls.
It would be great to have retries somewhere here:
https://github.com/Revolution1/etcd3-py/blob/master/etcd3/client.py#L147
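Until something like this exists in the client, a retry can be wrapped around individual calls. The sketch below is a generic exponential-backoff helper; `call_with_retries`, its parameters, and the choice of which exceptions count as transient are all assumptions made for illustration, not etcd3-py behaviour.

```python
import time


def call_with_retries(func, attempts=3, base_delay=0.1,
                      retry_on=(ConnectionError, TimeoutError)):
    """Call func(); on a listed exception, wait and retry with exponential backoff."""
    for attempt in range(attempts):
        try:
            return func()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            time.sleep(base_delay * (2 ** attempt))


# Illustrative flaky operation: fails twice, then succeeds.
calls = {"n": 0}


def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("simulated transient failure")
    return "ok"


result = call_with_retries(flaky, attempts=5, base_delay=0.01)
```

In real use, `func` would be something like `lambda: client.range(key)`, and only operations that are safe to repeat should be retried.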
etcd3-py version: 0.1.6
Python version: 3.6.9
Operating System: Ubuntu 18.04
I am using an API with aiohttp. For every request received, an AioClient is created by an aiohttp middleware. The client is closed after the request has been handled.
If too many requests are sent to the API, the memory footprint of the API process increases continuously until my machine breaks and resets.
Here is a minimal example. It connects to an etcd database running locally, with ~20 elements present at the prefix "/".
import asyncio
from etcd3 import AioClient
async def read_db():
    while True:
        client = AioClient()
        try:
            resp = await client.range("/")
        finally:
            await client.close()
async def all_run(concurrent=10):
    """Run many reads concurrently
    """
    await asyncio.gather(
        *(read_db() for i in range(concurrent)),
        return_exceptions=False,
    )
def main():
    loop = asyncio.get_event_loop()
    try:
        result = loop.run_until_complete(all_run())
    except asyncio.CancelledError:
        pass
    finally:
        loop.close()
main()
This script, when running, uses more than 1 GB of memory after only 5 minutes.
I narrowed down the issue to the caches of SwaggerNode and SwaggerSpec.
By changing the function read_db in the above example as follows:
import asyncio
from etcd3 import AioClient
from etcd3.swagger_helper import SwaggerSpec, SwaggerNode
counter = 0
async def read_db():
    global counter
    while True:
        counter += 1
        client = AioClient()
        try:
            resp = await client.range("/")
        finally:
            await client.close()
        if counter % 20 == 0:
            # Empty the different caches every 20 reads
            SwaggerNode._node_cache = {}
            SwaggerNode.__getattr__.__wrapped__.cache = {}
            SwaggerSpec._ref.__wrapped__.cache = {}
            SwaggerSpec.getPath.__wrapped__.cache = {}
            SwaggerSpec.getSchema.__wrapped__.cache = {}
my memory footprint stays at 120 MB even after 20 minutes.
If the following sequence occurs:
watcher failed -> etcd revision compacted -> watcher retry
the watcher will block forever!
How would you say etcd3-py compares against etcd3-gateway? Could you work together instead of apparently duplicating effort?
because of the api change etcd-io/etcd#9424
I was trying to get a watch working in my app. It's currently complex, but if you have no idea why this is happening, I'll work on getting a stripped-down version working.
If it matters, I'm using an async client.
Exception in thread Thread-1:
Traceback (most recent call last):
File "/tmp/xgw-venv/lib/python3.7/site-packages/etcd3/stateful/watch.py", line 283, in run
for event in self:
File "/tmp/xgw-venv/lib/python3.7/site-packages/etcd3/stateful/watch.py", line 356, in __iter__
if not self._resp or self._resp.raw.closed:
AttributeError: 'ModelizedStreamResponse' object has no attribute 'raw'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/tmp/xgw-venv/lib/python3.7/site-packages/etcd3/stateful/watch.py", line 284, in run
self.dispatch_event(event)
File "/tmp/xgw-venv/lib/python3.7/site-packages/etcd3/stateful/watch.py", line 338, in __exit__
self.stop()
File "/tmp/xgw-venv/lib/python3.7/site-packages/etcd3/stateful/watch.py", line 295, in stop
self._kill_response_stream()
File "/tmp/xgw-venv/lib/python3.7/site-packages/etcd3/stateful/watch.py", line 260, in _kill_response_stream
if not self._resp or (self._resp and self._resp.raw.closed):
AttributeError: 'ModelizedStreamResponse' object has no attribute 'raw'
Maybe this is a Python thing I can work out, but I'm trying to have a watcher that'll self cancel within an event callback. Like watch once, or watch until some condition is satisfied, then cancel itself.
I've designed a kind of request/approval protocol on top of etcd and I'd like the watch to end when my approval watch fires. Any suggestions?
Excellent, stable library BTW. Thanks!
def watch_apr(ev):
    k = ev.key
    v = ev.value
    # cancel here somehow
apr_watcher.onEvent(EventType.PUT, watch_apr)
apr_watcher.runDaemon()
while apr_watcher is not None:
    time.sleep(1)
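One pattern that fits this "watch until a condition holds" case is to have the callback only signal a threading.Event, and let the main thread stop the watcher once the signal arrives. The sketch below uses a hypothetical `FakeWatcher` stand-in so that it runs without an etcd server; only the method names onEvent, runDaemon and stop mirror the ones that appear in this thread, and the stand-in ignores its filter argument.

```python
import threading


class FakeWatcher:
    """Hypothetical stand-in that delivers events on a background thread."""

    def __init__(self, events):
        self._events = events
        self._callback = None
        self.stopped = False

    def onEvent(self, event_filter, callback):
        # The filter is ignored in this stand-in.
        self._callback = callback

    def runDaemon(self):
        threading.Thread(target=self._run, daemon=True).start()

    def _run(self):
        for ev in self._events:
            if self.stopped:
                break
            self._callback(ev)

    def stop(self):
        self.stopped = True


approved = threading.Event()
seen = []


def watch_apr(ev):
    seen.append(ev)
    if ev == "approved":   # the condition that ends the watch
        approved.set()     # only signal; the main thread does the stopping


watcher = FakeWatcher(["pending", "approved", "late"])
watcher.onEvent("PUT", watch_apr)
watcher.runDaemon()

fired = approved.wait(timeout=5)   # blocks without polling
watcher.stop()                     # stop from the main thread
```

Stopping from the main thread rather than from inside the callback avoids having the watcher thread tear down the stream it is currently iterating over.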
Currently, if count_only is set and handed to client.range() and no keys are found, result.count is None.
This leads to a bunch of extra checking that looks like:
found = client.range(key, count_only=True).count
if found is None:
    found = 0
if found > 3:
    ...
I expected that count should always be an int. Having to put in a bunch of 'is this None?' checks is unbeautiful, to say the least. Could you put the one check in the right place (at deserialization/result-object-creation time)?
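Until the library normalizes this, the check can at least live in one place on the caller side. A minimal sketch, where `count_of` is a hypothetical helper and the SimpleNamespace objects stand in for range responses:

```python
from types import SimpleNamespace


def count_of(response):
    """Return response.count as an int, treating a missing or None count as 0."""
    count = getattr(response, "count", None)
    return 0 if count is None else int(count)


# Stand-ins for responses with and without matching keys.
empty = SimpleNamespace(count=None)
some = SimpleNamespace(count=5)

print(count_of(empty), count_of(some))
```

With this, the calling code reduces to a single comparison such as `if count_of(resp) > 3:`.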
The AioClient raises an exception when calling the range() function, as follows:
import asyncio
from etcd3 import AioClient
client = AioClient()
async def AsyncFunc():
    r = await client.range('foo')
    print('key:', r.kvs[0].key, 'value:', r.kvs[0].value)
loop = asyncio.get_event_loop()
loop.run_until_complete(AsyncFunc())
==============================
Traceback (most recent call last):
File "/xxxxxx/srv/etcddemo.py", line 4, in <module>
Aclient = AioClient()
File "/Users/pyvers/py3.6.8_health_env/lib/python3.6/site-packages/etcd3_py-0.1.6-py3.6.egg/etcd3/aio_client.py", line 177, in __init__
connector = aiohttp.TCPConnector(limit=pool_size, ssl=self.ssl_context)
File "/Users/pyvers/py3.6.8_health_env/lib/python3.6/site-packages/aiohttp-4.0.0a1-py3.6-macosx-10.13-x86_64.egg/aiohttp/connector.py", line 711, in __init__
enable_cleanup_closed=enable_cleanup_closed)
File "/Users/pyvers/py3.6.8_health_env/lib/python3.6/site-packages/aiohttp-4.0.0a1-py3.6-macosx-10.13-x86_64.egg/aiohttp/connector.py", line 207, in __init__
loop = get_running_loop()
File "/Users/pyvers/py3.6.8_health_env/lib/python3.6/site-packages/aiohttp-4.0.0a1-py3.6-macosx-10.13-x86_64.egg/aiohttp/helpers.py", line 276, in get_running_loop
raise RuntimeError("The object should be created from async function")
RuntimeError: The object should be created from async function
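The traceback shows the connector being built in the client's constructor while no event loop is running. A common way around this kind of error is to construct the client inside the coroutine. The sketch below demonstrates the pattern with a hypothetical `LoopBoundClient` stand-in (not an etcd3-py class) that, like the connector in the traceback, requires a running loop at construction time; it needs Python 3.7 or later.

```python
import asyncio


class LoopBoundClient:
    """Hypothetical stand-in that must be constructed while a loop is running."""

    def __init__(self):
        # Raises RuntimeError when called outside a running event loop.
        self.loop = asyncio.get_running_loop()
        self.closed = False

    async def range(self, key):
        await asyncio.sleep(0)
        return {"key": key}

    async def close(self):
        self.closed = True


async def main():
    client = LoopBoundClient()   # created inside the coroutine
    try:
        return await client.range("foo")
    finally:
        await client.close()


result = asyncio.run(main())
print(result)
```

Creating the client at module level, as in the report, runs the constructor before any loop exists, which is what triggers the RuntimeError with this aiohttp version.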
pip install --upgrade pip # gets you to pip > 20.1
pip install etcd3-py
...
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/tmp/pip-install-2ys2e_8z/etcd3-py/setup.py", line 36, in <module>
requirements = [str(ir.req) for ir in install_reqs]
File "/tmp/pip-install-2ys2e_8z/etcd3-py/setup.py", line 36, in <listcomp>
requirements = [str(ir.req) for ir in install_reqs]
AttributeError: 'ParsedRequirement' object has no attribute 'req'
pip changed their internals; see pypa/pip#8188 (comment)
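The failing line reads `.req` from objects returned by pip's internal requirements parser. One way for a setup.py to avoid depending on pip internals at all is to read the requirements file directly. The sketch below assumes a plain requirements file with one requirement per line; `read_requirements` is a hypothetical helper and not necessarily the fix the project adopted.

```python
import os
import tempfile


def read_requirements(path):
    """Return requirement strings, skipping blanks, comments and pip option lines."""
    requirements = []
    with open(path) as fh:
        for line in fh:
            line = line.strip()
            if not line or line.startswith("#") or line.startswith("-"):
                continue
            requirements.append(line)
    return requirements


# Demonstration with a temporary file (contents are illustrative).
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
    tmp.write("# runtime deps\nrequests>=2.10\n\nsix\n-r extra.txt\n")
    demo_path = tmp.name

requirements = read_requirements(demo_path)
os.remove(demo_path)
print(requirements)
```

This ignores nested `-r` includes and inline comments, so it only suits simple requirements files.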
Any chance of a release soon? The current version on pypi is 10 months old.
Maybe we need some heartbeat or timeout mechanism to prevent the stream from hanging forever when there is a network problem.
When the locked job is done, the release action in lease.py causes an error, because Lock is not a reentrant lock.
In [4]: with etcd_client.Lock('test_this_is_key', lock_ttl=30) as lock:
   ...:     print lock.is_acquired
   ...:     time.sleep(15)
   ...:     print 'job done'
   ...:
True
job done
In [5]: Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/local/lib/python2.7/threading.py", line 810, in __bootstrap_inner
self.run()
File "/usr/local/lib/python2.7/threading.py", line 763, in run
self.__target(*self.__args, **self.__kwargs)
File "/data/apps/demo/eggs/etcd3_py-0.1.6-py2.7.egg/etcd3/stateful/lease.py", line 163, in keepalived
log.exception("cancel_cb() raised an error")
error: release unlocked lock
etcd3-py/etcd3/stateful/lease.py
Lines 124 to 127 in 6fc1706
Hello,
thanks for the awesome work!
I think the code above is responsible for the keepalive method of a stateful Lease renewing the lease way too fast. It should break out of the loop when keeping is False; instead, it breaks out immediately and triggers another reply.
It can be reproduced simply by passing a keep callback to the method and noticing that the callback is called at a rate independent from the granted TTL.
Hi! How do I delete values?
I did a large range request and got what looks like an overflow error. I don't see an obvious way to do pagination or the like; is there one I'm missing, or is/should etcd3-py wrapping those up somehow?
result = await self.db.range(key_prefix, prefix=True, keys_only=True)
File "/usr/local/lib/python3.7/dist-packages/etcd3/aio_client.py", line 32, in __modelize
await self.client._raise_for_status(self._resp)
File "/usr/local/lib/python3.7/dist-packages/etcd3/aio_client.py", line 233, in _raise_for_status
raise get_client_error(error, code, status, resp)
etcd3.errors.go_etcd_rpctypes_error.ErrUnknownError: <ErrUnknownError error:'grpc: received message larger than max (14757329 vs. 4194304)', code:8>
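etcd's Range request has a `limit` field and its response has a `more` flag, so a large read can be split into pages that each stay under the gRPC message size limit. The sketch below shows the paging logic against a hypothetical in-memory `fake_range` stand-in; whether and how etcd3-py's range() passes `limit` and `range_end` through is an assumption to verify, and `iter_keys` is an illustrative name.

```python
# Illustrative data set: 25 keys under the prefix "/k/".
STORE = {("/k/%04d" % i).encode(): b"v" for i in range(25)}


def fake_range(key, range_end, limit):
    """Hypothetical stand-in for a range call.

    Returns (keys, more) for keys in [key, range_end), capped at `limit`.
    """
    keys = sorted(k for k in STORE if key <= k < range_end)
    return keys[:limit], len(keys) > limit


def iter_keys(range_fn, start, range_end, page_size=10):
    """Yield all keys in [start, range_end) one page at a time."""
    while True:
        keys, more = range_fn(start, range_end, page_size)
        for k in keys:
            yield k
        if not more or not keys:
            break
        # Smallest key strictly after the last one returned.
        start = keys[-1] + b"\x00"


# The end of a prefix range is the prefix with its last byte incremented:
# "/" (0x2f) becomes "0" (0x30), so "/k/" ends at "/k0".
all_keys = list(iter_keys(fake_range, b"/k/", b"/k0"))
print(len(all_keys))
```

The page size would need tuning so that each response, including values if they are requested, fits within the 4194304-byte limit shown in the error.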
The command documentation is really nice; if all else fails, I can read the code (which is well documented). What's less documented is the return values and the types of the return values... could someone take a swing at that?
etcd3-py version: 0.1.6
Python version: 3.7.4
Operating System: centos 7
I found that when using the "Watcher() API", a single CPU reaches 100% usage. I don't know how to optimize this. Would something like a process pool help?
My code is simple, as follows:
# sit_watcher.py
def doSomething():
    .....
def WatchAllAgentService():
    client = Client()
    watcher = client.Watcher(all=True, progress_notify=True, prev_kv=True)
    watcher.onEvent('/health/', doSomething)
    watcher.runDaemon()
WatchAllAgentService()
while True:
    pass
> ps auwx|grep python
root 31454 95.6 84.2 3711376 3242992 ? R Nov11 20786:29 python sit_watcher.py
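One thing worth ruling out first is the `while True: pass` loop at the end of the script: a loop like that keeps a core busy regardless of what the watcher does. The following self-contained comparison measures the CPU time used by a spinning wait against a sleeping wait; the function names are illustrative, and it does not measure the watcher itself.

```python
import time


def busy_wait(seconds):
    """Spin until the deadline, as `while True: pass` would."""
    end = time.monotonic() + seconds
    while time.monotonic() < end:
        pass


def sleeping_wait(seconds):
    """Wait until the deadline, yielding the CPU between checks."""
    end = time.monotonic() + seconds
    while time.monotonic() < end:
        time.sleep(0.01)


def cpu_used(fn, seconds):
    """Return the process CPU time consumed while fn(seconds) runs."""
    start = time.process_time()
    fn(seconds)
    return time.process_time() - start


busy_cpu = cpu_used(busy_wait, 0.2)
idle_cpu = cpu_used(sleeping_wait, 0.2)
print("busy: %.3fs  sleeping: %.3fs" % (busy_cpu, idle_cpu))
```

If replacing the final loop with `while True: time.sleep(1)` brings CPU usage down, the watcher was not the main consumer; the memory usage in the ps output would still need separate investigation.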
Is there a way to do a comparison on the count of keys with a prefix inside a transaction? That is, I want to do something like:
txn.compare( txn.key('/prefix/', count_only=True).value < 10)
The goal is to only allow the txn.success() bit to run if there are fewer than ten items with the prefix /prefix/. I believe the go client can do this using WithPrefix() and CountOnly() but I can't find where/if something similar is available here.
Reading the code, it looks like (thanks to 1. the GIL and 2. Watcher.dispatch_event() not being async) it should be safe to call onEvent() on a running watcher, correct? Is that accident or intent?
If accident, it's definitely a useful function (consider: if watching all events, being able to add filters without having to stop and restart the watcher would be nice) but needs a little more support eg. a way to remove filters/callbacks.
If intent, that's great, but still need a way to remove filters/callbacks.
Either way, are you interested in a PR to make filters/callbacks removable?
Hi. Great looking python module. I'm trying to work out client etcdv3 usage with auth (certs + users) but when I try putting some keys in, I receive an error claiming my username is empty.
I wrote up a bit of automated etcd scripting that creates an initial config, including users, basic key/dir, and sets up server/client auth. I don't know how much you'll want, but I'll start with my Python, and if the usage looks good, my scripts might help reproduce the problem.
import sys
from etcd3 import Client
ETCD_CLUSTER_PATH='/_cluster'
ETCD_NODE_PATH=ETCD_CLUSTER_PATH+'/${id}'
ETCD_PATH_SEP='/'
ETCD_KEY_MANIFEST='manifest'
ETCD_KEY_EFFECTIVE='effective'
ETCD_CLIENT_PROTO='https'
ETCD_CLIENT_HOST='127.0.0.1'
ETCD_CLIENT_USER='root'
ETCD_CLIENT_PASS='password'
ETCD_CLIENT_PORT=2379
ETCD_CLIENT_CERTS='/var/test/etcd/certs/'
ETCD_STATE_TTL=60
if __name__ == '__main__':
    try:
        cli = Client(
            host=ETCD_CLIENT_HOST,
            port=ETCD_CLIENT_PORT,
            protocol=ETCD_CLIENT_PROTO,
            username=ETCD_CLIENT_USER,
            password=ETCD_CLIENT_PASS,
            cert=(ETCD_CLIENT_CERTS+'client.pem', ETCD_CLIENT_CERTS+'client-key.pem'),
            verify=ETCD_CLIENT_CERTS+'ca.pem')
        cli.version()
        cli.put("/test","hi")
    except Exception as e:
        # "except ... as" and sys.stderr.write work on both Python 2.6+ and 3
        sys.stderr.write('Error: ' + str(e) + '\n')
Then I run it:
$ python etcd3_test.py
Error: <ErrUserEmpty error:'etcdserver: user name is empty', code:3>
Are you assigning/passing the username argument through to the gRPC call?
Here's my etcd3_init.sh script:
#!/bin/bash
echo "etcd init script"
if [ "$#" -lt 3 ]; then
    echo "usage: `basename "$0"` <cert path> <connect ip> <connect port> <auth (opt)>"
    echo "example: ./`basename "$0"` /var/test/etcd/certs 127.0.0.1 2379"
    exit
fi
CERTS=$1
IP=$2
PORT=$3
AUTH=0
if [ "$#" -eq 4 ]; then
    AUTH=$4
fi
export ETCDCTL_API=3
ETCDCTL_HOSTS="--cacert=$CERTS/ca.pem --endpoints=https://$IP:$PORT"
ROOT_PASS=$(date +%s | sha256sum | base64 | head -c 32 ; echo)
ROOT_PASS="password"
PUB_PASS=$(date +%s | sha256sum | base64 | head -c 32 ; echo)
PRI_PASS=$(date +%s | sha256sum | base64 | head -c 64 | tail -c 32 ; echo)
etcdctl $ETCDCTL_HOSTS user add public:$PUB_PASS
etcdctl $ETCDCTL_HOSTS user add private:$PRI_PASS
etcdctl $ETCDCTL_HOSTS role add public_role
etcdctl $ETCDCTL_HOSTS role add private_role
etcdctl $ETCDCTL_HOSTS user grant-role public public_role
etcdctl $ETCDCTL_HOSTS user grant-role private private_role
etcdctl $ETCDCTL_HOSTS role grant-permission public_role --prefix=true readwrite /public
etcdctl $ETCDCTL_HOSTS role grant-permission private_role --prefix=true readwrite /public
etcdctl $ETCDCTL_HOSTS role grant-permission private_role --prefix=true readwrite /_cluster
etcdctl $ETCDCTL_HOSTS put /public/init "$(date)"
etcdctl $ETCDCTL_HOSTS get /public/init
etcdctl $ETCDCTL_HOSTS put /_cluster/init "$(date)"
etcdctl $ETCDCTL_HOSTS user add root:$ROOT_PASS
etcdctl $ETCDCTL_HOSTS auth enable
# AUTH stays "0" unless a fourth argument (the output file) was given
if [ "$AUTH" != "0" ]; then
    echo "public:$PUB_PASS" > "$AUTH"
    echo "private:$PRI_PASS" >> "$AUTH"
    echo "root:$ROOT_PASS" >> "$AUTH"
fi
echo "public:$PUB_PASS"
echo "private:$PRI_PASS"
echo "root:$ROOT_PASS"
And here's the etcd.sh script I use to run the server:
#!/bin/bash
echo "etcd operation script"
if [ "$EUID" -ne 0 ]; then
    echo "please run as root"
    exit
fi
if [ "$#" -ne 4 ]; then
    echo "usage: `basename "$0"` <cert path> <data path> <inf> <listen port>"
    echo "example: ./`basename "$0"` /var/test/etcd/certs /var/test/etcd/data eth0 2379"
    exit
fi
DATA=$2
CERTS=$1
INF=$3
#IP=$3
PORT=$4
NEW=true
if [ -d "$DATA" ]; then
    NEW=false
fi
HOST=$(hostname)
IP=$(ip -4 addr show $INF | grep -oP '(?<=inet\s)\d+(\.\d+){3}')
# IP is only known after the lookup above, so report the settings here
echo "using: data=$DATA certs=$CERTS ip=$IP port=$PORT"
if [ $NEW = true ]; then
    rm -rf ./auth
    bash -c "sleep 3; ./etcd_init.sh $CERTS 127.0.0.1 2379 ./auth" &
fi
etcd --enable-v2=false --data-dir $DATA --name $HOST --trusted-ca-file=$CERTS/ca.pem --cert-file=$CERTS/server.pem --key-file=$CERTS/server-key.pem --listen-client-urls=https://127.0.0.1:$PORT,https://$IP:$PORT --advertise-client-urls=https://127.0.0.1:$PORT,https://$IP:$PORT,https://$(hostname):$PORT,https://$(hostname -s):$PORT
Hi, get is a common op in KVAPI; I think it should be added to the KVAPI class.
I was trying to do a (somewhat large - 10e3s of records) delete.
await db.delete_range('prefix',prefix=True)
File "/usr/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
return future.result()
File "/usr/local/lib/python3.7/dist-packages/xgw/cli.py", line 208, in _del
await db.delete_range(key, prefix=prefix)
File "/usr/local/lib/python3.7/dist-packages/etcd3/aio_client.py", line 32, in __modelize
await self.client._raise_for_status(self._resp)
File "/usr/local/lib/python3.7/dist-packages/etcd3/aio_client.py", line 233, in _raise_for_status
raise get_client_error(error, code, status, resp)
etcd3.errors.go_etcd_rpctypes_error.ErrTimeout: <ErrTimeout error:'etcdserver: request timed out', code:14>
Is there a way to increase that timeout? If so, where and how?
I am not able to catch exceptions. How do we handle errors?
from etcd3.errors import Etcd3Exception
try:
    nb_keys = self.client.range(key, count_only=True)
except Etcd3Exception as e:
File "/home/jsfrerot/code/python_envs/symapid/lib/python3.6/site-packages/etcd3/client.py", line 148, in _raise_for_status
raise get_client_error(error, code, status, resp)
etcd3.errors.go_etcd_rpctypes_error.ErrInvalidAuthToken: <ErrInvalidAuthToken error:'etcdserver: invalid auth token', code:16>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "<stdin>", line 2, in <module>
File "/opt/2T/git/symapid/symapid/services/etcd.py", line 52, in key_exists
nb_keys = self.client.range(key, count_only=True)
TypeError: catching classes that do not inherit from BaseException is not allowed
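The final TypeError is what Python 3 raises when the class named in an except clause does not derive from BaseException. It suggests that the name bound to `Etcd3Exception` in that environment is not an exception class, although the cause inside the library is not established here. The behaviour can be reproduced with the standard library alone; the class and function names below are illustrative.

```python
class NotAnException:            # does not derive from BaseException
    pass


class ProperError(Exception):    # derives from Exception
    pass


def try_catch(exc_class):
    """Raise ValueError, try to catch it with exc_class, and report the outcome."""
    try:
        try:
            raise ValueError("boom")
        except exc_class:
            return "caught"
    except TypeError as err:
        return "TypeError: " + str(err)
    except ValueError:
        return "not matched"


bad = try_catch(NotAnException)   # the except clause itself fails
ok = try_catch(ProperError)       # valid class, but it does not match ValueError
wide = try_catch(Exception)       # a broad base class matches

print(bad)
print(ok)
print(wide)
```

As a stopgap, catching `Exception` and inspecting the error object works regardless of how the library's base class is defined, at the cost of also catching unrelated errors.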