Commit 60ca28fd authored by wenmo

Optimize logic and extend the openAPI

1. Optimize SQL validation, execution-graph retrieval, etc. to support statement sets
2. Extend the openAPI with SQL validation, job plan graph, data preview, etc.
3. Improve the savepoint restAPI
parent a5ba689d
......@@ -2,14 +2,12 @@ package com.dlink.controller;
import com.dlink.common.result.Result;
import com.dlink.dto.APIExecuteSqlDTO;
import com.dlink.job.JobResult;
import com.dlink.dto.APIExplainSqlDTO;
import com.dlink.service.APIService;
import com.dlink.service.StudioService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
/**
* APIController
......@@ -24,9 +22,31 @@ public class APIController {
@Autowired
private APIService apiService;
@PostMapping("/executeSql")
@Autowired
private StudioService studioService;
@PostMapping("/executeSql")
public Result executeSql(@RequestBody APIExecuteSqlDTO apiExecuteSqlDTO) {
return Result.succeed(apiService.executeSql(apiExecuteSqlDTO),"执行成功");
}
@PostMapping("/explainSql")
public Result explainSql(@RequestBody APIExplainSqlDTO apiExecuteSqlDTO) {
return Result.succeed(apiService.explainSql(apiExecuteSqlDTO),"执行成功");
}
@PostMapping("/getJobPlan")
public Result getJobPlan(@RequestBody APIExplainSqlDTO apiExecuteSqlDTO) {
return Result.succeed(apiService.getJobPlan(apiExecuteSqlDTO),"执行成功");
}
@PostMapping("/getStreamGraph")
public Result getStreamGraph(@RequestBody APIExplainSqlDTO apiExecuteSqlDTO) {
return Result.succeed(apiService.getStreamGraph(apiExecuteSqlDTO),"执行成功");
}
@GetMapping("/getJobData")
public Result getJobData(@RequestParam String jobId) {
return Result.succeed(studioService.getJobData(jobId),"获取成功");
}
}
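
A minimal, hedged sketch (not part of this commit) of calling the new getJobData endpoint above. It uses Hutool's HttpUtil, which the project already depends on in FlinkAPI, and assumes the 127.0.0.1:8888 address shown in the sample request files later on this page; the jobId value is a placeholder.

import cn.hutool.http.HttpUtil;

public class GetJobDataSketch {
    public static void main(String[] args) {
        String jobId = "<flink-job-id>"; // placeholder
        // Plain GET with a jobId query parameter; the response is the Result wrapper
        // that APIController builds around studioService.getJobData(jobId).
        String response = HttpUtil.get("http://127.0.0.1:8888/openapi/getJobData?jobId=" + jobId);
        System.out.println(response);
    }
}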
......@@ -36,8 +36,6 @@ public class APIExecuteSqlDTO {
private Map<String, String> configuration;
private GatewayConfig gatewayConfig;
private static final ObjectMapper mapper = new ObjectMapper();
public JobConfig getJobConfig() {
Integer savePointStrategy = 0;
if (Asserts.isNotNullString(savePointPath)) {
......
package com.dlink.dto;
import com.dlink.assertion.Asserts;
import com.dlink.gateway.GatewayType;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.job.JobConfig;
import lombok.Getter;
import lombok.Setter;
import java.util.Map;
/**
* APIExplainSqlDTO
*
* @author wenmo
* @since 2021/12/12 13:01
*/
@Getter
@Setter
public class APIExplainSqlDTO {
private boolean useStatementSet = false;
private boolean fragment = false;
private String statement;
private Integer parallelism;
private Map<String, String> configuration;
public JobConfig getJobConfig() {
return new JobConfig("local", false, false, fragment, useStatementSet, parallelism, configuration);
}
}
......@@ -23,7 +23,7 @@ public class StudioExecuteDTO {
// RUN_MODE
private String type;
private boolean useResult;
private boolean useStatementSet;
private boolean statementSet;
private boolean useSession;
private String session;
private boolean useRemote;
......@@ -58,7 +58,7 @@ public class StudioExecuteDTO {
}
return new JobConfig(
type,useResult, useSession, session, useRemote, clusterId,
clusterConfigurationId,jarId, taskId, jobName, fragment,useStatementSet,
clusterConfigurationId,jarId, taskId, jobName, fragment,statementSet,
maxRowNum, checkPoint, parallelism,savePointStrategy, savePointPath,config);
}
}
......@@ -80,13 +80,6 @@ public class Task extends SuperEntity{
}
return config;
}
/*public ExecutorSetting buildExecutorSetting(){
HashMap configMap = new HashMap();
if(config!=null&&!"".equals(clusterName)) {
configMap = JSONUtil.toBean(config, HashMap.class);
}
return new ExecutorSetting(checkPoint,parallelism,fragment,savePointPath,alias,configMap);
}*/
public JobConfig buildSubmitConfig(){
boolean useRemote = true;
......
package com.dlink.service;
import com.dlink.dto.APIExecuteSqlDTO;
import com.dlink.dto.APIExplainSqlDTO;
import com.dlink.result.APIJobResult;
import com.dlink.result.ExplainResult;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* APIService
......@@ -12,4 +15,10 @@ import com.dlink.result.APIJobResult;
public interface APIService {
APIJobResult executeSql(APIExecuteSqlDTO apiExecuteSqlDTO);
ExplainResult explainSql(APIExplainSqlDTO apiExplainSqlDTO);
ObjectNode getJobPlan(APIExplainSqlDTO apiExplainSqlDTO);
ObjectNode getStreamGraph(APIExplainSqlDTO apiExplainSqlDTO);
}
package com.dlink.service.impl;
import com.dlink.dto.APIExecuteSqlDTO;
import com.dlink.dto.APIExplainSqlDTO;
import com.dlink.gateway.GatewayType;
import com.dlink.job.JobConfig;
import com.dlink.job.JobManager;
import com.dlink.job.JobResult;
import com.dlink.result.APIJobResult;
import com.dlink.result.ExplainResult;
import com.dlink.service.APIService;
import com.dlink.utils.RunTimeUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.springframework.stereotype.Service;
/**
......@@ -27,4 +33,39 @@ public class APIServiceImpl implements APIService {
RunTimeUtil.recovery(jobManager);
return apiJobResult;
}
@Override
public ExplainResult explainSql(APIExplainSqlDTO apiExplainSqlDTO) {
JobConfig config = apiExplainSqlDTO.getJobConfig();
JobManager jobManager = JobManager.buildPlanMode(config);
ExplainResult explainResult = jobManager.explainSql(apiExplainSqlDTO.getStatement());
RunTimeUtil.recovery(jobManager);
return explainResult;
}
@Override
public ObjectNode getJobPlan(APIExplainSqlDTO apiExplainSqlDTO) {
JobConfig config = apiExplainSqlDTO.getJobConfig();
JobManager jobManager = JobManager.buildPlanMode(config);
String planJson = jobManager.getJobPlanJson(apiExplainSqlDTO.getStatement());
ObjectMapper mapper = new ObjectMapper();
ObjectNode objectNode =mapper.createObjectNode();
try {
objectNode = (ObjectNode) mapper.readTree(planJson);
} catch (JsonProcessingException e) {
e.printStackTrace();
}finally {
RunTimeUtil.recovery(jobManager);
return objectNode;
}
}
@Override
public ObjectNode getStreamGraph(APIExplainSqlDTO apiExplainSqlDTO) {
JobConfig config = apiExplainSqlDTO.getJobConfig();
JobManager jobManager = JobManager.buildPlanMode(config);
ObjectNode streamGraph = jobManager.getStreamGraph(apiExplainSqlDTO.getStatement());
RunTimeUtil.recovery(jobManager);
return streamGraph;
}
}
......@@ -90,7 +90,7 @@ public class StudioServiceImpl implements StudioService {
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), studioExecuteDTO.getClusterId()));
}
JobManager jobManager = JobManager.buildPlanMode(config);
return jobManager.explainSql(studioExecuteDTO.getStatement());
return jobManager.explainSql(studioExecuteDTO.getStatement()).getSqlExplainResults();
}
@Override
......
/* http://127.0.0.1:8888/openapi/executeSql */
{
/* required-start */
"type":"yarn-session",
"type":"yarn-session", // standalone|local
"address":"10.1.51.24:8081",
"statement":"CREATE TABLE Orders (\r\n order_number INT,\r\n price DECIMAL(32,2),\r\n order_time TIMESTAMP(3)\r\n) WITH (\r\n 'connector' = 'datagen',\r\n 'rows-per-second' = '1',\r\n 'fields.order_number.kind' = 'sequence',\r\n 'fields.order_number.start' = '1',\r\n 'fields.order_number.end' = '1000'\r\n);\r\nCREATE TABLE pt (\r\nordertotal INT,\r\nnumtotal INT\r\n) WITH (\r\n 'connector' = 'print'\r\n);\r\ninsert into pt select 1 as ordertotal ,sum(order_number)*2 as numtotal from Orders",
/* required-end */
......
/* http://127.0.0.1:8888/openapi/explainSql */
{
/* required-start */
"statement":"CREATE TABLE Orders (\r\n order_number INT,\r\n price DECIMAL(32,2),\r\n order_time TIMESTAMP(3)\r\n) WITH (\r\n 'connector' = 'datagen',\r\n 'rows-per-second' = '1',\r\n 'fields.order_number.kind' = 'sequence',\r\n 'fields.order_number.start' = '1',\r\n 'fields.order_number.end' = '1000'\r\n);\r\nCREATE TABLE pt (\r\nordertotal INT,\r\nnumtotal INT\r\n) WITH (\r\n 'connector' = 'print'\r\n);\r\ninsert into pt select 1 as ordertotal ,sum(order_number)*2 as numtotal from Orders",
/* required-end */
/* default-start */
"useStatementSet":false,
"fragment":false,
"parallelism":1,
/* default-start */
/* custom-start */
"configuration":{
"table.exec.resource.default-parallelism":2
}
/* custom-end */
}
\ No newline at end of file
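
A hedged sketch (not part of this commit) of posting the request above and reading the result. The Hutool and Jackson calls are client-side assumptions, and the payload field names come from the ExplainResult and SqlExplainResult classes shown further down this page; depending on the Hutool version, the JSON Content-Type may need to be set explicitly.

import cn.hutool.http.HttpUtil;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ExplainSqlSketch {
    public static void main(String[] args) throws Exception {
        // The request body is the JSON document shown above (statement shortened to a placeholder here).
        String requestBody = "{\"statement\":\"<Flink SQL from the sample above>\","
                + "\"useStatementSet\":false,\"fragment\":false,\"parallelism\":1}";
        String response = HttpUtil.post("http://127.0.0.1:8888/openapi/explainSql", requestBody);
        JsonNode root = new ObjectMapper().readTree(response);
        // On success the wrapped payload mirrors ExplainResult: correct, total, and
        // sqlExplainResults[] entries with index, type, sql, parseTrue, explainTrue,
        // explainTime and, when a statement fails, error.
        System.out.println(root);
    }
}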
/* http://127.0.0.1:8888/openapi/getJobPlan */
{
/* required-start */
"statement":"CREATE TABLE Orders (\r\n order_number INT,\r\n price DECIMAL(32,2),\r\n order_time TIMESTAMP(3)\r\n) WITH (\r\n 'connector' = 'datagen',\r\n 'rows-per-second' = '1',\r\n 'fields.order_number.kind' = 'sequence',\r\n 'fields.order_number.start' = '1',\r\n 'fields.order_number.end' = '1000'\r\n);\r\nCREATE TABLE pt (\r\nordertotal INT,\r\nnumtotal INT\r\n) WITH (\r\n 'connector' = 'print'\r\n);\r\ninsert into pt select 1 as ordertotal ,sum(order_number)*2 as numtotal from Orders",
/* required-end */
/* default-start */
"useStatementSet":false,
"fragment":false,
"parallelism":1,
/* default-start */
/* custom-start */
"configuration":{
"table.exec.resource.default-parallelism":2
}
/* custom-end */
}
\ No newline at end of file
/* http://127.0.0.1:8888/openapi/getStreamGraph */
{
/* required-start */
"statement":"CREATE TABLE Orders (\r\n order_number INT,\r\n price DECIMAL(32,2),\r\n order_time TIMESTAMP(3)\r\n) WITH (\r\n 'connector' = 'datagen',\r\n 'rows-per-second' = '1',\r\n 'fields.order_number.kind' = 'sequence',\r\n 'fields.order_number.start' = '1',\r\n 'fields.order_number.end' = '1000'\r\n);\r\nCREATE TABLE pt (\r\nordertotal INT,\r\nnumtotal INT\r\n) WITH (\r\n 'connector' = 'print'\r\n);\r\ninsert into pt select 1 as ordertotal ,sum(order_number)*2 as numtotal from Orders",
/* required-end */
/* default-start */
"useStatementSet":false,
"fragment":false,
"parallelism":1,
/* default-start */
/* custom-start */
"configuration":{
"table.exec.resource.default-parallelism":2
}
/* custom-end */
}
\ No newline at end of file
......@@ -117,7 +117,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
}
}
public JobGraph getJobGraphFromInserts(List<String> statements) {
public StreamGraph getStreamGraphFromInserts(List<String> statements) {
List<ModifyOperation> modifyOperations = new ArrayList();
for(String statement : statements){
List<Operation> operations = getParser().parse(statement);
......@@ -134,13 +134,16 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
}
List<Transformation<?>> trans = getPlanner().translate(modifyOperations);
if(execEnv instanceof ExecutorBase){
StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
return streamGraph.getJobGraph();
return ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
}else{
throw new TableException("Unsupported SQL query! ExecEnv need a ExecutorBase.");
}
}
public JobGraph getJobGraphFromInserts(List<String> statements) {
return getStreamGraphFromInserts(statements).getJobGraph();
}
public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
SqlExplainResult record = new SqlExplainResult();
List<Operation> operations = parser.parse(statement);
......
......@@ -117,7 +117,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
}
}
public JobGraph getJobGraphFromInserts(List<String> statements) {
public StreamGraph getStreamGraphFromInserts(List<String> statements) {
List<ModifyOperation> modifyOperations = new ArrayList();
for(String statement : statements){
List<Operation> operations = getParser().parse(statement);
......@@ -134,13 +134,16 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
}
List<Transformation<?>> trans = getPlanner().translate(modifyOperations);
if(execEnv instanceof ExecutorBase){
StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
return streamGraph.getJobGraph();
return ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
}else{
throw new TableException("Unsupported SQL query! ExecEnv need a ExecutorBase.");
}
}
public JobGraph getJobGraphFromInserts(List<String> statements) {
return getStreamGraphFromInserts(statements).getJobGraph();
}
public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
SqlExplainResult record = new SqlExplainResult();
List<Operation> operations = parser.parse(statement);
......
......@@ -120,7 +120,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
return new JobPlanInfo(JsonPlanGenerator.generatePlan(getJobGraphFromInserts(statements)));
}
public JobGraph getJobGraphFromInserts(List<String> statements) {
public StreamGraph getStreamGraphFromInserts(List<String> statements) {
List<ModifyOperation> modifyOperations = new ArrayList();
for(String statement : statements){
List<Operation> operations = getParser().parse(statement);
......@@ -137,13 +137,16 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
}
List<Transformation<?>> trans = getPlanner().translate(modifyOperations);
if(execEnv instanceof ExecutorBase){
StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
return streamGraph.getJobGraph();
return ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
}else{
throw new TableException("Unsupported SQL query! ExecEnv need a ExecutorBase.");
}
}
public JobGraph getJobGraphFromInserts(List<String> statements) {
return getStreamGraphFromInserts(statements).getJobGraph();
}
public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
SqlExplainResult record = new SqlExplainResult();
List<Operation> operations = getParser().parse(statement);
......
......@@ -175,7 +175,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
}
}
public JobGraph getJobGraphFromInserts(List<String> statements) {
public StreamGraph getStreamGraphFromInserts(List<String> statements) {
List<ModifyOperation> modifyOperations = new ArrayList();
for(String statement : statements){
List<Operation> operations = getParser().parse(statement);
......@@ -192,13 +192,16 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
}
List<Transformation<?>> trans = getPlanner().translate(modifyOperations);
if(execEnv instanceof DefaultExecutor){
StreamGraph streamGraph = ((DefaultExecutor) execEnv).getExecutionEnvironment().generateStreamGraph(trans);
return streamGraph.getJobGraph();
return ((DefaultExecutor) execEnv).getExecutionEnvironment().generateStreamGraph(trans);
}else{
throw new TableException("Unsupported SQL query! ExecEnv need a ExecutorBase.");
}
}
public JobGraph getJobGraphFromInserts(List<String> statements) {
return getStreamGraphFromInserts(statements).getJobGraph();
}
public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
SqlExplainResult record = new SqlExplainResult();
List<Operation> operations = getParser().parse(statement);
......
package com.dlink.result;
import java.util.List;
/**
* ExplainResult
*
* @author wenmo
* @since 2021/12/12 13:11
*/
public class ExplainResult {
private boolean correct;
private int total;
private List<SqlExplainResult> sqlExplainResults;
public ExplainResult(boolean correct, int total, List<SqlExplainResult> sqlExplainResults) {
this.correct = correct;
this.total = total;
this.sqlExplainResults = sqlExplainResults;
}
public boolean isCorrect() {
return correct;
}
public void setCorrect(boolean correct) {
this.correct = correct;
}
public int getTotal() {
return total;
}
public void setTotal(int total) {
this.total = total;
}
public List<SqlExplainResult> getSqlExplainResults() {
return sqlExplainResults;
}
public void setSqlExplainResults(List<SqlExplainResult> sqlExplainResults) {
this.sqlExplainResults = sqlExplainResults;
}
}
package com.dlink.result;
import java.time.LocalDateTime;
import java.util.Date;
/**
......@@ -17,7 +18,7 @@ public class SqlExplainResult {
private String error;
private boolean parseTrue;
private boolean explainTrue;
private Date explainTime;
private LocalDateTime explainTime;
public Integer getIndex() {
return index;
......@@ -83,11 +84,11 @@ public class SqlExplainResult {
this.explainTrue = explainTrue;
}
public Date getExplainTime() {
public LocalDateTime getExplainTime() {
return explainTime;
}
public void setExplainTime(Date explainTime) {
public void setExplainTime(LocalDateTime explainTime) {
this.explainTime = explainTime;
}
......
......@@ -2,6 +2,7 @@ package com.dlink.api;
import cn.hutool.http.HttpUtil;
import cn.hutool.http.Method;
import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkRestAPIConstant;
import com.dlink.constant.NetConstant;
import com.dlink.gateway.GatewayType;
......@@ -106,23 +107,30 @@ public class FlinkAPI {
} catch (JsonProcessingException e) {
e.printStackTrace();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String triggerid = json.get("request-id").asText();
while (triggerid != null)
{
while (triggerid != null) {
try {
Thread.sleep(1000);
JsonNode node = get(FlinkRestAPIConstant.JOBS + jobId + FlinkRestAPIConstant.SAVEPOINTS + NetConstant.SLASH + triggerid);
JsonNode operation = node.get("operation");
String location = operation.get("location").toString();
List<JobInfo> jobInfos = new ArrayList<>();
jobInfo.setSavePoint(location);
jobInfos.add(jobInfo);
result.setJobInfos(jobInfos);
break;
String status = node.get("status").get("id").asText();
if(Asserts.isEquals(status,"IN_PROGRESS")){
continue;
}
if(node.get("operation").has("failure-cause")) {
String failureCause = node.get("operation").get("failure-cause").asText();
if (Asserts.isNotNullString(failureCause)) {
result.fail(failureCause);
break;
}
}
if(node.get("operation").has("location")) {
String location = node.get("operation").get("location").asText();
List<JobInfo> jobInfos = new ArrayList<>();
jobInfo.setSavePoint(location);
jobInfos.add(jobInfo);
result.setJobInfos(jobInfos);
break;
}
} catch (Exception e) {
e.printStackTrace();
result.fail(e.getMessage());
......
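
For context, a hedged sketch of the Flink REST flow the reworked savepoint polling above follows: triggering a savepoint returns a request-id, and the trigger-status endpoint is polled until the status leaves IN_PROGRESS and the operation node carries either a location or a failure-cause. The Hutool and Jackson calls are assumptions; the endpoint paths match the ones FlinkAPI uses in this commit.

import cn.hutool.http.HttpUtil;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class SavepointPollSketch {
    public static void main(String[] args) throws Exception {
        String address = "10.1.51.24:8081";   // JobManager REST address (placeholder from the samples)
        String jobId = "<flink-job-id>";      // placeholder
        ObjectMapper mapper = new ObjectMapper();
        // 1) Trigger the savepoint: POST /jobs/{jobId}/savepoints -> {"request-id": "<triggerId>"}
        String triggerBody = HttpUtil.post("http://" + address + "/jobs/" + jobId + "/savepoints", "{}");
        String triggerId = mapper.readTree(triggerBody).get("request-id").asText();
        // 2) Poll the trigger status until it is no longer IN_PROGRESS
        while (true) {
            Thread.sleep(1000);
            JsonNode node = mapper.readTree(
                    HttpUtil.get("http://" + address + "/jobs/" + jobId + "/savepoints/" + triggerId));
            if ("IN_PROGRESS".equals(node.get("status").get("id").asText())) {
                continue;
            }
            JsonNode operation = node.get("operation");
            if (operation.has("failure-cause")) {
                System.out.println("savepoint failed: " + operation.get("failure-cause").asText());
            } else if (operation.has("location")) {
                System.out.println("savepoint written to: " + operation.get("location").asText());
            }
            break;
        }
    }
}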
......@@ -7,7 +7,10 @@ import com.dlink.explainer.ca.*;
import com.dlink.explainer.trans.Trans;
import com.dlink.explainer.trans.TransGenerator;
import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.job.JobParam;
import com.dlink.job.StatementParam;
import com.dlink.parser.SqlType;
import com.dlink.result.ExplainResult;
import com.dlink.result.SqlExplainResult;
import com.dlink.trans.Operations;
import com.dlink.utils.FlinkUtil;
......@@ -17,6 +20,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.table.catalog.CatalogManager;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
......@@ -30,16 +34,48 @@ import java.util.List;
public class Explainer {
private Executor executor;
private boolean useStatementSet;
private ObjectMapper mapper = new ObjectMapper();
public Explainer(Executor executor) {
this.executor = executor;
}
public Explainer(Executor executor, boolean useStatementSet) {
this.executor = executor;
this.useStatementSet = useStatementSet;
}
public static Explainer build(Executor executor){
return new Explainer(executor);
return new Explainer(executor,false);
}
public static Explainer build(Executor executor, boolean useStatementSet){
return new Explainer(executor,useStatementSet);
}
public JobParam pretreatStatements(String[] statements) {
List<StatementParam> ddl = new ArrayList<>();
List<StatementParam> trans = new ArrayList<>();
for (String item : statements) {
String statement = executor.pretreatStatement(item);
if (statement.isEmpty()) {
continue;
}
SqlType operationType = Operations.getOperationType(statement);
if (operationType.equals(SqlType.INSERT) || operationType.equals(SqlType.SELECT)) {
trans.add(new StatementParam(statement, operationType));
if (!useStatementSet) {
break;
}
} else {
ddl.add(new StatementParam(statement, operationType));
}
}
return new JobParam(ddl, trans);
}
@Deprecated
public List<SqlExplainResult> explainSqlResult(String statement) {
String[] sqls = SqlUtil.getStatements(statement);
List<SqlExplainResult> sqlExplainRecords = new ArrayList<>();
......@@ -69,14 +105,14 @@ public class Explainer {
e.printStackTrace();
record.setError(e.getMessage());
record.setExplainTrue(false);
record.setExplainTime(new Date());
record.setExplainTime(LocalDateTime.now());
record.setSql(sql);
record.setIndex(index);
sqlExplainRecords.add(record);
break;
}
record.setExplainTrue(true);
record.setExplainTime(new Date());
record.setExplainTime(LocalDateTime.now());
record.setSql(sql);
record.setIndex(index++);
sqlExplainRecords.add(record);
......@@ -84,29 +120,120 @@ public class Explainer {
return sqlExplainRecords;
}
public ExplainResult explainSql(String statement) {
JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement));
List<SqlExplainResult> sqlExplainRecords = new ArrayList<>();
int index = 1;
boolean correct = true;
for (StatementParam item : jobParam.getDdl()) {
SqlExplainResult record = new SqlExplainResult();
try {
record = executor.explainSqlRecord(item.getValue());
if (Asserts.isNull(record)) {
continue;
}
executor.executeSql(item.getValue());
}catch (Exception e){
e.printStackTrace();
record.setError(e.getMessage());
record.setExplainTrue(false);
record.setExplainTime(LocalDateTime.now());
record.setSql(item.getValue());
record.setIndex(index);
sqlExplainRecords.add(record);
correct = false;
break;
}
record.setExplainTrue(true);
record.setExplainTime(LocalDateTime.now());
record.setSql(item.getValue());
record.setIndex(index++);
sqlExplainRecords.add(record);
}
if (correct && jobParam.getTrans().size() > 0) {
if (useStatementSet) {
SqlExplainResult record = new SqlExplainResult();
List<String> inserts = new ArrayList<>();
for (StatementParam item : jobParam.getTrans()) {
if (item.getType().equals(SqlType.INSERT)) {
inserts.add(item.getValue());
}
}
if (inserts.size() > 0) {
String sqlSet = String.join(FlinkSQLConstant.SEPARATOR, inserts);
try {
record.setExplain(executor.explainStatementSet(inserts));
record.setParseTrue(true);
record.setExplainTrue(true);
}catch (Exception e){
e.printStackTrace();
record.setError(e.getMessage());
record.setParseTrue(false);
record.setExplainTrue(false);
correct = false;
}finally {
record.setType("Modify DML");
record.setExplainTime(LocalDateTime.now());
record.setSql(sqlSet);
record.setIndex(index);
sqlExplainRecords.add(record);
}
}
}else{
for (StatementParam item : jobParam.getTrans()) {
SqlExplainResult record = new SqlExplainResult();
try {
record.setExplain(executor.explainSql(item.getValue()));
record.setParseTrue(true);
record.setExplainTrue(true);
}catch (Exception e){
e.printStackTrace();
record.setError(e.getMessage());
record.setParseTrue(false);
record.setExplainTrue(false);
correct = false;
}finally {
record.setType("Modify DML");
record.setExplainTime(LocalDateTime.now());
record.setSql(item.getValue());
record.setIndex(index++);
sqlExplainRecords.add(record);
}
}
}
}
return new ExplainResult(correct,sqlExplainRecords.size(),sqlExplainRecords);
}
public ObjectNode getStreamGraph(String statement){
List<SqlExplainResult> sqlExplainRecords = explainSqlResult(statement);
List<SqlExplainResult> sqlExplainRecords = explainSql(statement).getSqlExplainResults();
List<String> strPlans = new ArrayList<>();
for (SqlExplainResult item : sqlExplainRecords) {
if (Asserts.isNotNull(item.getType())
&& item.getType().contains(FlinkSQLConstant.DML)) {
strPlans.add(item.getSql());
String[] statements = SqlUtil.getStatements(item.getSql());
for(String str : statements){
strPlans.add(str);
}
}
}
if(strPlans.size()>0){
return translateObjectNode(strPlans.get(0));
return executor.getStreamGraph(strPlans);
}else{
return mapper.createObjectNode();
}
}
public JobPlanInfo getJobPlanInfo(String statement){
List<SqlExplainResult> sqlExplainRecords = explainSqlResult(statement);
List<SqlExplainResult> sqlExplainRecords = explainSql(statement).getSqlExplainResults();
List<String> strPlans = new ArrayList<>();
for (SqlExplainResult item : sqlExplainRecords) {
if (Asserts.isNotNull(item.getType())
&& item.getType().contains(FlinkSQLConstant.DML)) {
strPlans.add(item.getSql());
String[] statements = SqlUtil.getStatements(item.getSql());
for(String str : statements){
strPlans.add(str);
}
}
}
if(strPlans.size()>0){
......
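
To make the relocated pretreatStatements above concrete, a hedged illustration of how a multi-statement script is split (the executor instance is assumed to exist and the table names follow the sample requests on this page): DDL statements go to the ddl list, INSERT/SELECT statements go to trans, and without statement sets only the first INSERT/SELECT is kept.

// Hedged illustration only; 'executor' is an existing com.dlink.executor.Executor instance (assumed).
String script =
        "CREATE TABLE Orders (order_number INT) WITH ('connector' = 'datagen');\n" +
        "CREATE TABLE pt (numtotal INT) WITH ('connector' = 'print');\n" +
        "insert into pt select order_number from Orders;\n" +
        "insert into pt select order_number * 2 from Orders";
// useStatementSet = true  -> both INSERTs end up in jobParam.getTrans() and are later
//                            explained/submitted together as one StatementSet;
// useStatementSet = false -> only the first INSERT is kept (the loop breaks after it).
JobParam jobParam = Explainer.build(executor, true)
        .pretreatStatements(SqlUtil.getStatements(script));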
......@@ -49,8 +49,18 @@ public class JobConfig {
public JobConfig() {
}
public JobConfig(String type, boolean useSession, boolean useRemote, boolean useSqlFragment, boolean useStatementSet, Integer parallelism, Map<String, String> config) {
this.type = type;
this.useSession = useSession;
this.useRemote = useRemote;
this.useSqlFragment = useSqlFragment;
this.useStatementSet = useStatementSet;
this.parallelism = parallelism;
this.config = config;
}
public JobConfig(String type, boolean useResult, boolean useSession, String session, boolean useRemote, Integer clusterId,
Integer clusterConfigurationId,Integer jarId, Integer taskId, String jobName, boolean useSqlFragment,
Integer clusterConfigurationId, Integer jarId, Integer taskId, String jobName, boolean useSqlFragment,
boolean useStatementSet, Integer maxRowNum, Integer checkpoint, Integer parallelism,
Integer savePointStrategyValue, String savePointPath, Map<String,String> config) {
this.type = type;
......
......@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
......@@ -209,7 +210,8 @@ public class JobManager {
JobContextHolder.setJob(job);
ready();
String currentSql = "";
JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement));
// JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement));
JobParam jobParam = Explainer.build(executor,useStatementSet).pretreatStatements(SqlUtil.getStatements(statement));
try {
for (StatementParam item : jobParam.getDdl()) {
currentSql = item.getValue();
......@@ -313,27 +315,6 @@ public class JobManager {
}
}
private JobParam pretreatStatements(String[] statements) {
List<StatementParam> ddl = new ArrayList<>();
List<StatementParam> trans = new ArrayList<>();
for (String item : statements) {
String statement = executor.pretreatStatement(item);
if (statement.isEmpty()) {
continue;
}
SqlType operationType = Operations.getOperationType(statement);
if (operationType.equals(SqlType.INSERT) || operationType.equals(SqlType.SELECT)) {
trans.add(new StatementParam(statement, operationType));
if (!useStatementSet) {
break;
}
} else {
ddl.add(new StatementParam(statement, operationType));
}
}
return new JobParam(ddl, trans);
}
public IResult executeDDL(String statement) {
String[] statements = SqlUtil.getStatements(statement);
try {
......@@ -381,16 +362,16 @@ public class JobManager {
return SessionPool.filter(createUser);
}
public List<SqlExplainResult> explainSql(String statement) {
return Explainer.build(executor).explainSqlResult(statement);
public ExplainResult explainSql(String statement) {
return Explainer.build(executor,useStatementSet).explainSql(statement);
}
public ObjectNode getStreamGraph(String statement) {
return Explainer.build(executor).getStreamGraph(statement);
return Explainer.build(executor,useStatementSet).getStreamGraph(statement);
}
public String getJobPlanJson(String statement) {
return Explainer.build(executor).getJobPlanInfo(statement).getJsonPlan();
return Explainer.build(executor,useStatementSet).getJobPlanInfo(statement).getJsonPlan();
}
public boolean cancel(String jobId) {
......
......@@ -38,45 +38,6 @@ public class SelectResultBuilder implements ResultBuilder {
}else{
return SelectResult.buildFailed();
}
/*String jobId = null;
if (tableResult.getJobClient().isPresent()) {
jobId = tableResult.getJobClient().get().getJobID().toHexString();
}
List<TableColumn> columns = tableResult.getTableSchema().getTableColumns();
int totalCount = 0;
Set<String> column = new LinkedHashSet();
String[] columnNames = columns.stream().map(TableColumn::getName).map(s -> s.replace(" ", "")).toArray((x$0) -> {
return (new String[x$0]);
});
if (printRowKind) {
columnNames = Stream.concat(Stream.of("op"), Arrays.stream(columnNames)).toArray((x$0) -> {
return new String[x$0];
});
}
long numRows = 0L;
List<Map<String, Object>> rows = new ArrayList<>();
Iterator<Row> it = tableResult.collect();
while (it.hasNext()) {
if (numRows < maxRowNum) {
String[] cols = rowToString(it.next());
Map<String, Object> row = new HashMap<>();
for (int i = 0; i < cols.length; i++) {
if (i > columnNames.length) {
column.add("UKN" + i);
row.put("UKN" + i, cols[i]);
} else {
column.add(columnNames[i]);
row.put(columnNames[i], cols[i]);
}
}
rows.add(row);
} else {
break;
}
numRows++;
totalCount++;
}
return new SelectResult(rows, totalCount, rows.size(), column, jobId, true);*/
}
}
......@@ -5,11 +5,15 @@ import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.executor.custom.CustomTableResultImpl;
import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;
......@@ -206,6 +210,21 @@ public abstract class Executor {
}
}
public ObjectNode getStreamGraph(List<String> statements){
StreamGraph streamGraph = stEnvironment.getStreamGraphFromInserts(statements);
JSONGenerator jsonGenerator = new JSONGenerator(streamGraph);
String json = jsonGenerator.getJSON();
ObjectMapper mapper = new ObjectMapper();
ObjectNode objectNode =mapper.createObjectNode();
try {
objectNode = (ObjectNode) mapper.readTree(json);
} catch (JsonProcessingException e) {
e.printStackTrace();
}finally {
return objectNode;
}
}
public JobPlanInfo getJobPlanInfo(List<String> statements){
return stEnvironment.getJobPlanInfo(statements);
}
......@@ -238,6 +257,14 @@ public abstract class Executor {
return statementSet.execute();
}
public String explainStatementSet(List<String> statements){
StatementSet statementSet = stEnvironment.createStatementSet();
for (String item : statements) {
statementSet.addInsertSql(item);
}
return statementSet.explain();
}
public void submitSql(String statements){
executeSql(statements);
}
......
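
A hedged usage sketch of the new Executor helpers above; the executor instance and its previously executed Orders/pt DDL are assumed, and the insert statement is taken from the sample request files earlier on this page.

// Hedged illustration only; 'executor' is an existing com.dlink.executor.Executor
// on which the Orders/pt DDL from the samples has already been executed (assumed).
List<String> inserts = java.util.Arrays.asList(
        "insert into pt select 1 as ordertotal, sum(order_number)*2 as numtotal from Orders");
// Explains all inserts as one StatementSet (explainStatementSet above).
String plan = executor.explainStatementSet(inserts);
// JSON form of the StreamGraph built from the same inserts, produced via JSONGenerator (getStreamGraph above).
ObjectNode streamGraph = executor.getStreamGraph(inserts);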
import {Effect, Reducer} from "umi";
import type {Effect, Reducer} from "umi";
import {
handleAddOrUpdate
} from "@/components/Common/crud";
import {SqlMetaData} from "@/components/Studio/StudioEvent/data";
import type {SqlMetaData} from "@/components/Studio/StudioEvent/data";
export type ClusterType = {
id: number,
......@@ -67,7 +67,7 @@ export type TaskType = {
clusterName?: string,
clusterConfigurationId?: number,
clusterConfigurationName?: string,
jarId?:number,
jarId?: number,
note?: string,
enabled?: boolean,
createTime?: Date,
......@@ -76,9 +76,9 @@ export type TaskType = {
session: string;
maxRowNum: number;
jobName: string;
useResult:boolean;
useSession:boolean;
useRemote:boolean;
useResult: boolean;
useSession: boolean;
useRemote: boolean;
};
export type ConsoleType = {
......@@ -94,8 +94,8 @@ export type TabsItemType = {
task?: TaskType;
console: ConsoleType;
monaco?: any;
isModified:boolean;
sqlMetaData?:SqlMetaData;
isModified: boolean;
sqlMetaData?: SqlMetaData;
}
export type TabsType = {
......@@ -109,7 +109,7 @@ export type ConnectorType = {
export type SessionType = {
session?: string;
sessionConfig?:{
sessionConfig?: {
type?: string;
useRemote?: boolean;
clusterId?: number;
......@@ -136,10 +136,10 @@ export type StateType = {
currentPath?: string[];
tabs?: TabsType;
session?: SessionType[];
result?:{};
result?: {};
rightClickMenu?: boolean;
refs?:{
history:any;
refs?: {
history: any;
};
};
......@@ -271,7 +271,7 @@ const Model: ModelType = {
effects: {
* saveTask({payload}, {call, put}) {
let para = payload;
const para = payload;
para.configJson = JSON.stringify(payload.config);
yield call(handleAddOrUpdate, 'api/task', para);
yield put({
......@@ -299,8 +299,8 @@ const Model: ModelType = {
};
},
saveSql(state, {payload}) {
const tabs = state.tabs;
let newCurrent = state.current;
const {tabs} = state;
const newCurrent = state.current;
newCurrent.value = payload;
for (let i = 0; i < tabs.panes.length; i++) {
if (tabs.panes[i].key == tabs.activeKey) {
......@@ -333,8 +333,8 @@ const Model: ModelType = {
};
},
saveSqlMetaData(state, {payload}) {
let newCurrent = state.current;
let newTabs = state.tabs;
const newCurrent = state.current;
const newTabs = state.tabs;
if(newCurrent.key == payload.activeKey){
newCurrent.sqlMetaData = payload.sqlMetaData;
newCurrent.isModified = payload.isModified;
......@@ -371,14 +371,14 @@ const Model: ModelType = {
};
},
deleteTabByKey(state, {payload}) {
let newTabs = state.tabs;
const newTabs = state.tabs;
for (let i = 0; i < newTabs.panes.length; i++) {
if (newTabs.panes[i].key == payload) {
newTabs.panes.splice(i, 1);
break;
}
}
let newCurrent = newTabs.panes[newTabs.panes.length - 1];
const newCurrent = newTabs.panes[newTabs.panes.length - 1];
if (newTabs.activeKey == payload) {
newTabs.activeKey = newCurrent.key;
}
......@@ -393,7 +393,7 @@ const Model: ModelType = {
};
},
changeActiveKey(state, {payload}) {
let tabs = state.tabs;
const {tabs} = state;
tabs.activeKey = payload;
let newCurrent = state.current;
for (let i = 0; i < tabs.panes.length; i++) {
......@@ -413,7 +413,7 @@ const Model: ModelType = {
};
},
saveTaskData(state, {payload}) {
let newTabs = state.tabs;
const newTabs = state.tabs;
for (let i = 0; i < newTabs.panes.length; i++) {
if (newTabs.panes[i].key == newTabs.activeKey) {
newTabs.panes[i].task = payload;
......
......@@ -470,10 +470,10 @@ export default (): React.ReactNode => {
<Link>优化所有模式的所有功能的执行逻辑</Link>
</li>
<li>
<Link>新增 trigger 的 restAPI 实现</Link>
<Link>新增 SavePoint 的 restAPI 实现</Link>
</li>
<li>
<Link>新增 OpenAPI 的执行 sql 接口</Link>
<Link>新增 OpenAPI 的执行sql、校验sql、获取计划图、获取StreamGraph、获取预览数据接口</Link>
</li>
</ul>
</Paragraph>
......