aohan237 / asyncnsq Goto Github PK
View Code? Open in Web Editor NEWThis project forked from jettify/aionsq
asyncio (PEP 3156) nsq (message queue) client.
License: MIT License
This project forked from jettify/aionsq
asyncio (PEP 3156) nsq (message queue) client.
License: MIT License
when NSQ is restarted, my app will never receive any more messages.
I'm having trouble using the Nsqd class to empty a certain topic. When I change the HTTP method to POST
it works.
empty_topic
: https://github.com/aohan237/asyncnsq/blob/master/asyncnsq/http/nsqd.py#L65
I suspect this is the case for some other Nsqd methods too, but this is the only one I have tested.
Robbert
I have seen your pub example, but you create a producer every single time. I have decided to use it in my Tornado project; can your client be initialized once and kept running for a long time to meet my needs?
I've noticed that the library has no license. Could one be added so that users know the terms of use of this software?
from asyncnsq import create_reader
Traceback (most recent call last):
File "", line 1, in
File "/usr/local/lib/python3.5/dist-packages/asyncnsq/init.py", line 6, in
from .consumer import NsqConsumer
File "/usr/local/lib/python3.5/dist-packages/asyncnsq/consumer.py", line 106
yield await self._queue.get()
platform:
LSB Version: :core-4.1-amd64:core-4.1-noarch
Distributor ID: CentOS
Description: CentOS Linux release 7.5.1804 (Core)
Release: 7.5.1804
Codename: Core
python: python3.6
asyncnsq version: 0.4.1
code:
class Mail(BaseApi):
    """Mail API that delivers messages directly over SMTP or via NSQ.

    Direct requests are handed to a thread pool that runs the blocking
    ``smtplib`` code. Queued requests are published to the ``mail`` topic and
    picked up later by the ``mail_consumer`` tasks started in ``__init__``.
    """

    # Shared worker pool for the blocking SMTP calls, so they never run on
    # the event loop thread.
    executor = ThreadPoolExecutor(max_workers=20)

    def __init__(self):
        super(Mail, self).__init__()
        loop = asyncio.get_event_loop()
        # Keep strong references to the consumer tasks: the event loop only
        # holds weak references, so an unreferenced task may be collected
        # while it is still running.
        self._consumer_tasks = [
            loop.create_task(self.mail_consumer()) for _ in range(10)
        ]

    @arg_parser(("mails", list), ("send_manner", str), ("send_type", str))
    # @authentication()
    async def send_mail(self, request):
        """Send mail, either directly or by placing it on the queue.

        :param request: incoming request; ``request.requestdata`` supplies
            ``mails``, ``send_manner`` and ``send_type``, and
            ``request.app['config']`` supplies the SMTP settings.
        :return: the result of ``self.reply_ok('')``.
        :raises MailUnkownMannerError: ``send_manner`` is not recognised.
        :raises MailUnkownTypeError: ``send_type`` is not recognised.
        """
        data = request.requestdata
        # Only a missing field or an unknown enum value means "unrecognised";
        # anything else is a real bug and must not be masked.
        try:
            MailSenderManner(data['send_manner'])
        except (KeyError, ValueError) as err:
            raise MailUnkownMannerError("未识别的邮件发送方式") from err
        try:
            mail_type = MailSendType(data['send_type'])
        except (KeyError, ValueError) as err:
            raise MailUnkownTypeError("未识别的邮件类型") from err

        mail_conf = request.app['config']['notice']['mail']
        mail_host = mail_conf['email_from_host']
        mail_port = mail_conf['email_port']
        mail_send_user = mail_conf['email_from_user']
        mail_send_pwd = mail_conf['email_from_pass']

        if data['send_manner'] == MailSenderManner.Direct.value:
            for mail in data['mails']:
                job = functools.partial(
                    self.send, mail_host, mail_port, mail_send_user,
                    mail_send_pwd, mail_send_user,
                    mail.get("to", ""), mail.get("cc", ""),
                    mail['subject'], mail['content'],
                    False, False, mail_type)
                # Fire and forget: send() reports its own failures.
                request.app.loop.run_in_executor(self.executor, job)

        if data['send_manner'] == MailSenderManner.Queue.value:
            # NOTE(review): a new producer connection is opened for every
            # queued request and is never closed here - consider creating
            # one producer and reusing it.
            nsq_producer = await create_nsq_producer(
                host='127.0.0.1', port=4150,
                heartbeat_interval=30000,
                feature_negotiation=True,
                tls_v1=True,
                snappy=False,
                deflate=False,
                deflate_level=0,
                loop=asyncio.get_event_loop())
            # The consumer needs the SMTP settings to deliver the mail, so
            # they travel with the queued payload.
            data.update({"mail_host": mail_host,
                         "mail_port": mail_port,
                         "mail_send_user": mail_send_user,
                         "mail_send_pwd": mail_send_pwd})
            await nsq_producer.pub('mail', json.dumps(data))

        return await self.reply_ok('')

    def send(self, mail_host='', mail_port=25, username='', password='',
             email_from='', email_to='', cc_to='', subject='', content='',
             ssl=False, raise_exception=False, mail_type=MailSendType.Plain):
        """Build one message and deliver it over SMTP (blocking).

        :param mail_host: SMTP server host.
        :param mail_port: SMTP server port.
        :param username: SMTP login name.
        :param password: SMTP login password.
        :param email_from: sender address, also used for the ``From`` header.
        :param email_to: comma-separated ``To`` addresses.
        :param cc_to: comma-separated ``Cc`` addresses.
        :param subject: message subject.
        :param content: message body; for HTML mail it is rendered into the
            ``templates/alarm_email.html`` template.
        :param ssl: use ``smtplib.SMTP_SSL`` instead of ``smtplib.SMTP``.
        :param raise_exception: re-raise a delivery failure after printing
            its traceback; when false the failure is only printed.
        :param mail_type: ``MailSendType.Plain`` or ``MailSendType.Html``.
        """
        msg = MIMEMultipart()
        msg['Subject'] = subject
        msg['From'] = email_from
        msg['To'] = email_to
        msg['Cc'] = cc_to

        send_body = None
        if mail_type == MailSendType.Plain:
            send_body = MIMEText(content, 'plain', 'utf-8')
        elif mail_type == MailSendType.Html:
            template_dir = os.path.dirname(os.path.abspath(__file__))
            env = Environment(loader=FileSystemLoader(template_dir),
                              trim_blocks=True)
            html_mail = env.get_template('templates/alarm_email.html').render(
                subject=subject,
                error_list=[{
                    "subject": subject,
                    "content": content
                }]
            )
            send_body = MIMEText(html_mail, 'html', 'utf-8')
        msg.attach(send_body)

        # ''.split(',') yields [''], so drop blanks: an empty To/Cc must not
        # turn into an empty recipient address.
        recipients = [
            addr.strip()
            for addr in email_to.split(',') + cc_to.split(',')
            if addr.strip()
        ]

        smtp = smtplib.SMTP_SSL() if ssl else smtplib.SMTP()
        try:
            code, message = smtp.connect(host=mail_host, port=mail_port)
            if not 200 <= code <= 299:
                raise smtplib.SMTPResponseException(code, message)
            smtp.login(username, password)
            smtp.sendmail(email_from, recipients, msg.as_string())
        except Exception:
            import traceback
            traceback.print_exc()
            if raise_exception:
                raise
        finally:
            # quit() itself fails when the connection was never established
            # or has already dropped; fall back to a plain close so cleanup
            # never raises out of this worker.
            try:
                smtp.quit()
            except smtplib.SMTPException:
                smtp.close()

    async def mail_consumer(self):
        """Consume the ``mail`` topic on channel ``alarm`` and send each mail.

        Every message is finished exactly once, whether or not its payload
        could be processed, so a malformed message is not redelivered.
        """
        nsq_consumer = await create_nsq_consumer(
            lookupd_http_addresses=[('127.0.0.1', 4161)], max_in_flight=200)
        await nsq_consumer.subscribe('mail', 'alarm')
        loop = asyncio.get_event_loop()
        for waiter in nsq_consumer.wait_messages():
            message = await waiter
            try:
                mail_data = json.loads(message.body)
                mail_type = MailSendType(mail_data['send_type'])
                for mail in mail_data['mails']:
                    loop.run_in_executor(self.executor,
                                         self.send,
                                         mail_data['mail_host'],
                                         mail_data['mail_port'],
                                         mail_data['mail_send_user'],
                                         mail_data['mail_send_pwd'],
                                         mail_data['mail_send_user'],
                                         mail.get('to', ""),
                                         mail.get("cc", ""),
                                         mail['subject'],
                                         mail['content'],
                                         False,
                                         False,
                                         mail_type)
            except Exception:
                import traceback
                traceback.print_exc()
            # Kept outside the try block so cancellation or a failed FIN
            # propagates instead of being swallowed and retried.
            await message.fin()
logs:
2018-07-26 13:04:01;DEBUG;server start at 0.0.0.0:10001
GET http://127.0.0.1:4161/lookup {'topic': 'mail'} None
GET http://127.0.0.1:4161/lookup {'topic': 'mail'} None
GET http://127.0.0.1:4161/lookup {'topic': 'mail'} None
GET http://127.0.0.1:4161/lookup {'topic': 'mail'} None
GET http://127.0.0.1:4161/lookup {'topic': 'mail'} None
GET http://127.0.0.1:4161/lookup {'topic': 'mail'} None
GET http://127.0.0.1:4161/lookup {'topic': 'mail'} None
GET http://127.0.0.1:4161/lookup {'topic': 'mail'} None
GET http://127.0.0.1:4161/lookup {'topic': 'mail'} None
GET http://127.0.0.1:4161/lookup {'topic': 'mail'} None
2018-07-26 13:04:01;DEBUG;lookupd response
2018-07-26 13:04:01;DEBUG;{'channels': ['alarm'], 'producers': [{'remote_address': '127.0.0.1:53764', 'hostname': 'iz2ze9997ltwi7y8i9ngjjz', 'broadcast_address': 'iz2ze9997ltwi7y8i9ngjjz', 'tcp_port': 4150, 'http_port': 4151, 'version': '1.0.0-compat'}]}
2018-07-26 13:04:01;DEBUG;('host, port', 'iz2ze9997ltwi7y8i9ngjjz', 4150)
2018-07-26 13:04:01;DEBUG;lookupd response
2018-07-26 13:04:01;DEBUG;{'channels': ['alarm'], 'producers': [{'remote_address': '127.0.0.1:53764', 'hostname': 'iz2ze9997ltwi7y8i9ngjjz', 'broadcast_address': 'iz2ze9997ltwi7y8i9ngjjz', 'tcp_port': 4150, 'http_port': 4151, 'version': '1.0.0-compat'}]}
2018-07-26 13:04:01;DEBUG;('host, port', 'iz2ze9997ltwi7y8i9ngjjz', 4150)
2018-07-26 13:04:01;DEBUG;lookupd response
2018-07-26 13:04:01;DEBUG;{'channels': ['alarm'], 'producers': [{'remote_address': '127.0.0.1:53764', 'hostname': 'iz2ze9997ltwi7y8i9ngjjz', 'broadcast_address': 'iz2ze9997ltwi7y8i9ngjjz', 'tcp_port': 4150, 'http_port': 4151, 'version': '1.0.0-compat'}]}
2018-07-26 13:04:01;DEBUG;('host, port', 'iz2ze9997ltwi7y8i9ngjjz', 4150)
2018-07-26 13:04:01;DEBUG;lookupd response
2018-07-26 13:04:01;DEBUG;{'channels': ['alarm'], 'producers': [{'remote_address': '127.0.0.1:53764', 'hostname': 'iz2ze9997ltwi7y8i9ngjjz', 'broadcast_address': 'iz2ze9997ltwi7y8i9ngjjz', 'tcp_port': 4150, 'http_port': 4151, 'version': '1.0.0-compat'}]}
2018-07-26 13:04:01;DEBUG;('host, port', 'iz2ze9997ltwi7y8i9ngjjz', 4150)
2018-07-26 13:04:01;DEBUG;lookupd response
2018-07-26 13:04:01;DEBUG;{'channels': ['alarm'], 'producers': [{'remote_address': '127.0.0.1:53764', 'hostname': 'iz2ze9997ltwi7y8i9ngjjz', 'broadcast_address': 'iz2ze9997ltwi7y8i9ngjjz', 'tcp_port': 4150, 'http_port': 4151, 'version': '1.0.0-compat'}]}
2018-07-26 13:04:01;DEBUG;('host, port', 'iz2ze9997ltwi7y8i9ngjjz', 4150)
2018-07-26 13:04:01;DEBUG;lookupd response
2018-07-26 13:04:01;DEBUG;{'channels': ['alarm'], 'producers': [{'remote_address': '127.0.0.1:53764', 'hostname': 'iz2ze9997ltwi7y8i9ngjjz', 'broadcast_address': 'iz2ze9997ltwi7y8i9ngjjz', 'tcp_port': 4150, 'http_port': 4151, 'version': '1.0.0-compat'}]}
2018-07-26 13:04:01;DEBUG;('host, port', 'iz2ze9997ltwi7y8i9ngjjz', 4150)
2018-07-26 13:04:01;DEBUG;lookupd response
2018-07-26 13:04:01;DEBUG;{'channels': ['alarm'], 'producers': [{'remote_address': '127.0.0.1:53764', 'hostname': 'iz2ze9997ltwi7y8i9ngjjz', 'broadcast_address': 'iz2ze9997ltwi7y8i9ngjjz', 'tcp_port': 4150, 'http_port': 4151, 'version': '1.0.0-compat'}]}
2018-07-26 13:04:01;DEBUG;('host, port', 'iz2ze9997ltwi7y8i9ngjjz', 4150)
2018-07-26 13:04:01;DEBUG;lookupd response
2018-07-26 13:04:01;DEBUG;{'channels': ['alarm'], 'producers': [{'remote_address': '127.0.0.1:53764', 'hostname': 'iz2ze9997ltwi7y8i9ngjjz', 'broadcast_address': 'iz2ze9997ltwi7y8i9ngjjz', 'tcp_port': 4150, 'http_port': 4151, 'version': '1.0.0-compat'}]}
2018-07-26 13:04:01;DEBUG;('host, port', 'iz2ze9997ltwi7y8i9ngjjz', 4150)
reconnect writer
reconnect writer
reconnect writer
reconnect writer
reconnect writer
reconnect writer
reconnect writer
reconnect writer
2018-07-26 13:04:01;DEBUG;lookupd response
2018-07-26 13:04:01;DEBUG;{'channels': ['alarm'], 'producers': [{'remote_address': '127.0.0.1:53764', 'hostname': 'iz2ze9997ltwi7y8i9ngjjz', 'broadcast_address': 'iz2ze9997ltwi7y8i9ngjjz', 'tcp_port': 4150, 'http_port': 4151, 'version': '1.0.0-compat'}]}
2018-07-26 13:04:01;DEBUG;('host, port', 'iz2ze9997ltwi7y8i9ngjjz', 4150)
reconnect writer
2018-07-26 13:04:01;DEBUG;lookupd response
2018-07-26 13:04:01;DEBUG;{'channels': ['alarm'], 'producers': [{'remote_address': '127.0.0.1:53764', 'hostname': 'iz2ze9997ltwi7y8i9ngjjz', 'broadcast_address': 'iz2ze9997ltwi7y8i9ngjjz', 'tcp_port': 4150, 'http_port': 4151, 'version': '1.0.0-compat'}]}
2018-07-26 13:04:01;DEBUG;('host, port', 'iz2ze9997ltwi7y8i9ngjjz', 4150)
reconnect writer
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
======== Running on http://0.0.0.0:10001 ========
(Press CTRL+C to quit)
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;('conn.id:', 'tcp://iz2ze9997ltwi7y8i9ngjjz:4150')
2018-07-26 13:04:01;DEBUG;execute command b'SUB mail alarm\n'
this is subscribe..............
2018-07-26 13:04:01;DEBUG;('conn.id:', 'tcp://iz2ze9997ltwi7y8i9ngjjz:4150')
2018-07-26 13:04:01;DEBUG;execute command b'SUB mail alarm\n'
this is subscribe..............
2018-07-26 13:04:01;DEBUG;('conn.id:', 'tcp://iz2ze9997ltwi7y8i9ngjjz:4150')
2018-07-26 13:04:01;DEBUG;execute command b'SUB mail alarm\n'
this is subscribe..............
2018-07-26 13:04:01;DEBUG;('conn.id:', 'tcp://iz2ze9997ltwi7y8i9ngjjz:4150')
2018-07-26 13:04:01;DEBUG;execute command b'SUB mail alarm\n'
this is subscribe..............
2018-07-26 13:04:01;DEBUG;('conn.id:', 'tcp://iz2ze9997ltwi7y8i9ngjjz:4150')
2018-07-26 13:04:01;DEBUG;execute command b'SUB mail alarm\n'
this is subscribe..............
2018-07-26 13:04:01;DEBUG;('conn.id:', 'tcp://iz2ze9997ltwi7y8i9ngjjz:4150')
2018-07-26 13:04:01;DEBUG;execute command b'SUB mail alarm\n'
this is subscribe..............
2018-07-26 13:04:01;DEBUG;('conn.id:', 'tcp://iz2ze9997ltwi7y8i9ngjjz:4150')
2018-07-26 13:04:01;DEBUG;execute command b'SUB mail alarm\n'
this is subscribe..............
2018-07-26 13:04:01;DEBUG;('conn.id:', 'tcp://iz2ze9997ltwi7y8i9ngjjz:4150')
2018-07-26 13:04:01;DEBUG;execute command b'SUB mail alarm\n'
this is subscribe..............
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;('conn.id:', 'tcp://iz2ze9997ltwi7y8i9ngjjz:4150')
2018-07-26 13:04:01;DEBUG;execute command b'SUB mail alarm\n'
this is subscribe..............
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;('conn.id:', 'tcp://iz2ze9997ltwi7y8i9ngjjz:4150')
2018-07-26 13:04:01;DEBUG;execute command b'SUB mail alarm\n'
this is subscribe..............
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'OK')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'OK')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'OK')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'OK')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'OK')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'OK')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'OK')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'OK')
2018-07-26 13:04:01;DEBUG;execute command b'RDY 1\n'
2018-07-26 13:04:01;DEBUG;execute command b'RDY 1\n'
2018-07-26 13:04:01;DEBUG;execute command b'RDY 1\n'
2018-07-26 13:04:01;DEBUG;execute command b'RDY 1\n'
2018-07-26 13:04:01;DEBUG;execute command b'RDY 1\n'
2018-07-26 13:04:01;DEBUG;execute command b'RDY 1\n'
2018-07-26 13:04:01;DEBUG;execute command b'RDY 1\n'
2018-07-26 13:04:01;DEBUG;execute command b'RDY 1\n'
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'OK')
2018-07-26 13:04:01;DEBUG;execute command b'RDY 1\n'
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'OK')
2018-07-26 13:04:01;DEBUG;execute command b'RDY 1\n'
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
CPU usage snapshot:
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
25943 root 20 0 1741100 31584 5976 R 100.0 0.4 6:00.03 python3.6
But on my Mac, it seems to work fine! Thanks.
2018-11-30 15:52:09:ERROR:nsq.conn uncaught exception in data event
Traceback (most recent call last):
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 290, in _read_body
self.trigger(event.DATA, conn=self, data=data)
File "/usr/local/lib/python3.5/dist-packages/nsq/event.py", line 84, in trigger
ev(*args, **kwargs)
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 503, in _on_data
self.send(protocol.nop())
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 296, in send
self.stream.write(self.encoder.encode(data))
File "/usr/local/lib/python3.5/dist-packages/tornado/iostream.py", line 387, in write
self._check_closed()
File "/usr/local/lib/python3.5/dist-packages/tornado/iostream.py", line 925, in _check_closed
raise StreamClosedError(real_error=self.error)
tornado.iostream.StreamClosedError: Stream is closed
My code has a long processing time. Should I use message.touch()? Is that maybe why it's disconnecting?
I was having a problem with my own codebase and was trying to see if the problem originated from this lib. The problem is that when subscribing to a topic and channel using an HTTP consumer, I get the following message:
ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 4150)
This seems to originate from the _poll_lookupd function in consumer.py:
async def _poll_lookupd(self, host, port):
    """Ask one nsqlookupd for the producers of ``self.topic`` and connect.

    For every producer in the lookup response that is not already present
    in ``self._connections``, a new connection is created, stored under its
    ``conn.id`` and registered with ``self._rdy_control``.

    :param host: nsqlookupd host to query.
    :param port: nsqlookupd HTTP port to query.
    """
    nsqlookup_conn = NsqLookupd(host, port, loop=self._loop)
    try:
        try:
            res = await nsqlookup_conn.lookup(self.topic)
        except Exception as tmp:
            logger.error(tmp)
            logger.exception(tmp)
            # Without a response there is nothing to connect to; carrying on
            # would fail on the unbound ``res`` below.
            return
        logger.debug('lookupd response')
        logger.debug(res)
        for producer in res['producers']:
            # NOTE(review): every producer is dialled on localhost no matter
            # which address nsqlookupd reported - confirm this is intended.
            host = '127.0.0.1'  # Why localhost here?
            # producer['broadcast_address']
            port = producer['tcp_port']
            tmp_id = "tcp://{}:{}".format(host, port)
            if tmp_id not in self._connections:
                logger.debug(('host, port', host, port))
                conn = await create_nsq(host, port, queue=self._queue,
                                        loop=self._loop)  # I get my error here
                logger.debug(('conn.id:', conn.id))
                self._connections[conn.id] = conn
                self._rdy_control.add_connection(conn)
    finally:
        # Close the lookupd connection on every path, including failures.
        await nsqlookup_conn.close()
So my real question is, should the connection be made to localhost, even though nsqlookupd in my case is running on a different server? Or am I missing something regarding NSQ?
Thanks in advance!
Recently I've tried to write unit/integration tests using asyncnsq
, but had no luck. I've tried to borrow some tests from the repository, but it looks like the current tests don't work. I've got many import errors; it seems the tests are outdated.
Any plans on fixing tests and integrating tox/pytest/flake8/black/isort/pre-commit/ci which could make the project more friendly for the new contributors? I'd like to offer my help if you are interested.
It seems the build uploaded to PyPI has new code, but the GitHub repo does not. See:
$ wget https://files.pythonhosted.org/packages/e5/1f/14eb192fa5af5b6c070773bde4a395eff26e5f6560539d7277515e32370f/asyncnsq-1.1.2-py3-none-any.whl
$ unzip asyncnsq-1.1.2-py3-none-any.whl
$ ls -l asyncnsq/
total 136
-rw-r--r-- 1 albert staff 175 Oct 20 09:11 __init__.py
-rw-r--r-- 1 albert staff 8775 Nov 26 2018 connection.py
-rw-r--r-- 1 albert staff 643 Nov 26 2018 consts.py
-rw-r--r-- 1 albert staff 4149 Nov 26 2018 consumer.py
-rw-r--r-- 1 albert staff 1875 Nov 26 2018 containers.py
-rw-r--r-- 1 albert staff 2142 Nov 26 2018 exceptions.py
drwxr-xr-x 9 albert staff 288 Oct 20 09:12 http
-rw-r--r-- 1 albert staff 56 Nov 26 2018 log.py
-rw-r--r-- 1 albert staff 7881 Nov 26 2018 nsq.py
-rw-r--r-- 1 albert staff 2390 Nov 26 2018 producer.py
-rw-r--r-- 1 albert staff 6055 Nov 26 2018 protocol.py
-rw-r--r-- 1 albert staff 564 Nov 26 2018 selectors.py
drwxr-xr-x 11 albert staff 352 Oct 20 09:12 tcp
-rw-r--r-- 1 albert staff 3017 Jul 3 2019 utils.py
On Github:
运行一两天后,asyncnsq.create_nsq_producer ,pub的时候没有错误信息输出,但是没有成功发送消息
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.