apache / incubator-uniffle
Uniffle is a high performance, general purpose Remote Shuffle Service.
Home Page: https://uniffle.apache.org/
License: Apache License 2.0
We found that a shuffle server under high load easily runs into java.lang.OutOfMemoryError: Java heap space, even when we allocate more JVM heap memory and lower rss.server.buffer.capacity.
The steps that lead to the exception:
1. The requireBufferId expires easily, and the shuffle server releases the corresponding usedMemory.
2. sendShuffleData is then called with the expired requireBufferId, so the data it sends is no longer counted in usedMemory.
3. A subsequent requireBuffer succeeds because usedMemory appears to be sufficient.
It's better to run TPC-DS to verify the correctness of our system. It will help us find problems more quickly.
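Returning to the heap OOM report at the top of this list: the three steps can be reproduced in a minimal simulation, assuming a simplified model in which the server tracks reservations by requireBufferId and releases them on expiry. BufferAccountingSim and all of its methods are hypothetical, not Uniffle's real classes; the "checked" variant only illustrates one possible guard.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of shuffle server memory accounting (not Uniffle's real code).
class BufferAccountingSim {
    final long capacity;
    long usedMemory;      // bytes the server believes are in use
    long actualHeapBytes; // bytes of shuffle data actually cached on the heap
    private final Map<Long, Long> reservations = new HashMap<>();
    private long nextId = 1;

    BufferAccountingSim(long capacity) {
        this.capacity = capacity;
    }

    // Reserves memory and returns a requireBufferId, or -1 if capacity would be exceeded.
    long requireBuffer(long size) {
        if (usedMemory + size > capacity) {
            return -1;
        }
        usedMemory += size;
        long id = nextId++;
        reservations.put(id, size);
        return id;
    }

    // Expiry drops the reservation and releases its memory from the accounting.
    void expire(long requireBufferId) {
        Long size = reservations.remove(requireBufferId);
        if (size != null) {
            usedMemory -= size;
        }
    }

    // Problematic path: data is cached even if its reservation has already expired.
    void sendShuffleDataUnchecked(long requireBufferId, long size) {
        reservations.remove(requireBufferId);
        actualHeapBytes += size;
    }

    // Guarded path: data whose reservation has expired is rejected.
    boolean sendShuffleDataChecked(long requireBufferId, long size) {
        if (reservations.remove(requireBufferId) == null) {
            return false;
        }
        actualHeapBytes += size;
        return true;
    }

    public static void main(String[] args) {
        BufferAccountingSim sim = new BufferAccountingSim(100);
        long id1 = sim.requireBuffer(60);
        sim.expire(id1);                       // step 1: id expires, usedMemory released
        sim.sendShuffleDataUnchecked(id1, 60); // step 2: data still arrives with the stale id
        long id2 = sim.requireBuffer(60);      // step 3: succeeds because usedMemory looks low
        sim.sendShuffleDataUnchecked(id2, 60);
        // prints usedMemory=60, actualHeapBytes=120 (more than the capacity of 100)
        System.out.println("usedMemory=" + sim.usedMemory + ", actualHeapBytes=" + sim.actualHeapBytes);
    }
}
```

The gap between usedMemory and actualHeapBytes is what lets the heap grow past the configured capacity.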
We could implement REST APIs for the shuffle server and coordinator to query server status and control server behavior, and also provide APIs for the web UI.
There are two indentation styles in the current code base, which are mostly the same except for line wrapping.
It would be better to unify them early. What do you think?
public static void main(String[] args)
throws Exception {
for (int i = 0; i < args.length; i++) {
System.out.println("Greeting the " + i +
"-th argument");
System.out.println("hello, " + args[i]);
}
}
public static void main(String[] args)
throws Exception {
for (int i = 0; i < args.length; i++) {
System.out.println("Greeting the " + i +
"-th argument");
System.out.println("hello, " + args[i]);
}
}
We found that the value of grpc_open is sometimes very large (>1000) even when no application is running in our cluster.
When I tested Uniffle performance, I found a huge performance drop caused by slow writes to the index file. Flame graph attached.
Use dataOutputStream = new DataOutputStream(new BufferedOutputStream(fileOutputStream)); in LocalFileWriter. Please refer to: https://stackoverflow.com/questions/38439410/java-dataoutputstream-writelong-byte-double-int-speed
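A minimal sketch of the suggested change, assuming a simplified index entry of an offset (long) and a length (int); Uniffle's real index format and LocalFileWriter are not reproduced here. DataOutputStream forwards each primitive write to the stream beneath it, so without a buffer those small writes reach FileOutputStream, and therefore the OS, as many tiny write calls.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

// Hypothetical index layout (offset: long, length: int); the real index entry has more fields.
class BufferedIndexWriterDemo {
    static final int ENTRY_BYTES = Long.BYTES + Integer.BYTES; // 12 bytes per entry

    // Writes entries through a BufferedOutputStream so the small primitive writes are collected
    // in memory and handed to the file in large chunks instead of many tiny write() calls.
    static void writeIndex(File file, long[] offsets, int[] lengths) throws IOException {
        try (DataOutputStream out = new DataOutputStream(
                new BufferedOutputStream(new FileOutputStream(file)))) {
            for (int i = 0; i < offsets.length; i++) {
                out.writeLong(offsets[i]);
                out.writeInt(lengths[i]);
            }
        } // close() flushes the remaining buffered bytes, then closes the file
    }

    // Reads the offsets back (buffered as well) to verify what was written.
    static long[] readOffsets(File file) throws IOException {
        int count = (int) (file.length() / ENTRY_BYTES);
        long[] offsets = new long[count];
        try (DataInputStream in = new DataInputStream(
                new BufferedInputStream(new FileInputStream(file)))) {
            for (int i = 0; i < count; i++) {
                offsets[i] = in.readLong();
                in.readInt(); // skip the length field
            }
        }
        return offsets;
    }

    public static void main(String[] args) throws IOException {
        File file = File.createTempFile("shuffle", ".index");
        file.deleteOnExit();
        writeIndex(file, new long[] {0L, 100L, 150L}, new int[] {100, 50, 70});
        System.out.println(file.length()); // 36
    }
}
```

The file contents are identical with or without the buffer; only the number of calls into FileOutputStream changes.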
With spark.rss.data.replica.write=2 and spark.rss.data.replica=3, data integrity cannot be guaranteed on any single shuffle server, right? In org.apache.uniffle.storage.handler.impl.LocalFileQuorumClientReadHandler#readShuffleData, it only reads from one shuffle server. Under a quorum model, a read is guaranteed to overlap every successful write only when W + R > N; with W = 2 and N = 3, reading from one server (R = 1) gives W + R = 3, which is not greater than N.
At present, the machine health check only covers disk I/O. Should we add memory utilization as an indicator? What do you think? @jerqi
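For the memory-utilization question above, a minimal sketch of what such a check could look like, assuming the JVM heap (read through the standard MemoryMXBean) is the memory signal. MemoryHealthChecker and its threshold are hypothetical; Uniffle's real health-check interfaces may look different, and a server could plug in its own buffer accounting instead of heap usage.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

// Hypothetical memory checker; not Uniffle's real health-check API.
class MemoryHealthChecker {
    private final double maxUsageRatio; // e.g. 0.9 = unhealthy at or above 90% usage

    MemoryHealthChecker(double maxUsageRatio) {
        this.maxUsageRatio = maxUsageRatio;
    }

    // Healthy if used/max is below the threshold; max <= 0 means "undefined", so no verdict is possible.
    boolean isHealthy(long usedBytes, long maxBytes) {
        if (maxBytes <= 0) {
            return true;
        }
        return (double) usedBytes / maxBytes < maxUsageRatio;
    }

    // Uses current JVM heap usage as the memory signal.
    boolean checkHeap() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        return isHealthy(heap.getUsed(), heap.getMax());
    }

    public static void main(String[] args) {
        System.out.println("heap healthy: " + new MemoryHealthChecker(0.9).checkHeap());
    }
}
```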
We found that Spark executors are easily killed by YARN because they use too much off-heap memory when reading shuffle data.
Most of that off-heap memory is used to store uncompressed shuffle data, and it is released only when GC is triggered.
If an OOM occurs in the shuffle server, the RPC client blocks until the shuffle server is killed.
Should shuffleBufferManager support triggering a flush based on the size of a single ShuffleBuffer? That way we could make fuller use of disk I/O without waiting for total buffer usage to reach rss.server.buffer.capacity * rss.server.memory.shuffle.highWaterMark.percentage.
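The proposed trigger can be sketched next to the existing one, assuming the high-water mark is expressed as a percentage (for example 75.0). FlushPolicy and singleBufferFlushThreshold are hypothetical; no such per-buffer setting is implied to exist in Uniffle today.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical flush policy combining a total-usage trigger with a per-buffer trigger.
class FlushPolicy {
    private final long capacity;                   // rss.server.buffer.capacity
    private final double highWaterMarkPercentage;  // rss.server.memory.shuffle.highWaterMark.percentage
    private final long singleBufferFlushThreshold; // hypothetical per-buffer size limit

    FlushPolicy(long capacity, double highWaterMarkPercentage, long singleBufferFlushThreshold) {
        this.capacity = capacity;
        this.highWaterMarkPercentage = highWaterMarkPercentage;
        this.singleBufferFlushThreshold = singleBufferFlushThreshold;
    }

    // Trigger described in the issue: total usage crosses the high-water mark.
    boolean totalUsageTrigger(long usedMemory) {
        return usedMemory >= capacity * highWaterMarkPercentage / 100.0;
    }

    // Proposed trigger: every single buffer at or above the threshold is flushed on its own.
    List<String> buffersOverThreshold(Map<String, Long> bufferSizes) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> e : new TreeMap<>(bufferSizes).entrySet()) {
            if (e.getValue() >= singleBufferFlushThreshold) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        FlushPolicy policy = new FlushPolicy(1000, 75.0, 64);
        Map<String, Long> sizes = new TreeMap<>();
        sizes.put("app1_shuffle0_p0", 10L);
        sizes.put("app1_shuffle0_p1", 100L);
        System.out.println(policy.totalUsageTrigger(110));      // false: 110 < 750
        System.out.println(policy.buffersOverThreshold(sizes)); // [app1_shuffle0_p1]
    }
}
```

With only the total-usage trigger, the 100-byte buffer in the example would wait until the whole server reached 750 bytes.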
Ignore failure on initializing local storage when setting multiple local storage paths in shuffle server client
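A minimal sketch of the requested behavior, assuming that a failing path is logged and skipped and that startup only fails when no path can be initialized at all. LocalStorageInit and PathInitializer are hypothetical names, not Uniffle's storage classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical: initialize every configured local storage path, skip the ones that fail,
// and only give up when none could be initialized.
class LocalStorageInit {
    interface PathInitializer {
        void init(String path) throws Exception;
    }

    static List<String> initAll(List<String> paths, PathInitializer initializer) {
        List<String> usable = new ArrayList<>();
        for (String path : paths) {
            try {
                initializer.init(path);
                usable.add(path);
            } catch (Exception e) {
                System.err.println("Ignoring local storage path " + path + ": " + e.getMessage());
            }
        }
        if (usable.isEmpty()) {
            throw new IllegalStateException("No local storage path could be initialized");
        }
        return usable;
    }

    public static void main(String[] args) {
        List<String> usable = initAll(Arrays.asList("/data1/rss", "/data2/rss", "/data3/rss"), path -> {
            if (path.startsWith("/data2")) {
                throw new Exception("simulated disk failure");
            }
        });
        System.out.println(usable); // [/data1/rss, /data3/rss]
    }
}
```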
Could the community provide an official benchmark comparing ESS and Uniffle?
When we use AQE, we may call shuffleWriteClient.getShuffleResult multiple times. If both partition 1 and partition 2 are on server A, we call getShuffleResult(partition 1) to get data from server A and then getShuffleResult(partition 2) to get data from server A again, which is unnecessary. We could call getShuffleResult(partition 1, partition 2) instead.
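The batching idea amounts to grouping the requested partitions by server before issuing requests. A minimal sketch, assuming each partition is read from a single server (replicas ignored); ShuffleResultBatching is a hypothetical helper, and the batched call itself is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper; assumes each partition is read from exactly one server.
class ShuffleResultBatching {
    // Groups partition ids by server so each server is contacted once for all of its partitions.
    static Map<String, List<Integer>> groupByServer(Map<Integer, String> partitionToServer) {
        Map<String, List<Integer>> byServer = new TreeMap<>();
        for (Map.Entry<Integer, String> e : new TreeMap<>(partitionToServer).entrySet()) {
            byServer.computeIfAbsent(e.getValue(), server -> new ArrayList<>()).add(e.getKey());
        }
        return byServer;
    }

    public static void main(String[] args) {
        Map<Integer, String> assignment = new TreeMap<>();
        assignment.put(1, "serverA");
        assignment.put(2, "serverA");
        assignment.put(3, "serverB");
        // One request per server instead of one per partition.
        System.out.println(groupByServer(assignment)); // {serverA=[1, 2], serverB=[3]}
    }
}
```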
Uniffle's delegation shuffle manager now supports falling back to the sort shuffle manager at the application level. We could implement it at the shuffle level.
In the current implementation, DelegationRssShuffleManager decides whether to use Uniffle or the sort shuffle service by calling the remote coordinator. The pluggable accessCheckers can be extended in the coordinator to support more custom requirements.
When we want Uniffle to be applied only to specified Spark jobs, controlling which shuffle service each job uses, the pluggable accessCheckers mechanism is useful.
We can implement a CustomDelegationRssShuffleManager (like DelegationRssShuffleManager) to inject custom accessInfo and call the coordinator remotely, which then uses a custom access policy to decide which Spark jobs use the Uniffle shuffle service. However, the current codebase doesn't support injecting more accessInfo on the client side; it only has tags and accessId. So this proposal introduces a data structure to store more custom, extensible requirements.
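One possible shape for that data structure, sketched under the assumption that a free-form string map sits next to the existing accessId and tags. The class names, the "queue" key, and the checker are hypothetical illustrations, not Uniffle's actual request types.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical extensible access info: accessId and tags exist today; extraProperties is the proposed addition.
final class AccessInfo {
    private final String accessId;
    private final Set<String> tags;
    private final Map<String, String> extraProperties;

    AccessInfo(String accessId, Set<String> tags, Map<String, String> extraProperties) {
        this.accessId = accessId;
        this.tags = Collections.unmodifiableSet(new HashSet<>(tags));
        this.extraProperties = Collections.unmodifiableMap(new HashMap<>(extraProperties));
    }

    String getAccessId() { return accessId; }
    Set<String> getTags() { return tags; }
    Map<String, String> getExtraProperties() { return extraProperties; }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("queue", "etl"); // hypothetical custom key injected by a custom shuffle manager
        AccessInfo info = new AccessInfo("app-1", Collections.singleton("example-tag"), props);
        QueueAccessChecker checker = new QueueAccessChecker(Collections.singleton("etl"));
        System.out.println(checker.check(info)); // true
    }
}

// Hypothetical coordinator-side checker that decides access from a custom property.
class QueueAccessChecker {
    private final Set<String> allowedQueues;

    QueueAccessChecker(Set<String> allowedQueues) {
        this.allowedQueues = new HashSet<>(allowedQueues);
    }

    boolean check(AccessInfo info) {
        return allowedQueues.contains(info.getExtraProperties().get("queue"));
    }
}
```

Keeping the extra data as a plain string map lets new checkers read new keys without changing the client-to-coordinator message each time.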
The current blockId is designed as follows:
// BlockId is a long composed of an AtomicInteger sequence number, partitionId and taskAttemptId
// AtomicInteger is the first 19 bits, max value is 2^19 - 1
// partitionId is the next 24 bits, max value is 2^24 - 1
// taskAttemptId is the remaining 20 bits, max value is 2^20 - 1
Why do we need blockId?
It's designed for data checking, filtering, reading in-memory data, etc.
Why is blockId designed as above?
BlockIds are stored in the shuffle server, and Roaringbitmap is used to cache them to reduce memory cost.
Given how Roaringbitmap is implemented, the blockId layout aims to use BitmapContainer instead of ArrayContainer to save memory.
What's the problem with blockId?
It can't support a taskAttemptId greater than 2^20 - 1.
Proposal
I think 19 bits is too many for the AtomicInteger, and we can give some of them to taskAttemptId.
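The layout can be sketched as follows, assuming the sequence number occupies the highest bits, then partitionId, then taskAttemptId in the lowest bits (the order implied by "first", "next", and "remaining" above). BlockIdLayout is a hypothetical helper; the 18/24/21 split in main() only illustrates the proposal and is not a decided design.

```java
// Sketch of [sequenceNo | partitionId | taskAttemptId] packed into the low 63 bits of a long.
class BlockIdLayout {
    final int sequenceBits;
    final int partitionBits;
    final int taskAttemptBits;

    BlockIdLayout(int sequenceBits, int partitionBits, int taskAttemptBits) {
        if (sequenceBits + partitionBits + taskAttemptBits != 63) {
            throw new IllegalArgumentException("bit widths must sum to 63 so the id stays non-negative");
        }
        this.sequenceBits = sequenceBits;
        this.partitionBits = partitionBits;
        this.taskAttemptBits = taskAttemptBits;
    }

    static long mask(int bits) {
        return (1L << bits) - 1;
    }

    private static void checkRange(String name, long value, int bits) {
        if (value < 0 || value > mask(bits)) {
            throw new IllegalArgumentException(name + " " + value + " does not fit in " + bits + " bits");
        }
    }

    long encode(long sequenceNo, long partitionId, long taskAttemptId) {
        checkRange("sequenceNo", sequenceNo, sequenceBits);
        checkRange("partitionId", partitionId, partitionBits);
        checkRange("taskAttemptId", taskAttemptId, taskAttemptBits);
        return (sequenceNo << (partitionBits + taskAttemptBits))
            | (partitionId << taskAttemptBits)
            | taskAttemptId;
    }

    long sequenceNo(long blockId) { return blockId >>> (partitionBits + taskAttemptBits); }
    long partitionId(long blockId) { return (blockId >>> taskAttemptBits) & mask(partitionBits); }
    long taskAttemptId(long blockId) { return blockId & mask(taskAttemptBits); }

    public static void main(String[] args) {
        BlockIdLayout current = new BlockIdLayout(19, 24, 20);
        BlockIdLayout proposed = new BlockIdLayout(18, 24, 21); // hypothetical split
        long bigTaskAttemptId = 1L << 20; // one past the current limit of 2^20 - 1
        long id = proposed.encode(7, 123, bigTaskAttemptId);
        System.out.println(proposed.taskAttemptId(id)); // 1048576
        try {
            current.encode(7, 123, bigTaskAttemptId);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // taskAttemptId 1048576 does not fit in 20 bits
        }
    }
}
```

Moving one bit from the sequence number to taskAttemptId halves the number of blocks a task attempt can emit per partition (2^18 instead of 2^19) while doubling the supported taskAttemptId range.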
Push mode would be more convenient and more real-time. We have implemented it; if needed, I will create a PR.
When Uniffle shuffle servers are co-located with YARN NodeManagers, we could use short-circuit reads to improve performance and reduce overhead.
There are two options to solve this:
Option 1: This is the fastest way, but read permission on the local files must be opened to the client side, which may cause security problems.
Option 2: This avoids the security problem, but is slower than option 1, and its implementation will be more complex.
In my opinion, both options could be supported in Uniffle as different policies for users to choose from.
The current load-balancing strategy only considers available memory. Should we add another strategy based on disk performance?
When I tested Hudi, I got an error.
This is the Spark driver log ("ERROR: Empty assignment to Shuffle Server"):
52278 [dag-scheduler-event-loop] INFO org.apache.spark.shuffle.RssShuffleManager - Generate application id used in rss: spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844
52281 [dag-scheduler-event-loop] ERROR com.tencent.rss.client.impl.ShuffleWriteClientImpl - Empty assignment to Shuffle Server
52282 [dag-scheduler-event-loop] ERROR com.tencent.rss.client.impl.ShuffleWriteClientImpl - Error happened when getShuffleAssignments with appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[6], numMaps[0], partitionNumPerRange[1] to coordinator
52283 [dag-scheduler-event-loop] WARN org.apache.spark.scheduler.DAGScheduler - Creating new stage failed due to exception - job: 5
com.tencent.rss.common.exception.RssException: Error happened when getShuffleAssignments with appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[6], numMaps[0], partitionNumPerRange[1] to coordinator
at com.tencent.rss.client.impl.ShuffleWriteClientImpl.throwExceptionIfNecessary(ShuffleWriteClientImpl.java:440)
at com.tencent.rss.client.impl.ShuffleWriteClientImpl.getShuffleAssignments(ShuffleWriteClientImpl.java:291)
at org.apache.spark.shuffle.RssShuffleManager.registerShuffle(RssShuffleManager.java:247)
at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:97)
at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:87)
at org.apache.spark.rdd.RDD.$anonfun$dependencies$2(RDD.scala:264)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.dependencies(RDD.scala:260)
at org.apache.spark.scheduler.DAGScheduler.getShuffleDependenciesAndResourceProfiles(DAGScheduler.scala:634)
at org.apache.spark.scheduler.DAGScheduler.getMissingAncestorShuffleDependencies(DAGScheduler.scala:597)
at org.apache.spark.scheduler.DAGScheduler.getOrCreateShuffleMapStage(DAGScheduler.scala:394)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getOrCreateParentStages$1(DAGScheduler.scala:580)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.mutable.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:48)
at scala.collection.SetLike.map(SetLike.scala:104)
at scala.collection.SetLike.map$(SetLike.scala:104)
at scala.collection.mutable.AbstractSet.map(Set.scala:48)
at org.apache.spark.scheduler.DAGScheduler.getOrCreateParentStages(DAGScheduler.scala:579)
at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:564)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1115)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2396)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2388)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2377)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
52287 [main] INFO org.apache.spark.scheduler.DAGScheduler - Job 5 failed: countByKey at BaseSparkCommitActionExecutor.java:191, took 0.076660 s
This is the coordinator log; the requested partitionNum is 0:
[INFO] 2022-08-11 11:29:49,335 Grpc-301 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[6], partitionNum[0], partitionNumPerRange[1], replica[1]
Full log:
[INFO] 2022-08-11 11:29:26,946 Grpc-267 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[0], partitionNum[200], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:26,946 Grpc-267 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:29:26,946 Grpc-267 CoordinatorGrpcService logAssignmentResult - Shuffle Servers of assignment for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[0] are [10.1.3.174-19990, 10.1.3.175-19990, 10.1.3.173-19990]
[INFO] 2022-08-11 11:29:27,033 Grpc-270 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[1], partitionNum[200], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:27,033 Grpc-270 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:29:27,034 Grpc-270 CoordinatorGrpcService logAssignmentResult - Shuffle Servers of assignment for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[1] are [10.1.3.174-19990, 10.1.3.175-19990, 10.1.3.173-19990]
[INFO] 2022-08-11 11:29:37,957 ApplicationManager-0 ApplicationManager statusCheck - Start to check status for 2 applications
[INFO] 2022-08-11 11:29:43,047 Grpc-283 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[2], partitionNum[200], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:43,048 Grpc-283 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:29:43,048 Grpc-283 CoordinatorGrpcService logAssignmentResult - Shuffle Servers of assignment for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[2] are [10.1.3.174-19990, 10.1.3.175-19990, 10.1.3.173-19990]
[INFO] 2022-08-11 11:29:49,165 Grpc-293 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[3], partitionNum[200], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:49,166 Grpc-293 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:29:49,166 Grpc-293 CoordinatorGrpcService logAssignmentResult - Shuffle Servers of assignment for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[3] are [10.1.3.174-19990, 10.1.3.175-19990, 10.1.3.173-19990]
[INFO] 2022-08-11 11:29:49,247 Grpc-298 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[4], partitionNum[200], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:49,247 Grpc-298 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:29:49,247 Grpc-298 CoordinatorGrpcService logAssignmentResult - Shuffle Servers of assignment for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[4] are [10.1.3.174-19990, 10.1.3.175-19990, 10.1.3.173-19990]
[INFO] 2022-08-11 11:29:49,267 Grpc-297 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[5], partitionNum[200], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:49,267 Grpc-297 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:29:49,268 Grpc-297 CoordinatorGrpcService logAssignmentResult - Shuffle Servers of assignment for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[5] are [10.1.3.174-19990, 10.1.3.175-19990, 10.1.3.173-19990]
[INFO] 2022-08-11 11:29:49,335 Grpc-301 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[6], partitionNum[0], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:49,335 Grpc-301 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:30:07,957 ApplicationManager-0 ApplicationManager statusCheck - Start to check status for 2 applications
[INFO] 2022-08-11 11:30:07,957 ApplicationManager-0 ApplicationManager statusCheck - Remove expired application:spark-d7f3e51ca713472e88568db90c91bdea1660187027133
Environment:
uniffle: firestorm 0.4.1
spark: 3.1.2
hudi: 0.10.0
k8s: v1.21.3
ShuffleUploader is an experimental feature that merges and uploads data from local disk to remote storage when the local disk doesn't have enough space.
Currently, MEMORY_LOCAL_HDFS is introduced as the common solution for shuffle data storage.
ShuffleUploader is no longer needed, and removing it saves us from maintaining it in the future.
We have 1.2 TB of memory per host, so we need to deploy multiple shuffle servers on a single node.
But the current partition assignment policy is not suitable for this, because a partition may be assigned to servers on the same host, and the client may fail if that host goes down.
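A host-aware selection can be sketched as follows, assuming server ids take the "host-port" form seen in the coordinator log above (for example 10.1.3.174-19990) and that candidates arrive in the assignment strategy's preference order. HostAwareSelector is a hypothetical helper, not part of Uniffle's assignment strategies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: choose servers for one partition on distinct hosts.
class HostAwareSelector {
    // Server ids are assumed to be "host-port"; everything before the last '-' is the host.
    static String hostOf(String serverId) {
        int idx = serverId.lastIndexOf('-');
        return idx < 0 ? serverId : serverId.substring(0, idx);
    }

    // Walks candidates in preference order and skips any server whose host is already used.
    static List<String> pick(List<String> candidates, int count) {
        Set<String> usedHosts = new HashSet<>();
        List<String> picked = new ArrayList<>();
        for (String server : candidates) {
            if (picked.size() == count) {
                break;
            }
            if (usedHosts.add(hostOf(server))) {
                picked.add(server);
            }
        }
        return picked;
    }

    public static void main(String[] args) {
        List<String> candidates = Arrays.asList("10.1.3.174-19990", "10.1.3.174-19991", "10.1.3.175-19990");
        System.out.println(pick(candidates, 2)); // [10.1.3.174-19990, 10.1.3.175-19990]
    }
}
```

If fewer distinct hosts exist than requested, the sketch returns fewer servers rather than placing two on one host; whether to fall back to same-host placement is a policy decision left open.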
Currently the assignment number is specified by the coordinator's rss.coordinator.shuffle.nodes.max, but I don't think that suits all Spark jobs.
We should introduce a new config that lets the client specify the number of assigned servers. rss.coordinator.shuffle.nodes.max should then act as the upper limit for the client-requested number.
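The intended semantics fit in a few lines, assuming a non-positive client value means "not set". The client-side setting and the helper name are hypothetical; only rss.coordinator.shuffle.nodes.max comes from the proposal.

```java
// Hypothetical: a client-requested server count, capped by rss.coordinator.shuffle.nodes.max.
class AssignmentCount {
    static int effectiveServerCount(int clientRequested, int coordinatorMax) {
        if (clientRequested <= 0) {
            return coordinatorMax; // client did not ask for a specific number
        }
        return Math.min(clientRequested, coordinatorMax);
    }

    public static void main(String[] args) {
        System.out.println(effectiveServerCount(3, 10));  // 3
        System.out.println(effectiveServerCount(20, 10)); // 10
        System.out.println(effectiveServerCount(0, 10));  // 10
    }
}
```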
When we use AQE, we need to use mapId to filter out data we don't need. We split the data into segments, and if a segment contains none of the data we want to read, we drop it. If we sort the data by mapId before it is flushed, we can filter out more data and improve performance.
We have multiple disks per host. If the shuffle server exits abnormally, many files need to be cleaned up when it starts again, and this operation takes a lot of time.
Killing the process is not graceful, so we need the shuffle server to support decommissioning.
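Why sorting helps can be shown with a small count, assuming a simplified model in which a data file is a sequence of blocks tagged with their mapId, read in fixed-size segments, and a segment is skipped when none of its blocks belongs to a wanted mapId. SegmentFilterDemo is a hypothetical illustration, not Uniffle's segment reader.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;

// Hypothetical model: count the segments that must be read for a set of wanted mapIds.
class SegmentFilterDemo {
    static int segmentsToRead(long[] blockMapIds, int segmentSize, Set<Long> wantedMapIds) {
        int count = 0;
        for (int start = 0; start < blockMapIds.length; start += segmentSize) {
            int end = Math.min(start + segmentSize, blockMapIds.length);
            for (int i = start; i < end; i++) {
                if (wantedMapIds.contains(blockMapIds[i])) {
                    count++; // at least one wanted block: the whole segment must be read
                    break;
                }
            }
        }
        return count;
    }

    public static void main(String[] args) {
        long[] arrivalOrder = {0, 1, 2, 3, 0, 1, 2, 3};
        long[] sortedByMapId = arrivalOrder.clone();
        Arrays.sort(sortedByMapId);
        Set<Long> wanted = Collections.singleton(0L);
        System.out.println(segmentsToRead(arrivalOrder, 2, wanted));  // 2
        System.out.println(segmentsToRead(sortedByMapId, 2, wanted)); // 1
    }
}
```

When blocks arrive interleaved, the wanted mapId is spread over many segments; sorting clusters it, so more segments can be dropped without reading.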
"master" may have some unfriendly meanings, as an alternative, many new projects have leveraged "main".
When HealthCheck is turned on, should we add a new metric for unhealthy nodes? WDYT? @jerqi
When running mvn clean install on Apple Silicon, HealthCheckCoordinatorGrpcTest failed as follows:
[INFO] Running org.apache.uniffle.test.HealthCheckCoordinatorGrpcTest
[ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 71.009 s <<< FAILURE! - in org.apache.uniffle.test.HealthCheckCoordinatorGrpcTest
[ERROR] healthCheckTest Time elapsed: 70.285 s <<< FAILURE!
org.opentest4j.AssertionFailedError
at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:35)
at org.junit.jupiter.api.Assertions.fail(Assertions.java:115)
at org.apache.uniffle.test.HealthCheckCoordinatorGrpcTest.healthCheckTest(HealthCheckCoordinatorGrpcTest.java:143)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
at java.util.ArrayList.forEach(ArrayList.java:1259)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
at java.util.ArrayList.forEach(ArrayList.java:1259)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:150)
at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:124)
at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
Environment:
os: Big Sur 11.4
cpu: Apple M1
java: zulu 1.8.0_322
maven: 3.8.6
In org.apache.uniffle.server.ShuffleFlushManager#processPendingEvents, OOM will happen if a large number of events need to be dropped, because usedMemory is released immediately but GC cannot reclaim the memory fast enough.
When running mvn clean install, RssShuffleUtilsTest failed as follows:
[INFO] Running org.apache.uniffle.common.RssShuffleUtilsTest
[ERROR] Tests run: 7, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.356 s <<< FAILURE! - in org.apache.uniffle.common.RssShuffleUtilsTest
[ERROR] testDestroyDirectByteBuffer Time elapsed: 0.222 s <<< FAILURE!
org.opentest4j.AssertionFailedError: expected: <true> but was: <false>
at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40)
at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:35)
at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:179)
at org.apache.uniffle.common.RssShuffleUtilsTest.testDestroyDirectByteBuffer(RssShuffleUtilsTest.java:72)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
at java.util.ArrayList.forEach(ArrayList.java:1259)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
at java.util.ArrayList.forEach(ArrayList.java:1259)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:150)
at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:124)
at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
Environment:
os: Big Sur 11.4
cpu: Apple M1
java: zulu 1.8.0_322
maven: 3.8.6
When DelegationRssShuffleManager sends an AccessClusterRequest to the Coordinator, should we add logic to resend the request when it times out?
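A minimal sketch of resend-on-timeout, assuming the access request can be wrapped in a `Callable`. `TimeoutRetry` and `callWithRetry` are hypothetical names, not Uniffle APIs. Each attempt gets its own deadline, and a slow attempt is cancelled before the request is sent again.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: resends a request when a single attempt exceeds its timeout.
final class TimeoutRetry {
  private TimeoutRetry() {}

  static <T> T callWithRetry(Callable<T> request, long timeoutMs, int maxAttempts)
      throws Exception {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    // A cached pool so a cancelled, still-running attempt never blocks the next one.
    ExecutorService executor = Executors.newCachedThreadPool();
    try {
      TimeoutException lastTimeout = null;
      for (int attempt = 1; attempt <= maxAttempts; attempt++) {
        Future<T> future = executor.submit(request);
        try {
          // Wait at most timeoutMs for this attempt. Failures other than a
          // timeout propagate as ExecutionException and are not retried.
          return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
          future.cancel(true); // interrupt the slow attempt, then resend
          lastTimeout = e;
        }
      }
      throw lastTimeout;
    } finally {
      executor.shutdownNow();
    }
  }
}
```

In practice the timeout and attempt count would presumably come from client configuration; which keys to use is left open here.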
When we use gRPC, we find that gRPC itself is our bottleneck: it adds the cost of data copying and serialization, and we inevitably run into GC problems. We should replace gRPC with Netty for data transfer and use off-heap memory to reduce GC time.
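A minimal illustration of the off-heap idea using only the JDK, not Netty. The block payload is copied into a direct `ByteBuffer`, so its bytes live outside the Java heap that the garbage collector scans. `OffHeapBlock` is a hypothetical name; Netty, for example, can read network data straight into pooled direct buffers without the intermediate heap copy shown here.

```java
import java.nio.ByteBuffer;

// Illustration only: keeps a block's payload in a direct (off-heap) buffer.
final class OffHeapBlock {
  private final ByteBuffer buffer;

  OffHeapBlock(byte[] data) {
    // allocateDirect reserves memory outside the Java heap.
    buffer = ByteBuffer.allocateDirect(data.length);
    buffer.put(data);
    buffer.flip(); // switch the buffer from writing to reading
  }

  int length() {
    return buffer.remaining();
  }

  boolean isOffHeap() {
    return buffer.isDirect();
  }

  // Copies the payload back onto the heap, e.g. for verification; a transport
  // would normally hand the direct buffer to the socket instead.
  byte[] toArray() {
    ByteBuffer view = buffer.duplicate(); // independent position, same memory
    byte[] out = new byte[view.remaining()];
    view.get(out);
    return out;
  }
}
```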
It would be good to have a Kubernetes operator to help us deploy Uniffle on Kubernetes and automate our system operations. This issue tracks the pull requests for the Kubernetes operator.
If registerShuffleServers fails, the task fails, and then the whole application fails.
The process doesn't exit when the start script is executed via Ansible, so we can't use it for batch start operations.
Currently, Uniffle uses Hadoop 2.8.5 by default.
When building with ./build_distribution.sh --spark2-profile 'spark2' --spark3-mvn '-Dspark.version=2.4.3' --spark3-profile 'spark3' --spark3-mvn '-Dspark.version=3.1.1' -Dhadoop.version=2.6.0, the build fails because some methods and variables are not supported in Hadoop 2.6.0.
The incompatible parameters and methods are as follows:
- CallContext, introduced in 2.8.0
- MRJobConfig.DEFAULT_SHUFFLE_MERGE_PERCENT, introduced in 2.8.0
- MRApps.getSystemPropertiesToLog, introduced in 2.8.0
I think we could use reflection to stay compatible with lower Hadoop versions.
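A minimal sketch of the reflection approach, assuming the code only needs to read a static field, call a static method, or test whether a class exists. `HadoopCompat` and its methods are hypothetical names. Because classes and members are looked up by name, the code compiles against any Hadoop version, and a missing member yields a caller-supplied fallback at runtime.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;

// Hypothetical compatibility helper for running against older Hadoop versions.
final class HadoopCompat {
  private HadoopCompat() {}

  // Returns a public static field's value, or the fallback if the class or
  // field is missing from the Hadoop version on the classpath.
  static Object staticFieldOrDefault(String className, String fieldName, Object fallback) {
    try {
      Field field = Class.forName(className).getField(fieldName);
      return field.get(null);
    } catch (ReflectiveOperationException e) {
      return fallback;
    }
  }

  // Invokes a public static method by name. Any reflective failure, including
  // an exception thrown by the method itself, yields the fallback.
  static Object invokeStaticOrDefault(
      String className, String methodName, Class<?>[] paramTypes, Object[] args, Object fallback) {
    try {
      Method method = Class.forName(className).getMethod(methodName, paramTypes);
      return method.invoke(null, args);
    } catch (ReflectiveOperationException e) {
      return fallback;
    }
  }

  // True if the class (e.g. one added in Hadoop 2.8.0) is on the classpath.
  static boolean classExists(String className) {
    try {
      Class.forName(className);
      return true;
    } catch (ClassNotFoundException e) {
      return false;
    }
  }
}
```

For example, `HadoopCompat.staticFieldOrDefault("org.apache.hadoop.mapreduce.MRJobConfig", "DEFAULT_SHUFFLE_MERGE_PERCENT", fallback)` would read the constant on 2.8.0+ and return `fallback` on 2.6.0; choosing the fallback value is up to the caller.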
The retry mechanism was introduced in #127. However, some access checkers, such as the candidates checker, don't need retries, while the health checker may.
So I think we need to introduce a NON_TRANSIENT_ACCESS_DENIED status so that those checkers can skip retries and save time.
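A minimal sketch of how the client could honor such a status, assuming the access result is modeled as an enum. Only `NON_TRANSIENT_ACCESS_DENIED` comes from the proposal; `AccessStatus`, its other values, and `AccessRetry` are hypothetical. Transient denials are retried with a fixed backoff, while a non-transient denial returns immediately.

```java
import java.util.function.Supplier;

// Hypothetical access results.
enum AccessStatus {
  SUCCESS,
  ACCESS_DENIED,              // transient, e.g. a health check that may pass later
  NON_TRANSIENT_ACCESS_DENIED // e.g. the candidates checker: retrying won't help
}

final class AccessRetry {
  private AccessRetry() {}

  // Makes at most maxAttempts calls, retrying only transient denials.
  // Returns the last status observed.
  static AccessStatus accessWithRetry(
      Supplier<AccessStatus> request, int maxAttempts, long backoffMs)
      throws InterruptedException {
    AccessStatus status = request.get();
    for (int attempt = 1;
        attempt < maxAttempts && status == AccessStatus.ACCESS_DENIED;
        attempt++) {
      Thread.sleep(backoffMs); // wait before the next attempt
      status = request.get();
    }
    return status;
  }
}
```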
It's hard to tell which stage of a remote request costs the most time in the current codebase, because the client and server logs share no common trace id.
Maybe we could introduce a unique trace id for each remote request, generated from the current time and the client machine id, and record it in the client's log. When requesting the remote server, the client should propagate this id so that it is also recorded in the server's log.
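A minimal sketch of such a generator, assuming the client machine id is a string such as a host name. `TraceIdGenerator` and the `<clientId>-<epochMillis>-<sequence>` format are hypothetical. A per-client sequence number is appended so two requests issued in the same millisecond still get distinct ids. How the id travels to the server (a request field, gRPC metadata, etc.) is left open.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical trace id generator: "<clientId>-<epochMillis>-<sequence>".
final class TraceIdGenerator {
  private final String clientId;
  private final AtomicLong sequence = new AtomicLong();

  TraceIdGenerator(String clientId) {
    this.clientId = clientId;
  }

  // Thread-safe: the atomic counter keeps ids unique within this client even
  // when several requests share the same timestamp.
  String next() {
    return clientId + "-" + System.currentTimeMillis() + "-" + sequence.incrementAndGet();
  }
}
```

The client would log the id before sending, attach it to the request, and the server would include the received id in its own log lines for that request.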
Currently, every commit call in sendCommit must succeed, so if a single shuffle server dies, the whole application fails.
In RssShuffleManager, the workQueue of the threadPoolExecutor is currently unbounded. If sendShuffleData is not fast enough, the queued tasks can consume a lot of memory.
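A minimal sketch of a bounded alternative using only standard `java.util.concurrent` classes. `BoundedSendPool` is a hypothetical name, and choosing `CallerRunsPolicy` is an assumption: when the queue is full, the submitting thread runs the task itself, which throttles the producer instead of letting pending sends grow without bound.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch: a send pool whose queue holds at most queueCapacity pending tasks.
final class BoundedSendPool {
  private BoundedSendPool() {}

  static ThreadPoolExecutor create(int threads, int queueCapacity) {
    return new ThreadPoolExecutor(
        threads,                                   // core pool size
        threads,                                   // maximum pool size
        60L, TimeUnit.SECONDS,                     // idle keep-alive (non-core threads)
        new ArrayBlockingQueue<>(queueCapacity),   // bounded work queue
        new ThreadPoolExecutor.CallerRunsPolicy()  // back-pressure when saturated
    );
  }
}
```

An alternative design is to block the producer on a semaphore sized to the in-flight byte budget, which bounds memory by data size rather than task count.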
At present, AE (adaptive execution) is not supported with Spark 2.4. Should it be supported, or is there a reason it isn't?
Currently, each executor uses only a single TCP connection to a given shuffle server, so concurrently running tasks all share that channel, which may reduce overall throughput.
Do we have any plan to introduce an extra config that lets users create more channels on the client side?
We should probably run some performance tests to prove that this improvement is effective. The results will be included in this ticket.
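A minimal sketch of a client-side channel pool, assuming a hypothetical config value `channelsPerServer` and a factory that opens one connection. `ChannelPool` is a hypothetical name, and `C` stands for whatever channel type the client uses (for gRPC Java, a `ManagedChannel`). Connections are handed out round-robin so concurrent tasks spread across several channels.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical pool of channelsPerServer connections to one shuffle server.
final class ChannelPool<C> {
  private final List<C> channels;
  private final AtomicInteger next = new AtomicInteger();

  ChannelPool(int channelsPerServer, Supplier<C> channelFactory) {
    if (channelsPerServer < 1) {
      throw new IllegalArgumentException("channelsPerServer must be >= 1");
    }
    List<C> created = new ArrayList<>(channelsPerServer);
    for (int i = 0; i < channelsPerServer; i++) {
      created.add(channelFactory.get()); // open each connection eagerly
    }
    channels = Collections.unmodifiableList(created);
  }

  // Round-robin selection; floorMod keeps the index valid after int overflow.
  C pick() {
    int index = Math.floorMod(next.getAndIncrement(), channels.size());
    return channels.get(index);
  }

  int size() {
    return channels.size();
  }
}
```

With `channelsPerServer = 1` this degenerates to today's single shared channel, which makes it easy to compare both settings in the performance test.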
Now, there are many style errors in our test code, because our checkstyle plugin doesn't check test code. Before enabling the check for test code, we need to fix those errors first. Tencent/Firestorm#155 shows how to enable the plugin to check test code style. We should enable the plugin in one branch, fix the style errors module by module,
cherry-pick the fixes into another branch, and raise a fix PR.
This issue will track the pull requests.
Related PRs:
#99 fix common module
#122 fix coordinator module
#131 fix storage module
We use Spark local mode for integration tests, but it can't cover some cases, such as Tencent/Firestorm#187, so we should add an integration test for Spark running on MiniYARNCluster.
The stop script waits too long, even when the JVM has already exited.
When shuffle servers that need to be decommissioned are listed in the exclude-node files, we'd like each of them to exit on its own once all of its applications have finished.
Draft PR: #85
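A minimal sketch of the waiting step, assuming the server can report how many applications it is still serving. `DecommissionWaiter` and the `IntSupplier` hook are hypothetical. After the server sees itself in the exclude-node list, it keeps serving and polls until no applications remain; the caller would then stop the server and exit the process.

```java
import java.util.function.IntSupplier;

// Hypothetical decommission helper: waits until no applications are running.
final class DecommissionWaiter {
  private DecommissionWaiter() {}

  // Blocks until runningApps reports zero, checking every pollIntervalMs.
  // Returns the number of checks performed, including the final one.
  static int awaitNoRunningApps(IntSupplier runningApps, long pollIntervalMs)
      throws InterruptedException {
    int checks = 1;
    while (runningApps.getAsInt() > 0) {
      Thread.sleep(pollIntervalMs);
      checks++;
    }
    // The caller would now shut the server down (hypothetically, stop it
    // and then call System.exit(0)).
    return checks;
  }
}
```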