Commit 3098cf29 authored by wangxing's avatar wangxing

增加提交Flink任务到YARN时的Kerberos认证

parent 251a52b9
...@@ -14,12 +14,15 @@ import org.apache.flink.configuration.DeploymentOptions; ...@@ -14,12 +14,15 @@ import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever; import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterClientFactory; import org.apache.flink.yarn.YarnClusterClientFactory;
import org.apache.flink.yarn.YarnClusterDescriptor; import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions; import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnLogConfigUtil; import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service; import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClient;
...@@ -65,6 +68,16 @@ public abstract class YarnGateway extends AbstractGateway { ...@@ -65,6 +68,16 @@ public abstract class YarnGateway extends AbstractGateway {
if(Asserts.isNotNullString(config.getFlinkConfig().getJobName())) { if(Asserts.isNotNullString(config.getFlinkConfig().getJobName())) {
configuration.set(YarnConfigOptions.APPLICATION_NAME, config.getFlinkConfig().getJobName()); configuration.set(YarnConfigOptions.APPLICATION_NAME, config.getFlinkConfig().getJobName());
} }
try {
SecurityUtils.install(new SecurityConfiguration(configuration));
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
logger.info("安全认证结束,用户和认证方式:" + currentUser.toString());
} catch (Exception e) {
logger.error(e.getMessage());
e.printStackTrace();
}
YarnLogConfigUtil.setLogConfigFileInConfig(configuration, config.getClusterConfig().getFlinkConfigPath()); YarnLogConfigUtil.setLogConfigFileInConfig(configuration, config.getClusterConfig().getFlinkConfigPath());
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment