
kaffe's Introduction

Kaffe


An opinionated, highly specific, Elixir wrapper around Brod: the Erlang Kafka client. ☕

NOTE: Although we're using this in production at Spreedly, it is still under active development. The API may change and there may be serious bugs we've yet to encounter.


Installation

  1. Add kaffe to your list of dependencies in mix.exs:

    def deps do
      [{:kaffe, "~> 1.0"}]
    end
  2. Ensure kaffe is started with your application:

    def application do
      [applications: [:logger, :kaffe]]
    end
  3. Configure a Kaffe Consumer and/or Producer

Kaffe Consumer Usage

Consumers receive a list of messages and work as part of the :brod_group_member behavior. This has a few important benefits:

  1. Group members assign a "subscriber" to each partition in the topic. Because Kafka topics scale with partitions, having a worker per partition usually increases throughput.
  2. Group members correctly handle partition assignments across multiple clients in a consumer group. This means that this mode of operation will scale horizontally (e.g., multiple dynos on Heroku).
  3. Downstream processing that benefits from batching (like writing to another Kafka topic) is more easily supported.

There is also legacy support for single message consumers, which process one message at a time using the :brod_group_subscriber behavior. This was the original mode of operation for Kaffe but is slow and does not scale. For this reason it is considered deprecated.

Kaffe GroupMember - Batch Message Consumer

  1. Define a handle_messages/1 function in the module you provide as the message handler.

    This function (note the pluralization) will be called with a list of messages, with each message as a map. Each message map includes the topic and partition in addition to the normal Kafka message metadata.

    The module's handle_messages/1 function must return :ok or Kaffe will throw an error. The Kaffe consumer will block until your handle_messages/1 function returns :ok.

    defmodule MessageProcessor do
      def handle_messages(messages) do
        for %{key: key, value: value} = message <- messages do
          IO.inspect message
          IO.puts "#{key}: #{value}"
        end
        :ok # Important!
      end
    end
  2. The configuration options for the GroupMember consumer are a superset of those for Kaffe.Consumer, except for :async_message_ack, which is not supported. The additional options are:

    • :rebalance_delay_ms The time to allow for rebalancing among workers. The default is 10,000, which should give the consumers time to rebalance when scaling.

    • :max_bytes Limits the number of message bytes received from Kafka for a particular topic subscriber. The default is 1MB. This parameter might need tuning depending on the number of partitions in the topics being read (there is one subscriber per topic per partition). For example, if you are reading from two topics, each with 32 partitions, there is the potential of 64MB in buffered messages at any one time.

    • :min_bytes Sets a minimum threshold for the number of bytes to fetch for a batch of messages. The default is 0MB.

    • :max_wait_time Sets the maximum number of milliseconds the broker will wait to collect min_bytes of messages for a batch.

    • :offset_reset_policy Controls how the subscriber handles an expired offset. See the Kafka consumer option, auto.offset.reset. Valid values for this option are:

      • :reset_to_earliest Reset to the earliest available offset.
      • :reset_to_latest Reset to the latest offset.
      • :reset_by_subscriber The subscriber receives the OffsetOutOfRange error.

    More information is available in the Brod consumer documentation.

    • :worker_allocation_strategy Controls how workers are allocated with respect to consumed topics and partitions.

      • :worker_per_partition The default (for backward compatibility); allocates a single worker per partition, shared across topics. This is useful for managing concurrent processing of messages that may be received from any consumed topic.

      • :worker_per_topic_partition Allocates a worker for every topic/partition pair consumed. Unless you need to control concurrency across topics, you should use this strategy.

      config :kaffe,
        consumer: [
          endpoints: [kafka: 9092],
          topics: ["interesting-topic"],
          consumer_group: "your-app-consumer-group",
          message_handler: MessageProcessor,
          offset_reset_policy: :reset_to_latest,
          max_bytes: 500_000,
          worker_allocation_strategy: :worker_per_topic_partition,
      
          # optional
          sasl: %{
            mechanism: :plain,
            login: System.get_env("KAFFE_PRODUCER_USER"),
            password: System.get_env("KAFFE_PRODUCER_PASSWORD")
          }
        ],
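
    The tuning options described above that are not shown in the example (rebalance_delay_ms, min_bytes, max_wait_time) go alongside the other consumer keys. A minimal, illustrative sketch; the first two values are the documented defaults, the max_wait_time value is only an assumption:

      config :kaffe,
        consumer: [
          # ...the keys shown above, plus:
          rebalance_delay_ms: 10_000, # time allowed for rebalancing among workers (default)
          min_bytes: 0,               # minimum bytes to collect before responding (default)
          max_wait_time: 10_000       # max ms the broker may wait to collect min_bytes (illustrative value)
        ]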
  3. Add Kaffe.GroupMemberSupervisor as a supervisor in your supervision tree.

    defmodule MyApp.Application do
      use Application
    
      def start(_type, _args) do
        children = [
          %{
            id: Kaffe.GroupMemberSupervisor,
            start: {Kaffe.GroupMemberSupervisor, :start_link, []},
            type: :supervisor
          }
        ]
    
        opts = [strategy: :one_for_one, name: MyApp.Application.Supervisor]
        Supervisor.start_link(children, opts)
      end
    end

Managing how offsets are committed

In some cases you may not want to commit back the most recent offset after processing a list of messages. For example, you might be batching messages to be sent elsewhere and want to ensure that a batch can be rebuilt should there be an error further downstream. In that case you might want to keep the offset of the first message in your batch so your consumer can restart at that point to reprocess and rebatch the messages.

Your message handler can respond in the following ways to manage how offsets are committed back:

  • :ok - commit back the most recent offset and request more messages
  • {:ok, :no_commit} - do not commit back the most recent offset and request more messages from the offset of the last message
  • {:ok, offset} - commit back at the offset specified and request messages from that point forward

Example:

defmodule MessageProcessor do
  def handle_messages(messages) do
    for %{key: key, value: value} = message <- messages do
      IO.inspect message
      IO.puts "#{key}: #{value}"
    end
    {:ok, :no_commit}
  end
end
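
As a sketch of the batching pattern described above, a handler might commit the offset of the first message only when downstream delivery fails, so the whole batch is redelivered and can be rebuilt. Here send_downstream/1 is a hypothetical stand-in for your own delivery code:

defmodule BatchForwarder do
  def handle_messages([first | _rest] = messages) do
    case send_downstream(messages) do
      # Success: commit back the most recent offset as usual.
      :ok ->
        :ok

      # Failure: commit back at the first message's offset so the consumer
      # restarts from that point and the batch can be rebuilt.
      {:error, _reason} ->
        {:ok, first.offset}
    end
  end

  # Hypothetical delivery function; replace with your own logic.
  defp send_downstream(messages) do
    if Enum.all?(messages, &is_map/1), do: :ok, else: {:error, :bad_batch}
  end
end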

Kaffe Consumer - Single Message Consumer (Deprecated)

For backward compatibility only! Kaffe.GroupMemberSupervisor is recommended instead!

  1. Add a handle_message/1 function to a local module (e.g. MessageProcessor). This function will be called with each Kafka message as a map. Each message map will include the topic and partition in addition to the normal Kafka message metadata.

    The module's handle_message/1 function must return :ok or Kaffe will throw an error. In normal (synchronous consumer) operation the Kaffe consumer will block until your handle_message/1 function returns :ok.

    Example

    defmodule MessageProcessor do
      def handle_message(%{key: key, value: value} = message) do
        IO.inspect message
        IO.puts "#{key}: #{value}"
        :ok # The handle_message function MUST return :ok
      end
    end

    Message Structure

    %{
      attributes: 0,
      crc: 1914336469,
      key: "kafka message key",
      magic_byte: 0,
      offset: 41,
      partition: 17,
      topic: "some-kafka-topic",
      value: "the actual kafka message value is here",
      ts: 1234567890123, # timestamp in milliseconds
      ts_type: :append  # timestamp type: :undefined | :create | :append
    }
  2. Configure your Kaffe Consumer in your mix config

    config :kaffe,
      consumer: [
        endpoints: [kafka: 9092], # that's [hostname: kafka_port]
        topics: ["interesting-topic"], # the topic(s) that will be consumed
        consumer_group: "your-app-consumer-group", # the consumer group for tracking offsets in Kafka
        message_handler: MessageProcessor, # the module from Step 1 that will process messages
    
        # optional
        async_message_ack: false, # see "async message acknowledgement" below
        start_with_earliest_message: true # default false
      ],

    The start_with_earliest_message field controls where your consumer group starts when it starts for the very first time. Once offsets have been committed to Kafka, they will supersede this option. If omitted, your consumer group will start processing from the most recent messages in the topic instead of consuming all available messages.

    Heroku Configuration

    To configure a Kaffe Consumer for a Heroku Kafka compatible environment, including SSL, omit the endpoints and instead set heroku_kafka_env: true

    config :kaffe,
      consumer: [
        heroku_kafka_env: true,
        topics: ["interesting-topic"],
        consumer_group: "your-app-consumer-group",
        message_handler: MessageProcessor
      ]

    With that setting in place, Kaffe will automatically pull the required info from the following ENV variables:

    • KAFKA_URL
    • KAFKA_CLIENT_CERT
    • KAFKA_CLIENT_CERT_KEY
    • KAFKA_TRUSTED_CERT (not used yet)
  3. Add Kaffe.Consumer as a worker in your supervision tree

    worker(Kaffe.Consumer, [])
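
    Note that Supervisor.Spec.worker/2 is deprecated in more recent Elixir versions. An equivalent plain child spec (a sketch assuming Kaffe.Consumer.start_link/0, as implied by the empty argument list above) would be:

    %{id: Kaffe.Consumer, start: {Kaffe.Consumer, :start_link, []}}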

async message acknowledgement

If you need asynchronous message consumption:

  1. Add a handle_message/2 function to your processing module. This function will be called with the Consumer pid and the Kafka message. When your processing is complete you will need to call Kaffe.Consumer.ack(pid, message) to acknowledge the offset.

  2. Set async to true when you start the Kaffe.Consumer

    consumer_group = "demo-commitlog-consumer"
    topic = "commitlog"
    message_handler = MessageProcessor
    async = true
    
    worker(Kaffe.Consumer, [consumer_group, topics, message_handler, async])
    
    # … in your message handler module
    
    def handle_message(pid, message) do
      spawn_message_processing_worker(pid, message)
      :ok # MUST return :ok
    end
    
    # … somewhere in your system when the worker is finished processing
    
    Kaffe.Consumer.ack(pid, message)
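
    Putting those pieces together, a minimal sketch of an asynchronous handler module (do_work/1 is a hypothetical stand-in for your processing logic) might look like:

    defmodule AsyncMessageProcessor do
      def handle_message(pid, message) do
        # Hand the message off to a separate process so the consumer is not blocked.
        Task.start(fn ->
          do_work(message)
          # Acknowledge the offset only after processing has finished.
          Kaffe.Consumer.ack(pid, message)
        end)
        :ok # the callback itself must still return :ok immediately
      end

      defp do_work(%{key: key, value: value}), do: IO.puts("#{key}: #{value}")
    end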

NOTE: Asynchronous consumption means your system will no longer provide any backpressure to the Kaffe.Consumer. You will also need to add robust measures to your system to ensure that no messages are lost in processing. For example, if you spawn 5 workers processing a series of asynchronous messages from Kafka and one of them crashes without acknowledging, it is possible, and even likely, that the message will be skipped entirely.

Kafka only tracks a single numeric offset, not individual messages. If a message fails and a later offset is committed then the failed message will not be sent again.

It's possible that your topic and system are entirely ok with losing some messages (i.e. frequent metrics that aren't individually important).

Kaffe Producer Usage

Kaffe.Producer handles producing messages to Kafka. It automatically selects the partition for each message, or it can be given a function to call to determine the partition per message. Kaffe automatically inserts a Kafka timestamp with each message.

Configure your Kaffe Producer in your mix config

config :kaffe,
  producer: [
    endpoints: [kafka: 9092], # [hostname: port]
    topics: ["kafka-topic"],

    # optional
    partition_strategy: :md5,
    ssl: true,
    sasl: %{
      mechanism: :plain,
      login: System.get_env("KAFFE_PRODUCER_USER"),
      password: System.get_env("KAFFE_PRODUCER_PASSWORD")
    }
  ]

The partition_strategy setting can be one of:

  • :md5: (default) provides even and deterministic distribution of the messages over the available partitions based on an MD5 hash of the key
  • :random: select a random partition for each message
  • function: a custom function to call to determine the correct partition for each message (see the sketch below)
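
A sketch of using a custom partition function follows. The argument list used here (topic, partition count, key, value) is an assumption about how Kaffe invokes the function; check the Kaffe.Producer source for the exact signature before relying on it:

defmodule MyPartitioner do
  # Hypothetical partitioner: hash the key so the same key always lands on the
  # same partition. The arity/arguments are assumptions (see note above).
  def choose_partition(_topic, partitions_count, key, _value) do
    :erlang.phash2(key, partitions_count)
  end
end

config :kaffe,
  producer: [
    endpoints: [kafka: 9092],
    topics: ["kafka-topic"],
    partition_strategy: &MyPartitioner.choose_partition/4
  ]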

You can also set any of the Brod producer configuration options in the producer section - see the Brod sources for a list of keys and their meaning.

If the Kafka broker is configured with SASL_PLAINTEXT auth, the sasl option can be added.

If using Confluent Hosted Kafka, also add ssl: true as shown above.

Heroku Configuration

To configure a Kaffe Producer for a Heroku Kafka compatible environment, including SSL, omit the endpoint and instead set heroku_kafka_env: true

config :kaffe,
  producer: [
    heroku_kafka_env: true,
    topics: ["kafka-topic"],

    # optional
    partition_strategy: :md5
  ]

With that setting in place, Kaffe will automatically pull the required info from the following ENV variables:

  • KAFKA_URL
  • KAFKA_CLIENT_CERT
  • KAFKA_CLIENT_CERT_KEY
  • KAFKA_TRUSTED_CERT

Producing to Kafka

Currently only synchronous message production is supported.

There are several ways to produce:

  • topic/message_list - Produce each message in the list to the given topic. The messages are produced to the correct partition based on the configured partitioning strategy.

    Each item in the list is a tuple of the key and value: {key, value}.

    Kaffe.Producer.produce_sync("topic", [{"key1", "value1"}, {"key2", "value2"}])
  • topic/partition/message_list - Produce each message in the list to the given topic/partition.

    Each item in the list is a tuple of the key and value: {key, value}.

    Kaffe.Producer.produce_sync("topic", 2, [{"key1", "value1"}, {"key2", "value2"}])
  • key/value - The key/value will be produced to the first topic given to the producer when it was started. The partition will be selected with the chosen strategy or given function.

    Kaffe.Producer.produce_sync("key", "value")
  • topic/key/value - The key/value will be produced to the given topic.

    Kaffe.Producer.produce_sync("whitelist", "key", "value")
  • topic/partition/key/value - The key/value will be produced to the given topic/partition.

    Kaffe.Producer.produce_sync("whitelist", 2, "key", "value")

    NOTE: With this approach Kaffe will not calculate the next partition since it assumes you're taking over that job by giving it a specific partition.

Testing

Setup

In order to run the end-to-end tests, a Kafka topic is required. It must:

  • be named kaffe-test
  • have 32 partitions

If using the kafka-topics.sh script that comes with the Kafka distribution, you may use something like:

kafka-topics.sh --zookeeper localhost:2181 --create --partitions 32 --replication-factor 1 --topic kaffe-test

Running

# unit tests
mix test
# end to end test
mix test --only e2e

Copyright and License

Copyright (c) 2017 Spreedly, Inc.

This software is released under the MIT License.

kaffe's People

Contributors

alexkovalevych, andyleclair, britth, dams, davidsantoso, edmondfrank, fatcatt316, hdeters, jessiahr, kianmeng, madshargreave, marinakr, mortezahosseini, reachfh, redmaner, rodrigues, rwdaigle, schwarzgeist, sdball, shamilpd, zeneixe


kaffe's Issues

Tests!

We need tests! At least some tests!

GroupCoordinatorNotAvailable error

I have been using Kaffe successfully to connect to Kafka running in Docker on Windows 10 (my Docker Compose file can be found here). However, when I updated the Kafka Docker images, it stopped working, and I received the exception below:

15:15:10.634 [info]  group coordinator (groupId=test-consumer-group,memberId=,generation=0,pid=#PID<0.179.0>):
failed to join group
reason::GroupCoordinatorNotAvailable

At the same time, the Kafka broker reported

broker_1           | [2017-09-13 13:20:28,975] ERROR [KafkaApi-1] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)

Clearly, I can go back to an older Docker image, but this newer image is working with the JavaScript client, so I hope that Kaffe can use it too.

[Question] Multiple consumers multiple Kafka clusters

Hi,
I've got another newbie question about Kaffe. I've got two different clusters and I would like to set up consumers for both of them in the same app. Unfortunately I'm at a loss on how to configure Kaffe in my config.ex for this.

Any suggestions?

Update to brod 2.4.1

In 2.3.4 brod fixed a "block forever" race condition that would cause the brod client to become unresponsive. We should go ahead and get updated to the latest version: 2.4.1.

kaffe cannot recover from unreachable Kafka

Hello!

I was integrating kaffe into a consumer-only application I'm working on and I've encountered the following behaviour:

(Assuming I have everything set up pretty much default using the examples from the README)

(1) If I start my application and Kafka is not reachable (e.g. getting econnrefused), kaffe never recovers from that. If Kafka comes back online, the errors

error] gen_server my_consumer_group terminated with reason: [{{"localhost",9092},{econnrefused....

stop, but my consumers are never initialized.

(2) If, during normal use of my app, Kafka becomes unreachable, kaffe will never recover from this. When Kafka is reachable again, the error messages stop, but consumers are not re-created.

Am I making a mistake here? Do I need a special ceremony to make this work? I tried wrapping the Kaffe.GroupMemberSupervisor with a supervisor3 so that it can be restarted after a delay in case it dies, but this didn't seem to solve the issue.

I'm using Elixir 1.9.0 and kaffe 1.14.1 on MacOS 10.14.6.

Configuration examples are wrong

Hey! I use following versions:

Erlang/OTP 21 [erts-10.0]
Elixir 1.7.0
Kaffe 1.9.0

When I try to configure kaffe in my new application by following the instructions from your readme, I get this error:

16:12:40.775 [error] GenServer :kaffe_producer_client terminating
** (FunctionClauseError) no function clause matching in :kpro_connection.host/1
    (kafka_protocol) src/kpro_connection.erl:553: :kpro_connection.host(:localhost)
    (kafka_protocol) src/kpro_connection.erl:100: :kpro_connection.start/3
    (kafka_protocol) src/kpro_brokers.erl:303: :kpro_brokers.connect_any/3
    (brod) src/brod_client.erl:510: :brod_client.ensure_metadata_connection/1
    (brod) src/brod_client.erl:268: :brod_client.handle_info/2
    (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:711: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3

A quick look at the kpro_connection file shows that the host/1 function expects a string argument: https://github.com/klarna/kafka_protocol/blob/master/src/kpro_connection.erl#L564

So, this error can be fixed by changing endpoints: [kafka: 9092] to endpoints: [{"kafka", 9092}]

Also, I created a demo application: https://github.com/TakteS/kaffe_test
You can start it with iex -S mix and the error will occur.

Can't Start Kaffe Consumer

Hi,

I am unsure when this started, but I have been experiencing a new issue recently. With Elixir 1.4, this fails silently. On 1.3, we get the following:

email-service_1  | Generated email_service app
email-service_1  | 
email-service_1  | =INFO REPORT==== 22-Jun-2017::14:48:30 ===
email-service_1  |     application: email_service
email-service_1  |     exited: {{shutdown,
email-service_1  |               {failed_to_start_child,'Elixir.Kaffe.Consumer',
email-service_1  |                {noproc,
email-service_1  |                 {gen_server,call,
email-service_1  |                  [brod_sup,
email-service_1  |                   {start_child,
email-service_1  |                    {'email-service',
email-service_1  |                     {brod_client,start_link,
email-service_1  |                      [[{kafka,9092}],
email-service_1  |                       'email-service',
email-service_1  |                       [{auto_start_producers,false},
email-service_1  |                        {allow_topic_auto_creation,false},
email-service_1  |                        {begin_offset,earliest}]]},
email-service_1  |                     {permanent,10},
email-service_1  |                     5000,worker,
email-service_1  |                     [brod_client]}},
email-service_1  |                   infinity]}}}},
email-service_1  |              {'Elixir.EmailService.Application',start,[normal,[]]}}
email-service_1  |     type: temporary
email-service_1  | ** (Mix) Could not start application email_service: EmailService.Application.start(:normal, []) returned an error: shutdown: failed to start child: Kaffe.Consumer
email-service_1  |     ** (EXIT) exited in: :gen_server.call(:brod_sup, {:start_child, {:"email-service", {:brod_client, :start_link, [[kafka: 9092], :"email-service", [auto_start_producers: false, allow_topic_auto_creation: false, begin_offset: :earliest]]}, {:permanent, 10}, 5000, :worker, [:brod_client]}}, :infinity)
email-service_1  |         ** (EXIT) no process

Low-volume consumers regularly replay old partition offsets

We're experiencing a recurring issue with low-volume consumers (like those for error topics) where the partition offset isn't being regularly ack'd back to the brokers, causing the same messages to be replayed. While some offset replay is expected with higher-volume topics, I don't think it should be happening with low-volume ones as well.

Consider the following evidence:

Every time the canary (kafkacat-consumer) dyno restarts, it reads in a few messages. That's suspect (knowing our error message production - from other dynos - is not that even).

Consider also that at each of these dyno restarts, the partition offsets of the messages received by the consumer don't always increase! See partition29 as an example, where it drifts sideways for two measurements (which also should never happen), then up, then down.

This tells me that Kaffe consumers for low-volume topics don't ack their offsets back to the broker frequently enough, and when they're restarted they start back on an offset that was already received. I know we've looked at this in the past, and I believe you said the partition is ack'd every 5s, but I see reason to believe that's not the case. It appears to be more volume based than anything (once every x messages?).

We should make sure low-volume consumption behaves more predictably.

Abstract away brod

Currently clients of Kaffe need to set up their own brod clients. That is suboptimal. Instead, Kaffe should encapsulate a set of brod client standards. The Kaffe clients should only need to specify the broker endpoints.

Support per-partition workers

Due to some limitations in Brod, Kaffe is not readily scalable.

However, in that same issue, the Brod maintainers suggested some ways to work around the limitations. The approach taken by Brucke seems particularly interesting.

We need to investigate these approaches so we can get higher throughput when processing larger volumes of Kafka data.

Not all partitions are processed

I was seeing some odd behavior where one topic in a consumer group was being processed fully, but on another topic a few of the partitions were completely ignored for a good while (even after the other partitions had no lag).

I'm not sure why, but lowering the max_bytes from the default to 100_000 helped resolve this issue.

I'm on kaffe 1.4.0, elixir 1.5.2. Running in an umbrella app under an erlang release created by distillery. 2 Kafka nodes, 2 nodes of my application, 2 topics, each with 10 partitions.

Connecting to a TLS-based Kafka instance under AWS MSK?

In the Kaffe documentation, it appears that the only SSL-based Kafka connections that are discussed are for Heroku Kafka environments.

We have an Elixir application that needs to connect via TLS to an AWS MSK instance of Kafka. Can this be supported under Kaffe? If so, are there any docs or examples for this?

Thank you very much.

More brod options?

Hi

Is it planned to add an option to supply the remaining brod consumer options, like min_bytes and max_wait_time?

Support connecting to Confluent Kafka

I'd like to connect to Confluent Cloud's managed Kafka clusters using this library, but it's proving to be tricky. The following producer config does not currently work with Confluent Cloud:

config :kaffe,
  ...
  producer: [
    # [hostname: port]
    endpoints: ["my-confluent-cloud-host": 9092],
    topics: ["test-topic"],

    # optional
    partition_strategy: :random,
    sasl: %{
      mechanism: :plain,
      login: "xxxxxxxxxxxxxxxx",
      password: "xxxxxxxxxxxxxxxx"
    }
  ]

The error I get is:

iex(1)>
15:56:38.906 [error] Failed to connect to my-confluent-cloud-host:9092                                                  
Make sure 'ssl' option is in client config, 
or make sure connecting to 'SASL_PLAINTEXT://' listener

This bubbles up from kafka_protocol (see: https://github.com/klarna/kafka_protocol/blob/master/src/kpro_connection.erl#L542) and is because Confluent expects SASL_PLAIN, in other words SASL over SSL.

I see there is support for Heroku, but I don't have a cert/key from Confluent I can use. Do I need to generate this myself? Is Confluent Cloud generally supported?

Support starting multiple consumers

With Heroku Kafka we have topics with a lot of partitions! In the Heroku Kaffe setup #8 we support consuming from all of them, but we have the opportunity to run more than one consuming process in the same consumer group to increase message processing parallelization.

Missing documentation

I'm looking to use Kaffe to create a Kafka consumer. My use case requires that I control when and what offset is committed. The Managing how offsets are committed section of the README seems to be missing a paragraph or two on how to actually commit an offset if one chooses to return {:ok, :no_commit} as the return value of handle_messages/1.

Any help would be appreciated.

Partition allocations duplicated when using multiple dynos

When scaling kafka-whitelist-index on Heroku to 2 dynos, it appears that partition assignments are identical for each dyno.

This causes messages to be consumed by each dyno, which limits the effective scalability of the application.

2017-03-08T19:47:29.145314+00:00 heroku[consumer.2]: State changed from up to down
2017-03-08T19:47:29.147891+00:00 heroku[consumer.1]: State changed from up to down
2017-03-08T19:47:29.854918+00:00 heroku[consumer.2]: Stopping all processes with SIGTERM
2017-03-08T19:47:30.022952+00:00 heroku[consumer.1]: Stopping all processes with SIGTERM
2017-03-08T19:47:30.221263+00:00 heroku[consumer.2]: Process exited with status 143
2017-03-08T19:47:30.372872+00:00 heroku[consumer.1]: Process exited with status 143
2017-03-08T19:47:28.804552+00:00 app[api]: Scaled to consumer@0:Standard-1X web@0:Standard-1X by user [email protected]
2017-03-08T19:47:13+00:00 app[heroku-postgres]: source=DATABASE sample#current_transaction=25179184.0 sample#db_size=6582024728.0bytes sample#tables=2 sample#active-connections=24 sample#waiting-connections=0 sample#index-cache-hit-rate=0.98828 sample#table-cache-hit-rate=0.95299 sample#load-avg-1m=0.08 sample#load-avg-5m=0.045 sample#load-avg-15m=0.045 sample#read-iops=3.1208 sample#write-iops=37.497 sample#memory-total=8173804.0kB sample#memory-free=146208kB sample#memory-cached=7584544.0kB sample#memory-postgres=71836.0kB
2017-03-08T19:47:58+00:00 app[heroku-postgres]: source=DATABASE sample#current_transaction=25179922.0 sample#db_size=6582245912.0bytes sample#tables=2 sample#active-connections=3 sample#waiting-connections=0 sample#index-cache-hit-rate=0.98828 sample#table-cache-hit-rate=0.95299 sample#load-avg-1m=0.03 sample#load-avg-5m=0.035 sample#load-avg-15m=0.045 sample#read-iops=0.45763 sample#write-iops=0.034958 sample#memory-total=8173804.0kB sample#memory-free=245564kB sample#memory-cached=7585036.0kB sample#memory-postgres=20156.0kB
2017-03-08T19:49:08.809026+00:00 app[api]: Scaled to consumer@2:Standard-1X web@0:Standard-1X by user [email protected]
2017-03-08T19:49:14.155070+00:00 heroku[consumer.2]: Starting process with command `mix run --no-halt`
2017-03-08T19:49:14.467206+00:00 heroku[consumer.1]: Starting process with command `mix run --no-halt`
2017-03-08T19:49:14.826415+00:00 heroku[consumer.2]: State changed from starting to up
2017-03-08T19:49:15.205487+00:00 heroku[consumer.1]: State changed from starting to up
2017-03-08T19:49:20.101522+00:00 app[consumer.2]: group coordinator (groupId=index,memberId=,generation=0,pid=#PID<0.195.0>):
2017-03-08T19:49:20.101542+00:00 app[consumer.2]: connected to group coordinator ec2-34-195-140-72.compute-1.amazonaws.com:9096
2017-03-08T19:49:20.142827+00:00 app[consumer.2]: group coordinator (groupId=index,memberId=nonode@nohost/<0.195.0>-59c8c562-c8de-4464-9360-dfeb0a719e98,generation=1,pid=#PID<0.195.0>):
2017-03-08T19:49:20.142840+00:00 app[consumer.2]: elected=true
2017-03-08T19:49:20.143712+00:00 app[consumer.2]: group coordinator (groupId=index,memberId=nonode@nohost/<0.195.0>-59c8c562-c8de-4464-9360-dfeb0a719e98,generation=1,pid=#PID<0.195.0>):
2017-03-08T19:49:20.143714+00:00 app[consumer.2]: assignments received:
2017-03-08T19:49:20.143715+00:00 app[consumer.2]: index-batch:
2017-03-08T19:49:20.143726+00:00 app[consumer.2]:     partition=0 begin_offset=735120
2017-03-08T19:49:20.143727+00:00 app[consumer.2]:     partition=1 begin_offset=690076
2017-03-08T19:49:20.143728+00:00 app[consumer.2]:     partition=2 begin_offset=621090
2017-03-08T19:49:20.143728+00:00 app[consumer.2]:     partition=3 begin_offset=599866
2017-03-08T19:49:20.143729+00:00 app[consumer.2]:     partition=4 begin_offset=571337
2017-03-08T19:49:20.143729+00:00 app[consumer.2]:     partition=5 begin_offset=532940
2017-03-08T19:49:20.143730+00:00 app[consumer.2]:     partition=6 begin_offset=549349
2017-03-08T19:49:20.143730+00:00 app[consumer.2]:     partition=7 begin_offset=536644
2017-03-08T19:49:20.143731+00:00 app[consumer.2]:     partition=8 begin_offset=493229
2017-03-08T19:49:20.143731+00:00 app[consumer.2]:     partition=9 begin_offset=478531
2017-03-08T19:49:20.143732+00:00 app[consumer.2]:     partition=10 begin_offset=455589
2017-03-08T19:49:20.143733+00:00 app[consumer.2]:     partition=11 begin_offset=437179
2017-03-08T19:49:20.143733+00:00 app[consumer.2]:     partition=12 begin_offset=445078
2017-03-08T19:49:20.143734+00:00 app[consumer.2]:     partition=13 begin_offset=426732
2017-03-08T19:49:20.143734+00:00 app[consumer.2]:     partition=14 begin_offset=403720
2017-03-08T19:49:20.143735+00:00 app[consumer.2]:     partition=15 begin_offset=414390
2017-03-08T19:49:20.143736+00:00 app[consumer.2]:     partition=16 begin_offset=414437
2017-03-08T19:49:20.143736+00:00 app[consumer.2]:     partition=17 begin_offset=412652
2017-03-08T19:49:20.143737+00:00 app[consumer.2]:     partition=18 begin_offset=406215
2017-03-08T19:49:20.143737+00:00 app[consumer.2]:     partition=19 begin_offset=409315
2017-03-08T19:49:20.143738+00:00 app[consumer.2]:     partition=20 begin_offset=397055
2017-03-08T19:49:20.143738+00:00 app[consumer.2]:     partition=21 begin_offset=396992
2017-03-08T19:49:20.143739+00:00 app[consumer.2]:     partition=22 begin_offset=398619
2017-03-08T19:49:20.143740+00:00 app[consumer.2]:     partition=23 begin_offset=387111
2017-03-08T19:49:20.143740+00:00 app[consumer.2]:     partition=24 begin_offset=374766
2017-03-08T19:49:20.143741+00:00 app[consumer.2]:     partition=25 begin_offset=373108
2017-03-08T19:49:20.143741+00:00 app[consumer.2]:     partition=26 begin_offset=361589
2017-03-08T19:49:20.143742+00:00 app[consumer.2]:     partition=27 begin_offset=377395
2017-03-08T19:49:20.143742+00:00 app[consumer.2]:     partition=28 begin_offset=354547
2017-03-08T19:49:20.143743+00:00 app[consumer.2]:     partition=29 begin_offset=340911
2017-03-08T19:49:20.143744+00:00 app[consumer.2]:     partition=30 begin_offset=347559
2017-03-08T19:49:20.143744+00:00 app[consumer.2]:     partition=31 begin_offset=347518
2017-03-08T19:49:20.143745+00:00 app[consumer.2]: whitelist:
2017-03-08T19:49:20.143745+00:00 app[consumer.2]:     partition=0 begin_offset=23083880
2017-03-08T19:49:20.143746+00:00 app[consumer.2]:     partition=1 begin_offset=2103280
2017-03-08T19:49:20.143746+00:00 app[consumer.2]:     partition=2 begin_offset=2132920
2017-03-08T19:49:20.143747+00:00 app[consumer.2]:     partition=3 begin_offset=2097410
2017-03-08T19:49:20.143747+00:00 app[consumer.2]:     partition=4 begin_offset=2094067
2017-03-08T19:49:20.143748+00:00 app[consumer.2]:     partition=5 begin_offset=2122689
2017-03-08T19:49:20.143748+00:00 app[consumer.2]:     partition=6 begin_offset=2102436
2017-03-08T19:49:20.143749+00:00 app[consumer.2]:     partition=7 begin_offset=2127751
2017-03-08T19:49:20.143750+00:00 app[consumer.2]:     partition=8 begin_offset=2095652
2017-03-08T19:49:20.143750+00:00 app[consumer.2]:     partition=9 begin_offset=2081194
2017-03-08T19:49:20.143751+00:00 app[consumer.2]:     partition=10 begin_offset=2100417
2017-03-08T19:49:20.143751+00:00 app[consumer.2]:     partition=11 begin_offset=2090914
2017-03-08T19:49:20.143752+00:00 app[consumer.2]:     partition=12 begin_offset=2089227
2017-03-08T19:49:20.143753+00:00 app[consumer.2]:     partition=13 begin_offset=2083039
2017-03-08T19:49:20.143753+00:00 app[consumer.2]:     partition=14 begin_offset=2091469
2017-03-08T19:49:20.143754+00:00 app[consumer.2]:     partition=15 begin_offset=2109835
2017-03-08T19:49:20.143754+00:00 app[consumer.2]:     partition=16 begin_offset=2080804
2017-03-08T19:49:20.143755+00:00 app[consumer.2]:     partition=17 begin_offset=2074752
2017-03-08T19:49:20.143755+00:00 app[consumer.2]:     partition=18 begin_offset=2106322
2017-03-08T19:49:20.143756+00:00 app[consumer.2]:     partition=19 begin_offset=2079642
2017-03-08T19:49:20.143756+00:00 app[consumer.2]:     partition=20 begin_offset=2082422
2017-03-08T19:49:20.143757+00:00 app[consumer.2]:     partition=21 begin_offset=2076804
2017-03-08T19:49:20.143758+00:00 app[consumer.2]:     partition=22 begin_offset=2081602
2017-03-08T19:49:20.143759+00:00 app[consumer.2]:     partition=23 begin_offset=2076934
2017-03-08T19:49:20.143759+00:00 app[consumer.2]:     partition=24 begin_offset=2079587
2017-03-08T19:49:20.143760+00:00 app[consumer.2]:     partition=25 begin_offset=2060178
2017-03-08T19:49:20.143760+00:00 app[consumer.2]:     partition=26 begin_offset=2078634
2017-03-08T19:49:20.143761+00:00 app[consumer.2]:     partition=27 begin_offset=2075673
2017-03-08T19:49:20.143761+00:00 app[consumer.2]:     partition=28 begin_offset=2070900
2017-03-08T19:49:20.143762+00:00 app[consumer.2]:     partition=29 begin_offset=2062593
2017-03-08T19:49:20.143762+00:00 app[consumer.2]:     partition=30 begin_offset=2067980
2017-03-08T19:49:20.143763+00:00 app[consumer.2]:     partition=31 begin_offset=2061661
2017-03-08T19:49:20.143767+00:00 app[consumer.2]: client :index connected to ec2-34-195-140-72.compute-1.amazonaws.com:9096
2017-03-08T19:49:20.143768+00:00 app[consumer.2]: 
2017-03-08T19:49:20.143769+00:00 app[consumer.2]: client :index connected to ec2-34-195-140-195.compute-1.amazonaws.com:9096
2017-03-08T19:49:20.143769+00:00 app[consumer.2]: 
2017-03-08T19:49:20.220675+00:00 app[consumer.2]: client :index connected to ec2-34-195-135-147.compute-1.amazonaws.com:9096
2017-03-08T19:49:20.220678+00:00 app[consumer.2]: 
2017-03-08T19:49:21.771443+00:00 app[consumer.1]: group coordinator (groupId=index,memberId=,generation=0,pid=#PID<0.195.0>):
2017-03-08T19:49:21.771469+00:00 app[consumer.1]: connected to group coordinator ec2-34-195-140-72.compute-1.amazonaws.com:9096
2017-03-08T19:49:23.442157+00:00 app[consumer.2]: group coordinator (groupId=index,memberId=nonode@nohost/<0.195.0>-59c8c562-c8de-4464-9360-dfeb0a719e98,generation=1,pid=#PID<0.195.0>):
2017-03-08T19:49:23.442167+00:00 app[consumer.2]: re-joining group, reason::RebalanceInProgress
2017-03-08T19:49:30.257546+00:00 heroku[consumer.1]: source=consumer.1 dyno=heroku.62069566.f40175bf-764b-4e24-a96d-0abbfe1b5b92 sample#memory_total=45.63MB sample#memory_rss=45.04MB sample#memory_cache=0.59MB sample#memory_swap=0.00MB sample#memory_pgpgin=13651pages sample#memory_pgpgout=2480pages sample#memory_quota=512.00MB
2017-03-08T19:49:31.355289+00:00 app[consumer.1]: group coordinator (groupId=index,memberId=nonode@nohost/<0.195.0>-526d20e3-a23a-4c24-9c7a-ee38c210b88f,generation=2,pid=#PID<0.195.0>):
2017-03-08T19:49:31.355299+00:00 app[consumer.1]: elected=true
2017-03-08T19:49:31.429594+00:00 app[consumer.1]: group coordinator (groupId=index,memberId=nonode@nohost/<0.195.0>-526d20e3-a23a-4c24-9c7a-ee38c210b88f,generation=2,pid=#PID<0.195.0>):
2017-03-08T19:49:31.429597+00:00 app[consumer.1]: assignments received:
2017-03-08T19:49:31.429598+00:00 app[consumer.1]: index-batch:
2017-03-08T19:49:31.429599+00:00 app[consumer.1]:     partition=0 begin_offset=735120
2017-03-08T19:49:31.429613+00:00 app[consumer.1]:     partition=1 begin_offset=690076
2017-03-08T19:49:31.429614+00:00 app[consumer.1]:     partition=2 begin_offset=621090
2017-03-08T19:49:31.429615+00:00 app[consumer.1]:     partition=3 begin_offset=599866
2017-03-08T19:49:31.429615+00:00 app[consumer.1]:     partition=4 begin_offset=571337
2017-03-08T19:49:31.429616+00:00 app[consumer.1]:     partition=5 begin_offset=532940
2017-03-08T19:49:31.429617+00:00 app[consumer.1]:     partition=6 begin_offset=549349
2017-03-08T19:49:31.429617+00:00 app[consumer.1]:     partition=7 begin_offset=536644
2017-03-08T19:49:31.429618+00:00 app[consumer.1]:     partition=8 begin_offset=493229
2017-03-08T19:49:31.429619+00:00 app[consumer.1]:     partition=9 begin_offset=478531
2017-03-08T19:49:31.429619+00:00 app[consumer.1]:     partition=10 begin_offset=455589
2017-03-08T19:49:31.429620+00:00 app[consumer.1]:     partition=11 begin_offset=437179
2017-03-08T19:49:31.429620+00:00 app[consumer.1]:     partition=12 begin_offset=445078
2017-03-08T19:49:31.429621+00:00 app[consumer.1]:     partition=13 begin_offset=426732
2017-03-08T19:49:31.429622+00:00 app[consumer.1]:     partition=14 begin_offset=403720
2017-03-08T19:49:31.429622+00:00 app[consumer.1]:     partition=15 begin_offset=414390
2017-03-08T19:49:31.429623+00:00 app[consumer.1]:     partition=16 begin_offset=414437
2017-03-08T19:49:31.429623+00:00 app[consumer.1]:     partition=17 begin_offset=412652
2017-03-08T19:49:31.429624+00:00 app[consumer.1]:     partition=18 begin_offset=406215
2017-03-08T19:49:31.429625+00:00 app[consumer.1]:     partition=19 begin_offset=409315
2017-03-08T19:49:31.429625+00:00 app[consumer.1]:     partition=20 begin_offset=397055
2017-03-08T19:49:31.429626+00:00 app[consumer.1]:     partition=21 begin_offset=396992
2017-03-08T19:49:31.429626+00:00 app[consumer.1]:     partition=22 begin_offset=398619
2017-03-08T19:49:31.429627+00:00 app[consumer.1]:     partition=23 begin_offset=387111
2017-03-08T19:49:31.429627+00:00 app[consumer.1]:     partition=24 begin_offset=374766
2017-03-08T19:49:31.429628+00:00 app[consumer.1]:     partition=25 begin_offset=373108
2017-03-08T19:49:31.429629+00:00 app[consumer.1]:     partition=26 begin_offset=361589
2017-03-08T19:49:31.429629+00:00 app[consumer.1]:     partition=27 begin_offset=377395
2017-03-08T19:49:31.429630+00:00 app[consumer.1]:     partition=28 begin_offset=354547
2017-03-08T19:49:31.429630+00:00 app[consumer.1]:     partition=29 begin_offset=340911
2017-03-08T19:49:31.429631+00:00 app[consumer.1]:     partition=30 begin_offset=347559
2017-03-08T19:49:31.429632+00:00 app[consumer.1]:     partition=31 begin_offset=347518
2017-03-08T19:49:31.429632+00:00 app[consumer.1]: whitelist:
2017-03-08T19:49:31.429633+00:00 app[consumer.1]:     partition=0 begin_offset=23083880
2017-03-08T19:49:31.429633+00:00 app[consumer.1]:     partition=1 begin_offset=2103280
2017-03-08T19:49:31.429634+00:00 app[consumer.1]:     partition=2 begin_offset=2132920
2017-03-08T19:49:31.429634+00:00 app[consumer.1]:     partition=3 begin_offset=2097410
2017-03-08T19:49:31.429635+00:00 app[consumer.1]:     partition=4 begin_offset=2094067
2017-03-08T19:49:31.429636+00:00 app[consumer.1]:     partition=5 begin_offset=2122689
2017-03-08T19:49:31.429636+00:00 app[consumer.1]:     partition=6 begin_offset=2102436
2017-03-08T19:49:31.429637+00:00 app[consumer.1]:     partition=7 begin_offset=2127751
2017-03-08T19:49:31.429637+00:00 app[consumer.1]:     partition=8 begin_offset=2095652
2017-03-08T19:49:31.429638+00:00 app[consumer.1]:     partition=9 begin_offset=2081194
2017-03-08T19:49:31.429639+00:00 app[consumer.1]:     partition=10 begin_offset=2100417
2017-03-08T19:49:31.429639+00:00 app[consumer.1]:     partition=11 begin_offset=2090914
2017-03-08T19:49:31.429640+00:00 app[consumer.1]:     partition=12 begin_offset=2089227
2017-03-08T19:49:31.429640+00:00 app[consumer.1]:     partition=13 begin_offset=2083039
2017-03-08T19:49:31.429641+00:00 app[consumer.1]:     partition=14 begin_offset=2091469
2017-03-08T19:49:31.429642+00:00 app[consumer.1]:     partition=15 begin_offset=2109835
2017-03-08T19:49:31.429642+00:00 app[consumer.1]:     partition=16 begin_offset=2080804
2017-03-08T19:49:31.429643+00:00 app[consumer.1]:     partition=17 begin_offset=2074752
2017-03-08T19:49:31.429643+00:00 app[consumer.1]:     partition=18 begin_offset=2106322
2017-03-08T19:49:31.429644+00:00 app[consumer.1]:     partition=19 begin_offset=2079642
2017-03-08T19:49:31.429645+00:00 app[consumer.1]:     partition=20 begin_offset=2082422
2017-03-08T19:49:31.429645+00:00 app[consumer.1]:     partition=21 begin_offset=2076804
2017-03-08T19:49:31.429646+00:00 app[consumer.1]:     partition=22 begin_offset=2081602
2017-03-08T19:49:31.429646+00:00 app[consumer.1]:     partition=23 begin_offset=2076934
2017-03-08T19:49:31.429647+00:00 app[consumer.1]:     partition=24 begin_offset=2079587
2017-03-08T19:49:31.429648+00:00 app[consumer.1]:     partition=25 begin_offset=2060178
2017-03-08T19:49:31.429648+00:00 app[consumer.1]:     partition=26 begin_offset=2078634
2017-03-08T19:49:31.429649+00:00 app[consumer.1]:     partition=27 begin_offset=2075673
2017-03-08T19:49:31.429650+00:00 app[consumer.1]:     partition=28 begin_offset=2070900
2017-03-08T19:49:31.429650+00:00 app[consumer.1]:     partition=29 begin_offset=2062593
2017-03-08T19:49:31.429651+00:00 app[consumer.1]:     partition=30 begin_offset=2067980
2017-03-08T19:49:31.429652+00:00 app[consumer.1]:     partition=31 begin_offset=2061661
2017-03-08T19:49:31.548433+00:00 app[consumer.1]: client :index connected to ec2-34-195-140-72.compute-1.amazonaws.com:9096
2017-03-08T19:49:31.548436+00:00 app[consumer.1]: 
2017-03-08T19:49:31.709082+00:00 app[consumer.1]: client :index connected to ec2-34-195-140-195.compute-1.amazonaws.com:9096
2017-03-08T19:49:31.709097+00:00 app[consumer.1]: 
2017-03-08T19:49:32.004626+00:00 app[consumer.1]: client :index connected to ec2-34-195-135-147.compute-1.amazonaws.com:9096
2017-03-08T19:49:32.004639+00:00 app[consumer.1]: 

Producer: topics key required in configuration

While working with Kaffe on a feature at work, I became very curious why the topics key was required for a producer. Given that Kaffe does not support dynamically creating Kafka topics, I began to wonder why it would be required at all.

Looking at the source I found this: https://github.com/spreedly/kaffe/blob/master/lib/kaffe/producer.ex#L76

Why is topics a list, and not a topic string?

I wondered what would happen if I just left it out completely (since I was not using produce_sync/2), and I got an error:

** (KeyError) key :topics not found in: [...]

Setting the topics key to [] seems to at least allow the app to boot, but it feels a bit odd.

Given that a producer can produce to any topic, it makes a bit more sense to me that those topics should be maintained in user-land code and passed into all produce_sync/n functions.

The reason I created this issue is to see if the maintainers might consider removing this requirement and produce_sync/2 altogether. I see little value in Kaffe implicitly choosing the first in a list of configured topics.

Am I missing something else that this topics list does?

Allow a callback function for partition selection

Instead of being restricted to the built-in :round_robin and :random partition selection strategies, allow clients to supply a function to be called to determine the partition. e.g. a hashing function to always assign the same partition for some aspect of the value.

Investigate consumer group behavior

When @objectuser spun up multiple dynos in the same consumer group we saw that offsets were not advancing forward as expected. In fact it appeared offsets were resetting over each other.

I suspected that consumer groups required more explicit coordination on our side vs kafka but that turns out to not be the case. This issue summarizes my results.

[Question] dynamically registering new topics

Hi, I haven't found in the docs, or at a first glance at the code, whether it is possible (and how easy it would be) to register new topics dynamically, without having to stop the app, change the config file, and restart. I'd like to be able to instruct an application to start consuming an additional given topic, and also to stop consuming a topic.

Thanks for your help

Worker should compute offset to ack

Some consumers have lost messages. This might be due to the consumer. However, it would be easier to reason about the offset to ack if the worker computed it and then sent it on to the subscriber instead of the subscriber storing the information in process state.

This would probably mean removing the computation from here and moving it before this line.

_group_config = [] ?

Today I noticed that my offset_commit_interval_seconds setting wasn't picked up. I did some digging and found that group_member.ex:46 says

{:ok, pid} = group_coordinator().start_link(subscriber_name, consumer_group,
      [topic], _group_config = [], __MODULE__, self())

Why is _group_config = [] there? That's the reason offset_commit_interval_seconds isn't propagated to brod. Why isn't it taking the group config from kaffe's configuration?

Repeated rebalance cycle with kafka broker 2.3.0

After we upgraded our Kafka brokers from 1.0.1 to 2.3.0, the Kaffe consumer started to misbehave. It is trapped in a rebalance cycle for hours while no messages are consumed. Then it recovers and processes all messages, then after a random interval it starts the rebalance cycle again for hours.
This happens only if the consumer consumes from multiple topics.
Our Kaffe version is {:kaffe, "~> 1.14.1"},
The config:

config :kaffe,
  consumer: [
    endpoints: [
      {"localhost", 9092}
    ],
    worker_allocation_strategy: :worker_per_topic_partition,
    offset_reset_policy: :reset_to_earliest,
    topics: [
      "topic-1",
      "topic-2",
      "topic-3"
    ],
    consumer_group: "test-consumer",
    message_handler: MyKafkaConsumer,
    async_message_ack: false,
    start_with_earliest_message: true
  ]

We start kaffe inside a supervisor with

defp child_spec do
  [
    supervisor(Kaffe.GroupMemberSupervisor, [])
  ]
end

To reproduce, start a Kafka broker with version 2.3.0 and restart the application. It begins the rebalance cycle with ~60% probability. If it doesn't, restart the application again until the issue occurs.

The logs of the rebalance cycle:

11:12:56.271 [info] event#startup=Elixir.Kaffe.WorkerSupervisor subscriber_name=test_consumer
11:12:56.279 [info] event#starting=Elixir.Kaffe.WorkerManager subscriber_name=test_consumer supervisor=#PID<0.1061.0>
11:12:56.279 [debug] Starting group member for topic: topic-1
Interactive Elixir (1.6.4) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> 11:12:56.368 [info] event#init=Elixir.Kaffe.GroupMember
       group_coordinator=#PID<0.1071.0>
       subscriber_name=test_consumer
       consumer_group=test_consumer
11:12:56.368 [info] event#assignments_revoked=Elixir.Kaffe.GroupMember.test_consumer.topic-1
11:12:56.368 [debug] Starting group member for topic: topic-2
11:12:56.370 [info] event#init=Elixir.Kaffe.GroupMember
       group_coordinator=#PID<0.1077.0>
       subscriber_name=test_consumer
       consumer_group=test_consumer
11:12:56.370 [info] event#assignments_revoked=Elixir.Kaffe.GroupMember.test_consumer.topic-2
11:12:56.370 [debug] Starting group member for topic: topic-3
11:12:56.372 [info] event#init=Elixir.Kaffe.GroupMember
       group_coordinator=#PID<0.1083.0>
       subscriber_name=test_consumer
       consumer_group=test_consumer
11:12:56.372 [info] event#assignments_revoked=Elixir.Kaffe.GroupMember.test_consumer.topic-3
11:12:56.384 [info] Group member (test_consumer,coor=#PID<0.1071.0>,cb=#PID<0.1065.0>,generation=14):
elected=false
11:12:56.385 [info] Group member (test_consumer,coor=#PID<0.1077.0>,cb=#PID<0.1073.0>,generation=14):
elected=true
11:12:56.385 [info] Group member (test_consumer,coor=#PID<0.1083.0>,cb=#PID<0.1079.0>,generation=15):
elected=true
11:12:56.387 [info] event#assignments_revoked=Elixir.Kaffe.GroupMember.test_consumer.topic-2
11:12:56.387 [info] event#assignments_revoked=Elixir.Kaffe.GroupMember.test_consumer.topic-1
11:12:56.387 [info] Group member (test_consumer,coor=#PID<0.1077.0>,cb=#PID<0.1073.0>,generation=14):
failed to join group
reason: :unknown_member_id
11:12:56.387 [info] Group member (test_consumer,coor=#PID<0.1077.0>,cb=#PID<0.1073.0>,generation=14):
re-joining group, reason::unknown_member_id
11:12:56.387 [info] Group member (test_consumer,coor=#PID<0.1071.0>,cb=#PID<0.1065.0>,generation=14):
failed to join group
reason: :unknown_member_id
11:12:56.387 [info] Group member (test_consumer,coor=#PID<0.1071.0>,cb=#PID<0.1065.0>,generation=14):
re-joining group, reason::unknown_member_id
11:12:56.390 [info] event#assignments_received=Elixir.Kaffe.GroupMember.test_consumer.topic-3 generation_id=15
11:12:56.390 [info] Group member (test_consumer,coor=#PID<0.1083.0>,cb=#PID<0.1079.0>,generation=15):
assignments received:
  topic-3:
    partition=0 begin_offset=undefined
11:12:56.392 [info] Group member (test_consumer,coor=#PID<0.1077.0>,cb=#PID<0.1073.0>,generation=16):
elected=true
11:12:56.395 [info] Group member (test_consumer,coor=#PID<0.1071.0>,cb=#PID<0.1065.0>,generation=17):
elected=true
11:12:56.396 [info] event#assignments_received=Elixir.Kaffe.GroupMember.test_consumer.topic-2 generation_id=16
11:12:56.396 [info] Group member (test_consumer,coor=#PID<0.1077.0>,cb=#PID<0.1073.0>,generation=16):
assignments received:
  topic-2:
    partition=0 begin_offset=undefined
11:12:56.398 [info] event#assignments_received=Elixir.Kaffe.GroupMember.test_consumer.topic-1 generation_id=17
11:12:56.399 [info] Group member (test_consumer,coor=#PID<0.1071.0>,cb=#PID<0.1065.0>,generation=17):
assignments received:
  topic-1:
    partition=0 begin_offset=undefined
11:13:01.374 [info] event#assignments_revoked=Elixir.Kaffe.GroupMember.test_consumer.topic-2
11:13:01.374 [info] Group member (test_consumer,coor=#PID<0.1077.0>,cb=#PID<0.1073.0>,generation=16):
re-joining group, reason::unknown_member_id
11:13:01.375 [info] event#assignments_revoked=Elixir.Kaffe.GroupMember.test_consumer.topic-3
11:13:01.375 [info] Group member (test_consumer,coor=#PID<0.1083.0>,cb=#PID<0.1079.0>,generation=15):
re-joining group, reason::unknown_member_id
11:13:01.379 [info] Group member (test_consumer,coor=#PID<0.1077.0>,cb=#PID<0.1073.0>,generation=18):
elected=true
11:13:01.382 [info] event#assignments_revoked=Elixir.Kaffe.GroupMember.test_consumer.topic-2
11:13:01.382 [info] Group member (test_consumer,coor=#PID<0.1077.0>,cb=#PID<0.1073.0>,generation=18):
failed to join group
reason: :rebalance_in_progress
11:13:01.382 [info] Group member (test_consumer,coor=#PID<0.1083.0>,cb=#PID<0.1079.0>,generation=19):
elected=true
11:13:01.382 [info] Group member (test_consumer,coor=#PID<0.1077.0>,cb=#PID<0.1073.0>,generation=18):
re-joining group, reason::rebalance_in_progress
11:13:01.386 [info] Group member (test_consumer,coor=#PID<0.1077.0>,cb=#PID<0.1073.0>,generation=18):
failed to join group
reason: :unknown_member_id
11:13:01.387 [info] event#assignments_received=Elixir.Kaffe.GroupMember.test_consumer.topic-3 generation_id=19
11:13:01.387 [info] Group member (test_consumer,coor=#PID<0.1083.0>,cb=#PID<0.1079.0>,generation=19):
assignments received:
  topic-3:
    partition=0 begin_offset=undefined
11:13:02.387 [info] event#assignments_revoked=Elixir.Kaffe.GroupMember.test_consumer.topic-2
11:13:02.387 [info] Group member (test_consumer,coor=#PID<0.1077.0>,cb=#PID<0.1073.0>,generation=18):
re-joining group, reason::unknown_member_id
11:13:02.396 [info] Group member (test_consumer,coor=#PID<0.1077.0>,cb=#PID<0.1073.0>,generation=20):
elected=true
11:13:02.400 [info] event#assignments_received=Elixir.Kaffe.GroupMember.test_consumer.topic-2 generation_id=20
11:13:02.400 [info] Group member (test_consumer,coor=#PID<0.1077.0>,cb=#PID<0.1073.0>,generation=20):
assignments received:
  topic-2:
    partition=0 begin_offset=undefined
11:13:06.372 [info] event#assignments_revoked=Elixir.Kaffe.GroupMember.test_consumer.topic-1
11:13:06.372 [info] Group member (test_consumer,coor=#PID<0.1071.0>,cb=#PID<0.1065.0>,generation=17):
re-joining group, reason::unknown_member_id
11:13:06.377 [info] event#assignments_revoked=Elixir.Kaffe.GroupMember.test_consumer.topic-3
11:13:06.377 [info] Group member (test_consumer,coor=#PID<0.1083.0>,cb=#PID<0.1079.0>,generation=19):
re-joining group, reason::unknown_member_id
11:13:06.381 [info] Group member (test_consumer,coor=#PID<0.1071.0>,cb=#PID<0.1065.0>,generation=21):
elected=true
11:13:06.383 [info] Group member (test_consumer,coor=#PID<0.1083.0>,cb=#PID<0.1079.0>,generation=22):
elected=true
11:13:06.384 [info] Group member (test_consumer,coor=#PID<0.1071.0>,cb=#PID<0.1065.0>,generation=21):
failed to join group
reason: :unknown_member_id
11:13:06.384 [info] event#assignments_revoked=Elixir.Kaffe.GroupMember.test_consumer.topic-1
11:13:06.384 [info] Group member (test_consumer,coor=#PID<0.1071.0>,cb=#PID<0.1065.0>,generation=21):
re-joining group, reason::unknown_member_id
11:13:06.387 [info] event#assignments_received=Elixir.Kaffe.GroupMember.test_consumer.topic-3 generation_id=22
11:13:06.387 [info] Group member (test_consumer,coor=#PID<0.1083.0>,cb=#PID<0.1079.0>,generation=22):
assignments received:
  topic-3:
    partition=0 begin_offset=undefined
11:13:06.389 [info] Group member (test_consumer,coor=#PID<0.1071.0>,cb=#PID<0.1065.0>,generation=23):
elected=true
11:13:06.390 [debug] Discarding old generation 15 for current generation: 22
11:13:06.393 [info] event#assignments_received=Elixir.Kaffe.GroupMember.test_consumer.topic-1 generation_id=23
11:13:06.393 [info] Group member (test_consumer,coor=#PID<0.1071.0>,cb=#PID<0.1065.0>,generation=23):
assignments received:
  topic-1:
    partition=0 begin_offset=undefined
11:13:06.396 [debug] Discarding old generation 16 for current generation: 20
11:13:06.399 [debug] Discarding old generation 17 for current generation: 23
11:13:11.376 [info] event#assignments_revoked=Elixir.Kaffe.GroupMember.test_consumer.topic-2
11:13:11.376 [info] Group member (test_consumer,coor=#PID<0.1077.0>,cb=#PID<0.1073.0>,generation=20):
re-joining group, reason::unknown_member_id
11:13:11.377 [info] Group member (test_consumer,coor=#PID<0.1083.0>,cb=#PID<0.1079.0>,generation=22):
re-joining group, reason::unknown_member_id
11:13:11.377 [info] event#assignments_revoked=Elixir.Kaffe.GroupMember.test_consumer.topic-3
11:13:11.384 [info] Group member (test_consumer,coor=#PID<0.1077.0>,cb=#PID<0.1073.0>,generation=24):
elected=true
11:13:11.384 [info] Group member (test_consumer,coor=#PID<0.1083.0>,cb=#PID<0.1079.0>,generation=24):
elected=false
11:13:11.388 [info] event#assignments_received=Elixir.Kaffe.GroupMember.test_consumer.topic-2 generation_id=24
11:13:11.388 [debug] Discarding old generation 19 for current generation: 22
11:13:11.388 [info] Group member (test_consumer,coor=#PID<0.1077.0>,cb=#PID<0.1073.0>,generation=24):
assignments received:
  topic-2:
    partition=0 begin_offset=undefined
11:13:11.389 [info] event#assignments_received=Elixir.Kaffe.GroupMember.test_consumer.topic-3 generation_id=24
11:13:11.389 [info] Group member (test_consumer,coor=#PID<0.1083.0>,cb=#PID<0.1079.0>,generation=24):
assignments received:
  topic-3:
    partition=0 begin_offset=undefined
11:13:12.401 [debug] Discarding old generation 20 for current generation: 24
11:13:16.374 [info] Group member (test_consumer,coor=#PID<0.1071.0>,cb=#PID<0.1065.0>,generation=23):
re-joining group, reason::unknown_member_id
11:13:16.374 [info] event#assignments_revoked=Elixir.Kaffe.GroupMember.test_consumer.topic-1
11:13:16.380 [info] Group member (test_consumer,coor=#PID<0.1071.0>,cb=#PID<0.1065.0>,generation=25):
elected=true
11:13:16.385 [info] event#assignments_received=Elixir.Kaffe.GroupMember.test_consumer.topic-1 generation_id=25
11:13:16.385 [info] Group member (test_consumer,coor=#PID<0.1071.0>,cb=#PID<0.1065.0>,generation=25):
assignments received:
  topic-1:
    partition=0 begin_offset=undefined
11:13:16.388 [debug] Discarding old generation 22 for current generation: 24
11:13:16.395 [debug] Discarding old generation 23 for current generation: 25

...and it continues for hours until, somehow, all group members receive the assignments for the same generation.

The logs on the broker:

kafka_1  | [2019-07-23 09:12:56,386] INFO [GroupCoordinator 0]: Preparing to rebalance group test_consumer in state PreparingRebalance with old generation 13 (__consumer_offsets-7) (reason: Adding new member nonode@nohost/<0.1077.0>-625143d0-22a8-4a51-9d40-4eecbc66a1bb with group instanceid None) (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:12:56,387] INFO [GroupCoordinator 0]: Stabilized group test_consumer generation 14 (__consumer_offsets-7) (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:12:56,388] INFO [GroupCoordinator 0]: Preparing to rebalance group test_consumer in state PreparingRebalance with old generation 14 (__consumer_offsets-7) (reason: Adding new member nonode@nohost/<0.1083.0>-9db8b1f9-17b1-43d3-ab18-0f8daecff052 with group instanceid None) (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:12:56,388] INFO [GroupCoordinator 0]: Stabilized group test_consumer generation 15 (__consumer_offsets-7) (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:12:56,391] INFO [GroupCoordinator 0]: Assignment received from leader for group test_consumer for generation 15 (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:12:56,394] INFO [GroupCoordinator 0]: Preparing to rebalance group test_consumer in state PreparingRebalance with old generation 15 (__consumer_offsets-7) (reason: Adding new member nonode@nohost/<0.1077.0>-c805a9d7-bb33-4b04-8ea3-8df99fcc49f6 with group instanceid None) (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:12:56,395] INFO [GroupCoordinator 0]: Stabilized group test_consumer generation 16 (__consumer_offsets-7) (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:12:56,397] INFO [GroupCoordinator 0]: Assignment received from leader for group test_consumer for generation 16 (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:12:56,398] INFO [GroupCoordinator 0]: Preparing to rebalance group test_consumer in state PreparingRebalance with old generation 16 (__consumer_offsets-7) (reason: Adding new member nonode@nohost/<0.1071.0>-0d583d67-dcd2-4115-8d4d-8552b7c455e9 with group instanceid None) (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:12:56,398] INFO [GroupCoordinator 0]: Stabilized group test_consumer generation 17 (__consumer_offsets-7) (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:12:56,400] INFO [GroupCoordinator 0]: Assignment received from leader for group test_consumer for generation 17 (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:13:01,387] INFO [GroupCoordinator 0]: Preparing to rebalance group test_consumer in state PreparingRebalance with old generation 17 (__consumer_offsets-7) (reason: Adding new member nonode@nohost/<0.1077.0>-ed514f3c-3fb7-4523-bb43-0eb67b5ee5c5 with group instanceid None) (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:13:01,388] INFO [GroupCoordinator 0]: Stabilized group test_consumer generation 18 (__consumer_offsets-7) (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:13:01,390] INFO [GroupCoordinator 0]: Preparing to rebalance group test_consumer in state PreparingRebalance with old generation 18 (__consumer_offsets-7) (reason: Adding new member nonode@nohost/<0.1083.0>-cbc2b2df-0ade-430e-ba12-22a3205a04eb with group instanceid None) (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:13:01,391] INFO [GroupCoordinator 0]: Stabilized group test_consumer generation 19 (__consumer_offsets-7) (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:13:01,394] INFO [GroupCoordinator 0]: Assignment received from leader for group test_consumer for generation 19 (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:13:02,404] INFO [GroupCoordinator 0]: Preparing to rebalance group test_consumer in state PreparingRebalance with old generation 19 (__consumer_offsets-7) (reason: Adding new member nonode@nohost/<0.1077.0>-76c36c71-519f-45f8-953a-80eb352be715 with group instanceid None) (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:13:02,405] INFO [GroupCoordinator 0]: Stabilized group test_consumer generation 20 (__consumer_offsets-7) (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:13:02,408] INFO [GroupCoordinator 0]: Assignment received from leader for group test_consumer for generation 20 (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:13:06,394] INFO [GroupCoordinator 0]: Preparing to rebalance group test_consumer in state PreparingRebalance with old generation 20 (__consumer_offsets-7) (reason: Adding new member nonode@nohost/<0.1071.0>-bf04c78a-3c5c-4259-8d37-c747e1e2b0aa with group instanceid None) (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:13:06,395] INFO [GroupCoordinator 0]: Stabilized group test_consumer generation 21 (__consumer_offsets-7) (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:13:06,397] INFO [GroupCoordinator 0]: Preparing to rebalance group test_consumer in state PreparingRebalance with old generation 21 (__consumer_offsets-7) (reason: Adding new member nonode@nohost/<0.1083.0>-25b81c73-7bde-4392-bb7e-d28ea2b3e7f3 with group instanceid None) (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:13:06,398] INFO [GroupCoordinator 0]: Stabilized group test_consumer generation 22 (__consumer_offsets-7) (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:13:06,400] INFO [GroupCoordinator 0]: Assignment received from leader for group test_consumer for generation 22 (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:13:06,403] INFO [GroupCoordinator 0]: Preparing to rebalance group test_consumer in state PreparingRebalance with old generation 22 (__consumer_offsets-7) (reason: Adding new member nonode@nohost/<0.1071.0>-bea28729-3749-4428-be2b-1f3b96494333 with group instanceid None) (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:13:06,404] INFO [GroupCoordinator 0]: Stabilized group test_consumer generation 23 (__consumer_offsets-7) (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:13:06,406] INFO [GroupCoordinator 0]: Assignment received from leader for group test_consumer for generation 23 (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:13:11,403] INFO [GroupCoordinator 0]: Preparing to rebalance group test_consumer in state PreparingRebalance with old generation 23 (__consumer_offsets-7) (reason: Adding new member nonode@nohost/<0.1077.0>-7fdb63ba-4fba-432d-9af3-89411c63d4c9 with group instanceid None) (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:13:11,404] INFO [GroupCoordinator 0]: Stabilized group test_consumer generation 24 (__consumer_offsets-7) (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:13:11,407] INFO [GroupCoordinator 0]: Assignment received from leader for group test_consumer for generation 24 (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:13:16,406] INFO [GroupCoordinator 0]: Preparing to rebalance group test_consumer in state PreparingRebalance with old generation 24 (__consumer_offsets-7) (reason: Adding new member nonode@nohost/<0.1071.0>-1478c6db-6b99-4f75-9f47-5cf00500d913 with group instanceid None) (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:13:16,407] INFO [GroupCoordinator 0]: Stabilized group test_consumer generation 25 (__consumer_offsets-7) (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:13:16,409] INFO [GroupCoordinator 0]: Assignment received from leader for group test_consumer for generation 25 (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:13:21,415] INFO [GroupCoordinator 0]: Preparing to rebalance group test_consumer in state PreparingRebalance with old generation 25 (__consumer_offsets-7) (reason: Adding new member nonode@nohost/<0.1077.0>-ef0eb8e4-fabd-41bb-a55f-e1af641914ff with group instanceid None) (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:13:21,416] INFO [GroupCoordinator 0]: Stabilized group test_consumer generation 26 (__consumer_offsets-7) (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:13:21,418] INFO [GroupCoordinator 0]: Assignment received from leader for group test_consumer for generation 26 (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:13:51,384] INFO [GroupCoordinator 0]: Member nonode@nohost/<0.1083.0>-b1810a29-4f1a-40a6-8528-3bf4db48ff0a in group test_consumer has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
kafka_1  | [2019-07-23 09:13:51,384] INFO [GroupCoordinator 0]: Preparing to rebalance group test_consumer in state PreparingRebalance with old generation 26 (__consumer_offsets-7) (reason: removing member nonode@nohost/<0.1083.0>-b1810a29-4f1a-40a6-8528-3bf4db48ff0a on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)

I think the rebalance for each topic somehow affects the consumers for the other topics, and if different topics get their assignments in different generations they invalidate each other.
The issue was not present with the older 1.0.1 broker.

I hope this description is enough to reproduce the issue.
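
For anyone else reproducing this: the :rebalance_delay_ms consumer option (documented above) may be worth experimenting with, though raising it only spaces out the rejoins and does not address the root cause. A minimal config sketch with an arbitrary value:

config :kaffe,
  consumer: [
    endpoints: [kafka: 9092],
    topics: ["topic-1", "topic-2", "topic-3"],
    consumer_group: "test_consumer",
    message_handler: MessageProcessor,
    # give all group members more time to (re)join before subscribing
    rebalance_delay_ms: 20_000
  ]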

Receive notification when rebalance is in progress / assignments are revoked

Hi,
We are dealing with some scaling issues: when another consumer comes up in the same consumer group, we would like to delete some old cache entries.
To do this, we would need to be notified when a rebalance is in progress, and we would also need to know which partitions each consumer gets. Would this be possible?

Thanks!
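
A rough sketch of the kind of callback being asked for here; handle_partitions_revoked/1 and handle_partitions_assigned/1 are hypothetical names, not an existing Kaffe API:

defmodule MessageProcessor do
  # existing batch handling stays as-is
  def handle_messages(messages) do
    Enum.each(messages, &IO.inspect/1)
    :ok
  end

  # hypothetical: called when the group enters a rebalance and assignments are revoked
  def handle_partitions_revoked(partitions) do
    # CacheCleaner is application code, not part of Kaffe
    CacheCleaner.drop(partitions)
    :ok
  end

  # hypothetical: called once the new assignments for this member arrive
  def handle_partitions_assigned(_partitions), do: :ok
end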

Allow runtime configuration of consumers.

This is similar to the proposal @scrogson made in #73. It would be very nice if we could pass configuration to the consumers when they start, like:

consumer_config = [
  endpoints: [kafka: 9092],                  # that's [hostname: kafka_port]
  topics: ["interesting-topic"],             # the topic(s) that will be consumed
  consumer_group: "your-app-consumer-group", # the consumer group for tracking offsets in Kafka
  message_handler: MessageProcessor,         # the module from Step 1 that will process messages

  # optional
  async_message_ack: false,                  # see "async message acknowledgement" below
  start_with_earliest_message: true          # default false
]

children = [
  {Kaffe.Consumer, consumer_config}
]

This would play nicer with releases and would support other use cases such as dynamically starting consumers, which is something we occasionally need to do to read error topics or reconstitute some state.
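
As a sketch of the "dynamically starting consumers" case, assuming Kaffe accepted the {Kaffe.Consumer, consumer_config} child spec proposed above, a consumer could also be started on demand under a DynamicSupervisor:

{:ok, _sup} =
  DynamicSupervisor.start_link(name: MyApp.ConsumerSupervisor, strategy: :one_for_one)

# topic name is illustrative; reuse the consumer_config from above with a different topic
error_config = Keyword.put(consumer_config, :topics, ["interesting-topic-errors"])

{:ok, _consumer} =
  DynamicSupervisor.start_child(MyApp.ConsumerSupervisor, {Kaffe.Consumer, error_config})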

Brod consumers terminating

Hi again!

I'm not sure whether this is expected or routine, but I'm seeing a lot of this in my logs (with the single-message consumer, not the group consumer or message batching):

[warn] :brod_consumer #PID<0.479.0> terminating, reason:
{{:badmatch, []}, [{:brod_consumer, :handle_fetch_response, 2, [file: 'src/brod_consumer.erl', line: 345]}, {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 637]}, {:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 711]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 249]}]}
[error] GenServer #PID<0.479.0> terminating
** (MatchError) no match of right hand side value: []
    (brod) src/brod_consumer.erl:345: :brod_consumer.handle_fetch_response/2
    (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:711: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: {:msg, #PID<0.454.0>, {:kpro_rsp, :fetch_response, 3, 6, [throttle_time_ms: 4894, responses: []]}}
State: {:state, #PID<0.420.0>, #PID<0.454.0>, "events", 4, 25972, 10000, 0, 1048576, 1000, 10, 6, #PID<0.419.0>, #Reference<0.3419544911.997457921.8525>, {:pending_acks, 0, 0, {[], []}}, false, :reset_by_subscriber, 1182.0585739109188, 1048576, 5, 102400}

Should this concern me? It recovers and keeps processing messages, but it fails again after some amount of time.

Handle already_subscribed_by

Kafka Copy hit this error a few times:

no match of right hand side value: {:error, {:already_subscribed_by, #PID<0.25161.1189>}}
lib/kaffe/group_member/subscriber/subscriber.ex:48 :in `init`
gen_server.erl:328 :in `init_it`
proc_lib.erl:247 :in `init_p_do_apply`

Handle OffsetOutOfRange error

When the offset expires or the topic is recreated:

ArgumentError: expected argument to be a literal atom, literal keyword or a :kafka_message_set record, got runtime: {:kafka_fetch_error, "topic", 1, :OffsetOutOfRange, "The requested offset is not within the range o...
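
For the expired-offset half of this, the :offset_reset_policy consumer option is the relevant knob. A minimal config sketch, assuming :reset_to_earliest is the accepted value for restarting from the beginning of the log (the recreated-topic case still needs proper error handling in the subscriber):

config :kaffe,
  consumer: [
    # ... endpoints, topics, consumer_group, message_handler ...
    # how the subscriber should react when its stored offset is out of range
    offset_reset_policy: :reset_to_earliest
  ]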

Not pulling earliest messages from topic

In local development, using dev-services:

  • set a new consumer group
  • ensure that start_with_earliest_message: true

When the consumer starts, kaffe does not deliver the existing messages.

A workaround in development has been to simply start my core seeder and use those messages.
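
For anyone trying to reproduce, a minimal config for this scenario might look like the sketch below (topic and group names are placeholders):

config :kaffe,
  consumer: [
    endpoints: [localhost: 9092],
    topics: ["dev-topic"],                   # placeholder topic that already has messages
    consumer_group: "fresh-consumer-group",  # a group with no committed offsets
    message_handler: MessageProcessor,
    start_with_earliest_message: true        # expected to replay the existing messages
  ]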

Several handlers

Hi! I have several different, independent Kafka clusters. Is it possible to configure a separate message handler for each cluster independently within one application?

No Consuming

Hey,

I'm finding that a few of my services aren't actually consuming. Any ideas what would cause this?

I've tried removing my tweak of the latest-offset config for Kaffe, to no avail.

Kaffe logs:

20:28:01.644 [info]  Group member (mindetic-space-service-2017-12-05_2027,coor=#PID<0.1575.0>,cb=#PID<0.1572.0>,generation=2):
assignments received:
  events:
    partition=0 begin_offset=-1

20:37:55.658 [info]  client :kaffe_producer_client metadata socket down kafka-kafka.mindetic.svc.cluster.local:9092
Reason:{:shutdown, :tcp_closed}

20:38:01.637 [info]  client :"mindetic-space-service-2017-12-05_2027" metadata socket down kafka-kafka.mindetic.svc.cluster.local:9092
Reason:{:shutdown, :tcp_closed}

20:47:18.233 [info]  Group member (mindetic-space-service-2017-12-05_2027,coor=#PID<0.1575.0>,cb=#PID<0.1572.0>,generation=2):
re-joining group, reason::RebalanceInProgress

20:47:18.255 [info]  Group member (mindetic-space-service-2017-12-05_2027,coor=#PID<0.1575.0>,cb=#PID<0.1572.0>,generation=3):
elected=true

20:47:18.258 [info]  Group member (mindetic-space-service-2017-12-05_2027,coor=#PID<0.1575.0>,cb=#PID<0.1572.0>,generation=3):
assignments received:
  events:
    partition=0 begin_offset=-1

I found the following in the Kafka logs:

[2017-12-05 20:27:55,669] INFO [GroupCoordinator 1]: Preparing to rebalance group mindetic-space-service-2017-12-05_2027 with old generation 0 (__consumer_offsets-19) (kafka.coordinator.group.GroupCoordinator)
[2017-12-05 20:27:55,679] INFO [GroupCoordinator 1]: Stabilized group mindetic-space-service-2017-12-05_2027 generation 1 (__consumer_offsets-19) (kafka.coordinator.group.GroupCoordinator)
[2017-12-05 20:27:55,688] INFO [GroupCoordinator 1]: Assignment received from leader for group mindetic-space-service-2017-12-05_2027 for generation 1 (kafka.coordinator.group.GroupCoordinator)
[2017-12-05 20:27:55,689] INFO Updated PartitionLeaderEpoch. New: {epoch:25, offset:0}, Current: {epoch:-1, offset-1} for Partition: __consumer_offsets-19. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
[2017-12-05 20:27:59,982] INFO [GroupCoordinator 1]: Preparing to rebalance group mindetic-space-service-2017-12-05_2027 with old generation 1 (__consumer_offsets-19) (kafka.coordinator.group.GroupCoordinator)
[2017-12-05 20:28:01,633] INFO [GroupCoordinator 1]: Stabilized group mindetic-space-service-2017-12-05_2027 generation 2 (__consumer_offsets-19) (kafka.coordinator.group.GroupCoordinator)
[2017-12-05 20:28:01,634] INFO [GroupCoordinator 1]: Assignment received from leader for group mindetic-space-service-2017-12-05_2027 for generation 2 (kafka.coordinator.group.GroupCoordinator)
[2017-12-05 20:30:51,742] INFO [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2017-12-05 20:40:51,742] INFO [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2017-12-05 20:47:16,880] INFO [GroupCoordinator 1]: Preparing to rebalance group mindetic-space-service-2017-12-05_2027 with old generation 2 (__consumer_offsets-19) (kafka.coordinator.group.GroupCoordinator)
[2017-12-05 20:47:18,254] INFO [GroupCoordinator 1]: Stabilized group mindetic-space-service-2017-12-05_2027 generation 3 (__consumer_offsets-19) (kafka.coordinator.group.GroupCoordinator)
[2017-12-05 20:47:18,256] INFO [GroupCoordinator 1]: Assignment received from leader for group mindetic-space-service-2017-12-05_2027 for generation 3 (kafka.coordinator.group.GroupCoordinator)

Clients need to specify partitions when producing

Right now the "Producer" module is a very thin wrapper around the brod functions. Instead of making clients handle the low-level details, Kaffe should start a producer worker that analyzes the given topic and determines its number of partitions. Using that, it should round-robin between the partitions when called to produce.

In the future we may want to support configurable partition selection strategies, so keep that in mind. Also allow clients to specify a particular partition if they really need to.
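
A minimal sketch of the round-robin selection described above, keeping a per-topic counter; the module is illustrative and not Kaffe's actual implementation:

defmodule RoundRobinPartitioner do
  use Agent

  def start_link(_opts), do: Agent.start_link(fn -> %{} end, name: __MODULE__)

  # returns the next partition for `topic`, cycling through 0..partition_count-1
  def next_partition(topic, partition_count) do
    Agent.get_and_update(__MODULE__, fn counters ->
      count = Map.get(counters, topic, 0)
      {rem(count, partition_count), Map.put(counters, topic, count + 1)}
    end)
  end
end

The producer worker would look up the topic's partition count once from broker metadata and then call next_partition/2 on every produce.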

[Question] Runtime configuration

Hi,
Thank you for a great library.
Is it possible to run Kaffe with Kafka endpoints set at runtime and not at compile time?

I couldn't find any documentation on this.

Something like the example below (which does not compile):

config :kaffe,
  producer: [
    endpoints: [{:system, "KAFKA_SERVER", "defaultname"}, 9092], # [hostname: port]
    topics: ["topicname"],
    # optional
    partition_strategy: :md5
  ]
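
One pattern that avoids the compile-time problem is resolving the endpoint in config/runtime.exs (or config/releases.exs on older Elixir), which is evaluated when the release boots; whether Kaffe also resolves {:system, ...} tuples itself would need checking. A sketch:

# config/runtime.exs -- evaluated at boot, not at compile time
import Config

# KAFKA_PORT is made up for this example
kafka_host = System.get_env("KAFKA_SERVER", "defaultname") |> String.to_atom()
kafka_port = String.to_integer(System.get_env("KAFKA_PORT", "9092"))

config :kaffe,
  producer: [
    endpoints: [{kafka_host, kafka_port}],  # same [hostname: port] shape as above
    topics: ["topicname"],
    # optional
    partition_strategy: :md5
  ]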

Batch produce to a partition

There's currently no way to batch produce to a specific partition, something like:

  • topic/partition/message_list
Kaffe.Producer.produce_sync("topic", 2, [{"key1", "value1"}, {"key2", "value2"}])

I can work on this and send a PR. Is this something that you would merge back into kaffe, or is there a reason not to have this feature?

Thanks

Handle no_available_offsets

Received a bad match:

no match of right hand side value: {:error, {{:badmatch, {:error, :no_available_offsets}}, [{Kaffe.Subscriber, :init, 1, [file: 'lib/kaffe/group_member/subscriber/subscriber.ex', line: 50]}, {:gen_server, :in...

Need to find out what it means and what to do about it.

kaffe does not compile on Windows 10

I created a new project, added kaffe to the dependencies, and ran mix deps.get, but when I run mix compile I get the following output:

> mix compile
===> Compiling supervisor3
'make' is not recognized as an internal or external command,
operable program or batch file.
===> Hook for compile failed!

** (Mix) Could not compile dependency :supervisor3, "escript.exe "c:/Users/USERNAME/.mix/rebar3" bare compile --paths "c:/dev/elixir/sim/_build/dev/lib/*/ebin"" command failed. You can recompile this dependency with "mix deps.compile supervisor3", update it with "mix deps.update supervisor3" or clean it with "mix deps.clean supervisor3"

Do I need to run Cygwin or something else that provides a make environment to build kaffe, or am I doing something wrong?

Flow adapter for kaffe

I have a Flow adapter for kaffe that I use extensively, and I'm thinking about open-sourcing it. The adapter merges incoming Kafka messages into a given Stream.resource/3 that can then be processed, partitioned, windowed, etc. using Flow (or other libraries).

My question is this: is this something Kaffe would be interested in having as part of its offering, or should I offer it separately? If separate, would you be OK with a name like Kaffe.Stream, or would you prefer something without Kaffe in the name?

Serialisers

I seem to be implementing the same function in most of my services, namely:

def handle_message(message = %{}) do
  message[:value]
  |> Poison.decode!(as: %Event{})
  |> handle_message!()
end

I was thinking I would take a stab at implementing the following, if you consider it applicable to generic use and would accept the PR:

config :kaffe, serialiser: CustomSerialiser # Must implement Kaffe.SerialiserBehaviour?

Then, when configured, Kaffe would deserialise the message and then call handle_message/2.

This would allow serialisers for Protobuf, Thrift, MessagePack, etc.
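
A sketch of what that could look like; Kaffe.SerialiserBehaviour does not exist today, so every name below is illustrative:

defmodule Kaffe.SerialiserBehaviour do
  # hypothetical behaviour: turn a raw Kafka value binary into a domain term
  @callback deserialise(binary()) :: {:ok, term()} | {:error, term()}
end

defmodule JsonEventSerialiser do
  @behaviour Kaffe.SerialiserBehaviour

  # Event is an application struct, as in the handle_message/1 example above
  @impl true
  def deserialise(value), do: Poison.decode(value, as: %Event{})
end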

Support per-topic offsets

There is currently a start_with_earliest_message option, but you can't specify it on a per-topic basis. Kaffe should allow per-topic configuration rather than the current global approach. I'm thinking this will meaningfully change the config structure so that it is explicit about the relationship between consumers, Kafka instances, and topics. Something more like:

config :kaffe,
  consumers: [
    [
      endpoint: [kafka: 9092],
      consumer_group: "your-app-consumer-group",
      topics: [
        [
          topic: "topic1",
          mode: :sync,
          message_handler: MessageProcessor,
          offset: :earliest # or integer or :latest?
        ], [
          topic: "topic2",
          mode: :async,
          message_handler: MessageProcessor2,
          offset: :latest
        ]
      ]
    ],
    [
      endpoint: [kafka: 8092],
      # ....
    ]
  ]

Not sure that's the perfect delineation, but it feels like we're overloading the config as it is.

Kaffe doesn't resume at last offset on consumer restart(?)

I witnessed the following behavior on Heroku, which seems to imply that the last successfully acknowledged offset might trail behind reality. Observe the offset output before and after a re-deploy:

2017-02-03T18:25:15.799201+00:00 heroku[consumer.1]: count#kafkacat-consumer.messages.received.count=1 topic=whitelist source=whitelist offset=2236722 partition=0 key=VSURL1bFrPV45mDXndn7nQLUGcY
2017-02-03T18:25:15.799273+00:00 heroku[consumer.1]: Stopping all processes with SIGTERM
2017-02-03T18:25:15.847917+00:00 heroku[consumer.1]: Process exited with status 143
2017-02-03T18:25:15.463461+00:00 heroku[consumer.1]: Restarting
2017-02-03T18:25:15.464027+00:00 heroku[consumer.1]: State changed from up to starting
2017-02-03T18:25:17.731609+00:00 heroku[slug-compiler]: Slug compilation started
2017-02-03T18:25:17.731619+00:00 heroku[slug-compiler]: Slug compilation finished
2017-02-03T18:25:17.985935+00:00 heroku[consumer.1]: Restarting
2017-02-03T18:25:21.324378+00:00 heroku[consumer.1]: Starting process with command `mix run --no-halt`
2017-02-03T18:25:21.996317+00:00 heroku[consumer.1]: State changed from starting to up
...
2017-02-03T18:25:26.087713+00:00 app[consumer.1]: 18:25:26.087 [info]  count#kafkacat-consumer.messages.received.count=1 topic=whitelist source=whitelist offset=2236610 partition=0 key=JIvYocCuHfVHPZpxJab0SE8NNex
2017-02-03T18:25:26.088097+00:00 app[consumer.1]: 18:25:26.087 [info]  count#kafkacat-consumer.messages.received.count=1 topic=whitelist source=whitelist offset=2236611 partition=0 key=VC6mAFv1qouc3cGjtMDe7ocd0zK

The offset goes from 2236722 before the restart back to 2236611 afterwards - essentially regressing.

Now, the log stream is not guaranteed to be sequential, so it's quite possible it gets interleaved across dynos. I just wanted to put this on the radar for further confirmation/investigation; just because the sequence seems off in the output I captured doesn't mean 2236722 was processed more than once.

@spreedly/systems-dev

handle_messages sometimes receives nil as argument

Hello there! First of all, congratulations on your work with this library and thanks a lot for it. We have been using it at DeepX with quite a degree of success this far, and it has been really helpful.

This issue started happening after 1.4.0, and we believe it to be related to the upgrade to brod 3.0.

Sometimes when Kaffe calls our implementation of handle_messages for batch message processing, it passes nil as the argument, which causes a crash when we try to enumerate it, as below:

  def handle_messages(messages) do
    for message <- messages do
      message.value
      |> Poison.decode()
      |> process_message(message.topic)
    end
    :ok
  end

Could you clarify whether this is a bug or an intended case of this function?
Thank you very much!
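
Until the root cause in the brod 3.0 upgrade is pinned down, a defensive clause like the one below avoids the crash; this is a workaround sketch that silently acknowledges the nil batch, not a fix:

  # workaround: treat a nil batch as an empty batch so the consumer doesn't crash
  def handle_messages(nil), do: :ok

  def handle_messages(messages) do
    for message <- messages do
      message.value
      |> Poison.decode()
      |> process_message(message.topic)
    end

    :ok
  end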
