gnsq's People

Contributors

fmoor, ianpreston, jabbawookiees, lambdaq, mbroome, venukumar-ch, wtolson

gnsq's Issues

How to send an auth_secret as a consumer?

The high-level reader already abstracts the connection to nsqd:

from gnsq import Consumer

consumer = Consumer('topic', 'channel', 'localhost:4150')

@consumer.on_message.connect
def handler(consumer, message):
    print 'got message:', message.body

consumer.start()

How does authentication work with Consumer? How do I send an auth_secret with gnsq?
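
For context, NSQ authentication happens at the protocol level: the client sends an AUTH command whose body is the secret, prefixed by a 4-byte big-endian size. The sketch below shows only that wire framing, using the standard library; `build_auth_command` is a hypothetical helper and not part of gnsq. As far as I know, gnsq's connection class takes an `auth_secret` option and Consumer forwards extra keyword arguments to its connections, but treat that as an assumption to check against the docs for your version.

```python
import struct

def build_auth_command(secret):
    # Hypothetical helper: frames an NSQ AUTH command as
    # the command line, a 4-byte big-endian body size, then the secret.
    if isinstance(secret, str):
        secret = secret.encode('utf-8')
    return b'AUTH\n' + struct.pack('>l', len(secret)) + secret

frame = build_auth_command('secret')
print(repr(frame))
```

If the keyword-argument assumption holds, the high-level call would look like `Consumer('topic', 'channel', 'localhost:4150', auth_secret='secret')`, with the library sending this command on each connection.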

Problem with RDY locking to zero

Hi Trevor! First of all -- thank you so much for all your hard work on gnsq. It's a lovely little library and for the most part has been absolutely flawless for us.

Unfortunately we're having an issue at the moment with gnsq v0.2.3 (and nsqd v0.3.0) in which it seems like a network blip causes a Reader to lock itself in a ready state of zero and never recover.

We're getting the following error message in the logs:

[172.17.11.122:4150] error requeueing message (NSQSocketError(32, 'Broken pipe'))

This appears to come from line 699 of reader.py, and seems to me to imply that we're having a network blip while attempting a requeue. Thereafter, it seems like RDY never increments again. You can see the issue where we're dealing with this internally at hypothesis/h#2304.

I've tried to recreate the conditions under which this might occur using blockade (see this repo), but as yet haven't succeeded.

I've perused the source code, and I'm guessing this might be the issue you fixed in d193160 (and 6905a14). Does that sound right?

If not, is there any way I can help track this issue down?

Producer writes to dead connection and crashes

Hello, I have some long-running services that use gnsq to read messages from one topic, apply some kind of transformation and then output them onto another topic. Occasionally, the TCP connections to the different nsqd might die. This is not a problem for the Consumer as it reconnects right away and starts to fetch messages again. However, sometimes the Producer tries to write to a closed/dead connection and crashes its greenlet in such a situation. This stalls my service and requires a restart. I think gnsq should really put some error handling in place here to make sure this doesn't happen. I would expect my message to be published via an active connection, or the publish to throw an exception that I can catch myself.
I have attached a log of such a crash.
_gnsq.txt
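
Until the library handles this itself, one possible workaround is to wrap the publish call so that socket-level failures are retried and, once retries run out, surface as an exception the caller can catch. This is a minimal sketch that assumes the publish call raises `OSError` (or a subclass) on a dead connection; `publish_with_retry` and `flaky_publish` are hypothetical names, and with gnsq you would probably pass its own socket error class in `retry_on` instead.

```python
import time

def publish_with_retry(publish, topic, body, attempts=3, delay=0.1,
                       retry_on=(OSError,)):
    """Call publish(topic, body); retry on connection errors, then re-raise."""
    for attempt in range(1, attempts + 1):
        try:
            return publish(topic, body)
        except retry_on:
            if attempt == attempts:
                raise  # out of retries: let the caller handle it
            time.sleep(delay * attempt)  # simple linear backoff

# Stand-in for a producer whose connection is dead for the first two calls.
calls = []

def flaky_publish(topic, body):
    calls.append(body)
    if len(calls) < 3:
        raise BrokenPipeError(32, 'Broken pipe')
    return 'OK'

result = publish_with_retry(flaky_publish, 'topic', b'hello', delay=0.01)
print(result, len(calls))
```

Note that a wrapper like this only helps if the error is raised in the calling greenlet; if the failure happens in a background greenlet owned by the Producer, it would still need to be handled inside the library.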

Multiprocessing to spread out the reader load?

Hi,

I run a high-QPS nsqd cluster of dozens of machines producing about 500k messages per second.

A single CPython Reader process maxes out at about 3k messages per second no matter what I try.

In the current version of gnsq, the query_lookupd and query_nsqd operations run in the same CPython process, and gevent can only saturate a single core anyway.

If I duplicate the setup into, say, 20 processes, the workload is spread across many cores and consumption might keep up. The problem is the increased number of connections to each nsqd: every process, however slow, still has to connect to all the nsqd instances discovered through lookupd.

This is a waste of TCP connections. With multiple consumer channels, each nsqd would soon have over 10k connections.

So, to reduce the number of open connections, I am thinking of a new master-worker architecture:

  • The master only polls lookupd periodically and manages the worker processes.
  • When worker processes are spawned, they create the nsqd connections and consume from them.
  • Each worker runs in its own CPython process and establishes only one TCP connection, to one nsqd. Processing speed scales with the number of workers.

Is the idea valid? Would it fit the goals of the gnsq project?
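
To make the proposal concrete, here is a rough sketch of the assignment step using only the standard library: the master splits the discovered nsqd addresses across worker processes, so each address is handled by exactly one worker. `assign_addresses` and `worker` are hypothetical names, the hard-coded address list stands in for a lookupd query, and the worker body is a placeholder where a gnsq reader would run.

```python
import multiprocessing

def assign_addresses(addresses, num_workers):
    """Round-robin nsqd addresses over workers; returns one list per worker."""
    buckets = [[] for _ in range(num_workers)]
    for i, address in enumerate(sorted(addresses)):
        buckets[i % num_workers].append(address)
    return buckets

def worker(addresses, results):
    # Placeholder: a real worker would start a reader connected only to
    # `addresses` and process messages here.
    results.put((multiprocessing.current_process().name, addresses))

if __name__ == '__main__':
    # Stand-in for the addresses a master would get from lookupd.
    discovered = ['10.0.0.1:4150', '10.0.0.2:4150', '10.0.0.3:4150']
    results = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=worker, args=(bucket, results))
             for bucket in assign_addresses(discovered, 3)]
    for p in procs:
        p.start()
    for _ in procs:
        print(results.get())
    for p in procs:
        p.join()
```

One trade-off of this layout: a worker tied to a single nsqd sits idle when that nsqd has no traffic for the topic, so the master would likely need to rebalance as lookupd results change.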

gnsq with service

I want my program to act as both an NSQ client and a web service.

import gnsq
from flask import Flask

def nsq():
    reader = gnsq.Reader('topic', 'channel', '192.168.0.97:4150', '192.168.0.97:4161')

    @reader.on_message.connect
    def handler(reader, message):
        print('got message:', message.body)

    reader.start()

nsq()

app = Flask(__name__)
count = 0

@app.route('/')
def hello():
    global count  # count is module-level; without this, += raises UnboundLocalError
    count += 1
    return 'Hello World! I have been seen {} times.\n'.format(count)

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000, debug=False)

But when I run this, the Flask service never starts: the program gets stuck in the nsq reader.
Is it OK to run nsq() in a thread?

import threading

th = threading.Thread(target=nsq)
th.daemon = True
th.start()
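
The thread approach can be sketched with the standard library alone. Below, `blocking_reader` is a hypothetical stand-in for nsq() (a call that never returns), and a queue stands in for the NSQ topic; the point is only that a daemon thread leaves the main thread free to start the web server. Since gnsq is built on gevent, mixing it with OS threads may need extra care, and a non-blocking start in a greenlet (I believe `start()` accepts a `block` argument, but please verify) may be the better fit.

```python
import queue
import threading

messages = queue.Queue()      # stand-in for the NSQ topic
seen = []                     # bodies handled so far
seen_lock = threading.Lock()  # protects `seen` across threads

def blocking_reader():
    """Stand-in for nsq(): blocks forever, handling messages as they arrive."""
    while True:
        body = messages.get()
        with seen_lock:
            seen.append(body)
        messages.task_done()

# Daemon thread: it will not keep the process alive once the main thread exits.
th = threading.Thread(target=blocking_reader)
th.daemon = True
th.start()

# The main thread stays free; this is where app.run(...) would go.
messages.put(b'hello')
messages.join()               # wait until the reader has handled the message
print(seen)
```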

TLS certification failed

When I use gnsq's TLS support, the code raises an exception.
The traceback is:

Traceback (most recent call last):
  File "test.py", line 10, in <module>
    producer.start()
  File "/usr/local/lib/python3.5/dist-packages/gnsq/producer.py", line 112, in start
    self.connect_to_nsqd(address, int(port))
  File "/usr/local/lib/python3.5/dist-packages/gnsq/producer.py", line 154, in connect_to_nsqd
    conn.identify()
  File "/usr/local/lib/python3.5/dist-packages/gnsq/nsqd.py", line 381, in identify
    self.upgrade_to_tls()
  File "/usr/local/lib/python3.5/dist-packages/gnsq/nsqd.py", line 319, in upgrade_to_tls
    self.check_ok()
  File "/usr/local/lib/python3.5/dist-packages/gnsq/nsqd.py", line 307, in check_ok
    frame, data = self.read_response()
  File "/usr/local/lib/python3.5/dist-packages/gnsq/nsqd.py", line 257, in read_response
    processed_data = frame_handler(data)
  File "/usr/local/lib/python3.5/dist-packages/gnsq/nsqd.py", line 265, in handle_response
    self.on_response.send(self, response=data)
  File "/usr/local/lib/python3.5/dist-packages/blinker/base.py", line 267, in send
    for receiver in self.receivers_for(sender)]
  File "/usr/local/lib/python3.5/dist-packages/blinker/base.py", line 267, in <listcomp>
    for receiver in self.receivers_for(sender)]
  File "/usr/local/lib/python3.5/dist-packages/gnsq/producer.py", line 217, in handle_response
    result = self._response_queues[conn].popleft()

It seems that the entry for conn in self._response_queues is missing: self._response_queues is {} when the last line of the traceback runs.

nsqd (v1.1.0) is started with the following command:

./nsqd --lookupd-tcp-address=127.0.0.1:4160 -tls-cert=./certs/server.pem -tls-key=./certs/server.key -tls-root-ca-file=./certs/ca.pem -tls-required=true

where the certs folder was generated with the official nsq command.

The Python code that connects to nsqd is as follows:

from gnsq import Producer

tls_options = {
    'keyfile': "./certs/server.key",
    'certfile': "./certs/server.pem",
}

producer = Producer('127.0.0.1:4150', tls_v1=True, tls_options=tls_options)
producer.start()
producer.publish('topic', b'hello world')

multipublish_http only supports str, not bytes

This is caused by these lines in gnsq/nsqd.py:

def _validate_http_mpub(self, message):
    if '\n' not in message:
        return message

If I pass a bytes message like b'xxx', a TypeError is raised:

  File "/home/kongyifei/repos/crawler/.venv/lib/python3.6/site-packages/gnsq/nsqd.py", line 503, in multipublish_http
    body='\n'.join(self._validate_http_mpub(m) for m in messages)
  File "/home/kongyifei/repos/crawler/.venv/lib/python3.6/site-packages/gnsq/nsqd.py", line 503, in <genexpr>
    body='\n'.join(self._validate_http_mpub(m) for m in messages)
  File "/home/kongyifei/repos/crawler/.venv/lib/python3.6/site-packages/gnsq/nsqd.py", line 491, in _validate_http_mpub
    if '\n' not in message:
TypeError: a bytes-like object is required, not 'str'
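
The check fails because `'\n' not in message` looks for a str inside a bytes object. A possible fix, sketched here as standalone functions rather than gnsq's actual methods, is to normalize every message to bytes first and join with a bytes newline. The function names and the choice of `ValueError` are assumptions made for illustration.

```python
def validate_http_mpub(message):
    """Return message as bytes, rejecting embedded newlines."""
    if isinstance(message, str):
        message = message.encode('utf-8')
    if b'\n' in message:
        # The newline is the message separator in the request body.
        raise ValueError('newlines are not allowed in http multipublish')
    return message

def build_mpub_body(messages):
    """Join validated messages into a single newline-delimited body."""
    return b'\n'.join(validate_http_mpub(m) for m in messages)

body = build_mpub_body([b'xxx', 'yyy'])
print(body)
```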

Socket is not connected

I'm trying to read messages from nsqd in a simple Flask server, following the docs for "Consumer: high-level message reader".

server.py

from flask import Flask
import gnsq

consumer = gnsq.Consumer('test_topic', 'test_channel', '0.0.0.0:32807')

@consumer.on_message.connect
def handler(consumer, message):
    print('got message:', message.body)

consumer.start()

app = Flask(__name__)

@app.route("/")
def hello():
    return "Flasky"

All I get when running the server is:
[0.0.0.0:32807] connection failed (NSQSocketError(57, 'Socket is not connected')).
I have tried every port I know of, with no luck. Any help would be appreciated.
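
Two things may be worth checking here. `0.0.0.0` is a bind address rather than a destination, so a client would normally connect to `127.0.0.1` or the host's real address. The port also has to be nsqd's TCP port (4150 by default) or the host port mapped to it, not the HTTP port. A quick standard-library probe can confirm whether anything is listening; `can_connect` is a hypothetical helper, and the demo uses a throwaway local listener instead of a real nsqd.

```python
import socket

def can_connect(host, port, timeout=2.0):
    """Return True if a TCP connection to (host, port) succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Demo against a throwaway local listener on an OS-assigned port.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(('127.0.0.1', 0))
listener.listen(1)
port = listener.getsockname()[1]

reachable = can_connect('127.0.0.1', port)    # listener is up
listener.close()
unreachable = can_connect('127.0.0.1', port)  # nothing listening any more
print(reachable, unreachable)
```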

SyntaxError: invalid syntax

>>> import gnsq
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/cevin/a/venv/lib/python3.7/site-packages/gnsq/__init__.py", line 4, in <module>
    from .reader import Reader
  File "/Users/cevin/a/venv/lib/python3.7/site-packages/gnsq/reader.py", line 110
    async=False,
        ^
SyntaxError: invalid syntax
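
This happens because `async` became a reserved keyword in Python 3.7, so it can no longer be used as a parameter name, and any module that still does so fails at import time on 3.7 and later. The snippet below only demonstrates the language behaviour with the standard library; the function `f` in the source string is made up.

```python
import keyword
import sys

# Source that uses `async` as a parameter name, as in the traceback above.
source = "def f(async=False):\n    return async\n"

try:
    compile(source, '<example>', 'exec')
    compiled = True
except SyntaxError:
    compiled = False

# On 3.7+ the keyword check is True and compilation fails.
print(sys.version_info[:2], keyword.iskeyword('async'), compiled)
```

The usual remedies are upgrading to a library release that renamed the parameter, if one is available, or running on an older interpreter.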

Documentation Clarification and Improvements

I am going to lump these together:

  • The README and Installation documentation should clearly state which versions of Python are supported.
  • External dependencies should be clearly stated, even if they are implied. I got a snappy-c.h build error.
  • If you intend to have people pip install this project, I don't think you want this in your requirements.txt:
# Install gnsq itself
-e .

Multiple Reader

I'm running an nsq server with one topic and four gnsq readers, each with its own channel, on different systems.
When I look at nsqadmin I see two channels with one connection each, one channel with five connections and one with six.

nsqadmin

Why are there five connections from the same gnsq reader?
Is this normal behavior?

nsqd i/o timeout 60 seconds after gnsq connects causes first message to be lost instead of raising exception

When I connect to nsqd using gnsq.Nsqd, 60 seconds after the connection is made I see the following in the nsqd logs:

[nsqd] 2016/10/28 12:01:08.655200 PROTOCOL(V2): [127.0.0.1:53326] exiting ioloop
[nsqd] 2016/10/28 12:01:08.655677 ERROR: client(127.0.0.1:53326) - failed to read command - read tcp 127.0.0.1:4150->127.0.0.1:53326: i/o timeout
[nsqd] 2016/10/28 12:01:08.655987 PROTOCOL(V2): [127.0.0.1:53326] exiting messagePump

The first message sent using publish_tcp after this timeout doesn't reach nsqd, and no error is raised. Only the second message sent through publish_tcp raises a gnsq.errors.NSQSocketError: [Errno 107] Socket is not connected. I think this error should be raised on the first message, not the second.

System Info

gnsq version

0.3.3

operating system and version

$ uname -a
Linux ant-Dell-n411z 3.13.0-93-generic #140-Ubuntu SMP Mon Jul 18 21:21:05 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux
$ lsb_release -a
Distributor ID: Ubuntu
Description:    Ubuntu 14.04.4 LTS
Release:    14.04
Codename:   trusty

nsqd versions

I've noticed this happening on nsqd v0.3.8 (built w/go1.6.2) and nsqd v0.2.30 (built w/go1.3)

Scripts to reproduce behaviour

test_gnsq.py

from gevent import monkey; monkey.patch_all()

import gnsq
import gevent

def on_error(reader, error):
    print "got error %s" % error

def on_close(reader):
    print "closed..."

conn = gnsq.Nsqd(address="127.0.0.1", tcp_port=4150, http_port=4151)
conn.ping()
conn.connect()
conn.on_error.connect(on_error)
conn.on_close.connect(on_close)

try:
    print "publising message 1"
    conn.publish_tcp("TestTopic", "Did you get message 1?")
    print "successfully published message 1"

    # sleep for just over one minute in the main thread
    print "sleeping for 1 minute and 2 seconds"
    gevent.sleep(62)
    print "awake!"

    print "publishing message 2"
    conn.publish_tcp("TestTopic", "Did you get message 2?")
    print "successfully published message 2"

    print "publishing message 3"
    conn.publish_tcp("TestTopic", "Did you get message 3?")
    print "successfully published message 3"

finally:
    conn.close()

run.sh

$1/nsqd -verbose -verbose &
sleep 2
$1/nsq_tail -channel default -nsqd-tcp-address 127.0.0.1:4150 -topic TestTopic &
sleep 2
python test_gnsq.py
curl http://localhost:4151/stats?format=json | python -m json.tool
echo ""
sleep 3

jobs -p | xargs kill -9

commands and output

run.sh requires a path to the nsqd binary directory (I tested it on different nsqd versions):

wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-0.3.8.linux-amd64.go1.6.2.tar.gz
tar -xvzf nsq-0.3.8.linux-amd64.go1.6.2.tar.gz
bash run.sh nsq-0.3.8.linux-amd64.go1.6.2/bin/

Alternatively, you could run bash run.sh /usr/bin if nsq binaries are present there.

output

The entire output is available at https://gist.github.com/algrebe/5d715eb031f110905b9b2335e0ca52fe
Here are the relevant portions:

$ bash run.sh nsq-0.3.8.linux-amd64.go1.6.2/bin/

# nsq_tail connects
[nsqd] 2016/10/28 12:00:06.256774 TCP: new client(127.0.0.1:53323)
[nsqd] 2016/10/28 12:00:06.256930 CLIENT(127.0.0.1:53323): desired protocol magic '  V2'
[nsqd] 2016/10/28 12:00:06.259701 PROTOCOL(V2): [127.0.0.1:53323] [SUB TestTopic default]
[nsqd] 2016/10/28 12:00:06.260200 TOPIC(TestTopic): created
[nsqd] 2016/10/28 12:00:06.260692 TOPIC(TestTopic): new channel(default)
[nsqd] 2016/10/28 12:00:06.261209 PROTOCOL(V2): [127.0.0.1:53323] [RDY 200]

# python gnsq.Nsqd connects
[nsqd] 2016/10/28 12:00:08.652703 TCP: new client(127.0.0.1:53326)
[nsqd] 2016/10/28 12:00:08.653426 CLIENT(127.0.0.1:53326): desired protocol magic '  V2'

# python script - gnsq.Nsqd sends message and then sleeps
publising message 1
successfully published message 1
sleeping for 1 minute and 2 seconds

# nsqd sends this message to nsq_tail
[nsqd] 2016/10/28 12:00:08.654558 PROTOCOL(V2): [127.0.0.1:53326] [PUB TestTopic]
[nsqd] 2016/10/28 12:00:08.655044 PROTOCOL(V2): writing msg(070155f1855b1000) to client(127.0.0.1:53323) - Did you get message 1?
[nsqd] 2016/10/28 12:00:08.655153 [127.0.0.1:53323] state rdy:  200 inflt:    1
[nsqd] 2016/10/28 12:00:08.655323 [127.0.0.1:53323] state rdy:  200 inflt:    1

# nsq_tail receives message
Did you get message 1?
[nsqd] 2016/10/28 12:00:08.655820 PROTOCOL(V2): [127.0.0.1:53323] [FIN 070155f1855b1000]

# after one minute, nsqd logs this for the gnsq.Nsqd client
[nsqd] 2016/10/28 12:01:08.655200 PROTOCOL(V2): [127.0.0.1:53326] exiting ioloop
[nsqd] 2016/10/28 12:01:08.655677 ERROR: client(127.0.0.1:53326) - failed to read command - read tcp 127.0.0.1:4150->127.0.0.1:53326: i/o timeout
[nsqd] 2016/10/28 12:01:08.655987 PROTOCOL(V2): [127.0.0.1:53326] exiting messagePump

# python script - out of sleep
awake!
publishing message 2
successfully published message 2
# NOTE: no error was raised here; the client thinks the message was sent successfully when it wasn't!

publishing message 3
# NOTE only on publishing this message does it raise the exception and close the socket
closed...
Traceback (most recent call last):
  File "test_gnsq.py", line 37, in <module>
    conn.close()
  File ".virtualenvs/gnsq/local/lib/python2.7/site-packages/gnsq/nsqd.py", line 470, in close
    self.send(nsq.close())
  File ".virtualenvs/gnsq/local/lib/python2.7/site-packages/gnsq/nsqd.py", line 239, in send
    return self.stream.send(data, async)
  File ".virtualenvs/gnsq/local/lib/python2.7/site-packages/gnsq/stream/stream.py", line 90, in send
    self.ensure_connection()
  File ".virtualenvs/gnsq/local/lib/python2.7/site-packages/gnsq/stream/stream.py", line 49, in ensure_connection
    raise NSQSocketError(ENOTCONN, 'Socket is not connected')
gnsq.errors.NSQSocketError: [Errno 107] Socket is not connected

# examining the curl output shows us that nsqd only received one message
{
    "data": {
        "start_time": 1477636204,
        "topics": [
            {
                "channels": [
                    {
                        "channel_name": "default",
                        "clients": [
                            {
                                "deflate": false,
                                "finish_count": 1,
                                "in_flight_count": 0,
                                "message_count": 1,
                                "remote_address": "127.0.0.1:53323",
                                "state": 3,
                                "user_agent": "nsq_tail/0.3.8 go-nsq/1.0.5",
                                "version": "V2"
                            }
                        ],
                        "deferred_count": 0,
                        "depth": 0,
                        "e2e_processing_latency": {
                            "count": 0,
                            "percentiles": null
                        },
                        "in_flight_count": 0,
                        "message_count": 1,
                        "paused": false,
                        "requeue_count": 0,
                        "timeout_count": 0
                    }
                ],
                "depth": 0,
                "e2e_processing_latency": {
                    "count": 0,
                    "percentiles": null
                },
                "message_count": 1,
                "paused": false,
                "topic_name": "TestTopic"
            }
        ],
        "version": "0.3.8"
    },
    "status_code": 200,
    "status_txt": "OK"
}
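
A possible explanation for the 60-second mark, offered as a hypothesis rather than a diagnosis: nsqd periodically sends a `_heartbeat_` response (every 30 seconds by default) and drops clients that do not answer with `NOP`, so a connection that publishes but never reads responses would be closed after two missed heartbeats. The sketch below shows the framing involved using only the standard library; `parse_frame` and `reply_for` are hypothetical helpers, not gnsq functions.

```python
import struct

FRAME_TYPE_RESPONSE = 0
FRAME_TYPE_ERROR = 1
FRAME_TYPE_MESSAGE = 2
HEARTBEAT = b'_heartbeat_'

def parse_frame(raw):
    """Split one NSQ frame: [4-byte size][4-byte frame type][data]."""
    size, frame_type = struct.unpack('>ll', raw[:8])
    data = raw[8:4 + size]  # size counts the frame type plus the data
    return frame_type, data

def reply_for(frame_type, data):
    """Return the bytes a client should send back, or None."""
    if frame_type == FRAME_TYPE_RESPONSE and data == HEARTBEAT:
        return b'NOP\n'
    return None

# Build a heartbeat frame the way nsqd would send it, then parse it.
raw = struct.pack('>ll', 4 + len(HEARTBEAT), FRAME_TYPE_RESPONSE) + HEARTBEAT
frame_type, data = parse_frame(raw)
reply = reply_for(frame_type, data)
print(frame_type, data, reply)
```

As for why message 2 appears to succeed: a write to a socket that the peer has already closed often succeeds once at the OS level, and the failure only becomes visible on a later write or read. Detecting the loss on the first message would therefore probably require reading the publish response instead of relying on the send alone.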
