Ketu

A Clojure Apache Kafka client with a core.async API

[com.appsflyer/ketu "0.6.0"]

Features

  • Channels API: Take kafka data from a channel and send data to kafka through a channel.
  • Consumer Source: Polls records from kafka and puts them on a channel.
  • Producer Sink: Takes records from a channel and sends them to kafka.
  • Shapes: Transform the original objects of the java client to clojure data and back.
  • Simple Configuration: Friendly, validated configuration.

Minimal Example

Consume a name string from kafka and produce a greeting string for that name back into kafka, all through channels:

(ns example
  (:require [clojure.core.async :refer [chan close! <!! >!!]]
            [ketu.async.source :as source]
            [ketu.async.sink :as sink]))
  
(let [<names (chan 10)
      source-opts {:name "greeter-consumer"
                   :brokers "broker1:9092"
                   :topic "names"
                   :group-id "greeter"
                   :value-type :string
                   :shape :value}
      source (source/source <names source-opts)

      >greets (chan 10)
      sink-opts {:name "greeter-producer"
                 :brokers "broker2:9091"
                 :topic "greetings"
                 :value-type :string
                 :shape :value}
      sink (sink/sink >greets sink-opts)]

  ;; Consume a name and produce a greeting. You could also do this with e.g. clojure.core.async/pipeline.
  (->> (<!! <names)
       (str "Hi, ")
       (>!! >greets))

  ;; Close the source. It automatically closes the source channel `<names`.
  (source/stop! source)
  ;; Close the sink channel `>greets`. It causes the sink to close itself as a consequence.
  (close! >greets))
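The comment above mentions clojure.core.async/pipeline as an alternative to taking and putting by hand. A minimal sketch of that idea follows. To keep it runnable without a broker, two plain channels stand in for the source and sink channels; the namespace `example.pipeline`, the `greeting` function, and the worker count of 4 are illustrative assumptions, not part of ketu.

```clojure
(ns example.pipeline
  (:require [clojure.core.async :refer [chan close! pipeline <!! >!!]]))

(defn greeting
  "Builds the greeting string for a person's name."
  [person]
  (str "Hi, " person))

;; Stand-ins for the channels given to source/source and sink/sink.
(def <names (chan 10))
(def >greets (chan 10))

;; Run the transformation on 4 workers (an arbitrary number).
;; pipeline preserves input order and, by default, closes `>greets`
;; once `<names` is closed and drained.
(pipeline 4 >greets (map greeting) <names)

(>!! <names "Ada")
(<!! >greets)
;=> "Hi, Ada"

;; Closing the input side propagates to the output side.
(close! <names)
(<!! >greets)
;=> nil
```

With real ketu channels, stopping the source closes `<names`, so the pipeline would then close `>greets` and the sink would close itself; the explicit `(close! >greets)` should not be needed in that arrangement.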

Configuration reference

Anything that is not documented is not supported and might change.

Note: int is used for brevity but can also mean long. Don't worry about it.

Common options (both source and sink accept these)

| Key | Type | Req? | Notes |
|-----|------|------|-------|
| :brokers | string | required | Comma-separated host:port values, e.g. "broker1:9092,broker2:9092" |
| :topic | string | required | |
| :name | string | required | Simple human-readable identifier, used in logs and thread names |
| :key-type | :string, :byte-array | optional | Default :byte-array, used in configuring key serializer/deserializer |
| :value-type | :string, :byte-array | optional | Default :byte-array, used in configuring value serializer/deserializer |
| :internal-config | map | optional | A map of the underlying java client properties, for any extra lower level config |

Consumer-source options

| Key | Type | Req? | Notes |
|-----|------|------|-------|
| :group-id | string | required | |
| :shape | :value, [:vector <fields>], [:map <fields>], or an arity-1 function of ConsumerRecord | optional | If unspecified, the channel will contain ConsumerRecord objects. See Data shapes. |

Producer-sink options

| Key | Type | Req? | Notes |
|-----|------|------|-------|
| :shape | :value, [:vector <fields>], [:map <fields>], or an arity-1 function of the input returning ProducerRecord | optional | If unspecified, you must put ProducerRecord objects on the channel. See Data shapes. |
| :compression-type | "none", "gzip", "snappy", "lz4", "zstd" | optional | Default "none", values are the same as "compression.type" of the java producer |
| :workers | int | optional | Default 1, number of threads that take from the channel and invoke the internal producer |
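As an illustration of how the common and producer-sink options combine, here is a sketch of a sink options map. It uses only the keys documented above; the broker addresses, names, and chosen values are placeholders.

```clojure
;; Options for a sink that sends string keys and values, compressed with lz4,
;; using two worker threads to take from the channel.
(def sink-opts
  {:name             "greeter-producer"
   :brokers          "broker1:9092,broker2:9092"
   :topic            "greetings"
   :key-type         :string
   :value-type       :string
   :shape            [:vector :key :value]
   :compression-type "lz4"
   :workers          2})
```

A map like this would be passed to `sink/sink` together with the channel, as in the minimal example.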

Data shapes

You don't have to deal with ConsumerRecord or ProducerRecord objects.
To get a clojure data structure with any of the ConsumerRecord fields, configure the consumer shape:

; Value only:
{:topic "names"
 :key-type :string
 :value-type :string
 :shape :value}
(<!! consumer-chan)
;=> "v"

; Vector:
{:shape [:vector :key :value :topic]}
(<!! consumer-chan)
;=> ["k" "v" "names"]

; Map
{:shape [:map :key :value :topic]}
(<!! consumer-chan)
;=> {:key "k", :value "v", :topic "names"}
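The consumer `:shape` can also be an arity-1 function of ConsumerRecord, which helps when you need fields or a layout that the keyword shapes do not give you. A minimal sketch, assuming the kafka-clients classes are on the classpath; the namespace `example.consumer-shape` and the helper `record->entry` are hypothetical names used for illustration.

```clojure
(ns example.consumer-shape
  (:import (org.apache.kafka.clients.consumer ConsumerRecord)))

;; Hypothetical helper: keeps only the fields this application cares about.
(defn record->entry [^ConsumerRecord record]
  {:key       (.key record)
   :value     (.value record)
   :partition (.partition record)
   :offset    (.offset record)})

;; The function goes where a keyword shape would otherwise go.
(def source-opts
  {:name       "greeter-consumer"
   :brokers    "broker1:9092"
   :topic      "names"
   :group-id   "greeter"
   :key-type   :string
   :value-type :string
   :shape      record->entry})

;; The function can be tried without a broker by building a record by hand.
(record->entry (ConsumerRecord. "names" (int 0) 42 "k" "v"))
;=> {:key "k", :value "v", :partition 0, :offset 42}
```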

Similarly, to put a clojure data structure on the producer channel:

; Value only:
{:key-type :string
 :value-type :string
 :shape :value}
(>!! producer-chan "v")

; Vector:
{:shape [:vector :key :value]}
(>!! producer-chan ["k" "v"])

; Vector with topic in each message:
{:shape [:vector :key :value :topic]}
(>!! producer-chan ["k1" "v1" "names"])
(>!! producer-chan ["k2" "v2" "events"])
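The producer `:shape` can likewise be an arity-1 function of the input that returns a ProducerRecord. A minimal sketch, assuming the kafka-clients classes are on the classpath; the namespace `example.producer-shape`, the helper `greeting->record`, and the input map keys `:person` and `:greeting` are hypothetical.

```clojure
(ns example.producer-shape
  (:import (org.apache.kafka.clients.producer ProducerRecord)))

;; Hypothetical helper: turns an application map into a record for the
;; "greetings" topic, keyed by the person's name.
(defn greeting->record [{:keys [person greeting]}]
  (ProducerRecord. "greetings" person greeting))

(def sink-opts
  {:name       "greeter-producer"
   :brokers    "broker1:9092"
   :topic      "greetings"
   :key-type   :string
   :value-type :string
   :shape      greeting->record})

;; With this shape, plain maps would be put on the producer channel:
;; (>!! producer-chan {:person "Ada" :greeting "Hi, Ada"})
```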

Development & Contribution

We welcome feedback and would love to hear about use-cases other than ours. You can open issues, send pull requests, or contact us on the Clojurians Slack.
