Commit 54340813 authored by wenmo's avatar wenmo

实时监控和自动告警

parent beeb8a51
...@@ -87,6 +87,8 @@ Dinky(原 Dlink): ...@@ -87,6 +87,8 @@ Dinky(原 Dlink):
| | 归档 | 新增 执行与提交历史 | 0.4.0 | | | 归档 | 新增 执行与提交历史 | 0.4.0 |
| 运维中心 | 主页 | 新增 任务实例列表 | 0.6.0 | | 运维中心 | 主页 | 新增 任务实例列表 | 0.6.0 |
| | 作业监控 | 新增 作业总览 | 0.6.0 | | | 作业监控 | 新增 作业总览 | 0.6.0 |
| | | 新增 实时监控 Flink Job | 0.6.0 |
| | | 新增 自动告警 | 0.6.0 |
| | | 新增 FlinkWebUI 跳转 | 0.6.0 | | | | 新增 FlinkWebUI 跳转 | 0.6.0 |
| | | 新增 智能重启(重新上线) | 0.6.0 | | | | 新增 智能重启(重新上线) | 0.6.0 |
| | | 新增 智能停止(下线) | 0.6.0 | | | | 新增 智能停止(下线) | 0.6.0 |
......
...@@ -49,10 +49,8 @@ public class FlinkJobTask implements DaemonTask { ...@@ -49,10 +49,8 @@ public class FlinkJobTask implements DaemonTask {
} }
preDealTime = System.currentTimeMillis(); preDealTime = System.currentTimeMillis();
JobInstance jobInstance = taskService.refreshJobInstance(config.getId()); JobInstance jobInstance = taskService.refreshJobInstance(config.getId());
log.info("监控任务:"+jobInstance.getId());
if(!JobStatus.isDone(jobInstance.getStatus())){ if(!JobStatus.isDone(jobInstance.getStatus())){
DefaultThreadPool.getInstance().execute(this); DefaultThreadPool.getInstance().execute(this);
}; }
} }
} }
...@@ -3,6 +3,7 @@ package com.dlink.service.impl; ...@@ -3,6 +3,7 @@ package com.dlink.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.dlink.alert.Alert; import com.dlink.alert.Alert;
import com.dlink.alert.AlertConfig; import com.dlink.alert.AlertConfig;
import com.dlink.alert.AlertMsg;
import com.dlink.alert.AlertResult; import com.dlink.alert.AlertResult;
import com.dlink.assertion.Assert; import com.dlink.assertion.Assert;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
...@@ -17,11 +18,9 @@ import com.dlink.gateway.config.SavePointStrategy; ...@@ -17,11 +18,9 @@ import com.dlink.gateway.config.SavePointStrategy;
import com.dlink.gateway.config.SavePointType; import com.dlink.gateway.config.SavePointType;
import com.dlink.gateway.model.JobInfo; import com.dlink.gateway.model.JobInfo;
import com.dlink.gateway.result.SavePointResult; import com.dlink.gateway.result.SavePointResult;
import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.job.JobConfig; import com.dlink.job.JobConfig;
import com.dlink.job.JobManager; import com.dlink.job.JobManager;
import com.dlink.job.JobResult; import com.dlink.job.JobResult;
import com.dlink.mapper.JobInstanceMapper;
import com.dlink.mapper.TaskMapper; import com.dlink.mapper.TaskMapper;
import com.dlink.metadata.driver.Driver; import com.dlink.metadata.driver.Driver;
import com.dlink.metadata.result.JdbcSelectResult; import com.dlink.metadata.result.JdbcSelectResult;
...@@ -34,6 +33,7 @@ import org.springframework.beans.factory.annotation.Value; ...@@ -34,6 +33,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
...@@ -454,18 +454,27 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -454,18 +454,27 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
if(Asserts.isNotNull(task.getAlertGroupId())){ if(Asserts.isNotNull(task.getAlertGroupId())){
AlertGroup alertGroup = alertGroupService.getAlertGroupInfo(task.getAlertGroupId()); AlertGroup alertGroup = alertGroupService.getAlertGroupInfo(task.getAlertGroupId());
if(Asserts.isNotNull(alertGroup)){ if(Asserts.isNotNull(alertGroup)){
List<AlertMsg> alertMsgList = new ArrayList<>();
AlertMsg alertMsg = new AlertMsg();
alertMsg.setType("Flink 实时监控");
alertMsg.setTime(LocalDateTime.now().toString());
alertMsg.setId(jobInstance.getId().toString());
alertMsg.setName(task.getAlias());
alertMsg.setStatus(jobInstance.getStatus());
alertMsg.setContent(jobInstance.getJid());
alertMsgList.add(alertMsg);
for(AlertInstance alertInstance: alertGroup.getInstances()){ for(AlertInstance alertInstance: alertGroup.getInstances()){
sendAlert(alertInstance,jobInstance,task); sendAlert(alertInstance,jobInstance,task,alertMsgList);
} }
} }
} }
} }
private void sendAlert(AlertInstance alertInstance,JobInstance jobInstance,Task task){ private void sendAlert(AlertInstance alertInstance,JobInstance jobInstance,Task task,List<AlertMsg> alertMsgList){
AlertConfig alertConfig = AlertConfig.build(alertInstance.getName(), alertInstance.getType(), JSONUtil.toMap(alertInstance.getParams())); AlertConfig alertConfig = AlertConfig.build(alertInstance.getName(), alertInstance.getType(), JSONUtil.toMap(alertInstance.getParams()));
Alert alert = Alert.build(alertConfig); Alert alert = Alert.build(alertConfig);
String title = "任务【"+task.getAlias()+"】:"+jobInstance.getStatus(); String title = "任务【"+task.getAlias()+"】:"+jobInstance.getStatus();
String content = "jid【"+jobInstance.getJid()+"】于【"+jobInstance.getUpdateTime().toString()+"】时【"+jobInstance.getStatus()+"】!"; String content = JSONUtil.toJsonString(alertMsgList);
AlertResult alertResult = alert.send(title, content); AlertResult alertResult = alert.send(title, content);
AlertHistory alertHistory = new AlertHistory(); AlertHistory alertHistory = new AlertHistory();
alertHistory.setAlertGroupId(task.getAlertGroupId()); alertHistory.setAlertGroupId(task.getAlertGroupId());
......
package com.dlink.alert;
/**
* AlertMsg
*
* @author wenmo
* @since 2022/3/7 18:30
**/
public class AlertMsg {
private String type;
private String time;
private String id;
private String name;
private String status;
private String content;
public AlertMsg() {
}
public AlertMsg(String type, String time, String id, String name, String status, String content) {
this.type = type;
this.time = time;
this.id = id;
this.name = name;
this.status = status;
this.content = content;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getTime() {
return time;
}
public void setTime(String time) {
this.time = time;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
}
...@@ -10,29 +10,29 @@ public final class DingTalkConstants { ...@@ -10,29 +10,29 @@ public final class DingTalkConstants {
static final String TYPE = "DingTalk"; static final String TYPE = "DingTalk";
static final String PROXY_ENABLE = "IsEnableProxy"; static final String PROXY_ENABLE = "isEnableProxy";
static final String WEB_HOOK = "WebHook"; static final String WEB_HOOK = "webhook";
static final String KEYWORD = "Keyword"; static final String KEYWORD = "keyword";
static final String SECRET = "Secret"; static final String SECRET = "secret";
static final String MSG_TYPE = "MsgType"; static final String MSG_TYPE = "msgType";
static final String AT_MOBILES = "AtMobiles"; static final String AT_MOBILES = "atMobiles";
static final String AT_USERIDS = "AtUserIds"; static final String AT_USERIDS = "atUserIds";
static final String AT_ALL = "IsAtAll"; static final String AT_ALL = "isAtAll";
static final String PROXY = "Proxy"; static final String PROXY = "proxy";
static final String PORT = "Port"; static final String PORT = "port";
static final String USER = "User"; static final String USER = "user";
static final String PASSWORD = "Password"; static final String PASSWORD = "password";
static final String MSG_TYPE_TEXT = "text"; static final String MSG_TYPE_TEXT = "text";
......
...@@ -17,6 +17,7 @@ public class WeChatAlert extends AbstractAlert { ...@@ -17,6 +17,7 @@ public class WeChatAlert extends AbstractAlert {
@Override @Override
public AlertResult send(String title, String content) { public AlertResult send(String title, String content) {
return null; WeChatSender sender = new WeChatSender(getConfig().getParam());
return sender.send(title, content);
} }
} }
...@@ -28,7 +28,7 @@ public class WeChatConstants { ...@@ -28,7 +28,7 @@ public class WeChatConstants {
static final String TEAM_SEND_MSG = "teamSendMsg"; static final String TEAM_SEND_MSG = "teamSendMsg";
static final String USER_SEND_MSG = "userSendMsg"; static final String USER_SEND_MSG = "{\"touser\":\"{toUser}\",\"agentid\":{agentId},\"msgtype\":\"{showType}\",\"{showType}\":{\"content\":\"{msg}\"}}";
static final String AGENT_ID = "agentId"; static final String AGENT_ID = "agentId";
......
...@@ -52,7 +52,7 @@ public class WeChatSender { ...@@ -52,7 +52,7 @@ public class WeChatSender {
String weChatCorpId = config.get(WeChatConstants.CORP_ID); String weChatCorpId = config.get(WeChatConstants.CORP_ID);
String weChatSecret = config.get(WeChatConstants.SECRET); String weChatSecret = config.get(WeChatConstants.SECRET);
String weChatTokenUrl = WeChatConstants.TOKEN_URL; String weChatTokenUrl = WeChatConstants.TOKEN_URL;
weChatUserSendMsg = config.get(WeChatConstants.USER_SEND_MSG); weChatUserSendMsg = WeChatConstants.USER_SEND_MSG;
sendType = config.get(WeChatConstants.SEND_TYPE); sendType = config.get(WeChatConstants.SEND_TYPE);
showType = config.get(WeChatConstants.SHOW_TYPE); showType = config.get(WeChatConstants.SHOW_TYPE);
requireNonNull(showType, WeChatConstants.SHOW_TYPE + " must not null"); requireNonNull(showType, WeChatConstants.SHOW_TYPE + " must not null");
...@@ -107,8 +107,7 @@ public class WeChatSender { ...@@ -107,8 +107,7 @@ public class WeChatSender {
} finally { } finally {
response.close(); response.close();
} }
logger.info("Enterprise WeChat send [{}], param:{}, resp:{}", // logger.info("Enterprise WeChat send [{}], param:{}, resp:{}", url, data, resp);
url, data, resp);
return resp; return resp;
} }
} }
......
...@@ -2,12 +2,10 @@ package com.dlink.daemon.task; ...@@ -2,12 +2,10 @@ package com.dlink.daemon.task;
import com.dlink.daemon.constant.FlinkTaskConstant; import com.dlink.daemon.constant.FlinkTaskConstant;
import com.dlink.daemon.pool.DefaultThreadPool; import com.dlink.daemon.pool.DefaultThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List; import java.util.List;
public class DaemonFactory { public class DaemonFactory {
private static final Logger log = LoggerFactory.getLogger(DaemonFactory.class);
public static void start(List<DaemonTaskConfig> configList){ public static void start(List<DaemonTaskConfig> configList){
Thread thread = new Thread(() -> { Thread thread = new Thread(() -> {
...@@ -30,7 +28,6 @@ public class DaemonFactory { ...@@ -30,7 +28,6 @@ public class DaemonFactory {
}else if(defaultThreadPool.getWorkCount() > num) { }else if(defaultThreadPool.getWorkCount() > num) {
defaultThreadPool.removeWorker(defaultThreadPool.getWorkCount() - num); defaultThreadPool.removeWorker(defaultThreadPool.getWorkCount() - num);
} }
log.info(" >>> taskSize:" + taskSize + " workCount: "+ defaultThreadPool.getWorkCount());
} }
}); });
thread.start(); thread.start();
......
...@@ -21,11 +21,11 @@ public interface DaemonTask { ...@@ -21,11 +21,11 @@ public interface DaemonTask {
} }
static DaemonTask build(DaemonTaskConfig config) { static DaemonTask build(DaemonTaskConfig config) {
Optional<DaemonTask> optionalDriver = DaemonTask.get(config); Optional<DaemonTask> optionalDaemonTask = DaemonTask.get(config);
if (!optionalDriver.isPresent()) { if (!optionalDaemonTask.isPresent()) {
throw new DaemonTaskException("不支持线程任务类型【" + config.getType() + "】"); throw new DaemonTaskException("不支持线程任务类型【" + config.getType() + "】");
} }
DaemonTask daemonTask = optionalDriver.get(); DaemonTask daemonTask = optionalDaemonTask.get();
return daemonTask; return daemonTask;
} }
......
...@@ -106,7 +106,7 @@ const DingTalkForm: React.FC<AlertInstanceFormProps> = (props) => { ...@@ -106,7 +106,7 @@ const DingTalkForm: React.FC<AlertInstanceFormProps> = (props) => {
</Form.Item></>:undefined </Form.Item></>:undefined
} }
<Form.Item <Form.Item
name="IsAtAll" name="isAtAll"
label="@所有人"> label="@所有人">
<Switch checkedChildren="启用" unCheckedChildren="禁用" <Switch checkedChildren="启用" unCheckedChildren="禁用"
defaultChecked={formVals.IsAtAll}/> defaultChecked={formVals.IsAtAll}/>
......
...@@ -701,6 +701,12 @@ export default (): React.ReactNode => { ...@@ -701,6 +701,12 @@ export default (): React.ReactNode => {
<li> <li>
<Link>新增 运维中心任务监控的智能重启和报警推送</Link> <Link>新增 运维中心任务监控的智能重启和报警推送</Link>
</li> </li>
<li>
<Link>新增 实时监控 Flink Job</Link>
</li>
<li>
<Link>新增 实时自动告警</Link>
</li>
</ul> </ul>
</Paragraph> </Paragraph>
</Timeline.Item> </Timeline.Item>
......
...@@ -18,14 +18,13 @@ ...@@ -18,14 +18,13 @@
<module>dlink-connectors</module> <module>dlink-connectors</module>
<module>dlink-executor</module> <module>dlink-executor</module>
<module>dlink-extends</module> <module>dlink-extends</module>
<module>dlink-alert</module>
<module>dlink-daemon</module>
<module>dlink-core</module> <module>dlink-core</module>
<module>dlink-app</module> <module>dlink-app</module>
<module>dlink-web</module> <module>dlink-web</module>
<module>dlink-admin</module> <module>dlink-admin</module>
<module>dlink-assembly</module> <module>dlink-assembly</module>
<module>dlink-alert</module>
<module>dlink-daemon</module>
</modules> </modules>
<properties> <properties>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment