文章目录
*
– 1.6、通过 SPI 加载安全配置模块
–
+ 1.6.1、SecurityConfiguration 初始化
+ 1.6.2、SecurityUtils 的 install 逻辑
+
* 1.6.2.1、installModules
* 1.6.2.2、installContext
* 返回[Flink1.15源码解析-总目录](https://blog.csdn.net/wuxintdrh/article/details/127796678)
1.6、通过 SPI 加载安全配置模块
SecurityUtils.install(new SecurityConfiguration(cli.configuration));
1.6.1、SecurityConfiguration 初始化
public SecurityConfiguration(Configuration flinkConf) {
this(
flinkConf,
flinkConf.get(SECURITY_CONTEXT_FACTORY_CLASSES),
flinkConf.get(SECURITY_MODULE_FACTORY_CLASSES));
}
public static final ConfigOption<List<String>> SECURITY_CONTEXT_FACTORY_CLASSES =
key("security.context.factory.classes")
.stringType()
.asList()
.defaultValues(
"org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory",
"org.apache.flink.runtime.security.contexts.NoOpSecurityContextFactory")
.withDescription(
"List of factories that should be used to instantiate a security context. "
+ "If multiple are configured, Flink will use the first compatible "
+ "factory. You should have a NoOpSecurityContextFactory in this list "
+ "as a fallback.");
public static final ConfigOption<List<String>> SECURITY_MODULE_FACTORY_CLASSES =
key("security.module.factory.classes")
.stringType()
.asList()
.defaultValues(
"org.apache.flink.runtime.security.modules.HadoopModuleFactory",
"org.apache.flink.runtime.security.modules.JaasModuleFactory",
"org.apache.flink.runtime.security.modules.ZookeeperModuleFactory")
.withDescription(
"List of factories that should be used to instantiate security "
+ "modules. All listed modules will be installed. Keep in mind that the "
+ "configured security context might rely on some modules being present.");
继续进入
public SecurityConfiguration(
Configuration flinkConf,
List<String> securityContextFactory,
List<String> securityModuleFactories) {
this.isZkSaslDisable = flinkConf.getBoolean(SecurityOptions.ZOOKEEPER_SASL_DISABLE);
this.keytab = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
this.principal = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
this.useTicketCache = flinkConf.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
this.loginContextNames =
parseList(flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS));
this.zkServiceName = flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_SERVICE_NAME);
this.zkLoginContextName =
flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_LOGIN_CONTEXT_NAME);
this.securityModuleFactories = Collections.unmodifiableList(securityModuleFactories);
this.securityContextFactory = securityContextFactory;
this.flinkConfig = checkNotNull(flinkConf);
validate();
}
进一步看下validate的逻辑:
private void validate() {
if (!StringUtils.isBlank(keytab)) {
if (StringUtils.isBlank(principal)) {
throw new IllegalConfigurationException(
"Kerberos login configuration is invalid: keytab requires a principal.");
}
File keytabFile = new File(keytab);
if (!keytabFile.exists() || !keytabFile.isFile()) {
throw new IllegalConfigurationException(
"Kerberos login configuration is invalid: keytab ["
+ keytab
+ "] doesn't exist!");
} else if (!keytabFile.canRead()) {
throw new IllegalConfigurationException(
"Kerberos login configuration is invalid: keytab ["
+ keytab
+ "] is unreadable!");
}
}
}
如果全局配置(flink-conf.yaml)里配置了security.kerberos.login.keytab这个参数。那么要校验这个配置所指定的目录存在以及可读。这里其实有必要对kerberos的安全认证相关知识了解下。
1.6.2、SecurityUtils 的 install 逻辑
public static void install(SecurityConfiguration config) throws Exception {
installModules(config);
installContext(config);
}
1.6.2.1、installModules
这里安装的安全模板主要包括了Java认证与授权服务(JAAS),Hadoop用户组信息(UGI)和Zookeeper的全过程安全设置。
static void installModules(SecurityConfiguration config) throws Exception {
List<SecurityModule> modules = new ArrayList<>();
for (String moduleFactoryClass : config.getSecurityModuleFactories()) {
SecurityModuleFactory moduleFactory = null;
try {
moduleFactory = SecurityFactoryServiceLoader.findModuleFactory(moduleFactoryClass);
} catch (NoMatchSecurityFactoryException ne) {
LOG.error("Unable to instantiate security module factory {}", moduleFactoryClass);
throw new IllegalArgumentException("Unable to find module factory class", ne);
}
SecurityModule module = moduleFactory.createModule(config);
if (module != null) {
module.install();
modules.add(module);
}
}
installedModules = modules;
}
findModuleFactory 实现
public static SecurityModuleFactory findModuleFactory(String securityModuleFactoryClass)
throws NoMatchSecurityFactoryException {
return findFactoryInternal(
securityModuleFactoryClass,
SecurityModuleFactory.class,
SecurityModuleFactory.class.getClassLoader());
}
继续, 通过 ServiceLoader 构建 服务
ServiceLoader是jdk6里面引进的一个特性。它用来实现 SPI(Service Provider Interface),一种服务发现机制,很多框架用它来做来做服务的扩展发现。
private static <T> T findFactoryInternal(
String factoryClassCanonicalName, Class<T> factoryClass, ClassLoader classLoader)
throws NoMatchSecurityFactoryException {
Preconditions.checkNotNull(factoryClassCanonicalName);
ServiceLoader<T> serviceLoader;
if (classLoader != null) {
serviceLoader = ServiceLoader.load(factoryClass, classLoader);
} else {
serviceLoader = ServiceLoader.load(factoryClass);
}
List<T> matchingFactories = new ArrayList<>();
Iterator<T> classFactoryIterator = serviceLoader.iterator();
classFactoryIterator.forEachRemaining(
classFactory -> {
if (factoryClassCanonicalName.matches(
classFactory.getClass().getCanonicalName())) {
matchingFactories.add(classFactory);
}
});
if (matchingFactories.size() != 1) {
throw new NoMatchSecurityFactoryException(
"zero or more than one security factory found",
factoryClassCanonicalName,
matchingFactories);
}
return matchingFactories.get(0);
}
1.6.2.2、installContext
流程与 installModules 类似,通过 SecurityFactoryServiceLoader 找到 SecurityContextFactory
static void installContext(SecurityConfiguration config) throws Exception {
for (String contextFactoryClass : config.getSecurityContextFactories()) {
try {
SecurityContextFactory contextFactory =
SecurityFactoryServiceLoader.findContextFactory(contextFactoryClass);
if (contextFactory.isCompatibleWith(config)) {
try {
installedContext = contextFactory.createContext(config);
break;
} catch (SecurityContextInitializeException e) {
LOG.error(
"Cannot instantiate security context with: " + contextFactoryClass,
e);
} catch (LinkageError le) {
LOG.error(
"Error occur when instantiate security context with: "
+ contextFactoryClass,
le);
}
} else {
LOG.debug("Unable to install security context factory {}", contextFactoryClass);
}
} catch (NoMatchSecurityFactoryException ne) {
LOG.warn("Unable to instantiate security context factory {}", contextFactoryClass);
}
}
if (installedContext == null) {
LOG.error("Unable to install a valid security context factory!");
throw new Exception("Unable to install a valid security context factory!");
}
}
Original: https://blog.csdn.net/wuxintdrh/article/details/127824062
Author: 宝哥大数据
Title: Flink1.15源码解析–安全模块及安全上下文
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/660215/
转载文章受原作者版权保护。转载请注明原作者出处!