Giter VIP home page Giter VIP logo

kafka-python's Introduction

Kafka Python client

image

image

image

image

image

image

image

DUE TO ISSUES WITH RELEASES, IT IS SUGGESTED TO USE https://github.com/wbarnha/kafka-python-ng FOR THE TIME BEING

Python client for the Apache Kafka distributed stream processing system. kafka-python is designed to function much like the official java client, with a sprinkling of pythonic interfaces (e.g., consumer iterators).

kafka-python is best used with newer brokers (0.9+), but is backwards-compatible with older versions (to 0.8.0). Some features will only be enabled on newer brokers. For example, fully coordinated consumer groups -- i.e., dynamic partition assignment to multiple consumers in the same group -- requires use of 0.9+ kafka brokers. Supporting this feature for earlier broker releases would require writing and maintaining custom leadership election and membership / health check code (perhaps using zookeeper or consul). For older brokers, you can achieve something similar by manually assigning different partitions to each consumer instance with config management tools like chef, ansible, etc. This approach will work fine, though it does not support rebalancing on failures. See <https://kafka-python.readthedocs.io/en/master/compatibility.html> for more details.

Please note that the master branch may contain unreleased features. For release documentation, please see readthedocs and/or python's inline help.

>>> pip install kafka-python

KafkaConsumer

KafkaConsumer is a high-level message consumer, intended to operate as similarly as possible to the official java client. Full support for coordinated consumer groups requires use of kafka brokers that support the Group APIs: kafka v0.9+.

See <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html> for API and configuration details.

The consumer iterator returns ConsumerRecords, which are simple namedtuples that expose basic message attributes: topic, partition, offset, key, and value:

>>> from kafka import KafkaConsumer >>> consumer = KafkaConsumer('my_favorite_topic') >>> for msg in consumer: ... print (msg)

>>> # join a consumer group for dynamic partition assignment and offset commits >>> from kafka import KafkaConsumer >>> consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group') >>> for msg in consumer: ... print (msg)

>>> # manually assign the partition list for the consumer >>> from kafka import TopicPartition >>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234') >>> consumer.assign([TopicPartition('foobar', 2)]) >>> msg = next(consumer)

>>> # Deserialize msgpack-encoded values >>> consumer = KafkaConsumer(value_deserializer=msgpack.loads) >>> consumer.subscribe(['msgpackfoo']) >>> for msg in consumer: ... assert isinstance(msg.value, dict)

>>> # Access record headers. The returned value is a list of tuples >>> # with str, bytes for key and value >>> for msg in consumer: ... print (msg.headers)

>>> # Get consumer metrics >>> metrics = consumer.metrics()

KafkaProducer

KafkaProducer is a high-level, asynchronous message producer. The class is intended to operate as similarly as possible to the official java client. See <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html> for more details.

>>> from kafka import KafkaProducer >>> producer = KafkaProducer(bootstrap_servers='localhost:1234') >>> for _ in range(100): ... producer.send('foobar', b'some_message_bytes')

>>> # Block until a single message is sent (or timeout) >>> future = producer.send('foobar', b'another_message') >>> result = future.get(timeout=60)

>>> # Block until all pending messages are at least put on the network >>> # NOTE: This does not guarantee delivery or success! It is really >>> # only useful if you configure internal batching using linger_ms >>> producer.flush()

>>> # Use a key for hashed-partitioning >>> producer.send('foobar', key=b'foo', value=b'bar')

>>> # Serialize json messages >>> import json >>> producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8')) >>> producer.send('fizzbuzz', {'foo': 'bar'})

>>> # Serialize string keys >>> producer = KafkaProducer(key_serializer=str.encode) >>> producer.send('flipflap', key='ping', value=b'1234')

>>> # Compress messages >>> producer = KafkaProducer(compression_type='gzip') >>> for i in range(1000): ... producer.send('foobar', b'msg %d' % i)

>>> # Include record headers. The format is list of tuples with string key >>> # and bytes value. >>> producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')])

>>> # Get producer performance metrics >>> metrics = producer.metrics()

Thread safety

The KafkaProducer can be used across threads without issue, unlike the KafkaConsumer which cannot.

While it is possible to use the KafkaConsumer in a thread-local manner, multiprocessing is recommended.

Compression

kafka-python supports the following compression formats:

  • gzip
  • LZ4
  • Snappy
  • Zstandard (zstd)

gzip is supported natively, the others require installing additional libraries. See <https://kafka-python.readthedocs.io/en/master/install.html> for more information.

Optimized CRC32 Validation

Kafka uses CRC32 checksums to validate messages. kafka-python includes a pure python implementation for compatibility. To improve performance for high-throughput applications, kafka-python will use crc32c for optimized native code if installed. See <https://kafka-python.readthedocs.io/en/master/install.html> for installation instructions. See https://pypi.org/project/crc32c/ for details on the underlying crc32c lib.

Protocol

A secondary goal of kafka-python is to provide an easy-to-use protocol layer for interacting with kafka brokers via the python repl. This is useful for testing, probing, and general experimentation. The protocol support is leveraged to enable a KafkaClient.check_version() method that probes a kafka broker and attempts to identify which version it is running (0.8.0 to 2.6+).

kafka-python's People

Contributors

alexcb avatar asdaraujo avatar aynroot avatar baisang avatar brutasse avatar carsonip avatar cclauss avatar dependabot[bot] avatar dpkp avatar ecanzonieri avatar jeffwidman avatar mahendra avatar mdaniel avatar mrtheb avatar ms7s avatar mumrah avatar mutability avatar rdiomar avatar sandello avatar se7entyse7en avatar sontek avatar ssaamm avatar toddpalino avatar turtlesoupy avatar tvoinarovskyi avatar tylerlubeck avatar vshlapakov avatar wbarnha avatar wizzat avatar zackdever avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

kafka-python's Issues

SimpleConsumer.seek with whence == 1

When trying to seek with whence equal to 1 we get an exception

----> 1 consumer.seek(1, 1)

/usr/local/lib/python2.7/dist-packages/kafka/consumer.pyc in seek(self, offset, whence)
     79         if whence == 1: # relative to current position
     80             for partition, _offset in self.offsets.items():
---> 81                 self.offset[partition] = _offset + offset
     82         elif whence in (0, 2): # relative to beginning or end
     83             # divide the request offset by number of partitions, distribute the remained evenly

AttributeError: 'SimpleConsumer' object has no attribute 'offset'

It looks like self.offset should be self.offsets

SimpleConsumer.commit() returns FailedPayloadsException

kafka version 0.8.0, kafka-python version: master

Traceback (most recent call last):
File "test_kafka.py", line 88, in
test2()
File "test_kafka.py", line 75, in test2
receive(1000)
File "test_kafka.py", line 48, in receive
consumer.commit()
File "/home/valens/test/kafka/consumer.py", line 144, in commit
resps = self.client.send_offset_commit_request(self.group, reqs)
File "/home/valens/test/kafka/client.py", line 318, in send_offset_commit_request
resps = self._send_broker_aware_request(payloads, encoder, decoder)
File "/home/valens/test/kafka/client.py", line 184, in _send_broker_aware_request
raise FailedPayloadsException(failed_payloads)
kafka.common.FailedPayloadsException: [OffsetCommitRequest(topic='performance_test', partition=0, offset=499, metadata=None), OffsetCommitRequest(topic='performance_test', partition=1, offset=499, metadata=None)]

[2013-12-13 11:53:32,137] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
java.nio.BufferUnderflowException
at java.nio.HeapByteBuffer.get(Unknown Source)
at java.nio.ByteBuffer.get(Unknown Source)
at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:38)
at kafka.api.UpdateMetadataRequest$$anonfun$readFrom$1.apply(UpdateMetadataRequest.scala:42)
at kafka.api.UpdateMetadataRequest$$anonfun$readFrom$1.apply(UpdateMetadataRequest.scala:41)
at scala.collection.immutable.Range$ByOne$class.foreach(Range.scala:282)
at scala.collection.immutable.Range$$anon$2.foreach(Range.scala:265)
at kafka.api.UpdateMetadataRequest$.readFrom(UpdateMetadataRequest.scala:41)
at kafka.api.RequestKeys$$anonfun$7.apply(RequestKeys.scala:40)
at kafka.api.RequestKeys$$anonfun$7.apply(RequestKeys.scala:40)
at kafka.network.RequestChannel$Request.(RequestChannel.scala:49)
at kafka.network.Processor.read(SocketServer.scala:353)
at kafka.network.Processor.run(SocketServer.scala:245)
at java.lang.Thread.run(Unknown Source)
[2013-12-13 11:53:32,180] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)

commit() is called mamually while every 1000 msg consumed. the topic has 2 partitions.

auto_commit() also returns the same error.

Please give me a hand...Thx

Support for gevent, process and threaded drivers

@mumrah I took the branch based on #35 and #36 - and added support for something called drivers.

Basically, if you look at the code base, we use Queue(), Event(), sleep(), socket and multiprocessing.Process() to run the show - workers, multiple-consumers and co-ordination.

Now, Queue, Event, Pool, sleep, socket etc. are provided by multiprocessing, threading and gevent. So, the code can be used as-is (almost) for running it as different process, threads or gevent co-routines.

When SimpleConsumer or MultiConsumer is initialized, we can specify driver = thread / process / gevent and

  • in SimpleConsumer - commit thread will run as a thread/process/gevent-coroutine
  • In MultiConsumer, the various fetches (among which the partitions are distributed) will be run under different threads, processes or gevent co-routines.

The changes are very negligible. The only complexity is in managing the duplicate connections. When a consumer is initialized, an appropriate driver (KafkaDriver) instance is created, which contains the modules that must be used.

Most involved work was in test cases :-)

You can see the diffs here (on top of #35 and #36).
mahendra/kafka-python@mpcommit...gevent

Will send a pull request after they are merged. Later will provide driver support for Producer

Consumer offsets discussion

I think we need to look at the way offsets are advanced and stored by the client, then go fix things rather than little one-off changes. I'm also doing this for my own sake to refresh myself with this code (it's been a while).

Suppose we have the following data in Kafka

Partition 0
========
Message 4
Message 5
Message 6
Message 7

Partition 1
========
Message 3
Message 4
Message 5
Message 6

And our starting (partition, offset) values are: (0, 4), (1, 3).

When the consumer __iter__ is called, it sets up a list of sub-iterators for each partition with their current offsets. So at this point, we have __iter_partition__(0, 4) and __iter_partition__(1, 3).

Here is an annotated version of the loop:

while True:
    # A
    req = FetchRequest(self.topic, partition, offset, 1024)
    (resp,) = self.client.send_fetch_request([req])
    assert resp.topic == self.topic
    assert resp.partition == partition
    next_offset = None
    for message in resp.messages:
        next_offset = message.offset
        # B
        yield message
        self.offsets[partition] = message.offset
        # C
    # D
    if next_offset is None:
        break
    else:
        offset = next_offset + 1

Values of offset, next_offset, and self.offsets[partition]

    offset  next_offset     self.offsets[partition]
A   4       -               4
B1  4       4               4
C1  4       4               4
B2  4       5               4
C2  4       5               5
D   4       5               5
A   6       -               5
B1  6       6               5
C1  6       6               6

From a consumer point of view, the state of self.offsets reflects the offset of the previous message. Since the generator will pause execution immediately after the yield, the state of self.offsets will not be updated until the next iteration of the generator. The initial thinking behind this was that you only advance the offsets after the message has been "processed" (whatever that means). However, this means if you commit the offsets in self.offset, they will lag behind.

Create consumers based on wild carded topics

Not sure if this is the best way of creating a question but here goes....

I searched the code but couldn't see anything obvious. I tried topic.* but it just looped waiting for it to be created.

The Java / Scala clients have a createMessageStreamsByFilter method which also listens to new topics matching the wildcard. It'd be nice if your API did this too ;-)

Cheers

Group in consumer, how does it work?

I use kafka v0.8, and I use the kafka-python 0.8branch
At first I guess the group in consumer is as same as group.id in kafka consumer
So I create 2 consumers, with same group name.
I think they should read different messages from kafka.
but both of them read all messages from kafka.
my code:https://gist.github.com/asdfsx/7956368
is there something wrong?

Kafka 0.8 Support

Kafka 0.8 is fairly far along in the development process. Should there be a kafka-python branch targeting the new protocol?

Install requires tox?

Hi,

We're just switching to using kafka-python in our environment and we're noticing that it's pulling additional dependencies because the installation requires tox. I was wondering if we could remove this dependency on installation because it only seems to be used by tests (and it's already in the tests_requires).

I'm happy to open a pull request to to remove this, I just wanted to be certain it's not required somewhere that I'm missing.

Does the newest version support Kafka 0.7.2

No handlers could be found for logger "kafka"
Traceback (most recent call last):
File "run.py", line 6, in
MQTT_consumer.factory()
File "/usr/local/kafka/consumer.py", line 21, in factory
super(MQTT_Consumer, self).factory();
File "/usr/local//kafka/kfk.py", line 15, in factory
self.kafka = KafkaClient(conf["Kafka"]["host"], conf["Kafka"]["port"])
File "build/bdist.linux-x86_64/egg/kafka/client.py", line 32, in init
File "build/bdist.linux-x86_64/egg/kafka/client.py", line 69, in _load_metadata_for_topics
Exception: All servers failed to process request

it's working well with kafka 0.8 but this.
how to fix it...

blocking consumer

Should SimpleConsumer block when there are no messages to be read? Based on the examples, I thought that would be the case but when I run them, they seem to exit as soon as all of the messages have been read.

Issues with SimpleConsumer when using auto commit

I was trying to use Simple Consumer to do auto commit, but ran into number of issues.

  • By default auto_commit is set to be false which is counter intuitive as the default value of auto commit is true in server config.
  • If consumer is created using

