Commit 5ee33806 authored by wenmo's avatar wenmo

新增 作业发布时进行语法校验和逻辑检查

parent 04de7e61
...@@ -29,7 +29,7 @@ public class APIController { ...@@ -29,7 +29,7 @@ public class APIController {
@GetMapping(value = "/submitTask") @GetMapping(value = "/submitTask")
public Result submitTask(@RequestParam Integer id) { public Result submitTask(@RequestParam Integer id) {
return Result.succeed(taskService.submitByTaskId(id),"执行成功"); return Result.succeed(taskService.submitTask(id),"执行成功");
} }
@PostMapping("/executeSql") @PostMapping("/executeSql")
......
...@@ -81,7 +81,7 @@ public class TaskController { ...@@ -81,7 +81,7 @@ public class TaskController {
List<Integer> error = new ArrayList<>(); List<Integer> error = new ArrayList<>();
for (final JsonNode item : para){ for (final JsonNode item : para){
Integer id = item.asInt(); Integer id = item.asInt();
JobResult result = taskService.submitByTaskId(id); JobResult result = taskService.submitTask(id);
if(!result.isSuccess()){ if(!result.isSuccess()){
error.add(id); error.add(id);
} }
...@@ -127,7 +127,7 @@ public class TaskController { ...@@ -127,7 +127,7 @@ public class TaskController {
*/ */
@GetMapping(value = "/releaseTask") @GetMapping(value = "/releaseTask")
public Result releaseTask(@RequestParam Integer id) { public Result releaseTask(@RequestParam Integer id) {
return Result.succeed(taskService.releaseTask(id),"操作成功"); return taskService.releaseTask(id);
} }
/** /**
...@@ -175,7 +175,7 @@ public class TaskController { ...@@ -175,7 +175,7 @@ public class TaskController {
*/ */
@GetMapping(value = "/restartTask") @GetMapping(value = "/restartTask")
public Result restartTask(@RequestParam Integer id) { public Result restartTask(@RequestParam Integer id) {
return Result.succeed(taskService.restartByTaskId(id),"操作成功"); return Result.succeed(taskService.restartTask(id),"操作成功");
} }
} }
package com.dlink.service; package com.dlink.service;
import com.dlink.common.result.Result;
import com.dlink.db.service.ISuperService; import com.dlink.db.service.ISuperService;
import com.dlink.dto.StudioExecuteDTO;
import com.dlink.job.JobResult; import com.dlink.job.JobResult;
import com.dlink.model.JobInfoDetail; import com.dlink.model.JobInfoDetail;
import com.dlink.model.JobInstance; import com.dlink.model.JobInstance;
import com.dlink.model.Task; import com.dlink.model.Task;
import com.dlink.result.SqlExplainResult;
import java.util.List; import java.util.List;
...@@ -17,9 +20,11 @@ import java.util.List; ...@@ -17,9 +20,11 @@ import java.util.List;
*/ */
public interface TaskService extends ISuperService<Task> { public interface TaskService extends ISuperService<Task> {
JobResult submitByTaskId(Integer id); JobResult submitTask(Integer id);
JobResult restartByTaskId(Integer id); JobResult restartTask(Integer id);
List<SqlExplainResult> explainTask(Integer id);
Task getTaskInfoById(Integer id); Task getTaskInfoById(Integer id);
...@@ -31,7 +36,7 @@ public interface TaskService extends ISuperService<Task> { ...@@ -31,7 +36,7 @@ public interface TaskService extends ISuperService<Task> {
Task getUDFByClassName(String className); Task getUDFByClassName(String className);
boolean releaseTask(Integer id); Result releaseTask(Integer id);
boolean developTask(Integer id); boolean developTask(Integer id);
......
...@@ -8,6 +8,7 @@ import com.dlink.alert.AlertResult; ...@@ -8,6 +8,7 @@ import com.dlink.alert.AlertResult;
import com.dlink.assertion.Assert; import com.dlink.assertion.Assert;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.assertion.Tips; import com.dlink.assertion.Tips;
import com.dlink.common.result.Result;
import com.dlink.config.Dialect; import com.dlink.config.Dialect;
import com.dlink.constant.FlinkRestResultConstant; import com.dlink.constant.FlinkRestResultConstant;
import com.dlink.db.service.impl.SuperServiceImpl; import com.dlink.db.service.impl.SuperServiceImpl;
...@@ -25,6 +26,7 @@ import com.dlink.mapper.TaskMapper; ...@@ -25,6 +26,7 @@ import com.dlink.mapper.TaskMapper;
import com.dlink.metadata.driver.Driver; import com.dlink.metadata.driver.Driver;
import com.dlink.metadata.result.JdbcSelectResult; import com.dlink.metadata.result.JdbcSelectResult;
import com.dlink.model.*; import com.dlink.model.*;
import com.dlink.result.SqlExplainResult;
import com.dlink.service.*; import com.dlink.service.*;
import com.dlink.utils.CustomStringJavaCompiler; import com.dlink.utils.CustomStringJavaCompiler;
import com.dlink.utils.JSONUtil; import com.dlink.utils.JSONUtil;
...@@ -81,7 +83,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -81,7 +83,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
} }
@Override @Override
public JobResult submitByTaskId(Integer id) { public JobResult submitTask(Integer id) {
Task task = this.getTaskInfoById(id); Task task = this.getTaskInfoById(id);
Asserts.checkNull(task, Tips.TASK_NOT_EXIST); Asserts.checkNull(task, Tips.TASK_NOT_EXIST);
if (Dialect.isSql(task.getDialect())) { if (Dialect.isSql(task.getDialect())) {
...@@ -98,7 +100,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -98,7 +100,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
} }
@Override @Override
public JobResult restartByTaskId(Integer id) { public JobResult restartTask(Integer id) {
Task task = this.getTaskInfoById(id); Task task = this.getTaskInfoById(id);
Asserts.checkNull(task, Tips.TASK_NOT_EXIST); Asserts.checkNull(task, Tips.TASK_NOT_EXIST);
if(Asserts.isNotNull(task.getJobInstanceId())&&task.getJobInstanceId()!=0){ if(Asserts.isNotNull(task.getJobInstanceId())&&task.getJobInstanceId()!=0){
...@@ -150,6 +152,42 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -150,6 +152,42 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
} }
} }
@Override
public List<SqlExplainResult> explainTask(Integer id) {
Task task = getTaskInfoById(id);
if (Dialect.isSql(task.getDialect())) {
return explainCommonSqlTask(task);
} else {
return explainFlinkSqlTask(task);
}
}
private List<SqlExplainResult> explainFlinkSqlTask(Task task) {
JobConfig config = buildJobConfig(task);
config.buildLocal();
JobManager jobManager = JobManager.buildPlanMode(config);
return jobManager.explainSql(task.getStatement()).getSqlExplainResults();
}
private List<SqlExplainResult> explainCommonSqlTask(Task task) {
if (Asserts.isNull(task.getDatabaseId())) {
return new ArrayList<SqlExplainResult>() {{
add(SqlExplainResult.fail(task.getStatement(), "请指定数据源"));
}};
} else {
DataBase dataBase = dataBaseService.getById(task.getDatabaseId());
if (Asserts.isNull(dataBase)) {
return new ArrayList<SqlExplainResult>() {{
add(SqlExplainResult.fail(task.getStatement(), "数据源不存在"));
}};
}
Driver driver = Driver.build(dataBase.getDriverConfig());
List<SqlExplainResult> sqlExplainResults = driver.explain(task.getStatement());
driver.close();
return sqlExplainResults;
}
}
@Override @Override
public Task getTaskInfoById(Integer id) { public Task getTaskInfoById(Integer id) {
Task task = this.getById(id); Task task = this.getById(id);
...@@ -248,14 +286,24 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -248,14 +286,24 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
} }
@Override @Override
public boolean releaseTask(Integer id) { public Result releaseTask(Integer id) {
Task task = getById(id); Task task = getById(id);
Assert.check(task); Assert.check(task);
if (JobLifeCycle.DEVELOP.equalsValue(task.getStep())) { if (JobLifeCycle.DEVELOP.equalsValue(task.getStep())) {
List<SqlExplainResult> sqlExplainResults = explainTask(id);
for(SqlExplainResult sqlExplainResult: sqlExplainResults){
if(!sqlExplainResult.isParseTrue()||!sqlExplainResult.isExplainTrue()){
return Result.failed("语法校验和逻辑检查有误,发布失败");
}
}
task.setStep(JobLifeCycle.RELEASE.getValue()); task.setStep(JobLifeCycle.RELEASE.getValue());
return updateById(task); if(updateById(task)){
return Result.succeed("发布成功");
}else {
return Result.failed("由于未知原因,发布失败");
}
} }
return false; return Result.succeed("发布成功");
} }
@Override @Override
......
...@@ -277,9 +277,11 @@ const StudioMenu = (props: any) => { ...@@ -277,9 +277,11 @@ const StudioMenu = (props: any) => {
onOk: async () => { onOk: async () => {
const res = releaseTask(current.task.id); const res = releaseTask(current.task.id);
res.then((result) => { res.then((result) => {
result.datas && props.changeTaskStep(current.task.id,TASKSTEPS.RELEASE);
if(result.code == CODE.SUCCESS) { if(result.code == CODE.SUCCESS) {
props.changeTaskStep(current.task.id,TASKSTEPS.RELEASE);
message.success(`发布作业【${current.task.alias}】成功`); message.success(`发布作业【${current.task.alias}】成功`);
}else {
message.error(`发布作业【${current.task.alias}】失败,原因:\n${result.msg}`);
} }
}); });
} }
......
...@@ -731,6 +731,9 @@ export default (): React.ReactNode => { ...@@ -731,6 +731,9 @@ export default (): React.ReactNode => {
<li> <li>
<Link>优化 当提交作业无法获取JID时变为提交失败</Link> <Link>优化 当提交作业无法获取JID时变为提交失败</Link>
</li> </li>
<li>
<Link>新增 作业发布时进行语法校验和逻辑检查</Link>
</li>
</ul> </ul>
</Paragraph> </Paragraph>
</Timeline.Item> </Timeline.Item>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment