Commit 281c00cb authored by godkaikai

Decouple and optimize the executor module

parent e5510452
......@@ -24,6 +24,7 @@ import com.dlink.utils.SqlUtil;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;
import java.time.LocalDateTime;
......@@ -394,14 +395,51 @@ public class JobManager extends RunTime {
executor.executeSql(item.getValue());
}
}
-if(config.isUseStatementSet()) {
+if(config.isUseStatementSet()&&useGateway) {
List<String> inserts = new ArrayList<>();
for (StatementParam item : jobParam.getTrans()) {
if (!FlinkInterceptor.build(stEnvironment, item.getValue())) {
inserts.add(item.getValue());
}
}
-currentSql = inserts.toString();
+currentSql = String.join(FlinkSQLConstant.SEPARATOR,inserts);
JobGraph jobGraph = executor.getJobGraphFromInserts(inserts);
GatewayResult gatewayResult = Gateway.build(config.getGatewayConfig()).submitJobGraph(jobGraph);
InsertResult insertResult = new InsertResult(gatewayResult.getAppId(), true);
job.setResult(insertResult);
job.setJobId(gatewayResult.getAppId());
job.setJobManagerAddress(gatewayResult.getWebURL());
}else if(config.isUseStatementSet()&&!useGateway) {
List<String> inserts = new ArrayList<>();
StatementSet statementSet = stEnvironment.createStatementSet();
for (StatementParam item : jobParam.getTrans()) {
if(item.getType().equals(SqlType.INSERT)) {
if (!FlinkInterceptor.build(stEnvironment, item.getValue())) {
statementSet.addInsertSql(item.getValue());
inserts.add(item.getValue());
}
}
}
if(inserts.size()>0) {
currentSql = String.join(FlinkSQLConstant.SEPARATOR, inserts);
TableResult tableResult = statementSet.execute();
if (tableResult.getJobClient().isPresent()) {
job.setJobId(tableResult.getJobClient().get().getJobID().toHexString());
}
if (config.isUseResult()) {
IResult result = ResultBuilder.build(SqlType.INSERT, maxRowNum, "", true).getResult(tableResult);
job.setResult(result);
}
}
}else if(!config.isUseStatementSet()&&useGateway) {
List<String> inserts = new ArrayList<>();
for (StatementParam item : jobParam.getTrans()) {
if (!FlinkInterceptor.build(stEnvironment, item.getValue())) {
inserts.add(item.getValue());
break;
}
}
currentSql = String.join(FlinkSQLConstant.SEPARATOR,inserts);
JobGraph jobGraph = executor.getJobGraphFromInserts(inserts);
GatewayResult gatewayResult = Gateway.build(config.getGatewayConfig()).submitJobGraph(jobGraph);
InsertResult insertResult = new InsertResult(gatewayResult.getAppId(), true);
......
......@@ -382,7 +382,7 @@ CREATE TABLE `dlink_cluster_configuration` (
PRIMARY KEY (`id`)
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
-CREATE TABLE `dlink_job` (
+CREATE TABLE `dlink_jar` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'name',
`alias` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT 'alias',
......
package com.dlink.catalog.function;
import com.dlink.constant.FlinkFunctionConstant;
import com.dlink.ud.udf.GetKey;
import com.dlink.ud.udtaf.RowsToMap;
import com.dlink.ud.udtaf.Top2;
import java.util.HashMap;
import java.util.Map;
/**
* FunctionManager
*
* @author wenmo
* @since 2021/6/14 21:19
*/
public class FunctionManager {
private static Map<String,UDFunction> functions = new HashMap<String,UDFunction>(){
{
put(FlinkFunctionConstant.GET_KEY,
new UDFunction(FlinkFunctionConstant.GET_KEY,
UDFunction.UDFunctionType.Scalar,
new GetKey()));
put(FlinkFunctionConstant.TO_MAP,
new UDFunction(FlinkFunctionConstant.TO_MAP,
UDFunction.UDFunctionType.TableAggregate,
new RowsToMap()));
put(FlinkFunctionConstant.TOP2,
new UDFunction(FlinkFunctionConstant.TOP2,
UDFunction.UDFunctionType.TableAggregate,
new Top2()));
}
};
public static Map<String,UDFunction> getUsedFunctions(String statement){
Map<String,UDFunction> map = new HashMap<>();
String sql = statement.toLowerCase();
for (Map.Entry<String, UDFunction> entry : functions.entrySet()) {
if(sql.contains(entry.getKey().toLowerCase())){
map.put(entry.getKey(),entry.getValue());
}
}
return map;
}
}
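A minimal usage sketch (hypothetical demo class, not part of the commit; assumes the dlink-function UDF classes are on the classpath): getUsedFunctions does a case-insensitive substring scan of the statement against the registered names.
package com.dlink.catalog.function;

import java.util.Map;

// Hypothetical demo: shows which built-in UDFs FunctionManager detects in a statement.
public class FunctionManagerDemo {
    public static void main(String[] args) {
        String sql = "select sid, get_key(m, 'cls') from score";
        Map<String, UDFunction> used = FunctionManager.getUsedFunctions(sql);
        // Prints: get_key -> Scalar
        used.forEach((name, f) -> System.out.println(name + " -> " + f.getType()));
    }
}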
package com.dlink.catalog.function;
import org.apache.flink.table.functions.FunctionDefinition;
/**
* UDFunction
*
* @author wenmo
* @since 2021/6/14 22:14
*/
public class UDFunction {
public enum UDFunctionType {
Scalar, Table, Aggregate, TableAggregate
}
private String name;
private UDFunctionType type;
private FunctionDefinition function;
public UDFunction(String name, UDFunctionType type, FunctionDefinition function) {
this.name = name;
this.type = type;
this.function = function;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public UDFunctionType getType() {
return type;
}
public void setType(UDFunctionType type) {
this.type = type;
}
public FunctionDefinition getFunction() {
return function;
}
public void setFunction(FunctionDefinition function) {
this.function = function;
}
}
package com.dlink.constant;
public interface FlinkFunctionConstant {
/**
* TO_MAP function
*/
String TO_MAP = "to_map";
/**
* GET_KEY function
*/
String GET_KEY = "get_key";
/**
* TOP2 function
*/
String TOP2 = "top2";
}
package com.dlink.constant;
/**
* FlinkSQLConstant
*
* @author wenmo
* @since 2021/5/25 15:51
**/
public interface FlinkSQLConstant {
/**
* Statement separator
*/
String SEPARATOR = ";";
/**
* DDL type
*/
String DDL = "DDL";
/**
* DML type
*/
String DML = "DML";
}
package com.dlink.executor;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.executor.custom.SqlManager;
import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.JobExecutionResult;
......@@ -17,7 +18,7 @@ import java.util.Map;
/**
* Executor
* @author wenmo
-* @since 2021/5/25 13:39
+* @since 2021/11/17
**/
public abstract class Executor {
......@@ -26,6 +27,17 @@ public abstract class Executor {
protected EnvironmentSetting environmentSetting;
protected ExecutorSetting executorSetting;
protected SqlManager sqlManager = new SqlManager();
protected boolean useSqlFragment = true;
public SqlManager getSqlManager() {
return sqlManager;
}
public boolean isUseSqlFragment() {
return useSqlFragment;
}
public static Executor build(){
return new LocalStreamExecutor(ExecutorSetting.DEFAULT);
}
......
package com.dlink.executor;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.executor.custom.CustomTableResultImpl;
import com.dlink.executor.custom.TableSchemaField;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ExpressionParserException;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.types.Row;
import org.apache.flink.util.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static java.lang.String.format;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Flink Sql Fragment Manager
* @author wenmo
* @since 2021/6/7 22:06
**/
public final class SqlManager {
private Map<String, String> sqlFragments;
static final String SHOW_FRAGMENTS = "SHOW FRAGMENTS";
public SqlManager() {
sqlFragments = new HashMap<>();
}
/**
* Get names of sql fragments loaded.
*
* @return a list of names of sql fragments loaded
*/
public List<String> listSqlFragments() {
return new ArrayList<>(sqlFragments.keySet());
}
/**
* Registers a fragment of sql under the given name. The sql fragment name must be unique.
*
* @param sqlFragmentName name under which to register the given sql fragment
* @param sqlFragment a fragment of sql to register
* @throws CatalogException if the registration of the sql fragment under the given name failed
* (currently reported as a CatalogException rather than a SqlException)
*/
public void registerSqlFragment(String sqlFragmentName, String sqlFragment) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
"sql fragment name cannot be null or empty.");
checkNotNull(sqlFragment, "sql fragment cannot be null");
if (sqlFragments.containsKey(sqlFragmentName)) {
throw new CatalogException(
format("The fragment of sql %s already exists.", sqlFragmentName));
}
sqlFragments.put(sqlFragmentName, sqlFragment);
}
/**
* Unregisters a fragment of sql under the given name. The sql fragment name must already exist.
*
* @param sqlFragmentName name under which to unregister the given sql fragment.
* @param ignoreIfNotExists if false, an exception is thrown when the sql fragment to be
* unregistered does not exist.
* @throws CatalogException if the unregistration of the sql fragment under the given name
* failed (currently reported as a CatalogException rather than a SqlException)
*/
public void unregisterSqlFragment(String sqlFragmentName, boolean ignoreIfNotExists) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
"sql fragmentName name cannot be null or empty.");
if (sqlFragments.containsKey(sqlFragmentName)) {
sqlFragments.remove(sqlFragmentName);
} else if (!ignoreIfNotExists) {
throw new CatalogException(
format("The fragment of sql %s does not exist.", sqlFragmentName));
}
}
/**
* Gets the fragment of sql registered under the given name. The sql fragment name must already exist.
*
* @param sqlFragmentName name of the sql fragment to look up.
* @throws CatalogException if no sql fragment is registered under the given name
* (currently reported as a CatalogException rather than a SqlException)
*/
public String getSqlFragment(String sqlFragmentName) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
"sql fragmentName name cannot be null or empty.");
if (sqlFragments.containsKey(sqlFragmentName)) {
return sqlFragments.get(sqlFragmentName);
} else {
throw new CatalogException(
format("The fragment of sql %s does not exist.", sqlFragmentName));
}
}
/**
* Gets all registered sql fragments as a name-to-fragment map.
*
* @return the map of registered sql fragments
*/
public Map<String, String> getSqlFragment() {
return sqlFragments;
}
public TableResult getSqlFragments() {
List<Row> rows = new ArrayList<>();
for (String key : sqlFragments.keySet()) {
rows.add(Row.of(key));
}
return CustomTableResultImpl.buildTableResult(new ArrayList<>(Arrays.asList(new TableSchemaField("sql fragment name", DataTypes.STRING()))), rows);
}
public Iterator getSqlFragmentsIterator() {
return sqlFragments.entrySet().iterator();
}
public Table getSqlFragmentsTable(CustomTableEnvironmentImpl environment) {
List<String> keys = new ArrayList<>();
for (String key : sqlFragments.keySet()) {
keys.add(key);
}
return environment.fromValues(keys);
}
public boolean checkShowFragments(String sql){
return SHOW_FRAGMENTS.equals(sql.trim().toUpperCase());
}
/**
* Parses variable definitions (name := value) in the given sql, registering each as a fragment,
* and returns the remaining statements with variable references replaced.
*
* @param statement the sql to parse.
* @throws ExpressionParserException if a variable definition in the given sql is malformed.
*/
public String parseVariable(String statement) {
if (statement == null || "".equals(statement)) {
return statement;
}
String[] strs = statement.split(";");
StringBuilder sb = new StringBuilder();
for (int i = 0; i < strs.length; i++) {
String str = strs[i].trim();
if (str.length() == 0) {
continue;
}
if (str.contains(":=")) {
String[] strs2 = str.split(":=");
if (strs2.length >= 2) {
if (strs2[0].length() == 0) {
throw new ExpressionParserException("Illegal variable name.");
}
String valueString = str.substring(str.indexOf(":=") + 2);
this.registerSqlFragment(strs2[0], replaceVariable(valueString));
} else {
throw new ExpressionParserException("Illegal variable definition.");
}
} else {
sb.append(replaceVariable(str));
}
}
return sb.toString();
}
/**
* Replaces ${name} variable references in the given sql with their registered fragments.
*
* @param statement the sql in which to replace variables.
*/
private String replaceVariable(String statement) {
String pattern = "\\$\\{(.+?)\\}";
Pattern p = Pattern.compile(pattern);
Matcher m = p.matcher(statement);
StringBuffer sb = new StringBuffer();
while (m.find()) {
String key = m.group(1);
String value = this.getSqlFragment(key);
m.appendReplacement(sb, value == null ? "" : value);
}
m.appendTail(sb);
return sb.toString();
}
}
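A short sketch of the fragment workflow (hypothetical demo class, not part of the commit): a `name:=value` piece is registered as a fragment by parseVariable, and `${name}` references in the remaining statements are expanded through replaceVariable.
package com.dlink.executor;

// Hypothetical demo: registers a fragment via ":=" and expands "${...}".
public class SqlManagerDemo {
    public static void main(String[] args) {
        SqlManager manager = new SqlManager();
        String script = "tb:=default_catalog.default_database.score;select * from ${tb}";
        // parseVariable registers "tb" and returns the remaining statement with
        // ${tb} replaced. Prints: select * from default_catalog.default_database.score
        System.out.println(manager.parseVariable(script));
    }
}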
package com.dlink.interceptor;
import com.dlink.assertion.Asserts;
import com.dlink.catalog.function.FunctionManager;
import com.dlink.catalog.function.UDFunction;
import com.dlink.executor.Executor;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.trans.Operation;
import com.dlink.trans.Operations;
import com.dlink.utils.SqlUtil;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
* FlinkInterceptor
*
* @author wenmo
* @since 2021/6/11 22:17
*/
public class FlinkInterceptor {
public static String pretreatStatement(Executor executor, String statement) {
statement = SqlUtil.removeNote(statement);
if(executor.isUseSqlFragment()) {
statement = executor.getSqlManager().parseVariable(statement);
}
initFunctions(executor.getCustomTableEnvironmentImpl(), statement);
return statement;
}
public static boolean build(Executor executor, String statement) {
Operation operation = Operations.buildOperation(statement);
if (Asserts.isNotNull(operation)) {
operation.build(executor.getCustomTableEnvironmentImpl());
return operation.noExecute();
}
return false;
}
private static void initFunctions(CustomTableEnvironmentImpl stEnvironment, String statement) {
Map<String, UDFunction> usedFunctions = FunctionManager.getUsedFunctions(statement);
String[] udfs = stEnvironment.listUserDefinedFunctions();
List<String> udflist = Arrays.asList(udfs);
for (Map.Entry<String, UDFunction> entry : usedFunctions.entrySet()) {
if (!udflist.contains(entry.getKey())) {
if (entry.getValue().getType() == UDFunction.UDFunctionType.Scalar) {
stEnvironment.registerFunction(entry.getKey(),
(ScalarFunction) entry.getValue().getFunction());
} else if (entry.getValue().getType() == UDFunction.UDFunctionType.Table) {
stEnvironment.registerFunction(entry.getKey(),
(TableFunction) entry.getValue().getFunction());
} else if (entry.getValue().getType() == UDFunction.UDFunctionType.Aggregate) {
stEnvironment.registerFunction(entry.getKey(),
(AggregateFunction) entry.getValue().getFunction());
} else if (entry.getValue().getType() == UDFunction.UDFunctionType.TableAggregate) {
stEnvironment.registerFunction(entry.getKey(),
(TableAggregateFunction) entry.getValue().getFunction());
}
}
}
}
}
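A hedged sketch of the pretreatment pipeline (hypothetical demo class; assumes Executor.build() yields a ready local executor as in the Executor class above): pretreatStatement strips trailing comments, expands sql fragments and registers any built-in UDFs it detects.
package com.dlink.interceptor;

import com.dlink.executor.Executor;

// Hypothetical demo: fragment definition plus expansion through the interceptor.
public class FlinkInterceptorDemo {
    public static void main(String[] args) {
        Executor executor = Executor.build(); // local executor with default settings
        String statement = "tb:=score;select * from ${tb}";
        // Prints: select * from score
        System.out.println(FlinkInterceptor.pretreatStatement(executor, statement));
    }
}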
package com.dlink.parser;
import com.dlink.assertion.Asserts;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* BaseSingleSqlParser
*
* @author wenmo
* @since 2021/6/14 16:43
*/
public abstract class BaseSingleSqlParser {
// The original sql statement
protected String originalSql;
// The parsed sql segments
protected List<SqlSegment> segments;
/**
* Constructor: takes the original sql statement and splits it into segments.
**/
public BaseSingleSqlParser(String originalSql) {
this.originalSql = originalSql;
segments = new ArrayList<SqlSegment>();
initializeSegments();
}
/**
* Initializes the segments; subclasses must implement this.
**/
protected abstract void initializeSegments();
/**
* Splits originalSql into keyword-indexed segments.
**/
protected Map<String,List<String>> splitSql2Segment() {
Map<String,List<String>> map = new HashMap<>();
for (SqlSegment sqlSegment : segments) {
sqlSegment.parse(originalSql);
if(Asserts.isNotNullString(sqlSegment.getStart())) {
map.put(sqlSegment.getStart().toUpperCase(), sqlSegment.getBodyPieces());
}
}
return map;
}
/**
* Gets the parsed sql statement reassembled from its segments.
**/
public String getParsedSql() {
StringBuffer sb = new StringBuffer();
for (SqlSegment sqlSegment : segments) {
sb.append(sqlSegment.getParsedSqlSegment() + "\n");
}
String retval = sb.toString().replaceAll("\n+", "\n");
return retval;
}
}
package com.dlink.parser;
/**
* CreateAggTableSelectSqlParser
*
* @author wenmo
* @since 2021/6/14 16:56
*/
public class CreateAggTableSelectSqlParser extends BaseSingleSqlParser {
public CreateAggTableSelectSqlParser(String originalSql) {
super(originalSql);
}
@Override
protected void initializeSegments() {
segments.add(new SqlSegment("(create\\s+aggtable)(.+)(as\\s+select)", "[,]"));
segments.add(new SqlSegment("(select)(.+)(from)", "[,]"));
segments.add(new SqlSegment("(from)(.+?)( where | on | having | group\\s+by | order\\s+by | agg\\s+by | ENDOFSQL)", "(,|\\s+left\\s+join\\s+|\\s+right\\s+join\\s+|\\s+inner\\s+join\\s+)"));
segments.add(new SqlSegment("(where|on|having)(.+?)( group\\s+by | order\\s+by | agg\\s+by | ENDOFSQL)", "(and|or)"));
segments.add(new SqlSegment("(group\\s+by)(.+?)( order\\s+by | agg\\s+by | ENDOFSQL)", "[,]"));
segments.add(new SqlSegment("(order\\s+by)(.+?)( agg\\s+by | ENDOFSQL)", "[,]"));
segments.add(new SqlSegment("(agg\\s+by)(.+?)( ENDOFSQL)", "[,]"));
}
}
package com.dlink.parser;
/**
* DeleteSqlParser
*
* @author wenmo
* @since 2021/6/14 16:51
*/
public class DeleteSqlParser extends BaseSingleSqlParser {
public DeleteSqlParser(String originalSql) {
super(originalSql);
}
@Override
protected void initializeSegments() {
segments.add(new SqlSegment("(delete\\s+from)(.+)( where | ENDOFSQL)", "[,]"));
segments.add(new SqlSegment("(where)(.+)( ENDOFSQL)", "(and|or)"));
}
}
package com.dlink.parser;
/**
* InsertSelectSqlParser
*
* @author wenmo
* @since 2021/6/14 16:53
*/
public class InsertSelectSqlParser extends BaseSingleSqlParser {
public InsertSelectSqlParser(String originalSql) {
super(originalSql);
}
@Override
protected void initializeSegments() {
segments.add(new SqlSegment("(insert\\s+into)(.+)( select )", "[,]"));
segments.add(new SqlSegment("(select)(.+)(from)", "[,]"));
segments.add(new SqlSegment("(from)(.+?)( where | on | having | group\\s+by | order\\s+by | ENDOFSQL)", "(,|\\s+left\\s+join\\s+|\\s+right\\s+join\\s+|\\s+inner\\s+join\\s+)"));
segments.add(new SqlSegment("(where|on|having)(.+?)( group\\s+by | order\\s+by | ENDOFSQL)", "(and|or)"));
segments.add(new SqlSegment("(group\\s+by)(.+?)( order\\s+by| ENDOFSQL)", "[,]"));
segments.add(new SqlSegment("(order\\s+by)(.+?)( ENDOFSQL)", "[,]"));
}
}
package com.dlink.parser;
/**
* InsertSqlParser
*
* @author wenmo
* @since 2021/6/14 16:54
*/
public class InsertSqlParser extends BaseSingleSqlParser {
public InsertSqlParser(String originalSql) {
super(originalSql);
}
@Override
protected void initializeSegments() {
segments.add(new SqlSegment("(insert\\s+into)(.+?)([(])", "[,]"));
segments.add(new SqlSegment("([(])(.+?)([)]\\s+values\\s+[(])", "[,]"));
segments.add(new SqlSegment("([)]\\s+values\\s+[(])(.+)([)]\\s+ENDOFSQL)", "[,]"));
}
public String getParsedSql() {
String retval = super.getParsedSql();
retval = retval + ")";
return retval;
}
}
package com.dlink.parser;
/**
* SelectSqlParser
*
* @author wenmo
* @since 2021/6/14 16:53
*/
public class SelectSqlParser extends BaseSingleSqlParser {
public SelectSqlParser(String originalSql) {
super(originalSql);
}
@Override
protected void initializeSegments() {
segments.add(new SqlSegment("(select)(.+)(from)", "[,]"));
segments.add(new SqlSegment("(from)(.+?)(where |group\\s+by|having|order\\s+by | ENDOFSQL)", "(,|s+lefts+joins+|s+rights+joins+|s+inners+joins+)"));
segments.add(new SqlSegment("(where)(.+?)(group\\s+by |having| order\\s+by | ENDOFSQL)", "(and|or)"));
segments.add(new SqlSegment("(group\\s+by)(.+?)(having|order\\s+by| ENDOFSQL)", "[,]"));
segments.add(new SqlSegment("(having)(.+?)(order\\s+by| ENDOFSQL)", "(and|or)"));
segments.add(new SqlSegment("(order\\s+by)(.+)( ENDOFSQL)", "[,]"));
}
}
package com.dlink.parser;
/**
* SetSqlParser
*
* @author wenmo
* @since 2021/10/21 18:41
**/
public class SetSqlParser extends BaseSingleSqlParser {
public SetSqlParser(String originalSql) {
super(originalSql);
}
@Override
protected void initializeSegments() {
//SET(\s+(\S+)\s*=(.*))?
segments.add(new SqlSegment("(set)\\s+(.+)(\\s*=)", "[.]"));
segments.add(new SqlSegment("(=)\\s*(.*)( ENDOFSQL)", ","));
}
}
package com.dlink.parser;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* SingleSqlParserFactory
*
* @author wenmo
* @since 2021/6/14 16:49
*/
public class SingleSqlParserFactory {
public static Map<String,List<String>> generateParser(String sql) {
BaseSingleSqlParser tmp = null;
// sql = sql.replace("\n"," ").replaceAll("\\s{1,}", " ") +" ENDOFSQL";
sql = sql.replace("\r\n"," ").replace("\n"," ") +" ENDOFSQL";
if (contains(sql, "(insert\\s+into)(.+)(select)(.+)(from)(.+)")) {
tmp = new InsertSelectSqlParser(sql);
} else if (contains(sql, "(create\\s+aggtable)(.+)(as\\s+select)(.+)")) {
tmp = new CreateAggTableSelectSqlParser(sql);
} else if (contains(sql, "(select)(.+)(from)(.+)")) {
tmp = new SelectSqlParser(sql);
} else if (contains(sql, "(delete\\s+from)(.+)")) {
tmp = new DeleteSqlParser(sql);
} else if (contains(sql, "(update)(.+)(set)(.+)")) {
tmp = new UpdateSqlParser(sql);
} else if (contains(sql, "(insert\\s+into)(.+)(values)(.+)")) {
tmp = new InsertSqlParser(sql);
} else if (contains(sql, "(create\\s+table)(.+)")) {
} else if (contains(sql, "(create\\s+database)(.+)")) {
} else if (contains(sql, "(show\\s+databases)")) {
} else if (contains(sql, "(use)(.+)")) {
} else if (contains(sql, "(set)(.+)")) {
tmp = new SetSqlParser(sql);
}
// Guard against statements no parser matched (e.g. CREATE TABLE); tmp would otherwise be null here.
if (tmp == null) {
return new HashMap<>();
}
return tmp.splitSql2Segment();
}
/**
* Checks whether regExp matches somewhere in sql (regular expressions supported).
*
* @param sql the sql statement to check
* @param regExp the regular expression to look for
* @return true if the pattern is found
**/
private static boolean contains(String sql, String regExp) {
Pattern pattern = Pattern.compile(regExp, Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(sql);
return matcher.find();
}
}
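For illustration (hypothetical demo class, not part of the commit): generateParser routes a CREATE AGGTABLE statement to CreateAggTableSelectSqlParser and returns a keyword-to-pieces map.
package com.dlink.parser;

import java.util.List;
import java.util.Map;

// Hypothetical demo: prints each segment keyword with its split body pieces.
public class SingleSqlParserFactoryDemo {
    public static void main(String[] args) {
        String sql = "CREATE AGGTABLE agg AS SELECT cls, score, rank FROM score "
                + "GROUP BY cls AGG BY TOP2(score) as (score, rank)";
        Map<String, List<String>> map = SingleSqlParserFactory.generateParser(sql);
        // Expected keys include CREATE AGGTABLE, SELECT, FROM, GROUP BY, AGG BY
        map.forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}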
package com.dlink.parser;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* SqlSegment
*
* @author wenmo
* @since 2021/6/14 16:12
*/
public class SqlSegment {
private static final String Crlf = "|";
@SuppressWarnings("unused")
private static final String FourSpace = "  ";
/**
* Segment type of the sql fragment, in upper case
**/
private String type;
/**
* Leading keyword of the segment
**/
private String start;
/**
* Body of the segment
**/
private String body;
/**
* Trailing part of the segment
**/
private String end;
/**
* Regular expression used to split the body
**/
private String bodySplitPattern;
/**
* Regular expression that identifies the segment
**/
private String segmentRegExp;
/**
* Body pieces after splitting
**/
private List<String> bodyPieces;
/**
* Constructor
*
* @param segmentRegExp regular expression that identifies this sql segment
* @param bodySplitPattern regular expression used to split the body
**/
public SqlSegment(String segmentRegExp, String bodySplitPattern) {
this.type = "";
this.start = "";
this.body = "";
this.end = "";
this.segmentRegExp = segmentRegExp;
this.bodySplitPattern = bodySplitPattern;
this.bodyPieces = new ArrayList<String>();
}
/**
* Finds the part of sql that matches segmentRegExp and assigns it to the start, body and end fields.
**/
public void parse(String sql) {
Pattern pattern = Pattern.compile(segmentRegExp, Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(sql);
while (matcher.find()) {
start = matcher.group(1);
body = matcher.group(2);
end = matcher.group(3);
type = start.replace("\n"," ").replaceAll("\\s{1,}", " ").toUpperCase();
parseBody();
}
}
/**
* Parses the body part, splitting it into bodyPieces.
**/
private void parseBody() {
List<String> ls = new ArrayList<String>();
Pattern p = Pattern.compile(bodySplitPattern, Pattern.CASE_INSENSITIVE);
body = body.trim();
Matcher m = p.matcher(body);
StringBuffer sb = new StringBuffer();
boolean result = m.find();
while (result) {
m.appendReplacement(sb, Crlf);
result = m.find();
}
m.appendTail(sb);
//ls.add(start);
String[] arr = sb.toString().split("[|]");
int arrLength = arr.length;
for (int i = 0; i < arrLength; i++) {
ls.add(arr[i]);
}
bodyPieces = ls;
}
/**
* Gets the parsed sql segment.
**/
public String getParsedSqlSegment() {
StringBuffer sb = new StringBuffer();
sb.append(start + Crlf);
for (String piece : bodyPieces) {
sb.append(piece + Crlf);
}
return sb.toString();
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getStart() {
return start;
}
public void setStart(String start) {
this.start = start;
}
public String getBody() {
return body;
}
public void setBody(String body) {
this.body = body;
}
public String getEnd() {
return end;
}
public void setEnd(String end) {
this.end = end;
}
public String getBodySplitPattern() {
return bodySplitPattern;
}
public void setBodySplitPattern(String bodySplitPattern) {
this.bodySplitPattern = bodySplitPattern;
}
public String getSegmentRegExp() {
return segmentRegExp;
}
public void setSegmentRegExp(String segmentRegExp) {
this.segmentRegExp = segmentRegExp;
}
public List<String> getBodyPieces() {
return bodyPieces;
}
public void setBodyPieces(List<String> bodyPieces) {
this.bodyPieces = bodyPieces;
}
}
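A minimal sketch of one segment in isolation (hypothetical demo class): the segment regex captures start/body/end, and the body is split into pieces on the and/or pattern.
package com.dlink.parser;

// Hypothetical demo: a single SqlSegment extracts the WHERE body and
// splits it on and/or into bodyPieces.
public class SqlSegmentDemo {
    public static void main(String[] args) {
        SqlSegment segment = new SqlSegment("(where)(.+)( ENDOFSQL)", "(and|or)");
        segment.parse("select * from t where a = 1 and b = 2 ENDOFSQL");
        System.out.println(segment.getType());       // WHERE
        System.out.println(segment.getBodyPieces()); // [a = 1 ,  b = 2]
    }
}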
package com.dlink.parser;
/**
* SqlType
*
* @author wenmo
* @since 2021/7/3 11:11
*/
public enum SqlType {
SELECT("SELECT"),
CREATE("CREATE"),
DROP("DROP"),
ALTER("ALTER"),
INSERT("INSERT"),
DESCRIBE("DESCRIBE"),
EXPLAIN("EXPLAIN"),
USE("USE"),
SHOW("SHOW"),
LOAD("LOAD"),
UNLOAD("UNLOAD"),
SET("SET"),
RESET("RESET"),
UNKNOWN("UNKNOWN"),
;
private String type;
SqlType(String type) {
this.type = type;
}
public void setType(String type) {
this.type = type;
}
public String getType() {
return type;
}
public boolean equalsValue(String value){
return type.equalsIgnoreCase(value);
}
}
package com.dlink.parser;
/**
* UpdateSqlParser
*
* @author wenmo
* @since 2021/6/14 16:52
*/
public class UpdateSqlParser extends BaseSingleSqlParser {
public UpdateSqlParser(String originalSql) {
super(originalSql);
}
@Override
protected void initializeSegments() {
segments.add(new SqlSegment("(update)(.+)(set)", "[,]"));
segments.add(new SqlSegment("(set)(.+?)( where | ENDOFSQL)", "[,]"));
segments.add(new SqlSegment("(where)(.+)(ENDOFSQL)", "(and|or)"));
}
}
package com.dlink.trans;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import java.util.Arrays;
import java.util.List;
/**
* AbstractOperation
*
* @author wenmo
* @since 2021/6/14 18:18
*/
public class AbstractOperation {
protected String statement;
public AbstractOperation() {
}
public AbstractOperation(String statement) {
this.statement = statement;
}
public String getStatement() {
return statement;
}
public void setStatement(String statement) {
this.statement = statement;
}
public boolean checkFunctionExist(CustomTableEnvironmentImpl stEnvironment,String key){
String[] udfs = stEnvironment.listUserDefinedFunctions();
List<String> udflist = Arrays.asList(udfs);
return udflist.contains(key.toLowerCase());
}
public boolean noExecute(){
return true;
}
}
package com.dlink.trans;
/**
* CreateOperation
*
* @author wenmo
* @since 2021/6/13 19:34
*/
public interface CreateOperation extends Operation{
//void create(CustomTableEnvironmentImpl stEnvironment);
}
package com.dlink.trans;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
/**
* Operation
*
* @author wenmo
* @since 2021/6/13 19:24
*/
public interface Operation {
String getHandle();
Operation create(String statement);
void build(CustomTableEnvironmentImpl stEnvironment);
boolean noExecute();
}
package com.dlink.trans;
import com.dlink.parser.SqlType;
import com.dlink.trans.ddl.CreateAggTableOperation;
import com.dlink.trans.ddl.SetOperation;
/**
* Operations
*
* @author wenmo
* @since 2021/5/25 15:50
**/
public class Operations {
private static Operation[] operations = {
new CreateAggTableOperation(),
new SetOperation()
};
public static SqlType getSqlTypeFromStatements(String statement){
String[] statements = statement.split(";");
SqlType sqlType = SqlType.UNKNOWN;
for (String item : statements) {
if (item.trim().isEmpty()) {
continue;
}
sqlType = Operations.getOperationType(item);
if(sqlType == SqlType.INSERT ||sqlType == SqlType.SELECT){
return sqlType;
}
}
return sqlType;
}
public static SqlType getOperationType(String sql) {
String sqlTrim = sql.replaceAll("[\\s\\t\\n\\r]", "").trim().toUpperCase();
SqlType type = SqlType.UNKNOWN;
for (SqlType sqlType : SqlType.values()) {
if (sqlTrim.startsWith(sqlType.getType())) {
type = sqlType;
break;
}
}
return type;
}
public static Operation buildOperation(String statement){
String sql = statement.replace("\n"," ").replaceAll("\\s{1,}", " ").trim().toUpperCase();
for (int i = 0; i < operations.length; i++) {
if(sql.startsWith(operations[i].getHandle())){
return operations[i].create(statement);
}
}
return null;
}
}
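A small sketch of the routing (hypothetical demo class): getOperationType matches on the leading keyword, and buildOperation hands matching statements to the registered custom operations.
package com.dlink.trans;

import com.dlink.parser.SqlType;

// Hypothetical demo: statement-type detection and operation routing.
public class OperationsDemo {
    public static void main(String[] args) {
        // Prints: INSERT
        System.out.println(Operations.getOperationType("insert into t select * from s"));
        Operation op = Operations.buildOperation("set table.exec.resource.default-parallelism = 2");
        // Prints: SET
        System.out.println(op != null ? op.getHandle() : "no operation");
    }
}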
package com.dlink.trans.ddl;
import com.dlink.parser.SingleSqlParserFactory;
import org.apache.commons.lang3.StringUtils;
import java.util.List;
import java.util.Map;
/**
* AggTable
*
* @author wenmo
* @since 2021/6/13 20:32
*/
public class AggTable {
private String statement;
private String name;
private String columns;
private String table;
private List<String> wheres;
private String groupBy;
private String aggBy;
public AggTable(String statement, String name, String columns, String table, List<String> wheres, String groupBy, String aggBy) {
this.statement = statement;
this.name = name;
this.columns = columns;
this.table = table;
this.wheres = wheres;
this.groupBy = groupBy;
this.aggBy = aggBy;
}
public static AggTable build(String statement){
Map<String,List<String>> map = SingleSqlParserFactory.generateParser(statement);
return new AggTable(statement,
getString(map,"CREATE AGGTABLE"),
getString(map,"SELECT"),
getString(map,"FROM"),
map.get("WHERE"),
getString(map,"GROUP BY"),
getString(map,"AGG BY"));
}
private static String getString(Map<String,List<String>> map,String key){
return StringUtils.join(map.get(key),",");
}
public String getStatement() {
return statement;
}
public void setStatement(String statement) {
this.statement = statement;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getColumns() {
return columns;
}
public void setColumns(String columns) {
this.columns = columns;
}
public String getTable() {
return table;
}
public void setTable(String table) {
this.table = table;
}
public List<String> getWheres() {
return wheres;
}
public void setWheres(List<String> wheres) {
this.wheres = wheres;
}
public String getGroupBy() {
return groupBy;
}
public void setGroupBy(String groupBy) {
this.groupBy = groupBy;
}
public String getAggBy() {
return aggBy;
}
public void setAggBy(String aggBy) {
this.aggBy = aggBy;
}
}
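For illustration (hypothetical demo class): AggTable.build pulls the name, source table and clauses out of a CREATE AGGTABLE statement via SingleSqlParserFactory.
package com.dlink.trans.ddl;

// Hypothetical demo: shows the fields AggTable.build extracts.
public class AggTableDemo {
    public static void main(String[] args) {
        AggTable aggTable = AggTable.build("CREATE AGGTABLE agg AS SELECT cls, score, rank "
                + "FROM score GROUP BY cls AGG BY TOP2(score) as (score, rank)");
        System.out.println(aggTable.getName());    // agg
        System.out.println(aggTable.getTable());   // score
        System.out.println(aggTable.getGroupBy()); // cls
    }
}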
package com.dlink.trans.ddl;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.trans.AbstractOperation;
import com.dlink.trans.Operation;
import org.apache.flink.table.api.Table;
import java.util.List;
/**
* CreateAggTableOperation
*
* @author wenmo
* @since 2021/6/13 19:24
*/
public class CreateAggTableOperation extends AbstractOperation implements Operation{
private String KEY_WORD = "CREATE AGGTABLE";
public CreateAggTableOperation() {
}
public CreateAggTableOperation(String statement) {
super(statement);
}
@Override
public String getHandle() {
return KEY_WORD;
}
@Override
public Operation create(String statement) {
return new CreateAggTableOperation(statement);
}
@Override
public void build(CustomTableEnvironmentImpl stEnvironment) {
AggTable aggTable = AggTable.build(statement);
Table source = stEnvironment.sqlQuery("select * from "+ aggTable.getTable());
List<String> wheres = aggTable.getWheres();
if(wheres!=null&&wheres.size()>0) {
for (String s : wheres) {
source = source.filter(s);
}
}
Table sink = source.groupBy(aggTable.getGroupBy())
.flatAggregate(aggTable.getAggBy())
.select(aggTable.getColumns());
stEnvironment.registerTable(aggTable.getName(), sink);
}
}
package com.dlink.trans.ddl;
import com.dlink.assertion.Asserts;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.parser.SingleSqlParserFactory;
import com.dlink.trans.AbstractOperation;
import com.dlink.trans.Operation;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* SetOperation
*
* @author wenmo
* @since 2021/10/21 19:56
**/
public class SetOperation extends AbstractOperation implements Operation {
private String KEY_WORD = "SET";
public SetOperation() {
}
public SetOperation(String statement) {
super(statement);
}
@Override
public String getHandle() {
return KEY_WORD;
}
@Override
public Operation create(String statement) {
return new SetOperation(statement);
}
@Override
public void build(CustomTableEnvironmentImpl stEnvironment) {
Map<String,List<String>> map = SingleSqlParserFactory.generateParser(statement);
if(Asserts.isNotNullMap(map)&&map.size()==2) {
Map<String, String> confMap = new HashMap<>();
confMap.put(StringUtils.join(map.get("SET"), "."), StringUtils.join(map.get("="), ","));
stEnvironment.getConfig().addConfiguration(Configuration.fromMap(confMap));
}
}
}
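A sketch of the parsing step SetOperation.build relies on (hypothetical demo class; it skips the table environment so it runs standalone): the SET pieces joined with "." form the config key, and the "=" pieces form the value.
package com.dlink.trans.ddl;

import com.dlink.parser.SingleSqlParserFactory;
import org.apache.commons.lang3.StringUtils;

import java.util.List;
import java.util.Map;

// Hypothetical demo: the same parse-and-join steps as SetOperation.build.
public class SetOperationDemo {
    public static void main(String[] args) {
        Map<String, List<String>> map =
                SingleSqlParserFactory.generateParser("set execution.checkpointing.interval = 10000");
        System.out.println(StringUtils.join(map.get("SET"), ".")); // execution.checkpointing.interval
        System.out.println(StringUtils.join(map.get("="), ","));   // 10000
    }
}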
package com.dlink.utils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Stack;
import java.util.stream.Collectors;
/**
* MapParseUtils
*
* @author wenmo
* @since 2021/6/22
**/
public class MapParseUtils {
/**
* Whether the bracketed content in inStr is nested.
*
* @param inStr the string to inspect
* @return true if nested brackets are found
*/
public static Boolean getStrIsNest(String inStr) {
if (inStr == null || inStr.isEmpty()) {
return false;
}
Deque<Integer> stack = new LinkedList<>();
for (int i = 0; i < inStr.length(); i++) {
if (inStr.charAt(i) == '[') {
stack.push(i);
}
if (inStr.charAt(i) == ']') {
stack.pop();
if (stack.size() != 0) {
return true;
}
}
}
return false;
}
/**
* Gets the index pairs of the outermost nested brackets, e.g. table=[[default_catalog, default_database, score, project=[sid, cls, score]]], fields=[sid, cls, score]
* ^(index x) ^(index y) ^(index z) ^(index n)
* List<Integer> [x, y, z, n]
*
* @param inStr the string to scan
* @return the indexes of the outermost '[' and ']' pairs
*/
public static List<Integer> getNestList(String inStr) {
Stack<Integer> nestIndexList = new Stack<>();
if (inStr == null || inStr.isEmpty()) {
return nestIndexList;
}
Deque<Integer> stack = new LinkedList<>();
for (int i = 0; i < inStr.length(); i++) {
if (inStr.charAt(i) == '[') {
if (stack.isEmpty()) {
nestIndexList.add(i);
}
stack.push(i);
}
if (inStr.charAt(i) == ']') {
stack.pop();
if (stack.size() == 0) {
nestIndexList.add(i);
}
}
}
return nestIndexList;
}
/**
* Gets the index pairs of the outermost parentheses, e.g. table=[((f.SERIAL_NO || f.PRESC_NO) || f.ITEM_NO) AS EXPR$0, ((f.DATE || f.TIME) || f.ITEM_NO) AS EXPR$2]
* ^(index x) ^(index y) ^(index z) ^(index n)
* List<Integer> [x, y, z, n]
*
* @param inStr the string to scan
* @return the indexes of the outermost '(' and ')' pairs
*/
public static List<Integer> getBracketsList(String inStr) {
Stack<Integer> nestIndexList = new Stack<>();
if (inStr == null || inStr.isEmpty()) {
return nestIndexList;
}
Deque<Integer> stack = new LinkedList<>();
for (int i = 0; i < inStr.length(); i++) {
if (inStr.charAt(i) == '(') {
if (stack.isEmpty()) {
nestIndexList.add(i);
}
stack.push(i);
}
if (inStr.charAt(i) == ')') {
stack.pop();
if (stack.size() == 0) {
nestIndexList.add(i);
}
}
}
return nestIndexList;
}
/**
* Parses inStr into a Map, choosing nested or flat parsing as appropriate.
*
* @param inStr the string to parse
* @return the parsed map
*/
public static Map parse(String inStr, String... blackKeys) {
if (getStrIsNest(inStr)) {
return parseForNest(inStr, blackKeys);
} else {
return parseForNotNest(inStr);
}
}
/**
* Nested parsing.
*
* @param inStr the string to parse
* @return the parsed map
*/
public static Map parseForNest(String inStr, String... blackKeys) {
Map map = new HashMap();
List<Integer> nestList = getNestList(inStr);
int num = nestList.size() / 2;
for (int i = 0; i < num; i++) {
if (i == 0) {
String substring = inStr.substring(0, nestList.get(i + 1) + 1);
String key = getMapKey(substring);
boolean isNext = true;
for (int j = 0; j < blackKeys.length; j++) {
if (key.equals(blackKeys[j])) {
isNext = false;
}
}
if (isNext) {
if (getStrIsNest(substring)) {
map.put(key, getMapListNest(substring));
} else {
map.put(key, getMapList(substring));
}
} else {
map.put(key, getTextValue(substring));
}
} else {
String substring = inStr.substring(nestList.get(2 * i - 1) + 2, nestList.get(2 * i + 1) + 1);
String key = getMapKey(substring);
boolean isNext = true;
for (int j = 0; j < blackKeys.length; j++) {
if (key.equals(blackKeys[j])) {
isNext = false;
}
}
if (isNext) {
if (getStrIsNest(substring)) {
map.put(key, getMapListNest(substring));
} else {
map.put(key, getMapList(substring));
}
} else {
map.put(key, getTextValue(substring));
}
}
}
return map;
}
/**
* Parses a select-style fragment into a Map.
*
* @return java.util.Map
* @author lewnn
* @since 2021/8/20 15:03
*/
public static Map parseForSelect(String inStr) {
Map map = new HashMap();
List<Integer> bracketsList = getBracketsList(inStr);
String mapKey = getMapKey(inStr);
List<String> list = new ArrayList<>();
int size = bracketsList.size();
if (size % 2 != 0) {
// if size is not even, return an empty map; this may be problematic
return map;
} else {
int numSize = size / 2; // number of bracket pairs
for (int i = 0; i < numSize; i++) {
String msgStr = "";
if (2 * i + 2 >= size) {
msgStr = inStr.substring(bracketsList.get(2 * i), inStr.lastIndexOf("]"));
} else {
msgStr = inStr.substring(bracketsList.get(2 * i), bracketsList.get(2 * i + 2));
msgStr = msgStr.substring(0, msgStr.lastIndexOf(",") > 0 ? msgStr.lastIndexOf(",") : msgStr.length());
}
list.add(msgStr);
}
}
map.put(mapKey, list);
return map;
}
/**
* Flat (non-nested) parsing.
*
* @param inStr the string to parse
* @return the parsed map
*/
public static Map parseForNotNest(String inStr) {
String[] split = inStr.split("], ");
Map map = new HashMap();
for (int i = 0; i < split.length; i++) {
if (i == split.length - 1) {
map.put(getMapKey(split[i]), getMapList(split[i]));
} else {
map.put(getMapKey(split[i] + "]"), getMapList(split[i] + "]"));
}
}
return map;
}
/**
* Gets the key; e.g. in where=[(sid = sid0)], everything before =[ is the key.
*
* @param splitStr the fragment to read the key from
* @return the key
*/
public static String getMapKey(String splitStr) {
if (splitStr == null || splitStr.indexOf("=[") == -1) {
return "";
}
return splitStr.substring(0, splitStr.indexOf("=[")).replace(" ", "");
}
/**
* Gets the list value for a key; e.g. in where=[(sid = sid0)], the content inside [] is the list.
*
* @param splitStr the fragment to read the value from
* @return the value list
*/
public static List getMapList(String splitStr) {
if (splitStr == null || splitStr.indexOf("[") == -1 || splitStr.indexOf("]") == -1) {
return new ArrayList();
}
return Arrays.stream(splitStr.substring(splitStr.indexOf("[") + 1, splitStr.lastIndexOf("]")).split(", ")).collect(Collectors.toList());
}
/**
* Gets the nested list value for a key; e.g. in table=[[default_catalog, default_database, score, project=[sid, cls, score]]], the content inside [] is the (nested) list.
*
* @param splitStr the fragment to read the value from
* @return the nested value list
*/
public static List getMapListNest(String splitStr) {
List list = new ArrayList();
if (splitStr == null || splitStr.indexOf("[") == -1 || splitStr.indexOf("]") == -1) {
return new ArrayList();
}
String substring = splitStr.substring(splitStr.indexOf("[") + 1, splitStr.lastIndexOf("]")).trim();
//example: [default_catalog, default_database, score, project=[sid, cls, score]]
if (substring.startsWith("[")) {
//still a nested list
list.add(getMapListNest(substring));
} else {
//elements rather than a single nested list, e.g. default_catalog, default_database, score, project=[sid, cls, score], course=[en, ds, as]
//because of nesting, [] may still appear inside
List<Integer> nestList = getNestList(substring);
int num = nestList.size() / 2;
String[] str = new String[num];
for (int i = 0; i < num; i++) {
str[i] = substring.substring(nestList.get(2 * i), nestList.get(2 * i + 1) + 1);
}
//replace from the end backwards to mask the nested list content
for (int i = num - 1; i >= 0; i--) {
substring = substring.substring(0, nestList.get(2 * i)) + "_str" + i + "_" + substring.substring(nestList.get(2 * i + 1) + 1);
}
//after masking: default_catalog, default_database, score, project=_str0_, course=_str1_
// _str0_ = [sid, cls, score]
// _str1_ = [en, ds, as]
String[] split = substring.split(", ");
int index = 0;
for (String s : split) {
if (s.startsWith("[")) {
// recurse on the current piece; recursing on the whole splitStr would never terminate
list.add(getMapListNest(s));
} else if (s.indexOf("_str") != -1) {
// restore the masked list: project=_str0_ becomes project=[sid, cls, score]
list.add(parseForNest(s.replace("_str" + index + "_", str[index])));
index++;
} else {
list.add(s);
}
}
}
return list;
}
private static String getTextValue(String splitStr) {
return splitStr.substring(splitStr.indexOf("[") + 1, splitStr.lastIndexOf("]"));
}
}
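A minimal usage sketch (hypothetical demo class): parse handles the key=[...] notation that appears in Flink's logical-plan descriptions, recursing into nested brackets.
package com.dlink.utils;

// Hypothetical demo: parses a plan-style string into a Map; key order is not guaranteed.
public class MapParseUtilsDemo {
    public static void main(String[] args) {
        String plan = "table=[[default_catalog, default_database, score]], fields=[sid, cls, score]";
        // e.g. {table=[[default_catalog, default_database, score]], fields=[sid, cls, score]}
        System.out.println(MapParseUtils.parse(plan));
    }
}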
package com.dlink.utils;
import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkSQLConstant;
/**
* SqlUtil
*
* @author wenmo
* @since 2021/7/14 21:57
*/
public class SqlUtil {
public static String[] getStatements(String sql){
if(Asserts.isNullString(sql)){
return new String[0];
}
return sql.split(FlinkSQLConstant.SEPARATOR);
}
public static String removeNote(String sql){
if(Asserts.isNotNullString(sql)) {
sql = sql.replaceAll("--([^'\\r\\n]{0,}('[^'\\r\\n]{0,}'){0,1}[^'\\r\\n]{0,}){0,}$", "").trim();
}
return sql;
}
}
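A minimal usage sketch (hypothetical demo class): statements are split on the ";" separator and trailing "--" comments are stripped.
package com.dlink.utils;

// Hypothetical demo: statement splitting and trailing-comment removal.
public class SqlUtilDemo {
    public static void main(String[] args) {
        String script = "use catalog default_catalog;select * from score -- preview";
        for (String statement : SqlUtil.getStatements(script)) {
            // Prints: use catalog default_catalog
            //         select * from score
            System.out.println(SqlUtil.removeNote(statement).trim());
        }
    }
}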