Commit baee5b73 authored by wenmo

Fix SET statement compatibility issues and upgrade all Flink dependency versions

parent 29280b25
......@@ -42,6 +42,7 @@ Dlink is a professional FlinkSQL Studio that supports online development, completion, and validation
| | | Added keyword highlighting and a code minimap | 0.4.0 |
| | | Added execution of selected snippets | 0.4.0 |
| | | Added drag-and-drop layout | 0.4.0 |
| | | Added SQL export | 0.5.0 |
| | | Support FlinkSQL submission in local mode | 0.4.0 |
| | | Support FlinkSQL submission in standalone mode | 0.4.0 |
| | | Support FlinkSQL submission in yarn session mode | 0.4.0 |
......
......@@ -135,7 +135,7 @@
<!--<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-yarn_2.11</artifactId>
<version>1.13.3</version>
<version>1.13.5</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
......
......@@ -13,69 +13,10 @@
<properties>
<mainClass>com.dlink.app.MainApp</mainClass>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.13.2</flink.version>
<slf4j.version>1.7.7</slf4j.version>
<log4j.version>1.2.17</log4j.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<!-- Apache Flink dependencies -->
<!--<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>-->
<!--<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>-->
<!--<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>-->
<!--<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
<scope>provided</scope>
</dependency>-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
......
package com.dlink.executor.custom;
package com.dlink.executor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
......
package com.dlink.executor.custom;
package com.dlink.executor;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.execution.JobClient;
......
......@@ -13,7 +13,7 @@
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.11.4</flink.version>
<flink.version>1.11.6</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
......
package com.dlink.executor.custom;
package com.dlink.executor;
import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.core.JsonProcessingException;
......@@ -8,13 +8,12 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
......@@ -35,6 +34,7 @@ import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.planner.delegation.ExecutorBase;
import org.apache.flink.table.planner.utils.ExecutorUtils;
import org.apache.flink.types.Row;
import java.lang.reflect.Method;
import java.util.ArrayList;
......@@ -118,6 +118,10 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
}
}
public JobPlanInfo getJobPlanInfo(List<String> statements) {
return new JobPlanInfo(JsonPlanGenerator.generatePlan(getJobGraphFromInserts(statements)));
}
public StreamGraph getStreamGraphFromInserts(List<String> statements) {
List<ModifyOperation> modifyOperations = new ArrayList();
for(String statement : statements){
......@@ -198,4 +202,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
this.functionCatalog.registerTempSystemAggregateFunction(name, tableAggregateFunction, typeInfo, accTypeInfo);
}
public boolean parseAndLoadConfiguration(String statement, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
return false;
}
}
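The getJobPlanInfo method added above turns a batch of INSERT statements into the JSON job plan that Flink's REST API and web UI use: the statements are compiled into a JobGraph, serialized by JsonPlanGenerator, and wrapped in a JobPlanInfo. A minimal sketch of the same technique, assuming a Flink 1.11+ runtime on the classpath (toJobPlanInfo stands in for the combination with getJobGraphFromInserts):

```java
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;

public class JobPlanSketch {

    // Serialize a compiled JobGraph into the JSON plan rendered by the Flink web UI.
    public static JobPlanInfo toJobPlanInfo(JobGraph jobGraph) {
        String jsonPlan = JsonPlanGenerator.generatePlan(jobGraph);
        return new JobPlanInfo(jsonPlan);
    }
}
```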
package com.dlink.executor.custom;
package com.dlink.executor;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.execution.JobClient;
......
......@@ -14,7 +14,7 @@
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.12.5</flink.version>
<flink.version>1.12.7</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
......
package com.dlink.executor.custom;
package com.dlink.executor;
import com.dlink.result.SqlExplainResult;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
......@@ -34,6 +34,7 @@ import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.planner.delegation.ExecutorBase;
import org.apache.flink.table.planner.utils.ExecutorUtils;
import org.apache.flink.types.Row;
import java.lang.reflect.Method;
import java.util.ArrayList;
......@@ -87,32 +88,32 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
}
public ObjectNode getStreamGraph(String statement) {
List<Operation> operations = super.getParser().parse(statement);
List<Operation> operations = super.parser.parse(statement);
if (operations.size() != 1) {
throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query.");
} else {
List<ModifyOperation> modifyOperations = new ArrayList<>();
for (int i = 0; i < operations.size(); i++) {
if(operations.get(i) instanceof ModifyOperation){
modifyOperations.add((ModifyOperation)operations.get(i));
if (operations.get(i) instanceof ModifyOperation) {
modifyOperations.add((ModifyOperation) operations.get(i));
}
}
List<Transformation<?>> trans = getPlanner().translate(modifyOperations);
if(execEnv instanceof ExecutorBase){
List<Transformation<?>> trans = super.planner.translate(modifyOperations);
if (execEnv instanceof ExecutorBase) {
StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
JSONGenerator jsonGenerator = new JSONGenerator(streamGraph);
String json = jsonGenerator.getJSON();
ObjectMapper mapper = new ObjectMapper();
ObjectNode objectNode =mapper.createObjectNode();
ObjectNode objectNode = mapper.createObjectNode();
try {
objectNode = (ObjectNode) mapper.readTree(json);
} catch (JsonProcessingException e) {
e.printStackTrace();
}finally {
} finally {
return objectNode;
}
}else{
throw new TableException("Unsupported SQL query! explainSql() need a single SQL to query.");
} else {
throw new TableException("Unsupported SQL query! ExecEnv need a ExecutorBase.");
}
}
}
......@@ -123,27 +124,27 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
public StreamGraph getStreamGraphFromInserts(List<String> statements) {
List<ModifyOperation> modifyOperations = new ArrayList();
for(String statement : statements){
for (String statement : statements) {
List<Operation> operations = getParser().parse(statement);
if (operations.size() != 1) {
throw new TableException("Only single statement is supported.");
} else {
Operation operation = operations.get(0);
if (operation instanceof ModifyOperation) {
modifyOperations.add((ModifyOperation)operation);
modifyOperations.add((ModifyOperation) operation);
} else {
throw new TableException("Only insert statement is supported now.");
}
}
}
List<Transformation<?>> trans = getPlanner().translate(modifyOperations);
if(execEnv instanceof ExecutorBase){
if (execEnv instanceof ExecutorBase) {
StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
if(tableConfig.getConfiguration().containsKey(PipelineOptions.NAME.key())) {
if (tableConfig.getConfiguration().containsKey(PipelineOptions.NAME.key())) {
streamGraph.setJobName(tableConfig.getConfiguration().getString(PipelineOptions.NAME));
}
return streamGraph;
}else{
} else {
throw new TableException("Unsupported SQL query! ExecEnv need a ExecutorBase.");
}
}
......@@ -154,7 +155,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
SqlExplainResult record = new SqlExplainResult();
List<Operation> operations = getParser().parse(statement);
List<Operation> operations = parser.parse(statement);
record.setParseTrue(true);
if (operations.size() != 1) {
throw new TableException(
......@@ -173,12 +174,11 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
record.setExplain(operation.asSummaryString());
operationlist.remove(i);
record.setType("DDL");
i=i-1;
i = i - 1;
}
}
record.setExplainTrue(true);
if(operationlist.size()==0){
//record.setExplain("DDL语句不进行解释。");
if (operationlist.size() == 0) {
return record;
}
record.setExplain(planner.explain(operationlist, extraDetails));
......@@ -202,4 +202,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
this.functionCatalog.registerTempSystemAggregateFunction(name, tableAggregateFunction, typeInfo, accTypeInfo);
}
public boolean parseAndLoadConfiguration(String statement, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
return false;
}
}
\ No newline at end of file
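Note that for this 1.12 client (and the 1.11 one above) parseAndLoadConfiguration is a stub that always returns false: the SetOperation and ResetOperation command classes only appear in the Table API with Flink 1.13 (FLIP-163), so older clients signal "not handled" and leave SET statements to Dlink's own string parser. In effect:

```java
import java.util.Map;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Pre-1.13 clients cannot parse SET/RESET as Operations
// (org.apache.flink.table.operations.command.SetOperation arrives with Flink 1.13),
// so they opt out here and the caller falls back to string-based handling.
public boolean parseAndLoadConfiguration(String statement,
                                         StreamExecutionEnvironment environment,
                                         Map<String, Object> setMap) {
    return false;
}
```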
package com.dlink.executor.custom;
package com.dlink.executor;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.execution.JobClient;
......
......@@ -14,7 +14,7 @@
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.13.3</flink.version>
<flink.version>1.13.5</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
......
package com.dlink.executor.custom;
package com.dlink.executor;
import com.dlink.result.SqlExplainResult;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
......@@ -33,11 +33,15 @@ import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;
import org.apache.flink.table.planner.delegation.ExecutorBase;
import org.apache.flink.table.planner.utils.ExecutorUtils;
import org.apache.flink.types.Row;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -88,7 +92,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
}
public ObjectNode getStreamGraph(String statement) {
List<Operation> operations = super.parser.parse(statement);
List<Operation> operations = super.getParser().parse(statement);
if (operations.size() != 1) {
throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query.");
} else {
......@@ -98,7 +102,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
modifyOperations.add((ModifyOperation)operations.get(i));
}
}
List<Transformation<?>> trans = super.planner.translate(modifyOperations);
List<Transformation<?>> trans = getPlanner().translate(modifyOperations);
if(execEnv instanceof ExecutorBase){
StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
JSONGenerator jsonGenerator = new JSONGenerator(streamGraph);
......@@ -113,9 +117,13 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
return objectNode;
}
}else{
throw new TableException("Unsupported SQL query! ExecEnv need a ExecutorBase.");
throw new TableException("Unsupported SQL query! explainSql() need a single SQL to query.");
}
}
}
public JobPlanInfo getJobPlanInfo(List<String> statements) {
return new JobPlanInfo(JsonPlanGenerator.generatePlan(getJobGraphFromInserts(statements)));
}
public StreamGraph getStreamGraphFromInserts(List<String> statements) {
......@@ -151,7 +159,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
SqlExplainResult record = new SqlExplainResult();
List<Operation> operations = parser.parse(statement);
List<Operation> operations = getParser().parse(statement);
record.setParseTrue(true);
if (operations.size() != 1) {
throw new TableException(
......@@ -175,6 +183,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
}
record.setExplainTrue(true);
if(operationlist.size()==0){
//record.setExplain("DDL语句不进行解释。");
return record;
}
record.setExplain(planner.explain(operationlist, extraDetails));
......@@ -197,4 +206,45 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
TypeInformation<ACC> accTypeInfo = UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(tableAggregateFunction);
this.functionCatalog.registerTempSystemAggregateFunction(name, tableAggregateFunction, typeInfo, accTypeInfo);
}
public boolean parseAndLoadConfiguration(String statement,StreamExecutionEnvironment environment,Map<String,Object> setMap){
List<Operation> operations = getParser().parse(statement);
for(Operation operation : operations){
if(operation instanceof SetOperation){
callSet((SetOperation)operation,environment,setMap);
return true;
} else if (operation instanceof ResetOperation){
callReset((ResetOperation)operation,environment,setMap);
return true;
}
}
return false;
}
private void callSet(SetOperation setOperation,StreamExecutionEnvironment environment,Map<String,Object> setMap){
if (setOperation.getKey().isPresent() && setOperation.getValue().isPresent()) {
String key = setOperation.getKey().get().trim();
String value = setOperation.getValue().get().trim();
Map<String,String> confMap = new HashMap<>();
confMap.put(key,value);
setMap.put(key,value);
Configuration configuration = Configuration.fromMap(confMap);
environment.getConfig().configure(configuration,null);
getConfig().addConfiguration(configuration);
}
}
private void callReset(ResetOperation resetOperation,StreamExecutionEnvironment environment,Map<String,Object> setMap) {
if (resetOperation.getKey().isPresent()) {
String key = resetOperation.getKey().get().trim();
Map<String,String> confMap = new HashMap<>();
confMap.put(key,null);
setMap.remove(key);
Configuration configuration = Configuration.fromMap(confMap);
environment.getConfig().configure(configuration,null);
getConfig().addConfiguration(configuration);
}else {
setMap.clear();
}
}
}
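With Flink 1.13, the parser returned by getParser() recognizes SET and RESET natively, so the client above applies a SetOperation in three places at once: the StreamExecutionEnvironment's ExecutionConfig, the TableConfig, and Dlink's setMap that echoes the active overrides back to the UI. A hedged usage sketch; the create(environment) factory is assumed from the surrounding project and not shown in this diff:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.executor.CustomTableEnvironmentImpl;

public class SetStatementSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Assumption: a factory like this exists elsewhere in the project.
        CustomTableEnvironmentImpl tableEnv = CustomTableEnvironmentImpl.create(env);

        Map<String, Object> setMap = new HashMap<>();

        // Parsed by Flink 1.13 as a SetOperation: callSet() pushes the entry into the
        // ExecutionConfig and the TableConfig, and records it in setMap.
        boolean handled = tableEnv.parseAndLoadConfiguration(
                "SET pipeline.name = demo-job", env, setMap);
        System.out.println(handled);          // true
        System.out.println(setMap);           // {pipeline.name=demo-job}

        // A bare RESET becomes a ResetOperation without a key: callReset() clears setMap.
        tableEnv.parseAndLoadConfiguration("RESET", env, setMap);
        System.out.println(setMap.isEmpty()); // true
    }
}
```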
package com.dlink.executor.custom;
package com.dlink.executor;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.execution.JobClient;
......
package com.dlink.utils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.operations.Operation;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.*;
/**
* FlinkUtil
......@@ -26,4 +25,5 @@ public class FlinkUtil {
return new ArrayList<String>();
}
}
}
......@@ -13,7 +13,7 @@
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.14.0</flink.version>
<flink.version>1.14.2</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
......
package com.dlink.executor.custom;
package com.dlink.executor;
import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.core.JsonProcessingException;
......@@ -6,15 +6,13 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
......@@ -33,11 +31,16 @@ import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;
import org.apache.flink.table.planner.delegation.DefaultExecutor;
import org.apache.flink.types.Row;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Customized TableEnvironmentImpl
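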
......@@ -256,4 +259,45 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
TypeInformation<ACC> accTypeInfo = UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(tableAggregateFunction);
this.functionCatalog.registerTempSystemAggregateFunction(name, tableAggregateFunction, typeInfo, accTypeInfo);
}
public boolean parseAndLoadConfiguration(String statement, StreamExecutionEnvironment environment, Map<String,Object> setMap){
List<Operation> operations = getParser().parse(statement);
for(Operation operation : operations){
if(operation instanceof SetOperation){
callSet((SetOperation)operation,environment,setMap);
return true;
} else if (operation instanceof ResetOperation){
callReset((ResetOperation)operation,environment,setMap);
return true;
}
}
return false;
}
private void callSet(SetOperation setOperation,StreamExecutionEnvironment environment,Map<String,Object> setMap){
if (setOperation.getKey().isPresent() && setOperation.getValue().isPresent()) {
String key = setOperation.getKey().get().trim();
String value = setOperation.getValue().get().trim();
Map<String,String> confMap = new HashMap<>();
confMap.put(key,value);
setMap.put(key,value);
Configuration configuration = Configuration.fromMap(confMap);
environment.getConfig().configure(configuration,null);
getConfig().addConfiguration(configuration);
}
}
private void callReset(ResetOperation resetOperation,StreamExecutionEnvironment environment,Map<String,Object> setMap) {
if (resetOperation.getKey().isPresent()) {
String key = resetOperation.getKey().get().trim();
Map<String,String> confMap = new HashMap<>();
confMap.put(key,null);
setMap.remove(key);
Configuration configuration = Configuration.fromMap(confMap);
environment.getConfig().configure(configuration,null);
getConfig().addConfiguration(configuration);
}else {
setMap.clear();
}
}
}
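The 1.14 client repeats the same SET/RESET logic; only the executor type differs (DefaultExecutor rather than ExecutorBase). One caveat on callReset: as far as I know, Configuration.fromMap copies entries via setString, which rejects null values, so resetting a single key may be safer expressed as rebuilding the override map without that key. A sketch of that alternative:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.configuration.Configuration;

public class ResetSketch {

    // Model RESET <key> by dropping the key and rebuilding the Configuration,
    // instead of mapping the key to null (which setString would refuse).
    public static Configuration withoutKey(Map<String, String> overrides, String key) {
        Map<String, String> remaining = new HashMap<>(overrides);
        remaining.remove(key);
        return Configuration.fromMap(remaining);
    }

    public static void main(String[] args) {
        Map<String, String> overrides = new HashMap<>();
        overrides.put("pipeline.name", "demo-job");
        overrides.put("parallelism.default", "2");
        System.out.println(withoutKey(overrides, "pipeline.name").toMap());
        // {parallelism.default=2}
    }
}
```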
package com.dlink.executor.custom;
package com.dlink.executor;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.execution.JobClient;
......
......@@ -10,7 +10,7 @@ public interface CommonConstant {
/**
* Project version number (used by the banner)
*/
String PROJECT_VERSION = "0.4.0";
String PROJECT_VERSION = "0.5.0-SNAPSHOT";
/**
* 实例健康
*/
......
......@@ -14,7 +14,7 @@
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.12.5</flink.version>
<flink.version>1.12.7</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
......
......@@ -14,7 +14,7 @@
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.13.2</flink.version>
<flink.version>1.13.5</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
......
package com.dlink.executor;
import com.dlink.assertion.Asserts;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.executor.custom.CustomTableResultImpl;
import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.core.JsonProcessingException;
......@@ -21,9 +19,6 @@ import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;
import java.util.HashMap;
import java.util.List;
......@@ -286,43 +281,6 @@ public abstract class Executor {
}
public boolean parseAndLoadConfiguration(String statement){
List<Operation> operations = stEnvironment.getParser().parse(statement);
for(Operation operation : operations){
if(operation instanceof SetOperation){
callSet((SetOperation)operation);
return true;
} else if (operation instanceof ResetOperation){
callReset((ResetOperation)operation);
return true;
}
}
return false;
}
private void callSet(SetOperation setOperation){
if (setOperation.getKey().isPresent() && setOperation.getValue().isPresent()) {
String key = setOperation.getKey().get().trim();
String value = setOperation.getValue().get().trim();
Map<String,String> confMap = new HashMap<>();
confMap.put(key,value);
setConfig.put(key,value);
Configuration configuration = Configuration.fromMap(confMap);
environment.getConfig().configure(configuration,null);
stEnvironment.getConfig().addConfiguration(configuration);
}
}
private void callReset(ResetOperation resetOperation) {
if (resetOperation.getKey().isPresent()) {
String key = resetOperation.getKey().get().trim();
Map<String,String> confMap = new HashMap<>();
confMap.put(key,null);
setConfig.remove(key);
Configuration configuration = Configuration.fromMap(confMap);
environment.getConfig().configure(configuration,null);
stEnvironment.getConfig().addConfiguration(configuration);
}else {
setConfig.clear();
}
return stEnvironment.parseAndLoadConfiguration(statement,environment,setConfig);
}
}
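On the shared side, Executor no longer parses SET/RESET itself; it delegates to whichever CustomTableEnvironmentImpl the active dlink-client module provides, which is what keeps the 1.13-only command classes out of this common code. The resulting shape, sketched:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.executor.CustomTableEnvironmentImpl;

// Sketch: version-sensitive parsing now lives entirely behind the per-version
// table environment loaded from the matching dlink-client module.
public abstract class ExecutorSketch {
    protected StreamExecutionEnvironment environment;
    protected CustomTableEnvironmentImpl stEnvironment;
    protected Map<String, Object> setConfig = new HashMap<>();

    public boolean parseAndLoadConfiguration(String statement) {
        // 1.13/1.14 clients handle SET/RESET natively; 1.11/1.12 return false,
        // leaving the statement to the legacy SetOperation fallback.
        return stEnvironment.parseAndLoadConfiguration(statement, environment, setConfig);
    }
}
```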
package com.dlink.executor;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.executor.custom.CustomTableResultImpl;
import com.dlink.executor.custom.TableSchemaField;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ExpressionParserException;
import org.apache.flink.table.api.Table;
......
......@@ -4,7 +4,7 @@ import com.dlink.assertion.Asserts;
import com.dlink.catalog.function.FunctionManager;
import com.dlink.catalog.function.UDFunction;
import com.dlink.executor.Executor;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.executor.CustomTableEnvironmentImpl;
import com.dlink.trans.Operation;
import com.dlink.trans.Operations;
import com.dlink.utils.SqlUtil;
......@@ -38,12 +38,9 @@ public class FlinkInterceptor {
public static boolean build(Executor executor, String statement) {
Operation operation = Operations.buildOperation(statement);
if (Asserts.isNotNull(operation)) {
operation.build(executor.getCustomTableEnvironmentImpl());
operation.build(executor);
return operation.noExecute();
}
if(executor.parseAndLoadConfiguration(statement)){
return true;
}
return false;
}
......
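FlinkInterceptor now hands each recognized Operation the whole Executor instead of just the table environment, and its explicit parseAndLoadConfiguration call is gone because the re-enabled SetOperation reaches the same code through the normal dispatch. Annotated:

```java
import com.dlink.executor.Executor;
import com.dlink.trans.Operation;
import com.dlink.trans.Operations;

public class InterceptorSketch {

    // A statement matched by Operations (SET, CREATE AGGTABLE, ...) is built
    // against the Executor and may short-circuit submission via noExecute().
    public static boolean build(Executor executor, String statement) {
        Operation operation = Operations.buildOperation(statement);
        if (operation != null) {           // Asserts.isNotNull(...) in the real code
            operation.build(executor);     // operations receive the full Executor now
            return operation.noExecute();  // true = handled here, skip execution
        }
        return false;                      // plain SQL: execute normally
    }
}
```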
package com.dlink.trans;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.executor.CustomTableEnvironmentImpl;
import java.util.Arrays;
import java.util.List;
......
package com.dlink.trans;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.executor.Executor;
/**
* Operation
......@@ -14,7 +14,7 @@ public interface Operation {
Operation create(String statement);
void build(CustomTableEnvironmentImpl stEnvironment);
void build(Executor executor);
boolean noExecute();
}
......@@ -14,7 +14,7 @@ public class Operations {
private static Operation[] operations = {
new CreateAggTableOperation()
// , new SetOperation()
, new SetOperation()
};
public static SqlType getSqlTypeFromStatements(String statement){
......
package com.dlink.trans.ddl;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.executor.Executor;
import com.dlink.trans.AbstractOperation;
import com.dlink.trans.Operation;
import org.apache.flink.table.api.Table;
......@@ -35,9 +35,9 @@ public class CreateAggTableOperation extends AbstractOperation implements Operat
}
@Override
public void build(CustomTableEnvironmentImpl stEnvironment) {
public void build(Executor executor) {
AggTable aggTable = AggTable.build(statement);
Table source = stEnvironment.sqlQuery("select * from "+ aggTable.getTable());
Table source = executor.getCustomTableEnvironmentImpl().sqlQuery("select * from "+ aggTable.getTable());
List<String> wheres = aggTable.getWheres();
if(wheres!=null&&wheres.size()>0) {
for (String s : wheres) {
......@@ -47,6 +47,6 @@ public class CreateAggTableOperation extends AbstractOperation implements Operat
Table sink = source.groupBy(aggTable.getGroupBy())
.flatAggregate(aggTable.getAggBy())
.select(aggTable.getColumns());
stEnvironment.registerTable(aggTable.getName(), sink);
executor.getCustomTableEnvironmentImpl().registerTable(aggTable.getName(), sink);
}
}
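CreateAggTableOperation follows the same switch to the Executor parameter. What it assembles is an ordinary Table API flat-aggregate pipeline; a self-contained sketch of the equivalent query (table and function names such as scores and Top2 are illustrative only, not part of this commit):

```java
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class AggTableSketch {

    // The pipeline a CREATE AGGTABLE statement expands to: filter, group,
    // flat-aggregate with a table aggregate function, then register the result.
    public static void register(TableEnvironment tableEnv) {
        Table source = tableEnv.sqlQuery("select * from scores"); // aggTable.getTable()
        Table sink = source
                .where($("score").isGreaterOrEqual(60))           // aggTable.getWheres()
                .groupBy($("cls"))                                // aggTable.getGroupBy()
                .flatAggregate(call("Top2", $("score")))          // aggTable.getAggBy()
                .select($("cls"), $("f0"), $("f1"));              // aggTable.getColumns()
        tableEnv.createTemporaryView("top_scores", sink);         // registerTable(...)
    }
}
```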
package com.dlink.trans.ddl;
import com.dlink.assertion.Asserts;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.executor.Executor;
import com.dlink.parser.SingleSqlParserFactory;
import com.dlink.trans.AbstractOperation;
import com.dlink.trans.Operation;
......@@ -18,7 +18,6 @@ import java.util.Map;
* @author wenmo
* @since 2021/10/21 19:56
**/
@Deprecated
public class SetOperation extends AbstractOperation implements Operation {
private String KEY_WORD = "SET";
......@@ -41,12 +40,22 @@ public class SetOperation extends AbstractOperation implements Operation {
}
@Override
public void build(CustomTableEnvironmentImpl stEnvironment) {
public void build(Executor executor) {
try {
if(null != Class.forName("org.apache.log4j.Logger")){
executor.parseAndLoadConfiguration(statement);
return;
}
} catch (ClassNotFoundException e) {
}
Map<String,List<String>> map = SingleSqlParserFactory.generateParser(statement);
if(Asserts.isNotNullMap(map)&&map.size()==2) {
Map<String, String> confMap = new HashMap<>();
confMap.put(StringUtils.join(map.get("SET"), "."), StringUtils.join(map.get("="), ","));
stEnvironment.getConfig().addConfiguration(Configuration.fromMap(confMap));
executor.getCustomTableEnvironmentImpl().getConfig().addConfiguration(Configuration.fromMap(confMap));
Configuration configuration = Configuration.fromMap(confMap);
executor.getEnvironment().getConfig().configure(configuration,null);
executor.getCustomTableEnvironmentImpl().getConfig().addConfiguration(configuration);
}
}
}
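The revived SetOperation tries the unified executor.parseAndLoadConfiguration path first and only then falls back to Dlink's own parser, which splits "SET a.b.c = v1,v2" into fragments keyed by SET and =. The fallback's essence, with the parser's output shape inferred from the surrounding code:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.configuration.Configuration;

public class SetFallbackSketch {

    // Re-join the parsed fragments: key parts under "SET" joined with '.',
    // value parts under "=" joined with ',' (mirrors the StringUtils.join calls).
    public static Configuration toConfiguration(Map<String, List<String>> parsed) {
        Map<String, String> confMap = new HashMap<>();
        confMap.put(String.join(".", parsed.get("SET")),
                String.join(",", parsed.get("=")));
        return Configuration.fromMap(confMap);
    }
}
```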
......@@ -13,7 +13,7 @@
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.13.3</flink.version>
<flink.version>1.13.5</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<mysql-connector-java.version>8.0.22</mysql-connector-java.version>
<ojdbc8.version>12.2.0.1</ojdbc8.version>
......
......@@ -41,7 +41,7 @@
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-client-1.14</artifactId>
<artifactId>dlink-client-1.13</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
......
......@@ -29,7 +29,7 @@ export default {
'pages.welcome.link': '欢迎加入',
'pages.welcome.star': '欢迎 Star ',
'pages.welcome.advancedLayout': 'Github',
'pages.welcome.alertMessage': '实时计算平台 Dlink & Apache Flink 即将发布,目前为体验版,版本号为 0.4.0。',
'pages.welcome.alertMessage': '实时计算平台 Dlink & Apache Flink 即将发布,目前为体验版,版本号为 0.5.0-SNAPSHOT。',
'pages.admin.subPage.title': ' 这个页面只有 admin 权限才能查看',
'pages.admin.subPage.alertMessage': 'umi ui 现已发布,欢迎使用 npm run ui 启动体验。',
'pages.searchTable.createForm.newRule': '新建规则',
......
......@@ -20,7 +20,7 @@ export default (): React.ReactNode => {
<Alert
message={intl.formatMessage({
id: 'pages.welcome.alertMessage',
defaultMessage: '实时计算平台 Dlink & Apache Flink 即将发布,目前为体验版,版本号为 0.4.0。',
defaultMessage: '实时计算平台 Dlink & Apache Flink 即将发布,目前为体验版,版本号为 0.5.0-SNAPSHOT。',
})}
type="success"
showIcon
......@@ -523,6 +523,12 @@ export default (): React.ReactNode => {
<li>
<Link>新增 集群与数据源的 Studio 管理交互</Link>
</li>
<li>
<Link>修复 set 语法在1.11和1.12的兼容问题</Link>
</li>
<li>
<Link>升级各版本 Flink 依赖至最新版本以解决核弹问题</Link>
</li>
</ul>
</Paragraph>
</Timeline.Item>
......