
sarama-cluster's Introduction

Sarama Cluster


Cluster extensions for Sarama, the Go client library for Apache Kafka 0.9 (and later).

DEPRECATION NOTICE

Please note that since IBM/sarama#1099 was merged and released (>= v1.19.0), this library is officially deprecated. The native implementation supports a variety of use cases that are not available through this library.

Documentation

Documentation and examples are available via godoc at http://godoc.org/github.com/bsm/sarama-cluster

Examples

Consumers have two modes of operation. In the default multiplexed mode, messages (and errors) from all subscribed topics and partitions are passed to a single channel:

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"

	cluster "github.com/bsm/sarama-cluster"
)

func main() {

	// init (custom) config, enable errors and notifications
	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = true

	// init consumer
	brokers := []string{"127.0.0.1:9092"}
	topics := []string{"my_topic", "other_topic"}
	consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// consume errors
	go func() {
		for err := range consumer.Errors() {
			log.Printf("Error: %s\n", err.Error())
		}
	}()

	// consume notifications
	go func() {
		for ntf := range consumer.Notifications() {
			log.Printf("Rebalanced: %+v\n", ntf)
		}
	}()

	// consume messages, watch signals
	for {
		select {
		case msg, ok := <-consumer.Messages():
			if ok {
				fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
				consumer.MarkOffset(msg, "")	// mark message as processed
			}
		case <-signals:
			return
		}
	}
}

Users who require access to individual partitions can use the partitioned mode, which exposes partition-level consumers:

package main

import (
  "fmt"
  "log"
  "os"
  "os/signal"

  cluster "github.com/bsm/sarama-cluster"
)

func main() {

	// init (custom) config, set mode to ConsumerModePartitions
	config := cluster.NewConfig()
	config.Group.Mode = cluster.ConsumerModePartitions

	// init consumer
	brokers := []string{"127.0.0.1:9092"}
	topics := []string{"my_topic", "other_topic"}
	consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// consume partitions
	for {
		select {
		case part, ok := <-consumer.Partitions():
			if !ok {
				return
			}

			// start a separate goroutine to consume messages
			go func(pc cluster.PartitionConsumer) {
				for msg := range pc.Messages() {
					fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
					consumer.MarkOffset(msg, "")	// mark message as processed
				}
			}(part)
		case <-signals:
			return
		}
	}
}

Running tests

You need to install Ginkgo & Gomega to run tests. Please see http://onsi.github.io/ginkgo for more details.

To run tests, call:

$ make test

Troubleshooting

Consumer not receiving any messages?

By default, sarama's Config.Consumer.Offsets.Initial is set to sarama.OffsetNewest. This means that a brand-new consumer, which has never committed any offsets to Kafka, will only receive messages produced after it starts consuming.

If you wish such a consumer to instead receive all messages from the start of the topic, set Config.Consumer.Offsets.Initial to sarama.OffsetOldest.
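
For example, a minimal sketch of that setting (broker address, group, and topic names are placeholders; the sarama and cluster packages are assumed to be imported as in the examples above):

config := cluster.NewConfig()
// start from the oldest available offset when the group has no committed offset yet
config.Consumer.Offsets.Initial = sarama.OffsetOldest

consumer, err := cluster.NewConsumer([]string{"127.0.0.1:9092"}, "my-consumer-group", []string{"my_topic"}, config)
if err != nil {
	panic(err)
}
defer consumer.Close()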

sarama-cluster's People

Contributors

angrz, arnaudbriche, blockchainian, boyand, codelingoteam, d33d33, daichirata, dim, elakito, f21, georgeteo, ggirtsou, imjustfly, jgeiger, jriecken, nak3, pmalekn, relud, stevevls, thehydroimpulse


sarama-cluster's Issues

mission statement / introduction / readme should be clearer

the readme says

Package cluster provides cluster extensions for Sarama, enabling users to consume topics across multiple, balanced nodes.

This seems rather vague. I think I can run a cluster without sarama-cluster, with straight sarama just fine. We can also consume topics from multiple nodes using sarama. So is it all about the "balanced" part? What does this mean exactly, balanced? I can run a kafka cluster and use plain sarama consumers that are well balanced. So what does sarama-cluster do? I'm guessing it has to do with automatically managing something that otherwise has to be managed manually. But what exactly? What are some scenarios where this library becomes useful? When should we use it, and when not?

Then in #41 it is said

The goal of sarama-cluster is to safely consume multiple topics/partitions via a cluster of concurrent consumers, forming a shared group.

Safe in what way? What is otherwise unsafe and how does sarama-cluster make it safe? What is a shared group, and who is the group being shared with?

Does sarama-cluster make certain assumptions about the intended use? Like perhaps "the same message should not be processed by multiple consumers" ? Any such assumptions should be clarified.

thank you.

Tried to use sarama-cluster library to write sample consumer code ... but during go build my code I am getting error from sarama-cluster library ...

I'm trying to write a simple consumer using the sarama-cluster library, but during go build of my code I'm getting the errors below from your library.

The sarama library is already installed.

github.com/bsm/sarama-cluster

github.com/bsm/sarama-cluster/balancer.go:93: undefined: sarama.ConsumerGroupMemberMetadata
github.com/bsm/sarama-cluster/config.go:82: c.Config.Consumer.Offsets undefined (type struct { Retry struct { Backoff time.Duration }; Fetch struct { Min int32; Default int32; Max int32 }; MaxWaitTime time.Duration; Return struct { Errors bool } } has no field or method Offsets)
github.com/bsm/sarama-cluster/consumer.go:270: undefined: sarama.ErrRebalanceInProgress
github.com/bsm/sarama-cluster/consumer.go:286: c.client.config.Config.Consumer.Offsets undefined (type struct { Retry struct { Backoff time.Duration }; Fetch struct { Min int32; Default int32; Max int32 }; MaxWaitTime time.Duration; Return struct { Errors bool } } has no field or method Offsets)
github.com/bsm/sarama-cluster/consumer.go:307: undefined: sarama.ErrRebalanceInProgress
github.com/bsm/sarama-cluster/consumer.go:334: c.client.config.Config.Consumer.MaxProcessingTime undefined (type struct { Retry struct { Backoff time.Duration }; Fetch struct { Min int32; Default int32; Max int32 }; MaxWaitTime time.Duration; Return struct { Errors bool } } has no field or method MaxProcessingTime)
github.com/bsm/sarama-cluster/consumer.go:356: broker.Heartbeat undefined (type *sarama.Broker has no field or method Heartbeat)
github.com/bsm/sarama-cluster/consumer.go:356: undefined: sarama.HeartbeatRequest
github.com/bsm/sarama-cluster/consumer.go:383: undefined: sarama.ErrUnknownMemberId
github.com/bsm/sarama-cluster/consumer.go:394: undefined: sarama.ErrRebalanceInProgress

github.com/bsm/sarama-cluster/consumer.go:394: too many errors

Issue #66: Unable to listen to messages with cluster.Consumer - still having the same problem

Dimitrij, thanks for looking into the issue #66 I posted last Friday. Unfortunately, your changes haven't fixed the problem I am experiencing. I am still not able to receive any published messages. I am on commit 659b67f.

Here is an interesting fact. If I change line 21 in partitions.go from

pcm, err := manager.ConsumePartition(topic, partition, info.NextOffset(defaultOffset))

to

pcm, err := manager.ConsumePartition(topic, partition, sarama.OffsetOldest)

I will then be able to see all messages that I've ever published. This is obviously not the desired behavior, but you see my point...

The other interesting thing is that if I use OffsetNewest on line 21 (which makes more sense than OffsetOldest), then I see NO messages at all.

Thanks again for your help.

Multiple topics

I have a single app listening to two different topics. What I'm seeing is that both topics start processing, but one stops after a few messages and never picks up again. If I run two separate copies of my app, each pointing at one topic or the other, then processing occurs normally.

I've tried sharing the same sarama client and using two different ones, but I still see the conflict. Also tried messing with GOMAXPROCS.

I'm not sure if it's something in sarama-cluster or sarama itself. I'll investigate deeper into the problem and see if I can uncover something.

Multiple topics cause rebalancing conflict

In Zookeeper under /consumers/, there are three nodes: ids, offsets, and owners. Both offsets and owners are subdivided by topic; ids are not. Because of this, when you listen to two different topics, the rebalancing routines think there are 2 consumers (two unique "ids" are generated) and only listen to half of each topic's partitions.

To reproduce:

  • Create two topics in Kafka, each with two partitions.
  • Create a consumer for each topic in your Go app.
  • When the app starts, you'll see it grab all the partitions, then rebalance and grab half the partitions for each topic.

Partition offsets out of range fails rebalancing

I've found that if my client hasn't run in a while and the partition index in ZK falls outside the valid range for Kafka, I get an error. This was fixed, but I think changes to Shopify/sarama have broken this. It looks like when you call consumer.ConsumePartition, the index is checked, fails, and an error is returned from the claim function. However, the Zookeeper node for the partition has by then been claimed, so when sarama-cluster tries to adjust the index and try again, the node is already claimed, fails, and falls into a loop.

Feature Request: Expose Consumer Group Member UserData

For Example, Add Config.Group.Member.UserData []byte and propagate with JoinGroup requests.

The Kafka protocol appears to allow consumer group members to provide 'user data' for partition assignment balancing protocols. This user data is shared with the balancer, which is selected from one of the members of the consumer group. The balancer then allocates partitions to the group members, possibly based on the user data.

Even though the current sarama-cluster balancer implementation does not act on user data, it provides value to external tools: e.g. external routing of a web request to a specific partition owner in a consumer group (if the user data contained HTTP host:port info).

External tools could extract this info from the broker without joining the group by something like:

client, err := scluster.NewClient(...)
broker, err := client.Coordinator(groupName)
req := &sarama.DescribeGroupsRequest{Groups: []string{groupName}}
resp, err := broker.DescribeGroups(req)
groupDesc := resp.Groups[0]

Not sure how one would watch for changes to the partition assignments without being part of the consumer group though. In a reasonably stable environment this may not be too bad so long as there is a fall back for routing (e.g. a redirect).

panics if new consumers are created in go routines and close is called

If we create 2 consumers in 2 goroutines, both using the same GroupID, and we call Close for both of them asynchronously, it triggers a panic in sarama-cluster.

This is what I have analyzed so far.

In partitions.go, in the function

func (c *partitionConsumer) Loop(messages chan<- *sarama.ConsumerMessage, errors chan<- error) {

we pass the messages and errors channels of the base consumer to the partition consumer, and as soon as we get messages from partitions we add them to the base consumer's channels. If the consumer is closed, those channels are no longer valid, but the partition consumer will still try to add any error or message it receives to a closed channel, which panics.

I got rid of it by passing the dying channel into Loop, as below, and selecting on it (a closed channel). That way I am not processing messages or errors if the main consumer has died, and it solved the panic. Let me know what you think of this.
func (c *partitionConsumer) Loop(messages chan<- *sarama.ConsumerMessage, errors chan<- error, dying <-chan none) {
	op := make(chan none)
	close(op)

	for {
		select {
		case _, ok := <-dying:
			if !ok {
				return
			}
		case <-op:
			select {
			case msg := <-c.pcm.Messages():
				if msg != nil {
					select {
					case messages <- msg:
					case <-c.dying:
						close(c.dead)
						return
					}
				}
			case err := <-c.pcm.Errors():
				if err != nil {
					select {
					case <-c.dying:
						close(c.dead)
						return
					case errors <- err:
					}
				}
			case <-c.dying:
				close(c.dead)
				return
			}
		}
	}
}

panic(0x4ed3a0, 0xc8204c7ab0)
	/usr/local/Cellar/go/1.6.2/libexec/src/runtime/panic.go:481 +0x3e6
github.com/bsm/sarama-cluster.(*partitionConsumer).Loop(0xc820376410, 0xc820017380, 0xc820017320, 0xc820016e40)
	github.com/bsm/sarama-cluster/partitions.go:59 +0x419
github.com/bsm/sarama-cluster.(*Consumer).createConsumer
	github.com/bsm/sarama-cluster/consumer.go:634 +0x41c

we should make it easier to view errors by default

the way Consumer.handleError currently works seems like a suboptimal default:

c.client.config.Consumer.Return.Errors defaults to false, so we call sarama.Logger.Println, which defaults to io.Discard.

this means that any error is silently dropped by default, and this is non-obvious.
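
A minimal sketch of the current workaround, enabling error returns and draining the channel so errors are not silently dropped (broker, group, and topic names are placeholders; the log package is assumed to be imported):

config := cluster.NewConfig()
config.Consumer.Return.Errors = true // deliver errors on consumer.Errors() instead of the (discarded) sarama logger

consumer, err := cluster.NewConsumer([]string{"127.0.0.1:9092"}, "my-consumer-group", []string{"my_topic"}, config)
if err != nil {
	panic(err)
}
defer consumer.Close()

go func() {
	for err := range consumer.Errors() {
		log.Printf("consumer error: %v", err)
	}
}()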

Hardcoded zookeeper read timeout

Hi,

I'm having trouble with sarama-cluster, mainly in dev environment where zookeeper clusters consist of a single instance with quite slow hard drive, I'm seeing a lot of the following log message (which I think originate from the underlying zookeeper client):

read tcp 127.0.0.1:2181: i/o timeout

I don't really know for sure if it's related, but I saw you hardcoded a recv timeout of 1 second in NewConsumer; maybe extending ConsumerConfig with a ZookeeperReadTimeout field would prevent sarama-cluster from messing with slow zk servers?

Should Consumer.messages be flushed after rebalance?

(Applies to the v2 branch)

I imagine a problem where a rebalance occurs and the partitions that consumer A was assigned are now given to consumer B, but consumer A's Consumer.messages channel still has some messages queued from before the rebalance.

I was thinking it may be possible to flush in the app business logic given there is a rebalance notification, but by the time that is delivered, the consumer loop logic has started executing and pushing messages from the post-rebalance partitions onto this same channel.

Thoughts? I understand that this library can't achieve exactly-once delivery semantics on its own, but fixing this could reduce the window of potentially processing a message multiple times.

MarkPartitionOffset has no effect

Hi @bsm
In my program, we want to read messages on partition0 starting from offset0, but the call below has no effect:
MarkPartitionOffset(my_test_topic,partition0,offset0,"")

Or is there another way to achieve this?

Looking forward to your reply, thanks.

rebalancing is not happening when some threads go down

Hi,

I noticed that when I increase the consumer threads from 1 to 2, reading 1 topic with 4 partitions, it is able to rebalance. But when I go from 2 threads back to 1, the remaining thread is not assigned the partitions that were assigned to the shut-down thread. Is this a limitation of the library, or am I missing something?

Messages duplicated when consumers > partitions

Start 3 consumers on a topic with 2 partitions. The expectation is that 2 of the consumers will each receive messages from a single partition while the third is idle. The current result is that the 3rd consumer gets duplicate messages (already received by the other consumers) from both partitions.

Single client w/ lot of consumers isn't stable

Hello,
I tried using a single Client for a lot (~120) of consumers (NewConsumerFromClient), each consuming a different topic of 8 partitions. Each consumer was alone in its consumer group (no other instance in the consumer group yet).
The consumers were all created during the same 4s window.

The consumers kept rebalancing one after another and didn't seem to stabilize, even after waiting for 15 minutes.
Sometimes I could see logs such as:
"kafka server: The provided member is not known in the current generation."

I tried increasing the Config.Net.MaxOpenRequests to 200 with no success.

My other configs are:
Config.Net.ReadTimeout = 60 * time.Second
Config.Group.Session.Timeout = 25 * time.Second
Config.Group.Heartbeat.Interval = 6 * time.Second

Is that many consumers for a single client just "too much", or should this work (and do you need more information)?

BR

Option for not consuming the same message more than once

Hello guys,

I work at a company (Top Free Games) and we are currently using sarama in one of our services. We are currently in the process of migrating from Kafka 0.8.2 to Kafka 0.9.0, so we will be able to use the new Kafka-based offset tracking and consumer-group rebalancing.

We noticed that in one very particular case, when the service crashes, there is a possibility that we consume the same message more than once, since the message's offset is only committed periodically. However, as we deal with end users, we cannot afford to duplicate messages, as that would badly affect our users' experience.

We could store the offsets on another database, but that would add another layer and possibly another point of failure to watch for, so that doesn't seem like a good option for us.

We came up with another solution: store a batch of messages (e.g. 10k messages) at once, commit them right away, and only start processing them once we have confirmation that they were committed. This sounds like a perfect solution for us, but we noticed that you don't export the method 'commitOffsets' that is responsible for committing the messages.

So my question is, would exporting this method be a solution or adding the whole functionality (consuming a batch of messages) would be more reasonable?
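
For reference, a rough sketch of the batch-then-commit idea, assuming a library version that exports Consumer.CommitOffsets(); the batch size is illustrative and process is a hypothetical handler:

batch := make([]*sarama.ConsumerMessage, 0, 10000)
for msg := range consumer.Messages() {
	batch = append(batch, msg)
	consumer.MarkOffset(msg, "") // mark the offset, but do not process yet
	if len(batch) == cap(batch) {
		if err := consumer.CommitOffsets(); err != nil { // flush the marked offsets to Kafka
			log.Printf("commit failed: %v", err)
			continue
		}
		process(batch) // hypothetical handler, only invoked once the commit is confirmed
		batch = batch[:0]
	}
}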

Cannot consume twice

If I have two services consuming the same topic with the same consumer group, I get the following error:

[sarama] 2016/06/03 18:14:04 Connected to broker at 192.168.0.100:9092 (unregistered)
[sarama] 2016/06/03 18:14:04 client/brokers registered new broker #1001 at 192.168.0.100:9092
[sarama] 2016/06/03 18:14:04 Successfully initialized new client
ERRO[0001] The consumer returned the following error: read tcp 192.168.0.100:62113->192.168.0.100:9092: i/o timeout 
[sarama] 2016/06/03 18:14:05 cluster/consumer  rebalance
[sarama] 2016/06/03 18:14:05 client/coordinator requesting coordinator for consumergoup nx-eptica from 192.168.0.100:9092
[sarama] 2016/06/03 18:14:05 client/coordinator coordinator for consumergoup nx-eptica is #1001 (192.168.0.100:9092)

Any idea what's wrong?

panic: close of closed channel

Yesterday we upgraded to Kafka 0.9 and the latest Sarama library. We are using sarama-cluster to distribute/rebalance work across our four running consumer instances. This morning, two of our four daemon instances crashed with "panic: close of closed channel". I haven't been able to reproduce it yet, but here is what I see in the logs:

[Sarama] 2016/01/22 07:47:29 client/metadata fetching metadata for all topics from broker localhost:9092
[Sarama] 2016/01/22 07:49:03 cluster/consumer -a9899ac7-1b49-40cf-848d-bb843b7231d7 rebalance
2016/01/22 07:49:03 kafka server: The provided member is not known in the current generation.
[Sarama] 2016/01/22 07:49:03 client/coordinator requesting coordinator for consumergoup backend from localhost:9092
[Sarama] 2016/01/22 07:49:03 client/coordinator coordinator for consumergoup backend is #0 (services.veramine.com:9092)
[Sarama] 2016/01/22 07:49:03 consumer/broker/2 disconnecting due to error processing FetchRequest: read tcp 192.168.10.4:54970->192.168.10.4:9094: i/o timeout
[Sarama] 2016/01/22 07:49:03 Closed connection to broker services.veramine.com:9094
2016/01/22 07:49:03 kafka: 1 errors while consuming
[Sarama] 2016/01/22 07:49:03 cluster/consumer -a9899ac7-1b49-40cf-848d-bb843b7231d7 rebalance
[Sarama] 2016/01/22 07:49:03 client/coordinator requesting coordinator for consumergoup backend from localhost:9092
[Sarama] 2016/01/22 07:49:03 client/coordinator coordinator for consumergoup backend is #0 (services.veramine.com:9092)
panic: close of closed channel

The close came from gopkg.in/bsm/sarama-cluster.v2.(*partitionConsumer).Close(0xcb3d2d8d20, 0x0, 0x0)

Here's the code there:

func (c *partitionConsumer) Close() (err error) {
	close(c.dying)
	<-c.dead

	if e := c.pcm.Close(); e != nil {
		err = e
	}
	return
}

Is it possible to first check the partitionConsumer to see if it is already closed before closing, or do you think something needs to be done on the sarama side to avoid a panic? I figured that even if a Kafka instance died, a rebalance would eventually accommodate that, so I'd rather not panic. Thanks!

Jonathan

I tried to use the sarama-cluster library, getting not supported version

  1. The sarama library has been updated to the latest version
  2. Pulled the new sarama-cluster code

Below is the code which uses the sarama-cluster library

package main

import (
	"flag"
	"fmt"
	"log"
	"os"
	"os/signal"
	"strings"
	"syscall"

	"github.com/Shopify/sarama"
	"github.com/bsm/sarama-cluster"
)

var (
	groupID    = flag.String("group", "", "REQUIRED: The shared consumer group name")
	brokerList = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster")
	topicList  = flag.String("topics", "", "REQUIRED: The comma separated list of topics to consume")
	offset     = flag.String("offset", "newest", "The offset to start with. Can be oldest, newest")
	verbose    = flag.Bool("verbose", false, "Whether to turn on sarama logging")

	logger = log.New(os.Stderr, "", log.LstdFlags)
)

func main() {
	flag.Parse()

	if *groupID == "" {
		printUsageErrorAndExit("You have to provide a -group name.")
	} else if *brokerList == "" {
		printUsageErrorAndExit("You have to provide -brokers as a comma-separated list, or set the KAFKA_PEERS environment variable.")
	} else if *topicList == "" {
		printUsageErrorAndExit("You have to provide -topics as a comma-separated list.")
	}

	// Init config
	config := cluster.NewConfig()
	if *verbose {
		sarama.Logger = logger
	} else {
		config.Consumer.Return.Errors = true
		config.Group.Return.Notifications = true
	}

	switch *offset {
	case "oldest":
		config.Consumer.Offsets.Initial = sarama.OffsetOldest
	case "newest":
		config.Consumer.Offsets.Initial = sarama.OffsetNewest
	default:
		printUsageErrorAndExit("-offset should be `oldest` or `newest`")
	}

	// Init consumer, consume errors & messages
	consumer, err := cluster.NewConsumer(strings.Split(*brokerList, ","), *groupID, strings.Split(*topicList, ","), config)
	if err != nil {
		printErrorAndExit(69, "Failed to start consumer: %s", err)
	}

	go func() {
		for err := range consumer.Errors() {
			logger.Printf("Error: %s\n", err.Error())
		}
	}()

	go func() {
		for note := range consumer.Notifications() {
			logger.Printf("Rebalanced: %+v\n", note)
		}
	}()

	go func() {
		for msg := range consumer.Messages() {
			fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Value)
			consumer.MarkOffset(msg, "")
		}
	}()

	wait := make(chan os.Signal, 1)
	signal.Notify(wait, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
	<-wait

	if err := consumer.Close(); err != nil {
		logger.Println("Failed to close consumer: ", err)
	}
}

func printErrorAndExit(code int, format string, values ...interface{}) {
	fmt.Fprintf(os.Stderr, "ERROR: %s\n", fmt.Sprintf(format, values...))
	fmt.Fprintln(os.Stderr)
	os.Exit(code)
}

func printUsageErrorAndExit(format string, values ...interface{}) {
	fmt.Fprintf(os.Stderr, "ERROR: %s\n", fmt.Sprintf(format, values...))
	fmt.Fprintln(os.Stderr)
	fmt.Fprintln(os.Stderr, "Available command line options:")
	flag.PrintDefaults()
	os.Exit(64)
}

I'm getting the error below:
2016/07/20 19:48:40 Error: kafka server: The version of API is not supported.
2016/07/20 19:48:40 Rebalanced: &{Claimed:map[] Released:map[] Current:map[]}

Heartbeat should be started before partition consumer creation

When a topic has a larger number of partitions and/or is hosted on a remote server, the creation of each per-partition consumer (https://github.com/bsm/sarama-cluster/blob/master/consumer.go#L298) might take longer than the max session timeout (30 seconds). This causes a nasty bug where, by the time the first heartbeat is sent after the group sync has completed, Kafka has already removed the member from the group, causing another rebalance. This loops infinitely unless by some chance the partition consumer creation takes less than the configured session timeout.

While we have worked around this by decreasing the number of partitions on certain topics and increasing the timeout for others, would it not be better to start the heartbeat before consumer creation? This 'bug' took us quite some time to figure out, as it's not apparent from either the consumer or Kafka.
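
For anyone hitting this, a sketch of the mitigation described above, raising the session timeout so slow partition-consumer creation fits inside it (the exact values are illustrative and assume the time package is imported):

config := cluster.NewConfig()
config.Group.Session.Timeout = 60 * time.Second   // allow more time between the group sync and the first heartbeat
config.Group.Heartbeat.Interval = 5 * time.Second // keep heartbeats well below the session timeout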

Consumer API not working

Hey,

I was using the sarama-cluster high-level consumer API. However, I am not getting any messages; the consumer is not consuming anything. Can you please have a look? Here is my code (it is very similar to one of the examples):

  config := cluster.NewConfig()
  config.Consumer.Return.Errors = true
  consumer, err := cluster.NewConsumer([]string{"localhost:9092"}, "CG1", []string{"my_topic2"}, config)
  if err != nil {
      panic(err)
  }

  defer func() {
      if err := consumer.Close(); err != nil {
          log.Fatalln(err)
      }
  }()

  // Trap SIGINT to trigger a shutdown.
  signals := make(chan os.Signal, 1)
  signal.Notify(signals, os.Interrupt)
  consumed := 0
  ConsumerLoop:
  for {
      select {
      case msg := <-consumer.Messages():
          log.Printf("Consumed message offset %d, %s\n", msg.Offset, string(msg.Value[:]))
          consumed++
      case <-signals:
          break ConsumerLoop
      }
  }

  log.Printf("Consumed: %d\n", consumed)

PS: The sarama consumer API is working for me with a similar code fragment (I am using a PartitionConsumer to consume).

`MarkOffset` doesn't persist offset

First thank you for writing this library it's been a great help to us.

I'm noticing some strange behavior with offset persistence. I've been testing with the provided cmd/sarama-cluster-cli/main.go tool, targeting a kafka broker set up using sarama's vagrant environment.

Consumer metadata doesn't seem to be making it to the Zookeeper /consumers path. This prevents Kafka tools like kafka-consumer-groups.sh from working properly; furthermore, after a broker restart or consumer restart the offset position is lost, resulting in data-processing loss. I'm not sure if this is a bug in this project or in sarama, but I thought this would be a good place to start.

Here is a session where I use kafka's verifiable-producer tool to publish 10 messages on a topic
https://asciinema.org/a/1ohv1sp2h19wz9vkt4w2qcmt2

Then I start a sarama-cluster using

go run cmd/sarama-cluster-cli/main.go -brokers="192.168.100.67:9091" -group="bsm" -offset="oldest" -topics="test.4" -verbose=true
...
test.4/0/0  3
test.4/0/1  7
test.4/2/0  0
test.4/2/1  4
test.4/2/2  8
test.4/3/0  2
test.4/3/1  6
test.4/1/0  1
test.4/1/1  5
test.4/1/2  9
...

Keeping this consumer running I inspect zookeeper and no consumer data is found.

https://asciinema.org/a/1n3dwextiqnl4kfma2lbselmp

It's my understanding that kafka should be handling all of the zookeeper ugliness for us, but that doesn't seem to be the case. Looking into the kafka zookeeper utility I noticed that the /consumers path is still used on the kafka side.

Have you ever seen your offsets persist in zookeeper using sarama-cluster?

Unable to listen to messages with cluster.Consumer

I must be doing something wrong. I can successfully publish and consume kafka 0.9 messages with plain sarama, i.e. when I am publishing to a specific partition and listening on that partition. However, when I attempt to use cluster.Consumer I can no longer consume any published messages. In this scenario, I am using sarama.SyncProducer, initialized with NewSyncProducer(). Consumer is initialized with cluster.NewConsumer(). Any help will be greatly appreciated.

sarama.Client and sarama.PartitionConsumer are now interfaces

I had to make the following changes to get sarama-cluster to work.

From 71ae4b14af93af8b3dbde9664035baf036bafc02 Mon Sep 17 00:00:00 2001
From: Sean Bowman <[email protected]>
Date: Fri, 13 Mar 2015 14:54:15 -0600
Subject: [PATCH] Shopify changed sarama.Client to an interface.  Updated to
 support.

---
 consumer.go | 12 ++++++------
 util.go     |  2 +-
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/consumer.go b/consumer.go
index b90acd0..6f2f150 100644
--- a/consumer.go
+++ b/consumer.go
@@ -69,8 +69,8 @@ func (c *Config) normalize() {
 type Consumer struct {
    id, group, topic string

-   client   *sarama.Client
-   consumer *sarama.Consumer
+   client   sarama.Client
+   consumer sarama.Consumer
    config   *Config
    zoo      *ZK
    messages chan *sarama.ConsumerMessage
@@ -111,7 +111,7 @@ func NewConsumer(addrs, zookeepers []string, group, topic string, config *Config

 // NewConsumerFromClient creates a new consumer for a given topic, reuing an existing client
 // You MUST call Close() to avoid leaks.
-func NewConsumerFromClient(client *sarama.Client, zookeepers []string, group, topic string, config *Config) (*Consumer, error) {
+func NewConsumerFromClient(client sarama.Client, zookeepers []string, group, topic string, config *Config) (*Consumer, error) {
    if config == nil {
        config = new(Config)
    }
@@ -306,7 +306,7 @@ func (c *Consumer) commitLoop() error {
 }

 // Message consumer loop for a single partition consumer
-func (c *Consumer) consumeLoop(done, errs chan struct{}, wait *sync.WaitGroup, pcsm *sarama.PartitionConsumer) {
+func (c *Consumer) consumeLoop(done, errs chan struct{}, wait *sync.WaitGroup, pcsm sarama.PartitionConsumer) {
    defer wait.Done()

    for {
@@ -420,7 +420,7 @@ func (c *Consumer) reset(claims Claims) (err error) {
    wait := sync.WaitGroup{}
    for _, pcsm := range claims {
        wait.Add(1)
-       go func(c *sarama.PartitionConsumer) {
+       go func(c sarama.PartitionConsumer) {
            defer wait.Done()
            c.Close()
        }(pcsm)
@@ -437,7 +437,7 @@ func (c *Consumer) reset(claims Claims) (err error) {
 }

 // Claims a partition
-func (c *Consumer) claim(partitionID int32) (*sarama.PartitionConsumer, error) {
+func (c *Consumer) claim(partitionID int32) (sarama.PartitionConsumer, error) {
    err := c.zoo.Claim(c.group, c.topic, partitionID, c.id)
    if err != nil {
        return nil, err
diff --git a/util.go b/util.go
index dfa6ea8..a973eb7 100644
--- a/util.go
+++ b/util.go
@@ -12,7 +12,7 @@ import (

 /* Claims map */

-type Claims map[int32]*sarama.PartitionConsumer
+type Claims map[int32]sarama.PartitionConsumer

 // PartitionIDs returns the associated partition IDs
 func (c Claims) PartitionIDs() []int32 {
-- 
2.1.3

When I add partitions to a topic, consumer does not trigger a rebalance.

Kafka version : 0.9.0.1
When I add partitions to a topic, the producer will send messages to the additional partitions.
Kafka does not notify the consumer via the heartbeat API (is this a Kafka issue or normal behavior? I cannot find it in the Kafka JIRA), so it does not trigger a rebalance.
So it looks like "I write some messages to Kafka, but I cannot read them".
Should we add a goroutine to check for partition changes (see the sketch below), or is there a better idea?
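
A minimal sketch of the polling idea using a plain sarama client (the topic name, interval, and onChange callback are placeholders, not part of sarama-cluster):

func watchPartitions(client sarama.Client, topic string, interval time.Duration, onChange func(int)) {
	known := -1
	for range time.Tick(interval) {
		// refresh metadata so newly added partitions become visible
		if err := client.RefreshMetadata(topic); err != nil {
			continue
		}
		partitions, err := client.Partitions(topic)
		if err != nil {
			continue
		}
		if len(partitions) != known {
			known = len(partitions)
			onChange(known) // e.g. recreate the cluster consumer so it picks up the new partitions
		}
	}
}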

Feature Request: expose partitionMap's Snapshot

We have CommitOffsets, but no corresponding GetOffsets. Could we expose c.subs.Snapshot publicly?

Scenario like following:

  1. compare the consumer offsets and topic offsets to see if the consumer is catching up.
  2. get the consumer offsets, and commit the same offsets for other group.

Any thoughts?

Breaking change in Shopify/sarama

They refactored the consumer event processing, breaking apart events into messages and errors and adding a "PartitionConsumer" layer (as opposed to what other type of consumer?).

Close() doesn't seem to be thread safe / idempotent

I have multiple goroutines reading from a sarama-cluster.Consumer. On fatal errors during processing, I have the goroutines clean up. Part of this clean up is calling Close() on the Consumer from the goroutines. During a recent test, the calling of Close() by multiple goroutines in quick succession caused this panic:

panic: close of closed channel
goroutine 995 [running]:
panic(0x7a3d80, 0xc420840cf0)
    /usr/local/go/src/runtime/panic.go:500 +0x1a1
stash.keynetics.com/reap/collector.git/vendor/github.com/bsm/sarama-cluster.(*Consumer).Close(0xc42009a370, 0x0, 0x0)
    /home/gsp/projects/golang/src/stash.keynetics.com/reap/collector.git/vendor/github.com/bsm/sarama-cluster/consumer.go:177 +0x33
...

I think Close() should support being called multiple times without a panic. Does this seem reasonable? Or should I be making sure it only gets called once on my side and not by multiple goroutines at once?

Doesn't confirm partition offset?

It doesn't look like sarama-cluster is confirming the message offset for a topic partition stored in Zookeeper against what's actually in Kafka. I'm seeing an error message that the requested message offset doesn't exist in Kafka.

I had this issue before when I was just using straight sarama. If I stored a partition offset in Zookeeper, and either (a) didn't run my app for a while so the partition offset was removed from Kafka or (b) manually cleared all the Kafka data and restarted from scratch, the Zookeeper partition offset would be off. When it tried to ask Kafka for a message, Kafka returns an "offset doesn't exist" error or something like that.

I looked in your code and couldn't find a reference to "GetOffset", so I assume you're not checking offsets. I haven't had a chance to look at the problem deeply, but I thought you might know off the top of your head if I'm just missing something, or this really isn't implemented.

What I did was confirm the offset with Kafka when my app started. If my offset was older than the earliest value in Kafka, I reset my value to that value and logged an info message. If my offset was higher than Kafka's, I reset to 0 and logged a warning, assuming that data had been wiped. Maybe this is a little presumptuous, but it worked well enough for my app.
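
A rough sketch of that clamping check with plain sarama (where the stored offset comes from is up to the application; this is not part of sarama-cluster):

func clampOffset(client sarama.Client, topic string, partition int32, stored int64) (int64, error) {
	oldest, err := client.GetOffset(topic, partition, sarama.OffsetOldest)
	if err != nil {
		return stored, err
	}
	newest, err := client.GetOffset(topic, partition, sarama.OffsetNewest)
	if err != nil {
		return stored, err
	}
	switch {
	case stored < oldest:
		log.Printf("stored offset %d is older than earliest %d, fast-forwarding", stored, oldest)
		return oldest, nil
	case stored > newest:
		log.Printf("stored offset %d is beyond latest %d, assuming data was wiped; resetting", stored, newest)
		return oldest, nil // the original workaround reset to 0; the earliest available offset has the same effect
	default:
		return stored, nil
	}
}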

How to disable auto commit?

Hey all,

currently trying to figure out how to disable auto commit. I'd like to commit a message by hand after processing it, to avoid false-positive message commits. The only way I've found so far is to set config.Consumer.Offsets.CommitInterval to something > 10 sec, more likely 1m ... but this doesn't seem to be the preferred way, no? At least it feels a bit wrong. :D

Either way, this should be documented somewhere - partly related to #75 and maybe related to #73? (A sketch of the manual-commit approach follows after this message.)

Have a nice day and greetings from Germany!
Alex
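
For what it's worth, a sketch of the manual-commit approach, assuming a library version that exports Consumer.CommitOffsets(); the long CommitInterval only keeps the background auto-commit out of the way, and handle is a hypothetical processing function:

config := cluster.NewConfig()
config.Consumer.Offsets.CommitInterval = time.Hour // push the periodic auto-commit far into the future

// ... create the consumer as usual ...

for msg := range consumer.Messages() {
	if err := handle(msg); err != nil {
		continue // do not mark or commit messages that failed processing
	}
	consumer.MarkOffset(msg, "")
	if err := consumer.CommitOffsets(); err != nil { // commit by hand after successful processing
		log.Printf("commit failed: %v", err)
	}
}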

Using gopkg style dependency management creates issues

Hi,

We have been using sarama-cluster for a while and it works pretty good. However using gopkg style dependencies creates a lot of issues such as problems when trying to use the mock subscribers/producers to write tests. (IBM/sarama#552).

Would you consider switching to master imports and letting users be able to use their own dependency management ? ( such as vendoring or gopkg or whatever makes sense for them )

new consumer connects fine but subscriptions is empty?

Hi, i'm a bit new to kafka, sarama and sarama-cluster.
I have a simple publisher like so https://github.com/raintank/raintank-metric/blob/c6eae22ef4acf868a2f5949436720d7c834b0fbf/fake_metrics/out/kafkamdm/kafkamdm.go
my kafka (i tried both 0.9 and 0.10) has 1 broker and my topic has 1 partition and production to the topic is working fine. i can confirm the number of partitions and message rate with kafka manager.

I also have a simple consumer https://github.com/raintank/raintank-metric/blob/c6eae22ef4acf868a2f5949436720d7c834b0fbf/metric_tank/in/kafkamdm/kafkamdm.go using sarama-cluster.
it logs "kafka-mdm consumer started without error" but immediately gets notification messages like so:

(*cluster.Notification)(0xc4200e3660)({
 Claimed: (map[string][]int32) {
 },
 Released: (map[string][]int32) {
 },
 Current: (map[string][]int32) {
 }
})

When I run log.Info("kafka-mdm subscriptions: %v", k.consumer.Subscriptions()) I get map[] back.

any idea why it may not be picking up the partition(s) to read from?
If this is the wrong place to ask, do you have a recommendation for where I should ask?

thanks,
Dieter

kafka server: The version of API is not supported

We have started seeing this error when we use the sarama consumer

kafka server: The version of API is not supported.

Here is how we create it:

clusterConf := cluster.NewConfig()
if verbose {
    sarama.Logger = log.StandardLogger()
} else {
    clusterConf.Consumer.Return.Errors = true
    clusterConf.Group.Return.Notifications = true
}
clusterConf.Consumer.Offsets.Initial = sarama.OffsetOldest

consumer, err := cluster.NewConsumer(
    conf.KafkaHosts,
    conf.KafkaConsumerGroup,
    strings.Split(conf.KafkaTopic, ","),
    clusterConf)
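
As noted at the top of this page, sarama-cluster requires Kafka 0.9 or later, and this error usually means the broker does not support the consumer-group APIs the library uses. If the broker is new enough, making sure the configured protocol version matches it may help (a hedged sketch; the constant is only available if your sarama version defines it):

clusterConf := cluster.NewConfig()
clusterConf.Version = sarama.V0_9_0_0 // match the broker's Kafka version; the group APIs require 0.9+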

By default, sarama outputs error messages to stdout

Some of the functionality in sarama-cluster relies on sarama passing error messages to the PartitionConsumer.Errors() channel. However, by default sarama will simply write errors from Kafka to the logger (stdout).

To support sarama-cluster, you need to configure the sarama.Client with Consumer.Return.Errors:

config := sarama.NewConfig()
config.Consumer.Return.Errors = true
client, err := sarama.NewClient(servers, config)

//...

consumer, err := cluster.NewConsumerFromClient(client, zookeepers, group, topic, clusterConfig)

It's only blind luck that I stumbled across this. This seems like a pretty big deal, but it's just briefly mentioned in one or two places in the sarama docs. See the description under https://godoc.org/github.com/Shopify/sarama#PartitionConsumer and the comments in https://godoc.org/github.com/Shopify/sarama#Config, under "Consumer".

ConsumerRebalanceListener alternative

Similar to the ConsumerRebalanceListener of the Java API, it would be great to get informed by the consumer that the partition assignments have changed so related state can be updated or logged.

On first glance, adding a channel to the API where rebalance events are posted and selected in the same loop as Messages seems like a good idea.
Removals would get posted before https://github.com/bsm/sarama-cluster/blob/master/consumer.go#L267, whereas additions would get posted after the partition consumer was successfully created, following the specifications outlined in the Java doc.
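
In later versions of the library, the Group.Return.Notifications option and the Notifications() channel (shown in the README example above) serve much the same purpose; a minimal sketch:

config := cluster.NewConfig()
config.Group.Return.Notifications = true

// ... create the consumer, then drain the channel alongside Messages() ...

go func() {
	for ntf := range consumer.Notifications() {
		// Claimed, Released and Current are maps of topic -> partition IDs
		log.Printf("rebalanced: claimed=%v released=%v current=%v", ntf.Claimed, ntf.Released, ntf.Current)
	}
}()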

Not consuming all messages

I'm using V2 branch 0cd44ad and kafka 0.9.1

I'm creating a brand new topic. Seeding 30 messages on 10 partitions. You will see that partition 2 and 3 do not start at offset 0.

2016/05/24 21:34:55 added message topic topic-a partition 2 offset 0 value PLAINDATA-00000000
2016/05/24 21:34:55 added message topic topic-a partition 1 offset 0 value PLAINDATA-00000001
2016/05/24 21:34:55 added message topic topic-a partition 0 offset 0 value PLAINDATA-00000002
2016/05/24 21:34:56 added message topic topic-a partition 3 offset 0 value PLAINDATA-00000003
2016/05/24 21:34:56 added message topic topic-a partition 2 offset 1 value PLAINDATA-00000004
2016/05/24 21:34:56 added message topic topic-a partition 1 offset 1 value PLAINDATA-00000005
2016/05/24 21:34:56 added message topic topic-a partition 0 offset 1 value PLAINDATA-00000006
2016/05/24 21:34:56 added message topic topic-a partition 3 offset 1 value PLAINDATA-00000007
2016/05/24 21:34:56 added message topic topic-a partition 2 offset 2 value PLAINDATA-00000008
2016/05/24 21:34:56 added message topic topic-a partition 1 offset 2 value PLAINDATA-00000009
2016/05/24 21:34:56 added message topic topic-a partition 1 offset 3 value PLAINDATA-00000010
2016/05/24 21:34:56 added message topic topic-a partition 0 offset 2 value PLAINDATA-00000011
2016/05/24 21:34:56 added message topic topic-a partition 3 offset 2 value PLAINDATA-00000012
2016/05/24 21:34:56 added message topic topic-a partition 2 offset 3 value PLAINDATA-00000013
2016/05/24 21:34:56 added message topic topic-a partition 1 offset 4 value PLAINDATA-00000014
2016/05/24 21:34:56 added message topic topic-a partition 0 offset 3 value PLAINDATA-00000015
2016/05/24 21:34:56 added message topic topic-a partition 3 offset 3 value PLAINDATA-00000016
2016/05/24 21:34:56 added message topic topic-a partition 2 offset 4 value PLAINDATA-00000017
2016/05/24 21:34:56 added message topic topic-a partition 1 offset 5 value PLAINDATA-00000018
2016/05/24 21:34:56 added message topic topic-a partition 0 offset 4 value PLAINDATA-00000019
2016/05/24 21:34:56 added message topic topic-a partition 0 offset 5 value PLAINDATA-00000020
2016/05/24 21:34:56 added message topic topic-a partition 3 offset 4 value PLAINDATA-00000021
2016/05/24 21:34:56 added message topic topic-a partition 2 offset 5 value PLAINDATA-00000022
2016/05/24 21:34:56 added message topic topic-a partition 1 offset 6 value PLAINDATA-00000023
2016/05/24 21:34:56 added message topic topic-a partition 0 offset 6 value PLAINDATA-00000024
2016/05/24 21:34:56 added message topic topic-a partition 3 offset 5 value PLAINDATA-00000025
2016/05/24 21:34:56 added message topic topic-a partition 2 offset 6 value PLAINDATA-00000026
2016/05/24 21:34:56 added message topic topic-a partition 1 offset 7 value PLAINDATA-00000027
2016/05/24 21:34:56 added message topic topic-a partition 0 offset 7 value PLAINDATA-00000028
2016/05/24 21:34:56 added message topic topic-a partition 3 offset 6 value PLAINDATA-00000029

When I start up the consumer it skips a couple of messages

2016/05/24 21:34:57 consumed message topic topic-a partition 0 offset 0 value PLAINDATA-00000002
2016/05/24 21:34:57 consumed message topic topic-a partition 1 offset 0 value PLAINDATA-00000001
2016/05/24 21:34:57 consumed message topic topic-a partition 1 offset 1 value PLAINDATA-00000005
2016/05/24 21:34:57 consumed message topic topic-a partition 2 offset 1 value PLAINDATA-00000004
2016/05/24 21:34:57 consumed message topic topic-a partition 0 offset 1 value PLAINDATA-00000006
2016/05/24 21:34:57 consumed message topic topic-a partition 2 offset 2 value PLAINDATA-00000008
2016/05/24 21:34:57 consumed message topic topic-a partition 3 offset 1 value PLAINDATA-00000007
2016/05/24 21:34:57 consumed message topic topic-a partition 1 offset 2 value PLAINDATA-00000009
2016/05/24 21:34:57 consumed message topic topic-a partition 3 offset 2 value PLAINDATA-00000012
2016/05/24 21:34:57 consumed message topic topic-a partition 0 offset 2 value PLAINDATA-00000011
2016/05/24 21:34:57 consumed message topic topic-a partition 1 offset 3 value PLAINDATA-00000010
2016/05/24 21:34:57 consumed message topic topic-a partition 2 offset 3 value PLAINDATA-00000013
2016/05/24 21:34:57 consumed message topic topic-a partition 1 offset 4 value PLAINDATA-00000014
2016/05/24 21:34:57 consumed message topic topic-a partition 0 offset 3 value PLAINDATA-00000015
2016/05/24 21:34:57 consumed message topic topic-a partition 3 offset 3 value PLAINDATA-00000016
2016/05/24 21:34:57 consumed message topic topic-a partition 0 offset 4 value PLAINDATA-00000019
2016/05/24 21:34:57 consumed message topic topic-a partition 1 offset 5 value PLAINDATA-00000018
2016/05/24 21:34:57 consumed message topic topic-a partition 2 offset 4 value PLAINDATA-00000017
2016/05/24 21:34:57 consumed message topic topic-a partition 2 offset 5 value PLAINDATA-00000022
2016/05/24 21:34:57 consumed message topic topic-a partition 3 offset 4 value PLAINDATA-00000021
2016/05/24 21:34:57 consumed message topic topic-a partition 0 offset 5 value PLAINDATA-00000020
2016/05/24 21:34:57 consumed message topic topic-a partition 0 offset 6 value PLAINDATA-00000024
2016/05/24 21:34:57 consumed message topic topic-a partition 1 offset 6 value PLAINDATA-00000023
2016/05/24 21:34:57 consumed message topic topic-a partition 3 offset 5 value PLAINDATA-00000025
2016/05/24 21:34:57 consumed message topic topic-a partition 2 offset 6 value PLAINDATA-00000026
2016/05/24 21:34:57 consumed message topic topic-a partition 1 offset 7 value PLAINDATA-00000027
2016/05/24 21:34:57 consumed message topic topic-a partition 0 offset 7 value PLAINDATA-00000028
2016/05/24 21:34:57 consumed message topic topic-a partition 3 offset 6 value PLAINDATA-00000029

Improve docs regarding examples

Hey all,

first of all, thanks for this awesome library! It's actually saving A LOT of time!

Unfortunately there is a small lack of documentation regarding examples - the README says Documentation and example are available via godoc [..] but godoc doesn't contain examples?

I've got my consumer implementation running, but this was paired with a lot of reading of sarama-cluster's source code and the CLI tool, which shouldn't really be necessary, should it? It would be great if you/we could add some examples to the docs. :)

Have a nice day and greetings from Germany!
Alex

Marking the offset per partition

Dear dim,

Your library works really well, but I needed a way to mark the offset per partition (as different partitions could run at a different pace, and I want ONLY messages constructed right before midnight to be committed per partition).

I've added the following function to the lib; it might be handy for your lib as well.
Could you also have a look at whether it would cause any race conditions or other unexpected behavior?

func (c *Consumer) MarkOffsetPerPartition(topic string, part int32, offset int64) {
    c.subs.Fetch(topic, part).MarkOffset(offset, "")
}

Thanks in advance,
tarswars

Question about OffsetManager

I want to use this repo, but I have some questions, and I am not sure whether this is a good place to ask them.

  1. It was mentioned in the sarama repo in this thread: IBM/sarama#619. @dim said he would have to rewrite some parts of OffsetManager; is that work all done? I mean, I should be able to commit/fetch the offset of a topic/partition by using the sarama-cluster package alone, right?
  2. Here are some of my understandings, just thinking out loud. After creating a consumer group, if I add more consumers to the group, the load (as in partitions) will be rebalanced onto those new consumers? And the opposite: if some consumers in the group go down, the load will be rebalanced onto the other consumers in the group?
  3. Is there any good and clean example of using this package?

I appreciate any answer and correction.

Thanks.

rebalance loop for ever

Hello, when using sarama-cluster, setting the Config field Net.ReadTimeout to a lowish value such as 5 seconds makes sarama loop forever when creating/rebalancing the consumer group (logs from sarama):

2016/05/25 14:36:48 read tcp 127.0.0.1:57534->127.0.0.1:9092: i/o timeout
2016/05/25 14:36:49 cluster/consumer -531b41ea-c2b2-4e06-a345-afe8b4f788ef rebalance
2016/05/25 14:36:49 client/coordinator requesting coordinator for consumergoup testcg from localhost:9092
2016/05/25 14:36:49 client/coordinator coordinator for consumergoup testcg is #0 (localhost:9092)
2016/05/25 14:36:49 read tcp 127.0.0.1:57534->127.0.0.1:9092: i/o timeout
2016/05/25 14:36:49 cluster/consumer -531b41ea-c2b2-4e06-a345-afe8b4f788ef rebalance
2016/05/25 14:36:49 client/coordinator requesting coordinator for consumergoup testcg from localhost:9092
2016/05/25 14:36:49 client/coordinator coordinator for consumergoup testcg is #0 (localhost:9092)
...

Kafka is running on localhost and the load average is at 0.5.

I first stumbled upon this problem from time to time while using the default ReadTimeout (30s). I tried to reproduce the problem by changing some config options, and a low ReadTimeout seems to reliably reproduce it.

Custom consumer ID?

Right now the consumer generates its own ID (newGUID) because the customID field in ConsumerConfig is private. Could you modify it so I can manually set the consumer ID in ConsumerConfig?

Awesome library, BTW.

Error when starting more consumers than partitions

Hi,

When starting more consumers than partitions, I get the following error, and all consumers rebalance every few seconds until I shut down the extra consumer.

2016/03/26 08:10:05 Error: kafka server: The provided member is not known in the current generation.

Other Kafka clients I have used simply keep the extra consumer in a waiting state until another consumer dies.

Any suggestions?

Thank you.

Regards,
Riaan
