Giter VIP home page Giter VIP logo

kafka's Introduction

Apache Kafka

See our web site for details on the project.

You need to have Java installed.

We build and test Apache Kafka with Java 8, 11 and 15. We set the release parameter in javac and scalac to 8 to ensure the generated binaries are compatible with Java 8 or higher (independently of the Java version used for compilation).

Scala 2.13 is used by default, see below for how to use a different Scala version or all of the supported Scala versions.

Build a jar and run it

./gradlew jar

Follow instructions in https://kafka.apache.org/quickstart

Build source jar

./gradlew srcJar

Build aggregated javadoc

./gradlew aggregatedJavadoc

Build javadoc and scaladoc

./gradlew javadoc
./gradlew javadocJar # builds a javadoc jar for each module
./gradlew scaladoc
./gradlew scaladocJar # builds a scaladoc jar for each module
./gradlew docsJar # builds both (if applicable) javadoc and scaladoc jars for each module

Run unit/integration tests

./gradlew test # runs both unit and integration tests
./gradlew unitTest
./gradlew integrationTest

Force re-running tests without code change

./gradlew cleanTest test
./gradlew cleanTest unitTest
./gradlew cleanTest integrationTest

Running a particular unit/integration test

./gradlew clients:test --tests RequestResponseTest

Running a particular test method within a unit/integration test

./gradlew core:test --tests kafka.api.ProducerFailureHandlingTest.testCannotSendToInternalTopic
./gradlew clients:test --tests org.apache.kafka.clients.MetadataTest.testMetadataUpdateWaitTime

Running a particular unit/integration test with log4j output

Change the log4j setting in either clients/src/test/resources/log4j.properties or core/src/test/resources/log4j.properties

./gradlew clients:test --tests RequestResponseTest

Specifying test retries

By default, each failed test is retried once up to a maximum of five retries per test run. Tests are retried at the end of the test task. Adjust these parameters in the following way:

./gradlew test -PmaxTestRetries=1 -PmaxTestRetryFailures=5

See Test Retry Gradle Plugin for more details.

Generating test coverage reports

Generate coverage reports for the whole project:

./gradlew reportCoverage -PenableTestCoverage=true

Generate coverage for a single module, i.e.:

./gradlew clients:reportCoverage -PenableTestCoverage=true

Building a binary release gzipped tar ball

./gradlew clean releaseTarGz

The above command will fail if you haven't set up the signing key. To bypass signing the artifact, you can run:

./gradlew clean releaseTarGz -x signArchives

The release file can be found inside ./core/build/distributions/.

Building auto generated messages

Sometimes it is only necessary to rebuild the RPC auto-generated message data when switching between branches, as they could fail due to code changes. You can just run:

./gradlew processMessages processTestMessages

Cleaning the build

./gradlew clean

Running a task with one of the Scala versions available (2.12.x or 2.13.x)

Note that if building the jars with a version other than 2.13.x, you need to set the SCALA_VERSION variable or change it in bin/kafka-run-class.sh to run the quick start.

You can pass either the major version (eg 2.12) or the full version (eg 2.12.7):

./gradlew -PscalaVersion=2.12 jar
./gradlew -PscalaVersion=2.12 test
./gradlew -PscalaVersion=2.12 releaseTarGz

Running a task with all the scala versions enabled by default

Invoke the gradlewAll script followed by the task(s):

./gradlewAll test
./gradlewAll jar
./gradlewAll releaseTarGz

Running a task for a specific project

This is for core, examples and clients

./gradlew core:jar
./gradlew core:test

Streams has multiple sub-projects, but you can run all the tests:

./gradlew :streams:testAll

Listing all gradle tasks

./gradlew tasks

Building IDE project

Note that this is not strictly necessary (IntelliJ IDEA has good built-in support for Gradle projects, for example).

./gradlew eclipse
./gradlew idea

The eclipse task has been configured to use ${project_dir}/build_eclipse as Eclipse's build directory. Eclipse's default build directory (${project_dir}/bin) clashes with Kafka's scripts directory and we don't use Gradle's build directory to avoid known issues with this configuration.

Publishing the jar for all version of Scala and for all projects to maven

./gradlewAll uploadArchives

Please note for this to work you should create/update ${GRADLE_USER_HOME}/gradle.properties (typically, ~/.gradle/gradle.properties) and assign the following variables

mavenUrl=
mavenUsername=
mavenPassword=
signing.keyId=
signing.password=
signing.secretKeyRingFile=

Publishing the streams quickstart archetype artifact to maven

For the Streams archetype project, one cannot use gradle to upload to maven; instead the mvn deploy command needs to be called at the quickstart folder:

cd streams/quickstart
mvn deploy

Please note for this to work you should create/update user maven settings (typically, ${USER_HOME}/.m2/settings.xml) to assign the following variables

<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0
                       https://maven.apache.org/xsd/settings-1.0.0.xsd">
...                           
<servers>
   ...
   <server>
      <id>apache.snapshots.https</id>
      <username>${maven_username}</username>
      <password>${maven_password}</password>
   </server>
   <server>
      <id>apache.releases.https</id>
      <username>${maven_username}</username>
      <password>${maven_password}</password>
    </server>
    ...
 </servers>
 ...

Installing the jars to the local Maven repository

./gradlewAll install

Building the test jar

./gradlew testJar

Determining how transitive dependencies are added

./gradlew core:dependencies --configuration runtime

Determining if any dependencies could be updated

./gradlew dependencyUpdates

Running code quality checks

There are two code quality analysis tools that we regularly run, spotbugs and checkstyle.

Checkstyle

Checkstyle enforces a consistent coding style in Kafka. You can run checkstyle using:

./gradlew checkstyleMain checkstyleTest

The checkstyle warnings will be found in reports/checkstyle/reports/main.html and reports/checkstyle/reports/test.html files in the subproject build directories. They are also printed to the console. The build will fail if Checkstyle fails.

Spotbugs

Spotbugs uses static analysis to look for bugs in the code. You can run spotbugs using:

./gradlew spotbugsMain spotbugsTest -x test

The spotbugs warnings will be found in reports/spotbugs/main.html and reports/spotbugs/test.html files in the subproject build directories. Use -PxmlSpotBugsReport=true to generate an XML report instead of an HTML one.

JMH microbenchmarks

We use JMH to write microbenchmarks that produce reliable results in the JVM.

See jmh-benchmarks/README.md for details on how to run the microbenchmarks.

Common build options

The following options should be set with a -P switch, for example ./gradlew -PmaxParallelForks=1 test.

  • commitId: sets the build commit ID as .git/HEAD might not be correct if there are local commits added for build purposes.
  • mavenUrl: sets the URL of the maven deployment repository (file://path/to/repo can be used to point to a local repository).
  • maxParallelForks: limits the maximum number of processes for each task.
  • ignoreFailures: ignore test failures from junit
  • showStandardStreams: shows standard out and standard error of the test JVM(s) on the console.
  • skipSigning: skips signing of artifacts.
  • testLoggingEvents: unit test events to be logged, separated by comma. For example ./gradlew -PtestLoggingEvents=started,passed,skipped,failed test.
  • xmlSpotBugsReport: enable XML reports for spotBugs. This also disables HTML reports as only one can be enabled at a time.
  • maxTestRetries: the maximum number of retries for a failing test case.
  • maxTestRetryFailures: maximum number of test failures before retrying is disabled for subsequent tests.
  • enableTestCoverage: enables test coverage plugins and tasks, including bytecode enhancement of classes required to track said coverage. Note that this introduces some overhead when running tests and hence why it's disabled by default (the overhead varies, but 15-20% is a reasonable estimate).

Dependency Analysis

The gradle dependency debugging documentation mentions using the dependencies or dependencyInsight tasks to debug dependencies for the root project or individual subprojects.

Alternatively, use the allDeps or allDepInsight tasks for recursively iterating through all subprojects:

./gradlew allDeps

./gradlew allDepInsight --configuration runtime --dependency com.fasterxml.jackson.core:jackson-databind

These take the same arguments as the builtin variants.

Running system tests

See tests/README.md.

Running in Vagrant

See vagrant/README.md.

Contribution

Apache Kafka is interested in building the community; we would welcome any thoughts or patches. You can reach us on the Apache mailing lists.

To contribute follow the instructions here:

kafka's People

Contributors

bbejeck avatar becketqin avatar cadonna avatar chia7712 avatar cmccabe avatar dajac avatar dguy avatar enothereska avatar ewencp avatar granthenke avatar guozhangwang avatar gwenshap avatar hachikuji avatar huxihx avatar ijuma avatar jkreps avatar junrao avatar kkonstantine avatar lindong28 avatar mimaison avatar mjsax avatar nehanarkhede avatar omkreddy avatar rajinisivaram avatar rhauch avatar showuon avatar stanislavkozlovski avatar tombentley avatar vahidhashemian avatar vvcephei avatar

Stargazers

 avatar  avatar

kafka's Issues

Build failure for 2.8.x-tiered-storage

Run: ./gradlew build

Observe the following:

> Task :core:compileTestScala
[Error] /local/home/diviv/oss/kafka/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:27: Unused import
[Error] /local/home/diviv/oss/kafka/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:28: Unused import
[Error] /local/home/diviv/oss/kafka/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala:91: @nowarn annotation does not suppress any warnings
[Error] /local/home/diviv/oss/kafka/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala:160: @nowarn annotation does not suppress any warnings
[Error] /local/home/diviv/oss/kafka/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala:1059: @nowarn annotation does not suppress any warnings
[Error] /local/home/diviv/oss/kafka/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala:1094: @nowarn annotation does not suppress any warnings
[Error] /local/home/diviv/oss/kafka/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala:1126: @nowarn annotation does not suppress any warnings
[Error] /local/home/diviv/oss/kafka/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala:105: @nowarn annotation does not suppress any warnings
[Error] /local/home/diviv/oss/kafka/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala:87: @nowarn annotation does not suppress any warnings
[Error] /local/home/diviv/oss/kafka/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala:384: @nowarn annotation does not suppress any warnings
[Error] /local/home/diviv/oss/kafka/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:2177: @nowarn annotation does not suppress any warnings
[Error] /local/home/diviv/oss/kafka/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:2229: @nowarn annotation does not suppress any warnings
[Error] /local/home/diviv/oss/kafka/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:2292: @nowarn annotation does not suppress any warnings
[Error] /local/home/diviv/oss/kafka/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala:50: @nowarn annotation does not suppress any warnings
[Error] /local/home/diviv/oss/kafka/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala:340: @nowarn annotation does not suppress any warnings
[Error] /local/home/diviv/oss/kafka/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala:1358: @nowarn annotation does not suppress any warnings
[Error] /local/home/diviv/oss/kafka/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala:1385: @nowarn annotation does not suppress any warnings
[Error] /local/home/diviv/oss/kafka/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala:1394: @nowarn annotation does not suppress any warnings
[Error] /local/home/diviv/oss/kafka/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala:344: @nowarn annotation does not suppress any warnings
[Error] /local/home/diviv/oss/kafka/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala:52: @nowarn annotation does not suppress any warnings
[Error] /local/home/diviv/oss/kafka/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerWrapperTest.scala:39: @nowarn annotation does not suppress any warnings
[Error] /local/home/diviv/oss/kafka/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerWrapperTest.scala:41: @nowarn annotation does not suppress any warnings
[Error] /local/home/diviv/oss/kafka/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerWrapperTest.scala:47: @nowarn annotation does not suppress any warnings
[Error] /local/home/diviv/oss/kafka/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala:119: @nowarn annotation does not suppress any warnings
[Error] /local/home/diviv/oss/kafka/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:1459: @nowarn annotation does not suppress any warnings
[Error] /local/home/diviv/oss/kafka/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala:46: @nowarn annotation does not suppress any warnings
[Error] /local/home/diviv/oss/kafka/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala:82: @nowarn annotation does not suppress any warnings
[Error] /local/home/diviv/oss/kafka/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala:127: @nowarn annotation does not suppress any warnings
[Error] /local/home/diviv/oss/kafka/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala:167: @nowarn annotation does not suppress any warnings
[Error] /local/home/diviv/oss/kafka/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala:198: @nowarn annotation does not suppress any warnings
[Error] /local/home/diviv/oss/kafka/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala:29: @nowarn annotation does not suppress any warnings
31 errors found
FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':core:compileTestScala'.
> Compilation failed

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output. Run with --scan to get full insights.

* Get more help at https://help.gradle.org

Deprecated Gradle features were used in this build, making it incompatible with Gradle 7.0.
Use '--warning-mode all' to show the individual deprecation warnings.
See https://docs.gradle.org/6.8.1/userguide/command_line_interface.html#sec:command_line_warnings

BUILD FAILED in 4m 24s
378 actionable tasks: 286 executed, 92 up-to-date

Add a TierLag metric to track the difference in offsets between the highest tiered offset and the LEO of a partition.

Introduce a metric type TotalTierLag to measure the number of records in the non-active segments of a topic-partition which are not yet uploaded to the remote storage. This metric can be used for instance:

  • To monitor delays in the propagation of data from the local to remote storage and be alerted when a certain threshold is breached.
  • To assess the performance of data tiering and fine tune offload parameters and rate limits.

The metric is currently defined at topic-broker level rather than for each individual partition of a topic to limit the number of metrics and MBeans generated by this metric type.

MBean

kafka.server:name=TotalTierLag,topic=<topic name>,type=BrokerTopicTierLagMetrics

Unit test

In order to validate the tier lag calculation, a segment upload is exercised via RLMTask#copyLogSegmentsToRemote(). The following logs are present in the local storage before upload:

  • SegmentToUpload
  • SegmentNotToUpload
  • ActiveSegment

in the following configuration:

                  SegmentToUpload      SegmentNotToUpload      ActiveSegment

... 110| | 111 ... 159 | 160 161 ... 169 | 170 180


               ELO                        LSO                            LEO
                                     <------ TIER LAG ---->

.............. ..................................................................
Tiered Log Local log
SegmentNotToUpload contains the LSO and is not uploaded. ActiveSegment is not uploaded. Only SegmentToUpload is uploaded. The tier lag at the end of the execution of the RLM task is expected to be 169 - 159 = 10. On the diagram, the tier lag after upload of SegmentToUpload is represented.

The unit test in its current state is not readable. A refactoring of the RemoteLogManager is required to make it testable.

Fix flaky test DeleteTopicWithSecondaryStorageTest > executeTieredStorageTest()

DeleteTopicWithSecondaryStorageTest > executeTieredStorageTest() FAILED
    org.opentest4j.AssertionFailedError: expected: <true> but was: <false>
        at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
        at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40)
        at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:35)
        at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:162)
        at kafka.tiered.storage.ExpectEmptyRemoteStorageAction.doExecute(TieredStorageTestSpec.scala:522)
        at kafka.tiered.storage.TieredStorageTestAction.execute(TieredStorageTestSpec.scala:110)
        at kafka.tiered.storage.TieredStorageTestAction.execute$(TieredStorageTestSpec.scala:108)
        at kafka.tiered.storage.ExpectEmptyRemoteStorageAction.execute(TieredStorageTestSpec.scala:517)
        at kafka.tiered.storage.TieredStorageTestHarness.$anonfun$executeTieredStorageTest$3(TieredStorageTestHarness.scala:125)
        at kafka.tiered.storage.TieredStorageTestHarness.$anonfun$executeTieredStorageTest$3$adapted(TieredStorageTestHarness.scala:125)
        at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
        at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:919)
        at kafka.tiered.storage.TieredStorageTestHarness.$anonfun$executeTieredStorageTest$2(TieredStorageTestHarness.scala:125)
        at kafka.tiered.storage.TieredStorageTestHarness.$anonfun$executeTieredStorageTest$2$adapted(TieredStorageTestHarness.scala:125)
        at scala.Option.foreach(Option.scala:437)
        at kafka.tiered.storage.TieredStorageTestHarness.executeTieredStorageTest(TieredStorageTestHarness.scala:125)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
        at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
        at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
        at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
        at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
        at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)

This occurred with HEAD for 2.8.x-tiered-storage branch at f86ae5c

Replica is synced with incorrect (Global)logStartOffset

Scenario
Leader has a TopicPartition which is partially offloaded to RemoteStorage. This would result in leader having different values for LocalLogStartOffset and (Global)LogStartOffset). When a new replica is created, it tries to sync from the leader at the ReplicaFetcherThread.buildRemoteLogAuxState() function. This function calls truncateFullyAndStartAt which should set the correct offsets for the topicPartitions in the replica.

Expectation
LocalLogStartOffset for the replica topic partition should be equal to the LocalLogStartOffset and (Global)LogStartOffset) should be equal to (Global)LogStartOffset).

Reality (bug)
(Global)LogStartOffset) for the replica topic partition is equal to LocalLogStartOffset of the leader. This occurs because the logOffset passed to Log.truncateFullyAndStartAt from ReplicaFetcherThread.buildRemoteLogAuxState is incorrect passed as Leader's leaderLocalLogStartOffset instead of leaderLogStartOffset

Result
In case the leader goes down, the replica becomes the new leader with an incorrect (Global)LogStartOffset) and hence, consumer will not be able to read valid data stored in RemoteStorage.

Incorrect topic Id due to race condition

When we enable tiered storage at a topic level which was created with a disabled mode, ConfigHandler attempts to invoke onLeadershipChange and here the topic ids are not present (yet).

replicaManager.remoteLogManager.foreach(rlm =>
        rlm.onLeadershipChange(leaderPartitions.toSet, followerPartitions.toSet, topicIds))

As a result, topicid defaults to something like the following:

[testng] [2022-03-14 23:26:16,885] WARN Previous cached topic id 4tN_eJuTTrSE3KMT8m2aKA for PhilCollins-0 does not match update topic id AAAAAAAAAAAAAAAAAAAAAA (kafka.log.remote.RemoteLogManager)
AAAAAAAAAAAAAAAAAAAAAA-0-0 --> 4tN_eJuTTrSE3KMT8m2aKA-0-0

This problem happens because the ConfigHandler relies on Log.scala -> Partition metadata file to be initialized with topic ids via LeaderAndISRRequest but in reality this is not the case. The Partition metadata file which stores the topic ids in all brokers are empty when this lookup is made.

The topicids are populated by the controller via LeaderAndISRRequest and this arrives after the ConfigHandler call to rlm.onLeadershipChange:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers#KIP516:TopicIdentifiers-PartitionMetadatafile

PrimaryConsumerTask crashes when seeking consumer for metadata partition not assigned to it

There is a bug in the PrimaryConsumerTask that causes the task to error, thus causing subsequent updates to any RemoteLogSegment to fail.

Consider this sequence of events -

  1. On initialization, the PrimaryConsumerTask reads a mapping between partition number and consumed offset for the __remote_log_metadata topic from the flat file _rlmm_committed_offsets. It loads this into the readOffsetsByPartition map. Let us say this is equal to {0: 451, 2: 50}
  2. Now, let us say that a new remote topic test1 is created with a single partition on broker 1. It is mapped to the metadata partition 1.
  3. PrimaryConsumerTask now relies on SecondaryConsumerTask to read this partition, then call the callback resumePartitionsForPrimaryConsumption.
  4. The SecondaryConsumerTask consumes all (7) records from metadata partition 1, now calls resumePartitionsForPrimaryConsumption with the newPartitions argument as test1-1, and consumedPartitionToOffsets as {0: 451, 1: 7, 2: 50}. This class uses the metadataPartitionToConsumedOffsets passed to it to construct this second parameter.
  5. In the executeReassignmentAndSeek method, the consumer is assigned to the metadata partition 1 (https://github.com/satishd/kafka/blob/2.8.x-tiered-storage/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/PrimaryConsumerTask.java#L285).
  6. On this line (https://github.com/satishd/kafka/blob/2.8.x-tiered-storage/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/PrimaryConsumerTask.java#L285), it will iterate through all the partition offset mappings in offsetsByPartition, and try to seek the consumer for the partition 0. Since the consumer isn't assigned to partition 0, it will throw an exception.

Sample error log:

2022/05/27 21:32:09.078 INFO [KafkaConsumer] [RLMMConsumerTask] [kafka-server] [] [Consumer clientId=__remote_log_metadata_client_15_consumer _secondary, groupId=null] Unsubscribed all topics or patterns and assigned partitions
2022/05/27 21:32:09.078 INFO [PrimaryConsumerTask] [RLMMConsumerTask] [kafka-server] [] Reassigning partitions to consumer task [[__remote_log_metadata-12, __remote_log_metadata-6, __remote_log_metadata-2]]
2022/05/27 21:32:09.078 INFO [KafkaConsumer] [RLMMConsumerTask] [kafka-server] [] [Consumer clientId=__remote_log_metadata_client_15_consumer _primary, groupId=null] Subscribed to partition(s): __remote_log_metadata-12, __remote_log_metadata-6, __remote_log_metadata-2
2022/05/27 21:32:09.078 ERROR [PrimaryConsumerTask] [RLMMConsumerTask] [kafka-server] [] Error occurred in consumer task, close:[false]
java.lang.IllegalStateException: No current assignment for partition __remote_log_metadata-0
        at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:369) ~[kafka-clients-3.0.1-SNAPSHOT.jar:?]
        at org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:386) ~[kafka-clients-3.0.1-SNAPSHOT.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1634) ~[kafka-clients-3.0.1-SNAPSHOT.jar:?]
        at org.apache.kafka.server.log.remote.metadata.storage.PrimaryConsumerTask.lambda$executeReassignmentAndSeek$3(PrimaryConsumerTask.java:287) ~[kafka-storage-3.0.1-SNAPSHOT.jar:?]
        at java.util.HashMap.forEach(HashMap.java:1337) ~[?:?]
        at org.apache.kafka.server.log.remote.metadata.storage.PrimaryConsumerTask.executeReassignmentAndSeek(PrimaryConsumerTask.java:286) ~[kafka-storage-3.0.1-SNAPSHOT.jar:?]
        at org.apache.kafka.server.log.remote.metadata.storage.PrimaryConsumerTask.resumePartitionsForPrimaryConsumption(PrimaryConsumerTask.java:200) ~[kafka-storage-3.0.1-SNAPSHOT.jar:?]
        at org.apache.kafka.server.log.remote.metadata.storage.SecondaryConsumerTask.maybeConsumeFromSecondaryConsumer(SecondaryConsumerTask.java:151) ~[kafka-storage-3.0.1-SNAPSHOT.jar:?]
        at org.apache.kafka.server.log.remote.metadata.storage.PrimaryConsumerTask.run(PrimaryConsumerTask.java:166) [kafka-storage-3.0.1-SNAPSHOT.jar:?]
        at java.lang.Thread.run(Thread.java:829) [?:?]
