Sarama Cluster

Cluster extensions for Sarama, the Go client library for Apache Kafka 0.9 (and later).

Documentation

Documentation and examples are available via godoc at http://godoc.org/github.com/bsm/sarama-cluster

Examples

Consumers have two modes of operation. In the default multiplexed mode, messages from all topics and partitions are delivered on a single channel (and errors on another):

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"

	cluster "github.com/bsm/sarama-cluster"
)

func main() {

	// init (custom) config, enable errors and notifications
	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = true

	// init consumer
	brokers := []string{"127.0.0.1:9092"}
	topics := []string{"my_topic", "other_topic"}
	consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// consume errors
	go func() {
		for err := range consumer.Errors() {
			log.Printf("Error: %s\n", err.Error())
		}
	}()

	// consume notifications
	go func() {
		for ntf := range consumer.Notifications() {
			log.Printf("Rebalanced: %+v\n", ntf)
		}
	}()

	// consume messages, watch signals
	for {
		select {
		case msg, ok := <-consumer.Messages():
			if !ok {
				return // messages channel closed, nothing more to consume
			}
			fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
			consumer.MarkOffset(msg, "")	// mark message as processed
		case <-signals:
			return
		}
	}
}
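
Multiplexed mode can be pictured as a fan-in of per-partition streams. The following is a conceptual sketch using only the standard library. It is not how sarama-cluster is implemented, and the message type and the multiplex and feed helpers are hypothetical names introduced for illustration:

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical stand-in for a consumed Kafka message.
type message struct {
	Topic     string
	Partition int32
	Offset    int64
}

// multiplex forwards everything from the per-partition inputs onto one
// output channel and closes it once every input has been drained.
func multiplex(inputs ...<-chan message) <-chan message {
	out := make(chan message)
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(in <-chan message) {
			defer wg.Done()
			for msg := range in {
				out <- msg
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// feed returns a closed channel preloaded with n messages for one partition.
func feed(topic string, partition int32, n int) <-chan message {
	ch := make(chan message, n)
	for i := 0; i < n; i++ {
		ch <- message{Topic: topic, Partition: partition, Offset: int64(i)}
	}
	close(ch)
	return ch
}

func main() {
	merged := multiplex(feed("my_topic", 0, 2), feed("other_topic", 0, 3))
	perTopic := map[string]int{}
	for msg := range merged {
		perTopic[msg.Topic]++
	}
	fmt.Println(perTopic["my_topic"], perTopic["other_topic"]) // prints "2 3"
}
```

In this sketch, ordering is preserved within each input but not across inputs, which is the usual trade-off of reading everything from one channel.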

Users who require access to individual partitions can use the partitioned mode, which exposes partition-level consumers:

package main

import (
	"fmt"
	"os"
	"os/signal"

	cluster "github.com/bsm/sarama-cluster"
)

func main() {

	// init (custom) config, set mode to ConsumerModePartitions
	config := cluster.NewConfig()
	config.Group.Mode = cluster.ConsumerModePartitions

	// init consumer
	brokers := []string{"127.0.0.1:9092"}
	topics := []string{"my_topic", "other_topic"}
	consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// consume partitions
	for {
		select {
		case part, ok := <-consumer.Partitions():
			if !ok {
				return
			}

			// start a separate goroutine to consume messages
			go func(pc cluster.PartitionConsumer) {
				for msg := range pc.Messages() {
					fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
					consumer.MarkOffset(msg, "")	// mark message as processed
				}
			}(part)
		case <-signals:
			return
		}
	}
}
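
The example above returns as soon as a signal arrives and does not wait for the per-partition goroutines to finish. One possible way to wait for them is a sync.WaitGroup. The sketch below uses plain channels rather than the library's types: message, feed and consumeAll are hypothetical names. It also assumes that each partition's message channel is closed on shutdown, which should be checked against the godoc before relying on it:

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical stand-in for a consumed Kafka message.
type message struct {
	Partition int32
	Offset    int64
}

// consumeAll starts one goroutine per partition channel it receives, blocks
// until all of them have drained, and returns the number of messages handled.
func consumeAll(partitions <-chan (<-chan message), handle func(message)) int {
	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		total int
	)
	for pc := range partitions {
		wg.Add(1)
		go func(pc <-chan message) {
			defer wg.Done()
			for msg := range pc {
				handle(msg)
				mu.Lock()
				total++
				mu.Unlock()
			}
		}(pc)
	}
	wg.Wait()
	return total
}

// feed returns a closed channel preloaded with n messages for one partition.
func feed(partition int32, n int) <-chan message {
	ch := make(chan message, n)
	for i := 0; i < n; i++ {
		ch <- message{Partition: partition, Offset: int64(i)}
	}
	close(ch)
	return ch
}

func main() {
	partitions := make(chan (<-chan message), 2)
	partitions <- feed(0, 3)
	partitions <- feed(1, 2)
	close(partitions)

	n := consumeAll(partitions, func(m message) {})
	fmt.Println("processed", n, "messages") // prints "processed 5 messages"
}
```

Note that handle is called concurrently from several goroutines, so it must be safe for concurrent use.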

Running tests

You need to install Ginkgo & Gomega to run tests. Please see http://onsi.github.io/ginkgo for more details.

To run tests, call:

$ make test

Troubleshooting

Consumer not receiving any messages?

By default, sarama's Config.Consumer.Offsets.Initial is set to sarama.OffsetNewest. This means that a brand-new consumer that has never committed any offsets to Kafka will only receive messages written after it starts consuming; messages already in the topic are skipped.

If you want a consumer without committed offsets to receive all messages, starting from the oldest one still available in the topic, set Config.Consumer.Offsets.Initial to sarama.OffsetOldest.
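
With this library, the change is a single assignment on the config returned by cluster.NewConfig(): config.Consumer.Offsets.Initial = sarama.OffsetOldest. To illustrate the effect, here is a simplified, standard-library-only model of how a starting offset could be chosen. The partitionState type and startOffset function are hypothetical and are not part of sarama. Only the two constant values mirror sarama's:

```go
package main

import "fmt"

// These values mirror sarama.OffsetNewest and sarama.OffsetOldest.
const (
	OffsetNewest int64 = -1
	OffsetOldest int64 = -2
)

// partitionState is a hypothetical summary of one partition as seen by a
// consumer group.
type partitionState struct {
	oldest    int64 // offset of the oldest message still available
	next      int64 // offset the next produced message will receive
	committed int64 // next offset to read as committed by the group, -1 if none
}

// startOffset returns the offset a consumer would begin reading from.
// The initial setting only matters when nothing has been committed yet.
func startOffset(p partitionState, initial int64) int64 {
	if p.committed >= 0 {
		return p.committed
	}
	if initial == OffsetOldest {
		return p.oldest
	}
	return p.next
}

func main() {
	p := partitionState{oldest: 0, next: 42, committed: -1}
	fmt.Println(startOffset(p, OffsetNewest)) // 42: only new messages
	fmt.Println(startOffset(p, OffsetOldest)) // 0: everything still available

	p.committed = 17
	fmt.Println(startOffset(p, OffsetOldest)) // 17: committed offset wins
}
```

The last case shows why changing the setting has no visible effect for a consumer group that has already committed offsets.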
