Flink 作业提交流程

大家好,我是小寒~

今天给大家带来一篇 flink 作业提交相关的文章。

我们都知道,在开发完一个 flink 应用程序后,打包成 jar 包,然后通过 FLink CLI 或者 Web UI 提交作业到 FLink 集群。其实,Flink 的 jar 文件并不是 FLink 集群的可执行文件,需要经过转换之后提交给集群。其转换过程分为两个大的步骤。

  1. 在 FLink Client 中通过反射启动 Jar 中的 main 函数,生成 Flink StreamGraph、JobGraph,将 JobGraph 提交给 Flink 集群。
  2. FLink 集群收到 JobGraph 之后,将 JobGraph 翻译成 ExecutionGraph,然后开始调度执行,启动成功之后开始消费数据。

总的来说,对用户API的调用, 可以转换为 StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行拓扑(Task DAG)

提交流程

FLink 作业在开发完毕之后,需要提交到 FLink 集群执行。ClientFrontend 是入口,触发用户开发的 Flink 应用 jar 文件中的 main 方法,然后交给 PipelineExecutor#execue 方法,最终会触发一个具体的 PipelineExecutor 执行,如下图所示。

Flink 作业提交流程

作业执行可以选择 Session 和 Per-Job 模式两种集群。

  • Session 模式的集群,一个集群中运行多个作业。
  • Per-Job 模式的集群,一个集群中只运行一个作业,作业执行完毕则集群销毁。

Flink 作业提交流程

流水线执行器 PipelineExecutor

流水线执行器在 FLink 中叫作 PipelineExecutor,是 FLink Client 生成 JobGraph 之后,将作业提交给集群的重要环节。

集群有 Session 和 Per-Job 两种模式。在这两种模式下,集群的启动时机、提交作业的方式不同,所以在生产环境中有两种 PipelineExecutor。Session 模式对应于 AbstractSessionClusterExecutor,Per-Job 模式对应于 AbstractJobClusterExecutor。

  1. Session 模式

该模式下,作业共享集群资源,作业通过 Http 协议进行提交。

在 Flink 1.10 版本中提供了三种会话模式:Yarn 会话模式、K8s 会话模式、Standalone。Standalone 模式比较特别,Flink 安装在物理机上,不能像在资源集群上一样,可以随时启动一个新集群,所有的作业共享 Standalone 集群。

在 Session 模式下, Yarn 作业提交使用 yarn-session.sh 脚本, K8s 作业提交使用 kubernetes-session.sh 脚本。两者的具体实现不同 ,但逻辑是类似的 ,在启动脚本的时候就会检查是否存在已经启动好的 Flink Session 模式集群,如果没有,则启动一个 Flink Session 模式集群,然后在 PipelineExecutor 中,通过 Dispatcher 提供的 Rest 接口提交 JobGraph,Dispatcher 为每一个作业启动一个 JobMaster,然后进入作业执行阶段。

  1. Per-Job 模式

该模式下,一个作业一个集群,作业之间相互隔离。

在 FLink 1.10 版本中,只有 Yarn 上实现了 Per-Job 模式。

Per-Job 模式下,因为不需要共享集群,所以在 PipelineExecutor 中执行作业提交的时候,可以创建集群并将 JobGraph 以及所需要的文件等一同交给 Yarn 集群,Yarn 集群在容器中启动 JobManager 进程,进行一系列的初始化动作,初始化完毕之后,从文件系统中获取 JobGraph ,交给 Dispatcher。 之后的执行流程与 Session 模式下的执行流程相同。

yarn session 的提交流程

从总体上来说,在 Yarn 集群上使用 Session 模式提交 Flink 作业的过程分为 3 个阶段。首先在 Yarn 上启动 Flink Session 模式的集群;其次通过 Flink Client 提交作业 ,最后进行作业的调度执行。

Flink 作业提交流程
  1. 启动集群

(1) 使用 yarn-session.sh 提交会话模式的作业

如果提交到已经存在的集群, 则获取 Yarn 集群信息、应用 ID,并准备提交作业。

如果是启动新的 Yarn Session 集群,则进入到步骤 (2)。

(2)Yarn 启动新的 Flink 集群

如果没有集群,则创建一个新的 Session 模式的集群。首先,将应用的配置文件(flink-conf.yaml、logback.xml、log4j.properties)和相关文件(Flink jar、用户 jar 文件、JobGraph 对象等)上传至分布式存储(如 HDFS)的应用暂存目录。

然后通过 Yarn Client 向 Yarn 提交 Flink 创建集群的申请,Yarn 分配资源,在申请的 Yarn Container 中初始化并启动 FLink JobManager 进程,在 JobManager 进程中运行 YarnSessionClusterEntrypoint 作为集群启动的入口(不同的集群部署模式有不同的 ClusterEntrypoint 的实现),初始化 Dispatcher、ResourceManager。启动相关的 RPC 服务,等待 Client 通过 Rest 接口提交作业。

2、作业提交

Yarn 集群准备好后,开始作业提交。

(1)Flink Client 通过 Rest 向 Dispatcher 提交 JobGraph。

(2)Dispatcher 是 Rest 接口,不负责实际的调度、执行方面的工作,当收到 JobGraph 后,为作业创建一个 JobMaster,将工作交给 JobMaster(负责作业调度、管理作业和 Task 的生命周期 ),构建 ExecutionGraph(Job Graph的并行化版本)

  1. 作业调度执行

(1)JobMaster 向 YarnResourceManager 申请资源,开始调度 ExecutionGraph 的执行;初次提交作业,集群尚没有 TaskManager,此时资源不足,开始申请资源。

(2)YarnResourceManager 收到 JobMaster 的资源请求,如果当前有空闲的 Slot,则将 Slot 分配给 JobMaster,否则 YarnResourceManager 将向 Yarn Master(Yarn 集群的 ResourceManager) 请求创建 TaskManager。

(3)YarnResourceManager 将资源请求加入等待请求队列,并通过心跳向 YARN RM 申请新的 Container 资源来启动 TaskManager 进程;Yarn 分配新的 Container 给 TaskManager。

(4)YarnResourceManager 从 HDFS 加载 Jar 文件等所需的相关资源,在容器中启动 TaskManager。

(5)TaskManager 启动之后,向 YarnResourceManager 进行注册,并把自己的 Slot 资源情况汇报给 YarnResourceManager 。

(6)YarnResourceManager 从等待队列中取出 Slot 请求,向 TaskManager 确认资源可用情况,并告知 TaskManager 将 Slot 分配给了哪个 JobMaster。

(7)TaskManager 向 JobMaster 提供 Slot,JobMaster 调度 Task 到 TaskManager 的此 Slot 上执行。

至此,作业进入执行阶段。

Yarn Per-Job 提交流程

Yarn Per-Job 模式提交作业与 Yarn-Session 模式提交作业基本类似。Per-Job 模式下,JobGraph 和集群资源请求一起提交给 Yarn。

Flink 作业提交流程
  1. 启动集群 (1)使用 flink run -m yarn-cluster 提交 Per-Job 模式的作业。 (2)Yarn 启动 Flink 集群。该模式下 Flink 集群的启动入口是 YarnJobClusterEntryPoint,其它与 Yarn-Session 模式启动类似。
  2. 作业提交 该步骤与 Session 模式下的不同之处在于,Client 并不会通过 Rest 向 Dispacher 提交 JobGraph,由 Dispacher 从本地文件系统获取 JobGraph,其后的步骤与 Session 模式一样。
  3. 作业调度执行 与 Yarn-Session 模式下一致。

流处理的转换过程

StreamGraph

使用 DataStream API 开发的应用程序,首先被转换为 Transformation,然后被映射为 StreamGraph。

我们以熟知的 WordCount 程序为例,它的 StreamGraph 如下图所示。

Flink 作业提交流程

从图中我们可以看到,StreamGraph 是由 StreamNode 和 StreamEdge 构成。

  • StreamNode StreamNode 是 StreamGraph 中的节点,从 Transformation 转换而来,可以简单理解为一个 StreamNode 表示为一个算子;从逻辑上来说,StreamNode 在 StreamGraph 中存在实体和虚拟的 StreamNode。StreamNode 可以有多个输入,也可以有多个输出。 实体的 StreamNode 会最终变为物理的算子。虚拟的 StreamNode 会附着在 StreamEdge 上。
  • StreamEdge StreamEdge 是 StreamGraph 中的边, 用来连接两个 StreamNode,一个 StreamNode 可以有多个出边、入边。 StreamEdge 中包含了盘路输出、分区器、字段筛选输出等的信息。

作业图

JobGraph 可以由流计算的 StreamGraph 转换而来。
流计算中,在 StreamGraph 的基础上进行了一些优化,如通过 OperatorChain 机制将算子合并起来,在执行时,调度在同一个 Task 线程上,避免数据的跨线程、跨网络的传递。

Flink 作业提交流程

从 JobGraph 的图里可以看到,数据从上一个算子流到下一个算子的过程中,上游作为生产者提供了中间数据集(IntermediateDateSet),而下游作为消费者需要 JobEdge。JobEdge 是一个通信管道,连接了上游生产的中间数据集和 JobVertex 节点。

JobGraph 的核心对象是 JobVertex、JobEdge 和 IntermediateDateSet。

  • JobVertex 经过算子融合优化后符合条件的多个 StreamNode 可能会融合在一起生成一个 JobVertex,即一个 JobVertex 包含一个或多个算子,JobVertex 的输入是 JobEdge,输出是 IntermediateDateSet。
  • JobEdge JobEdge 是 JobGraph 中连接 IntermediateDateSet 和 JobVertex 的边,表示 JobGraph 中的一个数据流转通道,其上游数据源是 IntermediateDateSet,下游消费者是 JobVertex ,即数据通过 JobEdge 由 IntermediateDateSet 传递给目标 JobVertex 。 JobEdge 中的数据分发模式会直接影响执行时 Task 之间的数据连接关系,是点对点连接还是全连接。
  • IntermediateDateSet 中间数据集 IntermediateDataSet 是一种逻辑结构,用来表示 JobVertex 的输出,即该 JobVertex 中包含的算子会产生的数据集。不同的执行模式下,其对应的结果分区类型不同,决定了在执行时刻数据交换的模式。 IntermediateDataSet 的个数与该 JobVertex 对应的 StreamNode 的出边数量相同,可以是一个或者多个。

执行图

ExecutionGraph 是调度 Flink 作业执行的核心数据结构,包含了作业中所有并行执行的 Task 的信息、Task 之间的关联关系、数据流转关系等。

StreamGraph、JobGraph 在 Flink 客户端中生成,然后提交给 Flink 集群。JobGraph 到 ExecutionGraph 的转换在 JobMaster 中完成。在转化过程中,有如下重要变化。

  • 加入了并行度的概念,成为真正可调度的图结构。
  • 生成了与 JobVertex 对应的 ExecutionJobVertex 和 ExecutionVertex,与IntermediateDataSet 对应的 IntermediateResult 和 IntermediateResultPartition 等。

生成的图如下图所示。

Flink 作业提交流程

ExecutionGraph 的核心对象有 ExecutionJobVertex 、ExecutionVertex、IntermediateResult 、IntermediateResultPartition、ExecutionEdge 和 Execution。

  • ExecutionJobVertex 该对象和 JobGraph 中的 JobVertex 一一对应。该对象还包含一组 ExecutionVertex,数量与该 JobVertex 中所包含的 StreamNode 的并行度一致,假设 StreamNode 的并行度为3,那么 ExecutionJobVertex 也会包含 3个 ExecutionVertex。
  • ExecutionVertex ExecutionJobVertex 中会对作业进行并行化处理,构造可以并行执行的实例,每一个并行执行的实例就是 ExecutionVertex。 构造 ExecutionVertex 的同时,也会构建 ExecutionVertex 的输出 IntermediateResult。
  • IntermediateResult IntermediateResult 又叫中间结果集,该对象是个逻辑概念,表示 ExecutionJobVertex 的输出,和 JobVertex 中的 IntermediateDataSet 一一对应,同样,一个ExecutionJobVertex 可以有多个中间结果,取决于当前 JobVertex 有几个出边(JobEdge) 一个中间结果包含多个中间结果分区 IntermediateResultPartition,其个数等于该 JobVertex 的并发度,或者叫作算子的并行度。
  • IntermediateResultPartition IntermediateResultPartition 又叫作中间结果分区,表示一个 ExecutionVertex 的输出结果,与 ExecutionEdge 相关联。
  • ExecutionEdge 表示 ExecutionVertex 的输入,连接到上游产生的 IntermediateResultPartition 。
  • Execution ExecutionVertex 相当于每个 Task 的模板,在真正执行的时候,会将 ExecutionVertex 中的信息包装为一个 Execution,执行一个 ExecutionVertex 的一次尝试。JobManager 和 TaskManager 之间关于 Task 的部署和 Task 的执行状态的更新都是通过 ExecutionAttemptID 来标识实例的。在发生故障或者数据需要重算的情况下,ExecutionVertex 可能会有多个ExecutionAttemptID 。一个 Execution 通过 ExecutionAttemptID 来唯一标识。

总结

Flink 作业执行前需要提交 Flink 集群, Flink 集群可以与不同的资源框架(Yarn、K8s、Mesos 等)进行集成,可以按照不同的模式(Session 模式和 Per-Job模式)运行,所以在 Flink 作业提交过程中,可能在资源框架上启动Flink集群。Flink 就绪之后,进入作业提交阶段, 在Flink客户端进行StreamGraph、JobGraph的转换,提交 JobGraph 到 Flink 集群,然后 Flink 集群负责将 JobGraph 转换为 ExecutionGraph,之后进入调度执行阶段。

Original: https://www.cnblogs.com/cxyxz/p/16469037.html
Author: 算法推荐管
Title: Flink 作业提交流程

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/713562/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • 三维城市特效

    基于FreethEarth开发的Web三维数字孪生城市前端效果 饮水思源,不忘初心。 要面包,也要有诗和远方。 Original: https://www.cnblogs.com/…

    技术杂谈 2023年5月31日
    088
  • 【赵渝强老师】使用Weblogic的WLST工具

    一、什么是Weblogic WLST? WebLogic 脚本工具 (WebLogic Scripting Tool , WLST) 是一种命令行脚本界面,系统管理员和操作员用它来…

    技术杂谈 2023年7月24日
    088
  • Docker容器网络配置

    Docker容器网络配置 1、Linux内核实现名称空间的创建 1.1 ip netns命令 可以借助 ip netns命令来完成对 Network Namespace 的各种操作…

    技术杂谈 2023年6月21日
    071
  • SSM实战(57)在线教育(57)前端(25)课程管理(12)章节的小节的添加/删除

    博客园 :当前访问的博文已被密码保护 请输入阅读密码: Original: https://www.cnblogs.com/qiu-hua/p/16514705.htmlAutho…

    技术杂谈 2023年6月1日
    098
  • 【赵渝强老师】Kubernetes的探针

    Kubernetes提供了探针(Probe)对容器的健康性进行检测。实际上我们不仅仅要对容器进行健康检测,还要对容器内布置的应用进行健康性检测。 Probe有以下两种类型: liv…

    技术杂谈 2023年7月24日
    064
  • 初次使用vscode遇到的坑

    vscode使用时导入模块的一些问题 在编写完某一个模块的代码时,需要在当前的模块下进行调试运行,发现报”ImportError:cannot import name …

    技术杂谈 2023年7月25日
    095
  • 离散化

    3 -1 2 -2 这个数列有 5个逆序对 4 2 3 1 也是五个 我们把最小的-2视作1 第二的-1看做2 … 法一(推荐): 结构体保存数组num 和它在原数组里…

    技术杂谈 2023年6月21日
    0109
  • 基于netty实现的长连接,心跳机制及重连机制

    详细 本篇demo实现的功能是基于netty的心跳机制和长连接以及重连机制,最关键的就是通过netty中的 IdleStateHandler 的超时机制来实现心跳和重连 ,然后通过…

    技术杂谈 2023年5月31日
    0105
  • Are You OK?主键、聚集索引、辅助索引

    每张表都一定存在主键吗? 关于这个问题,各位小伙伴们不妨先自己想一想,再往下寻找答案。 首先公布结论: 对于 InnoDB 存储引擎来说,每张表都一定有个主键(Primary Ke…

    技术杂谈 2023年7月24日
    087
  • windows守护进程工具–nssm详解

    nssm详解 零、文章目录 一、nssm简介 nssm是一个服务封装程序,它可以将普通exe程序封装成服务,实现开机自启动,同类型的工具还有微软自己的srvany,不过nssm更加…

    技术杂谈 2023年5月31日
    0116
  • 设计模式 18 中介者模式

    中介者模式(Mediator Pattern)属于 行为型模式 一提到中介,大家都非常熟悉,生活中最常见的就是房屋中介。 虽然中介要收取一定费用,但却能给房东和租客都提供大量遍历,…

    技术杂谈 2023年7月25日
    082
  • 新书上市——Microsoft Teams 平台完全手册

    今年四月份左右开始写的这本书——《Microsoft Teams 平台完全手册》,经过多少个夜晚和周末的奋战,五一期间更是一刻都没有休息,键盘写坏一个,到今天也差不多可以完整地面世…

    技术杂谈 2023年5月31日
    081
  • Windows server 2008 域控制器

    Windows的网络架构 Windows的网络架构大致分为: 工作组架构 域架构 工作组架构:工作组是由一组通过网络连接在一起的计算机组成,组内的计算机可以共享本机的文件,打印机等…

    技术杂谈 2023年7月11日
    0105
  • springboot启动报错BeanCreationException

    springboot程序启动报错,数据库连接错误。检查了一下网络,发现是wiki连错了,改一下即可,嘿嘿。错误信息:Failed to initialize pool: Commu…

    技术杂谈 2023年7月11日
    063
  • C语言建立哈夫曼树编码译码

    #include <stdio.h> #include <string.h> #include <stdlib.h> //&#x8F93…

    技术杂谈 2023年7月24日
    065
  • 初来乍到,请多指教

    刚开通了博客园,会同步发一些csdn上的内容。 博客园的第一感觉就是,很朴实,很复古,有种非移动互联网时代的风格。 其次,编辑文章时,发现对导入的Markdown文档支持的很好,无…

    技术杂谈 2023年7月25日
    066
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球