
go_kafka_client's People

Contributors

art4711, echupriyanov, edgefox, evanjones, fugitech, jedisct1, joestein, jonbonazza, marcusking01, miketonks, miry, rogerclermont, serejja, spenczar, vladiacob, xiocode, yanzay

go_kafka_client's Issues

Godeps lists the wrong version of jimlawless/cfg

The Godeps file currently references commit 4b1e3c1869d4e608fcbda6994e5f08dd9c6beaa1; however, the upstream siesta project uses the LoadNewMap method from cfg, which was not added until after the commit we are pinning. Updating this dependency to the latest commit, 8cf9686de5a8b290717494297d381948feffe19f, seems to get things building again.

go get -u github.com/stealthly/go_kafka_client throws an error

go get -u github.com/stealthly/go_kafka_client

github.com/stealthly/go_kafka_client
workspace/src/github.com/stealthly/go_kafka_client/low_level_client.go:260: connectorConfig.ClientId undefined (type *siesta.ConnectorConfig has no field or method ClientId, but does have ClientID)
workspace/src/github.com/stealthly/go_kafka_client/low_level_client.go:314: undefined: siesta.OffsetOutOfRange

Seems like a typo.

consumer.Close() never finishes and always panics

I'm trying to Close() the consumer under certain circumstances, but whenever I call the Close() function, the worker manager never seems to finish.

13:39:52.1 | 2015-07-23/13:39:52 [INFO] [ConsumerFetcherRoutine-TTV-0582.local:e989b4a8-8365-45f3-e3b1-eaf942271e68-0] Closing fetcher
13:39:52.1 | 2015-07-23/13:39:52 [INFO] [ConsumerFetcherRoutine-TTV-0582.local:e989b4a8-8365-45f3-e3b1-eaf942271e68-0] Stopped fetcher
13:39:52.1 | 2015-07-23/13:39:52 [INFO] [TTV-0582.local:e989b4a8-8365-45f3-e3b1-eaf942271e68-manager] Successfully closed all fetcher manager routines
13:39:52.1 | 2015-07-23/13:39:52 [INFO] [TTV-0582.local:e989b4a8-8365-45f3-e3b1-eaf942271e68] Stopping worker manager...
13:44:52.1 | panic: Graceful shutdown failed

Are there prerequisites that need to be fulfilled before calling the function? I've tried upping the timeout to 5 minutes, but it still does nothing.
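For what it's worth, a plain Go pattern can at least bound how long the caller blocks on the shutdown. This is not go_kafka_client API, just a sketch wrapping the Close() call the reporter is already making:

package shutdown

import (
    "log"
    "time"
)

// closeWithDeadline runs closeFn (for example func() { consumer.Close() }) in
// its own goroutine and stops waiting after d. It only caps how long the
// caller blocks; a hung shutdown still leaks its goroutines in the background.
func closeWithDeadline(closeFn func(), d time.Duration) bool {
    done := make(chan struct{})
    go func() {
        closeFn()
        close(done)
    }()
    select {
    case <-done:
        return true
    case <-time.After(d):
        log.Printf("shutdown still blocked after %s", d)
        return false
    }
}

This is a mitigation for the caller, not a fix for the stuck worker manager itself.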

need a user interface that you can connect to for each running go kafka client consumer process

We need more visibility into what is going on with each and every process that is running. This needs to be available in two ways.

  1. We need a GUI available through the web and mobile devices.
  2. We need a REST API for other systems to interact with.

In both cases we should expose an OAuth connection that can SSO with their web console. This additional level of authentication should be a secondary item. No login is a lower level of security, and for now we can manage that through the network the system is running on.

The GUI should provide a dashboard that lets folks watch key metrics from within the consumer, sent on a channel that the dashboard receives from. The REST API should return the same information. In addition to metrics, log information should be buffered and traced. The trace that is produced to Kafka should also go on the channel.

Any dashboard you connect to should also aggregate the other consumers that are running. It could even write to the console log; we need to map and key on it somehow too. We also need a way to change and control verbosity per area (e.g. funcA at info, funcB at debug), and to do that without changing state, without locks, using channels (see the sketch below).
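The last point, per-area verbosity controlled over channels instead of locks, could look roughly like the sketch below. Every name here (Level, Registry, the area strings) is made up for illustration; none of it is existing go_kafka_client API.

package logctl

// Level is a log verbosity level for one area (e.g. a function or component).
type Level int

const (
    Info Level = iota
    Debug
)

type setReq struct {
    area  string
    level Level
}

type getReq struct {
    area  string
    reply chan Level
}

// Registry serializes all reads and writes of the per-area levels through a
// single goroutine, so no mutex is needed.
type Registry struct {
    sets chan setReq
    gets chan getReq
}

func NewRegistry() *Registry {
    r := &Registry{sets: make(chan setReq), gets: make(chan getReq)}
    go func() {
        levels := map[string]Level{} // owned exclusively by this goroutine
        for {
            select {
            case s := <-r.sets:
                levels[s.area] = s.level
            case g := <-r.gets:
                g.reply <- levels[g.area]
            }
        }
    }()
    return r
}

func (r *Registry) Set(area string, l Level) { r.sets <- setReq{area, l} }

func (r *Registry) Get(area string) Level {
    reply := make(chan Level)
    r.gets <- getReq{area, reply}
    return <-reply
}

A dashboard or REST handler would then call reg.Set("funcA", Info) and reg.Set("funcB", Debug), matching the example above, while log call sites consult reg.Get(area) before emitting.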

consumer does not fetch any messages

I ran into an issue where the consumer does not fetch any messages.

Here is the scenario:

Given that I consume a topic that does not exist in Kafka yet: when I create the topic and publish a message to it, the consumer still won't fetch any messages.

Here is my log:

2015-03-16/17:32:22 [INFO] [ConsumerFetcherRoutine-ff1018a0-a873-a57d-604d-466ac5fe3a0b-0] Fetcher started
2015-03-16/17:32:22 [DEBUG] [ConsumerFetcherRoutine-ff1018a0-a873-a57d-604d-466ac5fe3a0b-0] Owner of {foobar %!s(int32=0)}
2015-03-16/17:32:22 [DEBUG] [ConsumerFetcherRoutine-ff1018a0-a873-a57d-604d-466ac5fe3a0b-0] Sending ask next to ConsumerFetcherRoutine-ff1018a0-a873-a57d-604d-466ac5fe3a0b-0 for {foobar %!s(int32=0)}
2015-03-16/17:32:22 [DEBUG] [ConsumerFetcherRoutine-ff1018a0-a873-a57d-604d-466ac5fe3a0b-0] Sent ask next to ConsumerFetcherRoutine-ff1018a0-a873-a57d-604d-466ac5fe3a0b-0 for {foobar %!s(int32=0)}
2015-03-16/17:32:22 [DEBUG] [ConsumerFetcherRoutine-ff1018a0-a873-a57d-604d-466ac5fe3a0b-0] Received asknext for {Topic: foobar, Partition: 0}
2015-03-16/17:32:22 [DEBUG] [ConsumerFetcherRoutine-ff1018a0-a873-a57d-604d-466ac5fe3a0b-0] Partition map: map[{foobar 0}:-1]
2015-03-16/17:32:22 [WARN] [ConsumerFetcherRoutine-ff1018a0-a873-a57d-604d-466ac5fe3a0b-0] Received invalid offset for {foobar %!s(int32=0)}

The consumerFetcherRoutine stops executing by returning here: https://github.com/stealthly/go_kafka_client/blob/fc357eab2d340d2fe2f78cb23e6d85b5491cfdf4/fetcher.go#L244-L247

                        if existingOffset, exists := f.partitionMap[nextTopicPartition]; exists {
                            offset = existingOffset
                            if isOffsetInvalid(offset) {
                                Warnf(f, "Received invalid offset for %s", nextTopicPartition)
                                return
                            }
                        }

Commenting out the return statement seems to resolve my issue, but I am not sure what other effects it may have.

Only consuming a topic, without publishing a message shortly after, results in a panic (which is fine):

2015-03-16/17:33:03 [DEBUG] [zk] Getting all brokers in cluster
2015-03-16/17:33:03 [DEBUG] [zk] Getting info for broker 307116
2015-03-16/17:33:03 [DEBUG] [zk] Getting info for broker 458143
2015-03-16/17:33:03 [DEBUG] [zk] Trying to get partition assignments for topics [foobar]
2015-03-16/17:33:03 [DEBUG] [zk] Trying to get partition assignments for topics [foobar]
2015-03-16/17:33:03 [DEBUG] [zk] Trying to get partition assignments for topics [foobar]
2015-03-16/17:33:04 [DEBUG] [zk] Trying to get partition assignments for topics [foobar]
panic: Failed to create topicCount: zk: node does not exist

Install Error:

Installing go_kafka_client gives errors. There's a breaking change in siesta-producer that removed the input parameters to producer.Close().

$go get github.com/elodina/go_kafka_client

github.com/elodina/go_kafka_client

elodina/go_kafka_client/log_emitters.go:137: too many arguments in call to k.producer.Close
elodina/go_kafka_client/mirror_maker.go:125: too many arguments in call to producer.Close
elodina/go_kafka_client/testing_utils.go:154: too many arguments in call to p.Close
elodina/go_kafka_client/testing_utils.go:178: too many arguments in call to p.Close

This is caused by:

elodina/siesta-producer@94fa767

Custom starting offset

Hi all,

Currently I do some event-stream-style processing using homebrew Go code that passes messages over zmq. There is no resiliency/failover, and there is no partitioning of messages as provided by Kafka.

I am looking into using Kafka + go_kafka_client and have some specific questions.

On startup (and when a re-partition happens) I want to start at an offset from approximately x minutes ago (it does not have to be exact; I can deal with a few extra messages). It appears sarama can do this. How would I go about using the work distribution goodness of go_kafka_client while putting in some custom starting offset logic, ignoring the fact that I've already consumed a specific message or that there are unconsumed messages from more than x minutes ago?

-Sajal
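One possible approach for the "start roughly x minutes back" part is to resolve a timestamp to per-partition offsets with sarama's Client.GetOffset and feed those into whatever custom starting-offset hook is used. This is sketched against recent sarama versions (constructor signatures have changed between releases, as other issues on this page show), so treat it as an illustration:

package offsets

import (
    "time"

    "github.com/Shopify/sarama"
)

// offsetsSince returns, per partition, an offset roughly corresponding to
// "now minus back". GetOffset issues an OffsetRequest under the hood, and the
// lookup is only as precise as the broker's log segments, which matches the
// "approximate is fine" requirement above.
func offsetsSince(brokers []string, topic string, back time.Duration) (map[int32]int64, error) {
    client, err := sarama.NewClient(brokers, sarama.NewConfig())
    if err != nil {
        return nil, err
    }
    defer client.Close()

    partitions, err := client.Partitions(topic)
    if err != nil {
        return nil, err
    }

    target := time.Now().Add(-back).UnixNano() / int64(time.Millisecond)
    result := make(map[int32]int64, len(partitions))
    for _, p := range partitions {
        offset, err := client.GetOffset(topic, p, target)
        if err != nil {
            return nil, err
        }
        result[p] = offset
    }
    return result, nil
}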

BUG?: why not break after handling all requests? (tryRemoveOldApiRequests)

Why not break after handling all requests?

Is it useful to retry? (A sketch of the suggested change follows the quoted code below.)

func (this *ZookeeperCoordinator) tryRemoveOldApiRequests(group string, api ConsumerGroupApi) error {
    requests := make([]string, 0)
    var err error

    apiPath := fmt.Sprintf("%s/%s", newZKGroupDirs(this.config.Root, group).ConsumerApiDir, api)
    for i := 0; i <= this.config.MaxRequestRetries; i++ {
        requests, _, err = this.zkConn.Children(apiPath)
        if err != nil {
            continue
        }
        for _, request := range requests {
            var data []byte
            var t int64
            childPath := fmt.Sprintf("%s/%s", apiPath, request)
            if api == Rebalance {
                if data, _, err = this.zkConn.Get(childPath); err != nil && err == zk.ErrNoNode {
                    // It's possible another consumer deleted the node before we could read it's data
                    continue
                }
                if t, err = strconv.ParseInt(string(data), 10, 64); err != nil {
                    t = int64(0) // If the data isn't a timestamp ensure it will be deleted anyway.
                }
            } else if api == BlueGreenDeploymentAPI {
                if t, err = strconv.ParseInt(string(request), 10, 64); err != nil {
                    break
                }
            }

            // Delete if this zk node has an expired timestamp
            if time.Unix(t, 0).Before(time.Now().Add(-10 * time.Minute)) {
                // If the data is not a timestamp or is a timestamp but has reached expiration delete it
                err = this.deleteNode(childPath)
                if err != nil && err != zk.ErrNoNode {
                    break
                }
            }
        }
    }

    return err
}
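A sketch of the change the reporter seems to be suggesting: stop retrying once one pass over the children has actually been handled. Because err can be left non-nil by the tolerated ParseInt failure above, a separate flag reads more clearly than checking err directly; only the outer loop is shown, and the per-request handling stays as in the original.

    for i := 0; i <= this.config.MaxRequestRetries; i++ {
        requests, _, err = this.zkConn.Children(apiPath)
        if err != nil {
            continue
        }

        failed := false
        for _, request := range requests {
            // ... same per-request handling as above, but set failed = true
            // (and break) wherever the original breaks on a delete error ...
        }

        if !failed {
            break // all requests handled; no point in another retry pass
        }
    }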

Panic thrown when Zookeeper Node Restarts

Hi guys,

Here is the situation. We are using the Go Kafka Client to create a consumer listening to Kafka. Our DevOps team was doing a rolling upgrade of the Zookeeper cluster that our Kafka cluster uses. When the Zookeeper node that the client listens to went down, we saw errors being written out that the server was unavailable, but our service stayed up and at least continued to process connections.

However, when the Zookeeper node that was down restarted, the Go Kafka Client threw a panic, and we traced the problem back to the trySubscribeForChanges() function in zk_coordinator.go. Here is the specific part of the trace we tracked it down to.

github.com/stealthly/go_kafka_client.func·050()
/Users/matt.durham/repos/edge-proxy/src/github.com/stealthly/go_kafka_client/zk_coordinator.go:528 +0x992
created by github.com/stealthly/go_kafka_client.(ZookeeperCoordinator).trySubscribeForChanges
/Users/matt.durham/repos/edge-proxy/src/github.com/stealthly/go_kafka_client/zk_coordinator.go:562 +0xbdb

Is this a bug or is this expected behavior? It seems like the consumer is tied to a specific zookeeperConnection and does not refresh after the connection has been re-established. Any help here would be greatly appreciated.

Improve test stability

Build is failing from time to time:
testing_utils.go:162: Failed to produce message test-kafka-message-0 because: kafka server: In the middle of a leadership election, there is currently no leader for this partition and hence it is unavailable for writes.

76% of the time is spent in `runtime.goexit`.

Consuming is too slow compared to the Java client

On the same machine, the Java client processes 250k messages per second per thread with snappy encoding, 3 threads in total, but this client can only reach 30k at maximum. On a better machine this client can reach 70k per second.

(pprof) top -cum
3.61s of 25.42s total (14.20%)
Dropped 165 nodes (cum <= 0.13s)
Showing top 10 nodes out of 139 (cum >= 2.85s)
      flat  flat%   sum%        cum   cum%
     0.02s 0.079% 0.079%     19.47s 76.59%  runtime.goexit
     0.11s  0.43%  0.51%      6.15s 24.19%  github.com/stealthly/go_kafka_client.func·047
     0.16s  0.63%  1.14%      4.94s 19.43%  main.func·003
         0     0%  1.14%      4.62s 18.17%  System
         0     0%  1.14%      4.07s 16.01%  github.com/rcrowley/go-metrics.(*StandardTimer).Time
     0.16s  0.63%  1.77%      3.40s 13.38%  github.com/stealthly/go_kafka_client.func·037
     0.04s  0.16%  1.93%      3.32s 13.06%  github.com/stealthly/go_kafka_client.Tracef
     0.10s  0.39%  2.32%      3.16s 12.43%  github.com/stealthly/go_kafka_client.(*WorkerManager).processBatch
     2.67s 10.50% 12.82%      3.14s 12.35%  runtime.mallocgc
     0.35s  1.38% 14.20%      2.85s 11.21%  reflect.Select

SiestaClient not committing offsets with Go Kafka Client

Hello,

Not sure if this issue should go here or to the Siesta client. We recently modified mirror_maker.go to force it to use SiestaClient as the low-level client, and after adding some minor debugging we can verify that it is trying to get the brokers from ZK.

However, we are having a hard time getting it to commit the offsets; it is not even trying to call the CommitOffset function (inside low_level_client). Are we missing something really obvious?

config.LowLevelClient = NewSiestaClient(config)

Thx!

When the consumer fails to connect to ZK, it creates a large number of connections in a short time

I find that when the consumer client fails to connect to ZK, it will try to connect to ZK again very quickly, and it then creates a large number of connections in a short time.
In this state the consumer client uses up too many file handles and starts to show logs like this:
2015/12/08 07:45:02 Failed to connect to 29.1.1.85:3351: dial tcp 29.1.1.85:3351: too many open files
2015/12/08 07:45:02 Failed to connect to 29.1.1.87:3351: dial tcp 29.1.1.87:3351: too many open files
2015/12/08 07:45:02 Failed to connect to 29.1.1.87:3351: dial tcp 29.1.1.87:3351: too many open files
This error causes other processes on the same system to stop working normally.

In my logs I get this info:
First, I get logs like this; maybe ZK had some problem:
2015/12/08 19:39:00 Failed to connect to 29.1.1.85:3351: dial tcp 29.1.1.85:3351: i/o timeout
2015/12/08 19:39:00 Failed to connect to 29.1.1.85:3351: dial tcp 29.1.1.85:3351: i/o timeout
2015/12/08 19:39:00 Failed to connect to 29.1.1.85:3351: dial tcp 29.1.1.85:3351: i/o timeout

There is also some panic info in my log:
panic: zk: could not connect to a server
goroutine 1 [running]:
github.com/stealthly/go_kafka_client.NewConsumer(0xc2080e8000, 0xc2080ba110)
/home/zt/git/upload/ops-mgr/src/paas-sdk/src/github.com/stealthly/go_kafka_client/consumer.go:89 +0x628
policy_engine/datamanager/datacollector.NewCollector(0xc2080e8240, 0x7f52712724c8)
/tmp/tmp.jvHyXSdhIY/gopath/src/policy_engine/datamanager/datacollector/datacollector.go:39 +0x378
main.main()
/tmp/tmp.jvHyXSdhIY/gopath/src/policy_engine/worker/heartbeat/main.go:48 +0x307

How can I avoid this problem?
I think it may be that the ZK connection is not closed in some abnormal situations, and a new connection is then recreated too quickly.
I am using an old version, maybe two months old. If this problem has already been fixed, that would be good.
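Until that is fixed in the library, one way to avoid the connection storm at the application level is to recover the panic and back off exponentially before retrying, instead of recreating the consumer immediately. The type names below are assumptions based on code quoted in other issues on this page, and this does not change the library's internal redial behaviour:

package resilient

import (
    "log"
    "time"

    kafka "github.com/stealthly/go_kafka_client"
)

// newConsumerWithBackoff retries consumer creation with exponential backoff so
// a flapping ZooKeeper does not trigger a burst of connection attempts.
func newConsumerWithBackoff(config *kafka.ConsumerConfig, maxBackoff time.Duration) *kafka.Consumer {
    backoff := time.Second
    for {
        if consumer := tryNewConsumer(config); consumer != nil {
            return consumer
        }
        time.Sleep(backoff)
        if backoff *= 2; backoff > maxBackoff {
            backoff = maxBackoff
        }
    }
}

// tryNewConsumer turns the "zk: could not connect to a server" panic shown
// above into a nil return so the caller can back off and retry.
func tryNewConsumer(config *kafka.ConsumerConfig) (c *kafka.Consumer) {
    defer func() {
        if r := recover(); r != nil {
            log.Printf("consumer creation failed: %v", r)
            c = nil
        }
    }()
    return kafka.NewConsumer(config)
}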

Mirror Maker for Go Kafka Client

It should be like the existing high-level consumer mirror maker; however, it should have 3 differences.

  1. It should use https://github.com/stealthly/go_kafka_client
  2. It should preserve the partition number: if it reads from partition 5, then it writes to partition 5. This needs to be an option so that if the partition count is different you can turn it off and make the assignment random.
  3. The destination needs an option --prefix when starting it. This is important for, e.g., 3 data centers each mirroring to the other two. The destination topic name should include where it came from, to deal with collisions. This is optional because folks often have the topic name already encompass this, but not always.

Please preserve the existing command line options so that it is easy for folks to use this new version: --consumer.config sourceCluster1Consumer.config --consumer.config sourceCluster2Consumer.config --num.streams 2 --producer.config targetClusterProducer.config --whitelist=".*"
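A sketch of how those options might be parsed in Go, using only the standard flag package. The repeatable --consumer.config is handled with a custom flag.Value; --preserve.partitions is a made-up name for the optional behaviour in point 2, not an existing flag:

package main

import (
    "flag"
    "fmt"
)

// multiFlag lets --consumer.config be passed several times, as in the example
// command line above.
type multiFlag []string

func (m *multiFlag) String() string     { return fmt.Sprint(*m) }
func (m *multiFlag) Set(v string) error { *m = append(*m, v); return nil }

func main() {
    var consumerConfigs multiFlag
    flag.Var(&consumerConfigs, "consumer.config", "consumer config file (repeatable)")
    numStreams := flag.Int("num.streams", 1, "number of consumer streams")
    producerConfig := flag.String("producer.config", "", "producer config file")
    whitelist := flag.String("whitelist", "", "regex of topics to mirror")
    prefix := flag.String("prefix", "", "prefix for destination topic names")
    preservePartitions := flag.Bool("preserve.partitions", false, "write to the same partition number that was read")
    flag.Parse()

    fmt.Println(consumerConfigs, *numStreams, *producerConfig, *whitelist, *prefix, *preservePartitions)
}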

Rebalance breaks when running multiple consumers

Use case:
Run N chronos/mesos jobs for a single consumer group, where N == the number of partitions in the topic. (Let's assume a single-topic consumer group here.)

Test:
- Create a Go consumer that consumes from a large local Kafka topic with 2 partitions.
- Start one instance (pid 1) -> looks OK, alternates between partitions 0 and 1.
- Start another (pid 2) -> looks OK, consumes only from one partition.
- But pid 1 then crashes with the following stack:

<<happily consuming here>>
Message{Topic: dev_allrequests_3, Partition: 0, Offset: 8002}
Message{Topic: dev_allrequests_3, Partition: 0, Offset: 8003}
Message{Topic: dev_allrequests_3, Partition: 0, Offset: 8004}
Message{Topic: dev_allrequests_3, Partition: 0, Offset: 8005}
Message{Topic: dev_allrequests_3, Partition: 0, Offset: 8006}
Message{Topic: dev_allrequests_3, Partition: 0, Offset: 8007}
2015-04-23/17:22:25 [DEBUG] [zk] Getting info for broker 0
2015-04-23/17:22:25 [DEBUG] [zk] Trying to get partition assignments for topics [dev_allrequests_3]
2015-04-23/17:22:25 [DEBUG] [zk] Getting consumers in group asdasdsdads_cango
2015-04-23/17:22:25 [DEBUG] [zk] Getting consumers in group asdasdsdads_cango
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Releasing partition ownership
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Successfully released partition ownership
2015-04-23/17:22:25 [INFO] [zk] Commencing assertion series at /consumers/asdasdsdads_cango/api/rebalance/e8ee221f572c5a810b7c60818eadeb69
2015-04-23/17:22:25 [INFO] [zk] Joining state barrier /consumers/asdasdsdads_cango/api/rebalance/e8ee221f572c5a810b7c60818eadeb69
2015-04-23/17:22:25 [DEBUG] [zk] Trying to create path /consumers/asdasdsdads_cango/api/rebalance/e8ee221f572c5a810b7c60818eadeb69/ebdffa95-ce32-e383-fddb-eec3d9a2e571 in Zookeeper
2015-04-23/17:22:25 [INFO] [zk] Successfully joined state barrier /consumers/asdasdsdads_cango/api/rebalance/e8ee221f572c5a810b7c60818eadeb69
2015-04-23/17:22:25 [DEBUG] [zk] Trying to assert rebalance state for group asdasdsdads_cango and hash e8ee221f572c5a810b7c60818eadeb69 with 2
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] [%!!(MISSING)s(int32=0) %!!(MISSING)s(int32=1)]
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] {ebdffa95-ce32-e383-fddb-eec3d9a2e571 %!s(int=0)} attempting to claim {Topic: dev_allrequests_3, Partition: 1}
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Consumer is trying to reflect partition ownership decision: map[{dev_allrequests_3 1}:{ebdffa95-ce32-e383-fddb-eec3d9a2e571 0}]

2015-04-23/17:22:25 [DEBUG] [zk] Trying to create path /consumers/asdasdsdads_cango/owners/dev_allrequests_3 in Zookeeper
2015-04-23/17:22:25 [DEBUG] [zk] Successfully claimed partition 1 in topic dev_allrequests_3 for {ebdffa95-ce32-e383-fddb-eec3d9a2e571 %!s(int=0)}
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Consumer successfully claimed partition 1 for topic dev_allrequests_3
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Partition ownership has been successfully reflected
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Trying to reinitialize fetchers and workers
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Trying to update fetcher
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Updating fetcher with numStreams = 1
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Topic Registry = map[dev_allrequests_3:map[%!s(int32=1):{Topic: dev_allrequests_3, Partition: 1, FetchedOffset: -1, Buffer: {Topic: dev_allrequests_3, Partition: 1}-MessageBuffer}]]
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] Fetcher Manager started
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] TopicInfos = [{Topic: dev_allrequests_3, Partition: 1, FetchedOffset: -1, Buffer: {Topic: dev_allrequests_3, Partition: 1}-MessageBuffer}]
2015-04-23/17:22:25 [DEBUG] [ConsumerFetcherRoutine-ebdffa95-ce32-e383-fddb-eec3d9a2e571-0] Received asknext for {Topic: dev_allrequests_3, Partition: 1}
2015-04-23/17:22:25 [DEBUG] [ConsumerFetcherRoutine-ebdffa95-ce32-e383-fddb-eec3d9a2e571-0] Partition map: map[{dev_allrequests_3 0}:8008 {dev_allrequests_3 1}:7506]
2015-04-23/17:22:25 [DEBUG] [Sarama client] Adding block: topic=dev_allrequests_3, partition=1, offset=7506, fetchsize=1048576
2015-04-23/17:22:25 [DEBUG] [ConsumerFetcherRoutine-ebdffa95-ce32-e383-fddb-eec3d9a2e571-0] Sent partition data to {dev_allrequests_3 %!s(int32=1)}
2015-04-23/17:22:25 [DEBUG] [Sarama client] Processing fetch response
2015-04-23/17:22:25 [DEBUG] [ConsumerFetcherRoutine-ebdffa95-ce32-e383-fddb-eec3d9a2e571-0] Received asknext for {Topic: dev_allrequests_3, Partition: 0}
2015-04-23/17:22:25 [DEBUG] [ConsumerFetcherRoutine-ebdffa95-ce32-e383-fddb-eec3d9a2e571-0] Partition map: map[{dev_allrequests_3 0}:8008 {dev_allrequests_3 1}:7506]
2015-04-23/17:22:25 [DEBUG] [Sarama client] Adding block: topic=dev_allrequests_3, partition=0, offset=8008, fetchsize=1048576
2015-04-23/17:22:25 [DEBUG] [ConsumerFetcherRoutine-ebdffa95-ce32-e383-fddb-eec3d9a2e571-0] Sent partition data to {dev_allrequests_3 %!s(int32=0)}
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] Updating fetcher configuration
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] Got new list of partitions to process map[{dev_allrequests_3 1}:{Topic: dev_allrequests_3, Partition: 1, FetchedOffset: 0, Buffer: {Topic: dev_allrequests_3, Partition: 1}-MessageBuffer}]
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] All partitions map: map[]
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] There are obsolete partitions [{dev_allrequests_3 0}]
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] Fetcher ConsumerFetcherRoutine-ebdffa95-ce32-e383-fddb-eec3d9a2e571-0 parition map before obsolete partitions removal%!(EXTRA map[go_kafka_client.TopicAndPartition]int64=map[{dev_allrequests_3 0}:8008 {dev_allrequests_3 1}:7506])
2015-04-23/17:22:25 [DEBUG] [ConsumerFetcherRoutine-ebdffa95-ce32-e383-fddb-eec3d9a2e571-0] Remove partitions
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Stopping worker manager for {dev_allrequests_3 %!s(int32=0)}
2015-04-23/17:22:25 [DEBUG] [WM-dev_allrequests_3-0] Trying to stop workerManager
2015-04-23/17:22:25 [DEBUG] [WM-dev_allrequests_3-0] Stopping manager
2015-04-23/17:22:25 [DEBUG] [WM-dev_allrequests_3-0] Stopping processor
2015-04-23/17:22:25 [DEBUG] [WM-dev_allrequests_3-0] Successful manager stop
2015-04-23/17:22:25 [DEBUG] [WM-dev_allrequests_3-0] Stopping committer
2015-04-23/17:22:25 [DEBUG] [WM-dev_allrequests_3-0] Successful committer stop
2015-04-23/17:22:25 [DEBUG] [WM-dev_allrequests_3-0] Stopped failure counter
2015-04-23/17:22:25 [DEBUG] [WM-dev_allrequests_3-0] Leaving manager stop
2015-04-23/17:22:25 [DEBUG] [WM-dev_allrequests_3-0] Stopped workerManager
2015-04-23/17:22:25 [DEBUG] [zk] Trying to update path /consumers/asdasdsdads_cango/offsets/dev_allrequests_3/0
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Stopping buffer: {Topic: dev_allrequests_3, Partition: 0}-MessageBuffer
2015-04-23/17:22:25 [INFO] [{Topic: dev_allrequests_3, Partition: 0}-MessageBuffer] Trying to stop message buffer
2015-04-23/17:22:25 [INFO] [{Topic: dev_allrequests_3, Partition: 0}-MessageBuffer] Stopping message buffer
2015-04-23/17:22:25 [INFO] [{Topic: dev_allrequests_3, Partition: 0}-MessageBuffer] Stopped message buffer
2015-04-23/17:22:25 [DEBUG] [Sarama client] Processing fetch response
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] Fetcher ConsumerFetcherRoutine-ebdffa95-ce32-e383-fddb-eec3d9a2e571-0 parition map after obsolete partitions removal%!(EXTRA map[go_kafka_client.TopicAndPartition]int64=map[{dev_allrequests_3 1}:7506])
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] Shutting down idle fetchers
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] Closed idle fetchers
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] Adding fetcher for partitions map[]
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] partitionsPerFetcher: map[]
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] Applied new partition map map[{dev_allrequests_3 1}:{Topic: dev_allrequests_3, Partition: 1, FetchedOffset: 0, Buffer: {Topic: dev_allrequests_3, Partition: 1}-MessageBuffer}]
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] Notifying all waiters about completed update
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Updated fetcher
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Fetcher has been updated &{ebdffa95-ce32-e383-fddb-eec3d9a2e571 asdasdsdads_cango map[dev_allrequests_3:[{ebdffa95-ce32-e383-fddb-eec3d9a2e571 %!s(int=0)}]] %!s(*go_kafka_client.StaticTopicsToNumStreams=&{ebdffa95-ce32-e383-fddb-eec3d9a2e571 map[dev_allrequests_3:1]}) map[dev_allrequests_3:[%!s(int32=0) %!s(int32=1)]] map[dev_allrequests_3:[{934b2c74-7e81-8d6b-52c4-fd034c9a273e %!s(int=0)} {ebdffa95-ce32-e383-fddb-eec3d9a2e571 %!s(int=0)}]] [934b2c74-7e81-8d6b-52c4-fd034c9a273e ebdffa95-ce32-e383-fddb-eec3d9a2e571] [{Version: 1, Id: 0, Host: localhost, Port: 9092}] [all topics here]}
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Initializing worker managers from topic registry: map[dev_allrequests_3:map[%!s(int32=1):{Topic: dev_allrequests_3, Partition: 1, FetchedOffset: -1, Buffer: {Topic: dev_allrequests_3, Partition: 1}-MessageBuffer}]]
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Restarted streams
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Fetchers and workers have been successfully reinitialized
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Rebalance has been successfully completed
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x0 pc=0x4b4388]

goroutine 62144 [running]:
github.com/stealthly/go_kafka_client.func·015()
    /home/me/Development/dev/go/src/github.com/stealthly/go_kafka_client/fetcher.go:343 +0x618
github.com/stealthly/go_kafka_client.inReadLock(0xc208011048, 0xc2083bcf80)
    /home/me/Development/dev/go/src/github.com/stealthly/go_kafka_client/utils.go:52 +0x54
github.com/stealthly/go_kafka_client.(*consumerFetcherRoutine).processPartitionData(0xc2086f9450, 0xc2086ec840, 0x11, 0x0, 0xc20c616000, 0x1d6, 0x200)
    /home/me/Development/dev/go/src/github.com/stealthly/go_kafka_client/fetcher.go:348 +0x141
created by github.com/stealthly/go_kafka_client.func·013
    /home/me/Development/dev/go/src/github.com/stealthly/go_kafka_client/fetcher.go:268 +0xb82

<<more here>>

Fix:
I am not sure if this has some unwanted side effects, but I was able to fix this in go_kafka_client's fetcher.go:

func (f *consumerFetcherRoutine) processPartitionData(topicAndPartition TopicAndPartition, messages []*Message) {
    Trace(f, "Trying to acquire lock for partition processing")
    inReadLock(&f.manager.updateLock, func() {
        for f.manager.updateInProgress {
            f.manager.updatedCond.Wait()
        }
        Tracef(f, "Processing partition data for %s", topicAndPartition)
        if len(messages) > 0 {
+           if f.allPartitionMap[topicAndPartition] == nil{
+               return
+           }
            f.partitionMap[topicAndPartition] = messages[len(messages)-1].Offset + 1
            f.allPartitionMap[topicAndPartition].Buffer.addBatch(messages)
            Debugf(f, "Sent partition data to %s", topicAndPartition)
        } else {
            Trace(f, "Got empty message. Ignoring...")
        }
    })
}

"Failed awaiting on state barrier"

Related to PR #127 and 5b92ab3

We're seeing our staging environment produce the following log lines 3 minutes after startup:

2015-07-20/18:57:13 [ERROR] [zk] Failed awaiting on state barrier 705508e0b7ca3fc982c5f75af1d557bb [Timedout waiting for consensus on barrier path /consumers/scancheckgroup/api/rebalance/705508e0b7ca3fc982c5f75af1d557bb]
2015-07-20/18:57:43 [ERROR] [zk] Failed awaiting on state barrier 2ef6195d0b05c12077b7e7f12cf2b387 [Timedout waiting for consensus on barrier path /consumers/scancheckgroup/api/rebalance/2ef6195d0b05c12077b7e7f12cf2b387]
2015-07-20/18:57:54 [ERROR] [zk] Failed awaiting on state barrier 507988448b8ba0e9bb84ea28650fcaba [Timedout waiting for consensus on barrier path /consumers/scancheckgroup/api/rebalance/507988448b8ba0e9bb84ea28650fcaba]
2015-07-20/18:58:20 [ERROR] [zk] Failed awaiting on state barrier 01ec42775cf5701877fd78db18e05c90 [Timedout waiting for consensus on barrier path /consumers/scancheckgroup/api/rebalance/01ec42775cf5701877fd78db18e05c90]
2015-07-20/18:58:20 [ERROR] [zk] Failed awaiting on state barrier 01ec42775cf5701877fd78db18e05c90 [Timedout waiting for consensus on barrier path /consumers/scancheckgroup/api/rebalance/01ec42775cf5701877fd78db18e05c90]
2015-07-20/18:58:20 [ERROR] [zk] Failed awaiting on state barrier 01ec42775cf5701877fd78db18e05c90 [Timedout waiting for consensus on barrier path /consumers/scancheckgroup/api/rebalance/01ec42775cf5701877fd78db18e05c90]
2015-07-20/18:58:20 [ERROR] [zk] Failed awaiting on state barrier 01ec42775cf5701877fd78db18e05c90 [Timedout waiting for consensus on barrier path /consumers/scancheckgroup/api/rebalance/01ec42775cf5701877fd78db18e05c90]
2015-07-20/18:58:20 [ERROR] [zk] Failed awaiting on state barrier 01ec42775cf5701877fd78db18e05c90 [Timedout waiting for consensus on barrier path /consumers/scancheckgroup/api/rebalance/01ec42775cf5701877fd78db18e05c90]
2015-07-20/18:58:20 [ERROR] [zk] Failed awaiting on state barrier 01ec42775cf5701877fd78db18e05c90 [Timedout waiting for consensus on barrier path /consumers/scancheckgroup/api/rebalance/01ec42775cf5701877fd78db18e05c90]
2015-07-20/18:58:20 [ERROR] [zk] Failed awaiting on state barrier 01ec42775cf5701877fd78db18e05c90 [Timedout waiting for consensus on barrier path /consumers/scancheckgroup/api/rebalance/01ec42775cf5701877fd78db18e05c90]
2015-07-20/18:58:20 [ERROR] [zk] Failed awaiting on state barrier 01ec42775cf5701877fd78db18e05c90 [Timedout waiting for consensus on barrier path /consumers/scancheckgroup/api/rebalance/01ec42775cf5701877fd78db18e05c90]
panic: Could not reach consensus on state barrier.

goroutine 35 [running]:
github.com/stealthly/go_kafka_client.func·013()
    /home/user/src/github.com/stealthly/go_kafka_client/consumer.go:650 +0xb8d
github.com/stealthly/go_kafka_client.inLock(0xc2088305a8, 0xc2083d3f08)
    /home/user/src/github.com/stealthly/go_kafka_client/utils.go:44 +0x51
github.com/stealthly/go_kafka_client.(*Consumer).rebalance(0xc208830580)
    /home/user/src/github.com/stealthly/go_kafka_client/consumer.go:671 +0x73
github.com/stealthly/go_kafka_client.(*Consumer).reinitializeConsumer(0xc208830580)
    /home/user/src/github.com/stealthly/go_kafka_client/consumer.go:295 +0x4e
github.com/stealthly/go_kafka_client.(*Consumer).createMessageStreams(0xc208830580, 0xc2083cd470)
    /home/user/src/github.com/stealthly/go_kafka_client/consumer.go:270 +0x159
created by github.com/stealthly/go_kafka_client.(*Consumer).StartStatic
    /home/user/src//github.com/stealthly/go_kafka_client/consumer.go:112 +0x3c

This happens every time we start the client in that environment... which means we're guessing it's something to do with configuration within that environment (we're not seeing it in our development environment).

@marcusking01 or others, what information could help us work out the root cause? Or where should we look?

tail -f kafka producer supporting LogLine when source and/or tag and/or log type set

Similar to how the syslog producer works, tail a file (configurable on the command line) using https://github.com/aybabtme/tailf and produce it to Kafka. The command line arguments and Docker image should be almost the same, if not exactly the same, as those of syslog (where appropriate). If --source, --tag or --log.type are set, then transform to the LogLine https://github.com/stealthly/go_kafka_client/blob/master/syslog_producer.go#L212 like the syslog producer does.

Consumer stops consuming a partition after several empty fetches

Moved from #1

vladiacob:
If I start the consumer and then start the producer, I don't get messages in the consumer. After that, if I stop the consumer and start it again, I get all the messages sent before; but if I try to send a new message, it is the same behaviour.

Given a running consumer: if the consumer doesn't get a message within config.FetchMaxRetries * config.RequeueAskNextBackoff, it disconnects this partition and stops consuming it until the consumer is restarted.
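Until the disconnect behaviour itself changes, one workaround is to make that window very large. The field names come from the report above; the values, and the assumption that RequeueAskNextBackoff is a time.Duration, are guesses, so treat this as a sketch rather than a verified fix:

package tuning

import (
    "time"

    kafka "github.com/stealthly/go_kafka_client"
)

// patientConsumerConfig widens the FetchMaxRetries * RequeueAskNextBackoff
// window so an idle partition is not dropped so quickly.
func patientConsumerConfig() *kafka.ConsumerConfig {
    config := kafka.DefaultConsumerConfig()
    config.FetchMaxRetries = 1000                  // keep asking much longer before giving up on the partition
    config.RequeueAskNextBackoff = 5 * time.Second // wait a bit longer between empty fetches
    return config
}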

Rebalance state issue with kafka topic order

There's an issue in the state rebalancing code when zookeeper nodes in a cluster return the list of kafka topics in a different order. The stateHash is computed from the broker Ids, Consumers, topics, and partitions in the topics. However, this hash is going to be inconsistent across consumers that are connected to different zookeeper nodes that are returning topic lists in different orders.

The main bad behavior I've seen with this is that if you start multiple consumers up simultaneously, they get stuck in a rebalance loop and never begin processing events.

The fix I found that worked was simply to sort the list of kafka topics returned from zookeeper, before creating the hash.
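The fix described above, sketched with the samuel/go-zookeeper client that appears in the stack traces on this page; the only real change is the sort.Strings call before the topic list feeds the state hash:

package rebalance

import (
    "sort"

    "github.com/samuel/go-zookeeper/zk"
)

// sortedTopics returns the topics under topicsPath in a deterministic order,
// so every consumer computes the same state hash regardless of the order its
// ZooKeeper node happened to list the children in.
func sortedTopics(zkConn *zk.Conn, topicsPath string) ([]string, error) {
    topics, _, err := zkConn.Children(topicsPath)
    if err != nil {
        return nil, err
    }
    sort.Strings(topics) // order-independent input for the state hash
    return topics, nil
}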

tailf fails to build

% go get github.com/stealthly/go_kafka_client/producers/tailf                                                                                                    
# github.com/stealthly/go_kafka_client/producers/tailf
../../../.gvm/pkgsets/go1.4.2/global/src/github.com/stealthly/go_kafka_client/producers/tailf/tailf.go:49: undefined: sarama.NewClientConfig
../../../.gvm/pkgsets/go1.4.2/global/src/github.com/stealthly/go_kafka_client/producers/tailf/tailf.go:50: too many arguments in call to sarama.NewClient
../../../.gvm/pkgsets/go1.4.2/global/src/github.com/stealthly/go_kafka_client/producers/tailf/tailf.go:56: undefined: sarama.NewProducerConfig
../../../.gvm/pkgsets/go1.4.2/global/src/github.com/stealthly/go_kafka_client/producers/tailf/tailf.go:57: undefined: sarama.NewProducer

Support OffsetCommit and OffsetFetch API for storing offsets in Kafka instead of Zookeeper

We should also benchmark this against the different connection options (so 3 tests total):

  1. with zookeeper
  2. without zookeeper using Sarama
  3. without zookeeper with librdkafka

While this is a good option for folks, the way we are handling the commits in batches (per partition) reduces the overall Zookeeper load drastically while still keeping at-least-once processing semantics (committing after the work is done). We should be able to benchmark that as well with different batch size tests.

Error when trying to start my application with go_kafka_client included

This is the error I got:

github.com/stealthly/go_kafka_client
../../../github.com/stealthly/go_kafka_client/testing_utils.go:33: not enough arguments in call to zk.StartTestCluster
../../../github.com/stealthly/go_kafka_client/testing_utils.go:148: too many arguments in call to producer.NewKafkaProducer
../../../github.com/stealthly/go_kafka_client/testing_utils.go:151: p.Send undefined (type *producer.KafkaProducer has no field or method Send)

I commented out content from testing_utils.go, and after this I got past the error above, but then I got another one:

panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x70 pc=0x4593c2]

goroutine 1 [running]:
runtime.panic(0x72fbc0, 0xba6108)
    /usr/lib/go/src/pkg/runtime/panic.c:266 +0xb6
github.com/stealthly/go_kafka_client.Infof(0x6985a0, 0xc21007df30, 0x6985a0, 0xc21007df40, 0xc21007df20, ...)
    /home/vagrant/workspace/ideazz-go/src/github.com/stealthly/go_kafka_client/utils.go:55 +0x112
github.com/stealthly/go_kafka_client.NewConsumer(0xc210050580, 0x0)
    /home/vagrant/workspace/ideazz-go/src/github.com/stealthly/go_kafka_client/consumer.go:84 +0x18c
main.main()
    /home/vagrant/workspace/ideazz-go/src/bitbucket.org/viacob/ideazz-indexer/main.go:46 +0x3e5

goroutine 3 [semacquire]:
sync.runtime_Syncsemacquire(0xc21005ff90)
    /usr/lib/go/src/pkg/runtime/sema.goc:257 +0xca
sync.(*Cond).Wait(0xc21005ff80)
    /usr/lib/go/src/pkg/sync/cond.go:62 +0x89
github.com/cihub/seelog.(*asyncLoopLogger).processItem(0xc210039780, 0x0)
    /home/vagrant/workspace/ideazz-go/src/github.com/cihub/seelog/behavior_asynclooplogger.go:50 +0x95
github.com/cihub/seelog.(*asyncLoopLogger).processQueue(0xc210039780)
    /home/vagrant/workspace/ideazz-go/src/github.com/cihub/seelog/behavior_asynclooplogger.go:63 +0x31
created by github.com/cihub/seelog.newAsyncLoopLogger
    /home/vagrant/workspace/ideazz-go/src/github.com/cihub/seelog/behavior_asynclooplogger.go:40 +0x74

goroutine 4 [semacquire]:
sync.runtime_Syncsemacquire(0xc21005f210)
    /usr/lib/go/src/pkg/runtime/sema.goc:257 +0xca
sync.(*Cond).Wait(0xc21005f200)
    /usr/lib/go/src/pkg/sync/cond.go:62 +0x89
github.com/cihub/seelog.(*asyncLoopLogger).processItem(0xc2100398a0, 0x0)
    /home/vagrant/workspace/ideazz-go/src/github.com/cihub/seelog/behavior_asynclooplogger.go:50 +0x95
github.com/cihub/seelog.(*asyncLoopLogger).processQueue(0xc2100398a0)
    /home/vagrant/workspace/ideazz-go/src/github.com/cihub/seelog/behavior_asynclooplogger.go:63 +0x31
created by github.com/cihub/seelog.newAsyncLoopLogger
    /home/vagrant/workspace/ideazz-go/src/github.com/cihub/seelog/behavior_asynclooplogger.go:40 +0x74

goroutine 6 [syscall]:
os/signal.loop()
    /usr/lib/go/src/pkg/os/signal/signal_unix.go:21 +0x1e
created by os/signal.init·1
    /usr/lib/go/src/pkg/os/signal/signal_unix.go:27 +0x31
exit status 2

My code is:

    // Zookeeper config
    zkConfig := kafka.NewZookeeperConfig()
    zkConfig.ZookeeperConnect = []string{fmt.Sprintf("localhost:%d", os.Getenv("IDEAZZ_ZOOKEEPER_PORT"))}

    // Initialize kafka config
    config := kafka.DefaultConsumerConfig()
    config.AutoOffsetReset = kafka.SmallestOffset
    config.Groupid = "ideazz-product-group"
    config.Coordinator = kafka.NewZookeeperCoordinator(zkConfig)
    config.Strategy = func(_ *kafka.Worker, message *kafka.Message, id kafka.TaskId) kafka.WorkerResult {
        return kafka.NewSuccessfulResult(id)
    }
    config.WorkerFailureCallback = func(worker_manager *kafka.WorkerManager) kafka.FailedDecision {
        return kafka.DoNotCommitOffsetAndContinue
    }
    config.WorkerFailedAttemptCallback = func(_ *kafka.Task, _ kafka.WorkerResult) kafka.FailedDecision {

        return kafka.DoNotCommitOffsetAndContinue
    }

    consumer := kafka.NewConsumer(config)
    go consumer.StartStatic(map[string]int{"new-product": 3})

Panic when I start using config.OffsetStorage = NewSiestaClient(config)

panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x48 pc=0x453c79]

goroutine 35 [running]:
github.com/stealthly/go_kafka_client.(*SiestaClient).GetOffset(0xc20805e160, 0xc2080d605e, 0x11, 0xc2080b3139, 0x5, 0xc200000000, 0x77be00, 0x0, 0x0)
/home/eyu/workspace/src/ac-kafka/src/github.com/stealthly/go_kafka_client/low_level_client.go:330 +0x89
github.com/stealthly/go_kafka_client.(*Consumer).fetchOffsets(0xc20807a0b0, 0xc208038100, 0x1, 0x1, 0x1, 0x0, 0x0)
/home/eyu/workspace/src/ac-kafka/src/github.com/stealthly/go_kafka_client/consumer.go:785 +0x133
github.com/stealthly/go_kafka_client.tryRebalance(0xc20807a0b0, 0xc2084321b0, 0x9e58e0, 0xc2080d605e)
/home/eyu/workspace/src/ac-kafka/src/github.com/stealthly/go_kafka_client/consumer.go:708 +0x27b
github.com/stealthly/go_kafka_client.func·011()
/home/eyu/workspace/src/ac-kafka/src/github.com/stealthly/go_kafka_client/consumer.go:676 +0xe58
github.com/stealthly/go_kafka_client.inLock(0xc20807a0d8, 0xc20842ff08)
/home/eyu/workspace/src/ac-kafka/src/github.com/stealthly/go_kafka_client/utils.go:44 +0x51
github.com/stealthly/go_kafka_client.(*Consumer).rebalance(0xc20807a0b0)
/home/eyu/workspace/src/ac-kafka/src/github.com/stealthly/go_kafka_client/consumer.go:695 +0x73
github.com/stealthly/go_kafka_client.(*Consumer).reinitializeConsumer(0xc20807a0b0)
/home/eyu/workspace/src/ac-kafka/src/github.com/stealthly/go_kafka_client/consumer.go:305 +0x4e
github.com/stealthly/go_kafka_client.(*Consumer).createMessageStreams(0xc20807a0b0, 0xc20809c780)
/home/eyu/workspace/src/ac-kafka/src/github.com/stealthly/go_kafka_client/consumer.go:280 +0x190
created by github.com/stealthly/go_kafka_client.(*Consumer).StartStatic
/home/eyu/workspace/src/ac-kafka/src/github.com/stealthly/go_kafka_client/consumer.go:110 +0x3c

goroutine 1 [select]:
github.com/stealthly/go_kafka_client.(*Consumer).startStreams(0xc20807a0b0)
/home/eyu/workspace/src/ac-kafka/src/github.com/stealthly/go_kafka_client/consumer.go:188 +0x9f9
github.com/stealthly/go_kafka_client.(*Consumer).StartStatic(0xc20807a0b0, 0xc20809c780)
/home/eyu/workspace/src/ac-kafka/src/github.com/stealthly/go_kafka_client/consumer.go:112 +0x4c
main.(*DemoConsumer).Start(0xc20801ea00)
/home/eyu/workspace/src/ac-kafka/demo/simple_consumer/simple_consumer.go:25 +0xa7
main.main()
/home/eyu/workspace/src/ac-kafka/demo/simple_consumer/simple_consumer.go:66 +0x12b

goroutine 5 [chan receive]:
ac-common-go/glog.(*loggingT).flushDaemon(0xb532e0)
/home/eyu/workspace/src/ac-common-go/glog/glog.go:868 +0x78
created by ac-common-go/glog.init·1
/home/eyu/workspace/src/ac-common-go/glog/glog.go:411 +0x31d

goroutine 17 [syscall, locked to thread]:
runtime.goexit()
/home/eyu/go/src/runtime/asm_amd64.s:2232 +0x1

goroutine 7 [semacquire]:
sync.(_Cond).Wait(0xc20802afc0)
/home/eyu/go/src/sync/cond.go:62 +0x9e
github.com/cihub/seelog.(_asyncLoopLogger).processItem(0xc208082000, 0x420200)
/home/eyu/workspace/src/ac-kafka/src/github.com/cihub/seelog/behavior_asynclooplogger.go:50 +0xe0
github.com/cihub/seelog.(*asyncLoopLogger).processQueue(0xc208082000)
/home/eyu/workspace/src/ac-kafka/src/github.com/cihub/seelog/behavior_asynclooplogger.go:63 +0x47
created by github.com/cihub/seelog.newAsyncLoopLogger
/home/eyu/workspace/src/ac-kafka/src/github.com/cihub/seelog/behavior_asynclooplogger.go:40 +0x8e

goroutine 8 [semacquire]:
sync.(_Cond).Wait(0xc20802b140)
/home/eyu/go/src/sync/cond.go:62 +0x9e
github.com/cihub/seelog.(_asyncLoopLogger).processItem(0xc208082100, 0x0)
/home/eyu/workspace/src/ac-kafka/src/github.com/cihub/seelog/behavior_asynclooplogger.go:50 +0xe0
github.com/cihub/seelog.(*asyncLoopLogger).processQueue(0xc208082100)
/home/eyu/workspace/src/ac-kafka/src/github.com/cihub/seelog/behavior_asynclooplogger.go:63 +0x47
created by github.com/cihub/seelog.newAsyncLoopLogger
/home/eyu/workspace/src/ac-kafka/src/github.com/cihub/seelog/behavior_asynclooplogger.go:40 +0x8e

goroutine 9 [semacquire]:
sync.(_Cond).Wait(0xc20802b6c0)
/home/eyu/go/src/sync/cond.go:62 +0x9e
github.com/cihub/seelog.(_asyncLoopLogger).processItem(0xc208082280, 0x0)
/home/eyu/workspace/src/ac-kafka/src/github.com/cihub/seelog/behavior_asynclooplogger.go:50 +0xe0
github.com/cihub/seelog.(*asyncLoopLogger).processQueue(0xc208082280)
/home/eyu/workspace/src/ac-kafka/src/github.com/cihub/seelog/behavior_asynclooplogger.go:63 +0x47
created by github.com/cihub/seelog.newAsyncLoopLogger
/home/eyu/workspace/src/ac-kafka/src/github.com/cihub/seelog/behavior_asynclooplogger.go:40 +0x8e

goroutine 10 [runnable]:
github.com/stealthly/go_kafka_client.(_TopicAndPartition).String(0xc2083ff960, 0x8248e0, 0xc2083ff960)
/home/eyu/workspace/src/ac-kafka/src/github.com/stealthly/go_kafka_client/structs.go:128
fmt.(_pp).handleMethods(0xc208090000, 0xc200000073, 0x0, 0x1)
/home/eyu/go/src/fmt/print.go:720 +0x579
fmt.(_pp).printArg(0xc208090000, 0x8248e0, 0xc2083ff960, 0x73, 0x0, 0x0)
/home/eyu/go/src/fmt/print.go:794 +0x3b8
fmt.(_pp).doPrintf(0xc208090000, 0xc2083ff9c0, 0x1d, 0xc2083ff980, 0x2, 0x2)
/home/eyu/go/src/fmt/print.go:1183 +0x21cd
fmt.Sprintf(0xc2083ff9c0, 0x1d, 0xc2083ff980, 0x2, 0x2, 0x0, 0x0)
/home/eyu/go/src/fmt/print.go:203 +0x78
github.com/cihub/seelog.(_logFormattedMessage).String(0xc2080b5d70, 0x0, 0x0)
/home/eyu/workspace/src/ac-kafka/src/github.com/cihub/seelog/logger.go:361 +0x62
github.com/cihub/seelog.(_commonLogger).processLogMsg(0xc208082400, 0x86b802, 0x7f04efa14a60, 0xc2080b5d70, 0x7f04efa14a88, 0xc20843e180)
/home/eyu/workspace/src/ac-kafka/src/github.com/cihub/seelog/logger.go:304 +0x8a
github.com/cihub/seelog.(_asyncLogger).processQueueElement(0xc208082400)
/home/eyu/workspace/src/ac-kafka/src/github.com/cihub/seelog/behavior_asynclogger.go:115 +0xd3
github.com/cihub/seelog.(_asyncLoopLogger).processItem(0xc208082400, 0x0)
/home/eyu/workspace/src/ac-kafka/src/github.com/cihub/seelog/behavior_asynclooplogger.go:57 +0x132
github.com/cihub/seelog.(*asyncLoopLogger).processQueue(0xc208082400)
/home/eyu/workspace/src/ac-kafka/src/github.com/cihub/seelog/behavior_asynclooplogger.go:63 +0x47
created by github.com/cihub/seelog.newAsyncLoopLogger
/home/eyu/workspace/src/ac-kafka/src/github.com/cihub/seelog/behavior_asynclooplogger.go:40 +0x8e

goroutine 18 [semacquire]:
sync.(_WaitGroup).Wait(0xc20805e1e0)
/home/eyu/go/src/sync/waitgroup.go:132 +0x169
github.com/samuel/go-zookeeper/zk.(_Conn).loop(0xc2080b80e0)
/home/eyu/workspace/src/ac-kafka/src/github.com/samuel/go-zookeeper/zk/conn.go:287 +0x78c
github.com/samuel/go-zookeeper/zk.func·001()
/home/eyu/workspace/src/ac-kafka/src/github.com/samuel/go-zookeeper/zk/conn.go:185 +0x2c
created by github.com/samuel/go-zookeeper/zk.ConnectWithDialer
/home/eyu/workspace/src/ac-kafka/src/github.com/samuel/go-zookeeper/zk/conn.go:189 +0x606

goroutine 19 [chan receive]:
github.com/stealthly/go_kafka_client.(_ZookeeperCoordinator).listenConnectionEvents(0xc20803ade0, 0xc2080e0060)
/home/eyu/workspace/src/ac-kafka/src/github.com/stealthly/go_kafka_client/zk_coordinator.go:90 +0x68
created by github.com/stealthly/go_kafka_client.(_ZookeeperCoordinator).Connect
/home/eyu/workspace/src/ac-kafka/src/github.com/stealthly/go_kafka_client/zk_coordinator.go:67 +0xe1

goroutine 15 [select]:
github.com/Shopify/sarama.(_client).backgroundMetadataUpdater(0xc2080b6000)
/home/eyu/workspace/src/ac-kafka/src/github.com/Shopify/sarama/client.go:553 +0x2f3
github.com/Shopify/sarama._client.(github.com/Shopify/sarama.backgroundMetadataUpdater)·fm()
/home/eyu/workspace/src/ac-kafka/src/github.com/Shopify/sarama/client.go:142 +0x27
github.com/Shopify/sarama.withRecover(0xc20800af10)
/home/eyu/workspace/src/ac-kafka/src/github.com/Shopify/sarama/utils.go:42 +0x3a
created by github.com/Shopify/sarama.NewClient
/home/eyu/workspace/src/ac-kafka/src/github.com/Shopify/sarama/client.go:142 +0x8ce

goroutine 11 [select]:
github.com/samuel/go-zookeeper/zk.(_Conn).sendLoop(0xc2080b80e0, 0x7f04efa14ea8, 0xc2080ec010, 0xc2080347e0, 0x0, 0x0)
/home/eyu/workspace/src/ac-kafka/src/github.com/samuel/go-zookeeper/zk/conn.go:483 +0xce9
github.com/samuel/go-zookeeper/zk.func·002()
/home/eyu/workspace/src/ac-kafka/src/github.com/samuel/go-zookeeper/zk/conn.go:272 +0x5a
created by github.com/samuel/go-zookeeper/zk.(_Conn).loop
/home/eyu/workspace/src/ac-kafka/src/github.com/samuel/go-zookeeper/zk/conn.go:275 +0x69f

goroutine 12 [IO wait]:
net.(_pollDesc).Wait(0xc2080100d0, 0x72, 0x0, 0x0)
/home/eyu/go/src/net/fd_poll_runtime.go:84 +0x47
net.(_pollDesc).WaitRead(0xc2080100d0, 0x0, 0x0)
/home/eyu/go/src/net/fd_poll_runtime.go:89 +0x43
net.(_netFD).Read(0xc208010070, 0xc208276000, 0x4, 0x180000, 0x0, 0x7f04efa10fb8, 0xc20843a670)
/home/eyu/go/src/net/fd_unix.go:242 +0x40f
net.(_conn).Read(0xc2080ec010, 0xc208276000, 0x4, 0x180000, 0x0, 0x0, 0x0)
/home/eyu/go/src/net/net.go:121 +0xdc
io.ReadAtLeast(0x7f04efa15f68, 0xc2080ec010, 0xc208276000, 0x4, 0x180000, 0x4, 0x0, 0x0, 0x0)
/home/eyu/go/src/io/io.go:298 +0xf1
io.ReadFull(0x7f04efa15f68, 0xc2080ec010, 0xc208276000, 0x4, 0x180000, 0x0, 0x0, 0x0)
/home/eyu/go/src/io/io.go:316 +0x6d
github.com/samuel/go-zookeeper/zk.(_Conn).recvLoop(0xc2080b80e0, 0x7f04efa14ea8, 0xc2080ec010, 0x0, 0x0)
/home/eyu/workspace/src/ac-kafka/src/github.com/samuel/go-zookeeper/zk/conn.go:547 +0x1b6
github.com/samuel/go-zookeeper/zk.func·003()
/home/eyu/workspace/src/ac-kafka/src/github.com/samuel/go-zookeeper/zk/conn.go:279 +0x5f
created by github.com/samuel/go-zookeeper/zk.(_Conn).loop
/home/eyu/workspace/src/ac-kafka/src/github.com/samuel/go-zookeeper/zk/conn.go:285 +0x77c

goroutine 14 [chan receive]:
github.com/Shopify/sarama.(_Broker).responseReceiver(0xc2080da070)
/home/eyu/workspace/src/ac-kafka/src/github.com/Shopify/sarama/broker.go:345 +0xe3
github.com/Shopify/sarama._Broker.(github.com/Shopify/sarama.responseReceiver)·fm()
/home/eyu/workspace/src/ac-kafka/src/github.com/Shopify/sarama/broker.go:98 +0x27
github.com/Shopify/sarama.withRecover(0xc20800ac20)
/home/eyu/workspace/src/ac-kafka/src/github.com/Shopify/sarama/utils.go:42 +0x3a
created by github.com/Shopify/sarama.func·006
/home/eyu/workspace/src/ac-kafka/src/github.com/Shopify/sarama/broker.go:98 +0x666

goroutine 16 [chan receive]:
github.com/rcrowley/go-metrics.(*meterArbiter).tick(0xb52f40)
/home/eyu/workspace/src/ac-kafka/src/github.com/rcrowley/go-metrics/meter.go:221 +0x52
created by github.com/rcrowley/go-metrics.NewMeter
/home/eyu/workspace/src/ac-kafka/src/github.com/rcrowley/go-metrics/meter.go:40 +0x1f7

goroutine 34 [chan receive]:
github.com/stealthly/go_kafka_client.func·001()
/home/eyu/workspace/src/ac-kafka/src/github.com/stealthly/go_kafka_client/consumer.go:95 +0x48
created by github.com/stealthly/go_kafka_client.NewConsumer
/home/eyu/workspace/src/ac-kafka/src/github.com/stealthly/go_kafka_client/consumer.go:99 +0x76f

goroutine 36 [select]:
github.com/stealthly/go_kafka_client.func·003()
/home/eyu/workspace/src/ac-kafka/src/github.com/stealthly/go_kafka_client/consumer.go:231 +0x2c9
created by github.com/stealthly/go_kafka_client.(*Consumer).maintainCleanCoordinator
/home/eyu/workspace/src/ac-kafka/src/github.com/stealthly/go_kafka_client/consumer.go:239 +0xdf

goroutine 22 [select]:
reflect.Select(0xc2080ae1c0, 0x5, 0x8, 0x0, 0x0, 0x0, 0x0, 0x0)
/home/eyu/go/src/reflect/value.go:1965 +0x218
github.com/stealthly/go_kafka_client.func·035()
/home/eyu/workspace/src/ac-kafka/src/github.com/stealthly/go_kafka_client/utils.go:197 +0x7a
created by github.com/stealthly/go_kafka_client.redirectChannelsToWithTimeout
/home/eyu/workspace/src/ac-kafka/src/github.com/stealthly/go_kafka_client/utils.go:216 +0xbdb

goroutine 23 [select]:
github.com/stealthly/go_kafka_client.func·050()
/home/eyu/workspace/src/ac-kafka/src/github.com/stealthly/go_kafka_client/zk_coordinator.go:511 +0x1634
created by github.com/stealthly/go_kafka_client.(*ZookeeperCoordinator).trySubscribeForChanges
/home/eyu/workspace/src/ac-kafka/src/github.com/stealthly/go_kafka_client/zk_coordinator.go:562 +0xbdb

goroutine 24 [select]:
github.com/stealthly/go_kafka_client.func·010()
/home/eyu/workspace/src/ac-kafka/src/github.com/stealthly/go_kafka_client/consumer.go:584 +0x510
created by github.com/stealthly/go_kafka_client.(*Consumer).subscribeForChanges
/home/eyu/workspace/src/ac-kafka/src/github.com/stealthly/go_kafka_client/consumer.go:621 +0x1dc

goroutine 25 [chan receive]:
github.com/stealthly/go_kafka_client.func·051(0xc20808d6e0)
/home/eyu/workspace/src/ac-kafka/src/github.com/stealthly/go_kafka_client/zk_coordinator.go:757 +0x3d
created by github.com/stealthly/go_kafka_client.(*ZookeeperCoordinator).waitForMembersToJoin
/home/eyu/workspace/src/ac-kafka/src/github.com/stealthly/go_kafka_client/zk_coordinator.go:773 +0x45e

low level consumer throws invalid memory address on calling go_kafka_client.(*SaramaClient).Initialize

low level consumer throws invalid memory address on calling go_kafka_client.(*SaramaClient).Initialize

Note: the ZooKeeper string and Kafka string are valid, and were tested using the Kafka console producer and console consumer.

2015/08/12 03:42:58 subscription.go:87: Attempting to add subscriber for topic1 aarti
2015/08/12 03:42:58 subscription.go:117: creating consumer:
2015/08/12 03:42:58 subscription.go:122: zookeeper connect is: 172.17.42.1:2181
2015/08/12 03:42:58 server.go:1775: http: panic serving 172.17.42.1:60480: runtime error: invalid memory address or nil pointer dereference
goroutine 12 [running]:
net/http.func·011()
/usr/src/go/src/net/http/server.go:1130 +0xbb
github.com/samuel/go-zookeeper/zk.(*Conn).nextXid(0x0, 0x600351)
/programming/reliable_message_channel_server/src/github.com/samuel/go-zookeeper/zk/conn.go:643 +0x4f
github.com/samuel/go-zookeeper/zk.(*Conn).queueRequest(0x0, 0xc, 0x855720, 0xc208084220, 0x855780, 0xc208030660, 0x0, 0xc2080f1690)
/programming/reliable_message_channel_server/src/github.com/samuel/go-zookeeper/zk/conn.go:658 +0x28
github.com/samuel/go-zookeeper/zk.(*Conn).request(0x0, 0xc, 0x855720, 0xc208084220, 0x855780, 0xc208030660, 0x0, 0x2, 0x0, 0x0)
/programming/reliable_message_channel_server/src/github.com/samuel/go-zookeeper/zk/conn.go:670 +0x8c
github.com/samuel/go-zookeeper/zk.(*Conn).Children(0x0, 0xa79170, 0xc, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0)
/programming/reliable_message_channel_server/src/github.com/samuel/go-zookeeper/zk/conn.go:681 +0x127
github.com/stealthly/go_kafka_client.(*ZookeeperCoordinator).tryGetAllBrokers(0xc2080f33b0, 0x0, 0x0, 0x0, 0x0, 0x0)
/programming/reliable_message_channel_server/src/github.com/stealthly/go_kafka_client/zk_coordinator.go:401 +0x1a3
github.com/stealthly/go_kafka_client.(*ZookeeperCoordinator).GetAllBrokers(0xc2080f33b0, 0x0, 0x0, 0x0, 0x0, 0x0)
/programming/reliable_message_channel_server/src/github.com/stealthly/go_kafka_client/zk_coordinator.go:387 +0x8f
github.com/stealthly/go_kafka_client.BootstrapBrokers(0x7f2f2a65bf18, 0xc2080f33b0, 0x0, 0x0, 0x0, 0x0, 0x0)
/programming/reliable_message_channel_server/src/github.com/stealthly/go_kafka_client/low_level_client.go:347 +0xc4
github.com/stealthly/go_kafka_client.(*SaramaClient).Initialize(0xc2080841e0, 0x0, 0x0)
/programming/reliable_message_channel_server/src/github.com/stealthly/go_kafka_client/low_level_client.go:72 +0x5b
app.createConsumer(0xc2080f1a70, 0xc, 0xc2080841a0, 0x5)
/programming/reliable_message_channel_server/src/app/subscription.go:127 +0x412

Installation failed

I tried to download the master version, but got this error:
$ go get github.com/stealthly/go_kafka_client

github.com/stealthly/go_kafka_client

src/github.com/stealthly/go_kafka_client/producer_sarama.go:26: undefined: sarama.Producer

update the syslog server to be durable during network failures

One issue now is that if the network card or the like has an issue, the messages will get lost. There are a lot of things that happen on a local machine that we want to keep track of even if the network is out or blips. We should capture these messages in a local data store like boltdb. We should make sure that when we configure the server to be durable we use acks=-1 and use the database to ensure we send everything that actually came in AND got a successful response from Kafka.
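A sketch of the "persist locally first" half of this, using BoltDB: every incoming line is written to a local bucket before it is produced, and deleted only after Kafka acknowledges it (the acks=-1 producer setting is assumed to be configured separately). The names here are illustrative, not part of the actual syslog producer:

package durable

import (
    "encoding/binary"

    "github.com/boltdb/bolt"
)

var bucketName = []byte("pending")

// Spool is a small write-ahead store for messages awaiting a Kafka ack.
type Spool struct{ db *bolt.DB }

func Open(path string) (*Spool, error) {
    db, err := bolt.Open(path, 0600, nil)
    if err != nil {
        return nil, err
    }
    if err := db.Update(func(tx *bolt.Tx) error {
        _, err := tx.CreateBucketIfNotExists(bucketName)
        return err
    }); err != nil {
        return nil, err
    }
    return &Spool{db: db}, nil
}

// Add persists a message and returns its key, to be deleted after the ack.
func (s *Spool) Add(msg []byte) (uint64, error) {
    var id uint64
    err := s.db.Update(func(tx *bolt.Tx) error {
        b := tx.Bucket(bucketName)
        var err error
        if id, err = b.NextSequence(); err != nil {
            return err
        }
        key := make([]byte, 8)
        binary.BigEndian.PutUint64(key, id)
        return b.Put(key, msg)
    })
    return id, err
}

// Ack removes a message once the producer has confirmed it was written.
func (s *Spool) Ack(id uint64) error {
    return s.db.Update(func(tx *bolt.Tx) error {
        key := make([]byte, 8)
        binary.BigEndian.PutUint64(key, id)
        return tx.Bucket(bucketName).Delete(key)
    })
}

On startup, the producer would iterate the bucket and resend anything still pending from before the outage.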

Logger allocates strings that are never used

Calls to Trace and Tracef cause a string allocation even when the log level is set to INFO. This generates a lot of garbage, slowing down the garbage collector. In particular the Trace calls in message_buffer.go cause millions of allocations.

I'd be willing to submit a pull request, but am unsure how best to fix this.
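One common shape for the fix, shown as a standalone sketch rather than a patch to the seelog-based logger: keep the level in an atomic and return before formatting when tracing is off. This removes the fmt.Sprintf allocation, though the variadic call itself can still allocate, so the hottest paths (like message_buffer.go) may also want to check the level at the call site:

package logging

import (
    "fmt"
    "log"
    "sync/atomic"
)

const (
    levelInfo int32 = iota
    levelTrace
)

var currentLevel = levelInfo

// SetLevel changes the active level; safe to call from any goroutine.
func SetLevel(l int32) { atomic.StoreInt32(&currentLevel, l) }

// Tracef formats and logs only when tracing is enabled, so at INFO the format
// string is never expanded and no message string is allocated.
func Tracef(format string, args ...interface{}) {
    if atomic.LoadInt32(&currentLevel) < levelTrace {
        return
    }
    log.Print(fmt.Sprintf(format, args...))
}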

Consuming multiple topics

Hey guys, I'm observing some strange behavior, and I was wondering if I'm doing something wrong..

My goals are fairly simple:

  • Consume 2 topics, one with 16 partitions, one with 4.
  • Each topic has its own "Strategy", e.g they contain different "types" of jobs
  • I want to have 2 instances of the consumer service active at all times. I would prefer that they distribute the work evenly, but I do not mind if it all goes to one of them for the time being, as long as it keeps up.
  • It is important to me that if one of the consumer services dies, the other is able to "steal" all of the partitions.

Based on my understanding of this library, the above goals are basically exactly what it was written for. Here is what I am experiencing:

Originally there was only 1 topic, the 16 partition one. We were able to implement that no problem once @evanjones figured out the issue he fixed here.

My first stab at adding the second topic was to create a second Consumer from a second instance of ConsumerConfig, with its own Strategy, WorkerFailureCallback and WorkerFailedAttemptCallback. I was calling StartStatic on each instance, passing the appropriate topic/consumer map to each. Each got a map with just one key in it (the respective topic) with the value 16, since I do want each single instance of the consumer service to be able to process all the partitions in the bigger topic should the other instance die.

When I started the service up with this configuration, even if I was only starting one instance of the service, the consumer could never quite get past the "Joining state barrier" phase - it would just repeat the same set of 4 messages about "Commencing assertion" and "Joining state barrier" at intervals. Pressed for time, I thought "Fine, the Strategy function does get the topic from which the message came passed to it, so I'll just implement a strategy that can handle a number of topics, create a single instance of Consumer with that Strategy, and call StartStatic once with a map of multiple keys." So I wrote this little wrapper.
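For reference, a minimal sketch of what such a wrapper might look like; the Strategy signature, result constructors, and coordinator setup below are based on my reading of this library's examples, so treat the exact names as assumptions rather than a verified API:

package main

import (
    kafka "github.com/stealthly/go_kafka_client"
)

// multiTopicStrategy dispatches each message to a per-topic handler.
func multiTopicStrategy(handlers map[string]func(*kafka.Message) error) kafka.WorkerStrategy {
    return func(_ *kafka.Worker, msg *kafka.Message, id kafka.TaskId) kafka.WorkerResult {
        handler, ok := handlers[msg.Topic]
        if !ok {
            return kafka.NewProcessingFailedResult(id) // assumed constructor name
        }
        if err := handler(msg); err != nil {
            return kafka.NewProcessingFailedResult(id)
        }
        return kafka.NewSuccessfulResult(id)
    }
}

func main() {
    zkConfig := kafka.NewZookeeperConfig()
    zkConfig.ZookeeperConnect = []string{"localhost:2181"}

    config := kafka.DefaultConsumerConfig()
    config.Coordinator = kafka.NewZookeeperCoordinator(zkConfig)
    config.Strategy = multiTopicStrategy(map[string]func(*kafka.Message) error{
        // the handler funcs are stand-ins for the real per-topic job processors
        "big-topic":   func(m *kafka.Message) error { return nil },
        "small-topic": func(m *kafka.Message) error { return nil },
    })

    consumer := kafka.NewConsumer(config)
    // One consumer, both topics, a single StartStatic call.
    consumer.StartStatic(map[string]int{
        "big-topic":   16,
        "small-topic": 4,
    })
}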

This seemed to initialize fine, but in spite of running two copies of the service, the second, smaller topic does not appear to be getting processed in full. (I'm still working on setting up monitoring to see exactly which partitions are and are not being processed; the fact that NO offsets have been committed for one of the partitions by the worker group is breaking kafka.tools.ConsumerOffsetChecker, which is how we were exporting the data before, and I have not figured out a better way yet.) It also appears as if jobs that ARE being processed are getting processed twice, at least based on the logs I'm seeing.

I thought I would post here and see if I am making some common mistake or falling into a known pitfall before I went gangbusters writing a reproduction and digging deep into the code. Do you have any advice for isolating the issue and for figuring out exactly what each consumer is doing or trying to do? If we find a bug here, I'll happily contribute a patch, but I do remain optimistic that I'm simply mis-using the library :)

No API document?

I've been looking for documentation for a long time but haven't found anything. I wonder if there's an API document for this project.

Manual commits

Right now, committing is done every N configured seconds. However, I would like to be able to commit manually from the message handler and not rely on the timed auto-commit.

Use case:
Store reporting data in some other DB, aggregated per N messages.
The consumer runs over the topics, stores aggregated data in a temporary data structure, and after N records writes that data out some other way. Then, and only then, is a commit of the offsets performed, and the cycle can start again.

Essentially, this would be needed for any system that processes multiple records per transaction.
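For illustration, a sketch of the flow this implies; commitOffset below is hypothetical and does not exist in the library today, the point is only where the commit happens relative to the batch:

package main

import (
    "fmt"

    kafka "github.com/stealthly/go_kafka_client"
)

const batchSize = 100

var batch []*kafka.Message

// flushToReportingDB is a stand-in for whatever aggregation store is used.
func flushToReportingDB(msgs []*kafka.Message) error {
    fmt.Printf("flushed %d aggregated records\n", len(msgs))
    return nil
}

// handle shows when the requested manual commit would happen.
// commitOffset is hypothetical; today the library only auto-commits on a timer.
func handle(msg *kafka.Message, commitOffset func(topic string, partition int32, offset int64) error) error {
    batch = append(batch, msg)
    if len(batch) < batchSize {
        return nil // keep aggregating; nothing is committed yet
    }
    if err := flushToReportingDB(batch); err != nil {
        return err // offsets untouched, so the batch is re-consumed after a restart
    }
    last := batch[len(batch)-1]
    batch = batch[:0]
    // Then, and only then, commit the offsets and start the cycle again.
    return commitOffset(last.Topic, last.Partition, last.Offset)
}

func main() {
    // handle would be wired into the consumer's message-processing Strategy.
}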

WorkerManager's currentBatch map is susceptible to races

Running go test with the -race flag makes this clear:

go test -v -race -run 'TestWorkerManager' .
=== RUN TestWorkerManager
==================
WARNING: DATA RACE
Write by goroutine 14:
  runtime.mapdelete()
      /usr/local/go/src/runtime/hashmap.go:495 +0x0
  github.com/stealthly/go_kafka_client.(*WorkerManager).taskIsDone()
      /home/spencer/go/src/github.com/stealthly/go_kafka_client/workers.go:332 +0x162
  github.com/stealthly/go_kafka_client.(*WorkerManager).taskSucceeded()
      /home/spencer/go/src/github.com/stealthly/go_kafka_client/workers.go:326 +0x232
  github.com/stealthly/go_kafka_client.(*WorkerManager).processBatch()
      /home/spencer/go/src/github.com/stealthly/go_kafka_client/workers.go:242 +0x618

Previous read by goroutine 13:
  runtime.mapaccess1()
      /usr/local/go/src/runtime/hashmap.go:244 +0x0
  github.com/stealthly/go_kafka_client.func·043()
      /home/spencer/go/src/github.com/stealthly/go_kafka_client/workers.go:155 +0x7c6
  github.com/stealthly/go_kafka_client.inLock()
      /home/spencer/go/src/github.com/stealthly/go_kafka_client/utils.go:44 +0x5e
  github.com/stealthly/go_kafka_client.(*WorkerManager).startBatch()
      /home/spencer/go/src/github.com/stealthly/go_kafka_client/workers.go:168 +0xa3
  github.com/stealthly/go_kafka_client.func·040()
      /home/spencer/go/src/github.com/stealthly/go_kafka_client/workers.go:107 +0x71
  github.com/rcrowley/go-metrics.(*StandardTimer).Time()
      /home/spencer/go/src/github.com/rcrowley/go-metrics/timer.go:212 +0x51
  github.com/stealthly/go_kafka_client.(*WorkerManager).Start()
      /home/spencer/go/src/github.com/stealthly/go_kafka_client/workers.go:108 +0x559

Goroutine 14 (running) created at:
  github.com/stealthly/go_kafka_client.(*WorkerManager).Start()
      /home/spencer/go/src/github.com/stealthly/go_kafka_client/workers.go:92 +0x7f

Goroutine 13 (running) created at:
  github.com/stealthly/go_kafka_client.TestWorkerManager()
      /home/spencer/go/src/github.com/stealthly/go_kafka_client/workers_test.go:123 +0x365
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:447 +0x133
==================
--- PASS: TestWorkerManager (3.00s)
PASS
ok      github.com/stealthly/go_kafka_client    3.010s

This race causes quick crashes in Go 1.5beta1. I'm not sure exactly why, but it seems like the new incremental garbage collector is unhappy with this map, which makes some sense.
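One common way to fix this class of race, sketched here against a stand-in type rather than the real WorkerManager, is to guard every read and write of the shared map with the same mutex (or to confine the map to a single goroutine and communicate over a channel):

package main

import "sync"

// batchState plays the role of WorkerManager's currentBatch: the mutex must be
// held for every access, including the delete in the taskIsDone-style path.
type batchState struct {
    mu    sync.Mutex
    tasks map[string]bool
}

func (b *batchState) markDone(id string) {
    b.mu.Lock()
    defer b.mu.Unlock()
    delete(b.tasks, id)
}

func (b *batchState) pending(id string) bool {
    b.mu.Lock()
    defer b.mu.Unlock()
    return b.tasks[id]
}

func main() {
    b := &batchState{tasks: map[string]bool{"task-1": true}}

    var wg sync.WaitGroup
    wg.Add(2)
    go func() { defer wg.Done(); b.markDone("task-1") }()    // writer (processBatch path)
    go func() { defer wg.Done(); _ = b.pending("task-1") }() // reader (startBatch path)
    wg.Wait()
}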

Documentation

Hi, is there documentation of the library somewhere?

introduce go syslog collector

Make the client support syslog as an entry point for producing data to Kafka, so that folks can write to syslog and we produce to Kafka for them.
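A rough sketch of the idea, assuming a plain UDP listener on the syslog port and a recent Sarama async producer (topic, port, and broker address are placeholders):

package main

import (
    "log"
    "net"

    "github.com/Shopify/sarama"
)

// Listen for syslog datagrams on UDP/514 and forward each line to Kafka.
func main() {
    addr, err := net.ResolveUDPAddr("udp", ":514") // binding to 514 usually requires root
    if err != nil {
        log.Fatal(err)
    }
    conn, err := net.ListenUDP("udp", addr)
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, sarama.NewConfig())
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    // Log (and drop) produce errors; a durable version would buffer them locally.
    go func() {
        for err := range producer.Errors() {
            log.Println("produce failed:", err)
        }
    }()

    buf := make([]byte, 64*1024)
    for {
        n, _, err := conn.ReadFromUDP(buf)
        if err != nil {
            log.Println("read failed:", err)
            continue
        }
        line := make([]byte, n)
        copy(line, buf[:n])
        producer.Input() <- &sarama.ProducerMessage{
            Topic: "syslog", // placeholder topic
            Value: sarama.ByteEncoder(line),
        }
    }
}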

High CPU usage while doing nothing?

Hi,

I've just started playing with Kafka and go_kafka_client today, so this might be a silly question...

I am running a single-node Kafka (0.8.2.0) and ZooKeeper (came bundled with Kafka).

I see my Go example consuming 130-140% CPU (as seen by top on an 8-thread system) even when there is no activity, i.e. no messages being posted or received and no rebalance in progress. If I send a message from a producer, I can see this consumer receiving it.

My code : https://gist.github.com/sajal/6df94621335caa77f80f running on go1.3.3 linux/amd64

In this case topic test is a single partition topic and was created using bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Any pointers as to what might be using so much CPU?
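For what it's worth, one generic way to see where the CPU is going is Go's built-in profiler; a minimal sketch that could be dropped into the consumer program (port 6060 is arbitrary):

package main

import (
    "log"
    "net/http"
    _ "net/http/pprof" // registers /debug/pprof/ handlers on the default mux
)

func init() {
    go func() {
        // Then run: go tool pprof http://localhost:6060/debug/pprof/profile
        log.Println(http.ListenAndServe("localhost:6060", nil))
    }()
}

func main() {
    select {} // stand-in for the real consumer loop
}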

-Sajal
