Unverified Commit 5e71add4 authored by aiwenmo's avatar aiwenmo Committed by GitHub

[Optimization-640][Alert,web] Optimization all Alert of sendMsg

[Optimization-640][Alert,web] Optimization all Alert of sendMsg 
parents 31ca6734 0b27a790
......@@ -2,6 +2,7 @@ package com.dlink.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.dlink.alert.*;
import com.dlink.assertion.Asserts;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.mapper.AlertInstanceMapper;
import com.dlink.model.AlertInstance;
......@@ -13,6 +14,7 @@ import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.List;
import java.util.Map;
import java.util.UUID;
/**
......@@ -33,15 +35,30 @@ public class AlertInstanceServiceImpl extends SuperServiceImpl<AlertInstanceMapp
AlertConfig alertConfig = AlertConfig.build(alertInstance.getName(), alertInstance.getType(), JSONUtil.toMap(alertInstance.getParams()));
Alert alert = Alert.buildTest(alertConfig);
String currentDateTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime());
String testSendMsg = "[{\"type\":\"Flink 实时监控\"," +
"\"time\":\"" + currentDateTime + "\"," +
"\"id\":\"" + UUID.randomUUID() + "\"," +
"\"name\":\"此信息仅用于测试告警信息是否发送正常 ! 请忽略此信息!\"," +
"\"status\":\"Test\"," +
"\"content\" :\"" + UUID.randomUUID() + "\"}]";
List<AlertMsg> lists = JSONUtil.toList(testSendMsg, AlertMsg.class);
String title = "任务【测试任务】:" + alertInstance.getType() + " 报警 !";
String content = JSONUtil.toJsonString(lists);
return alert.send(title, content);
String uuid = UUID.randomUUID().toString();
AlertMsg alertMsg = new AlertMsg();
alertMsg.setAlertType("实时告警监控");
alertMsg.setAlertTime(currentDateTime);
alertMsg.setJobID(uuid);
alertMsg.setJobName("测试任务");
alertMsg.setJobType("SQL");
alertMsg.setJobStatus("FAILED");
alertMsg.setJobStartTime(currentDateTime);
alertMsg.setJobEndTime(currentDateTime);
alertMsg.setJobDuration("1 Seconds");
String linkUrl = "http://cdh1:8081/#/job/"+ uuid+"/overview";
String exceptionUrl = "http://cdh1:8081/#/job/"+uuid+"/exceptions";
Map<String, String> map = JSONUtil.toMap(alertInstance.getParams());
if ( map.get("msgtype").equals(ShowType.MARKDOWN.getValue())) {
alertMsg.setLinkUrl("[跳转至该任务的 FlinkWeb](" + linkUrl + ")");
alertMsg.setExceptionUrl("[点击查看该任务的异常日志](" + exceptionUrl + ")");
}else {
alertMsg.setLinkUrl(linkUrl);
alertMsg.setExceptionUrl(exceptionUrl);
}
String title = "任务【"+alertMsg.getJobName()+"】:" +alertMsg.getJobStatus() + "!";
return alert.send(title, alertMsg.toString());
}
}
package com.dlink.service.impl;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.dlink.alert.Alert;
import com.dlink.alert.AlertConfig;
import com.dlink.alert.AlertMsg;
import com.dlink.alert.AlertResult;
import com.dlink.alert.*;
import com.dlink.assertion.Assert;
import com.dlink.assertion.Asserts;
import com.dlink.assertion.Tips;
......@@ -34,46 +19,31 @@ import com.dlink.gateway.config.SavePointStrategy;
import com.dlink.gateway.config.SavePointType;
import com.dlink.gateway.model.JobInfo;
import com.dlink.gateway.result.SavePointResult;
import com.dlink.job.FlinkJobTask;
import com.dlink.job.FlinkJobTaskPool;
import com.dlink.job.Job;
import com.dlink.job.JobConfig;
import com.dlink.job.JobManager;
import com.dlink.job.JobResult;
import com.dlink.job.*;
import com.dlink.mapper.TaskMapper;
import com.dlink.metadata.driver.Driver;
import com.dlink.metadata.result.JdbcSelectResult;
import com.dlink.model.AlertGroup;
import com.dlink.model.AlertHistory;
import com.dlink.model.AlertInstance;
import com.dlink.model.Cluster;
import com.dlink.model.DataBase;
import com.dlink.model.History;
import com.dlink.model.Jar;
import com.dlink.model.JobHistory;
import com.dlink.model.JobInfoDetail;
import com.dlink.model.JobInstance;
import com.dlink.model.JobLifeCycle;
import com.dlink.model.JobStatus;
import com.dlink.model.Savepoints;
import com.dlink.model.Statement;
import com.dlink.model.SystemConfiguration;
import com.dlink.model.Task;
import com.dlink.model.*;
import com.dlink.result.SqlExplainResult;
import com.dlink.service.AlertGroupService;
import com.dlink.service.AlertHistoryService;
import com.dlink.service.ClusterConfigurationService;
import com.dlink.service.ClusterService;
import com.dlink.service.DataBaseService;
import com.dlink.service.HistoryService;
import com.dlink.service.JarService;
import com.dlink.service.JobHistoryService;
import com.dlink.service.JobInstanceService;
import com.dlink.service.SavepointsService;
import com.dlink.service.StatementService;
import com.dlink.service.TaskService;
import com.dlink.service.*;
import com.dlink.utils.CustomStringJavaCompiler;
import com.dlink.utils.JSONUtil;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
/**
* 任务 服务实现类
......@@ -527,7 +497,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
private JobConfig buildJobConfig(Task task) {
boolean isJarTask = Dialect.FLINKJAR.equalsVal(task.getDialect());
if (!isJarTask && Asserts.isNotNull(task.getFragment())?task.getFragment():false) {
if (!isJarTask && Asserts.isNotNull(task.getFragment()) ? task.getFragment() : false) {
String flinkWithSql = dataBaseService.getEnabledFlinkWithSql();
if (Asserts.isNotNullString(flinkWithSql)) {
task.setStatement(flinkWithSql + "\r\n" + task.getStatement());
......@@ -667,6 +637,18 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
}
return "127.0.0.1:" + serverPort;
}
private String getDuration(long jobStartTimeMills ,long jobEndTimeMills ) {
Instant startTime = Instant.ofEpochMilli(jobStartTimeMills);
Instant endTime = Instant.ofEpochMilli(jobEndTimeMills);
long days = ChronoUnit.DAYS.between(startTime, endTime);
long hours = ChronoUnit.HOURS.between(startTime, endTime);
long minutes = ChronoUnit.MINUTES.between(startTime, endTime);
long seconds = ChronoUnit.SECONDS.between(startTime, endTime) ;
String duration = days + "天 " + (hours - (days * 24)) + "小时 " + (minutes - (hours * 60)) + "分 " + (seconds - (minutes * 60)) + "秒";
return duration;
}
private void handleJobDone(JobInstance jobInstance) {
if (Asserts.isNull(jobInstance.getTaskId())) {
......@@ -680,20 +662,56 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
updateById(updateTask);
return;
}
Integer jobInstanceId = jobInstance.getId();
JobHistory jobHistory = jobHistoryService.getById(jobInstanceId); //获取任务历史信息
String jobJson = jobHistory.getJobJson(); //获取任务历史信息的jobJson
ObjectNode jsonNodes = JSONUtil.parseObject(jobJson);
if (jsonNodes.has("errors")) {
return;
}
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long asLongStartTime = jsonNodes.get("start-time").asLong(); //获取任务历史信息的start-time
long asLongEndTime = jsonNodes.get("end-time").asLong(); //获取任务历史信息的end-time
if (asLongEndTime < asLongStartTime){
asLongEndTime = System.currentTimeMillis();
}
String startTime = dateFormat.format(asLongStartTime);
String endTime = dateFormat.format(asLongEndTime);
// Long duration = jsonNodes.get("duration").asLong();
String duration = getDuration(asLongStartTime, asLongEndTime); //获取任务的 duration 使用的是 start-time 和 end-time 计算 不采用 duration 字段
String clusterJson = jobHistory.getClusterJson(); //获取任务历史信息的clusterJson 主要获取 jobManagerHost
ObjectNode clusterJsonNodes = JSONUtil.parseObject(clusterJson);
String jobManagerHost = clusterJsonNodes.get("jobManagerHost").asText();
if (Asserts.isNotNull(task.getAlertGroupId())) {
AlertGroup alertGroup = alertGroupService.getAlertGroupInfo(task.getAlertGroupId());
if (Asserts.isNotNull(alertGroup)) {
List<AlertMsg> alertMsgList = new ArrayList<>();
AlertMsg alertMsg = new AlertMsg();
alertMsg.setType("Flink 实时监控");
alertMsg.setTime(LocalDateTime.now().toString());
alertMsg.setId(jobInstance.getId().toString());
alertMsg.setName(task.getAlias());
alertMsg.setStatus(jobInstance.getStatus());
alertMsg.setContent(jobInstance.getJid());
alertMsgList.add(alertMsg);
alertMsg.setAlertType("Flink 实时监控");
alertMsg.setAlertTime(dateFormat.format(new Date()));
alertMsg.setJobID(jobInstance.getJid());
alertMsg.setJobName(task.getName());
alertMsg.setJobType(task.getDialect());
alertMsg.setJobStatus(jobInstance.getStatus());
alertMsg.setJobStartTime(startTime);
alertMsg.setJobEndTime(endTime);
alertMsg.setJobDuration(duration);
String linkUrl = "http://" + jobManagerHost + "/#/job/" + jobInstance.getJid() + "/overview";
String exceptionUrl = "http://" + jobManagerHost + "/#/job/" + jobInstance.getJid() + "/exceptions";
for (AlertInstance alertInstance : alertGroup.getInstances()) {
sendAlert(alertInstance, jobInstance, task, alertMsgList);
Map<String, String> map = JSONUtil.toMap(alertInstance.getParams());
if (map.get("msgtype").equals(ShowType.MARKDOWN.getValue())) {
alertMsg.setLinkUrl("[跳转至该任务的 FlinkWeb](" + linkUrl + ")");
alertMsg.setExceptionUrl("[点击查看该任务的异常日志](" + exceptionUrl + ")");
}else {
alertMsg.setLinkUrl(linkUrl);
alertMsg.setExceptionUrl(exceptionUrl);
}
sendAlert(alertInstance, jobInstance, task, alertMsg);
}
}
}
......@@ -701,12 +719,13 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
updateById(updateTask);
}
private void sendAlert(AlertInstance alertInstance, JobInstance jobInstance, Task task, List<AlertMsg> alertMsgList) {
private void sendAlert(AlertInstance alertInstance, JobInstance jobInstance, Task task, AlertMsg alertMsg) {
AlertConfig alertConfig = AlertConfig.build(alertInstance.getName(), alertInstance.getType(), JSONUtil.toMap(alertInstance.getParams()));
Alert alert = Alert.build(alertConfig);
String title = "任务【" + task.getAlias() + "】:" + jobInstance.getStatus();
String content = JSONUtil.toJsonString(alertMsgList);
String content = alertMsg.toString();
AlertResult alertResult = alert.send(title, content);
AlertHistory alertHistory = new AlertHistory();
alertHistory.setAlertGroupId(task.getAlertGroupId());
alertHistory.setJobInstanceId(jobInstance.getId());
......
package com.dlink.alert;
import lombok.Data;
/**
* AlertMsg
*
* @author wenmo
* @since 2022/3/7 18:30
**/
public class AlertMsg {
private String type;
private String time;
private String id;
private String name;
private String status;
private String content;
public AlertMsg() {
}
public AlertMsg(String type, String time, String id, String name, String status, String content) {
this.type = type;
this.time = time;
this.id = id;
this.name = name;
this.status = status;
this.content = content;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getTime() {
return time;
}
public void setTime(String time) {
this.time = time;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
@Data
public class AlertMsg {
public void setName(String name) {
this.name = name;
}
private String AlertType; // 告警类型
private String AlertTime; // 告警时间
private String JobID; // 任务ID
private String JobName; // 任务名称
private String JobType; // 任务类型
private String JobStatus; // 任务状态
private String JobStartTime; // 任务开始时间
private String JobEndTime; // 任务结束时间
private String JobDuration; // 任务耗时
private String LinkUrl; // Flink webUI 链接
public String getStatus() {
return status;
}
private String ExceptionUrl; // Flink job Root Exception 链接
public void setStatus(String status) {
this.status = status;
public AlertMsg() {
}
public String getContent() {
return content;
public AlertMsg(String alertType, String alertTime, String jobID, String jobName, String jobType, String jobStatus, String jobStartTime, String jobEndTime, String jobDuration, String linkUrl, String exceptionUrl) {
this.AlertType = alertType;
this.AlertTime = alertTime;
this.JobID = jobID;
this.JobName = jobName;
this.JobType = jobType;
this.JobStatus = jobStatus;
this.JobStartTime = jobStartTime;
this.JobEndTime = jobEndTime;
this.JobDuration = jobDuration;
this.LinkUrl = linkUrl;
this.ExceptionUrl = exceptionUrl;
}
public void setContent(String content) {
this.content = content;
public String toString() {
return "[{ \"Alert Type\":\""+AlertType+"\","
+
"\"Alert Time\":\""+AlertTime+"\","
+
"\"Job ID\":\""+JobID+"\","
+
"\"Job Name\":\""+ JobName +"\","
+
"\"Job Type\":\""+ JobType +"\","
+
"\"Job Status\":\""+ JobStatus +"\","
+
"\"Job StartTime\": \"" +JobStartTime +"\","
+
"\"Job EndTime\": \""+JobEndTime+"\","
+
"\"Job Duration\": \""+JobDuration+"\","
// +
// "\"LinkUrl\": \""+ LinkUrl +"\","
+
"\"Exception Log\" :\""+ ExceptionUrl +"\"" +
"}]";
}
}
package com.dlink.alert.dingtalk;
import com.dlink.alert.Alert;
import com.dlink.alert.AlertConfig;
import com.dlink.alert.AlertResult;
import com.dlink.alert.ShowType;
import com.dlink.alert.*;
import com.dlink.utils.JSONUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* DingTalkTest
......@@ -20,32 +19,30 @@ import java.util.Map;
public class DingTalkTest {
private static Map<String, String> config = new HashMap<>();
private String contentTest = "[{\"id\":\"70\","
+
"\"name\":\"UserBehavior-0--1193959466\","
+
"\"Job name\":\"Start workflow\","
+
"\"State\":\"SUCCESS\","
+
"\"Recovery\":\"NO\","
+
"\"Run time\":\"1\","
+
"\"Start time\": \"2018-08-06 10:31:34.0\","
+
"\"End time\": \"2018-08-06 10:31:49.0\","
+
"\"Host\": \"192.168.xx.xx\","
+
"\"Notify group\" :\"4\"}]";
private AlertMsg alertMsg = new AlertMsg();
@Before
public void initDingTalkConfig() {
String uuid = UUID.randomUUID().toString();
alertMsg.setAlertType("实时告警监控");
alertMsg.setAlertTime("2018-08-06 10:31:34.0");
alertMsg.setJobID(uuid);
alertMsg.setJobName("测试任务");
alertMsg.setJobType("SQL");
alertMsg.setJobStatus("FAILED");
alertMsg.setJobStartTime("2018-08-06 10:31:34.0");
alertMsg.setJobEndTime("2018-08-06 10:31:49.0");
alertMsg.setJobDuration("23 Seconds");
String linkUrl = "[跳转至该任务的FlinkWeb](http://cdh1:8081/#/job/"+uuid+"/overview)";
alertMsg.setLinkUrl(linkUrl);
String exceptionUrl = "[点击查看该任务的异常日志](http://cdh1:8081/#/job/"+uuid+"/exceptions)";
alertMsg.setExceptionUrl(exceptionUrl);
config.put(DingTalkConstants.KEYWORD, "Dinky-Fink 钉钉告警测试");
config.put(DingTalkConstants.WEB_HOOK, "url");
config.put(DingTalkConstants.MSG_TYPE, ShowType.MARKDOWN.getValue());
config.put(DingTalkConstants.AT_ALL, "true");
config.put(DingTalkConstants.PROXY_ENABLE, "false");
config.put(DingTalkConstants.PASSWORD, "password");
......@@ -57,7 +54,7 @@ public class DingTalkTest {
public void sendMarkDownMsgTest() {
AlertConfig config = AlertConfig.build("MarkDownTest", "DingTalk", DingTalkTest.config);
Alert alert = Alert.build(config);
AlertResult result = alert.send("Dinky钉钉告警测试", contentTest);
AlertResult result = alert.send("Dinky钉钉告警测试", alertMsg.toString());
Assert.assertEquals(true, result.getSuccess());
}
......@@ -66,7 +63,7 @@ public class DingTalkTest {
config.put(DingTalkConstants.MSG_TYPE, ShowType.TEXT.getValue());
AlertConfig config = AlertConfig.build("TextMsgTest", "DingTalk", DingTalkTest.config);
Alert alert = Alert.build(config);
AlertResult result = alert.send("Dinky钉钉告警测试", contentTest);
AlertResult result = alert.send("Dinky钉钉告警测试", alertMsg.toString());
Assert.assertEquals(true, result.getSuccess());
}
......
package com.dlink.alert.email;
import com.dlink.alert.AlertMsg;
import com.dlink.alert.AlertResult;
import com.dlink.alert.ShowType;
import com.dlink.alert.email.template.AlertTemplate;
import com.dlink.alert.email.template.DefaultHTMLTemplate;
import com.dlink.utils.JSONUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
......@@ -20,19 +22,28 @@ public class EmailSenderTest {
private static AlertTemplate alertTemplate;
String title = "Dinky Email Alert";
String content = "[{\"id\":\"69\","
+ "\"name\":\"UserBehavior-0--1193959466\","
+ "\"Job name\": \"Start workflow\","
+ "\"State\": \"SUCCESS\","
+ "\"Recovery\":\"NO\","
+ "\"Run time\": \"1\","
+ "\"Start time\": \"2018-08-06 10:31:34.0\","
+ "\"End time\": \"2018-08-06 10:31:49.0\","
+ "\"Host\": \"192.168.xx.xx\","
+ "\"Notify group\" :\"4\"}]";
@BeforeClass
public static void initEmailConfig() {
private AlertMsg alertMsg = new AlertMsg();
@Before
public void initEmailConfig() {
String uuid = UUID.randomUUID().toString();
alertMsg.setAlertType("实时告警监控");
alertMsg.setAlertTime("2018-08-06 10:31:34.0");
alertMsg.setJobID(uuid);
alertMsg.setJobName("测试任务");
alertMsg.setJobType("SQL");
alertMsg.setJobStatus("FAILED");
alertMsg.setJobStartTime("2018-08-06 10:31:34.0");
alertMsg.setJobEndTime("2018-08-06 10:31:49.0");
alertMsg.setJobDuration("23 Seconds");
String linkUrl = "[跳转至该任务的FlinkWeb](http://cdh1:8081/#/job/"+uuid+"/overview)";
alertMsg.setLinkUrl(linkUrl);
String exceptionUrl = "[点击查看该任务的异常日志](http://cdh1:8081/#/job/"+uuid+"/exceptions)";
alertMsg.setExceptionUrl(exceptionUrl);
emailConfig.put(EmailConstants.NAME_MAIL_PROTOCOL, "smtp");
emailConfig.put(EmailConstants.NAME_MAIL_SMTP_HOST, "smtp.mxhichina.com");
emailConfig.put(EmailConstants.NAME_MAIL_SMTP_PORT, "465");
......@@ -52,7 +63,7 @@ public class EmailSenderTest {
@Test
public void testTextSendMails() {
AlertResult alertResult = mailSender.send(title, content);
AlertResult alertResult = mailSender.send(title, alertMsg.toString());
Assert.assertEquals(true, alertResult.getSuccess()); // 格式需要调整
}
......@@ -61,7 +72,7 @@ public class EmailSenderTest {
public void testSendTableMail() {
emailConfig.put(EmailConstants.NAME_SHOW_TYPE, ShowType.TABLE.getValue());
mailSender = new MailSender(emailConfig);
AlertResult alertResult = mailSender.send(title, content);
AlertResult alertResult = mailSender.send(title, alertMsg.toString());
Assert.assertEquals(true, alertResult.getSuccess());
}
......@@ -69,7 +80,7 @@ public class EmailSenderTest {
public void testAttachmentFile() {
emailConfig.put(EmailConstants.NAME_SHOW_TYPE, ShowType.ATTACHMENT.getValue());
mailSender = new MailSender(emailConfig);
AlertResult alertResult = mailSender.send(title, content);
AlertResult alertResult = mailSender.send(title, alertMsg.toString());
Assert.assertEquals(true, alertResult.getSuccess());
}
......@@ -77,13 +88,13 @@ public class EmailSenderTest {
public void testTableAttachmentFile() {
emailConfig.put(EmailConstants.NAME_SHOW_TYPE, ShowType.TABLE_ATTACHMENT.getValue());
mailSender = new MailSender(emailConfig);
AlertResult alertResult = mailSender.send(title, content);
AlertResult alertResult = mailSender.send(title, alertMsg.toString());
Assert.assertEquals(true, alertResult.getSuccess());
}
@Test
public void testGenTextEmail() {
List<LinkedHashMap> linkedHashMaps = JSONUtil.toList(content, LinkedHashMap.class);
List<LinkedHashMap> linkedHashMaps = JSONUtil.toList(alertMsg.toString(), LinkedHashMap.class);
if (linkedHashMaps.size() > EmailConstants.NUMBER_1000) {
linkedHashMaps = linkedHashMaps.subList(0, EmailConstants.NUMBER_1000);
}
......
......@@ -17,8 +17,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.Map.Entry;
/**
......@@ -68,9 +67,9 @@ public final class FeiShuSender {
}
}
private String toJsonSendMsg(AlertMsg alertMsg) {
private String toJsonSendMsg(String title, String content) {
String jsonResult ="";
byte[] byt = StringUtils.getBytesUtf8(formatContent(alertMsg));
byte[] byt = StringUtils.getBytesUtf8(formatContent(title,content));
String contentResult = StringUtils.newStringUtf8(byt);
String userIdsToText = mkUserIds(org.apache.commons.lang3.StringUtils.isBlank(atUserIds)? "all": atUserIds);
if (StringUtils.equals(ShowType.TEXT.getValue(), msgType)) {
......@@ -130,17 +129,19 @@ public final class FeiShuSender {
return alertResult;
}
public static String formatContent(AlertMsg alertMsg) {
if (alertMsg.getContent() != null) {
List<Map> list = JSONUtil.toList(alertMsg.getContent(), Map.class);
if (list.isEmpty()) {
return alertMsg.getName() + alertMsg.getContent();
public static String formatContent(String title, String content) {
List<LinkedHashMap> mapSendResultItemsList = JSONUtil.toList(content, LinkedHashMap.class);
if (null == mapSendResultItemsList || mapSendResultItemsList.isEmpty()) {
logger.error("itemsList is null");
throw new RuntimeException("itemsList is null");
}
StringBuilder contents = new StringBuilder(100);
contents.append(String.format("`%s`"+FeiShuConstants.MARKDOWN_ENTER, alertMsg.getName()));
for (Map map : list) {
for (Entry<String, Object> entry : (Iterable<Entry<String, Object>>) map.entrySet()) {
contents.append(String.format("`%s` %s",title,FeiShuConstants.MARKDOWN_ENTER));
for (LinkedHashMap mapItems : mapSendResultItemsList) {
Set<Entry<String, Object>> entries = mapItems.entrySet();
Iterator<Entry<String, Object>> iterator = entries.iterator();
while (iterator.hasNext()) { {
Map.Entry<String, Object> entry = iterator.next();
String key = entry.getKey();
String value = entry.getValue().toString();
contents.append(FeiShuConstants.MARKDOWN_QUOTE);
......@@ -154,11 +155,8 @@ public final class FeiShuSender {
public AlertResult send(String title,String content) {
AlertResult alertResult;
AlertMsg alertMsg = new AlertMsg();
alertMsg.setName(title);
alertMsg.setContent(content);
try {
String resp = sendMsg(alertMsg);
String resp = sendMsg(title, content);
return checkSendFeiShuSendMsgResult(resp);
} catch (Exception e) {
logger.info("send fei shu alert msg exception : {}", e.getMessage());
......@@ -169,9 +167,9 @@ public final class FeiShuSender {
return alertResult;
}
private String sendMsg(AlertMsg alertMsg) throws IOException {
private String sendMsg(String title,String content) throws IOException {
String msgToJson = toJsonSendMsg(alertMsg);
String msgToJson = toJsonSendMsg(title,content);
HttpPost httpPost = HttpRequestUtil.constructHttpPost(url, msgToJson);
CloseableHttpClient httpClient;
httpClient = HttpRequestUtil.getHttpClient(enableProxy, proxy, port, user, password);
......@@ -190,7 +188,7 @@ public final class FeiShuSender {
} finally {
response.close();
}
logger.info("Fei Shu send title :{} ,content :{}, resp: {}", alertMsg.getName(), alertMsg.getContent(), resp);
logger.info("Fei Shu send title :{} ,content :{}, resp: {}", title, content, resp);
return resp;
} finally {
httpClient.close();
......
......@@ -8,6 +8,7 @@ import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* @Author: zhumingye
......@@ -18,52 +19,46 @@ public class FeiShuSenderTest {
private static Map<String, String> feiShuConfig = new HashMap<>();
private AlertMsg alertMsg = new AlertMsg();
String alertMsgContentTemplate = "[\n"
+ " {\n"
+ " \"owner\": \"dlink\",\n"
+ " \"processEndTime\": \"2021-01-29 19:01:11\",\n"
+ " \"processHost\": \"10.81.129.4:5678\",\n"
+ " \"processId\": 2926,\n"
+ " \"processName\": \"3-20210129190038108\",\n"
+ " \"processStartTime\": \"2021-01-29 19:00:38\",\n"
+ " \"processState\": \"SUCCESS\",\n"
+ " \"processType\": \"START_PROCESS\",\n"
+ " \"projectId\": 2,\n"
+ " \"projectName\": \"testdelproject\",\n"
+ " \"recovery\": \"NO\",\n"
+ " \"retryTimes\": 0,\n"
+ " \"runTimes\": 1,\n"
+ " \"taskId\": 0\n"
+ " }\n"
+ "]";
@Before
public void initFeiShuConfig() {
feiShuConfig.put(FeiShuConstants.WEB_HOOK, "https://open.feishu.cn/open-apis/bot/v2/hook/aea3cd7f13154854541dsadsadas08f2a9");
String uuid = UUID.randomUUID().toString();
alertMsg.setAlertType("实时告警监控");
alertMsg.setAlertTime("2018-08-06 10:31:34.0");
alertMsg.setJobID(uuid);
alertMsg.setJobName("测试任务");
alertMsg.setJobType("SQL");
alertMsg.setJobStatus("FAILED");
alertMsg.setJobStartTime("2018-08-06 10:31:34.0");
alertMsg.setJobEndTime("2018-08-06 10:31:49.0");
alertMsg.setJobDuration("23 Seconds");
String linkUrl = "[跳转至该任务的FlinkWeb](http://cdh1:8081/#/job/"+uuid+"/overview)";
alertMsg.setLinkUrl(linkUrl);
String exceptionUrl = "[点击查看该任务的异常日志](http://cdh1:8081/#/job/"+uuid+"/exceptions)";
alertMsg.setExceptionUrl(exceptionUrl);
feiShuConfig.put(FeiShuConstants.WEB_HOOK, "https://open.feishu.cn/open-apis/bot/v2/hook/aea3cd7f-75b4-45cd-abea-2c0dc808f2a9");
feiShuConfig.put(FeiShuConstants.KEY_WORD, "Dinky 飞书WebHook 告警测试");
feiShuConfig.put(FeiShuConstants.MSG_TYPE,"text");
feiShuConfig.put(FeiShuConstants.AT_ALL, "false");
feiShuConfig.put(FeiShuConstants.AT_USERS, "user1,user2,user3");
feiShuConfig.put(FeiShuConstants.AT_ALL, "true");
feiShuConfig.put(FeiShuConstants.AT_USERS, "zhumingye");
}
@Test
public void testTextTypeSend() {
AlertMsg alertMsg = new AlertMsg();
alertMsg.setName("Dinky 飞书WebHook 告警测试");
alertMsg.setContent(alertMsgContentTemplate);
FeiShuSender feiShuSender = new FeiShuSender(feiShuConfig);
AlertResult alertResult = feiShuSender.send(alertMsg.getName(),alertMsg.getContent());
AlertResult alertResult = feiShuSender.send("FeiShu Alert", alertMsg.toString());
Assert.assertEquals(true, alertResult.getSuccess());
}
@Test
public void testPostTypeSend() {
feiShuConfig.put(FeiShuConstants.MSG_TYPE,"post");
AlertMsg alertMsg = new AlertMsg();
alertMsg.setName("Dinky 飞书WebHook 告警测试");
alertMsg.setContent(alertMsgContentTemplate);
FeiShuSender feiShuSender = new FeiShuSender(feiShuConfig);
AlertResult alertResult = feiShuSender.send(alertMsg.getName(),alertMsg.getContent());
AlertResult alertResult = feiShuSender.send("FeiShu Alert", alertMsg.toString());
Assert.assertEquals(true, alertResult.getSuccess());
}
......
......@@ -24,7 +24,7 @@ public class WeChatConstants {
static final String WEBHOOK = "webhook";
static final String WEBHOOK_TEMPLATE = "{\"msgtype\":\"{showType}\",\"{showType}\":{\"content\":\"{msg} \"}}";
static final String WEBHOOK_TEMPLATE = "{\"msgtype\":\"{msgtype}\",\"{msgtype}\":{\"content\":\"{msg} \"}}";
static final String KEYWORD = "keyword";
......@@ -36,7 +36,7 @@ public class WeChatConstants {
static final String TEAM_SEND_MSG = "teamSendMsg";
static final String USER_SEND_MSG = "{\"touser\":\"{toUser}\",\"agentid\":{agentId},\"msgtype\":\"{showType}\",\"{showType}\":{\"content\":\"{msg}\"}}";
static final String USER_SEND_MSG = "{\"touser\":\"{toUser}\",\"agentid\":{agentId},\"msgtype\":\"{msgtype}\",\"{msgtype}\":{\"content\":\"{msg}\"}}";
static final String AGENT_ID = "agentId";
......@@ -44,6 +44,6 @@ public class WeChatConstants {
static final String SEND_TYPE = "sendType";
static final String SHOW_TYPE = "showType";
static final String SHOW_TYPE = "msgtype";
}
......@@ -36,7 +36,7 @@ public class WeChatSender {
private static final String CORP_ID_REGEX = "{corpId}";
private static final String SECRET_REGEX = "{secret}";
private static final String TOKEN_REGEX = "{token}";
private static final String SHOW_TYPE_REGEX = "{showType}";
private static final String SHOW_TYPE_REGEX = "{msgtype}";
private final String weChatAgentId;
private final String weChatUsers;
private final String weChatUserSendMsg;
......
......@@ -13,10 +13,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
/**
* FlinkAPI
......@@ -182,46 +179,149 @@ public class FlinkAPI {
return get(FlinkRestAPIConstant.JOBS + jobId + FlinkRestAPIConstant.CONFIG);
}
/**
* @Author: zhumingye
* @date: 2022/6/24
* @Description: getJobManagerMetrics 获取jobManager的监控信息
* @return JsonNode
*/
public JsonNode getJobManagerMetrics() {
return get(FlinkRestAPIConstant.JOB_MANAGER + FlinkRestAPIConstant.METRICS);
return get(FlinkRestAPIConstant.JOB_MANAGER + FlinkRestAPIConstant.METRICS + FlinkRestAPIConstant.GET + buildMetricsParms(FlinkRestAPIConstant.JOB_MANAGER));
}
/**
* @Author: zhumingye
* @date: 2022/6/24
* @Description: getJobManagerConfig 获取jobManager的配置信息
* @return JsonNode
*/
public JsonNode getJobManagerConfig() {
return get(FlinkRestAPIConstant.JOB_MANAGER + FlinkRestAPIConstant.CONFIG);
}
/**
* @Author: zhumingye
* @date: 2022/6/24
* @Description: getJobManagerLog 获取jobManager的日志信息
* @return JsonNode
*/
public String getJobManagerLog() {
return getResult(FlinkRestAPIConstant.JOB_MANAGER + FlinkRestAPIConstant.LOG);
}
/**
* @Author: zhumingye
* @date: 2022/6/24
* @Description: getJobManagerStdOut 获取jobManager的控制台输出日志
* @return String
*/
public String getJobManagerStdOut() {
return getResult(FlinkRestAPIConstant.JOB_MANAGER + FlinkRestAPIConstant.STDOUT);
}
/**
* @Author: zhumingye
* @date: 2022/6/24
* @Description: getJobManagerLogList 获取jobManager的日志列表
* @return JsonNode
*/
public JsonNode getJobManagerLogList() {
return get(FlinkRestAPIConstant.JOB_MANAGER + FlinkRestAPIConstant.LOGS);
}
/**
* @Author: zhumingye
* @date: 2022/6/24
* @Description: getJobManagerLogFileDetail 获取jobManager的日志文件的具体信息
* @param logName 日志文件名
* @return String
*/
public String getJobManagerLogFileDetail(String logName) {
return getResult(FlinkRestAPIConstant.JOB_MANAGER + FlinkRestAPIConstant.LOGS + logName);
}
/**
* @Author: zhumingye
* @date: 2022/6/24
* @Description: getTaskManagers 获取taskManager的列表
* @return JsonNode
*/
public JsonNode getTaskManagers() {
return get(FlinkRestAPIConstant.TASK_MANAGER);
}
/**
* @Author: zhumingye
* @date: 2022/6/24
* @Description: buildMetricsParms 构建metrics参数
* @Params: type: 入参类型 可选值:task-manager, job-manager
* @return String
*/
public String buildMetricsParms(String type) {
JsonNode jsonNode = get(type + FlinkRestAPIConstant.METRICS);
StringBuilder sb = new StringBuilder();
Iterator<JsonNode> jsonNodeIterator = jsonNode.elements();
while(jsonNodeIterator.hasNext()) {
JsonNode node = jsonNodeIterator.next();
sb.append(node.get("id").asText()).append(",");
}
return sb.deleteCharAt(sb.length() - 1).toString();
}
/**
* @Author: zhumingye
* @date: 2022/6/24
* @Description: getJobManagerLog 获取jobManager的日志信息
* @return JsonNode
*/
public JsonNode getTaskManagerMetrics(String containerId) {
return get(FlinkRestAPIConstant.TASK_MANAGER + containerId + FlinkRestAPIConstant.METRICS);
JsonNode TaskManagerMetricsJsonNode = get(FlinkRestAPIConstant.TASK_MANAGER + containerId + FlinkRestAPIConstant.METRICS + FlinkRestAPIConstant.GET + buildMetricsParms(FlinkRestAPIConstant.JOB_MANAGER));
return TaskManagerMetricsJsonNode;
}
/**
* @Author: zhumingye
* @date: 2022/6/24
* @Description: getTaskManagerLog 获取taskManager的日志信息
* @param containerId 容器id
* @return String
*/
public String getTaskManagerLog(String containerId) {
return getResult(FlinkRestAPIConstant.TASK_MANAGER + containerId + FlinkRestAPIConstant.LOG);
}
/**
* @Author: zhumingye
* @date: 2022/6/24
* @Description: getTaskManagerStdOut 获取taskManager的StdOut日志信息
* @param containerId 容器id
* @return JsonNode
*/
public String getTaskManagerStdOut(String containerId) {
return getResult(FlinkRestAPIConstant.TASK_MANAGER + containerId + FlinkRestAPIConstant.STDOUT);
}
/**
* @Author: zhumingye
* @date: 2022/6/24
* @Description: getTaskManagerLogList 获取taskManager的日志列表
* @param containerId 容器id
* @return JsonNode
*/
public JsonNode getTaskManagerLogList(String containerId) {
return get(FlinkRestAPIConstant.TASK_MANAGER + containerId + FlinkRestAPIConstant.LOGS);
}
/**
* @Author: zhumingye
* @date: 2022/6/24
* @Description: getTaskManagerLogFileDeatil 获取具体日志的详细信息
* @param logName 日志名称
* @return String
*/
public String getTaskManagerLogFileDeatil(String containerId,String logName) {
return getResult(FlinkRestAPIConstant.TASK_MANAGER + containerId + FlinkRestAPIConstant.LOGS + logName);
}
/**
* @Author: zhumingye
* @date: 2022/6/24
* @Description: getTaskManagerThreadDump 获取taskManager的线程信息
* @return JsonNode
*/
public JsonNode getTaskManagerThreadDump(String containerId) {
return get(FlinkRestAPIConstant.TASK_MANAGER + containerId + FlinkRestAPIConstant.THREAD_DUMP);
}
......
......@@ -44,4 +44,7 @@ public final class FlinkRestAPIConstant {
public static final String THREAD_DUMP = "/thread-dump/";
public static final String GET = "?get=";
}
......@@ -16,7 +16,7 @@ import java.util.List;
public class FlinkRestAPITest {
//private String address = "192.168.123.157:8081";
private String address = "cdh5:8081";
private String address = "cdh1:8081";
@Test
public void savepointTest() {
......@@ -39,19 +39,19 @@ public class FlinkRestAPITest {
@Test
public void getCheckPointsDetailInfoTest() {
JsonNode checkPointsDetailInfo = FlinkAPI.build(address).getCheckPointsConfig("178e954faaa4bf06cfbda971bb8b2957");
JsonNode checkPointsDetailInfo = FlinkAPI.build(address).getCheckPointsConfig("9b0910c865874430b98d3817a248eb24");
System.out.println(checkPointsDetailInfo.toString());
}
@Test
public void getConfigurationsDetailsInfoTest() {
JsonNode configurationsDetailsInfo = FlinkAPI.build(address).getJobsConfig("178e954faaa4bf06cfbda971bb8b2957");
JsonNode configurationsDetailsInfo = FlinkAPI.build(address).getJobsConfig("9b0910c865874430b98d3817a248eb24");
System.out.println(configurationsDetailsInfo.toString());
}
@Test
public void getExectionsInfoTest() {
JsonNode exectionsDetailInfo = FlinkAPI.build(address).getException("178e954faaa4bf06cfbda971bb8b2957");
JsonNode exectionsDetailInfo = FlinkAPI.build(address).getException("9b0910c865874430b98d3817a248eb24");
System.out.println(exectionsDetailInfo.toString());
}
......@@ -86,6 +86,11 @@ public class FlinkRestAPITest {
JsonNode jobManagerLogList = FlinkAPI.build(address).getJobManagerLogList();
System.out.println(jobManagerLogList.toString());
}
@Test
public void getJobManagerLogListToDetailTest() {
String jobManagerLogList = FlinkAPI.build(address).getJobManagerLogFileDetail("jobmanager.log");
System.out.println(jobManagerLogList.toString());
}
@Test
public void getTaskManagersTest() {
......@@ -95,33 +100,37 @@ public class FlinkRestAPITest {
@Test
public void getTaskManagerMetricsTest() {
JsonNode taskManagerMetrics = FlinkAPI.build(address).getTaskManagerMetrics("container_e34_1646992539398_0004_01_000002");
JsonNode taskManagerMetrics = FlinkAPI.build(address).getTaskManagerMetrics("container_e46_1655948912029_0061_01_000002");
System.out.println(taskManagerMetrics.toString());
}
@Test
public void getTaskManagerLogTest() {
String taskManagerLog = FlinkAPI.build(address).getTaskManagerLog("container_e34_1646992539398_0004_01_000002");
String taskManagerLog = FlinkAPI.build(address).getTaskManagerLog("container_e46_1655948912029_0061_01_000002");
System.out.println(taskManagerLog);
}
@Test
public void getTaskManagerStdOutTest() {
String taskManagerStdOut = FlinkAPI.build(address).getTaskManagerStdOut("container_e34_1646992539398_0004_01_000002");
String taskManagerStdOut = FlinkAPI.build(address).getTaskManagerStdOut("container_e46_1655948912029_0061_01_000002");
System.out.println(taskManagerStdOut);
}
@Test
public void getTaskManagerLogListTest() {
JsonNode taskManagerLogList = FlinkAPI.build(address).getTaskManagerLogList("container_e34_1646992539398_0004_01_000002");
JsonNode taskManagerLogList = FlinkAPI.build(address).getTaskManagerLogList("container_e46_1655948912029_0061_01_000002");
System.out.println(taskManagerLogList.toString());
}
@Test
public void getTaskManagerLogListToDetail() {
String taskManagerLogDetail = FlinkAPI.build(address).getTaskManagerLogFileDeatil("container_e46_1655948912029_0061_01_000002","taskmanager.log");
System.out.println(taskManagerLogDetail);
}
@Test
public void getTaskManagerThreadDumpTest() {
JsonNode taskManagerThreadDump = FlinkAPI.build(address).getTaskManagerThreadDump("container_e34_1646992539398_0004_01_000002");
JsonNode taskManagerThreadDump = FlinkAPI.build(address).getTaskManagerThreadDump("container_e46_1655948912029_0061_01_000002");
System.out.println(taskManagerThreadDump.toString());
}
}
......@@ -137,7 +137,7 @@ const WeChatForm: React.FC<AlertInstanceFormProps> = (props) => {
</>
}
<Form.Item
name="showType"
name="msgtype"
label="展示方式"
rules={[{required: true, message: '请选择展示方式!'}]}
>
......
......@@ -302,7 +302,7 @@ const ClusterTableList: React.FC<{}> = (props: any) => {
{
title: '操作',
dataIndex: 'option',
tooltip: 'FLinkWebUI连接 当集群状态为`可用`时! 支持 SESSION | STANDALONE',
tooltip: 'FLinkWebUI连接 当集群状态为`可用`时! 支持 KUBERNETES 之外的模式',
valueType: 'option',
render: (_, record) => [
<a
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment