Comments (3)
Once you close the consumer, the Messages() and Errors() channels will be closed, which means they will start returning nil values continuously. You should break your loop when that happens.
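For illustration, a minimal sketch of such a loop, assuming cg is a *consumergroup.ConsumerGroup that has already joined a group and handle is a hypothetical processing function:

func consumeUntilClosed(cg *consumergroup.ConsumerGroup, handle func(*sarama.ConsumerMessage)) {
    for {
        msg, ok := <-cg.Messages()
        if !ok {
            // Close() was called, so the channel is closed; stop here
            // instead of receiving nil values forever.
            return
        }
        handle(msg)        // process the message payload
        cg.CommitUpto(msg) // mark the offset as processed
    }
}

Equivalently, ranging over cg.Messages() with a for ... range loop ends on its own once the channel is closed.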
Kafka still has unprocessed data. If I break my loop, I can no longer fetch that unprocessed data, so this does not suit my case.
# a.go file
import (
"fmt"
"github.com/Shopify/sarama"
"github.com/wvanbergen/kafka/consumergroup"
"github.com/wvanbergen/kazoo-go"
"log"
"strings"
"time"
)
//basic Kafka configuration
type Kafka struct {
Hosts []string
Where string //where to start reading from
Zookeeper []string //zookeeper hosts
Groupname string //consumer group name
Alltopic []string
}
func (k Kafka) String() string {
return fmt.Sprintf("zookeeper=%s, groupname=%s, where=%s, alltopic=%s",
strings.Join(k.Zookeeper, ","), k.Groupname, k.Where, strings.Join(k.Alltopic, ","))
}
//basic information for a per-group consumer
type KafkaConsumer struct {
Topic string
Kafka //kafka configure
GroupConsumer *consumergroup.ConsumerGroup
Msgs chan *sarama.ConsumerMessage //exposed to external programs
Exit chan bool
}
func (k Kafka) NewConsumer(topic string) (*KafkaConsumer, error) {
config := consumergroup.NewConfig()
config.Offsets.Initial = Convert(k.Where)
config.Offsets.ProcessingTimeout = 10 * time.Second
var zookeeperNodes []string
//parse the zookeeper connection string into a host list and optional chroot
zookeeperNodes, config.Zookeeper.Chroot = kazoo.ParseConnectionString(strings.Join(k.Zookeeper, ","))
k.Alltopic = append(k.Alltopic, topic)
groupconsumer, err := consumergroup.JoinConsumerGroup(k.Groupname, k.Alltopic, zookeeperNodes, config)
if err != nil {
return nil, err
}
msgchan := make(chan *sarama.ConsumerMessage, 30)
ret := &KafkaConsumer{Kafka: k, GroupConsumer: groupconsumer,
Msgs: msgchan, Exit: make(chan bool, 1), Topic: topic}
log.Printf("new a zookeeper consumer, info:%s, topic:%s, groupconsumer:%+v\n", k.String(), topic, ret)
return ret, nil
}
func (k *KafkaConsumer) Close() error {
if k.GroupConsumer != nil {
if err := k.GroupConsumer.Close(); err != nil {
log.Printf("stop a zookeeper consumer fail. err:%s hostinfo:%s\n",
err.Error(), k.String())
return err
} else {
log.Printf("stop a zookeeper consumer success, hostinfo:%s\n", k.String())
}
}
k.Exit <- true
return nil
}
func (k *KafkaConsumer) String() string {
return fmt.Sprintf("%s input_topic:%s, groupconsumer:%+v, msgchan=%+v", k.Kafka.String(),
k.Topic, k.GroupConsumer, k.Msgs)
}
func (k *KafkaConsumer) Dispatcher(mylog *Log) {
ticker := time.NewTicker(time.Second * 10)
log.Printf("topic:%s start a dispatcher\n", k.Topic)
for {
select {
case msg := <-k.GroupConsumer.Messages():
if msg != nil && msg.Value != nil && len(msg.Value) > 0 {
k.Msgs <- msg
k.GroupConsumer.CommitUpto(msg)
mylog.Log.Debugf("conumser_topic:%+v, msg_topic:%+v, msg_len=%d, send to recevie chan",
k.Alltopic, msg.Topic, len(msg.Value))
} else {
mylog.Log.Debugf("conumser_topic:%+v, msg_topic:%+v,error format msg:%+v",
k.Alltopic, k.Topic, msg)
}
case err := <-k.GroupConsumer.Errors():
mylog.Log.Debugf("conumser_topic:%+v, msg_topic:%+v, fetch msg err,%+v", k.Alltopic, k.Topic, err)
case <-k.Exit:
log.Printf("conumser_topic:%+v, msg_topic:%+v, dispatcher exit\n", k.Alltopic, k.Topic)
ticker.Stop()
return
case <-ticker.C:
mylog.Log.Debugf("conumser_topic:%+v, msg_topic:%+v, dispatcher ticker 10s, Consumer:%+v, chan_len:%d",
k.Alltopic, k.Topic, k)
}
}
}
//convert the configured start position into a sarama offset
func Convert(where string) int64 {
var ret int64
switch where {
case "begin":
ret = sarama.OffsetOldest
case "now":
ret = sarama.OffsetNewest
default:
ret = sarama.OffsetNewest
}
return ret
}
# mylogic.go file
import (
"bufio"
t "cf_build/thrid_party"
"github.com/go-ini/ini"
"os"
"strings"
)
type Conf struct {
Inter_topics []string
Inter_topics_new []string
Inter_compact_topics []string
Inter_qy_topics []string
Inter_update_topic string
China_topic string
China_update_topic string
China_crawl_topics []string
China_crawl_compact_topics []string
Islog_topic string
Iclog_topic string
}
type KafkaFlight struct {
Conf
Kafka t.Kafka
I []*t.KafkaConsumer
INew []*t.KafkaConsumer
ICompact []*t.KafkaConsumer
IQy []*t.KafkaConsumer
IUpdate []*t.KafkaConsumer
C []*t.KafkaConsumer
CUpdate []*t.KafkaConsumer
CCrawl []*t.KafkaConsumer
CCrawlCompact []*t.KafkaConsumer
Islog []*t.KafkaConsumer
Iclog []*t.KafkaConsumer
Log *t.Log
}
//initialize the module that reads discount flight-ticket data from kafka
func (k *KafkaFlight) Init(inifile string, log *t.Log) error {
param, err := ini.Load(inifile)
if err != nil {
return err
}
k.Log = log
//read international route topics
inter_topics_file := param.Section("kafka").Key("inter_topics").String()
var inter_topics []string
f, _ := os.Open(inter_topics_file)
defer f.Close()
scanner := bufio.NewScanner(f)
for scanner.Scan() {
topic := strings.TrimSpace(scanner.Text())
inter_topics = append(inter_topics, topic)
}
inter_topics_new_file := param.Section("kafka").Key("inter_topic_new").String()
var inter_topics_new []string
f_new, _ := os.Open(inter_topics_new_file)
defer f_new.Close()
scanner_new := bufio.NewScanner(f_new)
for scanner_new.Scan() {
topic := strings.TrimSpace(scanner_new.Text())
inter_topics_new = append(inter_topics_new, topic)
}
inter_compact_topics_file := param.Section("kafka").Key("inter_compact_topics").String()
var inter_compact_topics []string
f22, _ := os.Open(inter_compact_topics_file)
defer f22.Close()
scanner22 := bufio.NewScanner(f22)
for scanner22.Scan() {
topic := strings.TrimSpace(scanner22.Text())
inter_compact_topics = append(inter_compact_topics, topic)
}
inter_qy_topics_file := param.Section("kafka").Key("inter_qy_topic").String()
var inter_qy_topics []string
f_qy, _ := os.Open(inter_qy_topics_file)
defer f_qy.Close()
scanner_qy := bufio.NewScanner(f_qy)
for scanner_qy.Scan() {
topic := strings.TrimSpace(scanner_qy.Text())
inter_qy_topics = append(inter_qy_topics, topic)
}
inter_update_topic := param.Section("kafka").Key("inter_update_topic").String()
inter_update_topic = strings.TrimSpace(inter_update_topic)
//read domestic route topics
china_crawl_topic_file := param.Section("kafka").Key("china_crawl_topic").String()
var china_crawl_topic []string
f1, _ := os.Open(china_crawl_topic_file)
defer f1.Close()
scanner1 := bufio.NewScanner(f1)
for scanner1.Scan() {
topic := strings.TrimSpace(scanner1.Text())
china_crawl_topic = append(china_crawl_topic, topic)
}
china_crawl_compact_topic_file := param.Section("kafka").Key("china_crawl_compact_topic").String()
var china_crawl_compact_topic []string
f11, _ := os.Open(china_crawl_compact_topic_file)
defer f11.Close()
scanner11 := bufio.NewScanner(f11)
for scanner11.Scan() {
topic := strings.TrimSpace(scanner11.Text())
china_crawl_compact_topic = append(china_crawl_compact_topic, topic)
}
china_topic := param.Section("kafka").Key("china_topic").String()
china_update_topic := param.Section("kafka").Key("china_update_topic").String()
china_topic = strings.TrimSpace(china_topic)
china_update_topic = strings.TrimSpace(china_update_topic)
//read update-log topic names
islog_topic := param.Section("kafka").Key("islog_topic").String()
iclog_topic := param.Section("kafka").Key("iclog_topic").String()
islog_topic = strings.TrimSpace(islog_topic)
iclog_topic = strings.TrimSpace(iclog_topic)
//populate the Conf struct
k.Conf.Inter_topics = inter_topics
k.Conf.Inter_topics_new = inter_topics_new
k.Conf.Inter_compact_topics = inter_compact_topics
k.Conf.Inter_qy_topics = inter_qy_topics
k.Conf.Inter_update_topic = inter_update_topic
k.Conf.China_topic = china_topic
k.Conf.China_update_topic = china_update_topic
k.Conf.China_crawl_topics = china_crawl_topic
k.Conf.China_crawl_compact_topics = china_crawl_compact_topic
k.Conf.Islog_topic = islog_topic
k.Conf.Iclog_topic = iclog_topic
//read kafka host information
kafka_hosts := param.Section("kafka").Key("hosts").String()
sli := strings.Split(kafka_hosts, ",")
for index := range sli {
sli[index] = strings.TrimSpace(sli[index])
}
kafka_where := param.Section("kafka").Key("where").String()
kafka_zk := param.Section("kafka").Key("zookeepers").String()
zk_sli := strings.Split(kafka_zk, ",")
for index := range zk_sli {
zk_sli[index] = strings.TrimSpace(zk_sli[index])
}
kafka_groupname := param.Section("kafka").Key("groupname").String()
k.Kafka = t.Kafka{Hosts: sli, Where: kafka_where, Zookeeper: zk_sli,
Groupname: kafka_groupname}
return nil
}
//start the module
func (k *KafkaFlight) Start() error {
//start group consumption
for _, v := range k.Conf.Inter_topics {
kafka := k.Kafka
if groupconsumer, err := kafka.NewConsumer(v); err == nil {
k.I = append(k.I, groupconsumer)
go groupconsumer.Dispatcher(k.Log)
}
}
for _, v := range k.Conf.Inter_topics_new {
kafka := k.Kafka
if groupconsumer, err := kafka.NewConsumer(v); err == nil {
k.INew = append(k.INew, groupconsumer)
go groupconsumer.Dispatcher(k.Log)
}
}
for _, v := range k.Conf.Inter_compact_topics {
kafka := k.Kafka
if groupconsumer, err := kafka.NewConsumer(v); err == nil {
k.ICompact = append(k.ICompact, groupconsumer)
go groupconsumer.Dispatcher(k.Log)
}
}
for _, v := range k.Conf.Inter_qy_topics {
kafka := k.Kafka
if groupconsumer, err := kafka.NewConsumer(v); err == nil {
k.IQy = append(k.IQy, groupconsumer)
go groupconsumer.Dispatcher(k.Log)
}
}
kafka1 := k.Kafka
if groupconsumer, err := kafka1.NewConsumer(k.Conf.Inter_update_topic); err == nil {
k.IUpdate = append(k.IUpdate, groupconsumer)
go groupconsumer.Dispatcher(k.Log)
}
kafka2 := k.Kafka
if groupconsumer, err := kafka2.NewConsumer(k.Conf.China_topic); err == nil {
k.C = append(k.C, groupconsumer)
go groupconsumer.Dispatcher(k.Log)
}
kafka3 := k.Kafka
if groupconsumer, err := kafka3.NewConsumer(k.Conf.China_update_topic); err == nil {
k.CUpdate = append(k.CUpdate, groupconsumer)
go groupconsumer.Dispatcher(k.Log)
}
for _, v := range k.Conf.China_crawl_topics {
kafka := k.Kafka
if groupconsumer, err := kafka.NewConsumer(v); err == nil {
k.CCrawl = append(k.CCrawl, groupconsumer)
go groupconsumer.Dispatcher(k.Log)
}
}
for _, v := range k.Conf.China_crawl_compact_topics {
kafka := k.Kafka
if groupconsumer, err := kafka.NewConsumer(v); err == nil {
k.CCrawlCompact = append(k.CCrawlCompact, groupconsumer)
go groupconsumer.Dispatcher(k.Log)
}
}
kafka4 := k.Kafka
if groupconsumer, err := kafka4.NewConsumer(k.Conf.Islog_topic); err == nil {
k.Islog = append(k.Islog, groupconsumer)
go groupconsumer.Dispatcher(k.Log)
}
kafka5 := k.Kafka
if groupconsumer, err := kafka5.NewConsumer(k.Conf.Iclog_topic); err == nil {
k.Iclog = append(k.Iclog, groupconsumer)
go groupconsumer.Dispatcher(k.Log)
}
return nil
}
//stop the module
func (k *KafkaFlight) Stop() {
//stop topic consumption
for _, v := range k.I {
v.Close()
}
for _, v := range k.INew {
v.Close()
}
for _, v := range k.ICompact {
v.Close()
}
for _, v := range k.IQy {
v.Close()
}
for _, v := range k.IUpdate {
v.Close()
}
for _, v := range k.C {
v.Close()
}
for _, v := range k.CUpdate {
v.Close()
}
for _, v := range k.CCrawl {
v.Close()
}
for _, v := range k.CCrawlCompact {
v.Close()
}
for _, v := range k.Islog {
v.Close()
}
for _, v := range k.Iclog {
v.Close()
}
}
When my program starts I start KafkaFlight, and I stop KafkaFlight when my program stops.
I need to keep fetching data from Kafka continuously as producers keep writing to it, but once the consumer is closed, Messages() is closed and only returns nil. How can I make this work for my case? Thanks.
Best wishes.
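For context, the lifecycle described above amounts to roughly the following sketch (living in the same package as KafkaFlight; the ini path, logger construction, and signal handling are placeholders, and the standard log, os, and os/signal packages are assumed to be imported):

// hypothetical wiring of KafkaFlight over the program lifetime
var mylog *t.Log // assumed to be constructed elsewhere
kf := &KafkaFlight{}
if err := kf.Init("flight.ini", mylog); err != nil { // hypothetical ini path
    log.Fatalln(err)
}
if err := kf.Start(); err != nil { // joins the groups and starts one Dispatcher per topic
    log.Fatalln(err)
}
stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt) // block until the program is asked to stop
<-stop
kf.Stop() // closes every consumer, which in turn closes its Messages()/Errors() channels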