
kafka's Introduction

kafka

Kafka libraries, tools and example applications built on top of the sarama package.

Libraries

  • consumergroup: Distributed Kafka consumer, backed by Zookeeper, supporting load balancing and offset persistence, as defined by the Kafka documentation. API documentation can be found on godoc.org

Tools

The following tools can be useful for discovery, testing, and benchmarking. They also serve as examples of how to use Sarama.

  • tools/stressproducer: A tool to stress test the producer to measure throughput and latency.
  • tools/consoleconsumer: A tool to consume a kafka topic and write messages to STDOUT.

Examples

  • examples/consumergroup: An example consumer application that uses the consumergroup library mentioned above.

kafka's People

Contributors

aaronkavlie-wf, bhirbec, bouk, burke, darinkrauss, davidreynolds, eapache, fayizk1, jamesowenhall, jiminoc, nemosupremo, ontarionick, rogerclermont, rrh, shivnagarajan, warebot, wvanbergen, yejingx

kafka's Issues

Failure to use select instead of range

Hello,

I'm trying to write a small consumer group application using several workers.
Almost everything works correctly, but when I change the logic in the supplied example (consumergroup.go) from:

    for message := range consumer.Messages() {
        if offsets[message.Topic] == nil {
            offsets[message.Topic] = make(map[int32]int64)
        }

to:

    for {
        select {
        case message := <-consumer.Messages():
            if offsets[message.Topic] == nil {
                offsets[message.Topic] = make(map[int32]int64)
            }

When I try to stop the worker with ^C, I get a nil pointer dereference:

[Sarama] 2015/08/16 16:59:58 [consumer_example.go/4932df2e5698] ip-rep-input :: Stopped topic consumer
[Sarama] 2015/08/16 16:59:58 [consumer_example.go/4932df2e5698] Deregistered consumer instance Greyhound.local:9b307c47-696a-4419-9b93-4932df2e5698.
[Sarama] 2015/08/16 16:59:58 Closing Client
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x0 pc=0x2d9d]

goroutine 1 [running]:
main.main()
    /Users/eric/sync/Work/go/src/cybertonica.com/irb/cg/cg.go:75 +0xbcd

What am I doing wrong?

The full modified source code is:

package main

import (
    "flag"
    "log"
    "os"
    "os/signal"
    "strings"
    "time"

    "github.com/Shopify/sarama"
    "github.com/wvanbergen/kafka/consumergroup"
    "github.com/wvanbergen/kazoo-go"
)

const (
    defaultKafkaTopics   = "test_topic"
    defaultConsumerGroup = "consumer_example.go"
)

var (
    consumerGroup  = flag.String("group", defaultConsumerGroup, "The name of the consumer group, used for coordination and load balancing")
    kafkaTopicsCSV = flag.String("topics", defaultKafkaTopics, "The comma-separated list of topics to consume")
    zookeeper      = flag.String("zookeeper", "", "A comma-separated Zookeeper connection string (e.g. `zookeeper1.local:2181,zookeeper2.local:2181,zookeeper3.local:2181`)")

    zookeeperNodes []string
)

func init() {
    sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)
}

func main() {
    flag.Parse()

    if *zookeeper == "" {
        flag.PrintDefaults()
        os.Exit(1)
    }

    config := consumergroup.NewConfig()
    config.Offsets.Initial = sarama.OffsetNewest
    config.Offsets.ProcessingTimeout = 10 * time.Second

    zookeeperNodes, config.Zookeeper.Chroot = kazoo.ParseConnectionString(*zookeeper)

    kafkaTopics := strings.Split(*kafkaTopicsCSV, ",")

    consumer, consumerErr := consumergroup.JoinConsumerGroup(*consumerGroup, kafkaTopics, zookeeperNodes, config)
    if consumerErr != nil {
        log.Fatalln(consumerErr)
    }

    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt)
    go func() {
        <-c
        if err := consumer.Close(); err != nil {
            sarama.Logger.Println("Error closing the consumer", err)
        }
    }()

    go func() {
        for err := range consumer.Errors() {
            log.Println(err)
        }
    }()

    eventCount := 0
    offsets := make(map[string]map[int32]int64)

    for {
        select {
        case message := <-consumer.Messages():
            if offsets[message.Topic] == nil {
                offsets[message.Topic] = make(map[int32]int64)
            }

            eventCount++
            if offsets[message.Topic][message.Partition] != 0 && offsets[message.Topic][message.Partition] != message.Offset-1 {
                log.Printf("Unexpected offset on %s:%d. Expected %d, found %d, diff %d.\n", message.Topic, message.Partition, offsets[message.Topic][message.Partition]+1, message.Offset, message.Offset-offsets[message.Topic][message.Partition]+1)
            }

            // Simulate processing time
            time.Sleep(10 * time.Millisecond)

            offsets[message.Topic][message.Partition] = message.Offset
            consumer.CommitUpto(message)
        }
    }

    log.Printf("Processed %d events.", eventCount)
    log.Printf("%+v", offsets)
}
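
A possible explanation, offered as a sketch rather than a confirmed diagnosis: `for ... range` exits when the channel is closed, but a plain receive inside `select` on a closed channel returns immediately with the zero value. For a channel of pointers that value is nil, so `message.Topic` would dereference a nil pointer once the consumer has been closed. The two-value receive form detects the closed channel. The `Message` type and `consume` function below are hypothetical stand-ins, not part of sarama or consumergroup.

```go
package main

import "fmt"

// Message is a hypothetical stand-in for the message type delivered by the consumer.
type Message struct {
	Topic     string
	Partition int32
	Offset    int64
}

// consume reads messages until the channel is closed and returns how many it saw.
func consume(messages <-chan *Message) int {
	count := 0
	for {
		select {
		case message, ok := <-messages:
			if !ok {
				// The channel was closed (for example after the consumer was
				// closed): stop instead of dereferencing the nil zero value.
				return count
			}
			_ = message.Topic // safe: message is non-nil when ok is true
			count++
		}
	}
}

func main() {
	ch := make(chan *Message, 2)
	ch <- &Message{Topic: "test_topic", Partition: 0, Offset: 1}
	ch <- &Message{Topic: "test_topic", Partition: 0, Offset: 2}
	close(ch)
	fmt.Println("processed", consume(ch))
}
```

With this shape the loop returns once the channel is closed, so the statements after the loop (the final `log.Printf` calls in the example above) become reachable again.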

panic: invalid memory address or nil pointer dereference

panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x48 pc=0x5472e0]

goroutine 167 [running]:
github.com/wvanbergen/kafka/consumergroup.(*ConsumerGroup).partitionConsumer(0xc8201745a0, 0xc82013b180, 0x11, 0x0, 0xc8204e0420, 0xc8204e0480, 0xc8206a4b50, 0xc8206709c0)
        /go/src/xxxx/Godeps/_workspace/src/github.com/wvanbergen/kafka/consumergroup/consumer_group.go:435 +0x1580
created by github.com/wvanbergen/kafka/consumergroup.(*ConsumerGroup).topicConsumer
        /go/src/xxxx/Godeps/_workspace/src/github.com/wvanbergen/kafka/consumergroup/consumer_group.go:341 +0xbd6

        {
            "ImportPath": "github.com/wvanbergen/kafka/consumergroup",
            "Rev": "1ff806bb203e104e04d49189220d267d37e46758"
        },
        {
            "ImportPath": "github.com/Shopify/sarama",
            "Comment": "v1.8.0-49-g89bd629",
            "Rev": "89bd629b50e1a69c40d40ded1c82a767b4859442"
        },

kafka 0.9.0.0

The process panics every 10 minutes.

Commit when CG closed

It would be nice if zookeeperOffsetManager.commitOffsets() got called from ConsumerGroup.Close().
Right now stuff that is 'markedForCommit' after the last commit and before CG close is dropped.
Maybe also provide a Commit() method in the OffsetManager interface to be able to force commits from CG/clients.
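
A minimal sketch of the idea, assuming a simplified in-memory offset manager: `Close()` performs a final `Commit()` so that offsets marked after the last periodic commit are not dropped. The `offsetManager` type and its methods here are hypothetical and are not the library's `zookeeperOffsetManager`.

```go
package main

import (
	"fmt"
	"sync"
)

// offsetManager is a hypothetical, simplified offset manager used only to
// illustrate the proposal.
type offsetManager struct {
	mu        sync.Mutex
	pending   map[int32]int64 // offsets marked for commit but not yet stored
	committed map[int32]int64 // stands in for the persistent store
}

func newOffsetManager() *offsetManager {
	return &offsetManager{
		pending:   make(map[int32]int64),
		committed: make(map[int32]int64),
	}
}

// MarkAsProcessed records an offset to be stored by the next commit.
func (m *offsetManager) MarkAsProcessed(partition int32, offset int64) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.pending[partition] = offset
}

// Commit stores all pending offsets immediately, so callers can force a commit.
func (m *offsetManager) Commit() {
	m.mu.Lock()
	defer m.mu.Unlock()
	for partition, offset := range m.pending {
		m.committed[partition] = offset
		delete(m.pending, partition)
	}
}

// Close performs one last commit so nothing marked after the most recent
// periodic commit is lost.
func (m *offsetManager) Close() {
	m.Commit()
}

func main() {
	m := newOffsetManager()
	m.MarkAsProcessed(0, 42)
	m.Close()
	fmt.Println("committed offset for partition 0:", m.committed[0])
}
```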

Kafka consumer not restarting after broker failure

We had an issue with the connection from our consumers to the Kafka brokers. The issue lasted for a couple of hours. The log from that period is below:

2017-11-11 00:14:13 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4394485987 for [KAFKA_TOPIC]/4!
2017-11-11 00:14:17 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4397716347 for [KAFKA_TOPIC]/2!
2017-11-11 00:14:21 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38266406728 for [KAFKA_TOPIC]/1!
2017-11-11 00:14:25 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38214590710 for [KAFKA_TOPIC]/0!
2017-11-11 00:14:29 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4380970705 for [KAFKA_TOPIC]/3!
2017-11-11 00:14:29 UTC | ERROR: kafka.go:518: Kafka consumer error: zk: could not connect to a server
2017-11-11 00:14:29 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] [KAFKA_TOPIC]/0 :: Stopping partition consumer at offset 38214590710
2017-11-11 00:14:29 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] [KAFKA_TOPIC]/4 :: Stopping partition consumer at offset 4394485987
2017-11-11 00:14:29 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] [KAFKA_TOPIC]/2 :: Stopping partition consumer at offset 4397716347
2017-11-11 00:14:29 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] [KAFKA_TOPIC]/1 :: Stopping partition consumer at offset 38266406728
2017-11-11 00:14:29 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] [KAFKA_TOPIC]/3 :: Stopping partition consumer at offset 4380970705
2017-11-11 00:14:30 UTC | consumer/broker/8 disconnecting due to error processing FetchRequest: read tcp 10.128.8.79:47780->10.1.3.144:9092: i/o timeout
2017-11-11 00:14:30 UTC | Closed connection to broker kafka8:9092
2017-11-11 00:14:30 UTC | kafka: error while consuming [KAFKA_TOPIC]/0: read tcp 10.128.8.79:47780->10.1.3.144:9092: i/o timeout
2017-11-11 00:14:30 UTC | consumer/broker/2 disconnecting due to error processing FetchRequest: read tcp 10.128.8.79:44170->10.1.3.236:9092: i/o timeout
2017-11-11 00:14:30 UTC | Closed connection to broker kafka2:9092
2017-11-11 00:14:30 UTC | kafka: error while consuming [KAFKA_TOPIC]/3: read tcp 10.128.8.79:44170->10.1.3.236:9092: i/o timeout
2017-11-11 00:14:30 UTC | kafka: error while consuming [KAFKA_TOPIC]/1: read tcp 10.128.8.79:44170->10.1.3.236:9092: i/o timeout
2017-11-11 00:14:30 UTC | consumer/broker/1 disconnecting due to error processing FetchRequest: read tcp 10.128.8.79:37892->10.1.3.235:9092: i/o timeout
2017-11-11 00:14:30 UTC | Closed connection to broker kafka1:9092
2017-11-11 00:14:30 UTC | kafka: error while consuming [KAFKA_TOPIC]/2: read tcp 10.128.8.79:37892->10.1.3.235:9092: i/o timeout
2017-11-11 00:14:30 UTC | consumer/broker/3 disconnecting due to error processing FetchRequest: read tcp 10.128.8.79:48368->10.1.3.195:9092: i/o timeout
2017-11-11 00:14:30 UTC | Closed connection to broker kafka3:9092
2017-11-11 00:14:30 UTC | kafka: error while consuming [KAFKA_TOPIC]/4: read tcp 10.128.8.79:48368->10.1.3.195:9092: i/o timeout
2017-11-11 00:14:32 UTC | consumer/[KAFKA_TOPIC]/0 finding new broker
2017-11-11 00:14:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka4:9092
2017-11-11 00:14:32 UTC | consumer/[KAFKA_TOPIC]/1 finding new broker
2017-11-11 00:14:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka4:9092
2017-11-11 00:14:32 UTC | consumer/[KAFKA_TOPIC]/3 finding new broker
2017-11-11 00:14:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka4:9092
2017-11-11 00:14:32 UTC | consumer/[KAFKA_TOPIC]/2 finding new broker
2017-11-11 00:14:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka4:9092
2017-11-11 00:14:32 UTC | consumer/[KAFKA_TOPIC]/4 finding new broker
2017-11-11 00:14:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka4:9092
2017-11-11 00:14:33 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4380970705 for [KAFKA_TOPIC]/3!
2017-11-11 00:14:33 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] [KAFKA_TOPIC]/3 :: FAILED to commit offset 4380970705 to Zookeeper. Last committed offset: 4380970443
2017-11-11 00:14:33 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4397716347 for [KAFKA_TOPIC]/2!
2017-11-11 00:14:33 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38214590710 for [KAFKA_TOPIC]/0!
2017-11-11 00:14:33 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] [KAFKA_TOPIC]/0 :: FAILED to commit offset 38214590710 to Zookeeper. Last committed offset: 38214590466
2017-11-11 00:14:33 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4394485987 for [KAFKA_TOPIC]/4!
2017-11-11 00:14:33 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] [KAFKA_TOPIC]/4 :: FAILED to commit offset 4394485987 to Zookeeper. Last committed offset: 4394485910
2017-11-11 00:14:33 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38266406728 for [KAFKA_TOPIC]/1!
2017-11-11 00:14:33 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] [KAFKA_TOPIC]/1 :: FAILED to commit offset 38266406728 to Zookeeper. Last committed offset: 38266406473
2017-11-11 00:14:37 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38266406728 for [KAFKA_TOPIC]/1!
2017-11-11 00:14:37 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4397716347 for [KAFKA_TOPIC]/2!
2017-11-11 00:14:37 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] [KAFKA_TOPIC]/2 :: FAILED to commit offset 4397716347 to Zookeeper. Last committed offset: 4397716273
2017-11-11 00:14:41 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38214590710 for [KAFKA_TOPIC]/0!
2017-11-11 00:14:45 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4380970705 for [KAFKA_TOPIC]/3!
2017-11-11 00:14:49 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4394485987 for [KAFKA_TOPIC]/4!
2017-11-11 00:14:53 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4397716347 for [KAFKA_TOPIC]/2!
2017-11-11 00:14:57 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38266406728 for [KAFKA_TOPIC]/1!
2017-11-11 00:15:01 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38214590710 for [KAFKA_TOPIC]/0!
2017-11-11 00:15:02 UTC | client/metadata got error from broker while fetching metadata: read tcp 10.128.8.79:51248->10.1.3.196:9092: i/o timeout
2017-11-11 00:15:02 UTC | Closed connection to broker kafka4:9092
2017-11-11 00:15:02 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka3:9092
2017-11-11 00:15:02 UTC | client/metadata got error from broker while fetching metadata: read tcp 10.128.8.79:51248->10.1.3.196:9092: i/o timeout
2017-11-11 00:15:02 UTC | client/brokers deregistered broker #-1 at kafka4:9092
2017-11-11 00:15:02 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka3:9092
2017-11-11 00:15:02 UTC | client/metadata got error from broker while fetching metadata: read tcp 10.128.8.79:51248->10.1.3.196:9092: i/o timeout
2017-11-11 00:15:02 UTC | client/brokers deregistered broker #-1 at kafka4:9092
2017-11-11 00:15:02 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka3:9092
2017-11-11 00:15:02 UTC | client/metadata got error from broker while fetching metadata: read tcp 10.128.8.79:51248->10.1.3.196:9092: i/o timeout
2017-11-11 00:15:02 UTC | client/brokers deregistered broker #-1 at kafka4:9092
2017-11-11 00:15:02 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka3:9092
2017-11-11 00:15:02 UTC | client/metadata got error from broker while fetching metadata: read tcp 10.128.8.79:51248->10.1.3.196:9092: i/o timeout
2017-11-11 00:15:02 UTC | client/brokers deregistered broker #-1 at kafka4:9092
2017-11-11 00:15:02 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka3:9092
2017-11-11 00:15:05 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4380970705 for [KAFKA_TOPIC]/3!
2017-11-11 00:15:09 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4394485987 for [KAFKA_TOPIC]/4!
2017-11-11 00:15:13 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4380970705 for [KAFKA_TOPIC]/3!
2017-11-11 00:15:17 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4394485987 for [KAFKA_TOPIC]/4!
2017-11-11 00:15:21 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4397716347 for [KAFKA_TOPIC]/2!
2017-11-11 00:15:25 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38266406728 for [KAFKA_TOPIC]/1!
2017-11-11 00:15:29 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38214590710 for [KAFKA_TOPIC]/0!
2017-11-11 00:15:32 UTC | Failed to connect to broker kafka3:9092: dial tcp 10.1.3.195:9092: i/o timeout
2017-11-11 00:15:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.195:9092: i/o timeout
2017-11-11 00:15:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka10:9092
2017-11-11 00:15:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.195:9092: i/o timeout
2017-11-11 00:15:32 UTC | client/brokers deregistered broker #-1 at kafka3:9092
2017-11-11 00:15:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka10:9092
2017-11-11 00:15:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.195:9092: i/o timeout
2017-11-11 00:15:32 UTC | client/brokers deregistered broker #-1 at kafka3:9092
2017-11-11 00:15:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka10:9092
2017-11-11 00:15:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.195:9092: i/o timeout
2017-11-11 00:15:32 UTC | client/brokers deregistered broker #-1 at kafka3:9092
2017-11-11 00:15:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka10:9092
2017-11-11 00:15:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.195:9092: i/o timeout
2017-11-11 00:15:32 UTC | client/brokers deregistered broker #-1 at kafka3:9092
.
.
. *trying to commit offset and failure to fetch metadata continues...*
.
.
2017-11-11 01:39:02 UTC | Failed to connect to broker kafka7:9092: dial tcp 10.1.3.143:9092: i/o timeout
2017-11-11 01:39:02 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.143:9092: i/o timeout
2017-11-11 01:39:02 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka1:9092
2017-11-11 01:39:02 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.143:9092: i/o timeout
2017-11-11 01:39:02 UTC | client/brokers deregistered broker #-1 at kafka7:9092
2017-11-11 01:39:02 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka1:9092
2017-11-11 01:39:02 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.143:9092: i/o timeout
2017-11-11 01:39:02 UTC | client/brokers deregistered broker #-1 at kafka7:9092
2017-11-11 01:39:02 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka1:9092
2017-11-11 01:39:02 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.143:9092: i/o timeout
2017-11-11 01:39:02 UTC | client/brokers deregistered broker #-1 at kafka7:9092
2017-11-11 01:39:02 UTC | client/metadata fetching metadata for all topics from broker kafka1:9092
2017-11-11 01:39:06 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4397716347 for [KAFKA_TOPIC]/2!
2017-11-11 01:39:10 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38266406728 for [KAFKA_TOPIC]/1!
2017-11-11 01:39:14 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4394485987 for [KAFKA_TOPIC]/4!
2017-11-11 01:39:18 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4397716347 for [KAFKA_TOPIC]/2!
2017-11-11 01:39:22 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38266406728 for [KAFKA_TOPIC]/1!
2017-11-11 01:39:26 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38214590710 for [KAFKA_TOPIC]/0!
2017-11-11 01:39:30 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4380970705 for [KAFKA_TOPIC]/3!
2017-11-11 01:39:32 UTC | Failed to connect to broker kafka1:9092: dial tcp 10.1.3.235:9092: i/o timeout
2017-11-11 01:39:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.235:9092: i/o timeout
2017-11-11 01:39:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka2:9092
2017-11-11 01:39:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.235:9092: i/o timeout
2017-11-11 01:39:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.235:9092: i/o timeout
2017-11-11 01:39:32 UTC | client/brokers deregistered broker #-1 at kafka1:9092
2017-11-11 01:39:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka2:9092
2017-11-11 01:39:32 UTC | client/brokers deregistered broker #-1 at kafka1:9092
2017-11-11 01:39:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka2:9092
2017-11-11 01:39:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.235:9092: i/o timeout
2017-11-11 01:39:32 UTC | client/brokers deregistered broker #-1 at kafka1:9092
2017-11-11 01:39:32 UTC | client/metadata fetching metadata for all topics from broker kafka2:9092
2017-11-11 01:40:02 UTC | Failed to connect to broker kafka2:9092: dial tcp 10.1.3.236:9092: i/o timeout
2017-11-11 01:40:02 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.236:9092: i/o timeout
2017-11-11 01:40:02 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka0:9092
2017-11-11 01:40:02 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.236:9092: i/o timeout
2017-11-11 01:40:02 UTC | client/brokers deregistered broker #-1 at kafka2:9092
2017-11-11 01:40:02 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka0:9092
2017-11-11 01:40:02 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.236:9092: i/o timeout
2017-11-11 01:40:02 UTC | client/brokers deregistered broker #-1 at kafka2:9092
2017-11-11 01:40:02 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka0:9092
2017-11-11 01:40:02 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.236:9092: i/o timeout
2017-11-11 01:40:02 UTC | client/brokers deregistered broker #-1 at kafka2:9092
2017-11-11 01:40:02 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 01:40:32 UTC | Failed to connect to broker kafka0:9092: dial tcp 10.1.3.234:9092: i/o timeout
2017-11-11 01:40:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.234:9092: i/o timeout
2017-11-11 01:40:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka9:9092
2017-11-11 01:40:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.234:9092: i/o timeout
2017-11-11 01:40:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.234:9092: i/o timeout
2017-11-11 01:40:32 UTC | client/brokers deregistered broker #-1 at kafka0:9092
2017-11-11 01:40:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka9:9092
2017-11-11 01:40:32 UTC | client/brokers deregistered broker #-1 at kafka0:9092
2017-11-11 01:40:32 UTC | client/metadata fetching metadata for all topics from broker kafka9:9092
2017-11-11 01:40:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.234:9092: i/o timeout
2017-11-11 01:40:32 UTC | client/brokers deregistered broker #-1 at kafka0:9092
.
.
. *no failure messages to commit offset anymore. but the brokers are still away*
.
.
2017-11-11 04:55:08 UTC | client/metadata retrying after 250ms... (2 attempts remaining)
2017-11-11 04:55:09 UTC | client/metadata fetching metadata for all topics from broker kafka4:9092
2017-11-11 04:55:39 UTC | Failed to connect to broker kafka4:9092: dial tcp 10.1.3.196:9092: i/o timeout
2017-11-11 04:55:39 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.196:9092: i/o timeout
2017-11-11 04:55:39 UTC | client/metadata fetching metadata for all topics from broker kafka3:9092
2017-11-11 04:56:09 UTC | Failed to connect to broker kafka3:9092: dial tcp 10.1.3.195:9092: i/o timeout
2017-11-11 04:56:09 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.195:9092: i/o timeout
2017-11-11 04:56:09 UTC | client/metadata fetching metadata for all topics from broker kafka10:9092
2017-11-11 04:56:39 UTC | Failed to connect to broker kafka10:9092: dial tcp 10.1.3.146:9092: i/o timeout
2017-11-11 04:56:39 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.146:9092: i/o timeout
2017-11-11 04:56:39 UTC | client/metadata fetching metadata for all topics from broker kafka7:9092
2017-11-11 04:57:09 UTC | Failed to connect to broker kafka7:9092: dial tcp 10.1.3.143:9092: i/o timeout
2017-11-11 04:57:09 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.143:9092: i/o timeout
2017-11-11 04:57:09 UTC | client/metadata fetching metadata for all topics from broker kafka1:9092
2017-11-11 04:57:39 UTC | Failed to connect to broker kafka1:9092: dial tcp 10.1.3.235:9092: i/o timeout
2017-11-11 04:57:39 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.235:9092: i/o timeout
2017-11-11 04:57:39 UTC | client/metadata fetching metadata for all topics from broker kafka2:9092
2017-11-11 04:58:09 UTC | Failed to connect to broker kafka2:9092: dial tcp 10.1.3.236:9092: i/o timeout
2017-11-11 04:58:09 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.236:9092: i/o timeout
2017-11-11 04:58:09 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 04:58:09 UTC | Connected to broker at kafka0:9092 (unregistered)
2017-11-11 04:58:09 UTC | client/brokers registered new broker #0 at kafka0:9092
2017-11-11 04:58:09 UTC | client/brokers registered new broker #10 at kafka10:9092
2017-11-11 04:58:09 UTC | client/brokers registered new broker #1 at kafka1:9092
2017-11-11 04:58:09 UTC | client/brokers registered new broker #9 at kafka9:9092
2017-11-11 04:58:09 UTC | client/brokers registered new broker #2 at kafka2:9092
2017-11-11 04:58:09 UTC | client/brokers registered new broker #7 at kafka7:9092
2017-11-11 04:58:09 UTC | client/brokers registered new broker #3 at kafka3:9092
2017-11-11 04:58:09 UTC | client/brokers registered new broker #8 at kafka8:9092
2017-11-11 04:58:09 UTC | client/brokers registered new broker #4 at kafka4:9092
2017-11-11 04:58:09 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 05:02:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 05:12:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 05:22:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 05:32:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 05:42:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 05:52:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 06:02:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 06:12:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 06:22:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 06:32:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 06:42:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 06:52:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 07:02:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 07:12:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 07:22:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 07:32:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 07:42:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 07:52:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 08:02:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 08:12:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 08:22:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 08:32:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 08:42:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092

After the brokers were registered again, we were still not consuming messages. Also, after a while, even though the network problem persisted, offset commits were no longer attempted and no error messages were logged, which seems a bit strange to me.
And when the brokers were registered, there was no partition rebalancing or anything like that in the logs.

The way I have implemented it is:

func (cons *KafkaConsumer) restartGroupWithZooKeeper() {
	time.Sleep(cons.persistTimeout)
	cons.readFromGroupWithZookeeper()
}

func (cons *KafkaConsumer) readFromGroupWithZookeeper() {
	cons.AddWorker()

	restartWithNewClient := false
	defer func() {
		// close the existing group client otherwise
		// there will be a stale client who had partition but is not reading.
		if (cons.consumerGroupClient != nil) && (!cons.consumerGroupClient.Closed()) {
			cons.consumerGroupClient.Close()
		}
		cons.WorkerDone()

		if restartWithNewClient {
			// mutual recursive function. Will call this function again.
			// so it is important to close the existing one above
			cons.restartGroupWithZooKeeper()
		}
	}()

	client, err := kzconsumergroup.JoinConsumerGroup(cons.group, []string{cons.topic}, cons.zookeeper, cons.consumerGroupConfig)
	cons.consumerGroupClient = client
	if err != nil {
		restartWithNewClient = true
		Log.Error.Printf("Restarting kafka consumer %s:%s - %s", cons.topic, cons.group, err.Error())
		return
	}

	spin := shared.NewSpinner(shared.SpinPriorityLow)
	for !cons.consumerGroupClient.Closed() {
		cons.WaitOnFuse()
		select {
		case event := <-cons.consumerGroupClient.Messages():
			cons.processMessage(event)
			cons.consumerGroupClient.CommitUpto(event)
		case err := <-cons.consumerGroupClient.Errors():
			restartWithNewClient = true
			Log.Error.Print("Kafka consumer error: ", err)
			return
		default:
			spin.Yield()
		}
	}
}

I saw that there are multiple issues in Sarama as well related to connection problems. So I wanted to post here first to get some confirmation that this consumer code is sound before creating an issue in Sarama directly.

@wvanbergen if you have any ideas on this, I would be happy to create a PR to resolve this.

Panic Integer Divide by Zero when dividePartitionsBetweenConsumers

I have one consumer and what might be a flaky ZooKeeper connection, which can cause a divide-by-zero panic.

Log output (AFAIK, zookeeper is up, else I'd have bigger problems, however this client is behind a VPN):

2015/07/03 22:32:29 Failed to connect to zoo1:2181: dial tcp 10.129.196.49:2181: i/o timeout
2015/07/03 22:32:31 Failed to connect to zoo2:2181: dial tcp 10.129.196.11:2181: i/o timeout
2015/07/03 22:32:32 read tcp 10.129.196.49:2181: i/o timeout
2015/07/03 22:32:33 Failed to connect to zoo2:2181: dial tcp 10.129.196.11:2181: i/o timeout
panic: runtime error: integer divide by zero
[signal 0x8 code=0x7 addr=0x36bafd pc=0x36bafd]

goroutine 544 [running]:
github.com/wvanbergen/kafka/consumergroup.dividePartitionsBetweenConsumers(0x1005fc8, 0x0, 0x0, 0xc208dd2200, 0x20, 0x20, 0x0)
    /Users/nimi/go/src/github.com/wvanbergen/kafka/consumergroup/utils.go:39 +0x16d
github.com/wvanbergen/kafka/consumergroup.(*ConsumerGroup).topicConsumer(0xc2081a8000, 0xafae10, 0xa, 0xc208162f60, 0xc208163320, 0xc208dfdce0)
    /Users/nimi/go/src/github.com/wvanbergen/kafka/consumergroup/consumer_group.go:310 +0x7c3
created by github.com/wvanbergen/kafka/consumergroup.(*ConsumerGroup).topicListConsumer
    /Users/nimi/go/src/github.com/wvanbergen/kafka/consumergroup/consumer_group.go:260 +0x3f6

This points to the following code:

sort.Sort(partitions)
sort.Sort(consumers)

n := plen / clen
if plen%clen > 0 {
    n++
}

I'm assuming (I haven't done any digging) that we see no consumers (because of the ZooKeeper connection trouble?) and try to divide partitions among zero consumers. Given that the ZooKeeper connection should eventually come back, what would be the right course of action here? I think it would be ideal for the consumer to do no work until the connection with ZK stabilizes or comes back.
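A minimal sketch of one way to avoid the panic, assuming the division is wrapped in a helper that refuses to run with zero consumers. The names partitionsPerConsumer and errNoConsumers are hypothetical and not part of the library; the caller would skip the rebalance and retry once ZooKeeper reports at least one instance again.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoConsumers is a hypothetical sentinel error; it is not part of the library.
var errNoConsumers = errors.New("no registered consumers")

// partitionsPerConsumer returns the maximum number of partitions each consumer
// should claim (the ceiling of plen/clen), or an error when clen is zero.
func partitionsPerConsumer(plen, clen int) (int, error) {
	if clen <= 0 {
		return 0, errNoConsumers
	}
	n := plen / clen
	if plen%clen > 0 {
		n++
	}
	return n, nil
}

func main() {
	// With zero consumers the helper returns an error instead of panicking.
	if _, err := partitionsPerConsumer(32, 0); err != nil {
		fmt.Println("skipping rebalance:", err)
	}
	n, _ := partitionsPerConsumer(32, 3)
	fmt.Println("partitions per consumer:", n)
}
```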

consumergroup does not work in Docker

I run Kafka in Docker using the image wurstmeister/kafka:0.9.0.1.
My docker-compose.yml looks like this:

### zookeeper    #########################################
    zookeeper:
      image: wurstmeister/zookeeper
      ports:
        - "2181:2181"
      networks:
        - backend

### kafka    #########################################
    kafka:
      image: wurstmeister/kafka:0.9.0.1
      ports:
        - "9092:9092"
      environment:
        KAFKA_ADVERTISED_PORT: 9092
        KAFKA_ADVERTISED_HOST_NAME: "172.23.0.8"
        KAFKA_CREATE_TOPICS: "KtRoomMessage:1:1,KtRoomMergeMessage:1:1,KtRoomDelMessage:1:1,KtRoomEditMessage:1:1,KtMessageFeed:1:1,KafkaPushsTopic:1:1"
        KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
        KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      # volumes:
        # - ../database/kafka:/tmp/kafka-logs
      #   - ../database/zookeeper:/tmp/zookeeper
      # volumes:
      #   - /var/run/docker.sock:/var/run/docker.sock
      depends_on:
        - zookeeper
      networks:
        backend:
          ipv4_address: 172.23.0.8
### Networks Setup ############################################

networks:
  frontend:
    driver: "bridge"
  backend:
    driver: "bridge"
    ipam:
      config:
      - subnet: 172.23.0.0/24

It works fine when I use kafka-console-consumer.sh:

➜  docker git:(master) ✗ docker-compose exec kafka bash
bash-4.3# cd /opt/kafka_2.11-0.9.0.1/
bash-4.3# ./bin/kafka-console-consumer.sh --zookeeper zookeeper:2181 --topic KtRoomMessage
{"id":1395866053756864,"room_id":1371649991298042,"uid":1800329207,"username":"","room_type":"vip","message_type":"normal","content":"sdf","image_urls":"","date":"2017-04-11","status":1,"ip":"172.23.0.14","created_at":"2017-04-11 16:30:30","updated_at":"2017-04-11 16:30:30","subRoomId":1371649991298042}

It also works fine with github.com/wvanbergen/kafka/tools/consoleconsumer:

root@101fe7f7fb4d:/go/src/github.com/wvanbergen/kafka/tools/consoleconsumer# ./consoleconsumer -brokers kafka:9092 -topic KtRoomMessage
Offset: 0
Key:
Value:  {"id":1395864139704472,"room_id":1371649991298042,"uid":1800329207,"username":"","room_type":"vip","message_type":"normal","content":"sdf","image_urls":"","date":"2017-04-11","status":1,"ip":"172.23.0.14","created_at":"2017-04-11 15:58:36","updated_at":"2017-04-11 15:58:36","subRoomId":1371649991298042}

But it does not work with consumergroup:

root@101fe7f7fb4d:/go/src/github.com/wvanbergen/kafka/tools/consoleconsumer# consumergroup -zookeeper zookeeper:2181 -topics KtRoomMessage -gro dada
2017/04/11 16:33:09 Connected to 172.23.0.4:2181
2017/04/11 16:33:09 Authenticated: id=97772968327577636, timeout=4000
2017/04/11 16:33:09 Re-submitting `0` credentials after reconnect
[Sarama] 2017/04/11 16:33:09 Initializing new client
[Sarama] 2017/04/11 16:33:09 client/metadata fetching metadata for all topics from broker 172.23.0.8:9092
[Sarama] 2017/04/11 16:33:09 Connected to broker at 172.23.0.8:9092 (unregistered)
[Sarama] 2017/04/11 16:33:09 client/brokers registered new broker #1001 at 172.23.0.8:9092
[Sarama] 2017/04/11 16:33:09 Successfully initialized new client
[Sarama] 2017/04/11 16:33:09 [dada/fcad8b192b4e] Consumer instance registered (101fe7f7fb4d:1ef0f883-7b41-4f63-a970-fcad8b192b4e).
[Sarama] 2017/04/11 16:33:09 [dada/fcad8b192b4e] Currently registered consumers: 1
[Sarama] 2017/04/11 16:33:09 [dada/fcad8b192b4e] KtRoomMessage :: Started topic consumer
[Sarama] 2017/04/11 16:33:09 [dada/fcad8b192b4e] KtRoomMessage :: Claiming 1 of 1 partitions
[Sarama] 2017/04/11 16:33:09 [dada/fcad8b192b4e] KtRoomMessage/0 :: Partition consumer starting at offset 8.
[Sarama] 2017/04/11 16:33:09 Connected to broker at 172.23.0.8:9092 (registered as #1001)
[Sarama] 2017/04/11 16:33:09 consumer/broker/1001 added subscription to KtRoomMessage/0

consumergroup does not receive any messages when I push messages to the topic KtRoomMessage.

Clarify Kafka version support

As of Kafka 0.9, users are encouraged to use Kafka broker addresses for auto-discovery, rather than the ZooKeeper connection details that the kafka.NewConsumerGroup() constructor expects.

Will wvanbergen/kafka support a more modern client constructor?

Which versions of Kafka has wvanbergen/kafka been tested against?

Group doesn't recover after internet connectivity issue

Hi,

When the internet connection goes down for some time and then comes back up, the consumers no longer consume incoming messages. The caller isn't notified of this, so no action can be taken.

You can reproduce this behavior the following way:

  • start a consumer group on your local machine
  • turn off your wifi
  • turn on your wifi
  • send some messages to your topic and you'll see that they are not consumed

Here's my log right after I turned on my wifi:

2016-05-31T23:42:22Z Unstructured Log Line,file=structs.go:21,text= Failed to connect to 10.101.206.42:2181: dial tcp 10.101.206.42:2181: connect: network is unreachable
2016-05-31T23:42:23Z Unstructured Log Line,text= Connected to 10.102.206.12:2181,file=structs.go:21
2016-05-31T23:42:23Z Unstructured Log Line,file=structs.go:21,text= Authentication failed: zk: session has been expired by the server
2016-05-31T23:42:23Z [My-Group/80b66df0c462] Triggering rebalance due to consumer list change
2016-05-31T23:42:23Z [My-Group/80b66df0c462] mytopic/0 :: Stopping partition consumer at offset -1
2016-05-31T23:42:23Z [My-Group/80b66df0c462] mytopic/1 :: Stopping partition consumer at offset -1
2016-05-31T23:42:23Z [My-Group/80b66df0c462] mytopic/2 :: Stopping partition consumer at offset -1
2016-05-31T23:42:23Z consumer/broker/3 closed dead subscription to mytopic/0
2016-05-31T23:42:23Z consumer/broker/2 closed dead subscription to mytopic/2
2016-05-31T23:42:24Z consumer/broker/1 closed dead subscription to mytopic/1
2016-05-31T23:42:24Z [My-Group/80b66df0c462] mytopic :: Stopped topic consumer
2016-05-31T23:42:24Z [My-Group/80b66df0c462] Currently registered consumers: 0
2016-05-31T23:42:24Z [My-Group/80b66df0c462] mytopic :: Started topic consumer
2016-05-31T23:42:24Z [My-Group/80b66df0c462] mytopic :: Claiming 0 of 3 partitions
2016-05-31T23:42:24Z [My-Group/80b66df0c462] mytopic :: Stopped topic consumer

The znode at /someroot/consumers/My-Group/ids is cleared after the connection is lost. From Kafka protocol doc:

The coordinator considers the consumer dead if it receives no heartbeat after this timeout in ms.

When the group gets its connection back, there are no registered consumers. It stops consuming but does not report this to the caller.

Sending an error in the errors channel would be a possible fix. It would notify the caller who could shut down the group and restart it.
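A rough sketch of that idea, assuming the group notices that the registered instance count has dropped to zero and pushes a sentinel error to the caller without blocking. The names checkRegistered, reportError and errNoRegisteredConsumers are hypothetical and only illustrate the pattern; they are not library APIs.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoRegisteredConsumers is a hypothetical error value used for illustration.
var errNoRegisteredConsumers = errors.New("consumer group has no registered instances")

// reportError tries to deliver err on errs without blocking.
// It returns false when the channel buffer is full and the error was dropped.
func reportError(errs chan<- error, err error) bool {
	select {
	case errs <- err:
		return true
	default:
		return false
	}
}

// checkRegistered reports an error when the instance count drops to zero.
func checkRegistered(count int, errs chan<- error) {
	if count == 0 {
		reportError(errs, errNoRegisteredConsumers)
	}
}

func main() {
	errs := make(chan error, 1)
	checkRegistered(0, errs)

	// The caller can react by closing the group and joining again.
	select {
	case err := <-errs:
		fmt.Println("restart the group:", err)
	default:
		fmt.Println("group healthy")
	}
}
```

The non-blocking send keeps the group's internal loop from stalling when nobody reads the channel, at the cost of dropping errors once the buffer is full.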

Any thoughts?

Plans for this repository

Hey!

Seeing that you are one of the biggest contributors to the Shopify/sarama repository right now, should I expect this code to be merged into Sarama's main repository at some point?

I'm using Sarama in production and I need to implement consumer groups.
I can probably even contribute to this repo a bit; I would just like to do it for a repository that has some prospects for future support :).

I see there is another repository by bsm, but from what I see in the comments, you have no plans to make this a part of Sarama.

One more thing: do you plan to make it possible to pass already-created consumers/partition consumers when joining a consumer group? This should make at least the intermediate stages of upgrading from plain Sarama to this library much easier.

Leader change during re-balance may cause partition not being consumed

I simplified the problem into the following scenario:
3 brokers, 1 topic with 4 partitions, and 2 consumer instances consuming that topic. Broker, partition and consumer indices start at 0.

When c0 (consumer instance 0) calls utils.go#dividePartitionsBetweenConsumers(), the leaders are like:

  • p0 on b0
  • p1 on b1
  • p2 on b2
  • p3 on b0

After sorting (by leader, then by partition id), partitions looks like this:

[
    {p0 b0 xxxx}  // {partition leader address}
    {p3 b0 xxxx}
    {p1 b1 xxxx}
    {p2 b2 xxxx}
] 

So c0 gets p0 and p3 as its myPartitions (to claim).

Then p0 somehow changes its leader to b2. The leaders are now:

  • p0 on b2
  • p1 on b1
  • p2 on b2
  • p3 on b0

Now another consumer instance, c1, calls utils.go#dividePartitionsBetweenConsumers().
After sorting (by leader, then by partition id), partitions looks like this:

[
    {p3 b0 xxxx}
    {p1 b1 xxxx}
    {p0 b2 xxxx}
    {p2 b2 xxxx}
]

c1 gets p0 and p2 as its myPartitions (to claim).

As a result, c0 tries to claim p0 and p3 while c1 tries to claim p0 and p2.

  • c0 and c1 both fight for p0, and c0 wins (because it tries to claim it first).
  • But no one tries to claim p1.

In utils.go, the partitionLeaders are sorted by leader first.

func (pls partitionLeaders) Less(i, j int) bool {
	return pls[i].leader < pls[j].leader || (pls[i].leader == pls[j].leader && pls[i].id < pls[j].id)
}

When a leader changes between two calls of dividePartitionsBetweenConsumers(), the sort order changes. I believe the root cause is here.
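The scenario above can be reproduced with a simplified model. This is a sketch, not the library's code: the partition type and the assign function are stand-ins, and the contiguous-chunk assignment is assumed from the description. It also shows one possible remedy, sorting by partition id only, which keeps the order independent of leader changes; this is a suggestion rather than a confirmed fix.

```go
package main

import (
	"fmt"
	"sort"
)

// partition is a simplified stand-in for the library's partitionLeader type.
type partition struct {
	id     int
	leader int
}

// assign sorts parts with less, then gives consumer idx (0-based) its
// contiguous share of ceil(len(parts)/consumers) partitions.
func assign(parts []partition, consumers, idx int, less func(a, b partition) bool) []int {
	sorted := append([]partition(nil), parts...)
	sort.Slice(sorted, func(i, j int) bool { return less(sorted[i], sorted[j]) })

	n := (len(sorted) + consumers - 1) / consumers
	start, end := idx*n, idx*n+n
	if start > len(sorted) {
		start = len(sorted)
	}
	if end > len(sorted) {
		end = len(sorted)
	}

	ids := make([]int, 0, n)
	for _, p := range sorted[start:end] {
		ids = append(ids, p.id)
	}
	return ids
}

// byLeaderThenID mirrors the ordering quoted from utils.go.
func byLeaderThenID(a, b partition) bool {
	return a.leader < b.leader || (a.leader == b.leader && a.id < b.id)
}

// byID ignores the leader, so the order never changes between calls.
func byID(a, b partition) bool { return a.id < b.id }

func main() {
	before := []partition{{0, 0}, {1, 1}, {2, 2}, {3, 0}} // leaders seen by c0
	after := []partition{{0, 2}, {1, 1}, {2, 2}, {3, 0}}  // leaders seen by c1

	fmt.Println("by leader:", assign(before, 2, 0, byLeaderThenID), assign(after, 2, 1, byLeaderThenID))
	fmt.Println("by id:    ", assign(before, 2, 0, byID), assign(after, 2, 1, byID))
}
```

With the leader-first ordering, both consumers end up with p0 and nobody gets p1; with the id-only ordering, the two shares are disjoint and cover all four partitions.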

Partition re-balance issue : 'FAILED to claim the partition'

When I start a new process that joins the same consumer group, it fails with the following messages.

I think this happens because existing processes have not yet released their partitions when the new consumer is spawned.

[Sarama] 2015/05/23 14:28:43 [logstash/2b5ec6db90f5] Currently registered consumers: 3
[Sarama] 2015/05/23 14:28:43 [logstash/2b5ec6db90f5] logs :: Started topic consumer
[Sarama] 2015/05/23 14:28:43 [logstash/2b5ec6db90f5] logs :: Claiming 7 of 20 partitions
[Sarama] 2015/05/23 14:28:43 [logstash/2b5ec6db90f5] logs/0 :: FAILED to claim the partition: Cannot claim partition: it is already claimed by another instance
[Sarama] 2015/05/23 14:28:43 [logstash/2b5ec6db90f5] logs/9 :: FAILED to claim the partition: Cannot claim partition: it is already claimed by another instance
[Sarama] 2015/05/23 14:28:43 [logstash/2b5ec6db90f5] logs/6 :: FAILED to claim the partition: Cannot claim partition: it is already claimed by another instance
[Sarama] 2015/05/23 14:28:43 [logstash/2b5ec6db90f5] logs/15 :: FAILED to claim the partition: Cannot claim partition: it is already claimed by another instance
[Sarama] 2015/05/23 14:28:43 [logstash/2b5ec6db90f5] logs/3 :: FAILED to claim the partition: Cannot claim partition: it is already claimed by another instance
[Sarama] 2015/05/23 14:28:43 [logstash/2b5ec6db90f5] logs/18 :: FAILED to claim the partition: Cannot claim partition: it is already claimed by another instance

As a workaround, I have added a wait here:

   func (cg *ConsumerGroup) topicListConsumer(topics []string) {
    for {
        select {
        case <-cg.stopper:
            return
        default:
        }

Chroot support?

In our use case for Kafka consumer groups, we need to store
Zookeeper consumer state outside of the Zookeeper root, i.e.
in a Zookeeper "chroot". For example:

/kafka/consumers/%s/offsets/%s/%d

where here /kafka is the chroot. Unfortunately I don't think this is possible
currently. Would you be open to introducing support for this in this library?

The underlying Zookeeper library doesn't have Zookeeper chroot support,
but since we wrap the connection already I think we could add it at the
level of this library.

I would be happy to write the patch for this if you were interested.

I'm not positive what the API design should be, but one possibility is
adding a Chroot field to ConsumerGroupConfig and threading it
through the various Zookeeper calls. Open to other approaches
though!
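As a minimal sketch of the "threading it through" idea, assuming every Zookeeper node path is built in one place, a helper could prefix the chroot before each call. The function withChroot and the group and topic names are hypothetical and only illustrate the path handling.

```go
package main

import (
	"fmt"
	"path"
)

// withChroot prefixes a Zookeeper node path with an optional chroot.
// path.Join skips empty elements and cleans duplicate slashes, so an empty
// chroot leaves the node path unchanged.
func withChroot(chroot, node string) string {
	return path.Join("/", chroot, node)
}

func main() {
	node := fmt.Sprintf("/consumers/%s/offsets/%s/%d", "my-group", "my-topic", 0)
	fmt.Println(withChroot("/kafka", node))
	fmt.Println(withChroot("", node))
}
```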

Ref samuel/go-zookeeper#32.

Consumergroup stops consuming after ZK connection lost/timeout

I'm trying to debug an issue where it looks like

1.) The Zookeeper connection dies/times out

2015/09/14 10:39:45 read tcp 10.129.196.49:2181: i/o timeout
2015/09/14 10:39:53 read tcp 10.129.196.11:2181: i/o timeout
2015/09/14 10:40:01 read tcp 10.129.196.55:2181: i/o timeout
2015/09/14 10:40:09 read tcp 10.129.196.49:2181: i/o timeout
2015/09/14 10:49:12 read tcp 10.129.196.11:2181: i/o timeout
2015/09/14 10:49:13 read tcp 10.129.196.55:2181: i/o timeout
2015/09/14 10:49:13 Failed to set previous watches: zk: connection closed
2015/09/14 10:49:14 read tcp 10.129.196.49:2181: i/o timeout
2015/09/14 10:49:14 Failed to set previous watches: zk: connection closed
2015/09/14 10:49:14 read tcp 10.129.196.11:2181: i/o timeout
2015/09/14 10:49:14 Failed to set previous watches: zk: connection closed

2.) A rebalance does not occur (on any of the other nodes) so the partitions are left without a consumer.

I'm not sure if this is an issue with ZK or consumer group. Also I'm using consumergroup from e236a65 with my PRs added (meaning I don't have any other fixes applied - I'm not sure if my problem is solved with those fixes).

Best practice for handling offsets when consuming asynchronously?

Hi, thanks for sharing the great library!
I have the following case: the consumer processes messages asynchronously, so processing may not finish in offset order. What is the best practice for handling offsets so that they are committed correctly?

for {
        select {
        case m, ok := <- consumer.Messages():
            if ok {
                go func(m *sarama.ConsumerMessage) {
                    exportMessage := worker.Ingest(m)

                    // TODO: Commit the offsets
                }(m)
            }
        }
    }

nil message received causing NPE

The case messages <- message
here at https://github.com/wvanbergen/kafka/blob/master/consumergroup/consumer_group.go#L413
appears to succeed, but I see situations where message == nil. This causes a problem when message.Offset is accessed in the case arm. I have not tried to further characterize what makes message == nil.

I'm using Go 1.5.1 on Ubuntu 14. For my present set of tests, I'm also compiling and running with -race, although I have not run anything to rule these situations in or out.
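One way a nil can arrive, which is an assumption here since the report does not confirm the cause, is a receive from a closed channel of pointers: it succeeds immediately and yields nil. The sketch below uses a stand-in message type and a hypothetical forward function to show the two-value receive that guards against this.

```go
package main

import "fmt"

// message is a stand-in for *sarama.ConsumerMessage.
type message struct{ Offset int64 }

// forward copies messages from in to out until in is closed, skipping nil
// values. It returns the number of messages forwarded.
func forward(in <-chan *message, out chan<- *message) int {
	n := 0
	for {
		msg, ok := <-in
		if !ok {
			return n // channel closed: stop instead of dereferencing nil
		}
		if msg == nil {
			continue // defensive: never touch msg.Offset on a nil message
		}
		out <- msg
		n++
	}
}

func main() {
	in := make(chan *message, 2)
	out := make(chan *message, 2)
	in <- &message{Offset: 1}
	in <- &message{Offset: 2}
	close(in)

	fmt.Println("forwarded:", forward(in, out))
	fmt.Println("receive from closed channel is nil:", <-in == nil)
}
```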

Incompatible with latest Sarama Client

The latest Sarama client has split its consumers' output into two channels, Messages and Errors, rather than the single Events channel for both. The latest consumergroup code has not been upgraded to that interface. Any plans on updating?

Panic on "CommitUpto" after rebalance

Reproduction Steps:

  1. A worker starts a task
  2. A rebalance occurs
  3. The worker finishes the task and commits

Stack dump:

[2015-09-18T20:21:24.991Z] [GEARD] [geard.go:50] [INFO] PANIC: runtime error: invalid memory address or nil pointer dereference
/usr/src/go/src/runtime/panic.go:387 (0x41a9a8)
    gopanic: reflectcall(unsafe.Pointer(d.fn), deferArgs(d), uint32(d.siz), uint32(d.siz))
/usr/src/go/src/runtime/panic.go:42 (0x419cce)
    panicmem: panic(memoryError)
/usr/src/go/src/runtime/sigpanic_unix.go:26 (0x420734)
    sigpanic: panicmem()
/go/src/github.com/channelmeter/kafka/consumergroup/offset_manager.go:223 (0x759ef3)
    (*partitionOffsetTracker).markAsProcessed: pot.l.Lock()
/go/src/github.com/channelmeter/kafka/consumergroup/offset_manager.go:152 (0x75942c)
    (*zookeeperOffsetManager).MarkAsProcessed: return zom.offsets[topic][partition].markAsProcessed(offset)
/go/src/github.com/channelmeter/kafka/consumergroup/consumer_group.go:235 (0x755814)
    (*ConsumerGroup).CommitUpto: cg.offsetManager.MarkAsProcessed(message.Topic, message.Partition, message.Offset)
/go/src/github.com/channelmeter/geard/harvester/kafka.go:83 (0x475b75)
    (*kafkaHarvester).Ack: return rq.consumer.CommitUpto(msg)
/go/src/github.com/channelmeter/geard/geard.go:410 (0x4063e2)
    func.012: harvestQueue.Ack(harvTask)
/usr/src/go/src/runtime/asm_amd64.s:401 (0x443aa5)
    call16: CALLFN(·call16, 16)
/usr/src/go/src/runtime/panic.go:387 (0x41a9a8)
    gopanic: reflectcall(unsafe.Pointer(d.fn), deferArgs(d), uint32(d.siz), uint32(d.siz))
/usr/src/go/src/runtime/panic.go:42 (0x419cce)
    panicmem: panic(memoryError)
/usr/src/go/src/runtime/sigpanic_unix.go:26 (0x420734)
    sigpanic: panicmem()
/go/src/github.com/channelmeter/kafka/consumergroup/offset_manager.go:223 (0x759ef3)
    (*partitionOffsetTracker).markAsProcessed: pot.l.Lock()
/go/src/github.com/channelmeter/kafka/consumergroup/offset_manager.go:152 (0x75942c)
    (*zookeeperOffsetManager).MarkAsProcessed: return zom.offsets[topic][partition].markAsProcessed(offset)
/go/src/github.com/channelmeter/kafka/consumergroup/consumer_group.go:235 (0x755814)
    (*ConsumerGroup).CommitUpto: cg.offsetManager.MarkAsProcessed(message.Topic, message.Partition, message.Offset)
/go/src/github.com/channelmeter/geard/harvester/kafka.go:83 (0x475b75)
    (*kafkaHarvester).Ack: return rq.consumer.CommitUpto(msg)

I'm guessing that the consumer can no longer commit the selected message because it no longer owns the partition. What should be done in this case? It might make the most sense to drop the commit, seeing how the message will be reprocessed anyway, or to somehow let the application know that a rebalance occurred and that its in-flight messages no longer matter.
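A sketch of the "drop the commit" option, assuming the offset manager keeps a nested map of trackers like the one visible in the stack trace. The types below are simplified stand-ins and errPartitionNotOwned is a hypothetical error; the point is only that a missing tracker is detected before it is locked.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errPartitionNotOwned is a hypothetical error for commits that arrive after
// the partition has been released in a rebalance.
var errPartitionNotOwned = errors.New("partition is not owned by this instance")

// tracker is a simplified stand-in for the per-partition offset tracker.
type tracker struct {
	mu            sync.Mutex
	lastProcessed int64
}

type offsets map[string]map[int32]*tracker

// markAsProcessed updates the tracker for topic/partition, or returns
// errPartitionNotOwned when the partition is no longer tracked.
func (o offsets) markAsProcessed(topic string, partition int32, offset int64) error {
	t := o[topic][partition] // indexing a nil inner map is safe and yields nil
	if t == nil {
		return errPartitionNotOwned
	}
	t.mu.Lock()
	defer t.mu.Unlock()
	if offset > t.lastProcessed {
		t.lastProcessed = offset
	}
	return nil
}

func main() {
	o := offsets{"logs": {0: &tracker{}}}
	fmt.Println(o.markAsProcessed("logs", 0, 42)) // still owned
	fmt.Println(o.markAsProcessed("logs", 1, 42)) // released after a rebalance
}
```

This sketch does not protect the map itself against concurrent modification during a rebalance; a real implementation would also need a lock around the lookup.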

Kafkaconsumer stops consuming after ZK connection lost/timeout

Using branch refactor (#72) and after applying PR @ samuel/go-zookeeper#84 to my code tree, my kafkaconsumer still fails to consume data when an I/O timeout occurs on the connection to Zookeeper.

To reproduce you can prevent the consumer from accessing zookeeper for a few seconds:

root@root# iptables -A  OUTPUT -p tcp -m tcp --dport 2181 -j DROP # Add rule to block outgoing traffic to zookeeper
... Wait a few seconds...
root@root# iptables -D  OUTPUT -p tcp -m tcp --dport 2181 -j DROP # Remove rule

Log output (I changed consumerManager.run() so it runs every 10s):

15:19:51 Recv loop terminated: err=read tcp 10.10.12.62:2181: i/o timeout
15:19:51 Send loop terminated: err=<nil>
15:19:51 [instance=f8565f67ec4a] Failed to watch subscription: zk: connection closed. Trying again in 1 second...
15:19:53 Failed to connect to 10.10.12.62:2181: dial tcp 10.10.12.62:2181: i/o timeout
15:19:53 [instance=f8565f67ec4a] Failed to watch subscription: zk: could not connect to a server. Trying again in 1 second...
..
15:20:26 Connected to 10.10.12.62:2181
15:20:26 Authenticated: id=94641789739226871, timeout=4000
15:20:26 [instance=f8565f67ec4a] Currently, 0 instances are registered, to consume 2 partitions in total.
15:20:26 [instance=f8565f67ec4a] This instance is assigned to consume 0 partitions, and is currently consuming 2 partitions.
[Sarama] 2015/10/08 15:20:26 consumer/broker/0 closed dead subscription to rqueue.out.bs_msg_in/0
[Sarama] 2015/10/08 15:20:26 consumer/broker/0 closed dead subscription to rqueue.out.bs_msg_in/1
15:20:26 [instance=f8565f67ec4a partition=rqueue.out.bs_msg_in/0] Offset 579 has been processed. Continuing shutdown...
15:20:26 [instance=f8565f67ec4a partition=rqueue.out.bs_msg_in/1] FAILED to release partition: Cannot release partition: it is not claimed by this instance
15:20:26 [instance=f8565f67ec4a partition=rqueue.out.bs_msg_in/0] FAILED to release partition: Cannot release partition: it is not claimed by this instance
15:20:36 [instance=f8565f67ec4a] Currently, 0 instances are registered, to consume 2 partitions in total.
15:20:36 [instance=f8565f67ec4a] This instance is assigned to consume 0 partitions, and is currently consuming 0 partitions.
15:20:46 [instance=f8565f67ec4a] Currently, 0 instances are registered, to consume 2 partitions in total.
15:20:46 [instance=f8565f67ec4a] This instance is assigned to consume 0 partitions, and is currently consuming 0 partitions.

Consumer group not consuming from partitions -- Kafka v11

Hey,

I just wanted to raise this issue for awareness' sake. We use the latest version of Kafka, and your library has worked excellently for us with past versions of Kafka. With the newer version, sometimes messages are not consumed from a partition even though the partition has an owner. I tried restarting the program, assuming I was blocking on some error channel, but to no avail. Would you say this library is compatible with newer versions of Kafka, given that Sarama has no support for 0.11?

zk: node does not exist

I tried the example against a working Kafka cluster, but it failed. The topic and Zookeeper configurations are correct.

-group="click1": The name of the consumer group, used for coordination and load balancing
-topics="bjcnc_nginx_nginx_newmysql0": The comma-separated list of topics to consume
-zookeeper="10.16.13.133:2181,10.16.13.134:2181,10.16.13.135:2181,10.16.13.136:2181,10.16.13.137:2181/realtime-monitor-newmysql": A comma-separated Zookeeper connection string (e.g. zookeeper1.local:2181,zookeeper2.local:2181,zookeeper3.local:2181)
2016/10/22 17:30:38 Before JoinConsuerGroup, consumerGroup: click1 , nodes: [10.16.13.133:2181 10.16.13.134:2181 10.16.13.135:2181 10.16.13.136:2181 10.16.13.137:2181] , root: /realtime-monitor-newmysql
2016/10/22 17:30:38 Connected to 10.16.13.137:2181
2016/10/22 17:30:38 Authenticated: id=384865011391368701, timeout=4000
2016/10/22 17:30:38 Re-submitting 0 credentials after reconnect
2016/10/22 17:30:38 Recv loop terminated: err=EOF
2016/10/22 17:30:38 zk: node does not exist

Can't receive messages when I test the demo that the project provides.

Here's my code.
package main

import (
	"flag"
	"log"
	"os"
	"os/signal"
	"strings"
	"time"

	"fmt"
	"github.com/Shopify/sarama"
	"github.com/wvanbergen/kafka/consumergroup"
	"github.com/wvanbergen/kazoo-go"
)

const (
	DefaultKafkaTopics   = "kafka_performance_test"
	DefaultConsumerGroup = "testsarama1"
	MAX_COUNT            = 1000000
)

var (
	consumerGroup  = flag.String("group", DefaultConsumerGroup, "The name of the consumer group, used for coordination and load balancing")
	kafkaTopicsCSV = flag.String("topics", DefaultKafkaTopics, "The comma-separated list of topics to consume")
	zookeeper      = flag.String("zookeeper", "openlive-kafka-online001-bjlt.qiyi.virtual:2181,openlive-kafka-online002-bjlt.qiyi.virtual:2181,openlive-kafka-online003-bjlt.qiyi.virtual:2181,openlive-kafka-online004-bjlt.qiyi.virtual:2181,openlive-kafka-online005-bjlt.qiyi.virtual:2181", "A comma-separated Zookeeper connection string (e.g. zookeeper1.local:2181,zookeeper2.local:2181,zookeeper3.local:2181)")
	zookeeperNodes []string
)

func init() {
	sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)
}

func main() {
	flag.Parse()

	if *zookeeper == "" {
		flag.PrintDefaults()
		os.Exit(1)
	}

	config := consumergroup.NewConfig()
	config.Offsets.Initial = sarama.OffsetNewest
	config.Offsets.ProcessingTimeout = 10 * time.Second

	zookeeperNodes, config.Zookeeper.Chroot = kazoo.ParseConnectionString(*zookeeper)

	kafkaTopics := strings.Split(*kafkaTopicsCSV, ",")

	consumer, consumerErr := consumergroup.JoinConsumerGroup(*consumerGroup, kafkaTopics, zookeeperNodes, config)
	if consumerErr != nil {
		log.Fatalln(consumerErr)
	}

	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	go func() {
		<-c
		if err := consumer.Close(); err != nil {
			sarama.Logger.Println("Error closing the consumer", err)
		}
	}()

	go func() {
		for err := range consumer.Errors() {
			log.Println(err)
		}
	}()

	eventCount := 0
	offsets := make(map[string]map[int32]int64)
	start_time := time.Now().UnixNano() / 1000000

	fmt.Println(len(consumer.Messages()))

	for message := range consumer.Messages() {

		fmt.Println(string(message.Value))
		if offsets[message.Topic] == nil {
			offsets[message.Topic] = make(map[int32]int64)
		}

		if eventCount > MAX_COUNT {
			break
		}

		fmt.Println("eventCount:", eventCount)
		eventCount += 1

		if offsets[message.Topic][message.Partition] != 0 && offsets[message.Topic][message.Partition] != message.Offset-1 {
			log.Printf("Unexpected offset on %s:%d. Expected %d, found %d, diff %d.\n", message.Topic, message.Partition, offsets[message.Topic][message.Partition]+1, message.Offset, message.Offset-offsets[message.Topic][message.Partition]+1)
		}

		// Simulate processing time
		// time.Sleep(10 * time.Millisecond)

		offsets[message.Topic][message.Partition] = message.Offset
		consumer.CommitUpto(message)
	}

	fmt.Println("total_count: ", eventCount)
	end_time := time.Now().UnixNano() / 1000000
	process_time := end_time - start_time
	fmt.Println("process_time: ", process_time)

}

And my output follows:

2018/01/09 17:15:05 Connected to 10.13.44.23:2181
2018/01/09 17:15:05 Authenticated: id=170991064662003625, timeout=6000
[Sarama] 2018/01/09 17:15:05 Initializing new client
[Sarama] 2018/01/09 17:15:05 client/metadata fetching metadata for all topics from broker openlive-kafka-online005-bjlt.qiyi.virtual:9092
[Sarama] 2018/01/09 17:15:05 Connected to broker at openlive-kafka-online005-bjlt.qiyi.virtual:9092 (unregistered)
[Sarama] 2018/01/09 17:15:05 client/brokers registered new broker #5 at openlive-kafka-online005-bjlt.qiyi.virtual:9092
[Sarama] 2018/01/09 17:15:05 client/brokers registered new broker #10 at openlive-kafka-online014-bjlt.qiyi.virtual:9092
[Sarama] 2018/01/09 17:15:05 client/brokers registered new broker #1 at openlive-kafka-online001-bjlt.qiyi.virtual:9092
[Sarama] 2018/01/09 17:15:05 client/brokers registered new broker #6 at openlive-kafka-online010-bjlt.qiyi.virtual:9092
[Sarama] 2018/01/09 17:15:05 client/brokers registered new broker #9 at openlive-kafka-online013-bjlt.qiyi.virtual:9092
[Sarama] 2018/01/09 17:15:05 client/brokers registered new broker #2 at openlive-kafka-online002-bjlt.qiyi.virtual:9092
[Sarama] 2018/01/09 17:15:05 client/brokers registered new broker #7 at openlive-kafka-online011-bjlt.qiyi.virtual:9092
[Sarama] 2018/01/09 17:15:05 client/brokers registered new broker #3 at openlive-kafka-online003-bjlt.qiyi.virtual:9092
[Sarama] 2018/01/09 17:15:05 client/brokers registered new broker #8 at openlive-kafka-online012-bjlt.qiyi.virtual:9092
[Sarama] 2018/01/09 17:15:05 client/brokers registered new broker #4 at openlive-kafka-online004-bjlt.qiyi.virtual:9092
[Sarama] 2018/01/09 17:15:05 Successfully initialized new client
[Sarama] 2018/01/09 17:15:05 [testsarama1/d2a251a23aba] Consumer instance registered (hcdn-others-worker-dev100-bjlt.qiyi.virtual:90c7388b-c001-45bc-8dc9-d2a251a23aba).
len(consumer.Messages()): 0
[Sarama] 2018/01/09 17:15:05 [testsarama1/d2a251a23aba] Currently registered consumers: 1
[Sarama] 2018/01/09 17:15:05 [testsarama1/d2a251a23aba] kafka_performance_test :: Started topic consumer
[Sarama] 2018/01/09 17:15:05 [testsarama1/d2a251a23aba] kafka_performance_test :: Claiming 10 of 10 partitions
[Sarama] 2018/01/09 17:15:06 [testsarama1/d2a251a23aba] kafka_performance_test/1 :: Partition consumer listening for new messages only.
[Sarama] 2018/01/09 17:15:06 [testsarama1/d2a251a23aba] kafka_performance_test/6 :: Partition consumer listening for new messages only.
[Sarama] 2018/01/09 17:15:06 Connected to broker at openlive-kafka-online005-bjlt.qiyi.virtual:9092 (registered as #5)
[Sarama] 2018/01/09 17:15:06 [testsarama1/d2a251a23aba] kafka_performance_test/0 :: Partition consumer listening for new messages only.
[Sarama] 2018/01/09 17:15:06 [testsarama1/d2a251a23aba] kafka_performance_test/3 :: Partition consumer listening for new messages only.
[Sarama] 2018/01/09 17:15:06 Connected to broker at openlive-kafka-online014-bjlt.qiyi.virtual:9092 (registered as #10)
[Sarama] 2018/01/09 17:15:06 Connected to broker at openlive-kafka-online004-bjlt.qiyi.virtual:9092 (registered as #4)
[Sarama] 2018/01/09 17:15:06 [testsarama1/d2a251a23aba] kafka_performance_test/8 :: Partition consumer listening for new messages only.
[Sarama] 2018/01/09 17:15:06 consumer/broker/5 added subscription to kafka_performance_test/1
[Sarama] 2018/01/09 17:15:06 [testsarama1/d2a251a23aba] kafka_performance_test/9 :: Partition consumer listening for new messages only.
[Sarama] 2018/01/09 17:15:06 Connected to broker at openlive-kafka-online002-bjlt.qiyi.virtual:9092 (registered as #2)
[Sarama] 2018/01/09 17:15:06 Connected to broker at openlive-kafka-online011-bjlt.qiyi.virtual:9092 (registered as #7)
[Sarama] 2018/01/09 17:15:06 consumer/broker/10 added subscription to kafka_performance_test/6
[Sarama] 2018/01/09 17:15:06 consumer/broker/4 added subscription to kafka_performance_test/0
[Sarama] 2018/01/09 17:15:06 Connected to broker at openlive-kafka-online003-bjlt.qiyi.virtual:9092 (registered as #3)
[Sarama] 2018/01/09 17:15:06 [testsarama1/d2a251a23aba] kafka_performance_test/5 :: Partition consumer listening for new messages only.
[Sarama] 2018/01/09 17:15:06 consumer/broker/2 added subscription to kafka_performance_test/8
[Sarama] 2018/01/09 17:15:06 [testsarama1/d2a251a23aba] kafka_performance_test/4 :: Partition consumer listening for new messages only.
[Sarama] 2018/01/09 17:15:06 consumer/broker/7 added subscription to kafka_performance_test/3
[Sarama] 2018/01/09 17:15:06 [testsarama1/d2a251a23aba] kafka_performance_test/7 :: Partition consumer listening for new messages only.
[Sarama] 2018/01/09 17:15:06 Connected to broker at openlive-kafka-online013-bjlt.qiyi.virtual:9092 (registered as #9)
[Sarama] 2018/01/09 17:15:06 consumer/broker/3 added subscription to kafka_performance_test/9
[Sarama] 2018/01/09 17:15:06 [testsarama1/d2a251a23aba] kafka_performance_test/2 :: Partition consumer listening for new messages only.
[Sarama] 2018/01/09 17:15:06 Connected to broker at openlive-kafka-online001-bjlt.qiyi.virtual:9092 (registered as #1)
[Sarama] 2018/01/09 17:15:06 Connected to broker at openlive-kafka-online012-bjlt.qiyi.virtual:9092 (registered as #8)
[Sarama] 2018/01/09 17:15:06 consumer/broker/9 added subscription to kafka_performance_test/5
[Sarama] 2018/01/09 17:15:06 Connected to broker at openlive-kafka-online010-bjlt.qiyi.virtual:9092 (registered as #6)
[Sarama] 2018/01/09 17:15:06 consumer/broker/1 added subscription to kafka_performance_test/7
[Sarama] 2018/01/09 17:15:06 consumer/broker/8 added subscription to kafka_performance_test/4
[Sarama] 2018/01/09 17:15:06 consumer/broker/6 added subscription to kafka_performance_test/2

As you can see, I received nothing from Kafka, and the length of consumer.Messages() is 0. I don't know where my mistake is. Please help me out. Thanks!

I also found that if I switch to a new consumer group, I run into the following error, and I don't know how to fix it.

[Sarama] 2018/01/09 17:39:28 [testsarama3/77ab52d3afbb] Consumergroup testsarama3 does not yet exists, creating...
[Sarama] 2018/01/09 17:39:28 [testsarama3/77ab52d3afbb] Consumer instance registered (hcdn-others-worker-dev100-bjlt.qiyi.virtual:0cfbc8c2-5203-4d3d-8283-77ab52d3afbb).
len(consumer.Messages()): 0
[Sarama] 2018/01/09 17:39:28 [testsarama3/77ab52d3afbb] Currently registered consumers: 1
[Sarama] 2018/01/09 17:39:28 [testsarama3/77ab52d3afbb] kafka_performance_test :: Started topic consumer
[Sarama] 2018/01/09 17:39:28 [testsarama3/77ab52d3afbb] kafka_performance_test :: Claiming 10 of 10 partitions
[Sarama] 2018/01/09 17:39:29 [testsarama3/77ab52d3afbb] kafka_performance_test/5 :: FAILED to claim partition on attempt 1 of 12; retrying in 1 second. Error: zk: node already exists
[Sarama] 2018/01/09 17:39:29 [testsarama3/77ab52d3afbb] kafka_performance_test/3 :: FAILED to claim partition on attempt 1 of 12; retrying in 1 second. Error: zk: node already exists
[Sarama] 2018/01/09 17:39:29 [testsarama3/77ab52d3afbb] kafka_performance_test/7 :: FAILED to claim partition on attempt 1 of 12; retrying in 1 second. Error: zk: node already exists
[Sarama] 2018/01/09 17:39:29 [testsarama3/77ab52d3afbb] kafka_performance_test/6 :: FAILED to claim partition on attempt 1 of 12; retrying in 1 second. Error: zk: node already exists
[Sarama] 2018/01/09 17:39:29 [testsarama3/77ab52d3afbb] kafka_performance_test/2 :: FAILED to claim partition on attempt 1 of 12; retrying in 1 second. Error: zk: node already exists
[Sarama] 2018/01/09 17:39:29 [testsarama3/77ab52d3afbb] kafka_performance_test/8 :: FAILED to claim partition on attempt 1 of 12; retrying in 1 second. Error: zk: node already exists
[Sarama] 2018/01/09 17:39:29 [testsarama3/77ab52d3afbb] kafka_performance_test/1 :: FAILED to claim partition on attempt 1 of 12; retrying in 1 second. Error: zk: node already exists
[Sarama] 2018/01/09 17:39:29 [testsarama3/77ab52d3afbb] kafka_performance_test/9 :: FAILED to claim partition on attempt 1 of 12; retrying in 1 second. Error: zk: node already exists
[Sarama] 2018/01/09 17:39:29 [testsarama3/77ab52d3afbb] kafka_performance_test/4 :: FAILED to claim partition on attempt 1 of 12; retrying in 1 second. Error: zk: node already exists

JoinConsumerGroup silently fails if topics are not created

The issue I'm running into is on our local development boxes: I'd like to handle the situation where the partitions are not yet set up for a Kafka topic:

[Sarama] 2015/03/12 18:11:25 [consumer/a0103be9d1f2] Currently registered consumers: 1
[Sarama] 2015/03/12 18:11:25 [consumer/a0103be9d1f2] session :: Started topic consumer
[Sarama] 2015/03/12 18:11:25 [consumer/a0103be9d1f2] session :: FAILED to get list of partitions: zk: node does not exist

I believe this says that the consumer is being registered, but ZooKeeper doesn't know about the partitions for the topic. Unfortunately, the library seems to print this message to the stdout log and continue. No messages appear in any error channel, and no error comes back, because this happens inside a goroutine.

It would be great to detect if this scenario has occurred, and be able to handle the case where my consumers come up before topics and partitions are created (for local development with docker in particular).
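
One way an application could cope with this today is to wait for the topic's partitions to appear before joining the group. The sketch below is only an illustration: `listFunc`, `waitForPartitions`, and `errNoPartitions` are hypothetical names, and the partition lookup is simulated rather than being a real ZooKeeper call.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errNoPartitions is a hypothetical sentinel for "topic exists but is empty".
var errNoPartitions = errors.New("topic has no partitions yet")

// listFunc stands in for whatever call returns the partitions of a topic.
type listFunc func() ([]int32, error)

// waitForPartitions polls list until it returns at least one partition
// or the number of attempts is exhausted.
func waitForPartitions(list listFunc, attempts int, delay time.Duration) ([]int32, error) {
	lastErr := errors.New("no attempts made")
	for i := 0; i < attempts; i++ {
		parts, err := list()
		if err == nil && len(parts) > 0 {
			return parts, nil
		}
		if err == nil {
			err = errNoPartitions
		}
		lastErr = err
		time.Sleep(delay)
	}
	return nil, fmt.Errorf("topic not ready after %d attempts: %w", attempts, lastErr)
}

func main() {
	calls := 0
	// Simulated lookup that fails twice before the topic "appears".
	list := func() ([]int32, error) {
		calls++
		if calls < 3 {
			return nil, errors.New("zk: node does not exist")
		}
		return []int32{0, 1}, nil
	}
	parts, err := waitForPartitions(list, 5, 10*time.Millisecond)
	fmt.Println(parts, err)
}
```

With this kind of guard, a consumer that starts before its topics exist fails loudly after a bounded wait instead of sitting idle.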

Partitions are split evenly among consumers / some consumers left out

Currently, the library attempts to split partitions among consumers completely evenly (each consumer gets the same number of partitions). For example, if you have 32 partitions and 9 consumers, 8 consumers are assigned 4 partitions each, and one is left with none. Is that intentional? It seems like a pretty easy fix.
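
For illustration, a division that leaves no consumer idle could hand out the remainder one partition at a time. This is a minimal sketch, not the library's code: `dividePartitions` is a hypothetical helper, and it assumes every consumer sees the same ordered list of partitions and consumers.

```go
package main

import "fmt"

// dividePartitions is a hypothetical helper. Every consumer gets
// len(partitions)/len(consumers) partitions, and the first
// len(partitions)%len(consumers) consumers get one extra.
func dividePartitions(partitions []int32, consumers []string) map[string][]int32 {
	result := make(map[string][]int32, len(consumers))
	if len(consumers) == 0 {
		return result
	}
	base := len(partitions) / len(consumers)
	extra := len(partitions) % len(consumers)
	next := 0
	for i, c := range consumers {
		n := base
		if i < extra {
			n++
		}
		result[c] = partitions[next : next+n]
		next += n
	}
	return result
}

func main() {
	partitions := make([]int32, 32)
	for i := range partitions {
		partitions[i] = int32(i)
	}
	consumers := []string{"c0", "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8"}
	assignment := dividePartitions(partitions, consumers)
	for _, c := range consumers {
		fmt.Printf("%s: %d partitions\n", c, len(assignment[c]))
	}
}
```

With 32 partitions and 9 consumers, five consumers receive 4 partitions and four receive 3, so all 32 partitions are covered and nobody is left out.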

Race condition in partition rebalance.

(Moved from #61)

Actually I was looking into this because I was having an issue where 2 of my nodes would stop accepting requests. I think this might be related - when my 9th node comes up one node gives up all its partitions, and another node tries to claim those partitions and fails:

It looks like this might be a data race?
Node A tries to grab 16, 17, 18, 19 and fails.

[Sarama] 2015/07/05 02:10:35 [geard/bacc9b9f50bb] Triggering rebalance due to consumer list change
[Sarama] 2015/07/05 02:10:35 [geard/bacc9b9f50bb] geard-user/14 :: Stopping partition consumer at offset -1
[Sarama] 2015/07/05 02:10:35 [geard/bacc9b9f50bb] geard-user/15 :: Stopping partition consumer at offset -1
[Sarama] 2015/07/05 02:10:35 [geard/bacc9b9f50bb] geard-user/12 :: Stopping partition consumer at offset 44
[Sarama] 2015/07/05 02:10:35 [geard/bacc9b9f50bb] geard-user/13 :: Stopping partition consumer at offset 43
[Sarama] 2015/07/05 02:10:35 consumer/broker/40770 closed dead subscription to geard-user/13
[Sarama] 2015/07/05 02:10:35 consumer/broker/40770 closed dead subscription to geard-user/14
[Sarama] 2015/07/05 02:10:35 consumer/broker/40770 closed dead subscription to geard-user/15
[Sarama] 2015/07/05 02:10:35 consumer/broker/40770 closed dead subscription to geard-user/12
[Sarama] 2015/07/05 02:10:35 [geard/bacc9b9f50bb] geard-user :: Stopped topic consumer
[Sarama] 2015/07/05 02:10:35 [geard/bacc9b9f50bb] Currently registered consumers: 9
[Sarama] 2015/07/05 02:10:35 [geard/bacc9b9f50bb] geard-user :: Started topic consumer
[Sarama] 2015/07/05 02:10:35 [geard/bacc9b9f50bb] geard-user :: Claiming 4 of 32 partitions
[Sarama] 2015/07/05 02:10:36 [geard/bacc9b9f50bb] geard-user/16 :: FAILED to claim the partition: Cannot claim partition: it is already claimed by another instance
[Sarama] 2015/07/05 02:10:36 [geard/bacc9b9f50bb] geard-user/17 :: FAILED to claim the partition: Cannot claim partition: it is already claimed by another instance
[Sarama] 2015/07/05 02:10:36 [geard/bacc9b9f50bb] geard-user/18 :: FAILED to claim the partition: Cannot claim partition: it is already claimed by another instance
[Sarama] 2015/07/05 02:10:36 [geard/bacc9b9f50bb] geard-user/19 :: FAILED to claim the partition: Cannot claim partition: it is already claimed by another instance
[Sarama] 2015/07/05 02:10:36 [geard/bacc9b9f50bb] geard-user :: Stopped topic consumer
[Sarama] 2015/07/05 02:18:46 client/metadata fetching metadata for all topics from broker 10.129.196.48:9092

Node B lets go of 16, 17, 18, 19, possibly after Node A tries to acquire them.

[Sarama] 2015/07/05 02:10:35 [geard/31c73a8faa4c] Triggering rebalance due to consumer list change
[Sarama] 2015/07/05 02:10:35 [geard/31c73a8faa4c] geard-user/16 :: Stopping partition consumer at offset -1
[Sarama] 2015/07/05 02:10:35 [geard/31c73a8faa4c] geard-user/17 :: Stopping partition consumer at offset -1
[Sarama] 2015/07/05 02:10:35 [geard/31c73a8faa4c] geard-user/18 :: Stopping partition consumer at offset -1
[Sarama] 2015/07/05 02:10:35 [geard/31c73a8faa4c] geard-user/19 :: Stopping partition consumer at offset 44
[Sarama] 2015/07/05 02:10:36 consumer/broker/40770 closed dead subscription to geard-user/18
[Sarama] 2015/07/05 02:10:36 consumer/broker/40770 closed dead subscription to geard-user/19
[Sarama] 2015/07/05 02:10:36 consumer/broker/40770 closed dead subscription to geard-user/16
[Sarama] 2015/07/05 02:10:36 consumer/broker/40770 closed dead subscription to geard-user/17
[Sarama] 2015/07/05 02:10:36 [geard/31c73a8faa4c] geard-user :: Stopped topic consumer
[Sarama] 2015/07/05 02:10:36 [geard/31c73a8faa4c] Currently registered consumers: 9
[Sarama] 2015/07/05 02:10:36 [geard/31c73a8faa4c] geard-user :: Started topic consumer
[Sarama] 2015/07/05 02:10:36 [geard/31c73a8faa4c] geard-user :: Claiming 4 of 32 partitions
[Sarama] 2015/07/05 02:10:36 [geard/31c73a8faa4c] geard-user/20 :: Partition consumer starting at offset 37.
[Sarama] 2015/07/05 02:10:36 [geard/31c73a8faa4c] geard-user/21 :: Partition consumer starting at offset 50.
[Sarama] 2015/07/05 02:10:36 consumer/broker/40770 added subscription to geard-user/20
[Sarama] 2015/07/05 02:10:36 [geard/31c73a8faa4c] geard-user/22 :: Partition consumer starting at offset 57.
[Sarama] 2015/07/05 02:10:36 [geard/31c73a8faa4c] geard-user/23 :: Partition consumer starting at offset 38.
[Sarama] 2015/07/05 02:10:36 consumer/broker/40770 added subscription to geard-user/21
[Sarama] 2015/07/05 02:10:37 consumer/broker/40770 added subscription to geard-user/23
[Sarama] 2015/07/05 02:10:37 consumer/broker/40770 added subscription to geard-user/22
[Sarama] 2015/07/05 02:18:42 client/metadata fetching metadata for all topics from broker 10.129.196.48:9092

It looks like the naive thing to do would be to possibly sleep for a second in topicListConsumer() - however using something other than Sleep to solve this race condition might be better - unfortunately I don't yet have a great understanding of how consumergroups work.

Or, retry claiming a set number of times?

consumption message delay

Hi all,
I created a consumer group to consume some topics and printed the offset for each topic. I found that consumption of some topics is delayed.
Meanwhile, a consumer I created with "github.com/Shopify/sarama" consumes the same topics normally.
Can anyone help me fix this issue?

tips:

**normal consumption source code**

kafka.go.zip

**consumption delay source code**
kafka_0.8.2_group.go.zip
delay_log.txt

Try to FixBug: Two consumers compete partitions on zkpath: $kafka_root/consumers/$group/owners/$topic/{$partid1; $partid2...}

Hi

I have run into a bug: I have 32 partitions in one topic, and two consumers using the same group on this topic. When consumer A starts, it consumes all 32 partitions, as expected. But when the second consumer B starts, B tries to claim 16 partitions that consumer A has not released yet.

So I added some retry code as a workaround while waiting for the author's official fix. Anyone who has hit this bug can use this code temporarily.

In github.com/wvanbergen/kafka/consumergroup/consumer_group.go, line 339, function partitionConsumer:

change code from:

    err := cg.instance.ClaimPartition(topic, partition)

to

    retrySec := 10
    var err error
    for i := 0; i < retrySec; i++ {
            err = cg.instance.ClaimPartition(topic, partition)
            if err == nil {
                    break
            }
            // Claim failed; log the error and retry after one second.
            cg.Logf("%s/%d :: Retry to claim the partition: %s\n", topic, partition, err)
            time.Sleep(1 * time.Second)
    }

Thanks.

ConsumerGroup.FlushOffsets()

Example: cg, _ := consumergroup.JoinConsumerGroup("convert", kafkaTopics, Conf.Kafka.Consumer.ZKAddrs, config)
Within the same partition:
For the message at offset 187, I did not call cg.CommitUpto(msg) and cg.FlushOffsets().
For the message at offset 188, I did call cg.CommitUpto(msg) and cg.FlushOffsets().
For the message at offset 189, I did not call them.
Then I restarted my consumer group. cg.Messages() delivers 189 again, but 187 is not delivered again. I am confused by this.
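
The behaviour described is what one would expect if the committed position is a single number per partition, as the name CommitUpto suggests: committing 188 implicitly covers every earlier offset, including 187. The sketch below only models that assumption. `tracker`, `commitUpto`, and `resumeFrom` are hypothetical names, not the library's implementation.

```go
package main

import "fmt"

// tracker is a hypothetical stand-in for per-partition offset bookkeeping.
type tracker struct {
	committed int64 // highest offset marked as processed; -1 means none
}

// commitUpto records offset as processed and, implicitly, everything before it.
func (t *tracker) commitUpto(offset int64) {
	if offset > t.committed {
		t.committed = offset
	}
}

// resumeFrom returns the first offset that would be delivered after a restart.
func (t *tracker) resumeFrom() int64 {
	return t.committed + 1
}

func main() {
	tr := &tracker{committed: -1}
	// 187 is received but never committed; 188 is committed; 189 is not.
	tr.commitUpto(188)
	fmt.Println("resume from:", tr.resumeFrom()) // prints "resume from: 189"
}
```

Under this model there is no way to express "188 is done but 187 is not"; an application that needs that has to delay the commit of 188 until 187 has been handled.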

ConsumerGroup.Messages()

I create many ConsumerGroups with JoinConsumerGroup and fetch messages from Kafka like this:

    for event := range consumer.Messages() {
        // Process event
        log.Println(string(event.Value))
        eventCount += 1

        // Ack event
        consumer.CommitUpto(event)
    }

When I run my program, consumer.Messages() returns nil. My code follows:

`type KafkaConsumer struct {
Topic string
Kafka //kafka configure
GroupConsumer *consumergroup.ConsumerGroup
Msgs chan *sarama.ConsumerMessage // exposed for use by external code
Exit chan bool
}

func (k Kafka) NewConsumer(topic string) (*KafkaConsumer, error) {

config := consumergroup.NewConfig()
config.Offsets.Initial = Convert(k.Where)
config.Offsets.ProcessingTimeout = 10 * time.Second
var zookeeperNodes []string
zookeeperNodes, config.Zookeeper.Chroot = kazoo.ParseConnectionString(topic)
k.Alltopic = append(k.Alltopic, topic)

groupconsumer, err := consumergroup.JoinConsumerGroup(k.Groupname, zookeeperNodes, k.Zookeeper, config)
if err != nil {
	return nil, err
}

msgchan := make(chan *sarama.ConsumerMessage, 30)

ret := &KafkaConsumer{Kafka: k, GroupConsumer: groupconsumer,
	Msgs: msgchan, Exit: make(chan bool, 1), Topic: topic}
log.Printf("new a zookeeper consumer, info:%s, topic:%s, groupconsumer:%+v\n", k.String(), topic, ret)
return ret, nil

}

func (k *KafkaConsumer) Close() error {
if k.GroupConsumer != nil {
if err := k.GroupConsumer.Close(); err != nil {
log.Printf("stop a zookeeper consumer fail. err:%s hostinfo:%s\n",
err.Error(), k.String())
return err
} else {
log.Printf("stop a zookeeper consumer success, hostinfo:%s\n", k.String())
}
}
k.Exit <- true
return nil
}

func (k *KafkaConsumer) String() string {
return fmt.Sprintf("%s input_topic:%s, groupconsumer:%+v, msgchan=%+v", k.Kafka.String(),
k.Topic, k.GroupConsumer, k.Msgs)
}

func (k *KafkaConsumer) Dispatcher(mylog *Log) {
ticker := time.NewTicker(time.Second * 10)
log.Printf("topic:%s start a dispatcher\n", k.Topic)
for {
select {
case msg := <-k.GroupConsumer.Messages():
if msg != nil && msg.Value != nil && len(msg.Value) > 0 {
k.Msgs <- msg
k.GroupConsumer.CommitUpto(msg)
mylog.Log.Debugf("conumser_topic:%+v, msg_topic:%+v, msg_len=%d, send to recevie chan",
k.Alltopic, msg.Topic, len(msg.Value))
} else {
mylog.Log.Debugf("conumser_topic:%+v, msg_topic:%+v,error format msg:%+v",
k.Alltopic, k.Topic, msg)
}
case err := <-k.GroupConsumer.Errors():
mylog.Log.Debugf("conumser_topic:%+v, msg_topic:%+v, fetch msg err,%+v", k.Alltopic, k.Topic, err)
case <-k.Exit:
log.Printf("conumser_topic:%+v, msg_topic:%+v, dispatcher exit\n", k.Alltopic, k.Topic)
ticker.Stop()
return
case <-ticker.C:
mylog.Log.Debugf("conumser_topic:%+v, msg_topic:%+v, dispatcher ticker 10s, Consumer:%+v, chan_len:%d",
k.Alltopic, k.Topic, k, len(k.Msgs))
}
}
}`

How can I fix it?
Best wishes.

Expose a way to set offset manager config

While there are public methods for initializing the offset manager and its config, JoinConsumerGroup() does not make it possible to set this. It sets internally to a private member, offsetManager. I don't see a way to set VerboseLogging in the consumer group API.

Upgrading to kafka based offset storage

I am planning to upgrade sarama clients from 0.8 to latest version and use sarama-cluster. I see that offset will be committed to kafka now.

On Apache Kafka page it says that for doing migration from zk based storage to kafka you need to do following:
Set offsets.storage=kafka and dual.commit.enabled=true in your consumer config.

I cannot find these properties anywhere? Does anyone know if we need to set it and where?

some partitions have no owner

Hi,
I find a problem that when I restart my consumers, a partition has no owner.

The consumer group triggers a rebalance when I stop a consumer. The timeout in finalizePartition is cg.config.Offsets.ProcessingTimeout, and the retry limit during rebalancing is also cg.config.Offsets.ProcessingTimeout.

Before a consumer stops, it has to finish the processing it was doing and then run finalizePartition. This can take longer than cg.config.Offsets.ProcessingTimeout, so rebalancing may fail and leave some partitions without an owner.

To solve this problem, maybe we can add a goroutine that watches each partition's owner. It could also help when the number of partitions changes, for example after Kafka broker capacity expansion.
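
A rough sketch of such a watcher is given here, under the assumption that the application can obtain a snapshot of partitions and their owners. `findOrphans`, `watchOwners`, and the `snapshot` and `report` callbacks are hypothetical; a real version would read the owner nodes from ZooKeeper and trigger a claim.

```go
package main

import (
	"fmt"
	"time"
)

// findOrphans returns the partitions that have no owner recorded.
func findOrphans(partitions []int32, owners map[int32]string) []int32 {
	var orphans []int32
	for _, p := range partitions {
		if owners[p] == "" {
			orphans = append(orphans, p)
		}
	}
	return orphans
}

// watchOwners takes a snapshot on every tick and reports orphaned
// partitions until stop is closed.
func watchOwners(interval time.Duration, stop <-chan struct{},
	snapshot func() ([]int32, map[int32]string), report func([]int32)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			if orphans := findOrphans(snapshot()); len(orphans) > 0 {
				report(orphans)
			}
		}
	}
}

func main() {
	stop := make(chan struct{})
	found := make(chan []int32, 1)
	// Simulated state: partition 1 has lost its owner.
	snapshot := func() ([]int32, map[int32]string) {
		return []int32{0, 1, 2}, map[int32]string{0: "consumer-a", 2: "consumer-b"}
	}
	report := func(orphans []int32) {
		select {
		case found <- orphans:
		default: // a report is already pending
		}
	}
	go watchOwners(10*time.Millisecond, stop, snapshot, report)
	fmt.Println("partitions without owner:", <-found)
	close(stop)
}
```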

Trouble reliably reusing consumer group names across topics

When the same Kafka consumer group name is used to consume different topics, the wvanbergen client sometimes receives 0 messages.

As a workaround, users can explicitly prefix each group name with <topic>-, but ideally this would be fixed by the client library itself.
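
The workaround amounts to deriving the group name from the topic. A tiny sketch follows, where `groupNameFor` and the topic and group names are made up for illustration:

```go
package main

import "fmt"

// groupNameFor is a hypothetical helper that derives a per-topic group name
// by prefixing the shared base name with "<topic>-".
func groupNameFor(topic, base string) string {
	return topic + "-" + base
}

func main() {
	for _, topic := range []string{"orders", "payments"} {
		fmt.Println(groupNameFor(topic, "billing"))
	}
}
```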

"This is really shitty and needs reworking"

I noticed you have this comment in partition_consumer.go:

// This is shitty and reallt needs reworking.
p.stream.Close()
if err := p.setSaramaConsumer(0); err != nil {
return err
}

I just hit a runtime panic at that Close() line:
"panic: runtime error: close of closed channel"

Before I dig in, since you already know there are issues, I wanted to get your thoughts on what you might have in mind for a fix, or suggestions for places to start poking around.

thanks,
Jim
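
A "close of closed channel" panic means close was called twice on the same channel. One common Go guard is sync.Once, sketched here with a hypothetical `stream` type. Whether this fits the library's partition consumer is an assumption; the snippet only demonstrates the technique.

```go
package main

import (
	"fmt"
	"sync"
)

// stream is a hypothetical wrapper around a message channel.
type stream struct {
	messages  chan string
	closeOnce sync.Once
}

func newStream() *stream {
	return &stream{messages: make(chan string)}
}

// Close closes the channel at most once, so repeated calls do not panic.
func (s *stream) Close() {
	s.closeOnce.Do(func() { close(s.messages) })
}

func main() {
	s := newStream()
	s.Close()
	s.Close() // no-op instead of "close of closed channel"
	_, ok := <-s.messages
	fmt.Println("channel open:", ok) // prints "channel open: false"
}
```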

Run into "index out of range" when launch multiple concurrent consumers "go routine" in a single go process

Is it supported to run multiple consumers of the same consumer group concurrently (as goroutines) in a single Go process? I run into the following error. Any ideas? Thanks!

panic: runtime error: index out of range

goroutine 4395258 [running]:
panic(0x7e1ec0, 0xc820010090)
/usr/local/go/src/runtime/panic.go:464 +0x3e6
github.com/wvanbergen/kazoo-go.(*ConsumergroupInstanceList).Less(0xc8df7bc480, 0x3, 0xffffffffffffffff, 0x1)
:10 +0x124
sort.doPivot(0x7fa8cba22338, 0xc8df7bc480, 0x3, 0x18, 0x9, 0x3)
/usr/local/go/src/sort/sort.go:128 +0x27b
sort.quickSort(0x7fa8cba22338, 0xc8df7bc480, 0x3, 0x18, 0x8)
/usr/local/go/src/sort/sort.go:195 +0xa3
sort.Sort(0x7fa8cba22338, 0xc8df7bc480)
/usr/local/go/src/sort/sort.go:229 +0x74
github.com/wvanbergen/kafka/consumergroup.dividePartitionsBetweenConsumers(0xc8f8824540, 0x18, 0x18, 0xc8bac5fc00, 0x3c, 0x3c, 0x0)
/Users/lhan/Dev/gopath/src/github.com/wvanbergen/kafka/consumergroup/utils.go:40 +0x192
github.com/wvanbergen/kafka/consumergroup.(*ConsumerGroup).topicConsumer(0xc82479a090, 0x887910, 0xb, 0xc82649d080, 0xc82649d0e0, 0xc86102b740)
/Users/lhan/Dev/gopath/src/github.com/wvanbergen/kafka/consumergroup/consumer_group.go:333 +0x850
created by github.com/wvanbergen/kafka/consumergroup.(*ConsumerGroup).topicListConsumer
/Users/lhan/Dev/gopath/src/github.com/wvanbergen/kafka/consumergroup/consumer_group.go:271 +0x40a

consumergroup hangs after sometime

Hello,

I have observed that when I use the consumergroup library, it works fine for a short time, and after that it no longer writes offsets to ZooKeeper. After some time, I only see the following offsets.

/consumers/testgroup/offsets/mmetopic1/2:2502052
/consumers/testgroup/offsets/mmetopic1/1:2417072
/consumers/testgroup/offsets/mmetopic1/0:2482172

My producer keeps writing new messages, but the consumer offsets stay the same. When I try to instantiate another consumer, I get the following output (which might be useful for debugging).

[Sarama] 2014/08/30 04:48:37 Initializing new client
[Sarama] 2014/08/30 04:48:37 Fetching metadata from broker 10.1.128.1:9092
[Sarama] 2014/08/30 04:48:37 Connected to broker 10.1.128.1:9092
[Sarama] 2014/08/30 04:48:37 Registered new broker #1 at 10.1.128.1:9092
[Sarama] 2014/08/30 04:48:37 Registered new broker #3 at 10.1.130.1:9092
[Sarama] 2014/08/30 04:48:37 Registered new broker #2 at 10.1.129.1:9092
[Sarama] 2014/08/30 04:48:37 Successfully initialized new client
[Sarama] 2014/08/30 04:48:37 Connected to broker 10.1.128.1:9092
[Sarama] 2014/08/30 04:48:37 Connected to broker 10.1.129.1:9092
[Sarama] 2014/08/30 04:48:37 Connected to broker 10.1.130.1:9092
[Sarama] 2014/08/30 04:48:37 Initializing new client
[Sarama] 2014/08/30 04:48:37 Fetching metadata from broker 10.1.130.1:9092
[Sarama] 2014/08/30 04:48:37 Connected to broker 10.1.130.1:9092
[Sarama] 2014/08/30 04:48:37 Registered new broker #1 at 10.1.128.1:9092
[Sarama] 2014/08/30 04:48:37 Registered new broker #3 at 10.1.130.1:9092
[Sarama] 2014/08/30 04:48:37 Registered new broker #2 at 10.1.129.1:9092
[Sarama] 2014/08/30 04:48:37 Successfully initialized new client
[Sarama] 2014/08/30 04:48:37 Connected to broker 10.1.129.1:9092
[Sarama] 2014/08/30 04:48:37 Connected to broker 10.1.128.1:9092
[Sarama] 2014/08/30 04:48:37 Connected to broker 10.1.130.1:9092
[Sarama] 2014/08/30 04:48:37 [testgroup/d238b36d178a] Consumer instance registered (tpalkcoe3b9:66a33176-668a-4c89-b314-d238b36d178a).
init succeesfull
[Sarama] 2014/08/30 04:48:37 [testgroup/d238b36d178a] Currently registered consumers: 5
[Sarama] 2014/08/30 04:48:37 [testgroup/d238b36d178a] Started topic consumer for mmetopic1
[Sarama] 2014/08/30 04:48:37 [testgroup/d238b36d178a] Claiming 1 of 3 partitions for topic mmetopic1.
[Sarama] 2014/08/30 04:48:37 [testgroup/d238b36d178a] Started partition consumer for mmetopic1:2 at offset 2502052.
[Sarama] 2014/08/30 04:48:38 [testgroup/d238b36d178a] Stopping partition consumer for mmetopic1:2 at offset 0.
[Sarama] 2014/08/30 04:48:38 [testgroup/d238b36d178a] Stopped topic consumer for mmetopic1

Bug? Consuming stops or hangs at certain time.

I have met a problem:

Code written using wvanbergen/kafka consumes messages successfully at first, but after receiving 872 messages, the consuming loop ended. I am sure all 872 messages are processed successfully by another goroutine.

Then I restarted my program, still no message could be consumed.

At this point, Kafka Manager shows that the consumer offset has stopped advancing, while the log size and total lag keep growing.

If I use kafka consuming test script kafka-console-consumer.sh, with the same zk/consumer group, all messages could be successfully consumed.

So I could just consider it as a bug...

BTW, my code follows (the "received %d msg" log was printed at first but no longer appears, and the "Message terminated." log has never been printed):

BTW again, the same thing happened last week, and I stopped my program at that time. Today I ran my program again, at first everything was right, but then stopped again.

`consumer, consumerErr := consumergroup.JoinConsumerGroup(consumerGroupName, kafkaTopics, p.zooServer, config)
if consumerErr != nil {
log.Fatalln(consumerErr)
}

go func() {
	for err := range consumer.Errors() {
		log.Printf("receive consumer error: %s\n", err)
		if consumer.Closed() {
			consumer, _ = consumergroup.JoinConsumerGroup(consumerGroupName, kafkaTopics, p.zooServer, config)
		}
	}
}()

eventCount := 0
offsets := make(map[string]map[int32]int64)
go func() {
	log.Infof("Consumer group: close = %t, %s", consumer.Closed(), consumer)
	for message := range consumer.Messages() {
		if offsets[message.Topic] == nil {
			offsets[message.Topic] = make(map[int32]int64)
		}
		eventCount += 1
		if offsets[message.Topic][message.Partition] != 0 && offsets[message.Topic][message.Partition] != message.Offset-1 {
			log.Printf("unexpected offset on %s:%d. Expected %d, found %d, diff %d.\n", message.Topic, message.Partition, offsets[message.Topic][message.Partition]+1, message.Offset, message.Offset-offsets[message.Topic][message.Partition]-1)
		}

		log.Infof("received %d msg", eventCount)
		p.recordRecv()
		p.msgChan <- message.Value

		offsets[message.Topic][message.Partition] = message.Offset
		consumer.CommitUpto(message)
	}
	log.Infoln("Message terminated.")
}()`

Ability to reliably process messages?

First of all, thanks for writing and sharing this library! It looks like a promising
option for writing multi-node Kafka consumers in Go.

One possible issue I have is the ability to reliably process messages. I'd like
to be able to guarantee that I don't commit an offset before I've positively
confirmed that all of the corresponding messages have been processed
successfully (i.e. not just passed into the processing code, but successfully
completed the associated application-level processing). IIUC the current
API doesn't support this. After you receive an event from cg.Stream(),
the corresponding offset could be committed by the client at any time in the
background.

Do I understand this correctly? If so, is reliable processing something that
you'd be interested in supporting in the high-level API?
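
To make the requirement concrete, here is a sketch of the bookkeeping an application would need if messages can finish processing out of order: an offset becomes committable only once every earlier offset is also done. `ackTracker` and its methods are hypothetical and independent of the library's API.

```go
package main

import "fmt"

// ackTracker is a hypothetical helper that tracks which offsets have
// finished processing for one partition.
type ackTracker struct {
	next int64          // lowest offset not yet known to be processed
	done map[int64]bool // offsets that finished ahead of next
}

func newAckTracker(start int64) *ackTracker {
	return &ackTracker{next: start, done: make(map[int64]bool)}
}

// ack marks offset as processed and returns the highest offset that is
// safe to commit. The result is start-1 while nothing is committable.
func (a *ackTracker) ack(offset int64) int64 {
	if offset >= a.next {
		a.done[offset] = true
	}
	// Advance over the contiguous run of finished offsets.
	for a.done[a.next] {
		delete(a.done, a.next)
		a.next++
	}
	return a.next - 1
}

func main() {
	tr := newAckTracker(0)
	fmt.Println(tr.ack(1)) // offset 0 is still pending, nothing to commit
	fmt.Println(tr.ack(0)) // offsets 0 and 1 are now both done
	fmt.Println(tr.ack(2))
}
```

The three calls print -1, 1, and 2: the commit position never moves past an offset whose processing has not been confirmed.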

Null pointer exception when starting another instance for the same consumer group

I start multiple processes which join the same consumer group.

If there are processes that are currently consuming kafka messages while a new consumer process is spawned, often some of the existing processes panic and crash with a null pointer exception (NPE) when I try to commit the latest offset (CommitUpto).

The NPE occurs because the partition has been deleted from the zookeeperOffsetManager in the following function:

func (zom *zookeeperOffsetManager) MarkAsProcessed(topic string, partition int32, offset int64) bool {
    zom.l.RLock()
    defer zom.l.RUnlock()
    return zom.offsets[topic][partition].markAsProcessed(offset)
}

My local workaround has been to add a check and short-circuit:

func (zom *zookeeperOffsetManager) MarkAsProcessed(topic string, partition int32, offset int64) bool {
    zom.l.RLock()
    defer zom.l.RUnlock()
    currentOffset := zom.offsets[topic][partition]
    if currentOffset == nil {
        return false
    }
    return currentOffset.markAsProcessed(offset)
}

Some problems when I try to run my consumer in two node.

I used to run one consumer (just like examples/main.go) on a single node (Node A), but its CPU usage was too high, so I copied my program to another node (Node B). Now I have two consumers with the same configuration consuming the same topic.

Here is the problem: CPU usage on the second node (Node B) is very high, just as it was when the consumer ran on a single node, while CPU usage on the first node (Node A) is almost zero.
I expected the two nodes to share the topic and show similar CPU usage.
The result is not what I expected. Can you point out what I am doing wrong?

I would appreciate any help. Thank you.

Partition rebalance has serious issues

Given the following test program:

package main

import (
    "fmt"
    "log"
    "os"
    "time"

    "github.com/wvanbergen/kafka/consumergroup"
    "gopkg.in/Shopify/sarama.v1"
)

func main() {
    sarama.Logger = log.New(os.Stdout, "[Sarama]", log.LstdFlags)
    config := consumergroup.NewConfig()
    config.Offsets.Initial = sarama.OffsetOldest
    config.Offsets.ProcessingTimeout = 10 * time.Second

    consumer, consumerErr := consumergroup.JoinConsumerGroup("FOO_TEST_CAN_GO", []string{"testproducercango"}, []string{"127.0.0.1"}, config)
    if consumerErr != nil {
        log.Fatalln(consumerErr)
    }
    defer consumer.Close()

    go func() {
        for err := range consumer.Errors() {
            log.Println(err)
        }
    }()

    eventCount := 0
    messProcessed := make(map[string]int)

StreamLoop:
    for {
        select {
        case <-time.After(time.Second * 10):
            break StreamLoop
        case mess := <-consumer.Messages():
            fmt.Printf("Got event from stream. Topic: %v, Partition: %v, Offset: %v, Mess: %v \n", mess.Topic, mess.Partition, mess.Offset, string(mess.Value))
            eventCount += 1
            // Simulate processing time
            time.Sleep(2 * time.Second)
            consumer.CommitUpto(mess)
            messProcessed[string(mess.Value)] += 1
        }
    }

    log.Printf("Processed %d events.", eventCount)
    log.Printf("%+v", messProcessed)
}

With topic testproducercango having 20 messages called bladiebla1...bladiebla20 in 2 partitions.
I get the following output when I start this program 4 times in parallel.
Pid 0:

[Sarama]2015/04/07 17:37:39 Initializing new client
[Sarama]2015/04/07 17:37:39 Fetching metadata for all topics from broker localhost:9092
[Sarama]2015/04/07 17:37:39 Connected to broker localhost:9092
[Sarama]2015/04/07 17:37:39 Registered new broker #0 at localhost:9092
[Sarama]2015/04/07 17:37:39 Successfully initialized new client
[Sarama]2015/04/07 17:37:39 [FOO_TEST_CAN_GO/52c1ac213840] Consumer instance registered (me-user:f3080828-f8b8-4e21-a3c4-52c1ac213840).
[Sarama]2015/04/07 17:37:39 [FOO_TEST_CAN_GO/52c1ac213840] Currently registered consumers: 1
[Sarama]2015/04/07 17:37:39 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:37:39 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango :: Claiming 2 of 2 partitions
[Sarama]2015/04/07 17:37:39 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/0 :: Partition consumer starting at the oldest available offset.
[Sarama]2015/04/07 17:37:39 Connected to broker localhost:9092
[Sarama]2015/04/07 17:37:39 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/1 :: Partition consumer starting at the oldest available offset.
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 0, Mess: bladiebla2 
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 1, Mess: bladiebla3 
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 2, Mess: bladiebla4 
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/52c1ac213840] Triggering rebalance due to consumer list change
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/0 :: Stopping partition consumer at offset 6
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/0 :: Last processed offset: 1. Waiting up to 10s for another 5 messages to process...
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/1 :: Stopping partition consumer at offset 12
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/1 :: Last processed offset: -1. Waiting up to 10s for another 13 messages to process...
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/1 :: TIMEOUT waiting for offset 12. Last committed offset: -1
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/52c1ac213840] FAILED closing the offset manager: Not all offsets were committed before shutdown was completed!
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/52c1ac213840] Deregistered consumer instance me-user:f3080828-f8b8-4e21-a3c4-52c1ac213840.
[Sarama]2015/04/07 17:37:55 Closing Client
[Sarama]2015/04/07 17:37:55 Closed connection to broker localhost:9092
[Sarama]2015/04/07 17:37:55 Closed connection to broker localhost:9092
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x0 pc=0x47a613]

goroutine 1 [running]:
github.com/wvanbergen/kafka/consumergroup.(*partitionOffsetTracker).markAsProcessed(0x0, 0x2, 0x0)
    /home/me/Development/Dev/go/src/github.com/wvanbergen/kafka/consumergroup/offset_manager.go:223 +0x123
github.com/wvanbergen/kafka/consumergroup.(*zookeeperOffsetManager).MarkAsProcessed(0xc20802c040, 0x6582b0, 0x11, 0x0, 0x2, 0xc208381e00)
    /home/me/Development/Dev/go/src/github.com/wvanbergen/kafka/consumergroup/offset_manager.go:152 +0x12b
github.com/wvanbergen/kafka/consumergroup.(*ConsumerGroup).CommitUpto(0xc208056210, 0xc208041590, 0x0, 0x0)
    /home/me/Development/Dev/go/src/github.com/wvanbergen/kafka/consumergroup/consumer_group.go:232 +0xb1
main.main()
    /home/me/Development/Dev/go/src/test/foo/foo.go:44 +0xc6f

goroutine 5 [semacquire]:
sync.(*WaitGroup).Wait(0xc20801e140)
    /usr/local/go/src/sync/waitgroup.go:132 +0x169
github.com/samuel/go-zookeeper/zk.(*Conn).loop(0xc2080320d0)
    /home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:227 +0x76d
github.com/samuel/go-zookeeper/zk.func·001()
    /home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:145 +0x2c
created by github.com/samuel/go-zookeeper/zk.ConnectWithDialer
    /home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:149 +0x44f

goroutine 7 [runnable]:
github.com/samuel/go-zookeeper/zk.(*Conn).sendLoop(0xc2080320d0, 0x7f9731560bb8, 0xc208038038, 0xc2080302a0, 0x0, 0x0)
    /home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:412 +0xce9
github.com/samuel/go-zookeeper/zk.func·002()
    /home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:212 +0x5a
created by github.com/samuel/go-zookeeper/zk.(*Conn).loop
    /home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:215 +0x680

goroutine 17 [syscall, locked to thread]:
runtime.goexit()
    /usr/local/go/src/runtime/asm_amd64.s:2232 +0x1

Pid 1:

[Sarama]2015/04/07 17:37:45 Initializing new client
[Sarama]2015/04/07 17:37:45 Fetching metadata for all topics from broker localhost:9092
[Sarama]2015/04/07 17:37:45 Connected to broker localhost:9092
[Sarama]2015/04/07 17:37:45 Registered new broker #0 at localhost:9092
[Sarama]2015/04/07 17:37:45 Successfully initialized new client
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/509180e0075d] Consumer instance registered (me-user:7c3f576d-063f-4399-b346-509180e0075d).
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/509180e0075d] Currently registered consumers: 2
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Claiming 1 of 2 partitions
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/0 :: Partition consumer starting at offset 2.
[Sarama]2015/04/07 17:37:45 Connected to broker localhost:9092
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 2, Mess: bladiebla4 
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 3, Mess: bladiebla8 
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 4, Mess: bladiebla12 
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 5, Mess: bladiebla14 
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/509180e0075d] Triggering rebalance due to consumer list change
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/0 :: Stopping partition consumer at offset 6
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/0 :: Last processed offset: 4. Waiting up to 10s for another 2 messages to process...
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 6, Mess: bladiebla19 
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/509180e0075d] Currently registered consumers: 3
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Claiming 0 of 2 partitions
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/509180e0075d] Triggering rebalance due to consumer list change
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/509180e0075d] Currently registered consumers: 2
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Claiming 1 of 2 partitions
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/1 :: Partition consumer starting at offset 3.
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 3, Mess: bladiebla6 
[Sarama]2015/04/07 17:38:03 [FOO_TEST_CAN_GO/509180e0075d] Triggering rebalance due to consumer list change
[Sarama]2015/04/07 17:38:03 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/1 :: Stopping partition consumer at offset 12
[Sarama]2015/04/07 17:38:03 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/1 :: Last processed offset: 2. Waiting up to 10s for another 10 messages to process...
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 4, Mess: bladiebla7 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 5, Mess: bladiebla9 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 6, Mess: bladiebla10 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 7, Mess: bladiebla11 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 8, Mess: bladiebla13 
[Sarama]2015/04/07 17:38:13 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/1 :: TIMEOUT waiting for offset 12. Last committed offset: 3
[Sarama]2015/04/07 17:38:14 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:38:14 [FOO_TEST_CAN_GO/509180e0075d] Currently registered consumers: 1
[Sarama]2015/04/07 17:38:14 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:38:14 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Claiming 2 of 2 partitions
[Sarama]2015/04/07 17:38:14 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/0 :: Partition consumer starting at offset 7.
[Sarama]2015/04/07 17:38:14 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/1 :: Partition consumer starting at offset 4.
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 9, Mess: bladiebla15 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 10, Mess: bladiebla16 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 11, Mess: bladiebla17 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 12, Mess: bladiebla18 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 4, Mess: bladiebla7 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 5, Mess: bladiebla9 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 6, Mess: bladiebla10 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 7, Mess: bladiebla11 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 8, Mess: bladiebla13 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 9, Mess: bladiebla15 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 10, Mess: bladiebla16 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 11, Mess: bladiebla17 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 12, Mess: bladiebla18 
[Sarama]2015/04/07 17:38:50 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/0 :: Stopping partition consumer at offset -1
[Sarama]2015/04/07 17:38:50 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/1 :: Stopping partition consumer at offset 12
2015/04/07 17:38:50 Processed 24 events.
2015/04/07 17:38:50 map[bladiebla13:2 bladiebla17:2 bladiebla6:1 bladiebla7:2 bladiebla16:2 bladiebla12:1 bladiebla8:1 bladiebla19:1 bladiebla11:2 bladiebla15:2 bladiebla4:1 bladiebla9:2 bladiebla10:2 bladiebla18:2 bladiebla14:1]
[Sarama]2015/04/07 17:38:51 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:38:51 [FOO_TEST_CAN_GO/509180e0075d] Deregistered consumer instance me-user:7c3f576d-063f-4399-b346-509180e0075d.
[Sarama]2015/04/07 17:38:51 Closing Client
[Sarama]2015/04/07 17:38:51 Closed connection to broker localhost:9092
[Sarama]2015/04/07 17:38:51 Closed connection to broker localhost:9092

Pid 2:

[Sarama]2015/04/07 17:37:52 Initializing new client
[Sarama]2015/04/07 17:37:52 Fetching metadata for all topics from broker localhost:9092
[Sarama]2015/04/07 17:37:52 Connected to broker localhost:9092
[Sarama]2015/04/07 17:37:52 Registered new broker #0 at localhost:9092
[Sarama]2015/04/07 17:37:52 Successfully initialized new client
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/14644cac47d7] Consumer instance registered (me-user:1f1f7e20-dbff-4c01-89d6-14644cac47d7).
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/14644cac47d7] Currently registered consumers: 3
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango :: Claiming 1 of 2 partitions
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/14644cac47d7] Triggering rebalance due to consumer list change
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango/0 :: Partition consumer starting at offset 7.
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango/0 :: Stopping partition consumer at offset -1
[Sarama]2015/04/07 17:37:56 Connected to broker localhost:9092
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] Currently registered consumers: 3
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango :: Claiming 1 of 2 partitions
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango/0 :: Partition consumer starting at offset 7.
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango/0 :: Stopping partition consumer at offset -1
2015/04/07 17:38:02 Processed 0 events.
2015/04/07 17:38:02 map[]
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/14644cac47d7] Deregistered consumer instance me-user:1f1f7e20-dbff-4c01-89d6-14644cac47d7.
[Sarama]2015/04/07 17:38:02 Closing Client
[Sarama]2015/04/07 17:38:02 Closed connection to broker localhost:9092
[Sarama]2015/04/07 17:38:02 Closed connection to broker localhost:9092

Pid 3:

[Sarama]2015/04/07 17:37:55 Initializing new client
[Sarama]2015/04/07 17:37:55 Fetching metadata for all topics from broker localhost:9092
[Sarama]2015/04/07 17:37:55 Connected to broker localhost:9092
[Sarama]2015/04/07 17:37:55 Registered new broker #0 at localhost:9092
[Sarama]2015/04/07 17:37:55 Successfully initialized new client
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/d91a72837cd0] Consumer instance registered (me-user:6ebed8bf-285e-4f9b-9007-d91a72837cd0).
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/d91a72837cd0] Currently registered consumers: 3
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango :: Claiming 1 of 2 partitions
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango/1 :: Partition consumer starting at the oldest available offset.
[Sarama]2015/04/07 17:37:55 Connected to broker localhost:9092
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 0, Mess: bladiebla0 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 1, Mess: bladiebla1 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 2, Mess: bladiebla5 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 3, Mess: bladiebla6 
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] Triggering rebalance due to consumer list change
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango/1 :: Stopping partition consumer at offset 12
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango/1 :: Last processed offset: 2. Waiting up to 10s for another 10 messages to process...
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] Currently registered consumers: 2
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango :: Claiming 1 of 2 partitions
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango/0 :: Partition consumer starting at offset 7.
[Sarama]2015/04/07 17:38:03 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango/0 :: Stopping partition consumer at offset -1
[Sarama]2015/04/07 17:38:03 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:38:03 [FOO_TEST_CAN_GO/d91a72837cd0] Deregistered consumer instance me-user:6ebed8bf-285e-4f9b-9007-d91a72837cd0.
[Sarama]2015/04/07 17:38:03 Closing Client
[Sarama]2015/04/07 17:38:03 Closed connection to broker localhost:9092
[Sarama]2015/04/07 17:38:03 Closed connection to broker localhost:9092
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x0 pc=0x47a613]

goroutine 1 [running]:
github.com/wvanbergen/kafka/consumergroup.(*partitionOffsetTracker).markAsProcessed(0x0, 0x3, 0x0)
    /home/me/Development/Dev/go/src/github.com/wvanbergen/kafka/consumergroup/offset_manager.go:223 +0x123
github.com/wvanbergen/kafka/consumergroup.(*zookeeperOffsetManager).MarkAsProcessed(0xc20802c040, 0x6582b0, 0x11, 0x1, 0x3, 0xc208381700)
    /home/me/Development/Dev/go/src/github.com/wvanbergen/kafka/consumergroup/offset_manager.go:152 +0x12b
github.com/wvanbergen/kafka/consumergroup.(*ConsumerGroup).CommitUpto(0xc208056210, 0xc208040fa0, 0x0, 0x0)
    /home/me/Development/Dev/go/src/github.com/wvanbergen/kafka/consumergroup/consumer_group.go:232 +0xb1
main.main()
    /home/me/Development/Dev/go/src/test/foo/foo.go:44 +0xc6f

goroutine 5 [semacquire]:
sync.(*WaitGroup).Wait(0xc20801e140)
    /usr/local/go/src/sync/waitgroup.go:132 +0x169
github.com/samuel/go-zookeeper/zk.(*Conn).loop(0xc2080320d0)
    /home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:227 +0x76d
github.com/samuel/go-zookeeper/zk.func·001()
    /home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:145 +0x2c
created by github.com/samuel/go-zookeeper/zk.ConnectWithDialer
    /home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:149 +0x44f

goroutine 7 [runnable]:
github.com/samuel/go-zookeeper/zk.(*Conn).sendLoop(0xc2080320d0, 0x7f7bc8842bb8, 0xc208038038, 0xc2080302a0, 0x0, 0x0)
    /home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:412 +0xce9
github.com/samuel/go-zookeeper/zk.func·002()
    /home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:212 +0x5a
created by github.com/samuel/go-zookeeper/zk.(*Conn).loop
    /home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:215 +0x680

goroutine 17 [syscall, locked to thread]:
runtime.goexit()
    /usr/local/go/src/runtime/asm_amd64.s:2232 +0x1

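For now I see two issues:

  • markAsProcessed acts on an already closed partition: in the panic after the Pid 3 log, CommitUpto ends up calling markAsProcessed on a nil partitionOffsetTracker.
  • Messages are processed multiple times: Pid 1 reports 24 processed events, and offsets 4 through 12 of partition 1 are each delivered to it twice.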
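
Both symptoms can be illustrated with a minimal, self-contained sketch. It is not the consumergroup library's code: `offsetTracker`, `claim`, `release` and `errNotClaimed` are hypothetical names made up for this example. It assumes that offset state is kept per partition, that a released partition has no state left, and that a caller wants an error rather than a panic in that case. It also skips offsets at or below the last processed one, which is one way an application could tolerate redelivery after a rebalance.

```go
package main

import (
	"errors"
	"fmt"
)

// partitionKey identifies one partition of one topic.
type partitionKey struct {
	topic     string
	partition int32
}

// offsetTracker is a hypothetical stand-in for per-partition offset state.
// It is NOT the type used by the consumergroup library.
type offsetTracker struct {
	lastProcessed map[partitionKey]int64
}

// errNotClaimed is returned when an offset is marked for a partition
// that this consumer does not (or no longer does) own.
var errNotClaimed = errors.New("partition is not claimed by this consumer")

func newOffsetTracker() *offsetTracker {
	return &offsetTracker{lastProcessed: make(map[partitionKey]int64)}
}

// claim starts tracking a partition. lastProcessed is the highest offset
// that has already been handled.
func (t *offsetTracker) claim(topic string, partition int32, lastProcessed int64) {
	t.lastProcessed[partitionKey{topic, partition}] = lastProcessed
}

// release stops tracking a partition, for example after a rebalance.
func (t *offsetTracker) release(topic string, partition int32) {
	delete(t.lastProcessed, partitionKey{topic, partition})
}

// markAsProcessed records an offset. It reports whether the offset was new,
// and returns errNotClaimed instead of panicking when the partition has
// already been released.
func (t *offsetTracker) markAsProcessed(topic string, partition int32, offset int64) (bool, error) {
	key := partitionKey{topic, partition}
	last, ok := t.lastProcessed[key]
	if !ok {
		return false, errNotClaimed
	}
	if offset <= last {
		return false, nil // duplicate delivery, already handled
	}
	t.lastProcessed[key] = offset
	return true, nil
}

func main() {
	t := newOffsetTracker()
	t.claim("testproducercango", 1, 3)

	// Offsets 4 and 5 arrive twice, as in the Pid 1 log.
	for _, offset := range []int64{4, 5, 4, 5, 6} {
		fresh, err := t.markAsProcessed("testproducercango", 1, offset)
		fmt.Println(offset, fresh, err)
	}

	// After the partition is released, marking returns an error.
	t.release("testproducercango", 1)
	_, err := t.markAsProcessed("testproducercango", 1, 7)
	fmt.Println(err)
}
```

The sketch only shows the guard and the duplicate check in isolation. Whether the library should return an error, drop the commit silently, or avoid redelivery in the first place is a separate design question.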
