memverge / splash
Splash, a flexible Spark shuffle manager that supports user-defined storage backends for shuffle data storage and exchange.
License: Apache License 2.0
The output of the shuffle performance tool is not precise enough. For example, 1.23GB is displayed as just 1GB. The output should keep at least two decimal digits for the purpose of tuning and performance comparison.
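A minimal sketch of the fix, assuming a hypothetical `SizeFormatter` helper (not the actual tool's code): keep two decimal places when scaling a byte count to human-readable units.

```java
import java.util.Locale;

public class SizeFormatter {
    private static final String[] UNITS = {"B", "KB", "MB", "GB", "TB"};

    // Format a byte count with two decimal places instead of truncating
    // to whole units, so 1.23GB no longer shows up as 1GB.
    public static String format(long bytes) {
        double value = bytes;
        int unit = 0;
        while (value >= 1024 && unit < UNITS.length - 1) {
            value /= 1024;
            unit++;
        }
        return String.format(Locale.ROOT, "%.2f%s", value, UNITS[unit]);
    }

    public static void main(String[] args) {
        System.out.println(format(1320702444L)); // -> 1.23GB
        System.out.println(format(1048576L));    // -> 1.00MB
    }
}
```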
Print the package version information in the log to make it easier for the operator to track the jar version.
Investigate a solution to improve the performance of merge sort in the external sorter.
A NoSuchElementException is raised when the Splash shuffle manager tries to stop itself after the application has failed to start successfully.
We have discovered some places where the IO buffer is not used efficiently in the unsafe code path, and some places where we could reduce the number of memory copies. Here are the details:

In SplashUnsafeSorter.writeSortedFile, we used a writerBuffer to hold the serialized data and then wrote the content of this buffer to the output stream. To avoid the second copy during the write, we create our own SplashBufferedOutputStream, which exposes its internal buffer so that the serializer can fill in the serialized data directly. This also saves the memory used by the original writerBuffer and improves the testability of the buffer mechanism. Unit tests are added for SplashBufferedOutputStream to make sure we manage the buffer correctly.

We replace IOUtils.copy with SplashUtils.copy. This function borrows most of its code from IOUtils.copy; the only difference is that it allows the user to specify the size of the buffer. In previous tests, we identified some 4K IO requests issued by IOUtils.copy, which uses a fixed 4K IO buffer. That is neither efficient nor elastic in a shared or distributed file system. This buffer now shares the same Spark configuration, spark.shuffle.file.buffer. What's more, since we already have this IO buffer, we can use InputStream and OutputStream directly instead of the buffered versions, which saves more memory. Since the copy procedure is executed in a single thread, we can safely reuse the same buffer throughout the copy, which reduces GC time.

Separately, consider this scenario: a previous task failed without cleaning up its temp file, and the re-run task creates its temp file at the same location. The creation fails with a FileAlreadyExistsException. We need to delete the leftover file and retry the creation in this scenario.
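The configurable-buffer copy described above can be sketched as follows. This is an illustration of the technique, not the actual SplashUtils.copy source; the buffer size would come from spark.shuffle.file.buffer in the real code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class CopyExample {
    // Copy with a caller-supplied buffer size instead of a fixed 4K one,
    // reusing a single buffer for the whole copy since the copy runs in
    // one thread.
    static long copy(InputStream in, OutputStream out, int bufferSize)
            throws IOException {
        byte[] buffer = new byte[bufferSize];
        long total = 0;
        int n;
        while ((n = in.read(buffer)) != -1) {
            out.write(buffer, 0, n);
            total += n;
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = new byte[300_000];
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // e.g. honor spark.shuffle.file.buffer (32k by default in Spark)
        long copied = copy(new ByteArrayInputStream(data), out, 32 * 1024);
        System.out.println(copied); // -> 300000
    }
}
```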
We use InputStream.available() to retrieve the partition size. The implementation of LimitedInputStream.available always checks the available size of the wrapped stream. This behavior causes extra calls to the getSize function.
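A possible fix, sketched below under the assumption that the limit is tracked locally (this is not the actual Splash/LimitedInputStream code): have available() answer from the tracked remaining count instead of delegating to the wrapped stream, so no extra getSize-style call is made.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

// Sketch: a limited stream whose available() answers from the tracked
// remaining limit rather than querying the underlying stream.
class CountedLimitedInputStream extends FilterInputStream {
    private long left;

    CountedLimitedInputStream(InputStream in, long limit) {
        super(in);
        this.left = limit;
    }

    @Override
    public int read() throws IOException {
        if (left <= 0) return -1;
        int b = super.read();
        if (b != -1) left--;
        return b;
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (left <= 0) return -1;
        int n = super.read(b, off, (int) Math.min(len, left));
        if (n != -1) left -= n;
        return n;
    }

    @Override
    public int available() {
        // No call into the underlying stream here.
        return (int) Math.min(left, Integer.MAX_VALUE);
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new CountedLimitedInputStream(
                new ByteArrayInputStream(new byte[100]), 10);
        System.out.println(in.available()); // -> 10
        in.read();
        System.out.println(in.available()); // -> 9
    }
}
```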
Hello,
I have built and set up splash and spark according to the documentation. But when I try to run any spark application I am getting the following error message:
java.lang.ClassNotFoundException: org.apache.spark.shuffle.SplashShuffleManager
This is my spark-defaults.conf file:
spark.files jars/splash-0.7.0.jar,jars/splash-0.7.0-sources.jar,jars/splash-0.7.0-shaded.jar,jars/splash-0.7.0-javadoc.jar
spark.driver.extraClassPath ./splash-0.7.0.jar:./splash-0.7.0-sources.jar:./splash-0.7.0-shaded.jar:./splash-0.7.0-javadoc.jar
spark.executor.extraClassPath jars/splash-0.7.0.jar,jars/splash-0.7.0-sources.jar,jars/splash-0.7.0-shaded.jar,jars/splash-0.7.0-javadoc.jar
#spark.shuffle.manager org.apache.spark.shuffle.SplashShuffleManager
spark.shuffle.manager org.apache.spark.shuffle.SplashShuffleManager
spark.shuffle.splash.storageFactory com.memverge.splash.shared.SharedFSFactory
spark.shuffle.splash.clearShuffleOutput false
spark.shuffle.splash.folder /home/ruroy/spark/cache
If you could provide any hint to solve this issue that will be very helpful.
Add a deploy script to compile and deploy the Splash artifact for different combinations of the Spark and Hadoop versions.
Splash depends on Spark core, and Spark core depends on Hadoop, so we need to compile different binaries for different combinations of these versions. Adding a script would help us manage those artifacts better.
When we read data from the underlying storage, we met a stream corruption issue, so we turned the log level to DEBUG to let Splash compute an MD5 for each partition during the read phase.
However, we don't have the corresponding MD5 info from the write phase to check against.
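Computing the write-phase MD5 can be done as a side effect of writing, with no second pass over the data, by wrapping the partition output stream in a DigestOutputStream. This is a sketch of the approach, not the actual Splash write path:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class WriteDigestExample {
    public static void main(String[] args)
            throws IOException, NoSuchAlgorithmException {
        MessageDigest md5 = MessageDigest.getInstance("MD5");
        OutputStream sink = new ByteArrayOutputStream();
        // The digest is updated transparently as bytes are written.
        try (DigestOutputStream out = new DigestOutputStream(sink, md5)) {
            out.write("partition-0 payload".getBytes("UTF-8"));
        }
        StringBuilder hex = new StringBuilder();
        for (byte b : md5.digest()) {
            hex.append(String.format("%02x", b));
        }
        // Log this at DEBUG alongside the partition id, then compare it
        // with the MD5 computed during the read phase.
        System.out.println(hex);
    }
}
```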
Create a shuffle performance tool which allows the user to quickly check the performance of the storage factory without starting a Spark application.
Allow the plugin developer to implement a fast merge which will be invoked when the spark.shuffle.unsafe.fastMergeEnabled option is true. Note that the performance of this fast merge function could seriously impact the performance of Spark SQL joins with multiple spills.
Test data shows that when we have 40K partitions (200 mappers & 200 reducers) and small partition size (around 256K), the shuffle manager took almost half the total time retrieving the index file.
Reading 40000 partitions with 8 threads 99% (39991/40000)
Read shuffle data completed in 19049 milliseconds
Reading index file: 10187 ms
storage factory: com.memverge.splash.shared.SharedFSFactory
number of mappers: 200
number of reducers: 200
total shuffle size: 3GB
bytes written: 3GB
bytes read: 3GB
number of blocks: 64
blocks size: 256KB
partition size: 81KB
concurrent tasks: 8
bandwidth: 167MB/s
Since shuffle indices are relatively small compared to the shuffle data, we could do our best to cache them in memory to eliminate this overhead.
For example, in the configuration described above, each node would only cache about:
8 * 201 * 200 = 320K
where 8 is the size of a long in bytes, 201 is the number of partition offsets per index file (number of reducers + 1), and 200 is the number of index files.
When memory is not enough, we should fall back to retrieving the data from the file system.
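The proposed cache-with-fallback can be sketched as below. The class and method names are assumptions for illustration; the real design would also need an eviction policy for the memory-pressure case.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Sketch: keep partition offsets (long[]) per index file in memory,
// falling back to a storage read on a cache miss.
public class IndexCacheSketch {
    private final Map<String, long[]> cache = new ConcurrentHashMap<>();
    private final Function<String, long[]> readFromStorage;

    IndexCacheSketch(Function<String, long[]> readFromStorage) {
        this.readFromStorage = readFromStorage;
    }

    long[] offsets(String indexFile) {
        // computeIfAbsent reads from the file system only on a miss.
        return cache.computeIfAbsent(indexFile, readFromStorage);
    }

    public static void main(String[] args) {
        int[] reads = {0};
        IndexCacheSketch cache = new IndexCacheSketch(f -> {
            reads[0]++; // simulate one storage round trip
            return new long[]{0, 81_000, 162_000};
        });
        cache.offsets("shuffle_0_0_0.index");
        cache.offsets("shuffle_0_0_0.index"); // served from memory
        System.out.println(reads[0]); // -> 1
    }
}
```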
Unit tests report a lot of warning logs such as:
o.a.s.m.ExecutionMemoryPool : Internal error: release called on 12800 bytes but task only has 0 bytes of memory from the on-heap execution pool
These warning logs need to be investigated and eliminated.
For some plugins, the output file is not created when there is no actual write.
We need to simulate this scenario and make sure the default implementation of the merge function in TmpShuffleFile can handle it correctly: zero should be returned so that a correct index file is created.
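A defensive default merge along these lines could look like the sketch below. The method name and shape are assumptions, not the TmpShuffleFile API: the point is only that a missing input file is reported as zero bytes.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;

public class MergeSketch {
    // If the spill file was never created because nothing was written,
    // report zero bytes so the index file still gets a valid (empty)
    // partition entry instead of failing.
    static long merge(File spill, File target) throws IOException {
        if (!spill.exists()) {
            return 0L;
        }
        byte[] data = Files.readAllBytes(spill.toPath());
        Files.write(target.toPath(), data,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        return data.length;
    }

    public static void main(String[] args) throws IOException {
        File missing = new File(System.getProperty("java.io.tmpdir"),
                "splash-missing-spill.tmp");
        File target = new File(System.getProperty("java.io.tmpdir"),
                "splash-merge-target.tmp");
        System.out.println(merge(missing, target)); // -> 0
    }
}
```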
Before the build I changed the Spark version in pom.xml:
<spark.version>2.4.0</spark.version>
spark-submit was run in local[*] mode.
java.lang.IllegalAccessError: tried to access class org.apache.spark.shuffle.sort.ShuffleInMemorySorter from class org.apache.spark.shuffle.sort.SplashUnsafeSorter
at org.apache.spark.shuffle.sort.SplashUnsafeSorter.(SplashUnsafeSorter.scala:77)
at org.apache.spark.shuffle.sort.SplashUnsafeShuffleWriter.(SplashUnsafeShuffleWriter.scala:56)
at org.apache.spark.shuffle.SplashShuffleManager.getWriter(SplashShuffleManager.scala:84)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:98)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
When the application first failed because DMO ran out of space, DMO removed the application folder to prevent leftover data from causing corruption:
2018-12-16 10:56:11,981 [CoarseGrainedExecutorBackend-stop-executor] INFO (Logging.scala:54) - cleanup shuffle folder /application_1544982954867_0001/shuffle for application_1544982954867_0001
2018-12-16 10:56:11,982 [CoarseGrainedExecutorBackend-stop-executor] DEBUG (DMOFile.java:285) - Delete /application_1544982954867_0001/shuffle
2018-12-16 10:56:18,034 [CoarseGrainedExecutorBackend-stop-executor] DEBUG (DMOFile.java:280) - unlink success for '/application_1544982954867_0001/shuffle'.
2018-12-16 10:56:18,036 [CoarseGrainedExecutorBackend-stop-executor] DEBUG (DMOFile.java:280) - unlink success for '/application_1544982954867_0001/shuffle'.
2018-12-16 10:56:18,047 [CoarseGrainedExecutorBackend-stop-executor] INFO (Logging.scala:54) - MemoryStore cleared
2018-12-16 10:56:18,048 [CoarseGrainedExecutorBackend-stop-executor] INFO (Logging.scala:54) - MemoryStore cleared
2018-12-16 10:56:18,048 [CoarseGrainedExecutorBackend-stop-executor] INFO (Logging.scala:54) - clear DataStore by remove /application_1544982954867_0001.
2018-12-16 10:56:18,048 [CoarseGrainedExecutorBackend-stop-executor] INFO (Logging.scala:54) - clear DataStore by remove /application_1544982954867_0001.
2018-12-16 10:56:18,052 [CoarseGrainedExecutorBackend-stop-executor] DEBUG (DMOFile.java:285) - Delete /application_1544982954867_0001
2018-12-16 10:56:18,052 [CoarseGrainedExecutorBackend-stop-executor] DEBUG (DMOFile.java:285) - Delete /application_1544982954867_0001
2018-12-16 10:56:20,138 [shuffle-server-5-2] DEBUG (Slf4JLogger.java:81) - Freed 5 thread-local buffer(s) from thread: shuffle-server-5-2
2018-12-16 10:56:20,240 [shuffle-server-5-1] DEBUG (Slf4JLogger.java:81) - Freed 5 thread-local buffer(s) from thread: shuffle-server-5-1
2018-12-16 10:56:20,240 [shuffle-server-5-2] DEBUG (Slf4JLogger.java:81) - Freed 7 thread-local buffer(s) from thread: shuffle-server-5-2
2018-12-16 10:56:20,599 [CoarseGrainedExecutorBackend-stop-executor] DEBUG (DMOFile.java:280) - unlink success for '/application_1544982954867_0001'.
2018-12-16 10:56:20,599 [CoarseGrainedExecutorBackend-stop-executor] DEBUG (DMOFile.java:280) - unlink success for '/application_1544982954867_0001'.
Later on, DMO crashed because another node was still trying to access the data that had already been removed:
2018-12-16 10:56:44,538 [Executor task launch worker for task 7] DEBUG (DMOFile.java:224) - get attribute with path for /application_1544982954867_0001/shuffle/shuffle_0_7_0.index
2018-12-16 10:56:44,540 [Executor task launch worker for task 7] ERROR (DMOFile.java:227) - failed to get size of '/application_1544982954867_0001/shuffle/shuffle_0_7_0.index'.
com.memverge.mvfs.error.MVFSException: get attributes failed, dmo id: "/application_1544982954867_0001/shuffle/shuffle_0_7_0.index", rc: -2052.
at com.memverge.mvfs.dmo.DMOFile.checkGetAttrResult(DMOFile.java:142)
at com.memverge.mvfs.dmo.DMOFile.attribute(DMOFile.java:149)
at com.memverge.mvfs.dmo.DMOFile.getSize(DMOFile.java:225)
at com.memverge.splash.dmo.DMOShuffleFile.getSize(DMOShuffleFile.java:30)
at org.apache.spark.shuffle.SplashShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$3.apply(SplashShuffleBlockResolver.scala:177)
at org.apache.spark.shuffle.SplashShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$3.apply(SplashShuffleBlockResolver.scala:176)
at org.apache.spark.internal.Logging$class.logDebug(Logging.scala:58)
Hi,
I read the design doc https://docs.google.com/document/d/1kSpbBB-sDk41LeORm3-Hfr-up98Ozm5wskvB49tUhSs/edit?usp=sharing of Splash. It's great work that lets Spark use a remote FS to store shuffle files via plugins. I only have one question: how does Splash delete the shuffle files on the remote FS when a Spark job is killed or crashes? In the original Spark on YARN, the NodeManager deletes an app's working data when the app finishes, but I don't know who does this work when running Spark with Splash.
In cloud environments, it is a common requirement to be able to persist shuffle data outside of the node on which a Spark task is running. Since many workloads run on top of file systems which implement HDFS semantics (FileContext and FileSystem specifically), a storage plugin for these systems will be provided within the code base. This will also allow users of Spark 2.4 releases to use HDFS-compliant external shuffle storage.
When a shuffle read error happens, dump the current partition to a temporary local folder so that the developer can diagnose problems like data corruption.
During ShuffleListener::onApplicationStart, the Spark conf instance is not available. This may cause issues for some listener implementations.
Compiling Splash for Spark 2.4.0 failed with the error:
[ERROR] /git/splash/src/main/scala/org/apache/spark/shuffle/SplashShuffleReader.scala:50: type mismatch;
found : Iterator[(org.apache.spark.storage.BlockId, Long)]
required: Seq[(org.apache.spark.storage.BlockId, Long)]
[ERROR] readShuffleBlocks(shuffleBlocks)
[ERROR] ^
[ERROR] one error found
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
MapOutputTracker.getMapSizesByExecutorId() in Spark 2.4.0 changed its return type from Seq to Iterator. The Splash code needs to be updated accordingly.
ShuffleFile.getSize is a metadata call to the file system. It can take several microseconds or even milliseconds in a shared file system, which adds overhead to the shuffle procedure.
When possible, we should use the committed size or written bytes already recorded in memory instead of calling ShuffleFile.getSize to retrieve this information.
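One way to have the size available in memory is to count bytes as they are written. The sketch below illustrates the technique with a hypothetical counting stream, not the actual Splash writer:

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Sketch: track bytes as they are written so the committed size is known
// in memory and no metadata round trip is needed afterwards.
class CountingOutputStream extends FilterOutputStream {
    private long count = 0;

    CountingOutputStream(OutputStream out) {
        super(out);
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
        count++;
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);
        count += len;
    }

    long getCount() {
        return count;
    }

    public static void main(String[] args) throws IOException {
        CountingOutputStream out =
                new CountingOutputStream(new ByteArrayOutputStream());
        out.write(new byte[81_000]);
        // Use this count instead of a ShuffleFile.getSize metadata call.
        System.out.println(out.getCount()); // -> 81000
    }
}
```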
The code of the shuffle reader is not well structured and lacks the capability of inserting tests for those classes. The separation of responsibilities is not clear. Restructure the code to meet the following requirements:
- Extract SplashShuffleFetchIterator and make it a case class.
- Add a SplashShuffleFetcher class which is responsible for the fetch logic.
- Split SplashShuffleReader into separate functions: getAggregatedIterator adds the combiner logic to the iterator, and getSortedIterator adds the sorter logic to the iterator.

Cannot pass the build with:
mvn clean package -DskipTests=true -Dspark.version=2.1.0 -Dhadoop.version=2.6.0
I am testing Splash with Spark version 2.4.6 and Hadoop 2.8.5 for a use case. When I try to run tests in SplashUnsafeSorterTest
, all of them fail with error message Spark tried to parse '' as a Spark version string, but it could not find the major/minor/maintenance version numbers.
This issue doesn't happen with earlier versions of Spark. I have tested 2.4.0 and 2.4.1. The detailed stack trace in the output of mvn test -Dspark.version=2.4.6
is below.
java.lang.ExceptionInInitializerError
at org.apache.spark.SparkContext$$anonfun$3.apply(SparkContext.scala:183)
at org.apache.spark.SparkContext$$anonfun$3.apply(SparkContext.scala:183)
at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
at org.apache.spark.SparkContext.logInfo(SparkContext.scala:73)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:183)
at org.apache.spark.shuffle.TestUtil$.newSparkContext(TestUtil.scala:149)
at org.apache.spark.shuffle.sort.SplashUnsafeSorterTest.newSparkContextWithForceSpillSize(SplashUnsafeSorterTest.scala:47)
at org.apache.spark.shuffle.sort.SplashUnsafeSorterTest.testSpillLocalSortByKey(SplashUnsafeSorterTest.scala:73)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
at org.testng.internal.Invoker.invokeMethod(Invoker.java:583)
at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:719)
at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:989)
at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
at org.testng.TestRunner.privateRun(TestRunner.java:648)
at org.testng.TestRunner.run(TestRunner.java:505)
at org.testng.SuiteRunner.runTest(SuiteRunner.java:455)
at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:450)
at org.testng.SuiteRunner.privateRun(SuiteRunner.java:415)
at org.testng.SuiteRunner.run(SuiteRunner.java:364)
at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:84)
at org.testng.TestNG.runSuitesSequentially(TestNG.java:1208)
at org.testng.TestNG.runSuitesLocally(TestNG.java:1137)
at org.testng.TestNG.runSuites(TestNG.java:1049)
at org.testng.TestNG.run(TestNG.java:1017)
at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:109)
Caused by: java.lang.IllegalArgumentException: Spark tried to parse '' as a Spark version string, but it could not find the major/minor/maintenance version numbers.
at org.apache.spark.util.VersionUtils$.shortVersion(VersionUtils.scala:48)
at org.apache.spark.package$.<init>(package.scala:94)
at org.apache.spark.package$.<clinit>(package.scala)
... 32 more
2020-09-03 15:19:03.664 WARN [main] o.a.s.SparkContext : Another SparkContext is being constructed (or threw an exception in its constructor). This may indicate an error, since only one SparkContext may be running in this JVM (see SPARK-2243). The other SparkContext was created at:
org.apache.spark.SparkContext.<init>(SparkContext.scala:76)
org.apache.spark.shuffle.TestUtil$.newSparkContext(TestUtil.scala:149)
org.apache.spark.shuffle.sort.SplashUnsafeSorterTest.newSparkContextWithForceSpillSize(SplashUnsafeSorterTest.scala:47)
org.apache.spark.shuffle.sort.SplashUnsafeSorterTest.testSpillLocalSortByKey(SplashUnsafeSorterTest.scala:73)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
org.testng.internal.Invoker.invokeMethod(Invoker.java:583)
org.testng.internal.Invoker.invokeTestMethod(Invoker.java:719)
org.testng.internal.Invoker.invokeTestMethods(Invoker.java:989)
org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
org.testng.TestRunner.privateRun(TestRunner.java:648)
org.testng.TestRunner.run(TestRunner.java:505)
org.testng.SuiteRunner.runTest(SuiteRunner.java:455)
org.testng.SuiteRunner.runSequentially(SuiteRunner.java:450)
org.testng.SuiteRunner.privateRun(SuiteRunner.java:415)
org.testng.SuiteRunner.run(SuiteRunner.java:364)