Commit 18208b09 authored by wenmo's avatar wenmo

新增 作业上下线自动提交和停止任务

parent d51a3ede
...@@ -143,15 +143,15 @@ public class TaskController { ...@@ -143,15 +143,15 @@ public class TaskController {
*/ */
@GetMapping(value = "/onLineTask") @GetMapping(value = "/onLineTask")
public Result onLineTask(@RequestParam Integer id) { public Result onLineTask(@RequestParam Integer id) {
return Result.succeed(taskService.onLineTask(id),"操作成功"); return taskService.onLineTask(id);
} }
/** /**
* 下线任务 * 下线任务
*/ */
@GetMapping(value = "/offLineTask") @GetMapping(value = "/offLineTask")
public Result offLineTask(@RequestParam Integer id) { public Result offLineTask(@RequestParam Integer id,@RequestParam String type) {
return Result.succeed(taskService.offLineTask(id),"操作成功"); return taskService.offLineTask(id,type);
} }
/** /**
......
...@@ -74,6 +74,7 @@ public class Job2MysqlHandler implements JobHandler { ...@@ -74,6 +74,7 @@ public class Job2MysqlHandler implements JobHandler {
@Override @Override
public boolean success() { public boolean success() {
Job job = JobContextHolder.getJob(); Job job = JobContextHolder.getJob();
Integer taskId = job.getJobConfig().getTaskId();
History history = new History(); History history = new History();
history.setId(job.getId()); history.setId(job.getId());
if (job.isUseGateway() && Asserts.isNullString(job.getJobId())) { if (job.isUseGateway() && Asserts.isNullString(job.getJobId())) {
...@@ -96,7 +97,7 @@ public class Job2MysqlHandler implements JobHandler { ...@@ -96,7 +97,7 @@ public class Job2MysqlHandler implements JobHandler {
if (job.isUseGateway()) { if (job.isUseGateway()) {
cluster = clusterService.registersCluster(Cluster.autoRegistersCluster(job.getJobManagerAddress(), cluster = clusterService.registersCluster(Cluster.autoRegistersCluster(job.getJobManagerAddress(),
job.getJobId(), job.getJobConfig().getJobName() + LocalDateTime.now(), job.getType().getLongValue(), job.getJobId(), job.getJobConfig().getJobName() + LocalDateTime.now(), job.getType().getLongValue(),
job.getJobConfig().getClusterConfigurationId(), job.getJobConfig().getTaskId())); job.getJobConfig().getClusterConfigurationId(), taskId));
if (Asserts.isNotNull(cluster)) { if (Asserts.isNotNull(cluster)) {
clusterId = cluster.getId(); clusterId = cluster.getId();
} }
...@@ -118,13 +119,15 @@ public class Job2MysqlHandler implements JobHandler { ...@@ -118,13 +119,15 @@ public class Job2MysqlHandler implements JobHandler {
JobInstance jobInstance = history.buildJobInstance(); JobInstance jobInstance = history.buildJobInstance();
jobInstance.setHistoryId(job.getId()); jobInstance.setHistoryId(job.getId());
jobInstance.setClusterId(clusterId); jobInstance.setClusterId(clusterId);
jobInstance.setTaskId(job.getJobConfig().getTaskId()); jobInstance.setTaskId(taskId);
jobInstance.setName(job.getJobConfig().getJobName()); jobInstance.setName(job.getJobConfig().getJobName());
jobInstance.setJid(jid); jobInstance.setJid(jid);
jobInstance.setStep(job.getJobConfig().getStep());
jobInstance.setStatus(JobStatus.INITIALIZING.getValue()); jobInstance.setStatus(JobStatus.INITIALIZING.getValue());
jobInstanceService.save(jobInstance); jobInstanceService.save(jobInstance);
job.setJobInstanceId(jobInstance.getId());
Task task = new Task(); Task task = new Task();
task.setId(jobInstance.getTaskId()); task.setId(taskId);
task.setJobInstanceId(jobInstance.getId()); task.setJobInstanceId(jobInstance.getId());
taskService.updateById(task); taskService.updateById(task);
JobHistory jobHistory = new JobHistory(); JobHistory jobHistory = new JobHistory();
......
...@@ -31,6 +31,8 @@ public class JobInstance implements Serializable { ...@@ -31,6 +31,8 @@ public class JobInstance implements Serializable {
private Integer taskId; private Integer taskId;
private Integer step;
private Integer clusterId; private Integer clusterId;
private String jid; private String jid;
......
...@@ -104,7 +104,7 @@ public class Task extends SuperEntity { ...@@ -104,7 +104,7 @@ public class Task extends SuperEntity {
for (Map<String, String> item : config) { for (Map<String, String> item : config) {
map.put(item.get("key"), item.get("value")); map.put(item.get("key"), item.get("value"));
} }
return new JobConfig(type, false, false, useRemote, clusterId, clusterConfigurationId, jarId, getId(), return new JobConfig(type, step, false, false, useRemote, clusterId, clusterConfigurationId, jarId, getId(),
alias, fragment, statementSet, batchModel, checkPoint, parallelism, savePointStrategy, savePointPath, map); alias, fragment, statementSet, batchModel, checkPoint, parallelism, savePointStrategy, savePointPath, map);
} }
......
...@@ -22,6 +22,8 @@ public interface TaskService extends ISuperService<Task> { ...@@ -22,6 +22,8 @@ public interface TaskService extends ISuperService<Task> {
JobResult submitTask(Integer id); JobResult submitTask(Integer id);
JobResult submitTaskToOnline(Integer id);
JobResult restartTask(Integer id); JobResult restartTask(Integer id);
List<SqlExplainResult> explainTask(Integer id); List<SqlExplainResult> explainTask(Integer id);
...@@ -40,9 +42,9 @@ public interface TaskService extends ISuperService<Task> { ...@@ -40,9 +42,9 @@ public interface TaskService extends ISuperService<Task> {
boolean developTask(Integer id); boolean developTask(Integer id);
boolean onLineTask(Integer id); Result onLineTask(Integer id);
boolean offLineTask(Integer id); Result offLineTask(Integer id, String type);
boolean cancelTask(Integer id); boolean cancelTask(Integer id);
......
...@@ -19,6 +19,7 @@ import com.dlink.gateway.config.SavePointStrategy; ...@@ -19,6 +19,7 @@ import com.dlink.gateway.config.SavePointStrategy;
import com.dlink.gateway.config.SavePointType; import com.dlink.gateway.config.SavePointType;
import com.dlink.gateway.model.JobInfo; import com.dlink.gateway.model.JobInfo;
import com.dlink.gateway.result.SavePointResult; import com.dlink.gateway.result.SavePointResult;
import com.dlink.job.Job;
import com.dlink.job.JobConfig; import com.dlink.job.JobConfig;
import com.dlink.job.JobManager; import com.dlink.job.JobManager;
import com.dlink.job.JobResult; import com.dlink.job.JobResult;
...@@ -99,11 +100,29 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -99,11 +100,29 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
} }
} }
@Override
public JobResult submitTaskToOnline(Integer id) {
Task task = this.getTaskInfoById(id);
Asserts.checkNull(task, Tips.TASK_NOT_EXIST);
task.setStep(JobLifeCycle.ONLINE.getValue());
if (Dialect.isSql(task.getDialect())) {
return executeCommonSql(SqlDTO.build(task.getStatement(),
task.getDatabaseId(), null));
}
JobConfig config = buildJobConfig(task);
JobManager jobManager = JobManager.build(config);
if (!config.isJarTask()) {
return jobManager.executeSql(task.getStatement());
} else {
return jobManager.executeJar();
}
}
@Override @Override
public JobResult restartTask(Integer id) { public JobResult restartTask(Integer id) {
Task task = this.getTaskInfoById(id); Task task = this.getTaskInfoById(id);
Asserts.checkNull(task, Tips.TASK_NOT_EXIST); Asserts.checkNull(task, Tips.TASK_NOT_EXIST);
if(Asserts.isNotNull(task.getJobInstanceId())&&task.getJobInstanceId()!=0){ if (Asserts.isNotNull(task.getJobInstanceId()) && task.getJobInstanceId() != 0) {
savepointTask(task, SavePointType.CANCEL.getValue()); savepointTask(task, SavePointType.CANCEL.getValue());
} }
if (Dialect.isSql(task.getDialect())) { if (Dialect.isSql(task.getDialect())) {
...@@ -291,15 +310,15 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -291,15 +310,15 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
Assert.check(task); Assert.check(task);
if (JobLifeCycle.DEVELOP.equalsValue(task.getStep())) { if (JobLifeCycle.DEVELOP.equalsValue(task.getStep())) {
List<SqlExplainResult> sqlExplainResults = explainTask(id); List<SqlExplainResult> sqlExplainResults = explainTask(id);
for(SqlExplainResult sqlExplainResult: sqlExplainResults){ for (SqlExplainResult sqlExplainResult : sqlExplainResults) {
if(!sqlExplainResult.isParseTrue()||!sqlExplainResult.isExplainTrue()){ if (!sqlExplainResult.isParseTrue() || !sqlExplainResult.isExplainTrue()) {
return Result.failed("语法校验和逻辑检查有误,发布失败"); return Result.failed("语法校验和逻辑检查有误,发布失败");
} }
} }
task.setStep(JobLifeCycle.RELEASE.getValue()); task.setStep(JobLifeCycle.RELEASE.getValue());
if(updateById(task)){ if (updateById(task)) {
return Result.succeed("发布成功"); return Result.succeed("发布成功");
}else { } else {
return Result.failed("由于未知原因,发布失败"); return Result.failed("由于未知原因,发布失败");
} }
} }
...@@ -318,25 +337,49 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -318,25 +337,49 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
} }
@Override @Override
public boolean onLineTask(Integer id) { public Result onLineTask(Integer id) {
Task task = getById(id); Task task = getById(id);
Assert.check(task); Assert.check(task);
if (JobLifeCycle.RELEASE.equalsValue(task.getStep())) { if (JobLifeCycle.RELEASE.equalsValue(task.getStep())) {
task.setStep(JobLifeCycle.ONLINE.getValue()); if(Asserts.isNotNull(task.getJobInstanceId())&&task.getJobInstanceId()!=0){
return updateById(task); return Result.failed("当前发布状态下有作业正在运行,上线失败,请停止后上线");
}
JobResult jobResult = submitTaskToOnline(id);
if (Job.JobStatus.SUCCESS == jobResult.getStatus()) {
task.setStep(JobLifeCycle.ONLINE.getValue());
task.setJobInstanceId(jobResult.getJobInstanceId());
if (updateById(task)) {
return Result.succeed("上线成功");
} else {
return Result.failed("由于未知原因,上线失败");
}
} else {
return Result.failed("上线失败,原因:" + jobResult.getError());
}
} }
return false; return Result.failed("上线失败,作业不存在。");
} }
@Override @Override
public boolean offLineTask(Integer id) { public Result offLineTask(Integer id, String type) {
Task task = getById(id); Task task = getById(id);
Assert.check(task); Assert.check(task);
if (JobLifeCycle.ONLINE.equalsValue(task.getStep())) { if (Asserts.isNullString(type)) {
type = SavePointType.CANCEL.getValue();
}
if (savepointTask(id, type)) {
if(!JobLifeCycle.ONLINE.equalsValue(task.getStep())){
return Result.succeed("停止成功");
}
task.setStep(JobLifeCycle.RELEASE.getValue()); task.setStep(JobLifeCycle.RELEASE.getValue());
return updateById(task); if (updateById(task)) {
return Result.succeed("下线成功");
} else {
return Result.failed("由于未知原因,下线失败");
}
} else {
return Result.failed("SavePoint失败,下线失败");
} }
return false;
} }
@Override @Override
...@@ -367,7 +410,9 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -367,7 +410,9 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
Asserts.checkNotNull(cluster, "该集群不存在"); Asserts.checkNotNull(cluster, "该集群不存在");
Asserts.checkNotNull(task.getJobInstanceId(), "无任务需要SavePoint"); Asserts.checkNotNull(task.getJobInstanceId(), "无任务需要SavePoint");
JobInstance jobInstance = jobInstanceService.getById(task.getJobInstanceId()); JobInstance jobInstance = jobInstanceService.getById(task.getJobInstanceId());
Asserts.checkNotNull(jobInstance, "任务实例不存在"); if(Asserts.isNull(jobInstance)){
return true;
}
String jobId = jobInstance.getJid(); String jobId = jobInstance.getJid();
boolean useGateway = false; boolean useGateway = false;
JobConfig jobConfig = new JobConfig(); JobConfig jobConfig = new JobConfig();
...@@ -382,10 +427,13 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -382,10 +427,13 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
jobConfig.setTaskId(task.getId()); jobConfig.setTaskId(task.getId());
JobManager jobManager = JobManager.build(jobConfig); JobManager jobManager = JobManager.build(jobConfig);
jobManager.setUseGateway(useGateway); jobManager.setUseGateway(useGateway);
if("canceljob".equals(savePointType)){
return jobManager.cancel(jobId);
}
SavePointResult savePointResult = jobManager.savepoint(jobId, savePointType, null); SavePointResult savePointResult = jobManager.savepoint(jobId, savePointType, null);
if (Asserts.isNotNull(savePointResult)) { if (Asserts.isNotNull(savePointResult)) {
for (JobInfo item : savePointResult.getJobInfos()) { for (JobInfo item : savePointResult.getJobInfos()) {
if (Asserts.isEqualsIgnoreCase(jobId, item.getJobId())&&Asserts.isNotNull(jobConfig.getTaskId())) { if (Asserts.isEqualsIgnoreCase(jobId, item.getJobId()) && Asserts.isNotNull(jobConfig.getTaskId())) {
Savepoints savepoints = new Savepoints(); Savepoints savepoints = new Savepoints();
savepoints.setName(savePointType); savepoints.setName(savePointType);
savepoints.setType(savePointType); savepoints.setType(savePointType);
...@@ -401,7 +449,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -401,7 +449,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Override @Override
public boolean savepointTask(Integer taskId, String savePointType) { public boolean savepointTask(Integer taskId, String savePointType) {
return savepointTask(getById(taskId),savePointType); return savepointTask(getById(taskId), savePointType);
} }
private JobConfig buildJobConfig(Task task) { private JobConfig buildJobConfig(Task task) {
...@@ -466,20 +514,20 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -466,20 +514,20 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
public JobInstance refreshJobInstance(Integer id) { public JobInstance refreshJobInstance(Integer id) {
JobInstance jobInstance = jobInstanceService.getById(id); JobInstance jobInstance = jobInstanceService.getById(id);
Asserts.checkNull(jobInstance, "该任务实例不存在"); Asserts.checkNull(jobInstance, "该任务实例不存在");
if(JobStatus.isDone(jobInstance.getStatus())){ if (JobStatus.isDone(jobInstance.getStatus())) {
return jobInstance; return jobInstance;
} }
Cluster cluster = clusterService.getById(jobInstance.getClusterId()); Cluster cluster = clusterService.getById(jobInstance.getClusterId());
JobHistory jobHistoryJson = jobHistoryService.refreshJobHistory(id, cluster.getJobManagerHost(), jobInstance.getJid()); JobHistory jobHistoryJson = jobHistoryService.refreshJobHistory(id, cluster.getJobManagerHost(), jobInstance.getJid());
JobHistory jobHistory = jobHistoryService.getJobHistoryInfo(jobHistoryJson); JobHistory jobHistory = jobHistoryService.getJobHistoryInfo(jobHistoryJson);
if(Asserts.isNull(jobHistory.getJob())||jobHistory.getJob().has(FlinkRestResultConstant.ERRORS)){ if (Asserts.isNull(jobHistory.getJob()) || jobHistory.getJob().has(FlinkRestResultConstant.ERRORS)) {
jobInstance.setStatus(JobStatus.UNKNOWN.getValue()); jobInstance.setStatus(JobStatus.UNKNOWN.getValue());
}else{ } else {
jobInstance.setDuration(jobHistory.getJob().get(FlinkRestResultConstant.JOB_DURATION).asLong()/1000); jobInstance.setDuration(jobHistory.getJob().get(FlinkRestResultConstant.JOB_DURATION).asLong() / 1000);
jobInstance.setStatus(jobHistory.getJob().get(FlinkRestResultConstant.JOB_STATE).asText()); jobInstance.setStatus(jobHistory.getJob().get(FlinkRestResultConstant.JOB_STATE).asText());
} }
jobInstanceService.updateById(jobInstance); jobInstanceService.updateById(jobInstance);
if(JobStatus.isDone(jobInstance.getStatus())){ if (JobStatus.isDone(jobInstance.getStatus())) {
handleJobDone(jobInstance); handleJobDone(jobInstance);
} }
return jobInstance; return jobInstance;
...@@ -490,18 +538,21 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -490,18 +538,21 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
return jobInstanceService.getJobInfoDetailInfo(refreshJobInstance(id)); return jobInstanceService.getJobInfoDetailInfo(refreshJobInstance(id));
} }
private void handleJobDone(JobInstance jobInstance){ private void handleJobDone(JobInstance jobInstance) {
if(Asserts.isNull(jobInstance.getTaskId())){ if (Asserts.isNull(jobInstance.getTaskId())) {
return;
}
Task task = getTaskInfoById(jobInstance.getTaskId());
Task updateTask = new Task();
updateTask.setId(jobInstance.getTaskId());
updateTask.setJobInstanceId(0);
if (!JobLifeCycle.ONLINE.equalsValue(task.getStep())) {
updateById(updateTask);
return; return;
} }
Task task = new Task(); if (Asserts.isNotNull(task.getAlertGroupId())) {
task.setId(jobInstance.getTaskId());
task.setJobInstanceId(0);
updateById(task);
task = getTaskInfoById(jobInstance.getTaskId());
if(Asserts.isNotNull(task.getAlertGroupId())){
AlertGroup alertGroup = alertGroupService.getAlertGroupInfo(task.getAlertGroupId()); AlertGroup alertGroup = alertGroupService.getAlertGroupInfo(task.getAlertGroupId());
if(Asserts.isNotNull(alertGroup)){ if (Asserts.isNotNull(alertGroup)) {
List<AlertMsg> alertMsgList = new ArrayList<>(); List<AlertMsg> alertMsgList = new ArrayList<>();
AlertMsg alertMsg = new AlertMsg(); AlertMsg alertMsg = new AlertMsg();
alertMsg.setType("Flink 实时监控"); alertMsg.setType("Flink 实时监控");
...@@ -511,17 +562,19 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -511,17 +562,19 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
alertMsg.setStatus(jobInstance.getStatus()); alertMsg.setStatus(jobInstance.getStatus());
alertMsg.setContent(jobInstance.getJid()); alertMsg.setContent(jobInstance.getJid());
alertMsgList.add(alertMsg); alertMsgList.add(alertMsg);
for(AlertInstance alertInstance: alertGroup.getInstances()){ for (AlertInstance alertInstance : alertGroup.getInstances()) {
sendAlert(alertInstance,jobInstance,task,alertMsgList); sendAlert(alertInstance, jobInstance, task, alertMsgList);
} }
} }
} }
updateTask.setStep(JobLifeCycle.RELEASE.getValue());
updateById(updateTask);
} }
private void sendAlert(AlertInstance alertInstance,JobInstance jobInstance,Task task,List<AlertMsg> alertMsgList){ private void sendAlert(AlertInstance alertInstance, JobInstance jobInstance, Task task, List<AlertMsg> alertMsgList) {
AlertConfig alertConfig = AlertConfig.build(alertInstance.getName(), alertInstance.getType(), JSONUtil.toMap(alertInstance.getParams())); AlertConfig alertConfig = AlertConfig.build(alertInstance.getName(), alertInstance.getType(), JSONUtil.toMap(alertInstance.getParams()));
Alert alert = Alert.build(alertConfig); Alert alert = Alert.build(alertConfig);
String title = "任务【"+task.getAlias()+"】:"+jobInstance.getStatus(); String title = "任务【" + task.getAlias() + "】:" + jobInstance.getStatus();
String content = JSONUtil.toJsonString(alertMsgList); String content = JSONUtil.toJsonString(alertMsgList);
AlertResult alertResult = alert.send(title, content); AlertResult alertResult = alert.send(title, content);
AlertHistory alertHistory = new AlertHistory(); AlertHistory alertHistory = new AlertHistory();
......
...@@ -18,6 +18,9 @@ ...@@ -18,6 +18,9 @@
<if test='param.type!=null and param.type!=""'> <if test='param.type!=null and param.type!=""'>
and dh.type = #{param.type} and dh.type = #{param.type}
</if> </if>
<if test='param.step!=null and param.step!=""'>
and a.step = #{param.step}
</if>
<if test='param.name!=null and param.name!=""'> <if test='param.name!=null and param.name!=""'>
and a.name like "%${param.name}%" and a.name like "%${param.name}%"
</if> </if>
......
...@@ -20,6 +20,7 @@ import java.util.List; ...@@ -20,6 +20,7 @@ import java.util.List;
@Setter @Setter
public class Job { public class Job {
private Integer id; private Integer id;
private Integer jobInstanceId;
private JobConfig jobConfig; private JobConfig jobConfig;
private String jobManagerAddress; private String jobManagerAddress;
private JobStatus status; private JobStatus status;
...@@ -59,6 +60,6 @@ public class Job { ...@@ -59,6 +60,6 @@ public class Job {
} }
public JobResult getJobResult() { public JobResult getJobResult() {
return new JobResult(id, jobConfig, jobManagerAddress, status, statement, jobId, error, result, startTime, endTime); return new JobResult(id, jobInstanceId, jobConfig, jobManagerAddress, status, statement, jobId, error, result, startTime, endTime);
} }
} }
...@@ -27,6 +27,8 @@ public class JobConfig { ...@@ -27,6 +27,8 @@ public class JobConfig {
// flink run mode // flink run mode
private String type; private String type;
// task JobLifeCycle
private Integer step;
private boolean useResult; private boolean useResult;
private boolean useChangeLog; private boolean useChangeLog;
private boolean useAutoCancel; private boolean useAutoCancel;
...@@ -126,11 +128,12 @@ public class JobConfig { ...@@ -126,11 +128,12 @@ public class JobConfig {
this.maxRowNum = maxRowNum; this.maxRowNum = maxRowNum;
} }
public JobConfig(String type,boolean useResult, boolean useSession, boolean useRemote, Integer clusterId, public JobConfig(String type, Integer step,boolean useResult, boolean useSession, boolean useRemote, Integer clusterId,
Integer clusterConfigurationId, Integer jarId, Integer taskId, String jobName, boolean useSqlFragment, Integer clusterConfigurationId, Integer jarId, Integer taskId, String jobName, boolean useSqlFragment,
boolean useStatementSet,boolean useBatchModel,Integer checkpoint, Integer parallelism, Integer savePointStrategyValue, boolean useStatementSet,boolean useBatchModel,Integer checkpoint, Integer parallelism, Integer savePointStrategyValue,
String savePointPath,Map<String,String> config) { String savePointPath,Map<String,String> config) {
this.type = type; this.type = type;
this.step = step;
this.useResult = useResult; this.useResult = useResult;
this.useSession = useSession; this.useSession = useSession;
this.useRemote = useRemote; this.useRemote = useRemote;
......
...@@ -24,6 +24,7 @@ public class JobResult { ...@@ -24,6 +24,7 @@ public class JobResult {
private boolean success; private boolean success;
private String statement; private String statement;
private String jobId; private String jobId;
private Integer jobInstanceId;
private String error; private String error;
private IResult result; private IResult result;
private LocalDateTime startTime; private LocalDateTime startTime;
...@@ -32,8 +33,9 @@ public class JobResult { ...@@ -32,8 +33,9 @@ public class JobResult {
public JobResult() { public JobResult() {
} }
public JobResult(Integer id, JobConfig jobConfig, String jobManagerAddress, Job.JobStatus status, String statement, String jobId, String error, IResult result, LocalDateTime startTime, LocalDateTime endTime) { public JobResult(Integer id, Integer jobInstanceId, JobConfig jobConfig, String jobManagerAddress, Job.JobStatus status, String statement, String jobId, String error, IResult result, LocalDateTime startTime, LocalDateTime endTime) {
this.id = id; this.id = id;
this.jobInstanceId = jobInstanceId;
this.jobConfig = jobConfig; this.jobConfig = jobConfig;
this.jobManagerAddress = jobManagerAddress; this.jobManagerAddress = jobManagerAddress;
this.status = status; this.status = status;
......
...@@ -313,6 +313,7 @@ create table dlink_job_instance ...@@ -313,6 +313,7 @@ create table dlink_job_instance
primary key, primary key,
name varchar(255) null comment '作业实例名', name varchar(255) null comment '作业实例名',
task_id int null comment 'taskID', task_id int null comment 'taskID',
step int null comment '生命周期',
cluster_id int null comment '集群ID', cluster_id int null comment '集群ID',
jid varchar(50) null comment 'FlinkJobId', jid varchar(50) null comment 'FlinkJobId',
status varchar(50) null comment '实例状态', status varchar(50) null comment '实例状态',
......
...@@ -629,5 +629,10 @@ ALTER TABLE `dlink_task` ...@@ -629,5 +629,10 @@ ALTER TABLE `dlink_task`
ADD COLUMN `job_instance_id` BIGINT NULL COMMENT '任务实例ID' AFTER `step`; ADD COLUMN `job_instance_id` BIGINT NULL COMMENT '任务实例ID' AFTER `step`;
ALTER TABLE `dlink_task` ALTER TABLE `dlink_task`
ADD COLUMN `alert_group_id` BIGINT NULL COMMENT '报警组ID' AFTER `env_id`; ADD COLUMN `alert_group_id` BIGINT NULL COMMENT '报警组ID' AFTER `env_id`;
-- ----------------------------
-- 0.6.0-SNAPSHOT 2022-03-13
-- ----------------------------
ALTER TABLE `dlink_job_instance`
ADD COLUMN `step` INT NULL COMMENT '生命周期' AFTER `task_id`;
SET FOREIGN_KEY_CHECKS = 1; SET FOREIGN_KEY_CHECKS = 1;
...@@ -237,8 +237,8 @@ export function onLineTask(id: number) { ...@@ -237,8 +237,8 @@ export function onLineTask(id: number) {
return getData('api/task/onLineTask',{id}); return getData('api/task/onLineTask',{id});
} }
/*--- 下线作业 ---*/ /*--- 下线作业 ---*/
export function offLineTask(id: number) { export function offLineTask(id: number, type: string) {
return getData('api/task/offLineTask',{id}); return getData('api/task/offLineTask',{id, type});
} }
/*--- 注销作业 ---*/ /*--- 注销作业 ---*/
export function cancelTask(id: number) { export function cancelTask(id: number) {
......
...@@ -110,6 +110,7 @@ const StudioMenu = (props: any) => { ...@@ -110,6 +110,7 @@ const StudioMenu = (props: any) => {
result.then(res => { result.then(res => {
notification.close(taskKey); notification.close(taskKey);
if (res.datas.success) { if (res.datas.success) {
props.changeTaskJobInstance(current.task.id,res.datas.jobInstanceId);
message.success('执行成功'); message.success('执行成功');
} else { } else {
message.error('执行失败'); message.error('执行失败');
...@@ -154,6 +155,7 @@ const StudioMenu = (props: any) => { ...@@ -154,6 +155,7 @@ const StudioMenu = (props: any) => {
const res = await postDataArray('/api/task/submit', [task.id]); const res = await postDataArray('/api/task/submit', [task.id]);
notification.close(taskKey); notification.close(taskKey);
if (res.datas[0].success) { if (res.datas[0].success) {
props.changeTaskJobInstance(current.task.id,res.datas[0].jobInstanceId);
message.success('异步提交成功'); message.success('异步提交成功');
} else { } else {
message.success('异步提交失败'); message.success('异步提交失败');
...@@ -315,27 +317,51 @@ const StudioMenu = (props: any) => { ...@@ -315,27 +317,51 @@ const StudioMenu = (props: any) => {
onOk: async () => { onOk: async () => {
const res = onLineTask(current.task.id); const res = onLineTask(current.task.id);
res.then((result) => { res.then((result) => {
result.datas && props.changeTaskStep(current.task.id,TASKSTEPS.ONLINE);
if(result.code == CODE.SUCCESS) { if(result.code == CODE.SUCCESS) {
props.changeTaskStep(current.task.id,TASKSTEPS.ONLINE);
message.success(`上线作业【${current.task.alias}】成功`); message.success(`上线作业【${current.task.alias}】成功`);
}else {
message.error(`上线作业【${current.task.alias}】失败,原因:\n${result.msg}`);
} }
}); });
} }
}); });
}; };
const toOffLineTask = () => { const handleCancelTask = (type: string) => {
Modal.confirm({
title: '停止作业',
content: `确定停止作业【${current.task.alias}】吗?`,
okText: '确认',
cancelText: '取消',
onOk: async () => {
const res = offLineTask(current.task.id,type);
res.then((result) => {
if(result.code == CODE.SUCCESS) {
props.changeTaskJobInstance(current.task.id,0);
message.success(`停止作业【${current.task.alias}】成功`);
}else {
message.error(`停止作业【${current.task.alias}】失败,原因:\n${result.msg}`);
}
});
}
});
};
const toOffLineTask = (type: string) => {
Modal.confirm({ Modal.confirm({
title: '下线作业', title: '下线作业',
content: `确定下线作业【${current.task.alias}】吗?`, content: `确定下线作业【${current.task.alias}】吗?`,
okText: '确认', okText: '确认',
cancelText: '取消', cancelText: '取消',
onOk: async () => { onOk: async () => {
const res = offLineTask(current.task.id); const res = offLineTask(current.task.id,type);
res.then((result) => { res.then((result) => {
result.datas && props.changeTaskStep(current.task.id,TASKSTEPS.RELEASE);
if(result.code == CODE.SUCCESS) { if(result.code == CODE.SUCCESS) {
props.changeTaskStep(current.task.id,TASKSTEPS.RELEASE);
message.success(`下线作业【${current.task.alias}】成功`); message.success(`下线作业【${current.task.alias}】成功`);
}else {
message.error(`下线作业【${current.task.alias}】失败,原因:\n${result.msg}`);
} }
}); });
} }
...@@ -505,24 +531,34 @@ const StudioMenu = (props: any) => { ...@@ -505,24 +531,34 @@ const StudioMenu = (props: any) => {
onClick={onGetStreamGraph} onClick={onGetStreamGraph}
/> />
</Tooltip>)} </Tooltip>)}
{(!current.task.dialect||current.task.dialect === DIALECT.FLINKSQL||isSql( current.task.dialect )) &&( { current.task.jobInstanceId&&(current.task.jobInstanceId != 0) ?
<Tooltip title="执行当前的 SQL"> <Tooltip title="停止">
<Button
type="text"
icon={<PlayCircleTwoTone/>}
//loading={loadings[2]}
onClick={execute}
/>
</Tooltip>)}
{(!current.task.dialect||current.task.dialect === DIALECT.FLINKSQL||current.task.dialect === DIALECT.FLINKJAR||isSql( current.task.dialect )) &&(<>
<Tooltip title="提交当前的作业到集群,提交前请手动保存">
<Button <Button
type="text" type="text"
icon={<RocketTwoTone/>} icon={<PauseCircleTwoTone />}
onClick={submit} onClick={()=>handleCancelTask('canceljob')}
/> />
</Tooltip> </Tooltip>:<>
{(!current.task.dialect||current.task.dialect === DIALECT.FLINKSQL||isSql( current.task.dialect )) &&(
<Tooltip title="执行当前的 SQL">
<Button
type="text"
icon={<PlayCircleTwoTone/>}
//loading={loadings[2]}
onClick={execute}
/>
</Tooltip>)}
{(!current.task.dialect||current.task.dialect === DIALECT.FLINKSQL||current.task.dialect === DIALECT.FLINKJAR||isSql( current.task.dialect )) &&(<>
<Tooltip title="提交当前的作业到集群,提交前请手动保存">
<Button
type="text"
icon={<RocketTwoTone/>}
onClick={submit}
/>
</Tooltip>
</>)} </>)}
</>
}
<Divider type="vertical"/> <Divider type="vertical"/>
{current.task.step == TASKSTEPS.DEVELOP ? {current.task.step == TASKSTEPS.DEVELOP ?
<Tooltip title="发布,发布后将无法修改"> <Tooltip title="发布,发布后将无法修改">
...@@ -551,7 +587,7 @@ const StudioMenu = (props: any) => { ...@@ -551,7 +587,7 @@ const StudioMenu = (props: any) => {
<Button <Button
type="text" type="text"
icon={<PauseCircleTwoTone />} icon={<PauseCircleTwoTone />}
onClick={toOffLineTask} onClick={()=>toOffLineTask('cancel')}
/> />
</Tooltip>:undefined </Tooltip>:undefined
}{(current.task.step != TASKSTEPS.ONLINE && current.task.step != TASKSTEPS.CANCEL) ? }{(current.task.step != TASKSTEPS.ONLINE && current.task.step != TASKSTEPS.CANCEL) ?
...@@ -651,6 +687,11 @@ const mapDispatchToProps = (dispatch: Dispatch)=>({ ...@@ -651,6 +687,11 @@ const mapDispatchToProps = (dispatch: Dispatch)=>({
payload: { payload: {
id,step id,step
}, },
}),changeTaskJobInstance:(id: number, jobInstanceId: number)=>dispatch({
type: "Studio/changeTaskStep",
payload: {
id,jobInstanceId
},
}), }),
}); });
......
...@@ -13,7 +13,8 @@ import moment from "moment"; ...@@ -13,7 +13,8 @@ import moment from "moment";
import BaseInfo from "@/pages/DevOps/JobInfo/BaseInfo"; import BaseInfo from "@/pages/DevOps/JobInfo/BaseInfo";
import Config from "@/pages/DevOps/JobInfo/Config"; import Config from "@/pages/DevOps/JobInfo/Config";
import JobStatus, {isStatusDone} from "@/components/Common/JobStatus"; import JobStatus, {isStatusDone} from "@/components/Common/JobStatus";
import {cancelJob, restartJob, savepointJob} from "@/components/Studio/StudioEvent/DDL"; import {cancelJob, offLineTask, restartJob} from "@/components/Studio/StudioEvent/DDL";
import {CODE} from "@/components/Common/crud";
const {Link} = Typography; const {Link} = Typography;
...@@ -65,7 +66,7 @@ const JobInfo = (props: any) => { ...@@ -65,7 +66,7 @@ const JobInfo = (props: any) => {
if (!job?.cluster?.id) return; if (!job?.cluster?.id) return;
const res = cancelJob(job?.cluster?.id, job?.instance?.jid); const res = cancelJob(job?.cluster?.id, job?.instance?.jid);
res.then((result) => { res.then((result) => {
if (result.datas == true) { if (result.code == CODE.SUCCESS) {
message.success(key+"成功"); message.success(key+"成功");
handleGetJobInfoDetail(); handleGetJobInfoDetail();
} else { } else {
...@@ -83,9 +84,9 @@ const JobInfo = (props: any) => { ...@@ -83,9 +84,9 @@ const JobInfo = (props: any) => {
cancelText: '取消', cancelText: '取消',
onOk: async () => { onOk: async () => {
if (!job?.cluster?.id) return; if (!job?.cluster?.id) return;
const res = savepointJob(job?.cluster?.id, job?.instance?.jid,key,key,job?.instance?.taskId); const res = offLineTask(job?.instance?.taskId,key);
res.then((result) => { res.then((result) => {
if (result.datas == true) { if (result.code == CODE.SUCCESS) {
message.success(key+"成功"); message.success(key+"成功");
handleGetJobInfoDetail(); handleGetJobInfoDetail();
} else { } else {
...@@ -106,7 +107,7 @@ const JobInfo = (props: any) => { ...@@ -106,7 +107,7 @@ const JobInfo = (props: any) => {
if (!job?.cluster?.id) return; if (!job?.cluster?.id) return;
const res = restartJob(job?.instance?.taskId); const res = restartJob(job?.instance?.taskId);
res.then((result) => { res.then((result) => {
if (result.datas.success == true) { if (result.code == CODE.SUCCESS) {
message.success("重新上线成功"); message.success("重新上线成功");
} else { } else {
message.error("重新上线失败"); message.error("重新上线失败");
...@@ -127,9 +128,9 @@ const JobInfo = (props: any) => { ...@@ -127,9 +128,9 @@ const JobInfo = (props: any) => {
FlinkWebUI FlinkWebUI
</Link></Button>); </Link></Button>);
} }
buttons.push(<Button key="autorestart" type="primary" onClick={handleRestart}>重新上线</Button>); buttons.push(<Button key="autorestart" type="primary" onClick={handleRestart}>重新{job?.instance?.step == 5?'上线':'启动'}</Button>);
if(!isStatusDone(job?.instance?.status as string)){ if(!isStatusDone(job?.instance?.status as string)){
buttons.push(<Button key="autostop" type="primary" danger onClick={()=>{handleSavepoint('cancel')}}>下线</Button>); buttons.push(<Button key="autostop" type="primary" danger onClick={()=>{handleSavepoint('cancel')}}>{job?.instance?.step == 5?'下线':'智能停止'}</Button>);
buttons.push(<Dropdown buttons.push(<Dropdown
key="dropdown" key="dropdown"
trigger={['click']} trigger={['click']}
......
...@@ -10,7 +10,7 @@ import type { ProColumns } from '@ant-design/pro-table'; ...@@ -10,7 +10,7 @@ import type { ProColumns } from '@ant-design/pro-table';
import ProTable from "@ant-design/pro-table"; import ProTable from "@ant-design/pro-table";
import {JobInstanceTableListItem} from "@/pages/DevOps/data"; import {JobInstanceTableListItem} from "@/pages/DevOps/data";
import moment from 'moment'; import moment from 'moment';
import {RUN_MODE} from "@/components/Studio/conf"; import {RUN_MODE, TASKSTEPS} from "@/components/Studio/conf";
import JobStatus from "@/components/Common/JobStatus"; import JobStatus from "@/components/Common/JobStatus";
const url = '/api/jobInstance'; const url = '/api/jobInstance';
...@@ -24,6 +24,34 @@ const JobInstanceTable = (props: any) => { ...@@ -24,6 +24,34 @@ const JobInstanceTable = (props: any) => {
title: "作业名", title: "作业名",
dataIndex: "name", dataIndex: "name",
sorter: true, sorter: true,
},{
title: "生命周期",
dataIndex: "step",
sorter: true,
valueType: 'radio',
valueEnum: {
'': {text: '全部', status: 'ALL'},
1: {
text: '已创建',
status: TASKSTEPS.CREATE,
},
2: {
text: '开发中',
status: TASKSTEPS.DEVELOP,
},
4: {
text: '已发布',
status: TASKSTEPS.RELEASE,
},
5: {
text: '已上线',
status: TASKSTEPS.ONLINE,
},
0: {
text: '未知',
status: TASKSTEPS.UNKNOWN,
},
},
},{ },{
title: "运行模式", title: "运行模式",
dataIndex: "type", dataIndex: "type",
......
...@@ -7,6 +7,7 @@ export type JobInstanceTableListItem = { ...@@ -7,6 +7,7 @@ export type JobInstanceTableListItem = {
id: number, id: number,
name: string, name: string,
taskId: number, taskId: number,
step: number,
clusterId: number, clusterId: number,
clusterAlias: string, clusterAlias: string,
type: string, type: string,
......
...@@ -4,10 +4,10 @@ export function getStatusCount() { ...@@ -4,10 +4,10 @@ export function getStatusCount() {
return getData("api/jobInstance/getStatusCount"); return getData("api/jobInstance/getStatusCount");
} }
export function getJobInfoDetail(id:number) { export function getJobInfoDetail(id: number) {
return getData("api/jobInstance/getJobInfoDetail",{id}); return getData("api/jobInstance/getJobInfoDetail",{id});
} }
export function refreshJobInfoDetail(id:number) { export function refreshJobInfoDetail(id: number) {
return getData("api/jobInstance/refreshJobInfoDetail",{id}); return getData("api/jobInstance/refreshJobInfoDetail",{id});
} }
...@@ -80,6 +80,7 @@ export type TaskType = { ...@@ -80,6 +80,7 @@ export type TaskType = {
databaseName?: string, databaseName?: string,
jarId?: number, jarId?: number,
envId?: number, envId?: number,
jobInstanceId?: number,
note?: string, note?: string,
enabled?: boolean, enabled?: boolean,
createTime?: Date, createTime?: Date,
...@@ -189,6 +190,7 @@ export type ModelType = { ...@@ -189,6 +190,7 @@ export type ModelType = {
saveEnv: Reducer<StateType>; saveEnv: Reducer<StateType>;
saveChart: Reducer<StateType>; saveChart: Reducer<StateType>;
changeTaskStep: Reducer<StateType>; changeTaskStep: Reducer<StateType>;
changeTaskJobInstance: Reducer<StateType>;
}; };
}; };
...@@ -514,6 +516,23 @@ const Model: ModelType = { ...@@ -514,6 +516,23 @@ const Model: ModelType = {
tabs: {...newTabs}, tabs: {...newTabs},
}; };
}, },
changeTaskJobInstance(state, {payload}) {
const newTabs = state.tabs;
let newCurrent = state.current;
for (let i = 0; i < newTabs.panes.length; i++) {
if (newTabs.panes[i].task.id == payload.id) {
newTabs.panes[i].task.jobInstanceId = payload.jobInstanceId;
if(newCurrent.key == newTabs.panes[i].key){
newCurrent = newTabs.panes[i];
}
}
}
return {
...state,
current: {...newCurrent},
tabs: {...newTabs},
};
},
}, },
}; };
......
...@@ -737,6 +737,9 @@ export default (): React.ReactNode => { ...@@ -737,6 +737,9 @@ export default (): React.ReactNode => {
<li> <li>
<Link>优化 IDEA调试时的依赖配置</Link> <Link>优化 IDEA调试时的依赖配置</Link>
</li> </li>
<li>
<Link>新增 作业上下线自动提交和停止任务</Link>
</li>
</ul> </ul>
</Paragraph> </Paragraph>
</Timeline.Item> </Timeline.Item>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment