line / decaton
High throughput asynchronous task processing on Apache Kafka
License: Apache License 2.0
https://github.com/line/decaton/actions/runs/4239651470/jobs/7367906297
com.linecorp.decaton.centraldogma.CentralDogmaPropertySupplierIntegrationTest > testCDIntegration FAILED
java.util.concurrent.CompletionException: com.linecorp.armeria.client.UnprocessedRequestException: com.linecorp.armeria.client.endpoint.EndpointSelectionTimeoutException: Failed to select within 3200 ms an endpoint from: HealthCheckedEndpointGroup{endpoints=[Endpoint{127.0.0.1:35307, weight=1000}], numEndpoints=1, candidates=[Endpoint{127.0.0.1:35307, weight=1000}], numCandidates=1, selectionStrategy=class com.linecorp.armeria.client.endpoint.WeightedRoundRobinStrategy, initialized=true}
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:636)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
at com.linecorp.armeria.common.util.UnmodifiableFuture.doCompleteExceptionally(UnmodifiableFuture.java:167)
at com.linecorp.armeria.common.util.UnmodifiableFuture.lambda$wrap$0(UnmodifiableFuture.java:101)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
at com.linecorp.armeria.common.stream.DeferredStreamMessage.lambda$delegate$0(DeferredStreamMessage.java:134)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
at com.linecorp.armeria.common.stream.StreamMessageCollector.onError(StreamMessageCollector.java:80)
at com.linecorp.armeria.internal.common.stream.AbortedStreamMessage.subscribe0(AbortedStreamMessage.java:91)
at com.linecorp.armeria.internal.common.stream.AbortedStreamMessage.lambda$subscribe$1(AbortedStreamMessage.java:85)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:391)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by:
com.linecorp.armeria.client.UnprocessedRequestException: com.linecorp.armeria.client.endpoint.EndpointSelectionTimeoutException: Failed to select within 3200 ms an endpoint from: HealthCheckedEndpointGroup{endpoints=[Endpoint{127.0.0.1:35307, weight=1000}], numEndpoints=1, candidates=[Endpoint{127.0.0.1:35307, weight=1000}], numCandidates=1, selectionStrategy=class com.linecorp.armeria.client.endpoint.WeightedRoundRobinStrategy, initialized=true}
Caused by:
com.linecorp.armeria.client.endpoint.EndpointSelectionTimeoutException: Failed to select within 3200 ms an endpoint from: HealthCheckedEndpointGroup{endpoints=[Endpoint{127.0.0.1:35307, weight=1000}], numEndpoints=1, candidates=[Endpoint{127.0.0.1:35307, weight=1000}], numCandidates=1, selectionStrategy=class com.linecorp.armeria.client.endpoint.WeightedRoundRobinStrategy, initialized=true}
at app//com.linecorp.armeria.client.endpoint.EndpointSelectionTimeoutException.get(EndpointSelectionTimeoutException.java:48)
at app//com.linecorp.armeria.client.endpoint.AbstractEndpointSelector.lambda$select$0(AbstractEndpointSelector.java:96)
at app//io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
at app//io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:153)
at app//io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
at app//io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
at app//io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at app//io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:394)
at app//io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at app//io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at app//io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:833)
Currently, CentralDogmaPropertySupplier#register creates a default property file if it doesn't exist on CentralDogma.
Only the properties that exist at that time are included, so when new properties are supported later, they are not added to the file.
It would be useful if, when new properties are supported, their default values were also added to the JSON files on CentralDogma.
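The merge could be sketched as follows; this is a minimal illustration (the property names and the mergeDefaults helper are hypothetical, not Decaton's actual API):

```java
import java.util.HashMap;
import java.util.Map;

public class PropertyDefaultsMerger {
    // Merge defaults for newly supported properties into the existing
    // property map, without overwriting values the user already set.
    public static Map<String, Object> mergeDefaults(Map<String, Object> existing,
                                                    Map<String, Object> defaults) {
        Map<String, Object> merged = new HashMap<>(existing);
        defaults.forEach(merged::putIfAbsent); // only fills missing keys
        return merged;
    }
}
```

The merged map could then be written back to CentralDogma whenever it differs from the stored file.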
Kafka's cooperative rebalancing protocol, which mitigates the stop-the-world effect upon rebalancing, can be enabled by setting CooperativeStickyAssignor in the partition.assignment.strategy config, or by implementing a ConsumerPartitionAssignor which returns RebalanceProtocol.COOPERATIVE from supportedProtocols.
This changes the behavior of ConsumerRebalanceListener as below:
- onPartitionsRevoked: called only when there's at least one partition revoked from the current assignment
- onPartitionsAssigned: called with newly assigned partitions only
onPartitionsAssigned's behavior change breaks Decaton, because AssignmentManager unintentionally treats partitions which were previously assigned and remain assigned after a rebalance as revoked.
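The misbehavior can be modeled with plain sets; this is a simplified illustration of the flawed assumption, not AssignmentManager's actual code:

```java
import java.util.HashSet;
import java.util.Set;

public class CooperativeAssignmentDiff {
    // Under the EAGER protocol, onPartitionsAssigned receives the FULL new
    // assignment, so "previous - assigned" yields the truly revoked partitions.
    // Under COOPERATIVE, it receives only the NEWLY ADDED partitions, so the
    // same formula wrongly marks retained partitions as revoked.
    public static Set<Integer> buggyRevoked(Set<Integer> previous, Set<Integer> assignedArg) {
        Set<Integer> revoked = new HashSet<>(previous);
        revoked.removeAll(assignedArg);
        return revoked;
    }
}
```

With previous = {0, 1} and a cooperative callback that only delivers the newly added partition {2}, the formula reports {0, 1} as revoked even though nothing was.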
trace:
java.lang.NullPointerException: null
at com.linecorp.decaton.processor.metrics.Metrics$AbstractMetrics.close(Metrics.java:84)
at com.linecorp.decaton.processor.metrics.Metrics$SubscriptionMetrics.close(Metrics.java:95)
at com.linecorp.decaton.processor.runtime.ProcessorSubscription.awaitShutdown(ProcessorSubscription.java:424)
at com.linecorp.decaton.processor.runtime.AsyncShutdownable.close(AsyncShutdownable.java:27)
It looks like a closed meter should be removed from the meters list in AbstractMetrics.
CompactionProcessorTest > testOutputDelayed fails occasionally with the following error:
com.linecorp.decaton.processor.processors.CompactionProcessorTest > testOutputDelayed FAILED
java.lang.AssertionError
at org.junit.Assert.fail(Assert.java:87)
at org.junit.Assert.assertTrue(Assert.java:42)
at org.junit.Assert.assertTrue(Assert.java:53)
at com.linecorp.decaton.processor.processors.CompactionProcessorTest.testOutputDelayed(CompactionProcessorTest.java:218)
Given the changes and advantages of JUnit 5, it's time to bump the version to 5.x.
It would be useful for precisely detecting and spotting the causing record when a DeferredCompletion leak occurs in user-supplied code.
Hi, Decaton team.
I'd like to suggest supporting a basic Kafka feature.
Decaton does not expose consumer record headers. If Decaton provided header information, it would be useful to all users.
For example, I need 3 Kafka headers (kafka_replyTopic, kafka_replyPartition, kafka_correlationId) to use the request-reply pattern :))
testMetricsLifecycleManagement fails in #93 (refs: https://github.com/line/decaton/pull/93/checks?check_run_id=2026634635), but it succeeds locally and seems unrelated to the changes of #93.
com.linecorp.decaton.processor.metrics.MetricsTest > testMetricsLifecycleManagement FAILED
java.lang.AssertionError: expected:<[MeterId{name='decaton.tasks.scheduling.delay', tags=[tag(partition=1),tag(subscription=abc),tag(topic=topic)]}, MeterId{name='decaton.partition.throttled.time', tags=[tag(partition=1),tag(subscription=abc),tag(topic=topic)]}]> but was:<[MeterId{name='decaton.processor.compaction.compacted.keys', tags=[tag(subscription=subscription)]}, MeterId{name='decaton.processor.processed.time', tags=[tag(partition=0),tag(subpartition=0),tag(subscription=subscription),tag(topic=topic)]}, MeterId{name='decaton.partition.throttled.time', tags=[tag(partition=0),tag(subscription=subscription),tag(topic=topic)]}, MeterId{name='decaton.retry.queued.tasks', tags=[tag(subscription=subscription)]}, MeterId{name='decaton.subscription.process.durations', tags=[tag(section=poll),tag(subscription=subsc)]}, MeterId{name='decaton.subscription.process.durations', tags=[tag(section=commit),tag(subscription=subsc)]}, MeterId{name='decaton.tasks.process.duration', tags=[tag(partition=1),tag(subscription=subscriptionId),tag(topic=topic)]}, MeterId{name='decaton.subscription.process.durations', tags=[tag(section=reload),tag(subscription=subsc)]}, MeterId{name='decaton.partition.paused.time', tags=[tag(partition=0),tag(subscription=subscription),tag(topic=topic)]}, MeterId{name='decaton.tasks.discarded', tags=[tag(partition=1),tag(subscription=subscriptionId),tag(topic=topic)]}, MeterId{name='decaton.processor.compaction.compacted', tags=[tag(subscription=subscription)]}, MeterId{name='decaton.tasks.error', tags=[tag(partition=1),tag(subscription=subscriptionId),tag(topic=topic)]}, MeterId{name='decaton.tasks.scheduling.delay', tags=[tag(partition=1),tag(subscription=abc),tag(topic=topic)]}, MeterId{name='decaton.partition.queue.starved.time', tags=[tag(partition=0),tag(subscription=subscription),tag(topic=topic)]}, MeterId{name='decaton.tasks.scheduling.delay', tags=[tag(partition=0),tag(subscription=subscription),tag(topic=topic)]}, 
MeterId{name='decaton.tasks.processed', tags=[tag(partition=1),tag(subscription=subscriptionId),tag(topic=topic)]}, MeterId{name='decaton.tasks.pending', tags=[tag(partition=0),tag(subscription=subscription),tag(topic=topic)]}, MeterId{name='decaton.partition.throttled.time', tags=[tag(partition=1),tag(subscription=abc),tag(topic=topic)]}, MeterId{name='decaton.subscription.process.durations', tags=[tag(section=records),tag(subscription=subsc)]}, MeterId{name='decaton.retry.queueing.failed', tags=[tag(subscription=subscription)]}, MeterId{name='decaton.tasks.complete.duration', tags=[tag(partition=1),tag(subscription=subscriptionId),tag(topic=topic)]}, MeterId{name='decaton.partition.paused', tags=[tag(partition=0),tag(subscription=subscription),tag(topic=topic)]}, MeterId{name='decaton.subscription.process.durations', tags=[tag(section=pause),tag(subscription=subsc)]}, MeterId{name='decaton.tasks.queued', tags=[tag(partition=0),tag(subpartition=0),tag(subscription=subscription),tag(topic=topic)]}]>
at org.junit.Assert.fail(Assert.java:89)
at org.junit.Assert.failNotEquals(Assert.java:835)
at org.junit.Assert.assertEquals(Assert.java:120)
at org.junit.Assert.assertEquals(Assert.java:146)
at com.linecorp.decaton.processor.metrics.MetricsTest.testMetricsLifecycleManagement(MetricsTest.java:53)
When I was verifying operation using Decaton's retry mechanism, I found that a retry was not performed in a process where I expected one.
When sending a normal Decaton task, we use org.apache.kafka.common.serialization.LongSerializer to serialize a long value and use it as the key of the Kafka record.
However, when the retry task is sent by com.linecorp.decaton.client.internal.DecatonTaskProducer, the long key bytes are converted to a String once, and then serialized back into bytes by com.linecorp.decaton.client.kafka.PrintableAsciiStringSerializer.
Here, when the long value's byte string is reinterpreted as characters, the following exception occurs because they are not ASCII characters, and the task is never sent to the retry topic.
ERROR c.l.d.p.runtime.internal.ProcessPipeline:121 - Uncaught exception thrown by processor
...
org.apache.kafka.common.errors.SerializationException: illegal character: ^@
Limiting the key to an ASCII string only when retrying seems like unexpected behavior.
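Why the long key fails the check can be shown in a few lines; isPrintableAscii here is an illustrative stand-in for what PrintableAsciiStringSerializer enforces, not its actual implementation:

```java
import java.nio.ByteBuffer;

public class LongKeyAscii {
    // A long serialized by Kafka's LongSerializer is 8 big-endian bytes.
    public static byte[] longToBytes(long v) {
        return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
    }

    // Printable-ASCII check: every byte must be in 0x20..0x7e.
    // Any small long value has leading 0x00 bytes, so this always fails.
    public static boolean isPrintableAscii(byte[] bytes) {
        for (byte b : bytes) {
            if (b < 0x20 || b > 0x7e) {
                return false;
            }
        }
        return true;
    }
}
```

For example, 42L serializes to [0,0,0,0,0,0,0,42], and the 0x00 bytes are exactly the "illegal character" the retry producer rejects.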
The following warning occurs many times when using Decaton's retry mechanism.
The configuration 'group.id' was supplied but isn't a known config.
This warning occurs when a producer is given unused properties.
I found that the following code causes this warning: the properties for the consumer are also used as the properties for the retry producer.
I think this behavior is designed to pick up bootstrap.servers and so on without dedicated retry settings.
But the more consumer properties there are, the more warnings may occur.
It might be a good idea to filter the consumer's properties so they can be passed to the retry producer without warnings.
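A filtering approach could look like the sketch below; the allowlist here is hypothetical and deliberately tiny (a real fix would presumably filter against Kafka's ProducerConfig.configNames()):

```java
import java.util.Properties;
import java.util.Set;

public class ProducerPropertyFilter {
    // Hypothetical allowlist of producer-recognized configs, for illustration.
    private static final Set<String> PRODUCER_CONFIGS =
            Set.of("bootstrap.servers", "client.id", "acks", "linger.ms");

    // Copy only producer-recognized entries, dropping consumer-only ones
    // like group.id that trigger the "isn't a known config" warning.
    public static Properties filterForProducer(Properties consumerProps) {
        Properties producerProps = new Properties();
        for (String name : consumerProps.stringPropertyNames()) {
            if (PRODUCER_CONFIGS.contains(name)) {
                producerProps.setProperty(name, consumerProps.getProperty(name));
            }
        }
        return producerProps;
    }
}
```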
$ cat META-INF/MANIFEST.MF
Manifest-Version: 1.0
Class-Path: client-0.0.40-SNAPSHOT.jar common-0.0.40-SNAPSHOT.jar micr
ometer-core-1.5.1.jar protocol-0.0.40-SNAPSHOT.jar kafka-clients-2.4.
0.jar HdrHistogram-2.1.12.jar LatencyUtils-2.0.3.jar protobuf-java-3.
3.0.jar zstd-jni-1.4.3-1.jar lz4-java-1.6.0.jar snappy-java-1.1.7.3.j
ar slf4j-api-1.7.28.jar
This is due to the gradle shadow jar plugin that we use to build our release jars, but for now there's no easy workaround.
Related: GradleUp/shadow#324
The first poll() happens after the subscription transitioned to RUNNING. Some users set auto.offset.reset = none when they migrate their consumer to Decaton from another framework (which likely includes a lot of code changes), to make sure group.id is configured correctly so consumption can continue from the committed offset.
They try to detect a group.id mistake by checking whether the subscription transitioned to RUNNING or not (auto.offset.reset = none throws an exception at startup if the group.id doesn't exist, so they assumed the subscription doesn't transition to RUNNING if group.id is not configured properly; instead it transitions SHUTTING_DOWN -> TERMINATED soon after).
We often use Decaton to process a bunch of targets as a single task for better batching to middleware or downstream.
For example, update 100 users' data, delete 100 linked data when unregistration and so on.
Such kinds of tasks might hit a partial failure due to downstream issues like DB failure, or so on.
e.g. some targets succeeded, but some failed
In such a case, we'd like to retry partially to avoid duplicated operations for successful targets or avoid unnecessary load downstream.
However, the current Decaton doesn't support such scenarios natively, so we either retry the whole task or implement such logic ourselves by touching TaskMetadata, DecatonTaskRetryQueueingProcessor and handling deferred completion.
Retrying the whole task is not preferred because, in addition to the above reason, targets that succeeded in the previous attempt might fail in the retry attempt, so the task may never be considered succeeded even though each target eventually succeeded.
Can decaton support such retry in ProcessingContext
?
I'd like to do following:
process(ProcessingContext<MyTask> context, MyTask task) {
    List<Id> targets = task.getTargetIds();
    List<Id> failedTargets = doMyProcess(targets);
    if (!failedTargets.isEmpty() && maxRetryCount > context.metadata().retryCount()) {
        context.retry(new MyTask(failedTargets));
    }
}
Currently, ProcessingContextImpl doesn't know how to serialize a task to byte[], so to implement this we would need to pass a serializer to ProcessingContext somehow, or the retry signature would become retry(byte[]); however, that might not be a good signature.
There was a report from a user that tells their decaton instance's subscription thread got terminated with logs: ProcessorSubscription .. got exception while consuming, currently assigned: [(topics)...]
.
After some investigation I found that KafkaConsumer#committed seems to have changed its behavior from "never throws, with an indefinite timeout" to "throws an exception after request.timeout.ms" somewhere between 1.1.x and the latest version.
Since the #committed call is not guarded with exception handling, we have to fix it.
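A minimal guard could look like this; callSafely is a hypothetical helper to show the shape of the fix, not Decaton's actual code:

```java
import java.util.Optional;
import java.util.function.Supplier;

public class GuardedCall {
    // Wrap a call that may now throw (e.g. a timeout from
    // KafkaConsumer#committed) so a single failure doesn't terminate
    // the subscription thread; the caller retries on the next loop.
    public static <T> Optional<T> callSafely(Supplier<T> call) {
        try {
            return Optional.ofNullable(call.get());
        } catch (RuntimeException e) {
            // would be logged; empty result means "retry later"
            return Optional.empty();
        }
    }
}
```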
The Spring Boot version we depend on is 2.1.7.RELEASE, which is way behind Boot's latest (2.5).
Scenario:
We are consuming Kafka topic using Decaton processor with retrying.
However, ProcessorsBuilder.consuming(String topic, TaskExtractor<T> taskExtractor) is not working correctly with DefaultTaskExtractor.
retryTaskExtractor will unwrap the DecatonTaskRequest using DefaultTaskExtractor, then call taskExtractor.extract() here:
https://github.com/line/decaton/blob/v3.0.2/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java#L76
But if taskExtractor is DefaultTaskExtractor, or a TaskExtractor which delegates deserialization to DefaultTaskExtractor, deserialization will fail in retryTaskExtractor and the retried task will be discarded.
Stacktrace:
java.lang.IllegalArgumentException: com.linecorp.decaton.com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
at com.linecorp.decaton.processor.runtime.internal.DefaultTaskExtractor.extract(DefaultTaskExtractor.java:45)
at com.linecorp.bot.commons.decaton.processor.TimedTaskExtractor.extract(TimedTaskExtractor.kt:31)
at com.linecorp.decaton.processor.runtime.ProcessorsBuilder.lambda$consuming$1(ProcessorsBuilder.java:83)
at com.linecorp.decaton.processor.runtime.internal.ProcessPipeline.extract(ProcessPipeline.java:96)
at com.linecorp.decaton.processor.runtime.internal.ProcessPipeline.scheduleThenProcess(ProcessPipeline.java:68)
at com.linecorp.decaton.processor.runtime.internal.ProcessorUnit.processTask(ProcessorUnit.java:73)
at com.linecorp.decaton.processor.runtime.internal.ProcessorUnit.lambda$putTask$1(ProcessorUnit.java:60)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.linecorp.decaton.com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
at com.linecorp.decaton.com.google.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:101)
at com.linecorp.decaton.com.google.protobuf.CodedInputStream$ArrayDecoder.readTag(CodedInputStream.java:551)
at com.linecorp.decaton.com.google.protobuf.CodedInputStream$ArrayDecoder.skipMessage(CodedInputStream.java:649)
at com.linecorp.decaton.com.google.protobuf.CodedInputStream$ArrayDecoder.skipField(CodedInputStream.java:581)
at com.linecorp.decaton.protocol.Decaton$DecatonTaskRequest.<init>(Decaton.java:1073)
at com.linecorp.decaton.protocol.Decaton$DecatonTaskRequest.<init>(Decaton.java:1041)
at com.linecorp.decaton.protocol.Decaton$DecatonTaskRequest$1.parsePartialFrom(Decaton.java:1638)
at com.linecorp.decaton.protocol.Decaton$DecatonTaskRequest$1.parsePartialFrom(Decaton.java:1633)
at com.linecorp.decaton.com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:163)
at com.linecorp.decaton.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:197)
at com.linecorp.decaton.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:209)
at com.linecorp.decaton.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:214)
at com.linecorp.decaton.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
at com.linecorp.decaton.protocol.Decaton$DecatonTaskRequest.parseFrom(Decaton.java:1250)
at com.linecorp.decaton.processor.runtime.internal.DefaultTaskExtractor.extract(DefaultTaskExtractor.java:36)
... 9 common frames omitted
We passed our own com.linecorp.bot.commons.decaton.processor.TimedTaskExtractor as taskExtractor, which looks like the following (written in Kotlin).
This TimedTaskExtractor is for observing consuming delay.
The issue occurs with delegate = DefaultTaskExtractor.
class TimedTaskExtractor<T>(
    private val delegate: TaskExtractor<T>,
    subscription: String,
    topic: String,
    meterRegistry: MeterRegistry
) : TaskExtractor<T> {
    private val timer = meterRegistry.timer(
        "decaton.processor.${TimedTaskExtractor::class.simpleName?.toLowerCase()}.timestamp_delay",
        Tags.of(Tag.of("topic", topic), Tag.of("subscription", subscription))
    )

    override fun extract(bytes: ByteArray): DecatonTask<T> {
        return delegate.extract(bytes).also {
            if (it.metadata().timestampMillis() > 0) {
                timer.record(
                    System.currentTimeMillis() - it.metadata().timestampMillis(),
                    TimeUnit.MILLISECONDS
                )
            }
        }
    }
}
On CentralDogma, it's safer for application tokens to be read-only.
Furthermore, in git-mirroring-enabled environments, writes via CentralDogma may be overridden by the next git commit.
Proposal: add autocreate to the factory; the legacy method may keep auto-creating the file by default.
refs: #95 (comment)
CompactionProcessor is an exception: since subscriptionId isn't available upon CompactionProcessor instantiation, CompactionProcessor's metrics are registered per task every time, so AbstractMetrics isn't usable.
Possible fixes:
- change the CompactionProcessor constructor to receive subscriptionId
- add a ProcessorsBuilder#thenProcess overload that receives a supplier of type Function<ScopeInfo, DecatonProcessor>
Separately: batch tasks from T to List<T> and process them at once, e.g. when the downstream DB supports batching I/O (which is often very efficient).
In the retry mechanism of the Decaton processor, currently only fixed backoff is supported.
It is preferable to increase the interval as the number of retries increases, rather than using a fixed interval.
It would be useful to support exponential backoff, Fibonacci backoff, and randomized backoff.
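Exponential backoff with full jitter could be sketched like this (illustrative only, not a proposed Decaton API):

```java
import java.util.concurrent.ThreadLocalRandom;

public class Backoffs {
    // Exponential backoff: the interval doubles per attempt, capped at maxMs.
    public static long exponential(long baseMs, int attempt, long maxMs) {
        double interval = baseMs * Math.pow(2, attempt);
        return (long) Math.min(interval, maxMs);
    }

    // Full jitter: pick a uniformly random delay in [0, intervalMs]
    // to spread retries from many tasks over time.
    public static long withJitter(long intervalMs) {
        return ThreadLocalRandom.current().nextLong(intervalMs + 1);
    }
}
```

With base 100 ms and a 10 s cap, attempts 0, 3 and 10 would wait 100 ms, 800 ms and 10 s respectively before jitter.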
We can inject a custom org.apache.kafka.clients.producer.Partitioner by implementing it and passing it through DecatonClientBuilder#producerConfig.
However, a Decaton client wraps the task set by our application in a DecatonTaskRequest.
So, to determine a partition based on a task inside Partitioner, we have to handle the DecatonTaskRequest (deserialize our defined task, then proceed with the partitioning step) like the following.
public class CustomPartitioner extends DefaultPartitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (value instanceof DecatonTaskRequest) {
            DecatonTaskRequest request = (DecatonTaskRequest) value;
            MyTask task = new ProtocolBuffersDeserializer<>(MyTask.parser())
                    .deserialize(request.getSerializedTask().toByteArray());
            // determine partition
            // ...
            return result;
        }
        return super.partition(topic, key, keyBytes, value, valueBytes, cluster);
    }
}
The above seems redundant and inefficient, since it deserializes an already-serialized task.
Could we provide a way of specifying a partition in producing a task like the following API?
public class DecatonClientImpl<T> implements DecatonClient<T> {
    // ...
    public CompletableFuture<PutTaskResult> put(String key, T task, int partition) {
        // ...
    }
}
The partition passed as the argument would be set on TaskMetadataProto and used by an internal Partitioner that determines the partition based on it.
What do you think about this feature?
There was a case of a batching consumer experiencing low throughput: the too-small default max.pending.records caused a frequent pause/resume loop, which lowered throughput.
We set the default conservatively low, expecting cases where the average record size is large, to avoid causing high memory pressure out of the box. But maybe we can default to a much higher value: in most cases record size is not so big, and more people would likely benefit from high throughput with the out-of-the-box setup.
- max.poll.interval.ms is set to a long value because sometimes a task takes a long time to complete (basically Decaton doesn't block poll() by task processing, but there's an exception in the rebalance listener)
- onPartitionAssigned calls partition-0's PartitionProcessor#close, which waits indefinitely for the processor unit's executor to terminate
- so onPartitionAssigned is blocked until the task completes, and the consumer cannot send JoinGroup (the rebalance can take up to max.poll.interval.ms)
Some users use Decaton for processing long-running tasks, and aborting a task "ungracefully" is preferable to waiting for it indefinitely.
When using a custom TaskExtractor, TaskMetadata#timestampMillis could be set to 0 (i.e. long's default value) if we don't set it explicitly.
ProducerRecord#timestamp is then derived from TaskMetadata, so such a record ends up with ProducerRecord#timestamp=0.
This could cause the messages to be deleted on the broker side unexpectedly early, since retention-based log deletion happens based on the record's timestamp. (https://kafka.apache.org/documentation/#impl_deletes)
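A defensive fallback could be as simple as the sketch below (illustrative, not Decaton's actual code):

```java
public class TaskTimestamps {
    // If the extracted metadata carries no timestamp (0), fall back to the
    // current time instead of producing a record with timestamp=0, which
    // retention-based deletion would treat as ancient and delete early.
    public static long effectiveTimestamp(long metadataTimestampMillis, long nowMillis) {
        return metadataTimestampMillis > 0 ? metadataTimestampMillis : nowMillis;
    }
}
```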
There were several cases in the past where we noticed that changes like a dependency upgrade or a bug fix impacted Decaton's performance, either for better or worse.
Since its biggest feature is performance, we should have an automated performance test that can catch regressions or unexpected improvements.
Probably after #17
See: #44
The property files on centraldogma must be written in JSON format for now.
I want to leave some comments in the property files, but JSON doesn't support comments.
So, I hope decaton-centraldogma supports YAML, which supports comments.
Decaton uses KafkaConsumer#poll(long), which could block indefinitely and is already marked as deprecated; it should be replaced with the poll(Duration) overload.
On Decaton 6, observing the following WARNs:
com.linecorp.decaton.processor.runtime.internal.CommitManager:131
Offset commit failed asynchronously
org.apache.kafka.common.errors.RebalanceInProgressException: Offset commit cannot be completed since the consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance by calling poll() and then retry the operation.
I don't think the end-user needs to know about this, since poll() is soon going to be called again in ProcessorSubscription.consumeLoop.
Though the docs say DecatonClient is preferred over DecatonProcessor, when dealing with retries and retry counts Decaton lacks this information, because TaskMetadata has no property exposing the retry count that TaskMetadataProto carries; so we have to use DecatonProcessor, which is more verbose and requires more code to write!
So could you consider updating TaskMetadata?
An NPE occurs in DecatonTaskRetryQueueingProcessor#process if backoff is null.
Should we check that backoff is non-null when building RetryConfig, or when building a Subscription configured for retry processing?
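A build-time check could be sketched as follows (hypothetical helper; Decaton's RetryConfig may validate differently), so the failure surfaces at construction rather than as an NPE inside process():

```java
import java.time.Duration;
import java.util.Objects;

public class RetryConfigCheck {
    // Fail fast with a descriptive message instead of a bare NPE later.
    public static Duration requireBackoff(Duration backoff) {
        return Objects.requireNonNull(backoff, "backoff must be set when building RetryConfig");
    }
}
```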
Since #53, ProcessorSubscription.commitCompletedOffsets will print an ERROR log when the ProcessorSubscription is run as a Runnable (e.g. submitted to an ExecutorService) rather than as a Thread.
The assumption (Thread.currentThread == this) is not valid in that case.
The log is harmless but noisy.
I am not sure who else uses ProcessorSubscription in this way, but it does not seem unreasonable (e.g. to get metrics by thread group), so I think the best way is to just remove the extra check.
The non-volatile field CommitManager.asyncCommitInFlight is updated from:
- CommitManager.commitAsync (Decaton subscription thread)
- the consumer.commitAsync callback (Kafka background I/O thread)
To ensure it is always correctly updated, it should be atomic or at least volatile.
It got out of sync in #99 and for some reason the build did not detect the failure.
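Using an AtomicBoolean would cover both cross-thread visibility and atomic updates; a minimal sketch of the idea (not CommitManager's actual code):

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class InFlightFlag {
    // AtomicBoolean gives visibility across threads plus atomic
    // compare-and-set, unlike a plain non-volatile boolean field.
    private final AtomicBoolean asyncCommitInFlight = new AtomicBoolean(false);

    // Called on the subscription thread before issuing commitAsync;
    // returns false if a commit is already in flight.
    public boolean tryBegin() {
        return asyncCommitInFlight.compareAndSet(false, true);
    }

    // Called from the commitAsync callback on the Kafka I/O thread.
    public void complete() {
        asyncCommitInFlight.set(false);
    }
}
```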
The retry producer can be configured to use a different cluster, but the retry consumer cannot (it shares the consumer with the original one).
While this saves resources in most cases, it prevents enabling the retry feature on a cluster where you can't freely create topics.
With CentralDogmaPropertySupplier#register, the created file consists of default values.
Although shutting a processor down before async tasks complete isn't a critical problem (thanks to later re-processing), it is a common requirement to await all async tasks' completion to avoid unnecessary errors/warns from middleware clients and the like.
close() will wait indefinitely when closing a ProcessorSubscription without having called start().
This symptom occurs from version 1.1.0.
If I don't need to start it, I can avoid creating the ProcessorSubscription in the first place, but I thought this symptom was strange.
Here is a simple reproducer:
var properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<bootstrap-servers>");
properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "test");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
var subscription = SubscriptionBuilder.newBuilder("my-decaton-processor").processorsBuilder(
ProcessorsBuilder.consuming("my-decaton-topic", new ProtocolBuffersDeserializer<>(SomeTask.parser())))
.consumerConfig(properties).build();
subscription.close(); // hang-up...
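One way to avoid the hang is to make close() aware of whether start() ever ran; a stdlib-only sketch of the idea (hypothetical, not ProcessorSubscription's implementation):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class StartAwareCloseable implements AutoCloseable {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final CountDownLatch terminated = new CountDownLatch(1);

    public void start() {
        if (started.compareAndSet(false, true)) {
            // stand-in for launching the consume loop; counts down on exit
            new Thread(terminated::countDown).start();
        }
    }

    @Override
    public void close() {
        if (!started.get()) {
            return; // nothing was started, so there is nothing to wait for
        }
        try {
            terminated.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With this shape, closing a never-started instance returns immediately instead of awaiting a latch that will never be counted down.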
I guess we should have a list of all Decaton metrics and a monitoring guide.
It would help users set up appropriate monitoring targets.
We want a feature to set a timeout for ProcessingContext.deferCompletion(), to avoid a stuck DecatonProcessor in any case.
Decaton supports async task processing via deferCompletion() on the processor.
However, async processing easily introduces bugs, and a processor might miss calling complete().
Eventually, that causes a critical issue consuming the topics.
If Decaton had a timeout for deferCompletion(), we could avoid such an outage in most cases.
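A timeout could be layered on top of the completion future, e.g. with CompletableFuture.orTimeout (a sketch of the idea, not a proposed Decaton API):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class CompletionTimeout {
    // Treat a completion that isn't resolved within the timeout as
    // exceptionally completed, so a leaked completion cannot block the
    // partition's offset progress forever.
    public static CompletableFuture<Void> withTimeout(CompletableFuture<Void> completion,
                                                      long timeout, TimeUnit unit) {
        return completion.orTimeout(timeout, unit);
    }
}
```

The framework could then decide per policy whether a timed-out completion counts as processed or should be re-queued.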
Though this "safepoint"-like mechanism was introduced to preserve the per-key processing order guarantee even on concurrency change, we can at least narrow the safepoint's scope from global to per-partition without violating the per-key ordering guarantee.
(Actually, this is a missing, long-awaited Decaton feature whose necessity has existed from the early days...)