firestorm's Issues

Error with patch spark-3.1.2_dynamic_allocation_support.patch

Hi,
I tried to apply the patch /spark-patches/spark-3.1.2_dynamic_allocation_support.patch to Spark 3.2.1 and got this error:
[ERROR] [Error] /home/alatau/spark-3.2.1-src/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala:180: wrong number of arguments for pattern org.apache.spark.sql.execution.CoalescedPartitionSpec(startReducerIndex: Int,endReducerIndex: Int,dataSize: Option[Long])

180: case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>

Can you help fix the patch for version 3.2.1?

"Blocks read inconsistent" happened when shuffle read

Hi, when I run a Spark job, a "blocks read inconsistent" error appears during the shuffle read stage. What could be the cause?

server.conf:

rss.server.flush.thread.alive=2
rss.server.flush.threadPool.size=4
rss.server.buffer.capacity=6g
rss.server.read.buffer.capacity=3g
rss.server.disk.capacity=180g

server_rss_env.sh:
....
XMX_SIZE="12g"
....

Storage path must be set by the client or fetched from coordinators.

Hello, I tried MEMORY_HDFS mode and got the following error:

WARN shuffle.RssShuffleUtils: Empty conf items
ERROR shuffle.RssShuffleUtils: Storage path must be set by the client or fetched from coordinators.
ERROR spark.SparkContext: Error initializing SparkContext.
.
.
Caused by: java.lang.IllegalArgumentException: Storage path must be set by the client or fetched from coordinators.
at org.apache.spark.shuffle.RssShuffleUtils.validateRssClientConf(RssShuffleUtils.java:220)
at org.apache.spark.shuffle.RssShuffleManager.&lt;init&gt;(RssShuffleManager.java:154)
.

Where can I set the storage path?
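
For reference, other issues on this page (e.g. "Errors in settings" below) set the storage path with the config keys shown here; a minimal sketch with placeholder paths, not a verified fix for this particular error:

Client side (spark-defaults.conf):
spark.rss.storage.type    MEMORY_HDFS
spark.rss.base.path       hdfs://&lt;namenode&gt;/&lt;rss-base-path&gt;

Shuffle server side (server.conf):
rss.storage.type          MEMORY_HDFS
rss.server.hdfs.base.path hdfs://&lt;namenode&gt;/&lt;rss-base-path&gt;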

Failed when Spark ESS is enabled

Hi,
I am running a WordCount on Spark 2.4.5 (on YARN) + Firestorm 0.2.0.
The Spark External Shuffle Service is running on the NodeManager with spark-2.4.5-yarn-shuffle.jar on its classpath.
When spark.shuffle.service.enabled=false it works fine, but when it is true it fails.
In both cases the coordinator and shuffle server detect the application.
So Firestorm doesn't need ESS to be enabled? If not, what should I do?
Thank you
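
For reference, the "Errors in settings" issue later on this page runs the RSS client with ESS and dynamic allocation turned off; a minimal sketch of those spark-defaults entries (not a confirmed requirement):

spark.shuffle.manager            org.apache.spark.shuffle.RssShuffleManager
spark.shuffle.service.enabled    false
spark.dynamicAllocation.enabled  false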

########  ESS enabled #########

# container log
22/02/22 10:52:33 INFO ApplicationMaster: Started progress reporter thread with (heartbeat : 1000, initial allocation : 200) intervals

[Stage 0:>                                                          (0 + 0) / 2]
[Stage 0:>                                                          (0 + 2) / 2]22/02/22 10:53:20 ERROR YarnClusterScheduler: Lost executor 2 on 10.163.4.4: Unable to create executor due to Unable to register with external shuffle server due to : java.lang.UnsupportedOperationException: Unsupported shuffle manager of executor: ExecutorShuffleInfo{localDirs=[/home/yarn/nm-local-dir/usercache/MYUSER/appcache/application_1644546216413_0288/blockmgr-a9f411a8-baac-4dfd-9044-7902fc9ebbd9], subDirsPerLocalDir=64, shuffleManager=org.apache.spark.shuffle.RssShuffleManager}
	at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.registerExecutor(ExternalShuffleBlockResolver.java:149)
	at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.handleMessage(ExternalShuffleBlockHandler.java:113)
	at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:81)
	at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:180)
	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:103)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.spark_project.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.spark_project.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.spark_project.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
	at org.spark_project.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
	at org.spark_project.io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
	at org.spark_project.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at org.spark_project.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:745)

22/02/22 10:53:21 ERROR YarnClusterScheduler: Lost executor 1 on 10.163.4.4: Unable to create executor due to Unable to register with external shuffle server due to : java.lang.UnsupportedOperationException: Unsupported shuffle manager of executor: ExecutorShuffleInfo{localDirs=[/home/yarn/nm-local-dir/usercache/MYUSER/appcache/application_1644546216413_0288/blockmgr-44708e30-2a4c-4063-a31e-5b9db7194f04], subDirsPerLocalDir=64, shuffleManager=org.apache.spark.shuffle.RssShuffleManager}
	at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.registerExecutor(ExternalShuffleBlockResolver.java:149)
	at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.handleMessage(ExternalShuffleBlockHandler.java:113)
	at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:81)
	at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:180)
	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:103)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.spark_project.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.spark_project.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.spark_project.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
	at org.spark_project.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
	at org.spark_project.io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
	at org.spark_project.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at org.spark_project.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:745)


[Stage 0:>                                                          (0 + 1) / 2]22/02/22 10:53:25 ERROR YarnClusterScheduler: Lost executor 5 on 10.163.4.4: Unable to create executor due to Unable to register with external shuffle server due to : java.lang.UnsupportedOperationException: Unsupported shuffle manager of executor: ExecutorShuffleInfo{localDirs=[/home/yarn/nm-local-dir/usercache/MYUSER/appcache/application_1644546216413_0288/blockmgr-5ddc98e8-2935-4805-9c16-2c8a09a4d9e7], subDirsPerLocalDir=64, shuffleManager=org.apache.spark.shuffle.RssShuffleManager}
	at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.registerExecutor(ExternalShuffleBlockResolver.java:149)
	at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.handleMessage(ExternalShuffleBlockHandler.java:113)
	at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:81)
	at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:180)
	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:103)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.spark_project.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.spark_project.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.spark_project.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
	at org.spark_project.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
	at org.spark_project.io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
	at org.spark_project.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at org.spark_project.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:745)

22/02/22 10:53:25 ERROR YarnClusterScheduler: Lost executor 3 on 10.163.4.4: Unable to create executor due to Unable to register with external shuffle server due to : java.lang.UnsupportedOperationException: Unsupported shuffle manager of executor: ExecutorShuffleInfo{localDirs=[/home/yarn/nm-local-dir/usercache/MYUSER/appcache/application_1644546216413_0288/blockmgr-443ec3cb-e1f4-40d1-8954-58c115527d93], subDirsPerLocalDir=64, shuffleManager=org.apache.spark.shuffle.RssShuffleManager}
	at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.registerExecutor(ExternalShuffleBlockResolver.java:149)
	at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.handleMessage(ExternalShuffleBlockHandler.java:113)
	at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:81)
	at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:180)
	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:103)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.spark_project.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.spark_project.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.spark_project.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
	at org.spark_project.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
	at org.spark_project.io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
	at org.spark_project.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at org.spark_project.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:745)


[Stage 0:>                                                          (0 + 0) / 2]22/02/22 10:54:06 ERROR YarnClusterScheduler: Lost executor 6 on 10.163.4.4: Unable to create executor due to Unable to register with external shuffle server due to : java.lang.UnsupportedOperationException: Unsupported shuffle manager of executor: ExecutorShuffleInfo{localDirs=[/home/yarn/nm-local-dir/usercache/MYUSER/appcache/application_1644546216413_0288/blockmgr-9b801e2a-30b7-4178-bfeb-c3c694ac6691], subDirsPerLocalDir=64, shuffleManager=org.apache.spark.shuffle.RssShuffleManager}
	at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.registerExecutor(ExternalShuffleBlockResolver.java:149)
	at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.handleMessage(ExternalShuffleBlockHandler.java:113)
	at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:81)
	at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:180)
	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:103)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.spark_project.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.spark_project.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.spark_project.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.spark_project.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
	at org.spark_project.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
	at org.spark_project.io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
	at org.spark_project.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at org.spark_project.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:745)

# coordinator log:

[INFO] 2022-02-22 11:03:06,229 Grpc-181 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[application_1644546216413_0292_1645498981354], shuffleId[0], partitionNum[2], partitionNumPerRange[1], replica[1]
[WARN] 2022-02-22 11:03:06,231 Grpc-181 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [5] and found only [3]
[INFO] 2022-02-22 11:03:06,231 Grpc-181 CoordinatorGrpcService logAssignmentResult - Shuffle Servers of assignment for appId[application_1644546216413_0292_1645498981354], shuffleId[0] are [10.163.4.11-19999, 10.163.4.9-19999]
[INFO] 2022-02-22 11:03:26,338 ApplicationManager-0 ApplicationManager statusCheck - Start to check status for 1 applications
 [INFO] 2022-02-22 11:03:56,338 ApplicationManager-0 ApplicationManager statusCheck - Start to check status for 1 applications
[INFO] 2022-02-22 11:04:26,338 ApplicationManager-0 ApplicationManager statusCheck - Start to check status for 1 applications


# shuffle server log
   [INFO] 2022-02-22 11:03:07,064 Grpc-529 ShuffleServerGrpcService registerShuffle - Get register request for appId[application_1644546216413_0292_1645498981354], shuffleId[0] with 1 partition ranges
[INFO] 2022-02-22 11:03:12,153 Grpc-531 ShuffleServerGrpcService appHeartbeat - Get heartbeat from application_1644546216413_0292_1645498981354
 [INFO] 2022-02-22 11:03:22,095 Grpc-533 ShuffleServerGrpcService appHeartbeat - Get heartbeat from application_1644546216413_0292_1645498981354
[INFO] 2022-02-22 11:03:32,085 Grpc-535 ShuffleServerGrpcService appHeartbeat - Get heartbeat from application_1644546216413_0292_1645498981354
########### ESS disabled ############

# coordinator log
[INFO] 2022-02-22 11:09:30,033 Grpc-566 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[application_1644546216413_0293_1645499365247], shuffleId[0], partitionNum[2], partitionNumPerRange[1], replica[1]
[WARN] 2022-02-22 11:09:30,034 Grpc-566 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [5] and found only [3]
[INFO] 2022-02-22 11:09:30,035 Grpc-566 CoordinatorGrpcService logAssignmentResult - Shuffle Servers of assignment for appId[application_1644546216413_0293_1645499365247], shuffleId[0] are [10.163.4.11-19999, 10.163.4.9-19999]
 [INFO] 2022-02-22 11:09:56,338 ApplicationManager-0 ApplicationManager statusCheck - Start to check status for 1 applications
 [INFO] 2022-02-22 11:10:26,338 ApplicationManager-0 ApplicationManager statusCheck - Start to check status for 1 applications
 [INFO] 2022-02-22 11:10:56,338 ApplicationManager-0 ApplicationManager statusCheck - Start to check status for 1 applications
 [INFO] 2022-02-22 11:11:26,338 ApplicationManager-0 ApplicationManager statusCheck - Start to check status for 1 applications
 [INFO] 2022-02-22 11:11:56,338 ApplicationManager-0 ApplicationManager statusCheck - Start to check status for 1 applications
[INFO] 2022-02-22 11:11:56,338 ApplicationManager-0 ApplicationManager statusCheck - Remove expired application:application_1644546216413_0293_1645499365247


# shuffle server log

[INFO] 2022-02-22 11:07:21,981 expiredAppCleaner ShuffleTaskManager checkResourceStatus - Detect expired appId[application_1644546216413_0292_1645498981354] according to rss.server.app.expired.withoutHeartbeat
[INFO] 2022-02-22 11:07:21,981 clearResourceThread ShuffleTaskManager removeResources - Start remove resource for appId[application_1644546216413_0292_1645498981354]
[INFO] 2022-02-22 11:07:21,981 clearResourceThread ShuffleTaskManager removeResources - Finish remove resource for appId[application_1644546216413_0292_1645498981354] cost 0 ms
    [INFO] 2022-02-22 11:09:30,458 Grpc-558 ShuffleServerGrpcService registerShuffle - Get register request for appId[application_1644546216413_0293_1645499365247], shuffleId[0] with 1 partition ranges
[INFO] 2022-02-22 11:09:35,497 Grpc-560 ShuffleServerGrpcService appHeartbeat - Get heartbeat from application_1644546216413_0293_1645499365247
[INFO] 2022-02-22 11:09:45,469 Grpc-562 ShuffleServerGrpcService appHeartbeat - Get heartbeat from application_1644546216413_0293_1645499365247
[INFO] 2022-02-22 11:09:55,467 Grpc-564 ShuffleServerGrpcService appHeartbeat - Get heartbeat from application_1644546216413_0293_1645499365247
[INFO] 2022-02-22 11:10:05,469 Grpc-566 ShuffleServerGrpcService appHeartbeat - Get heartbeat from application_1644546216413_0293_1645499365247
 [INFO] 2022-02-22 11:10:15,467 Grpc-580 ShuffleServerGrpcService appHeartbeat - Get heartbeat from application_1644546216413_0293_1645499365247
[INFO] 2022-02-22 11:10:15,677 Grpc-584 ShuffleServerGrpcService reportShuffleResult - Report 1 blocks as shuffle result for the task of appId[application_1644546216413_0293_1645499365247], shuffleId[0], taskAttemptId[0]
[INFO] 2022-02-22 11:10:16,547 Grpc-590 ShuffleServerGrpcService finishShuffle - Get finishShuffle request for appId[application_1644546216413_0293_1645499365247], shuffleId[0]
[WARN] 2022-02-22 11:10:16,549 Grpc-590 ShuffleFlushManager getCommittedBlockIds - Unexpected value when getCommittedBlockIds for appId[application_1644546216413_0293_1645499365247]
[INFO] 2022-02-22 11:10:16,552 pool-5-thread-9 LocalStorageMeta createMetadataIfNotExist - Create metadata of shuffle application_1644546216413_0293_1645499365247/0.
[INFO] 2022-02-22 11:10:17,550 Grpc-590 ShuffleTaskManager commitShuffle - Checking commit result for appId[application_1644546216413_0293_1645499365247], shuffleId[0], expect committed[2], remain[2]
[INFO] 2022-02-22 11:10:17,553 Grpc-590 ShuffleTaskManager commitShuffle - Finish commit for appId[application_1644546216413_0293_1645499365247], shuffleId[0] with expectedCommitted[2], cost 1006 ms to check
[INFO] 2022-02-22 11:10:18,462 Grpc-592 ShuffleServerGrpcService reportShuffleResult - Report 1 blocks as shuffle result for the task of appId[application_1644546216413_0293_1645499365247], shuffleId[0], taskAttemptId[1]
[INFO] 2022-02-22 11:10:25,471 Grpc-594 ShuffleServerGrpcService appHeartbeat - Get heartbeat from application_1644546216413_0293_1645499365247
[INFO] 2022-02-22 11:10:35,469 Grpc-596 ShuffleServerGrpcService appHeartbeat - Get heartbeat from application_1644546216413_0293_1645499365247
 [INFO] 2022-02-22 11:10:45,471 Grpc-598 ShuffleServerGrpcService appHeartbeat - Get heartbeat from application_1644546216413_0293_1645499365247
[INFO] 2022-02-22 11:10:47,055 Grpc-604 ShuffleServerGrpcService getLocalShuffleIndex - Successfully getShuffleIndex cost 0 ms for 80 bytes with appId[application_1644546216413_0293_1645499365247], shuffleId[0], partitionId[1]
[INFO] 2022-02-22 11:10:47,076 Grpc-607 ShuffleServerGrpcService getLocalShuffleData - Successfully getShuffleData cost 1 ms for shuffle data with appId[application_1644546216413_0293_1645499365247], shuffleId[0], partitionId[1]offset[0]length[1473]
[INFO] 2022-02-22 11:10:55,972 Grpc-609 ShuffleServerGrpcService appHeartbeat - Get heartbeat from application_1644546216413_0293_1645499365247
     [INFO] 2022-02-22 11:13:21,981 expiredAppCleaner ShuffleTaskManager checkResourceStatus - Detect expired appId[application_1644546216413_0293_1645499365247] according to rss.server.app.expired.withoutHeartbeat
[INFO] 2022-02-22 11:13:21,981 clearResourceThread ShuffleTaskManager removeResources - Start remove resource for appId[application_1644546216413_0293_1645499365247]
[INFO] 2022-02-22 11:13:21,984 clearResourceThread MultiStorageManager removeResources - Start to remove resource of appId: application_1644546216413_0293_1645499365247, shuffles: [0]
[INFO] 2022-02-22 11:13:21,984 clearResourceThread LocalStorage removeResources - Start to remove resource of application_1644546216413_0293_1645499365247/0

Coordinator HA problem

It looks like all coordinators serve all Spark jobs, instead of only one serving online traffic (like master/slave), and a Spark job picks one coordinator for its assignment according to the quorum sequence.

The coordinator is only responsible for partition assignment.

If we have two coordinators serving online, then when one coordinator dies the RSS still works; when all coordinators die, existing shuffles still work, but newly arriving shuffle requests will fail.

If I am wrong, feel free to let me know.
Could anyone provide more detailed information on the architecture design?

Errors in settings

Hi,

Please help me figure out the RSS settings
My Cluster
hadoop 3.1.3
spark 3.2

Now YARN Shuffle Service
I want to set up an external RSS

sandbox02 – coordinator1
sandbox03 – coordinator2
sandbox04 – server1 RSS
sandbox05 – server2 RSS

My settings are like this now

Coordinator

rss.rpc.server.port 19999
rss.jetty.http.port 19998
rss.coordinator.server.heartbeat.timeout 30000
rss.coordinator.app.expired 60000
rss.coordinator.shuffle.nodes.max 3
rss.coordinator.exclude.nodes.file.path /opt/rss_fs/conf/exclude_nodes

Server

rss.rpc.server.port 19999
rss.jetty.http.port 19998
rss.storage.basePath /srv/data/01/rssdata,/srv/data/02/rssdata,/srv/data/03/rssdata
rss.storage.type MEMORY_HDFS
rss.coordinator.quorum sandbox02:19999,sandbox03:19999
rss.server.buffer.capacity 5gb
rss.server.buffer.spill.threshold 2gb
rss.server.partition.buffer.size 50mb
rss.server.read.buffer.capacity 5gb
rss.server.flush.thread.alive 50
rss.server.flush.threadPool.size 100
rss.server.hdfs.base.path hdfs://sandbox-test/rss

Spark - spark-defaults.conf

...
spark.master                                          yarn
spark.jars                                            hdfs://sandbox-test/spark3/jars/rss-client-spark3-0.3.0-shaded.jar
spark.yarn.archive                                    hdfs://sandbox-test/spark3/yarn/archive.zip

spark.shuffle.service.enabled                         false
spark.dynamicAllocation.enabled                       false

spark.shuffle.manager                                 org.apache.spark.shuffle.RssShuffleManager
spark.rss.coordinator.quorum                          10.3.118.10:19999,10.3.118.11:19999
spark.rss.storage.type                                MEMORY_HDFS
spark.rss.base.path                                   hdfs://sandbox-test/rss

YARN - yarn-site.xml

...
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.shuffle.RssShuffleManager</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.classpath</name>
  <value>/opt/rss_fs/jars/client/spark3/rss-client-spark3-0.3.0-shaded.jar</value>
</property>
...

With these settings I can't even run spark-shell.
I have tried different settings, but so far without result.
There are no errors in the coordinator or server logs.

Coordinator logs:

[INFO] 2022-04-07 20:22:22,532 main CoordinatorServer main - Start to init coordinator server using config ./conf/coordinator.conf
[INFO] 2022-04-07 20:22:22,542 main RssUtils getPropertiesFromFile - Load config from ./conf/coordinator.conf
[INFO] 2022-04-07 20:22:22,680 main CoordinatorServer registerMetrics - Register metrics
[INFO] 2022-04-07 20:22:22,728 main CoordinatorServer registerMetrics - Add metrics servlet
[INFO] 2022-04-07 20:22:22,748 main CoordinatorServer addServlet - Add metrics servlet
[INFO] 2022-04-07 20:22:23,016 main Server doStart - jetty-9.0.2.v20130417
[INFO] 2022-04-07 20:22:23,058 main ContextHandler doStart - started o.e.j.s.ServletContextHandler@4a003cbe{/,null,AVAILABLE}
[INFO] 2022-04-07 20:22:23,076 main ServerConnector doStart - Started ServerConnector@7ea28149{HTTP/1.1}{0.0.0.0:19998}
[INFO] 2022-04-07 20:22:23,076 main JettyServer start - Jetty http server started, listening on port 19998
[INFO] 2022-04-07 20:22:23,149 main GrpcServer start - Grpc server started, listening on 19999.
[INFO] 2022-04-07 20:22:52,587 ApplicationManager-0 ApplicationManager statusCheck - Start to check status for 0 applications
[INFO] 2022-04-07 20:23:22,587 ApplicationManager-0 ApplicationManager statusCheck - Start to check status for 0 applications
[INFO] 2022-04-07 20:23:22,598 UpdateExcludeNodes-0 SimpleClusterManager parseExcludeNodesFile - Update exclude nodes and 0 nodes was marked as exclude nodes
[INFO] 2022-04-07 20:23:52,587 ApplicationManager-0 ApplicationManager statusCheck - Start to check status for 0 applications

Server logs:

[INFO] 2022-04-07 20:22:37,032 main ShuffleServer main - Start to init shuffle server using config ./conf/server.conf
[INFO] 2022-04-07 20:22:37,042 main RssUtils getPropertiesFromFile - Load config from ./conf/server.conf
[INFO] 2022-04-07 20:22:37,078 main ShuffleServer initialization - Start to initialize server 172.18.0.1-19999
[INFO] 2022-04-07 20:22:37,136 main ShuffleServer registerMetrics - Register metrics
[INFO] 2022-04-07 20:22:37,183 main ShuffleServer registerMetrics - Add metrics servlet
[INFO] 2022-04-07 20:22:37,254 main CoordinatorClientFactory createCoordinatorClient - Start to create coordinator clients from sandbox02:19999,sandbox03:19999
[INFO] 2022-04-07 20:22:37,501 main CoordinatorClientFactory createCoordinatorClient - Add coordinator client Coordinator grpc client ref to sandbox02:19999
[INFO] 2022-04-07 20:22:37,505 main CoordinatorClientFactory createCoordinatorClient - Add coordinator client Coordinator grpc client ref to sandbox03:19999
[INFO] 2022-04-07 20:22:37,511 main CoordinatorClientFactory createCoordinatorClient - Finish create coordinator clients Coordinator grpc client ref to sandbox02:19999, Coordinator grpc client ref to sandbox03:19999
[INFO] 2022-04-07 20:22:37,613 main RegisterHeartBeat startHeartBeat - Start heartbeat to coordinator sandbox02:19999,sandbox03:19999 after 10000ms and interval is 10000ms
[INFO] 2022-04-07 20:22:37,617 main Server doStart - jetty-9.0.2.v20130417
[INFO] 2022-04-07 20:22:37,656 main ContextHandler doStart - started o.e.j.s.ServletContextHandler@205d38da{/,null,AVAILABLE}
[INFO] 2022-04-07 20:22:37,675 main ServerConnector doStart - Started ServerConnector@73eedaf0{HTTP/1.1}{0.0.0.0:19998}
[INFO] 2022-04-07 20:22:37,676 main JettyServer start - Jetty http server started, listening on port 19998
[INFO] 2022-04-07 20:22:37,752 main GrpcServer start - Grpc server started, listening on 19999.
[INFO] 2022-04-07 20:22:37,753 main ShuffleServer start - Shuffle server start successfully!

Can you point out my mistakes in the settings?

Support shuffle data replica?

Thanks for your great work on it.

I have a question: I don't see any info about data replica support in the README or other docs, but I do see the config in the codebase.

So is data replication a stable feature in Firestorm?

Killing 1 RSS that is serving shuffle read/write causes the application to be re-run by YARN with a new attempt id

Tested with a Terasort job.

If an application needs to run for 5 hours and one RSS becomes abnormal halfway through, a lot of time is wasted.

Four cases were tested,

  1. shuffle write & 1 replica: app re-run
  2. shuffle write & 2 replicas: app re-run
  3. shuffle read & 1 replica: app re-run
  4. shuffle read & 2 replicas: no problem

Shuffle read can run normally with 2 replicas configured, but for shuffle write there is no way to avoid the application re-run.
For shuffle write, is it possible to make the driver re-register the shuffle to get a new, reachable RSS?

Running the official Spark 3.1.1 JavaWordCount with firestorm-0.4.0 fails with the error below, and in yarn-client mode the driver process never exits

java.io.StreamCorruptedException: invalid stream header: 74001673
	at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:806)
	at java.io.ObjectInputStream.&lt;init&gt;(ObjectInputStream.java:299)
	at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.&lt;init&gt;(JavaSerializer.scala:64)
	at org.apache.spark.serializer.JavaDeserializationStream.&lt;init&gt;(JavaSerializer.scala:64)
	at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:123)
	at org.apache.spark.shuffle.reader.RssShuffleDataIterator.createKVIterator(RssShuffleDataIterator.java:71)
	at org.apache.spark.shuffle.reader.RssShuffleDataIterator.hasNext(RssShuffleDataIterator.java:118)
	at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.hasNext(RssShuffleReader.java:213)
	at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:155)
	at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
	at org.apache.spark.shuffle.reader.RssShuffleReader.read(RssShuffleReader.java:125)
	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:106)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.scheduler.Task.run(Task.scala:134)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:535)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:545)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

quorum write/read bug

[image: screenshot of the code in question]

The logic of this code seems wrong to me, or is my understanding off? Shouldn't a report only count as finally successful when, for every partition, the number of successful reports reaches the replicaWrite threshold? The current logic seems to treat the report as successful as soon as the number of shuffle servers that reported successfully reaches the replicaWrite threshold.
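
For illustration, a minimal sketch (with hypothetical names, not Firestorm's actual code) of the per-partition check the reporter expects, where every partition must reach replicaWrite successful reports:

import java.util.Map;

final class QuorumReportCheckSketch {
  // successCountPerPartition: partitionId -> number of shuffle servers that reported success for it
  static boolean reportSucceeded(Map<Integer, Integer> successCountPerPartition, int replicaWrite) {
    for (int successCount : successCountPerPartition.values()) {
      if (successCount < replicaWrite) {
        return false; // this partition did not reach the write quorum
      }
    }
    return true; // every partition reached replicaWrite successful reports
  }
}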

In yarn-client mode the driver process never exits

[image: screenshot of the driver log]

As shown above, after the job finishes the driver keeps reporting heartbeats.

In my testing, changing the scheduledExecutorService to use daemon threads fixes the problem:
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setDaemon(true).setNameFormat("rss-heartbeat-%d").build());
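
For completeness, the same fix as a self-contained snippet with its imports (assuming Guava's ThreadFactoryBuilder, as in the line above):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

// Daemon threads let the JVM exit once the main thread finishes, so the
// heartbeat executor no longer keeps the yarn-client driver process alive.
ScheduledExecutorService scheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(
        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("rss-heartbeat-%d").build());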

duplicate servlets map in Coordinator Server

Exception as below:
Exception in thread "main" java.lang.IllegalStateException: Multiple servlets map to path: /prometheus/metrics/jvm: com.tencent.rss.common.web.CommonMetricsServlet-50a638b5,com.tencent.rss.common.web.CommonMetricsServlet-31368b99
at org.eclipse.jetty.servlet.ServletHandler.updateMappings(ServletHandler.java:1558)
at org.eclipse.jetty.servlet.ServletHandler.doStart(ServletHandler.java:157)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:131)
at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:105)
at org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:61)
at org.eclipse.jetty.server.handler.ScopedHandler.doStart(ScopedHandler.java:120)
at org.eclipse.jetty.server.handler.ContextHandler.startContext(ContextHandler.java:809)
at org.eclipse.jetty.servlet.ServletContextHandler.startContext(ServletContextHandler.java:345)
at org.eclipse.jetty.server.handler.ContextHandler.doStart(ContextHandler.java:778)
at org.eclipse.jetty.servlet.ServletContextHandler.doStart(ServletContextHandler.java:262)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:131)
at org.eclipse.jetty.server.Server.start(Server.java:427)
at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:105)
at org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:61)
at org.eclipse.jetty.server.Server.doStart(Server.java:394)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
at com.tencent.rss.common.web.JettyServer.start(JettyServer.java:146)
at com.tencent.rss.coordinator.CoordinatorServer.start(CoordinatorServer.java:75)
at com.tencent.rss.coordinator.CoordinatorServer.main(CoordinatorServer.java:70)

shuffle write is too slow

When I use Firestorm as the external shuffle service, I find that Spark jobs (with 2 executors, each limited to 2c4g) run much more slowly than with the built-in Spark shuffle service. As the Spark UI shows, this is mainly because the shuffle write time is too large.

With the built-in shuffle service:

[Spark UI screenshot]

With Firestorm:

[Spark UI screenshot]

I added some debugging code to the logic where Firestorm writes shuffle data to the shuffle server. For example, for task 2294 in the screenshot, shuffle write takes 47 ms.

From spark logs I found these stats:

{"type":"service","category":"org.apache.spark.shuffle.writer.RssShuffleWriter","level":"info","message":"Finish write shuffle for appId[spark-94d48661c6ff4b9f8f83ef860a509ee61639469487297], shuffleId[33], taskId[2294_0] with write 42 ms, include checkSendResult[6], commit[36], WriteBufferManager cost copyTime[0], writeTime[0], serializeTime[0], compressTime[0], estimateTime[0], requireMemoryTime[0], uncompressedDataLen[528]","context":{},"reqId":"d20a2b6c-042c-4697-aad5-484455ca6cb7"}

Below are coordinator and shuffle server configurations(with 1 coordinator and 1 shuffle server instance for testing)

# java -version
openjdk version "1.8.0_292"
OpenJDK Runtime Environment (AdoptOpenJDK)(build 1.8.0_292-b10)
OpenJDK 64-Bit Server VM (AdoptOpenJDK)(build 25.292-b10, mixed mode)

rss-env.sh

set -o pipefail
set -e
XMX_SIZE="2g"
RUNNER="${JAVA_HOME}/bin/java"
JPS="${JAVA_HOME}/bin/jps"

coordinator.conf

rss.rpc.server.port 19999
rss.jetty.http.port 19998
rss.coordinator.server.heartbeat.timeout 30000
rss.coordinator.app.expired 60000
rss.coordinator.shuffle.nodes.max 13
rss.coordinator.exclude.nodes.file.path /data/exclude_nodes

server.conf

rss.rpc.server.port 20001
rss.jetty.http.port 20000
rss.storage.basePath /data/rss
rss.storage.type LOCALFILE
rss.coordinator.quorum xxx:19999
rss.server.buffer.capacity 40gb
rss.server.buffer.spill.threshold 22gb
rss.server.partition.buffer.size 150mb
rss.server.read.buffer.capacity 20gb
rss.server.flush.thread.alive 50
rss.server.flush.threadPool.size 100

Compared with the built-in shuffle service, the task execution time is more than 3 times higher. Please help me find the reason for the slow shuffle write.

fault tolerance

Hi all, a few questions.

1 -- Does restarting the process on a server cause Spark tasks to fail, or will they wait and retry once the server is up and running again?

2 -- Does LOCALFILE storage support multiple disk drives?

hardcoded relative paths

Hi, I noticed that the coordinator runner seems to have hardcoded locations of various jars relative to the HADOOP_HOME directory. This works for some Hadoop installations, but my installation has the jars in different directories than the hardcoded relative paths (i.e. $HADOOP_HOME/share/hadoop/yarn/ wouldn't work for me, but /hadoop/yarn/ would be the corresponding path). Would there be any way to add an option for explicit references to the specific libraries required, instead of having everything hardcoded relative to HADOOP_HOME? If this is not possible, could you describe all of the Hadoop dependencies so that I can symlink everything into a synthetic directory? Thanks!

Example error from trying to run:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/fs/FSDataInputStream
at com.tencent.rss.coordinator.CoordinatorServer.initialization(CoordinatorServer.java:116)
at com.tencent.rss.coordinator.CoordinatorServer.&lt;init&gt;(CoordinatorServer.java:53)
at com.tencent.rss.coordinator.CoordinatorServer.main(CoordinatorServer.java:68)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.fs.FSDataInputStream
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
... 3 more

Error happened when getShuffleAssignments with appId XXX to coordinator

Hi,
I had built Firestorm successfully and got rss-0.2.0-SNAPSHOT.tgz, then I

  1. Deployed the coordinator on nodeA whose IP is IP_A
       1. unzip the package to RSS_HOME
       2. update RSS_HOME/bin/rss-env.sh
          XMX_SIZE="2g"
       3. update RSS_HOME/conf/coordinator.conf
       4. start the coordinator: ./bin/start-shuffle-server.sh, then the coordinator process is running on nodeA
  2. Deployed the shuffle server on nodeB whose IP is IP_B
     • the first two steps are the same as 1)
       3. update RSS_HOME/conf/server.conf
          rss.coordinator.quorum IP_A:19999
          rss.storage.basePath /data1/rssdata
          (didn't change other configs in server.conf)
       4. start the shuffle server: ./bin/start-shuffle-server.sh, then the shuffle server process is running on nodeB
  3. Deployed the Spark client
       1. add rss-client-spark2-0.2.0-SNAPSHOT-shaded.jar to my $SPARK_HOME/jars
       2. update spark-defaults.conf:
          spark.rss.coordinator.quorum IP_A:19999
          disabled dynamicAllocation etc.
       3. run the Java word count example:
          $SPARK_HOME/bin/spark-submit \
            --name MYNAME \
            --class org.apache.spark.examples.JavaWordCount \
            $SPARK_HOME/examples/jars/spark-examples_VERSION.jar \
            hdfs://MY_HDFS_PATH/datas.txt

Exception returned:

21/12/20 17:54:37 ERROR ShuffleWriteClientImpl: Error happened when getShuffleAssignments with appId[application_1637568296779_621930_1639994076918], shuffleId[0], numMaps[1], partitionNumPerRange[1] to coordinator
21/12/20 17:54:37 WARN DAGScheduler: Creating new stage failed due to exception - job: 0
com.tencent.rss.common.exception.RssException: Error happened when getShuffleAssignments with appId[application_1637568296779_621930_1639994076918], shuffleId[0], numMaps[1], partitionNumPerRange[1] to coordinator
at com.tencent.rss.client.impl.ShuffleWriteClientImpl.throwExceptionIfNecessary(ShuffleWriteClientImpl.java:381)
at com.tencent.rss.client.impl.ShuffleWriteClientImpl.getShuffleAssignments(ShuffleWriteClientImpl.java:257)
at org.apache.spark.shuffle.RssShuffleManager.registerShuffle(RssShuffleManager.java:177)
at org.apache.spark.ShuffleDependency.&lt;init&gt;(Dependency.scala:93)
at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:87)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:256)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:252)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.dependencies(RDD.scala:252)
at org.apache.spark.scheduler.DAGScheduler.getShuffleDependencies(DAGScheduler.scala:513)
at org.apache.spark.scheduler.DAGScheduler.getOrCreateParentStages(DAGScheduler.scala:462)
at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:449)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:963)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2069)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
21/12/20 17:54:37 INFO DAGScheduler: Job 0 failed: collect at JavaWordCount.java:53, took 0.525986 s
Exception in thread "main" com.tencent.rss.common.exception.RssException: Error happened when getShuffleAssignments with appId[application_1637568296779_621930_1639994076918], shuffleId[0], numMaps[1], partitionNumPerRange[1] to coordinator

shuffle server OOM

I deployed the shuffle server (release 0.1.0) on a 16c64g machine with XMX_SIZE="55g". When running a Spark application, the shuffle server's memory keeps growing and eventually reaches about 60g, which triggers an OOM and the process exits.

server.conf

rss.rpc.server.port 20001
rss.jetty.http.port 20000
rss.rpc.executor.size 2000
rss.storage.basePath /data/rss
rss.storage.type LOCALFILE
rss.coordinator.quorum xxx:19999,xxx:19999,xxx:19999
rss.server.buffer.capacity 20000000000
rss.server.buffer.spill.threshold 5000000000
rss.server.partition.buffer.size 157200000
rss.server.read.buffer.capacity 10000000000
rss.server.heartbeat.timeout 60000
rss.server.heartbeat.interval 10000
rss.rpc.message.max.size 1073741824
rss.server.preAllocation.expired 120000
rss.server.commit.timeout 3600000
rss.storage.data.replica 1
rss.server.flush.thread.alive 5
rss.server.flush.threadPool.size 20
rss.server.app.expired.withoutHeartbeat 30000

rss-env.sh

set -o pipefail
set -e

XMX_SIZE="55g"

RUNNER="${JAVA_HOME}/bin/java"
JPS="${JAVA_HOME}/bin/jps"

[memory usage screenshots]

Is my configuration incorrect, or is there a memory leak in the program?

[QUESTION] Does the executor spill to local disk during shuffle write/read?

Each node has only one disk configured, and its performance is poor. Before RSS, during the shuffle stage multiple executors issued a lot of random I/O against this disk and performance was very bad.

After adopting RSS, I observed that shuffle write performance is very good, but in the shuffle read stage performance drops quickly.

A few questions:

During shuffle write, does the executor send shuffle data directly to the rss-server, or does it spill to local disk first and then send it?
During shuffle read, does the executor compute directly on the data fetched from the rss-server, or does it write the data to local disk first and then compute?

When spark.rss.storage.type=MEMORY_LOCAL, does LOCAL (local disk) refer to spilling during the shuffle write stage or the shuffle read stage?

Blocks read inconsistent

spark version: 3.2.1
rss version: master
sql: tpc-ds[10T] query17

spark parameters:
spark.shuffle.manager=org.apache.spark.shuffle.RssShuffleManager
spark.rss.storage.type=HDFS
spark.rss.base.path=hdfs://ns/tmp/rss/hdfs_base_path
spark.rss.data.replica=2
spark.dynamicAllocation.enabled=false
spark.shuffle.service.enabled=false
spark.rss.coordinator.quorum=coordinator1:19999,coordinator2:19999

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2455)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2404)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2403)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2403)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2643)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2585)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2574)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: com.tencent.rss.common.exception.RssException: Blocks read inconsistent: expected 310 blocks, actual 0 blocks
at com.tencent.rss.client.impl.ShuffleReadClientImpl.checkProcessedBlockIds(ShuffleReadClientImpl.java:222)
at org.apache.spark.shuffle.reader.RssShuffleDataIterator.hasNext(RssShuffleDataIterator.java:126)
at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.hasNext(RssShuffleReader.java:213)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.sort_addToSorter_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage11.smj_findNextJoinRows_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage11.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:778)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.writer.RssShuffleWriter.write(RssShuffleWriter.java:138)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


[QUESTION] Does Firestorm depend on a Hadoop environment?

I see that a HADOOP_HOME environment variable needs to be configured. Can Firestorm only be deployed in a Hadoop environment? Is there a configuration option to turn this off?

Doesn't the coordinator need to be highly available?

Looking at the getShuffleAssignment method of ShuffleWriteClientImpl, it simply iterates over coordinatorClients and returns the assignment from the first successful request. This doesn't seem to provide load balancing across coordinators?
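A minimal sketch of the behavior described above, with simplified names and signatures that are assumptions rather than the actual Firestorm API: the client walks the coordinator list in a fixed order and returns the first successful answer, which gives failover but not load balancing.

// Simplified illustration of the described client-side behavior; the real
// ShuffleWriteClientImpl differs in class names and details.
ShuffleAssignmentsInfo getShuffleAssignments(List<CoordinatorClient> coordinatorClients,
                                             GetShuffleAssignmentsRequest request) {
  for (CoordinatorClient client : coordinatorClients) {
    try {
      ShuffleAssignmentsInfo info = client.getShuffleAssignments(request);
      if (info != null) {
        // The first coordinator that answers successfully wins. Because every
        // client walks the list in the same order, requests are not spread
        // across coordinators; the remaining coordinators only act as failover.
        return info;
      }
    } catch (Exception e) {
      // Ignore and try the next coordinator in the list.
    }
  }
  throw new RuntimeException("Failed to get shuffle assignments from all coordinators");
}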

To support more tasks with Firestorm

The current blockId is designed as follows:

 // BlockId is long and composed by partitionId, executorId and AtomicInteger
 // AtomicInteger is first 19 bit, max value is 2^19 - 1
 // partitionId is next 24 bit, max value is 2^24 - 1
 // taskAttemptId is rest of 20 bit, max value is 2^20 - 1

Why do we need a blockId?
It's used for data checking, filtering, in-memory data reads, etc.

Why is the blockId designed as above?
BlockIds are stored in the shuffle server, and RoaringBitmap is used to cache them to reduce memory cost.
Given the implementation of RoaringBitmap, the blockId layout is designed so that a BitmapContainer is used instead of an ArrayContainer, which saves memory.

What's the problem with this blockId?
It can't support a taskId greater than 2^20 - 1.

Proposal
I think the first 19 bits are more than the atomic int needs, and we can give some of them to the taskId.
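To make the idea concrete, here is a minimal sketch of how such a blockId could be packed and unpacked. The exact bit split below (18/24/21) is an assumption for illustration only, not the final layout.

// Hypothetical layout: 18-bit sequence number | 24-bit partitionId | 21-bit taskAttemptId.
// The concrete split is an assumption used only to illustrate the packing scheme.
public class BlockIdLayout {
  static final int SEQ_BITS = 18;
  static final int PARTITION_BITS = 24;
  static final int TASK_BITS = 21;

  static long getBlockId(long sequenceNo, long partitionId, long taskAttemptId) {
    if (sequenceNo >= (1L << SEQ_BITS) || partitionId >= (1L << PARTITION_BITS)
        || taskAttemptId >= (1L << TASK_BITS)) {
      throw new IllegalArgumentException("Value exceeds the bits reserved for it");
    }
    // From high bits to low bits: sequenceNo, partitionId, taskAttemptId.
    return (sequenceNo << (PARTITION_BITS + TASK_BITS))
        | (partitionId << TASK_BITS)
        | taskAttemptId;
  }

  static long getTaskAttemptId(long blockId) {
    return blockId & ((1L << TASK_BITS) - 1);
  }

  static long getPartitionId(long blockId) {
    return (blockId >> TASK_BITS) & ((1L << PARTITION_BITS) - 1);
  }
}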

Clear buffered data and retry when acquiring memory fails

String message = "Can't get memory to cache shuffle data, request[" + askExecutorMemory

In WriteBufferManager, when acquiring memory fails, an exception is thrown directly. Instead, when requesting memory from the task memory manager fails, we could clear the buffered data to release memory and then retry, as sketched below.
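A rough sketch of the idea, using hypothetical method names rather than the current WriteBufferManager API:

// Hypothetical sketch: spill the data already buffered and retry the allocation
// once before giving up. Method names are illustrative, not the real API.
private long acquireMemoryOrSpill(long askExecutorMemory) {
  long granted = requestMemory(askExecutorMemory);      // ask the task memory manager
  if (granted < askExecutorMemory) {
    // Send the blocks we already buffered to the shuffle servers, which
    // releases the memory they were holding, then ask for the remainder.
    sendBufferedDataAndReleaseMemory();
    granted += requestMemory(askExecutorMemory - granted);
  }
  if (granted < askExecutorMemory) {
    throw new RuntimeException("Can't get memory to cache shuffle data, request["
        + askExecutorMemory + "], granted[" + granted + "]");
  }
  return granted;
}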

If this optimization is needed, i will submit PR later.

ShuffleServer died abnormally

During the creation of the DFSClient, the ShuffleServer always exits directly, resulting in data push failures, and there is no exception log in rss.log.
What am I supposed to do?

Not fast as expected

Hi,
I'm comparing Firestorm with vanilla Spark. I ran a wordcount script, but Firestorm seems to take more time than the original Spark.
What am I supposed to do to achieve my target? Thank you

ENV

wordcount.py

a*_wordcount_10G.txt matches 10 .txt files, 10 GB each, 100 GB in total
b*_wordcount_10G.txt matches 10 .txt files, 10 GB each, 100 GB in total
c*_wordcount_10G.txt matches 2 .txt files, 5 GB each, 10 GB in total

# -*- coding: UTF-8 -*-
from __future__ import print_function
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
from operator import add
from pyspark.sql import SparkSession


if len(sys.argv) != 3:
  print("Wrong Parameter! Please run X.py app_name datasize(eg. 100G)")
  exit()

app_name=sys.argv[1]
datasize=sys.argv[2]
spark = SparkSession.builder.appName(app_name).getOrCreate()

def read_data(datasize="100G"):
  if datasize == "100G":
    # read multi-txts by pattern *
    a = spark.read.text("HDFSPATH/data/a*_wordcount_10G.txt").rdd.map(lambda r: r[0])
    b = spark.read.text("HDFSPATH/data/b*_wordcount_10G.txt").rdd.map(lambda r: r[0])
    c = spark.read.text("HDFSPATH/data/c*_wordcount_10G.txt").rdd.map(lambda r: r[0])
    d = spark.read.text("HDFSPATH/wordcount_1G.txt").rdd.map(lambda r: r[0])
    return a,b,c,d
  else:
    # for test
    a = spark.read.text("HDFSPATH/wordcount_10m.txt").rdd.map(lambda r: r[0])
    b = spark.read.text("HDFSPATH/wordcount_10m.txt").rdd.map(lambda r: r[0])
    c = spark.read.text("HDFSPATH/wordcount_10m.txt").rdd.map(lambda r: r[0])
    d = spark.read.text("HDFSPATH/wordcount_10m.txt").rdd.map(lambda r: r[0])
    return a,b,c,d


def op1(a,b,c,d):
  a1 = a.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(add)
  b1 = b.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(add)
  c1 = c.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(add)
  d1 = d.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(add)

  a2 = a1.repartition(20)
  b2 = b1.repartition(20)
  c2 = c1.repartition(20)
  d2 = d1.repartition(20)

  a3 = a2.repartition(4)
  b3 = b2.repartition(4)
  c3 = c2.repartition(4)
  d3 = d2.repartition(4)

  a4 = a3.repartition(15)
  b4 = b3.repartition(15)
  c4 = c3.repartition(15)
  d4 = d3.repartition(15)

  res = a4.join(b4).join(c4).join(d4)
  res = res.reduceByKey(add).repartition(9).sortByKey()

  count = res.count()
  return count,res

job_num=3
for i in range(job_num):
  a,b,c,d = read_data(datasize)
  count1,res1=op1(a,b,c,d)
  res1.saveAsTextFile("HDFSPATH/out/"+app_name+"-"+str(i))
  a,b,c,d = None,None,None,None


spark.stop()

spark script

$SPARK_HOME/bin/spark-submit  \
--name $APP_NAME \
--master yarn \
--deploy-mode cluster \
--num-executors=5 \
--executor-cores=2 \
--executor-memory=8G \
--conf spark.yarn.executor.memoryOverhead=2g \
/MYPATH/wordcount.py 

part of spark-defaults.conf

#main
spark.master            yarn-client
spark.serializer        org.apache.spark.serializer.KryoSerializer
spark.driver.memory              3g
spark.executor.memory   3g
#spark.executor.extraJavaOptions -XX:MaxPermSize=1024M
spark.yarn.am.memory    2g


#shuffle
spark.shuffle.service.enabled   false
spark.shuffle.io.maxRetries   6
spark.shuffle.io.retryWait   30s
#spark.shuffle.io.preferDirectBufs false


#dynamical resource allocate
spark.dynamicAllocation.enabled   false
#spark.executor.instances     100
spark.dynamicAllocation.initialExecutors 10
spark.dynamicAllocation.minExecutors  10
spark.dynamicAllocation.maxExecutors  1000
spark.dynamicAllocation.schedulerBacklogTimeout 1s
spark.dynamicAllocation.executorIdleTimeout  30s
spark.files.fetchTimeout  600s

spark.network.timeout  300s

spark.sql.catalogImplementation hive
spark.shuffle.file.buffer 8192
spark.shuffle.sort.initialBufferSize 81920
spark.locality.wait 0

#spark.yarn.archive hdfs://XXXX.zip
spark.dynamicAllocation.maxExecutors.fraction 0.4
spark.dynamicAllocation.modify.maxExecutors.enabled true
spark.analysisMetrics.report.enabled true
spark.executorEnv.super_env=/XXX/hdfs/*
spark.yarn.appMasterEnv.super_env=/XXX/hdfs/*

spark.shuffle.manager org.apache.spark.shuffle.RssShuffleManager
spark.rss.coordinator.quorum XX.7:19999,XX.8:19999
spark.rss.storage.type LOCALFILE_AND_HDFS

The baseline Spark 2.4.5 run doesn't have the last three configs:

spark.shuffle.manager
spark.rss.coordinator.quorum
spark.rss.storage.type

FireStorm shuffle server
deployed on 3 nodes: XX.9, XX.10, XX.11
conf/server.conf

rss.rpc.server.port 19999
rss.jetty.http.port 19998
rss.rpc.executor.size 2000
rss.storage.type LOCALFILE_AND_HDFS
rss.coordinator.quorum XX.7:19999,xx.8:19999
rss.storage.basePath MY_LOCAL_PATH
rss.server.uploader.base.path MY_HDFS_PATH
rss.server.disk.capacity  40000000000
rss.server.flush.thread.alive 5
rss.server.flush.threadPool.size 10
rss.server.buffer.capacity 40000000000
rss.server.buffer.spill.threshold 22000000000
rss.server.partition.buffer.size 157200000
rss.server.read.buffer.capacity 20000000000
rss.server.heartbeat.timeout 60000
rss.server.heartbeat.interval 10000
rss.rpc.message.max.size 1073741824
rss.server.preAllocation.expired 120000
rss.server.commit.timeout 600000
rss.server.app.expired.withoutHeartbeat 120000

FireStorm coordinator

deployed on 2 nodes: XX.7 , XX.8

rss.rpc.server.port 19999
rss.jetty.http.port 19998
rss.coordinator.server.heartbeat.timeout 30000
rss.coordinator.app.expired 60000
rss.coordinator.shuffle.nodes.max 5
rss.coordinator.exclude.nodes.file.path /MYPATH/conf/exclude_nodes

Web UI

It took 7.2 h and 8.9 h to finish one Spark application with 11 jobs on Spark 2.4.5 and Spark 2.4.5 + Firestorm, respectively.

image

detailed job 0

image

image

image

The data size of shuffle read and write is much larger

Hello, when I run the wordcount task in HiBench with a data size of 5 GB, I found that the shuffle write and read data size with Firestorm (MEMORY_HDFS mode) is much larger than with the external shuffle service, using the same Spark parameters.
image

image

Could you please help analyze why this happens?
