nimzymaina / flask_kafka Goto Github PK
View Code? Open in Web Editor NEWFlask Kafka consumer full implementation example. Ideal for a microservices architecture.
Flask Kafka consumer full implementation example. Ideal for a microservices architecture.
Hi,
I love flask_kafka but I have been battling trying to improve the consumer message throughput from the ~30-40 messages per second that I can do on my modern computer.
I tried changing the underlying library from the pure python Kafka library to the librdkafka based confluent_kafka and the message throughput went up from 30-5000 per second with no other changes.
I made a fork and changed the example file to show what change is needed in this library to make it work:
https://github.com/andydavidson/flask_kafka
Do you think this is something we can adopt into the mainline flask_kafka project? It's only really the change in configuration, e.g. bootstrap-servers becomes bootstrap.servers that means the change will impact most of the users. :-(
I am not happy with the interrupt behaviour - the consumer sometimes fails to stop when you send the signal with ctrl-c, but perhaps someone with a little more experience can improve this.
Usually the kafka event handlers need to perform database operations. It would be nice if the application context can be added into the extension so that common database packges such as SQLAlchmey
can be used without worrying about session management. This feature request is something similar to how Flask-CeleryExt is implemented.
Hello @NimzyMaina Thanks for creating this awesome module. However, after consistently using it, I realized that during kafka led errors such as CommitFailedError, the consumer stops working and gets hung up and also stops consuming next messages in the queue. I want to log this exception for certain messages and then move steadily to consume the next message. Can you please guide me on this?
I am facing this error again and again.
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/usr/lib/python3.6/threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "/home/sneha/sneha/rps/flask_AI_AWS/model/flask_kafka/consumer.py", line 59, in _start
for msg in self.consumer:
File "/home/sneha/sneha/rps/venv/lib/python3.6/site-packages/kafka/consumer/group.py", line 1192, in next
return self.next_v2()
File "/home/sneha/sneha/rps/venv/lib/python3.6/site-packages/kafka/consumer/group.py", line 1200, in next_v2
return next(self._iterator)
File "/home/sneha/sneha/rps/venv/lib/python3.6/site-packages/kafka/consumer/group.py", line 1115, in _message_generator_v2
record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
File "/home/sneha/sneha/rps/venv/lib/python3.6/site-packages/kafka/consumer/group.py", line 654, in poll
records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
File "/home/sneha/sneha/rps/venv/lib/python3.6/site-packages/kafka/consumer/group.py", line 674, in _poll_once
self._coordinator.poll()
File "/home/sneha/sneha/rps/venv/lib/python3.6/site-packages/kafka/coordinator/consumer.py", line 286, in poll
self.ensure_active_group()
File "/home/sneha/sneha/rps/venv/lib/python3.6/site-packages/kafka/coordinator/base.py", line 427, in ensure_active_group
time.sleep(self.config['retry_backoff_ms'] / 1000)
File "/usr/lib/python3.6/threading.py", line 243, in exit
return self._lock.exit(*args)
RuntimeError: cannot release un-acquired lock
Any suggestion on this?
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.