Commit baa157da authored by godkaikai

Support the SET syntax

parent 4fcc452c
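
For context: the SET support added here targets statements like `set table.exec.resource.default-parallelism = 2` (the exact statement exercised by the new setTest in SqlParserTest below); the dotted key and its value are parsed out and applied to the environment's TableConfig.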
......@@ -127,7 +127,6 @@ public class DataBaseController {
Asserts.checkNotNull(dataBase, "The data source does not exist!");
databaseService.checkHeartBeat(dataBase);
databaseService.updateById(dataBase);
logger.warn("instrumentation log");
return Result.succeed(dataBase, "Status refresh completed");
}
......
......@@ -3,6 +3,7 @@ package com.dlink.assertion;
import com.dlink.exception.RunTimeException;
import java.util.Collection;
import java.util.Map;
/**
* Asserts
......@@ -59,6 +60,16 @@ public class Asserts {
return !isNullCollection(collection);
}
public static boolean isNullMap(Map map) {
if (isNull(map)||map.size()==0) {
return true;
}
return false;
}
public static boolean isNotNullMap(Map map) {
return !isNullMap(map);
}
public static void checkNull(String key,String msg) {
if (key == null||"".equals(key)) {
......@@ -83,4 +94,10 @@ public class Asserts {
throw new RunTimeException(msg);
}
}
public static void checkNullMap(Map map,String msg) {
if(isNullMap(map)){
throw new RunTimeException(msg);
}
}
}
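
A hedged usage sketch of the new map helpers (illustrative only, not part of the commit; assumes RunTimeException is unchecked, as its use in the existing check methods suggests):

Map<String, String> conf = new HashMap<>();
Asserts.isNullMap(conf); // true: an empty map counts as null
conf.put("parallelism", "2");
Asserts.isNotNullMap(conf); // true
Asserts.checkNullMap(new HashMap<>(), "config must not be empty"); // throws RunTimeException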
......@@ -41,6 +41,10 @@ public class Explainer {
String[] sqls = SqlUtil.getStatements(statement);
List<SqlExplainResult> sqlExplainRecords = new ArrayList<>();
for (int i = 0; i < sqls.length; i++) {
String sql = sqls[i].trim();
if(Asserts.isNullString(sql)){
continue;
}
SqlExplainResult record = new SqlExplainResult();
try {
if (!FlinkInterceptor.build(executor.getCustomTableEnvironmentImpl(), sqls[i])) {
......@@ -48,6 +52,9 @@ public class Explainer {
if (Asserts.isEquals(FlinkSQLConstant.DDL,record.getType())) {
executor.executeSql(sqls[i]);
}
}else{
record.setParseTrue(true);
record.setExplainTrue(true);
}
} catch (Exception e) {
e.printStackTrace();
......
......@@ -3,15 +3,19 @@ package com.dlink.interceptor;
import com.dlink.assertion.Asserts;
import com.dlink.catalog.function.FunctionManager;
import com.dlink.catalog.function.UDFunction;
import com.dlink.constant.FlinkFunctionConstant;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.parser.SingleSqlParserFactory;
import com.dlink.trans.Operation;
import com.dlink.trans.Operations;
import com.dlink.ud.udtaf.RowsToMap;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.functions.*;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -25,6 +29,9 @@ public class FlinkInterceptor {
public static boolean build(CustomTableEnvironmentImpl stEnvironment, String statement) {
initFunctions(stEnvironment, statement);
/*if (initConfiguration(stEnvironment, statement)) {
return true;
}*/
Operation operation = Operations.buildOperation(statement);
if(Asserts.isNotNull(operation)) {
operation.build(stEnvironment);
......@@ -55,4 +62,5 @@ public class FlinkInterceptor {
}
}
}
}
......@@ -152,37 +152,38 @@ public class JobManager extends RunTime {
SubmitResult result = new SubmitResult(sessionId, sqlList, environmentSetting.getHost(), executorSetting.getJobName());
int currentIndex = 0;
try {
if (sqlList != null && sqlList.size() > 0) {
Executor executor = createExecutor();
for (String sqlText : sqlList) {
currentIndex++;
SqlType operationType = Operations.getOperationType(sqlText);
CustomTableEnvironmentImpl stEnvironment = executor.getCustomTableEnvironmentImpl();
if (operationType==SqlType.INSERT) {
long start = System.currentTimeMillis();
if (!FlinkInterceptor.build(stEnvironment, sqlText)) {
TableResult tableResult = executor.executeSql(sqlText);
JobID jobID = tableResult.getJobClient().get().getJobID();
long finish = System.currentTimeMillis();
long timeElapsed = finish - start;
InsertResult insertResult = new InsertResult((jobID == null ? "" : jobID.toHexString()), true);
result.setResult(insertResult);
result.setJobId((jobID == null ? "" : jobID.toHexString()));
result.setTime(timeElapsed);
}
result.setSuccess(true);
result.setFinishDate(LocalDateTime.now());
} else {
if (!FlinkInterceptor.build(stEnvironment, sqlText)) {
executor.executeSql(sqlText);
}
}
}
} else {
if (Asserts.isNullCollection(sqlList)) {
result.setSuccess(false);
result.setMsg(LocalDateTime.now().toString() + ": The SQL statement to execute is empty.");
return result;
}
Executor executor = createExecutor();
for (String sqlText : sqlList) {
currentIndex++;
SqlType operationType = Operations.getOperationType(sqlText);
CustomTableEnvironmentImpl stEnvironment = executor.getCustomTableEnvironmentImpl();
if (operationType.equals(SqlType.INSERT)) {
long start = System.currentTimeMillis();
if (!FlinkInterceptor.build(stEnvironment, sqlText)) {
TableResult tableResult = executor.executeSql(sqlText);
JobID jobID = tableResult.getJobClient().get().getJobID();
long finish = System.currentTimeMillis();
long timeElapsed = finish - start;
InsertResult insertResult = new InsertResult((jobID == null ? "" : jobID.toHexString()), true);
result.setResult(insertResult);
result.setJobId((jobID == null ? "" : jobID.toHexString()));
result.setTime(timeElapsed);
}
result.setSuccess(true);
result.setFinishDate(LocalDateTime.now());
} else if(operationType.equals(SqlType.SET)){
} else {
if (!FlinkInterceptor.build(stEnvironment, sqlText)) {
executor.executeSql(sqlText);
}
}
}
} catch (Exception e) {
e.printStackTrace();
StackTraceElement[] trace = e.getStackTrace();
......
package com.dlink.parser;
import com.dlink.assertion.Asserts;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
......@@ -41,8 +43,8 @@ public abstract class BaseSingleSqlParser {
Map<String,List<String>> map = new HashMap<>();
for (SqlSegment sqlSegment : segments) {
sqlSegment.parse(originalSql);
if(sqlSegment.getStart()!=null&&!"".equals(sqlSegment.getStart())) {
map.put(sqlSegment.getStart(), sqlSegment.getBodyPieces());
if(Asserts.isNotNullString(sqlSegment.getStart())) {
map.put(sqlSegment.getStart().toUpperCase(), sqlSegment.getBodyPieces());
}
}
return map;
......@@ -54,9 +56,9 @@ public abstract class BaseSingleSqlParser {
public String getParsedSql() {
StringBuffer sb = new StringBuffer();
for (SqlSegment sqlSegment : segments) {
sb.append(sqlSegment.getParsedSqlSegment() + "n");
sb.append(sqlSegment.getParsedSqlSegment() + "\n");
}
String retval = sb.toString().replaceAll("n+", "n");
String retval = sb.toString().replaceAll("\n+", "\n");
return retval;
}
......
package com.dlink.parser;
/**
* SetSqlParser
*
* @author wenmo
* @since 2021/10/21 18:41
**/
public class SetSqlParser extends BaseSingleSqlParser {
public SetSqlParser(String originalSql) {
super(originalSql);
}
@Override
protected void initializeSegments() {
//SET(\s+(\S+)\s*=(.*))?
segments.add(new SqlSegment("(set)\\s+(.+)(\\s*=)", "[.]"));
segments.add(new SqlSegment("(=)\\s*(.*)( ENDOFSQL)", ","));
}
}
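
A minimal sketch of what the two segments above should yield (hedged: inferred from the SqlSegment delimiters, where "[.]" splits the key body on dots, and from the setTest added later in this commit):

// For "set table.exec.resource.default-parallelism = 2", generateParser is
// expected to produce (illustrative):
//   "SET" -> ["table", "exec", "resource", "default-parallelism"]
//   "="   -> ["2"]
Map<String, List<String>> map = SingleSqlParserFactory.generateParser(
    "set table.exec.resource.default-parallelism = 2");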
......@@ -33,6 +33,8 @@ public class SingleSqlParserFactory {
} else if (contains(sql, "(create\\s+database)(.+)")) {
} else if (contains(sql, "(show\\s+databases)")) {
} else if (contains(sql, "(use)(.+)")) {
} else if (contains(sql, "(set)(.+)")) {
tmp = new SetSqlParser(sql);
} else {
}
return tmp.splitSql2Segment();
......
......@@ -15,6 +15,13 @@ public class AbstractOperation {
protected String statement;
public AbstractOperation() {
}
public AbstractOperation(String statement) {
this.statement = statement;
}
public String getStatement() {
return statement;
}
......
package com.dlink.trans;
import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkSQLConstant;
import com.dlink.parser.SqlType;
import com.dlink.trans.ddl.CreateAggTableOperation;
import com.dlink.trans.ddl.SetOperation;
/**
* Operations
......@@ -14,7 +13,8 @@ import com.dlink.trans.ddl.CreateAggTableOperation;
public class Operations {
private static Operation[] operations = {
new CreateAggTableOperation()
new CreateAggTableOperation(),
new SetOperation()
};
public static SqlType getSqlTypeFromStatements(String statement){
......@@ -33,7 +33,7 @@ public class Operations {
}
public static SqlType getOperationType(String sql) {
String sqlTrim = sql.replaceAll("[\\s\\t\\n\\r]", "").toUpperCase();
String sqlTrim = sql.replaceAll("[\\s\\t\\n\\r]", "").trim().toUpperCase();
SqlType type = SqlType.UNKNOWN;
for (SqlType sqlType : SqlType.values()) {
if (sqlTrim.startsWith(sqlType.getType())) {
......
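
A short sketch of how getOperationType matches the new SET type (hedged: assumes SqlType.SET.getType() is "SET"; note the added trim() is a no-op here, since replaceAll has already removed all whitespace):

// "set table.exec.resource.default-parallelism = 2"
// -> whitespace stripped and uppercased: "SETTABLE.EXEC.RESOURCE.DEFAULT-PARALLELISM=2"
// -> startsWith("SET") -> SqlType.SET
SqlType type = Operations.getOperationType("set table.exec.resource.default-parallelism = 2");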
package com.dlink.trans.ddl;
import com.dlink.constant.FlinkFunctionConstant;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.trans.AbstractOperation;
import com.dlink.trans.Operation;
import com.dlink.ud.udf.GetKey;
import com.dlink.ud.udtaf.RowsToMap;
import org.apache.flink.table.api.Table;
import java.util.List;
......@@ -24,7 +21,7 @@ public class CreateAggTableOperation extends AbstractOperation implements Operat
}
public CreateAggTableOperation(String statement) {
this.statement = statement;
super(statement);
}
@Override
......
package com.dlink.trans.ddl;
import com.dlink.assertion.Asserts;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.parser.SingleSqlParserFactory;
import com.dlink.trans.AbstractOperation;
import com.dlink.trans.Operation;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* SetOperation
*
* @author wenmo
* @since 2021/10/21 19:56
**/
public class SetOperation extends AbstractOperation implements Operation {
private static final String KEY_WORD = "SET";
public SetOperation() {
}
public SetOperation(String statement) {
super(statement);
}
@Override
public String getHandle() {
return KEY_WORD;
}
@Override
public Operation create(String statement) {
return new SetOperation(statement);
}
@Override
public void build(CustomTableEnvironmentImpl stEnvironment) {
Map<String,List<String>> map = SingleSqlParserFactory.generateParser(statement);
if(Asserts.isNotNullMap(map)&&map.size()==2) {
Map<String, String> confMap = new HashMap<>();
confMap.put(StringUtils.join(map.get("SET"), "."), StringUtils.join(map.get("="), ","));
stEnvironment.getConfig().addConfiguration(Configuration.fromMap(confMap));
}
}
}
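
A hedged end-to-end sketch of what build() assembles for the test statement (names from this commit; the intermediate map mirrors the SetSqlParser segments):

// Input: "set table.exec.resource.default-parallelism = 2"
// map: {"SET" -> ["table","exec","resource","default-parallelism"], "=" -> ["2"]}
Map<String, String> confMap = new HashMap<>();
confMap.put(StringUtils.join(map.get("SET"), "."), StringUtils.join(map.get("="), ","));
// confMap: {"table.exec.resource.default-parallelism" -> "2"}
stEnvironment.getConfig().addConfiguration(Configuration.fromMap(confMap));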
......@@ -41,4 +41,11 @@ public class SqlParserTest {
System.out.println(lists.toString());
System.out.println(StringUtils.join(lists.get("SELECT"),","));
}
@Test
public void setTest(){
String sql = "set table.exec.resource.default-parallelism = 2";
Map<String,List<String>> lists = SingleSqlParserFactory.generateParser(sql);
System.out.println(lists.toString());
}
}
......@@ -329,6 +329,9 @@ export default (): React.ReactNode => {
<li>
<Link>Improved connection detection and exception output when registering Flink clusters</Link>
</li>
<li>
<Link>Support the SET syntax for configuring execution environment parameters</Link>
</li>
</ul>
</Paragraph>
</Timeline.Item>
......