Commit e99d6977 authored by walkhan's avatar walkhan

Merge branch 'dev' of https://github.com/DataLinkDC/dlink into dev

parents f416f4ad 54340813
...@@ -87,6 +87,8 @@ Dinky(原 Dlink): ...@@ -87,6 +87,8 @@ Dinky(原 Dlink):
| | 归档 | 新增 执行与提交历史 | 0.4.0 | | | 归档 | 新增 执行与提交历史 | 0.4.0 |
| 运维中心 | 主页 | 新增 任务实例列表 | 0.6.0 | | 运维中心 | 主页 | 新增 任务实例列表 | 0.6.0 |
| | 作业监控 | 新增 作业总览 | 0.6.0 | | | 作业监控 | 新增 作业总览 | 0.6.0 |
| | | 新增 实时监控 Flink Job | 0.6.0 |
| | | 新增 自动告警 | 0.6.0 |
| | | 新增 FlinkWebUI 跳转 | 0.6.0 | | | | 新增 FlinkWebUI 跳转 | 0.6.0 |
| | | 新增 智能重启(重新上线) | 0.6.0 | | | | 新增 智能重启(重新上线) | 0.6.0 |
| | | 新增 智能停止(下线) | 0.6.0 | | | | 新增 智能停止(下线) | 0.6.0 |
......
...@@ -125,6 +125,10 @@ ...@@ -125,6 +125,10 @@
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<artifactId>dlink-alert-base</artifactId> <artifactId>dlink-alert-base</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-daemon</artifactId>
</dependency>
<dependency> <dependency>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<artifactId>dlink-executor</artifactId> <artifactId>dlink-executor</artifactId>
......
package com.dlink.init; package com.dlink.init;
import com.dlink.daemon.task.DaemonFactory;
import com.dlink.daemon.task.DaemonTaskConfig;
import com.dlink.job.FlinkJobTask;
import com.dlink.model.JobInstance;
import com.dlink.service.JobInstanceService;
import com.dlink.service.SysConfigService; import com.dlink.service.SysConfigService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner; import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order; import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
/** /**
* SystemInit * SystemInit
* *
...@@ -17,12 +27,22 @@ import org.springframework.stereotype.Component; ...@@ -17,12 +27,22 @@ import org.springframework.stereotype.Component;
@Order(value = 1) @Order(value = 1)
public class SystemInit implements ApplicationRunner { public class SystemInit implements ApplicationRunner {
private static final Logger log = LoggerFactory.getLogger(SystemInit.class);
@Autowired @Autowired
private SysConfigService sysConfigService; private SysConfigService sysConfigService;
@Autowired
private JobInstanceService jobInstanceService;
@Override @Override
public void run(ApplicationArguments args) throws Exception { public void run(ApplicationArguments args) throws Exception {
sysConfigService.initSysConfig(); sysConfigService.initSysConfig();
List<JobInstance> jobInstances = jobInstanceService.listJobInstanceActive();
List<DaemonTaskConfig> configList =new ArrayList<>();
for(JobInstance jobInstance: jobInstances){
configList.add(new DaemonTaskConfig(FlinkJobTask.TYPE,jobInstance.getId()));
}
log.info("启动的任务数量:"+ configList.size());
DaemonFactory.start(configList);
} }
} }
package com.dlink.job;
import com.dlink.context.SpringContextUtils;
import com.dlink.daemon.constant.FlinkTaskConstant;
import com.dlink.daemon.pool.DefaultThreadPool;
import com.dlink.daemon.task.DaemonTask;
import com.dlink.daemon.task.DaemonTaskConfig;
import com.dlink.model.JobInstance;
import com.dlink.model.JobStatus;
import com.dlink.service.TaskService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.DependsOn;
@DependsOn("springContextUtils")
public class FlinkJobTask implements DaemonTask {
private static final Logger log = LoggerFactory.getLogger(FlinkJobTask.class);
private DaemonTaskConfig config;
public final static String TYPE = "jobInstance";
private static TaskService taskService;
private long preDealTime;
static {
taskService = SpringContextUtils.getBean("taskServiceImpl", TaskService.class);
}
@Override
public DaemonTask setConfig(DaemonTaskConfig config) {
this.config = config;
return this;
}
@Override
public String getType() {
return TYPE;
}
@Override
public void dealTask() {
long gap = System.currentTimeMillis() - this.preDealTime;
if(gap < FlinkTaskConstant.TIME_SLEEP){
try {
Thread.sleep(FlinkTaskConstant.TIME_SLEEP);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
preDealTime = System.currentTimeMillis();
JobInstance jobInstance = taskService.refreshJobInstance(config.getId());
if(!JobStatus.isDone(jobInstance.getStatus())){
DefaultThreadPool.getInstance().execute(this);
}
}
}
...@@ -2,15 +2,14 @@ package com.dlink.job; ...@@ -2,15 +2,14 @@ package com.dlink.job;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.context.SpringContextUtils; import com.dlink.context.SpringContextUtils;
import com.dlink.daemon.task.DaemonFactory;
import com.dlink.daemon.task.DaemonTaskConfig;
import com.dlink.model.*; import com.dlink.model.*;
import com.dlink.service.*; import com.dlink.service.*;
import com.dlink.utils.JSONUtil; import com.dlink.utils.JSONUtil;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.DependsOn; import org.springframework.context.annotation.DependsOn;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
/** /**
* Job2MysqlHandler * Job2MysqlHandler
...@@ -95,17 +94,17 @@ public class Job2MysqlHandler implements JobHandler { ...@@ -95,17 +94,17 @@ public class Job2MysqlHandler implements JobHandler {
if (Asserts.isNotNull(cluster)) { if (Asserts.isNotNull(cluster)) {
clusterId = cluster.getId(); clusterId = cluster.getId();
} }
}else{ } else {
cluster = clusterService.getById(clusterId); cluster = clusterService.getById(clusterId);
} }
history.setClusterId(clusterId); history.setClusterId(clusterId);
historyService.updateById(history); historyService.updateById(history);
ClusterConfiguration clusterConfiguration = null; ClusterConfiguration clusterConfiguration = null;
if(Asserts.isNotNull(job.getJobConfig().getClusterConfigurationId())){ if (Asserts.isNotNull(job.getJobConfig().getClusterConfigurationId())) {
clusterConfiguration = clusterConfigurationService.getClusterConfigById(job.getJobConfig().getClusterConfigurationId()); clusterConfiguration = clusterConfigurationService.getClusterConfigById(job.getJobConfig().getClusterConfigurationId());
} }
Jar jar = null; Jar jar = null;
if(Asserts.isNotNull(job.getJobConfig().getJarId())){ if (Asserts.isNotNull(job.getJobConfig().getJarId())) {
jar = jarService.getById(job.getJobConfig().getJarId()); jar = jarService.getById(job.getJobConfig().getJarId());
} }
if (Asserts.isNotNullCollection(job.getJids())) { if (Asserts.isNotNullCollection(job.getJids())) {
...@@ -128,6 +127,7 @@ public class Job2MysqlHandler implements JobHandler { ...@@ -128,6 +127,7 @@ public class Job2MysqlHandler implements JobHandler {
jobHistory.setClusterJson(JSONUtil.toJsonString(cluster)); jobHistory.setClusterJson(JSONUtil.toJsonString(cluster));
jobHistory.setClusterConfigurationJson(JSONUtil.toJsonString(clusterConfiguration)); jobHistory.setClusterConfigurationJson(JSONUtil.toJsonString(clusterConfiguration));
jobHistoryService.save(jobHistory); jobHistoryService.save(jobHistory);
DaemonFactory.addTask(DaemonTaskConfig.build(FlinkJobTask.TYPE, jobInstance.getId()));
break; break;
} }
} }
......
...@@ -17,4 +17,6 @@ import java.util.List; ...@@ -17,4 +17,6 @@ import java.util.List;
public interface JobInstanceMapper extends SuperMapper<JobInstance> { public interface JobInstanceMapper extends SuperMapper<JobInstance> {
List<JobInstanceCount> countStatus(); List<JobInstanceCount> countStatus();
List<JobInstance> listJobInstanceActive();
} }
...@@ -5,6 +5,8 @@ import com.dlink.model.JobInfoDetail; ...@@ -5,6 +5,8 @@ import com.dlink.model.JobInfoDetail;
import com.dlink.model.JobInstance; import com.dlink.model.JobInstance;
import com.dlink.model.JobInstanceStatus; import com.dlink.model.JobInstanceStatus;
import java.util.List;
/** /**
* JobInstanceService * JobInstanceService
* *
...@@ -15,6 +17,8 @@ public interface JobInstanceService extends ISuperService<JobInstance> { ...@@ -15,6 +17,8 @@ public interface JobInstanceService extends ISuperService<JobInstance> {
JobInstanceStatus getStatusCount(); JobInstanceStatus getStatusCount();
List<JobInstance> listJobInstanceActive();
JobInfoDetail getJobInfoDetail(Integer id); JobInfoDetail getJobInfoDetail(Integer id);
JobInfoDetail getJobInfoDetailInfo(JobInstance jobInstance); JobInfoDetail getJobInfoDetailInfo(JobInstance jobInstance);
......
...@@ -65,13 +65,14 @@ public class JobHistoryServiceImpl extends SuperServiceImpl<JobHistoryMapper, Jo ...@@ -65,13 +65,14 @@ public class JobHistoryServiceImpl extends SuperServiceImpl<JobHistoryMapper, Jo
@Override @Override
public JobHistory refreshJobHistory(Integer id, String jobManagerHost, String jobId) { public JobHistory refreshJobHistory(Integer id, String jobManagerHost, String jobId) {
JobHistory jobHistory = new JobHistory();
jobHistory.setId(id);
try {
JsonNode jobInfo = FlinkAPI.build(jobManagerHost).getJobInfo(jobId); JsonNode jobInfo = FlinkAPI.build(jobManagerHost).getJobInfo(jobId);
JsonNode exception = FlinkAPI.build(jobManagerHost).getException(jobId); JsonNode exception = FlinkAPI.build(jobManagerHost).getException(jobId);
JsonNode checkPoints = FlinkAPI.build(jobManagerHost).getCheckPoints(jobId); JsonNode checkPoints = FlinkAPI.build(jobManagerHost).getCheckPoints(jobId);
JsonNode checkPointsConfig = FlinkAPI.build(jobManagerHost).getCheckPointsConfig(jobId); JsonNode checkPointsConfig = FlinkAPI.build(jobManagerHost).getCheckPointsConfig(jobId);
JsonNode jobsConfig = FlinkAPI.build(jobManagerHost).getJobsConfig(jobId); JsonNode jobsConfig = FlinkAPI.build(jobManagerHost).getJobsConfig(jobId);
JobHistory jobHistory = new JobHistory();
jobHistory.setId(id);
jobHistory.setJobJson(JSONUtil.toJsonString(jobInfo)); jobHistory.setJobJson(JSONUtil.toJsonString(jobInfo));
jobHistory.setExceptionsJson(JSONUtil.toJsonString(exception)); jobHistory.setExceptionsJson(JSONUtil.toJsonString(exception));
jobHistory.setCheckpointsJson(JSONUtil.toJsonString(checkPoints)); jobHistory.setCheckpointsJson(JSONUtil.toJsonString(checkPoints));
...@@ -82,6 +83,9 @@ public class JobHistoryServiceImpl extends SuperServiceImpl<JobHistoryMapper, Jo ...@@ -82,6 +83,9 @@ public class JobHistoryServiceImpl extends SuperServiceImpl<JobHistoryMapper, Jo
} else { } else {
save(jobHistory); save(jobHistory);
} }
}catch (Exception e){
}finally {
return jobHistory; return jobHistory;
} }
}
} }
package com.dlink.service.impl; package com.dlink.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkRestResultConstant; import com.dlink.constant.FlinkRestResultConstant;
import com.dlink.db.service.impl.SuperServiceImpl; import com.dlink.db.service.impl.SuperServiceImpl;
...@@ -92,6 +93,11 @@ public class JobInstanceServiceImpl extends SuperServiceImpl<JobInstanceMapper, ...@@ -92,6 +93,11 @@ public class JobInstanceServiceImpl extends SuperServiceImpl<JobInstanceMapper,
return jobInstanceStatus; return jobInstanceStatus;
} }
@Override
public List<JobInstance> listJobInstanceActive() {
return baseMapper.listJobInstanceActive();
}
@Override @Override
public JobInfoDetail getJobInfoDetail(Integer id) { public JobInfoDetail getJobInfoDetail(Integer id) {
return getJobInfoDetailInfo(getById(id)); return getJobInfoDetailInfo(getById(id));
......
...@@ -3,6 +3,7 @@ package com.dlink.service.impl; ...@@ -3,6 +3,7 @@ package com.dlink.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.dlink.alert.Alert; import com.dlink.alert.Alert;
import com.dlink.alert.AlertConfig; import com.dlink.alert.AlertConfig;
import com.dlink.alert.AlertMsg;
import com.dlink.alert.AlertResult; import com.dlink.alert.AlertResult;
import com.dlink.assertion.Assert; import com.dlink.assertion.Assert;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
...@@ -17,11 +18,9 @@ import com.dlink.gateway.config.SavePointStrategy; ...@@ -17,11 +18,9 @@ import com.dlink.gateway.config.SavePointStrategy;
import com.dlink.gateway.config.SavePointType; import com.dlink.gateway.config.SavePointType;
import com.dlink.gateway.model.JobInfo; import com.dlink.gateway.model.JobInfo;
import com.dlink.gateway.result.SavePointResult; import com.dlink.gateway.result.SavePointResult;
import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.job.JobConfig; import com.dlink.job.JobConfig;
import com.dlink.job.JobManager; import com.dlink.job.JobManager;
import com.dlink.job.JobResult; import com.dlink.job.JobResult;
import com.dlink.mapper.JobInstanceMapper;
import com.dlink.mapper.TaskMapper; import com.dlink.mapper.TaskMapper;
import com.dlink.metadata.driver.Driver; import com.dlink.metadata.driver.Driver;
import com.dlink.metadata.result.JdbcSelectResult; import com.dlink.metadata.result.JdbcSelectResult;
...@@ -34,6 +33,7 @@ import org.springframework.beans.factory.annotation.Value; ...@@ -34,6 +33,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
...@@ -424,7 +424,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -424,7 +424,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
Cluster cluster = clusterService.getById(jobInstance.getClusterId()); Cluster cluster = clusterService.getById(jobInstance.getClusterId());
JobHistory jobHistoryJson = jobHistoryService.refreshJobHistory(id, cluster.getJobManagerHost(), jobInstance.getJid()); JobHistory jobHistoryJson = jobHistoryService.refreshJobHistory(id, cluster.getJobManagerHost(), jobInstance.getJid());
JobHistory jobHistory = jobHistoryService.getJobHistoryInfo(jobHistoryJson); JobHistory jobHistory = jobHistoryService.getJobHistoryInfo(jobHistoryJson);
if(jobHistory.getJob().has(FlinkRestResultConstant.ERRORS)){ if(Asserts.isNull(jobHistory.getJob())||jobHistory.getJob().has(FlinkRestResultConstant.ERRORS)){
jobInstance.setStatus(JobStatus.UNKNOWN.getValue()); jobInstance.setStatus(JobStatus.UNKNOWN.getValue());
}else{ }else{
jobInstance.setDuration(jobHistory.getJob().get(FlinkRestResultConstant.JOB_DURATION).asLong()/1000); jobInstance.setDuration(jobHistory.getJob().get(FlinkRestResultConstant.JOB_DURATION).asLong()/1000);
...@@ -443,6 +443,9 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -443,6 +443,9 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
} }
private void handleJobDone(JobInstance jobInstance){ private void handleJobDone(JobInstance jobInstance){
if(Asserts.isNull(jobInstance.getTaskId())){
return;
}
Task task = new Task(); Task task = new Task();
task.setId(jobInstance.getTaskId()); task.setId(jobInstance.getTaskId());
task.setJobInstanceId(0); task.setJobInstanceId(0);
...@@ -451,18 +454,27 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -451,18 +454,27 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
if(Asserts.isNotNull(task.getAlertGroupId())){ if(Asserts.isNotNull(task.getAlertGroupId())){
AlertGroup alertGroup = alertGroupService.getAlertGroupInfo(task.getAlertGroupId()); AlertGroup alertGroup = alertGroupService.getAlertGroupInfo(task.getAlertGroupId());
if(Asserts.isNotNull(alertGroup)){ if(Asserts.isNotNull(alertGroup)){
List<AlertMsg> alertMsgList = new ArrayList<>();
AlertMsg alertMsg = new AlertMsg();
alertMsg.setType("Flink 实时监控");
alertMsg.setTime(LocalDateTime.now().toString());
alertMsg.setId(jobInstance.getId().toString());
alertMsg.setName(task.getAlias());
alertMsg.setStatus(jobInstance.getStatus());
alertMsg.setContent(jobInstance.getJid());
alertMsgList.add(alertMsg);
for(AlertInstance alertInstance: alertGroup.getInstances()){ for(AlertInstance alertInstance: alertGroup.getInstances()){
sendAlert(alertInstance,jobInstance,task); sendAlert(alertInstance,jobInstance,task,alertMsgList);
} }
} }
} }
} }
private void sendAlert(AlertInstance alertInstance,JobInstance jobInstance,Task task){ private void sendAlert(AlertInstance alertInstance,JobInstance jobInstance,Task task,List<AlertMsg> alertMsgList){
AlertConfig alertConfig = AlertConfig.build(alertInstance.getName(), alertInstance.getType(), JSONUtil.toMap(alertInstance.getParams())); AlertConfig alertConfig = AlertConfig.build(alertInstance.getName(), alertInstance.getType(), JSONUtil.toMap(alertInstance.getParams()));
Alert alert = Alert.build(alertConfig); Alert alert = Alert.build(alertConfig);
String title = "任务【"+task.getAlias()+"】:"+jobInstance.getStatus(); String title = "任务【"+task.getAlias()+"】:"+jobInstance.getStatus();
String content = "jid【"+jobInstance.getJid()+"】于【"+jobInstance.getUpdateTime().toString()+"】时【"+jobInstance.getStatus()+"】!"; String content = JSONUtil.toJsonString(alertMsgList);
AlertResult alertResult = alert.send(title, content); AlertResult alertResult = alert.send(title, content);
AlertHistory alertHistory = new AlertHistory(); AlertHistory alertHistory = new AlertHistory();
alertHistory.setAlertGroupId(task.getAlertGroupId()); alertHistory.setAlertGroupId(task.getAlertGroupId());
......
...@@ -49,4 +49,13 @@ ...@@ -49,4 +49,13 @@
dlink_job_instance dlink_job_instance
group by status group by status
</select> </select>
<select id="listJobInstanceActive" resultType="com.dlink.model.JobInstance">
select
*
from
dlink_job_instance
where status not in ('FAILED','CANCELED','FINISHED','UNKNOWN')
order by id desc
</select>
</mapper> </mapper>
package com.dlink.alert;
/**
* AlertMsg
*
* @author wenmo
* @since 2022/3/7 18:30
**/
public class AlertMsg {
private String type;
private String time;
private String id;
private String name;
private String status;
private String content;
public AlertMsg() {
}
public AlertMsg(String type, String time, String id, String name, String status, String content) {
this.type = type;
this.time = time;
this.id = id;
this.name = name;
this.status = status;
this.content = content;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getTime() {
return time;
}
public void setTime(String time) {
this.time = time;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
}
...@@ -10,29 +10,29 @@ public final class DingTalkConstants { ...@@ -10,29 +10,29 @@ public final class DingTalkConstants {
static final String TYPE = "DingTalk"; static final String TYPE = "DingTalk";
static final String PROXY_ENABLE = "IsEnableProxy"; static final String PROXY_ENABLE = "isEnableProxy";
static final String WEB_HOOK = "WebHook"; static final String WEB_HOOK = "webhook";
static final String KEYWORD = "Keyword"; static final String KEYWORD = "keyword";
static final String SECRET = "Secret"; static final String SECRET = "secret";
static final String MSG_TYPE = "MsgType"; static final String MSG_TYPE = "msgType";
static final String AT_MOBILES = "AtMobiles"; static final String AT_MOBILES = "atMobiles";
static final String AT_USERIDS = "AtUserIds"; static final String AT_USERIDS = "atUserIds";
static final String AT_ALL = "IsAtAll"; static final String AT_ALL = "isAtAll";
static final String PROXY = "Proxy"; static final String PROXY = "proxy";
static final String PORT = "Port"; static final String PORT = "port";
static final String USER = "User"; static final String USER = "user";
static final String PASSWORD = "Password"; static final String PASSWORD = "password";
static final String MSG_TYPE_TEXT = "text"; static final String MSG_TYPE_TEXT = "text";
......
...@@ -17,6 +17,7 @@ public class WeChatAlert extends AbstractAlert { ...@@ -17,6 +17,7 @@ public class WeChatAlert extends AbstractAlert {
@Override @Override
public AlertResult send(String title, String content) { public AlertResult send(String title, String content) {
return null; WeChatSender sender = new WeChatSender(getConfig().getParam());
return sender.send(title, content);
} }
} }
...@@ -28,7 +28,7 @@ public class WeChatConstants { ...@@ -28,7 +28,7 @@ public class WeChatConstants {
static final String TEAM_SEND_MSG = "teamSendMsg"; static final String TEAM_SEND_MSG = "teamSendMsg";
static final String USER_SEND_MSG = "userSendMsg"; static final String USER_SEND_MSG = "{\"touser\":\"{toUser}\",\"agentid\":{agentId},\"msgtype\":\"{showType}\",\"{showType}\":{\"content\":\"{msg}\"}}";
static final String AGENT_ID = "agentId"; static final String AGENT_ID = "agentId";
......
...@@ -52,7 +52,7 @@ public class WeChatSender { ...@@ -52,7 +52,7 @@ public class WeChatSender {
String weChatCorpId = config.get(WeChatConstants.CORP_ID); String weChatCorpId = config.get(WeChatConstants.CORP_ID);
String weChatSecret = config.get(WeChatConstants.SECRET); String weChatSecret = config.get(WeChatConstants.SECRET);
String weChatTokenUrl = WeChatConstants.TOKEN_URL; String weChatTokenUrl = WeChatConstants.TOKEN_URL;
weChatUserSendMsg = config.get(WeChatConstants.USER_SEND_MSG); weChatUserSendMsg = WeChatConstants.USER_SEND_MSG;
sendType = config.get(WeChatConstants.SEND_TYPE); sendType = config.get(WeChatConstants.SEND_TYPE);
showType = config.get(WeChatConstants.SHOW_TYPE); showType = config.get(WeChatConstants.SHOW_TYPE);
requireNonNull(showType, WeChatConstants.SHOW_TYPE + " must not null"); requireNonNull(showType, WeChatConstants.SHOW_TYPE + " must not null");
...@@ -107,8 +107,7 @@ public class WeChatSender { ...@@ -107,8 +107,7 @@ public class WeChatSender {
} finally { } finally {
response.close(); response.close();
} }
logger.info("Enterprise WeChat send [{}], param:{}, resp:{}", // logger.info("Enterprise WeChat send [{}], param:{}, resp:{}", url, data, resp);
url, data, resp);
return resp; return resp;
} }
} }
......
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>dlink-deamon</artifactId> <artifactId>dlink-daemon</artifactId>
<properties> <properties>
<maven.compiler.source>8</maven.compiler.source> <maven.compiler.source>8</maven.compiler.source>
......
package com.dlink.daemon.constant;
public interface FlinkTaskConstant {
/**
* 检测停顿时间
*/
int TIME_SLEEP = 1000;
/**
* 启动线程轮询日志时间,用于设置work等信息
*/
int MAX_POLLING_GAP = 1000;
/**
* 最小
*/
int MIN_POLLING_GAP = 50;
}
package com.dlink.daemon.entity;
import javafx.concurrent.Task;
import java.util.LinkedList;
public class TaskQueue<T> {
private final LinkedList<T> tasks = new LinkedList<>();
private final Object lock = new Object();
public void enqueue(T task) {
synchronized (lock) {
lock.notifyAll();
tasks.addLast( task );
}
}
public T dequeue() {
synchronized (lock) {
while (tasks.isEmpty()) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T task = tasks.removeFirst();
return task;
}
}
public int getTaskSize() {
synchronized (lock) {
return tasks.size();
}
}
}
package com.dlink.daemon.entity;
import com.dlink.daemon.task.DaemonTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TaskWorker implements Runnable {
private static final Logger log = LoggerFactory.getLogger(TaskWorker.class);
private volatile boolean running = true;
private TaskQueue<DaemonTask> queue;
public TaskWorker(TaskQueue queue) {
this.queue = queue;
}
@Override
public void run() {
log.info("TaskWorker run");
while (running) {
DaemonTask daemonTask = queue.dequeue();
if (daemonTask != null) {
try {
daemonTask.dealTask();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
public void shutdown() {
log.info(Thread.currentThread().getName() + "TaskWorker shutdown");
running = false;
}
}
package com.dlink.daemon.exception;
public class DaemonTaskException extends RuntimeException {
public DaemonTaskException(String message, Throwable cause) {
super(message, cause);
}
public DaemonTaskException(String message) {
super(message);
}
}
package com.dlink.daemon.pool;
import com.dlink.daemon.entity.TaskQueue;
import com.dlink.daemon.entity.TaskWorker;
import com.dlink.daemon.task.DaemonTask;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author lcg
* @operate
* @date 2022/3/7 10:36
* @return
*/
public class DefaultThreadPool implements ThreadPool {
private static final int MAX_WORKER_NUM = 10;
private static final int DEFAULT_WORKER_NUM = 5;
private static final int MIN_WORKER_NUM = 1;
private final List<TaskWorker> workers = Collections.synchronizedList(new ArrayList<>());
private final Object lock = new Object();
private volatile AtomicInteger workerNum = new AtomicInteger(0);
private final TaskQueue<DaemonTask> queue = new TaskQueue<>();
private static DefaultThreadPool defaultThreadPool;
private DefaultThreadPool() {
addWorkers(DEFAULT_WORKER_NUM);
}
public static DefaultThreadPool getInstance() {
if (defaultThreadPool == null) {
synchronized (DefaultThreadPool.class) {
if(defaultThreadPool == null){
defaultThreadPool = new DefaultThreadPool();
}
}
}
return defaultThreadPool;
}
@Override
public void execute(DaemonTask daemonTask) {
if (daemonTask != null) {
queue.enqueue(daemonTask);
}
}
@Override
public void addWorkers(int num) {
synchronized (lock) {
if (num + this.workerNum.get() > MAX_WORKER_NUM) {
num = MAX_WORKER_NUM - this.workerNum.get();
if (num <= 0) return;
}
for (int i = 0; i < num; i++) {
TaskWorker worker = new TaskWorker(queue);
workers.add(worker);
Thread thread = new Thread(worker, "ThreadPool-Worker-" + workerNum.incrementAndGet());
thread.start();
}
}
}
@Override
public void removeWorker(int num) {
synchronized (lock) {
if (num >= this.workerNum.get()) {
num = this.workerNum.get() - MIN_WORKER_NUM;
if (num <= 0) return;
}
int count = num - 1;
while (count >= 0) {
TaskWorker worker = workers.get(count);
if (workers.remove(worker)) {
worker.shutdown();
count--;
}
}
//减少线程
workerNum.getAndAdd(-num);
}
}
@Override
public void shutdown() {
synchronized (lock) {
for (TaskWorker worker : workers) {
worker.shutdown();
}
workers.clear();
}
}
@Override
public int getTaskSize() {
return queue.getTaskSize();
}
public int getWorkCount() {
synchronized (lock) {
return this.workerNum.get();
}
}
}
package com.dlink.daemon.pool;
import com.dlink.daemon.task.DaemonTask;
/**
*
* @author lcg
* @operate
* @date 2022/3/7 10:36
* @return
*/
public interface ThreadPool{
//执行任务
void execute(DaemonTask daemonTask);
//关闭连接池
void shutdown();
//增加工作数
void addWorkers(int num);
//减少工作数
void removeWorker(int num);
int getTaskSize();
}
package com.dlink.daemon.task;
import com.dlink.daemon.constant.FlinkTaskConstant;
import com.dlink.daemon.pool.DefaultThreadPool;
import java.util.List;
public class DaemonFactory {
public static void start(List<DaemonTaskConfig> configList){
Thread thread = new Thread(() -> {
DefaultThreadPool defaultThreadPool = DefaultThreadPool.getInstance();
for (DaemonTaskConfig config: configList) {
DaemonTask daemonTask = DaemonTask.build(config);
defaultThreadPool.execute(daemonTask);
}
while (true) {
int taskSize = defaultThreadPool.getTaskSize();
try {
Thread.sleep(Math.max(FlinkTaskConstant.MAX_POLLING_GAP / (taskSize + 1), FlinkTaskConstant.MIN_POLLING_GAP));
} catch (InterruptedException e) {
e.printStackTrace();
}
int num = taskSize / 100 + 1;
if (defaultThreadPool.getWorkCount() < num) {
defaultThreadPool.addWorkers(num - defaultThreadPool.getWorkCount() );
}else if(defaultThreadPool.getWorkCount() > num) {
defaultThreadPool.removeWorker(defaultThreadPool.getWorkCount() - num);
}
}
});
thread.start();
}
public static void addTask(DaemonTaskConfig config){
DefaultThreadPool.getInstance().execute(DaemonTask.build(config));
}
}
package com.dlink.daemon.task;
import com.dlink.assertion.Asserts;
import com.dlink.daemon.exception.DaemonTaskException;
import sun.misc.Service;
import java.util.Iterator;
import java.util.Optional;
public interface DaemonTask {
static Optional<DaemonTask> get(DaemonTaskConfig config) {
Asserts.checkNotNull(config, "线程任务配置不能为空");
Iterator<DaemonTask> providers = Service.providers(DaemonTask.class);
while (providers.hasNext()) {
DaemonTask daemonTask = providers.next();
if (daemonTask.canHandle(config.getType())) {
return Optional.of(daemonTask.setConfig(config));
}
}
return Optional.empty();
}
static DaemonTask build(DaemonTaskConfig config) {
Optional<DaemonTask> optionalDaemonTask = DaemonTask.get(config);
if (!optionalDaemonTask.isPresent()) {
throw new DaemonTaskException("不支持线程任务类型【" + config.getType() + "】");
}
DaemonTask daemonTask = optionalDaemonTask.get();
return daemonTask;
}
DaemonTask setConfig(DaemonTaskConfig config);
default boolean canHandle(String type){
return Asserts.isEqualsIgnoreCase(getType(),type);
}
String getType();
void dealTask();
}
package com.dlink.daemon.task;
public class DaemonTaskConfig {
private String type;
private Integer id;
public DaemonTaskConfig() {
}
public DaemonTaskConfig(String type, Integer id) {
this.type = type;
this.id = id;
}
public static DaemonTaskConfig build(String type,Integer id){
return new DaemonTaskConfig(type,id);
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
}
package com.dlink.daemon.task;
import com.dlink.daemon.constant.FlinkTaskConstant;
import com.dlink.model.JobStatus;
import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Random;
@Data
public class FlinkDaemonTask {
private static final Logger log = LoggerFactory.getLogger(FlinkDaemonTask.class);
public static Random random = new Random(5);
private String id;
private JobStatus status;
private long preDealTime;
private int count;
// @Override
public DaemonTask setConfig(DaemonTaskConfig config) {
return null;
}
// @Override
public String getType() {
return null;
}
// @Override
public void dealTask() {
long gap = 0;
if (this.preDealTime != 0L) {
gap = System.currentTimeMillis() - this.preDealTime;
}
preDealTime = System.currentTimeMillis();
int i = random.nextInt(10);
if(i > 5){
log.info("deal FlinkTask id:" + id + " status: finished count:"+ count + " gap:"+ gap + "ms");
}else {
log.info("deal FlinkTask id:" + id + " status: running count:" +count + " gap:"+ gap + "ms");
//加入等待下次检测
// DefaultThreadPool.getInstance().execute(this);
}
count++;
if(gap < FlinkTaskConstant.TIME_SLEEP){
try {
Thread.sleep(FlinkTaskConstant.TIME_SLEEP);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public FlinkDaemonTask() {
}
public FlinkDaemonTask(String id) {
this.id = id;
}
public FlinkDaemonTask(String id, JobStatus status) {
this.id = id;
this.status = status;
}
}
package com.dlink.deamon; package com.dlink.daemon;
/** /**
* DeamonTest * DeamonTest
...@@ -6,5 +6,5 @@ package com.dlink.deamon; ...@@ -6,5 +6,5 @@ package com.dlink.deamon;
* @author wenmo * @author wenmo
* @since 2022/3/2 23:31 * @since 2022/3/2 23:31
*/ */
public class DeamonTest { public class DaemonTest {
} }
package com.dlink.deamon;
/**
* Deamon
*
* @author wenmo
* @since 2022/3/2 23:31
*/
public interface Deamon {
}
...@@ -106,7 +106,7 @@ const DingTalkForm: React.FC<AlertInstanceFormProps> = (props) => { ...@@ -106,7 +106,7 @@ const DingTalkForm: React.FC<AlertInstanceFormProps> = (props) => {
</Form.Item></>:undefined </Form.Item></>:undefined
} }
<Form.Item <Form.Item
name="IsAtAll" name="isAtAll"
label="@所有人"> label="@所有人">
<Switch checkedChildren="启用" unCheckedChildren="禁用" <Switch checkedChildren="启用" unCheckedChildren="禁用"
defaultChecked={formVals.IsAtAll}/> defaultChecked={formVals.IsAtAll}/>
......
...@@ -701,6 +701,12 @@ export default (): React.ReactNode => { ...@@ -701,6 +701,12 @@ export default (): React.ReactNode => {
<li> <li>
<Link>新增 运维中心任务监控的智能重启和报警推送</Link> <Link>新增 运维中心任务监控的智能重启和报警推送</Link>
</li> </li>
<li>
<Link>新增 实时监控 Flink Job</Link>
</li>
<li>
<Link>新增 实时自动告警</Link>
</li>
</ul> </ul>
</Paragraph> </Paragraph>
</Timeline.Item> </Timeline.Item>
......
...@@ -18,14 +18,13 @@ ...@@ -18,14 +18,13 @@
<module>dlink-connectors</module> <module>dlink-connectors</module>
<module>dlink-executor</module> <module>dlink-executor</module>
<module>dlink-extends</module> <module>dlink-extends</module>
<module>dlink-alert</module>
<module>dlink-daemon</module>
<module>dlink-core</module> <module>dlink-core</module>
<module>dlink-app</module> <module>dlink-app</module>
<module>dlink-web</module> <module>dlink-web</module>
<module>dlink-admin</module> <module>dlink-admin</module>
<module>dlink-assembly</module> <module>dlink-assembly</module>
<module>dlink-alert</module>
<module>dlink-deamon</module>
</modules> </modules>
<properties> <properties>
...@@ -269,7 +268,7 @@ ...@@ -269,7 +268,7 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<artifactId>dlink-deamon</artifactId> <artifactId>dlink-daemon</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency> <dependency>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment