clj-kafka

Clojure library for Kafka.

Development is against the 0.8 release of Kafka.

Installing

Add the latest clj-kafka version, as published on Clojars, to the :dependencies in your Leiningen project.clj.

Usage

Producer

Discovery of Kafka brokers from Zookeeper:

(use 'clj-kafka.zk)

(brokers {"zookeeper.connect" "127.0.0.1:2181"})
;; ({:host "localhost", :jmx_port -1, :port 9999, :version 1})

(use 'clj-kafka.producer)

(def p (producer {"metadata.broker.list" "localhost:9999"
                  "serializer.class" "kafka.serializer.DefaultEncoder"
                  "partitioner.class" "kafka.producer.DefaultPartitioner"}))

(send-message p (message "test" (.getBytes "this is my message")))
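kafka.serializer.DefaultEncoder passes byte arrays through unchanged, so the bytes handed to message are exactly what gets stored. Calling .getBytes with no argument uses the JVM's platform default charset. The sketch that follows names the charset explicitly; decode-payload is a hypothetical helper for the consuming side, not part of clj-kafka.

```clojure
(ns example.payload-bytes
  (:import (java.nio.charset StandardCharsets)))

;; Encode with an explicit charset instead of the JVM's platform default,
;; so producers and consumers on different machines agree on the bytes.
(def payload (.getBytes "this is my message" StandardCharsets/UTF_8))

;; Hypothetical helper for the consuming side: decode with the same charset.
(defn decode-payload [^bytes bs]
  (String. bs StandardCharsets/UTF_8))

(decode-payload payload)
;; => "this is my message"
```

The resulting payload can be passed to message in place of (.getBytes "this is my message").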

See: clj-kafka.producer
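The broker maps returned by brokers carry :host and :port, which is all that "metadata.broker.list" needs. A minimal sketch of joining the two steps, using a hypothetical broker-list helper that is not part of clj-kafka:

```clojure
(ns example.broker-list
  (:require [clojure.string :as str]))

;; Hypothetical helper (not part of clj-kafka): builds the comma-separated
;; "host:port" string expected by "metadata.broker.list".
(defn broker-list [brokers]
  (->> brokers
       (map (fn [{:keys [host port]}] (str host ":" port)))
       (str/join ",")))

;; Applied to the sample output of brokers shown above.
(broker-list '({:host "localhost", :jmx_port -1, :port 9999, :version 1}))
;; => "localhost:9999"
```

With a running Zookeeper, the same helper could be applied directly to the result of (brokers {...}) and the string placed in the producer config.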

New Producer

As of 0.3.1 we also support the "new" pure-Java producer. The interface is superficially similar, but we've chosen to keep names close to their Java equivalents.

(use 'clj-kafka.new.producer)

(with-open [p (producer {"bootstrap.servers" "127.0.0.1:9092"} (byte-array-serializer) (byte-array-serializer))]
  (send p (record "test-topic" (.getBytes "hello world!"))))

One key difference is that sending is asynchronous by default. send returns a Future immediately. If you want synchronous behaviour you can deref it right away:

(with-open [p (producer {"bootstrap.servers" "127.0.0.1:9092"} (byte-array-serializer) (byte-array-serializer))]
  @(send p (record "test-topic" (.getBytes "hello world!"))))
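Derefing right away blocks on every single send. When producing many messages, one alternative is to start all the sends first and deref the futures afterwards. The plain-Clojure sketch that follows shows that pattern without a broker; fake-send is a hypothetical stand-in for send that, like send, returns a java.util.concurrent.Future immediately, but whose future simply yields a map.

```clojure
(ns example.async-sends)

;; Hypothetical stand-in for clj-kafka.new.producer/send: returns a
;; Future immediately; the map it yields is illustrative only.
(defn fake-send [topic n]
  (future
    (Thread/sleep 10)            ; pretend this is the network round trip
    {:topic topic :n n}))

;; Fire off all sends without waiting on any of them...
(def pending (mapv #(fake-send "test-topic" %) (range 3)))

;; ...then block once, until every send has completed.
(def results (mapv deref pending))
;; => [{:topic "test-topic", :n 0} {:topic "test-topic", :n 1} {:topic "test-topic", :n 2}]
```

mapv is used rather than map so that every future is started eagerly before the first deref.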

See: clj-kafka.new.producer

Zookeeper Consumer

The Zookeeper consumer uses broker information stored in Zookeeper to consume messages. It can also commit consumed offsets automatically so they are not retrieved again; the example below turns this off with "auto.commit.enable" "false".

(use 'clj-kafka.consumer.zk)
(use 'clj-kafka.core)

(def config {"zookeeper.connect" "localhost:2182"
             "group.id" "clj-kafka.consumer"
             "auto.offset.reset" "smallest"
             "auto.commit.enable" "false"})

;; doall realizes the lazy seq before with-resource calls shutdown
(with-resource [c (consumer config)]
  shutdown
  (doall (take 2 (messages c "test"))))

The messages function covers the easy case of consuming a single topic on a single thread. It is a stricter form of the API found in earlier releases. messages is built on two other key functions, create-message-streams and stream-seq, which respectively create the underlying streams and turn them into lazy sequences; exposing them makes it easier to consume across multiple partitions and threads.

See: clj-kafka.consumer.zk
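Consuming across threads usually means one thread per stream. The following is a plain-Clojure sketch of that pattern under stated assumptions: fake-streams are hypothetical java.util.ArrayList stand-ins for the KafkaStream instances create-message-streams would return, and iterable-seq only roughly mirrors what stream-seq does.

```clojure
(ns example.per-stream-threads)

;; Hypothetical stand-ins for KafkaStream instances: any Iterable works here.
(def fake-streams
  [(java.util.ArrayList. ["a1" "a2"])
   (java.util.ArrayList. ["b1" "b2" "b3"])])

(defn iterable-seq
  "Lazy sequence over an Iterable; roughly what stream-seq does for a stream."
  [^Iterable stream]
  (iterator-seq (.iterator stream)))

(defn consume-in-parallel
  "Consumes every stream on its own thread, calling handle-fn on each
  message, then waits for all threads. Returns per-stream message counts."
  [streams handle-fn]
  (let [workers (mapv (fn [stream]
                        (future
                          (reduce (fn [n msg] (handle-fn msg) (inc n))
                                  0
                                  (iterable-seq stream))))
                      streams)]
    (mapv deref workers)))

(def seen (atom #{}))
(consume-in-parallel fake-streams #(swap! seen conj %))
;; => [2 3]
```

Unlike these finite lists, real KafkaStream iterators by default block waiting for new messages, so with real streams such futures would typically keep running until the consumer is shut down.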

Usage with transducers

Another way to consume is to obtain KafkaStream instances with create-message-stream or create-message-streams. These are Iterable, which means, among other things, that they work nicely with transducers.

Continuing the previous example:

;; hypothetical transformation
(def xform (comp (map deserialize-message)
                 (filter production-traffic)
                 (map parse-user-agent-string)))

(with-resource [c (consumer config)]
  shutdown
  (let [stream (create-message-stream c "test-topic")]
    (run! write-to-database! (eduction xform stream))))
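Because eduction accepts any Iterable, the same pipeline can be tried without a broker. In this sketch a java.util.ArrayList of strings stands in for the KafkaStream, and deserialize-message, production-traffic, parse-user-agent-string and write-to-database! are hypothetical implementations of the helpers named above. It assumes Clojure 1.7 or later, which introduced transducers, eduction and run!.

```clojure
(ns example.transducers
  (:require [clojure.string :as str]))

;; Hypothetical stand-in for a KafkaStream: any java.lang.Iterable can be
;; reduced by Clojure, so eduction and run! accept it directly.
(def fake-stream
  (java.util.ArrayList. ["prod|Mozilla/5.0" "test|curl/7.0" "prod|curl/8.1"]))

;; Hypothetical helpers: messages are "environment|user-agent" strings.
(defn deserialize-message [s]
  (let [[env ua] (str/split s #"\|")]
    {:env env :user-agent ua}))

(defn production-traffic [m]
  (= "prod" (:env m)))

(defn parse-user-agent-string [m]
  ;; keep only the product token before the first "/"
  (assoc m :client (first (str/split (:user-agent m) #"/"))))

(def xform (comp (map deserialize-message)
                 (filter production-traffic)
                 (map parse-user-agent-string)))

;; Hypothetical sink that records clients instead of writing to a database.
(def written (atom []))
(defn write-to-database! [m]
  (swap! written conj (:client m)))

(run! write-to-database! (eduction xform fake-stream))
@written
;; => ["Mozilla" "curl"]
```

eduction applies the whole transformation in a single pass as run! reduces over it, without building intermediate sequences.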

Administration Operations

clj-kafka supports the following simple administration operations:

  • checking whether a topic exists
  • creating a topic
  • deleting a topic (requires that the Kafka cluster supports deletion and has delete.topic.enable set to true)
  • retrieving topic configuration
  • changing topic configuration

(require '[clj-kafka.admin :as admin])

(with-open [zk (admin/zk-client "127.0.0.1:2181")]
  (when-not (admin/topic-exists? zk "test-topic")
    (admin/create-topic zk "test-topic"
                        {:partitions 3
                         :replication-factor 1
                         :config {"cleanup.policy" "compact"}})))

See: clj-kafka.admin

Kafka Offset Manager Operations

clj-kafka supports the following simple Kafka offset management operations:

  • fetching the current offsets of a consumer group
  • resetting the current offsets of a consumer group

(require '[clj-kafka.offset :as offset])

(offset/fetch-consumer-offsets "broker1:9092,broker2:9092" {"zookeeper.connect" "zkhost:2182"} "my-topic" "my-consumer")
(offset/reset-consumer-offsets "broker1:9092,broker2:9092" {"zookeeper.connect" "zkhost:2182"} "my-topic" "my-consumer" :earliest)
(offset/reset-consumer-offsets "broker1:9092,broker2:9092" {"zookeeper.connect" "zkhost:2182"} "my-topic" "my-consumer" :latest)

See: clj-kafka.offset

License

Copyright © 2013 Paul Ingles

Distributed under the Eclipse Public License, the same as Clojure.

Thanks

YourKit is kindly supporting this open source project with its full-featured Java Profiler. YourKit, LLC is the creator of innovative and intelligent tools for profiling Java and .NET applications. Take a look at YourKit's leading software products: YourKit Java Profiler and YourKit .NET Profiler.