Giter VIP home page Giter VIP logo

remoteshuffleservice's Introduction

Uber Remote Shuffle Service (RSS)

Uber Remote Shuffle Service provides the capability for Apache Spark applications to store shuffle data on remote servers. See more details on Spark community document: [SPARK-25299][DISCUSSION] Improving Spark Shuffle Reliability.

Please contact us ([email protected]) for any question or feedback.

Supported Spark Version

  • The master branch supports Spark 2.4.x. The spark30 branch supports Spark 3.0.x.

How to Build

Make sure JDK 8+ and maven is installed on your machine.

Build RSS Server

  • Run:
mvn clean package -Pserver -DskipTests

This command creates remote-shuffle-service-xxx-server.jar file for RSS server, e.g. target/remote-shuffle-service-0.0.9-server.jar.

Build RSS Client

  • Run:
mvn clean package -Pclient -DskipTests

This command creates remote-shuffle-service-xxx-client.jar file for RSS client, e.g. target/remote-shuffle-service-0.0.9-client.jar.

How to Run

Step 1: Run RSS Server

  • Pick up a server in your environment, e.g. server1. Run RSS server jar file (remote-shuffle-service-xxx-server.jar) as a Java application, for example,
java -Dlog4j.configuration=log4j-rss-prod.properties -cp target/remote-shuffle-service-0.0.9-server.jar com.uber.rss.StreamServer -port 12222 -serviceRegistry standalone -dataCenter dc1

Step 2: Run Spark application with RSS Client

  • Upload client jar file (remote-shuffle-service-xxx-client.jar) to your HDFS, e.g. hdfs:///file/path/remote-shuffle-service-0.0.9-client.jar

  • Add configure to your Spark application like following (you need to adjust the values based on your environment):

spark.jars=hdfs:///file/path/remote-shuffle-service-0.0.9-client.jar
spark.executor.extraClassPath=remote-shuffle-service-0.0.9-client.jar
spark.shuffle.manager=org.apache.spark.shuffle.RssShuffleManager
spark.shuffle.rss.serviceRegistry.type=standalone
spark.shuffle.rss.serviceRegistry.server=server1:12222
spark.shuffle.rss.dataCenter=dc1
  • Run your Spark application

Run with High Availability

Remote Shuffle Service could use a Apache ZooKeeper cluster and register live service instances in ZooKeeper. Spark applications will look up ZooKeeper to find and use active Remote Shuffle Service instances.

In this configuration, ZooKeeper serves as a Service Registry for Remote Shuffle Service, and we need to add those parameters when starting RSS server and Spark application.

Step 1: Run RSS Server with ZooKeeper as service registry

  • Assume there is a ZooKeeper server zkServer1. Pick up a server in your environment, e.g. server1. Run RSS server jar file (remote-shuffle-service-xxx-server.jar) as a Java application on server1, for example,
java -Dlog4j.configuration=log4j-rss-prod.properties -cp target/remote-shuffle-service-0.0.9-server.jar com.uber.rss.StreamServer -port 12222 -serviceRegistry zookeeper -zooKeeperServers zkServer1:2181 -dataCenter dc1

Step 2: Run Spark application with RSS Client and ZooKeeper service registry

  • Upload client jar file (remote-shuffle-service-xxx-client.jar) to your HDFS, e.g. hdfs:///file/path/remote-shuffle-service-0.0.9-client.jar

  • Add configure to your Spark application like following (you need to adjust the values based on your environment):

spark.jars=hdfs:///file/path/remote-shuffle-service-0.0.9-client.jar
spark.executor.extraClassPath=remote-shuffle-service-0.0.9-client.jar
spark.shuffle.manager=org.apache.spark.shuffle.RssShuffleManager
spark.shuffle.rss.serviceRegistry.type=zookeeper
spark.shuffle.rss.serviceRegistry.zookeeper.servers=zkServer1:2181
spark.shuffle.rss.dataCenter=dc1
  • Run your Spark application

remoteshuffleservice's People

Contributors

codeharsh avatar hiboyang avatar mabansal avatar mayurdb avatar vectorijk avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

remoteshuffleservice's Issues

Compile error on Spark30 branch

I try to compile Spark30 branch, but it compile error. The error info is as followed:
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-shade-plugin:3.1.1:shade (default) on project remote-shuffle-service: Error creating shaded jar: Problem shading JAR /home/ec2-user/.m2/repository/org/glassfish/jersey/core/jersey-common/2.30/jersey-common-2.30.jar entry META-INF/versions/11/org/glassfish/jersey/internal/jsr166/SubmissionPublisher$1.class: java.lang.IllegalArgumentException -> [Help 1] [ERROR] [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch. [ERROR] Re-run Maven using the -X switch to enable full debug logging. [ERROR] [ERROR] For more information about the errors and possible solutions, please read the following articles: [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
Can anyone help to check this?

shuffle read error

After RSS is enabled for Spark, the shuffle read data displayed on the stage page is inconsistent with the shuffle write data.
branch spark30.

[Spark 3] RSS performance with Adaptive Skew Join Optimization

Current support of RSS with AQE framework does not provide performant APIs for AQE skew join optimization. To explain more, when AQE detects a skew partition it tries to divide the partition into multiple subpartitions – it tries to batch of consecutive mapTaskOutput to achieve target size of a subpartition.

For ESS it works seamlessly as all the mapTaskOutput as stored in separate files, so reducers can fetch the data it requires. Let's say a reducer wants data for mapID - 5 to 7, it can fetch these individual blocks only (no extra data fetch).

But for RSS mapTaskOutput for all the mapper tasks are intertwined in one single partition file. Due to this RSS has to fetch full partition file every time and then filter out the data to get the required mapID. This adds to a lot of overhead to network and compute IO.

Ex: For a partition file size 50 GB and target size 1 GB, AQE will try to divide this partition into 50 subpartitions to divide the work into 50 executors. Now each executor will have to fetch the 50GB partition file and filter out 49GB of data to get the required 1GB data. Moreover, as 1 or 2 RSS servers are serving partition files to all of the executors, it'll affect the network and IO throughput of those servers. We've seen such applications choking the network out throughput for > 5hrs and making hosts unresponsive, this leads up to the failure of other applications as well.

We're proposing an approach to solve this issue by creating an index file for each partition while writing that partition on the shuffle server. And adding a new RSS server API to read partial partition data from partition files using index file. This approach will also require a custom Skew Join Optimizer operator to be plugged in that will divide the partition based on just data size instead of "Batch of consecutive mapTaskOutput".
More details on this doc - https://docs.google.com/document/d/1nctmnhSFpvv5V5coJfqjslhcCwEBDneNY10pG76psEE/edit?usp=sharing

We would love to hear from other users' suggestions on our approach and if they are facing these same issues, how are they tackling them?

Using remote shuffle service with Spark operator

@hiboyang Have you tried using remote shuffle service with spark operator? (spark on K8s operator)?

I tested it with the client jar in my 'SparkApplication' image and it works as expected.

Although I want to include the client jar in my spark operator image so every job that I am submitting to spark operator uses the client jar from spark operator and I don't have to include the client jar in every job image.

I pretty sure this can be done but probably would need the code changes in remote shuffle service?

Unable to match RssShuffleHandle

Hi,
I'm trying to use the RSS client with Spark 2.4.1 but it fails to match RssShuffleHandle, the following is the error.

scala.MatchError: RssShuffleHandle (shuffleId 0, numMaps: 4107, rssServers: 2 servers), partitionFanout: 1 (of class org.apache.spark.shuffle.RssShuffleHandle)
        at org.apache.spark.shuffle.RssShuffleManager.getWriter(RssShuffleManager.scala:152)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:98)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:403)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:409)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

How to get metrics value from RSS?

We want to monitor some RSS cluster using metrics in RemoteShuffleService project, but I dont know how to get these metrics value.

And I can not find any metric reporter, so I dont know where these metrics are emitted to.

So how can I get these metrics to monitor my RSS cluster?

spark 3.1/3.2?

hi all, I saw there is a spark30 branch for spark 3.0.x supported in the readme. there also seems to be a spark31 branch but wondering is there any plans to support spark 3.2 or could it work out of the box with spark31 branch?

spark 3.0

Hi!
I get an error when running mvn clean package -Pserver -DskipTests

[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  50.673 s
[INFO] Finished at: 2022-04-04T14:11:21+01:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-shade-plugin:3.1.1:shade (default) on project remote-shuffle-service: Error creating shaded jar: Problem shading JAR /home/alatau/.m2/repository/org/glassfish/jersey/core/jersey-common/2.30/jersey-common-2.30.jar entry META-INF/versions/11/org/glassfish/jersey/internal/jsr166/SubmissionPublisher$1.class: java.lang.IllegalArgumentException -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException

Is there any idea how this can be fixed?

Ubuntu 18
maven-3.6.3

It has error when using Spark 3.1 with RSS Spark-30 branch

When I using Spark 3.1 with RSS Spark-3.0 branch, it has the following error. It seems SparkEnv.get.mapOutputTracker.getMapSizesByRange is no longer existed in Spark 3.1. Does anyone know how to fix it.

java.lang.NoSuchMethodError: org.apache.spark.MapOutputTracker.getMapSizesByRange(IIIII)Lscala/collection/Iterator;
	at org.apache.spark.shuffle.rss.RssUtils$$anon$1.get(RssUtils.scala:93)
	at org.apache.spark.shuffle.rss.RssUtils$$anon$1.get(RssUtils.scala:91)
	at com.uber.rss.util.RetryUtils.retry(RetryUtils.java:118)
	at org.apache.spark.shuffle.rss.RssUtils$.getRssInfoFromMapOutputTracker(RssUtils.scala:91)
	at org.apache.spark.shuffle.rss.BlockDownloaderPartitionRangeRecordIterator.getPartitionRssInfo(BlockDownloaderPartitionRangeRecordIterator.scala:209)
	at org.apache.spark.shuffle.rss.BlockDownloaderPartitionRangeRecordIterator.createBlockDownloaderPartitionRecordIteratorWithoutRetry(BlockDownloaderPartitionRangeRecordIterator.scala:100)
	at org.apache.spark.shuffle.rss.BlockDownloaderPartitionRangeRecordIterator.createBlockDownloaderPartitionRecordIteratorWithRetry(BlockDownloaderPartitionRangeRecordIterator.scala:84)
	at org.apache.spark.shuffle.rss.BlockDownloaderPartitionRangeRecordIterator.<init>(BlockDownloaderPartitionRangeRecordIterator.scala:58)
	at org.apache.spark.shuffle.RssShuffleReader.read(RssShuffleReader.scala:75)
	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:106)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.CoalescedRDD.$anonfun$compute$1(CoalescedRDD.scala:99)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1866)
	at org.apache.spark.rdd.RDD.$anonfun$count$1(RDD.scala:1253)
	at org.apache.spark.rdd.RDD.$anonfun$count$1$adapted(RDD.scala:1253)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2242)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)```

Rss shuffle data size is much larger than external shuffle service

Hi team,

I noticed that the RSS genereates almost 2 times more shuffle data than external shuffle service. For example, a job stage with rss has a shuffle write size of 720 TB, but external shuffle service only has 370 TB, and they are using the same inputs and codes. We also tried with some other job, and got the similar result.

We first suspect this may cause by no compression, but I took a look at the RssShuffleManager, and seems like there is compression/decompression on client side using LZ4(https://github.com/uber/RemoteShuffleService/blob/master/src/main/scala/org/apache/spark/shuffle/RssShuffleWriter.scala#L205, https://github.com/uber/RemoteShuffleService/blob/master/src/main/scala/org/apache/spark/shuffle/rss/BlockDownloaderPartitionRecordIterator.scala#L167).

Any ideas about possible causes of such a big margin of shuffle data? Thanks.

Compilation error caused by scala-maven-plugin

When I tried to compile the program on spark31 branch with mvn clean package -Pserver -DskipTests I got an error

[ERROR] Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.2.2:compile (scala-compile) on project remote-shuffle-service: Execution scala-compile of goal net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed: Cannot invoke "java.lang.CharSequence.length()" because "this.text" is null -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/PluginExecutionException

Is there any good solution?

Not support Storing data on HDFS and dynamicAllocation?

Hi,
I found from the shuffle server log that shuffle data are stored on StreamServer:/tmp/StreamServer_xxxx,
Does Uber RSS support to store shuffle data on HDFS now?
And when dynamicAllocation is true, It thrown Exceptions

spark.shuffle.service.enabled false
spark.dynamicAllocation.enabled false

works fine

spark.shuffle.service.enabled true
spark.dynamicAllocation.enabled false or true

ERROR YarnScheduler: Lost executor 1 on 10.174.18.9: Unable to create executor due to Unable to register with external shuffle server due to : java.lang.UnsupportedOperationException: Unsupported shuffle manager of executor: ExecutorShuffleInfo

spark.shuffle.service.enabled false
spark.dynamicAllocation.enabled true

org.apache.spark.SparkException: Dynamic allocation of executors requires the external shuffle service. You may enable this through spark.shuffle.service.enabled.

Unable to register with external shuffle server

Hi, I am running a WordCount on Spark 2.4.5.
Uber RSS is built from the master branch and deployed with zookeeper mode referring the README

  1. Spark is running on YARN/Hadoop
    some important configs that I didn't change in spark-default.conf
spark.speculation       false
spark.dynamicAllocation.enabled   false
  1. YARN/Hadoop has two versions: 2.7.2 and 3.2.1,only 1 RM and 1 NodeManager running on different nodes
    in yarn-site.xml of both versions:
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle,spark_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
    <value>org.apache.hadoop.mapred.ShuffleHandler</value>
  </property>

Only 1 version of YARN/Hadoop is running each time, spark-2.4.5-yarn-shuffle.jar is in their classpath

then I change some configs in spark-default.conf

1. YARN/Hadoop 2.7.2

spark.shuffle.service.enabled=false, it works fine

but when it is true:

# sparkUI
ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Unable to create executor due to Unable to register with external shuffle server due to : java.lang.UnsupportedOperationException: Unsupported shuffle manager of executor: ExecutorShuffleInfo{localDirs=[/home/yarn/nm-local-dir/usercache/MYUSER/appcache/application_1644546216413_0287/blockmgr-ac297d09-e68d-4133-b677-6c6a82e2c8d2], subDirsPerLocalDir=64, shuffleManager=org.apache.spark.shuffle.RssShuffleManager}
	at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.registerExecutor(ExternalShuffleBlockResolver.java:149)
	at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.handleMessage(ExternalShuffleBlockHandler.java:113)
	at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:81)
	at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:180)
	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:103)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.spark_project.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
#container
[Stage 0:>                                                          (0 + 0) / 2]
[Stage 0:>                                                          (0 + 2) / 2]22/02/21 21:55:07 ERROR YarnClusterScheduler: Lost executor 2 on 10.163.4.4: Unable to create executor due to Unable to register with external shuffle server due to : java.lang.UnsupportedOperationException: Unsupported shuffle manager of executor: ExecutorShuffleInfo{localDirs=[/home/yarn/nm-local-dir/usercache/MYUSER/appcache/application_1644546216413_0287/blockmgr-ac297d09-e68d-4133-b677-6c6a82e2c8d2], subDirsPerLocalDir=64, shuffleManager=org.apache.spark.shuffle.RssShuffleManager}
	at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.registerExecutor(ExternalShuffleBlockResolver.java:149)
	at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.handleMessage(ExternalShuffleBlockHandler.java:113)
	at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:81)
	at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:180)
	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:103)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.spark_project.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.spark_project.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.spark_project.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
	at org.spark_project.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
	at org.spark_project.io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
	at org.spark_project.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at org.spark_project.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:745)

22/02/21 21:55:08 ERROR YarnClusterScheduler: Lost executor 5 on 10.163.4.4: Unable to create executor due to Unable to register with external shuffle server due to : java.lang.UnsupportedOperationException: Unsupported shuffle manager of executor: ExecutorShuffleInfo{localDirs=[/home/yarn/nm-local-dir/usercache/MYUSER/appcache/application_1644546216413_0287/blockmgr-c88e9413-9580-45eb-996c-e5d63eab2436], subDirsPerLocalDir=64, shuffleManager=org.apache.spark.shuffle.RssShuffleManager}
	at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.registerExecutor(ExternalShuffleBlockResolver.java:149)
	at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.handleMessage(ExternalShuffleBlockHandler.java:113)
	at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:81)
	at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:180)
	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:103)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.spark_project.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.spark_project.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.spark_project.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
	at org.spark_project.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
	at org.spark_project.io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
	at org.spark_project.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at org.spark_project.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:745)

2. YARN/Hadoop 3.2.1

when spark.shuffle.service.enabled=true,

# container
22/02/21 21:35:13 INFO ApplicationMaster: Started progress reporter thread with (heartbeat : 1000, initial allocation : 200) intervals
Exception in thread "dag-scheduler-event-loop" java.lang.NoSuchMethodError: com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor()Lcom/google/common/util/concurrent/ListeningExecutorService;
	at org.apache.curator.framework.listen.ListenerContainer.addListener(ListenerContainer.java:40)
	at org.apache.curator.framework.imps.CuratorFrameworkImpl.start(CuratorFrameworkImpl.java:246)
	at com.uber.rss.metadata.ZooKeeperServiceRegistry.<init>(ZooKeeperServiceRegistry.java:80)
	at com.uber.rss.metadata.ZooKeeperServiceRegistry.createTimingInstance(ZooKeeperServiceRegistry.java:52)
	at org.apache.spark.shuffle.RssServiceRegistry$.createServiceRegistry(RssServiceRegistry.scala:64)
	at org.apache.spark.shuffle.RssServiceRegistry$.executeWithServiceRegistry(RssServiceRegistry.scala:79)
	at org.apache.spark.shuffle.RssShuffleManager.getRssServers(RssShuffleManager.scala:395)
	at org.apache.spark.shuffle.RssShuffleManager.registerShuffle(RssShuffleManager.scala:109)
	at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:93)
	at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:87)
	at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:256)
	at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:252)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.dependencies(RDD.scala:252)
	at org.apache.spark.scheduler.DAGScheduler.getShuffleDependencies(DAGScheduler.scala:513)
	at org.apache.spark.scheduler.DAGScheduler.getOrCreateParentStages(DAGScheduler.scala:462)
	at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:449)
	at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:963)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2069)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
22/02/21 21:35:30 ERROR YarnClusterScheduler: Lost executor 1 on god03v.test.lycc.qihoo.net: Unable to create executor due to Unable to register with external shuffle server due to : java.lang.UnsupportedOperationException: Unsupported shuffle manager of executor: ExecutorShuffleInfo{localDirs=[/home/yarn/nm-local-dir/usercache/MYUSER/appcache/application_1644546216413_0283/blockmgr-5cec5330-0e9b-4389-9c93-c3ae1f076ee4], subDirsPerLocalDir=64, shuffleManager=org.apache.spark.shuffle.RssShuffleManager}
	at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.registerExecutor(ExternalShuffleBlockResolver.java:149)
	at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.handleMessage(ExternalShuffleBlockHandler.java:113)
	at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:81)
	at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:180)
	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:103)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.spark_project.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.spark_project.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.spark_project.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
	at org.spark_project.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
	at org.spark_project.io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
	at org.spark_project.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at org.spark_project.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:745)

when it is false:
it throws com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor()Lcom/google/common/util/concurrent/ListeningExecutorService; as above

OR, spark.shuffle.service.enabled should always be false because UberRSS has done the job?

Disk damage causes failure

Hi,RSS_1 runs with -rootDir /data_01/rss-data/ ... RSS_i runs with -rootDir /data_i/rss-data/ .
When a disk eg./data_03 is damaged, Spark app failed.

The code shows that spark.shuffle.rss.replicas (default=1) is used to avoid issues like this.But why the app still failed

Is decreasing spark.shuffle.rss.serverRatio (default=20) and increasing spark.shuffle.rss.replicas(eg. 2 ) will work?
Or what should I do when one or some of these disks damage ?

# exception
com.uber.rss.exceptions.RssNetworkException: writeRowGroup: hit exception writing heading bytes 104, DataBlockSyncWriteClient 32 [/xxxx:50682 -> rss03.xxxx.net/xxxx:12222 (rss03.xxxx.net)], SocketException (Connection reset)
	at com.uber.rss.clients.DataBlockSyncWriteClient.writeData(DataBlockSyncWriteClient.java:133)
	at com.uber.rss.clients.PlainShuffleDataSyncWriteClient.writeDataBlock(PlainShuffleDataSyncWriteClient.java:40)
	at 
 // code of the configs
  val replicas: ConfigEntry[Int] =
    ConfigBuilder("spark.shuffle.rss.replicas")
      .doc("number of replicas for replicated shuffle client.")
      .intConf
      .createWithDefault(1)
  val serverRatio: ConfigEntry[Int] =
    ConfigBuilder("spark.shuffle.rss.serverRatio")
      .doc("how many executors mapping to one shuffle server.")
      .intConf
      .createWithDefault(20)

Corrupted block detected during decompression

Hi, we are seeing some zstd corruption error during shuffle read recently.

org.apache.spark.SparkException: Job aborted due to stage failure: Task 300 in stage 7.0 failed 4 times, most recent failure: Lost task 300.3 in stage 7.0 (TID 5866) (100.65.134.162 executor 200): com.github.luben.zstd.ZstdException: Corrupted block detected
	at com.github.luben.zstd.ZstdDecompressCtx.decompressByteArray(ZstdDecompressCtx.java:216)
	at com.github.luben.zstd.Zstd.decompressByteArray(Zstd.java:409)
	at org.apache.spark.shuffle.rss.BlockDownloaderPartitionRecordIterator.fetchNextDeserializationIterator(BlockDownloaderPartitionRecordIterator.scala:178)

It seems not related to the input files since the spark job succeeded after we retry. Any ideas why and if this is related to rss client/server? Thanks

Spark 3.1/3.2 failed sql skew and local reader tests

Hi, I ran the SparkSqlOptimizeSkewedJoinTest and SparkSqlOptimizeLocalShuffleReaderTest using spark3.1 and spark3.2, and both Rss test failed with assertion error with duplicate output rows.

For example, the expected output of SparkSqlOptimizeLocalShuffleReaderTest has 2 records
1 100, 1 101 however, the rss output has 8 records
1 100, 1 100, 1 100, 1 100, 1 101, 1 101, 1 101, 1 101

I also ran with spark 3.0, and the test passed. Wondering if you have any idea why there is such a issue with spark 3.1 and 3.2

what may cause RssInvalidServerVersionException?

Hi, I am wondering:

Q1. if RssInvalidServerVersionException will occur when RSS-i is restarted by a shell script as soon as it crashes due to some reasons meanwhile some applications are still using it. clients still stores the former RSS-i version but actually the version of the newly registered RSS-i is already changed.

# also the other exception may be caused by the same reason?
org.apache.spark.shuffle.FetchFailedException: Detected server restart, current server: Server{rss04.xxx:12203, 1675897753258, rss04xxx:/data/}, previous server: Server{rss04.xxxx:12203, 1675895945858, rss04xxx:/data/} at org.apache.spark.shuffle.RssShuffleManager$$anon$2.resolveConnection(RssShuffleManager.scala:220) at com.uber.rss.clients.ServerConnectionCacheUpdateRefresher.refreshConnection(ServerConnectionCacheUpdateRefresher.java:49) at com.uber.rss.clients.ServerIdAwareSyncWriteClient.connectImpl(ServerIdAwareSyncWriteClient.java:133) at

Q2. What may cause this exception :

org.apache.spark.shuffle.FetchFailedException: Cannot fetch shuffle 0 partition 362 due to RssAggregateException (RssShuffleStageNotStartedException (Shuffle not started: DataBlockSocketReadClient 274 [/10.2xxx44973 -> /10.20xxx:12212 (1xxxx28)])
com.uber.rss.exceptions.RssShuffleStageNotStartedException: Shuffle not started: DataBlockSocketReadClient 274 [/10.2xxxx:44973 -> /10.2xxx12212 (10.xxxx)]
	at com.uber.rss.clients.ClientBase.checkOKResponseStatus(ClientBase.java:291)
	at com.uber.rss.clients.ClientBase.readResponseStatus(ClientBase.java:275)
	at ...

fault tolerance of restarting server

I'm a little confused, shouldn't a server using local storage that restarts be able to accept/handle downloads if the service can restart in 15 seconds or so, saving its state? it looks like on restart, the server will update its 'running version' to the current time so any existing write clients will fail at this line of code

throw new RssInvalidServerVersionException(msg);

Reading control message may throw an exception in StreamServerMessageDecoder

In master branch StreamServerMessageDecoder.decodeImpl
case when READ_CONTROL_MESSAGE_LEN: if requiredBytes == 0, the control message will be read directly and will not check bytebuf readable length .

I am a little confused, When does requiredBytes equal to 0? In this case, I think reading control message may throw an exception if there is not enough data for bytebuf to read.

      case READ_MESSAGE_TYPE:
        if (in.readableBytes() < Integer.BYTES) {
          return;
        }
        int messageType = in.readInt();
        if (messageType < 0) {
          controlMessageType = messageType;
          state = State.READ_CONTROL_MESSAGE_LEN;
        } else {
          partitionId = messageType;
          state = State.READ_TASK_ATTEMPT_ID;
        }
        return;
      case READ_CONTROL_MESSAGE_LEN:
        if (in.readableBytes() < Integer.BYTES) {
          return;
        }
        requiredBytes = in.readInt();
        if (requiredBytes < 0) {
          throw new RssInvalidDataException(String.format(
              "Invalid control message length: %s, %s",
              requiredBytes, NettyUtils.getServerConnectionInfo(ctx)));
        }
        if (requiredBytes == 0) {
          // requiredBytes == 0 mean?
          Object controlMessage = getControlMessage(ctx, controlMessageType, in);
          out.add(controlMessage);
          resetData();
          state = State.READ_MESSAGE_TYPE;
        } else {
          state = State.READ_CONTROL_MESSAGE_BYTES;
        }
        return;

hit exception writing heading bytes XXXXX

Running a 1TB~3TB Spark Application,it always failed after running several hours.
blow is the Exception

Stage 0:>                                                       (0 + 0) / 1000]22/08/06 13:07:28 ERROR ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: 
Aborting TaskSet 0.0 because task 886 (partition 886)
cannot run anywhere due to node and executor blacklist.
Most recent failure:
Lost task 107.1 in stage 0.0 (TID 1219, 10.203.23.201, executor 463): com.uber.rss.exceptions.RssNetworkException: writeRowGroup: hit exception writing heading bytes 13586, DataBlockSyncWriteClient 82 [/XXXXXX.201:47560 -> MY_RSS_HOST/10.XXXXX.230:12202 (XXXXXXX)], SocketException (Broken pipe)
	at com.uber.rss.clients.DataBlockSyncWriteClient.writeData(DataBlockSyncWriteClient.java:133)
	at com.uber.rss.clients.PlainShuffleDataSyncWriteClient.writeDataBlock(PlainShuffleDataSyncWriteClient.java:40)
	at com.uber.rss.clients.ServerIdAwareSyncWriteClient.writeDataBlock(ServerIdAwareSyncWriteClient.java:73)
	at com.uber.rss.clients.ReplicatedWriteClient.lambda$writeDataBlock$2(ReplicatedWriteClient.java:82)
	at com.uber.rss.clients.ReplicatedWriteClient.runAllActiveClients(ReplicatedWriteClient.java:154)
	at com.uber.rss.clients.ReplicatedWriteClient.writeDataBlock(ReplicatedWriteClient.java:78)
	at com.uber.rss.clients.MultiServerSyncWriteClient.writeDataBlock(MultiServerSyncWriteClient.java:124)
	at com.uber.rss.clients.LazyWriteClient.writeDataBlock(LazyWriteClient.java:99)
	at org.apache.spark.shuffle.RssShuffleWriter$$anonfun$sendDataBlocks$1.apply(RssShuffleWriter.scala:166)
	at org.apache.spark.shuffle.RssShuffleWriter$$anonfun$sendDataBlocks$1.apply(RssShuffleWriter.scala:161)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.shuffle.RssShuffleWriter.sendDataBlocks(RssShuffleWriter.scala:161)
	at org.apache.spark.shuffle.RssShuffleWriter.write(RssShuffleWriter.scala:108)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:415)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1403)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:421)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketException: Broken pipe
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
	at java.net.SocketOutputStream.write(SocketOutputStream.java:141)
	at com.uber.rss.clients.DataBlockSyncWriteClient.writeData(DataBlockSyncWriteClient.java:131)

Does zeus only support jdk 11 +

when I use a spark image built by myself with jdk8, I met following error:
image
I found a reasonable answer In StackOverflow
image
So if there's a way to make it compatible with jdk8?

Metrics in ScheduledMetricCollector

Hi, I am puzzle with monitoring the RSS cluster.
In StreamServer.java there is code in main

ScheduledMetricCollector scheduledMetricCollector = new ScheduledMetricCollector(server.serviceRegistry);
scheduledMetricCollector.scheduleCollectingMetrics(...);

this scheduledMetricCollector has Gauges including activeNodeCountnumFileDescriptorsunreachableHosts

How should I send these metrics above to DB or else where If we want to monitor the RSS cluster ?

how to run on k8s

I follow the readme on rss-k8 branch,but it seems no difference between just submiting with local shuffle and has log
image

what is the difference between k8 and rss-k8 branch, and how can run on k8s, any help is appreciated

Can Rss have stage retry when one server is down?

Hi, I just found out my spark job got killed with this error:

Caused by: com.uber.rss.exceptions.RssException: Failed to get node data for zookeeper node: /spark_rss/{cluster}/default/nodes/{server_host_name}
    at com.uber.rss.metadata.ZooKeeperServiceRegistry.getServerInfo(ZooKeeperServiceRegistry.java:231)
    ...
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /spark_rss/{cluster}/default/nodes/{server_host_name}
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
    ...
    at com.uber.rss.metadata.ZooKeeperServiceRegistry.getServerInfo(ZooKeeperServiceRegistry.java:228)

I then checked the zookeeper and didn't see this {server_host_name} registered there. So I suspect that it was already removed from zk due to some internal issues with the node, but was picked up by Rss before this happend. When Rss tried to connect, it was no longer on zk, and caused 'NoNodeException'. It retried and failed for 4 times, so then killed the job.

If this was the reason, maybe Rss needs to allow the connection process to skip nodes that are no longer on zookeepr, and pick a current available one? Any thoughts would be appreciated, thanks!

How to gracefully retention one RSS server?

We have several RSS servers, if we want to gracefully retention one RSS server which need to be transparent to user and user's shuffle replica is 1. Do we have any solution, like only output data but not write new data?

[Proposal] Unsafe memory management in RSS mappers

Mappers in RSS send shuffle data for any given partition to a single RSS servers, so that reducers can read the shuffle data from a single location. To incorporate this, a hashmap of (partition Id -> shuffle data for that partition) is maintained in mappers. It works as follows:
image

  • Maintain a hashmap of the partition id to a buffer.
  • After a new record is received, get the shuffle partition it must be sent to using the shuffle dependency.
  • If the hashmap already has the entry corresponding to this partition ID, data is serialized and then appended to the buffer. Buffer is created first if not already present.
  • If after the insertion, the size of the buffer is greater than 32 KB (configurable), send the contents of that partition ID to the appropriate RSS server.
  • If the size of the entire hashmap becomes greater than a configured value, entire data in the hashmap is sent to the RSS servers

This approach leads to perf degradation when the number of records and/or amount of shuffle data is very high mainly because:

  • Cost of hashmap lookups -
    Even though the hashmap size is not large (hashmap per mapper will have upto number of shuffle partitions keys), but one hashmap look up per record adds to a significant overhead. It is very common for a spark application to read billion plus records, which is why hashmap look-ups become an overhead even though each call takes a constant amount of time.

  • High serialization cost -
    Individual buffers per key are allowed to grow only till 32KB, this leads to a very high number of spills per mapper. For a 1TB shuffle, we have seen cases of it being as high as 10e9.
    Within a single mapper task, one buffer and one serializer stream to write to this buffer is created per hashmap key. Because data to one partition can be sent over multiple spills, more than one buffer and its accompanying serializer stream might be created per partition. In the worst case, if a record size is large, there could be as many as the number of record allocation happening within each map task.
    Such large numbers of small java object allocations and buffer extension causes memory pressure and there by GC, viz, frequent and larger pauses which also adds to the latencies.

  • Excessive internal copying of data -
    Current implementation, copies over the data from the hashmap to a new buffer before spilling. This not only adds to the latency but also creates memory pressure, which in itself can degrade the application performance further.

image

Proposal
Use unsafe memory management for RSS in lines with Spark's unsafe memory management.
image

  • Stores the map output data in serialized form
  • Buffers the data in memory as much as possible. Chunks the data before sending it to RSS servers.
  • Avoids any extra copying of the data before spill
  • Uses Java’s unsafe apis to acquire large chunks of memory for storing the serialized record
  • After receiving a record, it is serialized and stored at an offset in the already acquired page in memory.
  • A tuple of (PartitionId, Memory location where record is stored) is maintained in an Array
  • Array is sorted by the partition before spilling. Once sorted, data is read from the accompanying pointer location

For implementation, components from the Spark's implementation of unsafe shuffle writer can be used.
Based on the POC, here are the performance numbers
image

How to evaluate rss cluster size?

Hi, we have about 20,000 daily spark applications. All these apps produce 100TB shuffle writes/reads data respectively. A peak app produces 6TB, but most produce less than 100GB.

We ran some online spark apps on test rss cluster and found that rss node consume obviously more memory than cpu/disk/etc which even cause some rss/node break down .

Now we would like to run all apps on rss, any suggestions on rss cluster size and machine selection with replicas=2
eg.memory, cpu, node num,disk,memory-cpu ratio, etc. Any suggesion will help.

One more: mappers send data to StreamServer,StreamServer stores in memory,then flush to disk, not many calculations, so rss consumes memory a lot more than cpu, is that right?

How long the shuffle data of each ShuffleStage will be stored in RSS nodes?

The char shows the shuffle stages of an application

After Stage-1 ShuffleWriting 349GB data, next 2 shuffle stages have no ShuffleWrite data,
is the 349GB data will be cleaned when stage-6 starts ShuffleWriting or already be cleaned when stage-2 starts shuffleWriting though it has no data to write?

stage-897 is the final stage and stage-896 has the latest ShuffleWrite with about 501GB .So the 501GB will be kept for 36 hours according to DEFAULT_APP_FILE_RETENTION_MILLIS(default 36h)?

StageId Input (GB) Output (GB) ShuffleRead (GB) ShuffleWrite (GB)
0 503.59 0.0 0.0 1723.94
1 0.0 0.0 1721.11 349.34
2 0.0 0.0 349.05 0.0
5 0.0 0.0 349.05 0.0
6 575.31 0.0 0.0 3045.01
... ... ... ... ...
889 2439.36 0.0 269.53 1885.78
896 0.0 0.0 922.01 501.73
897 0.0 2900.98 2386.41 0.0

Does RSS support multiple StreamServers on the same node?

RSS not support multiple disk directories so can I run multiple StreamServers on one node and each is specified with one disk directory? Or I can only use LVM to map multiple disk into one directory and point it to the only StreamServer?

One more question
I run a Spark-SQL using No-RSS and RSS,both applications have the same input
But why the data size of Stage 10 Shuffle Read differ from each other so much?

image

Does Rss support YARN executor preemption?

We use Rss in our adhoc cluster with preemption enabled, which mean some tasks may fail due to resource being preempted while writing shuffle data.

I am wondering if the shuffle writer has written only with part of the shuffle data when it is preempted, and the server doesn't mark it as corrupted since the stream has been closed properly. Will this cause any shuffle data inconsistency in the partition file? Thanks.

Unsupported shuffle manager

Hi,
I'm trying to use the RSS client with Spark 2.4.7 but it fails to register:

Error in SQL statement: SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.0.227.167, executor 118): ExecutorLostFailure (executor 118 exited caused by one of the running tasks) Reason: Unable to create executor due to Unable to register with external shuffle server due to : java.lang.UnsupportedOperationException: Unsupported shuffle manager of executor: ExecutorShuffleInfo{localDirs=[/local_disk0/spark-7fab8fdc-44a9-4259-ad5d-fd2c0551c2ec/executor-eeb9da1f-0869-4ecb-8374-6ededd031062/blockmgr-3e832fe3-cfd5-446a-bec0-5659f34d9be5], subDirsPerLocalDir=64, shuffleManager=org.apache.spark.shuffle.RssShuffleManager}
	at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.registerExecutor(ExternalShuffleBlockResolver.java:144)
	at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.handleMessage(ExternalShuffleBlockHandler.java:115)
	at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:83)
	at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:180)

write amplification

i'm noticing running some spark apps that produce 11TB of shuffle data on external shuffle service, that they produce closer to 18TB of shuffle data on remote shuffle service. is some write amplification expected?

Failed to get all RSS servers

Hi,
I would like to test JavaWordCount using Uber Remote Shuffle Service, I followed https://github.com/uber/RemoteShuffleService README to:
1. BUILD RSS client and server then got xx-client.jar and xx-server.jar.
2. RUN java -Dxxx -cp com.uber.rss.StreamServer -port 12222 -serviceRegistry standalone -dataCenter dc1 on one of my client node where i used to submit spark application
3. set my spark-default.cnf referring the README
4. ./spark-submit xxxx to submit an application

then Exception returned:Failed to get all RSS servers

My puzzles are:
1. Is your RSS compatible with YARN ? since I only saw -serviceRegistry supports Standalone and zookeeper
2. If not, what should i still need to do to run on YARN with your RSS

enviroment:
all of our spark applications are running on YARN

command to submit:
$SPARK_HOME/bin/spark-submit
--class org.apache.spark.examples.JavaWordCount
--conf spark.speculation=false
--jars <MY_PATH>/remote-shuffle-service-0.0.9-client.jar
--conf spark.driver.extraClassPath=remote-shuffle-service-0.0.9-client.jar
--conf spark.executor.extraClassPath=remote-shuffle-service-0.0.9-client.jar
--conf spark.shuffle.service.enabled=false
--conf spark.dynamicAllocation.enabled=false
$SPARK_HOME/examples/jars/spark-examples_2.11-2.4.5.jar
hdfs://<MY_HDFS_PATH>/datas.txt

part of Exceptions:
Exception in thread "main" com.uber.rss.exceptions.RssException: Failed to get all RSS servers
at com.uber.rss.metadata.ServiceRegistryUtils.getReachableServers(ServiceRegistryUtils.java:82)
at org.apache.spark.shuffle.RssShuffleManager$$anonfun$8.apply(RssShuffleManager.scala:396)
at org.apache.spark.shuffle.RssShuffleManager$$anonfun$8.apply(RssShuffleManager.scala:395)
at org.apache.spark.shuffle.RssServiceRegistry$.executeWithServiceRegistry(RssServiceRegistry.scala:80)
at org.apache.spark.shuffle.RssShuffleManager.getRssServers(RssShuffleManager.scala:395)
at org.apache.spark.shuffle.RssShuffleManager.registerShuffle(RssShuffleManager.scala:109)
at org.apache.spark.ShuffleDependency.(Dependency.scala:93)
at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:87)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:256)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:252)
at scala.Option.getOrElse(Option.scala:121)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.