SimpleConsumer(kafka, "my-group", TOPIC_NAME, True)

then the auto commit does not work.

  • If consumer is created using

SimpleConsumer(kafka, "my-group", TOPIC_NAME, True, auto_commit_every_n=100)

then the code fails with unable to find reference for ReentrantTimer

  • If the code is changed to use

SimpleConsumer(kafka, "my-group", TOPIC_NAME, True, 100, 5000)

the code fails with unable to find reference to send_offset_commit_request which is defined incorrectly.

self.send_offset_commit_request(self.group, reqs)

  • I have enclosed the diff below that I had to do to ensure that

SimpleConsumer(kafka, "my-group", TOPIC_NAME, True, auto_commit_every_n=100)

can proceed further.

But now I am stuck at

kafka-python/kafka/util.py", line 45, in relative_unpack
    raise BufferUnderflowError("Not enough data left")
BufferUnderflowError: Not enough data left

Code diff to workaround issues mentioned above

diff --git a/kafka/consumer.py b/kafka/consumer.py
index f123113..7d658b1 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -1,6 +1,7 @@
 from itertools import izip_longest, repeat
 import logging
 from threading import Lock
+from util import ReentrantTimer

 from kafka.common import (
     ErrorMapping, FetchRequest,
@@ -126,7 +127,7 @@ class SimpleConsumer(object):
                     log.debug("Commit offset %d in SimpleConsumer: group=%s, topic=%s, partition=%s" % (
                         offset, self.group, self.topic, partition))
                     reqs.append(OffsetCommitRequest(self.topic, partition, offset, None))
-            resps = self.send_offset_commit_request(self.group, reqs)
+            resps = self.client.send_offset_commit_request(self.group, reqs)
             for resp in resps:
                 assert resp.error == 0
             self.count_since_commit = 0

Issue with SimpleConsumer

Hi,
I'm new to kafka, I'm using kafka-0.8.0
the code looks like

kafka = KafkaClient("localhost", 9092)
consumer = SimpleConsumer(kafka, "my-group", "test")
for message in consumer:
    print message

When runing ,it'll be stuck here

OffsetAndMessage(offset=166799, message=Message(magic=0, attributes=0, key=None, value='{"topic": "test", "content": "test"}'))
Traceback (most recent call last):
  File "run.py", line 8, in <module>
    kfk.recv_message()
  File "/usr/local/websoft/kafka_server_side/kfk.py", line 30, in recv_message
    for message in self.consumer:
  File "build/bdist.linux-x86_64/egg/kafka/consumer.py", line 340, in __iter__
  File "build/bdist.linux-x86_64/egg/kafka/consumer.py", line 161, in _auto_commit
  File "build/bdist.linux-x86_64/egg/kafka/consumer.py", line 145, in commit
  File "build/bdist.linux-x86_64/egg/kafka/client.py", line 290, in send_offset_commit_request
  File "build/bdist.linux-x86_64/egg/kafka/client.py", line 168, in _send_broker_aware_request
  File "build/bdist.linux-x86_64/egg/kafka/conn.py", line 83, in recv
  File "build/bdist.linux-x86_64/egg/kafka/conn.py", line 37, in _consume_response
  File "build/bdist.linux-x86_64/egg/kafka/conn.py", line 51, in _consume_response_iter
Exception: Got no response from Kafka
Exception NameError: "global name 'ConsumerNoMoreData' is not defined" in <generator object __iter_partition__ at 0x11b6190> ignored

Producer is working fine and it never stop, I can get the message from kafka-console client .

Exception 'i' format requires -2147483648 <= number <= 2147483647

We are having some problems where our process reads from a kafka queue. It read for a while and then gets the following exception. Only solution we have so far is to export the queue data using the command line client, shut down kafka and zookeeper, delete the queue and then restart and re-import the queue data.


Exception
'i' format requires -2147483648 <= number <= 2147483647
Traceback (most recent call last):
File "/home/socialtracker/SocialTracking/common/basedaemon.py", line 115, in run
for message in consumer:
File "/home/socialtracker/SocialTracking/env/local/lib/python2.7/site-packages/kafka/consumer.py", line 329, in iter
yield (partition, it.next())
File "/home/socialtracker/SocialTracking/env/local/lib/python2.7/site-packages/kafka/consumer.py", line 372, in iter_partition
min_bytes=fetch_size)
File "/home/socialtracker/SocialTracking/env/local/lib/python2.7/site-packages/kafka/client.py", line 277, in send_fetch_request
KafkaProtocol.decode_fetch_response)
File "/home/socialtracker/SocialTracking/env/local/lib/python2.7/site-packages/kafka/client.py", line 166, in _send_broker_aware_request
correlation_id=requestId, payloads=payloads)
File "/home/socialtracker/SocialTracking/env/local/lib/python2.7/site-packages/kafka/protocol.py", line 249, in encode_fetch_request
len(grouped_payloads))
error: 'i' format requires -2147483648 <= number <= 2147483647
None

Does this still work with 0.7.3?

I wasn't able to get it to work with kafka 0.7.3 running on top of zookeeper. Barfed when instantiating the KafkaClient object. Said it failed to connect.

My server is definitely running - I was able to get another python kafka client (https://github.com/dsully/pykafka) to work. But it seems like this is the official client and I want to be a good citizen here.

Infinite recursion in _load_metadata_for_topics is problematic when metadata is unstable in Kafka

I found out during testing that the constructor makes a never ending recursive call with _load_metadata_for_topics. This has caused me a bit of trouble when a consumer was started while a topic had offline partitions for a long period.

The code assumes a situation like this will only be temporary and thus sleeps and retries forever.

A consumer is affected by an offline partitions for a topic even if it isn't even interested in reading from it. i.e. topic xyz had no master (broker down) and consumer was only expected to read from topic abc. Since the initial _load_metadata_for_topics is loading metadata for all topics, it affected even though it only needs the metadata from the one of the topic.

My suggestion (pull request will be coming) would be to simply avoid recursion altogether and rely on the fact the metadata will be loaded on the fly (lazy) in _get_leader_for_partition if a request is made for a specific topic which was available at first. If it's still unavailable, the request will fail, as it should.

Call for contributors

@mahendra
@jcrobak
@mrtheb
@motherhubbard
@stephenarmstrong
@rdiomar
@cosbynator

Hello, all. I've pulled this list of names from the open pull requests. I've opened this issue to let you all know my spare time for kafka-python has dropped to nearly zero. Between getting more involved in the Apache Kafka project, new duties at my day job, and a new member of my family, I'm finding it hard to keep up with the rate of pull requests.

As such, I need some help maintaining the code. If anyone is interested in becoming a contributor, please email me directly and we can discuss it.

Otherwise, I'm just going to start accepting pull requests as long as Travis CI says they're OK and the code doesn't look too awful.

Thanks for understanding, and thank you all for your contributions so far!

Add retry support for producer

I feel, we must add some retry support for producer.

If client.send_produce_request() fails, because a broker (for a partition) goes down, we can simply do

  • conn.close() and
  • client.conns.pop(broker)
  • try sending the message again, possibly after a retry delay.

This will ensure that the new broker for a partition will come up and we can send the message again.

This can be done after #33 is merged.

Fetching messages of at least 4k (by default) hangs in Kafka. Socket timeouts are then seen

Running against Kafka 0.8.0-Beta1

When the message being consumed is greater than the client.bufsize (in consumer.py) the fetch size is grown (by factor of 1.5) starting from 1.

Once this value becomes greater than client.bufsize (and large enough to contain the message from Kafka) the fetch request hangs in Kafka and socket timeouts are seen in the python Kafka client

The call that hangs in Kafka can be seen below:
TRACE Processor 1 received request : Name: FetchRequest; Version: 0; CorrelationId: 25; ClientId: kafka-python; ReplicaId: -1; MaxWait: 1000000 ms; MinBytes: 4987 bytes; RequestInfo: [topic,0] -> PartitionFetchInfo(0,4096) (kafka.network.RequestChannel$)
(this is then followed by a socket time out)

As the minbytes is greater than then partition fetch request range; kafka then hangs (I guess waiting to read more bytes) which never come.

If I grow the client.bufsize and leave the fetch_size at 1 I can then consume my message using the python client.

I'll do some more reading when I have time to look at this but I thought i'd post my observations for your comments.

Issue in offset fetch

In consumer.py, if you un-comment the lines (for 0.8.1) to enable OffsetFetchRequest on consumer init, the following error occurs at the broker

kafka.common.KafkaException: Wrong request type 7
    at kafka.api.RequestKeys$.deserializerForKey(RequestKeys.scala:51)
    at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:49)
    at kafka.network.Processor.read(SocketServer.scala:345)
    at kafka.network.Processor.run(SocketServer.scala:245)
    at java.lang.Thread.run(Unknown Source)

and kafka-python raises an exception

Noticed this in kafka trunk as well as 0.8

Access multiple brokers

Hi,

I'm aware that there is no current support for zookeeper. But how can I connect to multiple brokers within the same KafkaClient? Passing an argument like "broker1,broker2" doesn't work.

Thanks in advance,

Andre

Consumer dies after getting 1st message - payload size keeps increasing

I just have a SimpleConsumer trying to read messages from Kafka.

I am not able to get beyond the 1st message

Traceback :

File "testKaf.py", line 22, in testme
for m in q1:
File "/usr/local/lib/python2.7/dist-packages/kafka/consumer.py", line 330, in iter
yield it.next()
File "/usr/local/lib/python2.7/dist-packages/kafka/consumer.py", line 371, in iter_partition
min_bytes=fetch_size)
File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 277, in send_fetch_request
KafkaProtocol.decode_fetch_response)
File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 166, in _send_broker_aware_request
correlation_id=requestId, payloads=payloads)
File "/usr/local/lib/python2.7/dist-packages/kafka/protocol.py", line 249, in encode_fetch_request
len(grouped_payloads))
struct.error: 'i' format requires -2147483648 <= number <= 2147483647

Support Gzipped messages

Right now, when decoding Messages, we don't do anything if we encounter Messages that have been compressed. We should check the "attributes" mask for the compression codec and add support

SimpleConsumer seek & pending

I have a queue consist of 4 partitions, containing [49, 44, 30, 29] total : 152 messages.
Then i use SimpleComsumer connect to it, use peeding() to get messge num, test seek().

Here is my test code & result:
consumer = SimpleConsumer(kafka, "my-group", "my-replicated-topic", auto_commit=False)
print(consumer.pending()) #orgin result is 152
consumer.seek(0, 0)
print(consumer.pending()) #after seek 0 result is 152, correct
consumer.seek(1, 0)
print(consumer.pending()) #after seek 1 result is 150, should be 151
consumer.seek(10, 0)
print(consumer.pending()) #after seek 10 result is 138, shold be 142
consumer.seek(50, 0)
print(consumer.pending()) #after seek 50 result is 98, shold be 102
consumer.seek(100, 0)
print(consumer.pending()) #after seek 100 result is 48, shold be 52

Connecting to remote Kafka server

I've been playing with your module (awesome job by the way!) with everything running locally. I now have an chance to use this with a remote kafka server but running into some problems. Perhaps you can help me.

from kafka.client import KafkaClient
from kafka.client import SimpleProducer

client = KafkaClient('x.x.x.x', 9092)
producer = SimpleProducer(client, 'test')

Up to this point everything works fine, I don't see any tracebacks or any error messages on the kafka server; however, when I try and send a message producer.send_messages('this is a test') I get a traceback

----> 1 producer.send_messages('this is a test')

/usr/local/lib/python2.7/dist-packages/kafka/producer.pyc in send_messages(self, *msg)
     20         req = ProduceRequest(self.topic, self.next_partition.next(),
     21             messages=[create_message(m) for m in msg])
---> 22         resp = self.client.send_produce_request([req])[0]
     23         assert resp.error == 0

/usr/local/lib/python2.7/dist-packages/kafka/client.pyc in send_produce_request(self, payloads, acks, timeout, fail_on_error, callback)
    164         resps = self._send_broker_aware_request(payloads,
    165                     partial(KafkaProtocol.encode_produce_request, acks=acks, timeout=timeout),
--> 166                     KafkaProtocol.decode_produce_response)
    167         out = []
    168         for resp in resps:

/usr/local/lib/python2.7/dist-packages/kafka/client.pyc in _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn)
    123         # For each broker, send the list of request payloads
    124         for broker, payloads in payloads_by_broker.items():
--> 125             conn = self._get_conn_for_broker(broker)
    126             requestId = self._next_id()
    127             request = encoder_fn(client_id=KafkaClient.CLIENT_ID, correlation_id=requestId, payloads=payloads)

/usr/local/lib/python2.7/dist-packages/kafka/client.pyc in _get_conn_for_broker(self, broker)
     39         "Get or create a connection to a broker"
     40         if (broker.host, broker.port) not in self.conns:
---> 41             self.conns[(broker.host, broker.port)] = KafkaConnection(broker.host, broker.port, self.bufsize)
     42         return self.conns[(broker.host, broker.port)]
     43 

/usr/local/lib/python2.7/dist-packages/kafka/conn.pyc in __init__(self, host, port, bufsize)
     19         self.bufsize = bufsize
     20         self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
---> 21         self._sock.connect((host, port))
     22         self._sock.settimeout(10)
     23 

/usr/lib/python2.7/socket.pyc in meth(name, self, *args)
    222 
    223 def meth(name,self,*args):
--> 224     return getattr(self._sock,name)(*args)
    225 
    226 for _m in _socketmethods:

error: [Errno 111] Connection refused

I did a little digging with the client object client.brokers and saw {0: BrokerMetadata(nodeId=0, host='0.0.0.0', port=9092)}

host='0.0.0.0' I'm not too sure why it's using localhost

So I tried

from kafka.common import BrokerMetadata
newBroker = BrokerMetadata(0, 'x.x.x.x', 9092)
client.brokers[0] = newBroker
producer = SimpleProducer(client, 'test')
producer.send_messages('this is a test')

and got the same traceback as before

I'm still a little new to kafka, so it's probably something trivial but the current setup I have is zookeeper and a kafka server with one broker and kafka's console consumer running on a remote box1. I'm running your module from another box2. Do I need to have something else running box2? I can telnet into box1 with the host:port with no problems. I can also take the client and do a client.close() and watch kafka on box2 tell me Closing socket connection to x.x.x.x So KafkaClient is connecting to it.

Both boxes are running Ubuntu 12.04, python 2.7 and Kafka 0.8
box2 has kafka 0.8
box1 has the head of your repo

consumer.commit() causes Exception

Hi,
Trying to commit after consuming messages creates an exception, both with the latest kafka out of git and the latest release. I tried using auto_commit_every_n and I tried calling commit manually, but both produce this error:

File "/Users/rosejn/dm/hubble/venv/lib/python2.7/site-packages/kafka/conn.py", line 51, in _consume_response_iter
raise Exception("Got no response from Kafka")

Please let me know if I can help track this down.

Thanks,
Jeff

import json
from kafka.client import KafkaClient, FetchRequest, ProduceRequest
from kafka.consumer import SimpleConsumer
from kafka.producer import SimpleProducer

def produce_example(client):
producer = SimpleProducer(client, "my-topic")
producer.send_messages(json.dumps({'foo': 2, 'bar': "test me!!!!"}))

def consume_example(client):
consumer = SimpleConsumer(client, "test-group", "my-topic",
auto_commit_every_n=1)

for message in consumer:
    print(message)

#consumer.commit()

def main():
client = KafkaClient("localhost", 9092)
produce_example(client)
produce_example(client)
consume_example(client)

main()

Handling of BufferUnderflowError

Hi,

my application uses the KafkaClient directly to make calls to Kafka. Once in while, mostly when calling send_fetch_request, the application gets a BufferUnderflowError most likely caused by truncated responses received from the broker. My issue lies with the fact I believe some forms of partial messages but not all of them are trapped by the client.

What I mean is, in decode_fetch_response, only the code handling the MessageSet (_decode_message_set_iter) will actually trap BufferUnderflowError and raise StopIteration. On the other hand, all other calls relying on util.py methods to unpack data (e.g. read_short_string) will raise the BufferUnderflowError which will only be caught by my application.

Callstack example:

[...]
    resps = client.send_fetch_request(reqs, fail_on_error=False)
  File "~/lib/site-packages/kafka/client.py", line 228, in send_fetch_request
    KafkaProtocol.decode_fetch_response)
  File "~/lib/site-packages/kafka/client.py", line 160, in _send_broker_aware_request
    for response in decoder_fn(response):
  File "~/lib/site-packages/kafka/protocol.py", line 265, in decode_fetch_response
    ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
  File "~/lib/site-packages/kafka/util.py", line 56, in relative_unpack
    raise BufferUnderflowError("Not enough data left")
BufferUnderflowError: Not enough data left

Is this expected? As I mentioned, anything similar happening within _decode_message_set_iter would end up with a StopIteration instead.

I am simply looking for the proper way to manage this in my app. I guess this would also apply to the SimpleConsumer implementation provided by the lib.

For reference, the explanation of truncated fetch responses: http://search-hadoop.com/m/4TaT40IBqM1

cheers,
marc

Support Consumer-Coordination / Rebalancing

Implemented simple Zookeeper support. Using Kazoo.

This was built on top of the gevent patch (#37).

  • For Producer, it provides support retries in case there is an error during send
  • For SimpleConsumer, it provides support for re-balancing of partitions among other kafka-python clients. (We can look at full support for Java/Scala clients once the new design is finalized). This will allow us to use kafka as a backend for systems like celery.

You can have a look at the code here:
mahendra/kafka-python@gevent...zookeeper

Not tested yet. Will keep updating the code as I progress with the tests.

Offset Handling

The SimpleConsumer has some commented-out code for pulling offsets from the brokers: https://github.com/mumrah/kafka-python/blob/33cde520de9067845d4c7159a2c2834846e1957f/kafka/consumer.py#L100

Is this commented out on purpose? It's unclear to me if "Uncomment for 0.8.1" is referring to a version of Kafka or a version of kafka-python.

I tried to uncomment this, and I get an error in encode_offset_fetch_request/write_short_string. Is offset handling broken or should I be able to get this to work?

I am using a work-around now of consumer.seek(0, 0), but this is not ideal, since I have to reprocess every entry each run.

Problem using async=True

Hi,

I'm having trouble using an asynchronous producer. I'm running the latest copy of the code from github.

I have a simple script which exhibits the problem:

from kafka.client import KafkaClient
from kafka.producer import SimpleProducer

if __name__ == '__main__':

    kafka = KafkaClient("192.168.15.136", 9092)
    producer = SimpleProducer(kafka, "test", async=True)

    producer.send_messages("Hello")

The log output is as follows:

Traceback (most recent call last):
  File "kafka_test.py", line 7, in <module>
    producer = SimpleProducer(kafka, "test", async=True)
  File "C:\Users\astock\envs\midmarket\lib\site-packages\kafka\producer.py", line 176, in __init__
    batch_send_every_t)
  File "C:\Users\astock\envs\midmarket\lib\site-packages\kafka\producer.py", line 72, in __init__
    self.proc.start()
  File "c:\python27\Lib\multiprocessing\process.py", line 130, in start
    self._popen = Popen(self)
  File "c:\python27\Lib\multiprocessing\forking.py", line 271, in __init__
    dump(process_obj, to_child, HIGHEST_PROTOCOL)
  File "c:\python27\Lib\multiprocessing\forking.py", line 193, in dump
    ForkingPickler(file, protocol).dump(obj)
  File "c:\python27\Lib\pickle.py", line 224, in dump
    self.save(obj)
  File "c:\python27\Lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "c:\python27\Lib\pickle.py", line 419, in save_reduce
    save(state)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "c:\python27\Lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\multiprocessing\forking.py", line 66, in dispatcher
    self.save_reduce(obj=obj, *rv)
  File "c:\python27\Lib\pickle.py", line 401, in save_reduce
    save(args)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 548, in save_tuple
    save(element)
  File "c:\python27\Lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "c:\python27\Lib\pickle.py", line 419, in save_reduce
    save(state)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "c:\python27\Lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "c:\python27\Lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "c:\python27\Lib\pickle.py", line 419, in save_reduce
    save(state)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "c:\python27\Lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "c:\python27\Lib\pickle.py", line 686, in _batch_setitems
    save(v)
  File "c:\python27\Lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "c:\python27\Lib\pickle.py", line 419, in save_reduce
    save(state)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "c:\python27\Lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "c:\python27\Lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "c:\python27\Lib\pickle.py", line 419, in save_reduce
    save(state)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 548, in save_tuple
    save(element)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "c:\python27\Lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 748, in save_global
    (obj, module, name))
pickle.PicklingError: Can't pickle <built-in method recvfrom_into of _socket.socket object at 0x02D36F48>: it's not found as __main__.recvfrom_into
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "c:\python27\Lib\multiprocessing\forking.py", line 374, in main
    self = load(from_parent)
  File "c:\python27\Lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "c:\python27\Lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "c:\python27\Lib\pickle.py", line 880, in load_eof
    raise EOFError
EOFError

The same code works if I change async to False. This is running on Python 2.7.3 on Windows against Kafka 0.8.0. Any suggestions gratefully received

Packaging

Need setup.py so this can be installed as a package

Base64 encoded/json messages causing issues?

I have been using kafka-python on a projectwhere I am streaming json strings through a kafka topic. This seems to work well but every so often long before a topic is empty the consumer.get_messages (using SimpleConsumer) starts given me back empty arrays.

If I try and base64 encode the data before its sent then any message I try and read from the consumer comes back as an empty array. Is there any way that certain character combinations in the message could be causing this? Or is it a message size issue?

When I run the kafka-console-consumer.sh on the same topic it has no problem.

Using kafka 0.8

Multiprocess consumer bottleneck

It appears that the multiprocessor consumer puts all events back into a single queue. However, to process these events using multiprocessing, no queue is needed and there should be a way to define the code that is executed in each process upon event arrival.

Topic discovery

I'm interested in doing continuous topic discovery. Based on ticket #38, it appears that integrating a zookeeper client is not preferred. Will the Kafka changes of moving offsets and partitions into the API instead of ZK allow for topic discovery? Any ideas on what would be a good work around for the time being?

Create benchmark tests

It would be great to have benchmark/performance tests to compare Kafka client implementations and make a decision which one to use.

response decoding fails with BufferUnderflowError if message size falls in [4097, 4100] range.

using older version of kafka-python (https://github.com/kngenie/kafka-python/tree/0.8.0beta1) with Kafka 0.8.0beta. but I believe this is relevant to kafka-python master, too.

observed symptom: fetch request fails with BufferUnderflowError while decoding response message. received message data looks okay, except for it's missing a few bytes at the tail. error happens when message size (first int32 = Size) is one of (4097, 4098, 4099, 4100).

cause: _consume_response_iter() is calculating messageSize wrong. no need to subtract 4 from Size.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.