
flink-spark-submiter's Introduction

Submitting Flink or Spark jobs to a cluster usually means uploading the executable jar to the cluster and running the submission command by hand; with a companion big-data platform, the jar is uploaded and a scheduling system submits the job. For developers, debugging Flink or Spark jobs locally in IDEA does not exercise object serialization and deserialization, so a job that passes local debugging can still fail in a distributed environment. Meanwhile, pushing the job to the cluster for debugging through all those tedious steps is a serious drag on productivity.

To help big-data developers iterate quickly, this project therefore provides utility classes for submitting Flink/Spark jobs to a cluster straight from a local IDEA session. With minor changes, the submission code can also be integrated with an upstream scheduling system, replacing script-based job submission.

  • Supports Flink job submission in yarnPerJob mode.

  • Supports submitting Spark jobs to YARN in yarn-cluster mode, with automatic upload of the user jar; the Spark jars the job depends on must be uploaded to HDFS in advance.

  • Supports submitting Spark jobs to a K8s cluster. The executable jar must be included in the image, and the image name and executable path are passed at submission time. To operate on Hive tables, also pass the directory holding the cluster configuration and HADOOP_USER_NAME; the tool then mounts the Hadoop configuration files and sets the environment variables.

Entry classes for each job type:

Spark-Yarn-Submit-Client

Spark-K8s-Submit-Client

Flink-Yarn-Submit-Client

Flink job submission in multiple execution modes

  • Fill in the Flink job's runtime parameter configuration, the path to the cluster configuration the job runs against, and the local Flink root path. The project depends on Flink 1.10.
  • Supports job submission in YarnSession, YarnPerjob, and Standalone modes, returning the ApplicationId.
  • The example module contains a FlinkDemo; after packaging, it is copied into the project's exampleJars directory, so you can use it to try a submission.
  • After submission, basic information about the JobManager and TaskManager logs can be fetched by ApplicationId, including log access URLs and total byte sizes; with this, a rolling log view can be built so that oversized YARN logs do not stall the reader.
  • Provides interfaces for job cancellation, job status retrieval, and fetching the logs of finished jobs.

The V2 version adds flink-yarn-submiter-service to isolate multiple Flink versions. In the new version, the entry class for job execution is ClusterClient.

Job operation example:

   public static JobParamsInfo buildJobParamsInfo() {

       //        System.setProperty("java.security.krb5.conf", "/Users/maqi/tmp/hadoopconf/cdh514/krb5.conf");
       // path to the executable jar
       String runJarPath = "/Users/maqi/code/ClustersSubmiter/exampleJars/flink-kafka-reader/flink-kafka-reader.jar";
       // job arguments
       String[] execArgs = new String[]{"-jobName", "flink110Submit", "--topic", "mqTest01", "--bootstrapServers", "172.16.8.107:9092"};
       // job name
       String jobName = "Flink perjob submit";
       // flink conf directory
       String flinkConfDir = "/Users/maqi/tmp/flink/flink-1.10.0/conf";
       // flink lib directory
       String flinkJarPath = "/Users/maqi/tmp/flink/flink-1.10.0/lib";
       // yarn conf directory
       String yarnConfDir = "/Users/maqi/tmp/hadoopconf/195";
       // run the streaming job in perjob mode
       String runMode = "yarn_perjob";
       // external files the job depends on
       String[] dependFile = new String[]{"/Users/maqi/tmp/flink/flink-1.10.0/README.txt"};
       // submission queue
       String queue = "c";
       // yarnsession appid configuration
       Properties yarnSessionConfProperties = new Properties();
       yarnSessionConfProperties.setProperty("yid", "application_1594265598097_5425");

       // savepoint and parallelism settings
       Properties confProperties = new Properties();
       confProperties.setProperty("parallelism", "1");


       JobParamsInfo jobParamsInfo = JobParamsInfo.builder()
               .setExecArgs(execArgs)
               .setName(jobName)
               .setRunJarPath(runJarPath)
               .setDependFile(dependFile)
               .setFlinkConfDir(flinkConfDir)
               .setYarnConfDir(yarnConfDir)
               .setConfProperties(confProperties)
               .setYarnSessionConfProperties(yarnSessionConfProperties)
               .setFlinkJarPath(flinkJarPath)
               .setQueue(queue)
               .setRunMode(runMode)
               .build();

       return jobParamsInfo;
   }


   public static void main(String[] args) throws Exception {
       JobParamsInfo jobParamsInfo = buildJobParamsInfo();
       // job submit 
       Optional<Pair<String, String>> appIdAndJobId = submitFlinkJob(jobParamsInfo);

       // running log info
       appIdAndJobId.ifPresent((pair) -> printRollingLogBaseInfo(jobParamsInfo, pair));

       // cancel job
       Pair<String, String> job = new Pair<>("application_1594265598097_2688", "35a679c9f94311a8a8084e4d8d06a95d");
       cancelFlinkJob(jobParamsInfo, job);

       // flink job status
       ETaskStatus jobStatus = getJobStatus(jobParamsInfo, new Pair<>("application_1594265598097_5425", "fa4ae50441c5d5363e8abbe5623e115a"));
       System.out.println("job status is : " + jobStatus.toString());

       // print finished Log
       printFinishedLog(jobParamsInfo,"application_1594961717891_0103");
   }

JobManager log format:

{
   "logs":[
       {
           "name":"jobmanager.err ",
           "totalBytes":"555",
           "url":"http://172-16-10-204:8042/node/containerlogs/container_e185_1593317332045_2246_01_000002/admin/jobmanager.err/"
       },
       {
           "name":"jobmanager.log ",
           "totalBytes":"31944",
           "url":"http://172-16-10-204:8042/node/containerlogs/container_e185_1593317332045_2246_01_000002/admin/jobmanager.log/"
       },
       {
           "name":"jobmanager.out ",
           "totalBytes":"0",
           "url":"http://172-16-10-204:8042/node/containerlogs/container_e185_1593317332045_2246_01_000002/admin/jobmanager.out/"
       }
   ],
   "typeName":"jobmanager"
}

TaskManager log format:

{
    "logs":[
        {
            "name":"taskmanager.err ",
            "totalBytes":"560",
            "url":"http://node03:8042/node/containerlogs/container_e27_1593571725037_0170_01_000002/admin/taskmanager.err/"
        },
        {
            "name":"taskmanager.log ",
            "totalBytes":"35937",
            "url":"http://node03:8042/node/containerlogs/container_e27_1593571725037_0170_01_000002/admin/taskmanager.log/"
        },
        {
            "name":"taskmanager.out ",
            "totalBytes":"0",
            "url":"http://node03:8042/node/containerlogs/container_e27_1593571725037_0170_01_000002/admin/taskmanager.out/"
        }
    ],
    "otherInfo":"{"dataPort":36218,"freeSlots":0,"hardware":{"cpuCores":4,"freeMemory":241172480,"managedMemory":308700779,"physicalMemory":8201641984},"id":"container_e27_1593571725037_0170_01_000002","path":"akka.tcp://flink@node03:36791/user/taskmanager_0","slotsNumber":1,"timeSinceLastHeartbeat":1593659561129}",
    "typeName":"taskmanager"
}
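
The totalBytes and url fields above are what make rolling display possible: instead of pulling the whole file, the client asks the NodeManager for just a slice. Below is a minimal sketch, assuming the NodeManager web UI's start query parameter (a negative value requests the trailing bytes); the RollingLogReader class and method names are illustrative, not part of this project:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class RollingLogReader {

    /**
     * Fetches a tail slice of a container log through the NodeManager web UI.
     * A negative "start" offset asks for the last N bytes, which keeps the
     * request cheap even when the log has grown very large.
     */
    public static String fetchLogTail(String logUrl, long lastBytes) throws Exception {
        URL url = new URL(logUrl + "?start=-" + lastBytes);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setConnectTimeout(5_000);
        conn.setReadTimeout(10_000);

        StringBuilder sb = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                sb.append(line).append('\n');
            }
        }
        // The NodeManager wraps the log in an HTML page; a real implementation
        // would extract the <pre> block before displaying it.
        return sb.toString();
    }

    public static void main(String[] args) throws Exception {
        // URL taken from the "url" field of the log info JSON above.
        String url = "http://node03:8042/node/containerlogs/"
                + "container_e27_1593571725037_0170_01_000002/admin/taskmanager.log";
        System.out.println(fetchLogTail(url, 4096));
    }
}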

Spark on YARN job submission

  • Fill in the user program path, execution arguments, cluster configuration directory, and security (Kerberos) settings.
  • Spark jobs are submitted in yarn-cluster mode; the Spark jars to be used must be uploaded to HDFS in advance and passed as the archive parameter (see the upload sketch after this list).
  • For SparkSQL jobs, Hive tables can be operated on directly by submitting the spark-sql-proxy program from examples.
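
The archive path must already contain the Spark runtime jars before submission. Below is a minimal one-off upload sketch using Hadoop's FileSystem API; the local Spark directory /opt/spark-2.4.4/jars and the configuration paths are assumptions, and the SparkJarsUploader class is illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.File;

public class SparkJarsUploader {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Pick up core-site.xml / hdfs-site.xml from the cluster configuration directory.
        conf.addResource(new Path("file:///Users/maqi/tmp/hadoopconf/core-site.xml"));
        conf.addResource(new Path("file:///Users/maqi/tmp/hadoopconf/hdfs-site.xml"));

        try (FileSystem fs = FileSystem.get(conf)) {
            // Matches the archive parameter in the submission example below.
            Path target = new Path("hdfs://nameservice1/sparkjars/jars");
            fs.mkdirs(target);

            File[] jars = new File("/opt/spark-2.4.4/jars").listFiles();
            if (jars != null) {
                for (File jar : jars) {
                    fs.copyFromLocalFile(new Path(jar.getAbsolutePath()), target);
                }
            }
        }
    }
}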

Submission example:

public static void main(String[] args) throws Exception {
        boolean openKerberos = true;
        String appName = "todd spark submit";
        String runJarPath = "/Users/maqi/code/ClustersSubmiter/exampleJars/spark-sql-proxy/spark-sql-proxy.jar";
        String mainClass = "cn.todd.spark.SparksqlProxy";
        String yarnConfDir = "/Users/maqi/tmp/hadoopconf";
        String principal = "hdfs/[email protected]";
        String keyTab = "/Users/maqi/tmp/hadoopconf/hdfs.keytab";
        String jarHdfsDir = "sparkproxy2";
        String archive = "hdfs://nameservice1/sparkjars/jars";
        String queue = "root.users.hdfs";
        String execArgs = getExampleJobParams();

        Properties confProperties = new Properties();
        confProperties.setProperty("spark.executor.cores","2");

        JobParamsInfo jobParamsInfo = JobParamsInfo.builder()
                .setAppName(appName)
                .setRunJarPath(runJarPath)
                .setMainClass(mainClass)
                .setYarnConfDir(yarnConfDir)
                .setPrincipal(principal)
                .setKeytab(keyTab)
                .setJarHdfsDir(jarHdfsDir)
                .setArchivePath(archive)
                .setQueue(queue)
                .setExecArgs(execArgs)
                .setConfProperties(confProperties)
                .setOpenKerberos(BooleanUtils.toString(openKerberos, "true", "false"))
                .build();

        YarnConfiguration yarnConf = YarnConfLoaderUtil.getYarnConf(yarnConfDir);
        String applicationId = "";

        if (BooleanUtils.toBoolean(openKerberos)) {
            UserGroupInformation.setConfiguration(yarnConf);
            UserGroupInformation userGroupInformation = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTab);
            applicationId = userGroupInformation.doAs((PrivilegedExceptionAction<String>) () -> LauncherMain.run(jobParamsInfo, yarnConf));
        } else {
            applicationId = LauncherMain.run(jobParamsInfo, yarnConf);
        }

        System.out.println(applicationId);
    }
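
With the returned applicationId, the job's state can also be polled directly from YARN, independently of this project's own status API. A minimal sketch using Hadoop's YarnClient (ApplicationId.fromString assumes Hadoop 2.8+); the YarnStatusChecker class is illustrative:

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class YarnStatusChecker {

    /** Returns the YARN-level state (ACCEPTED, RUNNING, FINISHED, FAILED, KILLED, ...). */
    public static YarnApplicationState getState(YarnConfiguration yarnConf, String appId) throws Exception {
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(yarnConf);
        yarnClient.start();
        try {
            ApplicationReport report =
                    yarnClient.getApplicationReport(ApplicationId.fromString(appId));
            return report.getYarnApplicationState();
        } finally {
            yarnClient.stop();
        }
    }
}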

Spark on K8s job submission

  • Developed against Spark 2.4.4. SparkSQL is executed and Hive tables are accessed by packaging spark-sql-proxy.jar into the image; no other special handling is required.
  • When operating on Hive, pass hadoopConfDir; the program automatically mounts the contents of the .xml files. If Hive is accessed as a non-root user, HADOOP_USER_NAME must be set.
  • The Kubernetes client is created by reading a kubeConfig file, rather than via the official master-URL approach.
  • Submission immediately returns the spark-app-selector id, which can then be used to query POD status (see the sketch after the example below).

Submission example:

  public static void main(String[] args) throws Exception {
      String appName = "todd spark submit";
      // jar path inside the image
      String runJarPath = "local:///opt/dtstack/spark/spark-sql-proxy.jar";
      String mainClass = "cn.todd.spark.SparksqlProxy";
      String hadoopConfDir = "/Users/maqi/tmp/hadoopconf/";
      String kubeConfig = "/Users/maqi/tmp/conf/k8s.config";
      String imageName = "mqspark:2.4.4";
      String execArgs = getExampleJobParams();

      Properties confProperties = new Properties();
      confProperties.setProperty("spark.executor.instances", "2");
      confProperties.setProperty("spark.kubernetes.namespace", "default");
      confProperties.setProperty("spark.kubernetes.authenticate.driver.serviceAccountName", "spark");
      confProperties.setProperty("spark.kubernetes.container.image.pullPolicy", "IfNotPresent");


      JobParamsInfo jobParamsInfo = JobParamsInfo.builder()
              .setAppName(appName)
              .setRunJarPath(runJarPath)
              .setMainClass(mainClass)
              .setExecArgs(execArgs)
              .setConfProperties(confProperties)
              .setHadoopConfDir(hadoopConfDir)
              .setKubeConfig(kubeConfig)
              .setImageName(imageName)
              .build();

      String id = run(jobParamsInfo);
      System.out.println(id);
  }
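
A minimal sketch of the POD status lookup driven by the returned spark-app-selector id (a label Spark on K8s places on its driver and executor pods), using the fabric8 kubernetes-client with its 4.x/5.x-style API; whether this project uses fabric8 internally is an assumption, and the SparkPodStatusChecker class is illustrative:

import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

public class SparkPodStatusChecker {

    /** Looks up the pod phase by the spark-app-selector label returned at submission. */
    public static String getPodPhase(String kubeConfigPath, String namespace, String appSelectorId) throws Exception {
        // Build the client from a kubeConfig file, mirroring this project's approach.
        String kubeConfig = new String(
                Files.readAllBytes(Paths.get(kubeConfigPath)), StandardCharsets.UTF_8);
        Config config = Config.fromKubeconfig(kubeConfig);

        try (KubernetesClient client = new DefaultKubernetesClient(config)) {
            List<Pod> pods = client.pods()
                    .inNamespace(namespace)
                    .withLabel("spark-app-selector", appSelectorId)
                    .list()
                    .getItems();
            // Phase is one of Pending / Running / Succeeded / Failed / Unknown.
            return pods.isEmpty() ? "NOT_FOUND" : pods.get(0).getStatus().getPhase();
        }
    }
}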


flink-spark-submiter's Issues

Help: packaged run fails in buildJobGraph

hey, todd5176:
My runJar, Hadoop, and Flink are all on the same machine A, while --bootstrapServer points to the three Kafka machines B, C, and D. Running java -cp xxx.jar mainClass on A fails with the error below. Could you advise me on how to fix it?

......
INFO [main] - Loading configuration property: jobmanager.execution.failover-strategy, region
INFO [main] - Loading configuration property: rest.bind-port, 50100-50200
INFO [main] - Loading configuration property: io.tmp.dirs, /tmp
INFO [main] - Loading configuration property: historyserver.web.address, dataMiddle-93
INFO [main] - Loading configuration property: historyserver.archive.fs.dir, hdfs://master/flink/ha/completed-jobs/
INFO [main] - Loading configuration property: resourcemanager.taskmanager-timeout, 900000
Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: The program caused an error:
Classpath: [file:/opt/runJar/flink-job-submit-test0105.jar]
System.out: (none)
System.err: (none)
at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)
at com.yss.datamiddle.flinkYarnSubmiter.utils.JobGraphBuildUtil.buildJobGraph(JobGraphBuildUtil.java:75)
at com.yss.datamiddle.flinkYarnSubmiter.executor.YarnJobClusterExecutor.submit(YarnJobClusterExecutor.java:73)
at com.yss.datamiddle.flinkYarnSubmiter.launcher.LauncherMain.submitFlinkJob(LauncherMain.java:55)
at com.yss.datamiddle.flinkYarnSubmiter.launcher.LauncherMain.main(LauncherMain.java:188)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)
at com.yss.datamiddle.flinkYarnSubmiter.utils.JobGraphBuildUtil.buildJobGraph(JobGraphBuildUtil.java:75)
at com.yss.datamiddle.flinkYarnSubmiter.executor.YarnJobClusterExecutor.submit(YarnJobClusterExecutor.java:73)
at com.yss.datamiddle.flinkYarnSubmiter.launcher.LauncherMain.submitFlinkJob(LauncherMain.java:55)
at com.yss.datamiddle.flinkYarnSubmiter.launcher.LauncherMain.main(LauncherMain.java:188)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)
at com.yss.datamiddle.flinkYarnSubmiter.utils.JobGraphBuildUtil.buildJobGraph(JobGraphBuildUtil.java:75)
at com.yss.datamiddle.flinkYarnSubmiter.executor.YarnJobClusterExecutor.submit(YarnJobClusterExecutor.java:73)
at com.yss.datamiddle.flinkYarnSubmiter.launcher.LauncherMain.submitFlinkJob(LauncherMain.java:55)
at com.yss.datamiddle.flinkYarnSubmiter.launcher.LauncherMain.main(LauncherMain.java:188)
......

Newbie crying for help

todd5167, hello:
A question: are the flink-submiter project and the flink-java project independent of each other? Does the flink-submiter project still need to be packaged? Running the main class LauncherMain directly in IDEA cannot find the jar. The machine holding the jar, the machine where Flink is installed, and the Kafka cluster (the --bootstrapServers argument) are all different, so how should the runJarPath and flinkConfDir parameters in buildJobParamsInfo() be configured? Also, if I only need flink-yarn-submiter, can the examples and common directories be dropped?

Hello: after building the ParamsInfo, a ClassNotFound error is reported

"C:\Program Files\Java\jdk1.8.0_301\bin\java.exe" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2021.2\lib\idea_rt.jar=23176:C:\Program Files\JetBrains\IntelliJ IDEA 2021.2\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_301\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\rt.jar;C:\pros\flink-submit\flink-spark-submiter-flink-1.12-v2\flink-yarn-submiter\target\classes;C:\pros\flink-submit\flink-spark-submiter-flink-1.12-v2\flink-yarn-submiter-service\target\classes;C:\mavenresp\com\google\guava\guava\19.0\guava-19.0.jar;C:\mavenresp\org\apache\kerby\kerb-core\2.0.1\kerb-core-2.0.1.jar;C:\mavenresp\org\apache\kerby\kerby-pkix\2.0.1\kerby-pkix-2.0.1.jar;C:\mavenresp\org\apache\kerby\kerby-asn1\2.0.1\kerby-asn1-2.0.1.jar;C:\mavenresp\org\apache\kerby\kerby-util\2.0.1\kerby-util-2.0.1.jar;C:\mavenresp\org\apache\kerby\kerb-util\2.0.1\kerb-util-2.0.1.jar;C:\mavenresp\org\apache\kerby\kerby-config\2.0.1\kerby-config-2.0.1.jar;C:\mavenresp\org\apache\kerby\kerb-crypto\2.0.1\kerb-crypto-2.0.1.jar;C:\mavenresp\com\alibaba\fastjson\1.2.76\fastjson-1.2.76.jar;C:\mavenresp\org\slf4j\slf4j-api\1.7.15\slf4j-api-1.7.15.jar" cn.todd.flink.CliSubmitterMain
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
at cn.todd.flink.CliSubmitterMain.main(CliSubmitterMain.java:54)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 1 more

How to support pyspark

Hi, reading your code gave me some understanding of Spark submission. But I have run into a problem: pyspark applications cannot be submitted cleanly and hit all sorts of odd issues. Do you have any ideas, or plans to add pyspark submission support later? Thanks.

Streaming jobs run fine, but batch jobs cannot retrieve their results in the program

Hi, a quick question: I tested the official built-in batch example WordCount.

1. flink run -class ... from the command line works fine and produces output.
2. Submitting with this tool, the job is submitted and the UI shows it ran and processed data, but the result never shows up in the Flink logs.

My scenario is offline jobs: a single program contains several batch jobs A, B, and C. I need A's result first and then pass it as a parameter to B, but in practice A's result is never returned.
