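Table of Contents
- 0. Preface
- 1. CliFrontend
  - 1.1 Print basic environment information
  - 1.2 Get the Flink configuration directory
  - 1.3 Load the Flink configuration file into a Configuration object
  - 1.4 Load the custom command lines (CustomCommandLine)
  - 1.5 Initialize the CliFrontend object
  - 1.6 Load the security modules via SPI
  - 1.7 Run the action for the command-line arguments, invoke the callback, and return the status code
  - 1.8 Get the return status code and shut down the submission client
- Back to [Flink 1.15 Source Code Analysis: Main Index](https://blog.csdn.net/wuxintdrh/article/details/127796678)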
0. Preface
Jobs are submitted by running the `flink` command-line script, for example:

```shell
flink run ...
```

The `flink` script shows that `org.apache.flink.client.cli.CliFrontend` is the entry class:
```shell
# Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
exec "${JAVA_RUN}" $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"
```
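Its main job is to receive and parse the command passed on the command line and call the corresponding utility class to execute it.
It provides the following actions:
- run: compile and run a program
- run-application: run an application in Application Mode
- cancel: cancel a running job (discouraged by the Flink project)
- stop: stop a running job with a savepoint (streaming jobs only)
- savepoint: trigger a savepoint for a running job, or dispose of an existing savepoint
- info: show the program's execution plan (JSON)
- list: list running and scheduled programs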
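1. CliFrontend
Next, let's look at how `main()` carries out the `run` flow. Its first steps are:
- 1. Get the path of Flink's conf directory
- 2. Load the configuration from that path
- 3. Wrap the command-line interfaces, in the order Generic, YARN, Default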
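```java
public static void main(final String[] args) {
    // 1.1 Print basic environment information
    EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

    // 1.2 Find the configuration directory
    final String configurationDirectory = getConfigurationDirectoryFromEnv();

    // 1.3 Load flink-conf.yaml into a Configuration
    final Configuration configuration =
            GlobalConfiguration.loadConfiguration(configurationDirectory);

    // 1.4 Load the custom command lines (Generic, YARN, Default)
    final List<CustomCommandLine> customCommandLines =
            loadCustomCommandLines(configuration, configurationDirectory);

    int retCode = 31;
    try {
        // 1.5 Create the CliFrontend
        final CliFrontend cli = new CliFrontend(configuration, customCommandLines);

        // 1.6 Install the security modules
        SecurityUtils.install(new SecurityConfiguration(cli.configuration));
        // 1.7 Run the requested action inside the installed security context
        retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
    } catch (Throwable t) {
        // Error logging is omitted in this excerpt; retCode keeps the value 31.
    } finally {
        // 1.8 Exit the JVM with the status code
        System.exit(retCode);
    }
}
```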
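The exit-code handling in `main` follows a simple pattern: start from a default (`31`), overwrite it only when the action returns, and always exit with whatever value is left. The sketch below isolates that pattern, assuming a plain `Callable<Integer>` stands in for `cli.parseAndRun(args)`; `ExitCodeSketch` and `computeExitCode` are hypothetical names, and `System.exit` is left out so the logic can be called directly.

```java
import java.util.concurrent.Callable;

final class ExitCodeSketch {
    // Default status code that CliFrontend.main keeps when the action does not complete.
    static final int DEFAULT_RET_CODE = 31;

    // Hypothetical helper: returns the code main() would pass to System.exit.
    static int computeExitCode(Callable<Integer> action) {
        int retCode = DEFAULT_RET_CODE;
        try {
            retCode = action.call();
        } catch (Throwable t) {
            // main() logs the failure here; retCode is left at 31.
            System.err.println("Fatal error while running command line interface: " + t);
        }
        return retCode;
    }

    public static void main(String[] args) {
        System.out.println(computeExitCode(() -> 0));
        System.out.println(computeExitCode(() -> {
            throw new IllegalStateException("simulated failure");
        }));
    }
}
```

Because the assignment to `retCode` only happens after the call returns, any exception thrown by the action (including one raised while installing security) leaves the default in place.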
Each step is analyzed in detail below.
1.1 Print basic environment information
```java
EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
```
Implementation of `logEnvironmentInfo`:
```java
public static void logEnvironmentInfo(
        Logger log, String componentName, String[] commandLineArgs) {
    if (log.isInfoEnabled()) {
        RevisionInformation rev = getRevisionInformation();
        String version = getVersion();
        String jvmVersion = getJvmVersion();
        String[] options = getJvmStartupOptionsArray();
        String javaHome = System.getenv("JAVA_HOME");
        // Maximum heap in bytes, shifted right by 20 bits to get MiB
        long maxHeapMegabytes = getMaxJvmHeapMemory() >>> 20;

        log.info("--------------------------------------------------------------------------------");
        log.info(" Starting " + componentName + " (Version: " + version + ", "
                + "Rev:" + rev.commitId + ", " + "Date:" + rev.commitDate + ")");
        log.info(" OS current user: " + System.getProperty("user.name"));
        log.info(" Current Hadoop/Kerberos user: " + getHadoopUser());
        log.info(" JVM: " + jvmVersion);
        log.info(" Maximum heap size: " + maxHeapMegabytes + " MiBytes");
        log.info(" JAVA_HOME: " + (javaHome == null ? "(not set)" : javaHome));

        String hadoopVersionString = getHadoopVersionString();
        if (hadoopVersionString != null) {
            log.info(" Hadoop version: " + hadoopVersionString);
        } else {
            log.info(" No Hadoop Dependency available");
        }

        if (options.length == 0) {
            log.info(" JVM Options: (none)");
        } else {
            log.info(" JVM Options:");
            for (String s : options) {
                log.info("    " + s);
            }
        }

        if (commandLineArgs == null || commandLineArgs.length == 0) {
            log.info(" Program Arguments: (none)");
        } else {
            log.info(" Program Arguments:");
            for (String s : commandLineArgs) {
                log.info("    " + s);
            }
        }

        log.info(" Classpath: " + System.getProperty("java.class.path"));
        log.info("--------------------------------------------------------------------------------");
    }
}
```
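The expression `getMaxJvmHeapMemory() >>> 20` converts a byte count to mebibytes: shifting right by 20 bits divides by 2^20 = 1,048,576. A small sketch using the standard `Runtime.maxMemory()` as the byte source (Flink's own `getMaxJvmHeapMemory()` may treat an unbounded value differently; that part is not reproduced here, and `HeapSizeSketch` is a hypothetical name):

```java
final class HeapSizeSketch {
    // ">>> 20" drops the low 20 bits, i.e. divides a non-negative byte count by 2^20.
    static long bytesToMebibytes(long bytes) {
        return bytes >>> 20;
    }

    public static void main(String[] args) {
        // Runtime.maxMemory() may return Long.MAX_VALUE when the JVM reports no limit.
        long maxHeapBytes = Runtime.getRuntime().maxMemory();
        System.out.println("Maximum heap size: " + bytesToMebibytes(maxHeapBytes) + " MiBytes");
        System.out.println(bytesToMebibytes(1L << 30)); // 1 GiB -> prints 1024
    }
}
```

The unsigned shift truncates, so anything below 1 MiB reports as 0 MiB.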
1.2 Get the Flink configuration directory
```java
final String configurationDirectory = getConfigurationDirectoryFromEnv();
```
The Flink configuration directory is taken from the `FLINK_CONF_DIR` environment variable.
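A minimal sketch of the lookup, assuming the method also falls back to a relative `../conf` or `conf` directory when `FLINK_CONF_DIR` is unset (these fallback paths are an assumption, not taken from this article). The environment and the existence check are passed in so the logic can be exercised without touching the real file system; `ConfDirSketch` and `resolveConfDir` are hypothetical names.

```java
import java.io.File;
import java.util.Map;
import java.util.function.Predicate;

final class ConfDirSketch {
    static final String ENV_FLINK_CONF_DIR = "FLINK_CONF_DIR";
    // Assumed fallback locations, checked in order when the variable is unset.
    static final String[] FALLBACK_DIRS = {"../conf", "conf"};

    static String resolveConfDir(Map<String, String> env, Predicate<String> exists) {
        String location = env.get(ENV_FLINK_CONF_DIR);
        if (location != null) {
            // An explicitly configured directory must exist; there is no silent fallback.
            if (exists.test(location)) {
                return location;
            }
            throw new IllegalStateException("The configuration directory '" + location
                    + "', specified in '" + ENV_FLINK_CONF_DIR + "', does not exist.");
        }
        for (String fallback : FALLBACK_DIRS) {
            if (exists.test(fallback)) {
                return fallback;
            }
        }
        throw new IllegalStateException("The configuration directory was not specified.");
    }

    public static void main(String[] args) {
        try {
            System.out.println(resolveConfDir(System.getenv(), p -> new File(p).isDirectory()));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```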
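1.3 Load the Flink configuration file into a Configuration object
`GlobalConfiguration.loadConfiguration` loads the settings in the Flink configuration file `flink-conf.yaml` and turns them into a `Configuration` object:
```java
final Configuration configuration =
        GlobalConfiguration.loadConfiguration(configurationDirectory);
```
The one-argument overload called here delegates to the implementation below with `dynamicProperties` set to `null`: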
```java
public static Configuration loadConfiguration(
        final String configDir, @Nullable final Configuration dynamicProperties) {

    if (configDir == null) {
        throw new IllegalArgumentException(
                "Given configuration directory is null, cannot load configuration");
    }

    // The configuration directory must exist
    final File confDirFile = new File(configDir);
    if (!(confDirFile.exists())) {
        throw new IllegalConfigurationException(
                "The given configuration directory name '"
                        + configDir
                        + "' ("
                        + confDirFile.getAbsolutePath()
                        + ") does not describe an existing directory.");
    }

    // The Flink YAML configuration file (flink-conf.yaml) must exist
    final File yamlConfigFile = new File(confDirFile, FLINK_CONF_FILENAME);
    if (!yamlConfigFile.exists()) {
        throw new IllegalConfigurationException(
                "The Flink config file '"
                        + yamlConfigFile
                        + "' ("
                        + confDirFile.getAbsolutePath()
                        + ") does not exist.");
    }

    Configuration configuration = loadYAMLResource(yamlConfigFile);

    // Dynamic properties override values from the file
    if (dynamicProperties != null) {
        configuration.addAll(dynamicProperties);
    }

    return enrichWithEnvironmentVariables(configuration);
}
```
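As far as we can tell, `loadYAMLResource` in Flink 1.15 does not use a full YAML parser; it reads `flink-conf.yaml` line by line as flat `key: value` pairs. A simplified sketch of that parsing, assuming `#` starts a comment and lines without a `": "` separator are skipped (the real method also logs a warning for such lines). A plain `Map` stands in for `Configuration`, and `FlinkConfParserSketch` is a hypothetical name:

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class FlinkConfParserSketch {
    // Parses flat "key: value" lines; '#' starts a comment, malformed lines are skipped.
    static Map<String, String> parse(String content) {
        Map<String, String> config = new LinkedHashMap<>();
        for (String line : content.split("\\R")) {
            // 1. Strip everything after the first '#'
            String conf = line.split("#", 2)[0].trim();
            if (conf.isEmpty()) {
                continue;
            }
            // 2. Split on the first ": " into key and value
            String[] kv = conf.split(": ", 2);
            if (kv.length == 1) {
                continue; // no "key: value" separator on this line
            }
            String key = kv[0].trim();
            String value = kv[1].trim();
            if (!key.isEmpty() && !value.isEmpty()) {
                config.put(key, value);
            }
        }
        return config;
    }

    public static void main(String[] args) {
        String yaml = "jobmanager.rpc.address: localhost\n"
                + "# a comment line\n"
                + "taskmanager.numberOfTaskSlots: 4 # inline comment\n";
        System.out.println(parse(yaml));
    }
}
```

Because values are split on `#`, a value that itself contains `#` would be truncated in this sketch.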
1.4 Load the custom command lines (CustomCommandLine)
`loadCustomCommandLines` loads the custom command lines:
```java
final List<CustomCommandLine> customCommandLines =
        loadCustomCommandLines(configuration, configurationDirectory);
```
It builds the list in this order:
- 1. Create a GenericCLI
- 2. Add the YARN command line via reflection; if that fails, add FallbackYarnSessionCli instead
- 3. Add DefaultCLI
```java
public static List<CustomCommandLine> loadCustomCommandLines(
        Configuration configuration, String configurationDirectory) {
    List<CustomCommandLine> customCommandLines = new ArrayList<>();
    // 1. GenericCLI is always checked first
    customCommandLines.add(new GenericCLI(configuration, configurationDirectory));

    // 2. YARN session CLI, loaded by reflection with the "y"/"yarn" option prefixes
    final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
    try {
        customCommandLines.add(
                loadCustomCommandLine(
                        flinkYarnSessionCLI,
                        configuration,
                        configurationDirectory,
                        "y",
                        "yarn"));
    } catch (NoClassDefFoundError | Exception e) {
        final String errorYarnSessionCLI = "org.apache.flink.yarn.cli.FallbackYarnSessionCli";
        try {
            LOG.info("Loading FallbackYarnSessionCli");
            customCommandLines.add(loadCustomCommandLine(errorYarnSessionCLI, configuration));
        } catch (Exception exception) {
            LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
        }
    }

    // 3. DefaultCLI is always last (standalone mode)
    customCommandLines.add(new DefaultCLI());

    return customCommandLines;
}
```
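The ordering and fallback logic above can be isolated from the Flink types. In this sketch each command line is reduced to its name, and the two YARN loaders are hypothetical `Supplier`s standing in for the reflective `loadCustomCommandLine` calls; `CliChainSketch` and `loadChain` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class CliChainSketch {
    static List<String> loadChain(Supplier<String> yarnLoader, Supplier<String> fallbackLoader) {
        List<String> chain = new ArrayList<>();
        chain.add("GenericCLI"); // always first
        try {
            // May fail with NoClassDefFoundError when YARN classes are not on the classpath.
            chain.add(yarnLoader.get());
        } catch (NoClassDefFoundError | RuntimeException e) {
            try {
                chain.add(fallbackLoader.get()); // FallbackYarnSessionCli in Flink
            } catch (RuntimeException ignored) {
                // Flink only logs a warning here and continues without a YARN CLI.
            }
        }
        chain.add("DefaultCLI"); // always last
        return chain;
    }

    public static void main(String[] args) {
        System.out.println(loadChain(
                () -> { throw new NoClassDefFoundError("YarnConfiguration"); },
                () -> "FallbackYarnSessionCli"));
    }
}
```

Catching `NoClassDefFoundError` as well as exceptions is what lets the client start without Hadoop/YARN jars: the failure is contained to the YARN slot of the list.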
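Section 1.7.2.1 later picks the active command-line client from the three clients wrapped here: GenericCLI, FlinkYarnSessionCli, and DefaultCLI. They are checked in order; the first active one is used, the check stops, and that client is returned. The activation rules are:
- GenericCLI: active if the `execution.target` option, or one of the `-e`/`--executor`, `-t`/`--target` arguments, is present with a non-null value.
- FlinkYarnSessionCli: active if the value of `-m`/`--jobmanager` is `yarn-cluster`, or a YARN applicationId is passed in the arguments, or `execution.target` is `yarn-session` or `yarn-per-job`.
- DefaultCLI: always returns true; used for standalone mode.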
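The rules above amount to an ordered chain of predicates where the first match wins. A simplified sketch, assuming parsed arguments are modeled as a map keyed by short option name without the dash, and assuming `yid` as the name of the YARN application id option; the predicates are reduced versions of the real `isActive` implementations, and `ActiveCliSketch` is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiPredicate;

final class ActiveCliSketch {
    // Insertion order = check order, mirroring the list built by loadCustomCommandLines.
    private static final Map<String, BiPredicate<Map<String, String>, Map<String, String>>> CHAIN =
            new LinkedHashMap<>();

    static {
        // cli: parsed options (simplified); conf: the loaded configuration.
        CHAIN.put("GenericCLI", (cli, conf) ->
                conf.get("execution.target") != null
                        || cli.get("e") != null
                        || cli.get("t") != null);
        CHAIN.put("FlinkYarnSessionCli", (cli, conf) -> {
            String target = conf.get("execution.target");
            return "yarn-cluster".equals(cli.get("m"))
                    || cli.containsKey("yid") // assumed YARN application id option
                    || "yarn-session".equals(target)
                    || "yarn-per-job".equals(target);
        });
        CHAIN.put("DefaultCLI", (cli, conf) -> true); // standalone fallback, always active
    }

    // Like validateAndGetActiveCommandLine: return the first active command line.
    static String selectActive(Map<String, String> cli, Map<String, String> conf) {
        for (var entry : CHAIN.entrySet()) {
            if (entry.getValue().test(cli, conf)) {
                return entry.getKey();
            }
        }
        throw new IllegalStateException("No valid command-line found.");
    }

    public static void main(String[] args) {
        System.out.println(selectActive(Map.of("m", "yarn-cluster"), Map.of()));
    }
}
```

With these rules, a non-null `execution.target` is always claimed by `GenericCLI` before the YARN check runs, which is why the order of the list matters.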
1.4.1 Create a GenericCLI
```java
customCommandLines.add(new GenericCLI(configuration, configurationDirectory));
```
1.4.2 Add the YARN command line via reflection
The YARN command line is constructed via reflection:
```java
private static CustomCommandLine loadCustomCommandLine(String className, Object... params)
        throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException,
                InvocationTargetException, InstantiationException {

    Class<? extends CustomCommandLine> customCliClass =
            Class.forName(className).asSubclass(CustomCommandLine.class);

    // The constructor is looked up by the runtime classes of the given parameters
    Class<?>[] types = new Class<?>[params.length];
    for (int i = 0; i < params.length; i++) {
        Preconditions.checkNotNull(
                params[i], "Parameters for custom command-lines may not be null.");
        types[i] = params[i].getClass();
    }

    Constructor<? extends CustomCommandLine> constructor = customCliClass.getConstructor(types);

    return constructor.newInstance(params);
}
```
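One consequence of building `types` from `params[i].getClass()` is that `getConstructor` needs an exact match: a constructor declared with an interface or supertype parameter is not found. The sketch below reproduces the same lookup generically (`ReflectiveLoaderSketch` and `instantiate` are hypothetical names) and uses JDK classes to show both outcomes.

```java
import java.lang.reflect.Constructor;

final class ReflectiveLoaderSketch {
    // Same lookup strategy as loadCustomCommandLine: parameter types come from the
    // runtime classes of the arguments, so no supertype matching is attempted.
    static <T> T instantiate(String className, Class<T> base, Object... params)
            throws ReflectiveOperationException {
        Class<? extends T> clazz = Class.forName(className).asSubclass(base);
        Class<?>[] types = new Class<?>[params.length];
        for (int i = 0; i < params.length; i++) {
            if (params[i] == null) {
                throw new NullPointerException("Parameters may not be null.");
            }
            types[i] = params[i].getClass();
        }
        Constructor<? extends T> constructor = clazz.getConstructor(types);
        return constructor.newInstance(params);
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        // StringBuilder(String) exists, so this works.
        CharSequence sb = instantiate("java.lang.StringBuilder", CharSequence.class, "flink");
        System.out.println(sb);
        // ArrayList has ArrayList(Collection) but no ArrayList(ArrayList): NoSuchMethodException.
        try {
            instantiate("java.util.ArrayList", java.util.List.class, new java.util.ArrayList<String>());
        } catch (NoSuchMethodException e) {
            System.out.println("no exact constructor match");
        }
    }
}
```

This is why the YARN CLI needs a constructor whose declared parameter types are exactly `Configuration` and `String`.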
The reflective call passes `(Configuration, String, String, String)`, which matches a four-argument `FlinkYarnSessionCli` constructor; that constructor delegates to the full constructor below, which defines the YARN-specific options (each prefixed with `y`/`yarn`, e.g. `-yjm`) and reads the YARN properties file if one exists:
```java
public FlinkYarnSessionCli(
        Configuration configuration,
        ClusterClientServiceLoader clusterClientServiceLoader,
        String configurationDirectory,
        String shortPrefix,
        String longPrefix,
        boolean acceptInteractiveInput)
        throws FlinkException {
    super(configuration, shortPrefix, longPrefix);

    this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
    this.configurationDirectory = checkNotNull(configurationDirectory);
    this.acceptInteractiveInput = acceptInteractiveInput;

    // YARN-specific options; every name is prefixed with shortPrefix/longPrefix
    query =
            new Option(
                    shortPrefix + "q",
                    longPrefix + "query",
                    false,
                    "Display available YARN resources (memory, cores)");
    queue = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
    shipPath =
            new Option(
                    shortPrefix + "t",
                    longPrefix + "ship",
                    true,
                    "Ship files in the specified directory (t for transfer)");
    flinkJar =
            new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file");
    jmMemory =
            new Option(
                    shortPrefix + "jm",
                    longPrefix + "jobManagerMemory",
                    true,
                    "Memory for JobManager Container with optional unit (default: MB)");
    tmMemory =
            new Option(
                    shortPrefix + "tm",
                    longPrefix + "taskManagerMemory",
                    true,
                    "Memory per TaskManager Container with optional unit (default: MB)");
    slots =
            new Option(
                    shortPrefix + "s",
                    longPrefix + "slots",
                    true,
                    "Number of slots per TaskManager");
    dynamicproperties =
            Option.builder(shortPrefix + "D")
                    .argName("property=value")
                    .numberOfArgs(2)
                    .valueSeparator()
                    .desc("use value for given property")
                    .build();
    name =
            new Option(
                    shortPrefix + "nm",
                    longPrefix + "name",
                    true,
                    "Set a custom name for the application on YARN");
    applicationType =
            new Option(
                    shortPrefix + "at",
                    longPrefix + "applicationType",
                    true,
                    "Set a custom application type for the application on YARN");
    zookeeperNamespace =
            new Option(
                    shortPrefix + "z",
                    longPrefix + "zookeeperNamespace",
                    true,
                    "Namespace to create the Zookeeper sub-paths for high availability mode");
    nodeLabel =
            new Option(
                    shortPrefix + "nl",
                    longPrefix + "nodeLabel",
                    true,
                    "Specify YARN node label for the YARN application");
    help =
            new Option(
                    shortPrefix + "h",
                    longPrefix + "help",
                    false,
                    "Help for the Yarn session CLI.");

    // Register all options
    allOptions = new Options();
    allOptions.addOption(flinkJar);
    allOptions.addOption(jmMemory);
    allOptions.addOption(tmMemory);
    allOptions.addOption(queue);
    allOptions.addOption(query);
    allOptions.addOption(shipPath);
    allOptions.addOption(slots);
    allOptions.addOption(dynamicproperties);
    allOptions.addOption(DETACHED_OPTION);
    allOptions.addOption(YARN_DETACHED_OPTION);
    allOptions.addOption(name);
    allOptions.addOption(applicationId);
    allOptions.addOption(applicationType);
    allOptions.addOption(zookeeperNamespace);
    allOptions.addOption(nodeLabel);
    allOptions.addOption(help);

    // Read the YARN properties file, if it exists, to find a running YARN session
    this.yarnPropertiesFileLocation =
            configuration.getString(YarnConfigOptions.PROPERTIES_FILE_LOCATION);
    final File yarnPropertiesLocation = getYarnPropertiesLocation(yarnPropertiesFileLocation);

    yarnPropertiesFile = new Properties();

    if (yarnPropertiesLocation.exists()) {
        LOG.info(
                "Found Yarn properties file under {}.",
                yarnPropertiesLocation.getAbsolutePath());

        try (InputStream is = new FileInputStream(yarnPropertiesLocation)) {
            yarnPropertiesFile.load(is);
        } catch (IOException ioe) {
            throw new FlinkException(
                    "Could not read the Yarn properties file "
                            + yarnPropertiesLocation
                            + ". Please delete the file at "
                            + yarnPropertiesLocation.getAbsolutePath()
                            + '.',
                    ioe);
        }

        final String yarnApplicationIdString =
                yarnPropertiesFile.getProperty(YARN_APPLICATION_ID_KEY);

        if (yarnApplicationIdString == null) {
            throw new FlinkException(
                    "Yarn properties file found but doesn't contain a "
                            + "Yarn application id. Please delete the file at "
                            + yarnPropertiesLocation.getAbsolutePath());
        }

        try {
            // Try converting the id to an ApplicationId
            yarnApplicationIdFromYarnProperties =
                    ConverterUtils.toApplicationId(yarnApplicationIdString);
        } catch (Exception e) {
            throw new FlinkException(
                    "YARN properties contain an invalid entry for "
                            + "application id: "
                            + yarnApplicationIdString
                            + ". Please delete the file at "
                            + yarnPropertiesLocation.getAbsolutePath(),
                    e);
        }
    } else {
        yarnApplicationIdFromYarnProperties = null;
    }
}
```
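The properties-file branch at the end of the constructor reads one key and validates it. A minimal sketch of that check, assuming the key is named `applicationID` (Flink refers to it through the constant `YARN_APPLICATION_ID_KEY`) and using a regular expression for the `application_<clusterTimestamp>_<sequence>` format as a stand-in for Hadoop's `ConverterUtils.toApplicationId`; `YarnPropertiesSketch` is a hypothetical name.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.regex.Pattern;

final class YarnPropertiesSketch {
    // Assumed key name; Flink uses the constant YARN_APPLICATION_ID_KEY.
    static final String YARN_APPLICATION_ID_KEY = "applicationID";
    // Simplified stand-in for ConverterUtils.toApplicationId validation.
    static final Pattern APP_ID = Pattern.compile("application_\\d+_\\d+");

    static String readApplicationId(String propertiesContent) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesContent));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String appId = props.getProperty(YARN_APPLICATION_ID_KEY);
        if (appId == null) {
            throw new IllegalStateException(
                    "Yarn properties file found but doesn't contain a Yarn application id.");
        }
        if (!APP_ID.matcher(appId).matches()) {
            throw new IllegalStateException(
                    "YARN properties contain an invalid entry for application id: " + appId);
        }
        return appId;
    }

    public static void main(String[] args) {
        System.out.println(readApplicationId("applicationID=application_1668000000000_0001\n"));
    }
}
```

Like the original, the sketch fails loudly instead of ignoring a broken file, since a stale or corrupt properties file would otherwise silently redirect jobs to the wrong session.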
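1.4.2.1 If adding it fails, add FallbackYarnSessionCli
If loading FlinkYarnSessionCli throws (the `catch` also covers `NoClassDefFoundError`, e.g. when YARN classes are missing from the classpath), FallbackYarnSessionCli is loaded instead:
```java
final String errorYarnSessionCLI = "org.apache.flink.yarn.cli.FallbackYarnSessionCli";
try {
    LOG.info("Loading FallbackYarnSessionCli");
    customCommandLines.add(loadCustomCommandLine(errorYarnSessionCLI, configuration));
} catch (Exception exception) {
    LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
}
```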
1.4.3 Add DefaultCLI
The default command line, used in standalone mode.
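1.5 Initialize the CliFrontend object
`main` calls the two-argument constructor, which delegates to the constructor below with a `DefaultClusterClientServiceLoader`. It initializes the file systems (with plugins), merges the general and run options of every custom command line into one `Options` set, and reads the client timeout and default parallelism from the configuration:
```java
public CliFrontend(
        Configuration configuration,
        ClusterClientServiceLoader clusterClientServiceLoader,
        List<CustomCommandLine> customCommandLines) {
    this.configuration = checkNotNull(configuration);
    this.customCommandLines = checkNotNull(customCommandLines);
    this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);

    // Initialize the file systems, including those provided by plugins
    FileSystem.initialize(
            configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));

    // Collect the options of all custom command lines
    this.customCommandLineOptions = new Options();
    for (CustomCommandLine customCommandLine : customCommandLines) {
        customCommandLine.addGeneralOptions(customCommandLineOptions);
        customCommandLine.addRunOptions(customCommandLineOptions);
    }

    this.clientTimeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
    this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
}
```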
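1.6 Load the security modules via SPI
For more details, see the article "Flink 1.15 Source Code Analysis: Security Modules and Security Context".
```java
SecurityUtils.install(new SecurityConfiguration(cli.configuration));
```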
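1.7 Run the action for the command-line arguments, invoke the callback, and return the status code
This is the core logic:
```java
retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
```
`parseAndRun` is passed as a callable to the installed `SecurityContext`, whose `runSecured` runs a callable that may require a security context:
```java
public interface SecurityContext {
    <T> T runSecured(Callable<T> securedCallable) throws Exception;
}
```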
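The interface only wraps a `Callable`, so the simplest possible context just calls it. A sketch of such a pass-through context (Flink ships a no-op context for the case where no security is configured, while a Hadoop/Kerberos setup runs the callable as the logged-in user instead); `SecurityContextSketch` and `NoOpSecurityContextSketch` are hypothetical names mirroring the interface shown above.

```java
import java.util.concurrent.Callable;

// Same shape as the SecurityContext interface above.
interface SecurityContextSketch {
    <T> T runSecured(Callable<T> securedCallable) throws Exception;
}

// Pass-through context: no login, no doAs, just run the callable.
final class NoOpSecurityContextSketch implements SecurityContextSketch {
    @Override
    public <T> T runSecured(Callable<T> securedCallable) throws Exception {
        return securedCallable.call();
    }

    public static void main(String[] args) throws Exception {
        SecurityContextSketch context = new NoOpSecurityContextSketch();
        // Stand-in for cli.parseAndRun(args), which returns the status code.
        int retCode = context.runSecured(() -> 0);
        System.out.println(retCode);
    }
}
```

Because the action is handed over as a callable, `CliFrontend` does not need to know which kind of context was installed.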
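1.7.1 The actual logic: `CliFrontend.parseAndRun(args)`
The first argument selects the action; the remaining arguments are passed to the matching handler.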
```java
public int parseAndRun(String[] args) {

    // Check for an action
    if (args.length < 1) {
        CliFrontendParser.printHelp(customCommandLines);
        System.out.println("Please specify an action.");
        return 1;
    }

    // Get the action
    String action = args[0];

    // Remove the action from the parameters
    final String[] params = Arrays.copyOfRange(args, 1, args.length);

    try {
        // Dispatch to the handler for the action
        switch (action) {
            case ACTION_RUN:
                run(params);
                return 0;
            case ACTION_RUN_APPLICATION:
                runApplication(params);
                return 0;
            case ACTION_LIST:
                list(params);
                return 0;
            case ACTION_INFO:
                info(params);
                return 0;
            case ACTION_CANCEL:
                cancel(params);
                return 0;
            case ACTION_STOP:
                stop(params);
                return 0;
            case ACTION_SAVEPOINT:
                savepoint(params);
                return 0;
            case "-h":
            case "--help":
                CliFrontendParser.printHelp(customCommandLines);
                return 0;
            case "-v":
            case "--version":
                String version = EnvironmentInformation.getVersion();
                String commitID = EnvironmentInformation.getRevisionInformation().commitId;
                System.out.print("Version: " + version);
                System.out.println(
                        commitID.equals(EnvironmentInformation.UNKNOWN)
                                ? ""
                                : ", Commit ID: " + commitID);
                return 0;
            default:
                System.out.printf("\"%s\" is not a valid action.\n", action);
                System.out.println();
                System.out.println(
                        "Valid actions are \"run\", \"run-application\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\".");
                System.out.println();
                System.out.println(
                        "Specify the version option (-v or --version) to print Flink version.");
                System.out.println();
                System.out.println(
                        "Specify the help option (-h or --help) to get help on the command.");
                return 1;
        }
    } catch (CliArgsException ce) {
        return handleArgException(ce);
    } catch (ProgramParametrizationException ppe) {
        return handleParametrizationException(ppe);
    } catch (ProgramMissingJobException pmje) {
        return handleMissingJobException();
    } catch (Exception e) {
        return handleError(e);
    }
}
```
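The dispatch in `parseAndRun` boils down to: split off the first argument, look up a handler, and map success to `0` and failure to a non-zero code. A table-driven sketch of the same idea, assuming hypothetical `Consumer<String[]>` handlers instead of Flink's `switch`; unlike Flink, which routes different exception types to dedicated handlers, this sketch simply returns `1` for any runtime exception. `ActionDispatchSketch` is an illustrative name.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Consumer;

final class ActionDispatchSketch {
    static int dispatch(String[] args, Map<String, Consumer<String[]>> handlers) {
        if (args.length < 1) {
            System.out.println("Please specify an action.");
            return 1;
        }
        String action = args[0];
        // Everything after the action name is handed to the handler.
        String[] params = Arrays.copyOfRange(args, 1, args.length);
        Consumer<String[]> handler = handlers.get(action);
        if (handler == null) {
            System.out.printf("\"%s\" is not a valid action.%n", action);
            return 1;
        }
        try {
            handler.accept(params);
            return 0;
        } catch (RuntimeException e) {
            return 1; // simplified error handling
        }
    }

    public static void main(String[] args) {
        Map<String, Consumer<String[]>> handlers =
                Map.of("run", p -> System.out.println("run " + Arrays.toString(p)));
        System.out.println(dispatch(new String[] {"run", "-d", "job.jar"}, handlers));
    }
}
```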
1.7.2 Execute the `run` action
```java
protected void run(String[] args) throws Exception {
    LOG.info("Running 'run' command.");

    // Parse the run options
    final Options commandOptions = CliFrontendParser.getRunCommandOptions();
    final CommandLine commandLine = getCommandLine(commandOptions, args, true);

    // Print help for "run" and stop if -h/--help was given
    if (commandLine.hasOption(HELP_OPTION.getOpt())) {
        CliFrontendParser.printHelpForRun(customCommandLines);
        return;
    }

    // 1.7.2.1 Pick the active custom command line
    final CustomCommandLine activeCommandLine =
            validateAndGetActiveCommandLine(checkNotNull(commandLine));

    // 1.7.2.2 Program options (Python or JAR)
    final ProgramOptions programOptions = ProgramOptions.create(commandLine);

    // 1.7.2.3 Job JAR and dependencies
    final List<URL> jobJars = getJobJarAndDependencies(programOptions);

    // 1.7.2.4 Effective configuration
    final Configuration effectiveConfiguration =
            getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);

    LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

    // 1.7.2.5 Build the PackagedProgram and execute it
    try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
        executeProgram(effectiveConfiguration, program);
    }
}
```
1.7.2.1 Validate and get the active custom command line
It returns the first active entry in the `CustomCommandLine` list initialized in 1.4:
```java
public CustomCommandLine validateAndGetActiveCommandLine(CommandLine commandLine) {
    LOG.debug("Custom commandlines: {}", customCommandLines);
    // Check in list order; the first active command line wins
    for (CustomCommandLine cli : customCommandLines) {
        LOG.debug(
                "Checking custom commandline {}, isActive: {}", cli, cli.isActive(commandLine));
        if (cli.isActive(commandLine)) {
            return cli;
        }
    }
    throw new IllegalStateException("No valid command-line found.");
}
```
1.7.2.2 Create `ProgramOptions`, distinguishing Python programs from JAR programs
`ProgramOptions.create`:
```java
public static ProgramOptions create(CommandLine line) throws CliArgsException {
    // Python entry point or Python dependency options -> Python program options
    if (isPythonEntryPoint(line) || containsPythonDependencyOptions(line)) {
        return createPythonProgramOptions(line);
    } else {
        return new ProgramOptions(line);
    }
}
```
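A rough sketch of the Python-versus-JAR decision, assuming the Python entry-point flags are `-py`/`--python` and `-pym`/`--pyModule` and that `-pyfs`, `-pyreq`, `-pyarch` and `-pyexec` (with their long forms) are among the Python dependency options; the real check works on the parsed `CommandLine`, whereas this simplified version scans raw arguments and could be fooled by a program argument that happens to look like one of these flags. `EntryPointKindSketch` is a hypothetical name.

```java
import java.util.Set;

final class EntryPointKindSketch {
    // Assumed Python entry-point flags.
    static final Set<String> PYTHON_ENTRY_FLAGS = Set.of("-py", "--python", "-pym", "--pyModule");
    // Assumed subset of the Python dependency flags.
    static final Set<String> PYTHON_DEPENDENCY_FLAGS = Set.of(
            "-pyfs", "--pyFiles", "-pyreq", "--pyRequirements",
            "-pyarch", "--pyArchives", "-pyexec", "--pyExecutable");

    // Simplified: any matching flag anywhere in the arguments selects Python options.
    static String kindOf(String[] runArgs) {
        for (String arg : runArgs) {
            if (PYTHON_ENTRY_FLAGS.contains(arg) || PYTHON_DEPENDENCY_FLAGS.contains(arg)) {
                return "python";
            }
        }
        return "jar";
    }

    public static void main(String[] args) {
        System.out.println(kindOf(new String[] {"-py", "word_count.py"}));
        System.out.println(kindOf(new String[] {"-c", "com.example.Main", "job.jar"}));
    }
}
```

As in `create`, a Python dependency option alone is enough to select the Python path, even if a JAR is also given.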
1.7.2.3 Get the job JAR and its dependencies
```java
private List<URL> getJobJarAndDependencies(ProgramOptions programOptions)
        throws CliArgsException {
    String entryPointClass = programOptions.getEntryPointClassName();
    String jarFilePath = programOptions.getJarFilePath();

    try {
        // The JAR file is optional (e.g. for Python programs)
        File jarFile = jarFilePath != null ? getJarFile(jarFilePath) : null;
        return PackagedProgram.getJobJarAndDependencies(jarFile, entryPointClass);
    } catch (FileNotFoundException | ProgramInvocationException e) {
        throw new CliArgsException(
                "Could not get job jar and dependencies from JAR file: " + e.getMessage(), e);
    }
}
```
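1.7.2.4 Get the effective configuration
The effective configuration combines the loaded configuration with settings derived from the active command line, the program options, and the job JARs, such as the HA cluster id, the execution target (session, per-job), JobManager memory, TaskManager memory, the number of slots per TaskManager…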
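A sketch of the layering idea, assuming the base configuration is copied first, the active command line's settings are added on top, and the program options are applied last, so later layers override earlier ones (the way `Configuration.addAll` overwrites existing keys). Plain maps stand in for `Configuration`; `EffectiveConfigSketch` and `merge` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

final class EffectiveConfigSketch {
    // Later layers override earlier ones; the input maps are left untouched.
    @SafeVarargs
    static Map<String, String> merge(Map<String, String>... layers) {
        Map<String, String> effective = new HashMap<>();
        for (Map<String, String> layer : layers) {
            effective.putAll(layer);
        }
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> base = Map.of(
                "parallelism.default", "1", "taskmanager.numberOfTaskSlots", "2");
        Map<String, String> fromCommandLine = Map.of(
                "execution.target", "yarn-per-job", "taskmanager.numberOfTaskSlots", "4");
        Map<String, String> fromProgramOptions = Map.of("parallelism.default", "8");
        System.out.println(merge(base, fromCommandLine, fromProgramOptions));
    }
}
```

Copying into a fresh map mirrors why the client can build a per-submission configuration without mutating the one loaded from `flink-conf.yaml`.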
1.7.2.5 Execute the program
```java
try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
    executeProgram(effectiveConfiguration, program);
}
```
`executeProgram` delegates to `ClientUtils.executeProgram`:
```java
protected void executeProgram(final Configuration configuration, final PackagedProgram program)
        throws ProgramInvocationException {
    ClientUtils.executeProgram(
            new DefaultExecutorServiceLoader(), configuration, program, false, false);
}
```
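1.8 Get the return status code and shut down the submission client
Back in `main`, the status code returned by `parseAndRun` (or the default `31` if an exception was thrown) is passed to `System.exit(retCode)` in the `finally` block, which terminates the client JVM.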
Original: https://blog.csdn.net/wuxintdrh/article/details/127810949
Author: 宝哥大数据
Title: Flink 1.15 Source Code Analysis: Job Submission Flow (flink run)