Commit 67918f28 authored by wenmo

savepoint

parent 527e57d6
......@@ -5,7 +5,7 @@ import com.dlink.dto.SessionDTO;
import com.dlink.dto.StudioCADTO;
import com.dlink.dto.StudioDDLDTO;
import com.dlink.dto.StudioExecuteDTO;
import com.dlink.gateway.GatewayConfig;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.job.JobResult;
import com.dlink.result.IResult;
import com.dlink.service.StudioService;
......
......@@ -5,7 +5,7 @@ import com.dlink.dto.StudioDDLDTO;
import com.dlink.dto.StudioExecuteDTO;
import com.dlink.explainer.ca.ColumnCANode;
import com.dlink.explainer.ca.TableCANode;
import com.dlink.gateway.GatewayConfig;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.gateway.result.GatewayResult;
import com.dlink.job.JobResult;
import com.dlink.result.IResult;
......
......@@ -9,7 +9,7 @@ import com.dlink.explainer.ca.CABuilder;
import com.dlink.explainer.ca.ColumnCANode;
import com.dlink.explainer.ca.TableCANode;
import com.dlink.gateway.Gateway;
import com.dlink.gateway.GatewayConfig;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.gateway.result.GatewayResult;
import com.dlink.job.JobConfig;
import com.dlink.job.JobManager;
......
package com.dlink.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.dlink.assertion.Assert;
import com.dlink.cluster.FlinkCluster;
import com.dlink.common.result.Result;
import com.dlink.constant.FlinkConstant;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.exception.BusException;
import com.dlink.executor.Executor;
import com.dlink.executor.ExecutorSetting;
import com.dlink.gateway.GatewayConfig;
import com.dlink.gateway.config.ClusterConfig;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.gateway.GatewayType;
import com.dlink.job.JobConfig;
import com.dlink.job.JobManager;
......@@ -25,9 +20,6 @@ import com.dlink.service.TaskService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.net.InetAddress;
import java.net.UnknownHostException;
/**
* Task service implementation class
*
......@@ -62,11 +54,13 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
Assert.check(statement);
JobConfig config = task.buildSubmitConfig();
GatewayConfig gatewayConfig = new GatewayConfig();
gatewayConfig.setJobName(config.getJobName());
gatewayConfig.getFlinkConfig().setJobName(config.getJobName());
gatewayConfig.setType(GatewayType.YARN_PER_JOB);
gatewayConfig.setFlinkConfigPath("/opt/src/flink-1.12.2_pj/conf");
gatewayConfig.setFlinkLibs("hdfs:///flink12/lib/flinklib");
gatewayConfig.setYarnConfigPath("/usr/local/hadoop/hadoop-2.7.7/etc/hadoop/yarn-site.xml");
ClusterConfig clusterConfig = ClusterConfig.build(
"/opt/src/flink-1.12.2_pj/conf",
"/opt/src/flink-1.12.2_pj/conf",
"/usr/local/hadoop/hadoop-2.7.7/etc/hadoop/yarn-site.xml");
gatewayConfig.setClusterConfig(clusterConfig);
JobManager jobManager = JobManager.build(config);
SubmitResult result = jobManager.submitGraph(statement.getStatement(), gatewayConfig);
return Result.succeed(result,"Submitted successfully");
......
......@@ -14,8 +14,8 @@ import java.util.List;
/**
* MainApp
*
* @author qiwenkai
* @since 2021/10/27 11:10
* @author wenmo
* @since 2021/10/27
**/
public class MainApp {
......
......@@ -3,8 +3,8 @@ package com.dlink.app.constant;
/**
* AppConstant
*
* @author qiwenkai
* @since 2021/10/27 15:24
* @author wenmo
* @since 2021/10/27
**/
public class AppConstant {
......
......@@ -5,8 +5,8 @@ import org.apache.flink.api.java.utils.ParameterTool;
/**
* DBConfig
*
* @author qiwenkai
* @since 2021/10/27 14:46
* @author wenmo
* @since 2021/10/27
**/
public class DBConfig {
......
......@@ -14,8 +14,8 @@ import java.util.Map;
/**
* DBUtil
*
* @author qiwenkai
* @since 2021/10/27 11:25
* @author wenmo
* @since 2021/10/27
**/
public class DBUtil {
......
......@@ -11,8 +11,8 @@ import java.util.List;
/**
* Executor
*
* @author qiwenkai
* @since 2021/10/27 15:52
* @author wenmo
* @since 2021/10/27
**/
public class Executor {
......
......@@ -13,8 +13,8 @@ import java.util.List;
/**
* FlinkSQLFactory
*
* @author qiwenkai
* @since 2021/10/27 11:15
* @author wenmo
* @since 2021/10/27
**/
public class FlinkSQLFactory {
......
......@@ -8,7 +8,7 @@ import com.dlink.executor.ExecutorSetting;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.explainer.Explainer;
import com.dlink.gateway.Gateway;
import com.dlink.gateway.GatewayConfig;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.gateway.result.GatewayResult;
import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.parser.SqlType;
......@@ -21,7 +21,6 @@ import com.dlink.trans.Operations;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;
import java.time.LocalDateTime;
......
package com.dlink.gateway;
import com.dlink.gateway.config.GatewayConfig;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
package com.dlink.gateway;
import com.dlink.assertion.Asserts;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.gateway.exception.GatewayException;
import com.dlink.gateway.result.GatewayResult;
import org.apache.flink.runtime.jobgraph.JobGraph;
import sun.misc.Service;
import java.util.Iterator;
import java.util.Optional;
......@@ -50,6 +50,8 @@ public interface Gateway {
GatewayResult submitJar();
GatewayResult savepoint();
GatewayResult savepointCluster();
GatewayResult savepointJob();
}
package com.dlink.gateway.config;
import com.dlink.assertion.Asserts;
/**
* ActionType
*
* @author wenmo
* @since 2021/11/3 21:58
*/
public enum ActionType{
SAVEPOINT("savepoint"),CANCEL("cancel");
private String value;
ActionType(String value){
this.value = value;
}
public String getValue() {
return value;
}
public static ActionType get(String value){
for (ActionType type : ActionType.values()) {
if(Asserts.isEquals(type.getValue(),value)){
return type;
}
}
return ActionType.SAVEPOINT;
}
}
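Unrecognized action strings fall back to SAVEPOINT in ActionType.get; a minimal standalone sketch of that behaviour (hypothetical demo class, not part of this commit):
package com.dlink.gateway.config;
// Hypothetical demo: exercises the lookup fallback of ActionType.get, which
// returns SAVEPOINT for any value that matches no enum constant.
public class ActionTypeDemo {
    public static void main(String[] args) {
        System.out.println(ActionType.get("cancel"));   // CANCEL
        System.out.println(ActionType.get("unknown"));  // SAVEPOINT (fallback)
    }
}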
package com.dlink.gateway.config;
import lombok.Getter;
import lombok.Setter;
/**
* AppConfig
*
* @author wenmo
* @since 2021/11/3 21:55
*/
@Setter
@Getter
public class AppConfig {
private String userJarPath;
private String[] userJarParas;
private String userJarMainAppClass;
public AppConfig() {
}
}
package com.dlink.gateway.config;
import lombok.Getter;
import lombok.Setter;
/**
* ClusterConfig
*
* @author wenmo
* @since 2021/11/3 21:52
*/
@Getter
@Setter
public class ClusterConfig {
private String flinkConfigPath;
private String flinkLibs;
private String yarnConfigPath;
public ClusterConfig() {
}
public ClusterConfig(String flinkConfigPath, String flinkLibs, String yarnConfigPath) {
this.flinkConfigPath = flinkConfigPath;
this.flinkLibs = flinkLibs;
this.yarnConfigPath = yarnConfigPath;
}
public static ClusterConfig build(String flinkConfigPath, String flinkLibs, String yarnConfigPath){
return new ClusterConfig(flinkConfigPath,flinkLibs,yarnConfigPath);
}
}
package com.dlink.gateway.config;
import com.dlink.gateway.ConfigPara;
import lombok.Getter;
import lombok.Setter;
import java.util.List;
/**
* FlinkConfig
*
* @author wenmo
* @since 2021/11/3 21:56
*/
@Getter
@Setter
public class FlinkConfig {
private String jobName;
private String jobId;
private ActionType action;
private SavePointType savePointType;
private String savePoint;
private List<ConfigPara> configParas;
public static final String DEFAULT_SAVEPOINT_PREFIX = "hdfs:///flink/savepoints/";
public FlinkConfig() {
}
}
package com.dlink.gateway;
package com.dlink.gateway.config;
import com.dlink.gateway.ConfigPara;
import com.dlink.gateway.GatewayType;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
......@@ -7,9 +9,7 @@ import lombok.Getter;
import lombok.Setter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
* GatewayConfig
......@@ -21,48 +21,49 @@ import java.util.Map;
@Setter
public class GatewayConfig {
private Integer taskId;
private GatewayType type;
private String jobName;
private String flinkConfigPath;
private String userJarPath;
private String[] userJarParas;
private String userJarMainAppClass;
private String savePoint;
private String flinkLibs;
private String yarnConfigPath;
private List<ConfigPara> configParas;
private ClusterConfig clusterConfig;
private FlinkConfig flinkConfig;
private AppConfig appConfig;
private static final ObjectMapper mapper = new ObjectMapper();
public GatewayConfig() {
clusterConfig = new ClusterConfig();
flinkConfig = new FlinkConfig();
appConfig = new AppConfig();
}
public static GatewayConfig build(JsonNode para){
GatewayConfig config = new GatewayConfig();
config.setType(GatewayType.get(para.get("type").asText()));
if(para.has("jobName")) {
config.setJobName(para.get("jobName").asText());
if(para.has("taskId")) {
config.setTaskId(para.get("taskId").asInt());
}
config.setType(GatewayType.get(para.get("type").asText()));
if(para.has("flinkConfigPath")) {
config.setFlinkConfigPath(para.get("flinkConfigPath").asText());
config.getClusterConfig().setFlinkConfigPath(para.get("flinkConfigPath").asText());
}
if(para.has("flinkLibs")) {
config.getClusterConfig().setFlinkLibs(para.get("flinkLibs").asText());
}
if(para.has("yarnConfigPath")) {
config.getClusterConfig().setYarnConfigPath(para.get("yarnConfigPath").asText());
}
if(para.has("jobName")) {
config.getFlinkConfig().setJobName(para.get("jobName").asText());
}
if(para.has("userJarPath")) {
config.setUserJarPath(para.get("userJarPath").asText());
config.getAppConfig().setUserJarPath(para.get("userJarPath").asText());
}
if(para.has("userJarParas")) {
config.setUserJarParas(para.get("userJarParas").asText().split("\\s+"));
config.getAppConfig().setUserJarParas(para.get("userJarParas").asText().split("\\s+"));
}
if(para.has("userJarMainAppClass")) {
config.setUserJarMainAppClass(para.get("userJarMainAppClass").asText());
config.getAppConfig().setUserJarMainAppClass(para.get("userJarMainAppClass").asText());
}
if(para.has("savePoint")) {
config.setSavePoint(para.get("savePoint").asText());
}
if(para.has("flinkLibs")) {
config.setFlinkLibs(para.get("flinkLibs").asText());
}
if(para.has("yarnConfigPath")) {
config.setYarnConfigPath(para.get("yarnConfigPath").asText());
config.getFlinkConfig().setSavePoint(para.get("savePoint").asText());
}
if(para.has("configParas")) {
try {
......@@ -72,7 +73,7 @@ public class GatewayConfig {
configParas.add(new ConfigPara(node.get("key").asText(),node.get("value").asText()));
}
);
config.setConfigParas(configParas);
config.getFlinkConfig().setConfigParas(configParas);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
......@@ -80,19 +81,4 @@ public class GatewayConfig {
return config;
}
@Override
public String toString() {
return "GatewayConfig{" +
"type=" + type +
", jobName='" + jobName + '\'' +
", flinkConfigPath='" + flinkConfigPath + '\'' +
", userJarPath='" + userJarPath + '\'' +
", userJarParas=" + Arrays.toString(userJarParas) +
", userJarMainAppClass='" + userJarMainAppClass + '\'' +
", savePoint='" + savePoint + '\'' +
", flinkLibs='" + flinkLibs + '\'' +
", yarnConfigPath='" + yarnConfigPath + '\'' +
", configParas='" + configParas.toString() + '\'' +
'}';
}
}
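For reference, a minimal sketch (hypothetical demo class and placeholder values, not part of this commit) of how GatewayConfig.build maps a JSON payload onto the nested ClusterConfig, FlinkConfig and AppConfig objects:
package com.dlink.gateway.config;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
// Hypothetical demo: feeds a sample payload through GatewayConfig.build and
// reads the parsed values back from the nested config objects.
public class GatewayConfigBuildDemo {
    public static void main(String[] args) throws Exception {
        String json = "{"
                + "\"type\":\"yarn-application\","
                + "\"taskId\":1,"
                + "\"jobName\":\"demoJob\","
                + "\"flinkConfigPath\":\"/opt/flink/conf\","
                + "\"flinkLibs\":\"hdfs:///flink/lib\","
                + "\"yarnConfigPath\":\"/etc/hadoop/yarn-site.xml\","
                + "\"userJarPath\":\"hdfs:///jars/app.jar\","
                + "\"userJarParas\":\"--id 1\","
                + "\"userJarMainAppClass\":\"com.app.MainApp\""
                + "}";
        JsonNode para = new ObjectMapper().readTree(json);
        GatewayConfig config = GatewayConfig.build(para);
        System.out.println(config.getClusterConfig().getFlinkConfigPath()); // /opt/flink/conf
        System.out.println(config.getFlinkConfig().getJobName());           // demoJob
        System.out.println(config.getAppConfig().getUserJarPath());         // hdfs:///jars/app.jar
    }
}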
package com.dlink.gateway.config;
import com.dlink.assertion.Asserts;
/**
* SavePointType
*
* @author wenmo
* @since 2021/11/3 21:58
*/
public enum SavePointType{
TRIGGER("trigger"),DISPOSE("dispose"),STOP("stop"),CANCEL("cancel");
private String value;
SavePointType(String value){
this.value = value;
}
public String getValue() {
return value;
}
public static SavePointType get(String value){
for (SavePointType type : SavePointType.values()) {
if(Asserts.isEquals(type.getValue(),value)){
return type;
}
}
return SavePointType.TRIGGER;
}
}
package com.dlink.gateway.model;
import lombok.Getter;
import lombok.Setter;
/**
* JobInfo
*
* @author wenmo
* @since 2021/11/3 21:45
*/
@Getter
@Setter
public class JobInfo {
private String jobId;
private String savePoint;
private JobStatus status;
public JobInfo(String jobId) {
this.jobId = jobId;
}
public JobInfo(String jobId, JobStatus status) {
this.jobId = jobId;
this.status = status;
}
public enum JobStatus{
RUN("run"),STOP("stop"),CANCEL("cancel"),FAIL("fail");
private String value;
JobStatus(String value){
this.value = value;
}
}
}
......@@ -9,16 +9,14 @@ import java.time.LocalDateTime;
/**
* AbstractGatewayResult
*
* @author qiwenkai
* @author wenmo
* @since 2021/10/29 15:44
**/
@Setter
@Getter
public abstract class AbstractGatewayResult implements GatewayResult {
protected String jobId;
protected GatewayType type;
protected String savePointPath;
protected LocalDateTime startTime;
protected LocalDateTime endTime;
protected boolean isSuccess;
......@@ -29,9 +27,7 @@ public abstract class AbstractGatewayResult implements GatewayResult {
this.startTime = startTime;
}
public AbstractGatewayResult(String jobId, String savePointPath, LocalDateTime startTime, LocalDateTime endTime, boolean isSuccess, String exceptionMsg) {
this.jobId = jobId;
this.savePointPath = savePointPath;
public AbstractGatewayResult(LocalDateTime startTime, LocalDateTime endTime, boolean isSuccess, String exceptionMsg) {
this.startTime = startTime;
this.endTime = endTime;
this.isSuccess = isSuccess;
......
......@@ -3,7 +3,7 @@ package com.dlink.gateway.result;
/**
* GatewayResult
*
* @author qiwenkai
* @author wenmo
* @since 2021/10/29 15:39
**/
public interface GatewayResult {
......
package com.dlink.gateway.result;
import com.dlink.gateway.GatewayType;
import com.dlink.gateway.model.JobInfo;
import lombok.Getter;
import lombok.Setter;
import java.time.LocalDateTime;
import java.util.List;
/**
* SavePointResult
*
* @author wenmo
* @since 2021/11/3 22:20
*/
@Getter
@Setter
public class SavePointResult extends AbstractGatewayResult {
private String appId;
private List<JobInfo> jobInfos;
public SavePointResult(GatewayType type, LocalDateTime startTime) {
super(type, startTime);
}
public SavePointResult(LocalDateTime startTime, LocalDateTime endTime, boolean isSuccess, String exceptionMsg) {
super(startTime, endTime, isSuccess, exceptionMsg);
}
@Override
public String getAppId() {
return appId;
}
public static SavePointResult build(GatewayType type){
return new SavePointResult(type,LocalDateTime.now());
}
}
......@@ -9,8 +9,8 @@ import java.time.LocalDateTime;
/**
* YarnResult
*
* @author qiwenkai
* @since 2021/10/29 15:49
* @author wenmo
* @since 2021/10/29
**/
@Getter
@Setter
......@@ -23,8 +23,8 @@ public class YarnResult extends AbstractGatewayResult {
super(type, startTime);
}
public YarnResult(String appId, String jobId, String savePointPath, LocalDateTime startTime, LocalDateTime endTime, boolean isSuccess, String exceptionMsg) {
super(jobId, savePointPath, startTime, endTime, isSuccess, exceptionMsg);
public YarnResult(String appId, LocalDateTime startTime, LocalDateTime endTime, boolean isSuccess, String exceptionMsg) {
super(startTime, endTime, isSuccess, exceptionMsg);
this.appId = appId;
}
......
package com.dlink.gateway.yarn;
import com.dlink.assertion.Asserts;
import com.dlink.gateway.GatewayConfig;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.gateway.GatewayType;
import com.dlink.gateway.config.AppConfig;
import com.dlink.gateway.exception.GatewayException;
import com.dlink.gateway.result.GatewayResult;
import com.dlink.gateway.result.YarnResult;
......@@ -49,15 +50,16 @@ public class YarnApplicationGateway extends YarnGateway {
init();
}
YarnResult result = YarnResult.build(getType());
configuration.set(PipelineOptions.JARS, Collections.singletonList(config.getUserJarPath()));
AppConfig appConfig = config.getAppConfig();
configuration.set(PipelineOptions.JARS, Collections.singletonList(appConfig.getUserJarPath()));
ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
ApplicationConfiguration appConfig = new ApplicationConfiguration(config.getUserJarParas(), config.getUserJarMainAppClass());
ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(appConfig.getUserJarParas(), appConfig.getUserJarMainAppClass());
YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
configuration, yarnConfiguration, yarnClient, YarnClientYarnClusterInformationRetriever.create(yarnClient), true);
try {
ClusterClientProvider<ApplicationId> clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(
clusterSpecification,
appConfig);
applicationConfiguration);
ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
ApplicationId applicationId = clusterClient.getClusterId();
result.setAppId(applicationId.toString());
......
......@@ -3,21 +3,34 @@ package com.dlink.gateway.yarn;
import com.dlink.assertion.Asserts;
import com.dlink.gateway.AbstractGateway;
import com.dlink.gateway.ConfigPara;
import com.dlink.gateway.GatewayConfig;
import com.dlink.gateway.config.FlinkConfig;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.gateway.config.ActionType;
import com.dlink.gateway.exception.GatewayException;
import com.dlink.gateway.model.JobInfo;
import com.dlink.gateway.result.GatewayResult;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import com.dlink.gateway.result.SavePointResult;
import com.dlink.gateway.result.YarnResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.yarn.YarnClusterClientFactory;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
* YarnGateway
......@@ -43,20 +56,20 @@ public abstract class YarnGateway extends AbstractGateway {
}
private void initConfig(){
configuration = GlobalConfiguration.loadConfiguration(config.getFlinkConfigPath());
addConfigParas(config.getConfigParas());
configuration = GlobalConfiguration.loadConfiguration(config.getClusterConfig().getFlinkConfigPath());
addConfigParas(config.getFlinkConfig().getConfigParas());
configuration.set(DeploymentOptions.TARGET, getType().getLongValue());
if(Asserts.isNotNullString(config.getSavePoint())) {
configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, config.getSavePoint());
if(Asserts.isNotNullString(config.getFlinkConfig().getSavePoint())) {
configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, config.getFlinkConfig().getSavePoint());
}
configuration.set(YarnConfigOptions.PROVIDED_LIB_DIRS, Collections.singletonList(config.getFlinkLibs()));
configuration.set(YarnConfigOptions.APPLICATION_NAME, config.getJobName());
YarnLogConfigUtil.setLogConfigFileInConfig(configuration, config.getFlinkConfigPath());
configuration.set(YarnConfigOptions.PROVIDED_LIB_DIRS, Collections.singletonList(config.getClusterConfig().getFlinkLibs()));
configuration.set(YarnConfigOptions.APPLICATION_NAME, config.getFlinkConfig().getJobName());
YarnLogConfigUtil.setLogConfigFileInConfig(configuration, config.getClusterConfig().getFlinkConfigPath());
}
private void initYarnClient(){
yarnConfiguration = new YarnConfiguration();
yarnConfiguration.addResource( new Path( config.getYarnConfigPath() ) );
yarnConfiguration.addResource( new Path( config.getClusterConfig().getYarnConfigPath() ) );
yarnClient = YarnClient.createYarnClient();
yarnClient.init(yarnConfiguration);
yarnClient.start();
......@@ -70,7 +83,106 @@ public abstract class YarnGateway extends AbstractGateway {
}
}
public GatewayResult savepoint(){
public GatewayResult savepointCluster(){
if(Asserts.isNull(yarnClient)){
init();
}
SavePointResult result = SavePointResult.build(getType());
YarnClusterClientFactory clusterClientFactory = new YarnClusterClientFactory();
ApplicationId applicationId = clusterClientFactory.getClusterId(configuration);
if (applicationId == null){
throw new GatewayException(
"No cluster id was specified. Please specify a cluster to which you would like to connect.");
}
YarnClusterDescriptor clusterDescriptor = clusterClientFactory
.createClusterDescriptor(
configuration);
try(ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(
applicationId).getClusterClient()){
List<JobInfo> jobInfos = new ArrayList<>();
CompletableFuture<Collection<JobStatusMessage>> listJobsFuture = clusterClient.listJobs();
for( JobStatusMessage jobStatusMessage: listJobsFuture.get()){
JobInfo jobInfo = new JobInfo(jobStatusMessage.getJobId().toHexString());
jobInfo.setStatus(JobInfo.JobStatus.RUN);
jobInfos.add(jobInfo);
}
runSavePointJob(jobInfos,clusterClient);
result.setJobInfos(jobInfos);
}catch (Exception e){
e.printStackTrace();
logger.error(e.getMessage());
result.fail(e.getMessage());
}
return result;
}
public GatewayResult savepointJob(){
if(Asserts.isNull(yarnClient)){
init();
}
if(Asserts.isNull(config.getFlinkConfig().getJobId())){
throw new GatewayException(
"No job id was specified. Please specify a job to which you would like to savepont.");
}
SavePointResult result = SavePointResult.build(getType());
YarnClusterClientFactory clusterClientFactory = new YarnClusterClientFactory();
ApplicationId applicationId = clusterClientFactory.getClusterId(configuration);
if (Asserts.isNull(applicationId)){
throw new GatewayException(
"No cluster id was specified. Please specify a cluster to which you would like to connect.");
}
YarnClusterDescriptor clusterDescriptor = clusterClientFactory
.createClusterDescriptor(
configuration);
try(ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(
applicationId).getClusterClient()){
List<JobInfo> jobInfos = new ArrayList<>();
jobInfos.add(new JobInfo(config.getFlinkConfig().getJobId(),JobInfo.JobStatus.FAIL));
runSavePointJob(jobInfos,clusterClient);
result.setJobInfos(jobInfos);
}catch (Exception e){
e.printStackTrace();
logger.error(e.getMessage());
result.fail(e.getMessage());
}
return result;
}
private void runSavePointJob(List<JobInfo> jobInfos,ClusterClient<ApplicationId> clusterClient) throws Exception{
String savePoint = FlinkConfig.DEFAULT_SAVEPOINT_PREFIX;
if(Asserts.isNotNull(config.getTaskId())){
savePoint = savePoint + config.getTaskId();
}
if(Asserts.isNotNullString(config.getFlinkConfig().getSavePoint())){
savePoint = config.getFlinkConfig().getSavePoint();
}
for( JobInfo jobInfo: jobInfos){
if(ActionType.CANCEL== config.getFlinkConfig().getAction()){
clusterClient.cancel(JobID.fromHexString(jobInfo.getJobId()));
jobInfo.setStatus(JobInfo.JobStatus.CANCEL);
continue;
}
switch (config.getFlinkConfig().getSavePointType()){
case TRIGGER:
CompletableFuture<String> triggerFuture = clusterClient.triggerSavepoint(JobID.fromHexString(jobInfo.getJobId()), savePoint);
jobInfo.setSavePoint(triggerFuture.get());
break;
case DISPOSE:
clusterClient.disposeSavepoint(savePoint);
break;
case STOP:
CompletableFuture<String> stopFuture = clusterClient.stopWithSavepoint(JobID.fromHexString(jobInfo.getJobId()), true, savePoint);
jobInfo.setStatus(JobInfo.JobStatus.STOP);
jobInfo.setSavePoint(stopFuture.get());
break;
case CANCEL:
CompletableFuture<String> cancelFuture = clusterClient.cancelWithSavepoint(JobID.fromHexString(jobInfo.getJobId()), savePoint);
jobInfo.setStatus(JobInfo.JobStatus.CANCEL);
jobInfo.setSavePoint(cancelFuture.get());
break;
default:
}
}
}
}
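Putting the new pieces together, a savepoint for a single running job could be requested roughly as follows (a minimal caller sketch with placeholder paths and job id, assuming a gateway implementation is registered for the chosen type and resolved through Gateway.build as in the test below):
package com.dlink.gateway;
import com.dlink.gateway.config.ActionType;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.gateway.config.SavePointType;
import com.dlink.gateway.model.JobInfo;
import com.dlink.gateway.result.SavePointResult;
// Hypothetical caller sketch: triggers a savepoint for one job through the new
// savepointJob() entry point of the gateway.
public class SavepointJobDemo {
    public static void main(String[] args) {
        GatewayConfig config = new GatewayConfig();
        config.setType(GatewayType.YARN_PER_JOB);
        config.getClusterConfig().setFlinkConfigPath("/opt/flink/conf");           // placeholder
        config.getClusterConfig().setYarnConfigPath("/etc/hadoop/yarn-site.xml");  // placeholder
        config.getFlinkConfig().setJobId("00000000000000000000000000000000");      // placeholder job id
        config.getFlinkConfig().setAction(ActionType.SAVEPOINT);
        config.getFlinkConfig().setSavePointType(SavePointType.TRIGGER);
        // On the YARN gateways savepointJob() returns a SavePointResult whose
        // JobInfo entries carry the resulting savepoint path per job.
        SavePointResult result = (SavePointResult) Gateway.build(config).savepointJob();
        for (JobInfo jobInfo : result.getJobInfos()) {
            System.out.println(jobInfo.getJobId() + " -> " + jobInfo.getSavePoint());
        }
    }
}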
package com.dlink.gateway.yarn;
import com.dlink.assertion.Asserts;
import com.dlink.gateway.GatewayConfig;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.gateway.GatewayType;
import com.dlink.gateway.exception.GatewayException;
import com.dlink.gateway.result.GatewayResult;
......
package com.dlink.gateway;
import org.junit.Test;
/**
* GatewayTest
*
* @author qiwenkai
* @since 2021/10/29 17:06
* @author wenmo
* @since 2021/10/29
**/
public class GatewayTest {
@Test
public void getTest(){
GatewayConfig config = new GatewayConfig();
config.setJobName("apptest");
config.setType(GatewayType.get("yarn-application"));
config.setFlinkConfigPath("/opt/src/flink-1.12.2_pj/conf");
config.setUserJarPath("hdfs:///flink12/jar/currencyAppJar.jar");
config.setUserJarParas("--id 2410,2412,2411".split("\\s+"));
config.setUserJarMainAppClass("com.app.MainApp");
String longValue = Gateway.build(config).getType().getLongValue();
System.out.println(longValue);
}
}
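Since GatewayConfig now nests its settings in ClusterConfig, FlinkConfig and AppConfig, a hypothetical equivalent of the test above written against the new layout could read (same placeholder values):
package com.dlink.gateway;
import com.dlink.gateway.config.GatewayConfig;
import org.junit.Test;
// Hypothetical rewrite of GatewayTest.getTest for the nested config layout
// introduced in this commit; values are unchanged.
public class GatewayNestedConfigTest {
    @Test
    public void getTest(){
        GatewayConfig config = new GatewayConfig();
        config.setType(GatewayType.get("yarn-application"));
        config.getFlinkConfig().setJobName("apptest");
        config.getClusterConfig().setFlinkConfigPath("/opt/src/flink-1.12.2_pj/conf");
        config.getAppConfig().setUserJarPath("hdfs:///flink12/jar/currencyAppJar.jar");
        config.getAppConfig().setUserJarParas("--id 2410,2412,2411".split("\\s+"));
        config.getAppConfig().setUserJarMainAppClass("com.app.MainApp");
        String longValue = Gateway.build(config).getType().getLongValue();
        System.out.println(longValue);
    }
}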