
afkak's Introduction

Afkak: Twisted Python Kafka Client


Afkak is a Twisted-native Apache Kafka client library. It provides support for:

  • Producing messages, with automatic batching and optional compression.
  • Consuming messages, with group coordination and automatic commit.

Learn more in the documentation, download from PyPI, or review the contribution guidelines. Please report any issues on GitHub.

Status

Afkak supports these Pythons:

  • CPython 3.7, 3.8, and 3.9
  • PyPy3

We aim to support Kafka 1.1.x and later. Integration tests are run against these Kafka broker versions:

  • 0.9.0.1
  • 1.1.1

Testing against 2.0.0 is planned (see #45).

Newer broker releases will generally function, but not all Afkak features will work on older brokers. In particular, the coordinated consumer won’t work before Kafka 0.9.0.1. We don’t recommend deploying such old releases of Kafka anyway, as they have serious bugs.

Usage

High level

Note: This code is not meant to be runnable. See producer_example and consumer_example for runnable example code.

from afkak.client import KafkaClient
from afkak.consumer import Consumer
from afkak.producer import Producer
from afkak.common import (OFFSET_EARLIEST, PRODUCER_ACK_ALL_REPLICAS,
    PRODUCER_ACK_LOCAL_WRITE)

kClient = KafkaClient("localhost:9092")

# To send messages
producer = Producer(kClient)
d1 = producer.send_messages("my-topic", msgs=[b"some message"])
d2 = producer.send_messages("my-topic", msgs=[b"takes a list", b"of messages"])
# To get confirmations/errors on the sends, add callbacks to the returned deferreds
d1.addCallbacks(handleResponses, handleErrors)

# To wait for acknowledgements
# PRODUCER_ACK_LOCAL_WRITE : server will wait till the data is written to
#                         a local log before sending response
# [ the default ]
# PRODUCER_ACK_ALL_REPLICAS : server will block until the message is committed
#                            by all in sync replicas before sending a response
producer = Producer(kClient,
                    req_acks=PRODUCER_ACK_LOCAL_WRITE,
                    ack_timeout=2000)

responseD = producer.send_messages("my-topic", msgs=[b"message"])

# Using twisted's @inlineCallbacks:
responses = yield responseD
if responses:
    print(responses[0].error)
    print(responses[0].offset)

# To send messages in batch: You can use a producer with any of the
# partitioners for doing this. The following producer will collect
# messages in batch and send them to Kafka after 20 messages are
# collected or every 60 seconds (whichever comes first). You can
# also batch by number of bytes.
# Notes:
# * If the producer dies before the messages are sent, the caller will
#   not have had the callbacks called on the deferreds returned from
#   send_messages(), and so can retry.
# * Calling producer.stop() before the messages are sent will
#   errback() the deferred(s) returned from the send_messages() call(s).
producer = Producer(kClient, batch_send=True,
                    batch_send_every_n=20,
                    batch_send_every_t=60)
responseD1 = producer.send_messages("my-topic", msgs=[b"message"])
responseD2 = producer.send_messages("my-topic", msgs=[b"message 2"])

# To consume messages
# define a function which takes a list of messages to process and
# possibly returns a deferred which fires when the processing is
# complete.
def processor_func(consumer, messages):
    # store_messages_in_database may return a deferred
    result = store_messages_in_database(messages)
    # record last processed message
    consumer.commit()
    return result

the_partition = 3  # Consume only from partition 3.
consumer = Consumer(kClient, "my-topic", the_partition, processor_func)
d = consumer.start(OFFSET_EARLIEST)  # Start reading at earliest message
# The deferred returned by consumer.start() will fire when an error
# occurs that can't be handled by the consumer, or when consumer.stop()
# is called
yield d

consumer.stop()
kClient.close()
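The count-or-time batching policy described above (`batch_send_every_n` / `batch_send_every_t`) can be sketched in plain Python. This is an illustrative model only, not Afkak's implementation (the real producer schedules the time-based flush with Twisted's reactor); the `Batcher` class and its names are hypothetical:

```python
import time


class Batcher:
    """Flush collected items when a count or an age threshold trips.

    Illustrative sketch of the batching policy only; Afkak's Producer
    implements the equivalent internally with Twisted timers.
    """

    def __init__(self, every_n=20, every_t=60.0, clock=time.monotonic):
        self.every_n = every_n
        self.every_t = every_t
        self._clock = clock
        self._items = []
        self._first_at = None

    def add(self, item):
        """Add an item; return the flushed batch if a threshold tripped."""
        if self._first_at is None:
            self._first_at = self._clock()
        self._items.append(item)
        too_many = len(self._items) >= self.every_n
        too_old = self._clock() - self._first_at >= self.every_t
        if too_many or too_old:
            return self.flush()
        return None

    def flush(self):
        """Return the pending batch and reset the collector."""
        batch, self._items, self._first_at = self._items, [], None
        return batch
```

Whichever threshold is reached first triggers the flush, which is why a mostly idle topic still sees its messages sent within `batch_send_every_t` seconds.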

Keyed messages

from afkak.client import KafkaClient
from afkak.producer import Producer
from afkak.partitioner import HashedPartitioner, RoundRobinPartitioner

kafka = KafkaClient("localhost:9092")

# Use the HashedPartitioner so that the producer will use the optional key
# argument on send_messages()
producer = Producer(kafka, partitioner_class=HashedPartitioner)
producer.send_messages("my-topic", key=b"key1", msgs=[b"some message"])
producer.send_messages("my-topic", key=b"key2", msgs=[b"this method"])
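The effect of a hashed partitioner can be shown with a small self-contained sketch: hash the key and take it modulo the number of partitions, so the same key always lands on the same partition (as long as the partition list is stable). Note this is not Afkak's actual algorithm — `HashedPartitioner` uses Murmur2 — and `pick_partition` is a hypothetical name used only for illustration:

```python
import hashlib


def pick_partition(key, partitions):
    """Map a message key (bytes) onto one of the available partitions.

    Deterministic: the same key always yields the same partition for an
    unchanged partition list. Illustrative only -- Afkak's
    HashedPartitioner hashes with Murmur2, not MD5.
    """
    digest = int.from_bytes(hashlib.md5(key).digest()[:4], "big")
    return partitions[digest % len(partitions)]
```

This determinism is what makes keyed messages useful: all messages with the same key preserve their relative order because they go through a single partition.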

Low level

from afkak.client import KafkaClient
from afkak.common import ProduceRequest
from afkak.kafkacodec import create_message

kafka = KafkaClient("localhost:9092")
req = ProduceRequest(topic="my-topic", partition=1,
                     messages=[create_message(b"some message")])
resps = yield kafka.send_produce_request(payloads=[req], fail_on_error=True)
kafka.close()

resps[0].topic      # b"my-topic"
resps[0].partition  # 1
resps[0].error      # 0 (hopefully)
resps[0].offset     # offset of the first message sent in this request
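Conceptually, `fail_on_error=True` amounts to scanning each response for a non-zero error code and raising when one is found. A minimal sketch of that check (the `ProduceResponse` namedtuple here is a stand-in for the client's response type, and `check_responses` is a hypothetical helper, not Afkak API):

```python
from collections import namedtuple

# Stand-in for the client's response tuple; real responses come from
# send_produce_request(), not from constructing this directly.
ProduceResponse = namedtuple("ProduceResponse", "topic partition error offset")


def check_responses(resps):
    """Raise on the first response whose error code is non-zero."""
    for resp in resps:
        if resp.error != 0:
            raise RuntimeError(
                "produce to %s[%d] failed with error code %d"
                % (resp.topic, resp.partition, resp.error))
    return resps
```

With `fail_on_error=False` you would instead inspect each response's `error` field yourself, as shown above.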

Install

Afkak releases are available on PyPI.

Because Afkak's dependencies Twisted and python-snappy include binary extension modules, you will need to install the Python development headers for the interpreter you wish to use:

Debian/Ubuntu: sudo apt-get install build-essential python3-dev pypy3-dev libsnappy-dev
OS X: brew install python pypy snappy
pip install virtualenv

Then Afkak can be installed with pip as usual:

pip install afkak

License

Copyright 2013, 2014, 2015 David Arthur under Apache License, v2.0. See LICENSE

Copyright 2014, 2015 Cyan, Inc. under Apache License, v2.0. See LICENSE

Copyright 2015–2021 Ciena Corporation under Apache License, v2.0. See LICENSE

This project began as a port of the kafka-python library to Twisted.

See AUTHORS.md for the full contributor list.

Tests

In order to run Afkak's tests, you need to install the dependencies as specified in the install section.

The Afkak test suite uses Tox to execute the tests in all the supported Python versions. The preferred method is to install Tox in a virtual environment first:

make venv

Testing Strategy

Afkak has two types of tests:

  • Unit tests — fast tests that don't perform any I/O.

  • Integration tests — tests that run against a real Kafka broker.

Run the unit tests

To run all unit tests in all the supported Python versions (requires all the versions to be installed in the system where the tests will run):

make toxu

Alternatively, you might want to run unit tests in a list of specific Python versions:

.env/bin/tox -e py37-unit-snappy,py39-unit-snappy

Please run the tests on the minimum and maximum supported Python versions before submitting a pull request.

Run the integration tests

The integration tests will actually start up a real local ZooKeeper instance and Kafka brokers, and send messages in using the client.

The makefile knows how to download several versions of Kafka. This will run just the integration tests against Kafka 1.1.1:

KAFKA_VER=1.1.1 make toxi

Run all the tests against the default Kafka version

make toxa

Run the integration tests against all the Kafka versions the Makefile knows about

make toxik

afkak's People

Contributors

alex8080, davidkbainbridge, jcabmora, julian, mosteli, mvaught02, phudgins-ciena, rthille, sebv, twm, warnerpr-cyan, wddimple6


afkak's Issues

Document FastMurmur2 extra

The FastMurmur2 extra pulls in a C implementation of the Murmur2 hash function. It should be mentioned in the readme.

Mention varying platform support: the pyhash library requires PyPy 6.0 or later.

Readme examples not working

The producer_example and consumer_example links return a 404 Not Found. Is there any concrete demonstration of how to get started with afkak?

Failed to fetch message after resetting fetch_offset when hitting OffsetOutOfRange error

bash-4.4# /opt/kafka/bin/kafka-consumer-groups.sh --describe --group "Email Dispatch Consumer" --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'Email Dispatch Consumer' has no active members.

TOPIC                   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
bp.crinoid.v1.emailRule 0          4               14              10              -               -               -

A dump of the kafka topic shows there are messages through offset 13 :-)

```
bash-4.4# kafka_dump_topic bp.crinoid.v1.emailRule --property print.offset=true
Offset (9)  {"header": {"roleIds": [], "upstreamId": "c6763161-8ee8-40d2-99c9-f1e942b7439a", "traceId": "388586e5-77a3-4db0-8c83-3e2df04915c7", "timestamp": "2018-12-13T16:25:07.090725Z", "envelopeId": "d1b4f522-cd4d-4193-bcb9-e3e6c7a7ccb3"}, "version": 1, "event": {"_type": "bp.crinoid.v1.FiredNotification", "object_data": {"message": {"header": {"envelopeId": "0ab5bc5e-5ec7-48b6-a88c-d16472d80e89", "upstreamId": "4194c1c3-82c9-4477-be48-3ff4a331e16a", "traceId": "a034e966-a456-444c-b8df-624e22230a20", "timestamp": "2018-12-13T16:25:05.428370Z"}, "version": 1, "event": {"_type": "bp.v1.ObjectChanged", "object_type": "/tron/api/v1/users/1249967f-0c79-4019-b598-d4dc4b964d64", "upstream_id": "00a743dc-296c-4a80-ab97-a38d9979e01b", "object_id": "", "trace_id": "a034e966-a456-444c-b8df-624e22230a20", "hop_id": "4194c1c3-82c9-4477-be48-3ff4a331e16a", "object_data": {"confirmation_link": "https://10.180.71.18/platform-ui/#/security/reset-password?token=IjEyNDk5NjdmLTBjNzktNDAxOS1iNTk4LWQ0ZGM0Yjk2NGQ2NCI:1gXTnN:Al5DS9_uIqSbGsP9mNLLbbqSe8A", "email": "[email protected]", "name": "Du"}, "op": "update_user_reset_password"}}, "rule": {"remaining_invocations": null, "_type": "bp.crinoid.v1.emailRule", "extra_compare": {"op": "update_user_reset_password"}, "name": "Update_User_Reset_Password", "trigger_type": "bp.v1.ObjectChanged", "topic": "bp.tron.v1.changes", "notification_params": {"to": "{{ get('message.event.object_data.email') }}", "subject": "Blue Planet account password expired"}, "template": "4d338952-7783-44cf-b40d-1fb4e68564b9", "notification_type": "email", "id": 24, "uuid": "68e502b8-8286-47b2-a15c-7f4d00824247"}}}}
Offset (10) {"header": {"roleIds": [], "upstreamId": "a5c630c3-7eaf-4442-8db2-652dcdd04b1e", "traceId": "ed9656b1-d364-4b00-ab73-e453e3915b05", "timestamp": "2018-12-13T16:40:46.198530Z", "envelopeId": "08f9ca93-64ca-4b2b-8f80-bd89cf27ab84"}, "version": 1, "event": {"_type": "bp.crinoid.v1.FiredNotification", "object_data": {"message": {"header": {"envelopeId": "ee4c0637-88a1-4d46-b49f-eec5a2ec7c52", "upstreamId": "0b39d9e1-45df-43f2-b740-6afa8388d25c", "traceId": "ed9656b1-d364-4b00-ab73-e453e3915b05", "timestamp": "2018-12-13T16:40:42.893394Z"}, "version": 1, "event": {"_type": "bp.v1.ObjectChanged", "object_type": "/tron/api/v1/users/1249967f-0c79-4019-b598-d4dc4b964d64/reset_password", "upstream_id": "cec5f81d-5fe8-4f2d-9614-600cc61791f1", "object_id": "1249967f-0c79-4019-b598-d4dc4b964d64", "trace_id": "ed9656b1-d364-4b00-ab73-e453e3915b05", "hop_id": "0b39d9e1-45df-43f2-b740-6afa8388d25c", "object_data": {"email": "[email protected]", "name": "Du"}, "op": "email_reset_password"}}, "rule": {"remaining_invocations": null, "_type": "bp.crinoid.v1.emailRule", "extra_compare": {"op": "email_reset_password"}, "name": "Admin_Password_Reset", "trigger_type": "bp.v1.ObjectChanged", "topic": "bp.tron.v1.changes", "notification_params": {"to": "{{ get('message.event.object_data.email') }}", "subject": "Blue Planet account password reset"}, "template": "bf89ff20-5cc9-4325-b392-98a3670e8f93", "notification_type": "email", "id": 6, "uuid": "3541f8ca-d162-4480-afb0-e3d312d3e221"}}}}
Offset (11) {"header": {"roleIds": [], "upstreamId": "82d29005-f22f-4ab1-9970-807888ffcb07", "traceId": "256b58e4-7f89-4c4d-9dc3-cc52260f77ad", "timestamp": "2018-12-13T16:45:20.707517Z", "envelopeId": "19157fff-e097-4015-9503-c59314ab1e8c"}, "version": 1, "event": {"_type": "bp.crinoid.v1.FiredNotification", "object_data": {"message": {"header": {"envelopeId": "a86e1467-9933-437c-9423-29dd7343851a", "upstreamId": "887b944a-077a-48bb-b4f1-ef6acf9f78ea", "traceId": "256b58e4-7f89-4c4d-9dc3-cc52260f77ad", "timestamp": "2018-12-13T16:45:13.762890Z"}, "version": 1, "event": {"_type": "bp.v1.ObjectChanged", "object_type": "/tron/api/v1/users/1249967f-0c79-4019-b598-d4dc4b964d64", "upstream_id": "3a73daf6-969b-43e3-83b8-669661e8f878", "object_id": "", "trace_id": "256b58e4-7f89-4c4d-9dc3-cc52260f77ad", "hop_id": "887b944a-077a-48bb-b4f1-ef6acf9f78ea", "object_data": {"confirmation_link": "https://10.180.71.18/platform-ui/#/security/reset-password?token=IjEyNDk5NjdmLTBjNzktNDAxOS1iNTk4LWQ0ZGM0Yjk2NGQ2NCI:1gXU6r:LpXhAB8Pyws4_qchzX9fx6C0lz0", "email": "[email protected]", "name": "Du"}, "op": "update_user_reset_password"}}, "rule": {"remaining_invocations": null, "_type": "bp.crinoid.v1.emailRule", "extra_compare": {"op": "update_user_reset_password"}, "name": "Update_User_Reset_Password", "trigger_type": "bp.v1.ObjectChanged", "topic": "bp.tron.v1.changes", "notification_params": {"to": "{{ get('message.event.object_data.email') }}", "subject": "Blue Planet account password expired"}, "template": "4d338952-7783-44cf-b40d-1fb4e68564b9", "notification_type": "email", "id": 24, "uuid": "68e502b8-8286-47b2-a15c-7f4d00824247"}}}}
Offset (12) {"header": {"roleIds": [], "upstreamId": "6ba43bdc-d66a-45c4-a6e3-de424b4fd598", "traceId": "f5ad34b0-3db1-4e00-9c10-3ad15abdfdbb", "timestamp": "2018-12-13T16:45:51.962691Z", "envelopeId": "8339e2a5-c35c-45d3-83dc-8577ad95a57c"}, "version": 1, "event": {"_type": "bp.crinoid.v1.FiredNotification", "object_data": {"message": {"header": {"envelopeId": "32a7f929-01f5-4b26-a5e9-5aaa4a27b3b9", "upstreamId": "9a34be1d-cf58-4f2d-a379-8a800518b279", "traceId": "f5ad34b0-3db1-4e00-9c10-3ad15abdfdbb", "timestamp": "2018-12-13T16:45:48.068736Z"}, "version": 1, "event": {"_type": "bp.v1.ObjectChanged", "object_type": "/tron/api/v1/users/1249967f-0c79-4019-b598-d4dc4b964d64/reset_password", "upstream_id": "92ab38c8-4206-4eea-b3f7-0175ff079b20", "object_id": "1249967f-0c79-4019-b598-d4dc4b964d64", "trace_id": "f5ad34b0-3db1-4e00-9c10-3ad15abdfdbb", "hop_id": "9a34be1d-cf58-4f2d-a379-8a800518b279", "object_data": {"email": "[email protected]", "name": "Du"}, "op": "email_reset_password"}}, "rule": {"remaining_invocations": null, "_type": "bp.crinoid.v1.emailRule", "extra_compare": {"op": "email_reset_password"}, "name": "Admin_Password_Reset", "trigger_type": "bp.v1.ObjectChanged", "topic": "bp.tron.v1.changes", "notification_params": {"to": "{{ get('message.event.object_data.email') }}", "subject": "Blue Planet account password reset"}, "template": "bf89ff20-5cc9-4325-b392-98a3670e8f93", "notification_type": "email", "id": 6, "uuid": "3541f8ca-d162-4480-afb0-e3d312d3e221"}}}}
Offset (13) {"header": {"roleIds": [], "upstreamId": "b3bdb785-e2c3-4221-a061-62fa1ec377f7", "traceId": "93e015b8-a93f-4c50-9dca-6544c83fe89b", "timestamp": "2018-12-13T17:12:56.515914Z", "envelopeId": "597367d8-d7eb-425b-8156-53933e375820"}, "version": 1, "event": {"_type": "bp.crinoid.v1.FiredNotification", "object_data": {"message": {"header": {"envelopeId": "da8eca48-364c-42a2-b19b-8839125886b7", "upstreamId": "ca6be94a-a24e-4638-aec1-6ec22937f58c", "traceId": "93e015b8-a93f-4c50-9dca-6544c83fe89b", "timestamp": "2018-12-13T17:12:51.277018Z"}, "version": 1, "event": {"_type": "bp.v1.ObjectChanged", "object_type": "/tron/api/v1/users/1249967f-0c79-4019-b598-d4dc4b964d64", "upstream_id": "4ab1feb7-9f67-448f-b63a-be570258971e", "object_id": "", "trace_id": "93e015b8-a93f-4c50-9dca-6544c83fe89b", "hop_id": "ca6be94a-a24e-4638-aec1-6ec22937f58c", "object_data": {"confirmation_link": "https://10.180.71.18/platform-ui/#/security/reset-password?token=IjEyNDk5NjdmLTBjNzktNDAxOS1iNTk4LWQ0ZGM0Yjk2NGQ2NCI:1gXUXb:wx7lSZThjXEVdtn8YjOikPpomkA", "email": "[email protected]", "name": "Du"}, "op": "update_user_reset_password"}}, "rule": {"remaining_invocations": null, "_type": "bp.crinoid.v1.emailRule", "extra_compare": {"op": "update_user_reset_password"}, "name": "Update_User_Reset_Password", "trigger_type": "bp.v1.ObjectChanged", "topic": "bp.tron.v1.changes", "notification_params": {"to": "{{ get('message.event.object_data.email') }}", "subject": "Blue Planet account password expired"}, "template": "4d338952-7783-44cf-b40d-1fb4e68564b9", "notification_type": "email", "id": 24, "uuid": "68e502b8-8286-47b2-a15c-7f4d00824247"}}}}```

I added the following log messages to the afkak installed with ntf-dispatcher to track down why the consumer stops consuming messages after the offset is reset to latest when hitting the OffsetOutOfRange error.

2018-12-13T16:52:05.128724+00:00 onxv1259 dispatcher-email_0: {"msg": "client._handle_responses: [FetchResponse(topic=u'bp.crinoid.v1.emailRule', partition=0, error=1, highwaterMark=-1, messages=<generator object _decode_message_set_iter at 0x7f3a04d67b40>)]", "namespace": "afkak.client", "priority": 6, "pid": 1, "tid": 139887261095680, "code_file": "build/bdist.linux-x86_64/egg/afkak/client.py", "code_line": 566, "code_func": "_handle_responses", "timestamp": "2018-12-13T16:52:05.127854Z", "app": "dispatcher-email", "app_instance": "0", "host": "onxv1259", "container": "117fc7fa5429"}
2018-12-13T16:52:05.129702+00:00 onxv1259 dispatcher-email_0: {"msg": "client._handle_responses _check_error: FetchResponse(topic=u'bp.crinoid.v1.emailRule', partition=0, error=1, highwaterMark=-1, messages=<generator object _decode_message_set_iter at 0x7f3a04d67b40>)", "namespace": "afkak.client", "priority": 6, "pid": 1, "tid": 139887261095680, "code_file": "build/bdist.linux-x86_64/egg/afkak/client.py", "code_line": 569, "code_func": "_handle_responses", "timestamp": "2018-12-13T16:52:05.128474Z", "app": "dispatcher-email", "app_instance": "0", "host": "onxv1259", "container": "117fc7fa5429"}
2018-12-13T16:52:05.130384+00:00 onxv1259 dispatcher-email_0: {"msg": "isinstance(responseOrErrcode, int): False", "namespace": "afkak.common", "priority": 6, "pid": 1, "tid": 139887261095680, "code_file": "build/bdist.linux-x86_64/egg/afkak/common.py", "code_line": 763, "code_func": "_check_error", "timestamp": "2018-12-13T16:52:05.128818Z", "app": "dispatcher-email", "app_instance": "0", "host": "onxv1259", "container": "117fc7fa5429"}
2018-12-13T16:52:05.130722+00:00 onxv1259 dispatcher-email_0: {"msg": "errno: 1", "namespace": "afkak.common", "priority": 6, "pid": 1, "tid": 139887261095680, "code_file": "build/bdist.linux-x86_64/egg/afkak/common.py", "code_line": 768, "code_func": "_check_error", "timestamp": "2018-12-13T16:52:05.129217Z", "app": "dispatcher-email", "app_instance": "0", "host": "onxv1259", "container": "117fc7fa5429"}
2018-12-13T16:52:05.131078+00:00 onxv1259 dispatcher-email_0: {"msg": "cls: <class 'afkak.common.OffsetOutOfRangeError'>", "namespace": "afkak.common", "priority": 6, "pid": 1, "tid": 139887261095680, "code_file": "build/bdist.linux-x86_64/egg/afkak/common.py", "code_line": 774, "code_func": "_check_error", "timestamp": "2018-12-13T16:52:05.129565Z", "app": "dispatcher-email", "app_instance": "0", "host": "onxv1259", "container": "117fc7fa5429"}
2018-12-13T16:52:05.131441+00:00 onxv1259 dispatcher-email_0: {"msg": "Reset offset to -1", "namespace": "afkak.consumer", "priority": 6, "pid": 1, "tid": 139887261095680, "code_file": "build/bdist.linux-x86_64/egg/afkak/consumer.py", "code_line": 725, "code_func": "_handle_fetch_error", "timestamp": "2018-12-13T16:52:05.130108Z", "app": "dispatcher-email", "app_instance": "0", "host": "onxv1259", "container": "117fc7fa5429"}
2018-12-13T16:52:05.138426+00:00 onxv1259 dispatcher-email_0: {"msg": "send_offset_request [OffsetResponse(topic=u'bp.crinoid.v1.emailRule', partition=0, error=0, offsets=(13,))]", "namespace": "afkak.client", "priority": 6, "pid": 1, "tid": 139887261095680, "code_file": "build/bdist.linux-x86_64/egg/afkak/client.py", "code_line": 508, "code_func": "send_offset_request", "timestamp": "2018-12-13T16:52:05.133847Z", "app": "dispatcher-email", "app_instance": "0", "host": "onxv1259", "container": "117fc7fa5429"}
2018-12-13T16:52:05.138767+00:00 onxv1259 dispatcher-email_0: {"msg": "client._handle_responses: [OffsetResponse(topic=u'bp.crinoid.v1.emailRule', partition=0, error=0, offsets=(13,))]", "namespace": "afkak.client", "priority": 6, "pid": 1, "tid": 139887261095680, "code_file": "build/bdist.linux-x86_64/egg/afkak/client.py", "code_line": 566, "code_func": "_handle_responses", "timestamp": "2018-12-13T16:52:05.134468Z", "app": "dispatcher-email", "app_instance": "0", "host": "onxv1259", "container": "117fc7fa5429"}
2018-12-13T16:52:05.139132+00:00 onxv1259 dispatcher-email_0: {"msg": "client._handle_responses _check_error: OffsetResponse(topic=u'bp.crinoid.v1.emailRule', partition=0, error=0, offsets=(13,))", "namespace": "afkak.client", "priority": 6, "pid": 1, "tid": 139887261095680, "code_file": "build/bdist.linux-x86_64/egg/afkak/client.py", "code_line": 569, "code_func": "_handle_responses", "timestamp": "2018-12-13T16:52:05.134844Z", "app": "dispatcher-email", "app_instance": "0", "host": "onxv1259", "container": "117fc7fa5429"}
2018-12-13T16:52:05.139456+00:00 onxv1259 dispatcher-email_0: {"msg": "isinstance(responseOrErrcode, int): False", "namespace": "afkak.common", "priority": 6, "pid": 1, "tid": 139887261095680, "code_file": "build/bdist.linux-x86_64/egg/afkak/common.py", "code_line": 763, "code_func": "_check_error", "timestamp": "2018-12-13T16:52:05.135194Z", "app": "dispatcher-email", "app_instance": "0", "host": "onxv1259", "container": "117fc7fa5429"}
2018-12-13T16:52:05.139756+00:00 onxv1259 dispatcher-email_0: {"msg": "errno: 0", "namespace": "afkak.common", "priority": 6, "pid": 1, "tid": 139887261095680, "code_file": "build/bdist.linux-x86_64/egg/afkak/common.py", "code_line": 768, "code_func": "_check_error", "timestamp": "2018-12-13T16:52:05.135484Z", "app": "dispatcher-email", "app_instance": "0", "host": "onxv1259", "container": "117fc7fa5429"}
2018-12-13T16:52:05.140090+00:00 onxv1259 dispatcher-email_0: {"msg": "client._handle_responses out: [OffsetResponse(topic=u'bp.crinoid.v1.emailRule', partition=0, error=0, offsets=(13,))]", "namespace": "afkak.client", "priority": 6, "pid": 1, "tid": 139887261095680, "code_file": "build/bdist.linux-x86_64/egg/afkak/client.py", "code_line": 589, "code_func": "_handle_responses", "timestamp": "2018-12-13T16:52:05.135788Z", "app": "dispatcher-email", "app_instance": "0", "host": "onxv1259", "container": "117fc7fa5429"}
2018-12-13T16:52:05.140398+00:00 onxv1259 dispatcher-email_0: {"msg": "consumer._handle_fetch_response: [OffsetResponse(topic=u'bp.crinoid.v1.emailRule', partition=0, error=0, offsets=(13,))]", "namespace": "afkak.consumer", "priority": 6, "pid": 1, "tid": 139887261095680, "code_file": "build/bdist.linux-x86_64/egg/afkak/consumer.py", "code_line": 775, "code_func": "_handle_fetch_response", "timestamp": "2018-12-13T16:52:05.136192Z", "app": "dispatcher-email", "app_instance": "0", "host": "onxv1259", "container": "117fc7fa5429"}
2018-12-13T16:52:05.140711+00:00 onxv1259 dispatcher-email_0: {"msg": "self._msg_block_d: None", "namespace": "afkak.consumer", "priority": 6, "pid": 1, "tid": 139887261095680, "code_file": "build/bdist.linux-x86_64/egg/afkak/consumer.py", "code_line": 788, "code_func": "_handle_fetch_response", "timestamp": "2018-12-13T16:52:05.136506Z", "app": "dispatcher-email", "app_instance": "0", "host": "onxv1259", "container": "117fc7fa5429"}
2018-12-13T16:52:05.141153+00:00 onxv1259 dispatcher-email_0: {"msg": "messages to deliver: []", "namespace": "afkak.consumer", "priority": 6, "pid": 1, "tid": 139887261095680, "code_file": "build/bdist.linux-x86_64/egg/afkak/consumer.py", "code_line": 853, "code_func": "_handle_fetch_response", "timestamp": "2018-12-13T16:52:05.136835Z", "app": "dispatcher-email", "app_instance": "0", "host": "onxv1259", "container": "117fc7fa5429"}

@rthille Can you please provide any insight? Any help is appreciated.

TestFailover.test_switch_leader intermittent failures

The afkak.test.test_failover_integration.TestFailover.test_switch_leader test fails intermittently with the following message:

Traceback (most recent call last):
  File "/home/travis/build/ciena/afkak/afkak/test/testutil.py", line 87, in wrapper
    return func(self)
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/nose/twistedtools.py", line 169, in wrapper
    raise exc_type(exc_value).with_traceback(tb)
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/nose/twistedtools.py", line 143, in errback
    failure.raiseException()
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/python/failure.py", line 385, in raiseException
    raise self.value.with_traceback(self.tb)
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/internet/defer.py", line 1386, in _inlineCallbacks
    result = g.send(result)
  File "/home/travis/build/ciena/afkak/afkak/test/test_failover_integration.py", line 141, in test_switch_leader
    self.assertEqual(count, 22 * index)
AssertionError: 20 != 22

https://travis-ci.org/ciena/afkak/jobs/414207454

Publish afkak releases to PyPI

It would be convenient to be able to install Afkak using "pip install afkak". It looks like only 2.4.0 was ever published to PyPI, although pip install 'git+https://github.com/ciena/[email protected]#egg=afkak' does work in the meantime.

Putting new releases on PyPI also aids discoverability of the project.

KafkaUnavailableError

The afkak client and producer are used for opencord/voltha Kafka access. Every 10+ minutes we see the following error:

Unable to load metadata from configured hosts: <twisted.python.failure.Failure afkak.common.KafkaUnavailableError: All servers ([<KafkaBrokerClient 192.168.0.202:9092 clientId=u'afkak-client' connected>]) failed to process request>

When the error occurs there is no indication in the kafka-docker logs of any problem/error.
When this occurs, a voltha/kafka_proxy stop/restart is generated, which results in:

[2018-10-03 17:49:11,552] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)

Which is an expected message.

This error occurs when using afkak 3.0.0.dev0 and 2.4.0.

TestAfkakClientIntegration.test_roundtrip_large_request intermittent failures

The integration test afkak.test.test_client_integration.TestAfkakClientIntegration.test_roundtrip_large_request intermittently fails with NotLeaderForPartitionError:

Traceback (most recent call last):
  File "/home/travis/build/ciena/afkak/afkak/test/testutil.py", line 87, in wrapper
    return func(self)
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/nose/twistedtools.py", line 169, in wrapper
    raise exc_type(exc_value).with_traceback(tb)
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/nose/twistedtools.py", line 143, in errback
    failure.raiseException()
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/python/failure.py", line 385, in raiseException
    raise self.value.with_traceback(self.tb)
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/internet/defer.py", line 1384, in _inlineCallbacks
    result = result.throwExceptionIntoGenerator(g)
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/python/failure.py", line 408, in throwExceptionIntoGenerator
    return g.throw(self.type, self.value, self.tb)
  File "/home/travis/build/ciena/afkak/afkak/test/test_client_integration.py", line 131, in test_roundtrip_large_request
    produce_resp, = yield self.client.send_produce_request([produce])
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/internet/defer.py", line 1386, in _inlineCallbacks
    result = g.send(result)
  File "/home/travis/build/ciena/afkak/afkak/client.py", line 463, in send_produce_request
    returnValue(self._handle_responses(resps, fail_on_error, callback))
  File "/home/travis/build/ciena/afkak/afkak/client.py", line 564, in _handle_responses
    check_error(resp)
  File "/home/travis/build/ciena/afkak/afkak/common.py", line 300, in check_error
    raise error(responseOrErrcode)
afkak.common.NotLeaderForPartitionError: ProduceResponse(topic='test_roundtrip_large_request-lomJaTsqwi', partition=0, error=6, offset=-1)

https://travis-ci.org/ciena/afkak/jobs/412822017

Or with UnknownTopicOrPartitionError:

Traceback (most recent call last):
  File "/home/travis/build/ciena/afkak/afkak/test/testutil.py", line 87, in wrapper
    return func(self)
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/nose/twistedtools.py", line 169, in wrapper
    raise exc_type(exc_value).with_traceback(tb)
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/nose/twistedtools.py", line 143, in errback
    failure.raiseException()
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/python/failure.py", line 385, in raiseException
    raise self.value.with_traceback(self.tb)
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/internet/defer.py", line 1384, in _inlineCallbacks
    result = result.throwExceptionIntoGenerator(g)
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/python/failure.py", line 408, in throwExceptionIntoGenerator
    return g.throw(self.type, self.value, self.tb)
  File "/home/travis/build/ciena/afkak/afkak/test/test_client_integration.py", line 131, in test_roundtrip_large_request
    produce_resp, = yield self.client.send_produce_request([produce])
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/internet/defer.py", line 1386, in _inlineCallbacks
    result = g.send(result)
  File "/home/travis/build/ciena/afkak/afkak/client.py", line 463, in send_produce_request
    returnValue(self._handle_responses(resps, fail_on_error, callback))
  File "/home/travis/build/ciena/afkak/afkak/client.py", line 564, in _handle_responses
    check_error(resp)
  File "/home/travis/build/ciena/afkak/afkak/common.py", line 300, in check_error
    raise error(responseOrErrcode)
afkak.common.UnknownTopicOrPartitionError: ProduceResponse(topic='test_roundtrip_large_request-JjyTDqjQBS', partition=0, error=3, offset=-1)

https://travis-ci.org/ciena/afkak/jobs/412822019#L581
https://travis-ci.org/ciena/afkak/jobs/412822025#L584

Write release procedure

As a follow-up to #67, write up a release procedure that describes the steps to release Afkak and publish it to GitHub releases and PyPI.

TestPerformanceIntegration.test_throughput failure

I have only seen this once:

ERROR: afkak.test.test_performance_integration.TestPerformanceIntegration.test_throughput
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/internet/defer.py", line 151, in maybeDeferred
    result = f(*args, **kw)
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/internet/utils.py", line 201, in runWithWarningsSuppressed
    reraise(exc_info[1], exc_info[2])
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/python/compat.py", line 463, in reraise
    raise exception.with_traceback(traceback)
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/internet/utils.py", line 197, in runWithWarningsSuppressed
    result = f(*a, **kw)
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/nose/twistedtools.py", line 169, in wrapper
    raise exc_type(exc_value).with_traceback(tb)
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/nose/twistedtools.py", line 143, in errback
    failure.raiseException()
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/python/failure.py", line 467, in raiseException
    raise self.value.with_traceback(self.tb)
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/internet/defer.py", line 1418, in _inlineCallbacks
    result = g.send(result)
  File "/home/travis/build/ciena/afkak/afkak/test/testutil.py", line 211, in tearDown
    self.assertFalse(dcs)
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/trial/_synctest.py", line 384, in assertFalse
    super(_Assertions, self).assertFalse(condition, msg)
  File "/opt/python/3.5.6/lib/python3.5/unittest/case.py", line 676, in assertFalse
    raise self.failureException(msg)
twisted.trial.unittest.FailTest: [<twisted.internet.base.DelayedCall object at 0x7f9b3a024c18>] is not false

Kafka 0.8.3 issue

Hi! I'm not sure whether the current version of the library supports 0.8.3. It mostly looks workable, but about half an hour after startup my application suddenly raises this exception:

afkak.common.KafkaUnavailableError: Unable to load metadata from configured hosts: <twisted.python.failure.Failure afkak.common.KafkaUnavailableError: All servers [[('172.19.0.5', 9092)]] failed to process request>

From that point on I am unable to send messages to Kafka. If I restart my app it works for another half hour, then the same thing repeats.

Error starting a new consumer group when earliest offset > 0

Hi,

I think I've found a bug that occurs when starting a new consumer-group consumer against a partition (i.e. Kafka has no committed offset for the group) while the earliest offset in the partition is > 0, e.g. because the Kafka log cleaner has deleted old messages.

For example, with the consumer created and started like this:

consumer = Consumer(
    client=my_kafka_client,
    topic="mytopic",
    partition=0,
    processor=my_event_handler,
    consumer_group="mygroup",
)
consumer.start(OFFSET_COMMITTED)

If the earliest offset in the partition is > 0 the consumer will loop indefinitely giving this error:

Failure fetching messages from kafka: <twisted.python.failure.Failure afkak.common.OffsetOutOfRangeError: FetchResponse(topic='mytopic', partition=0, error=1, highwaterMark=-1, messages=<generator object _decode_message_set_iter at 0x0000000107623d80>)>

I believe this is what's happening in the background:

  • Kafka topic has one partition, earliest offset is 10.
  • New consumer sends initial OffsetFetchRequest for partition and consumer_group=foo.
  • Kafka has no committed offset for foo, returns OffsetFetchResponse with offset = -1.
  • Afkak sets consumer._fetch_offset to -1 + 1 = 0
  • Consumer makes fetch request for offset=0
  • Kafka returns OffsetOutOfRangeError since 0 < 10.
  • Afkak loops infinitely re-requesting messages from offset=0

Suggested Fix:

  • Check for -1 in the offset response and set the offset to OFFSET_EARLIEST, e.g.
def _handle_offset_response(self, response):
    ...
    response = response[0]
    if hasattr(response, 'offsets'):
        # It's a response to an OffsetRequest
        self._fetch_offset = response.offsets[0]
    else:
        # It's a response to an OffsetFetchRequest
        if response.offset == -1:
            # Kafka has no committed offset for this group,
            # so read from the earliest available offset
            self._fetch_offset = OFFSET_EARLIEST
        else:
            self._fetch_offset = response.offset + 1

        self._last_committed_offset = response.offset
        ...

I've tried this locally and it seems to work, but I'm struggling to write a suitable unit test for this case so that I can submit a patch.

Regards,
Stephen

Consumer: FailedPayloadsError when stopping consumer

When the consumer is stopped, a traceback with a FailedPayloadsError is printed to stdout. This happens when running the consumer_example script.

To replicate this issue I followed this procedure:

  • set up and activated a Python 2.7 virtual environment,
  • downloaded consumer_example, changed its mode to 755, updated the topic to wutang (a topic that has only 5 messages), and changed the Consumer to start consuming from OFFSET_EARLIEST,
  • installed afkak from the tip of this repo,
  • executed the consumer_example script.

The following is the output of the last two steps:

(py27) » pip install https://github.com/ciena/afkak/archive/master.zip
DEPRECATION: Python 2.7 will reach the end of its life on January 1st, 2020. Please upgrade your Python as Python 2.7 won't be maintained after that date. A future version of pip will drop support for Python 2.7. More details about Python 2 support in pip, can be found at https://pip.pypa.io/en/latest/development/release-process/#python-2-support
Collecting https://github.com/ciena/afkak/archive/master.zip
  Downloading https://github.com/ciena/afkak/archive/master.zip
     - 1.3MB 3.7MB/s
Collecting six (from afkak==19.6.0a1)
  Using cached https://files.pythonhosted.org/packages/73/fb/00a976f728d0d1fecfe898238ce23f502a721c0ac0ecfedb80e0d88c64e9/six-1.12.0-py2.py3-none-any.whl
Collecting Twisted>=18.7.0 (from afkak==19.6.0a1)
Collecting PyHamcrest>=1.9.0 (from Twisted>=18.7.0->afkak==19.6.0a1)
  Using cached https://files.pythonhosted.org/packages/9a/d5/d37fd731b7d0e91afcc84577edeccf4638b4f9b82f5ffe2f8b62e2ddc609/PyHamcrest-1.9.0-py2.py3-none-any.whl
Collecting zope.interface>=4.4.2 (from Twisted>=18.7.0->afkak==19.6.0a1)
  Using cached https://files.pythonhosted.org/packages/a2/a2/e68c37eb2ef9bf942e0ace19f4cf6fe3e7c650932fb587bfde3c608f7d77/zope.interface-4.6.0-cp27-cp27mu-manylinux1_x86_64.whl
Collecting Automat>=0.3.0 (from Twisted>=18.7.0->afkak==19.6.0a1)
  Using cached https://files.pythonhosted.org/packages/a3/86/14c16bb98a5a3542ed8fed5d74fb064a902de3bdd98d6584b34553353c45/Automat-0.7.0-py2.py3-none-any.whl
Collecting hyperlink>=17.1.1 (from Twisted>=18.7.0->afkak==19.6.0a1)
  Using cached https://files.pythonhosted.org/packages/7f/91/e916ca10a2de1cb7101a9b24da546fb90ee14629e23160086cf3361c4fb8/hyperlink-19.0.0-py2.py3-none-any.whl
Collecting attrs>=17.4.0 (from Twisted>=18.7.0->afkak==19.6.0a1)
  Using cached https://files.pythonhosted.org/packages/23/96/d828354fa2dbdf216eaa7b7de0db692f12c234f7ef888cc14980ef40d1d2/attrs-19.1.0-py2.py3-none-any.whl
Collecting incremental>=16.10.1 (from Twisted>=18.7.0->afkak==19.6.0a1)
  Using cached https://files.pythonhosted.org/packages/f5/1d/c98a587dc06e107115cf4a58b49de20b19222c83d75335a192052af4c4b7/incremental-17.5.0-py2.py3-none-any.whl
Collecting constantly>=15.1 (from Twisted>=18.7.0->afkak==19.6.0a1)
  Using cached https://files.pythonhosted.org/packages/b9/65/48c1909d0c0aeae6c10213340ce682db01b48ea900a7d9fce7a7910ff318/constantly-15.1.0-py2.py3-none-any.whl
Requirement already satisfied: setuptools in ./py27/lib/python2.7/site-packages (from PyHamcrest>=1.9.0->Twisted>=18.7.0->afkak==19.6.0a1) (41.0.1)
Collecting idna>=2.5 (from hyperlink>=17.1.1->Twisted>=18.7.0->afkak==19.6.0a1)
  Using cached https://files.pythonhosted.org/packages/14/2c/cd551d81dbe15200be1cf41cd03869a46fe7226e7450af7a6545bfc474c9/idna-2.8-py2.py3-none-any.whl
Building wheels for collected packages: afkak
  Building wheel for afkak (setup.py) ... done
  Created wheel for afkak: filename=afkak-19.6.0a1-py2.py3-none-any.whl size=171403 sha256=88515ebc3f110662546223648e5a775f327a1750e41f5fbed76e1bb0651f2efa
  Stored in directory: /tmp/pip-ephem-wheel-cache-Se8Sm2/wheels/82/f3/6a/8e09678abee87d881ef9a51df33b29e6d73a1852cf38c3d60f
Successfully built afkak
Installing collected packages: six, PyHamcrest, zope.interface, attrs, Automat, idna, hyperlink, incremental, constantly, Twisted, afkak
Successfully installed Automat-0.7.0 PyHamcrest-1.9.0 Twisted-19.2.1 afkak-19.6.0a1 attrs-19.1.0 constantly-15.1.0 hyperlink-19.0.0 idna-2.8 incremental-17.5.0 six-1.12.0 zope.interface-4.6.0

(py27) » ./consumer_example 172.16.1.13:9092
consumer_example INFO Got message SourcedMessage(topic=u'wutang', partition=0, offset=0, message=<Message v0 key='HkQCjJda' value='ueYRT'>)
consumer_example INFO Got message SourcedMessage(topic=u'wutang', partition=0, offset=1, message=<Message v0 key='HkQCjJda' value='nAjxY'>)
consumer_example INFO Got message SourcedMessage(topic=u'wutang', partition=0, offset=2, message=<Message v0 key='HkQCjJda' value='VHOoH'>)
consumer_example INFO Got message SourcedMessage(topic=u'wutang', partition=0, offset=3, message=<Message v0 key='HkQCjJda' value='afkyo'>)
consumer_example INFO Got message SourcedMessage(topic=u'wutang', partition=0, offset=4, message=<Message v0 key='HkQCjJda' value='sWcZb'>)
consumer_example INFO

consumer_example INFO Time is up, stopping consumers...
consumer_example ERROR Consumer failed: [Failure instance: Traceback: <class 'afkak.common.FailedPayloadsError'>: ([], [(FetchRequest(topic=u'wutang', partition=0, offset=5, max_bytes=131072), <twisted.python.failure.Failure twisted.internet.defer.CancelledError: >)])
/home/ubuntu/sandbox/afkak/py27/local/lib/python2.7/site-packages/twisted/internet/defer.py:460:callback
/home/ubuntu/sandbox/afkak/py27/local/lib/python2.7/site-packages/twisted/internet/defer.py:568:_startRunCallbacks
/home/ubuntu/sandbox/afkak/py27/local/lib/python2.7/site-packages/twisted/internet/defer.py:654:_runCallbacks
/home/ubuntu/sandbox/afkak/py27/local/lib/python2.7/site-packages/twisted/internet/defer.py:1475:gotResult
--- <exception caught here> ---
/home/ubuntu/sandbox/afkak/py27/local/lib/python2.7/site-packages/afkak/client.py:694:send_fetch_request
/home/ubuntu/sandbox/afkak/py27/local/lib/python2.7/site-packages/twisted/internet/defer.py:1418:_inlineCallbacks
/home/ubuntu/sandbox/afkak/py27/local/lib/python2.7/site-packages/afkak/client.py:1216:_send_broker_aware_request
]
afkak.brokerclient INFO _KafkaBrokerClient<node_id=1001 172.16.1.12:9092 connected>: Connection closed: <twisted.python.failure.Failure twisted.internet.error.ConnectionDone: Connection was closed cleanly.>

This problem also happens in a Python 3 environment (which requires fixing consumer_example to replicate).
This problem does not happen with afkak 3.0.0.

TestAfkakClientIntegration.test_roundtrip_large_request assertion error

Continuing on from #31...

FAIL: afkak.test.test_client_integration.TestAfkakClientIntegration.test_roundtrip_large_request
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/travis/build/ciena/afkak/afkak/test/testutil.py", line 88, in wrapper
    return func(self)
  File "/home/travis/build/ciena/afkak/.tox/py27-int-snappy-murmur/lib/python2.7/site-packages/nose/twistedtools.py", line 143, in errback
    failure.raiseException()
  File "/home/travis/build/ciena/afkak/.tox/py27-int-snappy-murmur/lib/python2.7/site-packages/twisted/internet/defer.py", line 1418, in _inlineCallbacks
    result = g.send(result)
  File "/home/travis/build/ciena/afkak/afkak/test/test_client_integration.py", line 128, in test_roundtrip_large_request
    self.assertEqual(len(messages), 1)
AssertionError: 0 != 1

Seen once. Python 2.7, Kafka 1.1.1.

CPU Spikes

Hey there.

I'm in the middle of diagnosing this for us, so if I find the culprit I'll just close with the right resolution, but by any chance, have you noticed that CPU usage spikes and memory tanks if the Kafka cluster goes unreliable, even if it comes back again?

I.e., that the library doesn't actually recover for producers when Kafka does?

Cheers,
-J

Consumer fails with `RecursionError: Maximum recursion depth exceeded`

When a Consumer fetches a response with a very large number of messages, it is possible to trigger a RecursionError:

ERROR Consumer failed: [Failure instance: Traceback: <class 'RecursionError'>: maximum recursion depth exceeded
/home/ubuntu/sandbox/afkak/venv/lib/python3.5/site-packages/twisted/internet/defer.py:311:addCallbacks
/home/ubuntu/sandbox/afkak/venv/lib/python3.5/site-packages/twisted/internet/defer.py:654:_runCallbacks
/home/ubuntu/sandbox/afkak/venv/lib/python3.5/site-packages/afkak/consumer.py:904:<lambda>
/home/ubuntu/sandbox/afkak/venv/lib/python3.5/site-packages/afkak/consumer.py:890:_process_messages
--- <exception caught here> ---
/home/ubuntu/sandbox/afkak/venv/lib/python3.5/site-packages/twisted/internet/defer.py:151:maybeDeferred
./bppf7354:56:process
/usr/lib/python3.5/logging/__init__.py:1279:info
/usr/lib/python3.5/logging/__init__.py:1414:_log
/usr/lib/python3.5/logging/__init__.py:1384:makeRecord
/usr/lib/python3.5/logging/__init__.py:269:__init__

This problem can be reproduced by producing more messages than sys.getrecursionlimit() to a topic, instantiating a Consumer for that topic with auto_commit_every_n=1, and calling its start method with OFFSET_EARLIEST.
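The failure mode can be modeled without afkak at all. The sketch below is a hypothetical stand-in, not afkak's code: each message's callback synchronously triggers processing of the next message, consuming one Python stack frame per message, so a chain longer than the recursion limit fails.

```python
import sys

def process_chain(messages, i=0):
    # Stand-in for a chain of already-fired Deferreds: each callback
    # synchronously processes the next message, adding one stack frame.
    if i == len(messages):
        return i
    return process_chain(messages, i + 1)

old_limit = sys.getrecursionlimit()
sys.setrecursionlimit(200)  # artificially low so the failure is quick
try:
    process_chain(list(range(1000)))
    failed = None
except RecursionError as err:
    failed = err
finally:
    sys.setrecursionlimit(old_limit)
```

With auto_commit_every_n=1 each message also fires a commit callback, which only deepens the synchronous chain further.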

DNS resolving issue inside docker container

I have a simple application that connects to a Kafka server and publishes messages to a topic, one message per minute. I run it inside a Docker container and provide the hostname of the Kafka server container to my application container. It works fine for a few minutes, but at some point it starts eating 100% CPU and printing output like the following. It appears to be trying to resolve the hostname; if I provide the Kafka broker's IP address instead of a hostname, this does not happen.

DNSDatagramProtocol starting on 15129
Starting protocol <twisted.names.dns.DNSDatagramProtocol object at 0x7fd30a5f8f90>
(UDP Port 15129 Closed)
Stopping protocol <twisted.names.dns.DNSDatagramProtocol object at 0x7fd30a5f8f90>
DNSDatagramProtocol starting on 21453
Starting protocol <twisted.names.dns.DNSDatagramProtocol object at 0x7fd30a5f8f90>
(UDP Port 21453 Closed)
Stopping protocol <twisted.names.dns.DNSDatagramProtocol object at 0x7fd30a5f8f90>
DNSDatagramProtocol starting on 54252
Starting protocol <twisted.names.dns.DNSDatagramProtocol object at 0x7fd30a5ddd10>
(UDP Port 54252 Closed)
Stopping protocol <twisted.names.dns.DNSDatagramProtocol object at 0x7fd30a5ddd10>

Example scripts do not work with Python3

Example scripts (e.g. consumer_example) do not run in Python3:

(venv) » ./consumer_example 172.16.1.13:9092
main function encountered error
Traceback (most recent call last):
  File "/home/ubuntu/sandbox/afkak/venv/lib/python3.5/site-packages/twisted/internet/defer.py", line 1418, in _inlineCallbacks
    result = g.send(result)
  File "./consumer_example", line 42, in consume
    client = yield ready_client(reactor, hosts, topic)
  File "/home/ubuntu/sandbox/afkak/venv/lib/python3.5/site-packages/twisted/internet/defer.py", line 1613, in unwindGenerator
    return _cancellableInlineCallbacks(gen)
  File "/home/ubuntu/sandbox/afkak/venv/lib/python3.5/site-packages/twisted/internet/defer.py", line 1529, in _cancellableInlineCallbacks
    _inlineCallbacks(None, g, status)
--- <exception caught here> ---
  File "./consumer_example", line 42, in consume
    client = yield ready_client(reactor, hosts, topic)
  File "/home/ubuntu/sandbox/afkak/venv/lib/python3.5/site-packages/twisted/internet/defer.py", line 1418, in _inlineCallbacks
    result = g.send(result)
  File "./consumer_example", line 30, in ready_client
    yield client.load_metadata_for_topics(topic)
  File "/home/ubuntu/sandbox/afkak/venv/lib/python3.5/site-packages/afkak/client.py", line 460, in load_metadata_for_topics
    topics = tuple(_coerce_topic(t) for t in topics)
  File "/home/ubuntu/sandbox/afkak/venv/lib/python3.5/site-packages/afkak/client.py", line 460, in <genexpr>
    topics = tuple(_coerce_topic(t) for t in topics)
  File "/home/ubuntu/sandbox/afkak/venv/lib/python3.5/site-packages/afkak/_util.py", line 36, in _coerce_topic
    raise TypeError('topic={!r} must be text'.format(topic))
builtins.TypeError: topic=b'example_topic' must be text

KafkaUnavailableError due to bad hosts argument

I passed a value like [('host1', 1234), ('host2', 1234)] as the first argument to KafkaClient, which isn't supported. Instead of a ValueError or TypeError, I later got a KafkaUnavailableError which wrapped the underlying TypeError:

Traceback (most recent call last):
  File "/home/tmost/dev/porch/afkak/.tox/py27-int-snappy-murmur/local/lib/python2.7/site-packages/twisted/internet
/defer.py", line 653, in _runCallbacks
    current.result = callback(current.result, *args, **kw)
  File "/home/tmost/dev/porch/afkak/afkak/client.py", line 346, in _handleMetadataErr
    "hosts: {!r}".format(err))
KafkaUnavailableError: Unable to load metadata from configured hosts: <twisted.python.failure.Failure exceptions.TypeError: not all arguments converted during string formatting>

In this case a TypeError should be raised synchronously by the KafkaClient constructor.

TestConsumerIntegration.test_large_messages failure

ERROR: afkak.test.test_consumer_integration.TestConsumerIntegration.test_large_messages
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/internet/defer.py", line 151, in maybeDeferred
    result = f(*args, **kw)
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/internet/utils.py", line 201, in runWithWarningsSuppressed
    reraise(exc_info[1], exc_info[2])
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/python/compat.py", line 464, in reraise
    raise exception.with_traceback(traceback)
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/internet/utils.py", line 197, in runWithWarningsSuppressed
    result = f(*a, **kw)
  File "/home/travis/build/ciena/afkak/afkak/test/testutil.py", line 88, in wrapper
    return func(self)
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/nose/twistedtools.py", line 169, in wrapper
    raise exc_type(exc_value).with_traceback(tb)
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/nose/twistedtools.py", line 143, in errback
    failure.raiseException()
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/python/failure.py", line 467, in raiseException
    raise self.value.with_traceback(self.tb)
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/internet/defer.py", line 1416, in _inlineCallbacks
    result = result.throwExceptionIntoGenerator(g)
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/python/failure.py", line 491, in throwExceptionIntoGenerator
    return g.throw(self.type, self.value, self.tb)
  File "/home/travis/build/ciena/afkak/afkak/test/test_consumer_integration.py", line 109, in test_large_messages
    0, [str(x) for x in range(10)])
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/internet/defer.py", line 1416, in _inlineCallbacks
    result = result.throwExceptionIntoGenerator(g)
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/python/failure.py", line 491, in throwExceptionIntoGenerator
    return g.throw(self.type, self.value, self.tb)
  File "/home/travis/build/ciena/afkak/afkak/test/test_consumer_integration.py", line 48, in send_messages
    resp, = yield self.client.send_produce_request([produce])
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/internet/defer.py", line 1418, in _inlineCallbacks
    result = g.send(result)
  File "/home/travis/build/ciena/afkak/afkak/client.py", line 465, in send_produce_request
    returnValue(self._handle_responses(resps, fail_on_error, callback))
  File "/home/travis/build/ciena/afkak/afkak/client.py", line 566, in _handle_responses
    _check_error(resp)
  File "/home/travis/build/ciena/afkak/afkak/common.py", line 723, in _check_error
    raise error
afkak.common.UnknownTopicOrPartitionError

Support Kafka 2.0

This issue tracks changes to Afkak required to support Kafka 2.0.0. See the upgrade guide:

KIP-283 Message Down-Conversion requires client changes

KIP-283 improves message down-conversion handling on the Kafka broker, which has historically been a memory-intensive operation. The KIP makes the operation less memory-intensive by down-converting chunks of partition data at a time, which puts an upper bound on memory consumption. With this improvement, there is a change in FetchResponse protocol behavior: the broker may send an oversized message batch toward the end of the response with an invalid offset. Consumer clients must ignore such oversized messages, as KafkaConsumer does.

KIP-283 also adds new topic and broker configurations, message.downconversion.enable and log.message.downconversion.enable respectively, to control whether down-conversion is enabled. When disabled, the broker does not perform any down-conversion and instead sends an UNSUPPORTED_VERSION error to the client.
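The "ignore an incomplete or oversized trailing batch" rule can be sketched as follows. This uses a simplified wire format (a 4-byte big-endian length prefix per batch), not Kafka's actual record batch layout:

```python
import struct

def iter_complete_batches(buf):
    # Yield only complete batches; silently drop a truncated or
    # oversized tail, as KIP-283 requires of consumer clients.
    pos = 0
    while pos + 4 <= len(buf):
        (size,) = struct.unpack_from('>i', buf, pos)
        if pos + 4 + size > len(buf):
            break  # incomplete trailing batch: ignore it
        yield buf[pos + 4:pos + 4 + size]
        pos += 4 + size

frames = b''.join(struct.pack('>i', len(p)) + p for p in [b'aa', b'bbb'])
frames += struct.pack('>i', 100) + b'partial'  # oversized tail
```

Here the decoder returns the two complete batches and drops the declared-but-incomplete 100-byte tail rather than erroring out.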

New Protocol Versions

TODO evaluate the following changes:

  • KIP-279: OffsetsForLeaderEpochResponse v1 introduces a partition-level leader_epoch field.
  • KIP-219: Bump up the protocol versions of non-cluster action requests and responses that are throttled on quota violation.
  • KIP-290: Bump up the protocol versions of ACL create, describe, and delete requests and responses.

KafkaClient.close() doesn't fully clean up

KafkaClient.close() terminates outstanding connections and fails operations that were waiting on requests to those brokers, but the client may internally retry due to those failures. This can lead to both:

  • Connection attempts in the endpoint_factory
  • Delayed calls in the reactor (for timeouts)

The reactor should be clean once the deferred returned by close() has fired.

This is of less impact than it might seem because Producer and Consumer clean up outstanding requests when closed. Once that happens KafkaClient.close() is sufficient.
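One possible shape for the fix, sketched with a fake DelayedCall so the snippet is self-contained (this is not afkak's actual internals): the client records every timer it schedules, and close() cancels any that are still pending so nothing is left in the reactor.

```python
class FakeDelayedCall:
    # Minimal stand-in for twisted.internet.base.DelayedCall.
    def __init__(self):
        self._active = True
    def active(self):
        return self._active
    def cancel(self):
        self._active = False

class ClientTimers:
    # Track every scheduled DelayedCall so close() can cancel the
    # pending ones, leaving the reactor clean.
    def __init__(self):
        self._delayed_calls = []
    def schedule(self, dc):
        self._delayed_calls.append(dc)
        return dc
    def close(self):
        for dc in self._delayed_calls:
            if dc.active():
                dc.cancel()  # no timeout may fire after close()
        del self._delayed_calls[:]

timers = ClientTimers()
pending = timers.schedule(FakeDelayedCall())
timers.close()
```

The same bookkeeping would apply to in-flight connection attempts made through the endpoint_factory.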

Afkak does not set creation timestamps in kafka messages

We've noticed that our Kafka messages arrive with time=-1. We were able to work around this by changing our Kafka brokers' log.message.timestamp.type setting from the default of CreateTime to LogAppendTime. However, we'd prefer that Kafka messages be stamped with their creation time, which we believe requires changes to the producer client.
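For reference, the broker-side workaround described above is this setting in server.properties (the per-topic equivalent is message.timestamp.type):

```properties
# Default is CreateTime; with that default, messages from producers that
# do not set a timestamp arrive with time=-1.
log.message.timestamp.type=LogAppendTime
```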

Consumer: long fetch_max_wait_time can cause timeouts

The Consumer argument fetch_max_wait_time sets the upper bound on how long a FetchRequest polls for results. The default is 150 milliseconds, meaning that the Consumer issues more than 6 fetch requests per second (and you need one Consumer per partition).

The related fetch_size_bytes argument sets the desired amount of data to fetch. If a topic is low traffic or sees bursts of traffic you might want to reduce the overhead of polling by setting fetch_max_wait_time to a high value and fetch_size_bytes to a low one — say, 30 seconds and 1 byte respectively.

However, those settings can cause problems. The FetchRequest long poll ties up a broker connection for up to fetch_max_wait_time, and the broker only processes one request at a time per connection. Worse, requests are pipelined down the connection before the response to the previous request arrives, and the client-side timeout on each request starts when it is sent. This means that a slow FetchRequest can cause client-side timeouts of other FetchRequests (triggering retries) while the broker wastes time processing an already-expired FetchRequest that was pipelined behind them.

The net result is that Afkak spews timeout errors and wastes broker resources. Depending on how messages are distributed among partitions this could prevent any forward progress.

The same mechanisms can also result in delay and possible duplication of ProduceRequests when a KafkaClient is shared between a Producer and a Consumer.
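The timing interaction can be made concrete with hypothetical numbers (illustrative only, not afkak defaults):

```python
# A long-poll FetchRequest may hold the broker connection this long:
FETCH_MAX_WAIT_TIME = 30.0
# Per-request client-side timeout, counted from the moment it is sent:
CLIENT_TIMEOUT = 10.0

# A ProduceRequest pipelined behind the fetch at t=0 cannot be answered
# before the long poll completes, since the broker serves one request at
# a time per connection:
earliest_produce_response = FETCH_MAX_WAIT_TIME

# ...but its client-side timer expires much earlier:
produce_timeout_at = CLIENT_TIMEOUT

shortfall = earliest_produce_response - produce_timeout_at
print('produce request times out %.0f s before the broker can answer'
      % shortfall)
```

Any value of fetch_max_wait_time greater than the client timeout makes every pipelined request behind the fetch fail this way.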

Workarounds

To avoid these problems:

  • Don't share a KafkaClient among multiple Consumers
  • Don't share a KafkaClient between a Consumer and a Producer

Obviously these aren't very satisfying, as each KafkaClient will maintain a separate metadata cache, increasing load to the backend.

Solutions

  • Broker clients shouldn't pipeline an unlimited number of requests.
  • Broker clients should treat long poll requests like FetchRequest specially. If multiple such requests must be sent to the same broker at once the broker client should open additional connections.
  • Consumer must learn to handle more than one partition at a time, reducing the total number of outstanding fetch requests.

Configure Travis-CI

We should run at least some of the tests automatically when a pull request is filed.

Afkak update for latest broker protocol support in broker version `3.4`

I'm documenting this quickly for internet reference and to gather feedback on the plan to address the items outlined here, which also align with those stated in #45.

Our immediate goal is to support the latest broker protocol, which at the time of writing is 3.x. This is no small task, considering we are currently at 0.9.0 and protocol version 0. In the process, we must also support the various intermediary protocols. With this in mind, I propose several changes to simplify the task and maintain backwards compatibility with the existing internal and public APIs throughout afkak.

  • Refactor afkak.common structs from named-tuples to a class: As mentioned in #85, the change from a named-tuple to a class is not backwards compatible in the current setup. For instance, adding a new timestamp field to the message/message set struct would be a breaking change. By refactoring to a class, we can make the timestamp field (and all other fields) optional within the struct, thus avoiding breaking changes.
  • Migrate from setup.py to pyproject.toml as per PEP-621 (completed in #131)
  • Set up formatting and update documentation (completed in #131)
  • Port to pytest for the benefit of fixtures
  • Add support for KIP-35 - Retrieving protocol version. Before adding support for new protocol versions, the client must be able to query the broker to determine the appropriate version to use.

Changes from current 0.9.0.1

The following are some notable changes and improvements. This list is not exhaustive and is subject to change. It aims to identify some changes in Kafka that could be incorporated into afkak to enhance its functionality:

  • Idempotent Producer (KIP-98): Introduced in Kafka 0.11.0.0, this significant feature enables producers to write to Kafka idempotently, increasing data consistency by reducing duplicates.
  • Record Headers: From Kafka 0.11.0.0 onwards, headers can be added to Kafka messages, allowing for the inclusion of arbitrary metadata for purposes such as routing, timestamps, or message schemas.
  • Exactly Once Semantics (EOS): Introduced in 0.11.0.0, EOS improves data consistency and reliability by ensuring each message is delivered exactly once.
  • KIP-421: This enhancement to the Kafka ListGroups API includes additional metadata fields that provide information about the state of consumer groups, improving monitoring and management.
  • KIP-429: Kafka Consumer Incremental Rebalance Protocol (2.3.0) - This proposal aims to reduce rebalance times and enhance overall consumer performance.
  • KIP-117: AdminClient API: This new public API facilitates interaction with Kafka for administrative tasks, offering a higher-level abstraction than existing low-level administrative interfaces.
  • KIP-339: Create a new IncrementalAlterConfigs API (2.2.0) - This proposal suggests a more flexible method of updating configurations incrementally.

Wishlist

  • Refactor afkak.kafkacodec: As discussed in #85, this area requires attention.

pip install does not work

root@spider:/code# pip install afkak
Collecting afkak
  Could not find a version that satisfies the requirement afkak (from versions: )
No matching distribution found for afkak

Also, is this library compatible with Python 3?

Python 3.6, 3.7 support

What's the status of py3.6 and py3.7 support? I see there's a prerelease build in the releases list that mentions WIP py3 support, but the readme in the repository only says it supports 3.5. Should I expect any issues with 3.6/3.7?

TestAfkakClientIntegration.test_produce_request intermittent failure

I've been seeing this failure intermittently on Travis CI:

======================================================================
ERROR: afkak.test.test_client_integration.TestAfkakClientIntegration.test_produce_request
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/travis/build/ciena/afkak/afkak/test/testutil.py", line 87, in wrapper
    return func(self)
  File "/home/travis/build/ciena/afkak/.tox/pypy-int/site-packages/nose/twistedtools.py", line 143, in errback
    failure.raiseException()
  File "/home/travis/build/ciena/afkak/.tox/pypy-int/site-packages/twisted/internet/defer.py", line 1384, in _inlineCallbacks
    result = result.throwExceptionIntoGenerator(g)
  File "/home/travis/build/ciena/afkak/.tox/pypy-int/site-packages/twisted/python/failure.py", line 408, in throwExceptionIntoGenerator
    return g.throw(self.type, self.value, self.tb)
  File "/home/travis/build/ciena/afkak/afkak/test/test_client_integration.py", line 102, in test_produce_request
    produce_resp, = yield self.client.send_produce_request([produce])
  File "/home/travis/build/ciena/afkak/.tox/pypy-int/site-packages/twisted/internet/defer.py", line 1386, in _inlineCallbacks
    result = g.send(result)
  File "/home/travis/build/ciena/afkak/afkak/client.py", line 444, in send_produce_request
    returnValue(self._handle_responses(resps, fail_on_error, callback))
  File "/home/travis/build/ciena/afkak/afkak/client.py", line 544, in _handle_responses
    check_error(resp)
  File "/home/travis/build/ciena/afkak/afkak/common.py", line 300, in check_error
    raise error(responseOrErrcode)
NotLeaderForPartitionError: ProduceResponse(topic='test_produce_request-XNMuQNvXuz', partition=0, error=6, offset=-1)
-------------------- >> begin captured logging << --------------------
2018-07-25 17:40:48,941 INFO testutil.py:154: Setting up test afkak.test.test_client_integration.TestAfkakClientIntegration.test_produce_request
2018-07-25 17:40:48,948 DEBUG client.py:260: <KafkaClient clientId=afkak.test.test_client_integration brokers=[('127.0.0.1', 34924), ('127.0.0.1', 42213), ('127.0.0.1', 49547)] timeout=2.5>: load_metadata_for_topics: ('test_produce_request-XNMuQNvXuz',)
2018-07-25 17:40:48,948 DEBUG client.py:794: _sbur: sending request: 32 to broker: <KafkaBrokerClient 127.0.0.1:34924 Id=afkak.test.test_client_integration Connected=True>
2018-07-25 17:40:48,949 DEBUG client.py:747: _mrtb: sending request: 32 to broker: <KafkaBrokerClient 127.0.0.1:34924 Id=afkak.test.test_client_integration Connected=True>
2018-07-25 17:40:48,965 DEBUG client.py:275: <KafkaClient clientId=afkak.test.test_client_integration brokers=[('127.0.0.1', 34924), ('127.0.0.1', 42213), ('127.0.0.1', 49547)] timeout=2.5>: Broker/Topic metadata: {0: BrokerMetadata(node_id=0, host='127.0.0.1', port=42213), 1: BrokerMetadata(node_id=1, host='127.0.0.1', port=34924), 2: BrokerMetadata(node_id=2, host='127.0.0.1', port=49547)}/{'test_produce_request-XNMuQNvXuz': TopicMetadata(topic='test_produce_request-XNMuQNvXuz', topic_error_code=5, partition_metadata={})}
2018-07-25 17:40:48,969 DEBUG client.py:659: <KafkaClient clientId=afkak.test.test_client_integration brokers=[('127.0.0.1', 34924), ('127.0.0.1', 42213), ('127.0.0.1', 49547)] timeout=2.5>: _update_brokers: [('127.0.0.1', 42213), ('127.0.0.1', 34924), ('127.0.0.1', 49547)] remove: False
2018-07-25 17:40:48,974 WARNING client.py:301: No partitions for test_produce_request-XNMuQNvXuz, Err:5
2018-07-25 17:40:48,994 DEBUG testutil.py:125: Still waiting topic creation: No metadata for topic test_produce_request-XNMuQNvXuz found..
2018-07-25 17:40:49,008 DEBUG client.py:260: <KafkaClient clientId=afkak.test.test_client_integration brokers=[('127.0.0.1', 34924), ('127.0.0.1', 42213), ('127.0.0.1', 49547)] timeout=2.5>: load_metadata_for_topics: ('test_produce_request-XNMuQNvXuz',)
2018-07-25 17:40:49,035 DEBUG client.py:794: _sbur: sending request: 33 to broker: <KafkaBrokerClient 127.0.0.1:34924 Id=afkak.test.test_client_integration Connected=True>
2018-07-25 17:40:49,036 DEBUG client.py:747: _mrtb: sending request: 33 to broker: <KafkaBrokerClient 127.0.0.1:34924 Id=afkak.test.test_client_integration Connected=True>
2018-07-25 17:40:49,047 DEBUG client.py:275: <KafkaClient clientId=afkak.test.test_client_integration brokers=[('127.0.0.1', 34924), ('127.0.0.1', 42213), ('127.0.0.1', 49547)] timeout=2.5>: Broker/Topic metadata: {0: BrokerMetadata(node_id=0, host='127.0.0.1', port=42213), 1: BrokerMetadata(node_id=1, host='127.0.0.1', port=34924), 2: BrokerMetadata(node_id=2, host='127.0.0.1', port=49547)}/{'test_produce_request-XNMuQNvXuz': TopicMetadata(topic='test_produce_request-XNMuQNvXuz', topic_error_code=5, partition_metadata={})}
2018-07-25 17:40:49,051 DEBUG client.py:659: <KafkaClient clientId=afkak.test.test_client_integration brokers=[('127.0.0.1', 34924), ('127.0.0.1', 42213), ('127.0.0.1', 49547)] timeout=2.5>: _update_brokers: [('127.0.0.1', 42213), ('127.0.0.1', 34924), ('127.0.0.1', 49547)] remove: False
2018-07-25 17:40:49,053 WARNING client.py:301: No partitions for test_produce_request-XNMuQNvXuz, Err:5
2018-07-25 17:40:49,078 DEBUG testutil.py:125: Still waiting topic creation: No metadata for topic test_produce_request-XNMuQNvXuz found..
2018-07-25 17:40:49,079 DEBUG client.py:260: <KafkaClient clientId=afkak.test.test_client_integration brokers=[('127.0.0.1', 34924), ('127.0.0.1', 42213), ('127.0.0.1', 49547)] timeout=2.5>: load_metadata_for_topics: ('test_produce_request-XNMuQNvXuz',)
2018-07-25 17:40:49,079 DEBUG client.py:794: _sbur: sending request: 34 to broker: <KafkaBrokerClient 127.0.0.1:49547 Id=afkak.test.test_client_integration Connected=True>
2018-07-25 17:40:49,080 DEBUG client.py:747: _mrtb: sending request: 34 to broker: <KafkaBrokerClient 127.0.0.1:49547 Id=afkak.test.test_client_integration Connected=True>
2018-07-25 17:40:49,083 DEBUG client.py:275: <KafkaClient clientId=afkak.test.test_client_integration brokers=[('127.0.0.1', 34924), ('127.0.0.1', 42213), ('127.0.0.1', 49547)] timeout=2.5>: Broker/Topic metadata: {0: BrokerMetadata(node_id=0, host='127.0.0.1', port=42213), 1: BrokerMetadata(node_id=1, host='127.0.0.1', port=34924), 2: BrokerMetadata(node_id=2, host='127.0.0.1', port=49547)}/{'test_produce_request-XNMuQNvXuz': TopicMetadata(topic='test_produce_request-XNMuQNvXuz', topic_error_code=0, partition_metadata={1: PartitionMetadata(topic='test_produce_request-XNMuQNvXuz', partition=1, partition_error_code=0, leader=1, replicas=(1, 2, 0), isr=(1, 2, 0)), 0: PartitionMetadata(topic='test_produce_request-XNMuQNvXuz', partition=0, partition_error_code=0, leader=0, replicas=(0, 1, 2), isr=(0, 1, 2))})}
2018-07-25 17:40:49,085 DEBUG client.py:659: <KafkaClient clientId=afkak.test.test_client_integration brokers=[('127.0.0.1', 34924), ('127.0.0.1', 42213), ('127.0.0.1', 49547)] timeout=2.5>: _update_brokers: [('127.0.0.1', 42213), ('127.0.0.1', 34924), ('127.0.0.1', 49547)] remove: False
2018-07-25 17:40:49,086 INFO testutil.py:127: Topic test_produce_request-XNMuQNvXuz exists. Partition metadata: [PartitionMetadata(topic='test_produce_request-XNMuQNvXuz', partition=0, partition_error_code=0, leader=0, replicas=(0, 1, 2), isr=(0, 1, 2)),
 PartitionMetadata(topic='test_produce_request-XNMuQNvXuz', partition=1, partition_error_code=0, leader=1, replicas=(1, 2, 0), isr=(1, 2, 0))]
2018-07-25 17:40:49,093 DEBUG client.py:747: _mrtb: sending request: 35 to broker: <KafkaBrokerClient 127.0.0.1:42213 Id=afkak.test.test_client_integration Connected=False>
2018-07-25 17:40:49,093 DEBUG brokerclient.py:385: <KafkaBrokerClient 127.0.0.1:42213 Id=afkak.test.test_client_integration Connected=False>: _connect
2018-07-25 17:40:49,112 DEBUG brokerclient.py:392: <KafkaBrokerClient 127.0.0.1:42213 Id=afkak.test.test_client_integration Connected=False>: _connect got connector: <twisted.internet.tcp.Connector instance at 0x00007fb18eeae560>
2018-07-25 17:40:49,117 DEBUG brokerclient.py:245: <KafkaBrokerClient 127.0.0.1:42213 Id=afkak.test.test_client_integration Connected=True>: buildProtocol:<afkak.protocol.KafkaProtocol instance at 0x00007fb18eeae5e0> addr:IPv4Address(TCP, '127.0.0.1', 42213)
2018-07-25 17:40:49,119 DEBUG client.py:604: Broker:<KafkaBrokerClient 127.0.0.1:42213 Id=afkak.test.test_client_integration Connected=True> state changed:Connected for reason:None
2018-07-25 17:40:49,176 ERROR client.py:546: Error found in response: ProduceResponse(topic='test_produce_request-XNMuQNvXuz', partition=0, error=6, offset=-1)
--------------------- >> end captured logging << ---------------------

https://travis-ci.org/ciena/afkak/jobs/408167176

TLS Failing

I have started a broker with TLS enabled:

from twisted.internet import defer
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
from twisted.internet.ssl import optionsForClientTLS

from afkak.client import KafkaClient

def tls_client_endpoint(reactor, host, port):
    return wrapClientTLS(
        optionsForClientTLS(hostname=host),
        HostnameEndpoint(reactor, host, port),
    )

@defer.inlineCallbacks
def ready_client(reactor, netloc, topic):
    client = KafkaClient(netloc, reactor=reactor, endpoint_factory=tls_client_endpoint)
    e = True
    while e:
        try:
            yield client.load_metadata_for_topics(topic)
            e = client.metadata_error_for_topic(topic)
            if e:
                log.info("Error getting metadata for topic {}: {} (will retry)".format(topic, e))
        except Exception as err:
            log.info("Error getting metadata for topic {}: {} (will retry)".format(topic, err))

    defer.returnValue(client)

With the above code snippet, I am trying to run the consumer, but I am seeing:

(2021-12-03 21:37:53,235) [INFO] Error getting metadata for topic linus_logs_app: Unable to load metadata from configured hosts: <twisted.python.failure.Failure afkak.common.KafkaUnavailableError: Failed to bootstrap from hosts [('127.0.0.1', 9093)]> (will retry)
...
OpenSSL.SSL.Error: [('SSL routines', 'tls_process_server_certificate', 'certificate verify failed')]

Can someone help me debug this issue?

Consumer: max fetch time defaults don't match

The Consumer class's fetch_max_wait_time parameter defaults to FETCH_MAX_WAIT_TIME = 100 milliseconds:

afkak/afkak/consumer.py

Lines 46 to 47 in b7e0a71

FETCH_MIN_BYTES = 64 * 1024 # server waits for min. 64K bytes of messages
FETCH_MAX_WAIT_TIME = 100 # server waits 100 millisecs for messages

Internally, Consumer calls KafkaClient.send_fetch_request(). This method has a max_wait_time parameter with a default value of KafkaClient.DEFAULT_FETCH_SERVER_WAIT_MSECS:

afkak/afkak/client.py

Lines 151 to 158 in b7e0a71

# Default timeout msec for fetch requests. This is how long the server
# will wait trying to get enough bytes of messages to fulfill the fetch
# request. When this times out on the server side, it sends back a
# response with as many bytes of messages as it has. See the docs for
# more caveats on this timeout.
DEFAULT_FETCH_SERVER_WAIT_MSECS = 5000
# Default minimum amount of message bytes sent back on a fetch request
DEFAULT_FETCH_MIN_BYTES = 4096

Having two sets of defaults doesn't make sense, and it makes even less sense that they differ.

The defaults in KafkaClient probably affect zero real users, since everyone uses the Consumer wrapper. They don't make much sense either: if you want to reduce polling overhead by increasing the fetch long-poll period, you probably also want to reduce the minimum fetch size to something like 1 byte, lest you add a lot of latency when the message rate is low.
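The shadowing effect described above can be sketched in plain Python (constant and function names taken from the issue; the bodies are illustrative stand-ins, not afkak's implementation):

```python
# Consumer-level default (milliseconds), from afkak/consumer.py
FETCH_MAX_WAIT_TIME = 100
# KafkaClient-level default (milliseconds), from afkak/client.py
DEFAULT_FETCH_SERVER_WAIT_MSECS = 5000

def send_fetch_request(max_wait_time=DEFAULT_FETCH_SERVER_WAIT_MSECS):
    # Stand-in for KafkaClient.send_fetch_request(): just report the
    # wait time that would be sent to the server.
    return max_wait_time

def consumer_fetch(fetch_max_wait_time=FETCH_MAX_WAIT_TIME):
    # Consumer always passes its own default down, so the KafkaClient
    # default is only ever seen by direct send_fetch_request() callers.
    return send_fetch_request(max_wait_time=fetch_max_wait_time)

print(consumer_fetch())      # 100  -- what Consumer users get
print(send_fetch_request())  # 5000 -- what direct client callers get
```

Either default could be the right one, but a caller has to read both files to know which applies.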

Status of afkak wrt Kafka 1.0?

Hi,

I am evaluating message systems, and Kafka is a big contender. Unfortunately, many of our client systems still run CentOS 6, so Python 3 is out of the question, and everything uses Twisted. Since this library seems to be the most applicable one for my use case, I am wondering about its maintenance status. I've seen issues mentioning that some Kafka 0.9 features are not yet implemented, and the last commit was a long time ago. More recent forks do not seem to exist.

What are the plans with this library? Thanks.

ResourceWarning running integration tests on Python 3.8

  afkak.test.int.test_client_integration
    TestAfkakClientIntegration
      test_commit_fetch_offsets ...                                          [OK]
  /opt/hostedtoolcache/Python/3.8.9/x64/lib/python3.8/subprocess.py:848: RuntimeWarning: line buffering (buffering=1) isn't supported in binary mode, the default buffer size will be used
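The warning can be reproduced directly with the standard library, independent of afkak: on Python 3.8+, `subprocess.Popen` emits a `RuntimeWarning` when `bufsize=1` (line buffering) is requested on a binary-mode pipe. A minimal sketch, not afkak's actual test code:

```python
import subprocess
import sys
import warnings

# Requesting line buffering (bufsize=1) on a binary-mode pipe triggers
# the RuntimeWarning seen in the test output above.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    p = subprocess.Popen([sys.executable, "-c", "pass"],
                         stdout=subprocess.PIPE, bufsize=1)  # binary mode
    p.wait()
    p.stdout.close()
assert any("line buffering" in str(w.message) for w in caught)

# Opening the pipe in text mode makes line buffering meaningful, so no
# warning is emitted.
p = subprocess.Popen([sys.executable, "-c", "pass"],
                     stdout=subprocess.PIPE, bufsize=1, text=True)
p.wait()
p.stdout.close()
```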

Version mismatch

bash-3.2$ git clone https://github.com/ciena/afkak.git
Cloning into 'afkak'...
remote: Enumerating objects: 78, done.
remote: Counting objects: 100% (78/78), done.
remote: Compressing objects: 100% (54/54), done.
remote: Total 4855 (delta 39), reused 51 (delta 23), pack-reused 4777
Receiving objects: 100% (4855/4855), 1.41 MiB | 1004.00 KiB/s, done.
Resolving deltas: 100% (3655/3655), done.
bash-3.2$ cd afkak/
bash-3.2$ make toxi
Makefile:28: *** Version in setup.py (20.10.0) does not match afkak/__init__.py (20.9.0). Stop.
bash-3.2$
