Unverified Commit 710c7eee authored by Licho's avatar Licho Committed by GitHub

refactor: simplify Executor code, and log and extract common code fragment (#1166)

* refactor: simplify Executor code, and log and extract common code fragment.

* refactor: remove @SLf4j annotation
Signed-off-by: 's avatarLicho <chinabhsun@gmail.com>
Signed-off-by: 's avatarLicho <chinabhsun@gmail.com>
parent 8ca8a953
...@@ -29,10 +29,10 @@ import org.apache.flink.table.api.TableResult; ...@@ -29,10 +29,10 @@ import org.apache.flink.table.api.TableResult;
**/ **/
public class SelectResultBuilder implements ResultBuilder { public class SelectResultBuilder implements ResultBuilder {
private Integer maxRowNum; private final Integer maxRowNum;
private boolean isChangeLog; private final boolean isChangeLog;
private boolean isAutoCancel; private final boolean isAutoCancel;
private String timeZone; private final String timeZone;
public SelectResultBuilder(Integer maxRowNum, boolean isChangeLog, boolean isAutoCancel, String timeZone) { public SelectResultBuilder(Integer maxRowNum, boolean isChangeLog, boolean isAutoCancel, String timeZone) {
this.maxRowNum = maxRowNum; this.maxRowNum = maxRowNum;
......
...@@ -51,6 +51,9 @@ import java.util.HashMap; ...@@ -51,6 +51,9 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
...@@ -63,6 +66,8 @@ import com.fasterxml.jackson.databind.node.ObjectNode; ...@@ -63,6 +66,8 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
**/ **/
public abstract class Executor { public abstract class Executor {
private static final Logger logger = LoggerFactory.getLogger(Executor.class);
protected StreamExecutionEnvironment environment; protected StreamExecutionEnvironment environment;
protected CustomTableEnvironment stEnvironment; protected CustomTableEnvironment stEnvironment;
protected EnvironmentSetting environmentSetting; protected EnvironmentSetting environmentSetting;
...@@ -72,14 +77,6 @@ public abstract class Executor { ...@@ -72,14 +77,6 @@ public abstract class Executor {
protected SqlManager sqlManager = new SqlManager(); protected SqlManager sqlManager = new SqlManager();
protected boolean useSqlFragment = true; protected boolean useSqlFragment = true;
public SqlManager getSqlManager() {
return sqlManager;
}
public boolean isUseSqlFragment() {
return useSqlFragment;
}
public static Executor build() { public static Executor build() {
return new LocalStreamExecutor(ExecutorSetting.DEFAULT); return new LocalStreamExecutor(ExecutorSetting.DEFAULT);
} }
...@@ -117,6 +114,14 @@ public abstract class Executor { ...@@ -117,6 +114,14 @@ public abstract class Executor {
} }
} }
public SqlManager getSqlManager() {
return sqlManager;
}
public boolean isUseSqlFragment() {
return useSqlFragment;
}
public ExecutionConfig getExecutionConfig() { public ExecutionConfig getExecutionConfig() {
return environment.getConfig(); return environment.getConfig();
} }
...@@ -164,25 +169,14 @@ public abstract class Executor { ...@@ -164,25 +169,14 @@ public abstract class Executor {
} }
public void initEnvironment() { public void initEnvironment() {
/*if (executorSetting.getCheckpoint() != null && executorSetting.getCheckpoint() > 0) { updateEnvironment(executorSetting);
environment.enableCheckpointing(executorSetting.getCheckpoint());
}*/
if (executorSetting.getParallelism() != null && executorSetting.getParallelism() > 0) {
environment.setParallelism(executorSetting.getParallelism());
}
if (executorSetting.getConfig() != null) {
Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
environment.getConfig().configure(configuration, null);
}
} }
public void updateEnvironment(ExecutorSetting executorSetting) { public void updateEnvironment(ExecutorSetting executorSetting) {
/*if (executorSetting.getCheckpoint() != null && executorSetting.getCheckpoint() > 0) { if (executorSetting.isValidParallelism()) {
environment.enableCheckpointing(executorSetting.getCheckpoint());
}*/
if (executorSetting.getParallelism() != null && executorSetting.getParallelism() > 0) {
environment.setParallelism(executorSetting.getParallelism()); environment.setParallelism(executorSetting.getParallelism());
} }
if (executorSetting.getConfig() != null) { if (executorSetting.getConfig() != null) {
Configuration configuration = Configuration.fromMap(executorSetting.getConfig()); Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
environment.getConfig().configure(configuration, null); environment.getConfig().configure(configuration, null);
...@@ -192,43 +186,34 @@ public abstract class Executor { ...@@ -192,43 +186,34 @@ public abstract class Executor {
abstract CustomTableEnvironment createCustomTableEnvironment(); abstract CustomTableEnvironment createCustomTableEnvironment();
private void initStreamExecutionEnvironment() { private void initStreamExecutionEnvironment() {
useSqlFragment = executorSetting.isUseSqlFragment(); updateStreamExecutionEnvironment(executorSetting);
stEnvironment = createCustomTableEnvironment();
if (executorSetting.getJobName() != null && !"".equals(executorSetting.getJobName())) {
stEnvironment.getConfig().getConfiguration().setString(PipelineOptions.NAME.key(), executorSetting.getJobName());
}
setConfig.put(PipelineOptions.NAME.key(), executorSetting.getJobName());
if (executorSetting.getConfig() != null) {
for (Map.Entry<String, String> entry : executorSetting.getConfig().entrySet()) {
stEnvironment.getConfig().getConfiguration().setString(entry.getKey(), entry.getValue());
}
}
} }
private void updateStreamExecutionEnvironment(ExecutorSetting executorSetting) { private void updateStreamExecutionEnvironment(ExecutorSetting executorSetting) {
useSqlFragment = executorSetting.isUseSqlFragment(); useSqlFragment = executorSetting.isUseSqlFragment();
copyCatalog();
if (executorSetting.getJobName() != null && !"".equals(executorSetting.getJobName())) { CustomTableEnvironment newestEnvironment = createCustomTableEnvironment();
stEnvironment.getConfig().getConfiguration().setString(PipelineOptions.NAME.key(), executorSetting.getJobName()); if (stEnvironment != null) {
} for (String catalog : stEnvironment.listCatalogs()) {
setConfig.put(PipelineOptions.NAME.key(), executorSetting.getJobName()); stEnvironment.getCatalog(catalog).ifPresent(t -> {
if (executorSetting.getConfig() != null) { newestEnvironment.getCatalogManager().unregisterCatalog(catalog, true);
for (Map.Entry<String, String> entry : executorSetting.getConfig().entrySet()) { newestEnvironment.registerCatalog(catalog, t);
stEnvironment.getConfig().getConfiguration().setString(entry.getKey(), entry.getValue()); });
} }
} }
stEnvironment = newestEnvironment;
final Configuration configuration = stEnvironment.getConfig().getConfiguration();
if (executorSetting.isValidJobName()) {
configuration.setString(PipelineOptions.NAME.key(), executorSetting.getJobName());
} }
private void copyCatalog() { setConfig.put(PipelineOptions.NAME.key(), executorSetting.getJobName());
String[] catalogs = stEnvironment.listCatalogs(); if (executorSetting.getConfig() != null) {
CustomTableEnvironment newstEnvironment = createCustomTableEnvironment(); for (Map.Entry<String, String> entry : executorSetting.getConfig().entrySet()) {
for (int i = 0; i < catalogs.length; i++) { configuration.setString(entry.getKey(), entry.getValue());
if (stEnvironment.getCatalog(catalogs[i]).isPresent()) {
newstEnvironment.getCatalogManager().unregisterCatalog(catalogs[i], true);
newstEnvironment.registerCatalog(catalogs[i], stEnvironment.getCatalog(catalogs[i]).get());
} }
} }
stEnvironment = newstEnvironment;
} }
public String pretreatStatement(String statement) { public String pretreatStatement(String statement) {
...@@ -285,14 +270,15 @@ public abstract class Executor { ...@@ -285,14 +270,15 @@ public abstract class Executor {
Method method = null; Method method = null;
try { try {
method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class); method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
} catch (NoSuchMethodException | SecurityException e1) { } catch (NoSuchMethodException | SecurityException e) {
e1.printStackTrace(); logger.error(e.getMessage());
} }
// 获取方法的访问权限 // 获取方法的访问权限
boolean accessible = method.isAccessible(); boolean accessible = method.isAccessible();
try { try {
// 修改访问权限为可写 // 修改访问权限为可写
if (accessible == false) { if (!accessible) {
method.setAccessible(true); method.setAccessible(true);
} }
// 获取系统类加载器 // 获取系统类加载器
...@@ -300,7 +286,7 @@ public abstract class Executor { ...@@ -300,7 +286,7 @@ public abstract class Executor {
// jar路径加入到系统url路径里 // jar路径加入到系统url路径里
method.invoke(classLoader, jarUrl); method.invoke(classLoader, jarUrl);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); logger.error(e.getMessage());
} finally { } finally {
method.setAccessible(accessible); method.setAccessible(accessible);
} }
...@@ -308,33 +294,37 @@ public abstract class Executor { ...@@ -308,33 +294,37 @@ public abstract class Executor {
public String explainSql(String statement, ExplainDetail... extraDetails) { public String explainSql(String statement, ExplainDetail... extraDetails) {
statement = pretreatStatement(statement); statement = pretreatStatement(statement);
if (!pretreatExecute(statement).isNoExecute()) { if (pretreatExecute(statement).isNoExecute()) {
return stEnvironment.explainSql(statement, extraDetails);
} else {
return ""; return "";
} }
return stEnvironment.explainSql(statement, extraDetails);
} }
public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) { public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
statement = pretreatStatement(statement); statement = pretreatStatement(statement);
if (Asserts.isNotNullString(statement) && !pretreatExecute(statement).isNoExecute()) { if (Asserts.isNotNullString(statement) && !pretreatExecute(statement).isNoExecute()) {
return stEnvironment.explainSqlRecord(statement, extraDetails); return stEnvironment.explainSqlRecord(statement, extraDetails);
} else {
return null;
} }
return null;
} }
public ObjectNode getStreamGraph(String statement) { public ObjectNode getStreamGraph(String statement) {
statement = pretreatStatement(statement); statement = pretreatStatement(statement);
if (!pretreatExecute(statement).isNoExecute()) { if (pretreatExecute(statement).isNoExecute()) {
return stEnvironment.getStreamGraph(statement);
} else {
return null; return null;
} }
return stEnvironment.getStreamGraph(statement);
} }
public ObjectNode getStreamGraph(List<String> statements) { public ObjectNode getStreamGraph(List<String> statements) {
StreamGraph streamGraph = stEnvironment.getStreamGraphFromInserts(statements); StreamGraph streamGraph = stEnvironment.getStreamGraphFromInserts(statements);
return getStreamGraphJsonNode(streamGraph);
}
private ObjectNode getStreamGraphJsonNode(StreamGraph streamGraph) {
JSONGenerator jsonGenerator = new JSONGenerator(streamGraph); JSONGenerator jsonGenerator = new JSONGenerator(streamGraph);
String json = jsonGenerator.getJSON(); String json = jsonGenerator.getJSON();
ObjectMapper mapper = new ObjectMapper(); ObjectMapper mapper = new ObjectMapper();
...@@ -343,9 +333,9 @@ public abstract class Executor { ...@@ -343,9 +333,9 @@ public abstract class Executor {
objectNode = (ObjectNode) mapper.readTree(json); objectNode = (ObjectNode) mapper.readTree(json);
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
e.printStackTrace(); e.printStackTrace();
} finally {
return objectNode;
} }
return objectNode;
} }
public StreamGraph getStreamGraph() { public StreamGraph getStreamGraph() {
...@@ -356,18 +346,9 @@ public abstract class Executor { ...@@ -356,18 +346,9 @@ public abstract class Executor {
for (String statement : statements) { for (String statement : statements) {
executeSql(statement); executeSql(statement);
} }
StreamGraph streamGraph = getStreamGraph(); StreamGraph streamGraph = getStreamGraph();
JSONGenerator jsonGenerator = new JSONGenerator(streamGraph); return getStreamGraphJsonNode(streamGraph);
String json = jsonGenerator.getJSON();
ObjectMapper mapper = new ObjectMapper();
ObjectNode objectNode = mapper.createObjectNode();
try {
objectNode = (ObjectNode) mapper.readTree(json);
} catch (JsonProcessingException e) {
e.printStackTrace();
} finally {
return objectNode;
}
} }
public JobPlanInfo getJobPlanInfo(List<String> statements) { public JobPlanInfo getJobPlanInfo(List<String> statements) {
...@@ -382,14 +363,6 @@ public abstract class Executor { ...@@ -382,14 +363,6 @@ public abstract class Executor {
return new JobPlanInfo(JsonPlanGenerator.generatePlan(streamGraph.getJobGraph())); return new JobPlanInfo(JsonPlanGenerator.generatePlan(streamGraph.getJobGraph()));
} }
/*public void registerFunction(String name, ScalarFunction function){
stEnvironment.registerFunction(name,function);
}
public void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> var2){
stEnvironment.createTemporarySystemFunction(name,var2);
}*/
public CatalogManager getCatalogManager() { public CatalogManager getCatalogManager() {
return stEnvironment.getCatalogManager(); return stEnvironment.getCatalogManager();
} }
......
...@@ -26,6 +26,9 @@ import java.util.HashMap; ...@@ -26,6 +26,9 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
...@@ -42,7 +45,20 @@ import lombok.Setter; ...@@ -42,7 +45,20 @@ import lombok.Setter;
@Getter @Getter
public class ExecutorSetting { public class ExecutorSetting {
private boolean useBatchModel = false; private static final Logger log = LoggerFactory.getLogger(ExecutorSetting.class);
public static final ExecutorSetting DEFAULT = new ExecutorSetting(0, 1, true);
public static final String CHECKPOINT_CONST = "checkpoint";
public static final String PARALLELISM_CONST = "parallelism";
public static final String USE_SQL_FRAGMENT = "useSqlFragment";
public static final String USE_STATEMENT_SET = "useStatementSet";
public static final String USE_BATCH_MODEL = "useBatchModel";
public static final String SAVE_POINT_PATH = "savePointPath";
public static final String JOB_NAME = "jobName";
public static final String CONFIG_CONST = "config";
private static final ObjectMapper mapper = new ObjectMapper();
private boolean useBatchModel;
private Integer checkpoint; private Integer checkpoint;
private Integer parallelism; private Integer parallelism;
private boolean useSqlFragment; private boolean useSqlFragment;
...@@ -50,54 +66,37 @@ public class ExecutorSetting { ...@@ -50,54 +66,37 @@ public class ExecutorSetting {
private String savePointPath; private String savePointPath;
private String jobName; private String jobName;
private Map<String, String> config; private Map<String, String> config;
public static final ExecutorSetting DEFAULT = new ExecutorSetting(0, 1, true);
private static final ObjectMapper mapper = new ObjectMapper();
public ExecutorSetting(boolean useSqlFragment) { public ExecutorSetting(boolean useSqlFragment) {
this.useSqlFragment = useSqlFragment; this(null, useSqlFragment);
} }
public ExecutorSetting(Integer checkpoint) { public ExecutorSetting(Integer checkpoint) {
this.checkpoint = checkpoint; this(checkpoint, false);
} }
public ExecutorSetting(Integer checkpoint, boolean useSqlFragment) { public ExecutorSetting(Integer checkpoint, boolean useSqlFragment) {
this.checkpoint = checkpoint; this(checkpoint, null, useSqlFragment, null, null);
this.useSqlFragment = useSqlFragment;
} }
public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment) { public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment) {
this.checkpoint = checkpoint; this(checkpoint, parallelism, useSqlFragment, null, null);
this.parallelism = parallelism;
this.useSqlFragment = useSqlFragment;
} }
public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath, String jobName) { public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath, String jobName) {
this.checkpoint = checkpoint; this(checkpoint, parallelism, useSqlFragment, savePointPath, jobName, null);
this.parallelism = parallelism;
this.useSqlFragment = useSqlFragment;
this.savePointPath = savePointPath;
this.jobName = jobName;
} }
public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath) { public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath) {
this.checkpoint = checkpoint; this(checkpoint, parallelism, useSqlFragment, savePointPath, null, null);
this.parallelism = parallelism;
this.useSqlFragment = useSqlFragment;
this.savePointPath = savePointPath;
} }
public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath, String jobName, Map<String, String> config) { public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath, String jobName, Map<String, String> config) {
this.checkpoint = checkpoint; this(checkpoint, parallelism, useSqlFragment, false, false, savePointPath, jobName, config);
this.parallelism = parallelism;
this.useSqlFragment = useSqlFragment;
this.savePointPath = savePointPath;
this.jobName = jobName;
this.config = config;
} }
public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment, boolean useStatementSet, public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment, boolean useStatementSet, boolean useBatchModel, String savePointPath, String jobName,
boolean useBatchModel, String savePointPath, String jobName, Map<String, String> config) { Map<String, String> config) {
this.checkpoint = checkpoint; this.checkpoint = checkpoint;
this.parallelism = parallelism; this.parallelism = parallelism;
this.useSqlFragment = useSqlFragment; this.useSqlFragment = useSqlFragment;
...@@ -108,52 +107,44 @@ public class ExecutorSetting { ...@@ -108,52 +107,44 @@ public class ExecutorSetting {
this.config = config; this.config = config;
} }
public static ExecutorSetting build(Integer checkpoint, Integer parallelism, boolean useSqlFragment, boolean useStatementSet, public static ExecutorSetting build(Integer checkpoint, Integer parallelism, boolean useSqlFragment, boolean useStatementSet, boolean useBatchModel, String savePointPath, String jobName,
boolean useBatchModel, String savePointPath, String jobName, String configJson) { String configJson) {
List<Map<String, String>> configList = new ArrayList<>(); List<Map<String, String>> configList = new ArrayList<>();
if (Asserts.isNotNullString(configJson)) { if (Asserts.isNotNullString(configJson)) {
try { try {
configList = mapper.readValue(configJson, ArrayList.class); configList = mapper.readValue(configJson, ArrayList.class);
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
e.printStackTrace(); log.error(e.getMessage());
} }
} }
Map<String, String> config = new HashMap<>(); Map<String, String> config = new HashMap<>();
for (Map<String, String> item : configList) { for (Map<String, String> item : configList) {
config.put(item.get("key"), item.get("value")); config.put(item.get("key"), item.get("value"));
} }
return new ExecutorSetting(checkpoint, parallelism, useSqlFragment, useStatementSet, useBatchModel, savePointPath, jobName, config); return new ExecutorSetting(checkpoint, parallelism, useSqlFragment, useStatementSet, useBatchModel,
savePointPath, jobName, config);
} }
public static ExecutorSetting build(Map<String, String> settingMap) { public static ExecutorSetting build(Map<String, String> settingMap) {
Integer checkpoint = null; Integer checkpoint = Integer.valueOf(settingMap.get(CHECKPOINT_CONST));
Integer parallelism = null; Integer parallelism = Integer.valueOf(settingMap.get(PARALLELISM_CONST));
if (settingMap.containsKey("checkpoint") && !"".equals(settingMap.get("checkpoint"))) {
checkpoint = Integer.valueOf(settingMap.get("checkpoint")); return build(checkpoint, parallelism, "1".equals(settingMap.get(USE_SQL_FRAGMENT)), "1".equals(settingMap.get(USE_STATEMENT_SET)), "1".equals(settingMap.get(USE_BATCH_MODEL)),
settingMap.get(SAVE_POINT_PATH), settingMap.get(JOB_NAME), settingMap.get(CONFIG_CONST));
} }
if (settingMap.containsKey("parallelism") && !"".equals(settingMap.get("parallelism"))) {
parallelism = Integer.valueOf(settingMap.get("parallelism")); public boolean isValidParallelism() {
return this.getParallelism() != null && this.getParallelism() > 0;
} }
return build(checkpoint,
parallelism, public boolean isValidJobName() {
"1".equals(settingMap.get("useSqlFragment")), return this.getJobName() != null && !"".equals(this.getJobName());
"1".equals(settingMap.get("useStatementSet")),
"1".equals(settingMap.get("useBatchModel")),
settingMap.get("savePointPath"),
settingMap.get("jobName"),
settingMap.get("config"));
} }
@Override @Override
public String toString() { public String toString() {
return "ExecutorSetting{" return String.format("ExecutorSetting{checkpoint=%d, parallelism=%d, useSqlFragment=%s, useStatementSet=%s, savePointPath='%s', jobName='%s', config=%s}", checkpoint, parallelism,
+ "checkpoint=" + checkpoint useSqlFragment, useStatementSet, savePointPath, jobName, config);
+ ", parallelism=" + parallelism
+ ", useSqlFragment=" + useSqlFragment
+ ", useStatementSet=" + useStatementSet
+ ", savePointPath='" + savePointPath + '\''
+ ", jobName='" + jobName + '\''
+ ", config=" + config
+ '}';
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment