Giter VIP home page Giter VIP logo

Comments (3)

wvanbergen avatar wvanbergen commented on May 8, 2024

Once you close the consumer, the Messages() and Errors() channels will be closed, which means they will start returning nil values continuously. You should break your loop when that happens.

from kafka.

orange-jacky avatar orange-jacky commented on May 8, 2024

kafka has data without dealing, i break my loop, than i can't fetch data without deal, this don't suit for the act.

from kafka.

orange-jacky avatar orange-jacky commented on May 8, 2024

# a.go file

import (
"fmt"
"github.com/Shopify/sarama"
"github.com/wvanbergen/kafka/consumergroup"
"github.com/wvanbergen/kazoo-go"
"log"
"strings"
"time"
)

//kafka基本信息
type Kafka struct {
Hosts []string
Where string //从哪儿开始读取
Zookeeper []string //zookeeper主机
Groupname string //分组名
Alltopic []string
}

func (k Kafka) String() string {
return fmt.Sprintf("zookeeper=%s, groupname=%s, where=%s, alltopic=%s",
strings.Join(k.Zookeeper, ","), k.Groupname, k.Where, strings.Join(k.Alltopic, ","))
}

//按组消费者基本信息
type KafkaConsumer struct {
Topic string
Kafka //kafka configure
GroupConsumer *consumergroup.ConsumerGroup
Msgs chan *sarama.ConsumerMessage //提供给外部程序使用
Exit chan bool
}

func (k Kafka) NewConsumer(topic string) (*KafkaConsumer, error) {

config := consumergroup.NewConfig()
config.Offsets.Initial = Convert(k.Where)
config.Offsets.ProcessingTimeout = 10 * time.Second
var zookeeperNodes []string
zookeeperNodes, config.Zookeeper.Chroot = kazoo.ParseConnectionString(topic)
k.Alltopic = append(k.Alltopic, topic)

groupconsumer, err := consumergroup.JoinConsumerGroup(k.Groupname, zookeeperNodes, k.Zookeeper, config)
if err != nil {
	return nil, err
}

msgchan := make(chan *sarama.ConsumerMessage, 30)

ret := &KafkaConsumer{Kafka: k, GroupConsumer: groupconsumer,
	Msgs: msgchan, Exit: make(chan bool, 1), Topic: topic}
log.Printf("new a zookeeper consumer, info:%s, topic:%s, groupconsumer:%+v\n", k.String(), topic, ret)
return ret, nil

}

func (k *KafkaConsumer) Close() error {
if k.GroupConsumer != nil {
if err := k.GroupConsumer.Close(); err != nil {
log.Printf("stop a zookeeper consumer fail. err:%s hostinfo:%s\n",
err.Error(), k.String())
return err
} else {
log.Printf("stop a zookeeper consumer success, hostinfo:%s\n", k.String())
}
}
k.Exit <- true
return nil
}

func (k *KafkaConsumer) String() string {
return fmt.Sprintf("%s input_topic:%s, groupconsumer:%+v, msgchan=%+v", k.Kafka.String(),
k.Topic, k.GroupConsumer, k.Msgs)
}

func (k *KafkaConsumer) Dispatcher(mylog *Log) {
ticker := time.NewTicker(time.Second * 10)
log.Printf("topic:%s start a dispatcher\n", k.Topic)
for {
select {
case msg := <-k.GroupConsumer.Messages():
if msg != nil && msg.Value != nil && len(msg.Value) > 0 {
k.Msgs <- msg
k.GroupConsumer.CommitUpto(msg)
mylog.Log.Debugf("conumser_topic:%+v, msg_topic:%+v, msg_len=%d, send to recevie chan",
k.Alltopic, msg.Topic, len(msg.Value))
} else {
mylog.Log.Debugf("conumser_topic:%+v, msg_topic:%+v,error format msg:%+v",
k.Alltopic, k.Topic, msg)
}
case err := <-k.GroupConsumer.Errors():
mylog.Log.Debugf("conumser_topic:%+v, msg_topic:%+v, fetch msg err,%+v", k.Alltopic, k.Topic, err)
case <-k.Exit:
log.Printf("conumser_topic:%+v, msg_topic:%+v, dispatcher exit\n", k.Alltopic, k.Topic)
ticker.Stop()
return
case <-ticker.C:
mylog.Log.Debugf("conumser_topic:%+v, msg_topic:%+v, dispatcher ticker 10s, Consumer:%+v, chan_len:%d",
k.Alltopic, k.Topic, k)
}
}
}

//转换方法
func Convert(where string) int64 {
var ret int64
switch where {
case "begin":
ret = sarama.OffsetOldest
case "now":
ret = sarama.OffsetNewest
default:
ret = sarama.OffsetNewest
}
return ret
}

mylogic.go file

import (
"bufio"
t "cf_build/thrid_party"
"github.com/go-ini/ini"
"os"
"strings"
)

