
decaton's Introduction

Decaton

Build Status

Decaton is a streaming task processing framework built on top of Apache Kafka. It is designed to enable "concurrent processing of records consumed from one partition" which isn't possible in many Kafka consumer frameworks.

Here is the list of properties that Decaton provides by default:

  • Concurrent (either multi-threaded or asynchronous) processing of records consumed from one partition
  • Preserve per-key ordering of records
  • Preserve at-least-once delivery semantics regardless of the order in which records are processed

Decaton is a library rather than a full-stack execution environment, so you can easily integrate it into your existing or new JVM applications just by adding a few lines of artifact dependencies.

It has been designed, optimized, and used for LINE's server systems, which produce over 1 million I/O-intensive tasks per second per stream. Its internal implementation for concurrent record processing is therefore highly optimized and can deliver ideal throughput with a minimal number of servers, maximizing their resource utilization.

Getting Started / Tutorial

Please see Getting Started

When to use (and when to not)

It's a good idea to use Decaton when you require high-throughput and/or low-latency processing and your processing logic involves I/O access to external systems (e.g., DB access, Web API access), which tends to add a certain amount of latency to each task.

It would be a better idea to look at other frameworks like Kafka Streams when you need complex stream processing/aggregation such as stream joins or windowed processing without needing to access external storage or web APIs.

Minimum dependencies

Below are the minimum dependencies for adding Decaton artifacts to your Gradle project. This is for people who prefer to try the APIs first by adding them to a project; please see Getting Started for a detailed explanation and proper usage.

// For task producers
implementation "com.linecorp.decaton:decaton-common:$DECATON_VERSION"
implementation "com.linecorp.decaton:decaton-client:$DECATON_VERSION"
// For processors
implementation "com.linecorp.decaton:decaton-common:$DECATON_VERSION"
implementation "com.linecorp.decaton:decaton-processor:$DECATON_VERSION"

Features

The core feature of Decaton is its support for concurrent processing of records consumed from one partition. See the Why Decaton part of the documentation for details.

Besides that, it has gained many unique and useful features as its adoption spread across many services at LINE, all of which came out of real needs for building those services. Below are some examples. See the Index for the full list.

  • Retry Queuing - Retry a failed task with backoff without blocking the flow of other tasks
  • Dynamic Rate Limiting - Apply and update processing rate quotas dynamically
  • Task Compaction - Discard preceding tasks whose processing results would be overwritten by a following task

Performance

High performance is one of Decaton's biggest selling points, so we carefully track its performance transitions commit-by-commit. You can see the performance tracking dashboard here (note that actual performance may vary depending on machine resources and configuration).

How to build

We use Gradle to build this project.

./gradlew build

How to contribute

See CONTRIBUTING.md.

If you believe you have discovered a vulnerability or have an issue related to security, please contact the maintainer directly or send us an email to [email protected] before sending a pull request.

License

Copyright 2020 LINE Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

See LICENSE for more detail.

Contributor Credit

The people below made great contributions to this project in its early days, but their names don't appear in the commit history because we had to erase the commit logs for security reasons when open-sourcing it. Thank you so much for your contributions!

Haruki Okada, Wonpill Seo, Szuyung Wang, Vincent Pericart, Ryosuke Hasebe, Maytee Chinavanichkit, Junpei Koyama, Yusuke Imai, Alex Moreno

decaton's People

Contributors

4jessie, ajalab, be-hase, cormoran, from-unknown, hase1108, karbonitekream, kawamuray, kazuki-ma, kojilin, koyapei, lazmond3, m50d, mauhiz, mosmeh, ocadaruma, palindrom615, ryamagishi, skkap, ta7uw, tadashiya, yang-33, ykubota


decaton's Issues

Provide a way of specifying a partition in producing a task

We can inject a custom org.apache.kafka.clients.producer.Partitioner by implementing it and passing it through DecatonClientBuilder#producerConfig.

However, a Decaton client wraps the task set by our application in a DecatonTaskRequest.
So, to determine a partition based on a task in a Partitioner, we have to handle the DecatonTaskRequest (deserialize our defined task, then proceed with the partitioning step) like the following.

public class CustomPartitioner extends DefaultPartitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (value instanceof DecatonTaskRequest) {
            DecatonTaskRequest request = (DecatonTaskRequest) value;
            MyTask task = new ProtocolBuffersDeserializer<>(MyTask.parser())
                    .deserialize(request.getSerializedTask().toByteArray());
            // determine partition
            // ....
            return result;
        }
        return super.partition(topic, key, keyBytes, value, valueBytes, cluster);
    }
}

The above seems redundant and inefficient because it deserializes an already-serialized task.

Could we provide a way of specifying a partition in producing a task like the following API?

public class DecatonClientImpl<T> implements DecatonClient<T> {
    ...
    public CompletableFuture<PutTaskResult> put(String key, T task, int partition) {
        ...
    }
}

The partition passed as the argument would be set in TaskMetadataProto and used by an internal Partitioner that determines the partition based on it.

What do you think about this feature?

Support unfixed backoff strategy

In the retry mechanism of the Decaton processor, only a fixed backoff is currently supported.
It is often preferable to increase the interval as the number of retries increases rather than using a fixed interval.

I guess it would be useful to support exponential backoff, Fibonacci backoff, and randomized backoff; a rough illustration follows.
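Below is a minimal, framework-agnostic sketch of exponential backoff with full jitter. The method and its parameters are hypothetical and not part of Decaton's API; they only illustrate the kind of strategy being requested.

import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch, not a Decaton API: exponential backoff with full jitter,
// capped at maxBackoff, computed from the task's current retry count.
static Duration exponentialBackoff(long retryCount, Duration initial, Duration maxBackoff) {
    double multiplier = Math.pow(2, Math.min(retryCount, 30));           // cap the exponent to avoid overflow
    long capped = (long) Math.min(initial.toMillis() * multiplier, maxBackoff.toMillis());
    return Duration.ofMillis(ThreadLocalRandom.current().nextLong(capped + 1)); // full jitter in [0, capped]
}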

Make CentralDogma property supplier's file auto-creation optional

On CentralDogma, it's safer for application tokens to be read-only.
Furthermore, in git-mirroring-enabled environments, writes via CD may be overridden by the next git commit.

Proposal

  • add a boolean parameter autocreate to the factory; the legacy method may keep auto-creating the file by default
  • log a warning instead of failing if the file is missing

NPE in DecatonTaskRetryQueueingProcessor if RetryConfig.backoff is null

NPE occurs in DecatonTaskRetryQueueingProcessor#process if backoff is null.

public void process(ProcessingContext<byte[]> context, byte[] serializedTask)
        throws InterruptedException {
    TaskMetadata originalMeta = context.metadata();
    long nextRetryCount = originalMeta.retryCount() + 1;
    long nextTryTimeMillis = System.currentTimeMillis() + backoff.toMillis();
    TaskMetadataProto taskMetadata =
            TaskMetadataProto.newBuilder(originalMeta.toProto())
                             .setRetryCount(nextRetryCount)
                             .setScheduledTimeMillis(nextTryTimeMillis)
                             .build();

Should we check whether backoff is null when building RetryConfig, or when building a Subscription that is configured for retry processing?

if (retryConfig != null) {
    Properties producerConfig = new Properties();
    producerConfig.putAll(presetRetryProducerConfig);
    producerConfig.putAll(Optional.ofNullable(retryConfig.producerConfig())
                                  .orElseGet(producerConfigSupplier(consumerConfig)));
    KafkaProducerSupplier producerSupplier = Optional.ofNullable(retryConfig.producerSupplier())
                                                     .orElseGet(DefaultKafkaProducerSupplier::new);
    retryProcessorSupplier = new DecatonProcessorSupplierImpl<>(() -> {
        DecatonTaskProducer producer = new DecatonTaskProducer(
                scope.retryTopic().get(), producerConfig, producerSupplier);
        return new DecatonTaskRetryQueueingProcessor(scope, producer);
    }, ProcessorScope.SINGLETON);
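If the check is done at RetryConfig build time, a minimal sketch of the validation could look like the following (hypothetical placement, not the actual builder code):

import java.time.Duration;
import java.util.Objects;

// Hypothetical sketch: fail fast when RetryConfig is built rather than
// hitting an NPE later inside DecatonTaskRetryQueueingProcessor#process.
static Duration requireBackoff(Duration backoff) {
    return Objects.requireNonNull(backoff, "RetryConfig.backoff must not be null");
}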

Check performance regression as part of integration test

There were several cases in the past where we noticed that changes such as dependency upgrades or bug fixes impacted Decaton's performance, either positively or negatively.

Since its biggest feature is performance, we should have an automated performance test that can catch regressions or unexpected improvements.

Probably after #17

Provide round-robin mode in SubPartitioner for non-null key

  • Currently, Decaton's SubPartitioner uses the murmur2 hash of the key to determine the subpartition, to ensure the per-key processing ordering guarantee.
  • However, sometimes we want to use a round-robin strategy even for non-null keys. e.g.:
    • The topic is consumed by multiple consumers, and some consumers need the ordering guarantee (e.g. windowed processing) but some consumers don't (just collecting stats).
    • For the latter type of consumer, hash partitioning doesn't make sense (it just spoils processing throughput when key bursting happens).
  • It would be nice to have a feature to enable round-robin mode even for non-null-keyed topics, as sketched below.
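A minimal sketch of the requested behavior (not Decaton's SubPartitioner; all names are hypothetical): rotate a counter over the subpartitions and ignore the key, trading the per-key ordering guarantee for even load distribution.

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical round-robin subpartitioner: ignores the key entirely.
class RoundRobinSubPartitioner {
    private final AtomicLong counter = new AtomicLong();
    private final int subPartitions;

    RoundRobinSubPartitioner(int subPartitions) {
        this.subPartitions = subPartitions;
    }

    int subPartitionFor(byte[] ignoredKey) {
        return (int) Math.floorMod(counter.getAndIncrement(), (long) subPartitions);
    }
}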

Fix flaky test MetricsTest. testMetricsLifecycleManagement

testMetricsLifecycleManagement fails in #93 (refs: https://github.com/line/decaton/pull/93/checks?check_run_id=2026634635), but it succeeds on my local machine and seems unrelated to the changes in #93.

com.linecorp.decaton.processor.metrics.MetricsTest > testMetricsLifecycleManagement FAILED
    java.lang.AssertionError: expected:<[MeterId{name='decaton.tasks.scheduling.delay', tags=[tag(partition=1),tag(subscription=abc),tag(topic=topic)]}, MeterId{name='decaton.partition.throttled.time', tags=[tag(partition=1),tag(subscription=abc),tag(topic=topic)]}]> but was:<[MeterId{name='decaton.processor.compaction.compacted.keys', tags=[tag(subscription=subscription)]}, MeterId{name='decaton.processor.processed.time', tags=[tag(partition=0),tag(subpartition=0),tag(subscription=subscription),tag(topic=topic)]}, MeterId{name='decaton.partition.throttled.time', tags=[tag(partition=0),tag(subscription=subscription),tag(topic=topic)]}, MeterId{name='decaton.retry.queued.tasks', tags=[tag(subscription=subscription)]}, MeterId{name='decaton.subscription.process.durations', tags=[tag(section=poll),tag(subscription=subsc)]}, MeterId{name='decaton.subscription.process.durations', tags=[tag(section=commit),tag(subscription=subsc)]}, MeterId{name='decaton.tasks.process.duration', tags=[tag(partition=1),tag(subscription=subscriptionId),tag(topic=topic)]}, MeterId{name='decaton.subscription.process.durations', tags=[tag(section=reload),tag(subscription=subsc)]}, MeterId{name='decaton.partition.paused.time', tags=[tag(partition=0),tag(subscription=subscription),tag(topic=topic)]}, MeterId{name='decaton.tasks.discarded', tags=[tag(partition=1),tag(subscription=subscriptionId),tag(topic=topic)]}, MeterId{name='decaton.processor.compaction.compacted', tags=[tag(subscription=subscription)]}, MeterId{name='decaton.tasks.error', tags=[tag(partition=1),tag(subscription=subscriptionId),tag(topic=topic)]}, MeterId{name='decaton.tasks.scheduling.delay', tags=[tag(partition=1),tag(subscription=abc),tag(topic=topic)]}, MeterId{name='decaton.partition.queue.starved.time', tags=[tag(partition=0),tag(subscription=subscription),tag(topic=topic)]}, MeterId{name='decaton.tasks.scheduling.delay', tags=[tag(partition=0),tag(subscription=subscription),tag(topic=topic)]}, MeterId{name='decaton.tasks.processed', tags=[tag(partition=1),tag(subscription=subscriptionId),tag(topic=topic)]}, MeterId{name='decaton.tasks.pending', tags=[tag(partition=0),tag(subscription=subscription),tag(topic=topic)]}, MeterId{name='decaton.partition.throttled.time', tags=[tag(partition=1),tag(subscription=abc),tag(topic=topic)]}, MeterId{name='decaton.subscription.process.durations', tags=[tag(section=records),tag(subscription=subsc)]}, MeterId{name='decaton.retry.queueing.failed', tags=[tag(subscription=subscription)]}, MeterId{name='decaton.tasks.complete.duration', tags=[tag(partition=1),tag(subscription=subscriptionId),tag(topic=topic)]}, MeterId{name='decaton.partition.paused', tags=[tag(partition=0),tag(subscription=subscription),tag(topic=topic)]}, MeterId{name='decaton.subscription.process.durations', tags=[tag(section=pause),tag(subscription=subsc)]}, MeterId{name='decaton.tasks.queued', tags=[tag(partition=0),tag(subpartition=0),tag(subscription=subscription),tag(topic=topic)]}]>
        at org.junit.Assert.fail(Assert.java:89)
        at org.junit.Assert.failNotEquals(Assert.java:835)
        at org.junit.Assert.assertEquals(Assert.java:120)
        at org.junit.Assert.assertEquals(Assert.java:146)
        at com.linecorp.decaton.processor.metrics.MetricsTest.testMetricsLifecycleManagement(MetricsTest.java:53)

Enqueue different taskData on retry (partial retry support)

We often use Decaton to process a bunch of targets as a single task for better batching toward middleware or downstream systems.
For example, update 100 users' data, or delete 100 linked records upon unregistration.

Such tasks might hit a partial failure due to downstream issues like a DB failure,
e.g. some targets succeeded, but some failed.
In such a case, we'd like to retry partially, to avoid duplicated operations on successful targets and unnecessary load downstream.

However, the current Decaton doesn't support such scenarios natively, so we either retry the whole task or implement such logic ourselves by touching TaskMetadata and DecatonTaskRetryQueueingProcessor and handling deferred completion.

Retrying the whole task is not preferred, in addition to the above reason, because targets that succeeded in the previous attempt might fail in the retry attempt, so the task may never be considered successful even though each target eventually succeeded.

Can decaton support such retry in ProcessingContext?

I'd like to do following:

public void process(ProcessingContext<MyTask> context, MyTask task) throws InterruptedException {
    List<Id> targets = task.getTargetIds();
    List<Id> failedTargets = doMyProcess(targets);

    if (!failedTargets.isEmpty() && maxRetryCount > context.metadata().retryCount()) {
        context.retry(new MyTask(failedTargets));
    }
}

Currently, ProcessingContextImpl doesn't know how to serialize a task to byte[], so to implement this we may need to pass a serializer to ProcessingContext somehow, or the retry signature would have to be retry(byte[]); however, that might not be a good signature.

ERROR logged when using ProcessorSubscription as a Runnable

Since #53, ProcessorSubscription.commitCompletedOffsets prints an ERROR log when the ProcessorSubscription is run as a Runnable (e.g. submitted to an ExecutorService) rather than as a Thread.
The assumption (Thread.currentThread() == this) is not valid in that case.

The log is harmless but noisy.
I am not sure who else uses ProcessorSubscription in this way, but it does not seem unreasonable (e.g. to get metrics by thread group), so I think the best fix is to just remove the extra check.

Closing ProcessorSubscription twice causes NPE

trace:

java.lang.NullPointerException: null
	at com.linecorp.decaton.processor.metrics.Metrics$AbstractMetrics.close(Metrics.java:84)
	at com.linecorp.decaton.processor.metrics.Metrics$SubscriptionMetrics.close(Metrics.java:95)
	at com.linecorp.decaton.processor.runtime.ProcessorSubscription.awaitShutdown(ProcessorSubscription.java:424)
	at com.linecorp.decaton.processor.runtime.AsyncShutdownable.close(AsyncShutdownable.java:27)

It looks like closed meters should be removed from the meters list in AbstractMetrics.

Flaky test: CentralDogmaPropertySupplierIntegrationTest

https://github.com/line/decaton/actions/runs/4239651470/jobs/7367906297

com.linecorp.decaton.centraldogma.CentralDogmaPropertySupplierIntegrationTest > testCDIntegration FAILED
    java.util.concurrent.CompletionException: com.linecorp.armeria.client.UnprocessedRequestException: com.linecorp.armeria.client.endpoint.EndpointSelectionTimeoutException: Failed to select within 3200 ms an endpoint from: HealthCheckedEndpointGroup{endpoints=[Endpoint{127.0.0.1:35307, weight=1000}], numEndpoints=1, candidates=[Endpoint{127.0.0.1:35307, weight=1000}], numCandidates=1, selectionStrategy=class com.linecorp.armeria.client.endpoint.WeightedRoundRobinStrategy, initialized=true}
        at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332)
        at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347)
        at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:636)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
        at com.linecorp.armeria.common.util.UnmodifiableFuture.doCompleteExceptionally(UnmodifiableFuture.java:167)
        at com.linecorp.armeria.common.util.UnmodifiableFuture.lambda$wrap$0(UnmodifiableFuture.java:101)
        at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934)
        at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
        at com.linecorp.armeria.common.stream.DeferredStreamMessage.lambda$delegate$0(DeferredStreamMessage.java:134)
        at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934)
        at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
        at com.linecorp.armeria.common.stream.StreamMessageCollector.onError(StreamMessageCollector.java:80)
        at com.linecorp.armeria.internal.common.stream.AbortedStreamMessage.subscribe0(AbortedStreamMessage.java:91)
        at com.linecorp.armeria.internal.common.stream.AbortedStreamMessage.lambda$subscribe$1(AbortedStreamMessage.java:85)
        at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:391)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:833)

        Caused by:
        com.linecorp.armeria.client.UnprocessedRequestException: com.linecorp.armeria.client.endpoint.EndpointSelectionTimeoutException: Failed to select within 3200 ms an endpoint from: HealthCheckedEndpointGroup{endpoints=[Endpoint{127.0.0.1:35307, weight=1000}], numEndpoints=1, candidates=[Endpoint{127.0.0.1:35307, weight=1000}], numCandidates=1, selectionStrategy=class com.linecorp.armeria.client.endpoint.WeightedRoundRobinStrategy, initialized=true}

            Caused by:
            com.linecorp.armeria.client.endpoint.EndpointSelectionTimeoutException: Failed to select within 3200 ms an endpoint from: HealthCheckedEndpointGroup{endpoints=[Endpoint{127.0.0.1:35307, weight=1000}], numEndpoints=1, candidates=[Endpoint{127.0.0.1:35307, weight=1000}], numCandidates=1, selectionStrategy=class com.linecorp.armeria.client.endpoint.WeightedRoundRobinStrategy, initialized=true}
                at app//com.linecorp.armeria.client.endpoint.EndpointSelectionTimeoutException.get(EndpointSelectionTimeoutException.java:48)
                at app//com.linecorp.armeria.client.endpoint.AbstractEndpointSelector.lambda$select$0(AbstractEndpointSelector.java:96)
                at app//io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
                at app//io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:153)
                at app//io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
                at app//io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
                at app//io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
                at app//io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:394)
                at app//io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
                at app//io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
                at app//io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
                at [email protected]/java.lang.Thread.run(Thread.java:833)

Provide CentralDogmaPropertySupplier that can read YAML

The property files on CentralDogma must be written in JSON format for now.
I want to leave some comments in the property files, but JSON doesn't support comments.
So, I hope decaton-centraldogma will support YAML, which does support comments.

Add default values of new properties to JSON files on CentralDogma when they are supported

Currently, CentralDogmaPropertySupplier#register creates a default property file if it doesn't exist on CentralDogma.
Only the properties that exist at the time are included, so when new properties are supported, they will not be included in the files.

I think it would be useful if, when new properties are supported, their default values were added to the JSON files on CentralDogma.

Published JAR files contain invalid `Class-Path` entry in manifest

$ cat META-INF/MANIFEST.MF   
Manifest-Version: 1.0
Class-Path: client-0.0.40-SNAPSHOT.jar common-0.0.40-SNAPSHOT.jar micr
 ometer-core-1.5.1.jar protocol-0.0.40-SNAPSHOT.jar kafka-clients-2.4.
 0.jar HdrHistogram-2.1.12.jar LatencyUtils-2.0.3.jar protobuf-java-3.
 3.0.jar zstd-jni-1.4.3-1.jar lz4-java-1.6.0.jar snappy-java-1.1.7.3.j
 ar slf4j-api-1.7.28.jar

This is due to the gradle shadow jar plugin that we use to build our release jars, but for now there's no easy workaround.
Related: johnrengelman/shadow#324

Provide BatchingProcessor as a built-in processor

  • Task batching is a common pattern that many Decaton users implement on their own
    • i.e. batching several tasks of type T into a List<T> and processing them at once, e.g. when the downstream DB supports batched I/O (which is often very efficient)
    • Batch flushing should be both time-based and size-based.
  • So it's better to provide a built-in BatchingProcessor to meet this common need; a rough sketch of the pattern follows
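For illustration, here is a minimal, framework-agnostic sketch of the time- and size-based flushing described above. It is not the built-in BatchingProcessor and all names are hypothetical; in a real Decaton processor, each task's completion would also need to be deferred until its batch is flushed.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical batcher: flushes when the batch reaches maxSize (size-based)
// or when flushIntervalMillis elapses (time-based), whichever comes first.
class Batcher<T> {
    private final int maxSize;
    private final Consumer<List<T>> flusher;
    private final List<T> buffer = new ArrayList<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    Batcher(int maxSize, long flushIntervalMillis, Consumer<List<T>> flusher) {
        this.maxSize = maxSize;
        this.flusher = flusher;
        scheduler.scheduleAtFixedRate(this::flush, flushIntervalMillis, flushIntervalMillis, TimeUnit.MILLISECONDS);
    }

    synchronized void add(T task) {
        buffer.add(task);
        if (buffer.size() >= maxSize) {
            flush();
        }
    }

    synchronized void flush() {
        if (!buffer.isEmpty()) {
            flusher.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }
}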

Narrow the scope of waiting pending task count upon dynamic property reload

Though this "safepoint"-like mechanism was to preserve per-key process ordering guarantee even on concurrency change, at least, we can narrow down the scope of safepoint to per-partition instead of global without violating per-key ordering guarantee.

Consumer's properties are also used for a producer to retry

The following type of warning occurs many times when using the retry mechanism of decaton.

The configuration 'group.id' was supplied but isn't a known config.

This warning occurs when a producer is given unused properties.

https://github.com/apache/kafka/blob/779f1444f128e9d77486243cb94d7efcdea26f52/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java#L353-L355

I found that the following code causes this warning.

Properties producerConfig = Optional.ofNullable(retryConfig.producerConfig())
                                    .orElse(consumerConfig);

The consumer's properties are also used as the retry producer's properties.
I think this behavior is designed so that bootstrap.servers and the like are set without requiring separate retry settings.
But the more consumer properties there are, the more warnings may occur.
It might be a good idea to filter out consumer-only properties when configuring the retry producer so that no warnings occur.
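One possible shape of such filtering (a hypothetical helper for illustration, not Decaton code) is to keep only the keys the producer actually recognizes:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

// Hypothetical helper: copy only the entries that are valid producer configs,
// so consumer-only keys like group.id never reach the retry producer.
static Properties retainProducerConfigs(Properties consumerConfig) {
    Properties producerConfig = new Properties();
    for (String name : consumerConfig.stringPropertyNames()) {
        if (ProducerConfig.configNames().contains(name)) {
            producerConfig.setProperty(name, consumerConfig.getProperty(name));
        }
    }
    return producerConfig;
}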

Silence WARNs during rebalancing

On Decaton 6, we observe the following WARNs when

  • cooperative rebalancing is ON
  • a node joins the consumer group
com.linecorp.decaton.processor.runtime.internal.CommitManager:131

Offset commit failed asynchronously

org.apache.kafka.common.errors.RebalanceInProgressException: Offset commit cannot be completed since the consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance by calling poll() and then retry the operation.

I don't think the end user needs to know about this, since poll() will soon be called again in ProcessorSubscription.consumeLoop.

Upgrade Spring Boot dependency

  • As of 2021-11-05, decaton-spring's spring-boot dependency version is 2.1.7.RELEASE, which is way behind Boot's latest (2.5)
  • Though decaton-spring only uses core features of Boot and so should work even with newer Boot versions, it's better to keep the dependency as up to date as possible

Waiting indefinitely if closing a unstarted ProcessorSubscription

close() will wait indefinitely if it is called on a ProcessorSubscription without calling start() first.
This symptom occurs since version 1.1.0.

If I don't need to start it, I can arrange my code so that it doesn't create the ProcessorSubscription, but I thought this symptom was strange.

Here is a simple reproducer:

var properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<bootstrap-servers>");
properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "test");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
var subscription = SubscriptionBuilder.newBuilder("my-decaton-processor")
        .processorsBuilder(ProcessorsBuilder.consuming(
                "my-decaton-topic", new ProtocolBuffersDeserializer<>(SomeTask.parser())))
        .consumerConfig(properties)
        .build();
subscription.close(); // hangs up...

Introduce a timeout for destroying processors

Summary

  • Currently, the rebalance listener could block indefinitely in the scenario below:
    • Let's say:
      • There are 3 partitions (0,1,2)
      • 3 instances (X,Y,Z)
      • partition-0 is assigned to X
      • max.poll.interval.ms is set to a long value because tasks sometimes take a long time to complete (basically Decaton doesn't block poll() with task processing, but there's an exception in the rebalance listener)
    • Then, let's say partition-0's assignment is taken over by Y through a reassignment triggered by Z's restart
    • The rebalance listener's onPartitionAssigned calls partition-0's PartitionProcessor#close, which waits indefinitely for the processor unit's executor to terminate
    • If a long-running task is being executed by a processor, onPartitionAssigned will be blocked until the task completes
    • When Z comes back to the group, it initiates another rebalance, but it never finishes because X is stuck at onPartitionAssigned and so cannot send a JoinGroup request
    • Meanwhile, all processing on all instances is stuck
  • Though this behavior is Decaton's design decision (i.e. "a rebalance could block up to max.poll.interval.ms at most"), some users use Decaton for processing long-running tasks, and aborting a task "ungracefully" is preferable to waiting for it indefinitely
  • So it might be better to have an option to set a timeout on destroying processors; a possible shape of the bounded wait is sketched below
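A minimal sketch of the bounded wait (a hypothetical helper around the processor unit's executor, not Decaton's actual close path):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: wait for in-flight tasks up to a timeout, then abort
// them via interruption so the rebalance listener can't block indefinitely.
static void closeWithTimeout(ExecutorService executor, long timeoutMillis) throws InterruptedException {
    executor.shutdown();
    if (!executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
        executor.shutdownNow();
    }
}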

Retry fails if a non-ASCII string is used for the Kafka Record Key

While verifying behavior using Decaton's retry mechanism, I found that retries were not performed in a process that expected them.

When sending a normal Decaton task, we use org.apache.kafka.common.serialization.LongSerializer to serialize a long value and use it as the key of the Kafka record.

However, when the retry task is sent by com.linecorp.decaton.client.internal.DecatonTaskProducer, the long's byte representation is converted to a String once, then serialized back into bytes by com.linecorp.decaton.client.kafka.PrintableAsciiStringSerializer.

Here, since the characters extracted from the long's byte representation are not ASCII characters, the following exception occurs and the task is never sent to the retry topic.

ERROR c.l.d.p.runtime.internal.ProcessPipeline:121 - Uncaught exception thrown by processor 
...
org.apache.kafka.common.errors.SerializationException: illegal character: ^ @

Limiting the value used for the key to an ASCII string only when retrying seems like unexpected behavior. A minimal illustration of the mismatch follows.
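For illustration only (not Decaton code), this shows why a key produced by LongSerializer can't round-trip through a printable-ASCII string serializer:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.LongSerializer;

public class NonAsciiKeyExample {
    public static void main(String[] args) {
        // A long key serialized by LongSerializer contains non-printable bytes...
        byte[] keyBytes = new LongSerializer().serialize("topic", 42L); // {0,0,0,0,0,0,0,42}
        // ...so reinterpreting them as a String yields characters outside the printable ASCII range.
        String reinterpreted = new String(keyBytes, StandardCharsets.US_ASCII);
        boolean printable = reinterpreted.chars().allMatch(c -> c >= 0x20 && c <= 0x7E);
        System.out.println(printable); // false -> an ASCII-only serializer rejects it on retry
    }
}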

Isolate bursting key to prevent innocent keys are affected

  • In Kafka, it's common to use some entity's identifier (e.g. user-id) as the key to ensure per-entity task processing order.
  • At the same time, this tends to create a "hot-spot" key when there's abusive task generation for a single entity
  • When that happens, Decaton's processing threads can be occupied by such a bursting key, which affects the processing of other, innocent keys
  • It would be nice to have a built-in feature to detect hot spots and isolate their workload to protect innocent keys

(Actually, this is a long-awaited, missing Decaton feature whose necessity has existed since the early days...)

Add a feature to supply initial value for self CD registration

  • When we create a property file using CentralDogmaPropertySupplier#register, the file consists of default values.
  • If we already know the appropriate value for a property at the beginning, this requires extra steps to configure it:
    • CentralDogmaPropertySupplier#register
    • update the value on CentralDogma

Allow TaskMetadata include the `retry` count for DecatonClient

The docs say DecatonClient is preferred over DecatonProcess, but when dealing with retries and retry counts, Decaton lacks this information: TaskMetadata doesn't have any property to store the retry count the way TaskMetadataProto does, so we have to use DecatonProcess, which is more verbose and requires more code to write!

So could you consider updating TaskMetadata?

ProcessorUnit processes tasks without waiting scheduledTime on shutdown

Scenario:

  1. task X (scheduledTime = now + 100sec) is in ProcessorUnit's queue
  2. ProcessorSubscription#close()
    • ProcessorUnit#close() =>
    • ProcessPipeline#close() =>
    • ExecutionScheduler#close() => terminateLatch.countDown()
  3. ExecutionScheduler#schedule() returns immediately for task X without waiting 100sec
  4. task X is passed to DecatonProcessor

Support retrying on a different cluster

The retry producer can be configured to use a different cluster, but the retry consumer cannot (it shares the consumer with the original topic).

While this saves resources in most cases, it prevents enabling the retry feature on a cluster where you can't freely create topics.

Exception raised from `KafkaConsumer#committed` could terminate subscription thread

There was a report from a user saying that their Decaton instance's subscription thread got terminated with the log: ProcessorSubscription .. got exception while consuming, currently assigned: [(topics)...].

After some investigation, I found that KafkaConsumer#committed seems to have changed its behavior from "never throws an exception (indefinite timeout)" to "throws an exception after request.timeout.ms" somewhere between 1.1.x and the latest version.

Since the #committed call is not guarded with exception handling, we have to fix it.
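A minimal sketch of the guard, assuming the fix simply catches the exception and lets the caller retry on the next loop iteration (a hypothetical helper, not the actual fix):

import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

// Hypothetical guard: a timeout from committed() no longer propagates and
// terminates the subscription thread; the caller can retry on the next loop.
static Map<TopicPartition, OffsetAndMetadata> committedSafely(Consumer<?, ?> consumer,
                                                              Set<TopicPartition> partitions) {
    try {
        return consumer.committed(partitions);
    } catch (KafkaException e) {
        return Map.of();
    }
}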

Change default value of max.pending.records

There was a case where a batching consumer experienced low throughput because the too-small default of max.pending.records caused a frequent pause/resume loop.
We set it conservatively low by default to avoid causing high memory pressure in the case where the average record size is large, but maybe we can default to a much higher value, expecting that in most cases the record size is not so big and more people would benefit from high throughput with an out-of-the-box setup.

asyncCommitInFlight concurrency

Non-volatile field CommitManager.asyncCommitInFlight is updated from

  • CommitManager.commitAsync (decaton subscription thread)
  • consumer.commitAsync callback (kafka background I/O thread)

To ensure it is always correctly updated, it should be atomic or at least volatile.
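A minimal sketch of one way to do this, assuming the flag only guards whether a new async commit may be issued (a hypothetical class, not the actual CommitManager):

import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: an AtomicBoolean makes updates from the subscription
// thread and Kafka's background I/O thread visible to each other.
class AsyncCommitGate {
    private final AtomicBoolean asyncCommitInFlight = new AtomicBoolean(false);

    // Subscription thread: returns true if it's OK to issue consumer.commitAsync(...) now.
    boolean tryBegin() {
        return asyncCommitInFlight.compareAndSet(false, true);
    }

    // commitAsync callback (Kafka background I/O thread): mark the in-flight commit as done.
    void complete() {
        asyncCommitInFlight.set(false);
    }
}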

ProducerRecord for retry tasks could be generated with timestamp=0

This could cause the messages to be deleted on the broker side unexpectedly early, since retention-based log deletion happens based on the record's timestamp. (https://kafka.apache.org/documentation/#impl_deletes)
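A sketch of the fix direction, assuming the retry producer can simply set an explicit timestamp on the record it produces (a hypothetical helper, not Decaton's producer code):

import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical helper: give the retry record a current timestamp instead of 0,
// so retention-based deletion doesn't treat it as already expired.
static ProducerRecord<byte[], byte[]> retryRecord(String retryTopic, byte[] key, byte[] value) {
    return new ProducerRecord<>(retryTopic, null, System.currentTimeMillis(), key, value);
}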

Retrying task will be discarded on task deserialization with using DefaultTaskExtractor

We are consuming Kafka topic using Decaton processor with retrying.

However, ProcessorsBuilder.consuming(String topic, TaskExtractor<T> taskExtractor) is not working correctly with DefaultTaskExtractor.

retryTaskExtractor unwraps the DecatonTaskRequest using DefaultTaskExtractor, then calls taskExtractor.extract() here.
https://github.com/line/decaton/blob/v3.0.2/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java#L76
But if taskExtractor is DefaultTaskExtractor, or a TaskExtractor that delegates deserialization to DefaultTaskExtractor, deserialization fails in retryTaskExtractor and the retried task is discarded.

Stacktrace:

java.lang.IllegalArgumentException: com.linecorp.decaton.com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
	at com.linecorp.decaton.processor.runtime.internal.DefaultTaskExtractor.extract(DefaultTaskExtractor.java:45)
	at com.linecorp.bot.commons.decaton.processor.TimedTaskExtractor.extract(TimedTaskExtractor.kt:31)
	at com.linecorp.decaton.processor.runtime.ProcessorsBuilder.lambda$consuming$1(ProcessorsBuilder.java:83)
	at com.linecorp.decaton.processor.runtime.internal.ProcessPipeline.extract(ProcessPipeline.java:96)
	at com.linecorp.decaton.processor.runtime.internal.ProcessPipeline.scheduleThenProcess(ProcessPipeline.java:68)
	at com.linecorp.decaton.processor.runtime.internal.ProcessorUnit.processTask(ProcessorUnit.java:73)
	at com.linecorp.decaton.processor.runtime.internal.ProcessorUnit.lambda$putTask$1(ProcessorUnit.java:60)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.linecorp.decaton.com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
	at com.linecorp.decaton.com.google.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:101)
	at com.linecorp.decaton.com.google.protobuf.CodedInputStream$ArrayDecoder.readTag(CodedInputStream.java:551)
	at com.linecorp.decaton.com.google.protobuf.CodedInputStream$ArrayDecoder.skipMessage(CodedInputStream.java:649)
	at com.linecorp.decaton.com.google.protobuf.CodedInputStream$ArrayDecoder.skipField(CodedInputStream.java:581)
	at com.linecorp.decaton.protocol.Decaton$DecatonTaskRequest.<init>(Decaton.java:1073)
	at com.linecorp.decaton.protocol.Decaton$DecatonTaskRequest.<init>(Decaton.java:1041)
	at com.linecorp.decaton.protocol.Decaton$DecatonTaskRequest$1.parsePartialFrom(Decaton.java:1638)
	at com.linecorp.decaton.protocol.Decaton$DecatonTaskRequest$1.parsePartialFrom(Decaton.java:1633)
	at com.linecorp.decaton.com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:163)
	at com.linecorp.decaton.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:197)
	at com.linecorp.decaton.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:209)
	at com.linecorp.decaton.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:214)
	at com.linecorp.decaton.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
	at com.linecorp.decaton.protocol.Decaton$DecatonTaskRequest.parseFrom(Decaton.java:1250)
	at com.linecorp.decaton.processor.runtime.internal.DefaultTaskExtractor.extract(DefaultTaskExtractor.java:36)
	... 9 common frames omitted

We passed our own com.linecorp.bot.commons.decaton.processor.TimedTaskExtractor as taskExtractor, which looks like the following (written in Kotlin).
This TimedTaskExtractor is for observing consumption delay.
The issue occurs when delegate = DefaultTaskExtractor.

class TimedTaskExtractor<T>(
    private val delegate: TaskExtractor<T>,
    subscription: String,
    topic: String,
    meterRegistry: MeterRegistry
) : TaskExtractor<T> {
    private val timer = meterRegistry.timer(
        "decaton.processor.${TimedTaskExtractor::class.simpleName?.toLowerCase()}.timestamp_delay",
        Tags.of(Tag.of("topic", topic), Tag.of("subscription", subscription))
    )

    override fun extract(bytes: ByteArray): DecatonTask<T> {
        return delegate.extract(bytes).also {
            if (it.metadata().timestampMillis() > 0) {
                timer.record(
                    System.currentTimeMillis() - it.metadata().timestampMillis(),
                    TimeUnit.MILLISECONDS
                )
            }
        }
    }
}

Support cooperative rebalancing protocol

Flaky test in CompactionProcessorTest

  • CompactionProcessorTest > testOutputDelayed fails occasionally with the following error:
com.linecorp.decaton.processor.processors.CompactionProcessorTest > testOutputDelayed FAILED
    java.lang.AssertionError
        at org.junit.Assert.fail(Assert.java:87)
        at org.junit.Assert.assertTrue(Assert.java:42)
        at org.junit.Assert.assertTrue(Assert.java:53)
        at com.linecorp.decaton.processor.processors.CompactionProcessorTest.testOutputDelayed(CompactionProcessorTest.java:218)

Add a document about metrics

I think we should have a list of all Decaton metrics and a monitoring guide.
It would help users set up appropriate monitoring targets.

Transition to RUNNING only after offset-reset happens

Summary

  • Currently, ProcessorSubscription's state transitions to RUNNING at the end of the rebalance listener (onPartitionsAssigned)
  • So, an offset reset can happen in a later poll(), after the transition to RUNNING
  • Sometimes this can cause unintuitive behavior for the user.
    • Even for us: we were not aware of this behavior until we found the benchmark was not running (#81)

Example

  • Some users set auto.offset.reset = none when they migrate their consumer to Decaton from another framework (which likely involves a lot of code changes), to make sure group.id is configured correctly so consumption can continue from the committed offset.
    • So they tried to detect a group.id mistake by checking whether the subscription transitioned to RUNNING. (auto.offset.reset = none throws an exception at startup if the group.id doesn't exist, so they assumed the subscription wouldn't transition to RUNNING if group.id is not configured properly)
  • But, in fact, the subscription transitions to RUNNING even if there's a mistake in group.id (and soon transitions to SHUTTING_DOWN -> TERMINATED).

Add kafka headers support

Hi, Decaton team.
I'd like to suggest supporting a basic Kafka feature.
Decaton does not expose consumer header information. If Decaton provided header information, it would be useful to many users.
For example, I need 3 Kafka headers (kafka_replyTopic, kafka_replyPartition, kafka_correlationId) to use the request-reply pattern :))

[Feature request] Timeout for `deferCompletion()`

We want a feature that sets a timeout for ProcessingContext.deferCompletion() to avoid a stuck DecatonProcessor in any case.

Decaton supports async task processing by using deferCompletion() in a processor.
However, async processing is bug-prone, and a bug might result in complete() never being called.
Eventually, this causes a critical issue in consuming the topic.

If Decaton had a timeout for deferCompletion(), we could avoid such outages in most cases.
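Until such a feature exists, here is a minimal user-side sketch of the idea. It assumes it is acceptable to force-complete after a deadline and that calling complete() twice is harmless; both are assumptions for illustration, not documented guarantees, and the class is hypothetical.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical user-side workaround (not a Decaton API): schedule a forced
// completion so a missed complete() cannot stall the partition forever.
class CompletionTimeouts {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    // completion is a Runnable that calls complete() on the deferred completion.
    void completeWithin(Runnable completion, long timeoutMillis) {
        scheduler.schedule(completion, timeoutMillis, TimeUnit.MILLISECONDS);
    }
}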

Cleanup CompactionProcessor metrics upon subscription close

refs: #95 (comment)

  • In principle, we close all of Decaton's internal metrics by implementing https://github.com/line/decaton/blob/master/processor/src/main/java/com/linecorp/decaton/processor/metrics/Metrics.java#L64
  • But CompactionProcessor is an exception
    • Since there's no way to pass the subscriptionId upon CompactionProcessor instantiation, CompactionProcessor's metrics are registered per task every time, so AbstractMetrics isn't usable
      • Though we confirmed it doesn't cause much overhead
  • A possible fix would be:
    • Make the CompactionProcessor constructor receive the subscriptionId
      • Or, better, pass the information available in the processor scope (partition, threadId) in addition to the subscriptionId
    • Add a ProcessorsBuilder#thenProcess overload that receives a supplier of type Function<ScopeInfo, DecatonProcessor>
