Unverified Commit 03de2a8a authored by aiwenmo's avatar aiwenmo Committed by GitHub

[Fix-603] [admin] Task information will be cleared when refreshing tasks that have lost connection

[Fix-603] [admin] Task information will be cleared when refreshing ta…
parents 858fbd45 a8367874
package com.dlink.service.impl; package com.dlink.service.impl;
import com.dlink.constant.FlinkRestResultConstant;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.dlink.api.FlinkAPI; import com.dlink.api.FlinkAPI;
...@@ -70,6 +71,9 @@ public class JobHistoryServiceImpl extends SuperServiceImpl<JobHistoryMapper, Jo ...@@ -70,6 +71,9 @@ public class JobHistoryServiceImpl extends SuperServiceImpl<JobHistoryMapper, Jo
jobHistory.setId(id); jobHistory.setId(id);
try { try {
JsonNode jobInfo = FlinkAPI.build(jobManagerHost).getJobInfo(jobId); JsonNode jobInfo = FlinkAPI.build(jobManagerHost).getJobInfo(jobId);
if(jobInfo.has(FlinkRestResultConstant.ERRORS)){
return jobHistory;
}
JsonNode exception = FlinkAPI.build(jobManagerHost).getException(jobId); JsonNode exception = FlinkAPI.build(jobManagerHost).getException(jobId);
JsonNode checkPoints = FlinkAPI.build(jobManagerHost).getCheckPoints(jobId); JsonNode checkPoints = FlinkAPI.build(jobManagerHost).getCheckPoints(jobId);
JsonNode checkPointsConfig = FlinkAPI.build(jobManagerHost).getCheckPointsConfig(jobId); JsonNode checkPointsConfig = FlinkAPI.build(jobManagerHost).getCheckPointsConfig(jobId);
...@@ -87,6 +91,7 @@ public class JobHistoryServiceImpl extends SuperServiceImpl<JobHistoryMapper, Jo ...@@ -87,6 +91,7 @@ public class JobHistoryServiceImpl extends SuperServiceImpl<JobHistoryMapper, Jo
} }
} }
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace();
} finally { } finally {
return jobHistory; return jobHistory;
} }
......
...@@ -604,6 +604,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -604,6 +604,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
jobInfoDetail.setClusterConfiguration(clusterConfigurationService.getClusterConfigById(history.getClusterConfigurationId())); jobInfoDetail.setClusterConfiguration(clusterConfigurationService.getClusterConfigById(history.getClusterConfigurationId()));
} }
jobInfoDetail.setHistory(history); jobInfoDetail.setHistory(history);
jobInfoDetail.setJobHistory(jobHistoryService.getJobHistory(id));
pool.push(key, jobInfoDetail); pool.push(key, jobInfoDetail);
} }
if (!isCoercive && !inRefreshPlan(jobInfoDetail.getInstance())) { if (!isCoercive && !inRefreshPlan(jobInfoDetail.getInstance())) {
...@@ -611,10 +612,13 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -611,10 +612,13 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
} }
JobHistory jobHistoryJson = jobHistoryService.refreshJobHistory(id, jobInfoDetail.getCluster().getJobManagerHost(), jobInfoDetail.getInstance().getJid(), jobInfoDetail.isNeedSave()); JobHistory jobHistoryJson = jobHistoryService.refreshJobHistory(id, jobInfoDetail.getCluster().getJobManagerHost(), jobInfoDetail.getInstance().getJid(), jobInfoDetail.isNeedSave());
JobHistory jobHistory = jobHistoryService.getJobHistoryInfo(jobHistoryJson); JobHistory jobHistory = jobHistoryService.getJobHistoryInfo(jobHistoryJson);
if (JobStatus.isDone(jobInfoDetail.getInstance().getStatus()) && Asserts.isNull(jobHistory.getJob())) {
return jobInfoDetail.getInstance();
}
jobInfoDetail.setJobHistory(jobHistory); jobInfoDetail.setJobHistory(jobHistory);
String status = jobInfoDetail.getInstance().getStatus(); String status = jobInfoDetail.getInstance().getStatus();
boolean jobStatusChanged = false; boolean jobStatusChanged = false;
if (Asserts.isNull(jobInfoDetail.getJobHistory().getJob()) || jobInfoDetail.getJobHistory().getJob().has(FlinkRestResultConstant.ERRORS)) { if (Asserts.isNull(jobInfoDetail.getJobHistory().getJob()) || Asserts.isNull(jobInfoDetail.getJobHistory().getJob())) {
jobInfoDetail.getInstance().setStatus(JobStatus.UNKNOWN.getValue()); jobInfoDetail.getInstance().setStatus(JobStatus.UNKNOWN.getValue());
} else { } else {
jobInfoDetail.getInstance().setDuration(jobInfoDetail.getJobHistory().getJob().get(FlinkRestResultConstant.JOB_DURATION).asLong() / 1000); jobInfoDetail.getInstance().setDuration(jobInfoDetail.getJobHistory().getJob().get(FlinkRestResultConstant.JOB_DURATION).asLong() / 1000);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment