Commit db2a929c authored by wenmo

Change the Flink main version to 1.13.3

parent 67918f28
@@ -323,6 +323,8 @@ AGG BY TOP2(value) as (value,rank);
5.[Dlink-0.3.2更新说明](https://github.com/DataLinkDC/dlink/blob/main/dlink-doc/doc/Dlink-0.3.2%E6%9B%B4%E6%96%B0%E8%AF%B4%E6%98%8E.md)
6.[Dlink 读写 Hive 的实践](https://github.com/DataLinkDC/dlink/blob/dev/dlink-doc/doc/Dlink%E8%AF%BB%E5%86%99Hive%E7%9A%84%E5%AE%9E%E8%B7%B5.md)
#### FAQ and Solutions
(=。=)~ Stay tuned.
@@ -114,35 +114,11 @@
<groupId>com.dlink</groupId>
<artifactId>dlink-metadata-base</artifactId>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-gateway</artifactId>
<scope>provided</scope>
</dependency>
<!--<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-yarn_2.11</artifactId>
<version>1.12.5</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</exclusion>
</exclusions>
</dependency>-->
<!--<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-metadata-mysql</artifactId>
<scope>provided</scope>
</dependency>-->
<!--<dependency>
<groupId>com.alibaba.ververica</groupId>
<!– add the dependency matching your database –>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.3.0</version>
</dependency>-->
</dependencies>
<build>
<plugins>
@@ -46,10 +46,10 @@
<!-- Copy the dlink-client module jar into the packaged lib/ directory -->
<fileSet>
<directory>${project.parent.basedir}/dlink-client/dlink-client-1.12/target</directory>
<directory>${project.parent.basedir}/dlink-client/dlink-client-1.13/target</directory>
<outputDirectory>lib</outputDirectory>
<includes>
<include>dlink-client-1.12-${project.version}.jar</include>
<include>dlink-client-1.13-${project.version}.jar</include>
</includes>
</fileSet>
<fileSet>
@@ -60,10 +60,10 @@
</includes>
</fileSet>
<fileSet>
<directory>${project.parent.basedir}/dlink-client/dlink-client-1.13/target</directory>
<directory>${project.parent.basedir}/dlink-client/dlink-client-1.12/target</directory>
<outputDirectory>extends</outputDirectory>
<includes>
<include>dlink-client-1.13-${project.version}.jar</include>
<include>dlink-client-1.12-${project.version}.jar</include>
</includes>
</fileSet>
<fileSet>
@@ -76,10 +76,10 @@
<!-- Copy the dlink-connectors module jar into the packaged lib/ directory -->
<fileSet>
<directory>${project.parent.basedir}/dlink-connectors/dlink-connector-jdbc-1.12/target</directory>
<directory>${project.parent.basedir}/dlink-connectors/dlink-connector-jdbc-1.13/target</directory>
<outputDirectory>lib</outputDirectory>
<includes>
<include>dlink-connector-jdbc-1.12-${project.version}.jar</include>
<include>dlink-connector-jdbc-1.13-${project.version}.jar</include>
</includes>
</fileSet>
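Net effect of the fileSet swaps above: the Flink 1.13 jars become the defaults shipped under lib/, while the 1.12 client jar moves to extends/ as an optional swap-in. Roughly, the packaged layout becomes (paths taken from the descriptor; other bundled jars omitted):

```
dlink/
├── lib/
│   ├── dlink-client-1.13-${project.version}.jar
│   └── dlink-connector-jdbc-1.13-${project.version}.jar
└── extends/
    └── dlink-client-1.12-${project.version}.jar
```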
@@ -6,6 +6,7 @@ import org.apache.flink.api.dag.Transformation;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -156,6 +157,39 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
}
}
    public JobGraph getJobGraphFromInserts(List<String> statements) {
        List<ModifyOperation> modifyOperations = new ArrayList<>();
        for (String statement : statements) {
            if (useSqlFragment) {
                // Expand registered SQL fragments (variables) before parsing.
                statement = sqlManager.parseVariable(statement);
                if (statement.length() == 0) {
                    throw new TableException("This is a sql fragment.");
                }
            }
            if (checkShowFragments(statement)) {
                throw new TableException("'SHOW FRAGMENTS' can't be added to inserts.");
            }
            List<Operation> operations = getParser().parse(statement);
            if (operations.size() != 1) {
                throw new TableException("Only single statement is supported.");
            }
            Operation operation = operations.get(0);
            if (operation instanceof ModifyOperation) {
                modifyOperations.add((ModifyOperation) operation);
            } else {
                throw new TableException("Only insert statement is supported now.");
            }
        }
        // Translate all inserts together so they compile into a single job.
        List<Transformation<?>> trans = getPlanner().translate(modifyOperations);
        if (execEnv instanceof ExecutorBase) {
            StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(
                    ((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
            return streamGraph.getJobGraph();
        }
        throw new TableException("Unsupported SQL query! execEnv must be an ExecutorBase.");
    }
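For orientation, a minimal caller sketch of the new method (hypothetical: the class and table names are illustrative, not part of this commit, and the import for CustomTableEnvironmentImpl is omitted since its package depends on the client module):

```java
import java.util.Arrays;
import java.util.List;

import org.apache.flink.runtime.jobgraph.JobGraph;

public class JobGraphFromInsertsExample {

    // tableEnv is assumed to be a fully configured CustomTableEnvironmentImpl
    // (catalogs, SQL fragments, executor); its construction is omitted here.
    static JobGraph buildJobGraph(CustomTableEnvironmentImpl tableEnv) {
        List<String> inserts = Arrays.asList(
                "INSERT INTO sink_table SELECT id, name FROM source_table");
        // Each statement must parse to exactly one ModifyOperation; the batch
        // is translated and compiled into one JobGraph for per-job submission.
        return tableEnv.getJobGraphFromInserts(inserts);
    }
}
```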
public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
SqlExplainResult record = new SqlExplainResult();
if(useSqlFragment) {
@@ -176,7 +176,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
if (operations.size() != 1) {
throw new TableException("Only single statement is supported.");
} else {
Operation operation = (Operation)operations.get(0);
Operation operation = operations.get(0);
if (operation instanceof ModifyOperation) {
modifyOperations.add((ModifyOperation)operation);
} else {
@@ -312,8 +312,4 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
TypeInformation<ACC> accTypeInfo = UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(tableAggregateFunction);
this.functionCatalog.registerTempSystemAggregateFunction(name, tableAggregateFunction, typeInfo, accTypeInfo);
}
public Parser getParser(){
return super.parser;
}
}
@@ -6,6 +6,7 @@ import org.apache.flink.api.dag.Transformation;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -151,6 +152,39 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
}
}
    public JobGraph getJobGraphFromInserts(List<String> statements) {
        List<ModifyOperation> modifyOperations = new ArrayList<>();
        for (String statement : statements) {
            if (useSqlFragment) {
                // Expand registered SQL fragments (variables) before parsing.
                statement = sqlManager.parseVariable(statement);
                if (statement.length() == 0) {
                    throw new TableException("This is a sql fragment.");
                }
            }
            if (checkShowFragments(statement)) {
                throw new TableException("'SHOW FRAGMENTS' can't be added to inserts.");
            }
            List<Operation> operations = getParser().parse(statement);
            if (operations.size() != 1) {
                throw new TableException("Only single statement is supported.");
            }
            Operation operation = operations.get(0);
            if (operation instanceof ModifyOperation) {
                modifyOperations.add((ModifyOperation) operation);
            } else {
                throw new TableException("Only insert statement is supported now.");
            }
        }
        // Translate all inserts together so they compile into a single job.
        List<Transformation<?>> trans = getPlanner().translate(modifyOperations);
        if (execEnv instanceof ExecutorBase) {
            StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(
                    ((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
            return streamGraph.getJobGraph();
        }
        throw new TableException("Unsupported SQL query! execEnv must be an ExecutorBase.");
    }
public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
SqlExplainResult record = new SqlExplainResult();
if(useSqlFragment) {
@@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -212,6 +213,39 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
}
}
    public JobGraph getJobGraphFromInserts(List<String> statements) {
        List<ModifyOperation> modifyOperations = new ArrayList<>();
        for (String statement : statements) {
            if (useSqlFragment) {
                // Expand registered SQL fragments (variables) before parsing.
                statement = sqlManager.parseVariable(statement);
                if (statement.length() == 0) {
                    throw new TableException("This is a sql fragment.");
                }
            }
            if (checkShowFragments(statement)) {
                throw new TableException("'SHOW FRAGMENTS' can't be added to inserts.");
            }
            List<Operation> operations = getParser().parse(statement);
            if (operations.size() != 1) {
                throw new TableException("Only single statement is supported.");
            }
            Operation operation = operations.get(0);
            if (operation instanceof ModifyOperation) {
                modifyOperations.add((ModifyOperation) operation);
            } else {
                throw new TableException("Only insert statement is supported now.");
            }
        }
        // Translate all inserts together so they compile into a single job.
        List<Transformation<?>> trans = getPlanner().translate(modifyOperations);
        if (execEnv instanceof DefaultExecutor) {
            StreamGraph streamGraph = ((DefaultExecutor) execEnv)
                    .getExecutionEnvironment().generateStreamGraph(trans);
            return streamGraph.getJobGraph();
        }
        throw new TableException("Unsupported SQL query! execEnv must be a DefaultExecutor.");
    }
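The client variants differ only in how the StreamGraph is obtained from the executor. Side by side (a fragment, not a complete class; execEnv and trans are the field and local from the method bodies above):

```java
// Flink 1.12/1.13: the executor extends ExecutorBase, and the StreamGraph
// is produced through the ExecutorUtils helper.
StreamGraph fromExecutorBase = ExecutorUtils.generateStreamGraph(
        ((ExecutorBase) execEnv).getExecutionEnvironment(), trans);

// Newer client (the DefaultExecutor-based variant above): the underlying
// StreamExecutionEnvironment generates the StreamGraph directly.
StreamGraph fromDefaultExecutor = ((DefaultExecutor) execEnv)
        .getExecutionEnvironment()
        .generateStreamGraph(trans);
```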
public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
SqlExplainResult record = new SqlExplainResult();
if(useSqlFragment) {
@@ -42,12 +42,12 @@
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-client-1.12</artifactId>
<artifactId>dlink-client-1.13</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-connector-jdbc-1.12</artifactId>
<artifactId>dlink-connector-jdbc-1.13</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
@@ -41,12 +41,12 @@
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-client-1.12</artifactId>
<artifactId>dlink-client-1.13</artifactId>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-connector-jdbc-1.12</artifactId>
<artifactId>dlink-connector-jdbc-1.13</artifactId>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
@@ -13,7 +13,7 @@
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.12.5</flink.version>
<flink.version>1.13.3</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<mysql-connector-java.version>8.0.22</mysql-connector-java.version>
<ojdbc8.version>12.2.0.1</ojdbc8.version>
@@ -14,7 +14,7 @@
<dependencies>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-client-1.12</artifactId>
<artifactId>dlink-client-1.13</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
@@ -344,7 +344,7 @@ export default (): React.ReactNode => {
</ul>
</Paragraph>
</Timeline.Item>
<Timeline.Item><Text code>0.3.3</Text> <Text type="secondary">2021-11-?</Text>
<Timeline.Item><Text code>0.4.0</Text> <Text type="secondary">2021-11-?</Text>
<p> </p>
<Paragraph>
<ul>
@@ -357,6 +357,12 @@ export default (): React.ReactNode => {
<li>
<Link>Fix Flink 1.14.0 remote submission failing to submit jobs to the cluster correctly</Link>
</li>
<li>
<Link>Add yarn-perjob and yarn-application job submission modes</Link>
</li>
<li>
<Link>Update dlink's Flink main version to 1.13.3</Link>
</li>
</ul>
</Paragraph>
</Timeline.Item>