Commit bfdac7ca authored by wenmo's avatar wenmo

数据源fragment配置和查询fragment语法

parent 9aac98c6
......@@ -20,7 +20,7 @@ import java.util.Map;
*/
@Getter
@Setter
public class APIExecuteSqlDTO extends AbstractStatementDTO{
public class APIExecuteSqlDTO extends AbstractStatementDTO {
// RUN_MODE
private String type;
private boolean useResult = false;
......@@ -28,8 +28,6 @@ public class APIExecuteSqlDTO extends AbstractStatementDTO{
private boolean useAutoCancel = false;
private boolean useStatementSet = false;
private String address;
private boolean fragment = false;
// private String statement;
private String jobName;
private Integer maxRowNum = 100;
private Integer checkPoint = 0;
......@@ -44,8 +42,8 @@ public class APIExecuteSqlDTO extends AbstractStatementDTO{
savePointStrategy = 3;
}
return new JobConfig(
type, useResult,useChangeLog, useChangeLog, false, null, true, address, jobName,
fragment, useStatementSet, maxRowNum, checkPoint, parallelism, savePointStrategy,
type, useResult, useChangeLog, useChangeLog, false, null, true, address, jobName,
isFragment(), useStatementSet, maxRowNum, checkPoint, parallelism, savePointStrategy,
savePointPath, configuration, gatewayConfig);
}
}
......@@ -17,14 +17,12 @@ import java.util.Map;
*/
@Getter
@Setter
public class APIExplainSqlDTO extends AbstractStatementDTO{
public class APIExplainSqlDTO extends AbstractStatementDTO {
private boolean useStatementSet = false;
private boolean fragment = false;
// private String statement;
private Integer parallelism;
private Map<String, String> configuration;
public JobConfig getJobConfig() {
return new JobConfig("local", false, false, fragment, useStatementSet, parallelism, configuration);
return new JobConfig("local", false, false, isFragment(), useStatementSet, parallelism, configuration);
}
}
......@@ -10,6 +10,8 @@ public class AbstractStatementDTO {
private String statement;
private Integer envId;
private boolean fragment = false;
public String getStatement() {
return statement;
......@@ -26,4 +28,12 @@ public class AbstractStatementDTO {
public void setEnvId(Integer envId) {
this.envId = envId;
}
public boolean isFragment() {
return fragment;
}
public void setFragment(boolean fragment) {
this.fragment = fragment;
}
}
......@@ -19,7 +19,7 @@ import java.util.Map;
*/
@Getter
@Setter
public class StudioExecuteDTO extends AbstractStatementDTO{
public class StudioExecuteDTO extends AbstractStatementDTO {
// RUN_MODE
private String type;
private String dialect;
......@@ -34,7 +34,6 @@ public class StudioExecuteDTO extends AbstractStatementDTO{
private Integer clusterConfigurationId;
private Integer databaseId;
private Integer jarId;
private boolean fragment;
private String jobName;
private Integer taskId;
private Integer maxRowNum;
......@@ -46,9 +45,9 @@ public class StudioExecuteDTO extends AbstractStatementDTO{
private static final ObjectMapper mapper = new ObjectMapper();
public JobConfig getJobConfig() {
Map<String,String> config = new HashMap<>();
Map<String, String> config = new HashMap<>();
JsonNode paras = null;
if(Asserts.isNotNullString(configJson)) {
if (Asserts.isNotNullString(configJson)) {
try {
paras = mapper.readTree(configJson);
} catch (JsonProcessingException e) {
......@@ -60,8 +59,8 @@ public class StudioExecuteDTO extends AbstractStatementDTO{
);
}
return new JobConfig(
type,useResult,useChangeLog,useAutoCancel, useSession, session, clusterId,
clusterConfigurationId,jarId, taskId, jobName, fragment,statementSet,batchModel,
maxRowNum, checkPoint, parallelism,savePointStrategy, savePointPath,config);
type, useResult, useChangeLog, useAutoCancel, useSession, session, clusterId,
clusterConfigurationId, jarId, taskId, jobName, isFragment(), statementSet, batchModel,
maxRowNum, checkPoint, parallelism, savePointStrategy, savePointPath, config);
}
}
......@@ -276,6 +276,12 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
private JobConfig buildJobConfig(Task task) {
boolean isJarTask = Dialect.FLINKJAR.equalsVal(task.getDialect());
if (!isJarTask && task.isFragment()) {
String flinkWithSql = dataBaseService.getEnabledFlinkWithSql();
if (Asserts.isNotNullString(flinkWithSql)) {
task.setStatement(flinkWithSql + "\r\n" + task.getStatement());
}
}
if (!isJarTask && Asserts.isNotNull(task.getEnvId()) && task.getEnvId() != 0) {
Task envTask = getTaskInfoById(task.getEnvId());
if (Asserts.isNotNull(envTask) && Asserts.isNotNullString(envTask.getStatement())) {
......
......@@ -78,4 +78,12 @@ public class SqlParserTest {
Map<String,List<String>> lists = SingleSqlParserFactory.generateParser(sql);
System.out.println(lists.toString());
}
@Test
public void showFragmentTest(){
String sql = "show fragment test";
Map<String,List<String>> lists = SingleSqlParserFactory.generateParser(sql);
System.out.println(lists.toString());
}
}
......@@ -2,6 +2,7 @@ package com.dlink.executor;
import com.dlink.assertion.Asserts;
import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.interceptor.FlinkInterceptorResult;
import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
......@@ -197,8 +198,8 @@ public abstract class Executor {
return FlinkInterceptor.pretreatStatement(this, statement);
}
private boolean pretreatExecute(String statement) {
return !FlinkInterceptor.build(this, statement);
private FlinkInterceptorResult pretreatExecute(String statement) {
return FlinkInterceptor.build(this, statement);
}
public JobExecutionResult execute(String jobName) throws Exception {
......@@ -207,7 +208,11 @@ public abstract class Executor {
public TableResult executeSql(String statement) {
statement = pretreatStatement(statement);
if (pretreatExecute(statement)) {
FlinkInterceptorResult flinkInterceptorResult = pretreatExecute(statement);
if (Asserts.isNotNull(flinkInterceptorResult.getTableResult())) {
return flinkInterceptorResult.getTableResult();
}
if (!flinkInterceptorResult.isNoExecute()) {
return stEnvironment.executeSql(statement);
} else {
return CustomTableResultImpl.TABLE_RESULT_OK;
......@@ -216,7 +221,7 @@ public abstract class Executor {
public String explainSql(String statement, ExplainDetail... extraDetails) {
statement = pretreatStatement(statement);
if (pretreatExecute(statement)) {
if (!pretreatExecute(statement).isNoExecute()) {
return stEnvironment.explainSql(statement, extraDetails);
} else {
return "";
......@@ -225,7 +230,7 @@ public abstract class Executor {
public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
statement = pretreatStatement(statement);
if (Asserts.isNotNullString(statement) && pretreatExecute(statement)) {
if (Asserts.isNotNullString(statement) && !pretreatExecute(statement).isNoExecute()) {
return stEnvironment.explainSqlRecord(statement, extraDetails);
} else {
return null;
......@@ -234,7 +239,7 @@ public abstract class Executor {
public ObjectNode getStreamGraph(String statement) {
statement = pretreatStatement(statement);
if (pretreatExecute(statement)) {
if (!pretreatExecute(statement).isNoExecute()) {
return stEnvironment.getStreamGraph(statement);
} else {
return null;
......
......@@ -11,12 +11,7 @@ import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.types.Row;
import org.apache.flink.util.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
......@@ -26,6 +21,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Flink Sql Fragment Manager
*
* @author wenmo
* @since 2021/6/7 22:06
**/
......@@ -111,6 +107,16 @@ public final class SqlManager {
}
}
public TableResult getSqlFragmentResult(String sqlFragmentName) {
if (Asserts.isNullString(sqlFragmentName)) {
return CustomTableResultImpl.buildTableResult(new ArrayList<>(Arrays.asList(new TableSchemaField("fragment", DataTypes.STRING()))), new ArrayList<>());
}
String sqlFragment = getSqlFragment(sqlFragmentName);
List<Row> rows = new ArrayList<>();
rows.add(Row.of(sqlFragment));
return CustomTableResultImpl.buildTableResult(new ArrayList<>(Arrays.asList(new TableSchemaField("fragment", DataTypes.STRING()))), rows);
}
/**
* Get a fragment of sql under the given name. The sql fragment name must be existed.
*
......@@ -126,7 +132,7 @@ public final class SqlManager {
for (String key : sqlFragments.keySet()) {
rows.add(Row.of(key));
}
return CustomTableResultImpl.buildTableResult(new ArrayList<>(Arrays.asList(new TableSchemaField("sql fragment name", DataTypes.STRING()))), rows);
return CustomTableResultImpl.buildTableResult(new ArrayList<>(Arrays.asList(new TableSchemaField("fragmentName", DataTypes.STRING()))), rows);
}
public Iterator getSqlFragmentsIterator() {
......@@ -141,9 +147,10 @@ public final class SqlManager {
return environment.fromValues(keys);
}
public boolean checkShowFragments(String sql){
public boolean checkShowFragments(String sql) {
return SHOW_FRAGMENTS.equals(sql.trim().toUpperCase());
}
/**
* Parse some variables under the given sql.
*
......
......@@ -3,11 +3,12 @@ package com.dlink.interceptor;
import com.dlink.assertion.Asserts;
import com.dlink.catalog.function.FunctionManager;
import com.dlink.catalog.function.UDFunction;
import com.dlink.executor.Executor;
import com.dlink.executor.CustomTableEnvironmentImpl;
import com.dlink.executor.Executor;
import com.dlink.trans.Operation;
import com.dlink.trans.Operations;
import com.dlink.utils.SqlUtil;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
......@@ -27,7 +28,7 @@ public class FlinkInterceptor {
public static String pretreatStatement(Executor executor, String statement) {
statement = SqlUtil.removeNote(statement);
if(executor.isUseSqlFragment()) {
if (executor.isUseSqlFragment()) {
statement = executor.getSqlManager().parseVariable(statement);
}
// initFunctions(executor.getCustomTableEnvironmentImpl(), statement);
......@@ -35,13 +36,15 @@ public class FlinkInterceptor {
}
// return false to continue with executeSql
public static boolean build(Executor executor, String statement) {
public static FlinkInterceptorResult build(Executor executor, String statement) {
boolean noExecute = false;
TableResult tableResult = null;
Operation operation = Operations.buildOperation(statement);
if (Asserts.isNotNull(operation)) {
operation.build(executor);
return operation.noExecute();
tableResult = operation.build(executor);
noExecute = operation.noExecute();
}
return false;
return FlinkInterceptorResult.build(noExecute, tableResult);
}
@Deprecated
......
package com.dlink.interceptor;
import org.apache.flink.table.api.TableResult;
/**
* FlinkInterceptorResult
*
* @author wenmo
* @since 2022/2/17 16:36
**/
public class FlinkInterceptorResult {
private boolean noExecute;
private TableResult tableResult;
public FlinkInterceptorResult() {
}
public FlinkInterceptorResult(boolean noExecute, TableResult tableResult) {
this.noExecute = noExecute;
this.tableResult = tableResult;
}
public boolean isNoExecute() {
return noExecute;
}
public void setNoExecute(boolean noExecute) {
this.noExecute = noExecute;
}
public TableResult getTableResult() {
return tableResult;
}
public void setTableResult(TableResult tableResult) {
this.tableResult = tableResult;
}
public static FlinkInterceptorResult buildResult(TableResult tableResult) {
return new FlinkInterceptorResult(false, tableResult);
}
public static FlinkInterceptorResult build(boolean noExecute, TableResult tableResult) {
return new FlinkInterceptorResult(noExecute, tableResult);
}
}
package com.dlink.parser;
/**
* ShowFragmentsParser
*
* @author wenmo
* @since 2022/2/17 16:19
**/
public class ShowFragmentParser extends BaseSingleSqlParser {
public ShowFragmentParser(String originalSql) {
super(originalSql);
}
@Override
protected void initializeSegments() {
//SHOW FRAGMENT (.+)
segments.add(new SqlSegment("FRAGMENT", "(show\\s+fragment)\\s+(.*)( ENDOFSQL)", ","));
}
}
......@@ -13,10 +13,10 @@ import java.util.regex.Pattern;
*/
public class SingleSqlParserFactory {
public static Map<String,List<String>> generateParser(String sql) {
public static Map<String, List<String>> generateParser(String sql) {
BaseSingleSqlParser tmp = null;
// sql = sql.replace("\n"," ").replaceAll("\\s{1,}", " ") +" ENDOFSQL";
sql = sql.replace("\r\n"," ").replace("\n"," ") +" ENDOFSQL";
sql = sql.replace("\r\n", " ").replace("\n", " ") + " ENDOFSQL";
if (contains(sql, "(insert\\s+into)(.+)(select)(.+)(from)(.+)")) {
tmp = new InsertSelectSqlParser(sql);
} else if (contains(sql, "(create\\s+aggtable)(.+)(as\\s+select)(.+)")) {
......@@ -37,6 +37,8 @@ public class SingleSqlParserFactory {
} else if (contains(sql, "(use)(.+)")) {
} else if (contains(sql, "(set)(.+)")) {
tmp = new SetSqlParser(sql);
} else if (contains(sql, "(show\\s+fragment)\\s+(.+)")) {
tmp = new ShowFragmentParser(sql);
} else {
}
return tmp.splitSql2Segment();
......
package com.dlink.trans;
import com.dlink.executor.Executor;
import org.apache.flink.table.api.TableResult;
/**
* Operation
......@@ -14,7 +15,7 @@ public interface Operation {
Operation create(String statement);
void build(Executor executor);
TableResult build(Executor executor);
boolean noExecute();
}
package com.dlink.trans;
import com.dlink.parser.SqlType;
import com.dlink.trans.ddl.CreateAggTableOperation;
import com.dlink.trans.ddl.CreateCDCSourceOperation;
import com.dlink.trans.ddl.SetOperation;
import com.dlink.trans.ddl.*;
/**
* Operations
......@@ -17,9 +15,11 @@ public class Operations {
new CreateAggTableOperation()
, new SetOperation()
, new CreateCDCSourceOperation()
, new ShowFragmentsOperation()
, new ShowFragmentOperation()
};
public static SqlType getSqlTypeFromStatements(String statement){
public static SqlType getSqlTypeFromStatements(String statement) {
String[] statements = statement.split(";");
SqlType sqlType = SqlType.UNKNOWN;
for (String item : statements) {
......@@ -27,7 +27,7 @@ public class Operations {
continue;
}
sqlType = Operations.getOperationType(item);
if(sqlType == SqlType.INSERT ||sqlType == SqlType.SELECT){
if (sqlType == SqlType.INSERT || sqlType == SqlType.SELECT) {
return sqlType;
}
}
......@@ -46,10 +46,10 @@ public class Operations {
return type;
}
public static Operation buildOperation(String statement){
String sql = statement.replace("\n"," ").replaceAll("\\s{1,}", " ").trim().toUpperCase();
public static Operation buildOperation(String statement) {
String sql = statement.replace("\n", " ").replaceAll("\\s{1,}", " ").trim().toUpperCase();
for (int i = 0; i < operations.length; i++) {
if(sql.startsWith(operations[i].getHandle())){
if (sql.startsWith(operations[i].getHandle())) {
return operations[i].create(statement);
}
}
......
......@@ -4,6 +4,7 @@ import com.dlink.executor.Executor;
import com.dlink.trans.AbstractOperation;
import com.dlink.trans.Operation;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import java.util.List;
......@@ -13,7 +14,7 @@ import java.util.List;
* @author wenmo
* @since 2021/6/13 19:24
*/
public class CreateAggTableOperation extends AbstractOperation implements Operation{
public class CreateAggTableOperation extends AbstractOperation implements Operation {
private String KEY_WORD = "CREATE AGGTABLE";
......@@ -35,11 +36,11 @@ public class CreateAggTableOperation extends AbstractOperation implements Operat
}
@Override
public void build(Executor executor) {
public TableResult build(Executor executor) {
AggTable aggTable = AggTable.build(statement);
Table source = executor.getCustomTableEnvironment().sqlQuery("select * from "+ aggTable.getTable());
Table source = executor.getCustomTableEnvironment().sqlQuery("select * from " + aggTable.getTable());
List<String> wheres = aggTable.getWheres();
if(wheres!=null&&wheres.size()>0) {
if (wheres != null && wheres.size() > 0) {
for (String s : wheres) {
source = source.filter(s);
}
......@@ -48,5 +49,6 @@ public class CreateAggTableOperation extends AbstractOperation implements Operat
.flatAggregate(aggTable.getAggBy())
.select(aggTable.getColumns());
executor.getCustomTableEnvironment().registerTable(aggTable.getName(), sink);
return null;
}
}
......@@ -5,9 +5,10 @@ import com.dlink.executor.Executor;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.trans.AbstractOperation;
import com.dlink.trans.Operation;
import org.apache.flink.table.api.TableResult;
/**
* TODO
* CreateCDCSourceOperation
*
* @author wenmo
* @since 2022/1/29 23:25
......@@ -34,15 +35,16 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
}
@Override
public void build(Executor executor) {
public TableResult build(Executor executor) {
CDCSource cdcSource = CDCSource.build(statement);
FlinkCDCConfig config = new FlinkCDCConfig(cdcSource.getHostname(),cdcSource.getPort(),cdcSource.getUsername()
,cdcSource.getPassword(),cdcSource.getCheckpoint(),cdcSource.getParallelism(),cdcSource.getDatabase(),cdcSource.getTable()
,cdcSource.getStartupMode(),cdcSource.getTopic(),cdcSource.getBrokers());
FlinkCDCConfig config = new FlinkCDCConfig(cdcSource.getHostname(), cdcSource.getPort(), cdcSource.getUsername()
, cdcSource.getPassword(), cdcSource.getCheckpoint(), cdcSource.getParallelism(), cdcSource.getDatabase(), cdcSource.getTable()
, cdcSource.getStartupMode(), cdcSource.getTopic(), cdcSource.getBrokers());
try {
FlinkCDCMergeBuilder.buildMySqlCDC(executor.getStreamExecutionEnvironment(),config);
FlinkCDCMergeBuilder.buildMySqlCDC(executor.getStreamExecutionEnvironment(), config);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
......@@ -7,6 +7,7 @@ import com.dlink.trans.AbstractOperation;
import com.dlink.trans.Operation;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableResult;
import java.util.HashMap;
import java.util.List;
......@@ -40,22 +41,23 @@ public class SetOperation extends AbstractOperation implements Operation {
}
@Override
public void build(Executor executor) {
public TableResult build(Executor executor) {
try {
if(null != Class.forName("org.apache.log4j.Logger")){
if (null != Class.forName("org.apache.log4j.Logger")) {
executor.parseAndLoadConfiguration(statement);
return;
return null;
}
} catch (ClassNotFoundException e) {
}
Map<String,List<String>> map = SingleSqlParserFactory.generateParser(statement);
if(Asserts.isNotNullMap(map)&&map.size()==2) {
Map<String, List<String>> map = SingleSqlParserFactory.generateParser(statement);
if (Asserts.isNotNullMap(map) && map.size() == 2) {
Map<String, String> confMap = new HashMap<>();
confMap.put(StringUtils.join(map.get("SET"), "."), StringUtils.join(map.get("="), ","));
executor.getCustomTableEnvironment().getConfig().addConfiguration(Configuration.fromMap(confMap));
Configuration configuration = Configuration.fromMap(confMap);
executor.getExecutionConfig().configure(configuration,null);
executor.getExecutionConfig().configure(configuration, null);
executor.getCustomTableEnvironment().getConfig().addConfiguration(configuration);
}
return null;
}
}
package com.dlink.trans.ddl;
import com.dlink.assertion.Asserts;
import com.dlink.executor.Executor;
import com.dlink.parser.SingleSqlParserFactory;
import com.dlink.trans.AbstractOperation;
import com.dlink.trans.Operation;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.api.TableResult;
import java.util.List;
import java.util.Map;
/**
* ShowFragmentOperation
*
* @author wenmo
* @since 2022/2/17 17:08
**/
public class ShowFragmentOperation extends AbstractOperation implements Operation {
private String KEY_WORD = "SHOW FRAGMENT ";
public ShowFragmentOperation() {
}
public ShowFragmentOperation(String statement) {
super(statement);
}
@Override
public String getHandle() {
return KEY_WORD;
}
@Override
public Operation create(String statement) {
return new ShowFragmentOperation(statement);
}
@Override
public TableResult build(Executor executor) {
Map<String, List<String>> map = SingleSqlParserFactory.generateParser(statement);
if (Asserts.isNotNullMap(map)) {
if (map.containsKey("FRAGMENT")) {
return executor.getSqlManager().getSqlFragmentResult(StringUtils.join(map.get("FRAGMENT"), ""));
}
}
return executor.getSqlManager().getSqlFragmentResult(null);
}
}
package com.dlink.trans.ddl;
import com.dlink.executor.Executor;
import com.dlink.trans.AbstractOperation;
import com.dlink.trans.Operation;
import org.apache.flink.table.api.TableResult;
/**
* ShowFragmentsOperation
*
* @author wenmo
* @since 2022/2/17 16:31
**/
public class ShowFragmentsOperation extends AbstractOperation implements Operation {
private String KEY_WORD = "SHOW FRAGMENTS";
public ShowFragmentsOperation() {
}
public ShowFragmentsOperation(String statement) {
super(statement);
}
@Override
public String getHandle() {
return KEY_WORD;
}
@Override
public Operation create(String statement) {
return new ShowFragmentsOperation(statement);
}
@Override
public TableResult build(Executor executor) {
return executor.getSqlManager().getSqlFragments();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment