Flink 1.15 Source Code Analysis: Startup Script start-cluster.sh

[root@chb1 bin]# cat start-cluster.sh

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/config.sh

shopt -s nocasematch
if [[ $HIGH_AVAILABILITY == "zookeeper" ]]; then

    readMasters

    echo "Starting HA cluster with ${#MASTERS[@]} masters."

    for ((i=0;i<${#MASTERS[@]};++i)); do
        master=${MASTERS[i]}
        webuiport=${WEBUIPORTS[i]}

        if [ ${MASTERS_ALL_LOCALHOST} = true ] ; then
            "${FLINK_BIN_DIR}"/jobmanager.sh start "${master}" "${webuiport}"

        else
            ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/jobmanager.sh\" start ${master} ${webuiport} &"
        fi
    done

else
    echo "Starting cluster."

    "$FLINK_BIN_DIR"/jobmanager.sh start
fi
shopt -u nocasematch

TMWorkers start

The overall flow is:

  • 1. Source config.sh to load environment variables and the helper functions called later.
  • 2. Start the JobManager(s).
  • In HA mode, readMasters from config.sh is called to read the list of masters, and a JobManager is started for each entry (the masters/workers file format is shown right after this list).
    • If the master is the local host, "${FLINK_BIN_DIR}"/jobmanager.sh start "${master}" "${webuiport}" is called directly.
    • Otherwise jobmanager.sh is started on the remote host via ssh.
  • 3. Start the TaskManagers via TMWorkers start.
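readMasters and readWorkers (both defined in config.sh) read the conf/masters and conf/workers files. As a hedged illustration, with made-up hostnames, the two files might look like this:

# conf/masters: one "host:webui-port" entry per line (hostnames are examples)
chb1:8081
chb2:8081

# conf/workers: one TaskManager host per line (hostnames are examples)
chb1
chb2
chb3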

1.1 config.sh

This file is quite long; it defines the variables and helper functions (such as readMasters, readWorkers and TMWorkers) used by the other scripts. A simplified sketch of readMasters follows.
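The sketch below is not the verbatim source; comment stripping and error handling are omitted. It fills the MASTERS, WEBUIPORTS and MASTERS_ALL_LOCALHOST variables that start-cluster.sh uses above:

# Simplified sketch of readMasters from config.sh (not verbatim)
readMasters() {
    MASTERS_FILE="${FLINK_CONF_DIR}/masters"
    MASTERS=()
    WEBUIPORTS=()
    MASTERS_ALL_LOCALHOST=true

    while read -r line; do
        host=$(echo "$line" | cut -f1 -d:)
        port=$(echo "$line" | cut -s -f2 -d:)
        MASTERS+=("${host}")
        WEBUIPORTS+=("${port:-0}")
        if [ "${host}" != "localhost" ] && [ "${host}" != "127.0.0.1" ]; then
            MASTERS_ALL_LOCALHOST=false
        fi
    done < "${MASTERS_FILE}"
}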

1.2 jobmanager.sh

[root@chb1 bin]# cat jobmanager.sh

USAGE="Usage: jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all"

STARTSTOP=$1
HOST=$2
WEBUIPORT=$3

if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
  echo $USAGE
  exit 1
fi

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/config.sh

ENTRYPOINT=standalonesession

if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then

    export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"
    parseJmArgsAndExportLogs "${ARGS[@]}"

    args=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster")
    if [ ! -z $HOST ]; then
        args+=("--host")
        args+=("${HOST}")
    fi

    if [ ! -z $WEBUIPORT ]; then
        args+=("--webui-port")
        args+=("${WEBUIPORT}")
    fi

    if [ ! -z "${DYNAMIC_PARAMETERS}" ]; then
        args+=(${DYNAMIC_PARAMETERS[@]})
    fi
fi

if [[ $STARTSTOP == "start-foreground" ]]; then
    exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${args[@]}"
else

    "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${args[@]}"
fi

An example of the command that is eventually executed:

# For readability, the single command line is shown split across multiple lines
/uardata1/soft/flink-1.15.2/bin/flink-daemon.sh start standalonesession
--configDir /uardata1/soft/flink-1.15.2/conf
--executionMode cluster
-D jobmanager.memory.off-heap.size=134217728b
-D jobmanager.memory.jvm-overhead.min=201326592b
-D jobmanager.memory.jvm-metaspace.size=268435456b
-D jobmanager.memory.heap.size=1073741824b
-D jobmanager.memory.jvm-overhead.max=201326592b

The most important thing in this script is that it sets the ENTRYPOINT variable to standalonesession, which determines the cluster startup mode. From this we can also see that when Flink is started with start-cluster.sh, what we get is a standalonesession cluster.

Continuing: after ENTRYPOINT is set, the script calls flink-daemon.sh and passes the value of ENTRYPOINT in. We will look at flink-daemon.sh in a moment; first let's see what taskmanager.sh does.

1.3 taskmanager.sh

In start-cluster.sh, TMWorkers start launches the TaskManagers:

# Start TaskManager instance(s)
TMWorkers start

The actual call goes to the TMWorkers() function in config.sh:


TMWorkers() {
    CMD=$1

    readWorkers

    if [ ${WORKERS_ALL_LOCALHOST} = true ] ; then

        for worker in ${WORKERS[@]}; do
            "${FLINK_BIN_DIR}"/taskmanager.sh "${CMD}"
        done
    else

        # prefer pdsh for parallel startup; fall back to plain per-host ssh when it is missing
        command -v pdsh >/dev/null 2>&1
        if [[ $? -ne 0 ]]; then
            for worker in ${WORKERS[@]}; do
                ssh -n $FLINK_SSH_OPTS $worker -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\" &"
            done
        else
            PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w $(IFS=, ; echo "${WORKERS[*]}") \
                "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\""
        fi
    fi
}
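Because TMWorkers is defined in config.sh, it only exists after that file has been sourced. A minimal manual invocation would look roughly like this (FLINK_HOME below is an assumed variable pointing at the installation directory, not something the scripts set for you):

# sketch: source config.sh to get TMWorkers, then start the workers
. "${FLINK_HOME}/bin/config.sh"
TMWorkers start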

taskmanager.sh contains quite a lot, but the two most important points are:

1. it sets ENTRYPOINT to taskexecutor;

2. it calls flink-daemon.sh and passes ENTRYPOINT in.

#!/usr/bin/env bash

USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all)"

STARTSTOP=$1

ARGS=("${@:2}")

if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
  echo $USAGE
  exit 1
fi

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/config.sh

ENTRYPOINT=taskexecutor

if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then

    if [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
        export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
    fi

    export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"

    parseTmArgsAndExportLogs "${ARGS[@]}"

    if [ ! -z "${DYNAMIC_PARAMETERS}" ]; then
        ARGS=(${DYNAMIC_PARAMETERS[@]} "${ARGS[@]}")
    fi

    ARGS=("--configDir" "${FLINK_CONF_DIR}" "${ARGS[@]}")
fi

if [[ $STARTSTOP == "start-foreground" ]]; then
    exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${ARGS[@]}"
else
    if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then

        "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
    else

        read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
        for NODE_ID in "${NODE_LIST[@]:1}"; do

            numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
        done
    fi
fi
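The numactl branch at the end is only taken when FLINK_TM_COMPUTE_NUMA is true; to the best of my knowledge that flag comes from the taskmanager.compute.numa option in flink-conf.yaml (Linux with numactl only), so enabling it would look roughly like this:

# sketch: enable NUMA-aware TaskManager startup (assumes the taskmanager.compute.numa option)
echo "taskmanager.compute.numa: true" >> "${FLINK_CONF_DIR}/flink-conf.yaml"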

Here we see flink-daemon.sh being called once again, so now let's look at what flink-daemon.sh actually does.

1.4 flink-daemon.sh

This script is long, but it boils down to two parts.

Earlier, jobmanager.sh passed standalonesession into this script and taskmanager.sh passed taskexecutor; the case statement below maps that daemon name to a main class, which is exactly what determines the entry class of the master and of the worker:

case $DAEMON in
    (taskexecutor)
        CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
    ;;

    (zookeeper)
        CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
    ;;

    (historyserver)
        CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
    ;;

    (standalonesession)
        CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
    ;;

    (standalonejob)
        CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
    ;;

    (*)
        echo "Unknown daemon '${DAEMON}'. $USAGE."
        exit 1
    ;;
esac
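Before the case on $STARTSTOP below, flink-daemon.sh also derives the pid file from the daemon name; roughly (from memory, not verbatim), it is:

# the pid file that the start branch below checks (one pid appended per instance)
pid="$FLINK_PID_DIR/flink-$FLINK_IDENT_STRING-$DAEMON.pid"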
case $STARTSTOP in

    (start)

        if [ -f "$pid" ]; then
          active=()
          while IFS='' read -r p || [[ -n "$p" ]]; do
            kill -0 $p >/dev/null 2>&1
            if [ $? -eq 0 ]; then
              active+=($p)
            fi
          done < "${pid}"

          count="${#active[@]}"

          if [ ${count} -gt 0 ]; then
            echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME."
          fi
        fi

        FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})

        echo "Starting $DAEMON daemon on host $HOSTNAME."

        "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 200- 2>&1 < /dev/null &

        mypid=$!

        if [[ ${mypid} =~ ${IS_NUMBER} ]] && kill -0 $mypid > /dev/null 2>&1 ; then
            echo $mypid >> "$pid"
        else
            echo "Error starting $DAEMON daemon."
            exit 1
        fi
    ;;

At this point we have found where the master and worker processes are actually launched, and we have also found that:

  • the entry class of the master (JobManager) is org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint,
  • the entry class of the worker (TaskManager) is org.apache.flink.runtime.taskexecutor.TaskManagerRunner.

Next we can use these two classes as the entry points for digging into the Flink source code.
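As a quick check after start-cluster.sh has run, both entry classes should show up in the JVM list, for example (a usage sketch; output trimmed):

# list the running Flink JVMs and their main classes
jps -l | grep -E "StandaloneSessionClusterEntrypoint|TaskManagerRunner"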

Original: https://blog.csdn.net/wuxintdrh/article/details/127816141
Author: 宝哥大数据
Title: Flink1.15源码解析–启动脚本—-start-cluster.sh
