Giter VIP home page Giter VIP logo

asyncnsq's People

Contributors

aohan237 avatar atugushev avatar jettify avatar patrick-ryan avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar

asyncnsq's Issues

License

I've noticed that the library has no license. Could it be added so that the users would know the terms of the use of this software?

SyntaxError: 'yield' inside async function

from asyncnsq import create_reader
Traceback (most recent call last):
File "", line 1, in
File "/usr/local/lib/python3.5/dist-packages/asyncnsq/init.py", line 6, in
from .consumer import NsqConsumer
File "/usr/local/lib/python3.5/dist-packages/asyncnsq/consumer.py", line 106
yield await self._queue.get()

100% cpu usage when subscribe topic

platform:
LSB Version: :core-4.1-amd64:core-4.1-noarch
Distributor ID: CentOS
Description: CentOS Linux release 7.5.1804 (Core)
Release: 7.5.1804
Codename: Core

python: python3.6
asyncnsq version: 0.4.1
code:

class Mail(BaseApi):
    executor = ThreadPoolExecutor(max_workers=20)

    def __init__(self):
        super(Mail, self).__init__()
        loop = asyncio.get_event_loop()
        task = [loop.create_task(self.mail_consumer()) for _ in range(10)]

    @arg_parser(("mails", list), ("send_manner", str), ("send_type", str))
    # @authentication()
    async def send_mail(self, request):
        """
        发送邮件,可直接发送,也可放入队列中发送
        :param request:
        :return:
        """

        try:
            MailSenderManner(request.requestdata['send_manner'])
        except:
            raise MailUnkownMannerError("未识别的邮件发送方式")

        try:
            MailSendType(request.requestdata['send_type'])
        except:
            raise MailUnkownTypeError("未识别的邮件类型")

        mail_host = request.app['config']['notice']['mail']['email_from_host']
        mail_port = request.app['config']['notice']['mail']['email_port']
        mail_send_user = request.app['config']['notice']['mail']['email_from_user']
        mail_send_pwd = request.app['config']['notice']['mail']['email_from_pass']


        if request.requestdata['send_manner'] == MailSenderManner.Direct.value:
            for mail in request.requestdata['mails']:
                partial = functools.partial(self.send, mail_host, mail_port, mail_send_user,
                                            mail_send_pwd,mail_send_user,mail.get("to", ""),mail.get("cc", ""),
                                            mail['subject'],mail['content'],
                                            False,False,MailSendType(request.requestdata['send_type']))
                request.app.loop.run_in_executor(self.executor,partial)

        if request.requestdata['send_manner'] == MailSenderManner.Queue.value:
            nsq_producer = await create_nsq_producer(host='127.0.0.1', port=4150,
                                                     heartbeat_interval=30000,
                                                     feature_negotiation=True,
                                                     tls_v1=True,
                                                     snappy=False,
                                                     deflate=False,
                                                     deflate_level=0,
                                                     loop=asyncio.get_event_loop())
            request.requestdata.update({"mail_host": mail_host,
                                        "mail_port":mail_port,
                                        "mail_send_user":mail_send_user,
                                        "mail_send_pwd":mail_send_pwd})
            await nsq_producer.pub('mail', json.dumps(request.requestdata))


        return await self.reply_ok('')


    def send(self, mail_host='', mail_port=25, username='', password='',
             email_from='', email_to='',cc_to='', subject='', content='',
             ssl=False, raise_exception=False, mail_type=MailSendType.Plain):

        msg = MIMEMultipart()
        msg['Subject'] = subject
        msg['From'] = email_from
        msg['To'] = email_to
        msg['Cc'] = cc_to
        send_body = None
        if mail_type == MailSendType.Plain:
            send_body = MIMEText(content, 'plain', 'utf-8')
        elif mail_type == MailSendType.Html:
            template_dir = os.path.dirname(os.path.abspath(__file__))

            env = Environment(loader=FileSystemLoader(template_dir), trim_blocks=True)

            html_mail = env.get_template('templates/alarm_email.html').render(
                subject=subject,
                error_list=[{
                    "subject": subject,
                    "content": content
                }]
            )
            send_body = MIMEText(html_mail, 'html', 'utf-8')

        msg.attach(send_body)

        smtp = smtplib.SMTP_SSL() if ssl else smtplib.SMTP()

        try:

            code, message = smtp.connect(host=mail_host, port=mail_port)
            if code < 200 or code > 299:
                raise smtplib.SMTPResponseException(code, message)
            smtp.login(username, password)
            smtp.sendmail(email_from, email_to.split(',')+cc_to.split(','),msg.as_string())
        except:
            import traceback
            traceback.print_exc()
            smtp.quit()

    async def mail_consumer(self):

        nsq_consumer = await create_nsq_consumer(lookupd_http_addresses=[('127.0.0.1', 4161)], max_in_flight=200)
        await nsq_consumer.subscribe('mail', 'alarm')
        for waiter in nsq_consumer.wait_messages():
            message = await waiter
            try:

                mail_data = json.loads(message.body)

                for mail in mail_data['mails']:

      
                    asyncio.get_event_loop().run_in_executor(self.executor,
                                                     self.send,
                                                     mail_data['mail_host'],
                                                     mail_data['mail_port'],
                                                     mail_data['mail_send_user'],
                                                     mail_data['mail_send_pwd'],
                                                     mail_data['mail_send_user'],
                                                     mail.get('to', ""),
                                                     mail.get("cc", ""),
                                                     mail['subject'],
                                                     mail['content'],
                                                     False,
                                                     False,
                                                     MailSendType(mail_data['send_type'])
                                                     )

                await message.fin()
            except:
                import traceback
                traceback.print_exc()
                await message.fin()

logs:

2018-07-26 13:04:01;DEBUG;server start at 0.0.0.0:10001
GET http://127.0.0.1:4161/lookup {'topic': 'mail'} None
GET http://127.0.0.1:4161/lookup {'topic': 'mail'} None
GET http://127.0.0.1:4161/lookup {'topic': 'mail'} None
GET http://127.0.0.1:4161/lookup {'topic': 'mail'} None
GET http://127.0.0.1:4161/lookup {'topic': 'mail'} None
GET http://127.0.0.1:4161/lookup {'topic': 'mail'} None
GET http://127.0.0.1:4161/lookup {'topic': 'mail'} None
GET http://127.0.0.1:4161/lookup {'topic': 'mail'} None
GET http://127.0.0.1:4161/lookup {'topic': 'mail'} None
GET http://127.0.0.1:4161/lookup {'topic': 'mail'} None
2018-07-26 13:04:01;DEBUG;lookupd response
2018-07-26 13:04:01;DEBUG;{'channels': ['alarm'], 'producers': [{'remote_address': '127.0.0.1:53764', 'hostname': 'iz2ze9997ltwi7y8i9ngjjz', 'broadcast_address': 'iz2ze9997ltwi7y8i9ngjjz', 'tcp_port': 4150, 'http_port': 4151, 'version': '1.0.0-compat'}]}
2018-07-26 13:04:01;DEBUG;('host, port', 'iz2ze9997ltwi7y8i9ngjjz', 4150)
2018-07-26 13:04:01;DEBUG;lookupd response
2018-07-26 13:04:01;DEBUG;{'channels': ['alarm'], 'producers': [{'remote_address': '127.0.0.1:53764', 'hostname': 'iz2ze9997ltwi7y8i9ngjjz', 'broadcast_address': 'iz2ze9997ltwi7y8i9ngjjz', 'tcp_port': 4150, 'http_port': 4151, 'version': '1.0.0-compat'}]}
2018-07-26 13:04:01;DEBUG;('host, port', 'iz2ze9997ltwi7y8i9ngjjz', 4150)
2018-07-26 13:04:01;DEBUG;lookupd response
2018-07-26 13:04:01;DEBUG;{'channels': ['alarm'], 'producers': [{'remote_address': '127.0.0.1:53764', 'hostname': 'iz2ze9997ltwi7y8i9ngjjz', 'broadcast_address': 'iz2ze9997ltwi7y8i9ngjjz', 'tcp_port': 4150, 'http_port': 4151, 'version': '1.0.0-compat'}]}
2018-07-26 13:04:01;DEBUG;('host, port', 'iz2ze9997ltwi7y8i9ngjjz', 4150)
2018-07-26 13:04:01;DEBUG;lookupd response
2018-07-26 13:04:01;DEBUG;{'channels': ['alarm'], 'producers': [{'remote_address': '127.0.0.1:53764', 'hostname': 'iz2ze9997ltwi7y8i9ngjjz', 'broadcast_address': 'iz2ze9997ltwi7y8i9ngjjz', 'tcp_port': 4150, 'http_port': 4151, 'version': '1.0.0-compat'}]}
2018-07-26 13:04:01;DEBUG;('host, port', 'iz2ze9997ltwi7y8i9ngjjz', 4150)
2018-07-26 13:04:01;DEBUG;lookupd response
2018-07-26 13:04:01;DEBUG;{'channels': ['alarm'], 'producers': [{'remote_address': '127.0.0.1:53764', 'hostname': 'iz2ze9997ltwi7y8i9ngjjz', 'broadcast_address': 'iz2ze9997ltwi7y8i9ngjjz', 'tcp_port': 4150, 'http_port': 4151, 'version': '1.0.0-compat'}]}
2018-07-26 13:04:01;DEBUG;('host, port', 'iz2ze9997ltwi7y8i9ngjjz', 4150)
2018-07-26 13:04:01;DEBUG;lookupd response
2018-07-26 13:04:01;DEBUG;{'channels': ['alarm'], 'producers': [{'remote_address': '127.0.0.1:53764', 'hostname': 'iz2ze9997ltwi7y8i9ngjjz', 'broadcast_address': 'iz2ze9997ltwi7y8i9ngjjz', 'tcp_port': 4150, 'http_port': 4151, 'version': '1.0.0-compat'}]}
2018-07-26 13:04:01;DEBUG;('host, port', 'iz2ze9997ltwi7y8i9ngjjz', 4150)
2018-07-26 13:04:01;DEBUG;lookupd response
2018-07-26 13:04:01;DEBUG;{'channels': ['alarm'], 'producers': [{'remote_address': '127.0.0.1:53764', 'hostname': 'iz2ze9997ltwi7y8i9ngjjz', 'broadcast_address': 'iz2ze9997ltwi7y8i9ngjjz', 'tcp_port': 4150, 'http_port': 4151, 'version': '1.0.0-compat'}]}
2018-07-26 13:04:01;DEBUG;('host, port', 'iz2ze9997ltwi7y8i9ngjjz', 4150)
2018-07-26 13:04:01;DEBUG;lookupd response
2018-07-26 13:04:01;DEBUG;{'channels': ['alarm'], 'producers': [{'remote_address': '127.0.0.1:53764', 'hostname': 'iz2ze9997ltwi7y8i9ngjjz', 'broadcast_address': 'iz2ze9997ltwi7y8i9ngjjz', 'tcp_port': 4150, 'http_port': 4151, 'version': '1.0.0-compat'}]}
2018-07-26 13:04:01;DEBUG;('host, port', 'iz2ze9997ltwi7y8i9ngjjz', 4150)
reconnect writer
reconnect writer
reconnect writer
reconnect writer
reconnect writer
reconnect writer
reconnect writer
reconnect writer
2018-07-26 13:04:01;DEBUG;lookupd response
2018-07-26 13:04:01;DEBUG;{'channels': ['alarm'], 'producers': [{'remote_address': '127.0.0.1:53764', 'hostname': 'iz2ze9997ltwi7y8i9ngjjz', 'broadcast_address': 'iz2ze9997ltwi7y8i9ngjjz', 'tcp_port': 4150, 'http_port': 4151, 'version': '1.0.0-compat'}]}
2018-07-26 13:04:01;DEBUG;('host, port', 'iz2ze9997ltwi7y8i9ngjjz', 4150)
reconnect writer
2018-07-26 13:04:01;DEBUG;lookupd response
2018-07-26 13:04:01;DEBUG;{'channels': ['alarm'], 'producers': [{'remote_address': '127.0.0.1:53764', 'hostname': 'iz2ze9997ltwi7y8i9ngjjz', 'broadcast_address': 'iz2ze9997ltwi7y8i9ngjjz', 'tcp_port': 4150, 'http_port': 4151, 'version': '1.0.0-compat'}]}
2018-07-26 13:04:01;DEBUG;('host, port', 'iz2ze9997ltwi7y8i9ngjjz', 4150)
reconnect writer
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
======== Running on http://0.0.0.0:10001 ========
(Press CTRL+C to quit)
2018-07-26 13:04:01;DEBUG;execute command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "sample_rate": 0, "snappy": false, "tls_v1": false, "heartbeat_interval": 30000, "feature_negotiation": true}'
2018-07-26 13:04:01;DEBUG;('conn.id:', 'tcp://iz2ze9997ltwi7y8i9ngjjz:4150')
2018-07-26 13:04:01;DEBUG;execute command b'SUB mail alarm\n'
this is subscribe..............
2018-07-26 13:04:01;DEBUG;('conn.id:', 'tcp://iz2ze9997ltwi7y8i9ngjjz:4150')
2018-07-26 13:04:01;DEBUG;execute command b'SUB mail alarm\n'
this is subscribe..............
2018-07-26 13:04:01;DEBUG;('conn.id:', 'tcp://iz2ze9997ltwi7y8i9ngjjz:4150')
2018-07-26 13:04:01;DEBUG;execute command b'SUB mail alarm\n'
this is subscribe..............
2018-07-26 13:04:01;DEBUG;('conn.id:', 'tcp://iz2ze9997ltwi7y8i9ngjjz:4150')
2018-07-26 13:04:01;DEBUG;execute command b'SUB mail alarm\n'
this is subscribe..............
2018-07-26 13:04:01;DEBUG;('conn.id:', 'tcp://iz2ze9997ltwi7y8i9ngjjz:4150')
2018-07-26 13:04:01;DEBUG;execute command b'SUB mail alarm\n'
this is subscribe..............
2018-07-26 13:04:01;DEBUG;('conn.id:', 'tcp://iz2ze9997ltwi7y8i9ngjjz:4150')
2018-07-26 13:04:01;DEBUG;execute command b'SUB mail alarm\n'
this is subscribe..............
2018-07-26 13:04:01;DEBUG;('conn.id:', 'tcp://iz2ze9997ltwi7y8i9ngjjz:4150')
2018-07-26 13:04:01;DEBUG;execute command b'SUB mail alarm\n'
this is subscribe..............
2018-07-26 13:04:01;DEBUG;('conn.id:', 'tcp://iz2ze9997ltwi7y8i9ngjjz:4150')
2018-07-26 13:04:01;DEBUG;execute command b'SUB mail alarm\n'
this is subscribe..............
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;('conn.id:', 'tcp://iz2ze9997ltwi7y8i9ngjjz:4150')
2018-07-26 13:04:01;DEBUG;execute command b'SUB mail alarm\n'
this is subscribe..............
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;('conn.id:', 'tcp://iz2ze9997ltwi7y8i9ngjjz:4150')
2018-07-26 13:04:01;DEBUG;execute command b'SUB mail alarm\n'
this is subscribe..............
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'{"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'OK')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'OK')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'OK')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'OK')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'OK')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'OK')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'OK')
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'OK')
2018-07-26 13:04:01;DEBUG;execute command b'RDY 1\n'
2018-07-26 13:04:01;DEBUG;execute command b'RDY 1\n'
2018-07-26 13:04:01;DEBUG;execute command b'RDY 1\n'
2018-07-26 13:04:01;DEBUG;execute command b'RDY 1\n'
2018-07-26 13:04:01;DEBUG;execute command b'RDY 1\n'
2018-07-26 13:04:01;DEBUG;execute command b'RDY 1\n'
2018-07-26 13:04:01;DEBUG;execute command b'RDY 1\n'
2018-07-26 13:04:01;DEBUG;execute command b'RDY 1\n'
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'OK')
2018-07-26 13:04:01;DEBUG;execute command b'RDY 1\n'
2018-07-26 13:04:01;DEBUG;got nsq data: (0, b'OK')
2018-07-26 13:04:01;DEBUG;execute command b'RDY 1\n'
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')
2018-07-26 13:04:31;DEBUG;got nsq data: (0, b'heartbeat')

cpu usage snapthot:
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
25943 root 20 0 1741100 31584 5976 R 100.0 0.4 6:00.03 python3.6

image

but on my mac, it seems work fine ! thanks.

Why the tornado.iostream.StreamClosedError: Stream is closed come up ?

2018-11-30 15:52:09:ERROR:nsq.conn uncaught exception in data event
Traceback (most recent call last):
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 290, in _read_body
self.trigger(event.DATA, conn=self, data=data)
File "/usr/local/lib/python3.5/dist-packages/nsq/event.py", line 84, in trigger
ev(*args, **kwargs)
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 503, in _on_data
self.send(protocol.nop())
File "/usr/local/lib/python3.5/dist-packages/nsq/conn.py", line 296, in send
self.stream.write(self.encoder.encode(data))
File "/usr/local/lib/python3.5/dist-packages/tornado/iostream.py", line 387, in write
self._check_closed()
File "/usr/local/lib/python3.5/dist-packages/tornado/iostream.py", line 925, in _check_closed
raise StreamClosedError(real_error=self.error)
tornado.iostream.StreamClosedError: Stream is closed

My code has a long processing time. Should I use message.touch() ? Is that maybe why it's disconnecting ?

Question about _poll_lookupd() in consumer.py

I was having a problem with my own codebase and was trying to see if the problem originated from this lib. The problem is when subscribing to a topic and channel when using a HTTP consumer, I get the following message:
ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 4150)
This seems to originate from the _poll_lookupd function in consumer.py:

async def _poll_lookupd(self, host, port):
        nsqlookup_conn = NsqLookupd(host, port, loop=self._loop)
        try:
            res = await nsqlookup_conn.lookup(self.topic)
            logger.debug('lookupd response')
            logger.debug(res)
        except Exception as tmp:
            logger.error(tmp)
            logger.exception(tmp)

        for producer in res['producers']:
            host = '127.0.0.1' # Why localhost here?
            # producer['broadcast_address']
            port = producer['tcp_port']
            tmp_id = "tcp://{}:{}".format(host, port)
            if tmp_id not in self._connections:
                logger.debug(('host, port', host, port))
                conn = await create_nsq(host, port, queue=self._queue,
                                        loop=self._loop) # I get my error here
                logger.debug(('conn.id:', conn.id))
                self._connections[conn.id] = conn
                self._rdy_control.add_connection(conn)
await nsqlookup_conn.close()

So my real question is, should the connection be made to localhost, even though nsqlookupd in my case is running on a different server? Or am I missing something regarding NSQ?

Thanks in advance!

How to run/write tests?

Recently I've tried to write unit/integrational tests using asyncnsq, but had no luck. I've tried to borrow some tests from the repository, but It looks like the current tests don't work. I've got many import errors, it seems like tests are outdated.

Any plans on fixing tests and integrating tox/pytest/flake8/black/isort/pre-commit/ci which could make the project more friendly for the new contributors? I'd like to offer my help if you are interested.

Code on PyPI and on GitHub are not the same

It seems uploaded to PyPI build has new code, but github repo does not. See:

$ wget https://files.pythonhosted.org/packages/e5/1f/14eb192fa5af5b6c070773bde4a395eff26e5f6560539d7277515e32370f/asyncnsq-1.1.2-py3-none-any.whl

$ unzip asyncnsq-1.1.2-py3-none-any.whl

$ ls -l asyncnsq/
total 136
-rw-r--r--   1 albert  staff   175 Oct 20 09:11 __init__.py
-rw-r--r--   1 albert  staff  8775 Nov 26  2018 connection.py
-rw-r--r--   1 albert  staff   643 Nov 26  2018 consts.py
-rw-r--r--   1 albert  staff  4149 Nov 26  2018 consumer.py
-rw-r--r--   1 albert  staff  1875 Nov 26  2018 containers.py
-rw-r--r--   1 albert  staff  2142 Nov 26  2018 exceptions.py
drwxr-xr-x   9 albert  staff   288 Oct 20 09:12 http
-rw-r--r--   1 albert  staff    56 Nov 26  2018 log.py
-rw-r--r--   1 albert  staff  7881 Nov 26  2018 nsq.py
-rw-r--r--   1 albert  staff  2390 Nov 26  2018 producer.py
-rw-r--r--   1 albert  staff  6055 Nov 26  2018 protocol.py
-rw-r--r--   1 albert  staff   564 Nov 26  2018 selectors.py
drwxr-xr-x  11 albert  staff   352 Oct 20 09:12 tcp
-rw-r--r--   1 albert  staff  3017 Jul  3  2019 utils.py

On Github:

image

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.