type Conf struct {
Inter_topics []string
Inter_topics_new []string
Inter_compact_topics []string
Inter_qy_topics []string
Inter_update_topic string

China_topic                string
China_update_topic         string
China_crawl_topics         []string
China_crawl_compact_topics []string

Islog_topic string
Iclog_topic string

}

type KafkaFlight struct {
Conf
Kafka t.Kafka
I []*t.KafkaConsumer
INew []*t.KafkaConsumer
ICompact []*t.KafkaConsumer
IQy []*t.KafkaConsumer
IUpdate []*t.KafkaConsumer
C []*t.KafkaConsumer
CUpdate []*t.KafkaConsumer
CCrawl []*t.KafkaConsumer
CCrawlCompact []*t.KafkaConsumer
Islog []*t.KafkaConsumer
Iclog []*t.KafkaConsumer
Log *t.Log
}

//初始化从kafka中读取特价机票信息模块
func (k *KafkaFlight) Init(inifile string, log *t.Log) error {
param, err := ini.Load(inifile)
if err != nil {
return err
}

k.Log = log

//读取国际航线数据
inter_topics_file := param.Section("kafka").Key("inter_topics").String()
var inter_topics []string
f, _ := os.Open(inter_topics_file)
defer f.Close()
scanner := bufio.NewScanner(f)
for scanner.Scan() {
	topic := strings.TrimSpace(scanner.Text())
	inter_topics = append(inter_topics, topic)
}

inter_topics_new_file := param.Section("kafka").Key("inter_topic_new").String()
var inter_topics_new []string
f_new, _ := os.Open(inter_topics_new_file)
defer f_new.Close()
scanner_new := bufio.NewScanner(f_new)
for scanner_new.Scan() {
	topic := strings.TrimSpace(scanner_new.Text())
	inter_topics_new = append(inter_topics_new, topic)
}

inter_compact_topics_file := param.Section("kafka").Key("inter_compact_topics").String()
var inter_compact_topics []string
f22, _ := os.Open(inter_compact_topics_file)
defer f22.Close()
scanner22 := bufio.NewScanner(f22)
for scanner22.Scan() {
	topic := strings.TrimSpace(scanner22.Text())
	inter_compact_topics = append(inter_compact_topics, topic)
}

inter_qy_topics_file := param.Section("kafka").Key("inter_qy_topic").String()
var inter_qy_topics []string
f_qy, _ := os.Open(inter_qy_topics_file)
defer f_qy.Close()
scanner_qy := bufio.NewScanner(f_qy)
for scanner_qy.Scan() {
	topic := strings.TrimSpace(scanner_qy.Text())
	inter_qy_topics = append(inter_qy_topics, topic)
}

inter_update_topic := param.Section("kafka").Key("inter_update_topic").String()
inter_update_topic = strings.TrimSpace(inter_update_topic)

//读取国内航线数据
china_crawl_topic_file := param.Section("kafka").Key("china_crawl_topic").String()
var china_crawl_topic []string
f1, _ := os.Open(china_crawl_topic_file)
defer f1.Close()
scanner1 := bufio.NewScanner(f1)
for scanner1.Scan() {
	topic := strings.TrimSpace(scanner1.Text())
	china_crawl_topic = append(china_crawl_topic, topic)
}

china_crawl_compact_topic_file := param.Section("kafka").Key("china_crawl_compact_topic").String()
var china_crawl_compact_topic []string
f11, _ := os.Open(china_crawl_compact_topic_file)
defer f11.Close()
scanner11 := bufio.NewScanner(f11)
for scanner11.Scan() {
	topic := strings.TrimSpace(scanner11.Text())
	china_crawl_compact_topic = append(china_crawl_compact_topic, topic)
}
china_topic := param.Section("kafka").Key("china_topic").String()
china_update_topic := param.Section("kafka").Key("china_update_topic").String()
china_topic = strings.TrimSpace(china_topic)
china_update_topic = strings.TrimSpace(china_update_topic)

//读取更新日志信息
islog_topic := param.Section("kafka").Key("islog_topic").String()
iclog_topic := param.Section("kafka").Key("iclog_topic").String()
islog_topic = strings.TrimSpace(islog_topic)
iclog_topic = strings.TrimSpace(iclog_topic)

//配置conf
k.Conf.Inter_topics = inter_topics
k.Conf.Inter_topics_new = inter_topics_new
k.Conf.Inter_compact_topics = inter_compact_topics
k.Conf.Inter_qy_topics = inter_qy_topics
k.Conf.Inter_update_topic = inter_update_topic
k.Conf.China_topic = china_topic
k.Conf.China_update_topic = china_update_topic
k.Conf.China_crawl_topics = china_crawl_topic
k.Conf.China_crawl_compact_topics = china_crawl_compact_topic
k.Conf.Islog_topic = islog_topic
k.Conf.Iclog_topic = iclog_topic

//读取kafka主机信息
kafka_hosts := param.Section("kafka").Key("hosts").String()
sli := strings.Split(kafka_hosts, ",")
for index, _ := range sli {
	sli[index] = strings.TrimSpace(sli[index])
}
kafka_where := param.Section("kafka").Key("where").String()

kafka_zk := param.Section("kafka").Key("zookeepers").String()
zk_sli := strings.Split(kafka_zk, ",")
for index, _ := range zk_sli {
	zk_sli[index] = strings.TrimSpace(zk_sli[index])
}
kafka_groupname := param.Section("kafka").Key("groupname").String()

k.Kafka = t.Kafka{Hosts: sli, Where: kafka_where, Zookeeper: zk_sli,
	Groupname: kafka_groupname}

return nil

}

//启动模块
func (k *KafkaFlight) Start() error {
//启动分组消费
for _, v := range k.Conf.Inter_topics {
kafka := k.Kafka
if groupconsumer, err := kafka.NewConsumer(v); err == nil {
k.I = append(k.I, groupconsumer)
go groupconsumer.Dispatcher(k.Log)
}
}

for _, v := range k.Conf.Inter_topics_new {
	kafka := k.Kafka
	if groupconsumer, err := kafka.NewConsumer(v); err == nil {
		k.INew = append(k.INew, groupconsumer)
		go groupconsumer.Dispatcher(k.Log)
	}
}

for _, v := range k.Conf.Inter_compact_topics {
	kafka := k.Kafka
	if groupconsumer, err := kafka.NewConsumer(v); err == nil {
		k.ICompact = append(k.ICompact, groupconsumer)
		go groupconsumer.Dispatcher(k.Log)
	}
}
for _, v := range k.Conf.Inter_qy_topics {
	kafka := k.Kafka
	if groupconsumer, err := kafka.NewConsumer(v); err == nil {
		k.IQy = append(k.IQy, groupconsumer)
		go groupconsumer.Dispatcher(k.Log)
	}
}

kafka1 := k.Kafka
if groupconsumer, err := kafka1.NewConsumer(k.Conf.Inter_update_topic); err == nil {
	k.IUpdate = append(k.IUpdate, groupconsumer)
	go groupconsumer.Dispatcher(k.Log)
}

kafka2 := k.Kafka
if groupconsumer, err := kafka2.NewConsumer(k.Conf.China_topic); err == nil {
	k.C = append(k.C, groupconsumer)
	go groupconsumer.Dispatcher(k.Log)
}

kafka3 := k.Kafka
if groupconsumer, err := kafka3.NewConsumer(k.Conf.China_update_topic); err == nil {
	k.CUpdate = append(k.CUpdate, groupconsumer)
	go groupconsumer.Dispatcher(k.Log)
}

for _, v := range k.Conf.China_crawl_topics {
	kafka := k.Kafka
	if groupconsumer, err := kafka.NewConsumer(v); err == nil {
		k.CCrawl = append(k.CCrawl, groupconsumer)
		go groupconsumer.Dispatcher(k.Log)
	}
}

for _, v := range k.Conf.China_crawl_compact_topics {
	kafka := k.Kafka
	if groupconsumer, err := kafka.NewConsumer(v); err == nil {
		k.CCrawlCompact = append(k.CCrawlCompact, groupconsumer)
		go groupconsumer.Dispatcher(k.Log)
	}
}
kafka4 := k.Kafka
if groupconsumer, err := kafka4.NewConsumer(k.Conf.Islog_topic); err == nil {
	k.Islog = append(k.Islog, groupconsumer)
	go groupconsumer.Dispatcher(k.Log)
}

kafka5 := k.Kafka
if groupconsumer, err := kafka5.NewConsumer(k.Conf.Iclog_topic); err == nil {
	k.Iclog = append(k.Iclog, groupconsumer)
	go groupconsumer.Dispatcher(k.Log)
}
return nil

}

//停止模块
func (k *KafkaFlight) Stop() {
//停止topic消费
for _, v := range k.I {
v.Close()
}
for _, v := range k.INew {
v.Close()
}
for _, v := range k.ICompact {
v.Close()
}
for _, v := range k.IQy {
v.Close()
}
for _, v := range k.IUpdate {
v.Close()
}
for _, v := range k.C {
v.Close()
}
for _, v := range k.CUpdate {
v.Close()
}
for _, v := range k.CCrawl {
v.Close()
}
for _, v := range k.CCrawlCompact {
v.Close()
}
for _, v := range k.Islog {
v.Close()
}
for _, v := range k.Iclog {
v.Close()
}
}

when my program start, i start KafkaFlight; and stop KafkaFlight when my program stop.
i need to fetch data from kafka continuely, when producer produce data to kafka. the Message() close and returen nil. how can this suit for me ? thanks.
best wishes.

from kafka.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.