Commit ea7550b3 authored by godkaikai's avatar godkaikai

获取JobPlan

parent f342f8f7
...@@ -7,6 +7,8 @@ import com.fasterxml.jackson.core.JsonProcessingException; ...@@ -7,6 +7,8 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator; import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.graph.StreamGraph;
...@@ -114,6 +116,10 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl { ...@@ -114,6 +116,10 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
} }
} }
public JobPlanInfo getJobPlanInfo(List<String> statements) {
return new JobPlanInfo(JsonPlanGenerator.generatePlan(getJobGraphFromInserts(statements)));
}
public JobGraph getJobGraphFromInserts(List<String> statements) { public JobGraph getJobGraphFromInserts(List<String> statements) {
List<ModifyOperation> modifyOperations = new ArrayList(); List<ModifyOperation> modifyOperations = new ArrayList();
for(String statement : statements){ for(String statement : statements){
......
...@@ -14,6 +14,7 @@ import com.dlink.utils.FlinkUtil; ...@@ -14,6 +14,7 @@ import com.dlink.utils.FlinkUtil;
import com.dlink.utils.SqlUtil; import com.dlink.utils.SqlUtil;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.CatalogManager;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -99,6 +100,22 @@ public class Explainer { ...@@ -99,6 +100,22 @@ public class Explainer {
} }
} }
public JobPlanInfo getJobPlanInfo(String statement){
List<SqlExplainResult> sqlExplainRecords = explainSqlResult(statement);
List<String> strPlans = new ArrayList<>();
for (SqlExplainResult item : sqlExplainRecords) {
if (Asserts.isNotNull(item.getType())
&& item.getType().contains(FlinkSQLConstant.DML)) {
strPlans.add(item.getSql());
}
}
if(strPlans.size()>0){
return executor.getJobPlanInfo(strPlans);
}else{
return new JobPlanInfo("");
}
}
private List<TableCAResult> generateTableCA(String statement, boolean onlyTable) { private List<TableCAResult> generateTableCA(String statement, boolean onlyTable) {
List<SqlExplainResult> sqlExplainRecords = explainSqlResult(statement); List<SqlExplainResult> sqlExplainRecords = explainSqlResult(statement);
List<String> strPlans = new ArrayList<>(); List<String> strPlans = new ArrayList<>();
......
...@@ -6,6 +6,7 @@ import com.dlink.explainer.ca.ColumnCAResult; ...@@ -6,6 +6,7 @@ import com.dlink.explainer.ca.ColumnCAResult;
import com.dlink.explainer.ca.TableCAResult; import com.dlink.explainer.ca.TableCAResult;
import com.dlink.result.SqlExplainResult; import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.table.api.ExplainDetail; import org.apache.flink.table.api.ExplainDetail;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -80,4 +81,8 @@ public class FlinkSqlPlus { ...@@ -80,4 +81,8 @@ public class FlinkSqlPlus {
return executor.getStreamGraph(statement); return executor.getStreamGraph(statement);
} }
public JobPlanInfo getJobPlanInfo(String statement) {
return explainer.getJobPlanInfo(statement);
}
} }
...@@ -9,6 +9,7 @@ import com.dlink.job.JobManager; ...@@ -9,6 +9,7 @@ import com.dlink.job.JobManager;
import com.dlink.parser.SingleSqlParserFactory; import com.dlink.parser.SingleSqlParserFactory;
import com.dlink.plus.FlinkSqlPlus; import com.dlink.plus.FlinkSqlPlus;
import com.dlink.result.SubmitResult; import com.dlink.result.SubmitResult;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.junit.Test; import org.junit.Test;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -22,39 +23,5 @@ import java.util.Map; ...@@ -22,39 +23,5 @@ import java.util.Map;
* @since 2021/6/23 10:37 * @since 2021/6/23 10:37
**/ **/
public class FlinkSqlPlusTest { public class FlinkSqlPlusTest {
@Test
public void tableCATest(){
String sql1 ="CREATE TABLE student (\n" +
" sid INT,\n" +
" name STRING,\n" +
" PRIMARY KEY (sid) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://10.1.51.25:3306/dataxweb?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true',\n" +
" 'username'='dfly',\n" +
" 'password'='Dareway',\n" +
" 'table-name' = 'student'\n" +
")";
String sql2 ="CREATE TABLE man (\n" +
" pid INT,\n" +
" name STRING,\n" +
" PRIMARY KEY (pid) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://10.1.51.25:3306/dataxweb?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true',\n" +
" 'username'='dfly',\n" +
" 'password'='Dareway',\n" +
" 'table-name' = 'man'\n" +
")";
String sql3 = "INSERT INTO man SELECT sid as pid,name from student";
List<String> sqls = new ArrayList<>();
sqls.add(sql1);
sqls.add(sql2);
sqls.add(sql3);
FlinkSqlPlus plus = FlinkSqlPlus.build();
// List<TableCAResult> tableCAResults = plus.explainSqlTableColumnCA(String.join(";", sqls));
// List<TableCANode> tableCANodes = CABuilder.getOneTableCASByStatement(String.join(";", sqls));
List<TableCANode> tableCANodes = CABuilder.getOneTableColumnCAByStatement(String.join(";", sqls));
System.out.println(tableCANodes.toString());
}
} }
...@@ -8,6 +8,7 @@ import com.dlink.result.SqlExplainResult; ...@@ -8,6 +8,7 @@ import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.ExplainDetail; import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.StatementSet; import org.apache.flink.table.api.StatementSet;
...@@ -205,6 +206,10 @@ public abstract class Executor { ...@@ -205,6 +206,10 @@ public abstract class Executor {
} }
} }
public JobPlanInfo getJobPlanInfo(List<String> statements){
return stEnvironment.getJobPlanInfo(statements);
}
public void registerFunction(String name, ScalarFunction function){ public void registerFunction(String name, ScalarFunction function){
stEnvironment.registerFunction(name,function); stEnvironment.registerFunction(name,function);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment