Commit 0b27a790 authored by zhu-mingye's avatar zhu-mingye

op alert

parent 96720581
package com.dlink.service.impl; package com.dlink.service.impl;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import com.dlink.alert.*;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.dlink.alert.*;
import com.dlink.assertion.Assert; import com.dlink.assertion.Assert;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.assertion.Tips; import com.dlink.assertion.Tips;
...@@ -37,46 +19,31 @@ import com.dlink.gateway.config.SavePointStrategy; ...@@ -37,46 +19,31 @@ import com.dlink.gateway.config.SavePointStrategy;
import com.dlink.gateway.config.SavePointType; import com.dlink.gateway.config.SavePointType;
import com.dlink.gateway.model.JobInfo; import com.dlink.gateway.model.JobInfo;
import com.dlink.gateway.result.SavePointResult; import com.dlink.gateway.result.SavePointResult;
import com.dlink.job.FlinkJobTask; import com.dlink.job.*;
import com.dlink.job.FlinkJobTaskPool;
import com.dlink.job.Job;
import com.dlink.job.JobConfig;
import com.dlink.job.JobManager;
import com.dlink.job.JobResult;
import com.dlink.mapper.TaskMapper; import com.dlink.mapper.TaskMapper;
import com.dlink.metadata.driver.Driver; import com.dlink.metadata.driver.Driver;
import com.dlink.metadata.result.JdbcSelectResult; import com.dlink.metadata.result.JdbcSelectResult;
import com.dlink.model.AlertGroup; import com.dlink.model.*;
import com.dlink.model.AlertHistory;
import com.dlink.model.AlertInstance;
import com.dlink.model.Cluster;
import com.dlink.model.DataBase;
import com.dlink.model.History;
import com.dlink.model.Jar;
import com.dlink.model.JobHistory;
import com.dlink.model.JobInfoDetail;
import com.dlink.model.JobInstance;
import com.dlink.model.JobLifeCycle;
import com.dlink.model.JobStatus;
import com.dlink.model.Savepoints;
import com.dlink.model.Statement;
import com.dlink.model.SystemConfiguration;
import com.dlink.model.Task;
import com.dlink.result.SqlExplainResult; import com.dlink.result.SqlExplainResult;
import com.dlink.service.AlertGroupService; import com.dlink.service.*;
import com.dlink.service.AlertHistoryService;
import com.dlink.service.ClusterConfigurationService;
import com.dlink.service.ClusterService;
import com.dlink.service.DataBaseService;
import com.dlink.service.HistoryService;
import com.dlink.service.JarService;
import com.dlink.service.JobHistoryService;
import com.dlink.service.JobInstanceService;
import com.dlink.service.SavepointsService;
import com.dlink.service.StatementService;
import com.dlink.service.TaskService;
import com.dlink.utils.CustomStringJavaCompiler; import com.dlink.utils.CustomStringJavaCompiler;
import com.dlink.utils.JSONUtil; import com.dlink.utils.JSONUtil;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
/** /**
* 任务 服务实现类 * 任务 服务实现类
...@@ -668,6 +635,18 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -668,6 +635,18 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
} }
return "127.0.0.1:" + serverPort; return "127.0.0.1:" + serverPort;
} }
private String getDuration(long jobStartTimeMills ,long jobEndTimeMills ) {
Instant startTime = Instant.ofEpochMilli(jobStartTimeMills);
Instant endTime = Instant.ofEpochMilli(jobEndTimeMills);
long days = ChronoUnit.DAYS.between(startTime, endTime);
long hours = ChronoUnit.HOURS.between(startTime, endTime);
long minutes = ChronoUnit.MINUTES.between(startTime, endTime);
long seconds = ChronoUnit.SECONDS.between(startTime, endTime) ;
String duration = days + "天 " + (hours - (days * 24)) + "小时 " + (minutes - (hours * 60)) + "分 " + (seconds - (minutes * 60)) + "秒";
return duration;
}
private void handleJobDone(JobInstance jobInstance) { private void handleJobDone(JobInstance jobInstance) {
if (Asserts.isNull(jobInstance.getTaskId())) { if (Asserts.isNull(jobInstance.getTaskId())) {
...@@ -681,34 +660,49 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -681,34 +660,49 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
updateById(updateTask); updateById(updateTask);
return; return;
} }
Integer jobInstanceHistoryId = jobInstance.getHistoryId(); Integer jobInstanceId = jobInstance.getId();
JobHistory jobHistory = jobHistoryService.getById(jobInstanceHistoryId); JobHistory jobHistory = jobHistoryService.getById(jobInstanceId); //获取任务历史信息
String jobJson = jobHistory.getJobJson(); String jobJson = jobHistory.getJobJson(); //获取任务历史信息的jobJson
ObjectNode jsonNodes = JSONUtil.parseObject(jobJson); ObjectNode jsonNodes = JSONUtil.parseObject(jobJson);
if (jsonNodes.has("errors")) {
return;
}
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String startTime = dateFormat.format(new Date(jsonNodes.get("start-time").asLong())); long asLongStartTime = jsonNodes.get("start-time").asLong(); //获取任务历史信息的start-time
String endTime = dateFormat.format(new Date(jsonNodes.get("end-time").asLong())); long asLongEndTime = jsonNodes.get("end-time").asLong(); //获取任务历史信息的end-time
Integer duration = jsonNodes.get("duration").asInt();
if (asLongEndTime < asLongStartTime){
asLongEndTime = System.currentTimeMillis();
}
String startTime = dateFormat.format(asLongStartTime);
String endTime = dateFormat.format(asLongEndTime);
// Long duration = jsonNodes.get("duration").asLong();
String duration = getDuration(asLongStartTime, asLongEndTime); //获取任务的 duration 使用的是 start-time 和 end-time 计算 不采用 duration 字段
String clusterJson = jobHistory.getClusterJson(); //获取任务历史信息的clusterJson 主要获取 jobManagerHost
ObjectNode clusterJsonNodes = JSONUtil.parseObject(clusterJson);
String jobManagerHost = clusterJsonNodes.get("jobManagerHost").asText();
if (Asserts.isNotNull(task.getAlertGroupId())) { if (Asserts.isNotNull(task.getAlertGroupId())) {
AlertGroup alertGroup = alertGroupService.getAlertGroupInfo(task.getAlertGroupId()); AlertGroup alertGroup = alertGroupService.getAlertGroupInfo(task.getAlertGroupId());
if (Asserts.isNotNull(alertGroup)) { if (Asserts.isNotNull(alertGroup)) {
AlertMsg alertMsg = new AlertMsg(); AlertMsg alertMsg = new AlertMsg();
alertMsg.setAlertType("Flink 实时监控"); alertMsg.setAlertType("Flink 实时监控");
alertMsg.setAlertTime(LocalDateTime.now().atZone(ZoneId.systemDefault()).toString()); alertMsg.setAlertTime(dateFormat.format(new Date()));
alertMsg.setJobID(jobInstance.getId().toString()); alertMsg.setJobID(jobInstance.getJid());
alertMsg.setJobName(task.getAlias()); alertMsg.setJobName(task.getName());
alertMsg.setJobName(jobInstance.getType()); alertMsg.setJobType(task.getDialect());
alertMsg.setJobStatus(jobInstance.getStatus()); alertMsg.setJobStatus(jobInstance.getStatus());
alertMsg.setJobStartTime(startTime); alertMsg.setJobStartTime(startTime);
alertMsg.setJobEndTime(endTime); alertMsg.setJobEndTime(endTime);
alertMsg.setJobDuration(duration + " Seconds"); alertMsg.setJobDuration(duration);
String linkUrl = "http://" + jobInstance.getJobManagerAddress() + "/#/job/" + jobInstance.getJid() + "/overview";
String exceptionUrl = "http://" + jobInstance.getJobManagerAddress() + "/#/job/" + jobInstance.getJid() + "/exceptions"; String linkUrl = "http://" + jobManagerHost + "/#/job/" + jobInstance.getJid() + "/overview";
String exceptionUrl = "http://" + jobManagerHost + "/#/job/" + jobInstance.getJid() + "/exceptions";
for (AlertInstance alertInstance : alertGroup.getInstances()) { for (AlertInstance alertInstance : alertGroup.getInstances()) {
Map<String, String> map = JSONUtil.toMap(alertInstance.getParams()); Map<String, String> map = JSONUtil.toMap(alertInstance.getParams());
if ( map.get("msgtype").equals(ShowType.MARKDOWN.getValue())) { if (map.get("msgtype").equals(ShowType.MARKDOWN.getValue())) {
alertMsg.setLinkUrl("[跳转至该任务的 FlinkWeb](" + linkUrl + ")"); alertMsg.setLinkUrl("[跳转至该任务的 FlinkWeb](" + linkUrl + ")");
alertMsg.setExceptionUrl("[点击查看该任务的异常日志](" + exceptionUrl + ")"); alertMsg.setExceptionUrl("[点击查看该任务的异常日志](" + exceptionUrl + ")");
}else { }else {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment