Comments (20)
Hmmm, this looks like an issue with multiprocessing initialization on Windows. It is a good bug to catch. I guess we have to take care of the Windows-specific issues documented at http://docs.python.org/2/library/multiprocessing.html#windows.
I will work on it and let you know.
from kafka-python.
Thank you, happy to test something at the appropriate time if it helps.
I looked into it further. It looks like on Windows we cannot use methods as the 'target' for Process.__init__().
We have to define a normal module-level function and use it instead. This will take some work. In the meantime, can you continue using async=False?
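A minimal sketch of that workaround, assuming a queue-draining sender like the producer's `_send_upstream` (the names `send_upstream` and `start_sender` are hypothetical, not kafka-python's code). On Windows, multiprocessing has no fork(), so it starts the child by pickling the Process object together with its target and args, and Python 2's pickle cannot serialize bound methods. A plain module-level function is pickled by name instead:

```python
import multiprocessing


def send_upstream(queue, results):
    """Drain `queue` until a None sentinel arrives.

    Defined at module level so pickle can refer to it by name, which is what
    lets it be passed as a Process target when the child is spawned.
    """
    while True:
        msg = queue.get()
        if msg is None:  # sentinel: shut the worker down
            break
        results.put(msg)  # stand-in for the real network send


def start_sender(queue, results):
    """Start send_upstream in a daemon child process and return it."""
    proc = multiprocessing.Process(target=send_upstream, args=(queue, results))
    proc.daemon = True
    proc.start()
    return proc
```

On Windows, call `start_sender` only from code under `if __name__ == "__main__":`, because the child re-imports the main module (see the Windows section of the multiprocessing docs linked at the top). Everything passed in `args` must be picklable as well, which is what fails next in this thread once the socket gets involved.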
async=False is fine for now. My deployment target is likely to be Linux anyway, so there is no immediate rush. Thanks.
I have made a tentative fix, but I cannot try it since I do not have Windows.
Would it be possible for you to try running it from this branch and let me know if it works?
https://github.com/mahendra/kafka-python/tree/windows
Unfortunately it's still not working:
<type 'exceptions.Exception'>
Can't pickle <built-in method recvfrom_into of _socket.socket object at 0x037478E0>: it's not found as __main__.recvfrom_into
[26/Sep/2013 10:03:45] "GET /sigs/hits/536f0932f30a45159b9b629a34659764/0/ HTTP/1.1" 200 27
Traceback (most recent call last):
File "C:\Users\astock\workspace\checklist\checklist\sigcheck\views.py", line 36, in upload
request.upload_handlers = [QueueUploadHandler(docid)]
File "C:\Users\astock\workspace\checklist\checklist\sigcheck\QueueUploadHandler.py", line 27, in __init__
self.producer = SimpleProducer(self.kafka, "test", async=True)#False,
File "C:\Users\astock\envs\midmarket\lib\site-packages\kafka\producer.py", line 185, in __init__
batch_send_every_t)
File "C:\Users\astock\envs\midmarket\lib\site-packages\kafka\producer.py", line 125, in __init__
self.proc.start()
File "c:\python27\Lib\multiprocessing\process.py", line 130, in start
self._popen = Popen(self)
File "c:\python27\Lib\multiprocessing\forking.py", line 271, in __init__
dump(process_obj, to_child, HIGHEST_PROTOCOL)
File "c:\python27\Lib\multiprocessing\forking.py", line 193, in dump
ForkingPickler(file, protocol).dump(obj)
File "c:\python27\Lib\pickle.py", line 224, in dump
self.save(obj)
File "c:\python27\Lib\pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "c:\python27\Lib\pickle.py", line 419, in save_reduce
save(state)
File "c:\python27\Lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "c:\python27\Lib\pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "c:\python27\Lib\pickle.py", line 681, in _batch_setitems
save(v)
File "c:\python27\Lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "c:\python27\Lib\pickle.py", line 562, in save_tuple
save(element)
File "c:\python27\Lib\pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "c:\python27\Lib\pickle.py", line 419, in save_reduce
save(state)
File "c:\python27\Lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "c:\python27\Lib\pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "c:\python27\Lib\pickle.py", line 681, in _batch_setitems
save(v)
File "c:\python27\Lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "c:\python27\Lib\pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "c:\python27\Lib\pickle.py", line 686, in _batch_setitems
save(v)
File "c:\python27\Lib\pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "c:\python27\Lib\pickle.py", line 419, in save_reduce
save(state)
File "c:\python27\Lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "c:\python27\Lib\pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "c:\python27\Lib\pickle.py", line 681, in _batch_setitems
save(v)
File "c:\python27\Lib\pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "c:\python27\Lib\pickle.py", line 419, in save_reduce
save(state)
File "c:\python27\Lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "c:\python27\Lib\pickle.py", line 548, in save_tuple
save(element)
File "c:\python27\Lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "c:\python27\Lib\pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "c:\python27\Lib\pickle.py", line 681, in _batch_setitems
save(v)
File "c:\python27\Lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "c:\python27\Lib\pickle.py", line 748, in save_global
(obj, module, name))
PicklingError: Can't pickle <built-in method recvfrom_into of _socket.socket object at 0x037478E0>: it's not found as __main__.recvfrom_into
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "c:\python27\Lib\multiprocessing\forking.py", line 374, in main
self = load(from_parent)
File "c:\python27\Lib\pickle.py", line 1378, in load
return Unpickler(file).load()
File "c:\python27\Lib\pickle.py", line 858, in load
dispatch[key](self)
File "c:\python27\Lib\pickle.py", line 880, in load_eof
raise EOFError
EOFError
@sweetcheeks24 also had this issue on Windows. I'm marking as wontfix since I'm not really sure there's anything we can do about it.
If you're on Windows, don't use async I guess :-/
I checked in detail. This can be fixed, but we will hold off on it until the zookeeper branch is checked in, since some of those patches will be required for it.
@mumrah, we have to be careful with this feature. If multiprocessing is a problem on Windows, things like MultiProcessConsumer will not work either. We need to address this issue.
@watchforstock, can you give my branch one more try? I have tried to ensure that the following feature will work on Windows
Give it a quick check. If things work, I will send a merge request to @mumrah.
Unfortunately, it looks like I'm still getting the same error about not being able to pickle method recvfrom_into of _socket.socket.
Oh! OK, let me look into it. A trace would be helpful.
I will try to find myself a Windows box for debugging. If I can't, I will keep bothering you. :-)
I was able to get around the issue by switching multiprocessing.Process to threading.Thread. I am not sure of the overall impact that may have, as I am only using the producer while integrating with tcollector. (I should also add that I have not actually produced anything successfully yet, since I am troubleshooting other tcollector/Windows/Python compatibility issues.)
#self.proc = Process(target=self._send_upstream, args=(self.queue,))
self.proc = Thread(target=self._send_upstream, args=(self.queue,))
A thread will not exactly solve the problem: this module was for folks who did not want to use threads. I have a new feature that adds driver support to kafka, so you can switch between gevent, threads, and processes. Coming up soon.
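The thread-or-process switch described above can be sketched as a small factory that hides which kind of worker runs the background sender. The name `make_worker` and its `use_processes` flag are hypothetical, not the driver API that was eventually built, and gevent is left out to keep the sketch dependency-free:

```python
import multiprocessing
import threading


def make_worker(target, args=(), use_processes=False):
    """Return an unstarted daemon worker that will run target(*args).

    threading.Thread and multiprocessing.Process accept the same target/args
    keywords and share the start()/join()/daemon interface, so callers need
    not care which one they get. Only the process variant requires target
    and args to be picklable on Windows.
    """
    worker_cls = multiprocessing.Process if use_processes else threading.Thread
    worker = worker_cls(target=target, args=args)
    worker.daemon = True
    return worker
```

With `use_processes=False` the sender shares memory with the caller, so nothing is pickled; the trade-off is the one raised above, namely that the module was meant for users who did not want threads.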
Meanwhile, I figured out this problem (I hope). The issue is that the socket object is not picklable, so I have written some hacks for it and pushed them to my "windows" branch. Could you give it one more try, please?
Sorry about this. The branch is available here:
https://github.com/mahendra/kafka-python/tree/windows
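A minimal sketch of that kind of hack, assuming a connection object that owns a socket (the `PicklableConnection` class, its `_sock` attribute, and `roundtrip` are hypothetical, not the code on the branch): drop the live socket in `__getstate__` so the object can be pickled into the child process, and have the child open a fresh connection there.

```python
import pickle
import socket


class PicklableConnection(object):
    """Hypothetical broker connection that survives pickling."""

    def __init__(self, host, port):
        self.host = host
        self.port = port
        self._sock = None

    def connect(self):
        self._sock = socket.create_connection((self.host, self.port))

    def __getstate__(self):
        # A live socket cannot be pickled, so copy everything except it;
        # the unpickled copy starts disconnected and must call connect().
        state = self.__dict__.copy()
        state["_sock"] = None
        return state


def roundtrip(conn):
    """Pickle and unpickle conn, as multiprocessing does when spawning."""
    return pickle.loads(pickle.dumps(conn))
```

The original object keeps its socket; only the copy sent to the child loses it, which is why the child then needs an explicit reconnect step.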
Not sure if it is just my setup, but I got this:
Process Process-1:
Traceback (most recent call last):
File "c:\Python27\lib\multiprocessing\process.py", line 258, in _bootstrap
self.run()
File "c:\Python27\lib\multiprocessing\process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "F:\collectors\lib\kafka\producer.py", line 29, in _send_upstream
client.reinit()
AttributeError: 'NoneType' object has no attribute 'reinit'
Thanks for your efforts. I'll give it a try once I'm back in the office on Monday.
Thanks
Actually, it was my mistake; I had not tested it thoroughly.
I have fixed it. Do test it once more when you have time. I will try testing it myself as well.
Mahendra
The error has changed now so definitely making progress:
Traceback (most recent call last):
File "c:\python27\Lib\multiprocessing\process.py", line 258, in _bootstrap
self.run()
File "c:\python27\Lib\multiprocessing\process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "C:\Users\astock\envs\midmarket\lib\site-packages\kafka\producer.py", line 29, in _send_upstream
client.reinit()
File "C:\Users\astock\envs\midmarket\lib\site-packages\kafka\client.py", line 197, in reinit
conn.reinit()
File "C:\Users\astock\envs\midmarket\lib\site-packages\kafka\conn.py", line 118, in reinit
self._sock.close()
AttributeError: 'NoneType' object has no attribute 'close'
It appears that although the code runs, no messages are actually making it to the Kafka broker.
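The last trace fails because `conn.reinit()` calls `self._sock.close()` while `_sock` is still `None` in the child process. A minimal sketch of a reconnect that tolerates a connection that was never opened, assuming a hypothetical `ReconnectingConnection` class rather than kafka-python's actual connection code:

```python
import socket


class ReconnectingConnection(object):
    """Hypothetical connection whose reinit() is safe on a fresh copy."""

    def __init__(self, host, port, timeout=10.0):
        self.host = host
        self.port = port
        self.timeout = timeout
        self._sock = None  # e.g. after being unpickled in a child process

    def close(self):
        # Only close a socket that actually exists; a connection that was
        # never opened (or was just unpickled) has _sock set to None.
        if self._sock is not None:
            self._sock.close()
            self._sock = None

    def reinit(self):
        """Drop any existing socket and open a new one to the broker."""
        self.close()
        self._sock = socket.create_connection((self.host, self.port), self.timeout)
```

Guarding `close()` only removes the crash; it does not by itself explain why no messages reach the broker.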
I have done some testing on my side and made a pull request for this: #61.
@watchforstock, if you confirm that it works on Windows, I will merge it.
Hi,
I am using a modified tail2kafka (log2kafka). When I run it, I get the error below:
Traceback (most recent call last):
File "log2kafka.py", line 46, in <module>
main()
File "log2kafka.py", line 40, in main
producer = kafka.producer.Producer(options.topic,options.host,int(options.port))
File "/home/nandhu/kafka/kafka-storm-cassandra-master/kafka/producer.py", line 29, in __init__
self.connect()
File "/home/nandhu/kafka/kafka-storm-cassandra-master/kafka/io.py", line 21, in connect
self.socket.connect((self.host, self.port))
File "/usr/lib/python2.7/socket.py", line 224, in meth
return getattr(self._sock,name)(*args)
TypeError: coercing to Unicode: need string or buffer, int found
My Producer is:

import atexit
import contextlib
import itertools
import struct
import threading

import kafka.io
import kafka.message
import kafka.request_type


class Producer(kafka.io.IO):
    """Class for sending data to a `Kafka <http://sna-projects.com/kafka/>`_ broker.

    :param topic: The topic to produce to.
    :param partition: The broker partition to produce to.
    :param host: The kafka host.
    :param port: The kafka port.
    :param max_message_sz: The maximum allowed size of a produce request (in bytes). [default: 1MB]
    """

    PRODUCE_REQUEST_ID = kafka.request_type.PRODUCE

    def __init__(self, topic, partition=3, host='localhost', port=9092, max_message_sz=104):
        kafka.io.IO.__init__(self, host, port)
        self.max_message_sz = max_message_sz
        self.topic = topic
        self.partition = partition
        self.connect()

    def _pack_payload(self, messages):
        """Pack a list of messages into a sendable buffer.

        :param msgs: The packed messages to send.
        :param size: The size (in bytes) of all the `messages` to send.
        """
        payload = ''.join(messages)
        payload_sz = len(payload)
        topic_sz = len(self.topic)
        # Create the request as::
        #   <REQUEST_ID: short>
        #   <TOPIC_SIZE: short>
        #   <TOPIC: bytes>
        #   <PARTITION: int>
        #   <BUFFER_SIZE: int>
        #   <BUFFER: bytes>
        return struct.pack(
            '>HH%dsii%ds' % (topic_sz, payload_sz),
            self.PRODUCE_REQUEST_ID,
            topic_sz,
            self.topic,
            self.partition,
            payload_sz,
            payload
        )

    def _pack_kafka_message(self, payload):
        """Pack a payload in a format kafka expects."""
        return struct.pack('>i%ds' % len(payload), len(payload), payload)

    def encode_request(self, messages):
        """Encode a sequence of messages for sending to a kafka broker.

        Encoding a request can yield multiple kafka messages if the payloads exceed
        the maximum produce size.

        :param messages: An iterable of :class:`Message <kafka.message>` objects.
        :rtype: A generator of packed kafka messages ready for sending.
        """
        encoded_msgs = []
        encoded_msgs_sz = 0
        for message in messages:
            encoded = message.encode()
            encoded_sz = len(encoded)
            # Flush the current batch before it would exceed the size limit,
            # but never flush an empty batch.
            if encoded_msgs and encoded_sz + encoded_msgs_sz > self.max_message_sz:
                yield self._pack_kafka_message(self._pack_payload(encoded_msgs))
                encoded_msgs = []
                encoded_msgs_sz = 0
            msg = struct.pack('>i%ds' % encoded_sz, encoded_sz, encoded)
            encoded_msgs.append(msg)
            encoded_msgs_sz += encoded_sz
        if encoded_msgs:
            yield self._pack_kafka_message(self._pack_payload(encoded_msgs))

    def send(self, messages):
        """Send a :class:`Message <kafka.message>` or a sequence of `Messages` to the Kafka server."""
        if isinstance(messages, kafka.message.Message):
            messages = [messages]
        for message in self.encode_request(messages):
            sent = self.write(message)
            if sent != len(message):
                raise IOError('Failed to send kafka message - sent %s/%s many bytes.' % (sent, len(message)))

    @contextlib.contextmanager
    def batch(self):
        """Send messages with an implicit send."""
        messages = []
        yield messages
        self.send(messages)


class BatchProducer(Producer):
    """Class for batching messages to a `Kafka <http://sna-projects.com/kafka/>`_ broker with periodic flushing.

    :param topic: The topic to produce to.
    :param batch_interval: The amount of time (in seconds) to wait for messages before sending.
    :param partition: The broker partition to produce to.
    :param host: The kafka host.
    :param port: The kafka port.
    """

    MAX_RESPAWNS = 5
    PRODUCE_REQUEST_ID = kafka.request_type.PRODUCE

    def __init__(self, topic, batch_interval, partition=3, host='localhost', port=9092):
        Producer.__init__(self, topic, partition=partition, host=host, port=port)
        self.batch_interval = batch_interval
        self._message_queue = []
        self.event = threading.Event()
        self.lock = threading.Lock()
        self.timer = None
        atexit.register(self.close)
        self.respawns = 0

    def check_timer(self):
        """Starts the flush timer and restarts it after forks."""
        if (self.timer and self.timer.is_alive()) or self.respawns > self.MAX_RESPAWNS:
            return
        self.respawns += 1
        self.timer = threading.Thread(target=self._interval_timer)
        self.timer.daemon = True
        self.timer.start()
        self.connect()

    def _interval_timer(self):
        """Flush the message queue every `batch_interval` seconds."""
        while not self.event.is_set():
            self.flush()
            self.event.wait(self.batch_interval)

    def enqueue(self, message):
        """Enqueue a message in the queue.

        .. note:: These messages are implicitly sent every `batch_interval` seconds.

        :param message: The message to queue for sending at next send interval.
        """
        with self.lock:
            self.check_timer()
            self._message_queue.append(message)

    def flush(self):
        """Send all messages in the queue now."""
        with self.lock:
            if len(self._message_queue) > 0:
                self.send(self._message_queue)
                # Reset the queue
                del self._message_queue[:]

    def close(self):
        """Shutdown the timer thread and flush the message queue."""
        self.event.set()
        self.flush()
        if self.timer:
            self.timer.join()
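Judging from the code above, the `TypeError` is most likely an argument-order problem rather than a network one. The `Producer` constructor takes `(topic, partition=3, host='localhost', port=9092, ...)`, but `log2kafka.py` calls `Producer(options.topic, options.host, int(options.port))`, so the host string binds to `partition` and the integer port binds to `host`; `socket.connect()` then receives an integer where it expects a host name. The stub below is a hypothetical stand-in with the same signature and no networking (the topic and host values are made up) that shows the binding and the keyword-argument fix:

```python
class ProducerArgs(object):
    """Hypothetical stand-in with the same signature as the Producer above."""

    def __init__(self, topic, partition=3, host='localhost', port=9092,
                 max_message_sz=104):
        self.topic = topic
        self.partition = partition
        self.host = host
        self.port = port
        self.max_message_sz = max_message_sz


# Positional call as in log2kafka.py: the host lands in `partition`
# and the port lands in `host`.
wrong = ProducerArgs("logs", "kafka01", 9092)

# Keyword arguments bind each value to the parameter it is meant for.
right = ProducerArgs("logs", host="kafka01", port=9092)
```

In `log2kafka.py` that would mean calling `kafka.producer.Producer(options.topic, host=options.host, port=int(options.port))`.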