pyr / kinsky
Kafka Clojure client library
License: MIT License
I had some trouble trying to manually commit offsets.
After initializing a consumer and subscribing it to a topic, I start a simple loop like this one:
(require '[clojure.core.async :refer [go-loop <! put!]])

;; `consumer` is the [records-ch control-ch] pair returned by kinsky.async/consumer.
(go-loop []
  (when-let [{:keys [topic offset partition] :as event} (<! (first consumer))]
    (when (= :record (:type event))
      ;; SOME PROCESSING
      ;; Commit the offset of the next record to be consumed.
      (put! (second consumer) {:op :commit
                               :topic-offsets [{:topic     topic
                                                :partition partition
                                                :offset    (inc offset)
                                                :metadata  ""}]}))
    (recur)))
A few messages are processed and then processing stops; the loop seems dead.
After a bit of investigation I found that it fails here: https://github.com/pyr/kinsky/blob/master/src/kinsky/async.clj#L108
The commit operations arrive so close together that a WakeupException is thrown while a commit operation is being processed. There is no try/catch there, so the loop dies...
Even with a try/catch, poller-ctl only processes one event per iteration, so if multiple operations are submitted "simultaneously", only the first is applied.
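For illustration, a guard of roughly this shape would keep the control loop alive; process-ctl-op and op are hypothetical stand-ins for poller-ctl's event handling, not actual kinsky API:

;; Hypothetical guard; `process-ctl-op` and `op` stand in for poller-ctl's
;; event handling and are not actual kinsky API.
(try
  (process-ctl-op op)
  (catch org.apache.kafka.common.errors.WakeupException _
    ;; A wakeup raced with the commit; swallow it so the loop keeps running.
    nil))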
Is there any reason why the seek operation is not supported in the async facade?
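For illustration, a seek operation on the control channel might take a shape like this (hypothetical, not current kinsky API):

;; Hypothetical control-channel op for seeking a partition to an offset.
(a/put! control-ch {:op :seek :topic "my-topic" :partition 0 :offset 42})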
In the kafka-clients 0.10.x API, KafkaConsumer.pause() and resume() take a Collection<TopicPartition> instead of an array of TopicPartition as they did in 0.9.x.
Pull request #8 fixes this.
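A quick interop sketch of the 0.10.x-style calls; `consumer` is assumed to be a raw KafkaConsumer, and a Clojure vector already satisfies java.util.Collection:

(import '(org.apache.kafka.common TopicPartition))

;; 0.10.x signatures: pause/resume take a java.util.Collection.
(.pause consumer [(TopicPartition. "my-topic" 0)])
(.resume consumer [(TopicPartition. "my-topic" 0)])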
For transaction support, the initTransactions/beginTransaction/commitTransaction/abortTransaction APIs need to be surfaced.
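An interop sketch of the calls involved, assuming `producer` is a raw KafkaProducer configured with a transactional.id and `record` is a ProducerRecord; this is what a wrapper would need to expose:

;; Transactional produce via raw interop.
(.initTransactions producer)
(.beginTransaction producer)
(try
  (.send producer record)
  (.commitTransaction producer)
  (catch Exception e
    (.abortTransaction producer)
    (throw e)))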
When I start an async consumer on a topic with a large backlog of records, and the consumer is slow, I eventually get:
Exception in thread "async-dispatch-14" java.lang.AssertionError:
Assert failed: No more than 1024 pending puts are allowed on a single channel. Consider using a windowed buffer.
(< (.size puts) impl/MAX-QUEUE-SIZE)
  at clojure.core.async.impl.channels.ManyToManyChannel.put_BANG_ (channels.clj:152)
  at clojure.core.async$put_BANG_.invokeStatic (async.clj:159)
  at clojure.core.async$put_BANG_.invoke (async.clj:152)
  at kinsky.async$consumer$fn__12525$state_machine__8801__auto____12526$fn__12529.invoke (async.clj:159)
This happens when the async consumer adds a new payload onto the recs channel: it uses put!.
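The limit is easy to reproduce in plain core.async, independent of kinsky:

(require '[clojure.core.async :as a])

;; put! on a channel nobody consumes queues pending puts;
;; the 1025th put trips the AssertionError above.
(let [ch (a/chan)]
  (dotimes [_ 1025]
    (a/put! ch :payload)))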
Thanks.
This upstream commit makes the kinsky async facade go into a tight loop when poll is called before a subscription has been set up:
apache/kafka@4e1c7d8
When next! is called here: https://github.com/pyr/kinsky/blob/master/src/kinsky/async.clj#L167
Polling a KafkaConsumer that is not yet subscribed to any topics (and hence not yet assigned any partitions) is no longer valid behavior.
So I think the async facade needs to be redesigned so that the user controls when the first poll is made, or alternatively can provide subscription info on creation.
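A hypothetical shape for the second option (the :topic key at creation time is illustrative, not necessarily current kinsky API):

;; Subscription info supplied at creation, so the facade only starts
;; polling once a subscription exists.
(async/consumer {:bootstrap.servers "localhost:9092"
                 :group.id          "my-group"
                 :topic             "my-topic"}
                (client/keyword-deserializer)
                (client/string-deserializer))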
When subscribing, it's possible to use a keyword as the topic, but when supplying a collection, only strings are allowed.
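To illustrate, assuming a kinsky.client consumer bound to c:

(client/subscribe! c :my-topic)             ;; keyword topic works
(client/subscribe! c ["my-topic" :other])   ;; keywords inside a collection do not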
The single-arity variant of stop! in ConsumerDriver is not implemented in consumer->driver. Also, the timeout argument isn't used.
Should the timeout argument perhaps be removed from ConsumerDriver? That would be a breaking change, so perhaps just the single-arity variant should be added?
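The smaller fix could be a one-line delegation inside the reify in consumer->driver (the 0 default is an assumption, not kinsky's documented behavior):

(stop! [this] (stop! this 0)) ;; single arity delegates to the two-arity variant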
Kinsky currently lacks support for Kafka headers, which are supported in the official Java client. Headers would be a great addition.
I've created a solution; I haven't added tests yet.
Fork (updated client.clj): https://github.com/Dangercoder/kinsky
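For context, an interop sketch of what producing with headers looks like in the raw Java client, which is the behavior a wrapper would expose:

(import '(org.apache.kafka.clients.producer ProducerRecord)
        '(org.apache.kafka.common.header.internals RecordHeader))

;; ProducerRecord(topic, partition, key, value, headers); a Clojure
;; vector satisfies the Iterable<Header> parameter.
(ProducerRecord. "my-topic" nil "key" "value"
                 [(RecordHeader. "trace-id" (.getBytes "abc123"))])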
Hi,
Trying to use a deserialization exception handler: am I doing it correctly? Is it actually supported? Can anyone help me?
I am setting this in the client/consumer options
:default.deserialization.exception.handler (.getName LogAndContinueExceptionHandler)
but no luck so far; a corrupt JSON value stops the poll process.
Would there be any interest in some thin wrapper around the AdminClient? I'm working on some functions in a project at work and could make them nice and maybe create a PR. Any thoughts?
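As a taste of what a thin wrapper could look like (the helper name and signature are purely illustrative, not an agreed API):

(import '(org.apache.kafka.clients.admin AdminClient NewTopic))

;; Hypothetical helper: create a topic and block until the broker confirms.
(defn create-topic!
  [^AdminClient client topic partitions replication]
  (-> (.createTopics client [(NewTopic. topic (int partitions) (short replication))])
      (.all)
      (.get)))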
I'd wanted to utilize the :op :stop command to shut down the async consumer.
Lines 96 to 97 in e008068
Is there any particular reason why this doesn't (seem to) work as documented? The consumer lingers around even after sending that :op, which in the REPL results in zombie consumers piling up in the background.
The async consumer commit operation {:op :commit} on 0.1.12 is susceptible to WakeupException, which causes the consumer to die.
Consumer control msg {:type :exception, :exception #error {
:cause nil
:via
[{:type org.apache.kafka.common.errors.WakeupException
:message nil
:at [org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient clientPoll ConsumerNetworkClient.java 324]}]
:trace
[[org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient clientPoll ConsumerNetworkClient.java 324]
[org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient poll ConsumerNetworkClient.java 213]
[org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient poll ConsumerNetworkClient.java 193]
[org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient poll ConsumerNetworkClient.java 163]
[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator commitOffsetsSync ConsumerCoordinator.java 358]
[org.apache.kafka.clients.consumer.KafkaConsumer commitSync KafkaConsumer.java 968]
[kinsky.client$consumer$reify__12216 commit_BANG_ client.clj 405]
See line 569 in a530907. The timestamp defaulting to 11 and not 0 as specified by the docstring seems weird.
More importantly, why do we need such defaults? They override Kafka API behavior with regard to default values, and they make (send! [this topic k v]) and (send! [this topic k v headers]) behave inconsistently (the first does not override Kafka API behavior; the second does).
It would be best not to define any default values and to let library clients handle them when needed.
User Kafka SASL_PLAIN auth. Here is the code:
(require '[kinsky.async :as async]
         '[kinsky.client :as client]
         '[clojure.core.async :as a]
         '[clojure.tools.logging :as log]) ; assuming tools.logging for `log`

(defn init []
  (let [[events-ch control-ch] (async/consumer {:bootstrap.servers "x.x.x.x:9092"
                                                :group.id "a4e21756ae15d90c93xxxxxxxxxxxxx"
                                                :security.protocol "SASL_PLAINTEXT"
                                                :sasl.mechanism "PLAIN"
                                                :sasl.jaas.config "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"a4e21756ae15d90c93953xxxxxxxxxxxxx\" password=\"9254604effc3d26a74d1xxxxxxxxxxxxxxx\";"}
                                               (client/string-deserializer)
                                               (client/string-deserializer))
        topic "device-data-a4e21756ae15d90c9xxxxxxxxxxxxx"]
    (log/error "kfk.consumer init ===> Topic: " topic)
    (a/go-loop []
      (when-let [record (a/<! events-ch)]
        (log/error "receive the msg from topic ===>: " topic)
        (log/error (pr-str record))
        (recur)))
    (a/put! control-ch {:op :subscribe :topic topic})
    (a/put! control-ch {:op :commit})))
thanks!
async/consumer returns a vector of two channels. But when I fixed that, it didn't work either.
It is possible to manually craft a Produce request which bypasses transaction/idempotent ACL validation. Only authenticated clients with Write permission on the respective topics are able to exploit this vulnerability.
Mitigation: Apache Kafka users should upgrade to 2.1.1 or later, where this vulnerability has been fixed.
https://www.mail-archive.com/[email protected]/msg99277.html
Currently I'm doing something like the code below in order to look up offsets at a certain timestamp and then reset the topic/partition to them. I have to go through a bit of Java/Clojure interop to match the API interface. It would be great if all of this could be done nicely in Clojure.
(require '[kinsky.client :as k])

;; `consumer` is assumed to hold the underlying KafkaConsumer (hence the
;; deref); `timestamp` and `t/duration` come from elsewhere in the project.
(let [topic-partitions
      (map k/->topic-partition [{:topic "topic" :partition 0}
                                {:topic "topic" :partition 1}])
      topic-partition-times-map
      (into {} (for [tp topic-partitions] [tp timestamp]))
      topic-partition-offsets-map
      (.offsetsForTimes @consumer topic-partition-times-map (t/duration 10 :seconds))
      topic-offsets
      (for [[topic-partition offset-and-timestamp] topic-partition-offsets-map
            :when (some? offset-and-timestamp)]
        (merge (k/topic-partition->data topic-partition)
               {:offset (.offset offset-and-timestamp)}))]
  (k/commit! consumer topic-offsets))
This looks like it could be helpful for verifying correctness. Kinsky is already based on the Java client, so there probably shouldn't be any differences, but it might still make sense to check? Not sure.
https://twitter.com/ewencp/status/844420714769866753
apache/kafka#2048
The implementation of flush! in producer->driver is (.flush this), but it should be (.flush producer).
(consumer/subscribe! _c "AAA")
;; OK
(consumer/subscribe! _c #"AAA")
Execution error (ClassCastException) at kinsky.client$consumer$reify__1634/subscribe_BANG_ (client.clj:398).
java.util.regex.Pattern cannot be cast to java.util.Collection
The problem is that the function ->topics has a Collection type hint, so the Pattern gets cast to a Collection.
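A hedged sketch of one possible fix: let ->topics pass a Pattern through untouched so the Pattern-accepting subscribe overload can be used (the body is illustrative, not kinsky's actual implementation):

(defn ->topics
  [topics]
  (cond
    (instance? java.util.regex.Pattern topics) topics
    (string? topics)                           [topics]
    (keyword? topics)                          [(name topics)]
    (coll? topics)                             (mapv name topics)))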
If Kafka is down, client/send! seems to time out after 60 seconds or so, but no error is raised. Is there a way to respond to an error?
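One hedged workaround through raw interop: KafkaProducer.send returns a Future<RecordMetadata>, and dereferencing it surfaces the failure (`producer` here is the underlying Java producer, not the kinsky wrapper):

(import '(org.apache.kafka.clients.producer ProducerRecord))

(let [fut (.send producer (ProducerRecord. "my-topic" "key" "value"))]
  (try
    (.get fut) ;; blocks; throws ExecutionException if the send failed
    (catch java.util.concurrent.ExecutionException e
      (println "send failed:" (.getMessage (.getCause e))))))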
I'm not sure this is an issue. It could be something that is already possible but that I'm not aware how to handle. Basically, how does one receive errors (or success messages) when using the async facade for the producer? Looking at https://github.com/pyr/kinsky/blob/master/src/kinsky/async.clj#L309, it seems the result of the call to send! is lost in the future loop.
Though I have never used it before, it seems that the async facade accepts a response channel. Would it be appropriate to use this channel to send the result, if it is supplied by the user? The primary error I have run into on the producer side is the limit on message size, and in my domain it is pretty important to know when this happens.
I'm happy to put a PR together for this change, provided it is necessary. I just wanted to confirm that it is necessary and that there are currently no means to get the response from the call to send!.
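For illustration, the interop this would amount to: register a producer Callback and forward the outcome onto the user-supplied response channel (the :type keys and `response-ch` are illustrative, not settled API; `a` is clojure.core.async):

(import '(org.apache.kafka.clients.producer Callback))

(.send producer record
       (reify Callback
         (onCompletion [_ metadata exception]
           (a/put! response-ch
                   (if exception
                     {:type :exception :exception exception}
                     {:type :metadata :metadata metadata})))))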
Thanks!