Commit c1ce6abb authored by wenmo's avatar wenmo

任务提交成功后保存多任务实例 (Save multiple job instances after a task is submitted successfully)

parent a0e43a64
...@@ -3,11 +3,13 @@ package com.dlink.job; ...@@ -3,11 +3,13 @@ package com.dlink.job;
import cn.hutool.json.JSONUtil; import cn.hutool.json.JSONUtil;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.context.SpringContextUtils; import com.dlink.context.SpringContextUtils;
import com.dlink.gateway.GatewayType;
import com.dlink.model.Cluster; import com.dlink.model.Cluster;
import com.dlink.model.History; import com.dlink.model.History;
import com.dlink.model.JobInstance;
import com.dlink.model.JobStatus;
import com.dlink.service.ClusterService; import com.dlink.service.ClusterService;
import com.dlink.service.HistoryService; import com.dlink.service.HistoryService;
import com.dlink.service.JobInstanceService;
import org.springframework.context.annotation.DependsOn; import org.springframework.context.annotation.DependsOn;
import java.time.LocalDateTime; import java.time.LocalDateTime;
...@@ -23,10 +25,12 @@ public class Job2MysqlHandler implements JobHandler { ...@@ -23,10 +25,12 @@ public class Job2MysqlHandler implements JobHandler {
private static HistoryService historyService; private static HistoryService historyService;
private static ClusterService clusterService; private static ClusterService clusterService;
private static JobInstanceService jobInstanceService;
static { static {
historyService = SpringContextUtils.getBean("historyServiceImpl",HistoryService.class); historyService = SpringContextUtils.getBean("historyServiceImpl", HistoryService.class);
clusterService = SpringContextUtils.getBean("clusterServiceImpl",ClusterService.class); clusterService = SpringContextUtils.getBean("clusterServiceImpl", ClusterService.class);
jobInstanceService = SpringContextUtils.getBean("jobInstanceServiceImpl", JobInstanceService.class);
} }
@Override @Override
...@@ -34,9 +38,9 @@ public class Job2MysqlHandler implements JobHandler { ...@@ -34,9 +38,9 @@ public class Job2MysqlHandler implements JobHandler {
Job job = JobContextHolder.getJob(); Job job = JobContextHolder.getJob();
History history = new History(); History history = new History();
history.setType(job.getType().getLongValue()); history.setType(job.getType().getLongValue());
if(job.isUseGateway()) { if (job.isUseGateway()) {
history.setClusterConfigurationId(job.getJobConfig().getClusterConfigurationId()); history.setClusterConfigurationId(job.getJobConfig().getClusterConfigurationId());
}else{ } else {
history.setClusterId(job.getJobConfig().getClusterId()); history.setClusterId(job.getJobConfig().getClusterId());
} }
history.setJobManagerAddress(job.getJobManagerAddress()); history.setJobManagerAddress(job.getJobManagerAddress());
...@@ -67,24 +71,38 @@ public class Job2MysqlHandler implements JobHandler { ...@@ -67,24 +71,38 @@ public class Job2MysqlHandler implements JobHandler {
Job job = JobContextHolder.getJob(); Job job = JobContextHolder.getJob();
History history = new History(); History history = new History();
history.setId(job.getId()); history.setId(job.getId());
if(job.isUseGateway()&&Asserts.isNullString(job.getJobId())){ if (job.isUseGateway() && Asserts.isNullString(job.getJobId())) {
job.setJobId("unknown-"+LocalDateTime.now().toString()); job.setJobId("unknown-" + LocalDateTime.now().toString());
} }
history.setJobId(job.getJobId()); history.setJobId(job.getJobId());
history.setStatus(job.getStatus().ordinal()); history.setStatus(job.getStatus().ordinal());
history.setEndTime(job.getEndTime()); history.setEndTime(job.getEndTime());
if(job.isUseGateway()){ if (job.isUseGateway()) {
history.setJobManagerAddress(job.getJobManagerAddress()); history.setJobManagerAddress(job.getJobManagerAddress());
} }
if(job.isUseGateway()){ Integer clusterId = job.getJobConfig().getClusterId();
if (job.isUseGateway()) {
Cluster cluster = clusterService.registersCluster(Cluster.autoRegistersCluster(job.getJobManagerAddress(), Cluster cluster = clusterService.registersCluster(Cluster.autoRegistersCluster(job.getJobManagerAddress(),
job.getJobId(),job.getJobConfig().getJobName()+ LocalDateTime.now(), job.getType().getLongValue(), job.getJobId(), job.getJobConfig().getJobName() + LocalDateTime.now(), job.getType().getLongValue(),
job.getJobConfig().getClusterConfigurationId(),job.getJobConfig().getTaskId())); job.getJobConfig().getClusterConfigurationId(), job.getJobConfig().getTaskId()));
if(Asserts.isNotNull(cluster)){ if (Asserts.isNotNull(cluster)) {
history.setClusterId(cluster.getId()); clusterId = cluster.getId();
} }
} }
history.setClusterId(clusterId);
historyService.updateById(history); historyService.updateById(history);
if (Asserts.isNotNullCollection(job.getJids())) {
for (String jid : job.getJids()) {
JobInstance jobInstance = history.buildJobInstance();
jobInstance.setHistoryId(job.getId());
jobInstance.setClusterId(clusterId);
jobInstance.setTaskId(job.getJobConfig().getTaskId());
jobInstance.setName(job.getJobConfig().getJobName());
jobInstance.setJid(jid);
jobInstance.setStatus(JobStatus.INITIALIZING.getValue());
jobInstanceService.save(jobInstance);
}
}
return true; return true;
} }
......
...@@ -44,4 +44,13 @@ public class History implements Serializable { ...@@ -44,4 +44,13 @@ public class History implements Serializable {
private String clusterAlias; private String clusterAlias;
@TableField(exist = false) @TableField(exist = false)
private String taskAlias; private String taskAlias;
public JobInstance buildJobInstance() {
JobInstance jobInstance = new JobInstance();
jobInstance.setHistoryId(id);
jobInstance.setClusterId(clusterId);
jobInstance.setTaskId(taskId);
jobInstance.setName(jobName);
return jobInstance;
}
} }
package com.dlink.model; package com.dlink.model;
import com.baomidou.mybatisplus.annotation.FieldFill;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
...@@ -25,10 +27,12 @@ public class JobInstance implements Serializable { ...@@ -25,10 +27,12 @@ public class JobInstance implements Serializable {
private Integer taskId; private Integer taskId;
private Integer clusterId; private Integer clusterId;
private String jid; private String jid;
private Integer status; private String status;
private Integer historyId; private Integer historyId;
private String error; private String error;
@TableField(fill = FieldFill.INSERT)
private LocalDateTime createTime; private LocalDateTime createTime;
@TableField(fill = FieldFill.INSERT_UPDATE)
private LocalDateTime updateTime; private LocalDateTime updateTime;
private LocalDateTime finishTime; private LocalDateTime finishTime;
private Integer failed_restart_count; private Integer failed_restart_count;
......
package com.dlink.model;
/**
 * JobStatus
*
* @author wenmo
* @since 2022/2/22 14:29
**/
public enum JobStatus {

    /**
     * The job has been received by the Dispatcher, and is waiting for the job manager to receive
     * leadership and to be created.
     */
    INITIALIZING("INITIALIZING"),

    /**
     * Job is newly created, no task has started to run.
     */
    CREATED("CREATED"),

    /**
     * Some tasks are scheduled or running, some may be pending, some may be finished.
     */
    RUNNING("RUNNING"),

    /**
     * The job has failed and is currently waiting for the cleanup to complete.
     */
    FAILING("FAILING"),

    /**
     * The job has failed with a non-recoverable task failure.
     */
    FAILED("FAILED"),

    /**
     * Job is being cancelled.
     */
    CANCELLING("CANCELLING"),

    /**
     * Job has been cancelled.
     */
    CANCELED("CANCELED"),

    /**
     * All of the job's tasks have successfully finished.
     */
    FINISHED("FINISHED"),

    /**
     * The job is currently undergoing a reset and total restart.
     */
    RESTARTING("RESTARTING"),

    /**
     * The job has been suspended which means that it has been stopped but not been removed from a
     * potential HA job store.
     */
    SUSPENDED("SUSPENDED"),

    /**
     * The job is currently reconciling and waits for task execution report to recover state.
     */
    RECONCILING("RECONCILING"),

    /**
     * The job can't get any info.
     */
    UNKNOWN("UNKNOWN");

    /** Status string as persisted/reported (mirrors the constant name). Enum state must be immutable, hence final. */
    private final String value;

    /**
     * @param value the textual representation of this status
     */
    JobStatus(String value) {
        this.value = value;
    }

    /**
     * Returns the textual representation of this status, e.g. {@code "RUNNING"}.
     */
    public String getValue() {
        return value;
    }
}
...@@ -3,13 +3,12 @@ package com.dlink.job; ...@@ -3,13 +3,12 @@ package com.dlink.job;
import com.dlink.executor.Executor; import com.dlink.executor.Executor;
import com.dlink.executor.ExecutorSetting; import com.dlink.executor.ExecutorSetting;
import com.dlink.gateway.GatewayType; import com.dlink.gateway.GatewayType;
import com.dlink.parser.SqlType;
import com.dlink.result.IResult; import com.dlink.result.IResult;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import java.time.LocalDate;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.List;
/** /**
* Job * Job
...@@ -34,6 +33,7 @@ public class Job { ...@@ -34,6 +33,7 @@ public class Job {
private LocalDateTime endTime; private LocalDateTime endTime;
private Executor executor; private Executor executor;
private boolean useGateway; private boolean useGateway;
private List<String> jids;
public enum JobStatus { public enum JobStatus {
INITIALIZE, INITIALIZE,
......
This diff is collapsed.
This diff is collapsed.
...@@ -6,7 +6,7 @@ package com.dlink.parser; ...@@ -6,7 +6,7 @@ package com.dlink.parser;
* @author wenmo * @author wenmo
* @since 2021/7/3 11:11 * @since 2021/7/3 11:11
*/ */
public enum SqlType { public enum SqlType {
SELECT("SELECT"), SELECT("SELECT"),
CREATE("CREATE"), CREATE("CREATE"),
DROP("DROP"), DROP("DROP"),
...@@ -39,7 +39,15 @@ public enum SqlType { ...@@ -39,7 +39,15 @@ public enum SqlType {
return type; return type;
} }
public boolean equalsValue(String value){ public boolean equalsValue(String value) {
return type.equalsIgnoreCase(value); return type.equalsIgnoreCase(value);
} }
public boolean isInsert() {
if (type.equals("INSERT")) {
return true;
}
return false;
}
} }
...@@ -10,15 +10,14 @@ import com.dlink.assertion.Asserts; ...@@ -10,15 +10,14 @@ import com.dlink.assertion.Asserts;
**/ **/
public enum GatewayType { public enum GatewayType {
LOCAL("l","local"),STANDALONE("s","standalone"), LOCAL("l", "local"), STANDALONE("s", "standalone"),
YARN_SESSION("ys","yarn-session"),YARN_APPLICATION("ya","yarn-application"), YARN_SESSION("ys", "yarn-session"), YARN_APPLICATION("ya", "yarn-application"),
YARN_PER_JOB("ypj","yarn-per-job"),KUBERNETES_SESSION("ks","kubernetes-session") YARN_PER_JOB("ypj", "yarn-per-job"), KUBERNETES_SESSION("ks", "kubernetes-session"), KUBERNETES_APPLICATION("ka", "kubernetes-application");
,KUBERNETES_APPLICATION("ka","kubernetes-application");
private String value; private String value;
private String longValue; private String longValue;
GatewayType(String value, String longValue){ GatewayType(String value, String longValue) {
this.value = value; this.value = value;
this.longValue = longValue; this.longValue = longValue;
} }
...@@ -31,24 +30,24 @@ public enum GatewayType { ...@@ -31,24 +30,24 @@ public enum GatewayType {
return longValue; return longValue;
} }
public static GatewayType get(String value){ public static GatewayType get(String value) {
for (GatewayType type : GatewayType.values()) { for (GatewayType type : GatewayType.values()) {
if(Asserts.isEquals(type.getValue(),value)||Asserts.isEquals(type.getLongValue(),value)){ if (Asserts.isEquals(type.getValue(), value) || Asserts.isEquals(type.getLongValue(), value)) {
return type; return type;
} }
} }
return GatewayType.YARN_APPLICATION; return GatewayType.YARN_APPLICATION;
} }
public boolean equalsValue(String type){ public boolean equalsValue(String type) {
if(Asserts.isEquals(value,type)||Asserts.isEquals(longValue,type)){ if (Asserts.isEquals(value, type) || Asserts.isEquals(longValue, type)) {
return true; return true;
} }
return false; return false;
} }
public static boolean isDeployCluster(String type){ public static boolean isDeployCluster(String type) {
switch (get(type)){ switch (get(type)) {
case YARN_APPLICATION: case YARN_APPLICATION:
case YARN_PER_JOB: case YARN_PER_JOB:
case KUBERNETES_APPLICATION: case KUBERNETES_APPLICATION:
...@@ -58,8 +57,8 @@ public enum GatewayType { ...@@ -58,8 +57,8 @@ public enum GatewayType {
} }
} }
public boolean isDeployCluster(){ public boolean isDeployCluster() {
switch (value){ switch (value) {
case "ya": case "ya":
case "ypj": case "ypj":
case "ka": case "ka":
...@@ -68,4 +67,14 @@ public enum GatewayType { ...@@ -68,4 +67,14 @@ public enum GatewayType {
return false; return false;
} }
} }
public boolean isApplicationMode() {
switch (value) {
case "ya":
case "ka":
return true;
default:
return false;
}
}
} }
...@@ -13,9 +13,13 @@ import org.apache.flink.client.program.ClusterClient; ...@@ -13,9 +13,13 @@ import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider; import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.kubernetes.KubernetesClusterDescriptor; import org.apache.flink.kubernetes.KubernetesClusterDescriptor;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List;
/** /**
* KubernetesApplicationGateway * KubernetesApplicationGateway
...@@ -36,7 +40,7 @@ public class KubernetesApplicationGateway extends KubernetesGateway { ...@@ -36,7 +40,7 @@ public class KubernetesApplicationGateway extends KubernetesGateway {
@Override @Override
public GatewayResult submitJar() { public GatewayResult submitJar() {
if(Asserts.isNull(client)){ if (Asserts.isNull(client)) {
init(); init();
} }
KubernetesResult result = KubernetesResult.build(getType()); KubernetesResult result = KubernetesResult.build(getType());
...@@ -45,16 +49,24 @@ public class KubernetesApplicationGateway extends KubernetesGateway { ...@@ -45,16 +49,24 @@ public class KubernetesApplicationGateway extends KubernetesGateway {
ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification(); ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(appConfig.getUserJarParas(), appConfig.getUserJarMainAppClass()); ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(appConfig.getUserJarParas(), appConfig.getUserJarMainAppClass());
KubernetesClusterDescriptor kubernetesClusterDescriptor = new KubernetesClusterDescriptor(configuration, client); KubernetesClusterDescriptor kubernetesClusterDescriptor = new KubernetesClusterDescriptor(configuration, client);
try{ try {
ClusterClientProvider<String> clusterClientProvider = kubernetesClusterDescriptor.deployApplicationCluster(clusterSpecification, applicationConfiguration); ClusterClientProvider<String> clusterClientProvider = kubernetesClusterDescriptor.deployApplicationCluster(clusterSpecification, applicationConfiguration);
ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient(); ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient();
Collection<JobStatusMessage> jobStatusMessages = clusterClient.listJobs().get();
if (jobStatusMessages.size() > 0) {
List<String> jids = new ArrayList<>();
for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
jids.add(jobStatusMessage.getJobId().toHexString());
}
result.setJids(jids);
}
String clusterId = clusterClient.getClusterId(); String clusterId = clusterClient.getClusterId();
result.setClusterId(clusterId); result.setClusterId(clusterId);
result.setWebURL(clusterClient.getWebInterfaceURL()); result.setWebURL(clusterClient.getWebInterfaceURL());
result.success(); result.success();
}catch (Exception e){ } catch (Exception e) {
result.fail(LogUtil.getError(e)); result.fail(LogUtil.getError(e));
}finally { } finally {
kubernetesClusterDescriptor.close(); kubernetesClusterDescriptor.close();
} }
return result; return result;
......
package com.dlink.gateway.result; package com.dlink.gateway.result;
import java.util.List;
/** /**
* GatewayResult * GatewayResult
* *
...@@ -11,4 +13,6 @@ public interface GatewayResult { ...@@ -11,4 +13,6 @@ public interface GatewayResult {
String getAppId(); String getAppId();
String getWebURL(); String getWebURL();
List<String> getJids();
} }
...@@ -3,6 +3,7 @@ package com.dlink.gateway.result; ...@@ -3,6 +3,7 @@ package com.dlink.gateway.result;
import com.dlink.gateway.GatewayType; import com.dlink.gateway.GatewayType;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.List;
/** /**
* KubernetesResult * KubernetesResult
...@@ -13,6 +14,7 @@ import java.time.LocalDateTime; ...@@ -13,6 +14,7 @@ import java.time.LocalDateTime;
public class KubernetesResult extends AbstractGatewayResult { public class KubernetesResult extends AbstractGatewayResult {
private String clusterId; private String clusterId;
private String webURL; private String webURL;
private List<String> jids;
public KubernetesResult(GatewayType type, LocalDateTime startTime) { public KubernetesResult(GatewayType type, LocalDateTime startTime) {
super(type, startTime); super(type, startTime);
...@@ -44,6 +46,15 @@ public class KubernetesResult extends AbstractGatewayResult { ...@@ -44,6 +46,15 @@ public class KubernetesResult extends AbstractGatewayResult {
return webURL; return webURL;
} }
@Override
public List<String> getJids() {
return jids;
}
public void setJids(List<String> jids) {
this.jids = jids;
}
public static KubernetesResult build(GatewayType type){ public static KubernetesResult build(GatewayType type){
return new KubernetesResult(type,LocalDateTime.now()); return new KubernetesResult(type,LocalDateTime.now());
} }
......
...@@ -38,6 +38,11 @@ public class SavePointResult extends AbstractGatewayResult { ...@@ -38,6 +38,11 @@ public class SavePointResult extends AbstractGatewayResult {
return null; return null;
} }
@Override
public List<String> getJids() {
return null;
}
public static SavePointResult build(GatewayType type){ public static SavePointResult build(GatewayType type){
return new SavePointResult(type,LocalDateTime.now()); return new SavePointResult(type,LocalDateTime.now());
} }
......
...@@ -5,6 +5,7 @@ import lombok.Getter; ...@@ -5,6 +5,7 @@ import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.List;
/** /**
* YarnResult * YarnResult
...@@ -16,6 +17,7 @@ public class YarnResult extends AbstractGatewayResult { ...@@ -16,6 +17,7 @@ public class YarnResult extends AbstractGatewayResult {
private String appId; private String appId;
private String webURL; private String webURL;
private List<String> jids;
public YarnResult(GatewayType type, LocalDateTime startTime) { public YarnResult(GatewayType type, LocalDateTime startTime) {
super(type, startTime); super(type, startTime);
...@@ -42,6 +44,14 @@ public class YarnResult extends AbstractGatewayResult { ...@@ -42,6 +44,14 @@ public class YarnResult extends AbstractGatewayResult {
this.webURL = webURL; this.webURL = webURL;
} }
public List<String> getJids() {
return jids;
}
public void setJids(List<String> jids) {
this.jids = jids;
}
public static YarnResult build(GatewayType type){ public static YarnResult build(GatewayType type){
return new YarnResult(type,LocalDateTime.now()); return new YarnResult(type,LocalDateTime.now());
} }
......
...@@ -13,12 +13,16 @@ import org.apache.flink.client.deployment.application.ApplicationConfiguration; ...@@ -13,12 +13,16 @@ import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider; import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever; import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor; import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List;
/** /**
* YarnApplicationGateway * YarnApplicationGateway
...@@ -44,10 +48,10 @@ public class YarnApplicationGateway extends YarnGateway { ...@@ -44,10 +48,10 @@ public class YarnApplicationGateway extends YarnGateway {
public GatewayResult submitJobGraph(JobGraph jobGraph) { public GatewayResult submitJobGraph(JobGraph jobGraph) {
throw new GatewayException("Couldn't deploy Yarn Application Cluster with job graph."); throw new GatewayException("Couldn't deploy Yarn Application Cluster with job graph.");
} }
@Override @Override
public GatewayResult submitJar() { public GatewayResult submitJar() {
if(Asserts.isNull(yarnClient)){ if (Asserts.isNull(yarnClient)) {
init(); init();
} }
YarnResult result = YarnResult.build(getType()); YarnResult result = YarnResult.build(getType());
...@@ -62,13 +66,21 @@ public class YarnApplicationGateway extends YarnGateway { ...@@ -62,13 +66,21 @@ public class YarnApplicationGateway extends YarnGateway {
clusterSpecification, clusterSpecification,
applicationConfiguration); applicationConfiguration);
ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient(); ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
Collection<JobStatusMessage> jobStatusMessages = clusterClient.listJobs().get();
if (jobStatusMessages.size() > 0) {
List<String> jids = new ArrayList<>();
for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
jids.add(jobStatusMessage.getJobId().toHexString());
}
result.setJids(jids);
}
ApplicationId applicationId = clusterClient.getClusterId(); ApplicationId applicationId = clusterClient.getClusterId();
result.setAppId(applicationId.toString()); result.setAppId(applicationId.toString());
result.setWebURL(clusterClient.getWebInterfaceURL()); result.setWebURL(clusterClient.getWebInterfaceURL());
result.success(); result.success();
}catch (Exception e){ } catch (Exception e) {
result.fail(LogUtil.getError(e)); result.fail(LogUtil.getError(e));
}finally { } finally {
yarnClusterDescriptor.close(); yarnClusterDescriptor.close();
} }
return result; return result;
......
package com.dlink.gateway.yarn; package com.dlink.gateway.yarn;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.gateway.GatewayType; import com.dlink.gateway.GatewayType;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.gateway.exception.GatewayException; import com.dlink.gateway.exception.GatewayException;
import com.dlink.gateway.result.GatewayResult; import com.dlink.gateway.result.GatewayResult;
import com.dlink.gateway.result.YarnResult; import com.dlink.gateway.result.YarnResult;
...@@ -10,11 +10,16 @@ import com.dlink.utils.LogUtil; ...@@ -10,11 +10,16 @@ import com.dlink.utils.LogUtil;
import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider; import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever; import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor; import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/** /**
* YarnApplicationGateway * YarnApplicationGateway
* *
...@@ -37,7 +42,7 @@ public class YarnPerJobGateway extends YarnGateway { ...@@ -37,7 +42,7 @@ public class YarnPerJobGateway extends YarnGateway {
@Override @Override
public GatewayResult submitJobGraph(JobGraph jobGraph) { public GatewayResult submitJobGraph(JobGraph jobGraph) {
if(Asserts.isNull(yarnClient)){ if (Asserts.isNull(yarnClient)) {
init(); init();
} }
YarnResult result = YarnResult.build(getType()); YarnResult result = YarnResult.build(getType());
...@@ -45,15 +50,23 @@ public class YarnPerJobGateway extends YarnGateway { ...@@ -45,15 +50,23 @@ public class YarnPerJobGateway extends YarnGateway {
YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor( YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
configuration, yarnConfiguration, yarnClient, YarnClientYarnClusterInformationRetriever.create(yarnClient), true); configuration, yarnConfiguration, yarnClient, YarnClientYarnClusterInformationRetriever.create(yarnClient), true);
try { try {
ClusterClientProvider<ApplicationId> clusterClientProvider = yarnClusterDescriptor.deployJobCluster(clusterSpecification,jobGraph,false); ClusterClientProvider<ApplicationId> clusterClientProvider = yarnClusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, false);
ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient(); ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
ApplicationId applicationId = clusterClient.getClusterId(); ApplicationId applicationId = clusterClient.getClusterId();
result.setAppId(applicationId.toString()); result.setAppId(applicationId.toString());
result.setWebURL(clusterClient.getWebInterfaceURL()); result.setWebURL(clusterClient.getWebInterfaceURL());
Collection<JobStatusMessage> jobStatusMessages = clusterClient.listJobs().get();
if (jobStatusMessages.size() > 0) {
List<String> jids = new ArrayList<>();
for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
jids.add(jobStatusMessage.getJobId().toHexString());
}
result.setJids(jids);
}
result.success(); result.success();
}catch (Exception e){ } catch (Exception e) {
result.fail(LogUtil.getError(e)); result.fail(LogUtil.getError(e));
}finally { } finally {
yarnClusterDescriptor.close(); yarnClusterDescriptor.close();
} }
return result; return result;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment