Commit a8367874 authored by wenmo's avatar wenmo

[Fix-603] [admin] Task information will be cleared when refreshing tasks that have lost connection

parent 858fbd45
package com.dlink.service.impl;
import com.dlink.constant.FlinkRestResultConstant;
import org.springframework.stereotype.Service;
import com.dlink.api.FlinkAPI;
......@@ -70,6 +71,9 @@ public class JobHistoryServiceImpl extends SuperServiceImpl<JobHistoryMapper, Jo
jobHistory.setId(id);
try {
JsonNode jobInfo = FlinkAPI.build(jobManagerHost).getJobInfo(jobId);
if(jobInfo.has(FlinkRestResultConstant.ERRORS)){
return jobHistory;
}
JsonNode exception = FlinkAPI.build(jobManagerHost).getException(jobId);
JsonNode checkPoints = FlinkAPI.build(jobManagerHost).getCheckPoints(jobId);
JsonNode checkPointsConfig = FlinkAPI.build(jobManagerHost).getCheckPointsConfig(jobId);
......@@ -87,6 +91,7 @@ public class JobHistoryServiceImpl extends SuperServiceImpl<JobHistoryMapper, Jo
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
return jobHistory;
}
......
......@@ -604,6 +604,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
jobInfoDetail.setClusterConfiguration(clusterConfigurationService.getClusterConfigById(history.getClusterConfigurationId()));
}
jobInfoDetail.setHistory(history);
jobInfoDetail.setJobHistory(jobHistoryService.getJobHistory(id));
pool.push(key, jobInfoDetail);
}
if (!isCoercive && !inRefreshPlan(jobInfoDetail.getInstance())) {
......@@ -611,10 +612,13 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
}
JobHistory jobHistoryJson = jobHistoryService.refreshJobHistory(id, jobInfoDetail.getCluster().getJobManagerHost(), jobInfoDetail.getInstance().getJid(), jobInfoDetail.isNeedSave());
JobHistory jobHistory = jobHistoryService.getJobHistoryInfo(jobHistoryJson);
if (JobStatus.isDone(jobInfoDetail.getInstance().getStatus()) && Asserts.isNull(jobHistory.getJob())) {
return jobInfoDetail.getInstance();
}
jobInfoDetail.setJobHistory(jobHistory);
String status = jobInfoDetail.getInstance().getStatus();
boolean jobStatusChanged = false;
if (Asserts.isNull(jobInfoDetail.getJobHistory().getJob()) || jobInfoDetail.getJobHistory().getJob().has(FlinkRestResultConstant.ERRORS)) {
if (Asserts.isNull(jobInfoDetail.getJobHistory().getJob()) || Asserts.isNull(jobInfoDetail.getJobHistory().getJob())) {
jobInfoDetail.getInstance().setStatus(JobStatus.UNKNOWN.getValue());
} else {
jobInfoDetail.getInstance().setDuration(jobInfoDetail.getJobHistory().getJob().get(FlinkRestResultConstant.JOB_DURATION).asLong() / 1000);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment