
splash's People

Contributors

jealous · ronniles · sheperdh · yuelimv


splash's Issues

Improve the performance and memory efficiency in the unsafe code path

We have discovered several places where the IO buffer is not used efficiently in the unsafe code path, and a few places where we could reduce the number of memory copies. Here are the details:

  1. In SplashUnsafeSorter.writeSortedFile, a writerBuffer holds the serialized data, which is then written to the output stream. To avoid this second copy during the write, we create our own SplashBufferedOutputStream, which exposes its internal buffer so the serializer can fill it with serialized data directly. This also saves the memory used by the original writerBuffer and improves the testability of the buffering mechanism. Unit tests are added for SplashBufferedOutputStream to make sure we manage the buffer correctly.
  2. Replace IOUtils.copy with SplashUtils.copy (see the sketch after this list). This function borrows most of its code from IOUtils.copy; the only difference is that it lets the caller specify the buffer size. In previous tests we identified some 4K IO requests issued by IOUtils.copy, which uses a fixed 4K IO buffer. That is neither efficient nor elastic on a shared or distributed file system. The buffer now follows the same Spark configuration, spark.shuffle.file.buffer. Moreover, since we already have this IO buffer, we can use InputStream and OutputStream directly instead of the buffered versions, which saves more memory. Because the copy procedure executes in a single thread, we can safely reuse the same buffer throughout the copy, which also reduces GC time.
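A minimal sketch of such a configurable-buffer copy, assuming plain java.io streams (names are illustrative; the actual SplashUtils.copy may differ):

    import java.io.{InputStream, OutputStream}

    object CopySketch {
      // Copy `in` to `out` with a caller-sized buffer instead of a fixed 4K one.
      // The buffer is allocated once and reused for the whole copy, which is
      // safe because the copy runs in a single thread.
      def copy(in: InputStream, out: OutputStream, bufferSize: Int): Long = {
        val buffer = new Array[Byte](bufferSize)
        var total = 0L
        var n = in.read(buffer)
        while (n != -1) {
          out.write(buffer, 0, n)
          total += n
          n = in.read(buffer)
        }
        total
      }
    }

With a buffer sized from spark.shuffle.file.buffer, the small 4K IO requests disappear and no extra BufferedInputStream/BufferedOutputStream allocation is needed.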

Creating an empty shuffle data file fails when the file already exists.

Consider this scenario: a previous task failed and did not clean up its temp file, and the re-run task somehow creates the temp file at the same location. The operation fails with a FileAlreadyExistsException. In this scenario we need to delete the existing file and retry the creation.
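A minimal delete-and-retry sketch, assuming java.nio.file APIs (the actual plugin interface may differ):

    import java.nio.file.{FileAlreadyExistsException, Files, Path}

    object CreateRetrySketch {
      // Create an empty file; if a failed predecessor task left a stale file
      // at the same location, delete it and retry the creation once.
      def createEmptyFile(path: Path): Unit =
        try Files.createFile(path)
        catch {
          case _: FileAlreadyExistsException =>
            Files.deleteIfExists(path)
            Files.createFile(path)
        }
    }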

Do not rely on `InputStream.available()`

We use InputStream.available() to retrieve the partition size. In the implementation of LimitedInputStream.available, the call always checks the available size of the wrapped stream. This behavior causes extra calls to the getSize function.
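For illustration, a LimitedInputStream-style wrapper behaves roughly like this simplified sketch, which shows why each available() call reaches the wrapped stream:

    import java.io.{FilterInputStream, InputStream}

    class LimitedStreamSketch(wrapped: InputStream, limit: Long)
        extends FilterInputStream(wrapped) {
      // available() always consults the wrapped stream, so on shared storage
      // every call can turn into an expensive getSize metadata request.
      override def available(): Int =
        math.min(wrapped.available().toLong, limit).toInt
    }

Recording the partition size when it is written, instead of querying the stream, avoids these calls.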

java.lang.ClassNotFoundException: org.apache.spark.shuffle.SplashShuffleManager

Hello,
I have built and set up Splash and Spark according to the documentation, but when I try to run any Spark application I get the following error message:
java.lang.ClassNotFoundException: org.apache.spark.shuffle.SplashShuffleManager

This is my spark-defaults.conf file:

spark.files jars/splash-0.7.0.jar,jars/splash-0.7.0-sources.jar,jars/splash-0.7.0-shaded.jar,jars/splash-0.7.0-javadoc.jar
spark.driver.extraClassPath ./splash-0.7.0.jar:./splash-0.7.0-sources.jar:./splash-0.7.0-shaded.jar:./splash-0.7.0-javadoc.jar
spark.executor.extraClassPath jars/splash-0.7.0.jar,jars/splash-0.7.0-sources.jar,jars/splash-0.7.0-shaded.jar,jars/splash-0.7.0-javadoc.jar
#spark.shuffle.manager org.apache.spark.shuffle.SplashShuffleManager

# set shuffle manager and storage plugin

spark.shuffle.manager org.apache.spark.shuffle.SplashShuffleManager
spark.shuffle.splash.storageFactory com.memverge.splash.shared.SharedFSFactory
spark.shuffle.splash.clearShuffleOutput false

# set the location of your shared folder

spark.shuffle.splash.folder /home/ruroy/spark/cache

Any hint to solve this issue would be very helpful.

Add deploy script for different Spark & Hadoop versions

Add a deploy script to compile and deploy the Splash artifact for different combinations of Spark and Hadoop versions.
Splash depends on Spark core, and Spark core depends on Hadoop, so we need to compile different binaries for different combinations of the versions. Adding a script would help us manage those artifacts better.

Add MD5 for each partition when we write

When we read data from the underlying storage we hit a stream corruption issue, so we turned the log level to DEBUG to let Splash compute an MD5 for each partition during the read phase.
However, we do not have the corresponding MD5 info from the write phase to check against.
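A hedged sketch of how the write path could accumulate an MD5 while writing, using the standard DigestOutputStream (names are illustrative, not the actual Splash API):

    import java.io.OutputStream
    import java.security.{DigestOutputStream, MessageDigest}

    object Md5WriteSketch {
      // Wrap a partition's output stream so an MD5 digest is accumulated
      // as the partition is written.
      def withMd5(out: OutputStream): (OutputStream, MessageDigest) = {
        val md5 = MessageDigest.getInstance("MD5")
        (new DigestOutputStream(out, md5), md5)
      }
    }

After the partition is fully written, md5.digest() yields the checksum that can be logged and compared with the MD5 computed during the read phase.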

Support fast merge in the storage plugin interface.

Allow the plugin developer to implement a fast merge, which will be invoked when:

  • Encryption is disabled.
  • Compression is disabled or the compression codec supports concatenation of serialized streams.
  • The spark.shuffle.unsafe.fastMergeEnabled option is true.
  • The plugin supports fast merge.

Note that the performance of this fast merge function can seriously affect the performance of Spark SQL joins with multiple spills. A sketch of the gating condition follows.
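A hedged sketch of the gate, with hypothetical parameter names standing in for the real configuration lookups:

    object FastMergeSketch {
      // fastMergeEnabled corresponds to spark.shuffle.unsafe.fastMergeEnabled.
      def canUseFastMerge(
          encryptionEnabled: Boolean,
          compressionEnabled: Boolean,
          codecSupportsConcat: Boolean,
          fastMergeEnabled: Boolean,
          pluginSupportsFastMerge: Boolean): Boolean =
        !encryptionEnabled &&
          (!compressionEnabled || codecSupportsConcat) &&
          fastMergeEnabled &&
          pluginSupportsFastMerge
    }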

Cache shuffle index in memory

Test data shows that with 40K partitions (200 mappers & 200 reducers) and a small partition size (around 256K), the shuffle manager spent almost half of the total time retrieving the index files.

Reading 40000 partitions with 8 threads   99% (39991/40000)
Read shuffle data completed in 19049 milliseconds
    Reading index file:  10187 ms
    storage factory:     com.memverge.splash.shared.SharedFSFactory
    number of mappers:   200
    number of reducers:  200
    total shuffle size:  3GB
    bytes written:       3GB
    bytes read:          3GB
    number of blocks:    64
    blocks size:         256KB
    partition size:      81KB
    concurrent tasks:    8
    bandwidth:           167MB/s

Since shuffle indices are relatively small compared to the shuffle data, we can try our best to cache them in memory to eliminate this overhead.

For example, in the configuration described above, each node would only cache:
8 × 201 × 200 = 321,600 bytes ≈ 320K
where 8 is the size of a long in bytes, 201 is the number of partition offsets per index, and 200 is the number of index files.

When memory is not enough, we should fall back to retrieving the data from the file system.
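A minimal sketch of such a cache with file-system fallback (names are illustrative; the real resolver API may differ):

    import java.util.concurrent.ConcurrentHashMap

    class IndexCacheSketch(loadFromStorage: String => Array[Long]) {
      private val cache = new ConcurrentHashMap[String, Array[Long]]()

      // Return the cached partition offsets, loading them from the file
      // system on first access.
      def offsets(indexFile: String): Array[Long] =
        cache.computeIfAbsent(indexFile, k => loadFromStorage(k))

      // Under memory pressure, drop the cache; subsequent reads simply
      // fall back to the file system via loadFromStorage.
      def evictAll(): Unit = cache.clear()
    }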

Tests report warning logs related to the memory pool.

Unit tests report a lot of warning logs such as:

o.a.s.m.ExecutionMemoryPool : Internal error: release called on 12800 bytes but task only has 0 bytes of memory from the on-heap execution pool

These warning logs need to be investigated and eliminated.

File not found during merge in bypass merge sort code path

For some plugins, the output file is not created when there is no actual write.
We need to simulate this scenario and make sure the default implementation of the merge function in TmpShuffleFile handles it correctly: zero should be returned so that a correct index file is created (see the sketch below).
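A minimal sketch of the zero-length handling, assuming a java.io.File-like abstraction (the real TmpShuffleFile API may differ):

    import java.io.File

    object MergeSketch {
      // If the plugin never created the output file because nothing was
      // written, report zero bytes so the index file is still correct.
      def partitionLength(file: File): Long =
        if (file.exists()) file.length() else 0L
    }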

java.lang.IllegalAccessError when running with Spark 2.4.0

Spark-submit was done in local[*] mode. Before the build, I changed the Spark version in pom.xml:

<spark.version>2.4.0</spark.version>

java.lang.IllegalAccessError: tried to access class org.apache.spark.shuffle.sort.ShuffleInMemorySorter from class org.apache.spark.shuffle.sort.SplashUnsafeSorter
at org.apache.spark.shuffle.sort.SplashUnsafeSorter.<init>(SplashUnsafeSorter.scala:77)
at org.apache.spark.shuffle.sort.SplashUnsafeShuffleWriter.<init>(SplashUnsafeShuffleWriter.scala:56)
at org.apache.spark.shuffle.SplashShuffleManager.getWriter(SplashShuffleManager.scala:84)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:98)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

DMO crash due to application folder deletion.

When the application first failed because DMO ran out of space, DMO removed the application folder to prevent leftovers from causing data corruption:
2018-12-16 10:56:11,981 [CoarseGrainedExecutorBackend-stop-executor] INFO (Logging.scala:54) - cleanup shuffle folder /application_1544982954867_0001/shuffle for application_1544982954867_0001
2018-12-16 10:56:11,982 [CoarseGrainedExecutorBackend-stop-executor] DEBUG (DMOFile.java:285) - Delete /application_1544982954867_0001/shuffle
2018-12-16 10:56:18,034 [CoarseGrainedExecutorBackend-stop-executor] DEBUG (DMOFile.java:280) - unlink success for '/application_1544982954867_0001/shuffle'.
2018-12-16 10:56:18,036 [CoarseGrainedExecutorBackend-stop-executor] DEBUG (DMOFile.java:280) - unlink success for '/application_1544982954867_0001/shuffle'.
2018-12-16 10:56:18,047 [CoarseGrainedExecutorBackend-stop-executor] INFO (Logging.scala:54) - MemoryStore cleared
2018-12-16 10:56:18,048 [CoarseGrainedExecutorBackend-stop-executor] INFO (Logging.scala:54) - MemoryStore cleared
2018-12-16 10:56:18,048 [CoarseGrainedExecutorBackend-stop-executor] INFO (Logging.scala:54) - clear DataStore by remove /application_1544982954867_0001.
2018-12-16 10:56:18,048 [CoarseGrainedExecutorBackend-stop-executor] INFO (Logging.scala:54) - clear DataStore by remove /application_1544982954867_0001.
2018-12-16 10:56:18,052 [CoarseGrainedExecutorBackend-stop-executor] DEBUG (DMOFile.java:285) - Delete /application_1544982954867_0001
2018-12-16 10:56:18,052 [CoarseGrainedExecutorBackend-stop-executor] DEBUG (DMOFile.java:285) - Delete /application_1544982954867_0001
2018-12-16 10:56:20,138 [shuffle-server-5-2] DEBUG (Slf4JLogger.java:81) - Freed 5 thread-local buffer(s) from thread: shuffle-server-5-2
2018-12-16 10:56:20,240 [shuffle-server-5-1] DEBUG (Slf4JLogger.java:81) - Freed 5 thread-local buffer(s) from thread: shuffle-server-5-1
2018-12-16 10:56:20,240 [shuffle-server-5-2] DEBUG (Slf4JLogger.java:81) - Freed 7 thread-local buffer(s) from thread: shuffle-server-5-2
2018-12-16 10:56:20,599 [CoarseGrainedExecutorBackend-stop-executor] DEBUG (DMOFile.java:280) - unlink success for '/application_1544982954867_0001'.
2018-12-16 10:56:20,599 [CoarseGrainedExecutorBackend-stop-executor] DEBUG (DMOFile.java:280) - unlink success for '/application_1544982954867_0001'.

Later on, DMO crashed because another node was still trying to access the data that had already been removed:
2018-12-16 10:56:44,538 [Executor task launch worker for task 7] DEBUG (DMOFile.java:224) - get attribute with path for /application_1544982954867_0001/shuffle/shuffle_0_7_0.index
2018-12-16 10:56:44,540 [Executor task launch worker for task 7] ERROR (DMOFile.java:227) - failed to get size of '/application_1544982954867_0001/shuffle/shuffle_0_7_0.index'.
com.memverge.mvfs.error.MVFSException: get attributes failed, dmo id: "/application_1544982954867_0001/shuffle/shuffle_0_7_0.index", rc: -2052.
at com.memverge.mvfs.dmo.DMOFile.checkGetAttrResult(DMOFile.java:142)
at com.memverge.mvfs.dmo.DMOFile.attribute(DMOFile.java:149)
at com.memverge.mvfs.dmo.DMOFile.getSize(DMOFile.java:225)
at com.memverge.splash.dmo.DMOShuffleFile.getSize(DMOShuffleFile.java:30)
at org.apache.spark.shuffle.SplashShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$3.apply(SplashShuffleBlockResolver.scala:177)
at org.apache.spark.shuffle.SplashShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$3.apply(SplashShuffleBlockResolver.scala:176)
at org.apache.spark.internal.Logging$class.logDebug(Logging.scala:58)

How to delete the shuffle data

Hi,
I read the Splash design doc at https://docs.google.com/document/d/1kSpbBB-sDk41LeORm3-Hfr-up98Ozm5wskvB49tUhSs/edit?usp=sharing. It's great work that lets Spark use a remote FS to store shuffle files via plugins. I only have one question: how does Splash delete the shuffle files on the remote FS when a Spark job is killed or crashes? With vanilla Spark on YARN, the NodeManager deletes an app's working data when the app finishes, but I don't know who does this work when running Spark with Splash.

Add support for HDFS compliant file systems

In cloud environments, it is a common requirement to persist shuffle data outside the node on which a Spark task is running. Since many workloads run on top of file systems that implement HDFS semantics (FileContext and FileSystem specifically), a storage plugin for these systems will be provided within the code base. This will also allow users of Spark 2.4 releases to use external shuffle storage that is HDFS compliant.

Splash doesn't compile with Spark 2.4.0

Compiling Splash for Spark 2.4.0 failed with the error:

[ERROR] /git/splash/src/main/scala/org/apache/spark/shuffle/SplashShuffleReader.scala:50: type mismatch;
found : Iterator[(org.apache.spark.storage.BlockId, Long)]
required: Seq[(org.apache.spark.storage.BlockId, Long)]
[ERROR] readShuffleBlocks(shuffleBlocks)
[ERROR] ^
[ERROR] one error found
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------

MapOutputTracker.getMapSizesByExecutorId() in Spark 2.4.0 changed its return type from Seq to Iterator. The Splash code needs to be updated accordingly.
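A self-contained illustration of the mismatch and one possible adaptation (BlockId is stood in by String to keep the sketch compilable on its own):

    object Spark24Sketch {
      // Spark 2.3 returned Seq[(BlockId, Long)]; Spark 2.4 returns an Iterator.
      private def readShuffleBlocks(blocks: Seq[(String, Long)]): Unit = ()

      def main(args: Array[String]): Unit = {
        val fromTracker: Iterator[(String, Long)] = Iterator(("shuffle_0_0_0", 1L))
        // readShuffleBlocks(fromTracker)     // type mismatch, as in the build error
        readShuffleBlocks(fromTracker.toSeq)  // one possible adaptation
      }
    }

Alternatively, readShuffleBlocks itself could be changed to accept an Iterator.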

Reduce the number of calls to ShuffleFile.getSize

ShuffleFile.getSize is a metadata call to the file system. It can take several microseconds or even milliseconds on a shared file system, which adds overhead to the shuffle procedure.

When possible, we should use the recorded committed size or written bytes already available in memory instead of calling ShuffleFile.getSize to retrieve this information.
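One way to keep that information in memory is a counting stream wrapper; this is a hedged sketch, not necessarily how Splash records it:

    import java.io.{FilterOutputStream, OutputStream}

    class CountingOutputStreamSketch(out: OutputStream)
        extends FilterOutputStream(out) {
      private var written = 0L

      override def write(b: Int): Unit = { out.write(b); written += 1 }

      override def write(b: Array[Byte], off: Int, len: Int): Unit = {
        out.write(b, off, len)
        written += len
      }

      // The committed size, available without a getSize metadata call.
      def bytesWritten: Long = written
    }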

Structure enhancement of the shuffle reader

The shuffle reader code is not well structured and lacks the ability to insert tests for its classes; the separation of responsibilities is not clear. Restructure the code to meet the following requirements (a toy sketch of the extracted iterator wrappers appears after the list):

  • Insert the error handling and dump logic in the correct place, and cover the dump logic in the unit tests.
  • Remove the state in SplashShuffleFetchIterator and make it a case class.
  • Extract a SplashShuffleFetcher class that is responsible for:
    • tracking the resource usage and resource cleanup of the current partition;
    • error handling and trace information for the current partition;
    • data transformation related to the current partition.
  • Extract the iterator wrappers in SplashShuffleReader into separate functions:
    • getAggregatedIterator adds the combiner logic to the iterator;
    • getSortedIterator adds the sorter logic to the iterator.
  • Move all sorter-related metrics tracking logic into the sorter itself.
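A toy sketch of the two extracted wrappers; the real functions would operate on Spark's serialized record iterators and update the task metrics:

    object ReaderSketch {
      // Adds the combiner logic to the iterator.
      def getAggregatedIterator[K, V](records: Iterator[(K, V)],
                                      combine: (V, V) => V): Iterator[(K, V)] = {
        val combined = scala.collection.mutable.LinkedHashMap.empty[K, V]
        records.foreach { case (k, v) =>
          combined(k) = combined.get(k).map(combine(_, v)).getOrElse(v)
        }
        combined.iterator
      }

      // Adds the sorter logic to the iterator.
      def getSortedIterator[K: Ordering, V](records: Iterator[(K, V)]): Iterator[(K, V)] =
        records.toSeq.sortBy(_._1).iterator
    }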

java.lang.IllegalArgumentException because of spark version string being blank

I am testing Splash with Spark 2.4.6 and Hadoop 2.8.5 for a use case. When I try to run the tests in SplashUnsafeSorterTest, all of them fail with the error message Spark tried to parse '' as a Spark version string, but it could not find the major/minor/maintenance version numbers. This issue doesn't happen with earlier versions of Spark; I have tested 2.4.0 and 2.4.1. The detailed stack trace from the output of mvn test -Dspark.version=2.4.6 is below.

java.lang.ExceptionInInitializerError
	at org.apache.spark.SparkContext$$anonfun$3.apply(SparkContext.scala:183)
	at org.apache.spark.SparkContext$$anonfun$3.apply(SparkContext.scala:183)
	at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
	at org.apache.spark.SparkContext.logInfo(SparkContext.scala:73)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:183)
	at org.apache.spark.shuffle.TestUtil$.newSparkContext(TestUtil.scala:149)
	at org.apache.spark.shuffle.sort.SplashUnsafeSorterTest.newSparkContextWithForceSpillSize(SplashUnsafeSorterTest.scala:47)
	at org.apache.spark.shuffle.sort.SplashUnsafeSorterTest.testSpillLocalSortByKey(SplashUnsafeSorterTest.scala:73)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
	at org.testng.internal.Invoker.invokeMethod(Invoker.java:583)
	at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:719)
	at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:989)
	at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
	at org.testng.TestRunner.privateRun(TestRunner.java:648)
	at org.testng.TestRunner.run(TestRunner.java:505)
	at org.testng.SuiteRunner.runTest(SuiteRunner.java:455)
	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:450)
	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:415)
	at org.testng.SuiteRunner.run(SuiteRunner.java:364)
	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:84)
	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1208)
	at org.testng.TestNG.runSuitesLocally(TestNG.java:1137)
	at org.testng.TestNG.runSuites(TestNG.java:1049)
	at org.testng.TestNG.run(TestNG.java:1017)
	at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
	at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:109)
Caused by: java.lang.IllegalArgumentException: Spark tried to parse '' as a Spark version string, but it could not find the major/minor/maintenance version numbers.
	at org.apache.spark.util.VersionUtils$.shortVersion(VersionUtils.scala:48)
	at org.apache.spark.package$.<init>(package.scala:94)
	at org.apache.spark.package$.<clinit>(package.scala)
	... 32 more

2020-09-03 15:19:03.664 WARN  [main] o.a.s.SparkContext : Another SparkContext is being constructed (or threw an exception in its constructor).  This may indicate an error, since only one SparkContext may be running in this JVM (see SPARK-2243). The other SparkContext was created at:
org.apache.spark.SparkContext.<init>(SparkContext.scala:76)
org.apache.spark.shuffle.TestUtil$.newSparkContext(TestUtil.scala:149)
org.apache.spark.shuffle.sort.SplashUnsafeSorterTest.newSparkContextWithForceSpillSize(SplashUnsafeSorterTest.scala:47)
org.apache.spark.shuffle.sort.SplashUnsafeSorterTest.testSpillLocalSortByKey(SplashUnsafeSorterTest.scala:73)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
org.testng.internal.Invoker.invokeMethod(Invoker.java:583)
org.testng.internal.Invoker.invokeTestMethod(Invoker.java:719)
org.testng.internal.Invoker.invokeTestMethods(Invoker.java:989)
org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
org.testng.TestRunner.privateRun(TestRunner.java:648)
org.testng.TestRunner.run(TestRunner.java:505)
org.testng.SuiteRunner.runTest(SuiteRunner.java:455)
org.testng.SuiteRunner.runSequentially(SuiteRunner.java:450)
org.testng.SuiteRunner.privateRun(SuiteRunner.java:415)
org.testng.SuiteRunner.run(SuiteRunner.java:364)
