Commit 53710e6e authored by wenmo's avatar wenmo

新增 作业生命周期与任务实例同步联动

parent 1b21aa6f
...@@ -159,7 +159,7 @@ public class TaskController { ...@@ -159,7 +159,7 @@ public class TaskController {
*/ */
@GetMapping(value = "/cancelTask") @GetMapping(value = "/cancelTask")
public Result cancelTask(@RequestParam Integer id) { public Result cancelTask(@RequestParam Integer id) {
return Result.succeed(taskService.cancelTask(id),"操作成功"); return taskService.cancelTask(id);
} }
/** /**
......
...@@ -46,7 +46,7 @@ public interface TaskService extends ISuperService<Task> { ...@@ -46,7 +46,7 @@ public interface TaskService extends ISuperService<Task> {
Result offLineTask(Integer id, String type); Result offLineTask(Integer id, String type);
boolean cancelTask(Integer id); Result cancelTask(Integer id);
boolean recoveryTask(Integer id); boolean recoveryTask(Integer id);
......
...@@ -349,7 +349,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -349,7 +349,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
task.setStep(JobLifeCycle.ONLINE.getValue()); task.setStep(JobLifeCycle.ONLINE.getValue());
task.setJobInstanceId(jobResult.getJobInstanceId()); task.setJobInstanceId(jobResult.getJobInstanceId());
if (updateById(task)) { if (updateById(task)) {
return Result.succeed("上线成功"); return Result.succeed(jobResult,"上线成功");
} else { } else {
return Result.failed("由于未知原因,上线失败"); return Result.failed("由于未知原因,上线失败");
} }
...@@ -383,14 +383,21 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -383,14 +383,21 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
} }
@Override @Override
public boolean cancelTask(Integer id) { public Result cancelTask(Integer id) {
Task task = getById(id); Task task = getById(id);
Assert.check(task); Assert.check(task);
if (JobLifeCycle.ONLINE != JobLifeCycle.get(task.getStep())) { if (JobLifeCycle.ONLINE != JobLifeCycle.get(task.getStep())) {
if(Asserts.isNotNull(task.getJobInstanceId())&&task.getJobInstanceId()!=0){
return Result.failed("当前有作业正在运行,注销失败,请停止后注销");
}
task.setStep(JobLifeCycle.CANCEL.getValue()); task.setStep(JobLifeCycle.CANCEL.getValue());
return updateById(task); if (updateById(task)) {
return Result.succeed("注销成功");
} else {
return Result.failed("由于未知原因,注销失败");
} }
return false; }
return Result.failed("当前有作业已上线,无法注销,请下线后注销");
} }
@Override @Override
......
...@@ -26,7 +26,16 @@ import { ...@@ -26,7 +26,16 @@ import {
} from "@/components/Studio/StudioEvent/DDL"; } from "@/components/Studio/StudioEvent/DDL";
import React, {useCallback, useEffect, useState} from "react"; import React, {useCallback, useEffect, useState} from "react";
import StudioExplain from "../StudioConsole/StudioExplain"; import StudioExplain from "../StudioConsole/StudioExplain";
import {DIALECT, isOnline, isSql, TASKSTEPS} from "@/components/Studio/conf"; import {
DIALECT,
isDeletedTask,
isExecuteSql,
isOnline,
isRunningTask,
isSql,
isTask,
TASKSTEPS
} from "@/components/Studio/conf";
import { import {
ModalForm, ModalForm,
} from '@ant-design/pro-form'; } from '@ant-design/pro-form';
...@@ -110,7 +119,7 @@ const StudioMenu = (props: any) => { ...@@ -110,7 +119,7 @@ const StudioMenu = (props: any) => {
result.then(res => { result.then(res => {
notification.close(taskKey); notification.close(taskKey);
if (res.datas.success) { if (res.datas.success) {
props.changeTaskJobInstance(current.task.id,res.datas.jobInstanceId); res.datas?.jobInstanceId&&props.changeTaskJobInstance(current.task.id,res.datas?.jobInstanceId);
message.success('执行成功'); message.success('执行成功');
} else { } else {
message.error('执行失败'); message.error('执行失败');
...@@ -155,7 +164,7 @@ const StudioMenu = (props: any) => { ...@@ -155,7 +164,7 @@ const StudioMenu = (props: any) => {
const res = await postDataArray('/api/task/submit', [task.id]); const res = await postDataArray('/api/task/submit', [task.id]);
notification.close(taskKey); notification.close(taskKey);
if (res.datas[0].success) { if (res.datas[0].success) {
props.changeTaskJobInstance(current.task.id,res.datas[0].jobInstanceId); res.datas[0].jobInstanceId && props.changeTaskJobInstance(current.task.id,res.datas[0].jobInstanceId);
message.success('异步提交成功'); message.success('异步提交成功');
} else { } else {
message.success('异步提交失败'); message.success('异步提交失败');
...@@ -317,8 +326,9 @@ const StudioMenu = (props: any) => { ...@@ -317,8 +326,9 @@ const StudioMenu = (props: any) => {
onOk: async () => { onOk: async () => {
const res = onLineTask(current.task.id); const res = onLineTask(current.task.id);
res.then((result) => { res.then((result) => {
if(result.code == CODE.SUCCESS) { if(result.code === CODE.SUCCESS) {
props.changeTaskStep(current.task.id,TASKSTEPS.ONLINE); props.changeTaskStep(current.task.id,TASKSTEPS.ONLINE);
result.datas?.jobInstanceId && props.changeTaskJobInstance(current.task.id,result.datas?.jobInstanceId);
message.success(`上线作业【${current.task.alias}】成功`); message.success(`上线作业【${current.task.alias}】成功`);
}else { }else {
message.error(`上线作业【${current.task.alias}】失败,原因:\n${result.msg}`); message.error(`上线作业【${current.task.alias}】失败,原因:\n${result.msg}`);
...@@ -337,7 +347,10 @@ const StudioMenu = (props: any) => { ...@@ -337,7 +347,10 @@ const StudioMenu = (props: any) => {
onOk: async () => { onOk: async () => {
const res = offLineTask(current.task.id,type); const res = offLineTask(current.task.id,type);
res.then((result) => { res.then((result) => {
if(result.code == CODE.SUCCESS) { if(result.code === CODE.SUCCESS) {
if(current.task.step === TASKSTEPS.ONLINE){
props.changeTaskStep(current.task.id,TASKSTEPS.RELEASE);
}
props.changeTaskJobInstance(current.task.id,0); props.changeTaskJobInstance(current.task.id,0);
message.success(`停止作业【${current.task.alias}】成功`); message.success(`停止作业【${current.task.alias}】成功`);
}else { }else {
...@@ -357,8 +370,9 @@ const StudioMenu = (props: any) => { ...@@ -357,8 +370,9 @@ const StudioMenu = (props: any) => {
onOk: async () => { onOk: async () => {
const res = offLineTask(current.task.id,type); const res = offLineTask(current.task.id,type);
res.then((result) => { res.then((result) => {
if(result.code == CODE.SUCCESS) { if(result.code === CODE.SUCCESS) {
props.changeTaskStep(current.task.id,TASKSTEPS.RELEASE); props.changeTaskStep(current.task.id,TASKSTEPS.RELEASE);
props.changeTaskJobInstance(current.task.id,0);
message.success(`下线作业【${current.task.alias}】成功`); message.success(`下线作业【${current.task.alias}】成功`);
}else { }else {
message.error(`下线作业【${current.task.alias}】失败,原因:\n${result.msg}`); message.error(`下线作业【${current.task.alias}】失败,原因:\n${result.msg}`);
...@@ -377,9 +391,11 @@ const StudioMenu = (props: any) => { ...@@ -377,9 +391,11 @@ const StudioMenu = (props: any) => {
onOk: async () => { onOk: async () => {
const res = cancelTask(current.task.id); const res = cancelTask(current.task.id);
res.then((result) => { res.then((result) => {
result.datas && props.changeTaskStep(current.task.id,TASKSTEPS.CANCEL); if(result.code === CODE.SUCCESS) {
if(result.code == CODE.SUCCESS) { props.changeTaskStep(current.task.id,TASKSTEPS.CANCEL);
message.success(`注销作业【${current.task.alias}】成功`); message.success(`注销作业【${current.task.alias}】成功`);
}else {
message.error(`注销作业【${current.task.alias}】失败,原因:\n${result.msg}`);
} }
}); });
} }
...@@ -404,6 +420,22 @@ const StudioMenu = (props: any) => { ...@@ -404,6 +420,22 @@ const StudioMenu = (props: any) => {
}); });
}; };
const isShowGetStreamGraphBtn = () => {
return (!current.task.dialect||current.task.dialect === DIALECT.FLINKSQL);
};
const isShowExecuteBtn = () => {
return !isDeletedTask(current.task.step) && isExecuteSql( current.task.dialect ) && !isRunningTask(current.task.jobInstanceId);
};
const isShowSubmitBtn = () => {
return !isDeletedTask(current.task.step) && isTask( current.task.dialect ) && !isRunningTask(current.task.jobInstanceId);
};
const isShowCancelTaskBtn = () => {
return !isDeletedTask(current.task.step) && isTask( current.task.dialect ) && isRunningTask(current.task.jobInstanceId);
};
const runMenu = ( const runMenu = (
<Menu> <Menu>
<Menu.Item onClick={execute}>SQL 查询</Menu.Item> <Menu.Item onClick={execute}>SQL 查询</Menu.Item>
...@@ -414,7 +446,7 @@ const StudioMenu = (props: any) => { ...@@ -414,7 +446,7 @@ const StudioMenu = (props: any) => {
const getPathItem = (paths) => { const getPathItem = (paths) => {
let itemList = []; let itemList = [];
for (let item of paths) { for (let item of paths) {
itemList.push(<Breadcrumb.Item>{item}</Breadcrumb.Item>) itemList.push(<Breadcrumb.Item key={item}>{item}</Breadcrumb.Item>)
} }
return itemList; return itemList;
}; };
...@@ -430,6 +462,7 @@ const StudioMenu = (props: any) => { ...@@ -430,6 +462,7 @@ const StudioMenu = (props: any) => {
}, },
}); });
}; };
return ( return (
<Row className={styles.container}> <Row className={styles.container}>
<Col span={24}> <Col span={24}>
...@@ -523,7 +556,7 @@ const StudioMenu = (props: any) => { ...@@ -523,7 +556,7 @@ const StudioMenu = (props: any) => {
onClick={onCheckSql} onClick={onCheckSql}
/> />
</Tooltip> </Tooltip>
{(!current.task.dialect||current.task.dialect === DIALECT.FLINKSQL) &&( {isShowGetStreamGraphBtn() &&(
<Tooltip title="获取当前的 FlinkSql 的执行图"> <Tooltip title="获取当前的 FlinkSql 的执行图">
<Button <Button
type="text" type="text"
...@@ -531,15 +564,7 @@ const StudioMenu = (props: any) => { ...@@ -531,15 +564,7 @@ const StudioMenu = (props: any) => {
onClick={onGetStreamGraph} onClick={onGetStreamGraph}
/> />
</Tooltip>)} </Tooltip>)}
{ current.task.jobInstanceId&&(current.task.jobInstanceId != 0) ? {isShowExecuteBtn() &&(
<Tooltip title="停止">
<Button
type="text"
icon={<PauseCircleTwoTone />}
onClick={()=>handleCancelTask('canceljob')}
/>
</Tooltip>:<>
{(!current.task.dialect||current.task.dialect === DIALECT.FLINKSQL||isSql( current.task.dialect )) &&(
<Tooltip title="执行当前的 SQL"> <Tooltip title="执行当前的 SQL">
<Button <Button
type="text" type="text"
...@@ -548,7 +573,7 @@ const StudioMenu = (props: any) => { ...@@ -548,7 +573,7 @@ const StudioMenu = (props: any) => {
onClick={execute} onClick={execute}
/> />
</Tooltip>)} </Tooltip>)}
{(!current.task.dialect||current.task.dialect === DIALECT.FLINKSQL||current.task.dialect === DIALECT.FLINKJAR||isSql( current.task.dialect )) &&(<> {isShowSubmitBtn() &&(<>
<Tooltip title="提交当前的作业到集群,提交前请手动保存"> <Tooltip title="提交当前的作业到集群,提交前请手动保存">
<Button <Button
type="text" type="text"
...@@ -557,7 +582,14 @@ const StudioMenu = (props: any) => { ...@@ -557,7 +582,14 @@ const StudioMenu = (props: any) => {
/> />
</Tooltip> </Tooltip>
</>)} </>)}
</> {isShowCancelTaskBtn() &&
<Tooltip title="停止">
<Button
type="text"
icon={<PauseCircleTwoTone />}
onClick={()=>handleCancelTask('canceljob')}
/>
</Tooltip>
} }
<Divider type="vertical"/> <Divider type="vertical"/>
{current.task.step == TASKSTEPS.DEVELOP ? {current.task.step == TASKSTEPS.DEVELOP ?
...@@ -575,7 +607,8 @@ const StudioMenu = (props: any) => { ...@@ -575,7 +607,8 @@ const StudioMenu = (props: any) => {
icon={<EditTwoTone />} icon={<EditTwoTone />}
onClick={toDevelopTask} onClick={toDevelopTask}
/> />
</Tooltip><Tooltip title="上线,上线后自动恢复、告警等将生效"> </Tooltip>
<Tooltip title="上线,上线后自动恢复、告警等将生效">
<Button <Button
type="text" type="text"
icon={<CarryOutTwoTone />} icon={<CarryOutTwoTone />}
...@@ -688,7 +721,7 @@ const mapDispatchToProps = (dispatch: Dispatch)=>({ ...@@ -688,7 +721,7 @@ const mapDispatchToProps = (dispatch: Dispatch)=>({
id,step id,step
}, },
}),changeTaskJobInstance:(id: number, jobInstanceId: number)=>dispatch({ }),changeTaskJobInstance:(id: number, jobInstanceId: number)=>dispatch({
type: "Studio/changeTaskStep", type: "Studio/changeTaskJobInstance",
payload: { payload: {
id,jobInstanceId id,jobInstanceId
}, },
......
export const RUN_MODE = { export const RUN_MODE = {
LOCAL:'local', LOCAL: 'local',
STANDALONE:'standalone', STANDALONE: 'standalone',
YARN_SESSION:'yarn-session', YARN_SESSION: 'yarn-session',
YARN_PER_JOB:'yarn-per-job', YARN_PER_JOB: 'yarn-per-job',
YARN_APPLICATION:'yarn-application', YARN_APPLICATION: 'yarn-application',
KUBERNETES_SESSION:'kubernetes-session', KUBERNETES_SESSION: 'kubernetes-session',
KUBERNETES_APPLICATION:'kubernetes-application', KUBERNETES_APPLICATION: 'kubernetes-application',
}; };
export const DIALECT = { export const DIALECT = {
FLINKSQL:'FlinkSql', FLINKSQL: 'FlinkSql',
FLINKJAR:'FlinkJar', FLINKJAR: 'FlinkJar',
FLINKSQLENV:'FlinkSqlEnv', FLINKSQLENV: 'FlinkSqlEnv',
SQL:'Sql', SQL: 'Sql',
MYSQL:'Mysql', MYSQL: 'Mysql',
ORACLE:'Oracle', ORACLE: 'Oracle',
SQLSERVER:'SqlServer', SQLSERVER: 'SqlServer',
POSTGRESQL:'PostGreSql', POSTGRESQL: 'PostGreSql',
CLICKHOUSE:'ClickHouse', CLICKHOUSE: 'ClickHouse',
DORIS:'Doris', DORIS: 'Doris',
JAVA:'Java', JAVA: 'Java',
}; };
export const CHART = { export const CHART = {
LINE:'折线图', LINE: '折线图',
BAR:'条形图', BAR: '条形图',
PIE:'饼图', PIE: '饼图',
}; };
export const isSql = (dialect: string)=>{ export const isSql = (dialect: string) => {
switch (dialect){ switch (dialect) {
case DIALECT.SQL: case DIALECT.SQL:
case DIALECT.MYSQL: case DIALECT.MYSQL:
case DIALECT.ORACLE: case DIALECT.ORACLE:
...@@ -43,8 +43,54 @@ export const isSql = (dialect: string)=>{ ...@@ -43,8 +43,54 @@ export const isSql = (dialect: string)=>{
} }
}; };
export const isOnline = (type: string)=>{ export const isExecuteSql = (dialect: string) => {
switch (type){ if (!dialect) {
return true;
}
switch (dialect) {
case DIALECT.SQL:
case DIALECT.MYSQL:
case DIALECT.ORACLE:
case DIALECT.SQLSERVER:
case DIALECT.POSTGRESQL:
case DIALECT.CLICKHOUSE:
case DIALECT.DORIS:
case DIALECT.FLINKSQL:
return true;
default:
return false;
}
};
export const isTask = (dialect: string) => {
if (!dialect) {
return true;
}
switch (dialect) {
case DIALECT.SQL:
case DIALECT.MYSQL:
case DIALECT.ORACLE:
case DIALECT.SQLSERVER:
case DIALECT.POSTGRESQL:
case DIALECT.CLICKHOUSE:
case DIALECT.DORIS:
case DIALECT.FLINKSQL:
case DIALECT.FLINKJAR:
return true;
default:
return false;
}
};
export const isRunningTask = (jobInstanceId: number) => {
if (jobInstanceId && jobInstanceId != 0) {
return true;
}
return false;
};
export const isOnline = (type: string) => {
switch (type) {
case RUN_MODE.LOCAL: case RUN_MODE.LOCAL:
case RUN_MODE.STANDALONE: case RUN_MODE.STANDALONE:
case RUN_MODE.YARN_SESSION: case RUN_MODE.YARN_SESSION:
...@@ -65,3 +111,9 @@ export const TASKSTEPS = { ...@@ -65,3 +111,9 @@ export const TASKSTEPS = {
CANCEL: 6, CANCEL: 6,
}; };
export const isDeletedTask = (step: number) => {
if (step && step === TASKSTEPS.CANCEL) {
return true;
}
return false;
};
...@@ -743,6 +743,9 @@ export default (): React.ReactNode => { ...@@ -743,6 +743,9 @@ export default (): React.ReactNode => {
<li> <li>
<Link>修复 用户未登录时后台报错及鉴权问题</Link> <Link>修复 用户未登录时后台报错及鉴权问题</Link>
</li> </li>
<li>
<Link>新增 作业生命周期与任务实例同步联动</Link>
</li>
</ul> </ul>
</Paragraph> </Paragraph>
</Timeline.Item> </Timeline.Item>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment