Commit 3c11f876 authored by wenmo

Refactor the executor module into a standalone, decoupled module, and optimize and enhance its logic

parent 56914d69
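
In practical terms, the decoupling removes dlink-admin's compile-time dependency on dlink-gateway: the admin layer now hands over only the raw cluster configuration map, and the gateway objects are assembled inside the job/executor layer. A simplified before/after illustration, lifted from the TaskServiceImpl change further down (not additional code in this commit):

// Before: admin built the GatewayConfig itself, so it had to depend on dlink-gateway
config.setGatewayConfig(clusterConfigurationService.buildGatewayConfig(task.getClusterConfigurationId()));
// After: admin fetches a plain Map<String,String>; JobConfig turns it into a GatewayConfig internally
config.buildGatewayConfig(clusterConfigurationService.getGatewayConfig(task.getClusterConfigurationId()));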
@@ -114,11 +114,6 @@
        <groupId>com.dlink</groupId>
        <artifactId>dlink-metadata-base</artifactId>
    </dependency>
-   <dependency>
-       <groupId>com.dlink</groupId>
-       <artifactId>dlink-gateway</artifactId>
-       <scope>provided</scope>
-   </dependency>
</dependencies>
<build>
    <plugins>
......
@@ -5,7 +5,6 @@ import com.dlink.dto.SessionDTO;
import com.dlink.dto.StudioCADTO;
import com.dlink.dto.StudioDDLDTO;
import com.dlink.dto.StudioExecuteDTO;
-import com.dlink.gateway.config.GatewayConfig;
import com.dlink.job.JobResult;
import com.dlink.result.IResult;
import com.dlink.service.StudioService;
@@ -140,11 +139,4 @@ public class StudioController {
    public Result cancel(@RequestParam Integer clusterId,@RequestParam String jobId) {
        return Result.succeed(studioService.cancel(clusterId,jobId),"停止成功");
    }
-    /**
-     * 提交jar
-     */
-    @PostMapping("/submitJar")
-    public Result submitJar(@RequestBody JsonNode para) {
-        return Result.succeed(studioService.submitJar(GatewayConfig.build(para)),"执行成功");
-    }
}
@@ -109,9 +109,9 @@
    /**
     * 提交作业
     */
-    @GetMapping(value = "/submitApplication")
+    /*@GetMapping(value = "/submitApplication")
    public Result submitApplicationByTaskId(@RequestParam Integer id) {
        return taskService.submitApplicationByTaskId(id);
-    }
+    }*/
}
package com.dlink.model;
-import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.annotation.FieldFill;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import com.dlink.db.model.SuperEntity;
-import com.dlink.executor.Executor;
-import com.dlink.executor.ExecutorSetting;
import com.dlink.job.JobConfig;
import lombok.Data;
import lombok.EqualsAndHashCode;
-import java.util.HashMap;
/**
 * 任务
 *
@@ -55,13 +50,13 @@ public class Task extends SuperEntity{
    @TableField(exist = false)
    private String clusterName;
-    public ExecutorSetting buildExecutorSetting(){
+    /*public ExecutorSetting buildExecutorSetting(){
        HashMap configMap = new HashMap();
        if(config!=null&&!"".equals(clusterName)) {
            configMap = JSONUtil.toBean(config, HashMap.class);
        }
        return new ExecutorSetting(checkPoint,parallelism,fragment,savePointPath,alias,configMap);
-    }
+    }*/
    public JobConfig buildSubmitConfig(){
        boolean useRemote = true;
......
package com.dlink.service;
import com.dlink.db.service.ISuperService;
-import com.dlink.gateway.config.GatewayConfig;
import com.dlink.model.ClusterConfiguration;
import java.util.List;
+import java.util.Map;
/**
 * ClusterConfigService
@@ -18,6 +18,6 @@ public interface ClusterConfigurationService extends ISuperService<ClusterConfig
    List<ClusterConfiguration> listEnabledAll();
-    GatewayConfig buildGatewayConfig(Integer id);
+    Map<String,String> getGatewayConfig(Integer id);
}
@@ -5,8 +5,6 @@ import com.dlink.dto.StudioDDLDTO;
import com.dlink.dto.StudioExecuteDTO;
import com.dlink.explainer.ca.ColumnCANode;
import com.dlink.explainer.ca.TableCANode;
-import com.dlink.gateway.config.GatewayConfig;
-import com.dlink.gateway.result.GatewayResult;
import com.dlink.job.JobResult;
import com.dlink.result.IResult;
import com.dlink.result.SelectResult;
@@ -50,6 +48,4 @@ public interface StudioService {
    List<JsonNode> listJobs(Integer clusterId);
    boolean cancel(Integer clusterId,String jobId);
-    GatewayResult submitJar(GatewayConfig config);
}
@@ -17,7 +17,7 @@ public interface TaskService extends ISuperService<Task> {
    JobResult submitByTaskId(Integer id);
-    Result submitApplicationByTaskId(Integer id);
+    // Result submitApplicationByTaskId(Integer id);
    Task getTaskInfoById(Integer id);
......
@@ -2,14 +2,13 @@ package com.dlink.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.dlink.db.service.impl.SuperServiceImpl;
-import com.dlink.gateway.config.ClusterConfig;
-import com.dlink.gateway.config.GatewayConfig;
import com.dlink.mapper.ClusterConfigurationMapper;
import com.dlink.model.ClusterConfiguration;
import com.dlink.service.ClusterConfigurationService;
import org.springframework.stereotype.Service;
import java.util.List;
+import java.util.Map;
/**
 * ClusterConfigServiceImpl
@@ -32,12 +31,8 @@ public class ClusterConfigurationServiceImpl extends SuperServiceImpl<ClusterCon
    }
    @Override
-    public GatewayConfig buildGatewayConfig(Integer id) {
+    public Map getGatewayConfig(Integer id) {
        ClusterConfiguration clusterConfiguration = this.getClusterConfigById(id);
-        GatewayConfig gatewayConfig = new GatewayConfig();
-        gatewayConfig.setClusterConfig(ClusterConfig.build(clusterConfiguration.getConfig().get("flinkConfigPath"),
-                clusterConfiguration.getConfig().get("flinkLibPath"),
-                clusterConfiguration.getConfig().get("hadoopConfigPath")));
-        return gatewayConfig;
+        return clusterConfiguration.getConfig();
    }
}
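
The map returned by getGatewayConfig is the same per-cluster configuration that the removed code unpacked itself. A caller-side sketch, illustrative only, with the key names taken from the removed buildGatewayConfig:

// Illustrative sketch (not part of this commit): consuming the raw config map
// instead of a prebuilt GatewayConfig; keys are those read by the old code.
Map<String, String> gatewayConfig = clusterConfigurationService.getGatewayConfig(id);
String flinkConfigPath  = gatewayConfig.get("flinkConfigPath");
String flinkLibPath     = gatewayConfig.get("flinkLibPath");
String hadoopConfigPath = gatewayConfig.get("hadoopConfigPath");
// Gateway-specific objects (ClusterConfig/GatewayConfig) are now built downstream,
// so dlink-admin no longer needs a compile-time dependency on dlink-gateway.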
@@ -8,14 +8,10 @@ import com.dlink.dto.StudioExecuteDTO;
import com.dlink.explainer.ca.CABuilder;
import com.dlink.explainer.ca.ColumnCANode;
import com.dlink.explainer.ca.TableCANode;
-import com.dlink.gateway.Gateway;
-import com.dlink.gateway.config.GatewayConfig;
-import com.dlink.gateway.result.GatewayResult;
import com.dlink.job.JobConfig;
import com.dlink.job.JobManager;
import com.dlink.job.JobResult;
import com.dlink.model.Cluster;
-import com.dlink.parser.SqlType;
import com.dlink.result.IResult;
import com.dlink.result.SelectResult;
import com.dlink.result.SqlExplainResult;
@@ -24,13 +20,11 @@ import com.dlink.service.StudioService;
import com.dlink.session.SessionConfig;
import com.dlink.session.SessionInfo;
import com.dlink.session.SessionPool;
-import com.dlink.trans.Operations;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import java.util.ArrayList;
import java.util.List;
/**
@@ -124,29 +118,17 @@ public class StudioServiceImpl implements StudioService {
    @Override
    public List<TableCANode> getOneTableCAByStatement(String statement) {
-        if(Operations.getSqlTypeFromStatements(statement)== SqlType.INSERT) {
        return CABuilder.getOneTableCAByStatement(statement);
-        }else{
-            return new ArrayList<>();
-        }
    }
    @Override
    public List<TableCANode> getOneTableColumnCAByStatement(String statement) {
-        if(Operations.getSqlTypeFromStatements(statement)== SqlType.INSERT) {
        return CABuilder.getOneTableColumnCAByStatement(statement);
-        }else{
-            return new ArrayList<>();
-        }
    }
    @Override
    public List<ColumnCANode> getColumnCAByStatement(String statement) {
-        if(Operations.getSqlTypeFromStatements(statement)== SqlType.INSERT) {
        return CABuilder.getColumnCAByStatement(statement);
-        }else{
-            return new ArrayList<>();
-        }
    }
    @Override
@@ -162,9 +144,4 @@ public class StudioServiceImpl implements StudioService {
        Asserts.checkNotNull(cluster,"该集群不存在");
        return FlinkAPI.build(cluster.getJobManagerHost()).stop(jobId);
    }
-    @Override
-    public GatewayResult submitJar(GatewayConfig config) {
-        return Gateway.build(config).submitJar();
-    }
}
package com.dlink.service.impl;
import com.dlink.assertion.Assert;
-import com.dlink.common.result.Result;
import com.dlink.db.service.impl.SuperServiceImpl;
-import com.dlink.gateway.config.ClusterConfig;
-import com.dlink.gateway.config.GatewayConfig;
-import com.dlink.gateway.GatewayType;
import com.dlink.job.JobConfig;
import com.dlink.job.JobManager;
import com.dlink.job.JobResult;
@@ -13,7 +9,6 @@ import com.dlink.mapper.TaskMapper;
import com.dlink.model.Cluster;
import com.dlink.model.Statement;
import com.dlink.model.Task;
-import com.dlink.result.SubmitResult;
import com.dlink.service.ClusterConfigurationService;
import com.dlink.service.ClusterService;
import com.dlink.service.StatementService;
@@ -47,13 +42,13 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
        if(!JobManager.useGateway(config.getType())) {
            config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), task.getClusterId()));
        }else{
-            config.setGatewayConfig(clusterConfigurationService.buildGatewayConfig(task.getClusterConfigurationId()));
+            config.buildGatewayConfig(clusterConfigurationService.getGatewayConfig(task.getClusterConfigurationId()));
        }
        JobManager jobManager = JobManager.build(config);
        return jobManager.executeSql(statement.getStatement());
    }
-    @Override
+    /*@Override
    public Result submitApplicationByTaskId(Integer id) {
        Task task = this.getById(id);
        Assert.check(task);
@@ -71,7 +66,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
        JobManager jobManager = JobManager.build(config);
        SubmitResult result = jobManager.submitGraph(statement.getStatement(), gatewayConfig);
        return Result.succeed(result,"提交成功");
-    }
+    }*/
    @Override
    public Task getTaskInfoById(Integer id) {
......
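
The call site above passes the raw map to JobConfig.buildGatewayConfig, which now lives outside dlink-admin. A minimal sketch of what such a method could look like, assuming it rebuilds the gateway objects from the same keys the removed admin-side code used (hypothetical, not the actual implementation):

// Hypothetical sketch of JobConfig.buildGatewayConfig(Map<String,String>);
// ClusterConfig.build(...) and setGatewayConfig(...) are taken from the removed code.
public void buildGatewayConfig(Map<String, String> config) {
    GatewayConfig gatewayConfig = new GatewayConfig();
    gatewayConfig.setClusterConfig(ClusterConfig.build(
            config.get("flinkConfigPath"),
            config.get("flinkLibPath"),
            config.get("hadoopConfigPath")));
    this.setGatewayConfig(gatewayConfig);
}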
@@ -165,5 +165,12 @@
            <include>dlink-gateway-${project.version}.jar</include>
        </includes>
    </fileSet>
+    <fileSet>
+        <directory>${project.parent.basedir}/dlink-executor/target</directory>
+        <outputDirectory>lib</outputDirectory>
+        <includes>
+            <include>dlink-executor-${project.version}.jar</include>
+        </includes>
+    </fileSet>
</fileSets>
</assembly>
\ No newline at end of file
package com.dlink.executor.custom;
import com.dlink.result.SqlExplainResult;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.dag.Transformation;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
-import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
@@ -50,13 +48,8 @@ import java.util.Map;
 **/
public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
-    private SqlManager sqlManager;
-    private boolean useSqlFragment = true;
-    protected CustomTableEnvironmentImpl(CatalogManager catalogManager, SqlManager sqlManager, ModuleManager moduleManager, TableConfig tableConfig, Executor executor, FunctionCatalog functionCatalog, Planner planner, boolean isStreamingMode, ClassLoader userClassLoader) {
+    protected CustomTableEnvironmentImpl(CatalogManager catalogManager, ModuleManager moduleManager, TableConfig tableConfig, Executor executor, FunctionCatalog functionCatalog, Planner planner, boolean isStreamingMode, ClassLoader userClassLoader) {
        super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode, userClassLoader);
-        this.sqlManager = sqlManager;
    }
    public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment) {
@@ -73,20 +66,19 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
        } else {
            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
            ModuleManager moduleManager = new ModuleManager();
-            SqlManager sqlManager = new SqlManager();
            CatalogManager catalogManager = CatalogManager.newBuilder().classLoader(classLoader).config(tableConfig.getConfiguration()).defaultCatalog(settings.getBuiltInCatalogName(), new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName())).executionConfig(executionEnvironment.getConfig()).build();
            FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager);
            Map<String, String> executorProperties = settings.toExecutorProperties();
            Executor executor = lookupExecutor(executorProperties, executionEnvironment);
            Map<String, String> plannerProperties = settings.toPlannerProperties();
-            Planner planner = ((PlannerFactory) ComponentFactoryService.find(PlannerFactory.class, plannerProperties)).create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager);
-            return new CustomTableEnvironmentImpl(catalogManager, sqlManager, moduleManager, tableConfig, executor, functionCatalog, planner, settings.isStreamingMode(), classLoader);
+            Planner planner = (ComponentFactoryService.find(PlannerFactory.class, plannerProperties)).create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager);
+            return new CustomTableEnvironmentImpl(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, settings.isStreamingMode(), classLoader);
        }
    }
    private static Executor lookupExecutor(Map<String, String> executorProperties, StreamExecutionEnvironment executionEnvironment) {
        try {
-            ExecutorFactory executorFactory = (ExecutorFactory) ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
+            ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
            Method createMethod = executorFactory.getClass().getMethod("create", Map.class, StreamExecutionEnvironment.class);
            return (Executor) createMethod.invoke(executorFactory, executorProperties, executionEnvironment);
        } catch (Exception var4) {
@@ -94,39 +86,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
        }
    }
public void useSqlFragment() {
this.useSqlFragment = true;
}
public void unUseSqlFragment() {
this.useSqlFragment = false;
}
@Override
public String explainSql(String statement, ExplainDetail... extraDetails) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
return "This is a sql fragment.";
}
}
if (checkShowFragments(statement)) {
return "'SHOW FRAGMENTS' can't be explained.";
} else {
return super.explainSql(statement, extraDetails);
}
}
    public ObjectNode getStreamGraph(String statement) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
throw new TableException("This is a sql fragment.");
}
}
if (checkShowFragments(statement)) {
throw new TableException("'SHOW FRAGMENTS' can't be explained.");
}
        List<Operation> operations = super.parser.parse(statement);
        if (operations.size() != 1) {
            throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query.");
@@ -160,15 +120,6 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
    public JobGraph getJobGraphFromInserts(List<String> statements) {
        List<ModifyOperation> modifyOperations = new ArrayList();
        for(String statement : statements){
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
throw new TableException("This is a sql fragment.");
}
}
if (checkShowFragments(statement)) {
throw new TableException("'SHOW FRAGMENTS' can't be add inserts.");
}
            List<Operation> operations = getParser().parse(statement);
            if (operations.size() != 1) {
                throw new TableException("Only single statement is supported.");
@@ -192,17 +143,6 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
    public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
        SqlExplainResult record = new SqlExplainResult();
if(useSqlFragment) {
String orignSql = statement;
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
record.setParseTrue(true);
record.setType("Sql Fragment");
record.setExplain(orignSql);
record.setExplainTrue(true);
return record;
}
}
        List<Operation> operations = parser.parse(statement);
        record.setParseTrue(true);
        if (operations.size() != 1) {
@@ -233,66 +173,6 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
        return record;
    }
@Override
public String[] getCompletionHints(String statement, int position) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
return new String[0];
}
}
return super.getCompletionHints(statement, position);
}
@Override
public Table sqlQuery(String query) {
if(useSqlFragment) {
query = sqlManager.parseVariable(query);
if (query.length() == 0) {
throw new TableException("Unsupported SQL query! The SQL query parsed is null.If it's a sql fragment, and please use executeSql().");
}
if (checkShowFragments(query)) {
return sqlManager.getSqlFragmentsTable(this);
} else {
return super.sqlQuery(query);
}
}else {
return super.sqlQuery(query);
}
}
@Override
public TableResult executeSql(String statement) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
return CustomTableResultImpl.TABLE_RESULT_OK;
}
if (checkShowFragments(statement)) {
return sqlManager.getSqlFragments();
} else {
return super.executeSql(statement);
}
}else{
return super.executeSql(statement);
}
}
@Override
public void sqlUpdate(String stmt) {
if(useSqlFragment) {
stmt = sqlManager.parseVariable(stmt);
if (stmt.length() == 0) {
throw new TableException("Unsupported SQL update! The SQL update parsed is null.If it's a sql fragment, and please use executeSql().");
}
}
super.sqlUpdate(stmt);
}
public boolean checkShowFragments(String sql){
return sqlManager.checkShowFragments(sql);
}
    public <T> void registerFunction(String name, TableFunction<T> tableFunction) {
        TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfTableFunction(tableFunction);
        this.functionCatalog.registerTempSystemTableFunction(name, tableFunction, typeInfo);
......
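With the SqlManager/useSqlFragment plumbing stripped out of CustomTableEnvironmentImpl, ${...} fragment expansion has to happen before SQL reaches the environment. Where exactly that now lives is not shown in this diff; the sketch below only illustrates the idea, using the SqlManager class that appears later in this commit:

// Illustrative assumption: resolve ":=" definitions and "${...}" references up front,
// then hand plain SQL to the slimmed-down environment.
SqlManager sqlManager = new SqlManager();
String resolved = sqlManager.parseVariable(statement);
if (resolved.length() > 0) {
    tableEnvironment.executeSql(resolved);
}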
@@ -14,7 +14,6 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
-import java.util.concurrent.*;
/**
 * 定制TableResultImpl
......
package com.dlink.executor.custom;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ExpressionParserException;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.types.Row;
import org.apache.flink.util.StringUtils;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static java.lang.String.format;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Flink Sql Fragment Manager
* @author wenmo
* @since 2021/6/7 22:06
**/
@Internal
public final class SqlManager {
private Map<String, String> sqlFragments;
static final String SHOW_FRAGMENTS = "SHOW FRAGMENTS";
public SqlManager() {
sqlFragments = new HashMap<>();
}
/**
* Get names of sql fragments loaded.
*
* @return a list of names of sql fragments loaded
*/
public List<String> listSqlFragments() {
return new ArrayList<>(sqlFragments.keySet());
}
/**
* Registers a fragment of sql under the given name. The sql fragment name must be unique.
*
* @param sqlFragmentName name under which to register the given sql fragment
* @param sqlFragment a fragment of sql to register
* @throws CatalogException if the registration of the sql fragment under the given name failed.
* But at the moment, with CatalogException, not SqlException
*/
public void registerSqlFragment(String sqlFragmentName, String sqlFragment) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
"sql fragment name cannot be null or empty.");
checkNotNull(sqlFragment, "sql fragment cannot be null");
/*if (sqlFragments.containsKey(sqlFragmentName)) {
throw new CatalogException(
format("The fragment of sql %s already exists.", sqlFragmentName));
}*/
sqlFragments.put(sqlFragmentName, sqlFragment);
}
/**
* Unregisters a fragment of sql under the given name. The sql fragment name must be existed.
*
* @param sqlFragmentName name under which to unregister the given sql fragment.
* @param ignoreIfNotExists If false exception will be thrown if the fragment of sql to be
* altered does not exist.
* @throws CatalogException if the unregistration of the sql fragment under the given name
* failed. But at the moment, with CatalogException, not SqlException
*/
public void unregisterSqlFragment(String sqlFragmentName, boolean ignoreIfNotExists) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
"sql fragmentName name cannot be null or empty.");
if (sqlFragments.containsKey(sqlFragmentName)) {
sqlFragments.remove(sqlFragmentName);
} else if (!ignoreIfNotExists) {
throw new CatalogException(
format("The fragment of sql %s does not exist.", sqlFragmentName));
}
}
/**
* Get a fragment of sql under the given name. The sql fragment name must be existed.
*
* @param sqlFragmentName name under which to unregister the given sql fragment.
* @throws CatalogException if the unregistration of the sql fragment under the given name
* failed. But at the moment, with CatalogException, not SqlException
*/
public String getSqlFragment(String sqlFragmentName) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
"sql fragmentName name cannot be null or empty.");
if (sqlFragments.containsKey(sqlFragmentName)) {
return sqlFragments.get(sqlFragmentName);
} else {
throw new CatalogException(
format("The fragment of sql %s does not exist.", sqlFragmentName));
}
}
/**
* Get a fragment of sql under the given name. The sql fragment name must be existed.
*
* @throws CatalogException if the unregistration of the sql fragment under the given name
* failed. But at the moment, with CatalogException, not SqlException
*/
public Map<String, String> getSqlFragment() {
return sqlFragments;
}
public TableResult getSqlFragments() {
List<Row> rows = new ArrayList<>();
for (String key : sqlFragments.keySet()) {
rows.add(Row.of(key));
}
return CustomTableResultImpl.buildTableResult(new ArrayList<>(Arrays.asList(new TableSchemaField("sql fragment name", DataTypes.STRING()))), rows);
}
public Iterator getSqlFragmentsIterator() {
return sqlFragments.entrySet().iterator();
}
public Table getSqlFragmentsTable(CustomTableEnvironmentImpl environment) {
List<String> keys = new ArrayList<>();
for (String key : sqlFragments.keySet()) {
keys.add(key);
}
return environment.fromValues(keys);
}
public boolean checkShowFragments(String sql){
return SHOW_FRAGMENTS.equals(sql.trim().toUpperCase());
}
/**
* Parse some variables under the given sql.
*
* @param statement A sql will be parsed.
* @throws ExpressionParserException if the name of the variable under the given sql failed.
*/
public String parseVariable(String statement) {
if (statement == null || "".equals(statement)) {
return statement;
}
String[] strs = statement.split(";");
StringBuilder sb = new StringBuilder();
for (int i = 0; i < strs.length; i++) {
String str = strs[i].trim();
if (str.length() == 0) {
continue;
}
if (str.contains(":=")) {
String[] strs2 = str.split(":=");
if (strs2.length >= 2) {
if (strs2[0].length() == 0) {
throw new ExpressionParserException("Illegal variable name.");
}
String valueString = str.substring(str.indexOf(":=") + 2);
this.registerSqlFragment(strs2[0], replaceVariable(valueString));
} else {
throw new ExpressionParserException("Illegal variable definition.");
}
} else {
sb.append(replaceVariable(str));
}
}
return sb.toString();
}
/**
* Replace some variables under the given sql.
*
* @param statement A sql will be replaced.
*/
private String replaceVariable(String statement) {
String pattern = "\\$\\{(.+?)\\}";
Pattern p = Pattern.compile(pattern);
Matcher m = p.matcher(statement);
StringBuffer sb = new StringBuffer();
while (m.find()) {
String key = m.group(1);
String value = this.getSqlFragment(key);
m.appendReplacement(sb, value == null ? "" : value);
}
m.appendTail(sb);
return sb.toString();
}
}
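
For reference, the fragment syntax this class implements works as follows; a small usage sketch derived from the parseVariable and replaceVariable code above:

// "name:=sql" registers a fragment; "${name}" references it in later statements.
SqlManager sqlManager = new SqlManager();
String resolved = sqlManager.parseVariable(
        "tb:=select * from orders;select count(*) from (${tb}) t");
// The fragment definition itself contributes no output, so:
// resolved.equals("select count(*) from (select * from orders) t")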
@@ -6,25 +6,20 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
-import org.apache.flink.table.api.StatementSet;
-import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
-import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.factories.ComponentFactoryService;
@@ -53,13 +48,8 @@ import java.util.Map;
 **/
public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
-    private SqlManager sqlManager;
-    private boolean useSqlFragment = true;
-    protected CustomTableEnvironmentImpl(CatalogManager catalogManager, SqlManager sqlManager, ModuleManager moduleManager, TableConfig tableConfig, Executor executor, FunctionCatalog functionCatalog, Planner planner, boolean isStreamingMode, ClassLoader userClassLoader) {
+    protected CustomTableEnvironmentImpl(CatalogManager catalogManager, ModuleManager moduleManager, TableConfig tableConfig, Executor executor, FunctionCatalog functionCatalog, Planner planner, boolean isStreamingMode, ClassLoader userClassLoader) {
        super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode, userClassLoader);
-        this.sqlManager = sqlManager;
    }
    public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment) {
@@ -76,20 +66,19 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
        } else {
            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
            ModuleManager moduleManager = new ModuleManager();
-            SqlManager sqlManager = new SqlManager();
            CatalogManager catalogManager = CatalogManager.newBuilder().classLoader(classLoader).config(tableConfig.getConfiguration()).defaultCatalog(settings.getBuiltInCatalogName(), new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName())).executionConfig(executionEnvironment.getConfig()).build();
            FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager);
            Map<String, String> executorProperties = settings.toExecutorProperties();
            Executor executor = lookupExecutor(executorProperties, executionEnvironment);
            Map<String, String> plannerProperties = settings.toPlannerProperties();
-            Planner planner = ((PlannerFactory) ComponentFactoryService.find(PlannerFactory.class, plannerProperties)).create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager);
-            return new CustomTableEnvironmentImpl(catalogManager, sqlManager, moduleManager, tableConfig, executor, functionCatalog, planner, settings.isStreamingMode(), classLoader);
+            Planner planner = (ComponentFactoryService.find(PlannerFactory.class, plannerProperties)).create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager);
+            return new CustomTableEnvironmentImpl(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, settings.isStreamingMode(), classLoader);
        }
    }
    private static Executor lookupExecutor(Map<String, String> executorProperties, StreamExecutionEnvironment executionEnvironment) {
        try {
-            ExecutorFactory executorFactory = (ExecutorFactory) ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
+            ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
            Method createMethod = executorFactory.getClass().getMethod("create", Map.class, StreamExecutionEnvironment.class);
            return (Executor) createMethod.invoke(executorFactory, executorProperties, executionEnvironment);
        } catch (Exception var4) {
@@ -97,39 +86,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
        }
    }
public void useSqlFragment() {
this.useSqlFragment = true;
}
public void unUseSqlFragment() {
this.useSqlFragment = false;
}
@Override
public String explainSql(String statement, ExplainDetail... extraDetails) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
return "This is a sql fragment.";
}
}
if (checkShowFragments(statement)) {
return "'SHOW FRAGMENTS' can't be explained.";
} else {
return super.explainSql(statement, extraDetails);
}
}
    public ObjectNode getStreamGraph(String statement) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
throw new TableException("This is a sql fragment.");
}
}
if (checkShowFragments(statement)) {
throw new TableException("'SHOW FRAGMENTS' can't be explained.");
}
        List<Operation> operations = super.parser.parse(statement);
        if (operations.size() != 1) {
            throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query.");
@@ -163,15 +120,6 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
    public JobGraph getJobGraphFromInserts(List<String> statements) {
        List<ModifyOperation> modifyOperations = new ArrayList();
        for(String statement : statements){
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
throw new TableException("This is a sql fragment.");
}
}
if (checkShowFragments(statement)) {
throw new TableException("'SHOW FRAGMENTS' can't be add inserts.");
}
            List<Operation> operations = getParser().parse(statement);
            if (operations.size() != 1) {
                throw new TableException("Only single statement is supported.");
@@ -195,17 +143,6 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
    public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
        SqlExplainResult record = new SqlExplainResult();
if(useSqlFragment) {
String orignSql = statement;
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
record.setParseTrue(true);
record.setType("Sql Fragment");
record.setExplain(orignSql);
record.setExplainTrue(true);
return record;
}
}
        List<Operation> operations = parser.parse(statement);
        record.setParseTrue(true);
        if (operations.size() != 1) {
@@ -236,66 +173,6 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
        return record;
    }
@Override
public String[] getCompletionHints(String statement, int position) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
return new String[0];
}
}
return super.getCompletionHints(statement, position);
}
@Override
public Table sqlQuery(String query) {
if(useSqlFragment) {
query = sqlManager.parseVariable(query);
if (query.length() == 0) {
throw new TableException("Unsupported SQL query! The SQL query parsed is null.If it's a sql fragment, and please use executeSql().");
}
if (checkShowFragments(query)) {
return sqlManager.getSqlFragmentsTable(this);
} else {
return super.sqlQuery(query);
}
}else {
return super.sqlQuery(query);
}
}
@Override
public TableResult executeSql(String statement) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
return CustomTableResultImpl.TABLE_RESULT_OK;
}
if (checkShowFragments(statement)) {
return sqlManager.getSqlFragments();
} else {
return super.executeSql(statement);
}
}else{
return super.executeSql(statement);
}
}
@Override
public void sqlUpdate(String stmt) {
if(useSqlFragment) {
stmt = sqlManager.parseVariable(stmt);
if (stmt.length() == 0) {
throw new TableException("Unsupported SQL update! The SQL update parsed is null.If it's a sql fragment, and please use executeSql().");
}
}
super.sqlUpdate(stmt);
}
public boolean checkShowFragments(String sql){
return sqlManager.checkShowFragments(sql);
}
    public <T> void registerFunction(String name, TableFunction<T> tableFunction) {
        TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfTableFunction(tableFunction);
        this.functionCatalog.registerTempSystemTableFunction(name, tableFunction, typeInfo);
......
package com.dlink.executor.custom;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ExpressionParserException;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.types.Row;
import org.apache.flink.util.StringUtils;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static java.lang.String.format;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Flink Sql Fragment Manager
* @author wenmo
* @since 2021/6/7 22:06
**/
@Internal
public final class SqlManager {
private Map<String, String> sqlFragments;
private List<ModifyOperation> operations = new ArrayList();
static final String SHOW_FRAGMENTS = "SHOW FRAGMENTS";
public SqlManager() {
sqlFragments = new HashMap<>();
}
/**
* Get names of sql fragments loaded.
*
* @return a list of names of sql fragments loaded
*/
public List<String> listSqlFragments() {
return new ArrayList<>(sqlFragments.keySet());
}
/**
* Registers a fragment of sql under the given name. The sql fragment name must be unique.
*
* @param sqlFragmentName name under which to register the given sql fragment
* @param sqlFragment a fragment of sql to register
* @throws CatalogException if the registration of the sql fragment under the given name failed.
* But at the moment, with CatalogException, not SqlException
*/
public void registerSqlFragment(String sqlFragmentName, String sqlFragment) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
"sql fragment name cannot be null or empty.");
checkNotNull(sqlFragment, "sql fragment cannot be null");
/*if (sqlFragments.containsKey(sqlFragmentName)) {
throw new CatalogException(
format("The fragment of sql %s already exists.", sqlFragmentName));
}*/
sqlFragments.put(sqlFragmentName, sqlFragment);
}
/**
* Unregisters a fragment of sql under the given name. The sql fragment name must be existed.
*
* @param sqlFragmentName name under which to unregister the given sql fragment.
* @param ignoreIfNotExists If false exception will be thrown if the fragment of sql to be
* altered does not exist.
* @throws CatalogException if the unregistration of the sql fragment under the given name
* failed. But at the moment, with CatalogException, not SqlException
*/
public void unregisterSqlFragment(String sqlFragmentName, boolean ignoreIfNotExists) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
"sql fragmentName name cannot be null or empty.");
if (sqlFragments.containsKey(sqlFragmentName)) {
sqlFragments.remove(sqlFragmentName);
} else if (!ignoreIfNotExists) {
throw new CatalogException(
format("The fragment of sql %s does not exist.", sqlFragmentName));
}
}
/**
* Get a fragment of sql under the given name. The sql fragment name must be existed.
*
* @param sqlFragmentName name under which to unregister the given sql fragment.
* @throws CatalogException if the unregistration of the sql fragment under the given name
* failed. But at the moment, with CatalogException, not SqlException
*/
public String getSqlFragment(String sqlFragmentName) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
"sql fragmentName name cannot be null or empty.");
if (sqlFragments.containsKey(sqlFragmentName)) {
return sqlFragments.get(sqlFragmentName);
} else {
throw new CatalogException(
format("The fragment of sql %s does not exist.", sqlFragmentName));
}
}
/**
* Get a fragment of sql under the given name. The sql fragment name must be existed.
*
* @throws CatalogException if the unregistration of the sql fragment under the given name
* failed. But at the moment, with CatalogException, not SqlException
*/
public Map<String, String> getSqlFragment() {
return sqlFragments;
}
public TableResult getSqlFragments() {
List<Row> rows = new ArrayList<>();
for (String key : sqlFragments.keySet()) {
rows.add(Row.of(key));
}
return CustomTableResultImpl.buildTableResult(new ArrayList<>(Arrays.asList(new TableSchemaField("sql fragment name", DataTypes.STRING()))), rows);
}
public Iterator getSqlFragmentsIterator() {
return sqlFragments.entrySet().iterator();
}
public Table getSqlFragmentsTable(CustomTableEnvironmentImpl environment) {
List<String> keys = new ArrayList<>();
for (String key : sqlFragments.keySet()) {
keys.add(key);
}
return environment.fromValues(keys);
}
public boolean checkShowFragments(String sql){
return SHOW_FRAGMENTS.equals(sql.trim().toUpperCase());
}
/**
* Parse some variables under the given sql.
*
* @param statement A sql will be parsed.
* @throws ExpressionParserException if the name of the variable under the given sql failed.
*/
public String parseVariable(String statement) {
if (statement == null || "".equals(statement)) {
return statement;
}
String[] strs = statement.split(";");
StringBuilder sb = new StringBuilder();
for (int i = 0; i < strs.length; i++) {
String str = strs[i].trim();
if (str.length() == 0) {
continue;
}
if (str.contains(":=")) {
String[] strs2 = str.split(":=");
if (strs2.length >= 2) {
if (strs2[0].length() == 0) {
throw new ExpressionParserException("Illegal variable name.");
}
String valueString = str.substring(str.indexOf(":=") + 2);
this.registerSqlFragment(strs2[0], replaceVariable(valueString));
} else {
throw new ExpressionParserException("Illegal variable definition.");
}
} else {
sb.append(replaceVariable(str));
}
}
return sb.toString();
}
/**
* Replace some variables under the given sql.
*
* @param statement A sql will be replaced.
*/
private String replaceVariable(String statement) {
String pattern = "\\$\\{(.+?)\\}";
Pattern p = Pattern.compile(pattern);
Matcher m = p.matcher(statement);
StringBuffer sb = new StringBuffer();
while (m.find()) {
String key = m.group(1);
String value = this.getSqlFragment(key);
m.appendReplacement(sb, value == null ? "" : value);
}
m.appendTail(sb);
return sb.toString();
}
public void addInsertSql(String statement,CustomTableEnvironmentImpl tableEnvironment) {
List<Operation> operations = tableEnvironment.getParser().parse(statement);
if (operations.size() != 1) {
throw new TableException("Only single statement is supported.");
} else {
Operation operation = (Operation)operations.get(0);
if (operation instanceof ModifyOperation) {
this.operations.add((ModifyOperation)operation);
} else {
throw new TableException("Only insert statement is supported now.");
}
}
}
}
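
This variant of SqlManager additionally collects INSERT statements as ModifyOperations via addInsertSql, presumably so that several inserts can later be translated into a single job; how the collected operations are consumed is not shown in this diff. A usage sketch under that assumption:

// Illustrative only: batching INSERTs through the SqlManager; "tableEnvironment"
// is assumed to be a CustomTableEnvironmentImpl instance in scope.
SqlManager sqlManager = new SqlManager();
sqlManager.addInsertSql("insert into sink_a select * from source", tableEnvironment);
sqlManager.addInsertSql("insert into sink_b select * from source", tableEnvironment);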
@@ -45,13 +45,8 @@ import java.util.Map;
 **/
public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
-    private SqlManager sqlManager;
-    private boolean useSqlFragment = true;
-    protected CustomTableEnvironmentImpl(CatalogManager catalogManager, SqlManager sqlManager, ModuleManager moduleManager, TableConfig tableConfig, Executor executor, FunctionCatalog functionCatalog, Planner planner, boolean isStreamingMode, ClassLoader userClassLoader) {
+    protected CustomTableEnvironmentImpl(CatalogManager catalogManager, ModuleManager moduleManager, TableConfig tableConfig, Executor executor, FunctionCatalog functionCatalog, Planner planner, boolean isStreamingMode, ClassLoader userClassLoader) {
        super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode, userClassLoader);
-        this.sqlManager = sqlManager;
    }
    public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment) {
@@ -68,20 +63,19 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
        } else {
            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
            ModuleManager moduleManager = new ModuleManager();
-            SqlManager sqlManager = new SqlManager();
            CatalogManager catalogManager = CatalogManager.newBuilder().classLoader(classLoader).config(tableConfig.getConfiguration()).defaultCatalog(settings.getBuiltInCatalogName(), new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName())).executionConfig(executionEnvironment.getConfig()).build();
            FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager);
            Map<String, String> executorProperties = settings.toExecutorProperties();
            Executor executor = lookupExecutor(executorProperties, executionEnvironment);
            Map<String, String> plannerProperties = settings.toPlannerProperties();
-            Planner planner = ((PlannerFactory) ComponentFactoryService.find(PlannerFactory.class, plannerProperties)).create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager);
-            return new CustomTableEnvironmentImpl(catalogManager, sqlManager, moduleManager, tableConfig, executor, functionCatalog, planner, settings.isStreamingMode(), classLoader);
+            Planner planner = (ComponentFactoryService.find(PlannerFactory.class, plannerProperties)).create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager);
+            return new CustomTableEnvironmentImpl(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, settings.isStreamingMode(), classLoader);
        }
    }
    private static Executor lookupExecutor(Map<String, String> executorProperties, StreamExecutionEnvironment executionEnvironment) {
        try {
-            ExecutorFactory executorFactory = (ExecutorFactory) ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
+            ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
            Method createMethod = executorFactory.getClass().getMethod("create", Map.class, StreamExecutionEnvironment.class);
            return (Executor) createMethod.invoke(executorFactory, executorProperties, executionEnvironment);
        } catch (Exception var4) {
@@ -89,39 +83,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
        }
    }
public void useSqlFragment() {
this.useSqlFragment = true;
}
public void unUseSqlFragment() {
this.useSqlFragment = false;
}
@Override
public String explainSql(String statement, ExplainDetail... extraDetails) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
return "This is a sql fragment.";
}
}
if (checkShowFragments(statement)) {
return "'SHOW FRAGMENTS' can't be explained.";
} else {
return super.explainSql(statement, extraDetails);
}
}
public ObjectNode getStreamGraph(String statement) { public ObjectNode getStreamGraph(String statement) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
throw new TableException("This is a sql fragment.");
}
}
if (checkShowFragments(statement)) {
throw new TableException("'SHOW FRAGMENTS' can't be explained.");
}
List<Operation> operations = super.getParser().parse(statement); List<Operation> operations = super.getParser().parse(statement);
if (operations.size() != 1) { if (operations.size() != 1) {
throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query."); throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query.");
...@@ -132,7 +94,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl { ...@@ -132,7 +94,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
modifyOperations.add((ModifyOperation)operations.get(i)); modifyOperations.add((ModifyOperation)operations.get(i));
} }
} }
List<Transformation<?>> trans = super.planner.translate(modifyOperations); List<Transformation<?>> trans = getPlanner().translate(modifyOperations);
if(execEnv instanceof ExecutorBase){ if(execEnv instanceof ExecutorBase){
StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans); StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
JSONGenerator jsonGenerator = new JSONGenerator(streamGraph); JSONGenerator jsonGenerator = new JSONGenerator(streamGraph);
...@@ -155,15 +117,6 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl { ...@@ -155,15 +117,6 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
public JobGraph getJobGraphFromInserts(List<String> statements) { public JobGraph getJobGraphFromInserts(List<String> statements) {
List<ModifyOperation> modifyOperations = new ArrayList(); List<ModifyOperation> modifyOperations = new ArrayList();
for(String statement : statements){ for(String statement : statements){
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
throw new TableException("This is a sql fragment.");
}
}
if (checkShowFragments(statement)) {
throw new TableException("'SHOW FRAGMENTS' can't be add inserts.");
}
List<Operation> operations = getParser().parse(statement); List<Operation> operations = getParser().parse(statement);
if (operations.size() != 1) { if (operations.size() != 1) {
throw new TableException("Only single statement is supported."); throw new TableException("Only single statement is supported.");
...@@ -187,17 +140,6 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl { ...@@ -187,17 +140,6 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) { public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
SqlExplainResult record = new SqlExplainResult(); SqlExplainResult record = new SqlExplainResult();
if(useSqlFragment) {
String orignSql = statement;
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
record.setParseTrue(true);
record.setType("Sql Fragment");
record.setExplain(orignSql);
record.setExplainTrue(true);
return record;
}
}
List<Operation> operations = getParser().parse(statement); List<Operation> operations = getParser().parse(statement);
record.setParseTrue(true); record.setParseTrue(true);
if (operations.size() != 1) { if (operations.size() != 1) {
...@@ -229,66 +171,6 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl { ...@@ -229,66 +171,6 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
return record; return record;
} }
@Override
public String[] getCompletionHints(String statement, int position) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
return new String[0];
}
}
return super.getCompletionHints(statement, position);
}
@Override
public Table sqlQuery(String query) {
if(useSqlFragment) {
query = sqlManager.parseVariable(query);
if (query.length() == 0) {
throw new TableException("Unsupported SQL query! The SQL query parsed is null.If it's a sql fragment, and please use executeSql().");
}
if (checkShowFragments(query)) {
return sqlManager.getSqlFragmentsTable(this);
} else {
return super.sqlQuery(query);
}
}else {
return super.sqlQuery(query);
}
}
@Override
public TableResult executeSql(String statement) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
return CustomTableResultImpl.TABLE_RESULT_OK;
}
if (checkShowFragments(statement)) {
return sqlManager.getSqlFragments();
} else {
return super.executeSql(statement);
}
}else{
return super.executeSql(statement);
}
}
@Override
public void sqlUpdate(String stmt) {
if(useSqlFragment) {
stmt = sqlManager.parseVariable(stmt);
if (stmt.length() == 0) {
throw new TableException("Unsupported SQL update! The SQL update parsed is null.If it's a sql fragment, and please use executeSql().");
}
}
super.sqlUpdate(stmt);
}
public boolean checkShowFragments(String sql){
return sqlManager.checkShowFragments(sql);
}
public <T> void registerFunction(String name, TableFunction<T> tableFunction) { public <T> void registerFunction(String name, TableFunction<T> tableFunction) {
TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfTableFunction(tableFunction); TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfTableFunction(tableFunction);
this.functionCatalog.registerTempSystemTableFunction(name, tableFunction, typeInfo); this.functionCatalog.registerTempSystemTableFunction(name, tableFunction, typeInfo);
......
package com.dlink.executor.custom;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ExpressionParserException;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.types.Row;
import org.apache.flink.util.StringUtils;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static java.lang.String.format;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Flink Sql Fragment Manager
* @author wenmo
* @since 2021/6/7 22:06
**/
@Internal
public final class SqlManager {
private Map<String, String> sqlFragments;
static final String SHOW_FRAGMENTS = "SHOW FRAGMENTS";
public SqlManager() {
sqlFragments = new HashMap<>();
}
/**
* Get names of sql fragments loaded.
*
* @return a list of names of sql fragments loaded
*/
public List<String> listSqlFragments() {
return new ArrayList<>(sqlFragments.keySet());
}
/**
* Registers a fragment of sql under the given name. The sql fragment name must be unique.
*
* @param sqlFragmentName name under which to register the given sql fragment
* @param sqlFragment a fragment of sql to register
* @throws CatalogException if the registration of the sql fragment under the given name failed.
* But at the moment, with CatalogException, not SqlException
*/
public void registerSqlFragment(String sqlFragmentName, String sqlFragment) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
"sql fragment name cannot be null or empty.");
checkNotNull(sqlFragment, "sql fragment cannot be null");
if (sqlFragments.containsKey(sqlFragmentName)) {
throw new CatalogException(
format("The fragment of sql %s already exists.", sqlFragmentName));
}
sqlFragments.put(sqlFragmentName, sqlFragment);
}
/**
* Unregisters a fragment of sql under the given name. The sql fragment name must already exist.
*
* @param sqlFragmentName name under which to unregister the given sql fragment.
* @param ignoreIfNotExists If false exception will be thrown if the fragment of sql to be
* altered does not exist.
* @throws CatalogException if the unregistration of the sql fragment under the given name
* failed. But at the moment, with CatalogException, not SqlException
*/
public void unregisterSqlFragment(String sqlFragmentName, boolean ignoreIfNotExists) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
"sql fragmentName name cannot be null or empty.");
if (sqlFragments.containsKey(sqlFragmentName)) {
sqlFragments.remove(sqlFragmentName);
} else if (!ignoreIfNotExists) {
throw new CatalogException(
format("The fragment of sql %s does not exist.", sqlFragmentName));
}
}
/**
* Get the fragment of sql registered under the given name. The sql fragment name must already exist.
*
* @param sqlFragmentName name under which the given sql fragment was registered.
* @throws CatalogException if no sql fragment is registered under the given name.
* But at the moment, with CatalogException, not SqlException
*/
public String getSqlFragment(String sqlFragmentName) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
"sql fragmentName name cannot be null or empty.");
if (sqlFragments.containsKey(sqlFragmentName)) {
return sqlFragments.get(sqlFragmentName);
} else {
throw new CatalogException(
format("The fragment of sql %s does not exist.", sqlFragmentName));
}
}
/**
* Get all sql fragments that have been registered.
*
* @return the map of sql fragment names to sql fragments.
*/
public Map<String, String> getSqlFragment() {
return sqlFragments;
}
public TableResult getSqlFragments() {
List<Row> rows = new ArrayList<>();
for (String key : sqlFragments.keySet()) {
rows.add(Row.of(key));
}
return CustomTableResultImpl.buildTableResult(new ArrayList<>(Arrays.asList(new TableSchemaField("sql fragment name", DataTypes.STRING()))), rows);
}
public Iterator getSqlFragmentsIterator() {
return sqlFragments.entrySet().iterator();
}
public Table getSqlFragmentsTable(CustomTableEnvironmentImpl environment) {
List<String> keys = new ArrayList<>();
for (String key : sqlFragments.keySet()) {
keys.add(key);
}
return environment.fromValues(keys);
}
public boolean checkShowFragments(String sql){
return SHOW_FRAGMENTS.equals(sql.trim().toUpperCase());
}
/**
* Parse the variables in the given sql: every "name:=value" segment registers a fragment,
* and every "${name}" reference is replaced with its registered value.
*
* @param statement the sql to be parsed.
* @throws ExpressionParserException if a variable name or definition in the given sql is illegal.
*/
public String parseVariable(String statement) {
if (statement == null || "".equals(statement)) {
return statement;
}
String[] strs = statement.split(";");
StringBuilder sb = new StringBuilder();
for (int i = 0; i < strs.length; i++) {
String str = strs[i].trim();
if (str.length() == 0) {
continue;
}
if (str.contains(":=")) {
String[] strs2 = str.split(":=");
if (strs2.length >= 2) {
if (strs2[0].length() == 0) {
throw new ExpressionParserException("Illegal variable name.");
}
String valueString = str.substring(str.indexOf(":=") + 2);
this.registerSqlFragment(strs2[0], replaceVariable(valueString));
} else {
throw new ExpressionParserException("Illegal variable definition.");
}
} else {
sb.append(replaceVariable(str));
}
}
return sb.toString();
}
/**
* Replace the "${name}" references in the given sql with their registered fragments.
*
* @param statement the sql in which to replace variables.
*/
private String replaceVariable(String statement) {
String pattern = "\\$\\{(.+?)\\}";
Pattern p = Pattern.compile(pattern);
Matcher m = p.matcher(statement);
StringBuffer sb = new StringBuffer();
while (m.find()) {
String key = m.group(1);
String value = this.getSqlFragment(key);
m.appendReplacement(sb, value == null ? "" : value);
}
m.appendTail(sb);
return sb.toString();
}
}
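A minimal usage sketch (not part of this commit) of the SqlManager above: a "name:=value" segment registers a fragment, and a later "${name}" reference is replaced with its value. The table and fragment names are illustrative.

import com.dlink.executor.custom.SqlManager;

public class SqlManagerDemo {
    public static void main(String[] args) {
        SqlManager sqlManager = new SqlManager();
        // "tb:=..." registers a fragment named "tb"; "${tb}" is later replaced by its value.
        String parsed = sqlManager.parseVariable(
                "tb:=select * from source_table;select id from (${tb}) t");
        // prints: select id from (select * from source_table) t
        System.out.println(parsed);
    }
}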
...@@ -10,7 +10,10 @@ import org.apache.flink.runtime.jobgraph.JobGraph; ...@@ -10,7 +10,10 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator; import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.*; import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog; import org.apache.flink.table.catalog.FunctionCatalog;
...@@ -43,18 +46,13 @@ import java.util.List; ...@@ -43,18 +46,13 @@ import java.util.List;
**/ **/
public class CustomTableEnvironmentImpl extends TableEnvironmentImpl { public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
private SqlManager sqlManager; protected CustomTableEnvironmentImpl(CatalogManager catalogManager, ModuleManager moduleManager, TableConfig tableConfig, Executor executor, FunctionCatalog functionCatalog, Planner planner, boolean isStreamingMode, ClassLoader userClassLoader) {
private boolean useSqlFragment = true;
protected CustomTableEnvironmentImpl(CatalogManager catalogManager, SqlManager sqlManager, ModuleManager moduleManager, TableConfig tableConfig, Executor executor, FunctionCatalog functionCatalog, Planner planner, boolean isStreamingMode, ClassLoader userClassLoader) {
super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode, userClassLoader); super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode, userClassLoader);
this.sqlManager = sqlManager;
} }
public CustomTableEnvironmentImpl( public CustomTableEnvironmentImpl(
CatalogManager catalogManager, CatalogManager catalogManager,
ModuleManager moduleManager, ModuleManager moduleManager,
SqlManager sqlManager,
FunctionCatalog functionCatalog, FunctionCatalog functionCatalog,
TableConfig tableConfig, TableConfig tableConfig,
StreamExecutionEnvironment executionEnvironment, StreamExecutionEnvironment executionEnvironment,
...@@ -71,7 +69,6 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl { ...@@ -71,7 +69,6 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
planner, planner,
isStreamingMode, isStreamingMode,
userClassLoader); userClassLoader);
this.sqlManager = sqlManager;
} }
public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment){ public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment){
...@@ -88,8 +85,6 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl { ...@@ -88,8 +85,6 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
final ModuleManager moduleManager = new ModuleManager(); final ModuleManager moduleManager = new ModuleManager();
final SqlManager sqlManager = new SqlManager();
final CatalogManager catalogManager = final CatalogManager catalogManager =
CatalogManager.newBuilder() CatalogManager.newBuilder()
.classLoader(classLoader) .classLoader(classLoader)
...@@ -119,7 +114,6 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl { ...@@ -119,7 +114,6 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
return new CustomTableEnvironmentImpl( return new CustomTableEnvironmentImpl(
catalogManager, catalogManager,
moduleManager, moduleManager,
sqlManager,
functionCatalog, functionCatalog,
tableConfig, tableConfig,
executionEnvironment, executionEnvironment,
...@@ -150,39 +144,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl { ...@@ -150,39 +144,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
} }
} }
public void useSqlFragment() {
this.useSqlFragment = true;
}
public void unUseSqlFragment() {
this.useSqlFragment = false;
}
@Override
public String explainSql(String statement, ExplainDetail... extraDetails) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
return "This is a sql fragment.";
}
}
if (checkShowFragments(statement)) {
return "'SHOW FRAGMENTS' can't be explained.";
} else {
return super.explainSql(statement, extraDetails);
}
}
public ObjectNode getStreamGraph(String statement) { public ObjectNode getStreamGraph(String statement) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
throw new TableException("This is a sql fragment.");
}
}
if (checkShowFragments(statement)) {
throw new TableException("'SHOW FRAGMENTS' can't be explained.");
}
List<Operation> operations = super.getParser().parse(statement); List<Operation> operations = super.getParser().parse(statement);
if (operations.size() != 1) { if (operations.size() != 1) {
throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query."); throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query.");
...@@ -216,15 +178,6 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl { ...@@ -216,15 +178,6 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
public JobGraph getJobGraphFromInserts(List<String> statements) { public JobGraph getJobGraphFromInserts(List<String> statements) {
List<ModifyOperation> modifyOperations = new ArrayList(); List<ModifyOperation> modifyOperations = new ArrayList();
for(String statement : statements){ for(String statement : statements){
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
throw new TableException("This is a sql fragment.");
}
}
if (checkShowFragments(statement)) {
throw new TableException("'SHOW FRAGMENTS' can't be add inserts.");
}
List<Operation> operations = getParser().parse(statement); List<Operation> operations = getParser().parse(statement);
if (operations.size() != 1) { if (operations.size() != 1) {
throw new TableException("Only single statement is supported."); throw new TableException("Only single statement is supported.");
...@@ -248,17 +201,6 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl { ...@@ -248,17 +201,6 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) { public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
SqlExplainResult record = new SqlExplainResult(); SqlExplainResult record = new SqlExplainResult();
if(useSqlFragment) {
String orignSql = statement;
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
record.setParseTrue(true);
record.setType("Sql Fragment");
record.setExplain(orignSql);
record.setExplainTrue(true);
return record;
}
}
List<Operation> operations = getParser().parse(statement); List<Operation> operations = getParser().parse(statement);
record.setParseTrue(true); record.setParseTrue(true);
if (operations.size() != 1) { if (operations.size() != 1) {
...@@ -290,66 +232,6 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl { ...@@ -290,66 +232,6 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
return record; return record;
} }
@Override
public String[] getCompletionHints(String statement, int position) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
return new String[0];
}
}
return super.getCompletionHints(statement, position);
}
@Override
public Table sqlQuery(String query) {
if(useSqlFragment) {
query = sqlManager.parseVariable(query);
if (query.length() == 0) {
throw new TableException("Unsupported SQL query! The SQL query parsed is null.If it's a sql fragment, and please use executeSql().");
}
if (checkShowFragments(query)) {
return sqlManager.getSqlFragmentsTable(this);
} else {
return super.sqlQuery(query);
}
}else {
return super.sqlQuery(query);
}
}
@Override
public TableResult executeSql(String statement) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
return CustomTableResultImpl.TABLE_RESULT_OK;
}
if (checkShowFragments(statement)) {
return sqlManager.getSqlFragments();
} else {
return super.executeSql(statement);
}
}else{
return super.executeSql(statement);
}
}
@Override
public void sqlUpdate(String stmt) {
if(useSqlFragment) {
stmt = sqlManager.parseVariable(stmt);
if (stmt.length() == 0) {
throw new TableException("Unsupported SQL update! The SQL update parsed is null.If it's a sql fragment, and please use executeSql().");
}
}
super.sqlUpdate(stmt);
}
public boolean checkShowFragments(String sql){
return sqlManager.checkShowFragments(sql);
}
public <T> void registerFunction(String name, TableFunction<T> tableFunction) { public <T> void registerFunction(String name, TableFunction<T> tableFunction) {
TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfTableFunction(tableFunction); TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfTableFunction(tableFunction);
this.functionCatalog.registerTempSystemTableFunction(name, tableFunction, typeInfo); this.functionCatalog.registerTempSystemTableFunction(name, tableFunction, typeInfo);
......
package com.dlink.executor.custom;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ExpressionParserException;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.types.Row;
import org.apache.flink.util.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static java.lang.String.format;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Flink Sql Fragment Manager
* @author wenmo
* @since 2021/10/22 10:02
**/
@Internal
public final class SqlManager {
private Map<String, String> sqlFragments;
static final String SHOW_FRAGMENTS = "SHOW FRAGMENTS";
public SqlManager() {
sqlFragments = new HashMap<>();
}
/**
* Get names of sql fragments loaded.
*
* @return a list of names of sql fragments loaded
*/
public List<String> listSqlFragments() {
return new ArrayList<>(sqlFragments.keySet());
}
/**
* Registers a fragment of sql under the given name. The sql fragment name must be unique.
*
* @param sqlFragmentName name under which to register the given sql fragment
* @param sqlFragment a fragment of sql to register
* @throws CatalogException if the registration of the sql fragment under the given name failed.
* But at the moment, with CatalogException, not SqlException
*/
public void registerSqlFragment(String sqlFragmentName, String sqlFragment) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
"sql fragment name cannot be null or empty.");
checkNotNull(sqlFragment, "sql fragment cannot be null");
if (sqlFragments.containsKey(sqlFragmentName)) {
throw new CatalogException(
format("The fragment of sql %s already exists.", sqlFragmentName));
}
sqlFragments.put(sqlFragmentName, sqlFragment);
}
/**
* Unregisters a fragment of sql under the given name. The sql fragment name must already exist.
*
* @param sqlFragmentName name under which to unregister the given sql fragment.
* @param ignoreIfNotExists If false exception will be thrown if the fragment of sql to be
* altered does not exist.
* @throws CatalogException if the unregistration of the sql fragment under the given name
* failed. But at the moment, with CatalogException, not SqlException
*/
public void unregisterSqlFragment(String sqlFragmentName, boolean ignoreIfNotExists) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
"sql fragmentName name cannot be null or empty.");
if (sqlFragments.containsKey(sqlFragmentName)) {
sqlFragments.remove(sqlFragmentName);
} else if (!ignoreIfNotExists) {
throw new CatalogException(
format("The fragment of sql %s does not exist.", sqlFragmentName));
}
}
/**
* Get the fragment of sql registered under the given name. The sql fragment name must already exist.
*
* @param sqlFragmentName name under which the given sql fragment was registered.
* @throws CatalogException if no sql fragment is registered under the given name.
* But at the moment, with CatalogException, not SqlException
*/
public String getSqlFragment(String sqlFragmentName) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
"sql fragmentName name cannot be null or empty.");
if (sqlFragments.containsKey(sqlFragmentName)) {
return sqlFragments.get(sqlFragmentName);
} else {
throw new CatalogException(
format("The fragment of sql %s does not exist.", sqlFragmentName));
}
}
/**
* Get all sql fragments that have been registered.
*
* @return the map of sql fragment names to sql fragments.
*/
public Map<String, String> getSqlFragment() {
return sqlFragments;
}
public TableResult getSqlFragments() {
List<Row> rows = new ArrayList<>();
for (String key : sqlFragments.keySet()) {
rows.add(Row.of(key));
}
return CustomTableResultImpl.buildTableResult(new ArrayList<>(Arrays.asList(new TableSchemaField("sql fragment name", DataTypes.STRING()))), rows);
}
public Iterator getSqlFragmentsIterator() {
return sqlFragments.entrySet().iterator();
}
public Table getSqlFragmentsTable(CustomTableEnvironmentImpl environment) {
List<String> keys = new ArrayList<>();
for (String key : sqlFragments.keySet()) {
keys.add(key);
}
return environment.fromValues(keys);
}
public boolean checkShowFragments(String sql){
return SHOW_FRAGMENTS.equals(sql.trim().toUpperCase());
}
/**
* Parse the variables in the given sql: every "name:=value" segment registers a fragment,
* and every "${name}" reference is replaced with its registered value.
*
* @param statement the sql to be parsed.
* @throws ExpressionParserException if a variable name or definition in the given sql is illegal.
*/
public String parseVariable(String statement) {
if (statement == null || "".equals(statement)) {
return statement;
}
String[] strs = statement.split(";");
StringBuilder sb = new StringBuilder();
for (int i = 0; i < strs.length; i++) {
String str = strs[i].trim();
if (str.length() == 0) {
continue;
}
if (str.contains(":=")) {
String[] strs2 = str.split(":=");
if (strs2.length >= 2) {
if (strs2[0].length() == 0) {
throw new ExpressionParserException("Illegal variable name.");
}
String valueString = str.substring(str.indexOf(":=") + 2);
this.registerSqlFragment(strs2[0], replaceVariable(valueString));
} else {
throw new ExpressionParserException("Illegal variable definition.");
}
} else {
sb.append(replaceVariable(str));
}
}
return sb.toString();
}
/**
* Replace the "${name}" references in the given sql with their registered fragments.
*
* @param statement the sql in which to replace variables.
*/
private String replaceVariable(String statement) {
String pattern = "\\$\\{(.+?)\\}";
Pattern p = Pattern.compile(pattern);
Matcher m = p.matcher(statement);
StringBuffer sb = new StringBuffer();
while (m.find()) {
String key = m.group(1);
String value = this.getSqlFragment(key);
m.appendReplacement(sb, value == null ? "" : value);
}
m.appendTail(sb);
return sb.toString();
}
}
...@@ -45,6 +45,11 @@ ...@@ -45,6 +45,11 @@
<artifactId>dlink-client-1.13</artifactId> <artifactId>dlink-client-1.13</artifactId>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-executor</artifactId>
<scope>provided</scope>
</dependency>
<dependency> <dependency>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<artifactId>dlink-connector-jdbc-1.13</artifactId> <artifactId>dlink-connector-jdbc-1.13</artifactId>
......
package com.dlink.catalog.function;
import com.dlink.constant.FlinkFunctionConstant;
import com.dlink.ud.udf.GetKey;
import com.dlink.ud.udtaf.RowsToMap;
import com.dlink.ud.udtaf.Top2;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.UserDefinedFunction;
import java.util.HashMap;
import java.util.Map;
/**
* FunctionManager
*
* @author wenmo
* @since 2021/6/14 21:19
*/
public class FunctionManager {
private static Map<String,UDFunction> functions = new HashMap<String,UDFunction>(){
{
put(FlinkFunctionConstant.GET_KEY,
new UDFunction(FlinkFunctionConstant.GET_KEY,
UDFunction.UDFunctionType.Scalar,
new GetKey()));
put(FlinkFunctionConstant.TO_MAP,
new UDFunction(FlinkFunctionConstant.TO_MAP,
UDFunction.UDFunctionType.TableAggregate,
new RowsToMap()));
put(FlinkFunctionConstant.TOP2,
new UDFunction(FlinkFunctionConstant.TOP2,
UDFunction.UDFunctionType.TableAggregate,
new Top2()));
}
};
public static Map<String,UDFunction> getUsedFunctions(String statement){
Map<String,UDFunction> map = new HashMap<>();
String sql = statement.toLowerCase();
for (Map.Entry<String, UDFunction> entry : functions.entrySet()) {
if(sql.contains(entry.getKey().toLowerCase())){
map.put(entry.getKey(),entry.getValue());
}
}
return map;
}
}
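A hedged sketch of how FunctionManager is meant to be queried: it scans the lower-cased statement for the registered UDF names and returns only the matches. The statement below assumes FlinkFunctionConstant.GET_KEY resolves to the getKey name used in it.

import com.dlink.catalog.function.FunctionManager;
import com.dlink.catalog.function.UDFunction;

import java.util.Map;

public class FunctionManagerDemo {
    public static void main(String[] args) {
        // Only the UDFs whose names occur in the statement are returned.
        Map<String, UDFunction> used =
                FunctionManager.getUsedFunctions("select getKey(cells, 'year') from source_table");
        used.forEach((name, udf) -> System.out.println(name + " -> " + udf.getType()));
    }
}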
package com.dlink.catalog.function;
import org.apache.flink.table.functions.FunctionDefinition;
/**
* UDFunction
*
* @author wenmo
* @since 2021/6/14 22:14
*/
public class UDFunction {
public enum UDFunctionType {
Scalar, Table, Aggregate, TableAggregate
}
private String name;
private UDFunctionType type;
private FunctionDefinition function;
public UDFunction(String name, UDFunctionType type, FunctionDefinition function) {
this.name = name;
this.type = type;
this.function = function;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public UDFunctionType getType() {
return type;
}
public void setType(UDFunctionType type) {
this.type = type;
}
public FunctionDefinition getFunction() {
return function;
}
public void setFunction(FunctionDefinition function) {
this.function = function;
}
}
package com.dlink.executor;
import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkConstant;
import com.dlink.constant.NetConstant;
import lombok.Getter;
import lombok.Setter;
/**
* EnvironmentSetting
*
* @author wenmo
* @since 2021/5/25 13:45
**/
@Getter
@Setter
public class EnvironmentSetting {
private String host;
private int port;
private boolean useRemote;
public static final EnvironmentSetting LOCAL = new EnvironmentSetting(false);
public EnvironmentSetting(boolean useRemote) {
this.useRemote = useRemote;
}
public EnvironmentSetting(String host, int port) {
this.host = host;
this.port = port;
this.useRemote = true;
}
public static EnvironmentSetting build(String address){
Asserts.checkNull(address,"Flink 地址不能为空");
String[] strs = address.split(NetConstant.COLON);
if (strs.length >= 2) {
return new EnvironmentSetting(strs[0],Integer.parseInt(strs[1]));
} else {
return new EnvironmentSetting(strs[0],FlinkConstant.PORT);
}
}
public String getAddress(){
return host + NetConstant.COLON + port;
}
}
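An illustrative sketch of EnvironmentSetting.build, assuming NetConstant.COLON is the ":" separator; the address is a placeholder.

import com.dlink.executor.EnvironmentSetting;

public class EnvironmentSettingDemo {
    public static void main(String[] args) {
        // With an explicit port the setting targets a remote cluster on that port.
        EnvironmentSetting remote = EnvironmentSetting.build("192.168.0.1:8081");
        System.out.println(remote.getAddress());   // 192.168.0.1:8081
        System.out.println(remote.isUseRemote());  // true

        // Without a port it falls back to FlinkConstant.PORT.
        EnvironmentSetting defaultPort = EnvironmentSetting.build("192.168.0.1");
        System.out.println(defaultPort.getPort());
    }
}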
package com.dlink.executor;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Executor
* @author wenmo
* @since 2021/5/25 13:39
**/
public abstract class Executor {
protected StreamExecutionEnvironment environment;
protected CustomTableEnvironmentImpl stEnvironment;
protected EnvironmentSetting environmentSetting;
protected ExecutorSetting executorSetting;
public static Executor build(){
return new LocalStreamExecutor(ExecutorSetting.DEFAULT);
}
public static Executor build(EnvironmentSetting environmentSetting,ExecutorSetting executorSetting){
if(environmentSetting.isUseRemote()){
return buildRemoteExecutor(environmentSetting,executorSetting);
}else{
return buildLocalExecutor(executorSetting);
}
}
public static Executor buildLocalExecutor(ExecutorSetting executorSetting){
return new LocalStreamExecutor(executorSetting);
}
public static Executor buildRemoteExecutor(EnvironmentSetting environmentSetting,ExecutorSetting executorSetting){
environmentSetting.setUseRemote(true);
return new RemoteStreamExecutor(environmentSetting,executorSetting);
}
public StreamExecutionEnvironment getEnvironment(){
return environment;
}
public CustomTableEnvironmentImpl getCustomTableEnvironmentImpl(){
return stEnvironment;
}
public ExecutorSetting getExecutorSetting(){
return executorSetting;
}
public EnvironmentSetting getEnvironmentSetting(){
return environmentSetting;
}
protected void init(){
initEnvironment();
initStreamExecutionEnvironment();
}
public void update(ExecutorSetting executorSetting){
updateEnvironment(executorSetting);
updateStreamExecutionEnvironment(executorSetting);
}
private void initEnvironment(){
if(executorSetting.getCheckpoint()!=null&&executorSetting.getCheckpoint()>0){
environment.enableCheckpointing(executorSetting.getCheckpoint());
}
if(executorSetting.getParallelism()!=null&&executorSetting.getParallelism()>0){
environment.setParallelism(executorSetting.getParallelism());
}
}
private void updateEnvironment(ExecutorSetting executorSetting){
if(executorSetting.getCheckpoint()!=null&&executorSetting.getCheckpoint()>0){
environment.enableCheckpointing(executorSetting.getCheckpoint());
}
if(executorSetting.getParallelism()!=null&&executorSetting.getParallelism()>0){
environment.setParallelism(executorSetting.getParallelism());
}
}
private void initStreamExecutionEnvironment(){
stEnvironment = CustomTableEnvironmentImpl.create(environment);
if(executorSetting.isUseSqlFragment()){
stEnvironment.useSqlFragment();
}else{
stEnvironment.unUseSqlFragment();
}
if(executorSetting.getJobName()!=null&&!"".equals(executorSetting.getJobName())){
stEnvironment.getConfig().getConfiguration().setString("pipeline.name", executorSetting.getJobName());
}
if(executorSetting.getConfig()!=null){
for (Map.Entry<String, String> entry : executorSetting.getConfig().entrySet()) {
stEnvironment.getConfig().getConfiguration().setString(entry.getKey(), entry.getValue());
}
}
}
private void updateStreamExecutionEnvironment(ExecutorSetting executorSetting){
copyCatalog();
if(executorSetting.isUseSqlFragment()){
stEnvironment.useSqlFragment();
}else{
stEnvironment.unUseSqlFragment();
}
if(executorSetting.getJobName()!=null&&!"".equals(executorSetting.getJobName())){
stEnvironment.getConfig().getConfiguration().setString("pipeline.name", executorSetting.getJobName());
}
if(executorSetting.getConfig()!=null){
for (Map.Entry<String, String> entry : executorSetting.getConfig().entrySet()) {
stEnvironment.getConfig().getConfiguration().setString(entry.getKey(), entry.getValue());
}
}
}
private void copyCatalog(){
String[] catalogs = stEnvironment.listCatalogs();
CustomTableEnvironmentImpl newstEnvironment = CustomTableEnvironmentImpl.create(environment);
for (int i = 0; i < catalogs.length; i++) {
if(stEnvironment.getCatalog(catalogs[i]).isPresent()) {
newstEnvironment.getCatalogManager().unregisterCatalog(catalogs[i],true);
newstEnvironment.registerCatalog(catalogs[i], stEnvironment.getCatalog(catalogs[i]).get());
}
}
stEnvironment = newstEnvironment;
}
public JobExecutionResult execute(String jobName) throws Exception{
return stEnvironment.execute(jobName);
}
public TableResult executeSql(String statement){
return stEnvironment.executeSql(statement);
}
public Table sqlQuery(String statement){
return stEnvironment.sqlQuery(statement);
}
public String explainSql(String statement, ExplainDetail... extraDetails){
return stEnvironment.explainSql(statement,extraDetails);
}
public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails){
return stEnvironment.explainSqlRecord(statement,extraDetails);
}
public ObjectNode getStreamGraph(String statement){
return stEnvironment.getStreamGraph(statement);
}
public void registerFunction(String name, ScalarFunction function){
stEnvironment.registerFunction(name,function);
}
public void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> var2){
stEnvironment.createTemporarySystemFunction(name,var2);
}
public CatalogManager getCatalogManager(){
return stEnvironment.getCatalogManager();
}
public JobGraph getJobGraphFromInserts(List<String> statements){
return stEnvironment.getJobGraphFromInserts(statements);
}
}
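A hedged sketch of the Executor facade in use (not part of this commit): the datagen connector ships with Flink, but the remote address and the SQL are placeholders.

import com.dlink.executor.EnvironmentSetting;
import com.dlink.executor.Executor;
import com.dlink.executor.ExecutorSetting;

public class ExecutorDemo {
    public static void main(String[] args) {
        // Local executor with the default setting (no checkpointing, parallelism 1, sql fragments on).
        Executor local = Executor.buildLocalExecutor(ExecutorSetting.DEFAULT);
        local.executeSql("CREATE TABLE datagen_source (id INT) WITH ('connector' = 'datagen')");
        System.out.println(local.explainSql("SELECT id FROM datagen_source"));

        // Remote executor against a standalone cluster; the address is only illustrative.
        Executor remote = Executor.build(
                EnvironmentSetting.build("192.168.0.1:8081"),
                new ExecutorSetting(10000, 2, true));
        System.out.println(remote.getEnvironmentSetting().getAddress());
    }
}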
package com.dlink.executor;
import com.dlink.job.JobConfig;
import lombok.Getter;
import lombok.Setter;
import java.util.Map;
/**
* ExecutorSetting
*
* @author wenmo
* @since 2021/5/25 13:43
**/
@Setter
@Getter
public class ExecutorSetting {
private Integer checkpoint;
private Integer parallelism;
private boolean useSqlFragment;
private String savePointPath;
private String jobName;
private Map<String,String> config;
public static final ExecutorSetting DEFAULT = new ExecutorSetting(0,1,true);
public ExecutorSetting(boolean useSqlFragment) {
this.useSqlFragment = useSqlFragment;
}
public ExecutorSetting(Integer checkpoint) {
this.checkpoint = checkpoint;
}
public ExecutorSetting(Integer checkpoint, boolean useSqlFragment) {
this.checkpoint = checkpoint;
this.useSqlFragment = useSqlFragment;
}
public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment) {
this.checkpoint = checkpoint;
this.parallelism = parallelism;
this.useSqlFragment = useSqlFragment;
}
public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath, String jobName) {
this.checkpoint = checkpoint;
this.parallelism = parallelism;
this.useSqlFragment = useSqlFragment;
this.savePointPath = savePointPath;
this.jobName = jobName;
}
public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath) {
this.checkpoint = checkpoint;
this.parallelism = parallelism;
this.useSqlFragment = useSqlFragment;
this.savePointPath = savePointPath;
}
public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath, String jobName, Map<String, String> config) {
this.checkpoint = checkpoint;
this.parallelism = parallelism;
this.useSqlFragment = useSqlFragment;
this.savePointPath = savePointPath;
this.jobName = jobName;
this.config = config;
}
}
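A short sketch of an ExecutorSetting carrying extra table configuration; the config key is a standard Flink option and is used here only as an illustration.

import com.dlink.executor.ExecutorSetting;

import java.util.HashMap;
import java.util.Map;

public class ExecutorSettingDemo {
    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("table.exec.resource.default-parallelism", "2");
        // 10s checkpoint interval, parallelism 2, sql fragments on, no savepoint, named job.
        ExecutorSetting setting = new ExecutorSetting(10000, 2, true, null, "demo-job", config);
        System.out.println(setting.getJobName() + ": " + setting.getConfig());
    }
}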
package com.dlink.executor;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* LocalStreamExecutor
*
* @author wenmo
* @since 2021/5/25 13:48
**/
public class LocalStreamExecutor extends Executor {
public LocalStreamExecutor(ExecutorSetting executorSetting) {
this.executorSetting = executorSetting;
this.environment = StreamExecutionEnvironment.createLocalEnvironment();
init();
}
}
package com.dlink.executor;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* RemoteStreamExecutor
*
* @author wenmo
* @since 2021/5/25 14:05
**/
public class RemoteStreamExecutor extends Executor {
public RemoteStreamExecutor(EnvironmentSetting environmentSetting,ExecutorSetting executorSetting) {
this.environmentSetting = environmentSetting;
this.executorSetting = executorSetting;
this.environment = StreamExecutionEnvironment.createRemoteEnvironment(environmentSetting.getHost(), environmentSetting.getPort());
init();
}
}
...@@ -7,7 +7,9 @@ import com.dlink.explainer.ca.*; ...@@ -7,7 +7,9 @@ import com.dlink.explainer.ca.*;
import com.dlink.explainer.trans.Trans; import com.dlink.explainer.trans.Trans;
import com.dlink.explainer.trans.TransGenerator; import com.dlink.explainer.trans.TransGenerator;
import com.dlink.interceptor.FlinkInterceptor; import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.parser.SqlType;
import com.dlink.result.SqlExplainResult; import com.dlink.result.SqlExplainResult;
import com.dlink.trans.Operations;
import com.dlink.utils.FlinkUtil; import com.dlink.utils.FlinkUtil;
import com.dlink.utils.SqlUtil; import com.dlink.utils.SqlUtil;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
...@@ -40,31 +42,43 @@ public class Explainer { ...@@ -40,31 +42,43 @@ public class Explainer {
public List<SqlExplainResult> explainSqlResult(String statement) { public List<SqlExplainResult> explainSqlResult(String statement) {
String[] sqls = SqlUtil.getStatements(statement); String[] sqls = SqlUtil.getStatements(statement);
List<SqlExplainResult> sqlExplainRecords = new ArrayList<>(); List<SqlExplainResult> sqlExplainRecords = new ArrayList<>();
for (int i = 0; i < sqls.length; i++) { int index = 1;
String sql = sqls[i].trim(); for (String item : sqls) {
SqlExplainResult record = new SqlExplainResult();
String sql = "";
try {
sql = FlinkInterceptor.pretreatStatement(executor,item);
if(Asserts.isNullString(sql)){ if(Asserts.isNullString(sql)){
continue; continue;
} }
SqlExplainResult record = new SqlExplainResult(); SqlType operationType = Operations.getOperationType(statement);
try { if (operationType.equals(SqlType.INSERT)||operationType.equals(SqlType.SELECT)) {
if (!FlinkInterceptor.build(executor.getCustomTableEnvironmentImpl(), sqls[i])) { record = executor.explainSqlRecord(sql);
record = executor.explainSqlRecord(sqls[i]); if(Asserts.isNull(record)){
if (Asserts.isEquals(FlinkSQLConstant.DDL,record.getType())) { continue;
executor.executeSql(sqls[i]);
} }
}else{ }else{
record.setParseTrue(true); record = executor.explainSqlRecord(sql);
record.setExplainTrue(true); if(Asserts.isNull(record)){
continue;
}
executor.executeSql(sql);
} }
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
record.setError(e.getMessage()); record.setError(e.getMessage());
} finally { record.setExplainTrue(false);
record.setExplainTime(new Date()); record.setExplainTime(new Date());
record.setIndex(i + 1); record.setSql(sql);
record.setSql(sqls[i]); record.setIndex(index);
sqlExplainRecords.add(record); sqlExplainRecords.add(record);
break;
} }
record.setExplainTrue(true);
record.setExplainTime(new Date());
record.setSql(sql);
record.setIndex(index++);
sqlExplainRecords.add(record);
} }
return sqlExplainRecords; return sqlExplainRecords;
} }
...@@ -72,10 +86,10 @@ public class Explainer { ...@@ -72,10 +86,10 @@ public class Explainer {
public ObjectNode getStreamGraph(String statement){ public ObjectNode getStreamGraph(String statement){
List<SqlExplainResult> sqlExplainRecords = explainSqlResult(statement); List<SqlExplainResult> sqlExplainRecords = explainSqlResult(statement);
List<String> strPlans = new ArrayList<>(); List<String> strPlans = new ArrayList<>();
for (int i = 0; i < sqlExplainRecords.size(); i++) { for (SqlExplainResult item : sqlExplainRecords) {
if (Asserts.isNotNull(sqlExplainRecords.get(i).getType()) if (Asserts.isNotNull(item.getType())
&& sqlExplainRecords.get(i).getType().contains(FlinkSQLConstant.DML)) { && item.getType().contains(FlinkSQLConstant.DML)) {
strPlans.add(sqlExplainRecords.get(i).getSql()); strPlans.add(item.getSql());
} }
} }
if(strPlans.size()>0){ if(strPlans.size()>0){
......
package com.dlink.interceptor;
import com.dlink.assertion.Asserts;
import com.dlink.catalog.function.FunctionManager;
import com.dlink.catalog.function.UDFunction;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.parser.SingleSqlParserFactory;
import com.dlink.trans.Operation;
import com.dlink.trans.Operations;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* FlinkInterceptor
*
* @author wenmo
* @since 2021/6/11 22:17
*/
public class FlinkInterceptor {
public static boolean build(CustomTableEnvironmentImpl stEnvironment, String statement) {
initFunctions(stEnvironment, statement);
/*if(initConfiguration(stEnvironment,statement)){
return true;
}*/
Operation operation = Operations.buildOperation(statement);
if (Asserts.isNotNull(operation)) {
operation.build(stEnvironment);
return operation.noExecute();
}
return false;
}
private static void initFunctions(CustomTableEnvironmentImpl stEnvironment, String statement) {
Map<String, UDFunction> usedFunctions = FunctionManager.getUsedFunctions(statement);
String[] udfs = stEnvironment.listUserDefinedFunctions();
List<String> udflist = Arrays.asList(udfs);
for (Map.Entry<String, UDFunction> entry : usedFunctions.entrySet()) {
if(!udflist.contains(entry.getKey())){
if( entry.getValue().getType()== UDFunction.UDFunctionType.Scalar){
stEnvironment.registerFunction(entry.getKey(),
(ScalarFunction)entry.getValue().getFunction());
}else if( entry.getValue().getType()== UDFunction.UDFunctionType.Table){
stEnvironment.registerFunction(entry.getKey(),
(TableFunction)entry.getValue().getFunction());
}else if( entry.getValue().getType()== UDFunction.UDFunctionType.Aggregate){
stEnvironment.registerFunction(entry.getKey(),
(AggregateFunction)entry.getValue().getFunction());
}else if( entry.getValue().getType()== UDFunction.UDFunctionType.TableAggregate){
stEnvironment.registerFunction(entry.getKey(),
(TableAggregateFunction)entry.getValue().getFunction());
}
}
}
}
}
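A hedged sketch of the interceptor's call pattern as defined in this file: register any referenced UDFs, build a matching custom operation if one exists, and only run the statement when the operation does not replace execution. The DDL is a placeholder and is assumed not to match any custom operation handle.

import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.interceptor.FlinkInterceptor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkInterceptorDemo {
    public static void main(String[] args) {
        CustomTableEnvironmentImpl env = CustomTableEnvironmentImpl.create(
                StreamExecutionEnvironment.createLocalEnvironment());
        String statement = "CREATE TABLE datagen_source (id INT) WITH ('connector' = 'datagen')";
        // build() returns true when a matched operation has already handled the statement.
        if (!FlinkInterceptor.build(env, statement)) {
            env.executeSql(statement);
        }
    }
}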
package com.dlink.job; package com.dlink.job;
import com.dlink.executor.ExecutorSetting; import com.dlink.executor.ExecutorSetting;
import com.dlink.gateway.config.ClusterConfig;
import com.dlink.gateway.config.GatewayConfig; import com.dlink.gateway.config.GatewayConfig;
import com.dlink.session.SessionConfig; import com.dlink.session.SessionConfig;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import java.util.Map;
/** /**
* JobConfig * JobConfig
* *
...@@ -91,4 +94,11 @@ public class JobConfig { ...@@ -91,4 +94,11 @@ public class JobConfig {
useRemote = sessionConfig.isUseRemote(); useRemote = sessionConfig.isUseRemote();
} }
} }
public void buildGatewayConfig(Map<String,String> config){
gatewayConfig = new GatewayConfig();
gatewayConfig.setClusterConfig(ClusterConfig.build(config.get("flinkConfigPath"),
config.get("flinkLibPath"),
config.get("hadoopConfigPath")));
}
} }
...@@ -10,7 +10,6 @@ import com.dlink.explainer.Explainer; ...@@ -10,7 +10,6 @@ import com.dlink.explainer.Explainer;
import com.dlink.gateway.Gateway; import com.dlink.gateway.Gateway;
import com.dlink.gateway.GatewayType; import com.dlink.gateway.GatewayType;
import com.dlink.gateway.config.FlinkConfig; import com.dlink.gateway.config.FlinkConfig;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.gateway.result.GatewayResult; import com.dlink.gateway.result.GatewayResult;
import com.dlink.interceptor.FlinkInterceptor; import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.parser.SqlType; import com.dlink.parser.SqlType;
...@@ -22,14 +21,12 @@ import com.dlink.session.SessionPool; ...@@ -22,14 +21,12 @@ import com.dlink.session.SessionPool;
import com.dlink.trans.Operations; import com.dlink.trans.Operations;
import com.dlink.utils.SqlUtil; import com.dlink.utils.SqlUtil;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.table.api.StatementSet; import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableResult;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; import java.util.List;
/** /**
...@@ -170,7 +167,7 @@ public class JobManager extends RunTime { ...@@ -170,7 +167,7 @@ public class JobManager extends RunTime {
return false; return false;
} }
@Deprecated /*@Deprecated
public SubmitResult submit(String statement) { public SubmitResult submit(String statement) {
if (statement == null || "".equals(statement)) { if (statement == null || "".equals(statement)) {
return SubmitResult.error("FlinkSql语句不存在"); return SubmitResult.error("FlinkSql语句不存在");
...@@ -234,8 +231,8 @@ public class JobManager extends RunTime { ...@@ -234,8 +231,8 @@ public class JobManager extends RunTime {
result.setMsg(LocalDateTime.now().toString() + ":任务提交成功!"); result.setMsg(LocalDateTime.now().toString() + ":任务提交成功!");
return result; return result;
} }
*/
@Deprecated /*@Deprecated
public SubmitResult submitGraph(String statement, GatewayConfig gatewayConfig) { public SubmitResult submitGraph(String statement, GatewayConfig gatewayConfig) {
if (statement == null || "".equals(statement)) { if (statement == null || "".equals(statement)) {
return SubmitResult.error("FlinkSql语句不存在"); return SubmitResult.error("FlinkSql语句不存在");
...@@ -372,7 +369,7 @@ public class JobManager extends RunTime { ...@@ -372,7 +369,7 @@ public class JobManager extends RunTime {
} }
close(); close();
return job.getJobResult(); return job.getJobResult();
} }*/
public JobResult executeSql(String statement) { public JobResult executeSql(String statement) {
String address = null; String address = null;
...@@ -391,16 +388,18 @@ public class JobManager extends RunTime { ...@@ -391,16 +388,18 @@ public class JobManager extends RunTime {
try { try {
for (StatementParam item : jobParam.getDdl()) { for (StatementParam item : jobParam.getDdl()) {
currentSql = item.getValue(); currentSql = item.getValue();
if (!FlinkInterceptor.build(stEnvironment, item.getValue())) { /*if (!FlinkInterceptor.build(executor, item.getValue())) {
executor.executeSql(item.getValue());
}*/
executor.executeSql(item.getValue()); executor.executeSql(item.getValue());
}
} }
if(config.isUseStatementSet()&&useGateway) { if(config.isUseStatementSet()&&useGateway) {
List<String> inserts = new ArrayList<>(); List<String> inserts = new ArrayList<>();
for (StatementParam item : jobParam.getTrans()) { for (StatementParam item : jobParam.getTrans()) {
if (!FlinkInterceptor.build(stEnvironment, item.getValue())) { /*if (!FlinkInterceptor.build(executor, item.getValue())) {
inserts.add(item.getValue());
}*/
inserts.add(item.getValue()); inserts.add(item.getValue());
}
} }
currentSql = String.join(FlinkSQLConstant.SEPARATOR,inserts); currentSql = String.join(FlinkSQLConstant.SEPARATOR,inserts);
JobGraph jobGraph = executor.getJobGraphFromInserts(inserts); JobGraph jobGraph = executor.getJobGraphFromInserts(inserts);
...@@ -414,10 +413,12 @@ public class JobManager extends RunTime { ...@@ -414,10 +413,12 @@ public class JobManager extends RunTime {
StatementSet statementSet = stEnvironment.createStatementSet(); StatementSet statementSet = stEnvironment.createStatementSet();
for (StatementParam item : jobParam.getTrans()) { for (StatementParam item : jobParam.getTrans()) {
if(item.getType().equals(SqlType.INSERT)) { if(item.getType().equals(SqlType.INSERT)) {
if (!FlinkInterceptor.build(stEnvironment, item.getValue())) { /*if (!FlinkInterceptor.build(executor, item.getValue())) {
statementSet.addInsertSql(item.getValue());
inserts.add(item.getValue());
}*/
statementSet.addInsertSql(item.getValue()); statementSet.addInsertSql(item.getValue());
inserts.add(item.getValue()); inserts.add(item.getValue());
}
} }
} }
if(inserts.size()>0) { if(inserts.size()>0) {
...@@ -434,10 +435,12 @@ public class JobManager extends RunTime { ...@@ -434,10 +435,12 @@ public class JobManager extends RunTime {
}else if(!config.isUseStatementSet()&&useGateway) { }else if(!config.isUseStatementSet()&&useGateway) {
List<String> inserts = new ArrayList<>(); List<String> inserts = new ArrayList<>();
for (StatementParam item : jobParam.getTrans()) { for (StatementParam item : jobParam.getTrans()) {
if (!FlinkInterceptor.build(stEnvironment, item.getValue())) { /*if (!FlinkInterceptor.build(executor, item.getValue())) {
inserts.add(item.getValue());
break;
}*/
inserts.add(item.getValue()); inserts.add(item.getValue());
break; break;
}
} }
currentSql = String.join(FlinkSQLConstant.SEPARATOR,inserts); currentSql = String.join(FlinkSQLConstant.SEPARATOR,inserts);
JobGraph jobGraph = executor.getJobGraphFromInserts(inserts); JobGraph jobGraph = executor.getJobGraphFromInserts(inserts);
...@@ -449,7 +452,7 @@ public class JobManager extends RunTime { ...@@ -449,7 +452,7 @@ public class JobManager extends RunTime {
}else{ }else{
for (StatementParam item : jobParam.getTrans()) { for (StatementParam item : jobParam.getTrans()) {
currentSql = item.getValue(); currentSql = item.getValue();
if (!FlinkInterceptor.build(stEnvironment, item.getValue())) { if (!FlinkInterceptor.build(executor, item.getValue())) {
TableResult tableResult = executor.executeSql(item.getValue()); TableResult tableResult = executor.executeSql(item.getValue());
if (tableResult.getJobClient().isPresent()) { if (tableResult.getJobClient().isPresent()) {
job.setJobId(tableResult.getJobClient().get().getJobID().toHexString()); job.setJobId(tableResult.getJobClient().get().getJobID().toHexString());
...@@ -487,7 +490,8 @@ public class JobManager extends RunTime { ...@@ -487,7 +490,8 @@ public class JobManager extends RunTime {
List<StatementParam> ddl = new ArrayList<>(); List<StatementParam> ddl = new ArrayList<>();
List<StatementParam> trans = new ArrayList<>(); List<StatementParam> trans = new ArrayList<>();
for (String item : statements) { for (String item : statements) {
String statement = SqlUtil.removeNote(item); String statement = FlinkInterceptor.pretreatStatement(executor,item);
// String statement = SqlUtil.removeNote(item);
if (statement.isEmpty()) { if (statement.isEmpty()) {
continue; continue;
} }
......
package com.dlink.trans;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import java.util.Arrays;
import java.util.List;
/**
* AbstractOperation
*
* @author wenmo
* @since 2021/6/14 18:18
*/
public class AbstractOperation {
protected String statement;
public AbstractOperation() {
}
public AbstractOperation(String statement) {
this.statement = statement;
}
public String getStatement() {
return statement;
}
public void setStatement(String statement) {
this.statement = statement;
}
public boolean checkFunctionExist(CustomTableEnvironmentImpl stEnvironment, String key){
String[] udfs = stEnvironment.listUserDefinedFunctions();
List<String> udfList = Arrays.asList(udfs);
return udfList.contains(key.toLowerCase());
}
public boolean noExecute(){
return true;
}
}
package com.dlink.trans;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
/**
* CreateOperation
*
* @author wenmo
* @since 2021/6/13 19:34
*/
public interface CreateOperation extends Operation{
//void create(CustomTableEnvironmentImpl stEnvironment);
}
package com.dlink.trans;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
/**
* Operation
*
* @author wenmo
* @since 2021/6/13 19:24
*/
public interface Operation {
String getHandle();
Operation create(String statement);
void build(CustomTableEnvironmentImpl stEnvironment);
boolean noExecute();
}
package com.dlink.trans;
import com.dlink.parser.SqlType;
import com.dlink.trans.ddl.CreateAggTableOperation;
import com.dlink.trans.ddl.SetOperation;
/**
* Operations
*
* @author wenmo
* @since 2021/5/25 15:50
**/
public class Operations {
private static Operation[] operations = {
new CreateAggTableOperation(),
new SetOperation()
};
public static SqlType getSqlTypeFromStatements(String statement){
String[] statements = statement.split(";");
SqlType sqlType = SqlType.UNKNOWN;
for (String item : statements) {
if (item.trim().isEmpty()) {
continue;
}
sqlType = Operations.getOperationType(item);
if(sqlType == SqlType.INSERT || sqlType == SqlType.SELECT){
return sqlType;
}
}
return sqlType;
}
public static SqlType getOperationType(String sql) {
String sqlTrim = sql.replaceAll("[\\s\\t\\n\\r]", "").trim().toUpperCase();
SqlType type = SqlType.UNKNOWN;
for (SqlType sqlType : SqlType.values()) {
if (sqlTrim.startsWith(sqlType.getType())) {
type = sqlType;
break;
}
}
return type;
}
public static Operation buildOperation(String statement){
String sql = statement.replace("\n"," ").replaceAll("\\s{1,}", " ").trim().toUpperCase();
for (int i = 0; i < operations.length; i++) {
if(sql.startsWith(operations[i].getHandle())){
return operations[i].create(statement);
}
}
return null;
}
}
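A minimal usage sketch of the registry above (the sketch class, the routing method, and the environment argument are illustrative additions, not part of this commit):

package com.dlink.trans;

import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.parser.SqlType;

public class OperationsUsageSketch {
    // Illustrative only: route one statement through the Operation registry.
    public static void route(CustomTableEnvironmentImpl stEnvironment, String statement) {
        Operation operation = Operations.buildOperation(statement);
        if (operation != null) {
            // e.g. a "CREATE AGGTABLE ..." statement resolves to CreateAggTableOperation
            operation.build(stEnvironment);
        }
        // Classify a script: INSERT/SELECT wins immediately, other types fall through.
        SqlType type = Operations.getSqlTypeFromStatements(statement);
        System.out.println(type);
    }
}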
package com.dlink.trans.ddl;
import com.dlink.parser.SingleSqlParserFactory;
import org.apache.commons.lang3.StringUtils;
import java.util.List;
import java.util.Map;
/**
* AggTable
*
* @author wenmo
* @since 2021/6/13 20:32
*/
public class AggTable {
private String statement;
private String name;
private String columns;
private String table;
private List<String> wheres;
private String groupBy;
private String aggBy;
public AggTable(String statement, String name, String columns, String table, List<String> wheres, String groupBy, String aggBy) {
this.statement = statement;
this.name = name;
this.columns = columns;
this.table = table;
this.wheres = wheres;
this.groupBy = groupBy;
this.aggBy = aggBy;
}
public static AggTable build(String statement){
Map<String,List<String>> map = SingleSqlParserFactory.generateParser(statement);
return new AggTable(statement,
getString(map,"CREATE AGGTABLE"),
getString(map,"SELECT"),
getString(map,"FROM"),
map.get("WHERE"),
getString(map,"GROUP BY"),
getString(map,"AGG BY"));
}
private static String getString(Map<String,List<String>> map,String key){
return StringUtils.join(map.get(key),",");
}
public String getStatement() {
return statement;
}
public void setStatement(String statement) {
this.statement = statement;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getColumns() {
return columns;
}
public void setColumns(String columns) {
this.columns = columns;
}
public String getTable() {
return table;
}
public void setTable(String table) {
this.table = table;
}
public List<String> getWheres() {
return wheres;
}
public void setWheres(List<String> wheres) {
this.wheres = wheres;
}
public String getGroupBy() {
return groupBy;
}
public void setGroupBy(String groupBy) {
this.groupBy = groupBy;
}
public String getAggBy() {
return aggBy;
}
public void setAggBy(String aggBy) {
this.aggBy = aggBy;
}
}
package com.dlink.trans.ddl;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.trans.AbstractOperation;
import com.dlink.trans.Operation;
import org.apache.flink.table.api.Table;
import java.util.List;
/**
* CreateAggTableOperation
*
* @author wenmo
* @since 2021/6/13 19:24
*/
public class CreateAggTableOperation extends AbstractOperation implements Operation{
private static final String KEY_WORD = "CREATE AGGTABLE";
public CreateAggTableOperation() {
}
public CreateAggTableOperation(String statement) {
super(statement);
}
@Override
public String getHandle() {
return KEY_WORD;
}
@Override
public Operation create(String statement) {
return new CreateAggTableOperation(statement);
}
@Override
public void build(CustomTableEnvironmentImpl stEnvironment) {
AggTable aggTable = AggTable.build(statement);
Table source = stEnvironment.sqlQuery("select * from "+ aggTable.getTable());
List<String> wheres = aggTable.getWheres();
if(wheres!=null&&wheres.size()>0) {
for (String s : wheres) {
source = source.filter(s);
}
}
Table sink = source.groupBy(aggTable.getGroupBy())
.flatAggregate(aggTable.getAggBy())
.select(aggTable.getColumns());
stEnvironment.registerTable(aggTable.getName(), sink);
}
}
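To make the clause mapping concrete, a hedged example of a statement this operation could handle (table, column, and function names are invented; the exact keyword splitting is done by SingleSqlParserFactory, which is outside this diff):

package com.dlink.trans.ddl;

public class AggTableSketch {
    public static void main(String[] args) {
        // Illustrative only: clause order mirrors the keys read by AggTable.build().
        String statement =
                "CREATE AGGTABLE aggscore AS\n" +
                "SELECT cls, score, rank\n" +
                "FROM score\n" +
                "WHERE score >= 0\n" +
                "GROUP BY cls\n" +
                "AGG BY TOP2(score) AS (score, rank)";
        AggTable aggTable = AggTable.build(statement);
        // If the parser splits as expected, name -> aggscore, table -> score,
        // groupBy -> cls, aggBy -> TOP2(score) AS (score, rank).
        System.out.println(aggTable.getName() + " <- " + aggTable.getTable());
    }
}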
package com.dlink.trans.ddl;
import com.dlink.assertion.Asserts;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.parser.SingleSqlParserFactory;
import com.dlink.trans.AbstractOperation;
import com.dlink.trans.Operation;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* SetOperation
*
* @author wenmo
* @since 2021/10/21 19:56
**/
public class SetOperation extends AbstractOperation implements Operation {
private static final String KEY_WORD = "SET";
public SetOperation() {
}
public SetOperation(String statement) {
super(statement);
}
@Override
public String getHandle() {
return KEY_WORD;
}
@Override
public Operation create(String statement) {
return new SetOperation(statement);
}
@Override
public void build(CustomTableEnvironmentImpl stEnvironment) {
Map<String,List<String>> map = SingleSqlParserFactory.generateParser(statement);
if(Asserts.isNotNullMap(map)&&map.size()==2) {
Map<String, String> confMap = new HashMap<>();
confMap.put(StringUtils.join(map.get("SET"), "."), StringUtils.join(map.get("="), ","));
stEnvironment.getConfig().addConfiguration(Configuration.fromMap(confMap));
}
}
}
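A hedged sketch of how a SET statement flows through this operation (the configuration key and the apply method are illustrative; the split into the "SET" and "=" entries comes from SingleSqlParserFactory):

package com.dlink.trans.ddl;

import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.trans.Operation;

public class SetOperationSketch {
    // Illustrative only: apply one SET statement to an existing table environment.
    public static void apply(CustomTableEnvironmentImpl stEnvironment) {
        Operation set = new SetOperation().create("SET table.exec.resource.default-parallelism = 2");
        // build() joins the parsed key parts with '.' and the value parts with ','
        // and pushes the pair into the TableConfig via Configuration.fromMap(...).
        set.build(stEnvironment);
    }
}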
package com.dlink.utils;
import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkSQLConstant;
/**
* SqlUtil
*
* @author wenmo
* @since 2021/7/14 21:57
*/
public class SqlUtil {
public static String[] getStatements(String sql){
if(Asserts.isNullString(sql)){
return new String[0];
}
return sql.split(FlinkSQLConstant.SEPARATOR);
}
public static String removeNote(String sql){
if(Asserts.isNotNullString(sql)) {
sql = sql.replaceAll("--([^'\\r\\n]{0,}('[^'\\r\\n]{0,}'){0,1}[^'\\r\\n]{0,}){0,}$", "").trim();
}
return sql;
}
}
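And a small sketch of the two helpers above (the script text is invented; FlinkSQLConstant.SEPARATOR is assumed to be the project's statement separator):

package com.dlink.utils;

public class SqlUtilSketch {
    public static void main(String[] args) {
        // Illustrative only: split a script on the separator and run each piece
        // through removeNote(...) before handing it to the executor.
        String script = "CREATE TABLE t (id INT);\n"
                + "--demo comment\n"
                + "INSERT INTO t2 SELECT id FROM t";
        for (String item : SqlUtil.getStatements(script)) {
            String statement = SqlUtil.removeNote(item);
            if (!statement.isEmpty()) {
                System.out.println(statement);
            }
        }
    }
}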
...@@ -22,41 +22,6 @@ import java.util.List; ...@@ -22,41 +22,6 @@ import java.util.List;
**/ **/
public class JobManagerTest { public class JobManagerTest {
@Test
public void submitJobTest2(){
ExecutorSetting setting = new ExecutorSetting(true);
JobManager jobManager = new JobManager("192.168.123.157:8081","test2",100, setting);
String sql1 ="CREATE TABLE student (\n" +
" sid INT,\n" +
" name STRING,\n" +
" PRIMARY KEY (sid) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://192.168.24.1:3306/data?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true',\n" +
" 'username'='datalink',\n" +
" 'password'='datalink',\n" +
" 'table-name' = 'student'\n" +
")";
String sql2 ="CREATE TABLE man (\n" +
" pid INT,\n" +
" name STRING,\n" +
" PRIMARY KEY (pid) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://192.168.24.1:3306/data?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true',\n" +
" 'username'='datalink',\n" +
" 'password'='datalink',\n" +
" 'table-name' = 'man'\n" +
")";
String sql3 = "INSERT INTO man SELECT sid as pid,name from student";
List<String> sqls = new ArrayList<>();
sqls.add(sql1);
sqls.add(sql2);
sqls.add(sql3);
SubmitResult result = jobManager.submit(sqls);
System.out.println(result.isSuccess());
}
@Test @Test
public void executeJobTest(){ public void executeJobTest(){
ExecutorSetting setting = new ExecutorSetting(0,1,false,null); ExecutorSetting setting = new ExecutorSetting(0,1,false,null);
......
...@@ -48,4 +48,16 @@ public class SqlParserTest { ...@@ -48,4 +48,16 @@ public class SqlParserTest {
Map<String,List<String>> lists = SingleSqlParserFactory.generateParser(sql); Map<String,List<String>> lists = SingleSqlParserFactory.generateParser(sql);
System.out.println(lists.toString()); System.out.println(lists.toString());
} }
@Test
public void regTest(){
String sql = "--并行度\n" +
"CREATE TABLE student (\n" +
" sid INT,\n" +
" name STRING,\n" +
" PRIMARY KEY (sid) NOT ENFORCED\n" +
") WITH ${tb}";
sql=sql.replaceAll("--([^'\r\n]{0,}('[^'\r\n]{0,}'){0,1}[^'\r\n]{0,}){0,}","").trim();
System.out.println(sql);
}
} }
...@@ -42,17 +42,12 @@ ...@@ -42,17 +42,12 @@
<dependency> <dependency>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<artifactId>dlink-client-1.13</artifactId> <artifactId>dlink-client-1.13</artifactId>
<!--<scope>provided</scope>--> <scope>provided</scope>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-connector-jdbc-1.13</artifactId>
<!--<scope>provided</scope>-->
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<artifactId>dlink-function</artifactId> <artifactId>dlink-function</artifactId>
<!--<scope>provided</scope>--> <scope>provided</scope>
</dependency> </dependency>
</dependencies> </dependencies>
......
package com.dlink.executor; package com.dlink.executor;
import com.dlink.assertion.Asserts;
import com.dlink.executor.custom.CustomTableEnvironmentImpl; import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.executor.custom.SqlManager; import com.dlink.executor.custom.CustomTableResultImpl;
import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.result.SqlExplainResult; import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.ExplainDetail; import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.UserDefinedFunction; import org.apache.flink.table.functions.UserDefinedFunction;
import java.util.List;
import java.util.Map; import java.util.Map;
/** /**
...@@ -104,12 +106,8 @@ public abstract class Executor { ...@@ -104,12 +106,8 @@ public abstract class Executor {
} }
private void initStreamExecutionEnvironment(){ private void initStreamExecutionEnvironment(){
useSqlFragment = executorSetting.isUseSqlFragment();
stEnvironment = CustomTableEnvironmentImpl.create(environment); stEnvironment = CustomTableEnvironmentImpl.create(environment);
if(executorSetting.isUseSqlFragment()){
stEnvironment.useSqlFragment();
}else{
stEnvironment.unUseSqlFragment();
}
if(executorSetting.getJobName()!=null&&!"".equals(executorSetting.getJobName())){ if(executorSetting.getJobName()!=null&&!"".equals(executorSetting.getJobName())){
stEnvironment.getConfig().getConfiguration().setString("pipeline.name", executorSetting.getJobName()); stEnvironment.getConfig().getConfiguration().setString("pipeline.name", executorSetting.getJobName());
} }
...@@ -121,12 +119,8 @@ public abstract class Executor { ...@@ -121,12 +119,8 @@ public abstract class Executor {
} }
private void updateStreamExecutionEnvironment(ExecutorSetting executorSetting){ private void updateStreamExecutionEnvironment(ExecutorSetting executorSetting){
useSqlFragment = executorSetting.isUseSqlFragment();
copyCatalog(); copyCatalog();
if(executorSetting.isUseSqlFragment()){
stEnvironment.useSqlFragment();
}else{
stEnvironment.unUseSqlFragment();
}
if(executorSetting.getJobName()!=null&&!"".equals(executorSetting.getJobName())){ if(executorSetting.getJobName()!=null&&!"".equals(executorSetting.getJobName())){
stEnvironment.getConfig().getConfiguration().setString("pipeline.name", executorSetting.getJobName()); stEnvironment.getConfig().getConfiguration().setString("pipeline.name", executorSetting.getJobName());
} }
...@@ -149,28 +143,48 @@ public abstract class Executor { ...@@ -149,28 +143,48 @@ public abstract class Executor {
stEnvironment = newstEnvironment; stEnvironment = newstEnvironment;
} }
public JobExecutionResult execute(String jobName) throws Exception{ private String pretreatStatement(String statement){
return stEnvironment.execute(jobName); return FlinkInterceptor.pretreatStatement(this,statement);
}
private boolean pretreatExecute(String statement){
return !FlinkInterceptor.build(this,statement);
} }
public TableResult executeSql(String statement){ public TableResult executeSql(String statement){
statement = pretreatStatement(statement);
if(pretreatExecute(statement)) {
return stEnvironment.executeSql(statement); return stEnvironment.executeSql(statement);
}else{
return CustomTableResultImpl.TABLE_RESULT_OK;
} }
public Table sqlQuery(String statement){
return stEnvironment.sqlQuery(statement);
} }
public String explainSql(String statement, ExplainDetail... extraDetails){ public String explainSql(String statement, ExplainDetail... extraDetails){
statement = pretreatStatement(statement);
if(pretreatExecute(statement)) {
return stEnvironment.explainSql(statement,extraDetails); return stEnvironment.explainSql(statement,extraDetails);
}else{
return "";
}
} }
public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails){ public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails){
statement = pretreatStatement(statement);
if(Asserts.isNotNullString(statement)&&pretreatExecute(statement)) {
return stEnvironment.explainSqlRecord(statement,extraDetails); return stEnvironment.explainSqlRecord(statement,extraDetails);
}else{
return null;
}
} }
public ObjectNode getStreamGraph(String statement){ public ObjectNode getStreamGraph(String statement){
statement = pretreatStatement(statement);
if(pretreatExecute(statement)) {
return stEnvironment.getStreamGraph(statement); return stEnvironment.getStreamGraph(statement);
}else{
return null;
}
} }
public void registerFunction(String name, ScalarFunction function){ public void registerFunction(String name, ScalarFunction function){
...@@ -184,4 +198,8 @@ public abstract class Executor { ...@@ -184,4 +198,8 @@ public abstract class Executor {
public CatalogManager getCatalogManager(){ public CatalogManager getCatalogManager(){
return stEnvironment.getCatalogManager(); return stEnvironment.getCatalogManager();
} }
public JobGraph getJobGraphFromInserts(List<String> statements){
return stEnvironment.getJobGraphFromInserts(statements);
}
} }
...@@ -31,7 +31,7 @@ public class FlinkInterceptor { ...@@ -31,7 +31,7 @@ public class FlinkInterceptor {
statement = executor.getSqlManager().parseVariable(statement); statement = executor.getSqlManager().parseVariable(statement);
} }
initFunctions(executor.getCustomTableEnvironmentImpl(), statement); initFunctions(executor.getCustomTableEnvironmentImpl(), statement);
return statement; return statement.trim();
} }
public static boolean build(Executor executor, String statement) { public static boolean build(Executor executor, String statement) {
......
...@@ -20,7 +20,7 @@ public class SqlUtil { ...@@ -20,7 +20,7 @@ public class SqlUtil {
public static String removeNote(String sql){ public static String removeNote(String sql){
if(Asserts.isNotNullString(sql)) { if(Asserts.isNotNullString(sql)) {
sql = sql.replaceAll("--([^'\\r\\n]{0,}('[^'\\r\\n]{0,}'){0,1}[^'\\r\\n]{0,}){0,}$", "").trim(); sql = sql.replaceAll("--([^'\r\n]{0,}('[^'\r\n]{0,}'){0,1}[^'\r\n]{0,}){0,}", "").trim();
} }
return sql; return sql;
} }
......
...@@ -378,6 +378,9 @@ export default (): React.ReactNode => { ...@@ -378,6 +378,9 @@ export default (): React.ReactNode => {
<li> <li>
<Link>新增 FlinkSQL 语句集提交</Link> <Link>新增 FlinkSQL 语句集提交</Link>
</li> </li>
<li>
<Link>executor 模块独立并优化增强逻辑</Link>
</li>
</ul> </ul>
</Paragraph> </Paragraph>
</Timeline.Item> </Timeline.Item>
......
...@@ -212,6 +212,11 @@ ...@@ -212,6 +212,11 @@
<artifactId>dlink-gateway</artifactId> <artifactId>dlink-gateway</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-executor</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>
<build> <build>
......