Unverified Commit 0b200f50 authored by aiwenmo's avatar aiwenmo Committed by GitHub

Merge pull request #78 from wxing89/feature-kerberos

增加提交Flink任务到YARN时的Kerberos认证
parents 251a52b9 3098cf29
...@@ -14,12 +14,15 @@ import org.apache.flink.configuration.DeploymentOptions; ...@@ -14,12 +14,15 @@ import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever; import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterClientFactory; import org.apache.flink.yarn.YarnClusterClientFactory;
import org.apache.flink.yarn.YarnClusterDescriptor; import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions; import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnLogConfigUtil; import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service; import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClient;
...@@ -65,6 +68,16 @@ public abstract class YarnGateway extends AbstractGateway { ...@@ -65,6 +68,16 @@ public abstract class YarnGateway extends AbstractGateway {
if(Asserts.isNotNullString(config.getFlinkConfig().getJobName())) { if(Asserts.isNotNullString(config.getFlinkConfig().getJobName())) {
configuration.set(YarnConfigOptions.APPLICATION_NAME, config.getFlinkConfig().getJobName()); configuration.set(YarnConfigOptions.APPLICATION_NAME, config.getFlinkConfig().getJobName());
} }
try {
SecurityUtils.install(new SecurityConfiguration(configuration));
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
logger.info("安全认证结束,用户和认证方式:" + currentUser.toString());
} catch (Exception e) {
logger.error(e.getMessage());
e.printStackTrace();
}
YarnLogConfigUtil.setLogConfigFileInConfig(configuration, config.getClusterConfig().getFlinkConfigPath()); YarnLogConfigUtil.setLogConfigFileInConfig(configuration, config.getClusterConfig().getFlinkConfigPath());
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment