Unverified Commit f0c13170 authored by mydq's avatar mydq Committed by GitHub

Pr select checkpoint restart (#724)

* select save point restart

* select save point restart

* select save point restart

* pulsar sink : 程序包com.sun.istack.internal不存在

* select save point restart

* select save point restart
parent 5fe68097
...@@ -9,4 +9,8 @@ package com.dlink.assertion; ...@@ -9,4 +9,8 @@ package com.dlink.assertion;
public class Tips { public class Tips {
public static final String TASK_NOT_EXIST = "作业不存在"; public static final String TASK_NOT_EXIST = "作业不存在";
public static final String SAVEPOINT_IS_NULL = "保存点为空";
} }
package com.dlink.controller; package com.dlink.controller;
import com.dlink.assertion.Asserts;
import com.dlink.model.JobInstance;
import com.dlink.model.Task;
import com.dlink.service.JobInstanceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.dlink.common.result.Result; import com.dlink.common.result.Result;
import com.dlink.dto.APICancelDTO; import com.dlink.dto.APICancelDTO;
import com.dlink.dto.APIExecuteJarDTO; import com.dlink.dto.APIExecuteJarDTO;
...@@ -20,10 +8,17 @@ import com.dlink.dto.APIExplainSqlDTO; ...@@ -20,10 +8,17 @@ import com.dlink.dto.APIExplainSqlDTO;
import com.dlink.dto.APISavePointDTO; import com.dlink.dto.APISavePointDTO;
import com.dlink.dto.APISavePointTaskDTO; import com.dlink.dto.APISavePointTaskDTO;
import com.dlink.service.APIService; import com.dlink.service.APIService;
import com.dlink.service.JobInstanceService;
import com.dlink.service.StudioService; import com.dlink.service.StudioService;
import com.dlink.service.TaskService; import com.dlink.service.TaskService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/** /**
* APIController * APIController
...@@ -100,7 +95,15 @@ public class APIController { ...@@ -100,7 +95,15 @@ public class APIController {
*/ */
@GetMapping("/restartTask") @GetMapping("/restartTask")
public Result restartTask(@RequestParam Integer id) { public Result restartTask(@RequestParam Integer id) {
return Result.succeed(taskService.restartTask(id), "重启成功"); return Result.succeed(taskService.restartTask(id, null), "重启成功");
}
/**
* 选择保存点重启任务
*/
@GetMapping("/selectSavePointRestartTask")
public Result restartTask(@RequestParam Integer id, @RequestParam String savePointPath) {
return Result.succeed(taskService.restartTask(id, savePointPath), "重启成功");
} }
/** /**
...@@ -124,7 +127,16 @@ public class APIController { ...@@ -124,7 +127,16 @@ public class APIController {
*/ */
@GetMapping("/reOnLineTask") @GetMapping("/reOnLineTask")
public Result reOnLineTask(@RequestParam Integer id) { public Result reOnLineTask(@RequestParam Integer id) {
return taskService.reOnLineTask(id); return taskService.reOnLineTask(id, null);
}
/**
* 选择保存点重新上线任务
*/
@GetMapping("/selectSavePointReOnLineTask")
public Result selectSavePointReOnLineTask(@RequestParam Integer id, @RequestParam String savePointPath) {
return taskService.reOnLineTask(id, savePointPath);
} }
/** /**
......
...@@ -183,9 +183,21 @@ public class TaskController { ...@@ -183,9 +183,21 @@ public class TaskController {
@GetMapping(value = "/restartTask") @GetMapping(value = "/restartTask")
public Result restartTask(@RequestParam Integer id, @RequestParam Boolean isOnLine) { public Result restartTask(@RequestParam Integer id, @RequestParam Boolean isOnLine) {
if (isOnLine) { if (isOnLine) {
return taskService.reOnLineTask(id); return taskService.reOnLineTask(id, null);
} else { } else {
return Result.succeed(taskService.restartTask(id), "重启成功"); return Result.succeed(taskService.restartTask(id, null), "重启成功");
}
}
/**
* 选择保存点重启任务
*/
@GetMapping(value = "/selectSavePointRestartTask")
public Result selectSavePointRestartTask(@RequestParam Integer id, @RequestParam Boolean isOnLine, @RequestParam String savePointPath) {
if (isOnLine) {
return taskService.reOnLineTask(id, savePointPath);
} else {
return Result.succeed(taskService.restartTask(id, savePointPath), "重启成功");
} }
} }
......
package com.dlink.service; package com.dlink.service;
import java.util.List;
import com.dlink.common.result.Result; import com.dlink.common.result.Result;
import com.dlink.db.service.ISuperService; import com.dlink.db.service.ISuperService;
import com.dlink.dto.TaskRollbackVersionDTO; import com.dlink.dto.TaskRollbackVersionDTO;
...@@ -12,6 +10,8 @@ import com.dlink.model.JobInstance; ...@@ -12,6 +10,8 @@ import com.dlink.model.JobInstance;
import com.dlink.model.Task; import com.dlink.model.Task;
import com.dlink.result.SqlExplainResult; import com.dlink.result.SqlExplainResult;
import java.util.List;
/** /**
* 作业 服务类 * 作业 服务类
* *
...@@ -22,9 +22,9 @@ public interface TaskService extends ISuperService<Task> { ...@@ -22,9 +22,9 @@ public interface TaskService extends ISuperService<Task> {
JobResult submitTask(Integer id); JobResult submitTask(Integer id);
JobResult submitTaskToOnline(Integer id); JobResult submitTaskToOnline(Task dtoTask, Integer id);
JobResult restartTask(Integer id); JobResult restartTask(Integer id, String savePointPath);
List<SqlExplainResult> explainTask(Integer id); List<SqlExplainResult> explainTask(Integer id);
...@@ -46,7 +46,7 @@ public interface TaskService extends ISuperService<Task> { ...@@ -46,7 +46,7 @@ public interface TaskService extends ISuperService<Task> {
Result onLineTask(Integer id); Result onLineTask(Integer id);
Result reOnLineTask(Integer id); Result reOnLineTask(Integer id, String savePointPath);
Result offLineTask(Integer id, String type); Result offLineTask(Integer id, String type);
......
package com.dlink.service.impl; package com.dlink.service.impl;
import java.net.InetAddress; import cn.hutool.core.bean.BeanUtil;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.dlink.alert.Alert; import com.dlink.alert.Alert;
...@@ -89,8 +70,25 @@ import com.dlink.service.TaskVersionService; ...@@ -89,8 +70,25 @@ import com.dlink.service.TaskVersionService;
import com.dlink.utils.CustomStringJavaCompiler; import com.dlink.utils.CustomStringJavaCompiler;
import com.dlink.utils.JSONUtil; import com.dlink.utils.JSONUtil;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import cn.hutool.core.bean.BeanUtil; import javax.annotation.Resource;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/** /**
* 任务 服务实现类 * 任务 服务实现类
...@@ -159,8 +157,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -159,8 +157,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
} }
@Override @Override
public JobResult submitTaskToOnline(Integer id) { public JobResult submitTaskToOnline(Task dtoTask, Integer id) {
Task task = this.getTaskInfoById(id); final Task task = (dtoTask == null ? this.getTaskInfoById(id) : dtoTask);
Asserts.checkNull(task, Tips.TASK_NOT_EXIST); Asserts.checkNull(task, Tips.TASK_NOT_EXIST);
task.setStep(JobLifeCycle.ONLINE.getValue()); task.setStep(JobLifeCycle.ONLINE.getValue());
if (Dialect.isSql(task.getDialect())) { if (Dialect.isSql(task.getDialect())) {
...@@ -177,7 +175,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -177,7 +175,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
} }
@Override @Override
public JobResult restartTask(Integer id) { public JobResult restartTask(Integer id, String savePointPath) {
Task task = this.getTaskInfoById(id); Task task = this.getTaskInfoById(id);
Asserts.checkNull(task, Tips.TASK_NOT_EXIST); Asserts.checkNull(task, Tips.TASK_NOT_EXIST);
if (Asserts.isNotNull(task.getJobInstanceId()) && task.getJobInstanceId() != 0) { if (Asserts.isNotNull(task.getJobInstanceId()) && task.getJobInstanceId() != 0) {
...@@ -187,7 +185,13 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -187,7 +185,13 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
return executeCommonSql(SqlDTO.build(task.getStatement(), return executeCommonSql(SqlDTO.build(task.getStatement(),
task.getDatabaseId(), null)); task.getDatabaseId(), null));
} }
if (StringUtils.isBlank(savePointPath)){
task.setSavePointStrategy(SavePointStrategy.LATEST.getValue()); task.setSavePointStrategy(SavePointStrategy.LATEST.getValue());
}else {
task.setSavePointStrategy(SavePointStrategy.CUSTOM.getValue());
task.setSavePointPath(savePointPath);
updateById(task);
}
JobConfig config = buildJobConfig(task); JobConfig config = buildJobConfig(task);
JobManager jobManager = JobManager.build(config); JobManager jobManager = JobManager.build(config);
if (!config.isJarTask()) { if (!config.isJarTask()) {
...@@ -197,6 +201,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -197,6 +201,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
} }
} }
private JobResult executeCommonSql(SqlDTO sqlDTO) { private JobResult executeCommonSql(SqlDTO sqlDTO) {
JobResult result = new JobResult(); JobResult result = new JobResult();
result.setStatement(sqlDTO.getStatement()); result.setStatement(sqlDTO.getStatement());
...@@ -508,13 +514,13 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -508,13 +514,13 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Override @Override
public Result onLineTask(Integer id) { public Result onLineTask(Integer id) {
Task task = getTaskInfoById(id); final Task task = getTaskInfoById(id);
Assert.check(task); Assert.check(task);
if (JobLifeCycle.RELEASE.equalsValue(task.getStep())) { if (JobLifeCycle.RELEASE.equalsValue(task.getStep())) {
if (Asserts.isNotNull(task.getJobInstanceId()) && task.getJobInstanceId() != 0) { if (Asserts.isNotNull(task.getJobInstanceId()) && task.getJobInstanceId() != 0) {
return Result.failed("当前发布状态下有作业正在运行,上线失败,请停止后上线"); return Result.failed("当前发布状态下有作业正在运行,上线失败,请停止后上线");
} }
JobResult jobResult = submitTaskToOnline(id); final JobResult jobResult = submitTaskToOnline(task, id);
if (Job.JobStatus.SUCCESS == jobResult.getStatus()) { if (Job.JobStatus.SUCCESS == jobResult.getStatus()) {
task.setStep(JobLifeCycle.ONLINE.getValue()); task.setStep(JobLifeCycle.ONLINE.getValue());
task.setJobInstanceId(jobResult.getJobInstanceId()); task.setJobInstanceId(jobResult.getJobInstanceId());
...@@ -533,13 +539,17 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -533,13 +539,17 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
} }
@Override @Override
public Result reOnLineTask(Integer id) { public Result reOnLineTask(Integer id, String savePointPath) {
Task task = this.getTaskInfoById(id); final Task task = this.getTaskInfoById(id);
Asserts.checkNull(task, Tips.TASK_NOT_EXIST); Asserts.checkNull(task, Tips.TASK_NOT_EXIST);
if (Asserts.isNotNull(task.getJobInstanceId()) && task.getJobInstanceId() != 0) { if (Asserts.isNotNull(task.getJobInstanceId()) && task.getJobInstanceId() != 0) {
savepointJobInstance(task.getJobInstanceId(), SavePointType.CANCEL.getValue()); savepointJobInstance(task.getJobInstanceId(), SavePointType.CANCEL.getValue());
} }
JobResult jobResult = submitTaskToOnline(id); if (StringUtils.isNotBlank(savePointPath)){
task.setSavePointStrategy(SavePointStrategy.CUSTOM.getValue());
task.setSavePointPath(savePointPath);
}
final JobResult jobResult = submitTaskToOnline(task, id);
if (Job.JobStatus.SUCCESS == jobResult.getStatus()) { if (Job.JobStatus.SUCCESS == jobResult.getStatus()) {
task.setStep(JobLifeCycle.ONLINE.getValue()); task.setStep(JobLifeCycle.ONLINE.getValue());
task.setJobInstanceId(jobResult.getJobInstanceId()); task.setJobInstanceId(jobResult.getJobInstanceId());
......
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
package com.dlink.connector.pulsar; package com.dlink.connector.pulsar;
import com.sun.istack.internal.Nullable;
import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.ChangelogMode;
...@@ -73,7 +72,7 @@ public class PulsarDynamicSink implements DynamicTableSink { ...@@ -73,7 +72,7 @@ public class PulsarDynamicSink implements DynamicTableSink {
/** /**
* Optional format for encoding to Pulsar. * Optional format for encoding to Pulsar.
*/ */
protected final @Nullable protected final
EncodingFormat<SerializationSchema<RowData>> encodingFormat; EncodingFormat<SerializationSchema<RowData>> encodingFormat;
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
...@@ -113,7 +112,7 @@ public class PulsarDynamicSink implements DynamicTableSink { ...@@ -113,7 +112,7 @@ public class PulsarDynamicSink implements DynamicTableSink {
public PulsarDynamicSink( public PulsarDynamicSink(
DataType physicalDataType, DataType physicalDataType,
@Nullable EncodingFormat<SerializationSchema<RowData>> encodingFormat, EncodingFormat<SerializationSchema<RowData>> encodingFormat,
String topic, String topic,
String service_url, String service_url,
String update_mode, String update_mode,
......
import {Button, Descriptions, Empty, Modal, Tabs, Tag} from 'antd'; import {Button, Descriptions, Empty, message, Modal, Tabs, Tag} from 'antd';
import { import {
CheckCircleOutlined, CheckCircleOutlined,
CloseCircleOutlined, CloseCircleOutlined,
...@@ -11,7 +11,10 @@ import {parseByteStr, parseMilliSecondStr, parseSecondStr} from "@/components/Co ...@@ -11,7 +11,10 @@ import {parseByteStr, parseMilliSecondStr, parseSecondStr} from "@/components/Co
import ProTable, {ActionType, ProColumns} from "@ant-design/pro-table"; import ProTable, {ActionType, ProColumns} from "@ant-design/pro-table";
import {useRef} from "react"; import {useRef} from "react";
import {CheckPointsDetailInfo, SavePointInfo} from "@/pages/DevOps/data"; import {CheckPointsDetailInfo, SavePointInfo} from "@/pages/DevOps/data";
import {queryData} from "@/components/Common/crud"; import {CODE, queryData} from "@/components/Common/crud";
import {selectSavePointRestartTask} from "@/pages/DevOps/service";
import {JOB_LIFE_CYCLE} from "@/components/Common/JobLifeCycle";
import {history, useLocation} from 'umi';
const {TabPane} = Tabs; const {TabPane} = Tabs;
...@@ -160,7 +163,15 @@ const CheckPoints = (props: any) => { ...@@ -160,7 +163,15 @@ const CheckPoints = (props: any) => {
onOk: async () => { onOk: async () => {
// TODO: handleRecoveryCheckPoint // TODO: handleRecoveryCheckPoint
// await handleRecoveryCheckPoint('api/task/recoveryCheckPoint', [row]); // await handleRecoveryCheckPoint('api/task/recoveryCheckPoint', [row]);
actionRef.current?.reload?.(); const res = selectSavePointRestartTask(job?.instance?.taskId, job?.instance?.step == JOB_LIFE_CYCLE.ONLINE, row.external_path);
res.then((result) => {
if (result.code == CODE.SUCCESS) {
message.success("恢复作业成功");
history.goBack();
} else {
message.error("恢复作业失败");
}
});
} }
}); });
} }
...@@ -245,7 +256,7 @@ const CheckPoints = (props: any) => { ...@@ -245,7 +256,7 @@ const CheckPoints = (props: any) => {
render: (dom, entity) => { render: (dom, entity) => {
return <> return <>
{entity.status === 'COMPLETED' ? {entity.status === 'COMPLETED' ?
<Button disabled title="暂不可用" onClick={() => recoveryCheckPoint(entity)}>此处恢复</Button> : undefined} <Button title="暂不可用" onClick={() => recoveryCheckPoint(entity)}>此处恢复</Button> : undefined}
</> </>
}, },
}, },
......
...@@ -23,3 +23,7 @@ export function getJobManagerInfo(address: string) { ...@@ -23,3 +23,7 @@ export function getJobManagerInfo(address: string) {
export function getTaskManagerInfo(address: string) { export function getTaskManagerInfo(address: string) {
return getData("api/jobInstance/getTaskManagerInfo", {address}); return getData("api/jobInstance/getTaskManagerInfo", {address});
} }
export function selectSavePointRestartTask(id: number,isOnLine: boolean, savePointPath: string) {
return getData("api/task/selectSavePointRestartTask", {id,isOnLine,savePointPath});
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment