
flink-remote-shuffle's Introduction

Remote Shuffle Service for Flink

Overview

This project implements a remote shuffle service for batch data processing with Flink. By adopting a storage-compute separation architecture, it brings several important benefits:

  1. Compute and storage resources are scaled independently, so you can freely scale each up or down on demand.

  2. Compute and storage no longer affect each other's stability. The remote shuffle service runs no user code, which improves shuffle stability. For example, the termination of TaskExecutors does not lead to data loss, and the termination of remote ShuffleWorkers can be tolerated.

  3. Because the data shuffle work is offloaded to the remote shuffle service, computing resources can be released as soon as the upstream map tasks finish, which saves resources.

In addition, the remote shuffle implementation borrows some good designs from Flink that benefit both stability and performance, for example:

  1. Managed memory is preferred. Both storage and network memory are managed, which greatly reduces the risk of OutOfMemory errors.

  2. The credit-based backpressure mechanism is adopted, which benefits both network stability and performance.

  3. Zero-copy network data transmission is implemented, which saves memory and also benefits stability and performance.
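To make the credit-based idea concrete, here is a minimal Java sketch of the general technique, not the project's actual classes: the receiver grants one credit per free buffer, and the sender transmits only while it holds credit, so unsent data waits in the sender's backlog instead of piling up in the receiver's memory. `CreditBasedSender` and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical sketch of credit-based flow control; not the project's actual classes. */
class CreditBasedSender {
    private final Deque<String> backlog = new ArrayDeque<>(); // buffers waiting for credit
    private final List<String> sent = new ArrayList<>();      // stands in for the network
    private int credits;                                        // credit granted by the receiver

    /** Queues a buffer and sends it right away if credit is available. */
    void enqueue(String buffer) {
        backlog.addLast(buffer);
        trySend();
    }

    /** Called when the receiver announces n more free buffers. */
    void addCredit(int n) {
        credits += n;
        trySend();
    }

    // Each transmitted buffer consumes exactly one credit, so the receiver never
    // gets more data than it has room for.
    private void trySend() {
        while (credits > 0 && !backlog.isEmpty()) {
            sent.add(backlog.pollFirst());
            credits--;
        }
    }

    List<String> sent() { return sent; }
    int backlogSize() { return backlog.size(); }
    int credits() { return credits; }
}
```

Because the sender can never run ahead of the receiver's free buffers, a slow consumer slows the producer down rather than exhausting memory on the receiving side.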

Besides, there are other important optimizations such as load balancing, better sequential IO (benefiting from the centralized service per node), TCP connection reuse, shuffle data compression, and adaptive execution (together with FLIP-187).

Before being open-sourced, this project was widely used in production and performed well in terms of both stability and performance. We hope you enjoy it.

Supported Flink Version

The remote shuffle service works with Flink 1.14+. To support lower Flink versions, some patches need to be applied to Flink. If you need help with that, please let us know; we can help prepare the patches for the Flink version you use.

Documentation

The remote shuffle service supports standalone, YARN, and Kubernetes (k8s) deployment. You can find the full documentation here. More detailed specifications of the internal implementation will be added in the future.

Building from Source

To build the project from source, first clone the repository:

git clone git@github.com:flink-extended/flink-remote-shuffle.git

Then build the project with Maven (Maven and Java 8 are required):

cd flink-remote-shuffle # switch to the remote shuffle project home directory
mvn package -DskipTests

When the build finishes, you can find the target distribution in the build-target folder. If you want to run tests locally, we suggest running mvn install -DskipTests first to avoid potential failures.

For Kubernetes deployment, run the following command to build the Docker image (Docker is required):

cd flink-remote-shuffle # switch to the remote shuffle project home directory
sh ./tools/build_docker_image.sh

You can also publish the Docker image with the following command. The publishing script takes three arguments: the registry address (default 'docker.io'), the namespace (default 'flinkremoteshuffle'), and the repository name (default 'flink-remote-shuffle').

cd flink-remote-shuffle # switch to the remote shuffle project home directory
sh ./tools/publish_docker_image.sh REGISTRY NAMESPACE REPOSITORY

Example

After building the code from source, you can run a demo Flink batch job that uses the remote shuffle service locally (Flink 1.14+ is required).

First, download a Flink distribution from Flink's download page, for example, Apache Flink 1.14.0 for Scala 2.11:

wget https://dlcdn.apache.org/flink/flink-1.14.0/flink-1.14.0-bin-scala_2.11.tgz
tar zxvf flink-1.14.0-bin-scala_2.11.tgz

Then copy the shuffle plugin jar file from the build-target/lib directory of the remote shuffle project (for example, build-target/lib/shuffle-plugin-1.0-SNAPSHOT.jar) to the Flink lib directory, and copy the built-in example job jar file to the Flink home directory (flink-1.14.0):

cp flink-remote-shuffle/build-target/lib/shuffle-plugin-1.0-SNAPSHOT.jar flink-1.14.0/lib/
cp flink-remote-shuffle/build-target/examples/BatchJobDemo.jar flink-1.14.0/

After that, you can start a local remote shuffle cluster by running the following command:

cd flink-remote-shuffle # switch to the remote shuffle project home directory
cd build-target # run after building from source
./bin/start-cluster.sh -D remote-shuffle.storage.local-data-dirs="[HDD]/tmp/" -D remote-shuffle.memory.data-writing-size=256m -D remote-shuffle.memory.data-reading-size=256m
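The `[HDD]` prefix in the `remote-shuffle.storage.local-data-dirs` value above labels the disk type of a data directory. The sketch below shows how such a value might be interpreted, assuming (check the project's configuration documentation) that the option accepts a comma-separated list of directories, each optionally prefixed with `[SSD]` or `[HDD]`, and that unlabeled directories default to HDD. `DataDirsParser` is a hypothetical helper, not a project class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/** Hypothetical parser for values such as "[SSD]/dir1/,[HDD]/dir2/,/dir3/". */
class DataDirsParser {
    enum DiskType { SSD, HDD }

    static final class DataDir {
        final DiskType type;
        final String path;

        DataDir(DiskType type, String path) {
            this.type = type;
            this.path = path;
        }
    }

    static List<DataDir> parse(String value) {
        List<DataDir> dirs = new ArrayList<>();
        for (String raw : value.split(",")) {
            String entry = raw.trim();
            if (entry.isEmpty()) {
                continue;
            }
            DiskType type = DiskType.HDD; // assumed default when no label is given
            if (entry.startsWith("[")) {
                int end = entry.indexOf(']');
                if (end < 0) {
                    throw new IllegalArgumentException("Unclosed disk type label: " + entry);
                }
                // valueOf rejects labels other than SSD/HDD with IllegalArgumentException.
                type = DiskType.valueOf(entry.substring(1, end).trim().toUpperCase(Locale.ROOT));
                entry = entry.substring(end + 1).trim();
            }
            dirs.add(new DataDir(type, entry));
        }
        return dirs;
    }
}
```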

Then start a local Flink cluster configured to use the remote shuffle service by running the following command:

cd flink-1.14.0 # switch to the flink home directory
./bin/start-cluster.sh -D shuffle-service-factory.class=com.alibaba.flink.shuffle.plugin.RemoteShuffleServiceFactory -D remote-shuffle.manager.rpc-address=127.0.0.1

Finally, you can run the demo batch job:

cd flink-1.14.0 # switch to the flink home directory
bin/flink run -c com.alibaba.flink.shuffle.examples.BatchJobDemo ./BatchJobDemo.jar

To stop the local clusters, run the stop-cluster.sh script in each bin directory:

cd flink-1.14.0 # switch to the flink home directory
bin/stop-cluster.sh
cd ../flink-remote-shuffle/build-target # switch to the remote shuffle distribution directory
bin/stop-cluster.sh

How to Contribute

Any feedback on this project is highly appreciated. You can report a bug by opening an issue on GitHub, and you are welcome to contribute new features or improvements. See the contribution guide for more information.

Support

We provide free support for users of this project. You can join the Slack channel, or scan the following QR code to join the DingTalk user support group for further help and collaboration:

English:

Please join the Slack channel by clicking this invitation.

Chinese:

Acknowledgements

This is a Flink ecosystem project. Apache Flink is an excellent unified stateful data processing engine. This project borrows some good designs (e.g. credit-based backpressure) and building blocks (e.g. RPC and high availability) from Flink.

flink-remote-shuffle's People

Contributors

aitozi, gaoyunhaii, reswqa, tanyuxin-tyx, wsry


flink-remote-shuffle's Issues

Support standby ShuffleManager

Motivation

Currently, if the ShuffleManager goes down, its high availability depends on external services. In essence, the ShuffleManager is a single point of failure.

We can introduce one or more standby ShuffleManagers to solve this problem. When the active ShuffleManager goes down, a standby ShuffleManager should automatically switch to active mode, which improves stability and availability.

Changes

Introduce standby ShuffleManager to improve the stability and high availability.

Test

  • Unit test.
  • E2E test.
  • Test manually on a cluster.

Support data replication

Motivation

For some jobs, data loss and the resulting data reproduction are not acceptable; in this scenario, data replication is needed to handle data loss.

Changes

Allow configuring the replication factor, and reproduce the data only when all replicas are lost.
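A minimal sketch of the read-side rule proposed here, assuming each data partition is written to as many workers as the replication factor: a reader falls back to any surviving replica, and the data is reproduced only when none is left. The class, its methods, and the string worker identifiers are hypothetical.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

/** Hypothetical replica-aware read path; names are illustrative only. */
class ReplicatedPartitionReader {

    /** Returns the first replica location that is still available, if any. */
    static Optional<String> pickReplica(List<String> replicaLocations, Predicate<String> isAvailable) {
        return replicaLocations.stream().filter(isAvailable).findFirst();
    }

    /** Reproduction (re-running the producer) is needed only when every replica is lost. */
    static boolean needsReproduction(List<String> replicaLocations, Predicate<String> isAvailable) {
        return !pickReplica(replicaLocations, isAvailable).isPresent();
    }
}
```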

Test

  • Unit test.
  • Test manually on a cluster.

Support dynamic log level

Motivation

Supporting dynamic log level changes can help debug the shuffle system.

Changes

Add a REST API to both the ShuffleManager and the ShuffleWorker, together with the corresponding handlers.
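The issue does not specify the endpoint or the logging backend. As a hedged illustration of what such a handler might do once a request arrives, the sketch below changes a logger's level at runtime using java.util.logging, chosen only to keep the example self-contained (the services may use a different logging framework). `LogLevelHandler` and `setLevel` are hypothetical names, and the HTTP wiring is omitted.

```java
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical handler body for a "change log level" request. */
class LogLevelHandler {
    // java.util.logging keeps only weak references to loggers, so hold strong ones here;
    // otherwise a level set at runtime can be lost when the logger is garbage collected.
    private static final Map<String, Logger> PINNED = new ConcurrentHashMap<>();

    /** Sets the named logger's level; returns the previous level, or "INHERITED" if none was set. */
    static String setLevel(String loggerName, String levelName) {
        Logger logger = PINNED.computeIfAbsent(loggerName, Logger::getLogger);
        Level previous = logger.getLevel();
        // Level.parse throws IllegalArgumentException for unknown names, which a REST
        // handler would map to a "bad request" response.
        logger.setLevel(Level.parse(levelName.toUpperCase(Locale.ROOT)));
        return previous == null ? "INHERITED" : previous.getName();
    }
}
```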

Test

  • Test manually on a cluster.

Introduce a simple disk health checker

Motivation

If a disk is unhealthy, we need to remove it automatically; otherwise, jobs may fail repeatedly and never succeed.

Changes

Implement a simple disk health checker that checks whether a new file can be created on the disk and whether it is readable and writable. If not, the unhealthy disk is removed automatically.
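A minimal sketch of this check using only java.nio.file: it creates a probe file in a data directory, writes to it, reads it back, and deletes it, treating any I/O failure as unhealthy. The class and method names are hypothetical, and a real checker would presumably run periodically and also need to cope with slow or hanging disks, which this sketch does not.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical disk health checker: create, write, read back, delete. */
class SimpleDiskHealthChecker {
    private static final byte[] PROBE = "health-check".getBytes(StandardCharsets.UTF_8);

    /** Returns true if a probe file can be created, written and read back under dataDir. */
    static boolean isHealthy(Path dataDir) {
        Path probe = null;
        try {
            probe = Files.createTempFile(dataDir, "disk-health-", ".probe"); // creatable?
            Files.write(probe, PROBE);                                        // writable?
            return Arrays.equals(PROBE, Files.readAllBytes(probe));          // readable and intact?
        } catch (IOException | SecurityException e) {
            return false;
        } finally {
            if (probe != null) {
                try {
                    Files.deleteIfExists(probe);
                } catch (IOException ignored) {
                    // best effort cleanup; the health verdict is already decided
                }
            }
        }
    }

    /** Keeps only the directories that pass the check, i.e. drops unhealthy disks. */
    static List<Path> healthyDirs(List<Path> dataDirs) {
        List<Path> healthy = new ArrayList<>();
        for (Path dir : dataDirs) {
            if (isHealthy(dir)) {
                healthy.add(dir);
            }
        }
        return healthy;
    }
}
```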

Test

  • Unit test.
  • Test manually on a cluster.

Do not trigger a fatal error when notifying the new shuffle manager address if the Flink job has finished

Motivation

Currently, if a job has finished (with or without an error) while the shuffle manager leader changes, the new-leader notification may cause a fatal error, which may lead to a JobManager (JM) failover. This mainly affects session clusters where multiple jobs share the same cluster.

Changes

Do not trigger a fatal error when notifying the new shuffle manager address if the Flink job has finished.

Test

  • Unit test.
  • Test manually on a cluster.

Handle network issues gracefully

Motivation

Currently, network issues such as an unstable network may cause task failover, which may in turn lead to data reproduction. We can improve this behavior by reconnecting to the remote ShuffleWorker and retransmitting data.

Changes

On network issues, the client and server should not fail immediately. Instead, if the client can reconnect to the server within some timeout, no failover or data reproduction should be triggered. We may also need a switch to disable this feature.
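A minimal sketch of the proposed behavior, assuming that "reconnect within some timeout" means retrying with a fixed backoff until a deadline: errors are swallowed while time remains, and only the last one propagates once the window is exhausted, which is where a failover would be triggered. `connectWithin` is a hypothetical helper, and the on/off switch mentioned in the issue is left out.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

/** Hypothetical helper: keep trying to reconnect until a deadline before giving up. */
class ReconnectWithTimeout {

    /**
     * Calls connect until it succeeds or the reconnect window is used up. Only the final
     * failure is propagated to the caller.
     */
    static <T> T connectWithin(Callable<T> connect, Duration timeout, Duration backoff)
            throws Exception {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                return connect.call();
            } catch (Exception e) {
                // Not enough time left for another attempt: give up with the last error.
                if (deadline - System.nanoTime() <= backoff.toNanos()) {
                    throw e;
                }
            }
            Thread.sleep(backoff.toMillis()); // wait before the next reconnect attempt
        }
    }
}
```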

Test

  • Unit test.
  • Test manually on a cluster.

Compilation fails with Flink 1.15.2

With Flink 1.15.2:
The constructor org.apache.flink.core.classloading.ComponentClassLoader#ComponentClassLoader takes a new parameter, which needs to be supported.
AkkaRpcServiceUtils fails to compile.


Support Kubernetes-based high-availability services

When the deployment mode is Kubernetes, can we consider supporting Kubernetes-based high-availability services?
That way, ZooKeeper would not be our only choice.
All the HA information relevant to a specific component would be stored in a single ConfigMap.

Support restricting the maximum storage space that can be used per disk

Motivation

For shared storage, we may need to restrict the maximum storage space that can be used, so that enough storage space is reserved for other applications.

Changes

Support restricting the maximum storage space that can be used per disk.
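One way such a per-disk limit could be enforced, sketched under the assumption that a worker reserves space before writing and releases it when the data is deleted: a compare-and-set loop prevents concurrent reservations from overshooting the limit. `DiskQuota` and its methods are hypothetical, and the real accounting may differ (for example, it might consult the file system's usable space instead).

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical per-disk quota; one instance per data directory. */
class DiskQuota {
    private final long maxBytes;
    private final AtomicLong usedBytes = new AtomicLong();

    DiskQuota(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    /** Reserves space for a write; returns false if the limit would be exceeded. */
    boolean tryReserve(long bytes) {
        while (true) {
            long used = usedBytes.get();
            if (used + bytes > maxBytes) {
                return false; // caller could pick another disk or reject the write
            }
            // Compare-and-set so two concurrent writers cannot both squeeze past the limit.
            if (usedBytes.compareAndSet(used, used + bytes)) {
                return true;
            }
        }
    }

    /** Returns space once data stored on this disk is released. */
    void release(long bytes) {
        usedBytes.addAndGet(-bytes);
    }

    long used() {
        return usedBytes.get();
    }
}
```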

Test

  • Unit test.
  • Test manually on a cluster.

ShuffleReadClient is blocked when creating a channel

Description

We may have encountered a deadlock:

  • Netty threads are waiting for lock <0x00000006b40e8898>, which is held by a task thread in RemoteShuffleInputGate.close.
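Reading the dumps in this issue, the task thread in RemoteShuffleInputGate.close appears to hold lock <0x00000006b40e8898> while blocking on a future in ConnectionManager.releaseChannel, and the Netty client threads, which likely need to make progress for that future to complete, are blocked on the same lock in onBuffer and in the failure listener. The self-contained Java sketch below reproduces that general pattern, with a single-threaded executor standing in for a Netty event loop. It illustrates the hazard and one common remedy (waiting outside the lock); it is not the project's code or its actual fix, and all names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical reproduction of the lock/event-loop pattern; not the project's code. */
class LockOrderingSketch {

    /** Simulates closing an input gate; returns true if the channel release completes in time. */
    static boolean close(boolean waitWhileHoldingLock, long timeoutMs)
            throws InterruptedException, ExecutionException {
        Object gateLock = new Object();
        // Stands in for a Netty event loop: one thread runs all channel callbacks in order.
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Void> released;
            synchronized (gateLock) {
                // A buffer arrives: delivering it needs the gate lock (like onBuffer).
                eventLoop.execute(() -> {
                    synchronized (gateLock) {
                        // deliver the buffer
                    }
                });
                // The channel release is completed by the same event loop, after that callback.
                released = CompletableFuture.runAsync(() -> { }, eventLoop);
                if (waitWhileHoldingLock) {
                    // Problematic: the event loop cannot reach the release task until the
                    // buffer callback gets gateLock, which this thread is still holding.
                    return awaitWithin(released, timeoutMs);
                }
            }
            // Remedy: leave the critical section first, then wait for the event loop.
            return awaitWithin(released, timeoutMs);
        } finally {
            eventLoop.shutdownNow();
        }
    }

    private static boolean awaitWithin(CompletableFuture<Void> future, long timeoutMs)
            throws InterruptedException, ExecutionException {
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        }
    }
}
```

Here `close(true, ...)` times out while `close(false, ...)` completes. In the real plugin, other remedies may fit better, such as not taking the gate lock inside Netty callbacks or releasing the channel asynchronously; which one is appropriate depends on code not shown in this issue.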

Task stack

"HashJoin(joinType=[LeftOuterJoin], where=[(user_id = user_id0)], select=[user_id, user_id0, tag], build=[right]) [Source: HiveSource-smcdrisk_antifraud.th_dp_whitelist_raw_tab] -> Calc(select=[add_salt(user_id) AS user_id, MyRow(_UTF-16LE'{"fields":[{"user_id":"BIGINT","dp_abtesting_user_tag":"string"}]}', CASE(tag IS NOT NULL, CAST(tag), _UTF-16LE'none':VARCHAR(8) CHARACTER SET "UTF-16LE"), _UTF-16LE'dp_abtesting_user_tag') AS EXPR$1]) -> NotNullEnforcer(fields=[user_id]) -> Sink: Sink(table=[shopee_catalog.fs_database.seamoney_antifraud_buyer_th__dp_abtesting_user_tag_th], fields=[user_id, EXPR$1]) (7/100)#3" #397 prio=5 os_prio=0 tid=0x00007feb2c069800 nid=0xc9eb in Object.wait() [0x00007fea3dad9000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.Object.wait(Object.java:502)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:252)
	- locked <0x00000007ee659160> (a org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap$PendingRegistrationPromise)
	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPromise.await(DefaultChannelPromise.java:131)
	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPromise.await(DefaultChannelPromise.java:30)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.sync(DefaultPromise.java:403)
	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPromise.sync(DefaultChannelPromise.java:119)
	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPromise.sync(DefaultChannelPromise.java:30)
	at com.alibaba.flink.shuffle.transfer.ConnectionManager.createChannel(ConnectionManager.java:197)
	at com.alibaba.flink.shuffle.transfer.ConnectionManager.getOrCreateChannel(ConnectionManager.java:128)
	at com.alibaba.flink.shuffle.transfer.ConnectionManager.getChannel(ConnectionManager.java:114)
	at com.alibaba.flink.shuffle.transfer.ShuffleReadClient.connect(ShuffleReadClient.java:158)
	at com.alibaba.flink.shuffle.plugin.transfer.RemoteShuffleInputGate.setup(RemoteShuffleInputGate.java:250)
	at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:84)
	at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:955)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:658)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
	at java.lang.Thread.run(Thread.java:748)


"HashJoin(joinType=[LeftOuterJoin], where=[(user_id = user_id0)], select=[user_id, user_id0, tag], build=[right]) [Source: HiveSource-smcdrisk_antifraud.th_dp_whitelist_raw_tab] -> Calc(select=[add_salt(user_id) AS user_id, MyRow(_UTF-16LE'{"fields":[{"user_id":"BIGINT","dp_abtesting_user_tag":"string"}]}', CASE(tag IS NOT NULL, CAST(tag), _UTF-16LE'none':VARCHAR(8) CHARACTER SET "UTF-16LE"), _UTF-16LE'dp_abtesting_user_tag') AS EXPR$1]) -> NotNullEnforcer(fields=[user_id]) -> Sink: Sink(table=[shopee_catalog.fs_database.seamoney_antifraud_buyer_th__dp_abtesting_user_tag_th], fields=[user_id, EXPR$1]) (4/100)#2" #263 prio=5 os_prio=0 tid=0x00007feb2c08c800 nid=0xcded waiting on condition [0x00007fec581eb000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x00000007ee75b7b8> (a java.util.concurrent.CompletableFuture$Signaller)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
	at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
	at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
	at com.alibaba.flink.shuffle.transfer.ConnectionManager.releaseChannel(ConnectionManager.java:165)
	at com.alibaba.flink.shuffle.transfer.ShuffleReadClient.close(ShuffleReadClient.java:276)
	at com.alibaba.flink.shuffle.plugin.transfer.RemoteShuffleInputGate.close(RemoteShuffleInputGate.java:331)
	- locked <0x00000006b40e8898> (a java.lang.Object)
	at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.close(InputGateWithMetrics.java:119)
	at org.apache.flink.runtime.taskmanager.Task.closeAllInputGates(Task.java:1013)
	at org.apache.flink.runtime.taskmanager.Task.releaseResources(Task.java:978)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:904)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
	at java.lang.Thread.run(Thread.java:748)


Netty stack

"Remote Shuffle Netty Client (10086)-thread-8" #112 daemon prio=5 os_prio=0 tid=0x00007feb78037000 nid=0x8712 waiting for monitor entry [0x00007fec388fa000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at com.alibaba.flink.shuffle.plugin.transfer.RemoteShuffleInputGate.onBuffer(RemoteShuffleInputGate.java:466)
	- waiting to lock <0x00000006b40e8898> (a java.lang.Object)
	at com.alibaba.flink.shuffle.plugin.transfer.RemoteShuffleInputGate.lambda$getDataListener$2(RemoteShuffleInputGate.java:498)
	at com.alibaba.flink.shuffle.plugin.transfer.RemoteShuffleInputGate$$Lambda$622/1265568361.accept(Unknown Source)
	at com.alibaba.flink.shuffle.transfer.ShuffleReadClient.dataReceived(ShuffleReadClient.java:206)
	at com.alibaba.flink.shuffle.transfer.ReadClientHandler.channelRead(ReadClientHandler.java:174)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at com.alibaba.flink.shuffle.transfer.DecoderDelegate.channelRead(DecoderDelegate.java:133)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
	at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
	at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at java.lang.Thread.run(Thread.java:748)

"Remote Shuffle Netty Client (10086)-thread-7" #111 daemon prio=5 os_prio=0 tid=0x00007feb78036000 nid=0x8711 runnable [0x00007fec38bfd000]
   java.lang.Thread.State: RUNNABLE
	at org.apache.flink.shaded.netty4.io.netty.channel.epoll.Native.epollWait0(Native Method)
	at org.apache.flink.shaded.netty4.io.netty.channel.epoll.Native.epollWait(Native.java:132)
	at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.epollWait(EpollEventLoop.java:281)
	at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:351)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at java.lang.Thread.run(Thread.java:748)

"Remote Shuffle Netty Client (10086)-thread-6" #110 daemon prio=5 os_prio=0 tid=0x00007feb78035000 nid=0x8710 waiting for monitor entry [0x00007fec38afb000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at com.alibaba.flink.shuffle.plugin.transfer.RemoteShuffleInputGate.lambda$getFailureListener$3(RemoteShuffleInputGate.java:517)
	- waiting to lock <0x00000006b40e8898> (a java.lang.Object)
	at com.alibaba.flink.shuffle.plugin.transfer.RemoteShuffleInputGate$$Lambda$623/1441833118.accept(Unknown Source)
	at com.alibaba.flink.shuffle.transfer.ShuffleReadClient.channelInactive(ShuffleReadClient.java:226)
	at com.alibaba.flink.shuffle.transfer.ReadClientHandler.channelInactive(ReadClientHandler.java:129)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
	at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
	at org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
	at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
	at com.alibaba.flink.shuffle.transfer.DecoderDelegate.channelInactive(DecoderDelegate.java:114)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
	at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at java.lang.Thread.run(Thread.java:748)

"Remote Shuffle Netty Client (10086)-thread-5" #109 daemon prio=5 os_prio=0 tid=0x00007feb78034000 nid=0x870f waiting for monitor entry [0x00007fec584ed000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at com.alibaba.flink.shuffle.plugin.transfer.RemoteShuffleInputGate.lambda$getFailureListener$3(RemoteShuffleInputGate.java:517)
	- waiting to lock <0x00000006b40e8898> (a java.lang.Object)
	at com.alibaba.flink.shuffle.plugin.transfer.RemoteShuffleInputGate$$Lambda$623/1441833118.accept(Unknown Source)
	at com.alibaba.flink.shuffle.transfer.ShuffleReadClient.channelInactive(ShuffleReadClient.java:226)
	at com.alibaba.flink.shuffle.transfer.ReadClientHandler.channelInactive(ReadClientHandler.java:129)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
	at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
	at org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
	at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
	at com.alibaba.flink.shuffle.transfer.DecoderDelegate.channelInactive(DecoderDelegate.java:114)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
	at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at java.lang.Thread.run(Thread.java:748)

"Remote Shuffle Netty Client (10086)-thread-4" #108 daemon prio=5 os_prio=0 tid=0x00007feb78032800 nid=0x870e runnable [0x00007fec386f8000]
   java.lang.Thread.State: RUNNABLE
	at org.apache.flink.shaded.netty4.io.netty.channel.epoll.Native.epollWait(Native Method)
	at org.apache.flink.shaded.netty4.io.netty.channel.epoll.Native.epollWait(Native.java:148)
	at org.apache.flink.shaded.netty4.io.netty.channel.epoll.Native.epollWait(Native.java:141)
	at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.epollWaitNoTimerChange(EpollEventLoop.java:290)
	at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:347)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at java.lang.Thread.run(Thread.java:748)

"Remote Shuffle Netty Client (10086)-thread-3" #107 daemon prio=5 os_prio=0 tid=0x00007feb78031800 nid=0x870c runnable [0x00007fec53ffe000]
   java.lang.Thread.State: RUNNABLE
	at org.apache.flink.shaded.netty4.io.netty.channel.epoll.Native.epollWait(Native Method)
	at org.apache.flink.shaded.netty4.io.netty.channel.epoll.Native.epollWait(Native.java:148)
	at org.apache.flink.shaded.netty4.io.netty.channel.epoll.Native.epollWait(Native.java:141)
	at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.epollWaitNoTimerChange(EpollEventLoop.java:290)
	at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:347)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at java.lang.Thread.run(Thread.java:748)

"Remote Shuffle Netty Client (10086)-thread-2" #106 daemon prio=5 os_prio=0 tid=0x00007feb780d0800 nid=0x870b runnable [0x00007fec389fb000]
   java.lang.Thread.State: RUNNABLE
	at org.apache.flink.shaded.netty4.io.netty.channel.epoll.Native.epollWait(Native Method)
	at org.apache.flink.shaded.netty4.io.netty.channel.epoll.Native.epollWait(Native.java:148)
	at org.apache.flink.shaded.netty4.io.netty.channel.epoll.Native.epollWait(Native.java:141)
	at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.epollWaitNoTimerChange(EpollEventLoop.java:290)
	at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:347)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at java.lang.Thread.run(Thread.java:748)

"Remote Shuffle Netty Client (10086)-thread-1" #101 daemon prio=5 os_prio=0 tid=0x00007feb78005800 nid=0x7d5c waiting for monitor entry [0x00007fec582eb000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at com.alibaba.flink.shuffle.plugin.transfer.RemoteShuffleInputGate.lambda$getFailureListener$3(RemoteShuffleInputGate.java:517)
	- waiting to lock <0x00000006b40e8898> (a java.lang.Object)
	at com.alibaba.flink.shuffle.plugin.transfer.RemoteShuffleInputGate$$Lambda$623/1441833118.accept(Unknown Source)
	at com.alibaba.flink.shuffle.transfer.ShuffleReadClient.channelInactive(ShuffleReadClient.java:226)
	at com.alibaba.flink.shuffle.transfer.ReadClientHandler.channelInactive(ReadClientHandler.java:129)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
	at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
	at org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
	at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
	at com.alibaba.flink.shuffle.transfer.DecoderDelegate.channelInactive(DecoderDelegate.java:114)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
	at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at java.lang.Thread.run(Thread.java:748)

Add more metrics

Motivation

These metrics can help us better monitor the shuffle cluster.

Changes

Add more metrics.

Test

  • Unit test.
  • Test manually on a cluster.

Implement ReducePartition

Motivation

As described in the document, ReducePartition is a good supplement to the current MapPartition. It has several good features: for example, it can benefit streaming and hybrid shuffle (mixed streaming and batch), and it can improve performance in some scenarios.

Changes

It is a big change covering many aspects, including shuffle API enhancement (Flink side), ShuffleMaster changes, shuffle write/read changes, network protocol changes, storage changes, fault tolerance changes and so on. We need to implement it step by step.

Tasks

Test

  • Unit test.
  • E2E test.
  • Stability test.
  • Test manually on a cluster.

Introduce rest APIs for dynamic ShuffleWorker online/offline

Motivation

Based on the rest API, we can remove or add ShuffleWorkers dynamically without restarting the cluster. For example, we can remove a bad ShuffleWorker or add a removed ShuffleWorker back. (Note that starting a new ShuffleWorker is already supported.)

Changes

New rest APIs need to be added together with the corresponding handlers. In the handler, the ShuffleWorker can be removed from the worker list, which means that no new data will be written to that ShuffleWorker. Furthermore, we may choose to remove the data already produced on it at the same time. We can also offer a new API to kill a selected ShuffleWorker (note that a new ShuffleWorker may then be started by an external system like K8s).
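A minimal sketch of the state such a handler could update, assuming workers are identified by plain string IDs. The class and method names (ShuffleWorkerRegistry, markOffline, markOnline, writableWorkers) are hypothetical and not part of the project; the point is that placement only sees writable workers, so no new data lands on an offline one.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical in-memory view of the ShuffleWorkers known to the ShuffleManager. A REST handler
 * for offline/online requests would call markOffline/markOnline; partition placement would only
 * consider writableWorkers(), so no new data is written to an offline worker.
 */
class ShuffleWorkerRegistry {
    private final Set<String> registered = new LinkedHashSet<>();
    private final Set<String> offline = new HashSet<>();

    synchronized void register(String workerId) {
        registered.add(workerId);
    }

    /** Takes a worker offline; returns false if it is unknown or already offline. */
    synchronized boolean markOffline(String workerId) {
        return registered.contains(workerId) && offline.add(workerId);
    }

    /** Brings a removed worker back; returns false if it was not offline. */
    synchronized boolean markOnline(String workerId) {
        return offline.remove(workerId);
    }

    /** Workers that may receive new data partitions, in registration order. */
    synchronized List<String> writableWorkers() {
        List<String> result = new ArrayList<>();
        for (String id : registered) {
            if (!offline.contains(id)) {
                result.add(id);
            }
        }
        return result;
    }
}
```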

Test

  • Unit test.
  • Test manually on a cluster.

Adapt to Flink 1.16

Motivation

As Flink 1.16 is about to be released, the remote shuffle project should also adapt to 1.16.

Changes

  • Adapt to Flink 1.16, which means adapting to some new changes in Flink 1.16, including adaptive execution and other API changes.

Test

Test manually on a cluster.

Support configuring the shuffle cluster name prefix to use on the Flink side

Motivation

  1. This enables a Flink job to select the specific shuffle cluster.
  2. If a cluster version is updated, the job can select the higher version with the same name prefix.

Changes

Support configuring the shuffle cluster name prefix to use on the Flink side.
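A sketch of how a job could pick a cluster by prefix, assuming a hypothetical naming convention `<prefix>-<version>` with an integer version (for example `rss-prod-1`, `rss-prod-2`). The convention and the ClusterPrefixSelector class are illustrations only; the issue does not specify how cluster names or versions are encoded.

```java
import java.util.Collection;
import java.util.Optional;

/**
 * Hypothetical selection by name prefix: among clusters named "<prefix>-<version>", return the
 * one with the highest version, so an upgraded cluster with the same prefix is picked up.
 */
final class ClusterPrefixSelector {
    private ClusterPrefixSelector() {}

    static Optional<String> select(Collection<String> clusterNames, String prefix) {
        String head = prefix + "-";
        String best = null;
        int bestVersion = -1;
        for (String name : clusterNames) {
            if (!name.startsWith(head)) {
                continue;
            }
            String suffix = name.substring(head.length());
            // Skip names whose suffix is not a plain version number, e.g. "rss-prod-canary".
            if (!suffix.matches("\\d{1,9}")) {
                continue;
            }
            int version = Integer.parseInt(suffix);
            if (version > bestVersion) {
                best = name;
                bestVersion = version;
            }
        }
        return Optional.ofNullable(best);
    }
}
```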

Test

  • Unit test.
  • Test manually on a cluster.

SortBuffer supports reading data from the specified channel index

Motivation

SortBuffer can improve read performance significantly, but it doesn't support reading data from a specific channel. The development of some new features, for example, the ReducePartition implementation, depends on the ability of SortBuffer to read data from a specified channel. This issue adds that ability to SortBuffer.

Changes

SortBuffer supports reading data from the specified channel index.
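A minimal sketch of the idea, assuming a simplified layout in which records from all channels share one byte buffer and each channel keeps its own list of (offset, length) index entries. The project's SortBuffer is likely more involved (for example, built on memory segments), so SimpleSortBuffer below is hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical simplified sort buffer: records of all channels are appended to one shared buffer,
 * and a per-channel index lets a reader fetch only the records of a specified channel.
 */
class SimpleSortBuffer {
    private final ByteBuffer data;
    private final List<List<int[]>> channelIndex; // per channel: {offset, length} entries

    SimpleSortBuffer(int capacityBytes, int numChannels) {
        data = ByteBuffer.allocate(capacityBytes);
        channelIndex = new ArrayList<>(numChannels);
        for (int i = 0; i < numChannels; i++) {
            channelIndex.add(new ArrayList<>());
        }
    }

    /** Appends a record for a channel; returns false if the buffer has no room left. */
    boolean append(int channel, byte[] record) {
        if (data.remaining() < record.length) {
            return false;
        }
        int offset = data.position();
        data.put(record);
        channelIndex.get(channel).add(new int[] {offset, record.length});
        return true;
    }

    /** Reads all records of one channel in the order they were appended. */
    List<byte[]> readChannel(int channel) {
        List<byte[]> records = new ArrayList<>();
        for (int[] entry : channelIndex.get(channel)) {
            byte[] record = new byte[entry[1]];
            // Absolute gets do not move the shared buffer's write position.
            for (int i = 0; i < entry[1]; i++) {
                record[i] = data.get(entry[0] + i);
            }
            records.add(record);
        }
        return records;
    }
}
```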

Test

This change added tests.

Adapt to Flink 1.15

Motivation

As Flink 1.15 is about to be released, the remote shuffle project should also adapt to 1.15.

Changes

Adapt to Flink 1.15, which means adapting to some new changes in Flink 1.15, including adaptive execution and other API changes.

Test

  • Test manually on a cluster.

Provide a simple rest endpoint for both metric/state query and dynamic configuration

Motivation

Currently, the metric query is supported by the metric server and no other state query or dynamic configuration is supported. A unified rest endpoint can be implemented to take care of all these requirements.

Changes

There are many existing rest frameworks and web servers, for example, Jersey and Jetty. We can just wrap one of these frameworks and supply some simple APIs like handler registration and lifecycle management. Other components like ShuffleWorker and ShuffleManager can then use the rest endpoint directly. In this step, metric query should also be supported, and the current metric server can be removed.
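A sketch of the thin wrapper described above. To stay dependency-free it uses the JDK's built-in `com.sun.net.httpserver.HttpServer` in place of Jersey or Jetty; the SimpleRestEndpoint class and its methods are hypothetical, and a handler here is just a supplier of the response body (for example, a JSON metric snapshot).

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

/** Hypothetical REST endpoint wrapper offering only handler registration and start/stop. */
class SimpleRestEndpoint {
    private final HttpServer server;

    SimpleRestEndpoint(String host, int port) {
        try {
            // Port 0 lets the OS choose a free port; a backlog of 0 means the system default.
            server = HttpServer.create(new InetSocketAddress(host, port), 0);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Registers a handler whose string result becomes the body of a 200 response. */
    void registerHandler(String path, Supplier<String> handler) {
        server.createContext(path, exchange -> {
            byte[] body = handler.get().getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
    }

    void start() {
        server.start();
    }

    int port() {
        return server.getAddress().getPort();
    }

    void stop() {
        server.stop(0);
    }
}
```

ShuffleManager or ShuffleWorker would then register, say, a metrics handler at startup and call `stop()` on shutdown, which covers the lifecycle part without exposing the underlying server.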

Test

  • Unit test.
  • Test manually on a cluster.

Support writing shuffle data to the local shuffle worker first

Motivation

If network bandwidth is the bottleneck, writing the shuffle data to the local shuffle worker can help save network bandwidth, which is better for performance and stability.

Changes

Add a new config option to support writing shuffle data to the local shuffle worker first.
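A sketch of the selection logic the option could enable, assuming workers are matched to the producer by host name and that the option is surfaced as a boolean. LocalFirstPlacement and its Worker type are hypothetical, and the round-robin fallback is an assumption, not the project's documented behavior.

```java
import java.util.List;

/**
 * Hypothetical placement helper: with preferLocal enabled, a ShuffleWorker on the producer's host
 * is chosen so shuffle data does not cross the network; otherwise it falls back to round-robin.
 */
class LocalFirstPlacement {
    static final class Worker {
        final String host;
        final int port;

        Worker(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    private final boolean preferLocal; // assumed to be read from the new config option
    private int counter = 0;

    LocalFirstPlacement(boolean preferLocal) {
        this.preferLocal = preferLocal;
    }

    synchronized Worker select(String producerHost, List<Worker> workers) {
        if (workers.isEmpty()) {
            throw new IllegalStateException("No ShuffleWorker available");
        }
        if (preferLocal) {
            for (Worker w : workers) {
                if (w.host.equals(producerHost)) {
                    return w;
                }
            }
        }
        // floorMod keeps the index valid even after the counter overflows.
        return workers.get(Math.floorMod(counter++, workers.size()));
    }
}
```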

Test

  • Unit test.
  • Test manually on a cluster.

Support Kubernetes HA Services

I think it's important to support K8s high availability, and I don't see any progress on it in the community. FSR-9 seems to be dead, so I would like it to be assigned to me. I will post a simple design later and hope it can be reviewed and completed. I've already reached out on DingTalk.

Introduce rest APIs for dynamic disk online/offline

Motivation

Based on the rest API, we can remove or add disk dynamically without restarting the cluster. For example, we can remove a bad disk or we can add more new disks.

Changes

New rest APIs need to be added together with the corresponding handlers. In the handler, disks are removed from or added to the disk list. Both removing a disk from all ShuffleWorkers and removing a disk from a specific ShuffleWorker should be supported.
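A sketch of a per-worker disk list supporting both removal modes, assuming disks are identified by path and workers by string ID. DiskRegistry and its methods are hypothetical; a real handler would also need to decide what happens to data already stored on a removed disk.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical per-worker disk list that a disk online/offline REST handler could update. */
class DiskRegistry {
    private final Map<String, Set<String>> disksByWorker = new HashMap<>();

    synchronized void addDisk(String workerId, String diskPath) {
        disksByWorker.computeIfAbsent(workerId, k -> new LinkedHashSet<>()).add(diskPath);
    }

    /** Removes a disk from one worker; returns false if that worker did not have it. */
    synchronized boolean removeDisk(String workerId, String diskPath) {
        Set<String> disks = disksByWorker.get(workerId);
        return disks != null && disks.remove(diskPath);
    }

    /** Removes a disk path from every worker; returns how many workers had it. */
    synchronized int removeDiskFromAllWorkers(String diskPath) {
        int removed = 0;
        for (Set<String> disks : disksByWorker.values()) {
            if (disks.remove(diskPath)) {
                removed++;
            }
        }
        return removed;
    }

    /** Returns a copy of the disks currently usable on a worker. */
    synchronized Set<String> disksOf(String workerId) {
        return new LinkedHashSet<>(disksByWorker.getOrDefault(workerId, Collections.emptySet()));
    }
}
```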

Test

  • Unit test.
  • Test manually on a cluster.

Ensure all resources will be released for result partition and input gate when closed

Motivation

The BufferPacker may release one buffer twice which may cause the following exception:

org.apache.flink.shaded.netty4.io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
        at org.apache.flink.shaded.netty4.io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.shaded.netty4.io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at com.alibaba.flink.shuffle.plugin.transfer.BufferPacker.close(BufferPacker.java:130) ~[shuffle-plugin-1.1-SNAPSHOT.jar:?]
        at com.alibaba.flink.shuffle.plugin.transfer.RemoteShuffleOutputGate.close(RemoteShuffleOutputGate.java:139) ~[shuffle-plugin-1.1-SNAPSHOT.jar:?]
        at com.alibaba.flink.shuffle.plugin.transfer.RemoteShuffleResultPartition.close(RemoteShuffleResultPartition.java:331) ~[shuffle-plugin-1.1-SNAPSHOT.jar:?]
        at org.apache.flink.runtime.taskmanager.Task.closeAllResultPartitions(Task.java:1016) [flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.releaseResources(Task.java:997) [flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:885) [flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) [flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at java.lang.Thread.run(Thread.java:874) [?:1.8.0_242]

This exception may further prevent other resources from being released. This can be fixed by:

  1. Fixing the duplicate recycle issue;
  2. Trying to release all resources even when releasing one resource fails. (This was not done before because we expected that no exception would be thrown.)

Changes

  1. Fix the duplicate recycle issue;
  2. Try to release all resources even when releasing one resource fails.
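Both fixes follow common Java patterns, sketched below under the assumption that each resource can be modeled as an AutoCloseable and each buffer recycle as a Runnable. ResourceUtils and RecycleOnce are hypothetical helpers, not the project's classes: closeAll keeps going after a failure and rethrows the first exception with later ones attached as suppressed, and RecycleOnce turns a second recycle into a no-op instead of an IllegalReferenceCountException.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

final class ResourceUtils {
    private ResourceUtils() {}

    /** Closes every resource even if some fail; rethrows the first failure afterwards. */
    static void closeAll(List<? extends AutoCloseable> resources) throws Exception {
        Exception first = null;
        for (AutoCloseable resource : resources) {
            try {
                if (resource != null) {
                    resource.close();
                }
            } catch (Exception e) {
                if (first == null) {
                    first = e;
                } else if (e != first) {
                    first.addSuppressed(e); // keep later failures visible in the stack trace
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }
}

/** Makes recycling idempotent: only the first call runs the recycler. */
final class RecycleOnce {
    private final AtomicBoolean recycled = new AtomicBoolean(false);
    private final Runnable recycler;

    RecycleOnce(Runnable recycler) {
        this.recycler = recycler;
    }

    /** Returns true if this call actually recycled the buffer. */
    boolean recycle() {
        if (recycled.compareAndSet(false, true)) {
            recycler.run();
            return true;
        }
        return false;
    }
}
```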

Test

  • Unit test.
  • Test manually on a cluster.

Enable reading buffer floating for better fairness

Motivation

Currently, reading buffers are allocated to a data partition when the first reader connects and are recycled when all readers finish. This strategy may cause starvation if some consumer tasks process very slowly and occupy reading buffers for too long; in that scenario, other data partitions may not be able to allocate enough buffers. We can solve this problem by recycling reading buffers before all readers finish and allocating them again when needed. If other data partitions are waiting for reading buffers, they can be served by the recycled buffers.

Changes

Enable reading buffer floating by recycling buffers earlier, before all readers finish.
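A counting sketch of floating reading buffers, assuming data partitions are identified by name and buffers only need to be counted, not actually allocated. FloatingReadingBufferPool is hypothetical; it shows the fairness property described above: a buffer returned early goes first to the longest-waiting partition instead of sitting with a slow consumer.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Hypothetical pool in which reading buffers float between data partitions. */
class FloatingReadingBufferPool {
    private int available;
    private final Queue<String> waiting = new ArrayDeque<>();
    private final Map<String, Integer> held = new HashMap<>();

    FloatingReadingBufferPool(int totalBuffers) {
        available = totalBuffers;
    }

    /** Grants one buffer if free; otherwise queues the partition (once) and returns false. */
    synchronized boolean request(String partition) {
        if (available > 0) {
            available--;
            held.merge(partition, 1, Integer::sum);
            return true;
        }
        if (!waiting.contains(partition)) {
            waiting.add(partition);
        }
        return false;
    }

    /**
     * Returns one buffer before all readers finish. If a partition is waiting, the buffer is
     * handed to it directly and its name is returned; otherwise it goes back to the pool (null).
     */
    synchronized String recycle(String partition) {
        if (heldBy(partition) == 0) {
            throw new IllegalStateException(partition + " holds no reading buffer");
        }
        held.computeIfPresent(partition, (k, v) -> v > 1 ? v - 1 : null);
        String next = waiting.poll();
        if (next == null) {
            available++;
            return null;
        }
        held.merge(next, 1, Integer::sum);
        return next;
    }

    synchronized int heldBy(String partition) {
        return held.getOrDefault(partition, 0);
    }
}
```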

Test

  • Unit test.
  • Test manually on a cluster.

Provide a simple web UI

Motivation

Web UI is a simple way to improve usability. It can offer some system state and information, like metrics, ShuffleManager/ShuffleWorker information and so on. It can also supply some useful tools for shuffle cluster management, for example, removing/adding workers and so on.

Changes

Add some simple web pages to show ShuffleManager/ShuffleWorker information and to manage the shuffle cluster.

Test

  • Test manually on a cluster.

Job can switch to new shuffle cluster only when the old cluster is unavailable

Motivation

Currently, a running job will switch to a new shuffle cluster if the new one has a higher version. However, this may cause the shuffle data of one job to be managed by multiple clusters, which is complicated and error-prone. By only switching to a new cluster when the old one is unavailable, we can simplify the logic and further support upgrades without affecting running jobs, provided the old cluster is kept until all running jobs finish.

Changes

Job can switch to new shuffle cluster only when the old cluster is unavailable.
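A sketch of the "sticky" rule, assuming each cluster carries an integer version and availability is supplied by a predicate (for example, backed by heartbeats). StickyClusterSelector is hypothetical: a newer version alone never triggers a switch; the job moves to the newest available cluster only after its current one becomes unavailable.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

/** Hypothetical sticky cluster selection for one job. */
class StickyClusterSelector {
    static final class Cluster {
        final String name;
        final int version;

        Cluster(String name, int version) {
            this.name = name;
            this.version = version;
        }
    }

    private Cluster current;

    synchronized Optional<Cluster> select(List<Cluster> candidates, Predicate<Cluster> isAvailable) {
        // Keep the current cluster while it is available, even if a newer version exists.
        if (current != null && isAvailable.test(current)) {
            return Optional.of(current);
        }
        Optional<Cluster> newest = candidates.stream()
                .filter(isAvailable)
                .max(Comparator.comparingInt((Cluster c) -> c.version));
        newest.ifPresent(c -> current = c);
        return newest;
    }
}
```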

Test

  • Unit test.
  • Test manually on a cluster.

Fix the bug that BaseMapPartition may not read data sequentially

When reading data from MapPartition files, it is common that some subpartitions are requested before others and their region indexes are ahead of the others. If all region data of a subpartition can be read in one round, some subpartition readers will always be ahead of others, which causes random IO. This patch fixes this case by polling one subpartition reader at a time.
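One way to realize "polling one subpartition reader at a time" is sketched below, assuming each reader knows the file offset of its next buffer and readers are prioritized by that offset. The ordering rule and the SubpartitionReader/SequentialReadScheduler classes are assumptions for illustration; the patch itself may choose the next reader differently.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical subpartition reader that knows the file offsets of its remaining buffers. */
final class SubpartitionReader {
    private final long[] offsets;
    private int next = 0;

    SubpartitionReader(long... offsets) {
        this.offsets = offsets;
    }

    boolean hasNext() {
        return next < offsets.length;
    }

    long nextOffset() {
        return offsets[next];
    }

    /** Stands in for reading one buffer from the file; returns the offset that was read. */
    long readOne() {
        return offsets[next++];
    }
}

final class SequentialReadScheduler {
    private SequentialReadScheduler() {}

    /**
     * Polls a single reader (the one with the smallest next offset), reads one buffer, and
     * re-queues it with its new position, so no reader runs ahead and the file is read in
     * ascending offset order. Returns the offsets in the order they were read.
     */
    static List<Long> readAll(List<SubpartitionReader> readers) {
        PriorityQueue<SubpartitionReader> queue =
                new PriorityQueue<>(Comparator.comparingLong(SubpartitionReader::nextOffset));
        for (SubpartitionReader reader : readers) {
            if (reader.hasNext()) {
                queue.add(reader);
            }
        }
        List<Long> readOrder = new ArrayList<>();
        while (!queue.isEmpty()) {
            SubpartitionReader reader = queue.poll();
            readOrder.add(reader.readOne());
            if (reader.hasNext()) {
                queue.add(reader); // re-inserted after its key changed, keeping the heap valid
            }
        }
        return readOrder;
    }
}
```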

Explicitly exit the ShuffleWorker process when the termination future completes

In our usage, we encountered a case where the shuffle worker's registration timed out and triggered a fatal error, but the shuffle worker process did not exit, so no new worker was spawned to replace it.

The reason is that the shuffle worker executes closeAsync and shuts down all of its component services, but the process only exits after all non-daemon threads have exited. Our metric client starts an extra thread that is not closed properly, which causes this problem; that part should be fixed by closing these threads in the reporter#close method.

Still, I think we should improve the shutdown logic a bit. We could explicitly exit the shuffle worker when the termination future completes, so that the process exits safely even when some threads cannot be freed in time.
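A minimal sketch of the proposal, assuming the worker exposes its termination as a `CompletableFuture`. ExitOnTermination is hypothetical, and the exit codes (0 on normal completion, 1 on failure) are an assumption. The exit action is injectable so it can be tested; in production it would be `System::exit`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.IntConsumer;

/**
 * Hypothetical helper: once the termination future completes, exit explicitly so leftover
 * non-daemon threads (such as an unclosed metric reporter thread) cannot keep the JVM alive.
 */
final class ExitOnTermination {
    private ExitOnTermination() {}

    static <T> void install(CompletableFuture<T> terminationFuture, IntConsumer exitAction) {
        // Runs when the future completes, normally (exit code 0) or exceptionally (exit code 1).
        terminationFuture.whenComplete(
                (ignored, error) -> exitAction.accept(error == null ? 0 : 1));
    }
}
```

In a worker entry point this would look like `ExitOnTermination.install(terminationFuture, System::exit);`. Note that `System.exit` still runs shutdown hooks, so a hook that blocks could delay the exit.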

Incompatibility with the new version of Flink

Compilation fails with the new version of Flink because of the following issue.

[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  02:53 min
[INFO] Finished at: 2022-11-25T22:00:09+08:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project shuffle-plugin: Compilation failure
[ERROR] /home/github/flink-remote-shuffle/shuffle-plugin/src/main/java/com/alibaba/flink/shuffle/plugin/transfer/RemoteShuffleInputGate.java:[731,38] incompatible types: java.net.InetSocketAddress cannot be converted to org.apache.flink.runtime.taskmanager.TaskManagerLocation
[ERROR]
[ERROR] -> [Help 1]

The failure occurs because the constructor of Flink's ConnectionID was changed in FLINK-29639.

Load balance improvement

Motivation

Currently, the load balance strategy is pretty simple: DataPartitions are evenly distributed to all ShuffleWorkers. A better way is to allocate shuffle resources based on the actual capacity of each ShuffleWorker.

Changes

Allocate shuffle resources based on the actual capacity of each ShuffleWorker. Each ShuffleWorker can also select disks based on disk throughput.
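A sketch of capacity-aware selection, assuming each target reports a positive capacity figure: for a ShuffleWorker, for example, the number of data partitions it can serve, and for a disk, its throughput. CapacityAwareAllocator is hypothetical; it picks the target with the lowest assigned/capacity ratio, so a worker (or disk) with twice the capacity receives roughly twice the load.

```java
import java.util.List;

/** Hypothetical capacity-aware allocator usable for both worker and disk selection. */
final class CapacityAwareAllocator {
    private CapacityAwareAllocator() {}

    static final class Target {
        final String id;
        final double capacity; // worker capacity or disk throughput; must be > 0
        double assigned;       // load assigned so far, in the same unit as the cost

        Target(String id, double capacity) {
            this.id = id;
            this.capacity = capacity;
        }

        double loadRatio() {
            return assigned / capacity;
        }
    }

    /** Picks the least loaded target relative to its capacity and charges it with cost. */
    static Target allocate(List<Target> targets, double cost) {
        Target best = null;
        for (Target t : targets) {
            // Strict comparison keeps the earlier target on ties.
            if (best == null || t.loadRatio() < best.loadRatio()) {
                best = t;
            }
        }
        if (best == null) {
            throw new IllegalStateException("No target available");
        }
        best.assigned += cost;
        return best;
    }
}
```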

Test

  • Unit test.
  • Performance test.
  • Test manually on a cluster.

Add more information about the ShuffleWorker in the ShuffleManagerClient

By default, ShuffleManagerClient prints logs like the ones below. But the worker InstanceID is not human-readable, so it cannot be quickly linked to the target ShuffleWorker. We have to check ShuffleManager.log first to find the IP of the problematic worker.

2022-05-17 17:10:28,860 INFO  [shuffle-client-JobID{ID=F3918B137043C8D49EBDDBF68CD9D923}] com.alibaba.flink.shuffle.client.ShuffleManagerClientImpl    [] - Got unrelated shuffle worker: InstanceID{ID=31312E3132342E3137372E3136303A33353239392D43314436303544373745364330423843}
2022-05-17 17:10:28,860 INFO  [shuffle-client-JobID{ID=F3918B137043C8D49EBDDBF68CD9D923}] com.alibaba.flink.shuffle.client.ShuffleManagerClientImpl    [] - Got newly related shuffle worker: InstanceID{ID=31312E3132342E3136312E3232393A34353932352D43393734413030463338383239303638}
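The InstanceID values in the log lines above appear to be hex-encoded ASCII: decoding the first one gives `11.124.177.160:35299-C1D605D77E6C0B8C`, which looks like the worker's IP address and port followed by a random suffix. Assuming that encoding holds in general, a helper like the hypothetical InstanceIdDecoder below could be used to log a readable form next to the raw ID.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that decodes a hex-encoded InstanceID into readable text. */
final class InstanceIdDecoder {
    private InstanceIdDecoder() {}

    static String decodeHex(String hex) {
        if (hex.length() % 2 != 0) {
            throw new IllegalArgumentException("Odd number of hex digits: " + hex);
        }
        byte[] bytes = new byte[hex.length() / 2];
        for (int i = 0; i < bytes.length; i++) {
            // Each pair of hex digits is one byte.
            bytes[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return new String(bytes, StandardCharsets.US_ASCII);
    }
}
```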

Optimize the round-robin load balance strategy and use it as the default

Motivation

  1. Currently, the computational complexity of the round-robin partition placement strategy is O(n); we can optimize it to O(1).
  2. Currently, the default partition placement strategy is random, and its load-balancing effect is sometimes unstable, especially when the number of data partitions is small.

Changes

To achieve the goal, we can make PartitionPlacementStrategy aware of the shuffle worker list and do the selection accordingly.
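A sketch of an O(1) round-robin selection that keeps its own copy of the worker list, as the issue proposes. RoundRobinWorkerSelector is hypothetical and not the project's PartitionPlacementStrategy: `select()` is constant time, while adding or removing a worker is O(n), which is acceptable because membership changes are far rarer than partition placements.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical round-robin selector aware of the ShuffleWorker list. */
class RoundRobinWorkerSelector {
    private final List<String> workers = new ArrayList<>();
    private int cursor = 0;

    synchronized void addWorker(String workerId) {
        workers.add(workerId);
    }

    synchronized void removeWorker(String workerId) {
        int index = workers.indexOf(workerId);
        if (index < 0) {
            return;
        }
        workers.remove(index);
        // Keep the cursor on the same "next" worker after the list shifts left.
        if (index < cursor) {
            cursor--;
        }
        if (cursor >= workers.size()) {
            cursor = 0;
        }
    }

    /** Returns the next worker in round-robin order in O(1). */
    synchronized String select() {
        if (workers.isEmpty()) {
            throw new IllegalStateException("No ShuffleWorker registered");
        }
        String selected = workers.get(cursor);
        cursor = (cursor + 1) % workers.size();
        return selected;
    }
}
```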

Test

  • Unit test.
  • Test manually on a cluster.

Fix the possible heartbeat issue caused by Zookeeper restart

Motivation

Currently, the shuffle manager may fail to remove a lost shuffle worker if Zookeeper restarts, because the restart changes the RPC main thread executor.

Changes

Fix the possible heartbeat issue caused by Zookeeper restart and add test case to cover the scenario.

Test

  • Test manually on a cluster.
  • E2E test case.

Support using StatefulSet + PVC to deploy the shuffle service

In the current Kubernetes mode, RSS uses hostPath or emptyDir to store the shuffle data. But in our company, local disk resources are scarce and cannot meet the storage requirements, and I think this is not a complete storage-compute separation architecture. I propose supporting deployment of RSS with StatefulSet + PVC, so that we can leverage cloud storage to store the shuffle data. What do you think of this? cc @wsry

Failed to start ShuffleWorker

Error Message

2022-03-01 16:46:33,521 WARN [main-SendThread(10.X.X.X:2181)] org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - Session 0x0 for server 10.130.13.114/<unresolved>:2181, unexpected error, closing socket connection and attempting reconnect
java.lang.IllegalArgumentException: Unable to canonicalize address 10.130.13.114/<unresolved>:2181 because it's not resolvable
        at org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.SaslServerPrincipal.getServerPrincipal(SaslServerPrincipal.java:65) ~[shuffle-dist-1.0.0.jar:?]
        at org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.SaslServerPrincipal.getServerPrincipal(SaslServerPrincipal.java:41) ~[shuffle-dist-1.0.0.jar:?]
        at org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:1001) ~[shuffle-dist-1.0.0.jar:?]
        at org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1060) [shuffle-dist-1.0.0.jar:?]

JDK

openjdk 15.0.1 2020-10-20
OpenJDK Runtime Environment (build 15.0.1+9-18)
OpenJDK 64-Bit Server VM (build 15.0.1+9-18, mixed mode, sharing)
