Commit 1b459fbc authored by wenmo

Add Batch engine

parent 179cd24e
......@@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
......@@ -13,7 +14,10 @@ import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
......@@ -34,7 +38,6 @@ import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.planner.delegation.ExecutorBase;
import org.apache.flink.table.planner.utils.ExecutorUtils;
import org.apache.flink.types.Row;
import java.lang.reflect.Method;
import java.util.ArrayList;
......@@ -47,7 +50,7 @@ import java.util.Map;
* @author wenmo
* @since 2021/6/7 22:06
**/
public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements CustomTableEnvironment {
protected CustomTableEnvironmentImpl(CatalogManager catalogManager, ModuleManager moduleManager, TableConfig tableConfig, Executor executor, FunctionCatalog functionCatalog, Planner planner, boolean isStreamingMode, ClassLoader userClassLoader) {
super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode, userClassLoader);
......@@ -57,14 +60,19 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
return create(executionEnvironment, EnvironmentSettings.newInstance().build());
}
static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) {
public static CustomTableEnvironmentImpl createBatch(StreamExecutionEnvironment executionEnvironment) {
Configuration configuration = new Configuration();
configuration.setString("execution.runtime-mode", "BATCH");
TableConfig tableConfig = new TableConfig();
tableConfig.addConfiguration(configuration);
return create(executionEnvironment, EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(), tableConfig);
}
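// Illustrative sketch (editor's addition, not part of this commit): a minimal caller of the
// new batch factory. Assumes a local environment and the datagen connector on the classpath.
public static void createBatchUsageSketch() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
CustomTableEnvironmentImpl batchEnv = createBatch(env);
batchEnv.executeSql("CREATE TABLE t (id INT) WITH ('connector' = 'datagen', 'number-of-rows' = '5')");
// Bounded datagen source, so the query terminates like a batch job.
batchEnv.executeSql("SELECT id FROM t").print();
}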
public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) {
return create(executionEnvironment, settings, new TableConfig());
}
public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings, TableConfig tableConfig) {
if (!settings.isStreamingMode()) {
throw new TableException("StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
} else {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
ModuleManager moduleManager = new ModuleManager();
CatalogManager catalogManager = CatalogManager.newBuilder().classLoader(classLoader).config(tableConfig.getConfiguration()).defaultCatalog(settings.getBuiltInCatalogName(), new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName())).executionConfig(executionEnvironment.getConfig()).build();
......@@ -72,9 +80,9 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
Map<String, String> executorProperties = settings.toExecutorProperties();
Executor executor = lookupExecutor(executorProperties, executionEnvironment);
Map<String, String> plannerProperties = settings.toPlannerProperties();
Planner planner = ( ComponentFactoryService.find(PlannerFactory.class, plannerProperties)).create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager);
Planner planner = (ComponentFactoryService.find(PlannerFactory.class, plannerProperties)).create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager);
return new CustomTableEnvironmentImpl(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, settings.isStreamingMode(), classLoader);
}
}
private static Executor lookupExecutor(Map<String, String> executorProperties, StreamExecutionEnvironment executionEnvironment) {
......@@ -94,25 +102,25 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
} else {
List<ModifyOperation> modifyOperations = new ArrayList<>();
for (int i = 0; i < operations.size(); i++) {
if(operations.get(i) instanceof ModifyOperation){
modifyOperations.add((ModifyOperation)operations.get(i));
if (operations.get(i) instanceof ModifyOperation) {
modifyOperations.add((ModifyOperation) operations.get(i));
}
}
List<Transformation<?>> trans = super.planner.translate(modifyOperations);
if(execEnv instanceof ExecutorBase){
if (execEnv instanceof ExecutorBase) {
StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
JSONGenerator jsonGenerator = new JSONGenerator(streamGraph);
String json = jsonGenerator.getJSON();
ObjectMapper mapper = new ObjectMapper();
ObjectNode objectNode =mapper.createObjectNode();
ObjectNode objectNode = mapper.createObjectNode();
try {
objectNode = (ObjectNode) mapper.readTree(json);
} catch (JsonProcessingException e) {
e.printStackTrace();
}finally {
} finally {
return objectNode;
}
}else{
} else {
throw new TableException("Unsupported SQL query! ExecEnv need a ExecutorBase.");
}
}
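// Illustrative sketch (editor's addition, not part of this commit): consuming the plan JSON
// returned above. Flink's JSONGenerator emits a top-level "nodes" array whose entries carry
// "id", "pact", "contents" and "parallelism" fields.
private static void printPlanNodes(ObjectNode plan) {
for (com.fasterxml.jackson.databind.JsonNode node : plan.path("nodes")) {
System.out.println(node.path("id").asInt() + ": " + node.path("contents").asText());
}
}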
......@@ -124,27 +132,27 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
public StreamGraph getStreamGraphFromInserts(List<String> statements) {
List<ModifyOperation> modifyOperations = new ArrayList<>();
for(String statement : statements){
for (String statement : statements) {
List<Operation> operations = getParser().parse(statement);
if (operations.size() != 1) {
throw new TableException("Only single statement is supported.");
} else {
Operation operation = operations.get(0);
if (operation instanceof ModifyOperation) {
modifyOperations.add((ModifyOperation)operation);
modifyOperations.add((ModifyOperation) operation);
} else {
throw new TableException("Only insert statement is supported now.");
}
}
}
List<Transformation<?>> trans = getPlanner().translate(modifyOperations);
if(execEnv instanceof ExecutorBase){
if (execEnv instanceof ExecutorBase) {
StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
if(tableConfig.getConfiguration().containsKey(PipelineOptions.NAME.key())) {
if (tableConfig.getConfiguration().containsKey(PipelineOptions.NAME.key())) {
streamGraph.setJobName(tableConfig.getConfiguration().getString(PipelineOptions.NAME));
}
return streamGraph;
}else{
} else {
throw new TableException("Unsupported SQL query! ExecEnv need a ExecutorBase.");
}
}
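// Illustrative sketch (editor's addition, not part of this commit): several INSERT statements
// are merged into one StreamGraph, i.e. a single submitted job. Table names are hypothetical.
private StreamGraph multiInsertSketch() {
java.util.List<String> inserts = java.util.Arrays.asList(
"INSERT INTO sink1 SELECT * FROM src",
"INSERT INTO sink2 SELECT * FROM src");
return getStreamGraphFromInserts(inserts);
}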
......@@ -174,11 +182,11 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
record.setExplain(operation.asSummaryString());
operationlist.remove(i);
record.setType("DDL");
i=i-1;
i = i - 1;
}
}
record.setExplainTrue(true);
if(operationlist.size()==0){
if (operationlist.size() == 0) {
return record;
}
record.setExplain(planner.explain(operationlist, extraDetails));
......
......@@ -4,8 +4,11 @@ import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
......@@ -13,7 +16,10 @@ import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
......@@ -34,7 +40,6 @@ import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.planner.delegation.ExecutorBase;
import org.apache.flink.table.planner.utils.ExecutorUtils;
import org.apache.flink.types.Row;
import java.lang.reflect.Method;
import java.util.ArrayList;
......@@ -47,7 +52,7 @@ import java.util.Map;
* @author wenmo
* @since 2021/6/7 22:06
**/
public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements CustomTableEnvironment {
protected CustomTableEnvironmentImpl(CatalogManager catalogManager, ModuleManager moduleManager, TableConfig tableConfig, Executor executor, FunctionCatalog functionCatalog, Planner planner, boolean isStreamingMode, ClassLoader userClassLoader) {
super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode, userClassLoader);
......@@ -57,14 +62,19 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
return create(executionEnvironment, EnvironmentSettings.newInstance().build());
}
static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) {
public static CustomTableEnvironmentImpl createBatch(StreamExecutionEnvironment executionEnvironment) {
Configuration configuration = new Configuration();
configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
TableConfig tableConfig = new TableConfig();
tableConfig.addConfiguration(configuration);
return create(executionEnvironment, EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(), tableConfig);
}
public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) {
return create(executionEnvironment, settings, new TableConfig());
}
public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings, TableConfig tableConfig) {
if (!settings.isStreamingMode()) {
throw new TableException("StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
} else {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
ModuleManager moduleManager = new ModuleManager();
CatalogManager catalogManager = CatalogManager.newBuilder().classLoader(classLoader).config(tableConfig.getConfiguration()).defaultCatalog(settings.getBuiltInCatalogName(), new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName())).executionConfig(executionEnvironment.getConfig()).build();
......@@ -74,7 +84,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
Map<String, String> plannerProperties = settings.toPlannerProperties();
Planner planner = (ComponentFactoryService.find(PlannerFactory.class, plannerProperties)).create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager);
return new CustomTableEnvironmentImpl(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, settings.isStreamingMode(), classLoader);
}
}
private static Executor lookupExecutor(Map<String, String> executorProperties, StreamExecutionEnvironment executionEnvironment) {
......
package com.dlink.executor;
import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* CustomBatchTableEnvironmentImpl
*
* @author wenmo
* @since 2022/2/4 0:20
*/
public class CustomBatchTableEnvironmentImpl extends BatchTableEnvironmentImpl implements CustomTableEnvironment {
public CustomBatchTableEnvironmentImpl(ExecutionEnvironment execEnv, TableConfig config, CatalogManager catalogManager, ModuleManager moduleManager) {
super(execEnv, config, catalogManager, moduleManager);
}
/*protected CustomBatchTableEnvironmentImpl(CatalogManager catalogManager, ModuleManager moduleManager, TableConfig tableConfig, Executor executor, FunctionCatalog functionCatalog, Planner planner, boolean isStreamingMode, ClassLoader userClassLoader) {
super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode, userClassLoader);
}*/
public static CustomBatchTableEnvironmentImpl create(ExecutionEnvironment executionEnvironment) {
return create(executionEnvironment, EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
}
static CustomBatchTableEnvironmentImpl create(ExecutionEnvironment executionEnvironment, EnvironmentSettings settings) {
return create(executionEnvironment, settings, new TableConfig());
}
public static CustomBatchTableEnvironmentImpl create(ExecutionEnvironment executionEnvironment, EnvironmentSettings settings, TableConfig tableConfig) {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
ModuleManager moduleManager = new ModuleManager();
CatalogManager catalogManager =
CatalogManager.newBuilder()
.classLoader(classLoader)
.config(tableConfig.getConfiguration())
.defaultCatalog(
settings.getBuiltInCatalogName(),
new GenericInMemoryCatalog(
settings.getBuiltInCatalogName(),
settings.getBuiltInDatabaseName()))
.build();
FunctionCatalog functionCatalog =
new FunctionCatalog(tableConfig, catalogManager, moduleManager);
Map<String, String> executorProperties = settings.toExecutorProperties();
Executor executor =
ComponentFactoryService.find(ExecutorFactory.class, executorProperties)
.create(executorProperties);
Map<String, String> plannerProperties = settings.toPlannerProperties();
Planner planner =
ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
.create(
plannerProperties,
executor,
tableConfig,
functionCatalog,
catalogManager);
return new CustomBatchTableEnvironmentImpl(executionEnvironment, tableConfig, catalogManager, moduleManager);
// return new CustomBatchTableEnvironmentImpl(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, settings.isStreamingMode(), classLoader);
}
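// Illustrative sketch (editor's addition, not part of this commit): minimal use of this
// legacy-planner batch environment through the DataSet bridge. Note that the three-arg
// create(...) above builds an executor and planner that the chosen constructor ignores.
public static void usageSketch() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
CustomBatchTableEnvironmentImpl tEnv = create(env);
// An atomic-type DataSet surfaces as a single column named f0.
tEnv.createTemporaryView("nums", env.fromElements(1, 2, 3));
System.out.println(tEnv.explainSql("SELECT f0 FROM nums"));
}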
@Override
public ObjectNode getStreamGraph(String statement) {
List<Operation> operations = super.getParser().parse(statement);
if (operations.size() != 1) {
throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query.");
} else {
for (int i = 0; i < operations.size(); i++) {
if (operations.get(i) instanceof ModifyOperation) {
addToBuffer((ModifyOperation) operations.get(i));
}
}
Pipeline pipeline = getPipeline("Flink Batch Job");
if (pipeline instanceof StreamGraph) {
JSONGenerator jsonGenerator = new JSONGenerator((StreamGraph) pipeline);
String json = jsonGenerator.getJSON();
ObjectMapper mapper = new ObjectMapper();
ObjectNode objectNode = mapper.createObjectNode();
try {
objectNode = (ObjectNode) mapper.readTree(json);
} catch (JsonProcessingException e) {
e.printStackTrace();
} finally {
return objectNode;
}
} else {
throw new TableException("Unsupported SQL getStreamGraph().");
}
}
// return null;
}
@Override
public JobPlanInfo getJobPlanInfo(List<String> statements) {
return new JobPlanInfo(JsonPlanGenerator.generatePlan(getJobGraphFromInserts(statements)));
}
@Override
public StreamGraph getStreamGraphFromInserts(List<String> statements) {
for (String statement : statements) {
List<Operation> operations = getParser().parse(statement);
if (operations.size() != 1) {
throw new TableException("Only single statement is supported.");
} else {
Operation operation = operations.get(0);
if (operation instanceof ModifyOperation) {
addToBuffer((ModifyOperation) operation);
} else {
throw new TableException("Only insert statement is supported now.");
}
}
}
Pipeline pipeline = getPipeline("Flink Batch Job");
if (pipeline instanceof StreamGraph) {
return (StreamGraph) pipeline;
} else {
throw new TableException("Unsupported SQL getStreamGraphFromInserts().");
}
// return null;
}
@Override
public JobGraph getJobGraphFromInserts(List<String> statements) {
return getStreamGraphFromInserts(statements).getJobGraph();
}
@Override
public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
SqlExplainResult record = new SqlExplainResult();
List<Operation> operations = getParser().parse(statement);
record.setParseTrue(true);
if (operations.size() != 1) {
throw new TableException(
"Unsupported SQL query! explainSql() only accepts a single SQL query.");
}
List<Operation> operationlist = new ArrayList<>(operations);
for (int i = 0; i < operationlist.size(); i++) {
Operation operation = operationlist.get(i);
if (operation instanceof ModifyOperation) {
record.setType("Modify DML");
} else if (operation instanceof ExplainOperation) {
record.setType("Explain DML");
} else if (operation instanceof QueryOperation) {
record.setType("Query DML");
} else {
record.setExplain(operation.asSummaryString());
operationlist.remove(i);
record.setType("DDL");
i = i - 1;
}
}
record.setExplainTrue(true);
if (operationlist.size() == 0) {
//record.setExplain("DDL语句不进行解释。");
return record;
}
record.setExplain(explainInternal(operationlist, extraDetails));
return record;
}
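// Illustrative sketch (editor's addition, not part of this commit): how the record is
// classified. Getter names on SqlExplainResult are assumed from the setters used above.
private void explainSqlRecordSketch() {
SqlExplainResult ddl = explainSqlRecord("CREATE TABLE t (id INT)");
// ddl carries type "DDL" and the statement summary; the planner is not consulted.
SqlExplainResult query = explainSqlRecord("SELECT f0 FROM nums"); // assumes the nums view from the earlier sketch
// query carries type "Query DML" and the plan text from explainInternal(...).
}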
@Override
public boolean parseAndLoadConfiguration(String statement, ExecutionConfig executionConfig, Map<String, Object> setMap) {
List<Operation> operations = getParser().parse(statement);
for (Operation operation : operations) {
if (operation instanceof SetOperation) {
callSet((SetOperation) operation, executionConfig, setMap);
return true;
} else if (operation instanceof ResetOperation) {
callReset((ResetOperation) operation, executionConfig, setMap);
return true;
}
}
return false;
}
private void callSet(SetOperation setOperation, ExecutionConfig executionConfig, Map<String, Object> setMap) {
if (setOperation.getKey().isPresent() && setOperation.getValue().isPresent()) {
String key = setOperation.getKey().get().trim();
String value = setOperation.getValue().get().trim();
Map<String, String> confMap = new HashMap<>();
confMap.put(key, value);
setMap.put(key, value);
Configuration configuration = Configuration.fromMap(confMap);
executionConfig.configure(configuration, null);
getConfig().addConfiguration(configuration);
}
}
private void callReset(ResetOperation resetOperation, ExecutionConfig executionConfig, Map<String, Object> setMap) {
if (resetOperation.getKey().isPresent()) {
String key = resetOperation.getKey().get().trim();
Map<String, String> confMap = new HashMap<>();
confMap.put(key, null);
setMap.remove(key);
Configuration configuration = Configuration.fromMap(confMap);
executionConfig.configure(configuration, null);
getConfig().addConfiguration(configuration);
} else {
setMap.clear();
}
}
}
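// Illustrative sketch (editor's addition, not part of this commit): exercising SET interception.
// Assumes the parser recognizes SET statements, as the SetOperation import above suggests.
class SetStatementSketch {
public static void main(String[] args) {
org.apache.flink.api.java.ExecutionEnvironment env = org.apache.flink.api.java.ExecutionEnvironment.createLocalEnvironment();
CustomBatchTableEnvironmentImpl tEnv = CustomBatchTableEnvironmentImpl.create(env);
java.util.Map<String, Object> setMap = new java.util.HashMap<>();
boolean handled = tEnv.parseAndLoadConfiguration("SET 'pipeline.name' = 'demo-batch'", env.getConfig(), setMap);
// handled == true; setMap now holds {pipeline.name=demo-batch}, and the key is applied to
// both the ExecutionConfig and the TableConfig.
System.out.println(handled + " " + setMap);
}
}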
......@@ -4,10 +4,11 @@ import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
......@@ -64,14 +65,19 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements
return create(executionEnvironment, EnvironmentSettings.newInstance().build());
}
static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) {
public static CustomTableEnvironmentImpl createBatch(StreamExecutionEnvironment executionEnvironment) {
Configuration configuration = new Configuration();
configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
TableConfig tableConfig = new TableConfig();
tableConfig.addConfiguration(configuration);
return create(executionEnvironment, EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(), tableConfig);
}
public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) {
return create(executionEnvironment, settings, new TableConfig());
}
public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings, TableConfig tableConfig) {
if (!settings.isStreamingMode()) {
throw new TableException("StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
} else {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
ModuleManager moduleManager = new ModuleManager();
CatalogManager catalogManager = CatalogManager.newBuilder().classLoader(classLoader).config(tableConfig.getConfiguration()).defaultCatalog(settings.getBuiltInCatalogName(), new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName())).executionConfig(executionEnvironment.getConfig()).build();
......@@ -81,7 +87,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements
Map<String, String> plannerProperties = settings.toPlannerProperties();
Planner planner = (ComponentFactoryService.find(PlannerFactory.class, plannerProperties)).create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager);
return new CustomTableEnvironmentImpl(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, settings.isStreamingMode(), classLoader);
}
}
private static Executor lookupExecutor(Map<String, String> executorProperties, StreamExecutionEnvironment executionEnvironment) {
......@@ -101,25 +107,25 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements
} else {
List<ModifyOperation> modifyOperations = new ArrayList<>();
for (int i = 0; i < operations.size(); i++) {
if(operations.get(i) instanceof ModifyOperation){
modifyOperations.add((ModifyOperation)operations.get(i));
if (operations.get(i) instanceof ModifyOperation) {
modifyOperations.add((ModifyOperation) operations.get(i));
}
}
List<Transformation<?>> trans = getPlanner().translate(modifyOperations);
if(execEnv instanceof ExecutorBase){
if (execEnv instanceof ExecutorBase) {
StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
JSONGenerator jsonGenerator = new JSONGenerator(streamGraph);
String json = jsonGenerator.getJSON();
ObjectMapper mapper = new ObjectMapper();
ObjectNode objectNode =mapper.createObjectNode();
ObjectNode objectNode = mapper.createObjectNode();
try {
objectNode = (ObjectNode) mapper.readTree(json);
} catch (JsonProcessingException e) {
e.printStackTrace();
}finally {
} finally {
return objectNode;
}
}else{
} else {
throw new TableException("Unsupported SQL query! explainSql() need a single SQL to query.");
}
}
......@@ -131,27 +137,27 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements
public StreamGraph getStreamGraphFromInserts(List<String> statements) {
List<ModifyOperation> modifyOperations = new ArrayList<>();
for(String statement : statements){
for (String statement : statements) {
List<Operation> operations = getParser().parse(statement);
if (operations.size() != 1) {
throw new TableException("Only single statement is supported.");
} else {
Operation operation = operations.get(0);
if (operation instanceof ModifyOperation) {
modifyOperations.add((ModifyOperation)operation);
modifyOperations.add((ModifyOperation) operation);
} else {
throw new TableException("Only insert statement is supported now.");
}
}
}
List<Transformation<?>> trans = getPlanner().translate(modifyOperations);
if(execEnv instanceof ExecutorBase){
if (execEnv instanceof ExecutorBase) {
StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
if(tableConfig.getConfiguration().containsKey(PipelineOptions.NAME.key())) {
if (tableConfig.getConfiguration().containsKey(PipelineOptions.NAME.key())) {
streamGraph.setJobName(tableConfig.getConfiguration().getString(PipelineOptions.NAME));
}
return streamGraph;
}else{
} else {
throw new TableException("Unsupported SQL query! ExecEnv need a ExecutorBase.");
}
}
......@@ -181,11 +187,11 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements
record.setExplain(operation.asSummaryString());
operationlist.remove(i);
record.setType("DDL");
i=i-1;
i = i - 1;
}
}
record.setExplainTrue(true);
if(operationlist.size()==0){
if (operationlist.size() == 0) {
//record.setExplain("DDL语句不进行解释。");
return record;
}
......@@ -210,43 +216,43 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements
this.functionCatalog.registerTempSystemAggregateFunction(name, tableAggregateFunction, typeInfo, accTypeInfo);
}
public boolean parseAndLoadConfiguration(String statement, ExecutionConfig executionConfig, Map<String,Object> setMap){
public boolean parseAndLoadConfiguration(String statement, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
List<Operation> operations = getParser().parse(statement);
for(Operation operation : operations){
if(operation instanceof SetOperation){
callSet((SetOperation)operation,executionConfig,setMap);
for (Operation operation : operations) {
if (operation instanceof SetOperation) {
callSet((SetOperation) operation, environment, setMap);
return true;
} else if (operation instanceof ResetOperation){
callReset((ResetOperation)operation,executionConfig,setMap);
} else if (operation instanceof ResetOperation) {
callReset((ResetOperation) operation, environment, setMap);
return true;
}
}
return false;
}
private void callSet(SetOperation setOperation, ExecutionConfig executionConfig,Map<String,Object> setMap){
private void callSet(SetOperation setOperation, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
if (setOperation.getKey().isPresent() && setOperation.getValue().isPresent()) {
String key = setOperation.getKey().get().trim();
String value = setOperation.getValue().get().trim();
Map<String,String> confMap = new HashMap<>();
confMap.put(key,value);
setMap.put(key,value);
Map<String, String> confMap = new HashMap<>();
confMap.put(key, value);
setMap.put(key, value);
Configuration configuration = Configuration.fromMap(confMap);
executionConfig.configure(configuration,null);
environment.getConfig().configure(configuration, null);
getConfig().addConfiguration(configuration);
}
}
private void callReset(ResetOperation resetOperation, ExecutionConfig executionConfig,Map<String,Object> setMap) {
private void callReset(ResetOperation resetOperation, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
if (resetOperation.getKey().isPresent()) {
String key = resetOperation.getKey().get().trim();
Map<String,String> confMap = new HashMap<>();
confMap.put(key,null);
Map<String, String> confMap = new HashMap<>();
confMap.put(key, null);
setMap.remove(key);
Configuration configuration = Configuration.fromMap(confMap);
executionConfig.configure(configuration,null);
environment.getConfig().configure(configuration, null);
getConfig().addConfiguration(configuration);
}else {
} else {
setMap.clear();
}
}
......
......@@ -4,15 +4,22 @@ import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
......@@ -34,7 +41,6 @@ import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;
import org.apache.flink.table.planner.delegation.DefaultExecutor;
import org.apache.flink.types.Row;
import java.lang.reflect.Method;
import java.util.ArrayList;
......@@ -48,7 +54,7 @@ import java.util.Map;
* @author wenmo
* @since 2021/10/22 10:02
**/
public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements CustomTableEnvironment {
protected CustomTableEnvironmentImpl(CatalogManager catalogManager, ModuleManager moduleManager, TableConfig tableConfig, Executor executor, FunctionCatalog functionCatalog, Planner planner, boolean isStreamingMode, ClassLoader userClassLoader) {
super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode, userClassLoader);
......@@ -75,8 +81,16 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
userClassLoader);
}
public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment){
return create(executionEnvironment,EnvironmentSettings.newInstance().build(),TableConfig.getDefault());
public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment) {
return create(executionEnvironment, EnvironmentSettings.newInstance().build(), TableConfig.getDefault());
}
public static CustomTableEnvironmentImpl createBatch(StreamExecutionEnvironment executionEnvironment) {
Configuration configuration = new Configuration();
configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
TableConfig tableConfig = new TableConfig();
tableConfig.addConfiguration(configuration);
return create(executionEnvironment, EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(), tableConfig);
}
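// Editor's note (not part of this commit): unlike the first client in this diff, which sets the
// runtime mode with a raw string key, this one uses the typed option. Both write the same key,
// as this sketch shows.
private static void runtimeModeEquivalenceSketch() {
Configuration byString = new Configuration();
byString.setString("execution.runtime-mode", "BATCH");
Configuration typed = new Configuration();
typed.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
// Reading the string-configured value back through the typed option yields the same enum.
System.out.println(byString.get(ExecutionOptions.RUNTIME_MODE) == typed.get(ExecutionOptions.RUNTIME_MODE));
}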
public static CustomTableEnvironmentImpl create(
......@@ -155,53 +169,58 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
} else {
List<ModifyOperation> modifyOperations = new ArrayList<>();
for (int i = 0; i < operations.size(); i++) {
if(operations.get(i) instanceof ModifyOperation){
modifyOperations.add((ModifyOperation)operations.get(i));
if (operations.get(i) instanceof ModifyOperation) {
modifyOperations.add((ModifyOperation) operations.get(i));
}
}
List<Transformation<?>> trans = super.planner.translate(modifyOperations);
if(execEnv instanceof DefaultExecutor){
if (execEnv instanceof DefaultExecutor) {
StreamGraph streamGraph = ((DefaultExecutor) execEnv).getExecutionEnvironment().generateStreamGraph(trans);
JSONGenerator jsonGenerator = new JSONGenerator(streamGraph);
String json = jsonGenerator.getJSON();
ObjectMapper mapper = new ObjectMapper();
ObjectNode objectNode =mapper.createObjectNode();
ObjectNode objectNode = mapper.createObjectNode();
try {
objectNode = (ObjectNode) mapper.readTree(json);
} catch (JsonProcessingException e) {
e.printStackTrace();
}finally {
} finally {
return objectNode;
}
}else{
} else {
throw new TableException("Unsupported SQL query! explainSql() need a single SQL to query.");
}
}
}
@Override
public JobPlanInfo getJobPlanInfo(List<String> statements) {
return new JobPlanInfo(JsonPlanGenerator.generatePlan(getJobGraphFromInserts(statements)));
}
public StreamGraph getStreamGraphFromInserts(List<String> statements) {
List<ModifyOperation> modifyOperations = new ArrayList<>();
for(String statement : statements){
for (String statement : statements) {
List<Operation> operations = getParser().parse(statement);
if (operations.size() != 1) {
throw new TableException("Only single statement is supported.");
} else {
Operation operation = operations.get(0);
if (operation instanceof ModifyOperation) {
modifyOperations.add((ModifyOperation)operation);
modifyOperations.add((ModifyOperation) operation);
} else {
throw new TableException("Only insert statement is supported now.");
}
}
}
List<Transformation<?>> trans = getPlanner().translate(modifyOperations);
if(execEnv instanceof DefaultExecutor){
if (execEnv instanceof DefaultExecutor) {
StreamGraph streamGraph = ((DefaultExecutor) execEnv).getExecutionEnvironment().generateStreamGraph(trans);
if(tableConfig.getConfiguration().containsKey(PipelineOptions.NAME.key())) {
if (tableConfig.getConfiguration().containsKey(PipelineOptions.NAME.key())) {
streamGraph.setJobName(tableConfig.getConfiguration().getString(PipelineOptions.NAME));
}
return streamGraph;
}else{
} else {
throw new TableException("Unsupported SQL query! ExecEnv need a ExecutorBase.");
}
}
......@@ -231,11 +250,11 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
record.setExplain(operation.asSummaryString());
operationlist.remove(i);
record.setType("DDL");
i=i-1;
i = i - 1;
}
}
record.setExplainTrue(true);
if(operationlist.size()==0){
if (operationlist.size() == 0) {
//record.setExplain("DDL语句不进行解释。");
return record;
}
......@@ -260,43 +279,43 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
this.functionCatalog.registerTempSystemAggregateFunction(name, tableAggregateFunction, typeInfo, accTypeInfo);
}
public boolean parseAndLoadConfiguration(String statement, StreamExecutionEnvironment environment, Map<String,Object> setMap){
public boolean parseAndLoadConfiguration(String statement, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
List<Operation> operations = getParser().parse(statement);
for(Operation operation : operations){
if(operation instanceof SetOperation){
callSet((SetOperation)operation,environment,setMap);
for (Operation operation : operations) {
if (operation instanceof SetOperation) {
callSet((SetOperation) operation, environment, setMap);
return true;
} else if (operation instanceof ResetOperation){
callReset((ResetOperation)operation,environment,setMap);
} else if (operation instanceof ResetOperation) {
callReset((ResetOperation) operation, environment, setMap);
return true;
}
}
return false;
}
private void callSet(SetOperation setOperation,StreamExecutionEnvironment environment,Map<String,Object> setMap){
private void callSet(SetOperation setOperation, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
if (setOperation.getKey().isPresent() && setOperation.getValue().isPresent()) {
String key = setOperation.getKey().get().trim();
String value = setOperation.getValue().get().trim();
Map<String,String> confMap = new HashMap<>();
confMap.put(key,value);
setMap.put(key,value);
Map<String, String> confMap = new HashMap<>();
confMap.put(key, value);
setMap.put(key, value);
Configuration configuration = Configuration.fromMap(confMap);
environment.getConfig().configure(configuration,null);
environment.getConfig().configure(configuration, null);
getConfig().addConfiguration(configuration);
}
}
private void callReset(ResetOperation resetOperation,StreamExecutionEnvironment environment,Map<String,Object> setMap) {
private void callReset(ResetOperation resetOperation, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
if (resetOperation.getKey().isPresent()) {
String key = resetOperation.getKey().get().trim();
Map<String,String> confMap = new HashMap<>();
confMap.put(key,null);
Map<String, String> confMap = new HashMap<>();
confMap.put(key, null);
setMap.remove(key);
Configuration configuration = Configuration.fromMap(confMap);
environment.getConfig().configure(configuration,null);
environment.getConfig().configure(configuration, null);
getConfig().addConfiguration(configuration);
}else {
} else {
setMap.clear();
}
}
......
package com.dlink.core;
import com.dlink.executor.CustomBatchTableEnvironmentImpl;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import com.dlink.executor.CustomTableEnvironmentImpl;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl;
import org.junit.Test;
/**
......@@ -17,7 +19,7 @@ import org.junit.Test;
*/
public class BatchTest {
@Test
public void batchTest(){
public void batchTest() {
String source = "CREATE TABLE Orders (\n" +
" order_number BIGINT,\n" +
" price DECIMAL(32,2),\n" +
......@@ -42,7 +44,7 @@ public class BatchTest {
}
@Test
public void batchTest2(){
public void batchTest2() {
String source = "CREATE TABLE Orders (\n" +
" order_number BIGINT,\n" +
" price DECIMAL(32,2),\n" +
......@@ -53,29 +55,18 @@ public class BatchTest {
" 'number-of-rows' = '100'\n" +
")";
String select = "select order_number,price,order_time from Orders";
LocalEnvironment environment = ExecutionEnvironment.createLocalEnvironment();
CustomBatchTableEnvironmentImpl batchTableEnvironment = CustomBatchTableEnvironmentImpl.create(environment);
batchTableEnvironment.executeSql(source);
TableResult tableResult = batchTableEnvironment.executeSql(select);
tableResult.print();
}
@Test
public void batchTest3(){
String source = "CREATE TABLE Orders (\n" +
" order_number BIGINT,\n" +
" price DECIMAL(32,2),\n" +
" buyer ROW<first_name STRING, last_name STRING>,\n" +
" order_time TIMESTAMP(3)\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'number-of-rows' = '100'\n" +
")";
String select = "select order_number,price,order_time from Orders";
LocalEnvironment environment = ExecutionEnvironment.createLocalEnvironment();
CustomBatchTableEnvironmentImpl batchTableEnvironment = CustomBatchTableEnvironmentImpl.create(environment);
// LocalEnvironment environment = ExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironment();
Configuration configuration = new Configuration();
configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
// configuration.setString("execution.runtime-mode", "STREAMING");
TableConfig tableConfig = new TableConfig();
tableConfig.addConfiguration(configuration);
CustomTableEnvironmentImpl batchTableEnvironment = CustomTableEnvironmentImpl.create(environment,
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(), tableConfig);
batchTableEnvironment.executeSql(source);
TableResult tableResult = batchTableEnvironment.executeSql(select);
tableResult.print();
batchTableEnvironment.executeSql(select);
// TableResult tableResult = batchTableEnvironment.executeSql(select);
// tableResult.print();
}
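// Illustrative variant (editor's addition, not part of this commit): collect the rows and
// assert on the count instead of printing. Mirrors the construction used in batchTest2.
@Test
public void batchCollectTest() throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironment();
Configuration configuration = new Configuration();
configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
TableConfig tableConfig = new TableConfig();
tableConfig.addConfiguration(configuration);
CustomTableEnvironmentImpl batchTableEnvironment = CustomTableEnvironmentImpl.create(environment,
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(), tableConfig);
batchTableEnvironment.executeSql("CREATE TABLE Orders (order_number BIGINT) WITH ('connector' = 'datagen', 'number-of-rows' = '100')");
long count = 0;
try (org.apache.flink.util.CloseableIterator<org.apache.flink.types.Row> rows =
batchTableEnvironment.executeSql("select order_number from Orders").collect()) {
while (rows.hasNext()) {
rows.next();
count++;
}
}
org.junit.Assert.assertEquals(100L, count);
}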
}
package com.dlink.executor;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.program.OptimizerPlanEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.TableException;
/**
* AbstractBatchExecutor
*
* @author wenmo
* @since 2022/2/7 20:05
*/
public abstract class AbstractBatchExecutor extends Executor {
protected ExecutionEnvironment environment;
public void initEnvironment() {
if (executorSetting.getParallelism() != null && executorSetting.getParallelism() > 0) {
environment.setParallelism(executorSetting.getParallelism());
}
if (executorSetting.getConfig() != null) {
Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
environment.getConfig().configure(configuration, null);
}
}
public void updateEnvironment(ExecutorSetting executorSetting) {
if (executorSetting.getParallelism() != null && executorSetting.getParallelism() > 0) {
environment.setParallelism(executorSetting.getParallelism());
}
if (executorSetting.getConfig() != null) {
Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
environment.getConfig().configure(configuration, null);
}
}
public JobExecutionResult execute(String jobName) throws Exception {
return environment.execute(jobName);
}
public StreamGraph getStreamGraph() {
throw new TableException("Batch mode can't get a StreamGraph.");
}
public StreamExecutionEnvironment getStreamExecutionEnvironment() {
return null;
}
public ExecutionConfig getExecutionConfig() {
return environment.getConfig();
}
public boolean parseAndLoadConfiguration(String statement) {
return stEnvironment.parseAndLoadConfiguration(statement, getExecutionConfig(), setConfig);
}
}
package com.dlink.executor;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
/**
* AbstractStreamExecutor
*
* @author wenmo
* @since 2022/2/7 20:03
*/
public abstract class AbstractStreamExecutor extends Executor {
protected StreamExecutionEnvironment environment;
public void initEnvironment() {
if (executorSetting.getCheckpoint() != null && executorSetting.getCheckpoint() > 0) {
environment.enableCheckpointing(executorSetting.getCheckpoint());
}
if (executorSetting.getParallelism() != null && executorSetting.getParallelism() > 0) {
environment.setParallelism(executorSetting.getParallelism());
}
if (executorSetting.getConfig() != null) {
Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
environment.getConfig().configure(configuration, null);
}
}
public void updateEnvironment(ExecutorSetting executorSetting) {
if (executorSetting.getCheckpoint() != null && executorSetting.getCheckpoint() > 0) {
environment.enableCheckpointing(executorSetting.getCheckpoint());
}
if (executorSetting.getParallelism() != null && executorSetting.getParallelism() > 0) {
environment.setParallelism(executorSetting.getParallelism());
}
if (executorSetting.getConfig() != null) {
Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
environment.getConfig().configure(configuration, null);
}
}
public JobExecutionResult execute(String jobName) throws Exception {
return environment.execute(jobName);
}
public StreamGraph getStreamGraph() {
return environment.getStreamGraph();
}
public StreamExecutionEnvironment getStreamExecutionEnvironment() {
return environment;
}
public ExecutionConfig getExecutionConfig() {
return environment.getConfig();
}
public boolean parseAndLoadConfiguration(String statement) {
return stEnvironment.parseAndLoadConfiguration(statement, getExecutionConfig(), setConfig);
}
}
package com.dlink.executor;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* AppBatchExecutor
......@@ -8,16 +8,16 @@ import org.apache.flink.api.java.ExecutionEnvironment;
* @author wenmo
* @since 2022/2/7 22:14
*/
public class AppBatchExecutor extends AbstractBatchExecutor {
public class AppBatchExecutor extends Executor {
public AppBatchExecutor(ExecutorSetting executorSetting) {
this.executorSetting = executorSetting;
this.environment = ExecutionEnvironment.createLocalEnvironment();
this.environment = StreamExecutionEnvironment.createLocalEnvironment();
init();
}
@Override
CustomTableEnvironment createCustomTableEnvironment() {
return CustomBatchTableEnvironmentImpl.create(environment);
return CustomTableEnvironmentImpl.createBatch(environment);
}
}
......@@ -8,7 +8,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
* @author wenmo
* @since 2021/11/18
*/
public class AppStreamExecutor extends AbstractStreamExecutor{
public class AppStreamExecutor extends Executor {
public AppStreamExecutor(ExecutorSetting executorSetting) {
this.executorSetting = executorSetting;
......
......@@ -20,8 +20,6 @@ import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
import java.util.HashMap;
import java.util.List;
......@@ -29,6 +27,7 @@ import java.util.Map;
/**
* Executor
*
* @author wenmo
* @since 2021/11/17
**/
......@@ -38,7 +37,7 @@ public abstract class Executor {
protected CustomTableEnvironment stEnvironment;
protected EnvironmentSetting environmentSetting;
protected ExecutorSetting executorSetting;
protected Map<String,Object> setConfig = new HashMap<>();
protected Map<String, Object> setConfig = new HashMap<>();
protected SqlManager sqlManager = new SqlManager();
protected boolean useSqlFragment = true;
......@@ -51,60 +50,60 @@ public abstract class Executor {
return useSqlFragment;
}
public static Executor build(){
public static Executor build() {
return new LocalStreamExecutor(ExecutorSetting.DEFAULT);
}
public static Executor build(EnvironmentSetting environmentSetting,ExecutorSetting executorSetting){
if(environmentSetting.isUseRemote()){
return buildRemoteExecutor(environmentSetting,executorSetting);
}else{
public static Executor build(EnvironmentSetting environmentSetting, ExecutorSetting executorSetting) {
if (environmentSetting.isUseRemote()) {
return buildRemoteExecutor(environmentSetting, executorSetting);
} else {
return buildLocalExecutor(executorSetting);
}
}
public static Executor buildLocalExecutor(ExecutorSetting executorSetting){
if(executorSetting.isUseBatchModel()){
public static Executor buildLocalExecutor(ExecutorSetting executorSetting) {
if (executorSetting.isUseBatchModel()) {
return new LocalBatchExecutor(executorSetting);
}else{
} else {
return new LocalStreamExecutor(executorSetting);
}
}
public static Executor buildAppStreamExecutor(ExecutorSetting executorSetting){
if(executorSetting.isUseBatchModel()){
public static Executor buildAppStreamExecutor(ExecutorSetting executorSetting) {
if (executorSetting.isUseBatchModel()) {
return new AppBatchExecutor(executorSetting);
}else{
} else {
return new AppStreamExecutor(executorSetting);
}
}
public static Executor buildRemoteExecutor(EnvironmentSetting environmentSetting,ExecutorSetting executorSetting){
public static Executor buildRemoteExecutor(EnvironmentSetting environmentSetting, ExecutorSetting executorSetting) {
environmentSetting.setUseRemote(true);
if(executorSetting.isUseBatchModel()){
return new RemoteBatchExecutor(environmentSetting,executorSetting);
}else{
return new RemoteStreamExecutor(environmentSetting,executorSetting);
if (executorSetting.isUseBatchModel()) {
return new RemoteBatchExecutor(environmentSetting, executorSetting);
} else {
return new RemoteStreamExecutor(environmentSetting, executorSetting);
}
}
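// Editor's note (not part of this commit): all three builders above dispatch on
// ExecutorSetting.isUseBatchModel(), so callers select batch execution purely through the
// setting object and never name a concrete executor class.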
public abstract ExecutionConfig getExecutionConfig();
public abstract StreamExecutionEnvironment getStreamExecutionEnvironment();
public ExecutionConfig getExecutionConfig() {
return environment.getConfig();
}
public StreamExecutionEnvironment getEnvironment(){
public StreamExecutionEnvironment getStreamExecutionEnvironment() {
return environment;
}
public CustomTableEnvironment getCustomTableEnvironment(){
public CustomTableEnvironment getCustomTableEnvironment() {
return stEnvironment;
}
public ExecutorSetting getExecutorSetting(){
public ExecutorSetting getExecutorSetting() {
return executorSetting;
}
public EnvironmentSetting getEnvironmentSetting(){
public EnvironmentSetting getEnvironmentSetting() {
return environmentSetting;
}
......@@ -116,149 +115,175 @@ public abstract class Executor {
this.setConfig = setConfig;
}
protected void init(){
protected void init() {
initEnvironment();
initStreamExecutionEnvironment();
}
public void update(ExecutorSetting executorSetting){
public void update(ExecutorSetting executorSetting) {
updateEnvironment(executorSetting);
updateStreamExecutionEnvironment(executorSetting);
}
public abstract void initEnvironment();
public void initEnvironment() {
if (executorSetting.getCheckpoint() != null && executorSetting.getCheckpoint() > 0) {
environment.enableCheckpointing(executorSetting.getCheckpoint());
}
if (executorSetting.getParallelism() != null && executorSetting.getParallelism() > 0) {
environment.setParallelism(executorSetting.getParallelism());
}
if (executorSetting.getConfig() != null) {
Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
environment.getConfig().configure(configuration, null);
}
}
public abstract void updateEnvironment(ExecutorSetting executorSetting);
public void updateEnvironment(ExecutorSetting executorSetting) {
if (executorSetting.getCheckpoint() != null && executorSetting.getCheckpoint() > 0) {
environment.enableCheckpointing(executorSetting.getCheckpoint());
}
if (executorSetting.getParallelism() != null && executorSetting.getParallelism() > 0) {
environment.setParallelism(executorSetting.getParallelism());
}
if (executorSetting.getConfig() != null) {
Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
environment.getConfig().configure(configuration, null);
}
}
abstract CustomTableEnvironment createCustomTableEnvironment();
private void initStreamExecutionEnvironment(){
private void initStreamExecutionEnvironment() {
useSqlFragment = executorSetting.isUseSqlFragment();
stEnvironment = createCustomTableEnvironment();
if(executorSetting.getJobName()!=null&&!"".equals(executorSetting.getJobName())){
if (executorSetting.getJobName() != null && !"".equals(executorSetting.getJobName())) {
stEnvironment.getConfig().getConfiguration().setString(PipelineOptions.NAME.key(), executorSetting.getJobName());
}
setConfig.put(PipelineOptions.NAME.key(),executorSetting.getJobName());
if(executorSetting.getConfig()!=null){
setConfig.put(PipelineOptions.NAME.key(), executorSetting.getJobName());
if (executorSetting.getConfig() != null) {
for (Map.Entry<String, String> entry : executorSetting.getConfig().entrySet()) {
stEnvironment.getConfig().getConfiguration().setString(entry.getKey(), entry.getValue());
}
}
}
private void updateStreamExecutionEnvironment(ExecutorSetting executorSetting){
private void updateStreamExecutionEnvironment(ExecutorSetting executorSetting) {
useSqlFragment = executorSetting.isUseSqlFragment();
copyCatalog();
if(executorSetting.getJobName()!=null&&!"".equals(executorSetting.getJobName())){
if (executorSetting.getJobName() != null && !"".equals(executorSetting.getJobName())) {
stEnvironment.getConfig().getConfiguration().setString(PipelineOptions.NAME.key(), executorSetting.getJobName());
}
setConfig.put(PipelineOptions.NAME.key(),executorSetting.getJobName());
if(executorSetting.getConfig()!=null){
setConfig.put(PipelineOptions.NAME.key(), executorSetting.getJobName());
if (executorSetting.getConfig() != null) {
for (Map.Entry<String, String> entry : executorSetting.getConfig().entrySet()) {
stEnvironment.getConfig().getConfiguration().setString(entry.getKey(), entry.getValue());
}
}
}
private void copyCatalog(){
private void copyCatalog() {
String[] catalogs = stEnvironment.listCatalogs();
CustomTableEnvironment newstEnvironment = createCustomTableEnvironment();
for (int i = 0; i < catalogs.length; i++) {
if(stEnvironment.getCatalog(catalogs[i]).isPresent()) {
newstEnvironment.getCatalogManager().unregisterCatalog(catalogs[i],true);
if (stEnvironment.getCatalog(catalogs[i]).isPresent()) {
newstEnvironment.getCatalogManager().unregisterCatalog(catalogs[i], true);
newstEnvironment.registerCatalog(catalogs[i], stEnvironment.getCatalog(catalogs[i]).get());
}
}
stEnvironment = newstEnvironment;
}
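// Editor's note (not part of this commit): the environment swap above preserves registered
// catalogs only; temporary objects that live outside a catalog are not carried over here.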
public String pretreatStatement(String statement){
return FlinkInterceptor.pretreatStatement(this,statement);
public String pretreatStatement(String statement) {
return FlinkInterceptor.pretreatStatement(this, statement);
}
private boolean pretreatExecute(String statement){
return !FlinkInterceptor.build(this,statement);
private boolean pretreatExecute(String statement) {
return !FlinkInterceptor.build(this, statement);
}
public abstract JobExecutionResult execute(String jobName) throws Exception ;
public JobExecutionResult execute(String jobName) throws Exception {
return environment.execute(jobName);
}
public TableResult executeSql(String statement){
public TableResult executeSql(String statement) {
statement = pretreatStatement(statement);
if(pretreatExecute(statement)) {
if (pretreatExecute(statement)) {
return stEnvironment.executeSql(statement);
}else{
} else {
return CustomTableResultImpl.TABLE_RESULT_OK;
}
}
public String explainSql(String statement, ExplainDetail... extraDetails){
public String explainSql(String statement, ExplainDetail... extraDetails) {
statement = pretreatStatement(statement);
if (pretreatExecute(statement)) {
return stEnvironment.explainSql(statement, extraDetails);
} else {
return "";
}
}
public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
statement = pretreatStatement(statement);
if (Asserts.isNotNullString(statement) && pretreatExecute(statement)) {
return stEnvironment.explainSqlRecord(statement, extraDetails);
} else {
return null;
}
}
public ObjectNode getStreamGraph(String statement) {
statement = pretreatStatement(statement);
if (pretreatExecute(statement)) {
return stEnvironment.getStreamGraph(statement);
} else {
return null;
}
}
public ObjectNode getStreamGraph(List<String> statements) {
StreamGraph streamGraph = stEnvironment.getStreamGraphFromInserts(statements);
JSONGenerator jsonGenerator = new JSONGenerator(streamGraph);
String json = jsonGenerator.getJSON();
ObjectMapper mapper = new ObjectMapper();
ObjectNode objectNode = mapper.createObjectNode();
try {
objectNode = (ObjectNode) mapper.readTree(json);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
// on parse failure, fall through and return the empty node (a return inside finally would swallow errors)
return objectNode;
}
public abstract StreamGraph getStreamGraph();
public StreamGraph getStreamGraph() {
return environment.getStreamGraph();
}
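The StreamGraph-to-JSON conversion used by both getStreamGraph variants can be exercised on any toy pipeline with the same Flink and Jackson classes this commit already imports; a self-contained sketch:

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.graph.JSONGenerator;
    import org.apache.flink.streaming.api.graph.StreamGraph;

    public class StreamGraphJsonSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
            env.fromElements(1, 2, 3).print(); // any pipeline, so the graph is non-empty

            StreamGraph streamGraph = env.getStreamGraph();
            String json = new JSONGenerator(streamGraph).getJSON();
            JsonNode plan = new ObjectMapper().readTree(json);
            System.out.println(plan.get("nodes").size() + " nodes in the plan");
        }
    }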
public ObjectNode getStreamGraphFromDataStream(List<String> statements) {
for (String statement : statements) {
executeSql(statement);
}
StreamGraph streamGraph = getStreamGraph();
JSONGenerator jsonGenerator = new JSONGenerator(streamGraph);
String json = jsonGenerator.getJSON();
ObjectMapper mapper = new ObjectMapper();
ObjectNode objectNode = mapper.createObjectNode();
try {
objectNode = (ObjectNode) mapper.readTree(json);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
// on parse failure, fall through and return the empty node (a return inside finally would swallow errors)
return objectNode;
}
public JobPlanInfo getJobPlanInfo(List<String> statements) {
return stEnvironment.getJobPlanInfo(statements);
}
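getJobPlanInfo delegates to the table environment; with the classes this commit imports elsewhere (JobGraph, JsonPlanGenerator), the same REST-style JSON plan can be derived for any pipeline. A sketch:

    import org.apache.flink.runtime.jobgraph.JobGraph;
    import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class JobPlanSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
            env.fromElements(1, 2, 3).print();

            JobGraph jobGraph = env.getStreamGraph().getJobGraph();
            String jsonPlan = JsonPlanGenerator.generatePlan(jobGraph); // the payload JobPlanInfo wraps
            System.out.println(jsonPlan);
        }
    }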
public JobPlanInfo getJobPlanInfoFromDataStream(List<String> statements) {
for (String statement : statements) {
executeSql(statement);
}
StreamGraph streamGraph = getStreamGraph();
......@@ -273,19 +298,19 @@ public abstract class Executor {
stEnvironment.createTemporarySystemFunction(name,var2);
}*/
public CatalogManager getCatalogManager() {
return stEnvironment.getCatalogManager();
}
public JobGraph getJobGraphFromInserts(List<String> statements) {
return stEnvironment.getJobGraphFromInserts(statements);
}
public StatementSet createStatementSet() {
return stEnvironment.createStatementSet();
}
public TableResult executeStatementSet(List<String> statements) {
StatementSet statementSet = stEnvironment.createStatementSet();
for (String item : statements) {
statementSet.addInsertSql(item);
......@@ -293,7 +318,7 @@ public abstract class Executor {
return statementSet.execute();
}
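executeStatementSet bundles every INSERT into one StatementSet, so the whole batch goes out as a single Flink job. In plain Flink terms (the table names and connector choices are illustrative):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.StatementSet;
    import org.apache.flink.table.api.TableResult;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class StatementSetSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            tEnv.executeSql("CREATE TABLE src (x INT) WITH ('connector' = 'datagen', 'number-of-rows' = '3')");
            tEnv.executeSql("CREATE TABLE dst1 (x INT) WITH ('connector' = 'blackhole')");
            tEnv.executeSql("CREATE TABLE dst2 (x INT) WITH ('connector' = 'blackhole')");

            StatementSet set = tEnv.createStatementSet();
            set.addInsertSql("INSERT INTO dst1 SELECT x FROM src");
            set.addInsertSql("INSERT INTO dst2 SELECT x FROM src");
            TableResult result = set.execute(); // both inserts submitted as one job
            result.print();
        }
    }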
public String explainStatementSet(List<String> statements) {
StatementSet statementSet = stEnvironment.createStatementSet();
for (String item : statements) {
statementSet.addInsertSql(item);
......@@ -301,13 +326,15 @@ public abstract class Executor {
return statementSet.explain();
}
public void submitSql(String statements) {
executeSql(statements);
}
public void submitStatementSet(List<String> statements) {
executeStatementSet(statements);
}
public abstract boolean parseAndLoadConfiguration(String statement);
public boolean parseAndLoadConfiguration(String statement) {
return stEnvironment.parseAndLoadConfiguration(statement, environment, setConfig);
}
}
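parseAndLoadConfiguration now delegates to the table environment, passing along the StreamExecutionEnvironment and the shared setConfig map. Assuming, as the name suggests, it recognizes SET-style statements, an illustrative stand-in (not the project's implementation) could look like:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class SetStatementSketch {
        private static final Pattern SET =
                Pattern.compile("(?i)^\\s*set\\s+(\\S+)\\s*=\\s*(\\S+?)\\s*;?\\s*$");

        // true = statement was a SET and has been folded into the config map
        static boolean parseAndLoad(String statement, Map<String, String> setConfig) {
            Matcher m = SET.matcher(statement);
            if (!m.matches()) {
                return false; // not a SET statement; caller runs it as SQL
            }
            setConfig.put(m.group(1), m.group(2));
            return true;
        }

        public static void main(String[] args) {
            Map<String, String> setConfig = new HashMap<>();
            System.out.println(parseAndLoad("set parallelism.default = 2;", setConfig)); // true
            System.out.println(setConfig); // {parallelism.default=2}
        }
    }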
package com.dlink.executor;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
/**
* LocalBatchExecutor
......@@ -12,16 +8,16 @@ import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
* @author wenmo
* @since 2022/2/4 0:04
*/
public class LocalBatchExecutor extends AbstractBatchExecutor {
public class LocalBatchExecutor extends Executor {
public LocalBatchExecutor(ExecutorSetting executorSetting) {
this.executorSetting = executorSetting;
this.environment = ExecutionEnvironment.createLocalEnvironment();
this.environment = StreamExecutionEnvironment.createLocalEnvironment();
init();
}
@Override
CustomTableEnvironment createCustomTableEnvironment() {
return CustomBatchTableEnvironmentImpl.create(environment);
return CustomTableEnvironmentImpl.createBatch(environment);
}
}
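For comparison, the public-API route to what createBatch assembles internally is TableEnvironment.create with Blink batch settings plus execution.runtime-mode forced to BATCH; the commit instead binds the given StreamExecutionEnvironment through internal constructors. A minimal sketch:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class LocalBatchSketch {
        public static void main(String[] args) {
            Configuration configuration = new Configuration();
            configuration.setString("execution.runtime-mode", "BATCH"); // same key createBatch sets

            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inBatchMode()
                    .build();
            TableEnvironment tEnv = TableEnvironment.create(settings);
            tEnv.getConfig().addConfiguration(configuration);

            tEnv.executeSql("CREATE TABLE t (x INT) WITH ('connector' = 'datagen', 'number-of-rows' = '5')");
            tEnv.executeSql("SELECT count(x) FROM t").print(); // bounded source, so the job finishes
        }
    }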
......@@ -8,7 +8,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
* @author wenmo
* @since 2021/5/25 13:48
**/
public class LocalStreamExecutor extends AbstractStreamExecutor {
public class LocalStreamExecutor extends Executor {
public LocalStreamExecutor(ExecutorSetting executorSetting) {
this.executorSetting = executorSetting;
......
package com.dlink.executor;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
......@@ -9,17 +8,17 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
* @author wenmo
* @since 2022/2/7 22:10
*/
public class RemoteBatchExecutor extends AbstractBatchExecutor {
public class RemoteBatchExecutor extends Executor {
public RemoteBatchExecutor(EnvironmentSetting environmentSetting, ExecutorSetting executorSetting) {
this.environmentSetting = environmentSetting;
this.executorSetting = executorSetting;
this.environment = ExecutionEnvironment.createRemoteEnvironment(environmentSetting.getHost(), environmentSetting.getPort());
this.environment = StreamExecutionEnvironment.createRemoteEnvironment(environmentSetting.getHost(), environmentSetting.getPort());
init();
}
@Override
CustomTableEnvironment createCustomTableEnvironment() {
return CustomBatchTableEnvironmentImpl.create(environment);
return CustomTableEnvironmentImpl.createBatch(environment);
}
}
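The remote executors boil down to StreamExecutionEnvironment.createRemoteEnvironment with the host and port taken from EnvironmentSetting. A hypothetical smoke test against a standalone cluster (host, port, and jar path are placeholders):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RemoteEnvSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment
                    .createRemoteEnvironment("jobmanager-host", 8081 /*, "/path/to/job.jar" */);
            env.fromElements(1, 2, 3).print();
            env.execute("remote-smoke-test");
        }
    }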
......@@ -8,9 +8,9 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
* @author wenmo
* @since 2021/5/25 14:05
**/
public class RemoteStreamExecutor extends AbstractStreamExecutor {
public class RemoteStreamExecutor extends Executor {
public RemoteStreamExecutor(EnvironmentSetting environmentSetting, ExecutorSetting executorSetting) {
this.environmentSetting = environmentSetting;
this.executorSetting = executorSetting;
this.environment = StreamExecutionEnvironment.createRemoteEnvironment(environmentSetting.getHost(), environmentSetting.getPort());
......
......@@ -647,6 +647,9 @@ export default (): React.ReactNode => {
<li>
<Link>新增 FlinkJar Dialect 的管理</Link>
</li>
<li>
<Link>新增 Batch 引擎</Link>
</li>
</ul>
</Paragraph>
</Timeline.Item>
......