Flink 1.15 Source Code Analysis: Security Modules and Security Context

Contents

* 1.6 Loading the security configuration modules via SPI
  + 1.6.1 SecurityConfiguration initialization
  + 1.6.2 The install logic in SecurityUtils
    * 1.6.2.1 installModules
    * 1.6.2.2 installContext
* Back to the [Flink 1.15 Source Code Analysis master index](https://blog.csdn.net/wuxintdrh/article/details/127796678)

1.6 Loading the security configuration modules via SPI

    SecurityUtils.install(new SecurityConfiguration(cli.configuration));

1.6.1 SecurityConfiguration initialization


    public SecurityConfiguration(Configuration flinkConf) {
        this(
                flinkConf,
                flinkConf.get(SECURITY_CONTEXT_FACTORY_CLASSES),
                flinkConf.get(SECURITY_MODULE_FACTORY_CLASSES));
    }

    public static final ConfigOption<List<String>> SECURITY_CONTEXT_FACTORY_CLASSES =
            key("security.context.factory.classes")
                    .stringType()
                    .asList()
                    .defaultValues(
                            "org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory",
                            "org.apache.flink.runtime.security.contexts.NoOpSecurityContextFactory")
                    .withDescription(
                            "List of factories that should be used to instantiate a security context. "
                                    + "If multiple are configured, Flink will use the first compatible "
                                    + "factory. You should have a NoOpSecurityContextFactory in this list "
                                    + "as a fallback.");

    public static final ConfigOption<List<String>> SECURITY_MODULE_FACTORY_CLASSES =
            key("security.module.factory.classes")
                    .stringType()
                    .asList()
                    .defaultValues(
                            "org.apache.flink.runtime.security.modules.HadoopModuleFactory",
                            "org.apache.flink.runtime.security.modules.JaasModuleFactory",
                            "org.apache.flink.runtime.security.modules.ZookeeperModuleFactory")
                    .withDescription(
                            "List of factories that should be used to instantiate security "
                                    + "modules. All listed modules will be installed. Keep in mind that the "
                                    + "configured security context might rely on some modules being present.");

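The one-argument constructor reads the two factory lists from the configuration (both options are shown above with their defaults) and delegates to the following constructor: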


    public SecurityConfiguration(
            Configuration flinkConf,
            List<String> securityContextFactory,
            List<String> securityModuleFactories) {

        this.isZkSaslDisable = flinkConf.getBoolean(SecurityOptions.ZOOKEEPER_SASL_DISABLE);
        this.keytab = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
        this.principal = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
        this.useTicketCache = flinkConf.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
        this.loginContextNames =
                parseList(flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS));
        this.zkServiceName = flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_SERVICE_NAME);
        this.zkLoginContextName =
                flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_LOGIN_CONTEXT_NAME);

        this.securityModuleFactories = Collections.unmodifiableList(securityModuleFactories);
        this.securityContextFactory = securityContextFactory;
        this.flinkConfig = checkNotNull(flinkConf);

        validate();
    }

Next, look at the logic of validate():

    private void validate() {
        if (!StringUtils.isBlank(keytab)) {

            if (StringUtils.isBlank(principal)) {
                throw new IllegalConfigurationException(
                        "Kerberos login configuration is invalid: keytab requires a principal.");
            }

            File keytabFile = new File(keytab);
            if (!keytabFile.exists() || !keytabFile.isFile()) {
                throw new IllegalConfigurationException(
                        "Kerberos login configuration is invalid: keytab ["
                                + keytab
                                + "] doesn't exist!");
            } else if (!keytabFile.canRead()) {
                throw new IllegalConfigurationException(
                        "Kerberos login configuration is invalid: keytab ["
                                + keytab
                                + "] is unreadable!");
            }
        }
    }

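If the global configuration (flink-conf.yaml) sets security.kerberos.login.keytab, validate() requires that security.kerberos.login.principal is set as well, and that the keytab path points to an existing, readable regular file (a directory is rejected). If no keytab is configured, nothing is checked. Some background on Kerberos authentication is useful for following this part.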
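
The checks described above can be reproduced outside Flink. The following is a minimal sketch, not Flink code: the class `KeytabCheckSketch` and its methods are hypothetical names, the blank check uses plain `String` methods instead of `StringUtils.isBlank`, and a problem is returned as a message rather than thrown as `IllegalConfigurationException`.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-alone class; it only mirrors the order of checks in validate().
class KeytabCheckSketch {

    /** Returns null when the pair is acceptable, otherwise a description of the problem. */
    static String check(String keytab, String principal) {
        if (keytab == null || keytab.trim().isEmpty()) {
            return null; // no keytab configured: nothing to validate
        }
        if (principal == null || principal.trim().isEmpty()) {
            return "keytab requires a principal";
        }
        File keytabFile = new File(keytab);
        if (!keytabFile.exists() || !keytabFile.isFile()) {
            return "keytab [" + keytab + "] doesn't exist";
        }
        if (!keytabFile.canRead()) {
            return "keytab [" + keytab + "] is unreadable";
        }
        return null;
    }

    /** Creates an empty temporary file that stands in for a real keytab. */
    static File newTempKeytab() {
        try {
            File f = File.createTempFile("demo", ".keytab");
            f.deleteOnExit();
            return f;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        File keytab = newTempKeytab();
        System.out.println(check("", null));                               // keytab not set
        System.out.println(check(keytab.getPath(), ""));                   // principal missing
        System.out.println(check(keytab.getPath(), "flink@EXAMPLE.COM"));  // acceptable pair
        System.out.println(check(keytab.getParent(), "flink@EXAMPLE.COM")); // a directory
    }
}
```

Note that a directory fails the `isFile()` test, so it is reported with the same "doesn't exist" message as a missing path. The principal `flink@EXAMPLE.COM` is a placeholder.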

1.6.2 The install logic in SecurityUtils


    public static void install(SecurityConfiguration config) throws Exception {

        installModules(config);
        installContext(config);
    }

1.6.2.1 installModules

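The security modules installed here mainly cover the Java Authentication and Authorization Service (JAAS), Hadoop's UserGroupInformation (UGI), and ZooKeeper's process-wide security settings. Every factory listed in the configuration is looked up and asked to create its module; each non-null module is installed and remembered in installedModules.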

    static void installModules(SecurityConfiguration config) throws Exception {

        List<SecurityModule> modules = new ArrayList<>();

        for (String moduleFactoryClass : config.getSecurityModuleFactories()) {
            SecurityModuleFactory moduleFactory = null;
            try {

                moduleFactory = SecurityFactoryServiceLoader.findModuleFactory(moduleFactoryClass);
            } catch (NoMatchSecurityFactoryException ne) {
                LOG.error("Unable to instantiate security module factory {}", moduleFactoryClass);
                throw new IllegalArgumentException("Unable to find module factory class", ne);
            }
            SecurityModule module = moduleFactory.createModule(config);

            if (module != null) {
                module.install();
                modules.add(module);
            }
        }
        installedModules = modules;
    }
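
The "install everything, skip what cannot be created" pattern in installModules can be sketched without Flink. All names below (`InstallModulesSketch`, `Module`, `ModuleFactory`, `named`) are hypothetical stand-ins for illustration; only the control flow follows the snippet above, under the assumption that a factory signals "not applicable" by returning null.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; not the Flink SecurityModule / SecurityModuleFactory types.
class InstallModulesSketch {

    interface Module {
        void install();
    }

    interface ModuleFactory {
        Module createModule(); // may return null when the module does not apply
    }

    // Records the side effects of install() so the order can be inspected.
    static final List<String> log = new ArrayList<>();

    /** Installs every module that could be created and returns them in install order. */
    static List<Module> installAll(List<ModuleFactory> factories) {
        List<Module> installed = new ArrayList<>();
        for (ModuleFactory factory : factories) {
            Module module = factory.createModule();
            if (module != null) {
                module.install();
                installed.add(module);
            }
        }
        return installed;
    }

    /** Builds a module whose install() only records its name. */
    static Module named(String name) {
        return () -> log.add("installed " + name);
    }

    public static void main(String[] args) {
        List<ModuleFactory> factories = new ArrayList<>();
        factories.add(() -> named("jaas"));
        factories.add(() -> null); // e.g. a module whose prerequisites are missing
        factories.add(() -> named("zookeeper"));
        System.out.println(installAll(factories).size() + " modules: " + log);
    }
}
```

Keeping the installed modules in a list matters because the same list can later be walked to undo the installation; the sketch only shows the install side.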

The implementation of findModuleFactory:


    public static SecurityModuleFactory findModuleFactory(String securityModuleFactoryClass)
            throws NoMatchSecurityFactoryException {
        return findFactoryInternal(
                securityModuleFactoryClass,
                SecurityModuleFactory.class,
                SecurityModuleFactory.class.getClassLoader());
    }

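Next, the factory is located through ServiceLoader. ServiceLoader was introduced in JDK 6 and implements SPI (Service Provider Interface), a service-discovery mechanism that many frameworks use to find extensions. Implementations are declared in provider-configuration files under META-INF/services. findFactoryInternal iterates over every discovered factory, keeps those whose canonical class name matches the configured name, and throws NoMatchSecurityFactoryException unless exactly one factory matches.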

    private static <T> T findFactoryInternal(
            String factoryClassCanonicalName, Class<T> factoryClass, ClassLoader classLoader)
            throws NoMatchSecurityFactoryException {

        Preconditions.checkNotNull(factoryClassCanonicalName);

        ServiceLoader<T> serviceLoader;

        if (classLoader != null) {
            serviceLoader = ServiceLoader.load(factoryClass, classLoader);
        } else {
            serviceLoader = ServiceLoader.load(factoryClass);
        }

        List<T> matchingFactories = new ArrayList<>();
        Iterator<T> classFactoryIterator = serviceLoader.iterator();

        classFactoryIterator.forEachRemaining(
                classFactory -> {
                    if (factoryClassCanonicalName.matches(
                            classFactory.getClass().getCanonicalName())) {
                        matchingFactories.add(classFactory);
                    }
                });

        if (matchingFactories.size() != 1) {
            throw new NoMatchSecurityFactoryException(
                    "zero or more than one security factory found",
                    factoryClassCanonicalName,
                    matchingFactories);
        }
        return matchingFactories.get(0);
    }
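
One detail of the matching step is easy to miss: `String.matches` treats its argument as a regular expression, so in the snippet above the discovered class name acts as a pattern in which each `.` matches any character. The sketch below isolates that selection logic over plain strings; `FactoryMatchSketch`, `findExactlyOne`, and the `demo.*` names are hypothetical, and `IllegalStateException` stands in for `NoMatchSecurityFactoryException`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the "exactly one match" rule; it works on names, not on real factories.
class FactoryMatchSketch {

    /** Returns the single discovered name selected by the configured name, or throws. */
    static String findExactlyOne(String configuredName, List<String> discoveredNames) {
        List<String> matching = new ArrayList<>();
        for (String discovered : discoveredNames) {
            // Same call shape as the snippet: the discovered name is used as a regex.
            if (configuredName.matches(discovered)) {
                matching.add(discovered);
            }
        }
        if (matching.size() != 1) {
            throw new IllegalStateException(
                    "zero or more than one security factory found for " + configuredName);
        }
        return matching.get(0);
    }

    public static void main(String[] args) {
        List<String> discovered = Arrays.asList("demo.JaasFactory", "demo.ZkFactory");
        System.out.println(findExactlyOne("demo.JaasFactory", discovered));
        // The dot in the pattern also accepts 'X', unlike a comparison with equals().
        System.out.println("demoXJaasFactory".matches("demo.JaasFactory"));
        System.out.println("demoXJaasFactory".equals("demo.JaasFactory"));
    }
}
```

For ordinary fully qualified class names the regex and an exact comparison agree in practice, so this is mostly a point to keep in mind when reading the code rather than a likely source of bugs.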

1.6.2.2 installContext

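The flow is similar to installModules: SecurityFactoryServiceLoader looks up each configured SecurityContextFactory in order. The difference is that only one context is installed. The loop stops at the first factory that is compatible with the configuration and creates its context successfully; a factory that cannot be found, is incompatible, or fails during creation is logged and skipped. If no context has been installed after the loop, an exception is thrown.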

    static void installContext(SecurityConfiguration config) throws Exception {

        for (String contextFactoryClass : config.getSecurityContextFactories()) {
            try {
                SecurityContextFactory contextFactory =
                        SecurityFactoryServiceLoader.findContextFactory(contextFactoryClass);
                if (contextFactory.isCompatibleWith(config)) {
                    try {
                        installedContext = contextFactory.createContext(config);

                        break;
                    } catch (SecurityContextInitializeException e) {
                        LOG.error(
                                "Cannot instantiate security context with: " + contextFactoryClass,
                                e);
                    } catch (LinkageError le) {
                        LOG.error(
                                "Error occur when instantiate security context with: "
                                        + contextFactoryClass,
                                le);
                    }
                } else {
                    LOG.debug("Unable to install security context factory {}", contextFactoryClass);
                }
            } catch (NoMatchSecurityFactoryException ne) {
                LOG.warn("Unable to instantiate security context factory {}", contextFactoryClass);
            }
        }
        if (installedContext == null) {
            LOG.error("Unable to install a valid security context factory!");
            throw new Exception("Unable to install a valid security context factory!");
        }
    }
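
The "first compatible factory wins" rule, together with the fallback role of a no-op factory mentioned in the option description, can be sketched as follows. This is an illustration under assumptions: `InstallContextSketch` and `Factory` are hypothetical, a context is reduced to the name of the factory that created it, and the fallback is assumed to report itself compatible with any configuration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the selection loop; not the Flink SecurityContextFactory API.
class InstallContextSketch {

    static final class Factory {
        final String name;
        final boolean compatible;    // models isCompatibleWith(config)
        final boolean failsOnCreate; // models an exception thrown by createContext(config)

        Factory(String name, boolean compatible, boolean failsOnCreate) {
            this.name = name;
            this.compatible = compatible;
            this.failsOnCreate = failsOnCreate;
        }
    }

    /** Returns the name of the first factory that is compatible and creates a context. */
    static String installFirstCompatible(List<Factory> factoriesInConfigOrder) {
        for (Factory factory : factoriesInConfigOrder) {
            if (!factory.compatible) {
                continue; // logged at debug level in the snippet above
            }
            if (factory.failsOnCreate) {
                continue; // logged as an error in the snippet above, then the next one is tried
            }
            return factory.name; // corresponds to assigning installedContext and breaking
        }
        throw new IllegalStateException("Unable to install a valid security context factory!");
    }

    public static void main(String[] args) {
        List<Factory> configured = Arrays.asList(
                new Factory("hadoop", false, false), // e.g. prerequisites not available
                new Factory("noop", true, false));   // fallback, assumed always compatible
        System.out.println(installFirstCompatible(configured));
    }
}
```

Because the loop stops at the first success, the order of the configured list decides which context is used, which is why the fallback belongs at the end.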

Original: https://blog.csdn.net/wuxintdrh/article/details/127824062
Author: 宝哥大数据
Title: Flink 1.15 Source Code Analysis: Security Modules and Security Context
