Commit 24edc060 authored by wenmo's avatar wenmo

智能重启和报警推送

parent 983d8f1a
......@@ -88,9 +88,10 @@ Dinky(原 Dlink):
| 运维中心 | 主页 | 新增 任务实例列表 | 0.6.0 |
| | 作业监控 | 新增 作业总览 | 0.6.0 |
| | | 新增 FlinkWebUI 跳转 | 0.6.0 |
| | | 新增 智能重启 | dev |
| | | 新增 智能停止 | 0.6.0 |
| | | 新增 智能重启(重新上线) | 0.6.0 |
| | | 新增 智能停止(下线) | 0.6.0 |
| | | 新增 SavePoint 多种操作 | 0.6.0 |
| | | 新增 报警推送 | 0.6.0 |
| | | 新增 配置信息 | 0.6.0 |
| | | 新增 集群信息 | dev |
| | | 新增 作业快照 | dev |
......
......@@ -121,6 +121,10 @@
<groupId>com.dlink</groupId>
<artifactId>dlink-metadata-base</artifactId>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-alert-base</artifactId>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-executor</artifactId>
......@@ -129,6 +133,11 @@
<groupId>com.dlink</groupId>
<artifactId>dlink-gateway</artifactId>
</dependency>
<!-- <dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-alert-dingtalk</artifactId>
<version>0.6.0-SNAPSHOT</version>
</dependency>-->
<!--<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-metadata-mysql</artifactId>
......
......@@ -8,6 +8,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
......@@ -81,4 +82,12 @@ public class AlertGroupController {
alertGroup = alertGroupService.getById(alertGroup.getId());
return Result.succeed(alertGroup,"获取成功");
}
/**
* 获取可用的报警组
*/
@GetMapping("/listEnabledAll")
public Result listEnabledAll() {
return Result.succeed(alertGroupService.listEnabledAll(),"获取成功");
}
}
......@@ -5,6 +5,7 @@ import com.dlink.common.result.Result;
import com.dlink.model.Jar;
import com.dlink.model.JobInstance;
import com.dlink.service.JobInstanceService;
import com.dlink.service.TaskService;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -25,6 +26,8 @@ import java.util.List;
public class JobInstanceController {
@Autowired
private JobInstanceService jobInstanceService;
@Autowired
private TaskService taskService;
/**
* 动态查询列表
......@@ -87,6 +90,6 @@ public class JobInstanceController {
*/
@GetMapping("/refreshJobInfoDetail")
public Result refreshJobInfoDetail(@RequestParam Integer id) {
return Result.succeed(jobInstanceService.refreshJobInfoDetail(id), "刷新成功");
return Result.succeed(taskService.refreshJobInfoDetail(id), "刷新成功");
}
}
......@@ -169,5 +169,13 @@ public class TaskController {
public Result recoveryTask(@RequestParam Integer id) {
return Result.succeed(taskService.recoveryTask(id),"操作成功");
}
/**
* 重启任务
*/
@GetMapping(value = "/restartTask")
public Result restartTask(@RequestParam Integer id) {
return Result.succeed(taskService.restartByTaskId(id),"操作成功");
}
}
......@@ -5,9 +5,12 @@ import com.dlink.context.SpringContextUtils;
import com.dlink.model.*;
import com.dlink.service.*;
import com.dlink.utils.JSONUtil;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.DependsOn;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
/**
* Job2MysqlHandler
......@@ -24,6 +27,7 @@ public class Job2MysqlHandler implements JobHandler {
private static JarService jarService;
private static JobInstanceService jobInstanceService;
private static JobHistoryService jobHistoryService;
private static TaskService taskService;
static {
historyService = SpringContextUtils.getBean("historyServiceImpl", HistoryService.class);
......@@ -32,6 +36,7 @@ public class Job2MysqlHandler implements JobHandler {
jarService = SpringContextUtils.getBean("jarServiceImpl", JarService.class);
jobInstanceService = SpringContextUtils.getBean("jobInstanceServiceImpl", JobInstanceService.class);
jobHistoryService = SpringContextUtils.getBean("jobHistoryServiceImpl", JobHistoryService.class);
taskService = SpringContextUtils.getBean("taskServiceImpl", TaskService.class);
}
@Override
......@@ -113,12 +118,17 @@ public class Job2MysqlHandler implements JobHandler {
jobInstance.setJid(jid);
jobInstance.setStatus(JobStatus.INITIALIZING.getValue());
jobInstanceService.save(jobInstance);
Task task = new Task();
task.setId(jobInstance.getTaskId());
task.setJobInstanceId(jobInstance.getId());
taskService.updateById(task);
JobHistory jobHistory = new JobHistory();
jobHistory.setId(jobInstance.getId());
jobHistory.setJarJson(JSONUtil.toJsonString(jar));
jobHistory.setClusterJson(JSONUtil.toJsonString(cluster));
jobHistory.setClusterConfigurationJson(JSONUtil.toJsonString(clusterConfiguration));
jobHistoryService.save(jobHistory);
break;
}
}
return true;
......
package com.dlink.model;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import com.dlink.db.model.SuperEntity;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.util.List;
/**
* AlertGroup
*
......@@ -21,4 +24,7 @@ public class AlertGroup extends SuperEntity {
private String alertInstanceIds;
private String note;
@TableField(exist = false)
private List<AlertInstance> instances;
}
......@@ -27,9 +27,9 @@ public class AlertHistory implements Serializable {
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
private String alertGroupId;
private Integer alertGroupId;
private String jobInstanceId;
private Integer jobInstanceId;
private String title;
......
......@@ -60,12 +60,16 @@ public class Task extends SuperEntity {
private Integer envId;
private Integer alertGroupId;
private String configJson;
private String note;
private Integer step;
private Integer jobInstanceId;
@TableField(exist = false)
private String statement;
......
......@@ -3,6 +3,8 @@ package com.dlink.service;
import com.dlink.db.service.ISuperService;
import com.dlink.model.AlertGroup;
import java.util.List;
/**
* AlertGroupService
*
......@@ -10,4 +12,8 @@ import com.dlink.model.AlertGroup;
* @since 2022/2/24 20:00
**/
public interface AlertGroupService extends ISuperService<AlertGroup> {
List<AlertGroup> listEnabledAll();
AlertGroup getAlertGroupInfo(Integer id);
}
......@@ -19,7 +19,4 @@ public interface JobInstanceService extends ISuperService<JobInstance> {
JobInfoDetail getJobInfoDetailInfo(JobInstance jobInstance);
JobInstance refreshJobInstance(Integer id);
JobInfoDetail refreshJobInfoDetail(Integer id);
}
......@@ -3,6 +3,8 @@ package com.dlink.service;
import com.dlink.db.service.ISuperService;
import com.dlink.job.JobResult;
import com.dlink.model.JobInfoDetail;
import com.dlink.model.JobInstance;
import com.dlink.model.Task;
import java.util.List;
......@@ -17,6 +19,8 @@ public interface TaskService extends ISuperService<Task> {
JobResult submitByTaskId(Integer id);
JobResult restartByTaskId(Integer id);
Task getTaskInfoById(Integer id);
boolean saveOrUpdateTask(Task task);
......@@ -38,4 +42,10 @@ public interface TaskService extends ISuperService<Task> {
boolean cancelTask(Integer id);
boolean recoveryTask(Integer id);
boolean savepointTask(Integer taskId,String savePointType);
JobInstance refreshJobInstance(Integer id);
JobInfoDetail refreshJobInfoDetail(Integer id);
}
package com.dlink.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.dlink.assertion.Asserts;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.mapper.AlertGroupMapper;
import com.dlink.model.AlertGroup;
import com.dlink.model.AlertInstance;
import com.dlink.service.AlertGroupService;
import com.dlink.service.AlertInstanceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
/**
* AlertGroupServiceImpl
*
......@@ -14,4 +22,30 @@ import org.springframework.stereotype.Service;
**/
@Service
public class AlertGroupServiceImpl extends SuperServiceImpl<AlertGroupMapper, AlertGroup> implements AlertGroupService {
@Autowired
private AlertInstanceService alertInstanceService;
@Override
public List<AlertGroup> listEnabledAll() {
return list(new QueryWrapper<AlertGroup>().eq("enabled",1));
}
@Override
public AlertGroup getAlertGroupInfo(Integer id) {
AlertGroup alertGroup = getById(id);
if(Asserts.isNull(alertGroup)||Asserts.isNullString(alertGroup.getAlertInstanceIds())){
return alertGroup;
}
String[] alertInstanceIds = alertGroup.getAlertInstanceIds().split(",");
List<AlertInstance> alertInstanceList = new ArrayList<>();
for(String alertInstanceId: alertInstanceIds){
if(Asserts.isNullString(alertInstanceId)||alertInstanceId.equals("0")){
continue;
}
alertInstanceList.add(alertInstanceService.getById(Integer.valueOf(alertInstanceId)));
}
alertGroup.setInstances(alertInstanceList);
return alertGroup;
}
}
......@@ -33,8 +33,6 @@ import java.util.List;
@Service
public class JobInstanceServiceImpl extends SuperServiceImpl<JobInstanceMapper, JobInstance> implements JobInstanceService {
@Autowired
private TaskService taskService;
@Autowired
private HistoryService historyService;
@Autowired
......@@ -115,29 +113,4 @@ public class JobInstanceServiceImpl extends SuperServiceImpl<JobInstanceMapper,
return jobInfoDetail;
}
@Override
public JobInstance refreshJobInstance(Integer id) {
JobInstance jobInstance = getById(id);
Asserts.checkNull(jobInstance, "该任务实例不存在");
if(JobStatus.isDone(jobInstance.getStatus())){
return jobInstance;
}
Cluster cluster = clusterService.getById(jobInstance.getClusterId());
JobHistory jobHistoryJson = jobHistoryService.refreshJobHistory(id, cluster.getJobManagerHost(), jobInstance.getJid());
JobHistory jobHistory = jobHistoryService.getJobHistoryInfo(jobHistoryJson);
if(jobHistory.getJob().has(FlinkRestResultConstant.ERRORS)){
jobInstance.setStatus(JobStatus.UNKNOWN.getValue());
updateById(jobInstance);
return jobInstance;
}
jobInstance.setDuration(jobHistory.getJob().get(FlinkRestResultConstant.JOB_DURATION).asLong()/1000);
jobInstance.setStatus(jobHistory.getJob().get(FlinkRestResultConstant.JOB_STATE).asText());
updateById(jobInstance);
return jobInstance;
}
@Override
public JobInfoDetail refreshJobInfoDetail(Integer id) {
return getJobInfoDetailInfo(refreshJobInstance(id));
}
}
package com.dlink.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.dlink.alert.Alert;
import com.dlink.alert.AlertConfig;
import com.dlink.alert.AlertResult;
import com.dlink.assertion.Assert;
import com.dlink.assertion.Asserts;
import com.dlink.assertion.Tips;
import com.dlink.config.Dialect;
import com.dlink.constant.FlinkRestResultConstant;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.dto.SqlDTO;
import com.dlink.exception.BusException;
import com.dlink.gateway.GatewayType;
import com.dlink.gateway.config.SavePointStrategy;
import com.dlink.gateway.config.SavePointType;
import com.dlink.gateway.model.JobInfo;
import com.dlink.gateway.result.SavePointResult;
import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.job.JobConfig;
import com.dlink.job.JobManager;
import com.dlink.job.JobResult;
import com.dlink.mapper.JobInstanceMapper;
import com.dlink.mapper.TaskMapper;
import com.dlink.metadata.driver.Driver;
import com.dlink.metadata.result.JdbcSelectResult;
import com.dlink.model.*;
import com.dlink.service.*;
import com.dlink.utils.CustomStringJavaCompiler;
import com.dlink.utils.JSONUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
......@@ -48,6 +58,14 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
private JarService jarService;
@Autowired
private DataBaseService dataBaseService;
@Autowired
private JobInstanceService jobInstanceService;
@Autowired
private JobHistoryService jobHistoryService;
@Autowired
private AlertGroupService alertGroupService;
@Autowired
private AlertHistoryService alertHistoryService;
@Value("${spring.datasource.driver-class-name}")
private String driver;
......@@ -79,6 +97,27 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
}
}
@Override
public JobResult restartByTaskId(Integer id) {
Task task = this.getTaskInfoById(id);
Asserts.checkNull(task, Tips.TASK_NOT_EXIST);
if(Asserts.isNotNull(task.getJobInstanceId())||task.getJobInstanceId()!=0){
savepointTask(task, SavePointType.CANCEL.getValue());
}
if (Dialect.isSql(task.getDialect())) {
return executeCommonSql(SqlDTO.build(task.getStatement(),
task.getDatabaseId(), null));
}
task.setSavePointStrategy(SavePointStrategy.LATEST.getValue());
JobConfig config = buildJobConfig(task);
JobManager jobManager = JobManager.build(config);
if (!config.isJarTask()) {
return jobManager.executeSql(task.getStatement());
} else {
return jobManager.executeJar();
}
}
private JobResult executeCommonSql(SqlDTO sqlDTO) {
JobResult result = new JobResult();
result.setStatement(sqlDTO.getStatement());
......@@ -274,6 +313,49 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
return false;
}
private boolean savepointTask(Task task, String savePointType) {
Asserts.checkNotNull(task, "该任务不存在");
Cluster cluster = clusterService.getById(task.getClusterId());
Asserts.checkNotNull(cluster, "该集群不存在");
Asserts.checkNotNull(task.getJobInstanceId(), "无任务需要SavePoint");
JobInstance jobInstance = jobInstanceService.getById(task.getJobInstanceId());
Asserts.checkNotNull(jobInstance, "任务实例不存在");
String jobId = jobInstance.getJid();
boolean useGateway = false;
JobConfig jobConfig = new JobConfig();
jobConfig.setAddress(cluster.getJobManagerHost());
jobConfig.setType(cluster.getType());
if (Asserts.isNotNull(cluster.getClusterConfigurationId())) {
Map<String, Object> gatewayConfig = clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
jobConfig.buildGatewayConfig(gatewayConfig);
jobConfig.getGatewayConfig().getClusterConfig().setAppId(cluster.getName());
useGateway = true;
}
jobConfig.setTaskId(task.getId());
JobManager jobManager = JobManager.build(jobConfig);
jobManager.setUseGateway(useGateway);
SavePointResult savePointResult = jobManager.savepoint(jobId, savePointType, null);
if (Asserts.isNotNull(savePointResult)) {
for (JobInfo item : savePointResult.getJobInfos()) {
if (Asserts.isEqualsIgnoreCase(jobId, item.getJobId())&&Asserts.isNotNull(jobConfig.getTaskId())) {
Savepoints savepoints = new Savepoints();
savepoints.setName(savePointType);
savepoints.setType(savePointType);
savepoints.setPath(item.getSavePoint());
savepoints.setTaskId(jobConfig.getTaskId());
savepointsService.save(savepoints);
}
}
return true;
}
return false;
}
@Override
public boolean savepointTask(Integer taskId, String savePointType) {
return savepointTask(getById(taskId),savePointType);
}
private JobConfig buildJobConfig(Task task) {
boolean isJarTask = Dialect.FLINKJAR.equalsVal(task.getDialect());
if (!isJarTask && task.isFragment()) {
......@@ -331,4 +413,64 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
}
return config;
}
@Override
public JobInstance refreshJobInstance(Integer id) {
JobInstance jobInstance = jobInstanceService.getById(id);
Asserts.checkNull(jobInstance, "该任务实例不存在");
if(JobStatus.isDone(jobInstance.getStatus())){
return jobInstance;
}
Cluster cluster = clusterService.getById(jobInstance.getClusterId());
JobHistory jobHistoryJson = jobHistoryService.refreshJobHistory(id, cluster.getJobManagerHost(), jobInstance.getJid());
JobHistory jobHistory = jobHistoryService.getJobHistoryInfo(jobHistoryJson);
if(jobHistory.getJob().has(FlinkRestResultConstant.ERRORS)){
jobInstance.setStatus(JobStatus.UNKNOWN.getValue());
}else{
jobInstance.setDuration(jobHistory.getJob().get(FlinkRestResultConstant.JOB_DURATION).asLong()/1000);
jobInstance.setStatus(jobHistory.getJob().get(FlinkRestResultConstant.JOB_STATE).asText());
}
jobInstanceService.updateById(jobInstance);
if(JobStatus.isDone(jobInstance.getStatus())){
handleJobDone(jobInstance);
}
return jobInstance;
}
@Override
public JobInfoDetail refreshJobInfoDetail(Integer id) {
return jobInstanceService.getJobInfoDetailInfo(refreshJobInstance(id));
}
private void handleJobDone(JobInstance jobInstance){
Task task = new Task();
task.setId(jobInstance.getTaskId());
task.setJobInstanceId(0);
updateById(task);
task = getTaskInfoById(jobInstance.getTaskId());
if(Asserts.isNotNull(task.getAlertGroupId())){
AlertGroup alertGroup = alertGroupService.getAlertGroupInfo(task.getAlertGroupId());
if(Asserts.isNotNull(alertGroup)){
for(AlertInstance alertInstance: alertGroup.getInstances()){
sendAlert(alertInstance,jobInstance,task);
}
}
}
}
private void sendAlert(AlertInstance alertInstance,JobInstance jobInstance,Task task){
AlertConfig alertConfig = AlertConfig.build(alertInstance.getName(), alertInstance.getType(), JSONUtil.toMap(alertInstance.getParams()));
Alert alert = Alert.build(alertConfig);
String title = "任务【"+task.getAlias()+"】:"+jobInstance.getStatus();
String content = "jid【"+jobInstance.getJid()+"】于【"+jobInstance.getUpdateTime().toString()+"】时【"+jobInstance.getStatus()+"】!";
AlertResult alertResult = alert.send(title, content);
AlertHistory alertHistory = new AlertHistory();
alertHistory.setAlertGroupId(task.getAlertGroupId());
alertHistory.setJobInstanceId(jobInstance.getId());
alertHistory.setTitle(title);
alertHistory.setContent(content);
alertHistory.setStatus(alertResult.getSuccessCode());
alertHistory.setLog(alertResult.getMessage());
alertHistoryService.save(alertHistory);
}
}
......@@ -7,23 +7,27 @@ package com.dlink.alert;
* @since 2022/2/23 20:20
**/
public class AlertResult {
private String status;
private boolean success;
private String message;
public AlertResult() {
}
public AlertResult(String status, String message) {
this.status = status;
public AlertResult(boolean success, String message) {
this.success = success;
this.message = message;
}
public String getStatus() {
return status;
public boolean getSuccess() {
return success;
}
public void setStatus(String status) {
this.status = status;
public Integer getSuccessCode() {
return success?1:0;
}
public void setSuccess(boolean success) {
this.success = success;
}
public String getMessage() {
......
......@@ -78,7 +78,7 @@ public class DingTalkSender {
} catch (Exception e) {
logger.info("send ding talk alert msg exception : {}", e.getMessage());
alertResult = new AlertResult();
alertResult.setStatus("false");
alertResult.setSuccess(false);
alertResult.setMessage("send ding talk alert fail.");
}
return alertResult;
......@@ -202,7 +202,7 @@ public class DingTalkSender {
private AlertResult checkMsgResult(String result) {
AlertResult alertResult = new AlertResult();
alertResult.setStatus("false");
alertResult.setSuccess(false);
if (null == result) {
alertResult.setMessage("send ding talk msg error");
......@@ -216,7 +216,7 @@ public class DingTalkSender {
return alertResult;
}
if (response.getErrcode() == 0) {
alertResult.setStatus("true");
alertResult.setSuccess(true);
alertResult.setMessage("send ding talk msg success");
return alertResult;
}
......
......@@ -38,6 +38,6 @@ public class DingTalkTest {
AlertConfig config = AlertConfig.build("test", "DingTalk", DingTalkTest.config);
Alert alert = Alert.build(config);
AlertResult result = alert.send("hello word", "UTF-8");
Assert.assertEquals("false", result.getStatus());
Assert.assertEquals(false, result.getSuccess());
}
}
......@@ -75,7 +75,7 @@ public class WeChatSender {
.replace(SHOW_TYPE_REGEX,showType);
if (Asserts.isNullString(weChatToken)) {
alertResult.setMessage("send we chat alert fail,get weChat token error");
alertResult.setStatus(ALERT_STATUS);
alertResult.setSuccess(false);
return alertResult;
}
String enterpriseWeChatPushUrlReplace = "";
......@@ -89,7 +89,7 @@ public class WeChatSender {
} catch (Exception e) {
logger.info("send we chat alert msg exception : {}", e.getMessage());
alertResult.setMessage("send we chat alert fail");
alertResult.setStatus(ALERT_STATUS);
alertResult.setSuccess(false);
}
return alertResult;
}
......@@ -220,7 +220,7 @@ public class WeChatSender {
private static AlertResult checkWeChatSendMsgResult(String result) {
AlertResult alertResult = new AlertResult();
alertResult.setStatus(ALERT_STATUS);
alertResult.setSuccess(false);
if (null == result) {
alertResult.setMessage("we chat send fail");
logger.info("send we chat msg error,resp is null");
......@@ -233,11 +233,11 @@ public class WeChatSender {
return alertResult;
}
if (sendMsgResponse.getErrcode() == 0) {
alertResult.setStatus("true");
alertResult.setSuccess(true);
alertResult.setMessage("we chat alert send success");
return alertResult;
}
alertResult.setStatus(ALERT_STATUS);
alertResult.setSuccess(false);
alertResult.setMessage(sendMsgResponse.getErrmsg());
return alertResult;
}
......
......@@ -79,7 +79,7 @@ public class WeChatSenderTest {
public void testSendWeChatTableMsg() {
WeChatSender weChatSender = new WeChatSender(weChatConfig);
AlertResult alertResult = weChatSender.send("TABLE-TEST", contentTest);
Assert.assertEquals("true", alertResult.getStatus());
Assert.assertEquals(true, alertResult.getSuccess());
}
@Test
......@@ -87,6 +87,6 @@ public class WeChatSenderTest {
weChatConfig.put(WeChatConstants.SHOW_TYPE,ShowType.TEXT.getValue());
WeChatSender weChatSender = new WeChatSender(weChatConfig);
AlertResult alertResult = weChatSender.send("TEXT-TEST", contentTest);
Assert.assertEquals("true", alertResult.getStatus());
Assert.assertEquals(true, alertResult.getSuccess());
}
}
......@@ -150,6 +150,22 @@
<include>dlink-metadata-doris-${project.version}.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.parent.basedir}/dlink-alert/dlink-alert-dingtalk/target
</directory>
<outputDirectory>lib</outputDirectory>
<includes>
<include>dlink-alert-dingtalk-${project.version}.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.parent.basedir}/dlink-alert/dlink-alert-wechat/target
</directory>
<outputDirectory>lib</outputDirectory>
<includes>
<include>dlink-alert-wechat-${project.version}.jar</include>
</includes>
</fileSet>
<!-- 将模块dlink-extends的常用jar文件放到打包目录/plugins下 -->
<!--<fileSet>
<directory>${project.parent.basedir}/dlink-extends/target
......
......@@ -95,9 +95,11 @@ public enum JobStatus {
case FAILED:
case CANCELED:
case FINISHED:
case UNKNOWN:
return true;
default:
return false;
}
}
}
......@@ -261,9 +261,11 @@ CREATE TABLE `dlink_task` (
`database_id` int(11) NULL DEFAULT NULL COMMENT '数据源ID',
`jar_id` int(11) NULL DEFAULT NULL COMMENT 'jarID',
`env_id` int(11) NULL DEFAULT NULL COMMENT '环境ID',
`alert_group_id` int(11) NULL DEFAULT NULL COMMENT '报警组ID',
`config_json` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '配置JSON',
`note` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '注释',
`step` int(11) NULL DEFAULT NULL COMMENT '作业生命周期',
`job_instance_id` int(11) NULL DEFAULT NULL COMMENT '作业实例ID',
`enabled` tinyint(1) NOT NULL DEFAULT 1 COMMENT '是否启用',
`create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',
`update_time` datetime(0) NULL DEFAULT NULL COMMENT '更新时间',
......
......@@ -618,10 +618,16 @@ create table dlink_job_history
comment 'Job历史详情';
-- ----------------------------
-- 0.4.0 2021-03-02
-- 0.6.0-SNAPSHOT 2021-03-02
-- ----------------------------
ALTER TABLE `dlink_history`
CHANGE COLUMN `config` `config_json` json NULL COMMENT '配置JSON' AFTER `result`;
-- ----------------------------
-- 0.6.0-SNAPSHOT 2022-03-04
-- ----------------------------
ALTER TABLE `dlink_task`
ADD COLUMN `job_instance_id` BIGINT NULL COMMENT '任务实例ID' AFTER `step`;
ALTER TABLE `dlink_task`
ADD COLUMN `alert_group_id` BIGINT NULL COMMENT '报警组ID' AFTER `env_id`;
SET FOREIGN_KEY_CHECKS = 1;
......@@ -25,7 +25,7 @@ const StudioProcess = (props: any) => {
cancelText: '取消',
onOk: async () => {
if (!clusterId) return;
let res = savepointJob(clusterId, currentItem.jid,key,key);
let res = savepointJob(clusterId, currentItem.jid,key,key,0);
res.then((result) => {
if (result.datas == true) {
message.success(key+"成功");
......
......@@ -174,6 +174,16 @@ export function showAlertInstance(dispatch: any) {
});
});
}
/*--- 刷新 报警组 ---*/
export function showAlertGroup(dispatch: any) {
const res = getData('api/alertGroup/listEnabledAll');
res.then((result) => {
result.datas && dispatch && dispatch({
type: "Alert/saveGroup",
payload: result.datas,
});
});
}
/*--- 刷新 元数据表 ---*/
export function showMetaDataTable(id:number) {
return getData('api/database/getSchemasAndTables',{id:id});
......
......@@ -8,13 +8,14 @@ import {showTables} from "@/components/Studio/StudioEvent/DDL";
import {JarStateType} from "@/pages/Jar/model";
import {Scrollbars} from "react-custom-scrollbars";
import {RUN_MODE} from "@/components/Studio/conf";
import {AlertStateType} from "@/pages/AlertInstance/model";
const {Option} = Select;
const {Text} = Typography;
const StudioSetting = (props: any) => {
const {sessionCluster, clusterConfiguration, current, form, dispatch, tabs, currentSession, env, toolHeight} = props;
const {sessionCluster, clusterConfiguration, current, form, dispatch, tabs, currentSession, env,group, toolHeight} = props;
const getClusterOptions = () => {
const itemList = [];
......@@ -52,6 +53,18 @@ const StudioSetting = (props: any) => {
return itemList;
};
const getGroupOptions = () => {
const itemList = [<Option key={0} value={0} label='禁用'>
禁用
</Option>];
for (const item of group) {
itemList.push(<Option key={item.id} value={item.id} label={item.name}>
{item.name}
</Option>)
}
return itemList;
};
useEffect(() => {
form.setFieldsValue(current.task);
}, [current.task]);
......@@ -239,6 +252,21 @@ const StudioSetting = (props: any) => {
<Input placeholder="hdfs://..."/>
</Form.Item>) : ''
}
<Row>
<Col span={24}>
<Form.Item label="报警组" tooltip={`选择报警组`} name="alertGroupId"
className={styles.form_item}>
<Select
style={{width: '100%'}}
placeholder="选择报警组"
optionLabelProp="label"
defaultValue={0}
>
{getGroupOptions()}
</Select>
</Form.Item>
</Col>
</Row>
<Form.Item
label="其他配置" className={styles.form_item}
tooltip={{title: '其他配置项,将被应用于执行环境,如 pipeline.name', icon: <InfoCircleOutlined/>}}
......@@ -282,7 +310,7 @@ const StudioSetting = (props: any) => {
);
};
export default connect(({Studio, Jar}: { Studio: StateType, Jar: JarStateType }) => ({
export default connect(({Studio, Jar, Alert}: { Studio: StateType, Jar: JarStateType , Alert: AlertStateType }) => ({
sessionCluster: Studio.sessionCluster,
clusterConfiguration: Studio.clusterConfiguration,
current: Studio.current,
......@@ -292,4 +320,5 @@ export default connect(({Studio, Jar}: { Studio: StateType, Jar: JarStateType })
toolHeight: Studio.toolHeight,
jars: Jar.jars,
env: Studio.env,
group: Alert.group,
}))(StudioSetting);
......@@ -12,7 +12,7 @@ import StudioLeftTool from "./StudioLeftTool";
import StudioRightTool from "./StudioRightTool";
import {
listSession, showCluster, showDataBase, getFillAllByVersion,
showClusterConfiguration, showSessionCluster, showJars, showEnv,showAlertInstance
showClusterConfiguration, showSessionCluster, showJars, showEnv, showAlertInstance, showAlertGroup
} from "@/components/Studio/StudioEvent/DDL";
import {loadSettings} from "@/pages/Settings/function";
import DraggleLayout from "@/components/DraggleLayout";
......@@ -60,6 +60,7 @@ const Studio = (props: any) => {
listSession(dispatch);
showJars(dispatch);
showAlertInstance(dispatch);
showAlertGroup(dispatch);
showEnv(dispatch);
onResize();
}, []);
......
import {Effect, Reducer} from "umi";
import {AlertInstanceTableListItem} from "@/pages/AlertInstance/data";
import {AlertGroupTableListItem} from "@/pages/AlertGroup/data";
export type AlertStateType = {
instance:AlertInstanceTableListItem[],
group:AlertGroupTableListItem[]
};
export type AlertModelType = {
......@@ -12,6 +14,7 @@ export type AlertModelType = {
};
reducers: {
saveInstance: Reducer<AlertStateType>;
saveGroup: Reducer<AlertStateType>;
};
};
......@@ -19,6 +22,7 @@ const AlertModel: AlertModelType = {
namespace: 'Alert',
state: {
instance:[],
group:[],
},
effects: {
......@@ -32,7 +36,12 @@ const AlertModel: AlertModelType = {
instance: payload,
};
},
saveGroup(state, {payload}) {
return {
...state,
group: payload,
};
},
},
};
......
......@@ -107,9 +107,9 @@ const JobInfo = (props: any) => {
FlinkWebUI
</Link></Button>);
}
buttons.push(<Button key="autorestart" type="primary">智能重启</Button>);
buttons.push(<Button key="autorestart" type="primary">重新上线</Button>);
if(!isStatusDone(job?.instance?.status as string)){
buttons.push(<Button key="autostop" type="primary" danger onClick={()=>{handleSavepoint('cancel')}}>智能停止</Button>);
buttons.push(<Button key="autostop" type="primary" danger onClick={()=>{handleSavepoint('cancel')}}>下线</Button>);
buttons.push(<Dropdown
key="dropdown"
trigger={['click']}
......
import {Typography, Divider, Badge, Empty,Tag} from "antd";
import ProCard, { StatisticCard } from '@ant-design/pro-card';
import type { StatisticProps } from '@ant-design/pro-card';
import JobInstanceTable from "./JobInstanceTable";
......@@ -11,11 +10,10 @@ const { Statistic } = StatisticCard;
const DevOps = (props:any) => {
// const {current} = props;
const statusCountDefault = [
{ key: '', title: '全部', value: 0, total: true },
{ key: JOB_STATUS.CREATED, status: 'default', title: '已创建', value: 0 },
{ key: JOB_STATUS.INITIALIZING, status: 'default', title: '初始化', value: 0 },
{ key: JOB_STATUS.CREATED, status: 'default', title: '创建中', value: 0 },
{ key: JOB_STATUS.RUNNING, status: 'success', title: '运行中', value: 0 },
{ key: JOB_STATUS.FINISHED, status: 'processing', title: '已完成', value: 0 },
{ key: JOB_STATUS.FAILING, status: 'error', title: '异常中', value: 0 },
......@@ -35,8 +33,8 @@ const DevOps = (props:any) => {
const statusCountData: StatusCount = result.datas;
const items: any = [
{ key: '', title: '全部', value: statusCountData.all, total: true },
{ key: JOB_STATUS.CREATED, status: 'default', title: '已创建', value: statusCountData.created },
{ key: JOB_STATUS.INITIALIZING, status: 'default', title: '初始化', value: statusCountData.initializing },
{ key: JOB_STATUS.CREATED, status: 'default', title: '创建中', value: statusCountData.created },
{ key: JOB_STATUS.RUNNING, status: 'success', title: '运行中', value: statusCountData.running },
{ key: JOB_STATUS.FINISHED, status: 'processing', title: '已完成', value: statusCountData.finished },
{ key: JOB_STATUS.FAILING, status: 'error', title: '异常中', value: statusCountData.failing },
......
......@@ -698,6 +698,9 @@ export default (): React.ReactNode => {
<li>
<Link>新增 运维中心任务监控的配置信息</Link>
</li>
<li>
<Link>新增 运维中心任务监控的智能重启和报警推送</Link>
</li>
</ul>
</Paragraph>
</Timeline.Item>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment