Unverified Commit a95ce812 authored by Kerwin's avatar Kerwin Committed by GitHub

Added dlink-executor module code style. (#919)

parent 535dc9ef
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.catalog.function;
import com.dlink.constant.FlinkFunctionConstant;
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.catalog.function;
import org.apache.flink.table.functions.FunctionDefinition;
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.constant;
/**
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.constant;
public interface FlinkFunctionConstant {
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.constant;
/**
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.exception;
/**
......
......@@ -17,10 +17,10 @@
*
*/
package com.dlink.executor;
import com.dlink.assertion.Asserts;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
......
......@@ -17,10 +17,10 @@
*
*/
package com.dlink.executor;
import com.dlink.assertion.Asserts;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
......
......@@ -17,12 +17,12 @@
*
*/
package com.dlink.executor;
import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkConstant;
import com.dlink.constant.NetConstant;
import lombok.Getter;
import lombok.Setter;
......
......@@ -17,16 +17,13 @@
*
*/
package com.dlink.executor;
import com.dlink.assertion.Asserts;
import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.interceptor.FlinkInterceptorResult;
import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
......@@ -48,6 +45,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* Executor
*
......
......@@ -17,20 +17,21 @@
*
*/
package com.dlink.executor;
import com.dlink.assertion.Asserts;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.Setter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.Setter;
/**
* ExecutorSetting
*
......@@ -107,7 +108,8 @@ public class ExecutorSetting {
this.config = config;
}
public static ExecutorSetting build(Integer checkpoint, Integer parallelism, boolean useSqlFragment, boolean useStatementSet, boolean useBatchModel, String savePointPath, String jobName, String configJson) {
public static ExecutorSetting build(Integer checkpoint, Integer parallelism, boolean useSqlFragment, boolean useStatementSet,
boolean useBatchModel, String savePointPath, String jobName, String configJson) {
List<Map<String, String>> configList = new ArrayList<>();
if (Asserts.isNotNullString(configJson)) {
try {
......@@ -144,14 +146,14 @@ public class ExecutorSetting {
@Override
public String toString() {
return "ExecutorSetting{" +
"checkpoint=" + checkpoint +
", parallelism=" + parallelism +
", useSqlFragment=" + useSqlFragment +
", useStatementSet=" + useStatementSet +
", savePointPath='" + savePointPath + '\'' +
", jobName='" + jobName + '\'' +
", config=" + config +
'}';
return "ExecutorSetting{"
+ "checkpoint=" + checkpoint
+ ", parallelism=" + parallelism
+ ", useSqlFragment=" + useSqlFragment
+ ", useStatementSet=" + useStatementSet
+ ", savePointPath='" + savePointPath + '\''
+ ", jobName='" + jobName + '\''
+ ", config=" + config
+ '}';
}
}
......@@ -17,15 +17,13 @@
*
*/
package com.dlink.executor;
import com.dlink.assertion.Asserts;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.assertion.Asserts;
/**
* LocalBatchExecutor
*
......
......@@ -17,15 +17,13 @@
*
*/
package com.dlink.executor;
import com.dlink.assertion.Asserts;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.assertion.Asserts;
/**
* LocalStreamExecuter
*
......
......@@ -17,15 +17,13 @@
*
*/
package com.dlink.executor;
import com.dlink.assertion.Asserts;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.assertion.Asserts;
/**
* RemoteBatchExecutor
*
......
......@@ -17,15 +17,13 @@
*
*/
package com.dlink.executor;
import com.dlink.assertion.Asserts;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.assertion.Asserts;
/**
* RemoteStreamExecutor
*
......
......@@ -17,13 +17,17 @@
*
*/
package com.dlink.executor;
import static java.lang.String.format;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static java.lang.String.format;
import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkSQLConstant;
import com.dlink.model.SystemConfiguration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ExpressionParserException;
import org.apache.flink.table.api.Table;
......@@ -41,10 +45,6 @@ import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkSQLConstant;
import com.dlink.model.SystemConfiguration;
/**
* Flink Sql Fragment Manager
*
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.interceptor;
import com.dlink.assertion.Asserts;
......@@ -25,6 +24,7 @@ import com.dlink.executor.Executor;
import com.dlink.trans.Operation;
import com.dlink.trans.Operations;
import com.dlink.utils.SqlUtil;
import org.apache.flink.table.api.TableResult;
/**
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.interceptor;
import org.apache.flink.table.api.TableResult;
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.parser;
import com.dlink.assertion.Asserts;
......@@ -41,7 +40,6 @@ public abstract class BaseSingleSqlParser {
//Sql语句片段
protected List<SqlSegment> segments;
/**
* 构造函数,传入原始Sql语句,进行劈分。
**/
......@@ -64,7 +62,6 @@ public abstract class BaseSingleSqlParser {
for (SqlSegment sqlSegment : segments) {
sqlSegment.parse(originalSql);
if (Asserts.isNotNullString(sqlSegment.getStart())) {
// map.put(sqlSegment.getStart().toUpperCase(), sqlSegment.getBodyPieces());
map.put(sqlSegment.getType().toUpperCase(), sqlSegment.getBodyPieces());
}
}
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.parser;
/**
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.parser;
/**
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.parser;
/**
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.parser;
/**
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.parser;
/**
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.parser;
/**
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.parser;
/**
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.parser;
import java.util.List;
......@@ -35,7 +34,7 @@ public class SingleSqlParserFactory {
public static Map<String, List<String>> generateParser(String sql) {
BaseSingleSqlParser tmp = null;
// sql = sql.replace("\n"," ").replaceAll("\\s{1,}", " ") +" ENDOFSQL";
//sql = sql.replace("\n"," ").replaceAll("\\s{1,}", " ") +" ENDOFSQL";
sql = sql.replace("\r\n", " ").replace("\n", " ") + " ENDOFSQL";
if (contains(sql, "(insert\\s+into)(.+)(select)(.+)(from)(.+)")) {
tmp = new InsertSelectSqlParser(sql);
......@@ -59,7 +58,6 @@ public class SingleSqlParserFactory {
tmp = new SetSqlParser(sql);
} else if (contains(sql, "(show\\s+fragment)\\s+(.+)")) {
tmp = new ShowFragmentParser(sql);
} else {
}
return tmp.splitSql2Segment();
}
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.parser;
import com.dlink.assertion.Asserts;
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.parser;
/**
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.parser;
/**
......
......@@ -17,16 +17,16 @@
*
*/
package com.dlink.trans;
import com.dlink.executor.CustomTableEnvironmentImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* AbstractOperation
*
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.trans;
/**
......
......@@ -17,10 +17,10 @@
*
*/
package com.dlink.trans;
import com.dlink.executor.Executor;
import org.apache.flink.table.api.TableResult;
/**
......
......@@ -17,11 +17,14 @@
*
*/
package com.dlink.trans;
import com.dlink.parser.SqlType;
import com.dlink.trans.ddl.*;
import com.dlink.trans.ddl.CreateAggTableOperation;
import com.dlink.trans.ddl.CreateCDCSourceOperation;
import com.dlink.trans.ddl.SetOperation;
import com.dlink.trans.ddl.ShowFragmentOperation;
import com.dlink.trans.ddl.ShowFragmentsOperation;
/**
* Operations
......@@ -32,7 +35,7 @@ import com.dlink.trans.ddl.*;
public class Operations {
private static Operation[] operations = {
new CreateAggTableOperation()
new CreateAggTableOperation()
, new SetOperation()
, new CreateCDCSourceOperation()
, new ShowFragmentsOperation()
......
......@@ -17,10 +17,10 @@
*
*/
package com.dlink.trans.ddl;
import com.dlink.parser.SingleSqlParserFactory;
import org.apache.commons.lang3.StringUtils;
import java.util.List;
......
......@@ -17,18 +17,17 @@
*
*/
package com.dlink.trans.ddl;
import com.dlink.assertion.Asserts;
import com.dlink.parser.SingleSqlParserFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.dlink.assertion.Asserts;
import com.dlink.parser.SingleSqlParserFactory;
/**
* CDCSource
*
......
......@@ -17,12 +17,12 @@
*
*/
package com.dlink.trans.ddl;
import com.dlink.executor.Executor;
import com.dlink.trans.AbstractOperation;
import com.dlink.trans.Operation;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
......@@ -36,7 +36,7 @@ import java.util.List;
*/
public class CreateAggTableOperation extends AbstractOperation implements Operation {
private String KEY_WORD = "CREATE AGGTABLE";
private static final String KEY_WORD = "CREATE AGGTABLE";
public CreateAggTableOperation() {
}
......
......@@ -17,17 +17,8 @@
*
*/
package com.dlink.trans.ddl;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.CDCBuilderFactory;
......@@ -41,6 +32,14 @@ import com.dlink.model.Table;
import com.dlink.trans.AbstractOperation;
import com.dlink.trans.Operation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* CreateCDCSourceOperation
*
......@@ -49,7 +48,7 @@ import com.dlink.trans.Operation;
*/
public class CreateCDCSourceOperation extends AbstractOperation implements Operation {
private String KEY_WORD = "EXECUTE CDCSOURCE";
private static final String KEY_WORD = "EXECUTE CDCSOURCE";
public CreateCDCSourceOperation() {
}
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.trans.ddl;
import com.dlink.assertion.Asserts;
......@@ -25,6 +24,7 @@ import com.dlink.executor.Executor;
import com.dlink.parser.SingleSqlParserFactory;
import com.dlink.trans.AbstractOperation;
import com.dlink.trans.Operation;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableResult;
......@@ -41,7 +41,7 @@ import java.util.Map;
**/
public class SetOperation extends AbstractOperation implements Operation {
private String KEY_WORD = "SET";
private static final String KEY_WORD = "SET";
public SetOperation() {
}
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.trans.ddl;
import com.dlink.assertion.Asserts;
......@@ -25,6 +24,7 @@ import com.dlink.executor.Executor;
import com.dlink.parser.SingleSqlParserFactory;
import com.dlink.trans.AbstractOperation;
import com.dlink.trans.Operation;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.api.TableResult;
......@@ -38,7 +38,7 @@ import java.util.Map;
* @since 2022/2/17 17:08
**/
public class ShowFragmentOperation extends AbstractOperation implements Operation {
private String KEY_WORD = "SHOW FRAGMENT ";
private static final String KEY_WORD = "SHOW FRAGMENT ";
public ShowFragmentOperation() {
}
......
......@@ -17,12 +17,12 @@
*
*/
package com.dlink.trans.ddl;
import com.dlink.executor.Executor;
import com.dlink.trans.AbstractOperation;
import com.dlink.trans.Operation;
import org.apache.flink.table.api.TableResult;
/**
......@@ -33,7 +33,7 @@ import org.apache.flink.table.api.TableResult;
**/
public class ShowFragmentsOperation extends AbstractOperation implements Operation {
private String KEY_WORD = "SHOW FRAGMENTS";
private static final String KEY_WORD = "SHOW FRAGMENTS";
public ShowFragmentsOperation() {
}
......
......@@ -17,11 +17,11 @@
*
*/
package com.dlink.core;
import com.dlink.executor.Executor;
import com.dlink.interceptor.FlinkInterceptor;
import org.junit.Assert;
import org.junit.Test;
......@@ -34,10 +34,10 @@ import org.junit.Test;
public class FlinkInterceptorTest {
@Test
public void replaceFragmentTest(){
String statement = "nullif1:=NULLIF(1, 0) as val;" +
"nullif2:=NULLIF(0, 0) as val$null;" +
"select ${nullif1},${nullif2}";
public void replaceFragmentTest() {
String statement = "nullif1:=NULLIF(1, 0) as val;"
+ "nullif2:=NULLIF(0, 0) as val$null;"
+ "select ${nullif1},${nullif2}";
String pretreatStatement = FlinkInterceptor.pretreatStatement(Executor.build(), statement);
Assert.assertEquals("select NULLIF(1, 0) as val,NULLIF(0, 0) as val$null",pretreatStatement);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment