dpkp / kafka-python
Python client for Apache Kafka
Home Page: http://kafka-python.readthedocs.io/
License: Apache License 2.0
kafka version 0.8.0, kafka-python version: master
Traceback (most recent call last):
File "test_kafka.py", line 88, in <module>
test2()
File "test_kafka.py", line 75, in test2
receive(1000)
File "test_kafka.py", line 48, in receive
consumer.commit()
File "/home/valens/test/kafka/consumer.py", line 144, in commit
resps = self.client.send_offset_commit_request(self.group, reqs)
File "/home/valens/test/kafka/client.py", line 318, in send_offset_commit_request
resps = self._send_broker_aware_request(payloads, encoder, decoder)
File "/home/valens/test/kafka/client.py", line 184, in _send_broker_aware_request
raise FailedPayloadsException(failed_payloads)
kafka.common.FailedPayloadsException: [OffsetCommitRequest(topic='performance_test', partition=0, offset=499, metadata=None), OffsetCommitRequest(topic='performance_test', partition=1, offset=499, metadata=None)]
[2013-12-13 11:53:32,137] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
java.nio.BufferUnderflowException
at java.nio.HeapByteBuffer.get(Unknown Source)
at java.nio.ByteBuffer.get(Unknown Source)
at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:38)
at kafka.api.UpdateMetadataRequest$$anonfun$readFrom$1.apply(UpdateMetadataRequest.scala:42)
at kafka.api.UpdateMetadataRequest$$anonfun$readFrom$1.apply(UpdateMetadataRequest.scala:41)
at scala.collection.immutable.Range$ByOne$class.foreach(Range.scala:282)
at scala.collection.immutable.Range$$anon$2.foreach(Range.scala:265)
at kafka.api.UpdateMetadataRequest$.readFrom(UpdateMetadataRequest.scala:41)
at kafka.api.RequestKeys$$anonfun$7.apply(RequestKeys.scala:40)
at kafka.api.RequestKeys$$anonfun$7.apply(RequestKeys.scala:40)
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:49)
at kafka.network.Processor.read(SocketServer.scala:353)
at kafka.network.Processor.run(SocketServer.scala:245)
at java.lang.Thread.run(Unknown Source)
[2013-12-13 11:53:32,180] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
commit() is called manually after every 1000 messages are consumed. The topic has 2 partitions.
auto_commit() also returns the same error.
Please give me a hand... Thanks
Hey,
This isn't code related, but I'm marking it as an issue because it just cost me about an hour of debugging and will probably burn others. The pip package here: https://pypi.python.org/pypi/kafka points at https://github.com/duanhongyi/kafka, which appears to be an (unofficial?) fork of this codebase that contains a fundamental bug in SimpleProducer.
Is it possible to start publishing official copies of this repo to pypi and remove this fork?
It appears that the multiprocessor consumer puts all events back into a single queue. However, to process these events using multiprocessing, no queue is needed and there should be a way to define the code that is executed in each process upon event arrival.
I was trying to use SimpleConsumer to do auto commit, but ran into a number of issues.
With
SimpleConsumer(kafka, "my-group", TOPIC_NAME, True)
the auto commit does not work.
With
SimpleConsumer(kafka, "my-group", TOPIC_NAME, True, auto_commit_every_n=100)
the code fails because it cannot find a reference to ReentrantTimer.
With
SimpleConsumer(kafka, "my-group", TOPIC_NAME, True, 100, 5000)
the code fails because it cannot find a reference to send_offset_commit_request, which is called incorrectly:
self.send_offset_commit_request(self.group, reqs)
With both fixed (see the diff below),
SimpleConsumer(kafka, "my-group", TOPIC_NAME, True, auto_commit_every_n=100)
can proceed further.
But now I am stuck at
kafka-python/kafka/util.py", line 45, in relative_unpack
raise BufferUnderflowError("Not enough data left")
BufferUnderflowError: Not enough data left
diff --git a/kafka/consumer.py b/kafka/consumer.py
index f123113..7d658b1 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -1,6 +1,7 @@
from itertools import izip_longest, repeat
import logging
from threading import Lock
+from util import ReentrantTimer
from kafka.common import (
ErrorMapping, FetchRequest,
@@ -126,7 +127,7 @@ class SimpleConsumer(object):
log.debug("Commit offset %d in SimpleConsumer: group=%s, topic=%s, partition=%s" % (
offset, self.group, self.topic, partition))
reqs.append(OffsetCommitRequest(self.topic, partition, offset, None))
- resps = self.send_offset_commit_request(self.group, reqs)
+ resps = self.client.send_offset_commit_request(self.group, reqs)
for resp in resps:
assert resp.error == 0
self.count_since_commit = 0
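The constructor calls above mix a count-based trigger (auto_commit_every_n=100) with what looks like a time interval (the trailing 5000). A minimal sketch of that commit-decision logic, kept separate from any Kafka I/O, might look like this; the class name AutoCommitPolicy is hypothetical and the interval is assumed to be in milliseconds.

```python
import time


class AutoCommitPolicy:
    """Hypothetical helper: decide when to commit based on a message count
    and/or an elapsed-time interval (interval assumed to be in milliseconds)."""

    def __init__(self, every_n=None, every_t_ms=None, clock=time.monotonic):
        self.every_n = every_n
        self.every_t_ms = every_t_ms
        self.clock = clock
        self.count_since_commit = 0
        self.last_commit = clock()

    def record_message(self):
        # Call once per consumed message; returns True when a commit is due.
        self.count_since_commit += 1
        return self.commit_due()

    def commit_due(self):
        if self.every_n is not None and self.count_since_commit >= self.every_n:
            return True
        if self.every_t_ms is not None:
            elapsed_ms = (self.clock() - self.last_commit) * 1000.0
            if elapsed_ms >= self.every_t_ms:
                return True
        return False

    def committed(self):
        # Reset both triggers after a successful commit.
        self.count_since_commit = 0
        self.last_commit = self.clock()
```

Keeping the decision separate from the commit call makes both triggers easy to test with a fake clock.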
Not sure if this is the best way of creating a question but here goes....
I searched the code but couldn't see anything obvious. I tried topic.* but it just looped waiting for it to be created.
The Java / Scala clients have a createMessageStreamsByFilter method which also listens to new topics matching the wildcard. It'd be nice if your API did this too ;-)
Cheers
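A minimal sketch of the filtering half of such a feature, assuming the caller already has the list of topic names from cluster metadata (how to obtain it is not shown). match_topics and new_matches are hypothetical helpers, not kafka-python API; re-running new_matches periodically is one way to pick up newly created topics.

```python
import re


def match_topics(pattern, topic_names):
    """Return the sorted topic names that fully match a regular expression
    such as r"topic.*" (topic_names would come from broker metadata)."""
    regex = re.compile(pattern)
    return sorted(t for t in topic_names if regex.fullmatch(t))


def new_matches(pattern, topic_names, already_subscribed):
    """Topics matching the pattern that are not yet subscribed (for periodic re-checks)."""
    return [t for t in match_topics(pattern, topic_names) if t not in already_subscribed]
```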
I found during testing that the constructor makes a never-ending recursive call to _load_metadata_for_topics. This caused me some trouble when a consumer was started while a topic had offline partitions for a long period. The code assumes such a situation is only temporary, so it sleeps and retries forever.
A consumer is affected by offline partitions in a topic even if it isn't interested in reading from that topic. For example, topic xyz had no leader (broker down) and the consumer was only expected to read from topic abc. Because the initial _load_metadata_for_topics loads metadata for all topics, the consumer is affected even though it only needs metadata for one topic.
My suggestion (a pull request is coming) is to avoid the recursion altogether and rely on metadata being loaded on the fly (lazily) in _get_leader_for_partition when a request is made for a specific topic that was unavailable at first. If it's still unavailable, the request will fail, as it should.
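A minimal sketch of the proposed lazy behaviour, assuming a hypothetical fetch_metadata(topic) callable that returns {partition: leader} with None for offline partitions. LazyMetadata and LeaderUnavailableError are illustrative names, not the library's classes.

```python
class LeaderUnavailableError(Exception):
    """Raised when a partition still has no leader after a metadata refresh."""


class LazyMetadata:
    """Hypothetical sketch: cache partition leaders per topic and load them lazily.

    fetch_metadata(topic) is an assumed callable returning {partition: leader},
    where leader is None for an offline partition.
    """

    def __init__(self, fetch_metadata):
        self._fetch = fetch_metadata
        self._leaders = {}  # (topic, partition) -> leader

    def leader_for(self, topic, partition):
        key = (topic, partition)
        if self._leaders.get(key) is None:
            # Refresh only the topic actually needed, once, with no retry loop.
            for part, leader in self._fetch(topic).items():
                self._leaders[(topic, part)] = leader
        leader = self._leaders.get(key)
        if leader is None:
            raise LeaderUnavailableError("no leader for %s:%s" % key)
        return leader
```

An offline topic the consumer never asks about (xyz in the example) is never fetched, so it cannot block startup.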
Hello,
I was wondering what the recommended way is to overcome the "Could not send request" issue on the call to commit().
SimpleConsumer always returns all the entries.
I am using Kafka 0.8.0.
Thanks,
Dima
Hi,
We're just switching to kafka-python in our environment, and we're noticing that it pulls in additional dependencies because installation requires tox. Could we remove this installation dependency? It only seems to be used by tests (and it's already in tests_require).
I'm happy to open a pull request to remove this; I just wanted to be certain it's not required somewhere I'm missing.
I use Kafka v0.8 and the kafka-python 0.8 branch.
At first I guessed that the group in the consumer is the same as group.id in the Kafka consumer.
So I created 2 consumers with the same group name.
I expected them to read different messages from Kafka,
but both of them read all messages from Kafka.
my code:https://gist.github.com/asdfsx/7956368
is there something wrong?
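One possible workaround, assuming no group coordination is happening between the two consumers, is to split the partitions statically so each consumer reads a disjoint subset. assign_partitions is a hypothetical helper; how the resulting list would be passed to a consumer depends on the kafka-python version and is not shown.

```python
def assign_partitions(partitions, consumer_index, num_consumers):
    """Hypothetical static assignment: consumer i of n owns every n-th partition
    (by sorted position), so consumers sharing a group never read the same one."""
    if not 0 <= consumer_index < num_consumers:
        raise ValueError("consumer_index must be in [0, num_consumers)")
    return [p for i, p in enumerate(sorted(partitions)) if i % num_consumers == consumer_index]
```

The split is fixed: if a consumer dies, its partitions are not picked up by the others.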
Due to use of @unittest.expectedFailure
@mumrah I took the branch based on #35 and #36 - and added support for something called drivers.
Basically, if you look at the code base, we use Queue(), Event(), sleep(), socket and multiprocessing.Process() to run the show - workers, multiple-consumers and co-ordination.
Now, Queue, Event, Pool, sleep, socket etc. are provided by multiprocessing, threading and gevent. So, the code can be used as-is (almost) for running it as different process, threads or gevent co-routines.
When SimpleConsumer or MultiConsumer is initialized, we can specify driver = thread / process / gevent and
The changes are minimal. The only complexity is in managing the duplicate connections. When a consumer is initialized, an appropriate driver (KafkaDriver) instance is created, which contains the modules that must be used.
Most involved work was in test cases :-)
You can see the diffs here (on top of #35 and #36).
mahendra/kafka-python@mpcommit...gevent
Will send a pull request after they are merged. Later will provide driver support for Producer
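The driver idea can be sketched with the standard library alone: one object exposes a Queue, an Event and a thread/process constructor taken from a single concurrency module, so worker code is written once. This is an illustration under assumptions, not the code in the linked branch; the attribute names are made up, gevent is left out to keep it dependency-free, and it targets Python 3.

```python
import multiprocessing
import queue
import threading
import time


class KafkaDriver:
    """Sketch of a 'driver': one object bundling the concurrency primitives
    that consumers/producers need. Attribute names are illustrative assumptions."""

    def __init__(self, kind="thread"):
        if kind == "thread":
            self.Queue = queue.Queue
            self.Event = threading.Event
            self.Proc = threading.Thread
        elif kind == "process":
            self.Queue = multiprocessing.Queue
            self.Event = multiprocessing.Event
            self.Proc = multiprocessing.Process
        else:
            raise ValueError("unsupported driver kind: %r" % kind)
        self.kind = kind
        self.sleep = time.sleep
```

Because threading.Thread and multiprocessing.Process both accept target= and args=, a worker started via driver.Proc(target=..., args=(driver.Queue(),)) runs unchanged under either kind.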
I have a queue consisting of 4 partitions, containing [49, 44, 30, 29] messages, 152 in total.
Then I use SimpleConsumer to connect to it, use pending() to get the message count, and test seek().
Here is my test code and the results:
consumer = SimpleConsumer(kafka, "my-group", "my-replicated-topic", auto_commit=False)
print(consumer.pending()) # original result is 152
consumer.seek(0, 0)
print(consumer.pending()) # after seek 0 result is 152, correct
consumer.seek(1, 0)
print(consumer.pending()) # after seek 1 result is 150, should be 151
consumer.seek(10, 0)
print(consumer.pending()) # after seek 10 result is 138, should be 142
consumer.seek(50, 0)
print(consumer.pending()) # after seek 50 result is 98, should be 102
consumer.seek(100, 0)
print(consumer.pending()) # after seek 100 result is 48, should be 52
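The expected values in the comments assume the requested offset is split across the four partitions so that the per-partition offsets add up to exactly the requested amount (pending = 152 - offset). A minimal sketch of such a split, using integer division and handing out the remainder one unit at a time; distribute_offset is a hypothetical helper and, like the comments, it does not clamp to each partition's size.

```python
def distribute_offset(offset, partitions):
    """Split a total offset across partitions so the per-partition deltas sum
    to exactly `offset`; the remainder goes one unit each to the first partitions."""
    partitions = sorted(partitions)
    base, remainder = divmod(offset, len(partitions))
    return {p: base + (1 if i < remainder else 0) for i, p in enumerate(partitions)}
```

For offset 10 over partitions 0..3 this gives {0: 3, 1: 3, 2: 2, 3: 2}, which leaves 152 - 10 = 142 pending.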
Hi,
I'm having trouble using an asynchronous producer. I'm running the latest copy of the code from github.
I have a simple script which exhibits the problem:
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer
if __name__ == '__main__':
    kafka = KafkaClient("192.168.15.136", 9092)
    producer = SimpleProducer(kafka, "test", async=True)
    producer.send_messages("Hello")
The log output is as follows:
Traceback (most recent call last):
File "kafka_test.py", line 7, in <module>
producer = SimpleProducer(kafka, "test", async=True)
File "C:\Users\astock\envs\midmarket\lib\site-packages\kafka\producer.py", line 176, in __init__
batch_send_every_t)
File "C:\Users\astock\envs\midmarket\lib\site-packages\kafka\producer.py", line 72, in __init__
self.proc.start()
File "c:\python27\Lib\multiprocessing\process.py", line 130, in start
self._popen = Popen(self)
File "c:\python27\Lib\multiprocessing\forking.py", line 271, in __init__
dump(process_obj, to_child, HIGHEST_PROTOCOL)
File "c:\python27\Lib\multiprocessing\forking.py", line 193, in dump
ForkingPickler(file, protocol).dump(obj)
File "c:\python27\Lib\pickle.py", line 224, in dump
self.save(obj)
File "c:\python27\Lib\pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "c:\python27\Lib\pickle.py", line 419, in save_reduce
save(state)
File "c:\python27\Lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "c:\python27\Lib\pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "c:\python27\Lib\pickle.py", line 681, in _batch_setitems
save(v)
File "c:\python27\Lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "c:\python27\Lib\multiprocessing\forking.py", line 66, in dispatcher
self.save_reduce(obj=obj, *rv)
File "c:\python27\Lib\pickle.py", line 401, in save_reduce
save(args)
File "c:\python27\Lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "c:\python27\Lib\pickle.py", line 548, in save_tuple
save(element)
File "c:\python27\Lib\pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "c:\python27\Lib\pickle.py", line 419, in save_reduce
save(state)
File "c:\python27\Lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "c:\python27\Lib\pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "c:\python27\Lib\pickle.py", line 681, in _batch_setitems
save(v)
File "c:\python27\Lib\pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "c:\python27\Lib\pickle.py", line 419, in save_reduce
save(state)
File "c:\python27\Lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "c:\python27\Lib\pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "c:\python27\Lib\pickle.py", line 681, in _batch_setitems
save(v)
File "c:\python27\Lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "c:\python27\Lib\pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "c:\python27\Lib\pickle.py", line 686, in _batch_setitems
save(v)
File "c:\python27\Lib\pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "c:\python27\Lib\pickle.py", line 419, in save_reduce
save(state)
File "c:\python27\Lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "c:\python27\Lib\pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "c:\python27\Lib\pickle.py", line 681, in _batch_setitems
save(v)
File "c:\python27\Lib\pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "c:\python27\Lib\pickle.py", line 419, in save_reduce
save(state)
File "c:\python27\Lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "c:\python27\Lib\pickle.py", line 548, in save_tuple
save(element)
File "c:\python27\Lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "c:\python27\Lib\pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "c:\python27\Lib\pickle.py", line 681, in _batch_setitems
save(v)
File "c:\python27\Lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "c:\python27\Lib\pickle.py", line 748, in save_global
(obj, module, name))
pickle.PicklingError: Can't pickle <built-in method recvfrom_into of _socket.socket object at 0x02D36F48>: it's not found as __main__.recvfrom_into
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "c:\python27\Lib\multiprocessing\forking.py", line 374, in main
self = load(from_parent)
File "c:\python27\Lib\pickle.py", line 1378, in load
return Unpickler(file).load()
File "c:\python27\Lib\pickle.py", line 858, in load
dispatch[key](self)
File "c:\python27\Lib\pickle.py", line 880, in load_eof
raise EOFError
EOFError
The same code works if I change async to False. This is running on Python 2.7.3 on Windows against Kafka 0.8.0. Any suggestions gratefully received
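The traceback points at multiprocessing pickling the producer's state, including an open socket, in order to start the child process (Windows has no fork, so the child is started from a pickled copy). A small sketch of the constraint, assuming Python 3, where pickling a socket raises TypeError: a live socket is not picklable, while plain connection settings are, so one common pattern is to pass only host and port to the child and open the connection there. is_picklable is a hypothetical helper.

```python
import pickle
import socket


def is_picklable(obj):
    """Return True if obj can be sent to a spawned child process via pickle."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, TypeError):
        return False
    return True


if __name__ == "__main__":
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        print(is_picklable(sock))  # a live socket cannot be pickled
    finally:
        sock.close()
    # Plain settings survive pickling; the child would open its own connection.
    print(is_picklable({"host": "192.168.15.136", "port": 9092}))
```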
This will be done as soon as #37 and #33 are merged. It should be only a few lines of code.
You can see the diffs here (on top of #37).
mahendra/kafka-python@gevent...gevent-producer
Hi,
My application uses the KafkaClient directly to make calls to Kafka. Once in a while, mostly when calling send_fetch_request, the application gets a BufferUnderflowError, most likely caused by truncated responses received from the broker. My issue is that I believe some forms of partial messages, but not all of them, are trapped by the client.
What I mean is that in decode_fetch_response, only the code handling the MessageSet (_decode_message_set_iter) actually traps BufferUnderflowError and raises StopIteration. All other calls that rely on util.py methods to unpack data (e.g. read_short_string) raise a BufferUnderflowError, which is only caught by my application.
Callstack example:
[...]
resps = client.send_fetch_request(reqs, fail_on_error=False)
File "~/lib/site-packages/kafka/client.py", line 228, in send_fetch_request
KafkaProtocol.decode_fetch_response)
File "~/lib/site-packages/kafka/client.py", line 160, in _send_broker_aware_request
for response in decoder_fn(response):
File "~/lib/site-packages/kafka/protocol.py", line 265, in decode_fetch_response
((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
File "~/lib/site-packages/kafka/util.py", line 56, in relative_unpack
raise BufferUnderflowError("Not enough data left")
BufferUnderflowError: Not enough data left
Is this expected? As I mentioned, anything similar happening within _decode_message_set_iter would end up as a StopIteration instead.
I am simply looking for the proper way to handle this in my app. I guess this would also apply to the SimpleConsumer implementation provided by the library.
For reference, the explanation of truncated fetch responses: http://search-hadoop.com/m/4TaT40IBqM1
cheers,
marc
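One way an application could make truncation handling uniform is to treat a BufferUnderflowError at the end of a response as "no more complete data" rather than as an error. The sketch below is a local re-implementation for illustration only: relative_unpack and BufferUnderflowError mirror the names in the traceback, but this is not kafka-python's code, and iter_size_prefixed is hypothetical.

```python
import struct


class BufferUnderflowError(Exception):
    pass


def relative_unpack(fmt, data, cur):
    """Unpack `fmt` at position `cur`; return (values, new_cursor)."""
    size = struct.calcsize(fmt)
    if len(data) - cur < size:
        raise BufferUnderflowError("Not enough data left")
    return struct.unpack(fmt, data[cur:cur + size]), cur + size


def iter_size_prefixed(data):
    """Yield complete int32-size-prefixed payloads; a truncated tail
    (partial size field or partial payload) simply ends the iteration."""
    cur = 0
    while cur < len(data):
        try:
            (size,), cur = relative_unpack(">i", data, cur)
        except BufferUnderflowError:
            return  # not even a full size field left
        if size < 0:
            raise ValueError("negative message size %d" % size)
        if len(data) - cur < size:
            return  # partial trailing message: stop instead of raising
        yield data[cur:cur + size]
        cur += size
```

Whether a truncated header (as opposed to a truncated trailing message) should also be swallowed is a design choice; it may point to a different problem.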
Need setup.py
so this can be installed as a package
I feel we must add some retry support for the producer.
If client.send_produce_request() fails because a broker (for a partition) goes down, we can simply do
This will ensure that the new broker for a partition will come up and we can send the message again.
This can be done after #33 is merged.
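A minimal sketch of such a retry loop, assuming send_fn wraps something like client.send_produce_request and reload_metadata_fn refreshes partition leaders. Both callables, send_with_retry itself, and the retry/backoff parameters are hypothetical.

```python
import time


def send_with_retry(send_fn, reload_metadata_fn, payloads, retries=3,
                    backoff_s=0.5, retry_on=(Exception,), sleep=time.sleep):
    """Hypothetical retry wrapper: on failure, refresh partition-leader metadata
    and try again, up to `retries` extra attempts, then re-raise the last error.
    retry_on should be narrowed to the client's real failure exceptions."""
    for attempt in range(retries + 1):
        try:
            return send_fn(payloads)
        except retry_on:
            if attempt == retries:
                raise
            reload_metadata_fn()  # pick up the new leader for the partition
            sleep(backoff_s * (attempt + 1))  # linear backoff between attempts
```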
I'm interested in doing continuous topic discovery. Based on ticket #38, it appears that integrating a zookeeper client is not preferred. Will the Kafka changes of moving offsets and partitions into the API instead of ZK allow for topic discovery? Any ideas on what would be a good work around for the time being?
Hi,
Trying to commit after consuming messages raises an exception, both with the latest kafka out of git and the latest release. I tried using auto_commit_every_n and I tried calling commit manually, but both produce this error:
File "/Users/rosejn/dm/hubble/venv/lib/python2.7/site-packages/kafka/conn.py", line 51, in _consume_response_iter
raise Exception("Got no response from Kafka")
Please let me know if I can help track this down.
Thanks,
Jeff
import json
from kafka.client import KafkaClient, FetchRequest, ProduceRequest
from kafka.consumer import SimpleConsumer
from kafka.producer import SimpleProducer
def produce_example(client):
    producer = SimpleProducer(client, "my-topic")
    producer.send_messages(json.dumps({'foo': 2, 'bar': "test me!!!!"}))

def consume_example(client):
    consumer = SimpleConsumer(client, "test-group", "my-topic",
                              auto_commit_every_n=1)
    for message in consumer:
        print(message)
        #consumer.commit()

def main():
    client = KafkaClient("localhost", 9092)
    produce_example(client)
    produce_example(client)
    consume_example(client)

main()
Until 0.8.1, auto commit should not be enabled by default
Kafka 0.8 is fairly far along in the development process. Should there be a kafka-python branch targeting the new protocol?
@mahendra
@jcrobak
@mrtheb
@motherhubbard
@stephenarmstrong
@rdiomar
@cosbynator
Hello, all. I've pulled this list of names from the open pull requests. I've opened this issue to let you all know my spare time for kafka-python has dropped to nearly zero. Between getting more involved in the Apache Kafka project, new duties at my day job, and a new member of my family, I'm finding it hard to keep up with the rate of pull requests.
As such, I need some help maintaining the code. If anyone is interested in becoming a contributor, please email me directly and we can discuss it.
Otherwise, I'm just going to start accepting pull requests as long as Travis CI says they're OK and the code doesn't look too awful.
Thanks for understanding, and thank you all for your contributions so far!
It would be great to have benchmark/performance tests to compare Kafka client implementations and make a decision which one to use.
I think we need to look at the way offsets are advanced and stored by the client, then go fix things rather than little one-off changes. I'm also doing this for my own sake to refresh myself with this code (it's been a while).
Suppose we have the following data in Kafka
Partition 0
========
Message 4
Message 5
Message 6
Message 7
Partition 1
========
Message 3
Message 4
Message 5
Message 6
And our starting (partition, offset) values are: (0, 4), (1, 3).
When the consumer's __iter__ is called, it sets up a sub-iterator for each partition at its current offset. So at this point, we have __iter_partition__(0, 4) and __iter_partition__(1, 3).
Here is an annotated version of the loop:
while True:
    # A
    req = FetchRequest(self.topic, partition, offset, 1024)
    (resp,) = self.client.send_fetch_request([req])
    assert resp.topic == self.topic
    assert resp.partition == partition
    next_offset = None
    for message in resp.messages:
        next_offset = message.offset
        # B
        yield message
        self.offsets[partition] = message.offset
        # C
    # D
    if next_offset is None:
        break
    else:
        offset = next_offset + 1
Values of offset, next_offset, and self.offsets[partition] at each labelled point:
point  offset  next_offset  self.offsets[partition]
A      4       -            4
B1     4       4            4
C1     4       4            4
B2     4       5            4
C2     4       5            5
D      4       5            5
A      6       -            5
B1     6       6            5
C1     6       6            6
From a consumer's point of view, the state of self.offsets reflects the offset of the previous message. Since the generator pauses execution immediately after the yield, self.offsets is not updated until the next iteration of the generator. The initial thinking behind this was that you only advance the offsets after the message has been "processed" (whatever that means). However, this means that if you commit the offsets in self.offsets, they will lag behind.
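One way to remove the lag, sketched under assumptions: store the offset of the next message to fetch in offsets[partition] before yielding, so a commit made while the caller handles a message already points past it. The trade-off is that a message handed out but not yet processed counts as consumed if the process dies. fetch is a hypothetical callable returning (offset, value) pairs; this is not the library's implementation.

```python
def iter_partition(fetch, partition, offset, offsets):
    """Yield (offset, value) pairs from one partition.

    fetch(partition, offset) is an assumed callable returning a list of
    (offset, value) tuples starting at `offset` (empty when caught up).
    offsets[partition] always holds the next offset to fetch and is updated
    before each yield, so a commit made mid-iteration does not lag behind.
    """
    while True:
        messages = fetch(partition, offset)
        if not messages:
            break
        for msg_offset, value in messages:
            offsets[partition] = msg_offset + 1
            yield msg_offset, value
        offset = messages[-1][0] + 1
```

Using the partition 0 data above (messages 4 to 7, two per fetch), offsets[0] is already 5 while message 4 is being handled.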
I wasn't able to get it to work with kafka 0.7.3 running on top of zookeeper. Barfed when instantiating the KafkaClient object. Said it failed to connect.
My server is definitely running - I was able to get another python kafka client (https://github.com/dsully/pykafka) to work. But it seems like this is the official client and I want to be a good citizen here.
using older version of kafka-python (https://github.com/kngenie/kafka-python/tree/0.8.0beta1) with Kafka 0.8.0beta. but I believe this is relevant to kafka-python master, too.
observed symptom: fetch request fails with BufferUnderflowError while decoding the response message. The received message data looks okay, except that it's missing a few bytes at the tail. The error happens when the message size (first int32 = Size) is one of (4097, 4098, 4099, 4100).
cause: _consume_response_iter() is calculating messageSize wrong. no need to subtract 4 from Size.
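In the Kafka protocol, the leading int32 Size counts only the bytes that follow it, so a reader should take exactly Size more bytes after the size field, looping because each recv may return fewer bytes than requested. A minimal sketch, assuming a socket-like recv(n) callable; read_response and recv_exact are hypothetical, not kafka-python's _consume_response_iter.

```python
import struct


def recv_exact(recv, n, bufsize=4096):
    """Call recv (e.g. sock.recv) until exactly n bytes have been read."""
    chunks = []
    remaining = n
    while remaining > 0:
        chunk = recv(min(bufsize, remaining))
        if not chunk:
            raise EOFError("connection closed with %d bytes outstanding" % remaining)
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def read_response(recv, bufsize=4096):
    """Read one size-prefixed response: the int32 Size counts only the bytes
    that follow it, so nothing is subtracted before reading the body."""
    (size,) = struct.unpack(">i", recv_exact(recv, 4, bufsize))
    return recv_exact(recv, size, bufsize)
```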
In consumer.py, if you un-comment the lines (for 0.8.1) to enable OffsetFetchRequest on consumer init, the following error occurs at the broker
kafka.common.KafkaException: Wrong request type 7
at kafka.api.RequestKeys$.deserializerForKey(RequestKeys.scala:51)
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:49)
at kafka.network.Processor.read(SocketServer.scala:345)
at kafka.network.Processor.run(SocketServer.scala:245)
at java.lang.Thread.run(Unknown Source)
and kafka-python raises an exception
Noticed this in kafka trunk as well as 0.8
Running against Kafka 0.8.0-Beta1
When the message being consumed is larger than client.bufsize (in consumer.py), the fetch size is grown (by a factor of 1.5) starting from 1.
Once this value becomes greater than client.bufsize (and large enough to contain the message from Kafka), the fetch request hangs in Kafka and socket timeouts are seen in the Python Kafka client.
The call that hangs in Kafka can be seen below:
TRACE Processor 1 received request : Name: FetchRequest; Version: 0; CorrelationId: 25; ClientId: kafka-python; ReplicaId: -1; MaxWait: 1000000 ms; MinBytes: 4987 bytes; RequestInfo: [topic,0] -> PartitionFetchInfo(0,4096) (kafka.network.RequestChannel$)
(this is then followed by a socket time out)
As MinBytes is greater than the partition fetch size (4096), Kafka then hangs (I guess waiting to read more bytes) for bytes that never come.
If I grow the client.bufsize and leave the fetch_size at 1 I can then consume my message using the python client.
I'll do some more reading when I have time to look at this but I thought i'd post my observations for your comments.
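A minimal sketch of a growth policy that avoids the mismatch in the TRACE line (MinBytes 4987 against a partition fetch size of 4096): grow the partition fetch size itself and clamp min_bytes to it, while keeping values within a signed 32-bit range, since the fetch request is packed with struct 'i' fields (compare the struct.error tracebacks from encode_fetch_request). next_fetch_size and fetch_params are hypothetical helpers.

```python
INT32_MAX = 2 ** 31 - 1  # largest value struct format 'i' can encode


def next_fetch_size(current, growth=1.5, limit=INT32_MAX):
    """Grow a partition fetch size (max bytes) geometrically, never past `limit`."""
    grown = int(current * growth)
    if grown <= current:  # small sizes must still increase (int(1 * 1.5) == 1)
        grown = current + 1
    return min(grown, limit)


def fetch_params(max_bytes, min_bytes=1):
    """Return (max_bytes, min_bytes) with min_bytes clamped to max_bytes, so the
    broker is never asked to wait for more data than the request can return."""
    if not 0 < max_bytes <= INT32_MAX:
        raise ValueError("max_bytes must be a positive int32")
    return max_bytes, min(min_bytes, max_bytes)
```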
In addition to Gzip, we should support Snappy compression
When sending a batch of messages in, it starts to fail at some point. E.g., 1000 works, but 10000 fails
See subject :)
I just have a SimpleConsumer trying to read messages from Kafka.
I am not able to get beyond the 1st message
Traceback :
File "testKaf.py", line 22, in testme
for m in q1:
File "/usr/local/lib/python2.7/dist-packages/kafka/consumer.py", line 330, in __iter__
yield it.next()
File "/usr/local/lib/python2.7/dist-packages/kafka/consumer.py", line 371, in __iter_partition__
min_bytes=fetch_size)
File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 277, in send_fetch_request
KafkaProtocol.decode_fetch_response)
File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 166, in _send_broker_aware_request
correlation_id=requestId, payloads=payloads)
File "/usr/local/lib/python2.7/dist-packages/kafka/protocol.py", line 249, in encode_fetch_request
len(grouped_payloads))
struct.error: 'i' format requires -2147483648 <= number <= 2147483647
I have been using kafka-python on a project where I am streaming JSON strings through a Kafka topic. This seems to work well, but every so often, long before the topic is empty, consumer.get_messages (using SimpleConsumer) starts giving me back empty arrays.
If I base64-encode the data before it's sent, then every message I try to read from the consumer comes back as an empty array. Is there any way certain character combinations in the message could be causing this? Or is it a message size issue?
When I run the kafka-console-consumer.sh on the same topic it has no problem.
Using kafka 0.8
Hi,
I'm new to Kafka; I'm using kafka-0.8.0.
The code looks like this:
kafka = KafkaClient("localhost", 9092)
consumer = SimpleConsumer(kafka, "my-group", "test")
for message in consumer:
    print message
When running, it gets stuck here:
OffsetAndMessage(offset=166799, message=Message(magic=0, attributes=0, key=None, value='{"topic": "test", "content": "test"}'))
Traceback (most recent call last):
File "run.py", line 8, in <module>
kfk.recv_message()
File "/usr/local/websoft/kafka_server_side/kfk.py", line 30, in recv_message
for message in self.consumer:
File "build/bdist.linux-x86_64/egg/kafka/consumer.py", line 340, in __iter__
File "build/bdist.linux-x86_64/egg/kafka/consumer.py", line 161, in _auto_commit
File "build/bdist.linux-x86_64/egg/kafka/consumer.py", line 145, in commit
File "build/bdist.linux-x86_64/egg/kafka/client.py", line 290, in send_offset_commit_request
File "build/bdist.linux-x86_64/egg/kafka/client.py", line 168, in _send_broker_aware_request
File "build/bdist.linux-x86_64/egg/kafka/conn.py", line 83, in recv
File "build/bdist.linux-x86_64/egg/kafka/conn.py", line 37, in _consume_response
File "build/bdist.linux-x86_64/egg/kafka/conn.py", line 51, in _consume_response_iter
Exception: Got no response from Kafka
Exception NameError: "global name 'ConsumerNoMoreData' is not defined" in <generator object __iter_partition__ at 0x11b6190> ignored
The producer is working fine and never stops, and I can get the messages with the kafka-console client.
Should SimpleConsumer block when there are no messages to be read? Based on the examples, I thought that would be the case but when I run them, they seem to exit as soon as all of the messages have been read.
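If the iterator does stop once it has caught up, blocking-style behaviour can be approximated on the caller's side by polling and sleeping between empty polls. A minimal sketch; get_batch stands in for whatever fetch call is used (for example a wrapper around consumer.get_messages), and poll_forever with its parameters is hypothetical.

```python
import time


def poll_forever(get_batch, handle, idle_sleep_s=1.0, sleep=time.sleep,
                 max_idle_polls=None):
    """Call get_batch() repeatedly, pass each message to handle(), and sleep
    when a poll returns nothing. max_idle_polls (None = never stop) exists
    only so the loop can be ended, e.g. in tests."""
    idle = 0
    while max_idle_polls is None or idle < max_idle_polls:
        batch = get_batch()
        if not batch:
            idle += 1
            sleep(idle_sleep_s)
            continue
        idle = 0
        for message in batch:
            handle(message)
```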
We are having some problems where our process reads from a Kafka queue. It reads for a while and then gets the following exception. The only solution we have so far is to export the queue data using the command-line client, shut down Kafka and ZooKeeper, delete the queue, and then restart and re-import the queue data.
Exception
'i' format requires -2147483648 <= number <= 2147483647
Traceback (most recent call last):
File "/home/socialtracker/SocialTracking/common/basedaemon.py", line 115, in run
for message in consumer:
File "/home/socialtracker/SocialTracking/env/local/lib/python2.7/site-packages/kafka/consumer.py", line 329, in __iter__
yield (partition, it.next())
File "/home/socialtracker/SocialTracking/env/local/lib/python2.7/site-packages/kafka/consumer.py", line 372, in __iter_partition__
min_bytes=fetch_size)
File "/home/socialtracker/SocialTracking/env/local/lib/python2.7/site-packages/kafka/client.py", line 277, in send_fetch_request
KafkaProtocol.decode_fetch_response)
File "/home/socialtracker/SocialTracking/env/local/lib/python2.7/site-packages/kafka/client.py", line 166, in _send_broker_aware_request
correlation_id=requestId, payloads=payloads)
File "/home/socialtracker/SocialTracking/env/local/lib/python2.7/site-packages/kafka/protocol.py", line 249, in encode_fetch_request
len(grouped_payloads))
error: 'i' format requires -2147483648 <= number <= 2147483647
None
The SimpleConsumer has some commented-out code for pulling offsets from the brokers: https://github.com/mumrah/kafka-python/blob/33cde520de9067845d4c7159a2c2834846e1957f/kafka/consumer.py#L100
Is this commented out on purpose? It's unclear to me if "Uncomment for 0.8.1" is referring to a version of Kafka or a version of kafka-python.
I tried to uncomment this, and I get an error in encode_offset_fetch_request/write_short_string. Is offset handling broken, or should I be able to get this to work?
I am currently using consumer.seek(0, 0) as a workaround, but this is not ideal, since I have to reprocess every entry on each run.
When trying to seek with whence equal to 1, we get an exception:
----> 1 consumer.seek(1, 1)
/usr/local/lib/python2.7/dist-packages/kafka/consumer.pyc in seek(self, offset, whence)
79 if whence == 1: # relative to current position
80 for partition, _offset in self.offsets.items():
---> 81 self.offset[partition] = _offset + offset
82 elif whence in (0, 2): # relative to beginning or end
83 # divide the request offset by number of partitions, distribute the remained evenly
AttributeError: 'SimpleConsumer' object has no attribute 'offset'
It looks like self.offset should be self.offsets.
Right now, when decoding Messages, we don't do anything if we encounter Messages that have been compressed. We should check the "attributes" mask for the compression codec and add support
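A minimal sketch of the dispatch on the attributes byte, assuming the Kafka 0.8 convention that its low bits carry the codec (0 = none, 1 = gzip, 2 = snappy). Gzip uses the standard library; Snappy needs a third-party package and is left unimplemented here. The decompressed bytes are themselves an encoded message set that would be decoded again. All names are hypothetical.

```python
import gzip

CODEC_MASK = 0x03  # assumed: low two bits of the attributes byte hold the codec
CODEC_NONE = 0
CODEC_GZIP = 1
CODEC_SNAPPY = 2


def codec_of(attributes):
    """Extract the compression codec from a message's attributes byte."""
    return attributes & CODEC_MASK


def decompress_value(attributes, value):
    """Return the raw bytes inside a message value. For compressed messages the
    result is itself an encoded message set that must be decoded again."""
    codec = codec_of(attributes)
    if codec == CODEC_NONE:
        return value
    if codec == CODEC_GZIP:
        return gzip.decompress(value)
    if codec == CODEC_SNAPPY:
        raise NotImplementedError("Snappy needs a third-party library")
    raise ValueError("unknown compression codec %d" % codec)
```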
Hi,
I'm aware that there is no current support for zookeeper. But how can I connect to multiple brokers within the same KafkaClient? Passing an argument like "broker1,broker2" doesn't work.
Thanks in advance,
Andre
Instead of relying on a single Kafka broker for bootstrapping the cluster, we could accept a list of brokers.
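A minimal sketch of bootstrapping from a list, assuming a comma-separated "host:port" string (port defaulting to 9092 as in the examples above) and a hypothetical connect(host, port) callable that raises OSError on failure. parse_hosts and bootstrap are illustrative, and IPv6 literals are not handled.

```python
def parse_hosts(hosts, default_port=9092):
    """Turn "broker1:9092,broker2" into [("broker1", 9092), ("broker2", 9092)]."""
    result = []
    for entry in hosts.split(","):
        entry = entry.strip()
        if not entry:
            continue
        host, sep, port = entry.rpartition(":")
        if sep:
            result.append((host, int(port)))
        else:
            result.append((entry, default_port))
    return result


def bootstrap(hosts, connect):
    """Try each broker in order and return the first successful connection.
    connect(host, port) is an assumed callable that raises OSError on failure."""
    errors = []
    for host, port in parse_hosts(hosts):
        try:
            return connect(host, port)
        except OSError as exc:
            errors.append("%s:%d: %s" % (host, port, exc))
    raise RuntimeError("All servers failed: " + "; ".join(errors))
```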
Implement a Queue.Queue that delegates producing/consuming to two multiprocessing.Process instances.
I've been playing with your module (awesome job, by the way!) with everything running locally. I now have a chance to use it with a remote Kafka server but am running into some problems. Perhaps you can help me.
from kafka.client import KafkaClient
from kafka.client import SimpleProducer
client = KafkaClient('x.x.x.x', 9092)
producer = SimpleProducer(client, 'test')
Up to this point everything works fine, I don't see any tracebacks or any error messages on the kafka server; however, when I try and send a message producer.send_messages('this is a test')
I get a traceback
----> 1 producer.send_messages('this is a test')
/usr/local/lib/python2.7/dist-packages/kafka/producer.pyc in send_messages(self, *msg)
20 req = ProduceRequest(self.topic, self.next_partition.next(),
21 messages=[create_message(m) for m in msg])
---> 22 resp = self.client.send_produce_request([req])[0]
23 assert resp.error == 0
/usr/local/lib/python2.7/dist-packages/kafka/client.pyc in send_produce_request(self, payloads, acks, timeout, fail_on_error, callback)
164 resps = self._send_broker_aware_request(payloads,
165 partial(KafkaProtocol.encode_produce_request, acks=acks, timeout=timeout),
--> 166 KafkaProtocol.decode_produce_response)
167 out = []
168 for resp in resps:
/usr/local/lib/python2.7/dist-packages/kafka/client.pyc in _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn)
123 # For each broker, send the list of request payloads
124 for broker, payloads in payloads_by_broker.items():
--> 125 conn = self._get_conn_for_broker(broker)
126 requestId = self._next_id()
127 request = encoder_fn(client_id=KafkaClient.CLIENT_ID, correlation_id=requestId, payloads=payloads)
/usr/local/lib/python2.7/dist-packages/kafka/client.pyc in _get_conn_for_broker(self, broker)
39 "Get or create a connection to a broker"
40 if (broker.host, broker.port) not in self.conns:
---> 41 self.conns[(broker.host, broker.port)] = KafkaConnection(broker.host, broker.port, self.bufsize)
42 return self.conns[(broker.host, broker.port)]
43
/usr/local/lib/python2.7/dist-packages/kafka/conn.pyc in __init__(self, host, port, bufsize)
19 self.bufsize = bufsize
20 self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
---> 21 self._sock.connect((host, port))
22 self._sock.settimeout(10)
23
/usr/lib/python2.7/socket.pyc in meth(name, self, *args)
222
223 def meth(name,self,*args):
--> 224 return getattr(self._sock,name)(*args)
225
226 for _m in _socketmethods:
error: [Errno 111] Connection refused
I did a little digging with the client object, client.brokers, and saw {0: BrokerMetadata(nodeId=0, host='0.0.0.0', port=9092)}.
Note host='0.0.0.0'. I'm not too sure why it's using localhost.
So I tried
from kafka.common import BrokerMetadata
newBroker = BrokerMetadata(0, 'x.x.x.x', 9092)
client.brokers[0] = newBroker
producer = SimpleProducer(client, 'test')
producer.send_messages('this is a test')
and got the same traceback as before
I'm still a little new to Kafka, so it's probably something trivial, but my current setup is ZooKeeper and a Kafka server with one broker, plus Kafka's console consumer, running on a remote box1. I'm running your module from another box2. Do I need to have something else running on box2? I can telnet into box1 on host:port with no problems. I can also take the client, call client.close(), and watch Kafka on box2 tell me Closing socket connection to x.x.x.x. So KafkaClient is connecting to it.
Both boxes are running Ubuntu 12.04, python 2.7 and Kafka 0.8
box2 has kafka 0.8
box1 has the head of your repo
Implemented simple Zookeeper support. Using Kazoo.
This was built on top of the gevent patch (#37).
You can have a look at the code here:
mahendra/kafka-python@gevent...zookeeper
Not tested yet. Will keep updating the code as I progress with the tests.
No handlers could be found for logger "kafka"
Traceback (most recent call last):
File "run.py", line 6, in <module>
MQTT_consumer.factory()
File "/usr/local/kafka/consumer.py", line 21, in factory
super(MQTT_Consumer, self).factory();
File "/usr/local//kafka/kfk.py", line 15, in factory
self.kafka = KafkaClient(conf["Kafka"]["host"], conf["Kafka"]["port"])
File "build/bdist.linux-x86_64/egg/kafka/client.py", line 32, in __init__
File "build/bdist.linux-x86_64/egg/kafka/client.py", line 69, in _load_metadata_for_topics
Exception: All servers failed to process request
it's working well with kafka 0.8 but this.
how to fix it...