wtolson / gnsq
A gevent based python client for the NSQ distributed messaging platform.
Home Page: http://gnsq.readthedocs.org
License: BSD 3-Clause "New" or "Revised" License
So, based on the high-level reader example, it already abstracts the connection to an nsqd:
import gnsq

consumer = gnsq.Consumer('topic', 'channel', 'localhost:4150')

@consumer.on_message.connect
def handler(consumer, message):
    print('got message:', message.body)

consumer.start()
How does authentication work with Consumer? How do I send an auth_secret with gnsq?
See nsqio/nsq#608
Hi Trevor! First of all -- thank you so much for all your hard work on gnsq. It's a lovely little library and for the most part has been absolutely flawless for us.
Unfortunately we're having an issue at the moment with gnsq v0.2.3 (and nsqd v0.3.0) in which it seems like a network blip causes a Reader to lock itself in a ready state of zero and never recover.
We're getting the following error message in the logs:
[172.17.11.122:4150] error requeueing message (NSQSocketError(32, 'Broken pipe'))
This appears to come from line 699 of reader.py, and seems to me to imply that we're having a network blip while attempting a requeue. Thereafter, it seems like RDY never increments again. You can see the issue where we're dealing with this internally at hypothesis/h#2304.
I've tried to recreate the conditions under which this might occur using blockade (see this repo), but as yet haven't succeeded.
I've perused the source code, and I'm guessing this might be the issue you fixed in d193160 (and 6905a14). Does that sound right?
If not, is there any way I can help track this issue down?
Hello, I have some long-running services that use gnsq to read messages from one topic, apply some kind of transformation and then output them onto another topic. Occasionally, the TCP connections to the different nsqd might die. This is not a problem for the Consumer as it reconnects right away and starts to fetch messages again. However, sometimes the Producer tries to write to a closed/dead connection and crashes its greenlet in such a situation. This stalls my service and requires a restart. I think gnsq should really put some error handling in place here to make sure this doesn't happen. I would expect my message to be published via an active connection, or the publish to throw an exception that I can catch myself.
I have attached a log of such a crash.
_gnsq.txt
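If publish raised a catchable exception on a dead connection, as requested above, the caller could recover with a small retry wrapper. The following is a minimal sketch under that assumption: publish_with_retry, FlakyPublisher and the publish callable are hypothetical names, not part of gnsq, and the exception types to retry on would have to match whatever the library actually raises.

```python
import time


def publish_with_retry(publish, topic, data, attempts=3, delay=0.1,
                       retry_on=(OSError,)):
    """Call publish(topic, data), retrying on connection-style errors.

    `publish` is any callable supplied by the caller (hypothetical here).
    Re-raises the last error once all attempts are used up.
    """
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return publish(topic, data)
        except retry_on as error:
            last_error = error
            if attempt < attempts:
                # Linear backoff before the next try.
                time.sleep(delay * attempt)
    raise last_error


class FlakyPublisher:
    """Stand-in publisher that fails a fixed number of times, then succeeds."""

    def __init__(self, failures):
        self.failures = failures
        self.calls = 0

    def __call__(self, topic, data):
        self.calls += 1
        if self.calls <= self.failures:
            raise BrokenPipeError(32, 'Broken pipe')
        return 'OK'
```

In a gevent-based service the sleep would need to be cooperative (for example gevent.sleep, or time.sleep after monkey patching) so that the retry does not block other greenlets.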
Hi,
I have a high-QPS nsqd producer cluster running on dozens of machines, at 500k messages per second.
A single CPython process running a Reader maxes out at 3k messages per second no matter what I try.
In the current version of gnsq, the query_lookupd and query_nsqd operations run in the same CPython process, and gevent can only max out a single core anyway.
If I duplicate the setup into, say, 20 processes, the workload can be spread across many cores and consumption might keep up. The problem is the increased number of NSQ connections to each producer: a single process is slow, yet it still has to connect to every nsqd discovered in lookupd.
This is a waste of TCP connections. With multiple consumer channels, the producer nsqd would soon have over 10k connections.
So, to save open connections, I am thinking of a new master-worker architecture:
... lookupd and manage worker processes.
... nsqd connections and consume them
... nsqd producer. The processing speed can scale up by the number of workers.
Is the idea valid? Could it fit the goals of the gnsq project?
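The connection savings behind this proposal can be illustrated with a small sketch. It assumes a master that has already obtained the list of nsqd addresses (for example from lookupd) and hands each worker a disjoint subset. The function names and the address list are hypothetical, and no gnsq API is used.

```python
def partition_addresses(addresses, n_workers):
    """Split nsqd addresses round-robin into n_workers disjoint groups."""
    if n_workers < 1:
        raise ValueError('n_workers must be at least 1')
    groups = [[] for _ in range(n_workers)]
    for index, address in enumerate(sorted(addresses)):
        groups[index % n_workers].append(address)
    return groups


def connections_per_nsqd(n_workers, partitioned):
    """Connections each nsqd sees from one consumer channel.

    Without partitioning every worker connects to every nsqd, so each
    nsqd sees n_workers connections; with partitioning it sees one.
    """
    return 1 if partitioned else n_workers


# Hypothetical cluster: 6 nsqd nodes shared by 3 worker processes.
nsqd_addresses = ['10.0.0.%d:4150' % i for i in range(1, 7)]
groups = partition_addresses(nsqd_addresses, 3)
for worker_id, group in enumerate(groups):
    print('worker', worker_id, '->', group)
```

The trade-off is that each worker then carries all the traffic of its own nsqd nodes, so throughput only scales evenly if messages are spread evenly across those nodes.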
I want my program to act as both an NSQ client and a service.
import gnsq
from flask import Flask

def nsq():
    reader = gnsq.Reader('topic', 'channel', '192.168.0.97:4150', '192.168.0.97:4161')

    @reader.on_message.connect
    def handler(reader, message):
        print('got message:', message.body)

    reader.start()

nsq()

app = Flask(__name__)
count = 0

@app.route('/')
def hello():
    # count is a module-level variable; without `global`, count += 1
    # raises UnboundLocalError.
    global count
    count += 1
    return 'Hello World! I have been seen {} times.\n'.format(count)

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000, debug=False)
But when I run this, the service never starts serving; the program is stuck in the NSQ reader.
Is it OK to run nsq() in a thread?
import threading

th = threading.Thread(target=nsq)
th.daemon = True
th.start()
When I use TLS with gnsq, the code raises an exception.
The traceback is:
Traceback (most recent call last):
  File "test.py", line 10, in <module>
    producer.start()
  File "/usr/local/lib/python3.5/dist-packages/gnsq/producer.py", line 112, in start
    self.connect_to_nsqd(address, int(port))
  File "/usr/local/lib/python3.5/dist-packages/gnsq/producer.py", line 154, in connect_to_nsqd
    conn.identify()
  File "/usr/local/lib/python3.5/dist-packages/gnsq/nsqd.py", line 381, in identify
    self.upgrade_to_tls()
  File "/usr/local/lib/python3.5/dist-packages/gnsq/nsqd.py", line 319, in upgrade_to_tls
    self.check_ok()
  File "/usr/local/lib/python3.5/dist-packages/gnsq/nsqd.py", line 307, in check_ok
    frame, data = self.read_response()
  File "/usr/local/lib/python3.5/dist-packages/gnsq/nsqd.py", line 257, in read_response
    processed_data = frame_handler(data)
  File "/usr/local/lib/python3.5/dist-packages/gnsq/nsqd.py", line 265, in handle_response
    self.on_response.send(self, response=data)
  File "/usr/local/lib/python3.5/dist-packages/blinker/base.py", line 267, in send
    for receiver in self.receivers_for(sender)]
  File "/usr/local/lib/python3.5/dist-packages/blinker/base.py", line 267, in <listcomp>
    for receiver in self.receivers_for(sender)]
  File "/usr/local/lib/python3.5/dist-packages/gnsq/producer.py", line 217, in handle_response
    result = self._response_queues[conn].popleft()
It seems that the entry for conn in self._response_queues is missing: self._response_queues is {} when the last line of the traceback runs.
nsqd (v1.1.0) is started with the following command:
./nsqd --lookupd-tcp-address=127.0.0.1:4160 -tls-cert=./certs/server.pem -tls-key=./certs/server.key -tls-root-ca-file=./certs/ca.pem -tls-required=true
where the certs folder was generated with the official nsq command.
The Python code that connects to nsqd is as follows:
from gnsq import Producer

tls_options = {
    'keyfile': "./certs/server.key",
    'certfile': "./certs/server.pem",
}
producer = Producer('127.0.0.1:4150', tls_v1=True, tls_options=tls_options)
producer.start()
producer.publish('topic', b'hello world')
Because of these lines in gnsq/nsqd.py:
def _validate_http_mpub(self, message):
    if '\n' not in message:
        return message
if I pass a message like b'xxx', there will be a type error:
  File "/home/kongyifei/repos/crawler/.venv/lib/python3.6/site-packages/gnsq/nsqd.py", line 503, in multipublish_http
    body='\n'.join(self._validate_http_mpub(m) for m in messages)
  File "/home/kongyifei/repos/crawler/.venv/lib/python3.6/site-packages/gnsq/nsqd.py", line 503, in <genexpr>
    body='\n'.join(self._validate_http_mpub(m) for m in messages)
  File "/home/kongyifei/repos/crawler/.venv/lib/python3.6/site-packages/gnsq/nsqd.py", line 491, in _validate_http_mpub
    if '\n' not in message:
TypeError: a bytes-like object is required, not 'str'
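The TypeError comes from testing a str needle against a bytes message on Python 3. One possible fix, sketched below, picks the newline literal that matches the message type. This is an illustration only: validate_mpub_message and join_mpub_body are hypothetical standalone functions, not the patch applied in gnsq, and raising ValueError on an embedded newline is an assumption about the intended behavior.

```python
def validate_mpub_message(message):
    """Return message unchanged if it contains no newline, else raise.

    Works for both bytes and str by choosing a newline of the same type.
    """
    newline = b'\n' if isinstance(message, bytes) else '\n'
    if newline not in message:
        return message
    raise ValueError('newlines are not allowed in http multipublish')


def join_mpub_body(messages):
    """Join validated bytes messages with newlines (bytes in, bytes out)."""
    return b'\n'.join(validate_mpub_message(m) for m in messages)
```

Note that the join in the traceback has the same str/bytes mismatch, so both the check and the separator need to agree with the message type.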
I'm trying to read messages from nsqd in a simple Flask server. I followed the docs for "Consumer: high-level message reader".
server.py:
from flask import Flask
import gnsq

consumer = gnsq.Consumer('test_topic', 'test_channel', '0.0.0.0:32807')

@consumer.on_message.connect
def handler(consumer, message):
    print('got message:', message.body)

consumer.start()

app = Flask(__name__)

@app.route("/")
def hello():
    return "Flasky"
and all I get when running the server is:
[0.0.0.0:32807] connection failed (NSQSocketError(57, 'Socket is not connected'))
I have tried every port I know of, but no luck. Kindly help?
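One way to narrow this down is to check whether anything accepts TCP connections on the address at all, independent of gnsq. The helper below is a minimal sketch using only the standard library; can_connect is a hypothetical name. Note that 0.0.0.0 is normally a bind address rather than a destination, so testing 127.0.0.1 or the host's real address is usually more meaningful. If nsqd runs in a container, the port to test is whichever host port is mapped to nsqd's TCP port (4150 by default), not the one mapped to its HTTP port (4151).

```python
import socket


def can_connect(host, port, timeout=2.0):
    """Return True if a TCP connection to (host, port) can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts.
        return False


# Example (addresses are placeholders for your own setup):
#   can_connect('127.0.0.1', 4150)
#   can_connect('127.0.0.1', 32807)
```

A True result only shows that something is listening; it does not confirm that the listener speaks the NSQ TCP protocol.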
>>> import gnsq
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/cevin/a/venv/lib/python3.7/site-packages/gnsq/__init__.py", line 4, in <module>
from .reader import Reader
File "/Users/cevin/a/venv/lib/python3.7/site-packages/gnsq/reader.py", line 110
async=False,
^
SyntaxError: invalid syntax
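This SyntaxError is expected on Python 3.7 and later, where async became a reserved keyword and can no longer be used as a parameter name, so a gnsq version whose source contains async=False cannot be imported there. The check below is a small standard-library illustration; the send signatures in the strings are made-up examples modeled on the traceback, not gnsq's actual source.

```python
import keyword
import sys


def is_valid_source(source):
    """Return True if the source compiles on the running interpreter."""
    try:
        compile(source, '<example>', 'exec')
        return True
    except SyntaxError:
        return False


# Made-up signatures: one uses the reserved word, one a renamed parameter.
OLD_STYLE = 'def send(data, async=False):\n    pass\n'
RENAMED = 'def send(data, async_=False):\n    pass\n'

print(sys.version_info[:2], keyword.iskeyword('async'))
print('old style compiles:', is_valid_source(OLD_STYLE))
print('renamed compiles:', is_valid_source(RENAMED))
```

Running the affected gnsq version on Python 3.6 or earlier, or moving to a gnsq release that no longer uses the name (if one is available for your setup), should avoid the error.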
See nsqio/nsq#293
I am going to lump these together:
... snappy-c.h build error.
... requirements.txt
# Install gnsq itself
-e .
Potential bug. See nsqio/pynsq#129
See nsqio/pynsq#135
I'm running an NSQ server with one topic and four gnsq readers, each with its own channel, on different systems.
When I look at nsqadmin I see two channels with one connection, one channel with five connections and one with six.
Why are there five connections from the same gnsq reader?
Is this normal behavior?
import gnsq
producer = gnsq.Producer('0.0.0.0:4150')
producer.start()
producer.publish('topic', 'channel', 'testpub!!!')
Is there a specific message format to follow when publishing a message?
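As far as the NSQ TCP protocol is concerned, a published message is an opaque byte string addressed to a topic. Channels only exist on the consuming side, so a publish carries a topic and a body but no channel. The sketch below builds a PUB command by hand to show this; encode_pub is a hypothetical helper written for illustration, not gnsq's implementation, and JSON is just one possible choice of body encoding.

```python
import json
import struct


def encode_pub(topic, body):
    """Build an NSQ PUB command: header line, 4-byte big-endian size, body."""
    if isinstance(body, str):
        body = body.encode('utf-8')  # the wire format carries raw bytes
    header = b'PUB ' + topic.encode('ascii') + b'\n'
    return header + struct.pack('>I', len(body)) + body


# Any serialization works as long as producers and consumers agree on it.
payload = json.dumps({'event': 'testpub', 'count': 1}).encode('utf-8')
frame = encode_pub('topic', payload)
```

With a client library, the same idea usually reduces to passing the topic and a bytes body to the publish call.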
When I connect to nsqd using gnsq.Nsqd, 60 seconds after the connection I see the following in the nsqd logs:
[nsqd] 2016/10/28 12:01:08.655200 PROTOCOL(V2): [127.0.0.1:53326] exiting ioloop
[nsqd] 2016/10/28 12:01:08.655677 ERROR: client(127.0.0.1:53326) - failed to read command - read tcp 127.0.0.1:4150->127.0.0.1:53326: i/o timeout
[nsqd] 2016/10/28 12:01:08.655987 PROTOCOL(V2): [127.0.0.1:53326] exiting messagePump
After that, the next message sent using publish_tcp doesn't reach nsqd, and no error is raised. However, the second message I send through publish_tcp raises gnsq.errors.NSQSocketError: [Errno 107] Socket is not connected. I think this error should be raised on the first message, not the second.
0.3.3
$ uname -a
Linux ant-Dell-n411z 3.13.0-93-generic #140-Ubuntu SMP Mon Jul 18 21:21:05 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux
$ lsb_release -a
Distributor ID: Ubuntu
Description: Ubuntu 14.04.4 LTS
Release: 14.04
Codename: trusty
I've noticed this happening on nsqd v0.3.8 (built w/go1.6.2) and nsqd v0.2.30 (built w/go1.3).
from gevent import monkey; monkey.patch_all()
import gnsq
import gevent

def on_error(reader, error):
    print "got error %s" % error

def on_close(reader):
    print "closed..."

conn = gnsq.Nsqd(address="127.0.0.1", tcp_port=4150, http_port=4151)
conn.ping()
conn.connect()
conn.on_error.connect(on_error)
conn.on_close.connect(on_close)
try:
    print "publising message 1"
    conn.publish_tcp("TestTopic", "Did you get message 1?")
    print "successfully published message 1"
    # sleep for just over one minute in the main thread
    print "sleeping for 1 minute and 2 seconds"
    gevent.sleep(62)
    print "awake!"
    print "publishing message 2"
    conn.publish_tcp("TestTopic", "Did you get message 2?")
    print "successfully published message 2"
    print "publishing message 3"
    conn.publish_tcp("TestTopic", "Did you get message 3?")
    print "successfully published message 3"
finally:
    conn.close()
$1/nsqd -verbose -verbose &
sleep 2
$1/nsq_tail -channel default -nsqd-tcp-address 127.0.0.1:4150 -topic TestTopic &
sleep 2
python test_gnsq.py
curl http://localhost:4151/stats?format=json | python -m json.tool
echo ""
sleep 3
jobs -p | xargs kill -9
run.sh requires a path to the nsqd binary directory (I tested it on different nsqd versions):
wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-0.3.8.linux-amd64.go1.6.2.tar.gz
tar -xvzf nsq-0.3.8.linux-amd64.go1.6.2.tar.gz
bash run.sh nsq-0.3.8.linux-amd64.go1.6.2/bin/
Alternatively, you could run bash run.sh /usr/bin if the nsq binaries are present there.
The entire output is available here: https://gist.github.com/algrebe/5d715eb031f110905b9b2335e0ca52fe
Here are the relevant portions:
$ bash run.sh nsq-0.3.8.linux-amd64.go1.6.2/bin/
# nsq_tail connects
[nsqd] 2016/10/28 12:00:06.256774 TCP: new client(127.0.0.1:53323)
[nsqd] 2016/10/28 12:00:06.256930 CLIENT(127.0.0.1:53323): desired protocol magic ' V2'
[nsqd] 2016/10/28 12:00:06.259701 PROTOCOL(V2): [127.0.0.1:53323] [SUB TestTopic default]
[nsqd] 2016/10/28 12:00:06.260200 TOPIC(TestTopic): created
[nsqd] 2016/10/28 12:00:06.260692 TOPIC(TestTopic): new channel(default)
[nsqd] 2016/10/28 12:00:06.261209 PROTOCOL(V2): [127.0.0.1:53323] [RDY 200]
# python gnsq.Nsqd connects
[nsqd] 2016/10/28 12:00:08.652703 TCP: new client(127.0.0.1:53326)
[nsqd] 2016/10/28 12:00:08.653426 CLIENT(127.0.0.1:53326): desired protocol magic ' V2'
# python script - gnsq.Nsqd sends message and then sleeps
publising message 1
successfully published message 1
sleeping for 1 minute and 2 seconds
# nsqd sends this message to nsq_tail
[nsqd] 2016/10/28 12:00:08.654558 PROTOCOL(V2): [127.0.0.1:53326] [PUB TestTopic]
[nsqd] 2016/10/28 12:00:08.655044 PROTOCOL(V2): writing msg(070155f1855b1000) to client(127.0.0.1:53323) - Did you get message 1?
[nsqd] 2016/10/28 12:00:08.655153 [127.0.0.1:53323] state rdy: 200 inflt: 1
[nsqd] 2016/10/28 12:00:08.655323 [127.0.0.1:53323] state rdy: 200 inflt: 1
# nsq_tail receives message
Did you get message 1?
[nsqd] 2016/10/28 12:00:08.655820 PROTOCOL(V2): [127.0.0.1:53323] [FIN 070155f1855b1000]
# after one minute, this message comes w.r.t gnsq.Nsqd client
[nsqd] 2016/10/28 12:01:08.655200 PROTOCOL(V2): [127.0.0.1:53326] exiting ioloop
[nsqd] 2016/10/28 12:01:08.655677 ERROR: client(127.0.0.1:53326) - failed to read command - read tcp 127.0.0.1:4150->127.0.0.1:53326: i/o timeout
[nsqd] 2016/10/28 12:01:08.655987 PROTOCOL(V2): [127.0.0.1:53326] exiting messagePump
# python script - out of sleep
awake!
publishing message 2
successfully published message 2
# NOTE no error was generated here; it thinks the message was sent successfully when it hasn't been!
publishing message 3
# NOTE only on publishing this message does it raise the exception and close the socket
closed...
Traceback (most recent call last):
File "test_gnsq.py", line 37, in <module>
conn.close()
File ".virtualenvs/gnsq/local/lib/python2.7/site-packages/gnsq/nsqd.py", line 470, in close
self.send(nsq.close())
File ".virtualenvs/gnsq/local/lib/python2.7/site-packages/gnsq/nsqd.py", line 239, in send
return self.stream.send(data, async)
File ".virtualenvs/gnsq/local/lib/python2.7/site-packages/gnsq/stream/stream.py", line 90, in send
self.ensure_connection()
File ".virtualenvs/gnsq/local/lib/python2.7/site-packages/gnsq/stream/stream.py", line 49, in ensure_connection
raise NSQSocketError(ENOTCONN, 'Socket is not connected')
gnsq.errors.NSQSocketError: [Errno 107] Socket is not connected
# examining the curl output shows us that nsqd only received one message
{
    "data": {
        "start_time": 1477636204,
        "topics": [
            {
                "channels": [
                    {
                        "channel_name": "default",
                        "clients": [
                            {
                                "deflate": false,
                                "finish_count": 1,
                                "in_flight_count": 0,
                                "message_count": 1,
                                "remote_address": "127.0.0.1:53323",
                                "state": 3,
                                "user_agent": "nsq_tail/0.3.8 go-nsq/1.0.5",
                                "version": "V2"
                            }
                        ],
                        "deferred_count": 0,
                        "depth": 0,
                        "e2e_processing_latency": {
                            "count": 0,
                            "percentiles": null
                        },
                        "in_flight_count": 0,
                        "message_count": 1,
                        "paused": false,
                        "requeue_count": 0,
                        "timeout_count": 0
                    }
                ],
                "depth": 0,
                "e2e_processing_latency": {
                    "count": 0,
                    "percentiles": null
                },
                "message_count": 1,
                "paused": false,
                "topic_name": "TestTopic"
            }
        ],
        "version": "0.3.8"
    },
    "status_code": 200,
    "status_txt": "OK"
}
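The 60-second gap between connecting and the i/o timeout in the log above is consistent with nsqd dropping a client that does not answer heartbeats: in the NSQ TCP protocol the server periodically sends a _heartbeat_ response and expects a NOP in return. Whether that is what happens in this report is an assumption. The sketch below shows only the framing involved; parse_frame, reply_for and make_frame are hypothetical helpers, not gnsq functions.

```python
import struct

FRAME_TYPE_RESPONSE = 0
FRAME_TYPE_ERROR = 1
FRAME_TYPE_MESSAGE = 2
HEARTBEAT = b'_heartbeat_'


def parse_frame(buffer):
    """Parse one frame: 4-byte size, 4-byte frame type, then the data.

    Returns (frame_type, data, remaining_bytes), or None if the buffer
    does not yet hold a complete frame.
    """
    if len(buffer) < 4:
        return None
    (size,) = struct.unpack('>I', buffer[:4])
    if len(buffer) < 4 + size:
        return None
    (frame_type,) = struct.unpack('>i', buffer[4:8])
    return frame_type, buffer[8:4 + size], buffer[4 + size:]


def reply_for(frame_type, data):
    """Return the bytes a client should send back for a frame, if any."""
    if frame_type == FRAME_TYPE_RESPONSE and data == HEARTBEAT:
        return b'NOP\n'
    return None


def make_frame(frame_type, data):
    """Build a frame the way the server would (used here for the demo)."""
    return struct.pack('>Ii', 4 + len(data), frame_type) + data
```

A client that keeps reading from the socket and answers heartbeats this way should stay connected while idle, and it learns about a closed connection when the read fails rather than on a later write.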
Hi, please add support for setting the log level, and for discarding log output completely.
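Assuming gnsq logs through Python's standard logging module under logger names beginning with gnsq (an assumption worth checking against the installed version), the log level can already be adjusted from application code. A minimal sketch, where silence is a hypothetical helper:

```python
import logging

# Assumption: the library's loggers live under the 'gnsq' namespace.
gnsq_logger = logging.getLogger('gnsq')

# Option 1: only show warnings and above from the library.
gnsq_logger.setLevel(logging.WARNING)


def silence(logger):
    """Discard everything the logger (and its children) would emit."""
    logger.handlers = [logging.NullHandler()]
    logger.propagate = False
    # Above CRITICAL, so no standard level passes the check.
    logger.setLevel(logging.CRITICAL + 1)


# Option 2: drop the library's log output entirely.
# silence(gnsq_logger)
```

Child loggers such as a hypothetical gnsq.reader inherit the level set on the parent unless they have been given their own.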