franz's People

Contributors

bonahona, mantono, torbacka, williamhogman

franz's Issues

Support only running a worker for a limited time

Sometimes we need to process only a limited set of jobs on a topic and then let the worker exit. It is, however, not quite clear how these limits should work or what they should be.

Given that we do not want a job to run forever, we must limit it by some threshold. This threshold may be an absolute timestamp in UNIX epoch time, or a maximum duration the worker is willing to wait for a new job on the topic. Both options raise questions. What does a UNIX epoch timestamp that lies in the future mean? What if we only set a max wait time, but keep getting new jobs at intervals shorter than the max wait time?
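
One possible reading of the two thresholds, as a minimal sketch rather than a proposal for the actual API: treat a future timestamp as "run until then", and allow it to be combined with a max idle time so that a steady stream of jobs cannot keep the worker alive indefinitely. `RunLimits` and `shouldStop` are hypothetical names and do not exist in franz.

```kotlin
import java.time.Duration
import java.time.Instant

// Hypothetical limits; neither field is part of franz today.
data class RunLimits(
    val deadline: Instant? = null,  // absolute cut-off, e.g. built from a UNIX epoch timestamp
    val maxIdle: Duration? = null   // longest acceptable gap since the last job
)

// Returns true when the worker should stop polling.
// A limit that is null is simply not enforced.
fun shouldStop(limits: RunLimits, now: Instant, lastJobAt: Instant): Boolean {
    val pastDeadline = limits.deadline?.let { !now.isBefore(it) } ?: false
    val idleTooLong = limits.maxIdle?.let { Duration.between(lastJobAt, now) >= it } ?: false
    return pastDeadline || idleTooLong
}
```

With only `maxIdle` set, the second question above still applies: the worker never exits while jobs keep arriving faster than the idle limit. Setting `deadline` as well bounds the run in that case.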

Franz gets stuck in loop when retrying transient failure with null value

kotlin.KotlinNullPointerException: null
at franz.engine.kafka_one.JobStatusesKt.getOrFail(JobStatuses.kt:8)
at franz.engine.kafka_one.JobStatusesKt.access$getOrFail(JobStatuses.kt:1)
at franz.engine.kafka_one.JobStatuses.rescheduleTransientFailures(JobStatuses.kt:46)
at franz.engine.kafka_one.KafkaConsumerActorKt.retryTransientFailures(KafkaConsumerActor.kt:58)
at franz.engine.kafka_one.KafkaConsumerActorKt.access$retryTransientFailures(KafkaConsumerActor.kt:1)
at franz.engine.kafka_one.KafkaConsumerActorKt$consumerLoop$1.invoke(KafkaConsumerActor.kt:123)
at franz.engine.kafka_one.KafkaConsumerActorKt$consumerLoop$1.invoke(KafkaConsumerActor.kt)
at franz.engine.kafka_one.KafkaConsumerActorKt.iterate(KafkaConsumerActor.kt:109)
at franz.engine.kafka_one.KafkaConsumerActorKt.consumerLoop(KafkaConsumerActor.kt:115)
at franz.engine.kafka_one.KafkaConsumerActorKt.access$consumerLoop(KafkaConsumerActor.kt:1)
at franz.engine.kafka_one.KafkaConsumerActor$createThread$1.run(KafkaConsumerActor.kt:138)
at java.lang.Thread.run(Thread.java:748)

This has happened in several different services today. We are investigating the root cause.

Change deprecated producer config variables

[main] WARN org.apache.kafka.clients.producer.KafkaProducer - metadata.fetch.timeout.ms config is deprecated and will be removed soon. Please use max.block.ms
[main] WARN org.apache.kafka.clients.producer.KafkaProducer - timeout.ms config is deprecated and will be removed soon. Please use request.timeout.ms
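
A minimal sketch of the rename, assuming the producer settings are held in a plain map before being passed to Kafka. The key names come from the two warnings above; `migrateProducerConfig` is a hypothetical helper, not part of franz. The old and new settings are not guaranteed to have identical semantics, so the carried-over values may need re-tuning.

```kotlin
// Deprecated key -> replacement, as named in the two warnings.
val deprecatedProducerKeys = mapOf(
    "metadata.fetch.timeout.ms" to "max.block.ms",
    "timeout.ms" to "request.timeout.ms"
)

// Renames deprecated keys. A value already set under the new name wins.
fun migrateProducerConfig(config: Map<String, Any>): Map<String, Any> {
    val migrated = mutableMapOf<String, Any>()
    // Copy every key that is not deprecated.
    for ((key, value) in config) {
        if (key !in deprecatedProducerKeys) migrated[key] = value
    }
    // Fill in replacements only where the new key is absent.
    for ((oldKey, newKey) in deprecatedProducerKeys) {
        val value = config[oldKey] ?: continue
        migrated.putIfAbsent(newKey, value)
    }
    return migrated
}
```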

Exception: Error reading array of size 9092, only 916 bytes available

Exception in thread "Thread-2" org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading field 'partition_responses': Error reading array of size 9092, only 916 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:75)
at org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:153)
at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:559)
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:655)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:441)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1096)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at franz.internal.ConsumerActorKt.fetchMessagesFromKafka(ConsumerActor.kt:39)
at franz.internal.ConsumerActorKt.access$fetchMessagesFromKafka(ConsumerActor.kt:1)
at franz.internal.ConsumerActorKt$consumerLoop$1.invoke(ConsumerActor.kt:107)
at franz.internal.ConsumerActorKt$consumerLoop$1.invoke(ConsumerActor.kt)
at franz.internal.ConsumerActorKt.iterate(ConsumerActor.kt:98)
at franz.internal.ConsumerActorKt.consumerLoop(ConsumerActor.kt:104)
at franz.internal.ConsumerActorKt.access$consumerLoop(ConsumerActor.kt:1)
at franz.internal.ConsumerActor$createThread$1.run(ConsumerActor.kt:126)
at java.lang.Thread.run(Thread.java:748)

@williamhogman Any ideas?

Incorrect Kafka consumer config

https://github.com/zensum/franz/blob/master/src/main/kotlin/KafkaConsumer.kt#L12

[main] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'compression.type' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'acks' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'timeout.ms' was supplied but isn't a known config.

This is what happens when you copy my broken code.

For reference (some of them might be replaced): https://kafka.apache.org/0101/documentation.html#consumerconfigs
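
One way to silence the warnings, sketched under the assumption that the consumer settings are derived from a shared map that also carries producer settings: drop the three keys the consumer reports as unknown before building the consumer. `consumerConfigFrom` is a hypothetical helper, and the key set covers only what the warnings above name.

```kotlin
// Keys the consumer reported as unknown in the warnings above.
val unknownToConsumer = setOf("compression.type", "acks", "timeout.ms")

// Returns a copy of the shared settings without the keys the consumer rejects.
fun consumerConfigFrom(shared: Map<String, Any>): Map<String, Any> =
    shared.filterKeys { it !in unknownToConsumer }
```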

NPE: Unsafe access of nullable value

A value that can be null is accessed in an unsafe manner. This results in a NullPointerException.

subscription-event_1 | 08:02:30.840 [ForkJoinPool.commonPool-worker-1] ERROR f.e.kafka_one.KafkaConsumerActor - Job threw an exception
subscription-event_1 | franz.JobStateException: Listify events
subscription-event_1 | at franz.JobState$processMap$lastInterceptor$1.doResume(JobState.kt:156)
subscription-event_1 | at franz.JobState$processMap$lastInterceptor$1.invoke(JobState.kt)
subscription-event_1 | at franz.JobState$processMap$lastInterceptor$1.invoke(JobState.kt:22)
subscription-event_1 | at franz.WorkerInterceptor.executeNext(WorkerInterceptor.kt:12)
subscription-event_1 | at se.zensum.franzSentry.SentryInterceptor$1.doResume(SentryInterceptor.kt:21)
subscription-event_1 | at se.zensum.franzSentry.SentryInterceptor$1.invoke(SentryInterceptor.kt)
subscription-event_1 | at se.zensum.franzSentry.SentryInterceptor$1.invoke(SentryInterceptor.kt:16)
subscription-event_1 | at franz.JobState.processToStatus(JobState.kt:239)
subscription-event_1 | at franz.JobState.processMap(JobState.kt:160)
subscription-event_1 | at franz.JobState.mapRequire(JobState.kt:113)
subscription-event_1 | at se.zensum.subscription_event.SubscriptionEventWorker$getWorkerBuilderEventsIn$2.doResume(SubscriptionEventWorker.kt:55)
subscription-event_1 | at se.zensum.subscription_event.SubscriptionEventWorker$getWorkerBuilderEventsIn$2.invoke(SubscriptionEventWorker.kt)
subscription-event_1 | at se.zensum.subscription_event.SubscriptionEventWorker$getWorkerBuilderEventsIn$2.invoke(SubscriptionEventWorker.kt:34)
subscription-event_1 | at franz.WorkerBuilderKt$pipedWorker$2.doResume(WorkerBuilder.kt:19)
subscription-event_1 | at franz.WorkerBuilderKt$pipedWorker$2.invoke(WorkerBuilder.kt)
subscription-event_1 | at franz.WorkerBuilderKt$pipedWorker$2.invoke(WorkerBuilder.kt)
subscription-event_1 | at franz.engine.kafka_one.KafkaConsumerActor$worker$1$1.doResume(KafkaConsumerActor.kt:170)
subscription-event_1 | at kotlin.coroutines.experimental.jvm.internal.CoroutineImpl.resume(CoroutineImpl.kt:42)
subscription-event_1 | at kotlinx.coroutines.experimental.DispatchedTask$DefaultImpls.run(Dispatched.kt:161)
subscription-event_1 | at kotlinx.coroutines.experimental.DispatchedContinuation.run(Dispatched.kt:25)
subscription-event_1 | at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
subscription-event_1 | at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
subscription-event_1 | at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
subscription-event_1 | at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
subscription-event_1 | at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
subscription-event_1 | Caused by: kotlin.KotlinNullPointerException: null
subscription-event_1 | at franz.JobState$processMap$lastInterceptor$1.doResume(JobState.kt:150)
subscription-event_1 | ... 24 common frames omitted

The stack trace above references JobState.kt:150, but that is because it comes from an older version. The equivalent in the current version is line 161 of JobState.kt.
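
The general shape of the problem and one possible remedy, as a sketch only. The actual code in JobState.kt is not shown here, so `StepResult`, `mapUnsafe` and `mapChecked` are hypothetical stand-ins, not franz types.

```kotlin
// Hypothetical result type; not part of franz.
sealed class StepResult<out T> {
    data class Ok<T>(val value: T) : StepResult<T>()
    data class Failed(val reason: String) : StepResult<Nothing>()
}

// Unsafe: `!!` throws a null pointer exception when the mapper returns null.
fun <T, R : Any> mapUnsafe(input: T, mapper: (T) -> R?): R = mapper(input)!!

// Safer: a null result becomes an explicit failure the caller can handle.
fun <T, R : Any> mapChecked(input: T, what: String, mapper: (T) -> R?): StepResult<R> {
    val mapped = mapper(input) ?: return StepResult.Failed("$what returned null")
    return StepResult.Ok(mapped)
}
```

Turning the null into a value keeps the failure inside the job's normal status handling instead of surfacing as an exception in the worker thread.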

Add hasKey to message

Currently, calling key() when no key exists throws an NPE. While this forcing function is by design, a hasKey method would allow users to handle missing keys in an appropriate manner.
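
A minimal sketch of how this could look from the caller's side. `Message` here is a simplified, hypothetical stand-in for franz's message type, and only the `key()` behaviour described above is taken from the issue.

```kotlin
// Hypothetical stand-in for franz's message type.
class Message<K : Any, V>(private val rawKey: K?, val value: V) {
    // Mirrors the behaviour described above: fails when no key exists.
    fun key(): K = rawKey ?: throw NullPointerException("message has no key")

    // Proposed addition: lets callers check before calling key().
    fun hasKey(): Boolean = rawKey != null
}

// Example caller that handles a missing key without catching an exception.
fun describe(message: Message<String, String>): String =
    if (message.hasKey()) "key=${message.key()}" else "no key"
```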
