
automq / automq


AutoMQ is a cloud-native fork of Kafka that separates storage onto S3 and EBS. 10x more cost-effective. Autoscales in seconds. Single-digit ms latency.

Home Page: https://www.automq.com/docs

Shell 0.24% Batchfile 0.06% Java 79.22% Scala 18.27% HTML 0.01% XSLT 0.01% Python 2.09% Roff 0.06% Dockerfile 0.03%
cloud cloud-economics cloud-first cloud-native ebs kafka messaging minio s3 serverless spot storage streaming

automq's Introduction

AutoMQ: A Cloud-Native Fork of Kafka That Separates Storage onto S3 and EBS



🍵 AutoMQ vs Other Streaming Platforms

| Feature | AutoMQ | Apache Kafka | Confluent | Apache Pulsar | Redpanda | WarpStream |
| --- | --- | --- | --- | --- | --- | --- |
| Apache Kafka Compatibility[1] | Native Kafka | Native Kafka | Native Kafka | Non-Kafka | Kafka Protocol | Kafka Protocol |
| Source Code Availability | Yes | Yes | No | Yes | Yes | No |
| Stateless Broker | Yes | No | No | Yes | No | Yes |
| Publisher Latency (P99) | Single-digit ms latency | Single-digit ms latency | Single-digit ms latency | Single-digit ms latency | Single-digit ms latency | > 620 ms |
| Continuous Self-Balancing | Yes | No | Yes | Yes | Yes | Yes |
| Scale in/out | In seconds | In hours/days | In hours | In hours (scale-in); in seconds (scale-out) | In hours; in seconds (Enterprise Only) | In seconds |
| Spot Instance Support | Yes | No | No | No | No | Yes |
| Partition Reassignment | In seconds | In hours/days | In hours | In seconds | In hours; in seconds (Enterprise Only) | In seconds |
| Component | Broker | Broker, Zookeeper (non-KRaft) | Broker, Zookeeper (non-KRaft) | Broker, Zookeeper, Bookkeeper, Proxy (optional) | Broker | Agent, MetadataServer |
| Durability | Guaranteed by S3/EBS[2] | Guaranteed by ISR | Guaranteed by ISR | Guaranteed by Bookkeeper | Guaranteed by Raft | Guaranteed by S3 |
| Inter-AZ Networking Fees | No | Yes | Yes | Yes | Yes | No |

[1] The definition of Apache Kafka compatibility comes from this blog.

[2] EBS durability: on Azure, GCP, and Alibaba Cloud, regional EBS replicas span multiple AZs. On AWS, durability is ensured by double-writing to EBS and S3 Express One Zone in different AZs.

🔶 Why AutoMQ

  • Cost-effective: The first truly cloud-native streaming storage system, designed for optimal cost and efficiency on the cloud. Refer to this report to see how we cut Apache Kafka billing by 90% on the cloud.
  • High reliability: Leverage cloud shared storage services (EBS and S3) to achieve zero RPO, RTO in seconds, and 99.999999999% durability.
  • Serverless:
    • Auto Scaling: Monitor cluster metrics and automatically scale in/out to align with your workload, enabling a pay-as-you-go model.
    • Scaling in seconds: The computing layer (broker) is stateless and can scale in/out within seconds, making AutoMQ a truly serverless solution.
    • Infinitely scalable: Utilize cloud object storage as the primary storage solution, eliminating concerns about storage capacity.
  • Manage-less: The built-in auto-balancer component automatically schedules partitions and network traffic between brokers, eliminating manual partition reassignment.
  • High performance:
    • Low latency: Accelerate writing with high-performance EBS as WAL, achieving single-digit millisecond latency.
    • High throughput: Leverage pre-fetching, batch processing, and parallel technologies to maximize the capabilities of cloud object storage.

    Refer to the AutoMQ Performance White Paper to see how we achieve this.

  • A superior alternative to Apache Kafka: 100% compatible with Apache Kafka, without losing any key features, but cheaper and better.

✨ Architecture

(architecture diagram)

AutoMQ adopts a Shared-Storage architecture: it replaces Apache Kafka's storage layer with a shared streaming storage library called S3Stream in a storage-compute separation manner, making the Broker completely stateless.

Compared to the classic Kafka Shared-Nothing or Tiered-Storage architectures, AutoMQ's computing layer (Broker) is truly stateless, enabling features such as Auto-Scaling, Self-Balancing, and Partition Reassignment in Seconds that significantly reduce costs and improve efficiency.

⛄ Get started with AutoMQ

Deploy Locally on a Single Host

curl https://download.automq.com/community_edition/standalone_deployment/install_run.sh | bash

The easiest way to run AutoMQ. You can experience features like Partition Reassignment in Seconds and Continuous Self-Balancing on your local machine. Learn more
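
Because AutoMQ is fully Kafka-protocol compatible, any standard Kafka client can talk to the local deployment. Below is a minimal sketch using the plain kafka-clients API; it assumes the install script has exposed a broker on localhost:9092 and that a topic named quickstart-events exists (both the address and the topic name are assumptions, adjust them to your setup).

// QuickStart.java -- hedged example against a local AutoMQ broker
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class QuickStart {
    public static void main(String[] args) {
        String bootstrap = "localhost:9092"; // assumed address of the locally deployed broker

        // Produce one record with the ordinary Kafka producer.
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            producer.send(new ProducerRecord<>("quickstart-events", "hello", "AutoMQ"));
            producer.flush();
        }

        // Read it back with the ordinary Kafka consumer.
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "quickstart-group");
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(List.of("quickstart-events"));
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(5))) {
                System.out.printf("offset=%d key=%s value=%s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}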

More deployment options are available in the documentation.

💬 Community

You can join our community groups or channels to discuss or ask questions about AutoMQ.

👥 How to contribute

If you've found a problem with AutoMQ, please open a GitHub issue. To contribute to AutoMQ, please see the Code of Conduct and the Contributing Guide. We have a list of good first issues that help you get started, gain experience, and get familiar with our contribution process. To claim one, simply reply with 'pick up' in the issue and the AutoMQ maintainers will assign it to you. If you have any questions about a good first issue, feel free to ask; we will do our best to clarify any doubts you may have.

🙋 Contact Us

Want to learn more? Talk with our product experts.

automq's People

Contributors

bbejeck, c0urante, cadonna, chia7712, chillax-0v0, cmccabe, dajac, dengziming, dguy, enothereska, ewencp, gharris1727, granthenke, guozhangwang, hachikuji, ijuma, jolshan, jsancio, junrao, mimaison, mjsax, mumrah, nehanarkhede, omkreddy, rajinisivaram, scnieh, showuon, superhx, vahidhashemian, vvcephei


automq's Issues

[bug] ApiKey GET_OPENING_STREAMS is not currently handled in `request`

The Kafka node failed to start up.

The bug can be reproduced when request logging is enabled in log4j.properties.

server.log:

[2023-09-12 17:50:34,461] ERROR Closing socket for 127.0.0.1:9592-127.0.0.1:55954-13 because of error (kafka.network.Processor)
java.lang.IllegalStateException: ApiKey GET_OPENING_STREAMS is not currently handled in `request`, the code should be updated to do so.
	at kafka.network.RequestConvertToJson$.request(RequestConvertToJson.scala:98)
	at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:106)
	at kafka.network.Processor.$anonfun$processCompletedReceives$1(SocketServer.scala:1096)
	at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:647)
	at kafka.network.Processor.processCompletedReceives(SocketServer.scala:1074)
	at kafka.network.Processor.run(SocketServer.scala:960)
	at java.base/java.lang.Thread.run(Thread.java:833)

The error is due to a missing xxxxRequestDataJsonConverter case in kafka.network.RequestConvertToJson#request.

EBS WAL recovery from crash loses data

How to reproduce:

  1. Launch the Kafka server.
  2. Use the console producer to send several messages.
  3. Kill -9 the server.
  4. Reboot the server.
  5. Use the console consumer to try to receive the messages, but nothing is returned.

Accelerate Log close speed

ElasticLog close steps:

  1. Mark clean shutdown in metadata.
  2. Append metadata to meta stream.
  3. Close meta stream and other streams. => trigger uploading WAL to S3

When multiple ElasticLogs are closed sequentially, each close generates a new WAL object, so the total close time is too high.
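
One direction (not the actual AutoMQ implementation, just a hedged illustration of the idea) is to coalesce the uploads: each close registers against a single shared WAL-object upload instead of triggering its own.

// CloseUploadCoalescer.java -- hypothetical sketch of batching close-triggered uploads
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CloseUploadCoalescer {
    private final List<CompletableFuture<Void>> pendingCloses = new ArrayList<>();

    // Each ElasticLog close registers here and waits for the shared upload instead of starting its own.
    public synchronized CompletableFuture<Void> onLogClosed() {
        CompletableFuture<Void> done = new CompletableFuture<>();
        pendingCloses.add(done);
        return done;
    }

    // Invoked once after a burst of sequential closes (or on a timer): upload one WAL object
    // covering all pending closes, then complete their futures.
    public synchronized void flush() {
        uploadSingleWalObject();
        pendingCloses.forEach(f -> f.complete(null));
        pendingCloses.clear();
    }

    private void uploadSingleWalObject() {
        // placeholder for the actual WAL-to-S3 upload of the accumulated data
    }
}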

Consumer offset reset to 0 after broker restart

How to reproduce

  • produce and consume some messages; (committed offset moved to non-zero value here)
  • restart the broker;
  • produce other messages and then consume;

In the end, the consumer consumed messages from offset 0 again.

Consumer log after broker restart:

[23-09-18 11:03:03.140] [ INFO] -.-.-.-.-.AppInfoParserlr[.a5.k5.c5.u5.AppInfoParserlr: Kafka version: 3.4.0 [org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:119)]
[23-09-18 11:03:03.140] [ INFO] -.-.-.-.-.AppInfoParserlr[.a5.k5.c5.u5.AppInfoParserlr: Kafka commitId: 2e1947d240607d53 [org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:120)]
[23-09-18 11:03:03.140] [ INFO] -.-.-.-.-.AppInfoParserlr[.a5.k5.c5.u5.AppInfoParserlr: Kafka startTimeMs: 1695006183139 [org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:121)]
[23-09-18 11:03:03.141] [ INFO] -.-.-.-.-.KafkaConsumerlr[.a5.k5.c5.c5.KafkaConsumerlr: [Consumer clientId=consumer-ConsumerGroup-Dev-1, groupId=ConsumerGroup-Dev] Subscribed to topic(s): test_topic [org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(KafkaConsumer.java:969)]
[23-09-18 11:03:03.268] [ INFO] -.-.-.-.Metadatalr[.a5.k5.c5.Metadatalr: [Consumer clientId=consumer-ConsumerGroup-Dev-1, groupId=ConsumerGroup-Dev] Resetting the last seen epoch of partition test_topic-0 to 2 since the associated topicId changed from null to BRuS_MZ3TXi_gIv5hPtJeg [org.apache.kafka.clients.Metadata.updateLatestMetadata(Metadata.java:402)]
[23-09-18 11:03:03.270] [ INFO] -.-.-.-.Metadatalr[.a5.k5.c5.Metadatalr: [Consumer clientId=consumer-ConsumerGroup-Dev-1, groupId=ConsumerGroup-Dev] Cluster ID: HLmZ1l4pScybLkyK-xg5_g [org.apache.kafka.clients.Metadata.update(Metadata.java:287)]
[23-09-18 11:03:03.272] [ INFO] -.-.-.-.-.-.ConsumerCoordinatorlr[.a5.k5.c5.c5.i5.ConsumerCoordinatorlr: [Consumer clientId=consumer-ConsumerGroup-Dev-1, groupId=ConsumerGroup-Dev] Discovered group coordinator localhost:9092 (id: 2147483646 rack: null) [org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:896)]
[23-09-18 11:03:03.273] [ INFO] -.-.-.-.-.-.ConsumerCoordinatorlr[.a5.k5.c5.c5.i5.ConsumerCoordinatorlr: [Consumer clientId=consumer-ConsumerGroup-Dev-1, groupId=ConsumerGroup-Dev] (Re-)joining group [org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest(AbstractCoordinator.java:566)]
[23-09-18 11:03:03.292] [ INFO] -.-.-.-.-.-.ConsumerCoordinatorlr[.a5.k5.c5.c5.i5.ConsumerCoordinatorlr: [Consumer clientId=consumer-ConsumerGroup-Dev-1, groupId=ConsumerGroup-Dev] Request joining group due to: need to re-join with the given member-id: consumer-ConsumerGroup-Dev-1-61ae3112-029d-4bb0-ac01-7e90a68950ee [org.apache.kafka.clients.consumer.internals.AbstractCoordinator.requestRejoin(AbstractCoordinator.java:1062)]
[23-09-18 11:03:03.292] [ INFO] -.-.-.-.-.-.ConsumerCoordinatorlr[.a5.k5.c5.c5.i5.ConsumerCoordinatorlr: [Consumer clientId=consumer-ConsumerGroup-Dev-1, groupId=ConsumerGroup-Dev] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException) [org.apache.kafka.clients.consumer.internals.AbstractCoordinator.requestRejoin(AbstractCoordinator.java:1062)]
[23-09-18 11:03:03.292] [ INFO] -.-.-.-.-.-.ConsumerCoordinatorlr[.a5.k5.c5.c5.i5.ConsumerCoordinatorlr: [Consumer clientId=consumer-ConsumerGroup-Dev-1, groupId=ConsumerGroup-Dev] (Re-)joining group [org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest(AbstractCoordinator.java:566)]
[23-09-18 11:03:06.316] [ INFO] -.-.-.-.-.-.ConsumerCoordinatorlr[.a5.k5.c5.c5.i5.ConsumerCoordinatorlr: [Consumer clientId=consumer-ConsumerGroup-Dev-1, groupId=ConsumerGroup-Dev] Successfully joined group with generation Generation{generationId=1, memberId='consumer-ConsumerGroup-Dev-1-61ae3112-029d-4bb0-ac01-7e90a68950ee', protocol='range'} [org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:627)]
[23-09-18 11:03:06.318] [ INFO] -.-.-.-.-.-.ConsumerCoordinatorlr[.a5.k5.c5.c5.i5.ConsumerCoordinatorlr: [Consumer clientId=consumer-ConsumerGroup-Dev-1, groupId=ConsumerGroup-Dev] Finished assignment for group at generation 1: {consumer-ConsumerGroup-Dev-1-61ae3112-029d-4bb0-ac01-7e90a68950ee=Assignment(partitions=[test_topic-0])} [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeaderElected(ConsumerCoordinator.java:710)]
[23-09-18 11:03:06.362] [ INFO] -.-.-.-.-.-.ConsumerCoordinatorlr[.a5.k5.c5.c5.i5.ConsumerCoordinatorlr: [Consumer clientId=consumer-ConsumerGroup-Dev-1, groupId=ConsumerGroup-Dev] Successfully synced group in generation Generation{generationId=1, memberId='consumer-ConsumerGroup-Dev-1-61ae3112-029d-4bb0-ac01-7e90a68950ee', protocol='range'} [org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:802)]
[23-09-18 11:03:06.363] [ INFO] -.-.-.-.-.-.ConsumerCoordinatorlr[.a5.k5.c5.c5.i5.ConsumerCoordinatorlr: [Consumer clientId=consumer-ConsumerGroup-Dev-1, groupId=ConsumerGroup-Dev] Notifying assignor about the new Assignment(partitions=[test_topic-0]) [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:305)]
[23-09-18 11:03:06.364] [ INFO] -.-.-.-.-.-.ConsumerCoordinatorlr[.a5.k5.c5.c5.i5.ConsumerCoordinatorlr: [Consumer clientId=consumer-ConsumerGroup-Dev-1, groupId=ConsumerGroup-Dev] Adding newly assigned partitions: test_topic-0 [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:317)]
[23-09-18 11:03:06.371] [ INFO] -.-.-.-.-.-.ConsumerCoordinatorlr[.a5.k5.c5.c5.i5.ConsumerCoordinatorlr: [Consumer clientId=consumer-ConsumerGroup-Dev-1, groupId=ConsumerGroup-Dev] Found no committed offset for partition test_topic-0 [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetFetchResponseHandler.handle(ConsumerCoordinator.java:1543)]
[23-09-18 11:03:06.380] [ INFO] -.-.-.-.-.-.SubscriptionStatelr[.a5.k5.c5.c5.i5.SubscriptionStatelr: [Consumer clientId=consumer-ConsumerGroup-Dev-1, groupId=ConsumerGroup-Dev] Resetting offset for partition test_topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=2}}. [org.apache.kafka.clients.consumer.internals.SubscriptionState.maybeSeekUnvalidated(SubscriptionState.java:399)]
[23-09-18 11:03:06.433] [ INFO] -.-.ConsumerDemolr[.e5.ConsumerDemolr: ConsumerGroup-Dev received message : from topicPartition test_topic-0, (0, Message_0) at offset 0 [org.example.ConsumerDemo.runConsumer(ConsumerDemo.java:59)]
[23-09-18 11:03:06.434] [ INFO] -.-.ConsumerDemolr[.e5.ConsumerDemolr: ConsumerGroup-Dev received message : from topicPartition test_topic-0, (2, Message_2) at offset 1 [org.example.ConsumerDemo.runConsumer(ConsumerDemo.java:59)]
[23-09-18 11:03:06.434] [ INFO] -.-.ConsumerDemolr[.e5.ConsumerDemolr: ConsumerGroup-Dev received message : from topicPartition test_topic-0, (4, Message_4) at offset 2 [org.example.ConsumerDemo.runConsumer(ConsumerDemo.java:59)]
[23-09-18 11:03:06.434] [ INFO] -.-.ConsumerDemolr[.e5.ConsumerDemolr: ConsumerGroup-Dev received message : from topicPartition test_topic-0, (6, Message_6) at offset 3 [org.example.ConsumerDemo.runConsumer(ConsumerDemo.java:59)]
[23-09-18 11:03:06.434] [ INFO] -.-.ConsumerDemolr[.e5.ConsumerDemolr: ConsumerGroup-Dev received message : from topicPartition test_topic-0, (8, Message_8) at offset 4 [org.example.ConsumerDemo.runConsumer(ConsumerDemo.java:59)]
[23-09-18 11:03:06.434] [ INFO] -.-.ConsumerDemolr[.e5.ConsumerDemolr: ConsumerGroup-Dev received message : from topicPartition test_topic-0, (10, Message_10) at offset 5 [org.example.ConsumerDemo.runConsumer(ConsumerDemo.java:59)]
[23-09-18 11:03:06.434] [ INFO] -.-.ConsumerDemolr[.e5.ConsumerDemolr: ConsumerGroup-Dev received message : from topicPartition test_topic-0, (12, Message_12) at offset 6 [org.example.ConsumerDemo.runConsumer(ConsumerDemo.java:59)]
[23-09-18 11:03:06.434] [ INFO] -.-.ConsumerDemolr[.e5.ConsumerDemolr: ConsumerGroup-Dev received message : from topicPartition test_topic-0, (14, Message_14) at offset 7 [org.example.ConsumerDemo.runConsumer(ConsumerDemo.java:59)]
[23-09-18 11:03:06.434] [ INFO] -.-.ConsumerDemolr[.e5.ConsumerDemolr: ConsumerGroup-Dev received message : from topicPartition test_topic-0, (16, Message_16) at offset 8 [org.example.ConsumerDemo.runConsumer(ConsumerDemo.java:59)]
[23-09-18 11:03:06.434] [ INFO] -.-.ConsumerDemolr[.e5.ConsumerDemolr: ConsumerGroup-Dev received message : from topicPartition test_topic-0, (18, Message_18) at offset 9 [org.example.ConsumerDemo.runConsumer(ConsumerDemo.java:59)]
[23-09-18 11:03:06.434] [ INFO] -.-.ConsumerDemolr[.e5.ConsumerDemolr: ConsumerGroup-Dev received message : from topicPartition test_topic-0, (0, Message_0) at offset 10 [org.example.ConsumerDemo.runConsumer(ConsumerDemo.java:59)]
[23-09-18 11:03:06.434] [ INFO] -.-.ConsumerDemolr[.e5.ConsumerDemolr: ConsumerGroup-Dev received message : from topicPartition test_topic-0, (2, Message_2) at offset 11 [org.example.ConsumerDemo.runConsumer(ConsumerDemo.java:59)]
[23-09-18 11:03:06.434] [ INFO] -.-.ConsumerDemolr[.e5.ConsumerDemolr: ConsumerGroup-Dev received message : from topicPartition test_topic-0, (4, Message_4) at offset 12 [org.example.ConsumerDemo.runConsumer(ConsumerDemo.java:59)]
[23-09-18 11:03:06.434] [ INFO] -.-.ConsumerDemolr[.e5.ConsumerDemolr: ConsumerGroup-Dev received message : from topicPartition test_topic-0, (6, Message_6) at offset 13 [org.example.ConsumerDemo.runConsumer(ConsumerDemo.java:59)]
[23-09-18 11:03:06.434] [ INFO] -.-.ConsumerDemolr[.e5.ConsumerDemolr: ConsumerGroup-Dev received message : from topicPartition test_topic-0, (8, Message_8) at offset 14 [org.example.ConsumerDemo.runConsumer(ConsumerDemo.java:59)]
[23-09-18 11:03:06.434] [ INFO] -.-.ConsumerDemolr[.e5.ConsumerDemolr: ConsumerGroup-Dev received message : from topicPartition test_topic-0, (10, Message_10) at offset 15 [org.example.ConsumerDemo.runConsumer(ConsumerDemo.java:59)]
[23-09-18 11:03:06.435] [ INFO] -.-.ConsumerDemolr[.e5.ConsumerDemolr: ConsumerGroup-Dev received message : from topicPartition test_topic-0, (12, Message_12) at offset 16 [org.example.ConsumerDemo.runConsumer(ConsumerDemo.java:59)]
[23-09-18 11:03:06.435] [ INFO] -.-.ConsumerDemolr[.e5.ConsumerDemolr: ConsumerGroup-Dev received message : from topicPartition test_topic-0, (14, Message_14) at offset 17 [org.example.ConsumerDemo.runConsumer(ConsumerDemo.java:59)]
[23-09-18 11:03:06.435] [ INFO] -.-.ConsumerDemolr[.e5.ConsumerDemolr: ConsumerGroup-Dev received message : from topicPartition test_topic-0, (16, Message_16) at offset 18 [org.example.ConsumerDemo.runConsumer(ConsumerDemo.java:59)]
[23-09-18 11:03:06.435] [ INFO] -.-.ConsumerDemolr[.e5.ConsumerDemolr: ConsumerGroup-Dev received message : from topicPartition test_topic-0, (18, Message_18) at offset 19 [org.example.ConsumerDemo.runConsumer(ConsumerDemo.java:59)]

Error when trimming streams

logs in server.log:

[2023-09-14 20:29:58,645] ERROR Unexpected error while trimming stream: TrimStreamRequestData(streamId=40, streamEpoch=0, brokerId=1, newStartOffset=248), code: OFFSET_NOT_MATCHED (kafka.log.s3.streams.ControllerStreamManager)
[2023-09-14 20:29:58,645] ERROR Trim stream[40] (new offset = 248) failed, retry later (kafka.log.es.AlwaysSuccessClient)
org.apache.kafka.common.errors.s3.OffsetNotMatchedException: The offset is not matched.

logs in controller.log:

[2023-09-14 20:29:58,644] WARN [Controller 1] [TrimStream]: stream 40's new start offset 248 is larger than current range's end offset 0 (org.apache.kafka.controller.stream.StreamControlManager)

ByteBuf LEAK

[2023-09-11 20:30:16,526] ERROR LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
	io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:96)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
	kafka.log.s3.operator.DefaultS3Operator.rangeRead(DefaultS3Operator.java:112)
	kafka.log.s3.operator.S3Operator.rangeRead(S3Operator.java:41)
	kafka.log.s3.ObjectReader.asyncGetBasicObjectInfo0(ObjectReader.java:73)
	kafka.log.s3.ObjectReader.asyncGetBasicObjectInfo(ObjectReader.java:69)
	kafka.log.s3.ObjectReader.<init>(ObjectReader.java:52)
	kafka.log.s3.cache.DefaultS3BlockCache.read0(DefaultS3BlockCache.java:66)
	kafka.log.s3.cache.DefaultS3BlockCache.lambda$read0$1(DefaultS3BlockCache.java:108)
	java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1150)
	java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
	kafka.log.s3.operator.DefaultS3Operator.lambda$rangeRead0$1(DefaultS3Operator.java:122)
	java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:718)
	java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
	software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:56)
	java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
	java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
	java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
	software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage.lambda$execute$2(AsyncApiCallTimeoutTrackingStage.java:69)
	java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
	java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
	java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
	software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:177)
	java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
	java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
	java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
	software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$null$0(MakeAsyncHttpRequestStage.java:105)
	java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
	java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
	java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
	software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.completeResponseFuture(MakeAsyncHttpRequestStage.java:238)
	software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$executeHttpRequest$3(MakeAsyncHttpRequestStage.java:163)
	java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934)
	java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
	java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	java.base/java.lang.Thread.run(Thread.java:833) (io.netty.util.ResourceLeakDetector)
[2023-09-11 20:30:16,527] ERROR LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
	io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:96)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
	kafka.log.s3.operator.DefaultS3Operator.rangeRead(DefaultS3Operator.java:112)
	kafka.log.s3.operator.S3Operator.rangeRead(S3Operator.java:41)
	kafka.log.s3.ObjectReader.asyncGetBasicObjectInfo0(ObjectReader.java:73)
	kafka.log.s3.ObjectReader.asyncGetBasicObjectInfo(ObjectReader.java:69)
	kafka.log.s3.ObjectReader.<init>(ObjectReader.java:52)
	kafka.log.s3.cache.DefaultS3BlockCache.read0(DefaultS3BlockCache.java:66)
	kafka.log.s3.cache.DefaultS3BlockCache.read(DefaultS3BlockCache.java:51)
	kafka.log.s3.S3Storage.read0(S3Storage.java:143)
	kafka.log.s3.S3Storage.lambda$read$1(S3Storage.java:134)
	java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	java.base/java.lang.Thread.run(Thread.java:833) (io.netty.util.ResourceLeakDetector)
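
The usual remedy for this class of leak is to make buffer ownership explicit: whichever stage completes the future must release the ByteBuf on every path, including failures. A minimal sketch of the standard Netty pattern (rangeRead/process are placeholders, not the actual AutoMQ methods):

// ReleaseOnComplete.java -- hedged sketch of the standard Netty release pattern
import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import java.util.concurrent.CompletableFuture;

public class ReleaseOnComplete {

    // Whoever receives the buffer from an async read owns it and must release it on every path.
    static void readAndRelease(CompletableFuture<ByteBuf> pendingRead) {
        pendingRead.whenComplete((buf, error) -> {
            try {
                if (error != null) {
                    System.err.println("range read failed: " + error);
                    return;
                }
                process(buf); // consume the bytes while we still hold the reference
            } finally {
                ReferenceCountUtil.release(buf); // safe no-op when buf is null, otherwise drops our reference
            }
        });
    }

    static void process(ByteBuf buf) {
        // placeholder consumer of the buffer contents
    }
}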

Log level & isXxxEnabled checks

Search for all LOGGER.debug calls in the s3stream module and add a LOGGER.isDebugEnabled() check for each of them.
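
For reference, the intended pattern is the standard SLF4J guard, so the message (and any expensive argument construction) is only built when debug logging is actually on; a minimal sketch with an illustrative method name:

// DebugGuardExample.java -- the SLF4J isDebugEnabled() guard this issue asks to add
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DebugGuardExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(DebugGuardExample.class);

    // onObjectUploaded is an illustrative name, not an actual s3stream method.
    void onObjectUploaded(long objectId, long sizeBytes) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("uploaded object {} ({} bytes)", objectId, sizeBytes);
        }
    }
}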

Commit WAL object NPE when there is a stream split

java.lang.NullPointerException: Cannot invoke "java.util.List.size()" because "this.sourceObjectIds" is null
        at org.apache.kafka.common.message.CommitWALObjectRequestData$StreamObject.addSize(CommitWALObjectRequestData.java:685)
        at org.apache.kafka.common.message.CommitWALObjectRequestData.addSize(CommitWALObjectRequestData.java:210)
        at org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
        at org.apache.kafka.common.protocol.SendBuilder.buildRequestSend(SendBuilder.java:187)
        at org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:112)
        at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:524)
        at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:500)
        at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:460)
        at kafka.common.InterBrokerSendThread.$anonfun$sendRequests$1(InterBrokerSendThread.scala:104)
        at kafka.common.InterBrokerSendThread.$anonfun$sendRequests$1$adapted(InterBrokerSendThread.scala:99)
        at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:575)
        at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:573)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:933)
        at kafka.common.InterBrokerSendThread.sendRequests(InterBrokerSendThread.scala:99)
        at kafka.common.InterBrokerSendThread.pollOnce(InterBrokerSendThread.scala:73)
        at kafka.server.BrokerToControllerRequestThread.doWork(BrokerToControllerChannelManager.scala:421)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)

We don't need sourceObjectIds in StreamObject.

Controller NPE causes controller re-election

[2023-09-13 14:58:09,396] WARN [Controller 1] checkS3ObjectsLifecycle: failed with unknown server exception NullPointerException at epoch 94 in 187 us.  Renouncing leadership and reverting to the last committed offset 13094. (org.apache.kafka.controller.QuorumController)
java.lang.NullPointerException
	at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:178)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
	at java.base/java.util.concurrent.LinkedBlockingDeque.forEachFrom(LinkedBlockingDeque.java:1325)
	at java.base/java.util.concurrent.LinkedBlockingDeque$LBDSpliterator.forEachRemaining(LinkedBlockingDeque.java:1257)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
	at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
	at org.apache.kafka.controller.stream.S3ObjectControlManager.checkS3ObjectsLifecycle(S3ObjectControlManager.java:244)
	at org.apache.kafka.controller.QuorumController.lambda$checkS3ObjectsLifecycle$32(QuorumController.java:2259)
	at org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:719)
	at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121)
	at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:201)
	at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:175)

Repeating log:

[2023-09-13 14:52:14,736] INFO [BrokerToControllerChannelManager broker=1 name=heartbeat]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
[2023-09-13 14:52:14,737] INFO [BrokerToControllerChannelManager broker=1 name=heartbeat] Client requested disconnect from node 1 (org.apache.kafka.clients.NetworkClient)
[2023-09-13 14:52:14,737] INFO [BrokerToControllerChannelManager broker=1 name=heartbeat]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)

Graceful shutdown still produces many scan exceptions.

[2023-09-12 11:11:09,672] INFO recoverEntireWALAndCorrectWALHeader ReadRecordException: readRecord Exception: readRecordHeader.getMagicCode()[0] != RecordHeaderMagicCode[-2023406815], recoverStartOffset: 17510400, meetIllegalRecordTimes: 3904, recoverRemainingBytes: 790629 (kafka.log.s3.wal.BlockWALService)

Node cannot shut down after deleting all topics

How to reproduce

  1. Produce and consume some messages.
  2. Delete all topics:
     bin/kafka-topics.sh --delete --topic ".*" --bootstrap-server localhost:9092
  3. Shut down the node.

server.log

[2023-09-13 10:54:28,947] INFO [ReplicaManager broker=1] Shut down completely (kafka.server.ReplicaManager)
[2023-09-13 10:54:28,947] INFO [BrokerToControllerChannelManager broker=1 name=alterPartition]: Shutting down (kafka.server.BrokerToControllerRequestThread)
[2023-09-13 10:54:28,947] INFO [BrokerToControllerChannelManager broker=1 name=alterPartition]: Stopped (kafka.server.BrokerToControllerRequestThread)
[2023-09-13 10:54:28,947] INFO [BrokerToControllerChannelManager broker=1 name=alterPartition]: Shutdown completed (kafka.server.BrokerToControllerRequestThread)
[2023-09-13 10:54:28,948] INFO Broker to controller channel manager for alterPartition shutdown (kafka.server.BrokerToControllerChannelManagerImpl)
[2023-09-13 10:54:28,948] INFO Shutting down. (kafka.log.LogManager)
[2023-09-13 10:54:28,950] INFO [UnifiedLog partition=__consumer_offsets-29, dir=/tmp/kraft-combined-logs] Closing log (kafka.log.es.ElasticUnifiedLog)
[2023-09-13 10:54:28,952] INFO [ElasticLog partition=__consumer_offsets-29 epoch=0] [ElasticLog partition=__consumer_offsets-29 epoch=0] save partition meta ElasticPartitionMeta{startOffset=0, cleanerOffset=0, recoverOffset=0, cleanedShutdown=true} (kafka.log.es.ElasticLog)
[2023-09-13 10:54:29,427] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:29,928] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:30,432] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:30,931] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:31,435] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:31,942] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:32,412] INFO flushWALHeader success, position: 0, walHeader: WALHeader{magicCode=305419896, capacity=2147483648, trimOffset=0, lastWriteTimestamp=1694573672412, nextWriteOffset=1499236, slidingWindowMaxLength=1048576, shutdownType=UNGRACEFULLY, crc=1849378678} (kafka.log.s3.wal.BlockWALService)
[2023-09-13 10:54:32,443] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:32,949] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:33,460] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:33,960] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:34,466] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:34,970] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:35,479] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:35,987] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:36,484] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:36,988] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:37,491] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:37,998] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:38,503] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:39,001] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:39,508] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:40,009] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:40,515] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:41,017] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:41,525] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:42,030] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:42,410] INFO [CompactionManager id=1] Compaction started (kafka.log.s3.compact.CompactionManager)
[2023-09-13 10:54:42,414] INFO [CompactionManager id=1] Get 0 WAL objects from metadata (kafka.log.s3.compact.CompactionManager)
[2023-09-13 10:54:42,415] INFO [CompactionManager id=1] 0 WAL objects to be force split (kafka.log.s3.compact.CompactionManager)
[2023-09-13 10:54:42,415] INFO flushWALHeader success, position: 4096, walHeader: WALHeader{magicCode=305419896, capacity=2147483648, trimOffset=0, lastWriteTimestamp=1694573682415, nextWriteOffset=1499236, slidingWindowMaxLength=1048576, shutdownType=UNGRACEFULLY, crc=1712639504} (kafka.log.s3.wal.BlockWALService)
[2023-09-13 10:54:42,417] INFO [CompactionManager id=1] 0 WAL objects as compact candidates (kafka.log.s3.compact.CompactionManager)
[2023-09-13 10:54:42,417] INFO [CompactionManager id=1] No compaction plans to execute (kafka.log.s3.compact.CompactionManager)
[2023-09-13 10:54:42,419] INFO [CompactionManager id=1] Build compact request complete, time cost: 5 ms, start committing objects (kafka.log.s3.compact.CompactionManager)
[2023-09-13 10:54:42,537] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:43,038] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:43,544] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:44,048] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:44,552] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:45,057] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:45,555] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:46,061] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:46,568] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:47,069] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:47,575] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:48,083] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:48,584] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:49,092] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:49,594] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:50,093] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:50,599] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:51,104] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:51,606] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:52,110] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)
[2023-09-13 10:54:52,415] INFO flushWALHeader success, position: 0, walHeader: WALHeader{magicCode=305419896, capacity=2147483648, trimOffset=0, lastWriteTimestamp=1694573692414, nextWriteOffset=1499236, slidingWindowMaxLength=1048576, shutdownType=UNGRACEFULLY, crc=648185841} (kafka.log.s3.wal.BlockWALService)
[2023-09-13 10:54:52,615] INFO [BrokerMetadataListener id=1] Not processing HandleCommitsEvent because the event queue is closed. (kafka.server.metadata.BrokerMetadataListener)

Stream compaction logs too much

[2023-09-15 02:47:49,442] INFO start to do stream objects compaction for stream 1004 (kafka.log.s3.S3StreamClient)
[2023-09-15 02:47:49,442] INFO start to do stream objects compaction for stream 1005 (kafka.log.s3.S3StreamClient)
[2023-09-15 02:47:49,442] INFO start to do stream objects compaction for stream 1006 (kafka.log.s3.S3StreamClient)
[2023-09-15 02:47:49,442] INFO start to do stream objects compaction for stream 1007 (kafka.log.s3.S3StreamClient)
[2023-09-15 02:47:49,442] INFO start to do stream objects compaction for stream 1008 (kafka.log.s3.S3StreamClient)
[2023-09-15 02:47:49,442] INFO start to do stream objects compaction for stream 1009 (kafka.log.s3.S3StreamClient)
[2023-09-15 02:47:49,442] INFO start to do stream objects compaction for stream 1010 (kafka.log.s3.S3StreamClient)
[2023-09-15 02:47:49,442] INFO start to do stream objects compaction for stream 1011 (kafka.log.s3.S3StreamClient)
[2023-09-15 02:47:49,442] INFO start to do stream objects compaction for stream 1012 (kafka.log.s3.S3StreamClient)
[2023-09-15 02:47:49,442] INFO start to do stream objects compaction for stream 1013 (kafka.log.s3.S3StreamClient)
[2023-09-15 02:47:49,442] INFO start to do stream objects compaction for stream 1014 (kafka.log.s3.S3StreamClient)
[2023-09-15 02:47:49,442] INFO start to do stream objects compaction for stream 1015 (kafka.log.s3.S3StreamClient)
[2023-09-15 02:47:49,442] INFO start to do stream objects compaction for stream 1016 (kafka.log.s3.S3StreamClient)
[2023-09-15 02:47:49,442] INFO start to do stream objects compaction for stream 1017 (kafka.log.s3.S3StreamClient)
[2023-09-15 02:47:49,442] INFO start to do stream objects compaction for stream 1018 (kafka.log.s3.S3StreamClient)
[2023-09-15 02:47:49,442] INFO start to do stream objects compaction for stream 1019 (kafka.log.s3.S3StreamClient)
[2023-09-15 02:47:49,442] INFO start to do stream objects compaction for stream 1020 (kafka.log.s3.S3StreamClient)
[2023-09-15 02:47:49,442] INFO start to do stream objects compaction for stream 1021 (kafka.log.s3.S3StreamClient)
[2023-09-15 02:47:49,442] INFO start to do stream objects compaction for stream 1022 (kafka.log.s3.S3StreamClient)
[2023-09-15 02:47:49,442] INFO start to do stream objects compaction for stream 1023 (kafka.log.s3.S3StreamClient)
[2023-09-15 02:47:49,442] INFO start to do stream objects compaction for stream 1024 (kafka.log.s3.S3StreamClient)
[2023-09-15 02:47:49,442] INFO start to do stream objects compaction for stream 1025 (kafka.log.s3.S3StreamClient)
[2023-09-15 02:47:49,442] INFO start to do stream objects compaction for stream 1026 (kafka.log.s3.S3StreamClient)
[2023-09-15 02:47:49,442] INFO start to do stream objects compaction for stream 1027 (kafka.log.s3.S3StreamClient)
[2023-09-15 02:47:49,442] INFO start to do stream objects compaction for stream 1028 (kafka.log.s3.S3StreamClient)
[2023-09-15 02:47:49,442] INFO start to do stream objects compaction for stream 1029 (kafka.log.s3.S3StreamClient)
[2023-09-15 02:47:49,442] INFO start to do stream objects compaction for stream 1030 (kafka.log.s3.S3StreamClient)
[2023-09-15 02:47:49,442] INFO start to do stream objects compaction for stream 1031 (kafka.log.s3.S3StreamClient)

Graceful shutdown

  1. Await completion of in-flight requests.
  2. Upload the EBS WAL to S3.
  3. Free resources (threads, ...); a minimal sketch of this sequence follows the list.
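
A minimal sketch of that sequence with plain CompletableFutures; the collaborators (inflightRequests, wal, executor) are placeholders for whatever the real AutoMQ components are, not its actual API:

// GracefulShutdownSketch.java -- hypothetical outline of the three shutdown steps above
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class GracefulShutdownSketch {

    interface Wal { // placeholder for the real EBS WAL component
        CompletableFuture<Void> uploadToS3();
    }

    static void shutdown(List<CompletableFuture<?>> inflightRequests,
                         Wal wal,
                         ExecutorService executor) throws InterruptedException {
        // 1. Await in-flight requests, but never hang forever on a stuck one.
        CompletableFuture.allOf(inflightRequests.toArray(new CompletableFuture[0]))
                .orTimeout(30, TimeUnit.SECONDS)
                .exceptionally(t -> null) // a failed request should not block shutdown
                .join();

        // 2. Upload the remaining EBS WAL data to S3 before exiting.
        wal.uploadToS3().join();

        // 3. Free resources: stop thread pools and wait briefly for them to terminate.
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }
}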

Memory Leak

[2023-09-15 16:34:54,012] ERROR LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
	io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:403)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
	kafka.log.s3.StreamRecordBatchCodec.encode(StreamRecordBatchCodec.java:39)
	kafka.log.s3.model.StreamRecordBatch.encoded(StreamRecordBatch.java:46)
	kafka.log.s3.ObjectWriter$DataBlock.lambda$new$0(ObjectWriter.java:264)
	java.base/java.util.stream.ReferencePipeline$4$1.accept(ReferencePipeline.java:214)
	java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
	java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
	java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
	java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
	java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	java.base/java.util.stream.IntPipeline.reduce(IntPipeline.java:515)
	java.base/java.util.stream.IntPipeline.sum(IntPipeline.java:473)
	kafka.log.s3.ObjectWriter$DataBlock.<init>(ObjectWriter.java:264)
	kafka.log.s3.ObjectWriter$DefaultObjectWriter.lambda$write$0(ObjectWriter.java:101)
	java.base/java.lang.Iterable.forEach(Iterable.java:75)
	kafka.log.s3.ObjectWriter$DefaultObjectWriter.write(ObjectWriter.java:100)
	kafka.log.s3.WALObjectUploadTask.upload0(WALObjectUploadTask.java:117)
	kafka.log.s3.WALObjectUploadTask.lambda$upload$1(WALObjectUploadTask.java:88)
	kafka.log.es.FutureUtil.exec(FutureUtil.java:70)
	kafka.log.s3.WALObjectUploadTask.lambda$upload$2(WALObjectUploadTask.java:88)
	java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:718)
	java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482)
	java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	java.base/java.lang.Thread.run(Thread.java:833) (io.netty.util.ResourceLeakDetector)

Direct buffer allocation is too high

java.lang.OutOfMemoryError: Cannot reserve 4194304 bytes of direct buffer memory (allocated: 1069831345, limit: 1073741824)
	at java.base/java.nio.Bits.reserveMemory(Bits.java:178)
	at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:121)
	at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:332)
	at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:649)
	at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:624)
	at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:203)
	at io.netty.buffer.PoolArena.tcacheAllocateNormal(PoolArena.java:187)
	at io.netty.buffer.PoolArena.allocate(PoolArena.java:136)
	at io.netty.buffer.PoolArena.allocate(PoolArena.java:126)
	at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:396)
	at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
	at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
	at kafka.log.s3.StreamRecordBatchCodec.encode(StreamRecordBatchCodec.java:39)
	at kafka.log.s3.model.StreamRecordBatch.encoded(StreamRecordBatch.java:46)
	at kafka.log.s3.S3Storage.append(S3Storage.java:192)
	at kafka.log.s3.S3Stream.append0(S3Stream.java:110)
	at kafka.log.s3.S3Stream.lambda$append$0(S3Stream.java:101)
	at kafka.log.es.FutureUtil.exec(FutureUtil.java:58)
	at kafka.log.s3.S3Stream.append(S3Stream.java:101)
	at kafka.log.es.AlwaysSuccessClient$StreamImpl.append0(AlwaysSuccessClient.java:242)
	at kafka.log.es.AlwaysSuccessClient$StreamImpl.append(AlwaysSuccessClient.java:234)
	at kafka.log.es.LazyStream.append(LazyStream.java:99)
	at kafka.log.es.DefaultElasticStreamSlice.append(DefaultElasticStreamSlice.java:74)
	at kafka.log.es.ElasticLogFileRecords.append(ElasticLogFileRecords.java:154)
	at kafka.log.es.ElasticLogSegment.append(ElasticLogSegment.scala:100)
	at kafka.log.es.ElasticLog.append(ElasticLog.scala:174)
	at kafka.log.UnifiedLog.$anonfun$append$2(UnifiedLog.scala:955)
	at kafka.log.LocalLog$.maybeHandleIOException(LocalLog.scala:811)
	at kafka.log.es.ElasticUnifiedLog.maybeHandleIOException(ElasticUnifiedLog.scala:57)
	at kafka.log.UnifiedLog.append(UnifiedLog.scala:827)
	at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:766)
	at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1283)
	at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1264)
	at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1008)
	at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
	at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
	at scala.collection.mutable.HashMap.map(HashMap.scala:35)
	at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:996)
	at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:654)
	at kafka.server.KafkaApis.kafka$server$KafkaApis$$doAppendRecords$1(KafkaApis.scala:757)
	at kafka.server.KafkaApis$$anon$1.run(KafkaApis.scala:771)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
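
A common mitigation for this kind of failure is to bound how much direct memory in-flight appends may hold and apply back-pressure before allocating, instead of letting allocation run into the direct-memory limit. The sketch below is purely illustrative (a byte-counting semaphore), not AutoMQ's actual mechanism:

// AppendMemoryLimiter.java -- hypothetical back-pressure on direct-buffer usage
import java.util.concurrent.Semaphore;

public class AppendMemoryLimiter {
    // Each permit represents one byte of direct memory that in-flight appends may hold.
    private final Semaphore budget;

    public AppendMemoryLimiter(int maxInflightBytes) {
        this.budget = new Semaphore(maxInflightBytes);
    }

    // Blocks the append path until enough budget is free, then runs the encode/append step.
    public void withBudget(int encodedSizeBytes, Runnable encodeAndAppend) throws InterruptedException {
        budget.acquire(encodedSizeBytes);
        try {
            encodeAndAppend.run();
        } finally {
            // In a real system the permits would be returned when the buffer is actually freed,
            // e.g. after the WAL object is uploaded, rather than immediately after the append call.
            budget.release(encodedSizeBytes);
        }
    }
}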

WAL object compaction fails

[2023-09-18 16:18:46,866] ERROR [CompactionAnalyzer id=1] Error while analyzing compaction plan (kafka.log.s3.compact.CompactionAnalyzer)
java.lang.IllegalStateException: Duplicate key 0 (attempted merging values S3WALObject{objectId=0, orderId=0, brokerId=1, streamOffsetRanges={1=StreamOffsetRange(streamId=1, startOffset=0, endOffset=5), 33=StreamOffsetRange(streamId=33, startOffset=0, endOffset=2)}, dataTimeInMs=1695025067221} and S3WALObject{objectId=0, orderId=0, brokerId=1, streamOffsetRanges={0=StreamOffsetRange(streamId=0, startOffset=0, endOffset=719), 1=StreamOffsetRange(streamId=1, startOffset=0, endOffset=5), 33=StreamOffsetRange(streamId=33, startOffset=0, endOffset=2)}, dataTimeInMs=1695025067221})
	at java.base/java.util.stream.Collectors.duplicateKeyException(Collectors.java:135)
	at java.base/java.util.stream.Collectors.lambda$uniqKeysMapAccumulator$1(Collectors.java:182)
	at java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
	at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
	at kafka.log.s3.compact.CompactionUtils.blockWaitObjectIndices(CompactionUtils.java:70)
	at kafka.log.s3.compact.CompactionAnalyzer.buildCompactedObjects(CompactionAnalyzer.java:146)
	at kafka.log.s3.compact.CompactionAnalyzer.analyze(CompactionAnalyzer.java:63)
	at kafka.log.s3.compact.CompactionManager.buildCompactRequest(CompactionManager.java:218)
	at kafka.log.s3.compact.CompactionManager.compact(CompactionManager.java:114)
	at kafka.log.s3.compact.CompactionManager.lambda$start$2(CompactionManager.java:93)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:833)
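
The IllegalStateException is thrown by Collectors.toMap, which rejects duplicate keys by default; if the same object id can legitimately show up more than once while building the index, the three-argument overload with a merge function avoids the crash (whether merging or deduplicating upstream is the right fix depends on why the duplicate appears). A minimal hedged sketch:

// DedupByObjectId.java -- illustrating Collectors.toMap with a merge function
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class DedupByObjectId {

    // Simplified stand-in for S3WALObject: just an id and how many stream offset ranges it carries.
    record WalObject(long objectId, int streamRangeCount) {}

    static Map<Long, WalObject> indexById(List<WalObject> objects) {
        return objects.stream().collect(Collectors.toMap(
                WalObject::objectId,
                Function.identity(),
                // Merge function: keep the entry with more stream offset ranges instead of throwing.
                (a, b) -> a.streamRangeCount() >= b.streamRangeCount() ? a : b));
    }
}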

REPLAY_WAL_FAIL offset is not continuous

grep ERROR controller.log | head
[2023-09-15 03:02:32,800] ERROR [REPLAY_WAL_FAIL] stream 1037 offset is not continuous, expect 13837 real 14271 (org.apache.kafka.controller.stream.StreamControlManager)
[2023-09-15 03:02:32,800] ERROR [REPLAY_WAL_FAIL] stream 1035 offset is not continuous, expect 13819 real 14242 (org.apache.kafka.controller.stream.StreamControlManager)
[2023-09-15 03:02:32,800] ERROR [REPLAY_WAL_FAIL] stream 1038 offset is not continuous, expect 13815 real 14216 (org.apache.kafka.controller.stream.StreamControlManager)
[2023-09-15 03:02:32,800] ERROR [REPLAY_WAL_FAIL] stream 1046 offset is not continuous, expect 13918 real 14317 (org.apache.kafka.controller.stream.StreamControlManager)
[2023-09-15 03:02:37,296] ERROR [REPLAY_WAL_FAIL] stream 1037 offset is not continuous, expect 13837 real 14663 (org.apache.kafka.controller.stream.StreamControlManager

Use `spotless` to check the style of Scala code

Currently, we are only performing style checks on Scala files under the 'streams' directory:

// build.gradle
spotless {
  scala {
    target 'streams/**/*.scala'
    scalafmt("$versions.scalafmt").configFile('checkstyle/.scalafmt.conf').scalaMajorVersion(versions.baseScala)
    licenseHeaderFile 'checkstyle/java.header', 'package'
  }
}

Once the project stabilizes, we can perform similar checks on the Scala files we have modified under the 'core' directory.

GetObjects hang

"s3-storage-main" #96 prio=5 os_prio=0 cpu=995.77ms elapsed=472.23s tid=0x00007f278803a160 nid=0x292ef waiting on condition  [0x00007f277f2fe000]
   java.lang.Thread.State: WAITING (parking)
        at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
        - parking to wait for  <0x00000007e654b450> (a java.util.concurrent.CompletableFuture$Signaller)
        at java.util.concurrent.locks.LockSupport.park([email protected]/LockSupport.java:211)
        at java.util.concurrent.CompletableFuture$Signaller.block([email protected]/CompletableFuture.java:1864)
        at java.util.concurrent.ForkJoinPool.unmanagedBlock([email protected]/ForkJoinPool.java:3465)
        at java.util.concurrent.ForkJoinPool.managedBlock([email protected]/ForkJoinPool.java:3436)
        at java.util.concurrent.CompletableFuture.waitingGet([email protected]/CompletableFuture.java:1898)
        at java.util.concurrent.CompletableFuture.get([email protected]/CompletableFuture.java:2072)
        at kafka.log.s3.objects.ControllerObjectManager.getObjects(ControllerObjectManager.java:150)
        at kafka.log.s3.cache.DefaultS3BlockCache.read0(DefaultS3BlockCache.java:100)
        at kafka.log.s3.cache.DefaultS3BlockCache.read(DefaultS3BlockCache.java:68)
        at kafka.log.s3.S3Storage.read0(S3Storage.java:243)
        at kafka.log.s3.S3Storage.lambda$read$6(S3Storage.java:234)
        at kafka.log.s3.S3Storage$$Lambda$1186/0x00007f27f858d750.run(Unknown Source)
        at java.util.concurrent.Executors$RunnableAdapter.call([email protected]/Executors.java:539)
        at java.util.concurrent.FutureTask.run([email protected]/FutureTask.java:264)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run([email protected]/ScheduledThreadPoolExecutor.java:304)
        at java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1136)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:635)
        at java.lang.Thread.run([email protected]/Thread.java:833)

   Locked ownable synchronizers:
        - <0x0000000609510d78> (a java.util.concurrent.ThreadPoolExecutor$Worker)

TODO:

  • Fix the getObjects hang.
  • Isolate DefaultS3BlockCache reads from the s3-storage-main thread (see the sketch below).
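
The thread dump shows the s3-storage-main thread parked in CompletableFuture.get() while it waits for object metadata from the controller. Composing the stages asynchronously (or at least running the blocking lookup on a dedicated executor) keeps the storage thread free; the sketch below only illustrates the idea, with getObjects and readFromObjects as placeholder signatures rather than the real ones:

// NonBlockingRead.java -- hypothetical async composition instead of a blocking get()
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class NonBlockingRead {
    private final Executor lookupExecutor = Executors.newSingleThreadExecutor();

    // Placeholder: ask the controller which objects cover the requested offset range.
    CompletableFuture<List<Long>> getObjects(long streamId, long startOffset, long endOffset) {
        return CompletableFuture.completedFuture(List.of());
    }

    // Placeholder: read the actual data from those objects.
    CompletableFuture<byte[]> readFromObjects(List<Long> objectIds) {
        return CompletableFuture.completedFuture(new byte[0]);
    }

    // Instead of calling getObjects(...).get() on the s3-storage-main thread,
    // chain the stages so no storage thread ever blocks on the controller round-trip.
    CompletableFuture<byte[]> read(long streamId, long startOffset, long endOffset) {
        return getObjects(streamId, startOffset, endOffset)
                .thenComposeAsync(this::readFromObjects, lookupExecutor);
    }
}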

Failed to consume messages after stream split

Steps to Reproduce:

  • Set in server.properties:
s3.wal.object.size=524288000
s3.stream.object.split.size=16777216
  • Start the Kafka server
  • Send messages exceeding 500 MB
./bin/kafka-producer-perf-test.sh --print-metrics --topic quickstart-events --producer-props=bootstrap.servers=localhost:9092 --record-size 65536 --throughput -1 --num-records 100000
  • Try to consume them
./bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092 --max-messages 100

However, nothing is consumed.

Possible Causes:
After increasing s3.stream.object.split.size to 1073741824 (1 GB), everything works fine.