2022/05/27 21:32:09.078 INFO [PrimaryConsumerTask] [RLMMConsumerTask] [kafka-server] [] Closing the consumer instances
2022/05/27 21:32:09.079 INFO [Metrics] [RLMMConsumerTask] [kafka-server] [] Metrics scheduler closed
2022/05/27 21:32:09.079 INFO [Metrics] [RLMMConsumerTask] [kafka-server] [] Closing reporter org.apache.kafka.common.metrics.JmxReporter
2022/05/27 21:32:09.079 INFO [Metrics] [RLMMConsumerTask] [kafka-server] [] Metrics reporters closed

Proposed solution

Instead of trying to seek the consumer for every partition in the offsetsByPartition map, it should first check if that partition is one of the partitions that the consumer is assigned to.

Computing the number of segments to delete

Problem statement

In TieredStorage, users configure remote segment deletion by time, by size, or both. In order to delete remote segments by size, we need to find the size to delete which is the total_log_size - retention_size.

Without changing the API, computing the precise total size requires us to scan through all the remote segment metadata and thus takes O(num_remote_segments). This is a problem: we expect num_remote_segments to be large since the main feature of tiered storage is storing a large amount of data.

Segment offloads and segment deletions are run together in the same task and a fixed size thread pool is shared among all topic-partitions, so we have the following chain of impact that may result in loss of availability:

Calculating total_log_size is slow
→ Segment deletion is slow and threads in the thread pool are busy with segment deletions
→ Segment offloads are delayed (since they run together with deletions)
→ Local disk fills up, since local deletion requires the segment to be offloaded
→ If local disk is completely full, Kafka fails

Note:

  1. There is no direct impact on produce/consume since segment offload and deletion happens in a background thread.
  2. This only affects leader-partitions because offload and deletion only happen on leader-partitions.
  3. This also impacts the become-follower path as there are more local logs to copy, but that’s a minor issue

Fix the implementation of findOffsetByTimestamp in RemoteLogManager

Currently, the method RemoteLogManager#findOffsetByTimestamp does not enforce lineage correctness when finding the finding the offset associated to a particular timestamp. This can lead to the wrong offset being provided by the RemoteLogManager, and the wrong segment metadata to be selected thereafter. There is no way for the client of this method to know which lineage was selected, which makes it fail silently and ultimately lead consumers to read invalid data.

Furthermore, this implementation is currently inefficient as it requires a linear search over all the segment metadata of a topic-partition.

It seems to rely on an implicit ordering of offsets and timestamps while iterating through the metadata and choose the first metadata which verifies end offset > search offset and max timestamp > search timestamp, but such ordering does not exist without assumption on the durability configuration of Kafka.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.