Commit 17b315ac authored by wenmo's avatar wenmo

新增作业生命周期管理

parent 577ab054
package com.dlink.controller;
import com.dlink.common.result.ProTableResult;
import com.dlink.common.result.Result;
import com.dlink.model.JobInstance;
import com.dlink.service.JobInstanceService;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.ArrayList;
import java.util.List;
/**
* JobInstanceController
*
* @author wenmo
* @since 2022/2/2 14:02
*/
@Slf4j
@RestController
@RequestMapping("/api/jobInstance")
public class JobInstanceController {
@Autowired
private JobInstanceService JobInstanceService;
/**
* 动态查询列表
*/
@PostMapping
public ProTableResult<JobInstance> listJobInstances(@RequestBody JsonNode para) {
return JobInstanceService.selectForProTable(para);
}
/**
* 批量删除
*/
@DeleteMapping
public Result deleteMul(@RequestBody JsonNode para) {
if (para.size()>0){
List<Integer> error = new ArrayList<>();
for (final JsonNode item : para){
Integer id = item.asInt();
if(!JobInstanceService.removeById(id)){
error.add(id);
}
}
if(error.size()==0) {
return Result.succeed("删除成功");
}else {
return Result.succeed("删除部分成功,但"+error.toString()+"删除失败,共"+error.size()+"次失败。");
}
}else{
return Result.failed("请选择要删除的记录");
}
}
/**
* 获取指定ID的信息
*/
@PostMapping("/getOneById")
public Result getOneById(@RequestBody JobInstance JobInstance) throws Exception {
JobInstance = JobInstanceService.getById(JobInstance.getId());
return Result.succeed(JobInstance,"获取成功");
}
}
......@@ -121,5 +121,53 @@ public class TaskController {
public Result exportSql(@RequestParam Integer id) {
return Result.succeed(taskService.exportSql(id),"获取成功");
}
/**
* 发布任务
*/
@GetMapping(value = "/releaseTask")
public Result releaseTask(@RequestParam Integer id) {
return Result.succeed(taskService.releaseTask(id),"操作成功");
}
/**
* 维护任务
*/
@GetMapping(value = "/developTask")
public Result developTask(@RequestParam Integer id) {
return Result.succeed(taskService.developTask(id),"操作成功");
}
/**
* 上线任务
*/
@GetMapping(value = "/onLineTask")
public Result onLineTask(@RequestParam Integer id) {
return Result.succeed(taskService.onLineTask(id),"操作成功");
}
/**
* 下线任务
*/
@GetMapping(value = "/offLineTask")
public Result offLineTask(@RequestParam Integer id) {
return Result.succeed(taskService.offLineTask(id),"操作成功");
}
/**
* 注销任务
*/
@GetMapping(value = "/cancelTask")
public Result cancelTask(@RequestParam Integer id) {
return Result.succeed(taskService.cancelTask(id),"操作成功");
}
/**
* 恢复任务
*/
@GetMapping(value = "/recoveryTask")
public Result recoveryTask(@RequestParam Integer id) {
return Result.succeed(taskService.recoveryTask(id),"操作成功");
}
}
package com.dlink.exception;
import com.dlink.common.result.Result;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody;
/**
* WebExceptionHandler
*
* @author wenmo
* @since 2022/2/2 22:22
*/
@ControllerAdvice
@ResponseBody
public class WebExceptionHandler {
@ExceptionHandler
public Result busException(BusException e) {
return Result.failed(e.getMessage());
}
@ExceptionHandler
public Result unknownException(Exception e) {
return Result.failed("系统出现错误, 请联系网站管理员!");
}
}
package com.dlink.mapper;
import com.dlink.db.mapper.SuperMapper;
import com.dlink.model.JobInstance;
import org.apache.ibatis.annotations.Mapper;
/**
* JobInstanceMapper
*
* @author wenmo
* @since 2022/2/2 13:02
*/
@Mapper
public interface JobInstanceMapper extends SuperMapper<JobInstance> {
}
package com.dlink.model;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* JobInstance
*
* @author wenmo
* @since 2022/2/1 16:46
*/
@Data
@EqualsAndHashCode(callSuper = false)
@TableName("dlink_job_instance")
public class JobInstance implements Serializable {
private static final long serialVersionUID = -3410230507904303730L;
private Integer id;
private String name;
private Integer taskId;
private Integer clusterId;
private String jid;
private Integer status;
private Integer historyId;
private String error;
private LocalDateTime createTime;
private LocalDateTime updateTime;
private LocalDateTime finishTime;
private Integer failed_restart_count;
}
......@@ -62,6 +62,8 @@ public class Task extends SuperEntity{
private String note;
private Integer step;
@TableField(exist = false)
private String statement;
......
package com.dlink.service;
import com.dlink.db.service.ISuperService;
import com.dlink.model.JobInstance;
/**
* JobInstanceService
*
* @author wenmo
* @since 2022/2/2 13:52
*/
public interface JobInstanceService extends ISuperService<JobInstance> {
}
......@@ -26,4 +26,16 @@ public interface TaskService extends ISuperService<Task> {
String exportSql(Integer id);
Task getUDFByClassName(String className);
boolean releaseTask(Integer id);
boolean developTask(Integer id);
boolean onLineTask(Integer id);
boolean offLineTask(Integer id);
boolean cancelTask(Integer id);
boolean recoveryTask(Integer id);
}
package com.dlink.service.impl;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.mapper.JobInstanceMapper;
import com.dlink.model.JobInstance;
import com.dlink.service.JobInstanceService;
import org.springframework.stereotype.Service;
/**
* JobInstanceServiceImpl
*
* @author wenmo
* @since 2022/2/2 13:52
*/
@Service
public class JobInstanceServiceImpl extends SuperServiceImpl<JobInstanceMapper, JobInstance> implements JobInstanceService {
}
......@@ -7,6 +7,7 @@ import com.dlink.assertion.Tips;
import com.dlink.config.Dialect;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.dto.SqlDTO;
import com.dlink.exception.BusException;
import com.dlink.gateway.GatewayType;
import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.job.JobConfig;
......@@ -65,15 +66,15 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
public JobResult submitByTaskId(Integer id) {
Task task = this.getTaskInfoById(id);
Asserts.checkNull(task, Tips.TASK_NOT_EXIST);
if(Dialect.isSql(task.getDialect())){
if (Dialect.isSql(task.getDialect())) {
return executeCommonSql(SqlDTO.build(task.getStatement(),
task.getDatabaseId(),null));
task.getDatabaseId(), null));
}
JobConfig config = buildJobConfig(task);
JobManager jobManager = JobManager.build(config);
if(!config.isJarTask()) {
if (!config.isJarTask()) {
return jobManager.executeSql(task.getStatement());
}else{
} else {
return jobManager.executeJar();
}
}
......@@ -82,26 +83,26 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
JobResult result = new JobResult();
result.setStatement(sqlDTO.getStatement());
result.setStartTime(LocalDateTime.now());
if(Asserts.isNull(sqlDTO.getDatabaseId())){
if (Asserts.isNull(sqlDTO.getDatabaseId())) {
result.setSuccess(false);
result.setError("请指定数据源");
result.setEndTime(LocalDateTime.now());
return result;
}else{
} else {
DataBase dataBase = dataBaseService.getById(sqlDTO.getDatabaseId());
if(Asserts.isNull(dataBase)){
if (Asserts.isNull(dataBase)) {
result.setSuccess(false);
result.setError("数据源不存在");
result.setEndTime(LocalDateTime.now());
return result;
}
Driver driver = Driver.build(dataBase.getDriverConfig()).connect();
JdbcSelectResult selectResult = driver.executeSql(sqlDTO.getStatement(),sqlDTO.getMaxRowNum());
JdbcSelectResult selectResult = driver.executeSql(sqlDTO.getStatement(), sqlDTO.getMaxRowNum());
driver.close();
result.setResult(selectResult);
if(selectResult.isSuccess()){
if (selectResult.isSuccess()) {
result.setSuccess(true);
}else{
} else {
result.setSuccess(false);
result.setError(selectResult.getError());
}
......@@ -110,8 +111,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
}
}
private boolean isJarTask(Task task){
return (GatewayType.YARN_APPLICATION.equalsValue(task.getType())||GatewayType.KUBERNETES_APPLICATION.equalsValue(task.getType()))&&Asserts.isNotNull(task.getJarId());
private boolean isJarTask(Task task) {
return (GatewayType.YARN_APPLICATION.equalsValue(task.getType()) || GatewayType.KUBERNETES_APPLICATION.equalsValue(task.getType())) && Asserts.isNotNull(task.getJarId());
}
@Override
......@@ -136,12 +137,22 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Override
public boolean saveOrUpdateTask(Task task) {
if(Asserts.isNotNullString(task.getDialect()) && Dialect.JAVA.equalsVal(task.getDialect())
&& Asserts.isNotNullString(task.getStatement()) ){
// to compiler java udf
if (Asserts.isNotNullString(task.getDialect()) && Dialect.JAVA.equalsVal(task.getDialect())
&& Asserts.isNotNullString(task.getStatement())) {
CustomStringJavaCompiler compiler = new CustomStringJavaCompiler(task.getStatement());
task.setSavePointPath(compiler.getFullClassName());
}
// if modify task else create task
if (task.getId() != null) {
Task taskInfo = getById(task.getId());
Assert.check(taskInfo);
if (JobLifeCycle.RELEASE.equalsValue(taskInfo.getStep()) ||
JobLifeCycle.ONLINE.equalsValue(taskInfo.getStep()) ||
JobLifeCycle.CANCEL.equalsValue(taskInfo.getStep())) {
throw new BusException("该作业已" + JobLifeCycle.get(taskInfo.getStep()).getLabel() + ",禁止修改!");
}
task.setStep(JobLifeCycle.DEVELOP.getValue());
this.updateById(task);
if (task.getStatement() != null) {
Statement statement = new Statement();
......@@ -150,6 +161,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
statementService.updateById(statement);
}
} else {
task.setStep(JobLifeCycle.CREATE.getValue());
if (task.getCheckPoint() == null) {
task.setCheckPoint(0);
}
......@@ -173,21 +185,21 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Override
public List<Task> listFlinkSQLEnv() {
return this.list(new QueryWrapper<Task>().eq("dialect", Dialect.FLINKSQLENV).eq("enabled",1));
return this.list(new QueryWrapper<Task>().eq("dialect", Dialect.FLINKSQLENV).eq("enabled", 1));
}
@Override
public String exportSql(Integer id) {
Task task = getTaskInfoById(id);
Asserts.checkNull(task, Tips.TASK_NOT_EXIST);
if(Dialect.isSql(task.getDialect())){
if (Dialect.isSql(task.getDialect())) {
return task.getStatement();
}
JobConfig config = buildJobConfig(task);
JobManager jobManager = JobManager.build(config);
if(!config.isJarTask()) {
if (!config.isJarTask()) {
return jobManager.exportSql(task.getStatement());
}else{
} else {
return "";
}
}
......@@ -200,30 +212,96 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
return task;
}
private JobConfig buildJobConfig(Task task){
@Override
public boolean releaseTask(Integer id) {
Task task = getById(id);
Assert.check(task);
if (JobLifeCycle.DEVELOP.equalsValue(task.getStep())) {
task.setStep(JobLifeCycle.RELEASE.getValue());
return updateById(task);
}
return false;
}
@Override
public boolean developTask(Integer id) {
Task task = getById(id);
Assert.check(task);
if (JobLifeCycle.RELEASE.equalsValue(task.getStep())) {
task.setStep(JobLifeCycle.DEVELOP.getValue());
return updateById(task);
}
return false;
}
@Override
public boolean onLineTask(Integer id) {
Task task = getById(id);
Assert.check(task);
if (JobLifeCycle.RELEASE.equalsValue(task.getStep())) {
task.setStep(JobLifeCycle.ONLINE.getValue());
return updateById(task);
}
return false;
}
@Override
public boolean offLineTask(Integer id) {
Task task = getById(id);
Assert.check(task);
if (JobLifeCycle.ONLINE.equalsValue(task.getStep())) {
task.setStep(JobLifeCycle.RELEASE.getValue());
return updateById(task);
}
return false;
}
@Override
public boolean cancelTask(Integer id) {
Task task = getById(id);
Assert.check(task);
if (JobLifeCycle.ONLINE != JobLifeCycle.get(task.getStep())) {
task.setStep(JobLifeCycle.CANCEL.getValue());
return updateById(task);
}
return false;
}
@Override
public boolean recoveryTask(Integer id) {
Task task = getById(id);
Assert.check(task);
if (JobLifeCycle.CANCEL == JobLifeCycle.get(task.getStep())) {
task.setStep(JobLifeCycle.DEVELOP.getValue());
return updateById(task);
}
return false;
}
private JobConfig buildJobConfig(Task task) {
boolean isJarTask = isJarTask(task);
if(!isJarTask&&Asserts.isNotNull(task.getEnvId())&&task.getEnvId()!=0){
if (!isJarTask && Asserts.isNotNull(task.getEnvId()) && task.getEnvId() != 0) {
Task envTask = getTaskInfoById(task.getEnvId());
if(Asserts.isNotNull(envTask)&&Asserts.isNotNullString(envTask.getStatement())) {
if (Asserts.isNotNull(envTask) && Asserts.isNotNullString(envTask.getStatement())) {
task.setStatement(envTask.getStatement() + "\r\n" + task.getStatement());
}
}
JobConfig config = task.buildSubmitConfig();
config.setJarTask(isJarTask);
if (!JobManager.useGateway(config.getType())) {
if(GatewayType.LOCAL.equalsValue(config.getType())){
if (GatewayType.LOCAL.equalsValue(config.getType())) {
config.setUseRemote(false);
}
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), task.getClusterId()));
} else {
Map<String, Object> gatewayConfig = clusterConfigurationService.getGatewayConfig(task.getClusterConfigurationId());
if (GatewayType.YARN_APPLICATION.equalsValue(config.getType())||GatewayType.KUBERNETES_APPLICATION.equalsValue(config.getType())) {
if(!isJarTask) {
if (GatewayType.YARN_APPLICATION.equalsValue(config.getType()) || GatewayType.KUBERNETES_APPLICATION.equalsValue(config.getType())) {
if (!isJarTask) {
SystemConfiguration systemConfiguration = SystemConfiguration.getInstances();
gatewayConfig.put("userJarPath", systemConfiguration.getSqlSubmitJarPath());
gatewayConfig.put("userJarParas", systemConfiguration.getSqlSubmitJarParas() + buildParas(config.getTaskId()));
gatewayConfig.put("userJarMainAppClass", systemConfiguration.getSqlSubmitJarMainAppClass());
}else{
} else {
Jar jar = jarService.getById(task.getJarId());
Assert.check(jar);
gatewayConfig.put("userJarPath", jar.getPath());
......
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.dlink.mapper.JobInstanceMapper">
<select id="selectForProTable" resultType="com.dlink.model.JobInstance">
select
a.*
from
dlink_job_instance a
<where>
1=1
<if test='param.createTime!=null and param.createTime!=""'>
and a.create_time <![CDATA[>=]]> str_to_date( #{param.createTime},'%Y-%m-%d %H:%i:%s')
and a.create_time <![CDATA[<=]]> str_to_date( #{param.createTime},'%Y-%m-%d %H:%i:%s')
</if>
<if test='param.updateTime!=null and param.updateTime!=""'>
and a.update_time <![CDATA[>=]]> str_to_date( #{param.updateTime[0]},'%Y-%m-%d %H:%i:%s')
and a.update_time <![CDATA[<=]]> str_to_date( #{param.updateTime[1]},'%Y-%m-%d %H:%i:%s')
</if>
<if test='ew.sqlSegment!=null and ew.sqlSegment!="" and !ew.sqlSegment.startsWith(" ORDER BY")'>
and
</if>
<if test='ew.sqlSegment!=null and ew.sqlSegment!=""'>
${ew.sqlSegment}
</if>
</where>
</select>
</mapper>
......@@ -22,17 +22,12 @@
<result column="env_id" property="envId" />
<result column="config_json" property="configJson" />
<result column="note" property="note" />
<result column="step" property="step" />
<result column="enabled" property="enabled" />
<result column="create_time" property="createTime" />
<result column="update_time" property="updateTime" />
</resultMap>
<!-- 通用查询结果列 -->
<sql id="Base_Column_List">
id, name, alias,dialect, type,check_point,save_point_strategy,save_point_path, parallelism,fragment,statement_set,cluster_id,cluster_configuration_id,database_id,jar_id,env_id,config_json,note, enabled, create_time, update_time
</sql>
<select id="selectForProTable" resultType="com.dlink.model.Task">
select
a.*
......
package com.dlink.model;
import com.dlink.assertion.Asserts;
/**
* JobLifeCycle
*
* @author wenmo
* @since 2022/2/1 16:37
*/
public enum JobLifeCycle {
UNKNOWN(0,"未知"),
CREATE(1,"创建"),
DEVELOP(2,"开发"),
DEBUG(3,"调试"),
RELEASE(4,"发布"),
ONLINE(5,"上线"),
CANCEL(6,"注销");
private Integer value;
private String label;
JobLifeCycle(Integer value, String label) {
this.value = value;
this.label = label;
}
public Integer getValue() {
return value;
}
public String getLabel() {
return label;
}
public static JobLifeCycle get(Integer value){
for (JobLifeCycle item : JobLifeCycle.values()) {
if(item.getValue() == value){
return item;
}
}
return JobLifeCycle.UNKNOWN;
}
public boolean equalsValue(Integer step){
if(value == step){
return true;
}
return false;
}
}
......@@ -261,6 +261,7 @@ CREATE TABLE `dlink_task` (
`env_id` int(11) NULL DEFAULT NULL COMMENT '环境ID',
`config_json` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '配置JSON',
`note` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '注释',
`step` int(11) NULL DEFAULT NULL COMMENT '作业生命周期',
`enabled` tinyint(1) NOT NULL DEFAULT 1 COMMENT '是否启用',
`create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',
`update_time` datetime(0) NULL DEFAULT NULL COMMENT '更新时间',
......@@ -298,5 +299,25 @@ CREATE TABLE `dlink_user` (
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
INSERT INTO `dlink_user`(`id`, `username`, `password`, `nickname`, `worknum`, `avatar`, `mobile`, `enabled`, `is_delete`, `create_time`, `update_time`) VALUES (1, 'admin', '21232f297a57a5a743894a0e4a801fc3', 'Admin', NULL, NULL, NULL, 1, 0, '2021-11-28 17:19:27', '2021-11-28 17:19:31');
-- ----------------------------
-- Table structure for dlink_job_instance
-- ----------------------------
DROP TABLE IF EXISTS `dlink_job_instance`;
create table dlink_job_instance
(
id int auto_increment comment '自增主键'
primary key,
name varchar(50) null comment '作业实例名',
task_id int null comment 'taskID',
cluster_id int null comment '集群ID',
jid varchar(50) null comment 'FlinkJobId',
status int null comment '实例状态',
history_id int null comment '提交历史ID',
create_time datetime null comment '创建时间',
update_time datetime null comment '更新时间',
finish_time int null comment '完成时间',
error text null comment '异常日志',
failed_restart_count int null comment '重启次数'
) comment '作业实例';
SET FOREIGN_KEY_CHECKS = 1;
......@@ -491,4 +491,28 @@ ADD COLUMN `env_id` int(11) NULL COMMENT '环境ID' AFTER `jar_id`;
ALTER TABLE `dlink_database`
ADD COLUMN `flink_config` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '环境ID' AFTER `note`;
-- ----------------------------
-- Table structure for dlink_job_instance
-- ----------------------------
DROP TABLE IF EXISTS `dlink_job_instance`;
create table dlink_job_instance
(
id int auto_increment comment '自增主键'
primary key,
name varchar(50) null comment '作业实例名',
task_id int null comment 'taskID',
cluster_id int null comment '集群ID',
jid varchar(50) null comment 'FlinkJobId',
status int null comment '实例状态',
history_id int null comment '提交历史ID',
create_time datetime null comment '创建时间',
update_time datetime null comment '更新时间',
finish_time int null comment '完成时间',
error text null comment '异常日志',
failed_restart_count int null comment '重启次数'
) comment '作业实例';
ALTER TABLE `dlink_task`
ADD COLUMN `step` int(11) NULL COMMENT '作业生命周期' AFTER `note`;
SET FOREIGN_KEY_CHECKS = 1;
......@@ -2,6 +2,11 @@ import request from "umi-request";
import {TableListParams} from "@/components/Common/data";
import {message, Modal} from "antd";
export const CODE = {
SUCCESS: 0,
ERROR: 1,
};
export async function queryData(url:string,params?: TableListParams) {
return request(url, {
method: 'POST',
......@@ -69,9 +74,13 @@ export const handleAddOrUpdate = async (url:string,fields: any) => {
const tipsTitle = fields.id ? "修改" : "添加";
const hide = message.loading(`正在${tipsTitle}`);
try {
const {msg} = await addOrUpdateData(url,{...fields});
const {code,msg} = await addOrUpdateData(url,{...fields});
hide();
message.success(msg);
if(code == CODE.SUCCESS){
message.success(msg);
}else{
message.warn(msg);
}
return true;
} catch (error) {
hide();
......@@ -84,9 +93,13 @@ export const handleAddOrUpdateWithResult = async (url:string,fields: any) => {
const tipsTitle = fields.id ? "修改" : "添加";
const hide = message.loading(`正在${tipsTitle}`);
try {
const {msg,datas} = await addOrUpdateData(url,{...fields});
const {code, msg,datas} = await addOrUpdateData(url,{...fields});
hide();
message.success(msg);
if(code == CODE.SUCCESS){
message.success(msg);
}else{
message.warn(msg);
}
return datas;
} catch (error) {
hide();
......@@ -99,9 +112,13 @@ export const handleRemove = async (url:string,selectedRows: []) => {
const hide = message.loading('正在删除');
if (!selectedRows) return true;
try {
const {msg} = await removeData(url,selectedRows.map((row) => row.id));
const {code, msg} = await removeData(url,selectedRows.map((row) => row.id));
hide();
message.success(msg);
if(code == CODE.SUCCESS){
message.success(msg);
}else{
message.warn(msg);
}
return true;
} catch (error) {
hide();
......@@ -113,9 +130,13 @@ export const handleRemove = async (url:string,selectedRows: []) => {
export const handleRemoveById = async (url:string,id: number) => {
const hide = message.loading('正在删除');
try {
const {msg} = await removeData(url,[id]);
const {code, msg} = await removeData(url,[id]);
hide();
message.success(msg);
if(code == CODE.SUCCESS){
message.success(msg);
}else{
message.warn(msg);
}
return true;
} catch (error) {
hide();
......@@ -128,9 +149,13 @@ export const handleSubmit = async (url:string,title:string,selectedRows: any[])
const hide = message.loading('正在'+title);
if (!selectedRows) return true;
try {
const {msg} = await postDataArray(url,selectedRows.map((row) => row.id));
const {code, msg} = await postDataArray(url,selectedRows.map((row) => row.id));
hide();
message.success(msg);
if(code == CODE.SUCCESS){
message.success(msg);
}else{
message.warn(msg);
}
return true;
} catch (error) {
hide();
......@@ -148,9 +173,13 @@ export const updateEnabled = (url:string,selectedRows: [], enabled: boolean) =>
export const handleOption = async (url:string,title:string,param:any) => {
const hide = message.loading('正在'+title);
try {
const {msg} = await postAll(url,param);
const {code, msg} = await postAll(url,param);
hide();
message.success(msg);
if(code == CODE.SUCCESS){
message.success(msg);
}else{
message.warn(msg);
}
return true;
} catch (error) {
hide();
......
......@@ -200,3 +200,27 @@ export function showClusterConfiguration(dispatch: any) {
});
});
}
/*--- 发布作业 ---*/
export function releaseTask(id: number) {
return getData('api/task/releaseTask',{id});
}
/*--- 发布作业 ---*/
export function developTask(id: number) {
return getData('api/task/developTask',{id});
}
/*--- 上线作业 ---*/
export function onLineTask(id: number) {
return getData('api/task/onLineTask',{id});
}
/*--- 下线作业 ---*/
export function offLineTask(id: number) {
return getData('api/task/offLineTask',{id});
}
/*--- 注销作业 ---*/
export function cancelTask(id: number) {
return getData('api/task/cancelTask',{id});
}
/*--- 恢复作业 ---*/
export function recoveryTask(id: number) {
return getData('api/task/recoveryTask',{id});
}
import styles from "./index.less";
import {Menu, Dropdown, Tooltip, Row, Col, Popconfirm, notification, Modal, message} from "antd";
import {
PauseCircleTwoTone, CopyTwoTone, DeleteTwoTone, PlayCircleTwoTone, DiffTwoTone,SnippetsTwoTone,
PauseCircleTwoTone, CarryOutTwoTone, DeleteTwoTone, PlayCircleTwoTone, CameraTwoTone,SnippetsTwoTone,
FileAddTwoTone, FolderOpenTwoTone, SafetyCertificateTwoTone, SaveTwoTone, FlagTwoTone,CodeTwoTone,
EnvironmentOutlined, SmileOutlined, RocketTwoTone, QuestionCircleTwoTone, MessageOutlined, ClusterOutlined
, EditTwoTone, RestTwoTone
} from "@ant-design/icons";
import Space from "antd/es/space";
import Divider from "antd/es/divider";
......@@ -11,14 +12,21 @@ import Button from "antd/es/button/button";
import Breadcrumb from "antd/es/breadcrumb/Breadcrumb";
import {StateType} from "@/pages/FlinkSqlStudio/model";
import {connect} from "umi";
import { postDataArray} from "@/components/Common/crud";
import {CODE, postDataArray} from "@/components/Common/crud";
import {executeSql, getJobPlan} from "@/pages/FlinkSqlStudio/service";
import StudioHelp from "./StudioHelp";
import StudioGraph from "./StudioGraph";
import {showCluster, showTables} from "@/components/Studio/StudioEvent/DDL";
import {
cancelTask, developTask,
offLineTask,
onLineTask, recoveryTask,
releaseTask,
showCluster,
showTables
} from "@/components/Studio/StudioEvent/DDL";
import React, {useCallback, useEffect, useState} from "react";
import StudioExplain from "../StudioConsole/StudioExplain";
import {DIALECT, isOnline, isSql} from "@/components/Studio/conf";
import {DIALECT, isOnline, isSql, TASKSTEPS} from "@/components/Studio/conf";
import {
ModalForm,
} from '@ant-design/pro-form';
......@@ -260,6 +268,114 @@ const StudioMenu = (props: any) => {
handleExportModalVisible(true);
};
const toReleaseTask = () => {
Modal.confirm({
title: '发布作业',
content: `确定发布作业【${current.task.alias}】吗?请确认您的作业是否已经被保存!`,
okText: '确认',
cancelText: '取消',
onOk: async () => {
const res = releaseTask(current.task.id);
res.then((result) => {
result.datas && props.changeTaskStep(current.task.id,TASKSTEPS.RELEASE);
if(result.code == CODE.SUCCESS) {
message.success(`发布作业【${current.task.alias}】成功`);
}
});
}
});
};
const toDevelopTask = () => {
Modal.confirm({
title: '维护作业',
content: `确定维护作业【${current.task.alias}】吗?`,
okText: '确认',
cancelText: '取消',
onOk: async () => {
const res = developTask(current.task.id);
res.then((result) => {
result.datas && props.changeTaskStep(current.task.id,TASKSTEPS.DEVELOP);
if(result.code == CODE.SUCCESS) {
message.success(`维护作业【${current.task.alias}】成功`);
}
});
}
});
};
const toOnLineTask = () => {
Modal.confirm({
title: '上线作业',
content: `确定上线作业【${current.task.alias}】吗?`,
okText: '确认',
cancelText: '取消',
onOk: async () => {
const res = onLineTask(current.task.id);
res.then((result) => {
result.datas && props.changeTaskStep(current.task.id,TASKSTEPS.ONLINE);
if(result.code == CODE.SUCCESS) {
message.success(`上线作业【${current.task.alias}】成功`);
}
});
}
});
};
const toOffLineTask = () => {
Modal.confirm({
title: '下线作业',
content: `确定下线作业【${current.task.alias}】吗?`,
okText: '确认',
cancelText: '取消',
onOk: async () => {
const res = offLineTask(current.task.id);
res.then((result) => {
result.datas && props.changeTaskStep(current.task.id,TASKSTEPS.RELEASE);
if(result.code == CODE.SUCCESS) {
message.success(`下线作业【${current.task.alias}】成功`);
}
});
}
});
};
const toCancelTask = () => {
Modal.confirm({
title: '注销作业',
content: `确定注销作业【${current.task.alias}】吗?`,
okText: '确认',
cancelText: '取消',
onOk: async () => {
const res = cancelTask(current.task.id);
res.then((result) => {
result.datas && props.changeTaskStep(current.task.id,TASKSTEPS.CANCEL);
if(result.code == CODE.SUCCESS) {
message.success(`注销作业【${current.task.alias}】成功`);
}
});
}
});
};
const toRecoveryTask = () => {
Modal.confirm({
title: '恢复作业',
content: `确定恢复作业【${current.task.alias}】吗?`,
okText: '确认',
cancelText: '取消',
onOk: async () => {
const res = recoveryTask(current.task.id);
res.then((result) => {
result.datas && props.changeTaskStep(current.task.id,TASKSTEPS.DEVELOP);
if(result.code == CODE.SUCCESS) {
message.success(`恢复作业【${current.task.alias}】成功`);
}
});
}
});
};
const runMenu = (
<Menu>
<Menu.Item onClick={execute}>SQL 查询</Menu.Item>
......@@ -404,33 +520,55 @@ const StudioMenu = (props: any) => {
onClick={submit}
/>
</Tooltip>
<Popconfirm
title="您确定要停止所有的 FlinkSql 任务吗?"
// onConfirm={confirm}
//onCancel={cancel}
okText="停止"
cancelText="取消"
>
<Tooltip title="停止所有的 FlinkSql 任务,暂不可用">
<Button
type="text"
icon={<PauseCircleTwoTone twoToneColor="#ddd"/>}
/>
</Tooltip>
</Popconfirm></>)}
</>)}
<Divider type="vertical"/>
<Button
type="text"
icon={<DiffTwoTone twoToneColor="#ddd"/>}
/>
<Button
type="text"
icon={<CopyTwoTone twoToneColor="#ddd"/>}
/>
<Button
type="text"
icon={<DeleteTwoTone twoToneColor="#ddd"/>}
/>
{current.task.step == TASKSTEPS.DEVELOP ?
<Tooltip title="发布,发布后将无法修改">
<Button
type="text"
icon={<CameraTwoTone/>}
onClick={toReleaseTask}
/>
</Tooltip>:undefined
}{current.task.step == TASKSTEPS.RELEASE ?
<><Tooltip title="维护,点击进入编辑状态">
<Button
type="text"
icon={<EditTwoTone />}
onClick={toDevelopTask}
/>
</Tooltip><Tooltip title="上线,上线后自动恢复、告警等将生效">
<Button
type="text"
icon={<CarryOutTwoTone />}
onClick={toOnLineTask}
/>
</Tooltip></>:undefined
}{current.task.step == TASKSTEPS.ONLINE ?
<Tooltip title="下线,将进入最新发布状态">
<Button
type="text"
icon={<PauseCircleTwoTone />}
onClick={toOffLineTask}
/>
</Tooltip>:undefined
}{(current.task.step != TASKSTEPS.ONLINE && current.task.step != TASKSTEPS.CANCEL) ?
<Tooltip title="注销,将进入回收站">
<Button
type="text"
icon={<DeleteTwoTone />}
onClick={toCancelTask}
/>
</Tooltip>:undefined
}{current.task.step == TASKSTEPS.CANCEL ?
<Tooltip title="恢复,将进入维护模式">
<Button
type="text"
icon={<RestTwoTone />}
onClick={toRecoveryTask}
/>
</Tooltip>:undefined
}
<Tooltip title="查看使用帮助">
<Button
type="text"
......@@ -506,6 +644,11 @@ const mapDispatchToProps = (dispatch: Dispatch)=>({
}),changeFullScreen:(isFull: boolean)=>dispatch({
type: "Studio/changeFullScreen",
payload: isFull,
}),changeTaskStep:(id: number, step: number)=>dispatch({
type: "Studio/changeTaskStep",
payload: {
id,step
},
}),
});
......
......@@ -53,3 +53,14 @@ export const isOnline = (type: string)=>{
return false;
}
}
export const TASKSTEPS = {
UNKNOWN: 0,
CREATE: 1,
DEVELOP: 2,
DEBUG: 3,
RELEASE: 4,
ONLINE: 5,
CANCEL: 6,
};
......@@ -189,6 +189,7 @@ export type ModelType = {
saveDataBase: Reducer<StateType>;
saveEnv: Reducer<StateType>;
saveChart: Reducer<StateType>;
changeTaskStep: Reducer<StateType>;
};
};
......@@ -495,6 +496,23 @@ const Model: ModelType = {
tabs: {...newTabs},
};
},
changeTaskStep(state, {payload}) {
const newTabs = state.tabs;
let newCurrent = state.current;
for (let i = 0; i < newTabs.panes.length; i++) {
if (newTabs.panes[i].task.id == payload.id) {
newTabs.panes[i].task.step = payload.step;
if(newCurrent.key == newTabs.panes[i].key){
newCurrent = newTabs.panes[i];
}
}
}
return {
...state,
current: {...newCurrent},
tabs: {...newTabs},
};
},
},
};
......
......@@ -641,6 +641,9 @@ export default (): React.ReactNode => {
<li>
<Link>新增 CDCSOURCE 多源合并任务语法支持</Link>
</li>
<li>
<Link>新增 作业生命周期管理</Link>
</li>
</ul>
</Paragraph>
</Timeline.Item>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment