Commit 06be59e1 authored by wenmo

0.2.2 aggtable

parent fdba1fd3
@@ -74,6 +74,21 @@
<groupId>com.dlink</groupId>
<artifactId>dlink-core</artifactId>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-client-1.12</artifactId>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-connector-jdbc</artifactId>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-function</artifactId>
<!--<scope>provided</scope>-->
</dependency>
</dependencies>
<build>
<plugins>
......
package com.dlink.executor.custom;
import com.dlink.result.SqlExplainResult;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -16,6 +17,10 @@ import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.UserDefinedFunctionHelper;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.ModifyOperation;
@@ -267,4 +272,22 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
public boolean checkShowFragments(String sql){
return sqlManager.checkShowFragments(sql);
}
public <T> void registerFunction(String name, TableFunction<T> tableFunction) {
TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfTableFunction(tableFunction);
this.functionCatalog.registerTempSystemTableFunction(name, tableFunction, typeInfo);
}
public <T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction) {
TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(aggregateFunction);
TypeInformation<ACC> accTypeInfo = UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(aggregateFunction);
this.functionCatalog.registerTempSystemAggregateFunction(name, aggregateFunction, typeInfo, accTypeInfo);
}
public <T, ACC> void registerFunction(String name, TableAggregateFunction<T, ACC> tableAggregateFunction) {
TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(tableAggregateFunction);
TypeInformation<ACC> accTypeInfo = UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(tableAggregateFunction);
this.functionCatalog.registerTempSystemAggregateFunction(name, tableAggregateFunction, typeInfo, accTypeInfo);
}
}
@@ -31,12 +31,17 @@
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-client-1.12</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-connector-jdbc</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-function</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.dlink.catalog.function;
import com.dlink.constant.FlinkFunctionConstant;
import com.dlink.ud.udf.GetKey;
import com.dlink.ud.udtaf.RowsToMap;
import com.dlink.ud.udtaf.Top2;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.UserDefinedFunction;
import java.util.HashMap;
import java.util.Map;
/**
* FunctionManager
*
* @author wenmo
* @since 2021/6/14 21:19
*/
public class FunctionManager {
private static Map<String,UDFunction> functions = new HashMap<String,UDFunction>(){
{
put(FlinkFunctionConstant.GET_KEY,
new UDFunction(FlinkFunctionConstant.GET_KEY,
UDFunction.UDFunctionType.Scalar,
new GetKey()));
put(FlinkFunctionConstant.TO_MAP,
new UDFunction(FlinkFunctionConstant.TO_MAP,
UDFunction.UDFunctionType.TableAggregate,
new RowsToMap()));
put(FlinkFunctionConstant.TOP2,
new UDFunction(FlinkFunctionConstant.TOP2,
UDFunction.UDFunctionType.TableAggregate,
new Top2()));
}
};
public static Map<String,UDFunction> getUsedFunctions(String statement){
Map<String,UDFunction> map = new HashMap<>();
String sql = statement.toLowerCase();
for (Map.Entry<String, UDFunction> entry : functions.entrySet()) {
if(sql.contains(entry.getKey().toLowerCase())){
map.put(entry.getKey(),entry.getValue());
}
}
return map;
}
}
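A minimal usage sketch, not part of this commit: getUsedFunctions lower-cases the statement and does a substring match against each built-in key, so a statement that mentions to_map, get_key or top2 pulls in the corresponding UDFunction. The class name FunctionManagerSketch is made up for illustration.
package com.dlink.catalog.function;
import java.util.Map;
public class FunctionManagerSketch {
    public static void main(String[] args) {
        // "to_map" appears in the text, so the TO_MAP entry should be returned.
        String statement = "SELECT sid, to_map(cls, score) AS data FROM score GROUP BY sid";
        Map<String, UDFunction> used = FunctionManager.getUsedFunctions(statement);
        for (Map.Entry<String, UDFunction> entry : used.entrySet()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue().getType());
        }
    }
}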
package com.dlink.catalog.function;
import org.apache.flink.table.functions.FunctionDefinition;
/**
* UDFunction
*
* @author wenmo
* @since 2021/6/14 22:14
*/
public class UDFunction {
public enum UDFunctionType {
Scalar, Table, Aggregate, TableAggregate
}
private String name;
private UDFunctionType type;
private FunctionDefinition function;
public UDFunction(String name, UDFunctionType type, FunctionDefinition function) {
this.name = name;
this.type = type;
this.function = function;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public UDFunctionType getType() {
return type;
}
public void setType(UDFunctionType type) {
this.type = type;
}
public FunctionDefinition getFunction() {
return function;
}
public void setFunction(FunctionDefinition function) {
this.function = function;
}
}
@@ -4,9 +4,13 @@ public interface FlinkFunctionConstant {
/**
 * TO_MAP function
 */
String TO_MAP = "to_map";
/**
 * GET_KEY function
 */
String GET_KEY = "get_key";
/**
 * TOP2 function
 */
String TOP2 = "top2";
}
package com.dlink.interceptor;
import com.dlink.catalog.function.FunctionManager;
import com.dlink.catalog.function.UDFunction;
import com.dlink.constant.FlinkFunctionConstant;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.trans.Operation;
import com.dlink.trans.Operations;
import com.dlink.ud.udtaf.RowsToMap;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.functions.*;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
 * FlinkInterceptor
 *
 * @author wenmo
 * @since 2021/6/11 22:17
 */
public class FlinkInterceptor {
public static boolean build(CustomTableEnvironmentImpl stEnvironment, String statement) {
initFunctions(stEnvironment, statement);
Operation operation = Operations.buildOperation(statement);
if (operation != null) {
operation.build(stEnvironment);
return operation.noExecute();
}
return false;
}
private static void initFunctions(CustomTableEnvironmentImpl stEnvironment, String statement) {
Map<String, UDFunction> usedFunctions = FunctionManager.getUsedFunctions(statement);
String[] udfs = stEnvironment.listUserDefinedFunctions();
List<String> udflist = Arrays.asList(udfs);
for (Map.Entry<String, UDFunction> entry : usedFunctions.entrySet()) {
if (!udflist.contains(entry.getKey())) {
if (entry.getValue().getType() == UDFunction.UDFunctionType.Scalar) {
stEnvironment.registerFunction(entry.getKey(), (ScalarFunction) entry.getValue().getFunction());
} else if (entry.getValue().getType() == UDFunction.UDFunctionType.Table) {
stEnvironment.registerFunction(entry.getKey(), (TableFunction) entry.getValue().getFunction());
} else if (entry.getValue().getType() == UDFunction.UDFunctionType.Aggregate) {
stEnvironment.registerFunction(entry.getKey(), (AggregateFunction) entry.getValue().getFunction());
} else if (entry.getValue().getType() == UDFunction.UDFunctionType.TableAggregate) {
stEnvironment.registerFunction(entry.getKey(), (TableAggregateFunction) entry.getValue().getFunction());
}
}
}
}
}
@@ -5,6 +5,8 @@ import com.dlink.constant.FlinkSQLConstant;
import com.dlink.executor.EnvironmentSetting;
import com.dlink.executor.Executor;
import com.dlink.executor.ExecutorSetting;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.result.*;
import com.dlink.session.ExecutorEntity;
import com.dlink.session.SessionPool;
@@ -35,25 +37,25 @@ public class JobManager {
}
public JobManager(String host) {
if (host != null) {
String[] strs = host.split(":");
if (strs.length >= 2) {
this.flinkHost = strs[0];
this.port = Integer.parseInt(strs[1]);
} else {
this.flinkHost = strs[0];
this.port = 8081;
}
}
}
public JobManager(String host, String sessionId, Integer maxRowNum) {
if (host != null) {
String[] strs = host.split(":");
if (strs.length >= 2) {
this.flinkHost = strs[0];
this.port = Integer.parseInt(strs[1]);
} else {
this.flinkHost = strs[0];
this.port = 8081;
}
@@ -74,16 +76,16 @@ public class JobManager {
this.port = port;
}
public RunResult execute(String statement, ExecutorSetting executorSetting) {
RunResult runResult = new RunResult(sessionId, statement, flinkHost, port, executorSetting, executorSetting.getJobName());
Executor executor = null;
ExecutorEntity executorEntity = SessionPool.get(sessionId);
if (executorEntity != null) {
executor = executorEntity.getExecutor();
} else {
if (executorSetting.isRemote()) {
executor = Executor.build(new EnvironmentSetting(flinkHost, FlinkConstant.PORT), executorSetting);
} else {
executor = Executor.build(null, executorSetting);
}
SessionPool.push(new ExecutorEntity(sessionId, executor));
@@ -99,16 +101,19 @@ public class JobManager {
}
String operationType = Operations.getOperationType(item);
long start = System.currentTimeMillis();
CustomTableEnvironmentImpl stEnvironment = executor.getCustomTableEnvironmentImpl();
if (!FlinkInterceptor.build(stEnvironment, item)) {
TableResult tableResult = executor.executeSql(item);
if (tableResult.getJobClient().isPresent()) {
runResult.setJobId(tableResult.getJobClient().get().getJobID().toHexString());
}
IResult result = ResultBuilder.build(operationType, maxRowNum, "", false).getResult(tableResult);
runResult.setResult(result);
}
long finish = System.currentTimeMillis();
long timeElapsed = finish - start;
runResult.setTime(timeElapsed);
runResult.setFinishDate(LocalDateTime.now());
runResult.setSuccess(true);
}
} catch (Exception e) {
@@ -129,45 +134,50 @@ public class JobManager {
}
public SubmitResult submit(String statement, ExecutorSetting executorSetting) {
if (statement == null || "".equals(statement)) {
return SubmitResult.error("The FlinkSql statement does not exist");
}
String[] statements = statement.split(FlinkSQLConstant.SEPARATOR);
return submit(Arrays.asList(statements), executorSetting);
}
public SubmitResult submit(List<String> sqlList, ExecutorSetting executorSetting) {
SubmitResult result = new SubmitResult(sessionId, sqlList, flinkHost, executorSetting.getJobName());
int currentIndex = 0;
try {
if (sqlList != null && sqlList.size() > 0) {
EnvironmentSetting environmentSetting = null;
if (executorSetting.isRemote()) {
environmentSetting = new EnvironmentSetting(flinkHost, port);
}
Executor executor = Executor.build(environmentSetting, executorSetting);
for (String sqlText : sqlList) {
currentIndex++;
String operationType = Operations.getOperationType(sqlText);
CustomTableEnvironmentImpl stEnvironment = executor.getCustomTableEnvironmentImpl();
if (operationType.equalsIgnoreCase(FlinkSQLConstant.INSERT)) {
long start = System.currentTimeMillis();
if (!FlinkInterceptor.build(stEnvironment, sqlText)) {
TableResult tableResult = executor.executeSql(sqlText);
JobID jobID = tableResult.getJobClient().get().getJobID();
long finish = System.currentTimeMillis();
long timeElapsed = finish - start;
InsertResult insertResult = new InsertResult(sqlText, (jobID == null ? "" : jobID.toHexString()), true, timeElapsed, LocalDateTime.now());
result.setResult(insertResult);
result.setJobId((jobID == null ? "" : jobID.toHexString()));
result.setTime(timeElapsed);
}
result.setSuccess(true);
result.setFinishDate(LocalDateTime.now());
} else {
if (!FlinkInterceptor.build(stEnvironment, sqlText)) {
executor.executeSql(sqlText);
}
}
}
} else {
result.setSuccess(false);
result.setMsg(LocalDateTime.now().toString() + ": the SQL statement to execute is empty.");
return result;
}
} catch (Exception e) {
......
package com.dlink.parser;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* BaseSingleSqlParser
*
* @author wenmo
* @since 2021/6/14 16:43
*/
public abstract class BaseSingleSqlParser {
// The original SQL statement
protected String originalSql;
// The parsed SQL segments
protected List<SqlSegment> segments;
/**
 * Constructor: takes the original SQL statement and splits it into segments.
 **/
public BaseSingleSqlParser(String originalSql) {
this.originalSql = originalSql;
segments = new ArrayList<SqlSegment>();
initializeSegments();
}
/**
 * Initializes the segments; subclasses must implement this.
 **/
protected abstract void initializeSegments();
/**
 * Splits originalSql into individual segments.
 **/
protected Map<String,List<String>> splitSql2Segment() {
Map<String,List<String>> map = new HashMap<>();
for (SqlSegment sqlSegment : segments) {
sqlSegment.parse(originalSql);
if(sqlSegment.getStart()!=null&&!"".equals(sqlSegment.getStart())) {
map.put(sqlSegment.getStart(), sqlSegment.getBodyPieces());
}
}
return map;
}
/**
 * Returns the parsed SQL statement.
 **/
public String getParsedSql() {
StringBuffer sb = new StringBuffer();
for (SqlSegment sqlSegment : segments) {
sb.append(sqlSegment.getParsedSqlSegment() + "\n");
}
String retval = sb.toString().replaceAll("\n+", "\n");
return retval;
}
}
package com.dlink.parser;
/**
* CreateAggTableSelectSqlParser
*
* @author wenmo
* @since 2021/6/14 16:56
*/
public class CreateAggTableSelectSqlParser extends BaseSingleSqlParser {
public CreateAggTableSelectSqlParser(String originalSql) {
super(originalSql);
}
@Override
protected void initializeSegments() {
segments.add(new SqlSegment("(create\\s+aggtable)(.+)(as\\s+select)", "[,]"));
segments.add(new SqlSegment("(select)(.+)(from)", "[,]"));
segments.add(new SqlSegment("(from)(.+?)( where | on | having | group\\s+by | order\\s+by | agg\\s+by | ENDOFSQL)", "(,|\\s+left\\s+join\\s+|\\s+right\\s+join\\s+|\\s+inner\\s+join\\s+)"));
segments.add(new SqlSegment("(where|on|having)(.+?)( group\\s+by | order\\s+by | agg\\s+by | ENDOFSQL)", "(and|or)"));
segments.add(new SqlSegment("(group\\s+by)(.+?)( order\\s+by | agg\\s+by | ENDOFSQL)", "[,]"));
segments.add(new SqlSegment("(order\\s+by)(.+?)( agg\\s+by | ENDOFSQL)", "[,]"));
segments.add(new SqlSegment("(agg\\s+by)(.+?)( ENDOFSQL)", "[,]"));
}
}
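For orientation, a hedged sketch of the statement shape this parser targets; the SQL text is the one used by SqlParserTest further down in this commit, and the expected keys are inferred from the segment definitions above. The class name AggTableParseSketch is made up for illustration.
import java.util.List;
import java.util.Map;
import com.dlink.parser.SingleSqlParserFactory;

public class AggTableParseSketch {
    public static void main(String[] args) {
        String sql = "CREATE AGGTABLE agg1 AS "
                + "SELECT sid,data "
                + "FROM score "
                + "WHERE cls = 1 "
                + "GROUP BY sid "
                + "AGG BY toMap(cls,score) as (data)";
        Map<String, List<String>> segments = SingleSqlParserFactory.generateParser(sql);
        // For this upper-case statement the keys should be
        // "CREATE AGGTABLE", "SELECT", "FROM", "WHERE", "GROUP BY" and "AGG BY".
        System.out.println(segments);
    }
}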
package com.dlink.parser;
/**
* DeleteSqlParser
*
* @author wenmo
* @since 2021/6/14 16:51
*/
public class DeleteSqlParser extends BaseSingleSqlParser {
public DeleteSqlParser(String originalSql) {
super(originalSql);
}
@Override
protected void initializeSegments() {
segments.add(new SqlSegment("(delete\\s+from)(.+)( where | ENDOFSQL)", "[,]"));
segments.add(new SqlSegment("(where)(.+)( ENDOFSQL)", "(and|or)"));
}
}
package com.dlink.parser;
/**
* InsertSelectSqlParser
*
* @author wenmo
* @since 2021/6/14 16:53
*/
public class InsertSelectSqlParser extends BaseSingleSqlParser {
public InsertSelectSqlParser(String originalSql) {
super(originalSql);
}
@Override
protected void initializeSegments() {
segments.add(new SqlSegment("(insert\\s+into)(.+)( select )", "[,]"));
segments.add(new SqlSegment("(select)(.+)(from)", "[,]"));
segments.add(new SqlSegment("(from)(.+?)( where | on | having | group\\s+by | order\\s+by | ENDOFSQL)", "(,|\\s+left\\s+join\\s+|\\s+right\\s+join\\s+|\\s+inner\\s+join\\s+)"));
segments.add(new SqlSegment("(where|on|having)(.+?)( group\\s+by | order\\s+by | ENDOFSQL)", "(and|or)"));
segments.add(new SqlSegment("(group\\s+by)(.+?)( order\\s+by| ENDOFSQL)", "[,]"));
segments.add(new SqlSegment("(order\\s+by)(.+?)( ENDOFSQL)", "[,]"));
}
}
package com.dlink.parser;
/**
* InsertSqlParser
*
* @author wenmo
* @since 2021/6/14 16:54
*/
public class InsertSqlParser extends BaseSingleSqlParser {
public InsertSqlParser(String originalSql) {
super(originalSql);
}
@Override
protected void initializeSegments() {
segments.add(new SqlSegment("(insert\\s+into)(.+?)([(])", "[,]"));
segments.add(new SqlSegment("([(])(.+?)([)]\\s+values\\s+[(])", "[,]"));
segments.add(new SqlSegment("([)]\\s+values\\s+[(])(.+)([)]\\s+ENDOFSQL)", "[,]"));
}
public String getParsedSql() {
String retval = super.getParsedSql();
retval = retval + ")";
return retval;
}
}
package com.dlink.parser;
/**
* SelectSqlParser
*
* @author wenmo
* @since 2021/6/14 16:53
*/
public class SelectSqlParser extends BaseSingleSqlParser {
public SelectSqlParser(String originalSql) {
super(originalSql);
}
@Override
protected void initializeSegments() {
segments.add(new SqlSegment("(select)(.+)(from)", "[,]"));
segments.add(new SqlSegment("(from)(.+?)(where |group\\s+by|having|order\\s+by | ENDOFSQL)", "(,|s+lefts+joins+|s+rights+joins+|s+inners+joins+)"));
segments.add(new SqlSegment("(where)(.+?)(group\\s+by |having| order\\s+by | ENDOFSQL)", "(and|or)"));
segments.add(new SqlSegment("(group\\s+by)(.+?)(having|order\\s+by| ENDOFSQL)", "[,]"));
segments.add(new SqlSegment("(having)(.+?)(order\\s+by| ENDOFSQL)", "(and|or)"));
segments.add(new SqlSegment("(order\\s+by)(.+)( ENDOFSQL)", "[,]"));
}
}
package com.dlink.parser;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* SingleSqlParserFactory
*
* @author wenmo
* @since 2021/6/14 16:49
*/
public class SingleSqlParserFactory {
public static Map<String,List<String>> generateParser(String sql) {
BaseSingleSqlParser tmp = null;
// sql = sql.replace("\n"," ").replaceAll("\\s{1,}", " ") +" ENDOFSQL";
sql = sql.replace("\n"," ") +" ENDOFSQL";
if (contains(sql, "(insert\\s+into)(.+)(select)(.+)(from)(.+)")) {
tmp = new InsertSelectSqlParser(sql);
} else if (contains(sql, "(create\\s+aggtable)(.+)(as\\s+select)(.+)")) {
tmp = new CreateAggTableSelectSqlParser(sql);
} else if (contains(sql, "(select)(.+)(from)(.+)")) {
tmp = new SelectSqlParser(sql);
} else if (contains(sql, "(delete\\s+from)(.+)")) {
tmp = new DeleteSqlParser(sql);
} else if (contains(sql, "(update)(.+)(set)(.+)")) {
tmp = new UpdateSqlParser(sql);
} else if (contains(sql, "(insert\\s+into)(.+)(values)(.+)")) {
tmp = new InsertSqlParser(sql);
} else if (contains(sql, "(create\\s+table)(.+)")) {
} else if (contains(sql, "(create\\s+database)(.+)")) {
} else if (contains(sql, "(show\\s+databases)")) {
} else if (contains(sql, "(use)(.+)")) {
} else {
}
return tmp.splitSql2Segment();
}
/**
 * Checks whether the given pattern occurs in the SQL text; regular expressions are supported.
 *
 * @param sql the SQL statement to inspect
 * @param regExp the regular expression to match
 * @return true if the pattern is found
 **/
private static boolean contains(String sql, String regExp) {
Pattern pattern = Pattern.compile(regExp, Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(sql);
return matcher.find();
}
}
package com.dlink.parser;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* SqlSegment
*
* @author wenmo
* @since 2021/6/14 16:12
*/
public class SqlSegment {
private static final String Crlf = "|";
@SuppressWarnings("unused")
private static final String FourSpace = "  ";
/**
 * Type of the SQL segment, in upper case
 **/
private String type;
/**
 * Leading (start) part of the SQL segment
 **/
private String start;
/**
 * Middle (body) part of the SQL segment
 **/
private String body;
/**
 * Trailing (end) part of the SQL segment
 **/
private String end;
/**
 * Regular expression used to split the body part
 **/
private String bodySplitPattern;
/**
 * Regular expression that identifies this segment
 **/
private String segmentRegExp;
/**
 * Body pieces obtained after splitting
 **/
private List<String> bodyPieces;
/**
 * Constructor
 *
 * @param segmentRegExp regular expression that identifies this SQL segment
 * @param bodySplitPattern regular expression used to split the body
 **/
public SqlSegment(String segmentRegExp, String bodySplitPattern) {
this.type = "";
this.start = "";
this.body = "";
this.end = "";
this.segmentRegExp = segmentRegExp;
this.bodySplitPattern = bodySplitPattern;
this.bodyPieces = new ArrayList<String>();
}
/**
 * Finds the part of the sql that matches segmentRegExp and assigns it to the start, body and end fields
 **/
public void parse(String sql) {
Pattern pattern = Pattern.compile(segmentRegExp, Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(sql);
while (matcher.find()) {
start = matcher.group(1);
body = matcher.group(2);
end = matcher.group(3);
type = start.replace("\n"," ").replaceAll("\\s{1,}", " ").toUpperCase();
parseBody();
}
}
/**
 * Parses the body part
 **/
private void parseBody() {
List<String> ls = new ArrayList<String>();
Pattern p = Pattern.compile(bodySplitPattern, Pattern.CASE_INSENSITIVE);
body = body.trim();
Matcher m = p.matcher(body);
StringBuffer sb = new StringBuffer();
boolean result = m.find();
while (result) {
m.appendReplacement(sb, Crlf);
result = m.find();
}
m.appendTail(sb);
//ls.add(start);
String[] arr = sb.toString().split("[|]");
int arrLength = arr.length;
for (int i = 0; i < arrLength; i++) {
ls.add(arr[i]);
}
bodyPieces = ls;
}
/**
* 取得解析好的Sql片段
**/
public String getParsedSqlSegment() {
StringBuffer sb = new StringBuffer();
sb.append(start + Crlf);
for (String piece : bodyPieces) {
sb.append(piece + Crlf);
}
return sb.toString();
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getStart() {
return start;
}
public void setStart(String start) {
this.start = start;
}
public String getBody() {
return body;
}
public void setBody(String body) {
this.body = body;
}
public String getEnd() {
return end;
}
public void setEnd(String end) {
this.end = end;
}
public String getBodySplitPattern() {
return bodySplitPattern;
}
public void setBodySplitPattern(String bodySplitPattern) {
this.bodySplitPattern = bodySplitPattern;
}
public String getSegmentRegExp() {
return segmentRegExp;
}
public void setSegmentRegExp(String segmentRegExp) {
this.segmentRegExp = segmentRegExp;
}
public List<String> getBodyPieces() {
return bodyPieces;
}
public void setBodyPieces(List<String> bodyPieces) {
this.bodyPieces = bodyPieces;
}
}
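A small hypothetical sketch, not part of this commit, of how one SqlSegment behaves: parse() captures the start/body/end groups of its regular expression and parseBody() splits the body on bodySplitPattern. The class name SqlSegmentSketch is made up for illustration.
import com.dlink.parser.SqlSegment;

public class SqlSegmentSketch {
    public static void main(String[] args) {
        // One of the segments SelectSqlParser registers.
        SqlSegment segment = new SqlSegment("(select)(.+)(from)", "[,]");
        segment.parse("select id, name from users ENDOFSQL");
        System.out.println(segment.getStart());      // expected: "select"
        System.out.println(segment.getBodyPieces()); // expected roughly: ["id", " name"]
    }
}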
package com.dlink.parser;
/**
* UpdateSqlParser
*
* @author wenmo
* @since 2021/6/14 16:52
*/
public class UpdateSqlParser extends BaseSingleSqlParser {
public UpdateSqlParser(String originalSql) {
super(originalSql);
}
@Override
protected void initializeSegments() {
segments.add(new SqlSegment("(update)(.+)(set)", "[,]"));
segments.add(new SqlSegment("(set)(.+?)( where | ENDOFSQL)", "[,]"));
segments.add(new SqlSegment("(where)(.+)(ENDOFSQL)", "(and|or)"));
}
}
package com.dlink.trans;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import java.util.Arrays;
import java.util.List;
/**
* AbstractOperation
*
* @author wenmo
* @since 2021/6/14 18:18
*/
public class AbstractOperation {
protected String statement;
public String getStatement() {
return statement;
}
public void setStatement(String statement) {
this.statement = statement;
}
public boolean checkFunctionExist(CustomTableEnvironmentImpl stEnvironment,String key){
String[] udfs = stEnvironment.listUserDefinedFunctions();
List<String> udflist = Arrays.asList(udfs);
if(udflist.contains(key.toLowerCase())){
return true;
}else {
return false;
}
}
public boolean noExecute(){
return true;
}
}
package com.dlink.trans;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
/**
 * CreateOperation
 *
@@ -8,5 +10,5 @@ package com.dlink.trans;
 */
public interface CreateOperation extends Operation{
//void create(CustomTableEnvironmentImpl stEnvironment);
}
package com.dlink.trans;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
/**
 * Operation
 *
@@ -10,7 +12,9 @@ public interface Operation {
String getHandle();
Operation create(String statement);
void build(CustomTableEnvironmentImpl stEnvironment);
boolean noExecute();
}
package com.dlink.trans;
import com.dlink.constant.FlinkSQLConstant;
import com.dlink.trans.ddl.CreateAggTableOperation;
/**
 * Operations
 *
 * @author wenmo
 * @since 2021/5/25 15:50
 **/
public class Operations {
private static Operation[] operations = {
new CreateAggTableOperation()
};
public static String getOperationType(String sql) {
String sqlTrim = sql.replaceAll("[\\s\\t\\n\\r]", "").toUpperCase();
if (sqlTrim.startsWith(FlinkSQLConstant.CREATE)) {
@@ -32,4 +37,15 @@ public class Operations {
}
return FlinkSQLConstant.UNKNOWN_TYPE;
}
public static Operation buildOperation(String statement){
statement = statement.replace("\n"," ").replaceAll("\\s{1,}", " ").trim();
String sql = statement.toUpperCase();
for (int i = 0; i < operations.length; i++) {
if(sql.startsWith(operations[i].getHandle())){
return operations[i].create(statement);
}
}
return null;
}
}
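A hedged sketch, not part of this commit, of the dispatch added here: buildOperation normalizes whitespace and matches the statement prefix against each registered handle, so only CREATE AGGTABLE statements currently yield an Operation; anything else returns null and is executed as ordinary Flink SQL. The class name OperationsSketch is made up for illustration.
import com.dlink.trans.Operation;
import com.dlink.trans.Operations;

public class OperationsSketch {
    public static void main(String[] args) {
        // Matches the "CREATE AGGTABLE" handle, so a CreateAggTableOperation is expected.
        Operation op = Operations.buildOperation(
                "CREATE AGGTABLE agg1 AS SELECT sid,data FROM score WHERE cls = 1 GROUP BY sid AGG BY toMap(cls,score) as (data)");
        System.out.println(op);                                                // expected: a CreateAggTableOperation instance
        System.out.println(Operations.buildOperation("SELECT * FROM score"));  // expected: null
    }
}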
package com.dlink.trans.ddl;
import com.dlink.parser.SingleSqlParserFactory;
import com.dlink.utils.SqlExtractUtil;
import org.apache.commons.lang3.StringUtils;
import java.util.List;
import java.util.Map;
/**
* AggTable
*
* @author wenmo
* @since 2021/6/13 20:32
*/
public class AggTable {
private String statement;
private String name;
private String columns;
private String table;
private List<String> wheres;
private String groupBy;
private String aggBy;
public AggTable(String statement, String name, String columns, String table, List<String> wheres, String groupBy, String aggBy) {
this.statement = statement;
this.name = name;
this.columns = columns;
this.table = table;
this.wheres = wheres;
this.groupBy = groupBy;
this.aggBy = aggBy;
}
public static AggTable build(String statement){
Map<String,List<String>> map = SingleSqlParserFactory.generateParser(statement);
return new AggTable(statement,
getString(map,"CREATE AGGTABLE"),
getString(map,"SELECT"),
getString(map,"FROM"),
map.get("WHERE"),
getString(map,"GROUP BY"),
getString(map,"AGG BY"));
}
private static String getString(Map<String,List<String>> map,String key){
return StringUtils.join(map.get(key),",");
}
public String getStatement() {
return statement;
}
public void setStatement(String statement) {
this.statement = statement;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getColumns() {
return columns;
}
public void setColumns(String columns) {
this.columns = columns;
}
public String getTable() {
return table;
}
public void setTable(String table) {
this.table = table;
}
public List<String> getWheres() {
return wheres;
}
public void setWheres(List<String> wheres) {
this.wheres = wheres;
}
public String getGroupBy() {
return groupBy;
}
public void setGroupBy(String groupBy) {
this.groupBy = groupBy;
}
public String getAggBy() {
return aggBy;
}
public void setAggBy(String aggBy) {
this.aggBy = aggBy;
}
}
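A hedged sketch, not part of this commit, of what AggTable.build should extract from an upper-case CREATE AGGTABLE statement like the one in SqlParserTest; the values in the comments follow from the segment map and may differ in whitespace. The class name AggTableBuildSketch is made up for illustration.
import com.dlink.trans.ddl.AggTable;

public class AggTableBuildSketch {
    public static void main(String[] args) {
        AggTable aggTable = AggTable.build(
                "CREATE AGGTABLE agg1 AS SELECT sid,data FROM score WHERE cls = 1 GROUP BY sid AGG BY toMap(cls,score) as (data)");
        System.out.println(aggTable.getName());    // expected roughly: "agg1"
        System.out.println(aggTable.getTable());   // expected roughly: "score"
        System.out.println(aggTable.getColumns()); // expected roughly: "sid,data"
        System.out.println(aggTable.getWheres());  // expected roughly: ["cls = 1"]
        System.out.println(aggTable.getGroupBy()); // expected roughly: "sid"
        System.out.println(aggTable.getAggBy());   // expected roughly: "toMap(cls,score) as (data)"
    }
}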
package com.dlink.trans.ddl;
import com.dlink.constant.FlinkFunctionConstant;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.trans.AbstractOperation;
import com.dlink.trans.Operation;
import com.dlink.ud.udf.GetKey;
import com.dlink.ud.udtaf.RowsToMap;
import org.apache.flink.table.api.Table;
import java.util.List;
/**
 * CreateAggTableOperation
 *
 * @author wenmo
 * @since 2021/6/13 19:24
 */
public class CreateAggTableOperation extends AbstractOperation implements Operation{
private String KEY_WORD = "CREATE AGGTABLE";
public CreateAggTableOperation() {
}
public CreateAggTableOperation(String statement) {
this.statement = statement;
}
@Override
public String getHandle() {
return KEY_WORD;
}
@Override
public Operation create(String statement) {
return new CreateAggTableOperation(statement);
}
@Override
public void build(CustomTableEnvironmentImpl stEnvironment) {
AggTable aggTable = AggTable.build(statement);
Table source = stEnvironment.sqlQuery("select * from "+ aggTable.getTable());
List<String> wheres = aggTable.getWheres();
if(wheres!=null&&wheres.size()>0) {
for (String s : wheres) {
source = source.filter(s);
}
}
Table sink = source.groupBy(aggTable.getGroupBy())
.flatAggregate(aggTable.getAggBy())
.select(aggTable.getColumns());
stEnvironment.registerTable(aggTable.getName(), sink);
}
/*@Override
public boolean noExecute(){
return true;
}*/
}
package com.dlink.utils;
import java.util.ArrayList;
import java.util.List;
/**
* SqlExtractUtil
*
* @author wenmo
* @since 2021/6/13 21:09
*/
public class SqlExtractUtil {
private String[] KEY_WORDS = {
"AGG",
"AGGTABLE",
"AGGTABLES",
"AND",
"AS",
"BY",
"CREATE",
"DROP",
"FRAGMENT",
"FRAGMENTS",
"FROM",
"GROUP",
"LIMIT",
"OR",
"ORDER",
"SELECT",
"SHOW",
"TABLE",
"TABLES",
"VIEW",
"VIEWS",
"WHERE"
};
/**
 * Gets the content between fixed delimiters, e.g. the part between "select" and "from"
 * in "select a.name ,b.name from".
 *
 * @param original the original statement
 * @param upperStr the upper-cased copy of the statement
 * @param start the starting keyword
 * @param end the ending keyword
 * @return the extracted content, or an empty string if nothing matches
 */
public static String getSubStringPara(String original, String upperStr, String beforeStr, String start, String end, String secondEnd, boolean isStopTrim) {
int beforeIndex = upperStr.indexOf(" " + beforeStr + " ");
int startIndex = upperStr.indexOf(" " + start + " ", beforeIndex);
if (startIndex < 0) {
return "";
}
if (end.length() == 0) {
String resWithout = original.substring(start.length() + startIndex + 1);
return isStopTrim ? resWithout : resWithout.trim();
}
int endIndex = upperStr.indexOf(" " + end + " ", startIndex);
if (endIndex < 0) {
endIndex = upperStr.indexOf(" " + secondEnd + " ", startIndex);
}
if (endIndex < 0) {
return "";
}
String res = original.substring(start.length() + startIndex + 1, endIndex);
return isStopTrim ? res: res.trim();
}
/**
 * Splits content joined by a fixed separator, e.g. the condition
 * "s1.name = s2.name and s1.age = s2.age And s1.sex=s2.sex"
 * is turned into list = ["s1.name = s2.name", "and s1.age = s2", "And s1.sex=s2.sex"]
 *
 * @param original the original statement
 * @param upperStr the upper-cased copy of the statement
 * @param start the starting keyword
 * @param end the ending keyword
 * @param secondLayer the separator keyword (e.g. "and")
 * @return the list of pieces
 */
public static List<String> getSubStringList(String original, String upperStr, String start, String end, String secondLayer) {
String subStringPara = getSubStringPara(original, upperStr, start, start, end, "", false);
List<String> list = new ArrayList<>();
if (subStringPara.length() == 0) {
return list;
}
String str[] = subStringPara.split(" " + secondLayer.toUpperCase() + " ");
for (int i = 0; i < str.length; i++) {
String[] split = str[i].split(" " + secondLayer.toLowerCase() + " ");
for (int j = 0; j < split.length; j++) {
if (split[j].replace(" ", "").length() > 0) {
list.add(split[j]);
}
}
}
return list;
}
}
package com.dlink.core;
import com.dlink.parser.SingleSqlParserFactory;
import org.apache.commons.lang3.StringUtils;
import org.junit.Test;
import java.util.List;
import java.util.Map;
/**
* SqlParserTest
*
* @author wenmo
* @since 2021/6/14 17:03
*/
public class SqlParserTest {
@Test
public void selectTest(){
String sql = "insert into T SElecT id,xm as name frOm people wheRe id=1 And enabled = 1";
Map<String,List<String>> lists = SingleSqlParserFactory.generateParser(sql);
System.out.println(lists.toString());
}
@Test
public void createAggTableTest(){
String sql = "CREATE AGGTABLE agg1 AS \n" +
"SELECT sid,data\n" +
"FROM score\n" +
"WHERE cls = 1\n" +
"GROUP BY sid\n" +
"AGG BY toMap(cls,score) as (data)";
//sql=sql.replace("\n"," ");
Map<String,List<String>> lists = SingleSqlParserFactory.generateParser(sql);
System.out.println(lists.toString());
System.out.println(StringUtils.join(lists.get("SELECT"),","));
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dlink</artifactId>
<groupId>com.dlink</groupId>
<version>0.3.0-SANPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dlink-function</artifactId>
<dependencies>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-client-1.12</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
@@ -14,7 +14,7 @@ import java.util.Map;
 * @author wenmo
 * @since 2021/5/25 15:50
 **/
public class RowsToMap extends TableAggregateFunction<String, Map> {
@Override
public Map createAccumulator() {
return new HashMap();
@@ -46,6 +46,5 @@ public class RowsToMap extends TableAggregateFunction<String, Map> {
public void emitValue(Map acc, Collector<String> out) {
out.collect(acc.toString());
}
}
\ No newline at end of file
package com.dlink.ud.udtaf;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;
/**
* Top2 demo from the Flink documentation
*
* @author wenmo
* @since 2021/6/14 20:44
*/
public class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2.Top2Accumulator> {
public static class Top2Accumulator {
public Integer first;
public Integer second;
}
@Override
public Top2Accumulator createAccumulator() {
Top2Accumulator acc = new Top2Accumulator();
acc.first = Integer.MIN_VALUE;
acc.second = Integer.MIN_VALUE;
return acc;
}
public void accumulate(Top2Accumulator acc, Integer value) {
if (value > acc.first) {
acc.second = acc.first;
acc.first = value;
} else if (value > acc.second) {
acc.second = value;
}
}
public void merge(Top2Accumulator acc, Iterable<Top2Accumulator> it) {
for (Top2Accumulator otherAcc : it) {
accumulate(acc, otherAcc.first);
accumulate(acc, otherAcc.second);
}
}
public void emitValue(Top2Accumulator acc, Collector<Tuple2<Integer, Integer>> out) {
// emit the value and rank
if (acc.first != Integer.MIN_VALUE) {
out.collect(Tuple2.of(acc.first, 1));
}
if (acc.second != Integer.MIN_VALUE) {
out.collect(Tuple2.of(acc.second, 2));
}
}
}
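A hedged usage sketch, not part of this commit, that writes out by hand the calls CreateAggTableOperation.build makes, using the registerFunction overload added to CustomTableEnvironmentImpl above; the table and column names (score, sid) and the class name Top2Sketch are made up for illustration.
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.ud.udtaf.Top2;
import org.apache.flink.table.api.Table;

public class Top2Sketch {
    // Assumes a table "score" with columns sid and score is already registered in stEnvironment.
    public static void register(CustomTableEnvironmentImpl stEnvironment) {
        // Uses the registerFunction(String, TableAggregateFunction) overload added in this commit.
        stEnvironment.registerFunction("top2", new Top2());
        Table source = stEnvironment.sqlQuery("select * from score");
        Table sink = source.groupBy("sid")
                .flatAggregate("top2(score) as (v, rank)")
                .select("sid, v, rank");
        // Per sid, "sink" emits the two largest scores together with their rank.
        stEnvironment.registerTable("top_score", sink);
    }
}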
@@ -13,6 +13,7 @@
<module>dlink-admin</module>
<module>dlink-connectors</module>
<module>dlink-client</module>
<module>dlink-function</module>
</modules>
<properties>
@@ -132,6 +133,11 @@
<artifactId>dlink-connector-jdbc</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-function</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<build>
......