zensum / franz Goto Github PK
View Code? Open in Web Editor NEWA Kotlin library for running workers on Kafka
License: MIT License
A Kotlin library for running workers on Kafka
License: MIT License
Sometimes we need to only process a limited set of jobs on a topic and then let the worker exit. It is however not quire clear how and what this limits should be.
Given that we want a job not to run forever, we must limit it to a certain threshold. This threshold may be an absolute time stamp in UNIX epoch or a duration for which the worker is willing to wait at most for a new job on the topic. Both these options raises some questions. What does a UNIX epoch time stamp that is in the future mean? What if we only set a max wait time, but keep getting new jobs at intervals lower than the max wait time?
kotlin.KotlinNullPointerException: null
at franz.engine.kafka_one.JobStatusesKt.getOrFail(JobStatuses.kt:8)
at franz.engine.kafka_one.JobStatusesKt.access$getOrFail(JobStatuses.kt:1)
at franz.engine.kafka_one.JobStatuses.rescheduleTransientFailures(JobStatuses.kt:46)
at franz.engine.kafka_one.KafkaConsumerActorKt.retryTransientFailures(KafkaConsumerActor.kt:58)
at franz.engine.kafka_one.KafkaConsumerActorKt.access$retryTransientFailures(KafkaConsumerActor.kt:1)
at franz.engine.kafka_one.KafkaConsumerActorKt$consumerLoop$1.invoke(KafkaConsumerActor.kt:123)
at franz.engine.kafka_one.KafkaConsumerActorKt$consumerLoop$1.invoke(KafkaConsumerActor.kt)
at franz.engine.kafka_one.KafkaConsumerActorKt.iterate(KafkaConsumerActor.kt:109)
at franz.engine.kafka_one.KafkaConsumerActorKt.consumerLoop(KafkaConsumerActor.kt:115)
at franz.engine.kafka_one.KafkaConsumerActorKt.access$consumerLoop(KafkaConsumerActor.kt:1)
at franz.engine.kafka_one.KafkaConsumerActor$createThread$1.run(KafkaConsumerActor.kt:138)
at java.lang.Thread.run(Thread.java:748)
This has happened in several different services today. We are investigating the root cause.
[main] WARN org.apache.kafka.clients.producer.KafkaProducer - metadata.fetch.timeout.ms config is deprecated and will be removed soon. Please use max.block.ms
[main] WARN org.apache.kafka.clients.producer.KafkaProducer - timeout.ms config is deprecated and will be removed soon. Please use request.timeout.ms
So we can have se.zensum instead of com.github
To create a consumer you need a groupId. Kafka just logs a warning and fails to consume when this is the case.
Exception in thread "Thread-2" org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading field 'partition_responses': Error reading array of size 9092, only 916 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:75)
at org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:153)
at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:559)
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:655)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:441)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1096)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at franz.internal.ConsumerActorKt.fetchMessagesFromKafka(ConsumerActor.kt:39)
at franz.internal.ConsumerActorKt.access$fetchMessagesFromKafka(ConsumerActor.kt:1)
at franz.internal.ConsumerActorKt$consumerLoop$1.invoke(ConsumerActor.kt:107)
at franz.internal.ConsumerActorKt$consumerLoop$1.invoke(ConsumerActor.kt)
at franz.internal.ConsumerActorKt.iterate(ConsumerActor.kt:98)
at franz.internal.ConsumerActorKt.consumerLoop(ConsumerActor.kt:104)
at franz.internal.ConsumerActorKt.access$consumerLoop(ConsumerActor.kt:1)
at franz.internal.ConsumerActor$createThread$1.run(ConsumerActor.kt:126)
at java.lang.Thread.run(Thread.java:748)
@williamhogman Any ideas?
https://github.com/zensum/franz/blob/master/src/main/kotlin/KafkaConsumer.kt#L12
[main] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'compression.type' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'acks' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'timeout.ms' was supplied but isn't a known config.
This is what happens when you copy my broken code ๐
For reference (some of them might be replaced): https://kafka.apache.org/0101/documentation.html#consumerconfigs
Value that can be null is accessed in an unsafe manner. This results in a NullPointerException
subscription-event_1 | 08:02:30.840 [ForkJoinPool.commonPool-worker-1] ERROR f.e.kafka_one.KafkaConsumerActor - Job threw an exception
subscription-event_1 | franz.JobStateException: Listify events
subscription-event_1 | at franz.JobState$processMap$lastInterceptor$1.doResume(JobState.kt:156)
subscription-event_1 | at franz.JobState$processMap$lastInterceptor$1.invoke(JobState.kt)
subscription-event_1 | at franz.JobState$processMap$lastInterceptor$1.invoke(JobState.kt:22)
subscription-event_1 | at franz.WorkerInterceptor.executeNext(WorkerInterceptor.kt:12)
subscription-event_1 | at se.zensum.franzSentry.SentryInterceptor$1.doResume(SentryInterceptor.kt:21)
subscription-event_1 | at se.zensum.franzSentry.SentryInterceptor$1.invoke(SentryInterceptor.kt)
subscription-event_1 | at se.zensum.franzSentry.SentryInterceptor$1.invoke(SentryInterceptor.kt:16)
subscription-event_1 | at franz.JobState.processToStatus(JobState.kt:239)
subscription-event_1 | at franz.JobState.processMap(JobState.kt:160)
subscription-event_1 | at franz.JobState.mapRequire(JobState.kt:113)
subscription-event_1 | at se.zensum.subscription_event.SubscriptionEventWorker$getWorkerBuilderEventsIn$2.doResume(SubscriptionEventWorker.kt:55)
subscription-event_1 | at se.zensum.subscription_event.SubscriptionEventWorker$getWorkerBuilderEventsIn$2.invoke(SubscriptionEventWorker.kt)
subscription-event_1 | at se.zensum.subscription_event.SubscriptionEventWorker$getWorkerBuilderEventsIn$2.invoke(SubscriptionEventWorker.kt:34)
subscription-event_1 | at franz.WorkerBuilderKt$pipedWorker$2.doResume(WorkerBuilder.kt:19)
subscription-event_1 | at franz.WorkerBuilderKt$pipedWorker$2.invoke(WorkerBuilder.kt)
subscription-event_1 | at franz.WorkerBuilderKt$pipedWorker$2.invoke(WorkerBuilder.kt)
subscription-event_1 | at franz.engine.kafka_one.KafkaConsumerActor$worker$1$1.doResume(KafkaConsumerActor.kt:170)
subscription-event_1 | at kotlin.coroutines.experimental.jvm.internal.CoroutineImpl.resume(CoroutineImpl.kt:42)
subscription-event_1 | at kotlinx.coroutines.experimental.DispatchedTask$DefaultImpls.run(Dispatched.kt:161)
subscription-event_1 | at kotlinx.coroutines.experimental.DispatchedContinuation.run(Dispatched.kt:25)
subscription-event_1 | at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
subscription-event_1 | at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
subscription-event_1 | at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
subscription-event_1 | at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
subscription-event_1 | at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
subscription-event_1 | Caused by: kotlin.KotlinNullPointerException: null
subscription-event_1 | at franz.JobState$processMap$lastInterceptor$1.doResume(JobState.kt:150)
subscription-event_1 | ... 24 common frames omitted
The stacktrace above references JobState:150, but thats because its an older version, the equivalent for current version in line 161 in JobState.
When I write to the topic test I thought the execute block should execute. But right now its not working.
Example that's not working
https://pastebin.com/XC1fjcBQ
Its working when I'm using apache Consumer.
https://pastebin.com/JPBLB0bw
Currently when calling key(), when no key exists a NPE is thrown. While this forcing function is by design a hasKey method
would allow users to missing keys in a manner appropriate.
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.