grete

is gregor's sister that adds a threadpool and a scheduler

... and some Java API
... and the latest kafka (at the moment of writing)

the idea behind grete is to be able to start a farm of kafka consumers that listen to (potentially) multiple topics and apply a simple consuming function.

spilling the beans

$ make repl

=> (require '[grete.core :as g])

it is quite common for the same app to produce and consume,
hence we'll use one config for producing and consuming:

=> (def config {:kafka
                {:producer
                 {:bootstrap-servers "1.1.1.1:9092,2.2.2.2:9092,3.3.3.3:9092"}
                 :consumer
                 {:group-id "foobar-consumer-group"
                  :bootstrap-servers "1.1.1.1:9092,2.2.2.2:9092,3.3.3.3:9092"
                  :topics ["foos" "bars" "bazs"]
                  :threads 42
                  :poll-ms 100
                  :auto-offset-reset "earliest"}}})

produce

produce a couple of messages (to the "foos" and "bars" topics):

=> (def p (g/producer (get-in config [:kafka :producer])))

;; send a couple of messages to topics: "foos" and "bars"
=> (g/send! p "foos" "{:answer 42}")
=> (g/send! p "bars" "{:answer 42}")

=> (g/close p)

consume

a sample consuming function "process":

;; the "process" function takes a batch of 'org.apache.kafka.clients.consumer.ConsumerRecords'
;; which can be turned to a seq of maps with 'consumer-records->maps'"

=> ;; not using "consumer" arg here, but you may
   (defn process [consumer batch]
     (let [batch (g/consumer-records->maps batch)
           bsize (count batch)]
       (when (pos? bsize)
         (println "picked up" bsize "events:" batch))))
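for the Java-minded, the conversion that consumer-records->maps performs can be pictured roughly like this. this is a hypothetical, stdlib-only sketch and not grete's implementation: SimpleRecord is a made-up stand-in for Kafka's ConsumerRecord, and the map keys simply mirror the fields grete prints for each event (value, key, partition, topic, offset, timestamp, timestamp-type):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// hypothetical stand-in for org.apache.kafka.clients.consumer.ConsumerRecord
final class SimpleRecord {
    final String topic;
    final int partition;
    final long offset;
    final long timestamp;
    final String timestampType;
    final byte[] key;
    final String value;

    SimpleRecord(String topic, int partition, long offset, long timestamp,
                 String timestampType, byte[] key, String value) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
        this.timestampType = timestampType;
        this.key = key;
        this.value = value;
    }
}

final class RecordsToMaps {
    // turns a batch of records into a list of plain maps, one map per record
    static List<Map<String, Object>> toMaps(List<SimpleRecord> batch) {
        List<Map<String, Object>> out = new ArrayList<>(batch.size());
        for (SimpleRecord r : batch) {
            Map<String, Object> m = new LinkedHashMap<>(); // keeps the field order stable
            m.put("value", r.value);
            m.put("key", r.key);
            m.put("partition", r.partition);
            m.put("topic", r.topic);
            m.put("offset", r.offset);
            m.put("timestamp", r.timestamp);
            m.put("timestamp-type", r.timestampType);
            out.add(m);
        }
        return out;
    }
}
```

plain maps are easier to log, filter and pass around than the Kafka record objects, which is what makes a function like "process" above a one-liner to write.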

start a farm of consumers (42 threads as per config):

=> (def consumers (g/run-consumers process (get-in config [:kafka :consumer])))

once the "farm" is started, you should see the messages that were produced above, printed along the lines of:

;;   picked up 2 events: ({:value {:answer 42},
;;                         :key #object[[B 0x65ae581f [B@65ae581f],
;;                         :partition 2,
;;                         :topic foos,
;;                         :offset 1000,
;;                         :timestamp 1586888551200,
;;                         :timestamp-type CreateTime}
;;                        {:value {:answer 42},
;;                         :key #object[[B 0x499b3437 [B@499b3437],
;;                         :partition 13,
;;                         :topic foos,
;;                         :offset 3239,
;;                         :timestamp 1586889147336,
;;                         :timestamp-type CreateTime})

the values here are strings, but they could just as well be byte arrays given byte array (de)serializers.
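under the hood Kafka only moves bytes: a serializer turns a key or value into a byte[] on the way out, and a deserializer turns it back on the way in. Kafka's own StringSerializer encodes with UTF-8 by default, while ByteArraySerializer passes arrays through untouched. a stdlib-only sketch of that round trip (Utf8Serde and its methods are illustrative names, not Kafka classes):

```java
import java.nio.charset.StandardCharsets;

// illustrative stand-in for a string serializer / deserializer pair
final class Utf8Serde {
    // "serialize": what actually goes over the wire is just bytes
    static byte[] serialize(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    // "deserialize": turn the bytes back into a string on the consumer side
    static String deserialize(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }
}
```

"[B" is how the JVM names a byte[], which is what the keys in the output above are.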

as with any other thread pool, it's a good idea to shut it down once we are done working with it:

=> (g/stop-consumers consumers)
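the start / stop lifecycle above boils down to a thread pool where every thread runs its own poll loop. a stdlib-only sketch of that pattern, assuming a BlockingQueue as a stand-in for Kafka; ConsumerFarm, threads and pollMs are illustrative names echoing the config, not grete's internals. in a real setup each thread would own its own KafkaConsumer (KafkaConsumer is documented as not thread-safe) and call poll(Duration) on it, with wakeup() available to break a blocked poll from another thread:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// a toy "farm": N threads, each running its own poll loop against a shared queue
// (a stand-in for Kafka) and applying the same processing function to every batch
final class ConsumerFarm<T> {
    private final ExecutorService pool;
    private final AtomicBoolean running = new AtomicBoolean(true);

    ConsumerFarm(int threads, long pollMs, BlockingQueue<T> source, Consumer<List<T>> process) {
        pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                while (running.get()) {
                    try {
                        // block for up to pollMs waiting for an item, then take whatever else is ready
                        T first = source.poll(pollMs, TimeUnit.MILLISECONDS);
                        if (first == null) {
                            continue; // nothing arrived in time: re-check the running flag
                        }
                        List<T> batch = new ArrayList<>();
                        batch.add(first);
                        source.drainTo(batch);
                        process.accept(batch);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // asked to stop abruptly
                        return;
                    }
                }
            });
        }
    }

    // flip the flag so every loop exits after its current poll, then wait for the threads
    void stop() {
        running.set(false);
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    boolean isStopped() {
        return pool.isTerminated();
    }
}
```

in this sketch a flag checked between short polls keeps shutdown simple, since no loop ever blocks for longer than pollMs.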

callbacks

a kafka producer has an internal accumulator where it buffers events before sending them out to the server (a.k.a. broker). events are appended to per-partition batches (kept in a Deque), each batch capped at "batch.size" bytes, and a background sender thread then ships them out batch by batch.

how long the producer waits for a batch to fill up before the actual "publish | send" is controlled via the "linger.ms" producer configuration property, which balances latency vs. throughput.

hence the kafka publishing process is asynchronous by design.
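the batching described above can be pictured with a toy model. this is a simplified illustration, not Kafka's actual RecordAccumulator: records are appended to a per-partition Deque of batches, a batch is closed once it reaches batchSize bytes (or the next record would not fit), and a batch counts as ready to send when it is full or has waited at least lingerMs. ToyAccumulator, append and drainReady are hypothetical names:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// toy model of a producer-side accumulator (not Kafka's implementation)
final class ToyAccumulator {
    static final class Batch {
        final long createdMs;
        final List<byte[]> records = new ArrayList<>();
        int bytes = 0;
        boolean full = false;
        Batch(long createdMs) { this.createdMs = createdMs; }
    }

    private final int batchSize;   // plays the role of "batch.size" (bytes)
    private final long lingerMs;   // plays the role of "linger.ms"
    private final Map<Integer, Deque<Batch>> byPartition = new HashMap<>();

    ToyAccumulator(int batchSize, long lingerMs) {
        this.batchSize = batchSize;
        this.lingerMs = lingerMs;
    }

    // append to the newest batch of the partition, opening a new one if the record would not fit
    void append(int partition, byte[] record, long nowMs) {
        Deque<Batch> deque = byPartition.computeIfAbsent(partition, p -> new ArrayDeque<>());
        Batch last = deque.peekLast();
        if (last == null || last.full || last.bytes + record.length > batchSize) {
            if (last != null) last.full = true;
            last = new Batch(nowMs);
            deque.addLast(last);
        }
        last.records.add(record);
        last.bytes += record.length;
        if (last.bytes >= batchSize) last.full = true;
    }

    // remove and return every batch that is full or has lingered long enough (oldest first)
    List<Batch> drainReady(long nowMs) {
        List<Batch> ready = new ArrayList<>();
        for (Deque<Batch> deque : byPartition.values()) {
            while (!deque.isEmpty()) {
                Batch head = deque.peekFirst();
                if (head.full || nowMs - head.createdMs >= lingerMs) {
                    ready.add(deque.pollFirst());
                } else {
                    break;
                }
            }
        }
        return ready;
    }
}
```

a bigger lingerMs lets more records pile into each batch (better throughput) at the cost of each record waiting longer (worse latency), which is the trade-off "linger.ms" controls.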

once the events are published to the broker (or publishing fails), the kafka producer reports the outcome back to the caller via an optional callback.

this callback is a function that takes two arguments:

  • metadata in the form of
{:offset 42
 :partition 13
 :topic "eagle-nebula"}
  • and, in case of a problem, an exception (nil when the send succeeds)

this callback can be passed to grete's send-then! function:

=> (g/send-then! p "foos" "{:answer 42}"
     (fn [metadata exception]
       (println {:meta metadata
                 :exception exception})))

#object[org.apache.kafka.clients.producer.internals.FutureRecordMetadata 0x753e4eb5 "org.apache.kafka.clients.producer.internals.FutureRecordMetadata@753e4eb5"]
{:meta {:offset 2
        :partition 0
        :topic foos}
 :exception nil}

;; this part 👆 is returned to a producer via a callback
;; this part 👇 is returned to a consumer

picked up 1 events: ({:value {:answer 42}, :key nil, :partition 0, :topic foos, :offset 2, :timestamp 1701364230786, :timestamp-type CreateTime})
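on the Java side, Kafka's KafkaProducer.send(record, callback) takes a Callback whose onCompletion(RecordMetadata metadata, Exception exception) has the same two-argument shape. a stdlib-only sketch of that contract using CompletableFuture; fakeSend and sendThen are hypothetical stand-ins for a real send, and the metadata is a plain map shaped like the one above:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

final class CallbackSketch {
    // hypothetical async "send": completes with metadata on success, or exceptionally on failure
    static CompletableFuture<Map<String, Object>> fakeSend(String topic, String value, boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
            if (fail) {
                throw new IllegalStateException("broker unavailable");
            }
            return Map.<String, Object>of("topic", topic, "partition", 0, "offset", 2L);
        });
    }

    // "send-then": attach a (metadata, exception) callback; in this sketch exactly one of them is non-null.
    // on failure the callback receives the original error wrapped in a CompletionException
    static CompletableFuture<Map<String, Object>> sendThen(
            String topic, String value, boolean fail,
            BiConsumer<Map<String, Object>, Throwable> callback) {
        return fakeSend(topic, value, fail).whenComplete(callback);
    }
}
```

as with the send-then! example, the call returns a future right away and the callback fires later, once the "send" has completed.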

Java API

consumer props:

bootstrap-servers: "1.1.1.1:9092,2.2.2.2:9092,3.3.3.3:9092"
threads: 42
poll-ms: 10
topics: "foos,bars,bazs"
group-id: "foobar-consumer-group"
auto-offset-reset: "earliest"
enable-auto-commit: "false"
heartbeat-interval-ms: "3000"
default-api-timeout-ms: "600000"
session-timeout-ms: "30000"
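the props above use kebab-case names, while Kafka's own client configs are dotted (bootstrap.servers, group.id, auto.offset.reset, enable.auto.commit, heartbeat.interval.ms, default.api.timeout.ms, session.timeout.ms). a hedged sketch of how such props could be split into farm-level settings (threads, poll-ms, topics) and Kafka properties; it assumes a simple "replace - with ." rule and is not taken from grete's source, and PropsSketch is a hypothetical name:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

final class PropsSketch {
    // keys assumed to configure the farm itself rather than the KafkaConsumer
    static final Set<String> FARM_KEYS = Set.of("threads", "poll-ms", "topics");

    // "group-id" -> "group.id", "session-timeout-ms" -> "session.timeout.ms", ...
    static Properties toKafkaProps(Map<String, String> props) {
        Properties kafka = new Properties();
        props.forEach((k, v) -> {
            if (!FARM_KEYS.contains(k)) {
                kafka.setProperty(k.replace('-', '.'), v);
            }
        });
        return kafka;
    }

    // "foos,bars,bazs" -> [foos, bars, bazs]
    static List<String> topics(Map<String, String> props) {
        return Arrays.asList(props.get("topics").split("\\s*,\\s*"));
    }
}
```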

a message processing function:

static void process(ConsumerRecords<byte[], byte[]> records) {
   // ...
}

start a "farm" of consumers (returned as a map), then stop it once done:

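import java.util.Map;
import java.util.function.BiConsumer;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import tolitius.Grete;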

BiConsumer<KafkaConsumer, ConsumerRecords<byte[], byte[]>> consume =
        (consumer, records) -> process(records);

Map consumers = Grete.startConsumers(consume, props);

Grete.stopConsumers(consumers);

the lambda could call "process(consumer, records)" instead, if the "KafkaConsumer" is also needed inside the processing function

several topics at once

when the same group of consumer threads listens to multiple topics and the records need to be told apart (i.e. which messages came from which topic), the records need to be grouped by topic:

static Map<String, List<ConsumerRecord<byte[], byte[]>>> groupByTopic(ConsumerRecords<byte[], byte[]> records) {

    if (records.isEmpty()) {
        // "log" is assumed to be a logger field of the enclosing class (e.g. SLF4J)
        log.trace("no new records in kafka, hence there is nothing to transport");
        return null;
    }

    // records are grouped on the calling thread, so a plain HashMap is enough
    var byTopic = new HashMap<String, List<ConsumerRecord<byte[], byte[]>>>();

    // computeIfAbsent creates a topic's list the first time the topic is seen, then appends to it
    records.forEach(record ->
        byTopic.computeIfAbsent(record.topic(), topic -> new ArrayList<>())
               .add(record));

    return byTopic;
}
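the same grouping can also be written with streams. a stdlib-only sketch, where Rec is a hypothetical stand-in for ConsumerRecord (only its topic matters here) and a LinkedHashMap keeps topics in first-seen order. with a real batch, Kafka's ConsumerRecords also offers records(String topic), which returns the records of a single topic:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class GroupByTopicSketch {
    // hypothetical stand-in for ConsumerRecord: just a topic and a value
    static final class Rec {
        final String topic;
        final String value;
        Rec(String topic, String value) { this.topic = topic; this.value = value; }
    }

    // group a batch by topic, keeping topics in the order they were first seen
    static Map<String, List<Rec>> groupByTopic(List<Rec> batch) {
        return batch.stream()
                .collect(Collectors.groupingBy(r -> r.topic, LinkedHashMap::new, Collectors.toList()));
    }
}
```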

here is the "process" function from the previous example, now grouping the records by topic:

static void process(ConsumerRecords<byte[], byte[]> records) {

    // since the consumer may be subscribed to multiple topics, the batch might include
    // records of different types / from different topics:
    // group the records in the batch by topic to later pipe each group to the proper function
    var byTopic = groupByTopic(records);

    if (byTopic != null) {
        byTopic.forEach((topic, rs) -> {

            // ...
        });
    }
}
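one way to fill in the "// ..." above is to route each topic's records to a handler registered for that topic. a minimal stdlib-only sketch, assuming a hypothetical handlers map keyed by topic name and records simplified to strings; topics without a handler are collected so they can be logged rather than silently dropped:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

final class TopicRouter {
    private final Map<String, Consumer<List<String>>> handlers;

    TopicRouter(Map<String, Consumer<List<String>>> handlers) {
        this.handlers = handlers;
    }

    // hands each topic's records to its handler; returns the topics that had no handler
    List<String> route(Map<String, List<String>> byTopic) {
        List<String> unhandled = new ArrayList<>();
        byTopic.forEach((topic, rs) -> {
            Consumer<List<String>> handler = handlers.get(topic);
            if (handler != null) {
                handler.accept(rs);
            } else {
                unhandled.add(topic);
            }
        });
        return unhandled;
    }
}
```

keeping the topic-to-handler mapping in one place means subscribing to a new topic only takes one more entry in the map.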

License

Copyright © 2023 tolitius

Distributed under the Eclipse Public License either version 1.0 or (at your option) any later version.

Contributors

danboykis, tolitius
