
apache / incubator-uniffle


Uniffle is a high performance, general purpose Remote Shuffle Service.

Home Page: https://uniffle.apache.org/

License: Apache License 2.0

Python 0.05% Shell 1.67% Java 86.24% Dockerfile 0.14% Makefile 0.09% Go 5.28% Rust 5.61% JavaScript 0.32% HTML 0.03% Vue 0.57%
mapreduce shuffle spark remote-shuffle-service rss tez

incubator-uniffle's Issues

[Improvement] Disallow sendShuffleData if requireBufferId expired

We found that a shuffle server under high load easily encounters java.lang.OutOfMemoryError: Java heap space, even after we allocate more JVM heap memory and reduce rss.server.buffer.capacity.

The steps that lead to the exception above:

  1. When the shuffle server is under high load, the requireBufferId easily expires, and the shuffle server releases the corresponding usedMemory.
  2. The client calls sendShuffleData with the expired requireBufferId.
  3. The shuffle server receives the shuffle data and stores it in the RPC queue (this part of the memory usage is not added to usedMemory).
  4. Other clients' requireBuffer calls succeed because usedMemory appears to have enough headroom.
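
To make the proposed fix concrete, here is a minimal sketch of a guard that rejects sendShuffleData when the pre-allocation has already expired. Class and method names (PreAllocatedBuffer, onSendShuffleData, the StatusCode values) are illustrative assumptions, not the actual Uniffle API.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hedged sketch: reject data whose pre-allocation has already been released.
public class ExpiredBufferGuard {
  enum StatusCode { SUCCESS, NO_BUFFER }

  static class PreAllocatedBuffer {
    final int requiredSize;
    PreAllocatedBuffer(int requiredSize) { this.requiredSize = requiredSize; }
  }

  // requireBufferId -> pre-allocation; an expiry checker removes stale entries
  // and returns their size to usedMemory.
  private final Map<Long, PreAllocatedBuffer> preAllocations = new ConcurrentHashMap<>();

  public StatusCode onSendShuffleData(long requireBufferId, byte[] data) {
    PreAllocatedBuffer buffer = preAllocations.remove(requireBufferId);
    if (buffer == null) {
      // The reserved memory was already reclaimed; accepting this data would
      // bypass usedMemory accounting and risk a heap OOM, so reject it.
      return StatusCode.NO_BUFFER;
    }
    cacheShuffleData(buffer, data);
    return StatusCode.SUCCESS;
  }

  private void cacheShuffleData(PreAllocatedBuffer buffer, byte[] data) {
    // Store the data in the in-memory shuffle buffer (omitted).
  }
}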

[Code Style] Indentation rules

There are two indentation styles in the current code base, which are mostly the same except for line wrapping.
It would be better to unify them early. What do you think?

4 spaces for line wrapping

public static void main(String[] args)
    throws Exception {
  for (int i = 0; i < args.length; i++) {
    System.out.println("Greeting the " + i +
        "-th argument");
    System.out.println("hello, " + args[i]);
  }
}

2 spaces for line wrapping

public static void main(String[] args)
  throws Exception {
  for (int i = 0; i < args.length; i++) {
    System.out.println("Greeting the " + i +
      "-th argument");
    System.out.println("hello, " + args[i]);
  }
}

[Performance Optimization] Improve the speed of writing index file in shuffle server

Motivation

When I tested Uniffle performance, I found a huge performance drop due to the low speed of writing the index file (flame graph attached in the original issue).

Solution

Use `dataOutputStream = new DataOutputStream(new BufferedOutputStream(fileOutputStream));` in LocalFileWriter. Please refer to: https://stackoverflow.com/questions/38439410/java-dataoutputstream-writelong-byte-double-int-speed
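
For illustration, a minimal sketch of an index writer wrapped in a BufferedOutputStream is shown below; the index field layout and the class name are assumptions, and only the wrapping itself is the proposed change.

import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;

// Minimal sketch: the real LocalFileWriter has more state; this only shows
// where the BufferedOutputStream wrapping goes.
public class BufferedIndexWriter implements AutoCloseable {
  private final DataOutputStream dataOutputStream;

  public BufferedIndexWriter(String path) throws IOException {
    FileOutputStream fileOutputStream = new FileOutputStream(path, true);
    // Buffering coalesces the many small writeLong/writeInt calls of the index
    // format into a few large writes instead of one syscall per primitive.
    this.dataOutputStream = new DataOutputStream(new BufferedOutputStream(fileOutputStream));
  }

  // Illustrative entry layout; the actual index format may differ.
  public void writeIndexEntry(long offset, int length, long crc, long blockId) throws IOException {
    dataOutputStream.writeLong(offset);
    dataOutputStream.writeInt(length);
    dataOutputStream.writeLong(crc);
    dataOutputStream.writeLong(blockId);
  }

  @Override
  public void close() throws IOException {
    dataOutputStream.close(); // flushes the buffer before closing the file
  }
}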

Result

After applying this optimization (result flame graph attached in the original issue).

[Bug] Blocks read inconsistent: expected xxx blocks, actual xxx blocks

  1. If we set spark.rss.data.replica.write=2 and spark.rss.data.replica=3, data integrity cannot be guaranteed by any single shuffle server, right?
  2. But the method org.apache.uniffle.storage.handler.impl.LocalFileQuorumClientReadHandler#readShuffleData only reads from one shuffle server.

[Improvement] ShuffleBlock should be release when finished reading

We found that the Spark executor is easily killed by YARN, and I found it is because the executor uses too much off-heap memory when reading shuffle data.
Most of the off-heap memory is used to store uncompressed shuffle data, and this memory is only released when GC is triggered.
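
One possible shape of the fix, assuming the uncompressed data lives in a direct ByteBuffer, is to free it explicitly as soon as the reader is done with it. This sketch uses the Java 8 / HotSpot internal cleaner API and is not a portable or definitive implementation.

import java.nio.ByteBuffer;

// Hedged sketch (Java 8, HotSpot-specific): free a direct ByteBuffer as soon as
// the uncompressed shuffle data has been consumed, instead of waiting for GC.
public final class DirectBufferReleaser {
  private DirectBufferReleaser() {}

  public static void release(ByteBuffer buffer) {
    if (buffer == null || !buffer.isDirect()) {
      return;
    }
    // DirectBuffer/Cleaner are internal APIs; newer JDKs need a different approach.
    ((sun.nio.ch.DirectBuffer) buffer).cleaner().clean();
  }
}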

[Improvement][Aqe] Avoid calling `getShuffleResult` multiple times

When we use AQE, we may call shuffleWriteClient.getShuffleResult multiple times. If both partition 1 and partition 2 are on server A, we call getShuffleResult(partition 1) to get data from server A and then call getShuffleResult(partition 2) to get data from server A again, which is unnecessary. We could call getShuffleResult(partition 1, partition 2) instead.
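
A rough sketch of the batching step is shown below: group the partitions by server so each server receives a single combined request. Servers are represented by plain string ids here for brevity, and the batched getShuffleResult-for-multiple-partitions call it prepares for is hypothetical.

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hedged sketch: invert partition -> servers into server -> partitions so that
// each server gets one batched getShuffleResult request covering all of its partitions.
public class ShuffleResultBatcher {
  public static Map<String, Set<Integer>> groupPartitionsByServer(
      Map<Integer, List<String>> partitionToServers) {
    Map<String, Set<Integer>> serverToPartitions = new HashMap<>();
    for (Map.Entry<Integer, List<String>> entry : partitionToServers.entrySet()) {
      for (String serverId : entry.getValue()) {
        serverToPartitions.computeIfAbsent(serverId, s -> new HashSet<>()).add(entry.getKey());
      }
    }
    return serverToPartitions;
  }
}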

Introduce the reservedData to support more custom pluggable accessCheckers

Background

In the current implementation, DelegationRssShuffleManager can decide whether to use Uniffle or the sort shuffle service by calling the remote coordinator. Pluggable accessCheckers can be extended in the coordinator to support more custom requirements.

Motivation

When we want Uniffle to be applied only to specified Spark jobs, i.e. to control which shuffle service each job uses, the mechanism of pluggable accessCheckers is useful.

We can implement a CustomDelegationRssShuffleManager (like DelegationRssShuffleManager) to inject some custom accessInfo and remotely call the coordinator, which uses a custom access policy to decide which Spark jobs use the Uniffle shuffle service. However, the current codebase doesn't support injecting more accessInfo from the client; it only carries tags and an accessId. So this proposal introduces a data structure to store more custom, extensible requirements.

Goals

  1. Support injecting more custom data into AccessClusterRequest, which can be set on the client side (a hedged sketch follows).
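
A minimal sketch of what such an extensible request could look like, assuming a simple key-value field; the field name extraProperties and this constructor shape are assumptions, not the current AccessClusterRequest API.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hedged sketch of an AccessClusterRequest carrying extensible custom data that a
// CustomDelegationRssShuffleManager could fill in and a custom AccessChecker could inspect.
public class AccessClusterRequestSketch {
  private final String accessId;
  private final Set<String> tags;
  private final Map<String, String> extraProperties;

  public AccessClusterRequestSketch(String accessId, Set<String> tags, Map<String, String> extraProperties) {
    this.accessId = accessId;
    this.tags = tags;
    this.extraProperties =
        extraProperties == null ? Collections.emptyMap() : new HashMap<>(extraProperties);
  }

  public String getAccessId() { return accessId; }
  public Set<String> getTags() { return tags; }
  public Map<String, String> getExtraProperties() { return extraProperties; }
}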

Tasks

[Improvement] Support more tasks of the application

The current blockId is designed as follows:

 // BlockId is a long composed of a partitionId, a taskAttemptId and an AtomicInteger sequence number
 // the AtomicInteger sequence number is the first 19 bits, max value is 2^19 - 1
 // partitionId is the next 24 bits, max value is 2^24 - 1
 // taskAttemptId is the remaining 20 bits, max value is 2^20 - 1
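
To make the layout concrete, here is an illustrative composition helper that follows the bit budget described in the comment (sequence number in the highest bits, taskAttemptId in the lowest); the ordering and the class name are assumptions rather than the actual ClientUtils code.

// Illustrative only: 19-bit sequence number, 24-bit partitionId, 20-bit taskAttemptId.
public final class BlockIdLayout {
  static final int SEQUENCE_BITS = 19;
  static final int PARTITION_BITS = 24;
  static final int TASK_ATTEMPT_BITS = 20;

  public static long getBlockId(long sequenceNo, long partitionId, long taskAttemptId) {
    if (sequenceNo >= (1L << SEQUENCE_BITS)
        || partitionId >= (1L << PARTITION_BITS)
        || taskAttemptId >= (1L << TASK_ATTEMPT_BITS)) {
      throw new IllegalArgumentException("blockId component exceeds its bit budget");
    }
    // Borrowing bits from SEQUENCE_BITS for TASK_ATTEMPT_BITS is what the proposal below suggests.
    return (sequenceNo << (PARTITION_BITS + TASK_ATTEMPT_BITS))
        | (partitionId << TASK_ATTEMPT_BITS)
        | taskAttemptId;
  }
}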

Why do we need blockId?
It's designed for data checks, filtering, in-memory data reads, etc.

Why is blockId designed as above?
BlockIds are stored in the shuffle server, and to reduce memory cost, RoaringBitmap is used to cache them.
Given the implementation of RoaringBitmap, the blockId design aims to use BitmapContainer instead of ArrayContainer to save memory.

What's the problem with blockId?
It can't support a taskId greater than 2^20 - 1.

Proposal
I think the first 19 bits are more than the atomic counter needs, and we can reassign some of them to the taskId.

[Feature] Support short-circuit read in local-file

Motivation

When Uniffle shuffle servers are co-located with YARN NodeManagers, we could use short-circuit reads to improve performance and reduce overhead.

How to do

There are two options to solve this:

  1. Read shuffle-data files directly on the client side.
  2. Use a domain socket.

Option 1 - Directly read

This is the fastest way, but the local files' read permission would have to be opened up to the client side, which may be a security problem.
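
As a rough illustration of option 1, the client would open the data file on local disk and read the byte range described by the index directly, skipping the RPC path. The path, offset and length are assumed to come from the existing index metadata; this is a sketch, not the proposed implementation.

import java.io.IOException;
import java.io.RandomAccessFile;

// Hedged sketch of a client-side short-circuit read of a local shuffle data file.
public class LocalShortCircuitReader {
  public byte[] read(String localDataFilePath, long offset, int length) throws IOException {
    byte[] buffer = new byte[length];
    try (RandomAccessFile file = new RandomAccessFile(localDataFilePath, "r")) {
      file.seek(offset);
      file.readFully(buffer);
    }
    return buffer;
  }
}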

Option 2 - Domain socket

This approach avoids the security problem, but it will be slower than the one above, and its implementation will be more complex.

Conclusion

In my opinion, both approaches could be supported in Uniffle, as different policies for users to choose from.

[Improvement] Support Empty assignment to Shuffle Server

When I tested Hudi, I got an error.
This is the Spark driver log; the error is: Empty assignment to Shuffle Server

52278 [dag-scheduler-event-loop] INFO  org.apache.spark.shuffle.RssShuffleManager  - Generate application id used in rss: spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844
52281 [dag-scheduler-event-loop] ERROR com.tencent.rss.client.impl.ShuffleWriteClientImpl  - Empty assignment to Shuffle Server
52282 [dag-scheduler-event-loop] ERROR com.tencent.rss.client.impl.ShuffleWriteClientImpl  - Error happened when getShuffleAssignments with appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[6], numMaps[0], partitionNumPerRange[1] to coordinator
52283 [dag-scheduler-event-loop] WARN  org.apache.spark.scheduler.DAGScheduler  - Creating new stage failed due to exception - job: 5
com.tencent.rss.common.exception.RssException: Error happened when getShuffleAssignments with appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[6], numMaps[0], partitionNumPerRange[1] to coordinator
        at com.tencent.rss.client.impl.ShuffleWriteClientImpl.throwExceptionIfNecessary(ShuffleWriteClientImpl.java:440)
        at com.tencent.rss.client.impl.ShuffleWriteClientImpl.getShuffleAssignments(ShuffleWriteClientImpl.java:291)
        at org.apache.spark.shuffle.RssShuffleManager.registerShuffle(RssShuffleManager.java:247)
        at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:97)
        at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:87)
        at org.apache.spark.rdd.RDD.$anonfun$dependencies$2(RDD.scala:264)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.rdd.RDD.dependencies(RDD.scala:260)
        at org.apache.spark.scheduler.DAGScheduler.getShuffleDependenciesAndResourceProfiles(DAGScheduler.scala:634)
        at org.apache.spark.scheduler.DAGScheduler.getMissingAncestorShuffleDependencies(DAGScheduler.scala:597)
        at org.apache.spark.scheduler.DAGScheduler.getOrCreateShuffleMapStage(DAGScheduler.scala:394)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$getOrCreateParentStages$1(DAGScheduler.scala:580)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.mutable.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:48)
        at scala.collection.SetLike.map(SetLike.scala:104)
        at scala.collection.SetLike.map$(SetLike.scala:104)
        at scala.collection.mutable.AbstractSet.map(Set.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.getOrCreateParentStages(DAGScheduler.scala:579)
        at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:564)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1115)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2396)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2388)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2377)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
52287 [main] INFO  org.apache.spark.scheduler.DAGScheduler  - Job 5 failed: countByKey at BaseSparkCommitActionExecutor.java:191, took 0.076660 s

This is the coordinator log; the requested partitionNum is 0.

[INFO] 2022-08-11 11:29:49,335 Grpc-301 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[6], partitionNum[0], partitionNumPerRange[1], replica[1]
Full log:
[INFO] 2022-08-11 11:29:26,946 Grpc-267 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[0], partitionNum[200], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:26,946 Grpc-267 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:29:26,946 Grpc-267 CoordinatorGrpcService logAssignmentResult - Shuffle Servers of assignment for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[0] are [10.1.3.174-19990, 10.1.3.175-19990, 10.1.3.173-19990]
[INFO] 2022-08-11 11:29:27,033 Grpc-270 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[1], partitionNum[200], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:27,033 Grpc-270 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:29:27,034 Grpc-270 CoordinatorGrpcService logAssignmentResult - Shuffle Servers of assignment for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[1] are [10.1.3.174-19990, 10.1.3.175-19990, 10.1.3.173-19990]
[INFO] 2022-08-11 11:29:37,957 ApplicationManager-0 ApplicationManager statusCheck - Start to check status for 2 applications
[INFO] 2022-08-11 11:29:43,047 Grpc-283 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[2], partitionNum[200], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:43,048 Grpc-283 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:29:43,048 Grpc-283 CoordinatorGrpcService logAssignmentResult - Shuffle Servers of assignment for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[2] are [10.1.3.174-19990, 10.1.3.175-19990, 10.1.3.173-19990]
[INFO] 2022-08-11 11:29:49,165 Grpc-293 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[3], partitionNum[200], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:49,166 Grpc-293 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:29:49,166 Grpc-293 CoordinatorGrpcService logAssignmentResult - Shuffle Servers of assignment for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[3] are [10.1.3.174-19990, 10.1.3.175-19990, 10.1.3.173-19990]
[INFO] 2022-08-11 11:29:49,247 Grpc-298 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[4], partitionNum[200], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:49,247 Grpc-298 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:29:49,247 Grpc-298 CoordinatorGrpcService logAssignmentResult - Shuffle Servers of assignment for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[4] are [10.1.3.174-19990, 10.1.3.175-19990, 10.1.3.173-19990]
[INFO] 2022-08-11 11:29:49,267 Grpc-297 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[5], partitionNum[200], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:49,267 Grpc-297 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:29:49,268 Grpc-297 CoordinatorGrpcService logAssignmentResult - Shuffle Servers of assignment for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[5] are [10.1.3.174-19990, 10.1.3.175-19990, 10.1.3.173-19990]
[INFO] 2022-08-11 11:29:49,335 Grpc-301 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[6], partitionNum[0], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:49,335 Grpc-301 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:30:07,957 ApplicationManager-0 ApplicationManager statusCheck - Start to check status for 2 applications
[INFO] 2022-08-11 11:30:07,957 ApplicationManager-0 ApplicationManager statusCheck - Remove expired application:spark-d7f3e51ca713472e88568db90c91bdea1660187027133

Environment:

uniffle: firestorm 0.4.1
spark: 3.1.2
hudi: 0.10.0
k8s: v1.21.3

Remove experimental feature with ShuffleUploader

ShuffleUploader is an experimental feature that merges and uploads data from local disk to remote storage when the local disk doesn't have enough space.
Currently, MEMORY_LOCAL_HDFS has been introduced as the common solution for shuffle data storage.
ShuffleUploader is no longer needed, and removing it also spares us from maintaining it in the future.

[Improvement][AQE] Sort MapId before the data are flushed

When we use AQE, we need to use the mapId to filter out the data we don't need. We split the data into segments, and if a segment contains no data that we want to read, we drop that segment. If the data is sorted by mapId before it is flushed, we can filter out more data and improve performance.
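
A minimal sketch of the idea: sort the buffered blocks by taskAttemptId (mapId) before flushing, so each flushed segment covers a contiguous mapId range and readers can skip whole segments. ShuffleBlock here is a simplified stand-in, not the actual block class.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hedged sketch: sort buffered blocks by mapId before they are flushed.
public class MapIdSortedFlush {
  static class ShuffleBlock {
    final long taskAttemptId;
    final byte[] data;
    ShuffleBlock(long taskAttemptId, byte[] data) {
      this.taskAttemptId = taskAttemptId;
      this.data = data;
    }
  }

  public List<ShuffleBlock> prepareForFlush(List<ShuffleBlock> buffered) {
    List<ShuffleBlock> sorted = new ArrayList<>(buffered);
    // Stable sort keeps the original write order of blocks from the same task.
    sorted.sort(Comparator.comparingLong(b -> b.taskAttemptId));
    return sorted;
  }
}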

Flaky test HealthCheckCoordinatorGrpcTest on Apple Silicon

Running mvn clean install on Apple Silicon, HealthCheckCoordinatorGrpcTest failed as follows:

[INFO] Running org.apache.uniffle.test.HealthCheckCoordinatorGrpcTest
[ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 71.009 s <<< FAILURE! - in org.apache.uniffle.test.HealthCheckCoordinatorGrpcTest
[ERROR] healthCheckTest  Time elapsed: 70.285 s  <<< FAILURE!
org.opentest4j.AssertionFailedError
	at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:35)
	at org.junit.jupiter.api.Assertions.fail(Assertions.java:115)
	at org.apache.uniffle.test.HealthCheckCoordinatorGrpcTest.healthCheckTest(HealthCheckCoordinatorGrpcTest.java:143)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
	at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
	at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
	at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at java.util.ArrayList.forEach(ArrayList.java:1259)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at java.util.ArrayList.forEach(ArrayList.java:1259)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
	at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
	at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
	at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:150)
	at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:124)
	at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
	at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)

Environment:

os: Big Sur 11.4
cpu: Apple M1
java: zulu 1.8.0_322
maven: 3.8.6

Flaky test RssShuffleUtilsTest

Running mvn clean install, RssShuffleUtilsTest failed as follows:

[INFO] Running org.apache.uniffle.common.RssShuffleUtilsTest
[ERROR] Tests run: 7, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.356 s <<< FAILURE! - in org.apache.uniffle.common.RssShuffleUtilsTest
[ERROR] testDestroyDirectByteBuffer  Time elapsed: 0.222 s  <<< FAILURE!
org.opentest4j.AssertionFailedError: expected: <true> but was: <false>
	at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
	at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40)
	at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:35)
	at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:179)
	at org.apache.uniffle.common.RssShuffleUtilsTest.testDestroyDirectByteBuffer(RssShuffleUtilsTest.java:72)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
	at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
	at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
	at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at java.util.ArrayList.forEach(ArrayList.java:1259)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at java.util.ArrayList.forEach(ArrayList.java:1259)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
	at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
	at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
	at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:150)
	at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:124)
	at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
	at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)

Environment:

os: Big Sur 11.4
cpu: Apple M1
java: zulu 1.8.0_322
maven: 3.8.6

[Umbrella] Replace gRPC with Netty for data transfer


Describe the proposal

When we use gRPC, we find that our bottleneck is gRPC itself: it brings the cost of data copies and serialization, and we inevitably run into GC problems. We should replace gRPC with Netty for data transfer and use off-heap memory to reduce GC time.


[Feature Request] Deploy Uniffle on Kubernetes

It would be good to have a Kubernetes operator to help us deploy Uniffle on Kubernetes, so we can automate our system operations. This issue tracks the pull requests for the Kubernetes operator.

Support lower Hadoop versions in client-mr

Currently, Uniffle uses a default Hadoop version of 2.8.5.

When building with ./build_distribution.sh --spark2-profile 'spark2' --spark2-mvn '-Dspark.version=2.4.3' --spark3-profile 'spark3' --spark3-mvn '-Dspark.version=3.1.1' -Dhadoop.version=2.6.0, it throws exceptions because some methods and variables are not available in Hadoop 2.6.0.

Some of the incompatible parameters and methods are as follows:

  1. CallContext, introduced in Hadoop >= 2.8.0.
  2. MRJobConfig.DEFAULT_SHUFFLE_MERGE_PERCENT, introduced in 2.8.0. ticket link
  3. MRApps.getSystemPropertiesToLog, introduced in 2.8.0. ticket link

I think we could use reflection to stay compatible with lower Hadoop versions.
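
For example, a hedged sketch of the reflection approach for the third item: call MRApps.getSystemPropertiesToLog when it is present (Hadoop >= 2.8.0) and fall back quietly on older versions. The exact method signature and the empty-list fallback are assumptions.

import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;

// Hedged sketch: invoke MRApps.getSystemPropertiesToLog reflectively if available.
public final class HadoopCompat {
  @SuppressWarnings("unchecked")
  public static List<String> getSystemPropertiesToLog(Object hadoopConf) {
    try {
      Class<?> mrAppsClass = Class.forName("org.apache.hadoop.mapreduce.v2.util.MRApps");
      Class<?> confClass = Class.forName("org.apache.hadoop.conf.Configuration");
      Method method = mrAppsClass.getMethod("getSystemPropertiesToLog", confClass);
      return (List<String>) method.invoke(null, hadoopConf);
    } catch (ReflectiveOperationException e) {
      // Not available on Hadoop < 2.8.0: skip the extra logging.
      return Collections.emptyList();
    }
  }
}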

[Feature Request] Introduce the unique trace id to debug easily

Motivation

In the current codebase it's hard to analyze which stage of a remote request costs the most time, because there is no corresponding trace id on the client and server side.

Plan

Maybe we could introduce a unique trace id, generated from the time and the client machine id, for each remote request and record it in the client's log. When requesting the remote server, we should propagate this id to the server and record it in the server's log as well.
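
A minimal sketch of the client-side generation, assuming a time + machine-id + random-suffix format; the format and how the id is propagated (for example via gRPC metadata) are assumptions for illustration.

import java.util.concurrent.ThreadLocalRandom;

// Hedged sketch: build a greppable trace id such as "host-42-1660188566844-91237".
public final class TraceIdGenerator {
  private TraceIdGenerator() {}

  public static String newTraceId(String clientMachineId) {
    long timestamp = System.currentTimeMillis();
    int suffix = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
    return clientMachineId + "-" + timestamp + "-" + suffix;
  }
}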

[Performance Optimization] Multiple channels when getting shuffle data in client side

Motivation

Currently an executor uses only a single TCP connection to a given shuffle server, so when multiple tasks run concurrently they share this channel, which may reduce overall throughput.

Do we have any plan to introduce an extra config that allows users to create more channels on the client side?

We should run some performance tests to prove this improvement is effective; updates will be included in this ticket. A rough sketch of what a channel pool could look like follows.
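
This sketch shows one way to spread RPCs over several gRPC channels per server with round-robin selection. The pool size would come from a new, hypothetical client config; nothing here exists in Uniffle today.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

// Hedged sketch: a small per-server pool of gRPC channels selected round-robin.
public class GrpcChannelPool {
  private final List<ManagedChannel> channels = new ArrayList<>();
  private final AtomicInteger nextIndex = new AtomicInteger(0);

  public GrpcChannelPool(String host, int port, int poolSize) {
    for (int i = 0; i < poolSize; i++) {
      channels.add(ManagedChannelBuilder.forAddress(host, port).usePlaintext().build());
    }
  }

  // Concurrent tasks spread their RPCs over several TCP connections.
  public ManagedChannel nextChannel() {
    return channels.get(Math.floorMod(nextIndex.getAndIncrement(), channels.size()));
  }

  public void shutdown() {
    channels.forEach(ManagedChannel::shutdown);
  }
}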

Improve our test code style

There are currently many style errors in our test code because our checkstyle plugin doesn't check test code. To enable the plugin, we first have to fix the existing style errors. Tencent/Firestorm#155 shows how to enable the plugin for test code. We should enable the checkstyle plugin for test code in one branch, fix the style errors module by module, cherry-pick the fixes to another branch, and raise a fix PR.

This issue will track the pull requests.
Related PRs:
#99 fix common module
#122 fix coordinator module
#131 fix storage module
