Commit 0d660379 authored by wenmo

Lineage analysis code optimization

parent 88912918
......@@ -63,7 +63,7 @@ public class StudioController {
public Result getCAByStatement(@RequestBody StudioCADTO studioCADTO) {
switch (studioCADTO.getType()){
case 1: return Result.succeed(studioService.getOneTableColumnCAByStatement(studioCADTO.getStatement()), "Execution succeeded");
// case 2: return Result.succeed(studioService.getColumnCAByStatement(studioCADTO.getStatement()), "Execution succeeded");
case 2: return Result.succeed(studioService.getColumnCAByStatement(studioCADTO.getStatement()), "Execution succeeded");
default: return Result.failed("Coming soon");
}
}
......
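A hedged sketch of how the dispatch above is driven. The setter names are assumed bean conventions; only getType() and getStatement() are visible in the commit:
StudioCADTO dto = new StudioCADTO();
dto.setType(2);  // 2 -> column-level lineage, newly enabled by this commit; 1 -> one-table column lineage
dto.setStatement("INSERT INTO sink SELECT a, b FROM source");
Result result = studioController.getCAByStatement(dto);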
......@@ -10,10 +10,28 @@ import com.dlink.exception.JobException;
*/
public class Asserts {
public static boolean checkNotNull(Object object){
public static boolean isNotNull(Object object){
return object!=null;
}
public static boolean isNull(Object object){
return object==null;
}
public static boolean isNullString(String str){
return str==null||"".equals(str);
}
public static boolean isEquals(String str1,String str2){
if(isNull(str1)&&isNull(str2)){
return true;
}else if(isNull(str1)||isNull(str2)){
return false;
}else{
return str1.equals(str2);
}
}
public static void checkNull(String key,String msg) {
if (key == null||"".equals(key)) {
throw new JobException(msg);
......
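A quick usage sketch of the renamed and newly added helpers (values are illustrative, not part of the commit):
// isNotNull replaces the old checkNotNull; isNull, isNullString and isEquals are new.
Asserts.isNotNull("a");                          // true
Asserts.isNull(null);                            // true
Asserts.isNullString("");                        // true: null and "" both count as empty
Asserts.isEquals(null, null);                    // true: two nulls compare equal
Asserts.isEquals("a", null);                     // false
Asserts.checkNull("", "key must not be empty");  // throws JobException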
......@@ -20,10 +20,6 @@ public interface FlinkConstant {
* Flink load factor
*/
Double DEFAULT_FACTOR = 0.75;
/**
* Flink JobManager host
*/
String FLINK_JOB_MANAGER_HOST = "flinkJobManagerHOST";
/**
* Local-mode host
*/
......
package com.dlink.constant;
/**
* Constants for Flink jobs
*/
public interface FlinkJobConstant {
/**
* Flink job id
*/
String FLINK_JOB_ID = "jobId";
/**
* Flink job error
*/
String FLINK_JOB_ERROR = "error";
/**
* Default empty string
*/
String DEFAULT_EMPTY = "";
/**
* Default port
*/
int DEFAULT_PORT = 8081;
}
......@@ -8,75 +8,15 @@ package com.dlink.constant;
**/
public interface FlinkSQLConstant {
/**
* Query
*/
String SELECT = "SELECT";
/**
* Create
*/
String CREATE = "CREATE";
/**
* Drop
*/
String DROP = "DROP";
/**
* Alter
*/
String ALTER = "ALTER";
/**
* Insert
*/
String INSERT = "INSERT";
/**
* DESCRIBE
*/
String DESCRIBE = "DESCRIBE";
/**
* EXPLAIN
*/
String EXPLAIN = "EXPLAIN";
/**
* USE
*/
String USE = "USE";
/**
* SHOW
*/
String SHOW = "SHOW";
/**
* LOAD
*/
String LOAD = "LOAD";
/**
* UNLOAD
*/
String UNLOAD = "UNLOAD";
/**
* SET
*/
String SET = "SET";
/**
* RESET
*/
String RESET = "RESET";
/**
* Unknown operation type
*/
String UNKNOWN = "UNKNOWN";
/**
* Value used for null in query results
*/
String NULL_COLUMNS = "";
/**
* Create aggregate table CREATEAGGTABLE
* Statement separator
*/
String CREATE_AGG_TABLE = "CREATEAGGTABLE";
String SEPARATOR = ";";
/**
* Head of a drop-table statement: DROP TABLE IF EXISTS
* DDL type
*/
String DROP_TABLE_HEAD = " DROP TABLE IF EXISTS ";
String DDL = "DDL";
/**
* Statement separator
* DML type
*/
String SEPARATOR = ";";
String DML = "DML";
}
......@@ -13,10 +13,6 @@ public interface NetConstant {
* Slash /
*/
String SLASH = "/";
/**
* Default Flink port
*/
String PORT = "8081";
/**
* Timeout for connecting to the running server: 1000
*/
......
......@@ -138,6 +138,7 @@ public abstract class Executor {
}
stEnvironment = newstEnvironment;
}
public JobExecutionResult execute(String jobName) throws Exception{
return stEnvironment.execute(jobName);
}
......
package com.dlink.explainer;
import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkSQLConstant;
import com.dlink.executor.Executor;
import com.dlink.explainer.ca.ColumnCAGenerator;
import com.dlink.explainer.ca.ColumnCAResult;
......@@ -10,6 +12,7 @@ import com.dlink.explainer.trans.Trans;
import com.dlink.explainer.trans.TransGenerator;
import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.result.SqlExplainResult;
import com.dlink.utils.SqlUtil;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.catalog.CatalogManager;
......@@ -36,23 +39,23 @@ public class Explainer {
}
public List<SqlExplainResult> explainSqlResult(String statement, ExplainDetail... extraDetails) {
String[] sqls = statement.split(";");
String[] sqls = SqlUtil.getStatements(statement);
List<SqlExplainResult> sqlExplainRecords = new ArrayList<>();
for (int i = 0; i < sqls.length; i++) {
SqlExplainResult record = new SqlExplainResult();
try {
if (!FlinkInterceptor.build(executor.getCustomTableEnvironmentImpl(), sqls[i])) {
record = executor.explainSqlRecord(sqls[i], extraDetails);
if ("DDL".equals(record.getType())) {
if (Asserts.isEquals(FlinkSQLConstant.DDL,record.getType())) {
executor.executeSql(sqls[i]);
}
}
}catch (Exception e){
} catch (Exception e) {
e.printStackTrace();
record.setError(e.getMessage());
}finally {
} finally {
record.setExplainTime(new Date());
record.setIndex(i+1);
record.setIndex(i + 1);
record.setSql(sqls[i]);
sqlExplainRecords.add(record);
}
......@@ -60,11 +63,12 @@ public class Explainer {
return sqlExplainRecords;
}
private List<TableCAResult> explainSqlTableCA(String statement,boolean onlyTable) {
private List<TableCAResult> generateTableCA(String statement, boolean onlyTable) {
List<SqlExplainResult> sqlExplainRecords = explainSqlResult(statement);
List<String> strPlans = new ArrayList<>();
for (int i = 0; i < sqlExplainRecords.size(); i++) {
if(sqlExplainRecords.get(i).getType()!=null&&sqlExplainRecords.get(i).getType().contains("DML")){
if (Asserts.isNotNull(sqlExplainRecords.get(i).getType())
&& sqlExplainRecords.get(i).getType().contains(FlinkSQLConstant.DML)) {
strPlans.add(sqlExplainRecords.get(i).getSql());
}
}
......@@ -72,21 +76,21 @@ public class Explainer {
for (int i = 0; i < strPlans.size(); i++) {
List<Trans> trans = translateTrans(translateObjectNode(strPlans.get(i)));
TableCAGenerator generator = new TableCAGenerator(trans);
if(onlyTable) {
if (onlyTable) {
generator.translateOnlyTable();
}else{
} else {
generator.translate();
}
results.add(new TableCAResult(generator));
}
if(results.size()>0){
if (results.size() > 0) {
CatalogManager catalogManager = executor.getCatalogManager();
for (int i = 0; i < results.size(); i++) {
TableCA sinkTableCA = (TableCA)results.get(i).getSinkTableCA();
if(sinkTableCA!=null){
TableCA sinkTableCA = (TableCA) results.get(i).getSinkTableCA();
if (sinkTableCA != null) {
ObjectIdentifier objectIdentifier = ObjectIdentifier.of(sinkTableCA.getCatalog(), sinkTableCA.getDatabase(), sinkTableCA.getTable());
Optional<CatalogManager.TableLookupResult> tableOpt = catalogManager.getTable(objectIdentifier);
if(tableOpt.isPresent()){
if (tableOpt.isPresent()) {
String[] fieldNames = tableOpt.get().getResolvedSchema().getFieldNames();
sinkTableCA.setFields(Arrays.asList(fieldNames));
}
......@@ -96,19 +100,19 @@ public class Explainer {
return results;
}
public List<TableCAResult> explainSqlTableCA(String statement) {
return explainSqlTableCA(statement,true);
public List<TableCAResult> generateTableCA(String statement) {
return generateTableCA(statement, true);
}
public List<TableCAResult> explainSqlTableColumnCA(String statement) {
return explainSqlTableCA(statement,false);
return generateTableCA(statement, false);
}
public List<ColumnCAResult> explainSqlColumnCA(String statement) {
List<SqlExplainResult> sqlExplainRecords = explainSqlResult(statement);
List<String> strPlans = new ArrayList<>();
for (int i = 0; i < sqlExplainRecords.size(); i++) {
if(sqlExplainRecords.get(i).getType().contains("DML")){
if (Asserts.isNotNull(sqlExplainRecords.get(i).getType()) && sqlExplainRecords.get(i).getType().contains(FlinkSQLConstant.DML)) {
strPlans.add(sqlExplainRecords.get(i).getSql());
}
}
......@@ -125,11 +129,11 @@ public class Explainer {
return results;
}
private ObjectNode translateObjectNode(String strPlans){
private ObjectNode translateObjectNode(String strPlans) {
return executor.getStreamGraph(strPlans);
}
private List<Trans> translateTrans(ObjectNode plan){
private List<Trans> translateTrans(ObjectNode plan) {
return new TransGenerator(plan).translateTrans();
}
......
......@@ -17,7 +17,7 @@ public class CABuilder {
public static List<TableCANode> getOneTableCAByStatement(String statement){
List<TableCANode> tableCANodes = new ArrayList<>();
FlinkSqlPlus plus = FlinkSqlPlus.build();
List<TableCAResult> results = plus.explainSqlTableCA(statement);
List<TableCAResult> results = plus.generateTableCA(statement);
for (int j = 0; j < results.size(); j++) {
TableCAResult result = results.get(j);
TableCANode node = new TableCANode();
......@@ -59,10 +59,10 @@ public class CABuilder {
List<ColumnCAResult> columnCAResults = plus.explainSqlColumnCA(statement);
for (int j = 0; j < columnCAResults.size(); j++) {
ColumnCAResult result = columnCAResults.get(j);
ColumnCANode node = new ColumnCANode();
List<Integer> sinkColumns = result.getSinkColumns();
for (int k = 0; k < sinkColumns.size(); k++) {
ColumnCA columnCA = (ColumnCA)result.getColumnCASMaps().get(sinkColumns.get(k));
ColumnCANode node = new ColumnCANode();
node.setName(columnCA.getAlias());
node.setType(columnCA.getType());
node.setTitle(columnCA.getAlias());
......@@ -70,9 +70,9 @@ public class CABuilder {
List<ColumnCANode> children = new ArrayList<>();
buildColumnCANodeChildren(children,result,sinkColumns.get(k),columnCA.getOperation());
node.setChildren(children);
}
columnCANodes.add(node);
}
}
return columnCANodes;
}
......
......@@ -72,7 +72,10 @@ public class ColumnCAGenerator implements CAGenerator {
buildColumnCAFields(tableCA, tableCA.getParentId(), columnCA);
}
}
} else if (transList.get(i) instanceof SinkTrans) {
}
}
for (int i = 0; i < transList.size(); i++) {
if (transList.get(i) instanceof SinkTrans) {
TableCA tableCA = new TableCA((SinkTrans) transList.get(i));
searchColumnCAId(tableCA);
this.sinkTableCA = tableCA;
......
package com.dlink.interceptor;
import com.dlink.assertion.Asserts;
import com.dlink.catalog.function.FunctionManager;
import com.dlink.catalog.function.UDFunction;
import com.dlink.constant.FlinkFunctionConstant;
......@@ -25,7 +26,7 @@ public class FlinkInterceptor {
public static boolean build(CustomTableEnvironmentImpl stEnvironment, String statement) {
initFunctions(stEnvironment, statement);
Operation operation = Operations.buildOperation(statement);
if(operation!=null) {
if(Asserts.isNotNull(operation)) {
operation.build(stEnvironment);
return operation.noExecute();
}
......
......@@ -86,7 +86,7 @@ public class JobManager extends RunTime {
private Executor createExecutorWithSession() {
if(config.isUseSession()) {
ExecutorEntity executorEntity = SessionPool.get(config.getSession());
if (Asserts.checkNotNull(executorEntity)) {
if (Asserts.isNotNull(executorEntity)) {
executor = executorEntity.getExecutor();
config.setSessionConfig(executorEntity.getSessionConfig());
initEnvironmentSetting();
......@@ -225,7 +225,7 @@ public class JobManager extends RunTime {
job.setResult(result);
}
}
if(FlinkSQLConstant.INSERT.equals(operationType)||FlinkSQLConstant.SELECT.equals(operationType)){
if(operationType==SqlType.INSERT||operationType==SqlType.SELECT){
break;
}
}
......
......@@ -68,8 +68,8 @@ public class FlinkSqlPlus {
return explainer.explainSqlTableColumnCA(statement);
}
public List<TableCAResult> explainSqlTableCA(String statement) {
return explainer.explainSqlTableCA(statement);
public List<TableCAResult> generateTableCA(String statement) {
return explainer.generateTableCA(statement);
}
public List<ColumnCAResult> explainSqlColumnCA(String statement) {
......
package com.dlink.trans;
import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkSQLConstant;
import com.dlink.parser.SqlType;
import com.dlink.trans.ddl.CreateAggTableOperation;
......@@ -24,7 +25,7 @@ public class Operations {
continue;
}
sqlType = Operations.getOperationType(item);
if(FlinkSQLConstant.INSERT.equals(sqlType)||FlinkSQLConstant.SELECT.equals(sqlType)){
if(sqlType == SqlType.INSERT ||sqlType == SqlType.SELECT){
return sqlType;
}
}
......@@ -41,30 +42,10 @@ public class Operations {
}
}
return type;
/*if (sqlTrim.startsWith(FlinkSQLConstant.CREATE)) {
return FlinkSQLConstant.CREATE;
}
if (sqlTrim.startsWith(FlinkSQLConstant.ALTER)) {
return FlinkSQLConstant.ALTER;
}
if (sqlTrim.startsWith(FlinkSQLConstant.INSERT)) {
return FlinkSQLConstant.INSERT;
}
if (sqlTrim.startsWith(FlinkSQLConstant.DROP)) {
return FlinkSQLConstant.DROP;
}
if (sqlTrim.startsWith(FlinkSQLConstant.SELECT)) {
return FlinkSQLConstant.SELECT;
}
if (sqlTrim.startsWith(FlinkSQLConstant.SHOW)) {
return FlinkSQLConstant.SHOW;
}
return FlinkSQLConstant.UNKNOWN;*/
}
public static Operation buildOperation(String statement){
statement = statement.replace("\n"," ").replaceAll("\\s{1,}", " ").trim();
String sql = statement.toUpperCase();
String sql = statement.replace("\n"," ").replaceAll("\\s{1,}", " ").trim().toUpperCase();
for (int i = 0; i < operations.length; i++) {
if(sql.startsWith(operations[i].getHandle())){
return operations[i].create(statement);
......
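A minimal sketch of the consolidated normalization in buildOperation (the input is hypothetical). Note that after this change only the local sql copy is normalized for handle matching, while the raw statement is what gets passed to operations[i].create(statement):
String statement = "create aggtable agg1 as\n  select sid\n  from score";
String sql = statement.replace("\n", " ").replaceAll("\\s{1,}", " ").trim().toUpperCase();
// sql == "CREATE AGGTABLE AGG1 AS SELECT SID FROM SCORE", matched via sql.startsWith(handle)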
package com.dlink.trans.ddl;
import com.dlink.parser.SingleSqlParserFactory;
import com.dlink.utils.SqlExtractUtil;
import org.apache.commons.lang3.StringUtils;
import java.util.List;
......
......@@ -52,9 +52,4 @@ public class CreateAggTableOperation extends AbstractOperation implements Operat
.select(aggTable.getColumns());
stEnvironment.registerTable(aggTable.getName(), sink);
}
/*@Override
public boolean noExecute(){
return true;
}*/
}
package com.dlink.utils;
import java.util.ArrayList;
import java.util.List;
/**
* SqlExtractUtil
*
* @author wenmo
* @since 2021/6/13 21:09
*/
public class SqlExtractUtil {
private static final String[] KEY_WORDS = {
"AGG",
"AGGTABLE",
"AGGTABLES",
"AND",
"AS",
"BY",
"CREATE",
"DROP",
"FRAGMENT",
"FRAGMENTS",
"FROM",
"GROUP",
"LIMIT",
"OR",
"ORDER",
"SELECT",
"SHOW",
"TABLE",
"TABLES",
"VIEW",
"VIEWS",
"WHERE"
};
/**
* Gets the content between two delimiter keywords, e.g. the column list between SELECT and FROM in "select a.name, b.name from ...".
*
* @param original   the original statement
* @param upperStr   the upper-cased copy of the statement
* @param beforeStr  keyword that must appear before the start keyword
* @param start      start keyword
* @param end        end keyword; an empty string means read to the end of the statement
* @param secondEnd  fallback end keyword used when end is not found
* @param isStopTrim when true, the result is returned without trimming
* @return the content between start and end, or "" when the delimiters are not found
*/
public static String getSubStringPara(String original, String upperStr, String beforeStr, String start, String end, String secondEnd, boolean isStopTrim) {
int beforeIndex = upperStr.indexOf(" " + beforeStr + " ");
int startIndex = upperStr.indexOf(" " + start + " ", beforeIndex);
if (startIndex < 0) {
return "";
}
if (end.length() == 0) {
String resWithout = original.substring(start.length() + startIndex + 1);
return isStopTrim ? resWithout : resWithout.trim();
}
int endIndex = upperStr.indexOf(" " + end + " ", startIndex);
if (endIndex < 0) {
endIndex = upperStr.indexOf(" " + secondEnd + " ", startIndex);
}
if (endIndex < 0) {
return "";
}
String res = original.substring(start.length() + startIndex + 1, endIndex);
return isStopTrim ? res : res.trim();
}
/**
* Splits the content after a start keyword on a second-layer delimiter, e.g. the condition clause
* "s1.name = s2.name and s1.age = s2.age and s1.sex=s2.sex" becomes
* list = ["s1.name = s2.name", "s1.age = s2.age", "s1.sex=s2.sex"].
* Only the upper- and lower-case forms of the delimiter are matched; mixed case such as "And" is not split.
*
* @param original    the original statement
* @param upperStr    the upper-cased copy of the statement
* @param start       start keyword
* @param end         end keyword; an empty string means read to the end of the statement
* @param secondLayer delimiter used to split the extracted content (e.g. "and")
* @return the list of split fragments, or an empty list when nothing is found
*/
public static List<String> getSubStringList(String original, String upperStr, String start, String end, String secondLayer) {
String subStringPara = getSubStringPara(original, upperStr, start, start, end, "", false);
List<String> list = new ArrayList<>();
if (subStringPara.length() == 0) {
return list;
}
String[] str = subStringPara.split(" " + secondLayer.toUpperCase() + " ");
for (int i = 0; i < str.length; i++) {
String[] split = str[i].split(" " + secondLayer.toLowerCase() + " ");
for (int j = 0; j < split.length; j++) {
if (split[j].replace(" ", "").length() > 0) {
list.add(split[j]);
}
}
}
return list;
}
}
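A hedged usage sketch of the two extractors above (the SQL text is hypothetical; the expected results follow from the code as written):
String sql = "select a.name, b.name from student a join school b where a.sid = b.sid and a.age = b.age";
String upper = sql.toUpperCase();
// Content between FROM and WHERE -> "student a join school b"
String fromClause = SqlExtractUtil.getSubStringPara(sql, upper, "FROM", "FROM", "WHERE", "", false);
// Content after WHERE, split on "and" -> ["a.sid = b.sid", "a.age = b.age"]
List<String> conditions = SqlExtractUtil.getSubStringList(sql, upper, "WHERE", "", "and");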
package com.dlink.utils;
import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkSQLConstant;
/**
* SqlUtil
*
* @author wenmo
* @since 2021/7/14 21:57
*/
public class SqlUtil {
public static String[] getStatements(String sql){
if(Asserts.isNullString(sql)){
return new String[0];
}
return sql.split(FlinkSQLConstant.SEPARATOR);
}
}
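A small usage sketch of the new utility (the script text is illustrative):
// getStatements splits on FlinkSQLConstant.SEPARATOR (";") and guards against null/empty input.
String script = "CREATE TABLE source_table (id INT);INSERT INTO sink_table SELECT id FROM source_table";
String[] statements = SqlUtil.getStatements(script);  // length 2
String[] none = SqlUtil.getStatements(null);          // length 0, no NullPointerException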