Hello,
I am writing a program where I need to listen to events and get the results of actions continuously. Ideally I want to put the events and action results into an asyncio
queue and process that queue. Been trying to get this working for past two days, but its not yet successful. Here is my current code:
I have an ActionListener
which continuously send_action()
and put()
the result into a queue. Similarly I have an EventsListener
which pushes all received events to a queue, and finally an EventsProcess
which fetches items from the queue and processes it.
All these three are run in a main program using multiprocessing.Process()
action_listener.py:
import asyncio
import logging
from panoramisk import Manager
class ActionListener:
def __init__(self, options, queue):
self.loop = asyncio.get_event_loop()
self.events_queue = queue
self.manager = Manager(loop=self.loop, **options)
self.manager.log.addHandler(logging.NullHandler())
async def ping(self):
await self.manager.connect()
while True:
message = await self.manager.send_action({'Action': 'ping'})
print (f"action: putting a message into queue")
# p = yield from self.manager.send_action({'Action': 'SIPpeers'})
await self.events_queue.put(message)
await asyncio.sleep(5)
self.manager.close()
def run(self):
self.loop.run_until_complete(self.ping())
self.loop.close()
events_listener.py:
import asyncio
import logging
from panoramisk import Manager
class EventsListener:
def __init__(self, options, queue):
self.loop = asyncio.get_event_loop()
self.events_queue = queue
self.manager = Manager(loop=self.loop, **options)
self.manager.log.addHandler(logging.NullHandler())
self.manager.register_event("*", self.handle_events)
async def handle_events(self, manager, message):
print (f"events: putting a message into queue")
await self.events_queue.put(message)
async def connect(self):
await self.manager.connect()
def run(self):
try:
self.loop.run_until_complete(self.connect())
self.loop.run_forever()
finally:
self.loop.close()
events_process.py:
import asyncio
import logging
from pprint import pprint
class EventsProcess:
def __init__(self):
self.loop = asyncio.get_event_loop()
self.events_queue = asyncio.Queue()
async def process(self):
while True:
message = await self.events_queue.get()
pprint (message)
def get_queue(self):
return self.events_queue
def run(self):
self.loop.run_until_complete(self.process())
self.loop.close()
and finally the main program is as follows:
from multiprocessing import Process
from events_listener import EventsListener
from action_listener import ActionListener
from events_process import EventsProcess
def run_events_listener(config, events_queue):
listener = EventsListener(config["manager"], events_queue)
listener.run()
def run_action_listener(config, events_queue):
action = ActionListener(config["manager"], events_queue)
action.run()
def run_events_process(p):
p.run()
def main(args=None):
p = EventsProcess()
events_queue = p.get_queue()
p1 = Process(name="erbot_events", target=run_events_listener, args=(config, events_queue,))
p2 = Process(name="erbot_actions",target=run_action_listener, args=(config,events_queue,))
p3 = Process(name="erbot_process",target=run_events_process, args=(p,))
p1.start()
p2.start()
p3.start()
In the above program, message = await self.events_queue.get()
do not seems to be working. No message is being printed, though I can see that putting events does work.
I have removed all unnecessary parts from the program and just posted the bare bones of it. I am sure I am missing some thing with running both action and events together.
I am sure many too would have similar requirement and a working example of both action and events in tandem would be of much help for some one new to this project.