Flink 1.15 Source Code Analysis: Job Submission Flow (flink run)


0. Preface

Submission method: run the flink command-line script.
Example of submitting a job with the flink script:

flink run ...

The flink script shows that the entry class is org.apache.flink.client.cli.CliFrontend:

# Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
exec "${JAVA_RUN}" $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"

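Its main job is to receive and parse the command passed on the command line and then invoke the corresponding handler to execute it.
It provides the following actions:

  • run: compile and run a program
  • run-application: run an application in Application Mode
  • cancel: cancel a running program (cancelling with a savepoint is deprecated in favor of stop)
  • stop: stop a running program with a savepoint (streaming jobs only)
  • savepoint: trigger a savepoint for a running job or dispose of an existing savepoint
  • info: show the program's execution plan (JSON)
  • list: list running and scheduled programs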

1. CliFrontend

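Next, let's look at how main() carries out the run flow:

  • 1. Get the path of Flink's conf directory
  • 2. Load the configuration from that directory
  • 3. Load the custom command lines, in order: Generic, Yarn, Default
  • 4. Create the CliFrontend, install the security context, and run parseAndRun(args) inside it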

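    public static void main(final String[] args) {
        // 1.1 Log basic environment information
        EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

        // 1.2 Find the configuration directory
        final String configurationDirectory = getConfigurationDirectoryFromEnv();

        // 1.3 Load the global configuration (flink-conf.yaml)
        final Configuration configuration =
                GlobalConfiguration.loadConfiguration(configurationDirectory);

        // 1.4 Load the custom command lines: Generic, Yarn, Default
        final List<CustomCommandLine> customCommandLines =
                loadCustomCommandLines(configuration, configurationDirectory);

        int retCode = 31;
        try {
            // 1.5 Create the CliFrontend
            final CliFrontend cli = new CliFrontend(configuration, customCommandLines);

            // 1.6 Install the security modules
            SecurityUtils.install(new SecurityConfiguration(cli.configuration));

            // 1.7 Parse the arguments and run the action inside the security context
            retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
        } catch (Throwable t) {
            final Throwable strippedThrowable =
                    ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
            LOG.error("Fatal error while running command line interface.", strippedThrowable);
            strippedThrowable.printStackTrace();
        } finally {
            // 1.8 Exit with the return code
            System.exit(retCode);
        }
    }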

The detailed flow is analyzed below.

1.1 Print basic environment information

EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

Implementation of logEnvironmentInfo:


    public static void logEnvironmentInfo(Logger log, String componentName, String[] commandLineArgs) {
        if (log.isInfoEnabled()) {

            RevisionInformation rev = getRevisionInformation();

            String version = getVersion();

            String jvmVersion = getJvmVersion();

            String[] options = getJvmStartupOptionsArray();

            String javaHome = System.getenv("JAVA_HOME");

            long maxHeapMegabytes = getMaxJvmHeapMemory() >>> 20;

            log.info("--------------------------------------------------------------------------------");
            log.info(" Starting " + componentName + " (Version: " + version + ", "
                    + "Rev:" + rev.commitId + ", " + "Date:" + rev.commitDate + ")");
            log.info(" OS current user: " + System.getProperty("user.name"));
            log.info(" Current Hadoop/Kerberos user: " + getHadoopUser());
            log.info(" JVM: " + jvmVersion);
            log.info(" Maximum heap size: " + maxHeapMegabytes + " MiBytes");
            log.info(" JAVA_HOME: " + (javaHome == null ? "(not set)" : javaHome));

            String hadoopVersionString = getHadoopVersionString();
            if (hadoopVersionString != null) {
                log.info(" Hadoop version: " + hadoopVersionString);
            } else {
                log.info(" No Hadoop Dependency available");
            }

            if (options.length == 0) {
                log.info(" JVM Options: (none)");
            }
            else {
                log.info(" JVM Options:");
                for (String s: options) {
                    log.info("    " + s);
                }
            }

            if (commandLineArgs == null || commandLineArgs.length == 0) {
                log.info(" Program Arguments: (none)");
            }
            else {
                log.info(" Program Arguments:");
                for (String s: commandLineArgs) {
                    log.info("    " + s);
                }
            }

            log.info(" Classpath: " + System.getProperty("java.class.path"));

            log.info("--------------------------------------------------------------------------------");
        }
    }

1.2 Get the Flink configuration directory


        final String configurationDirectory = getConfigurationDirectoryFromEnv();

The Flink configuration directory is taken from the FLINK_CONF_DIR environment variable.
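
A minimal sketch of this lookup follows. It assumes, based on my reading of getConfigurationDirectoryFromEnv in 1.15, that an explicitly set FLINK_CONF_DIR must exist, and that when the variable is unset the method falls back to ../conf and then conf relative to the working directory. ConfDirLookup and resolve are hypothetical names, and the environment value and the existence check are passed in so the logic can be exercised without touching the file system.

```java
import java.util.function.Predicate;

// Hypothetical helper mirroring the lookup order described above; not Flink's API.
final class ConfDirLookup {
    // Assumed fallback locations (relative to the working directory), tried in order.
    static final String FALLBACK_1 = "../conf";
    static final String FALLBACK_2 = "conf";

    // flinkConfDirEnv: value of FLINK_CONF_DIR (null if unset); exists: file-system check.
    static String resolve(String flinkConfDirEnv, Predicate<String> exists) {
        if (flinkConfDirEnv != null) {
            // An explicitly configured directory must exist; there is no silent fallback.
            if (exists.test(flinkConfDirEnv)) {
                return flinkConfDirEnv;
            }
            throw new IllegalStateException("The configuration directory '" + flinkConfDirEnv
                    + "', specified in FLINK_CONF_DIR, does not exist.");
        }
        if (exists.test(FALLBACK_1)) {
            return FALLBACK_1;
        }
        if (exists.test(FALLBACK_2)) {
            return FALLBACK_2;
        }
        throw new IllegalStateException("The configuration directory was not specified.");
    }
}
```

In a real program the call would look like ConfDirLookup.resolve(System.getenv("FLINK_CONF_DIR"), p -> new java.io.File(p).exists()).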

1.3 Load the Flink configuration file into a Configuration object

GlobalConfiguration.loadConfiguration loads the settings in flink-conf.yaml and parses them into a Configuration object.


        final Configuration configuration =
                GlobalConfiguration.loadConfiguration(configurationDirectory);

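Implementation of loadConfiguration (the single-argument overload called from main() delegates to this one with dynamicProperties set to null):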


  public static Configuration loadConfiguration(
      final String configDir, @Nullable final Configuration dynamicProperties) {

    if (configDir == null) {
      throw new IllegalArgumentException(
          "Given configuration directory is null, cannot load configuration");
    }

    final File confDirFile = new File(configDir);
    if (!(confDirFile.exists())) {
      throw new IllegalConfigurationException(
          "The given configuration directory name '"
              + configDir
              + "' ("
              + confDirFile.getAbsolutePath()
              + ") does not describe an existing directory.");
    }

    final File yamlConfigFile = new File(confDirFile, FLINK_CONF_FILENAME);

    if (!yamlConfigFile.exists()) {
      throw new IllegalConfigurationException(
          "The Flink config file '"
              + yamlConfigFile
              + "' ("
              + confDirFile.getAbsolutePath()
              + ") does not exist.");
    }

    Configuration configuration = loadYAMLResource(yamlConfigFile);

    if (dynamicProperties != null) {
      configuration.addAll(dynamicProperties);
    }

    return enrichWithEnvironmentVariables(configuration);
  }
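
loadYAMLResource, called above, is not a general YAML parser. As far as I can tell from the 1.15 source, it reads flink-conf.yaml line by line, drops everything after '#', and splits each remaining line on the first ": " into a flat key/value pair. The sketch below reproduces that behavior under those assumptions; FlatYamlParser is a hypothetical name, a LinkedHashMap stands in for Configuration, and the logging of skipped lines is left out.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified sketch of line-based "key: value" parsing; a Map stands in for Configuration.
final class FlatYamlParser {
    static Map<String, String> parse(Reader source) {
        Map<String, String> config = new LinkedHashMap<>();
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) {
                // 1. Drop comments: everything after the first '#'.
                String conf = line.split("#", 2)[0].trim();
                if (conf.isEmpty()) {
                    continue;
                }
                // 2. Split on the first ": " into key and value.
                String[] kv = conf.split(": ", 2);
                if (kv.length == 1) {
                    continue; // not a key-value pair (Flink logs a warning and skips it)
                }
                String key = kv[0].trim();
                String value = kv[1].trim();
                if (key.isEmpty() || value.isEmpty()) {
                    continue; // empty keys or values are skipped too
                }
                config.put(key, value); // a later duplicate key overwrites an earlier one
            }
        } catch (IOException e) {
            throw new UncheckedIOException("Error parsing YAML configuration.", e);
        }
        return config;
    }
}
```

One consequence of this approach is that nested YAML structures and lists are not understood; every setting has to be written as a single "key: value" line.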

1.4 Load the custom command lines (CustomCommandLine)

loadCustomCommandLines is called to load the custom command lines:


        final List<CustomCommandLine> customCommandLines =
                loadCustomCommandLines(configuration, configurationDirectory);

loadCustomCommandLines proceeds as follows:

  • 1. Create a GenericCLI
  • 2. Add the YARN command line (FlinkYarnSessionCli) via reflection; if that fails, add FallbackYarnSessionCli instead
  • 3. Add DefaultCLI

    public static List<CustomCommandLine> loadCustomCommandLines(
            Configuration configuration, String configurationDirectory) {
        List<CustomCommandLine> customCommandLines = new ArrayList<>();

        customCommandLines.add(new GenericCLI(configuration, configurationDirectory));

        final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
        try {

            customCommandLines.add(
                    loadCustomCommandLine(
                            flinkYarnSessionCLI,
                            configuration,
                            configurationDirectory,
                            "y",
                            "yarn"));
        } catch (NoClassDefFoundError | Exception e) {
            final String errorYarnSessionCLI = "org.apache.flink.yarn.cli.FallbackYarnSessionCli";
            try {
                LOG.info("Loading FallbackYarnSessionCli");

                customCommandLines.add(loadCustomCommandLine(errorYarnSessionCLI, configuration));
            } catch (Exception exception) {
                LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
            }
        }

        customCommandLines.add(new DefaultCLI());

        return customCommandLines;
    }

The class diagram is as follows:

[Figure: class diagram of the CustomCommandLine implementations]

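A later section (1.7.2.1) explains how the active command-line client is chosen from the three clients assembled here: GenericCLI, FlinkYarnSessionCli, and DefaultCLI. They are checked in order, the first one that reports itself as active is used, and the check stops there. Each client decides whether it is active as follows:

  • GenericCLI: active if the configuration option execution.target, or the command-line option -e/--executor or -t/--target, is present with a non-null value.
  • FlinkYarnSessionCli: active if -m/--jobmanager is yarn-cluster, a YARN applicationId is passed in the arguments, or execution.target is yarn-session or yarn-per-job.
  • DefaultCLI: always returns true; used for standalone mode.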
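
The three rules can be modeled as plain predicates. This is a simplified sketch, not Flink's API: ActiveCliRules is a hypothetical class, a Map stands in for both the Configuration and the parsed command line (keyed by short option name), and the option name yid and the key yarn.application.id reflect my reading of AbstractYarnCli and YarnConfigOptions, so verify them against your Flink version. The real FlinkYarnSessionCli additionally falls back to an application ID found in the YARN properties file, which is omitted here.

```java
import java.util.Map;

// Simplified model of the three isActive() checks (not Flink's API).
// "config" stands in for the Flink Configuration; "cli" maps parsed short option
// names (e.g. "t", "m", "yid") to their values.
final class ActiveCliRules {
    // GenericCLI: execution.target is set, or -e/--executor or -t/--target was given.
    static boolean genericActive(Map<String, String> config, Map<String, String> cli) {
        return config.get("execution.target") != null
                || cli.containsKey("e")
                || cli.containsKey("t");
    }

    // FlinkYarnSessionCli: -m yarn-cluster, a YARN application id, or a YARN execution target.
    static boolean yarnActive(Map<String, String> config, Map<String, String> cli) {
        String target = config.get("execution.target");
        return "yarn-cluster".equals(cli.get("m"))
                || cli.containsKey("yid")
                || config.containsKey("yarn.application.id")
                || "yarn-session".equalsIgnoreCase(target)
                || "yarn-per-job".equalsIgnoreCase(target);
    }

    // DefaultCLI: always active, which is why it is registered last.
    static boolean defaultActive(Map<String, String> config, Map<String, String> cli) {
        return true;
    }
}
```

Because GenericCLI is checked first and also reacts to execution.target, in this model a configuration with execution.target: yarn-per-job satisfies the GenericCLI rule before the YARN rule is ever consulted.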

1.4.1 Create a GenericCLI


        customCommandLines.add(new GenericCLI(configuration, configurationDirectory));

1.4.2 Add the YARN command line via reflection

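The YARN command line is built via reflection. loadCustomCommandLines passes (configuration, configurationDirectory, "y", "yarn"); the last two arguments are the short and long option prefixes, which FlinkYarnSessionCli prepends to every option it defines. That is why its options appear as -yjm/--yarnjobManagerMemory, -ytm/--yarntaskManagerMemory, -ys/--yarnslots, -yqu/--yarnqueue, and so on.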

    // Loads a class implementing CustomCommandLine via reflection and calls the public
    // constructor whose parameter types match the runtime classes of params.
    private static CustomCommandLine loadCustomCommandLine(String className, Object... params)
            throws IllegalAccessException, InvocationTargetException, InstantiationException,
                    ClassNotFoundException, NoSuchMethodException {

        Class<? extends CustomCommandLine> customCliClass =
                Class.forName(className).asSubclass(CustomCommandLine.class);

        // construct class types from the parameters
        Class<?>[] types = new Class<?>[params.length];
        for (int i = 0; i < params.length; i++) {
            Preconditions.checkNotNull(params[i], "Parameters for custom command-lines may not be null.");
            types[i] = params[i].getClass();
        }

        Constructor<? extends CustomCommandLine> constructor = customCliClass.getConstructor(types);

        return constructor.newInstance(params);
    }
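
One detail of loadCustomCommandLine is easy to miss: the constructor is looked up with the runtime classes of the arguments (params[i].getClass()), so getConstructor only finds a public constructor whose declared parameter types are exactly those classes; a constructor declared with a supertype or interface parameter would not match. The self-contained sketch below uses the same pattern, with hypothetical types DemoCommandLine and DemoYarnCli standing in for CustomCommandLine and FlinkYarnSessionCli.

```java
import java.lang.reflect.Constructor;

// Hypothetical stand-in for the CustomCommandLine interface.
interface DemoCommandLine {
    String prefixes();
}

// Hypothetical implementation with a (String, String) constructor, like the prefix pair.
final class DemoYarnCli implements DemoCommandLine {
    private final String prefixes;

    public DemoYarnCli(String shortPrefix, String longPrefix) {
        this.prefixes = shortPrefix + "/" + longPrefix;
    }

    @Override
    public String prefixes() {
        return prefixes;
    }
}

final class ReflectiveLoader {
    // Same pattern as loadCustomCommandLine: derive the parameter types from the runtime
    // classes of the arguments, find a public constructor with exactly those types, call it.
    static DemoCommandLine load(String className, Object... params) {
        try {
            Class<? extends DemoCommandLine> cliClass =
                    Class.forName(className).asSubclass(DemoCommandLine.class);
            Class<?>[] types = new Class<?>[params.length];
            for (int i = 0; i < params.length; i++) {
                types[i] = params[i].getClass();
            }
            Constructor<? extends DemoCommandLine> constructor = cliClass.getConstructor(types);
            return constructor.newInstance(params);
        } catch (ReflectiveOperationException e) {
            // Wrapped only to keep the sketch free of checked exceptions.
            throw new IllegalArgumentException("Could not load " + className, e);
        }
    }
}
```

For example, ReflectiveLoader.load(DemoYarnCli.class.getName(), "y", "yarn") resolves the (String, String) constructor, while passing a single argument fails with NoSuchMethodException, wrapped here in an IllegalArgumentException.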
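
The FlinkYarnSessionCli constructor, which defines the prefixed options and reads the YARN properties file:
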
    public FlinkYarnSessionCli(
            Configuration configuration,
            ClusterClientServiceLoader clusterClientServiceLoader,
            String configurationDirectory,
            String shortPrefix,
            String longPrefix,
            boolean acceptInteractiveInput)
            throws FlinkException {

        super(configuration, shortPrefix, longPrefix);
        this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
        this.configurationDirectory = checkNotNull(configurationDirectory);
        this.acceptInteractiveInput = acceptInteractiveInput;

        query =
                new Option(
                        shortPrefix + "q",
                        longPrefix + "query",
                        false,
                        "Display available YARN resources (memory, cores)");
        queue = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
        shipPath =
                new Option(
                        shortPrefix + "t",
                        longPrefix + "ship",
                        true,
                        "Ship files in the specified directory (t for transfer)");
        flinkJar =
                new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file");
        jmMemory =
                new Option(
                        shortPrefix + "jm",
                        longPrefix + "jobManagerMemory",
                        true,
                        "Memory for JobManager Container with optional unit (default: MB)");
        tmMemory =
                new Option(
                        shortPrefix + "tm",
                        longPrefix + "taskManagerMemory",
                        true,
                        "Memory per TaskManager Container with optional unit (default: MB)");
        slots =
                new Option(
                        shortPrefix + "s",
                        longPrefix + "slots",
                        true,
                        "Number of slots per TaskManager");
        dynamicproperties =
                Option.builder(shortPrefix + "D")
                        .argName("property=value")
                        .numberOfArgs(2)
                        .valueSeparator()
                        .desc("use value for given property")
                        .build();
        name =
                new Option(
                        shortPrefix + "nm",
                        longPrefix + "name",
                        true,
                        "Set a custom name for the application on YARN");
        applicationType =
                new Option(
                        shortPrefix + "at",
                        longPrefix + "applicationType",
                        true,
                        "Set a custom application type for the application on YARN");
        zookeeperNamespace =
                new Option(
                        shortPrefix + "z",
                        longPrefix + "zookeeperNamespace",
                        true,
                        "Namespace to create the Zookeeper sub-paths for high availability mode");
        nodeLabel =
                new Option(
                        shortPrefix + "nl",
                        longPrefix + "nodeLabel",
                        true,
                        "Specify YARN node label for the YARN application");
        help =
                new Option(
                        shortPrefix + "h",
                        longPrefix + "help",
                        false,
                        "Help for the Yarn session CLI.");

        allOptions = new Options();
        allOptions.addOption(flinkJar);
        allOptions.addOption(jmMemory);
        allOptions.addOption(tmMemory);
        allOptions.addOption(queue);
        allOptions.addOption(query);
        allOptions.addOption(shipPath);
        allOptions.addOption(slots);
        allOptions.addOption(dynamicproperties);
        allOptions.addOption(DETACHED_OPTION);
        allOptions.addOption(YARN_DETACHED_OPTION);
        allOptions.addOption(name);
        allOptions.addOption(applicationId);
        allOptions.addOption(applicationType);
        allOptions.addOption(zookeeperNamespace);
        allOptions.addOption(nodeLabel);
        allOptions.addOption(help);

        this.yarnPropertiesFileLocation =
                configuration.getString(YarnConfigOptions.PROPERTIES_FILE_LOCATION);
        final File yarnPropertiesLocation = getYarnPropertiesLocation(yarnPropertiesFileLocation);

        yarnPropertiesFile = new Properties();

        if (yarnPropertiesLocation.exists()) {
            LOG.info(
                    "Found Yarn properties file under {}.",
                    yarnPropertiesLocation.getAbsolutePath());

            try (InputStream is = new FileInputStream(yarnPropertiesLocation)) {
                yarnPropertiesFile.load(is);
            } catch (IOException ioe) {
                throw new FlinkException(
                        "Could not read the Yarn properties file "
                                + yarnPropertiesLocation
                                + ". Please delete the file at "
                                + yarnPropertiesLocation.getAbsolutePath()
                                + '.',
                        ioe);
            }

            final String yarnApplicationIdString =
                    yarnPropertiesFile.getProperty(YARN_APPLICATION_ID_KEY);

            if (yarnApplicationIdString == null) {
                throw new FlinkException(
                        "Yarn properties file found but doesn't contain a "
                                + "Yarn application id. Please delete the file at "
                                + yarnPropertiesLocation.getAbsolutePath());
            }

            try {

                yarnApplicationIdFromYarnProperties =
                        ConverterUtils.toApplicationId(yarnApplicationIdString);
            } catch (Exception e) {
                throw new FlinkException(
                        "YARN properties contain an invalid entry for "
                                + "application id: "
                                + yarnApplicationIdString
                                + ". Please delete the file at "
                                + yarnPropertiesLocation.getAbsolutePath(),
                        e);
            }
        } else {
            yarnApplicationIdFromYarnProperties = null;
        }
    }
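
The properties-file branch at the end of the constructor can be sketched with java.util.Properties alone. Assumptions: YarnPropertiesSketch is a hypothetical class; the property key is passed in rather than hard-coding YARN_APPLICATION_ID_KEY; the file contents are passed as a string (null meaning the file does not exist); and a regex on YARN's application_<clusterTimestamp>_<sequenceNumber> format stands in for ConverterUtils.toApplicationId.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Optional;
import java.util.Properties;
import java.util.regex.Pattern;

// Sketch of reading and validating a YARN application id from properties-file contents.
final class YarnPropertiesSketch {
    // Format of YARN application ids: application_<clusterTimestamp>_<sequenceNumber>.
    private static final Pattern APP_ID = Pattern.compile("application_\\d+_\\d+");

    // fileContents == null models "no properties file found".
    static Optional<String> readApplicationId(String fileContents, String key) {
        if (fileContents == null) {
            return Optional.empty(); // corresponds to yarnApplicationIdFromYarnProperties = null
        }
        Properties props = new Properties();
        try {
            props.load(new StringReader(fileContents));
        } catch (IOException e) {
            throw new UncheckedIOException("Could not read the Yarn properties file", e);
        }
        String id = props.getProperty(key);
        if (id == null) {
            throw new IllegalStateException(
                    "Yarn properties file found but doesn't contain a Yarn application id.");
        }
        if (!APP_ID.matcher(id).matches()) {
            throw new IllegalStateException(
                    "YARN properties contain an invalid entry for application id: " + id);
        }
        return Optional.of(id);
    }
}
```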

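1.4.2.1 If that fails, add FallbackYarnSessionCli

This branch is taken when FlinkYarnSessionCli cannot be loaded, for example because the Hadoop/YARN classes are not on the classpath (hence NoClassDefFoundError in the catch clause).
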
            final String errorYarnSessionCLI = "org.apache.flink.yarn.cli.FallbackYarnSessionCli";
            try {
                LOG.info("Loading FallbackYarnSessionCli");

                customCommandLines.add(loadCustomCommandLine(errorYarnSessionCLI, configuration));
            } catch (Exception exception) {
                LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
            }
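
Note that NoClassDefFoundError is an Error, not an Exception, which is why the catch clause in loadCustomCommandLines lists both: with catch (Exception e) alone, a missing Hadoop dependency could propagate out of the CLI instead of triggering the fallback. A minimal sketch of the try-then-fall-back pattern, with a hypothetical FallbackLoading class that instantiates the primary class through its no-argument constructor:

```java
import java.util.function.Supplier;

// Sketch of "try the primary class, fall back on any failure".
final class FallbackLoading {
    static Object loadOrFallback(String primaryClassName, Supplier<Object> fallback) {
        try {
            // A missing transitive dependency may surface as NoClassDefFoundError
            // while loading or instantiating the class.
            return Class.forName(primaryClassName).getConstructor().newInstance();
        } catch (NoClassDefFoundError | Exception e) {
            // NoClassDefFoundError is an Error, so it has to be named explicitly.
            return fallback.get();
        }
    }
}
```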

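1.4.3 Add DefaultCLI

The default command line, used for standalone mode. Its isActive() always returns true, so it is added last as the catch-all.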

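1.5 Initialize the CliFrontend object

main() calls the two-argument constructor CliFrontend(configuration, customCommandLines), which delegates to the constructor below with a new DefaultClusterClientServiceLoader. The constructor initializes the file systems and collects the general and run options of every custom command line into customCommandLineOptions.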

    public CliFrontend(
            Configuration configuration,
            ClusterClientServiceLoader clusterClientServiceLoader,
            List<CustomCommandLine> customCommandLines) {

        this.configuration = checkNotNull(configuration);
        this.customCommandLines = checkNotNull(customCommandLines);
        this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);

        FileSystem.initialize(
                configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));

        this.customCommandLineOptions = new Options();

        for (CustomCommandLine customCommandLine : customCommandLines) {
            customCommandLine.addGeneralOptions(customCommandLineOptions);
            customCommandLine.addRunOptions(customCommandLineOptions);
        }

        this.clientTimeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);

        this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
    }

1.6 Load the security modules via SPI

For more details, see "Flink 1.15 Source Code Analysis: Security Modules and Security Context".

SecurityUtils.install(new SecurityConfiguration(cli.configuration));

1.7 Execute the action given on the command line and return a status code

This is the core logic.

retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));

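SecurityContext models a security context that may be required to run a Callable; runSecured executes the callable within it: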


public interface SecurityContext {

    <T> T runSecured(Callable<T> securedCallable) throws Exception;
}
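
When no special security module is active, running a Callable "securely" amounts to calling it directly (Flink ships a NoOpSecurityContext for that case). The sketch below implements the interface above that way and wraps it the way main() does, with 31 as the default return code. PlainSecurityContext and SecuredRunner are hypothetical names, and unlike the real main(), this sketch silently discards the exception instead of logging it.

```java
import java.util.concurrent.Callable;

// Mirrors the SecurityContext interface shown above.
interface SecurityContext {
    <T> T runSecured(Callable<T> securedCallable) throws Exception;
}

// A context that needs no special privileges: it simply runs the callable.
final class PlainSecurityContext implements SecurityContext {
    @Override
    public <T> T runSecured(Callable<T> securedCallable) throws Exception {
        return securedCallable.call();
    }
}

final class SecuredRunner {
    // Mimics main(): the return code stays 31 unless the secured call completes.
    static int run(SecurityContext context, Callable<Integer> action) {
        int retCode = 31;
        try {
            retCode = context.runSecured(action);
        } catch (Throwable t) {
            // main() logs the unwrapped throwable here; the sketch just keeps 31.
        }
        return retCode;
    }
}
```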

1.7.1 The concrete logic lives in CliFrontend.parseAndRun(args)


    public int parseAndRun(String[] args) {

        if (args.length < 1) {
            CliFrontendParser.printHelp(customCommandLines);
            System.out.println("Please specify an action.");
            return 1;
        }

        String action = args[0];

        final String[] params = Arrays.copyOfRange(args, 1, args.length);

        try {

            switch (action) {
                case ACTION_RUN:
                    run(params);
                    return 0;
                case ACTION_RUN_APPLICATION:
                    runApplication(params);
                    return 0;
                case ACTION_LIST:
                    list(params);
                    return 0;
                case ACTION_INFO:
                    info(params);
                    return 0;
                case ACTION_CANCEL:
                    cancel(params);
                    return 0;
                case ACTION_STOP:
                    stop(params);
                    return 0;
                case ACTION_SAVEPOINT:
                    savepoint(params);
                    return 0;
                case "-h":
                case "--help":
                    CliFrontendParser.printHelp(customCommandLines);
                    return 0;
                case "-v":
                case "--version":
                    String version = EnvironmentInformation.getVersion();
                    String commitID = EnvironmentInformation.getRevisionInformation().commitId;
                    System.out.print("Version: " + version);
                    System.out.println(
                            commitID.equals(EnvironmentInformation.UNKNOWN)
                                    ? ""
                                    : ", Commit ID: " + commitID);
                    return 0;
                default:
                    System.out.printf("\"%s\" is not a valid action.\n", action);
                    System.out.println();
                    System.out.println(
                            "Valid actions are \"run\", \"run-application\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\".");
                    System.out.println();
                    System.out.println(
                            "Specify the version option (-v or --version) to print Flink version.");
                    System.out.println();
                    System.out.println(
                            "Specify the help option (-h or --help) to get help on the command.");
                    return 1;
            }
        } catch (CliArgsException ce) {
            return handleArgException(ce);
        } catch (ProgramParametrizationException ppe) {
            return handleParametrizationException(ppe);
        } catch (ProgramMissingJobException pmje) {
            return handleMissingJobException();
        } catch (Exception e) {
            return handleError(e);
        }
    }
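
The dispatch above boils down to this: the first argument selects the action, Arrays.copyOfRange strips it off, and the remaining arguments go to the handler. A minimal sketch under these assumptions: ActionDispatcher is hypothetical, actions are registered in a Map instead of a switch, -h/-v handling is omitted, and every failure maps to return code 1 rather than going through the separate handle* methods.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Consumer;

// Sketch of parseAndRun's dispatch: args[0] is the action, the rest are its parameters.
final class ActionDispatcher {
    private final Map<String, Consumer<String[]>> actions;

    ActionDispatcher(Map<String, Consumer<String[]>> actions) {
        this.actions = actions;
    }

    int parseAndRun(String[] args) {
        if (args.length < 1) {
            System.out.println("Please specify an action.");
            return 1;
        }
        String action = args[0];
        // Everything after the action name is handed to the action itself.
        String[] params = Arrays.copyOfRange(args, 1, args.length);
        Consumer<String[]> handler = actions.get(action);
        if (handler == null) {
            System.out.printf("\"%s\" is not a valid action.%n", action);
            return 1;
        }
        try {
            handler.accept(params);
            return 0;
        } catch (RuntimeException e) {
            // The real method routes specific exception types to dedicated handlers.
            return 1;
        }
    }
}
```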

1.7.2 Execute the run action


    protected void run(String[] args) throws Exception {
        LOG.info("Running 'run' command.");

        final Options commandOptions = CliFrontendParser.getRunCommandOptions();
        final CommandLine commandLine = getCommandLine(commandOptions, args, true);

        if (commandLine.hasOption(HELP_OPTION.getOpt())) {
            CliFrontendParser.printHelpForRun(customCommandLines);
            return;
        }

        final CustomCommandLine activeCommandLine =
                validateAndGetActiveCommandLine(checkNotNull(commandLine));

        final ProgramOptions programOptions = ProgramOptions.create(commandLine);

        final List<URL> jobJars = getJobJarAndDependencies(programOptions);

        final Configuration effectiveConfiguration =
                getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);

        LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

        try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
            executeProgram(effectiveConfiguration, program);
        }
    }

1.7.2.1 Validate and get the active custom command line

Return the first active command line from the CustomCommandLine list initialized in section 1.4:


    public CustomCommandLine validateAndGetActiveCommandLine(CommandLine commandLine) {
        LOG.debug("Custom commandlines: {}", customCommandLines);
        for (CustomCommandLine cli : customCommandLines) {
            LOG.debug(
                    "Checking custom commandline {}, isActive: {}", cli, cli.isActive(commandLine));
            if (cli.isActive(commandLine)) {
                return cli;
            }
        }
        throw new IllegalStateException("No valid command-line found.");
    }
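
validateAndGetActiveCommandLine is a first-match search, so the order fixed in loadCustomCommandLines decides which client wins. Since DefaultCLI is always active and is registered last, the IllegalStateException can only be reached if the list is assembled differently. A generic sketch with a hypothetical FirstActive helper:

```java
import java.util.List;
import java.util.function.Predicate;

// Generic first-match selection, as in validateAndGetActiveCommandLine.
final class FirstActive {
    static <T> T select(List<T> candidates, Predicate<T> isActive) {
        for (T candidate : candidates) {
            if (isActive.test(candidate)) {
                return candidate; // order matters: the first active candidate wins
            }
        }
        throw new IllegalStateException("No valid command-line found.");
    }
}
```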

1.7.2.2 Create ProgramOptions, distinguishing a Python program from a JAR

ProgramOptions.create:

    public static ProgramOptions create(CommandLine line) throws CliArgsException {
        if (isPythonEntryPoint(line) || containsPythonDependencyOptions(line)) {

            return createPythonProgramOptions(line);
        } else {

            return new ProgramOptions(line);
        }
    }

1.7.2.3 Get the job JAR and its dependencies


    private List<URL> getJobJarAndDependencies(ProgramOptions programOptions)
            throws CliArgsException {

        String entryPointClass = programOptions.getEntryPointClassName();

        String jarFilePath = programOptions.getJarFilePath();

        try {
            File jarFile = jarFilePath != null ? getJarFile(jarFilePath) : null;
            return PackagedProgram.getJobJarAndDependencies(jarFile, entryPointClass);
        } catch (FileNotFoundException | ProgramInvocationException e) {
            throw new CliArgsException(
                    "Could not get job jar and dependencies from JAR file: " + e.getMessage(), e);
        }
    }

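1.7.2.4 Get the effective configuration

The effective configuration carries settings such as the HA cluster ID, the execution target (session, per-job), JobManager memory, TaskManager memory, and the number of slots per TaskManager.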
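
Based on my reading of getEffectiveConfiguration in 1.15, the effective configuration is built in layers: a copy of the configuration loaded from flink-conf.yaml, then the result of the active CustomCommandLine's toConfiguration(commandLine), then the program options (parallelism, savepoint settings, job JARs) applied through ExecutionConfigAccessor. A sketch of that layering, with a hypothetical EffectiveConfigSketch class and Maps standing in for Configuration objects:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of layered configuration: later layers override earlier ones.
final class EffectiveConfigSketch {
    @SafeVarargs
    static Map<String, String> merge(Map<String, String>... layers) {
        Map<String, String> effective = new HashMap<>();
        for (Map<String, String> layer : layers) {
            effective.putAll(layer); // like Configuration.addAll: overwrite on key clash
        }
        return effective;
    }
}
```

The later the layer, the higher its precedence, so a value derived from the command line overrides the same key from flink-conf.yaml.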

1.7.2.5 Execute the program


        try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
            executeProgram(effectiveConfiguration, program);
        }

executeProgram delegates to ClientUtils.executeProgram:

    protected void executeProgram(final Configuration configuration, final PackagedProgram program)
            throws ProgramInvocationException {
        ClientUtils.executeProgram(
                new DefaultExecutorServiceLoader(), configuration, program, false, false);
    }

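1.8 Get the return code and exit the client

main() passes the status code returned by parseAndRun to System.exit(retCode) in its finally block; if an exception escapes instead, retCode keeps its initial value 31.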

Original: https://blog.csdn.net/wuxintdrh/article/details/127810949
Author: 宝哥大数据
Title: Flink 1.15 Source Code Analysis: Job Submission Flow (flink run)
