Unverified Commit bd3fdd81 authored by mydq's avatar mydq Committed by GitHub

Fix jobhistory bug (#805)

* jobHistory bug modify

* jobHistory bug modify

* jobHistory bug modify
parent ed6e8518
...@@ -87,4 +87,7 @@ public class JobHistory implements Serializable { ...@@ -87,4 +87,7 @@ public class JobHistory implements Serializable {
@TableField(fill = FieldFill.INSERT_UPDATE) @TableField(fill = FieldFill.INSERT_UPDATE)
private LocalDateTime updateTime; private LocalDateTime updateTime;
@TableField(exist = false)
private boolean error;
} }
...@@ -95,7 +95,11 @@ public class JobHistoryServiceImpl extends SuperServiceImpl<JobHistoryMapper, Jo ...@@ -95,7 +95,11 @@ public class JobHistoryServiceImpl extends SuperServiceImpl<JobHistoryMapper, Jo
JsonNode jobInfo = FlinkAPI.build(jobManagerHost).getJobInfo(jobId); JsonNode jobInfo = FlinkAPI.build(jobManagerHost).getJobInfo(jobId);
if(jobInfo.has(FlinkRestResultConstant.ERRORS)){ if(jobInfo.has(FlinkRestResultConstant.ERRORS)){
final JobHistory dbHistory = getById(id); final JobHistory dbHistory = getById(id);
return Objects.isNull(dbHistory) ? jobHistory : dbHistory; if (Objects.nonNull(dbHistory)) {
jobHistory = dbHistory;
}
jobHistory.setError(true);
return jobHistory;
} }
JsonNode exception = FlinkAPI.build(jobManagerHost).getException(jobId); JsonNode exception = FlinkAPI.build(jobManagerHost).getException(jobId);
JsonNode checkPoints = FlinkAPI.build(jobManagerHost).getCheckPoints(jobId); JsonNode checkPoints = FlinkAPI.build(jobManagerHost).getCheckPoints(jobId);
...@@ -115,8 +119,7 @@ public class JobHistoryServiceImpl extends SuperServiceImpl<JobHistoryMapper, Jo ...@@ -115,8 +119,7 @@ public class JobHistoryServiceImpl extends SuperServiceImpl<JobHistoryMapper, Jo
} }
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} finally {
return jobHistory;
} }
return jobHistory;
} }
} }
...@@ -785,13 +785,13 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -785,13 +785,13 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
} }
JobHistory jobHistoryJson = jobHistoryService.refreshJobHistory(id, jobInfoDetail.getCluster().getJobManagerHost(), jobInfoDetail.getInstance().getJid(), jobInfoDetail.isNeedSave()); JobHistory jobHistoryJson = jobHistoryService.refreshJobHistory(id, jobInfoDetail.getCluster().getJobManagerHost(), jobInfoDetail.getInstance().getJid(), jobInfoDetail.isNeedSave());
JobHistory jobHistory = jobHistoryService.getJobHistoryInfo(jobHistoryJson); JobHistory jobHistory = jobHistoryService.getJobHistoryInfo(jobHistoryJson);
if (JobStatus.isDone(jobInfoDetail.getInstance().getStatus()) && Asserts.isNull(jobHistory.getJob())) { jobInfoDetail.setJobHistory(jobHistory);
if (JobStatus.isDone(jobInfoDetail.getInstance().getStatus()) && (Asserts.isNull(jobHistory.getJob()) || jobHistory.isError())) {
return jobInfoDetail.getInstance(); return jobInfoDetail.getInstance();
} }
jobInfoDetail.setJobHistory(jobHistory);
String status = jobInfoDetail.getInstance().getStatus(); String status = jobInfoDetail.getInstance().getStatus();
boolean jobStatusChanged = false; boolean jobStatusChanged = false;
if (Asserts.isNull(jobInfoDetail.getJobHistory().getJob())) { if (Asserts.isNull(jobInfoDetail.getJobHistory().getJob()) || jobInfoDetail.getJobHistory().isError()) {
jobInfoDetail.getInstance().setStatus(JobStatus.UNKNOWN.getValue()); jobInfoDetail.getInstance().setStatus(JobStatus.UNKNOWN.getValue());
} else { } else {
jobInfoDetail.getInstance().setDuration(jobInfoDetail.getJobHistory().getJob().get(FlinkRestResultConstant.JOB_DURATION).asLong() / 1000); jobInfoDetail.getInstance().setDuration(jobInfoDetail.getJobHistory().getJob().get(FlinkRestResultConstant.JOB_DURATION).asLong() / 1000);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment