zhaowei / dlink · Commits

Commit 596167e2 (Unverified)
Authored May 08, 2022 by aiwenmo; committed May 08, 2022 by GitHub
[Feature-275][client] Add Flink client 1.15
Parents: 1d46cbb4, d2d8a92b
Changes: 29 files
Showing 29 changed files with 2364 additions and 2 deletions (+2364, -2)
README.en-US.md (+1, -0)
README.md (+1, -0)
README.zh-CN.md (+1, -0)
dlink-app/dlink-app-1.15/pom.xml (+104, -0)
dlink-app/dlink-app-1.15/src/main/java/com/dlink/app/MainApp.java (+27, -0)
dlink-app/pom.xml (+1, -0)
dlink-assembly/src/main/assembly/package.xml (+14, -1)
dlink-client/dlink-client-1.15/pom.xml (+38, -0)
dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java (+69, -0)
dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java (+309, -0)
dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/CDCBuilder.java (+32, -0)
dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/CDCBuilderFactory.java (+33, -0)
dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/SinkBuilder.java (+22, -0)
dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/SinkBuilderFactory.java (+35, -0)
dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/doris/DorisSinkBuilder.java (+89, -0)
dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/kafka/KafkaSinkBuilder.java (+125, -0)
dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java (+183, -0)
dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/oracle/OracleCDCBuilder.java (+113, -0)
dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java (+246, -0)
dlink-client/dlink-client-1.15/src/main/java/com/dlink/executor/CustomTableEnvironmentImpl.java (+320, -0)
...c/main/java/com/dlink/executor/CustomTableResultImpl.java (+263, -0)
...rc/main/java/com/dlink/executor/StaticResultProvider.java (+122, -0)
...15/src/main/java/com/dlink/executor/TableSchemaField.java (+36, -0)
...-client-1.15/src/main/java/com/dlink/utils/FlinkUtil.java (+35, -0)
dlink-client/pom.xml (+1, -0)
dlink-flink/dlink-flink-1.15/pom.xml (+131, -0)
dlink-flink/pom.xml (+1, -0)
docs/zh-CN/feature.md (+2, -1)
pom.xml (+10, -0)
README.en-US.md

@@ -26,6 +26,7 @@ Dinky, built on Apache Flink, implements Dlink, enhancing Flink's application and experience, and explor…
- Supports all of Apache Flink's Connectors, UDFs, CDC, etc.
- Supports FlinkSQL syntax enhancements: compatibility with Apache Flink SQL, table-valued aggregate functions, global variables, CDC multi-source merging, execution environments, statement merging, shared sessions, etc.
- Supports easily extensible SQL job submission: ClickHouse, Doris, Hive, Mysql, Oracle, Phoenix, PostgreSql, SqlServer, etc.
- Supports FlinkCDC (source merging) for real-time whole-database ingestion into data warehouses and lakes
- Supports real-time debugging with previews of Table and ChangeLog data and graphical display
- Supports syntax and logic checks, job execution plans, column-level lineage analysis, etc.
- Supports querying and managing Flink metadata and data-source metadata
README.md

@@ -26,6 +26,7 @@ Dinky, built on Apache Flink, implements Dlink, enhancing Flink's application and experience, and explor…
- Supports all of Apache Flink's Connectors, UDFs, CDC, etc.
- Supports FlinkSQL syntax enhancements: compatibility with Apache Flink SQL, table-valued aggregate functions, global variables, CDC multi-source merging, execution environments, statement merging, shared sessions, etc.
- Supports easily extensible SQL job submission: ClickHouse, Doris, Hive, Mysql, Oracle, Phoenix, PostgreSql, SqlServer, etc.
- Supports FlinkCDC (source merging) for real-time whole-database ingestion into data warehouses and lakes
- Supports real-time debugging with previews of Table and ChangeLog data and graphical display
- Supports syntax and logic checks, job execution plans, column-level lineage analysis, etc.
- Supports querying and managing Flink metadata and data-source metadata
README.zh-CN.md

@@ -26,6 +26,7 @@ Dinky, built on Apache Flink, implements Dlink, enhancing Flink's application and experience, and explor…
- Supports all of Apache Flink's Connectors, UDFs, CDC, etc.
- Supports FlinkSQL syntax enhancements: compatibility with Apache Flink SQL, table-valued aggregate functions, global variables, CDC multi-source merging, execution environments, statement merging, shared sessions, etc.
- Supports easily extensible SQL job submission: ClickHouse, Doris, Hive, Mysql, Oracle, Phoenix, PostgreSql, SqlServer, etc.
- Supports FlinkCDC (source merging) for real-time whole-database ingestion into data warehouses and lakes
- Supports real-time debugging with previews of Table and ChangeLog data and graphical display
- Supports syntax and logic checks, job execution plans, column-level lineage analysis, etc.
- Supports querying and managing Flink metadata and data-source metadata
dlink-app/dlink-app-1.15/pom.xml (new file, mode 100644)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>dlink-app</artifactId>
        <groupId>com.dlink</groupId>
        <version>0.6.2</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>dlink-app-1.15</artifactId>

    <properties>
        <mainClass>com.dlink.app.MainApp</mainClass>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.dlink</groupId>
            <artifactId>dlink-app-base</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <!-- <scope>provided</scope>-->
            <version>8.0.21</version>
        </dependency>
        <dependency>
            <groupId>com.dlink</groupId>
            <artifactId>dlink-client-1.15</artifactId>
        </dependency>
        <dependency>
            <groupId>com.dlink</groupId>
            <artifactId>dlink-flink-1.15</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.dlink</groupId>
            <artifactId>dlink-client-base</artifactId>
        </dependency>
        <dependency>
            <groupId>com.dlink</groupId>
            <artifactId>dlink-executor</artifactId>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>*.properties</include>
                </includes>
            </resource>
        </resources>
        <plugins>
            <!-- Compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <!-- Assembly plugin: builds a jar that bundles all dependencies -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- Entry class of the jar (optional) -->
                            <mainClass>com.dlink.app.MainApp</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
\ No newline at end of file
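Because the assembly plugin is bound to the package phase with the jar-with-dependencies descriptor, a regular Maven build of this module produces the fat jar dlink-app-1.15-${project.version}-jar-with-dependencies.jar that the assembly descriptor below picks up. A plausible invocation from the repository root (the module selection flags are an assumption, not stated in the commit) would be: mvn clean package -pl dlink-app/dlink-app-1.15 -am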
dlink-app/dlink-app-1.15/src/main/java/com/dlink/app/MainApp.java (new file, mode 100644)

package com.dlink.app;

import java.io.IOException;
import java.util.Map;

import com.dlink.app.db.DBConfig;
import com.dlink.app.flinksql.Submiter;
import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.utils.FlinkBaseUtil;

/**
 * MainApp
 *
 * @author wenmo
 * @since 2021/10/27
 **/
public class MainApp {

    public static void main(String[] args) throws IOException {
        Map<String, String> params = FlinkBaseUtil.getParamsFromArgs(args);
        String id = params.get(FlinkParamConstant.ID);
        Asserts.checkNullString(id, "请配置入参 id ");
        DBConfig dbConfig = DBConfig.build(params);
        Submiter.submit(Integer.valueOf(id), dbConfig);
    }
}
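MainApp only turns the program arguments into a key/value map, reads the task id, and hands the database settings to Submiter. The commit does not show FlinkBaseUtil.getParamsFromArgs, so as a minimal sketch of what an equivalent parse could look like with Flink's own ParameterTool; the argument names ("--id", "--url") are hypothetical and only for illustration:

import java.util.Map;
import org.apache.flink.api.java.utils.ParameterTool;

public class ArgsSketch {
    public static void main(String[] args) {
        // e.g. args = {"--id", "1", "--url", "jdbc:mysql://127.0.0.1:3306/dlink"} (placeholder values)
        Map<String, String> params = ParameterTool.fromArgs(args).toMap();
        String id = params.get("id"); // hypothetical key; the real code uses FlinkParamConstant.ID
        System.out.println("would submit task " + id);
    }
}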
dlink-app/pom.xml

@@ -19,6 +19,7 @@
        <module>dlink-app-1.14</module>
        <module>dlink-app-1.12</module>
        <module>dlink-app-1.11</module>
        <module>dlink-app-1.15</module>
    </modules>
    <properties>
dlink-assembly/src/main/assembly/package.xml

@@ -73,7 +73,13 @@
                <include>dlink-client-1.14-${project.version}.jar</include>
            </includes>
        </fileSet>
        <fileSet>
            <directory>${project.parent.basedir}/dlink-client/dlink-client-1.15/target</directory>
            <outputDirectory>extends</outputDirectory>
            <includes>
                <include>dlink-client-1.15-${project.version}.jar</include>
            </includes>
        </fileSet>
        <!-- Place the dlink-connectors module jars into the packaging directory /lib -->
        <fileSet>
            <directory>${project.parent.basedir}/dlink-connectors/dlink-connector-jdbc-1.13/target</directory>

@@ -246,6 +252,13 @@
                <include>dlink-app-1.14-${project.version}-jar-with-dependencies.jar</include>
            </includes>
        </fileSet>
        <fileSet>
            <directory>${project.parent.basedir}/dlink-app/dlink-app-1.15/target</directory>
            <outputDirectory>jar</outputDirectory>
            <includes>
                <include>dlink-app-1.15-${project.version}-jar-with-dependencies.jar</include>
            </includes>
        </fileSet>
        <fileSet>
            <directory>${project.parent.basedir}/dlink-doc/extends</directory>
            <outputDirectory>jar</outputDirectory>
dlink-client/dlink-client-1.15/pom.xml (new file, mode 100644)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>dlink-client</artifactId>
        <groupId>com.dlink</groupId>
        <version>0.6.2</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>dlink-client-1.15</artifactId>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.dlink</groupId>
            <artifactId>dlink-client-base</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>com.dlink</groupId>
            <artifactId>dlink-common</artifactId>
        </dependency>
        <dependency>
            <groupId>com.dlink</groupId>
            <artifactId>dlink-flink-1.15</artifactId>
            <!-- <scope>provided</scope>-->
        </dependency>
    </dependencies>
</project>
\ No newline at end of file
dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java (new file, mode 100644)

package com.dlink.cdc;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;

/**
 * AbstractCDCBuilder
 *
 * @author wenmo
 * @since 2022/4/12 21:28
 **/
public abstract class AbstractCDCBuilder {

    protected FlinkCDCConfig config;

    public AbstractCDCBuilder() {
    }

    public AbstractCDCBuilder(FlinkCDCConfig config) {
        this.config = config;
    }

    public FlinkCDCConfig getConfig() {
        return config;
    }

    public void setConfig(FlinkCDCConfig config) {
        this.config = config;
    }

    public List<String> getSchemaList() {
        List<String> schemaList = new ArrayList<>();
        String schema = config.getSchema();
        if (Asserts.isNotNullString(schema)) {
            String[] schemas = schema.split(FlinkParamConstant.SPLIT);
            Collections.addAll(schemaList, schemas);
        }
        List<String> tableList = getTableList();
        for (String tableName : tableList) {
            if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
                String[] names = tableName.split("\\\\.");
                if (!schemaList.contains(names[0])) {
                    schemaList.add(names[0]);
                }
            }
        }
        return schemaList;
    }

    public List<String> getTableList() {
        List<String> tableList = new ArrayList<>();
        String table = config.getTable();
        if (Asserts.isNullString(table)) {
            return tableList;
        }
        String[] tables = table.split(FlinkParamConstant.SPLIT);
        Collections.addAll(tableList, tables);
        return tableList;
    }

    public String getSchemaFieldName() {
        return "schema";
    }
}
dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java (new file, mode 100644)

package com.dlink.cdc;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import com.dlink.assertion.Asserts;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.Column;
import com.dlink.model.ColumnType;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;

/**
 * AbstractCDCBuilder
 *
 * @author wenmo
 * @since 2022/4/12 21:28
 **/
public abstract class AbstractSinkBuilder {

    protected FlinkCDCConfig config;
    protected List<ModifyOperation> modifyOperations = new ArrayList();

    public AbstractSinkBuilder() {
    }

    public AbstractSinkBuilder(FlinkCDCConfig config) {
        this.config = config;
    }

    public FlinkCDCConfig getConfig() {
        return config;
    }

    public void setConfig(FlinkCDCConfig config) {
        this.config = config;
    }

    protected Properties getProperties() {
        Properties properties = new Properties();
        Map<String, String> sink = config.getSink();
        for (Map.Entry<String, String> entry : sink.entrySet()) {
            if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
                properties.setProperty(entry.getKey(), entry.getValue());
            }
        }
        return properties;
    }

    protected SingleOutputStreamOperator<Map> deserialize(DataStreamSource<String> dataStreamSource) {
        return dataStreamSource.map(new MapFunction<String, Map>() {
            @Override
            public Map map(String value) throws Exception {
                ObjectMapper objectMapper = new ObjectMapper();
                return objectMapper.readValue(value, Map.class);
            }
        });
    }

    protected SingleOutputStreamOperator<Map> shunt(
        SingleOutputStreamOperator<Map> mapOperator,
        Table table,
        String schemaFieldName) {
        final String tableName = table.getName();
        final String schemaName = table.getSchema();
        return mapOperator.filter(new FilterFunction<Map>() {
            @Override
            public boolean filter(Map value) throws Exception {
                LinkedHashMap source = (LinkedHashMap) value.get("source");
                return tableName.equals(source.get("table").toString())
                    && schemaName.equals(source.get(schemaFieldName).toString());
            }
        });
    }

    protected DataStream<RowData> buildRowData(
        SingleOutputStreamOperator<Map> filterOperator,
        List<String> columnNameList,
        List<LogicalType> columnTypeList) {
        return filterOperator.flatMap(new FlatMapFunction<Map, RowData>() {
            @Override
            public void flatMap(Map value, Collector<RowData> out) throws Exception {
                switch (value.get("op").toString()) {
                    case "r":
                    case "c":
                        GenericRowData igenericRowData = new GenericRowData(columnNameList.size());
                        igenericRowData.setRowKind(RowKind.INSERT);
                        Map idata = (Map) value.get("after");
                        for (int i = 0; i < columnNameList.size(); i++) {
                            igenericRowData.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
                        }
                        out.collect(igenericRowData);
                        break;
                    case "d":
                        GenericRowData dgenericRowData = new GenericRowData(columnNameList.size());
                        dgenericRowData.setRowKind(RowKind.DELETE);
                        Map ddata = (Map) value.get("before");
                        for (int i = 0; i < columnNameList.size(); i++) {
                            dgenericRowData.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
                        }
                        out.collect(dgenericRowData);
                        break;
                    case "u":
                        GenericRowData ubgenericRowData = new GenericRowData(columnNameList.size());
                        ubgenericRowData.setRowKind(RowKind.UPDATE_BEFORE);
                        Map ubdata = (Map) value.get("before");
                        for (int i = 0; i < columnNameList.size(); i++) {
                            ubgenericRowData.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
                        }
                        out.collect(ubgenericRowData);
                        GenericRowData uagenericRowData = new GenericRowData(columnNameList.size());
                        uagenericRowData.setRowKind(RowKind.UPDATE_AFTER);
                        Map uadata = (Map) value.get("after");
                        for (int i = 0; i < columnNameList.size(); i++) {
                            uagenericRowData.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
                        }
                        out.collect(uagenericRowData);
                        break;
                }
            }
        });
    }

    public abstract void addSink(
        StreamExecutionEnvironment env,
        DataStream<RowData> rowDataDataStream,
        Table table,
        List<String> columnNameList,
        List<LogicalType> columnTypeList);

    public DataStreamSource build(
        CDCBuilder cdcBuilder,
        StreamExecutionEnvironment env,
        CustomTableEnvironment customTableEnvironment,
        DataStreamSource<String> dataStreamSource) {
        final List<Schema> schemaList = config.getSchemaList();
        final String schemaFieldName = config.getSchemaFieldName();
        if (Asserts.isNotNullCollection(schemaList)) {
            SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
            for (Schema schema : schemaList) {
                for (Table table : schema.getTables()) {
                    SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
                    List<String> columnNameList = new ArrayList<>();
                    List<LogicalType> columnTypeList = new ArrayList<>();
                    buildColumn(columnNameList, columnTypeList, table.getColumns());
                    DataStream<RowData> rowDataDataStream = buildRowData(filterOperator, columnNameList, columnTypeList);
                    addSink(env, rowDataDataStream, table, columnNameList, columnTypeList);
                }
            }
        }
        return dataStreamSource;
    }

    protected void buildColumn(List<String> columnNameList, List<LogicalType> columnTypeList, List<Column> columns) {
        for (Column column : columns) {
            columnNameList.add(column.getName());
            columnTypeList.add(getLogicalType(column.getJavaType()));
        }
    }

    public LogicalType getLogicalType(ColumnType columnType) {
        switch (columnType) {
            case STRING:
                return new VarCharType();
            case BOOLEAN:
            case JAVA_LANG_BOOLEAN:
                return new BooleanType();
            case BYTE:
            case JAVA_LANG_BYTE:
                return new TinyIntType();
            case SHORT:
            case JAVA_LANG_SHORT:
                return new SmallIntType();
            case LONG:
            case JAVA_LANG_LONG:
                return new BigIntType();
            case FLOAT:
            case JAVA_LANG_FLOAT:
                return new FloatType();
            case DOUBLE:
            case JAVA_LANG_DOUBLE:
                return new DoubleType();
            case DECIMAL:
                return new DecimalType(columnType.getPrecision(), columnType.getScale());
            case INT:
            case INTEGER:
                return new IntType();
            case DATE:
            case LOCALDATE:
                return new DateType();
            case LOCALDATETIME:
            case TIMESTAMP:
                return new TimestampType();
            default:
                return new VarCharType();
        }
    }

    protected Object convertValue(Object value, LogicalType logicalType) {
        if (value == null) {
            return null;
        }
        if (logicalType instanceof VarCharType) {
            return StringData.fromString((String) value);
        } else if (logicalType instanceof DateType) {
            return StringData.fromString(Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate().toString());
        } else if (logicalType instanceof TimestampType) {
            return TimestampData.fromTimestamp(Timestamp.from(Instant.ofEpochMilli((long) value)));
        } else if (logicalType instanceof DecimalType) {
            final DecimalType decimalType = ((DecimalType) logicalType);
            final int precision = decimalType.getPrecision();
            final int scale = decimalType.getScale();
            return DecimalData.fromBigDecimal(new BigDecimal((String) value), precision, scale);
        } else {
            return value;
        }
    }

    protected String getSinkSchemaName(Table table) {
        String schemaName = table.getSchema();
        if (config.getSink().containsKey("sink.db")) {
            schemaName = config.getSink().get("sink.db");
        }
        return schemaName;
    }

    protected String getSinkTableName(Table table) {
        String tableName = table.getName();
        if (config.getSink().containsKey("table.prefix.schema")) {
            if (Boolean.valueOf(config.getSink().get("table.prefix.schema"))) {
                tableName = table.getSchema() + "_" + tableName;
            }
        }
        if (config.getSink().containsKey("table.prefix")) {
            tableName = config.getSink().get("table.prefix") + tableName;
        }
        if (config.getSink().containsKey("table.suffix")) {
            tableName = tableName + config.getSink().get("table.suffix");
        }
        if (config.getSink().containsKey("table.lower")) {
            if (Boolean.valueOf(config.getSink().get("table.lower"))) {
                tableName = tableName.toLowerCase();
            }
        }
        if (config.getSink().containsKey("table.upper")) {
            if (Boolean.valueOf(config.getSink().get("table.upper"))) {
                tableName = tableName.toUpperCase();
            }
        }
        return tableName;
    }

    protected List<String> getPKList(Table table) {
        List<String> pks = new ArrayList<>();
        if (Asserts.isNullCollection(table.getColumns())) {
            return pks;
        }
        for (Column column : table.getColumns()) {
            if (column.isKeyFlag()) {
                pks.add(column.getName());
            }
        }
        return pks;
    }
}
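The deserialize/shunt/buildRowData chain above keys off the Debezium-style JSON change records that the CDC sources emit: "op" ("r"/"c", "u", "d") selects the RowKind, "source".table and "source".db (or .schema) drive the per-table filter, and "before"/"after" carry the column values. A minimal standalone sketch of the record shape this pipeline assumes; the sample record is illustrative, not taken from the commit, and plain Jackson is used here instead of Flink's shaded copy:

import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ChangeRecordSketch {
    public static void main(String[] args) throws Exception {
        // Illustrative Debezium-style change record; the field values are made up.
        String json = "{\"op\":\"u\","
            + "\"source\":{\"db\":\"test\",\"table\":\"orders\"},"
            + "\"before\":{\"id\":1,\"amount\":\"9.90\"},"
            + "\"after\":{\"id\":1,\"amount\":\"19.90\"}}";
        Map value = new ObjectMapper().readValue(json, Map.class);
        // An update ("u") is split by buildRowData into UPDATE_BEFORE (from "before") and UPDATE_AFTER (from "after").
        System.out.println(value.get("op"));                           // u
        System.out.println(((Map) value.get("source")).get("table"));  // orders
    }
}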
dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/CDCBuilder.java (new file, mode 100644)

package com.dlink.cdc;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.List;
import java.util.Map;

import com.dlink.model.FlinkCDCConfig;

/**
 * CDCBuilder
 *
 * @author wenmo
 * @since 2022/4/12 21:09
 **/
public interface CDCBuilder {

    String getHandle();

    CDCBuilder create(FlinkCDCConfig config);

    DataStreamSource<String> build(StreamExecutionEnvironment env);

    List<String> getSchemaList();

    List<String> getTableList();

    Map<String, Map<String, String>> parseMetaDataConfigs();

    String getSchemaFieldName();
}
dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/CDCBuilderFactory.java (new file, mode 100644)

package com.dlink.cdc;

import com.dlink.assertion.Asserts;
import com.dlink.cdc.mysql.MysqlCDCBuilder;
import com.dlink.cdc.oracle.OracleCDCBuilder;
import com.dlink.exception.FlinkClientException;
import com.dlink.model.FlinkCDCConfig;

/**
 * CDCBuilderFactory
 *
 * @author wenmo
 * @since 2022/4/12 21:12
 **/
public class CDCBuilderFactory {

    private static CDCBuilder[] cdcBuilders = {
        new MysqlCDCBuilder(),
        new OracleCDCBuilder()
    };

    public static CDCBuilder buildCDCBuilder(FlinkCDCConfig config) {
        if (Asserts.isNull(config) || Asserts.isNullString(config.getType())) {
            throw new FlinkClientException("请指定 CDC Source 类型。");
        }
        for (int i = 0; i < cdcBuilders.length; i++) {
            if (config.getType().equals(cdcBuilders[i].getHandle())) {
                return cdcBuilders[i].create(config);
            }
        }
        throw new FlinkClientException("未匹配到对应 CDC Source 类型的【" + config.getType() + "】。");
    }
}
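The factory keys its lookup on FlinkCDCConfig.getType(), which must equal one of the builders' handles ("mysql-cdc" or "oracle-cdc" in this commit). A hedged sketch of how a caller obtains the raw change-record stream; the FlinkCDCConfig instance is taken as a parameter because its construction is defined elsewhere in dlink:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.CDCBuilderFactory;
import com.dlink.model.FlinkCDCConfig;

public class CdcSourceSketch {
    // config.getType() must match a builder handle, e.g. "mysql-cdc" or "oracle-cdc".
    public static DataStreamSource<String> openSource(StreamExecutionEnvironment env, FlinkCDCConfig config) {
        CDCBuilder cdcBuilder = CDCBuilderFactory.buildCDCBuilder(config);
        return cdcBuilder.build(env); // emits Debezium-style JSON change records as strings
    }
}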
dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/SinkBuilder.java (new file, mode 100644)

package com.dlink.cdc;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;

/**
 * SinkBuilder
 *
 * @author wenmo
 * @since 2022/4/12 21:09
 **/
public interface SinkBuilder {

    String getHandle();

    SinkBuilder create(FlinkCDCConfig config);

    DataStreamSource build(CDCBuilder cdcBuilder, StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource<String> dataStreamSource);
}
dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/SinkBuilderFactory.java (new file, mode 100644)

package com.dlink.cdc;

import com.dlink.assertion.Asserts;
import com.dlink.cdc.doris.DorisSinkBuilder;
import com.dlink.cdc.kafka.KafkaSinkBuilder;
import com.dlink.cdc.sql.SQLSinkBuilder;
import com.dlink.exception.FlinkClientException;
import com.dlink.model.FlinkCDCConfig;

/**
 * SinkBuilderFactory
 *
 * @author wenmo
 * @since 2022/4/12 21:12
 **/
public class SinkBuilderFactory {

    private static SinkBuilder[] sinkBuilders = {
        new KafkaSinkBuilder(),
        new DorisSinkBuilder(),
        new SQLSinkBuilder()
    };

    public static SinkBuilder buildSinkBuilder(FlinkCDCConfig config) {
        if (Asserts.isNull(config) || Asserts.isNullString(config.getSink().get("connector"))) {
            throw new FlinkClientException("请指定 Sink connector。");
        }
        for (int i = 0; i < sinkBuilders.length; i++) {
            if (config.getSink().get("connector").equals(sinkBuilders[i].getHandle())) {
                return sinkBuilders[i].create(config);
            }
        }
        return new SQLSinkBuilder().create(config);
    }
}
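End to end, the two factories are meant to be composed: the CDC builder produces the raw JSON stream and the sink builder fans it out per table. A hedged sketch of that wiring, again taking the already-built FlinkCDCConfig and CustomTableEnvironment as parameters since their construction lives elsewhere in dlink:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.CDCBuilderFactory;
import com.dlink.cdc.SinkBuilder;
import com.dlink.cdc.SinkBuilderFactory;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;

public class CdcPipelineSketch {
    public static void run(StreamExecutionEnvironment env, CustomTableEnvironment tEnv, FlinkCDCConfig config) {
        CDCBuilder cdcBuilder = CDCBuilderFactory.buildCDCBuilder(config);
        DataStreamSource<String> source = cdcBuilder.build(env);
        // sink "connector" selects datastream-kafka, datastream-doris, sql, or the SQL sink as the fallback
        SinkBuilder sinkBuilder = SinkBuilderFactory.buildSinkBuilder(config);
        sinkBuilder.build(cdcBuilder, env, tEnv, source);
    }
}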
dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/doris/DorisSinkBuilder.java (new file, mode 100644)

package com.dlink.cdc.doris;

import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.cfg.DorisSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Table;

/**
 * DorisSinkBuilder
 *
 * @author wenmo
 * @since 2022/4/20 19:20
 **/
public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {

    private final static String KEY_WORD = "datastream-doris";
    private static final long serialVersionUID = 8330362249137471854L;

    public DorisSinkBuilder() {
    }

    public DorisSinkBuilder(FlinkCDCConfig config) {
        super(config);
    }

    @Override
    public String getHandle() {
        return KEY_WORD;
    }

    @Override
    public SinkBuilder create(FlinkCDCConfig config) {
        return new DorisSinkBuilder(config);
    }

    @Override
    public void addSink(
        StreamExecutionEnvironment env,
        DataStream<RowData> rowDataDataStream,
        Table table,
        List<String> columnNameList,
        List<LogicalType> columnTypeList) {

        DorisExecutionOptions.Builder dorisExecutionOptionsBuilder = DorisExecutionOptions.builder();
        Map<String, String> sink = config.getSink();
        if (sink.containsKey("sink.batch.size")) {
            dorisExecutionOptionsBuilder.setBatchSize(Integer.valueOf(sink.get("sink.batch.size")));
        }
        if (sink.containsKey("sink.batch.interval")) {
            dorisExecutionOptionsBuilder.setBatchIntervalMs(Long.valueOf(sink.get("sink.batch.interval")));
        }
        if (sink.containsKey("sink.max-retries")) {
            dorisExecutionOptionsBuilder.setMaxRetries(Integer.valueOf(sink.get("sink.max-retries")));
        }
        if (sink.containsKey("sink.enable-delete")) {
            dorisExecutionOptionsBuilder.setEnableDelete(Boolean.valueOf(sink.get("sink.enable-delete")));
        }
        dorisExecutionOptionsBuilder.setStreamLoadProp(getProperties());

        final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]);
        final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]);

        rowDataDataStream.addSink(
            DorisSink.sink(
                columnNames,
                columnTypes,
                DorisReadOptions.builder().build(),
                dorisExecutionOptionsBuilder.build(),
                DorisOptions.builder()
                    .setFenodes(config.getSink().get("fenodes"))
                    .setTableIdentifier(getSinkSchemaName(table) + "." + getSinkTableName(table))
                    .setUsername(config.getSink().get("username"))
                    .setPassword(config.getSink().get("password")).build()
            ));
    }
}
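The Doris sink is configured entirely through the sink map of FlinkCDCConfig; an option is applied only when its key is present. A hedged illustration of the keys this class actually reads, with placeholder values:

import java.util.HashMap;
import java.util.Map;

public class DorisSinkConfigSketch {
    public static Map<String, String> exampleSinkConfig() {
        Map<String, String> sink = new HashMap<>();
        sink.put("connector", "datastream-doris"); // handle matched by SinkBuilderFactory
        sink.put("fenodes", "127.0.0.1:8030");     // placeholder FE address
        sink.put("username", "root");
        sink.put("password", "");
        sink.put("sink.batch.size", "1000");
        sink.put("sink.batch.interval", "1000");
        sink.put("sink.max-retries", "1");
        sink.put("sink.enable-delete", "true");
        sink.put("sink.db", "target_db");          // optional: overrides the source schema name
        return sink;
    }
}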
dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/kafka/KafkaSinkBuilder.java (new file, mode 100644)

package com.dlink.cdc.kafka;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;

/**
 * MysqlCDCBuilder
 *
 * @author wenmo
 * @since 2022/4/12 21:29
 **/
public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder {

    private final static String KEY_WORD = "datastream-kafka";

    public KafkaSinkBuilder() {
    }

    public KafkaSinkBuilder(FlinkCDCConfig config) {
        super(config);
    }

    @Override
    public void addSink(
        StreamExecutionEnvironment env,
        DataStream<RowData> rowDataDataStream,
        Table table,
        List<String> columnNameList,
        List<LogicalType> columnTypeList) {
    }

    @Override
    public String getHandle() {
        return KEY_WORD;
    }

    @Override
    public SinkBuilder create(FlinkCDCConfig config) {
        return new KafkaSinkBuilder(config);
    }

    @Override
    public DataStreamSource build(
        CDCBuilder cdcBuilder,
        StreamExecutionEnvironment env,
        CustomTableEnvironment customTableEnvironment,
        DataStreamSource<String> dataStreamSource) {
        if (Asserts.isNotNullString(config.getSink().get("topic"))) {
            dataStreamSource.sinkTo(KafkaSink.<String>builder()
                .setBootstrapServers(config.getSink().get("brokers"))
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic(config.getSink().get("topic"))
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build()
                )
                .build());
        } else {
            final List<Schema> schemaList = config.getSchemaList();
            final String schemaFieldName = config.getSchemaFieldName();
            if (Asserts.isNotNullCollection(schemaList)) {
                SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(new MapFunction<String, Map>() {
                    @Override
                    public Map map(String value) throws Exception {
                        ObjectMapper objectMapper = new ObjectMapper();
                        return objectMapper.readValue(value, Map.class);
                    }
                });
                for (Schema schema : schemaList) {
                    for (Table table : schema.getTables()) {
                        final String tableName = table.getName();
                        final String schemaName = table.getSchema();
                        SingleOutputStreamOperator<Map> filterOperator = mapOperator.filter(new FilterFunction<Map>() {
                            @Override
                            public boolean filter(Map value) throws Exception {
                                LinkedHashMap source = (LinkedHashMap) value.get("source");
                                return tableName.equals(source.get("table").toString())
                                    && schemaName.equals(source.get(schemaFieldName).toString());
                            }
                        });
                        SingleOutputStreamOperator<String> stringOperator = filterOperator.map(new MapFunction<Map, String>() {
                            @Override
                            public String map(Map value) throws Exception {
                                ObjectMapper objectMapper = new ObjectMapper();
                                return objectMapper.writeValueAsString(value);
                            }
                        });
                        stringOperator.sinkTo(KafkaSink.<String>builder()
                            .setBootstrapServers(config.getSink().get("brokers"))
                            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                                .setTopic(getSinkTableName(table))
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                            )
                            .build());
                    }
                }
            }
        }
        return dataStreamSource;
    }
}
dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java (new file, mode 100644)

package com.dlink.cdc.mysql;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.ClientConstant;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

/**
 * MysqlCDCBuilder
 *
 * @author wenmo
 * @since 2022/4/12 21:29
 **/
public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {

    private final static String KEY_WORD = "mysql-cdc";
    private final static String METADATA_TYPE = "MySql";

    public MysqlCDCBuilder() {
    }

    public MysqlCDCBuilder(FlinkCDCConfig config) {
        super(config);
    }

    @Override
    public String getHandle() {
        return KEY_WORD;
    }

    @Override
    public CDCBuilder create(FlinkCDCConfig config) {
        return new MysqlCDCBuilder(config);
    }

    @Override
    public DataStreamSource<String> build(StreamExecutionEnvironment env) {
        String database = config.getDatabase();
        String serverId = config.getSource().get("server-id");
        String serverTimeZone = config.getSource().get("server-time-zone");
        String fetchSize = config.getSource().get("scan.snapshot.fetch.size");
        String connectTimeout = config.getSource().get("connect.timeout");
        String connectMaxRetries = config.getSource().get("connect.max-retries");
        String connectionPoolSize = config.getSource().get("connection.pool.size");
        String heartbeatInterval = config.getSource().get("heartbeat.interval");

        Properties properties = new Properties();
        for (Map.Entry<String, String> entry : config.getDebezium().entrySet()) {
            if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
                properties.setProperty(entry.getKey(), entry.getValue());
            }
        }

        MySqlSourceBuilder<String> sourceBuilder = MySqlSource.<String>builder()
            .hostname(config.getHostname())
            .port(config.getPort())
            .username(config.getUsername())
            .password(config.getPassword());

        if (Asserts.isNotNullString(database)) {
            String[] databases = database.split(FlinkParamConstant.SPLIT);
            sourceBuilder.databaseList(databases);
        } else {
            sourceBuilder.databaseList(new String[0]);
        }

        List<String> schemaTableNameList = config.getSchemaTableNameList();
        if (Asserts.isNotNullCollection(schemaTableNameList)) {
            sourceBuilder.tableList(schemaTableNameList.toArray(new String[schemaTableNameList.size()]));
        } else {
            sourceBuilder.tableList(new String[0]);
        }

        sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
        sourceBuilder.debeziumProperties(properties);

        if (Asserts.isNotNullString(config.getStartupMode())) {
            switch (config.getStartupMode().toLowerCase()) {
                case "initial":
                    sourceBuilder.startupOptions(StartupOptions.initial());
                    break;
                case "latest-offset":
                    sourceBuilder.startupOptions(StartupOptions.latest());
                    break;
            }
        } else {
            sourceBuilder.startupOptions(StartupOptions.latest());
        }

        if (Asserts.isNotNullString(serverId)) {
            sourceBuilder.serverId(serverId);
        }
        if (Asserts.isNotNullString(serverTimeZone)) {
            sourceBuilder.serverTimeZone(serverTimeZone);
        }
        if (Asserts.isNotNullString(fetchSize)) {
            sourceBuilder.fetchSize(Integer.valueOf(fetchSize));
        }
        if (Asserts.isNotNullString(connectTimeout)) {
            sourceBuilder.connectTimeout(Duration.ofMillis(Long.valueOf(connectTimeout)));
        }
        if (Asserts.isNotNullString(connectMaxRetries)) {
            sourceBuilder.connectMaxRetries(Integer.valueOf(connectMaxRetries));
        }
        if (Asserts.isNotNullString(connectionPoolSize)) {
            sourceBuilder.connectionPoolSize(Integer.valueOf(connectionPoolSize));
        }
        if (Asserts.isNotNullString(heartbeatInterval)) {
            sourceBuilder.heartbeatInterval(Duration.ofMillis(Long.valueOf(heartbeatInterval)));
        }

        return env.fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
    }

    public List<String> getSchemaList() {
        List<String> schemaList = new ArrayList<>();
        String schema = config.getDatabase();
        if (Asserts.isNotNullString(schema)) {
            String[] schemas = schema.split(FlinkParamConstant.SPLIT);
            Collections.addAll(schemaList, schemas);
        }
        List<String> tableList = getTableList();
        for (String tableName : tableList) {
            if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
                String[] names = tableName.split("\\\\.");
                if (!schemaList.contains(names[0])) {
                    schemaList.add(names[0]);
                }
            }
        }
        return schemaList;
    }

    public Map<String, Map<String, String>> parseMetaDataConfigs() {
        Map<String, Map<String, String>> allConfigMap = new HashMap<>();
        List<String> schemaList = getSchemaList();
        for (String schema : schemaList) {
            Map<String, String> configMap = new HashMap<>();
            configMap.put(ClientConstant.METADATA_TYPE, METADATA_TYPE);
            StringBuilder sb = new StringBuilder("jdbc:mysql://");
            sb.append(config.getHostname());
            sb.append(":");
            sb.append(config.getPort());
            sb.append("/");
            sb.append(schema);
            configMap.put(ClientConstant.METADATA_NAME, sb.toString());
            configMap.put(ClientConstant.METADATA_URL, sb.toString());
            configMap.put(ClientConstant.METADATA_USERNAME, config.getUsername());
            configMap.put(ClientConstant.METADATA_PASSWORD, config.getPassword());
            allConfigMap.put(schema, configMap);
        }
        return allConfigMap;
    }

    @Override
    public String getSchemaFieldName() {
        return "db";
    }
}
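The MySQL builder pulls its tuning options from the source map of FlinkCDCConfig and only applies a setting when the key is present; everything else falls back to the connector defaults. A hedged illustration of those keys with placeholder values (the keys are the ones read above, the values are made up):

import java.util.HashMap;
import java.util.Map;

public class MysqlCdcSourceConfigSketch {
    public static Map<String, String> exampleSourceConfig() {
        Map<String, String> source = new HashMap<>();
        source.put("server-id", "5400-5404");           // placeholder server-id range
        source.put("server-time-zone", "UTC");
        source.put("scan.snapshot.fetch.size", "1024");
        source.put("connect.timeout", "30000");          // milliseconds
        source.put("connect.max-retries", "3");
        source.put("connection.pool.size", "20");
        source.put("heartbeat.interval", "30000");       // milliseconds
        return source;
    }
}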
dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/oracle/OracleCDCBuilder.java (new file, mode 100644)

package com.dlink.cdc.oracle;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.ClientConstant;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;
import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.connectors.oracle.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

/**
 * MysqlCDCBuilder
 *
 * @author wenmo
 * @since 2022/4/12 21:29
 **/
public class OracleCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {

    private final static String KEY_WORD = "oracle-cdc";
    private final static String METADATA_TYPE = "Oracle";

    public OracleCDCBuilder() {
    }

    public OracleCDCBuilder(FlinkCDCConfig config) {
        super(config);
    }

    @Override
    public String getHandle() {
        return KEY_WORD;
    }

    @Override
    public CDCBuilder create(FlinkCDCConfig config) {
        return new OracleCDCBuilder(config);
    }

    @Override
    public DataStreamSource<String> build(StreamExecutionEnvironment env) {
        Properties properties = new Properties();
        for (Map.Entry<String, String> entry : config.getDebezium().entrySet()) {
            if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
                properties.setProperty(entry.getKey(), entry.getValue());
            }
        }
        OracleSource.Builder<String> sourceBuilder = OracleSource.<String>builder()
            .hostname(config.getHostname())
            .port(config.getPort())
            .username(config.getUsername())
            .password(config.getPassword())
            .database(config.getDatabase());
        String schema = config.getSchema();
        if (Asserts.isNotNullString(schema)) {
            String[] schemas = schema.split(FlinkParamConstant.SPLIT);
            sourceBuilder.schemaList(schemas);
        } else {
            sourceBuilder.schemaList(new String[0]);
        }
        List<String> schemaTableNameList = config.getSchemaTableNameList();
        if (Asserts.isNotNullCollection(schemaTableNameList)) {
            sourceBuilder.tableList(schemaTableNameList.toArray(new String[schemaTableNameList.size()]));
        } else {
            sourceBuilder.tableList(new String[0]);
        }
        sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
        sourceBuilder.debeziumProperties(properties);
        if (Asserts.isNotNullString(config.getStartupMode())) {
            switch (config.getStartupMode().toLowerCase()) {
                case "initial":
                    sourceBuilder.startupOptions(StartupOptions.initial());
                    break;
                case "latest-offset":
                    sourceBuilder.startupOptions(StartupOptions.latest());
                    break;
            }
        } else {
            sourceBuilder.startupOptions(StartupOptions.latest());
        }
        return env.addSource(sourceBuilder.build(), "Oracle CDC Source");
    }

    public Map<String, Map<String, String>> parseMetaDataConfigs() {
        Map<String, Map<String, String>> allConfigList = new HashMap<>();
        List<String> schemaList = getSchemaList();
        for (String schema : schemaList) {
            Map<String, String> configMap = new HashMap<>();
            configMap.put(ClientConstant.METADATA_TYPE, METADATA_TYPE);
            StringBuilder sb = new StringBuilder("jdbc:oracle:thin:@");
            sb.append(config.getHostname());
            sb.append(":");
            sb.append(config.getPort());
            sb.append(":");
            sb.append(config.getDatabase());
            configMap.put(ClientConstant.METADATA_NAME, sb.toString());
            configMap.put(ClientConstant.METADATA_URL, sb.toString());
            configMap.put(ClientConstant.METADATA_USERNAME, config.getUsername());
            configMap.put(ClientConstant.METADATA_PASSWORD, config.getPassword());
            allConfigList.put(schema, configMap);
        }
        return allConfigList;
    }
}
dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java (new file, mode 100644)

package com.dlink.cdc.sql;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.dlink.utils.SqlUtil;

/**
 * SQLSinkBuilder
 *
 * @author wenmo
 * @since 2022/4/25 23:02
 */
public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {

    private final static String KEY_WORD = "sql";
    private static final long serialVersionUID = -3699685106324048226L;

    public SQLSinkBuilder() {
    }

    public SQLSinkBuilder(FlinkCDCConfig config) {
        super(config);
    }

    @Override
    public void addSink(
        StreamExecutionEnvironment env,
        DataStream<RowData> rowDataDataStream,
        Table table,
        List<String> columnNameList,
        List<LogicalType> columnTypeList) {
    }

    protected DataStream<Row> buildRow(
        SingleOutputStreamOperator<Map> filterOperator,
        List<String> columnNameList,
        List<LogicalType> columnTypeList) {
        final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]);
        final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]);

        TypeInformation<?>[] typeInformations = TypeConversions.fromDataTypeToLegacyInfo(TypeConversions.fromLogicalToDataType(columnTypes));
        RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, columnNames);

        return filterOperator.flatMap(new FlatMapFunction<Map, Row>() {
            @Override
            public void flatMap(Map value, Collector<Row> out) throws Exception {
                switch (value.get("op").toString()) {
                    case "r":
                    case "c":
                        Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size());
                        Map idata = (Map) value.get("after");
                        for (int i = 0; i < columnNameList.size(); i++) {
                            irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
                        }
                        out.collect(irow);
                        break;
                    case "d":
                        Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size());
                        Map ddata = (Map) value.get("before");
                        for (int i = 0; i < columnNameList.size(); i++) {
                            drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
                        }
                        out.collect(drow);
                        break;
                    case "u":
                        Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size());
                        Map ubdata = (Map) value.get("before");
                        for (int i = 0; i < columnNameList.size(); i++) {
                            ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
                        }
                        out.collect(ubrow);
                        Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size());
                        Map uadata = (Map) value.get("after");
                        for (int i = 0; i < columnNameList.size(); i++) {
                            uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
                        }
                        out.collect(uarow);
                        break;
                }
            }
        }, rowTypeInfo);
    }

    public void addTableSink(
        CustomTableEnvironment customTableEnvironment,
        DataStream<Row> rowDataDataStream,
        Table table,
        List<String> columnNameList) {
        String sinkTableName = getSinkTableName(table);
        customTableEnvironment.createTemporaryView(table.getSchemaTableNameWithUnderline(), rowDataDataStream, StringUtils.join(columnNameList, ","));
        customTableEnvironment.executeSql(getFlinkDDL(table, sinkTableName));
        List<Operation> operations = customTableEnvironment.getParser().parse(table.getCDCSqlInsert(sinkTableName, table.getSchemaTableNameWithUnderline()));
        if (operations.size() > 0) {
            Operation operation = operations.get(0);
            if (operation instanceof ModifyOperation) {
                modifyOperations.add((ModifyOperation) operation);
            }
        }
    }

    @Override
    public String getHandle() {
        return KEY_WORD;
    }

    @Override
    public SinkBuilder create(FlinkCDCConfig config) {
        return new SQLSinkBuilder(config);
    }

    @Override
    public DataStreamSource build(
        CDCBuilder cdcBuilder,
        StreamExecutionEnvironment env,
        CustomTableEnvironment customTableEnvironment,
        DataStreamSource<String> dataStreamSource) {
        final List<Schema> schemaList = config.getSchemaList();
        final String schemaFieldName = config.getSchemaFieldName();
        if (Asserts.isNotNullCollection(schemaList)) {
            SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
            for (Schema schema : schemaList) {
                for (Table table : schema.getTables()) {
                    SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
                    List<String> columnNameList = new ArrayList<>();
                    List<LogicalType> columnTypeList = new ArrayList<>();
                    buildColumn(columnNameList, columnTypeList, table.getColumns());
                    DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList);
                    addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
                }
            }
            List<Transformation<?>> trans = customTableEnvironment.getPlanner().translate(modifyOperations);
            for (Transformation<?> item : trans) {
                env.addOperator(item);
            }
        }
        return dataStreamSource;
    }

    public String getFlinkDDL(Table table, String tableName) {
        StringBuilder sb = new StringBuilder();
        sb.append("CREATE TABLE IF NOT EXISTS ");
        sb.append(tableName);
        sb.append(" (\n");
        List<String> pks = new ArrayList<>();
        for (int i = 0; i < table.getColumns().size(); i++) {
            String type = table.getColumns().get(i).getJavaType().getFlinkType();
            sb.append(" ");
            if (i > 0) {
                sb.append(",");
            }
            sb.append("`");
            sb.append(table.getColumns().get(i).getName());
            sb.append("` ");
            sb.append(convertSinkColumnType(type));
            sb.append("\n");
            if (table.getColumns().get(i).isKeyFlag()) {
                pks.add(table.getColumns().get(i).getName());
            }
        }
        StringBuilder pksb = new StringBuilder("PRIMARY KEY ( ");
        for (int i = 0; i < pks.size(); i++) {
            if (i > 0) {
                pksb.append(",");
            }
            pksb.append("`");
            pksb.append(pks.get(i));
            pksb.append("`");
        }
        pksb.append(" ) NOT ENFORCED\n");
        if (pks.size() > 0) {
            sb.append(" ,");
            sb.append(pksb);
        }
        sb.append(") WITH (\n");
        sb.append(getSinkConfigurationString(table));
        sb.append(")\n");
        return sb.toString();
    }

    protected String convertSinkColumnType(String type) {
        if (config.getSink().get("connector").equals("hudi")) {
            if (type.equals("TIMESTAMP")) {
                return "TIMESTAMP(3)";
            }
        }
        return type;
    }

    protected Object convertValue(Object value, LogicalType logicalType) {
        if (value == null) {
            return null;
        }
        if (logicalType instanceof DateType) {
            return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate();
        } else if (logicalType instanceof TimestampType) {
            return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
        } else if (logicalType instanceof DecimalType) {
            return new BigDecimal((String) value);
        } else {
            return value;
        }
    }

    protected String getSinkConfigurationString(Table table) {
        String configurationString = SqlUtil.replaceAllParam(config.getSinkConfigurationString(), "schemaName", getSinkSchemaName(table));
        configurationString = SqlUtil.replaceAllParam(configurationString, "tableName", getSinkTableName(table));
        if (configurationString.contains("${pkList}")) {
            configurationString = SqlUtil.replaceAllParam(configurationString, "pkList", StringUtils.join(getPKList(table), "."));
        }
        return configurationString;
    }
}
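To make getFlinkDDL concrete: for a hypothetical source table orders with a key column id (INT) and a column amount (DECIMAL), the generated statement would look roughly like the text below; the WITH block is simply the user's sink configuration string with ${schemaName}, ${tableName} and ${pkList} already substituted, so no connector options are shown here.

CREATE TABLE IF NOT EXISTS orders (
 `id` INT
 ,`amount` DECIMAL
 ,PRIMARY KEY ( `id` ) NOT ENFORCED
) WITH (
 ... user-supplied sink options with placeholders filled in ...
)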
dlink-client/dlink-client-1.15/src/main/java/com/dlink/executor/CustomTableEnvironmentImpl.java
0 → 100644
View file @
596167e2
package com.dlink.executor;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExpressionParser;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.PlannerFactoryUtil;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.DataStreamQueryOperation;
import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;
import org.apache.flink.table.typeutils.FieldInfoUtils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import com.dlink.assertion.Asserts;
import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

/**
 * CustomTableEnvironmentImpl
 *
 * @author wenmo
 * @since 2022/05/08
 **/
public class CustomTableEnvironmentImpl extends AbstractStreamTableEnvironmentImpl implements CustomTableEnvironment {

    public CustomTableEnvironmentImpl(
            CatalogManager catalogManager,
            ModuleManager moduleManager,
            FunctionCatalog functionCatalog,
            TableConfig tableConfig,
            StreamExecutionEnvironment executionEnvironment,
            Planner planner,
            Executor executor,
            boolean isStreamingMode,
            ClassLoader userClassLoader) {
        super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner,
            isStreamingMode, userClassLoader, executionEnvironment);
    }

    public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment) {
        return create(executionEnvironment, EnvironmentSettings.newInstance().build());
    }

    public static CustomTableEnvironmentImpl createBatch(StreamExecutionEnvironment executionEnvironment) {
        Configuration configuration = new Configuration();
        configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
        TableConfig tableConfig = new TableConfig();
        tableConfig.addConfiguration(configuration);
        return create(executionEnvironment, EnvironmentSettings.newInstance().inBatchMode().build());
    }

    public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) {
        // temporary solution until FLINK-15635 is fixed
        final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        final Executor executor = lookupExecutor(classLoader, executionEnvironment);
        final TableConfig tableConfig = TableConfig.getDefault();
        tableConfig.setRootConfiguration(executor.getConfiguration());
        tableConfig.addConfiguration(settings.getConfiguration());
        final ModuleManager moduleManager = new ModuleManager();
        final CatalogManager catalogManager = CatalogManager.newBuilder()
            .classLoader(classLoader)
            .config(tableConfig)
            .defaultCatalog(
                settings.getBuiltInCatalogName(),
                new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()))
            .executionConfig(executionEnvironment.getConfig())
            .build();
        final FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager);
        final Planner planner = PlannerFactoryUtil.createPlanner(executor, tableConfig, moduleManager, catalogManager, functionCatalog);
        return new CustomTableEnvironmentImpl(
            catalogManager, moduleManager, functionCatalog, tableConfig,
            executionEnvironment, planner, executor, settings.isStreamingMode(), classLoader);
    }

    public ObjectNode getStreamGraph(String statement) {
        List<Operation> operations = super.getParser().parse(statement);
        if (operations.size() != 1) {
            throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query.");
        } else {
            List<ModifyOperation> modifyOperations = new ArrayList<>();
            for (int i = 0; i < operations.size(); i++) {
                if (operations.get(i) instanceof ModifyOperation) {
                    modifyOperations.add((ModifyOperation) operations.get(i));
                }
            }
            List<Transformation<?>> trans = super.planner.translate(modifyOperations);
            for (Transformation<?> transformation : trans) {
                executionEnvironment.addOperator(transformation);
            }
            StreamGraph streamGraph = executionEnvironment.getStreamGraph();
            if (tableConfig.getConfiguration().containsKey(PipelineOptions.NAME.key())) {
                streamGraph.setJobName(tableConfig.getConfiguration().getString(PipelineOptions.NAME));
            }
            JSONGenerator jsonGenerator = new JSONGenerator(streamGraph);
            String json = jsonGenerator.getJSON();
            ObjectMapper mapper = new ObjectMapper();
            ObjectNode objectNode = mapper.createObjectNode();
            try {
                objectNode = (ObjectNode) mapper.readTree(json);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            } finally {
                return objectNode;
            }
        }
    }

    @Override
    public JobPlanInfo getJobPlanInfo(List<String> statements) {
        return new JobPlanInfo(JsonPlanGenerator.generatePlan(getJobGraphFromInserts(statements)));
    }

    public StreamGraph getStreamGraphFromInserts(List<String> statements) {
        List<ModifyOperation> modifyOperations = new ArrayList<>();
        for (String statement : statements) {
            List<Operation> operations = getParser().parse(statement);
            if (operations.size() != 1) {
                throw new TableException("Only single statement is supported.");
            } else {
                Operation operation = operations.get(0);
                if (operation instanceof ModifyOperation) {
                    modifyOperations.add((ModifyOperation) operation);
                } else {
                    throw new TableException("Only insert statement is supported now.");
                }
            }
        }
        List<Transformation<?>> trans = getPlanner().translate(modifyOperations);
        for (Transformation<?> transformation : trans) {
            executionEnvironment.addOperator(transformation);
        }
        StreamGraph streamGraph = executionEnvironment.getStreamGraph();
        if (tableConfig.getConfiguration().containsKey(PipelineOptions.NAME.key())) {
            streamGraph.setJobName(tableConfig.getConfiguration().getString(PipelineOptions.NAME));
        }
        return streamGraph;
    }

    public JobGraph getJobGraphFromInserts(List<String> statements) {
        return getStreamGraphFromInserts(statements).getJobGraph();
    }

    public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
        SqlExplainResult record = new SqlExplainResult();
        List<Operation> operations = getParser().parse(statement);
        record.setParseTrue(true);
        if (operations.size() != 1) {
            throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query.");
        }
        List<Operation> operationlist = new ArrayList<>(operations);
        for (int i = 0; i < operationlist.size(); i++) {
            Operation operation = operationlist.get(i);
            if (operation instanceof ModifyOperation) {
                record.setType("Modify DML");
            } else if (operation instanceof ExplainOperation) {
                record.setType("Explain DML");
            } else if (operation instanceof QueryOperation) {
                record.setType("Query DML");
            } else {
                record.setExplain(operation.asSummaryString());
                operationlist.remove(i);
                record.setType("DDL");
                i = i - 1;
            }
        }
        record.setExplainTrue(true);
        if (operationlist.size() == 0) {
            // record.setExplain("DDL语句不进行解释。");
            return record;
        }
        record.setExplain(planner.explain(operationlist, extraDetails));
        return record;
    }

    public boolean parseAndLoadConfiguration(String statement, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
        List<Operation> operations = getParser().parse(statement);
        for (Operation operation : operations) {
            if (operation instanceof SetOperation) {
                callSet((SetOperation) operation, environment, setMap);
                return true;
            } else if (operation instanceof ResetOperation) {
                callReset((ResetOperation) operation, environment, setMap);
                return true;
            }
        }
        return false;
    }

    private void callSet(SetOperation setOperation, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
        if (setOperation.getKey().isPresent() && setOperation.getValue().isPresent()) {
            String key = setOperation.getKey().get().trim();
            String value = setOperation.getValue().get().trim();
            if (Asserts.isNullString(key) || Asserts.isNullString(value)) {
                return;
            }
            Map<String, String> confMap = new HashMap<>();
            confMap.put(key, value);
            setMap.put(key, value);
            Configuration configuration = Configuration.fromMap(confMap);
            environment.getConfig().configure(configuration, null);
            getConfig().addConfiguration(configuration);
        }
    }

    private void callReset(ResetOperation resetOperation, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
        if (resetOperation.getKey().isPresent()) {
            String key = resetOperation.getKey().get().trim();
            if (Asserts.isNullString(key)) {
                return;
            }
            Map<String, String> confMap = new HashMap<>();
            confMap.put(key, null);
            setMap.remove(key);
            Configuration configuration = Configuration.fromMap(confMap);
            environment.getConfig().configure(configuration, null);
            getConfig().addConfiguration(configuration);
        } else {
            setMap.clear();
        }
    }

    public <T> Table fromDataStream(DataStream<T> dataStream, Expression... fields) {
        return createTable(asQueryOperation(dataStream, Optional.of(Arrays.asList(fields))));
    }

    public <T> Table fromDataStream(DataStream<T> dataStream, String fields) {
        List<Expression> expressions = ExpressionParser.INSTANCE.parseExpressionList(fields);
        return fromDataStream(dataStream, expressions.toArray(new Expression[0]));
    }

    @Override
    public <T> void createTemporaryView(String path, DataStream<T> dataStream, String fields) {
        createTemporaryView(path, fromDataStream(dataStream, fields));
    }

    @Override
    public <T> void createTemporaryView(String path, DataStream<T> dataStream, Expression... fields) {
        createTemporaryView(path, fromDataStream(dataStream, fields));
    }

    protected <T> DataStreamQueryOperation<T> asQueryOperation(DataStream<T> dataStream, Optional<List<Expression>> fields) {
        TypeInformation<T> streamType = dataStream.getType();
        FieldInfoUtils.TypeInfoSchema typeInfoSchema = (FieldInfoUtils.TypeInfoSchema) fields.map((f) -> {
            FieldInfoUtils.TypeInfoSchema fieldsInfo = FieldInfoUtils.getFieldsInfo(
                streamType, (Expression[]) f.toArray(new Expression[0]));
            this.validateTimeCharacteristic(fieldsInfo.isRowtimeDefined());
            return fieldsInfo;
        }).orElseGet(() -> {
            return FieldInfoUtils.getFieldsInfo(streamType);
        });
        return new DataStreamQueryOperation(dataStream, typeInfoSchema.getIndices(), typeInfoSchema.toResolvedSchema());
    }
}
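A minimal usage sketch, not part of this commit: it shows how the environment above might be driven from a main method. The package, class name, and the source_table/sink_table identifiers are illustrative placeholders and assume those tables have already been registered with DDL.

// Hypothetical usage sketch; names below are placeholders, not part of the commit.
package com.dlink.examples;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.dlink.executor.CustomTableEnvironmentImpl;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class CustomTableEnvironmentSketch {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        CustomTableEnvironmentImpl tableEnv = CustomTableEnvironmentImpl.create(env);

        // source_table and sink_table are placeholders; they would need to be created
        // first via tableEnv.executeSql("CREATE TABLE ...") for this call to succeed.
        ObjectNode graph = tableEnv.getStreamGraph(
            "INSERT INTO sink_table SELECT id, name FROM source_table");

        // The returned node is the Flink StreamGraph rendered as JSON.
        System.out.println(graph);
    }
}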
dlink-client/dlink-client-1.15/src/main/java/com/dlink/executor/CustomTableResultImpl.java
0 → 100644
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dlink.executor;

import org.apache.flink.annotation.Internal;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.internal.ResultProvider;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.print.PrintStyle;
import org.apache.flink.table.utils.print.RowDataToStringConverter;
import org.apache.flink.table.utils.print.TableauStyle;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;

import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Implementation for {@link TableResult}. */
@Internal
public class CustomTableResultImpl implements TableResultInternal {

    public static final TableResult TABLE_RESULT_OK =
        CustomTableResultImpl.builder()
            .resultKind(ResultKind.SUCCESS)
            .schema(ResolvedSchema.of(Column.physical("result", DataTypes.STRING())))
            .data(Collections.singletonList(Row.of("OK")))
            .build();

    private final JobClient jobClient;
    private final ResolvedSchema resolvedSchema;
    private final ResultKind resultKind;
    private final ResultProvider resultProvider;
    private final PrintStyle printStyle;

    private CustomTableResultImpl(
            @Nullable JobClient jobClient,
            ResolvedSchema resolvedSchema,
            ResultKind resultKind,
            ResultProvider resultProvider,
            PrintStyle printStyle) {
        this.jobClient = jobClient;
        this.resolvedSchema = Preconditions.checkNotNull(resolvedSchema, "resolvedSchema should not be null");
        this.resultKind = Preconditions.checkNotNull(resultKind, "resultKind should not be null");
        Preconditions.checkNotNull(resultProvider, "result provider should not be null");
        this.resultProvider = resultProvider;
        this.printStyle = Preconditions.checkNotNull(printStyle, "printStyle should not be null");
    }

    public static TableResult buildTableResult(List<TableSchemaField> fields, List<Row> rows) {
        Builder builder = builder().resultKind(ResultKind.SUCCESS);
        if (fields.size() > 0) {
            List<String> columnNames = new ArrayList<>();
            List<DataType> columnTypes = new ArrayList<>();
            for (int i = 0; i < fields.size(); i++) {
                columnNames.add(fields.get(i).getName());
                columnTypes.add(fields.get(i).getType());
            }
            builder.schema(ResolvedSchema.physical(columnNames, columnTypes)).data(rows);
        }
        return builder.build();
    }

    @Override
    public Optional<JobClient> getJobClient() {
        return Optional.ofNullable(jobClient);
    }

    @Override
    public void await() throws InterruptedException, ExecutionException {
        try {
            awaitInternal(-1, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // do nothing
        }
    }

    @Override
    public void await(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        awaitInternal(timeout, unit);
    }

    private void awaitInternal(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        if (jobClient == null) {
            return;
        }
        ExecutorService executor =
            Executors.newFixedThreadPool(1, r -> new Thread(r, "TableResult-await-thread"));
        try {
            CompletableFuture<Void> future = CompletableFuture.runAsync(
                () -> {
                    while (!resultProvider.isFirstRowReady()) {
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            throw new TableException("Thread is interrupted");
                        }
                    }
                },
                executor);
            if (timeout >= 0) {
                future.get(timeout, unit);
            } else {
                future.get();
            }
        } finally {
            executor.shutdown();
        }
    }

    @Override
    public ResolvedSchema getResolvedSchema() {
        return resolvedSchema;
    }

    @Override
    public ResultKind getResultKind() {
        return resultKind;
    }

    @Override
    public CloseableIterator<Row> collect() {
        return resultProvider.toExternalIterator();
    }

    @Override
    public CloseableIterator<RowData> collectInternal() {
        return resultProvider.toInternalIterator();
    }

    @Override
    public RowDataToStringConverter getRowDataToStringConverter() {
        return resultProvider.getRowDataStringConverter();
    }

    @Override
    public void print() {
        Iterator<RowData> it = resultProvider.toInternalIterator();
        printStyle.print(it, new PrintWriter(System.out));
    }

    public static Builder builder() {
        return new Builder();
    }

    /** Builder for creating a {@link CustomTableResultImpl}. */
    public static class Builder {
        private JobClient jobClient = null;
        private ResolvedSchema resolvedSchema = null;
        private ResultKind resultKind = null;
        private ResultProvider resultProvider = null;
        private PrintStyle printStyle = null;

        private Builder() {}

        /**
         * Specifies job client which associates the submitted Flink job.
         *
         * @param jobClient a {@link JobClient} for the submitted Flink job.
         */
        public Builder jobClient(JobClient jobClient) {
            this.jobClient = jobClient;
            return this;
        }

        /**
         * Specifies schema of the execution result.
         *
         * @param resolvedSchema a {@link ResolvedSchema} for the execution result.
         */
        public Builder schema(ResolvedSchema resolvedSchema) {
            Preconditions.checkNotNull(resolvedSchema, "resolvedSchema should not be null");
            this.resolvedSchema = resolvedSchema;
            return this;
        }

        /**
         * Specifies result kind of the execution result.
         *
         * @param resultKind a {@link ResultKind} for the execution result.
         */
        public Builder resultKind(ResultKind resultKind) {
            Preconditions.checkNotNull(resultKind, "resultKind should not be null");
            this.resultKind = resultKind;
            return this;
        }

        public Builder resultProvider(ResultProvider resultProvider) {
            Preconditions.checkNotNull(resultProvider, "resultProvider should not be null");
            this.resultProvider = resultProvider;
            return this;
        }

        /**
         * Specifies a row list as the execution result.
         *
         * @param rowList a row list as the execution result.
         */
        public Builder data(List<Row> rowList) {
            Preconditions.checkNotNull(rowList, "listRows should not be null");
            this.resultProvider = new StaticResultProvider(rowList);
            return this;
        }

        /** Specifies print style. Default is {@link TableauStyle} with max integer column width. */
        public Builder setPrintStyle(PrintStyle printStyle) {
            Preconditions.checkNotNull(printStyle, "printStyle should not be null");
            this.printStyle = printStyle;
            return this;
        }

        /** Returns a {@link TableResult} instance. */
        public TableResultInternal build() {
            if (printStyle == null) {
                printStyle = PrintStyle.rawContent(resultProvider.getRowDataStringConverter());
            }
            return new CustomTableResultImpl(jobClient, resolvedSchema, resultKind, resultProvider, printStyle);
        }
    }
}
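A small sketch, not part of this commit, of how buildTableResult assembles a static result from in-memory rows; the class name, package, and sample data are illustrative only, and no Flink job is submitted.

// Hypothetical sketch; package, class name, and sample rows are placeholders.
package com.dlink.examples;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;

import java.util.Arrays;

import com.dlink.executor.CustomTableResultImpl;
import com.dlink.executor.TableSchemaField;

public class TableResultSketch {

    public static void main(String[] args) {
        // Two illustrative columns and two in-memory rows backed by StaticResultProvider.
        TableResult result = CustomTableResultImpl.buildTableResult(
            Arrays.asList(
                new TableSchemaField("id", DataTypes.INT()),
                new TableSchemaField("name", DataTypes.STRING())),
            Arrays.asList(Row.of(1, "dlink"), Row.of(2, "flink")));

        // Prints the rows using the default raw-content print style.
        result.print();
    }
}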
dlink-client/dlink-client-1.15/src/main/java/com/dlink/executor/StaticResultProvider.java
0 → 100644
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dlink.executor;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.internal.ResultProvider;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.utils.print.PrintStyle;
import org.apache.flink.table.utils.print.RowDataToStringConverter;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import java.util.List;
import java.util.function.Function;

/** Create result provider from a static set of data using external types. */
@Internal
class StaticResultProvider implements ResultProvider {

    /**
     * This converter supports only String, long, int and boolean fields. Moreover, this converter
     * works only with {@link GenericRowData}.
     */
    static final RowDataToStringConverter SIMPLE_ROW_DATA_TO_STRING_CONVERTER =
        rowData -> {
            GenericRowData genericRowData = (GenericRowData) rowData;
            String[] results = new String[rowData.getArity()];
            for (int i = 0; i < results.length; i++) {
                Object value = genericRowData.getField(i);
                if (Boolean.TRUE.equals(value)) {
                    results[i] = "TRUE";
                } else if (Boolean.FALSE.equals(value)) {
                    results[i] = "FALSE";
                } else {
                    results[i] = value == null ? PrintStyle.NULL_VALUE : "" + value;
                }
            }
            return results;
        };

    private final List<Row> rows;
    private final Function<Row, RowData> externalToInternalConverter;

    public StaticResultProvider(List<Row> rows) {
        this(rows, StaticResultProvider::rowToInternalRow);
    }

    public StaticResultProvider(List<Row> rows, Function<Row, RowData> externalToInternalConverter) {
        this.rows = rows;
        this.externalToInternalConverter = externalToInternalConverter;
    }

    @Override
    public StaticResultProvider setJobClient(JobClient jobClient) {
        return this;
    }

    @Override
    public CloseableIterator<RowData> toInternalIterator() {
        return CloseableIterator.adapterForIterator(
            this.rows.stream().map(this.externalToInternalConverter).iterator());
    }

    @Override
    public CloseableIterator<Row> toExternalIterator() {
        return CloseableIterator.adapterForIterator(this.rows.iterator());
    }

    @Override
    public RowDataToStringConverter getRowDataStringConverter() {
        return SIMPLE_ROW_DATA_TO_STRING_CONVERTER;
    }

    @Override
    public boolean isFirstRowReady() {
        return true;
    }

    /** This function supports only String, long, int and boolean fields. */
    @VisibleForTesting
    static RowData rowToInternalRow(Row row) {
        Object[] values = new Object[row.getArity()];
        for (int i = 0; i < row.getArity(); i++) {
            Object value = row.getField(i);
            if (value == null) {
                values[i] = null;
            } else if (value instanceof String) {
                values[i] = StringData.fromString((String) value);
            } else if (value instanceof Boolean || value instanceof Long || value instanceof Integer) {
                values[i] = value;
            } else {
                throw new TableException("Cannot convert row type");
            }
        }
        return GenericRowData.of(values);
    }
}
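A hedged sketch, not part of this commit, of the provider's conversion path. It assumes the calling class sits in the same com.dlink.executor package (the provider is package-private) and that Flink's RowDataToStringConverter exposes a convert(RowData) method; both are assumptions for illustration.

// Hypothetical sketch; placement in com.dlink.executor and convert(RowData) are assumptions.
package com.dlink.executor;

import org.apache.flink.table.data.RowData;
import org.apache.flink.types.Row;

import java.util.Arrays;

public class StaticResultProviderSketch {

    public static void main(String[] args) {
        // Convert an external Row (String/int/boolean fields only) to internal RowData.
        RowData internal = StaticResultProvider.rowToInternalRow(Row.of("dlink", 1, true));

        // Render it with the simple converter; booleans become "TRUE"/"FALSE".
        String[] rendered = StaticResultProvider.SIMPLE_ROW_DATA_TO_STRING_CONVERTER.convert(internal);
        System.out.println(Arrays.toString(rendered)); // expected: [dlink, 1, TRUE]
    }
}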
dlink-client/dlink-client-1.15/src/main/java/com/dlink/executor/TableSchemaField.java
0 → 100644
package com.dlink.executor;

import org.apache.flink.table.types.DataType;

/**
 * @author wenmo
 * @since 2022/05/08
 **/
public class TableSchemaField {

    private String name;
    private DataType type;

    public TableSchemaField(String name, DataType type) {
        this.name = name;
        this.type = type;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public DataType getType() {
        return type;
    }

    public void setType(DataType type) {
        this.type = type;
    }
}
dlink-client/dlink-client-1.15/src/main/java/com/dlink/utils/FlinkUtil.java
0 → 100644
package com.dlink.utils;

import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.ContextResolvedTable;
import org.apache.flink.table.catalog.ObjectIdentifier;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/**
 * FlinkUtil
 *
 * @author wenmo
 * @since 2022/05/08
 */
public class FlinkUtil {

    public static List<String> getFieldNamesFromCatalogManager(
            CatalogManager catalogManager, String catalog, String database, String table) {
        Optional<ContextResolvedTable> tableOpt =
            catalogManager.getTable(ObjectIdentifier.of(catalog, database, table));
        if (tableOpt.isPresent()) {
            return tableOpt.get().getResolvedSchema().getColumnNames();
        } else {
            return new ArrayList<String>();
        }
    }

    public static List<String> catchColumn(TableResult tableResult) {
        return tableResult.getResolvedSchema().getColumnNames();
    }
}
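A brief sketch, not part of this commit, exercising both helpers. The catalog, database, and table names are placeholders, and getCatalogManager() is assumed to be the accessor inherited from Flink's TableEnvironmentImpl.

// Hypothetical sketch; names and the getCatalogManager() accessor are assumptions.
package com.dlink.examples;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;

import java.util.List;

import com.dlink.executor.CustomTableEnvironmentImpl;
import com.dlink.utils.FlinkUtil;

public class FlinkUtilSketch {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        CustomTableEnvironmentImpl tableEnv = CustomTableEnvironmentImpl.create(env);

        // Returns an empty list if "my_table" is not registered in the default catalog.
        List<String> fromCatalog = FlinkUtil.getFieldNamesFromCatalogManager(
            tableEnv.getCatalogManager(), "default_catalog", "default_database", "my_table");

        // Column names of an ad-hoc query result.
        TableResult result = tableEnv.executeSql("SELECT 1 AS id, 'dlink' AS name");
        List<String> fromResult = FlinkUtil.catchColumn(result);

        System.out.println(fromCatalog);
        System.out.println(fromResult); // [id, name]
    }
}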
dlink-client/pom.xml
...
@@ -19,5 +19,6 @@
        <module>dlink-client-1.14</module>
        <module>dlink-client-hadoop</module>
        <module>dlink-client-base</module>
        <module>dlink-client-1.15</module>
    </modules>
</project>
\ No newline at end of file
dlink-flink/dlink-flink-1.15/pom.xml
0 → 100644
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>dlink</artifactId>
        <groupId>com.dlink</groupId>
        <version>0.6.2</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>dlink-flink-1.15</artifactId>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.15.0</flink.version>
        <flinkcdc.version>2.2.1</flinkcdc.version>
        <commons.version>1.3.1</commons.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <junit.version>4.12</junit.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-yarn</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-yarn-common</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-common</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-hdfs</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-yarn-client</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-mapreduce-client-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-kubernetes</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flinkcdc.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-oracle-cdc</artifactId>
            <version>${flinkcdc.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>
        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>${commons.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.doris</groupId>
            <artifactId>flink-doris-connector-1.14_2.12</artifactId>
            <version>1.0.3</version>
        </dependency>
    </dependencies>
</project>
\ No newline at end of file
dlink-flink/pom.xml
...
@@ -16,6 +16,7 @@
        <module>dlink-flink-1.12</module>
        <module>dlink-flink-1.13</module>
        <module>dlink-flink-1.14</module>
        <module>dlink-flink-1.15</module>
    </modules>
    <properties>
...
...
docs/zh-CN/feature.md
...
@@ -11,7 +11,7 @@
| | | Support for INSERT statement sets | 0.4.0 |
| | | Added SQL fragment syntax (global variables) | 0.4.0 |
| | | Added AGGTABLE table-valued aggregation syntax and UDATF support | 0.4.0 |
| | | Added CDCSOURCE multi-source merge syntax support | 0.6.0 |
| | | Added CDCSOURCE whole-database real-time ingestion into warehouses and lakes | 0.6.3 |
| | | Added FlinkSQLEnv execution environment reuse | 0.5.0 |
| | | Added Flink Catalog interactive queries | 0.4.0 |
| | | Added shared and private session mechanisms for execution environments | 0.4.0 |
...
@@ -47,6 +47,7 @@
| | | Support for 1.12.0+ | 0.4.0 |
| | | Support for 1.13.0+ | 0.4.0 |
| | | Support for 1.14.0+ | 0.4.0 |
| | | Support for 1.15.0+ | 0.6.2 |
| | SQL | Added SQL validation for external data sources | 0.5.0 |
| | | Added SQL execution and preview for external data sources | 0.5.0 |
| | BI | Added line chart rendering | 0.5.0 |
...
pom.xml
...
@@ -210,6 +210,11 @@
            <artifactId>dlink-client-1.11</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>com.dlink</groupId>
            <artifactId>dlink-client-1.15</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>com.dlink</groupId>
            <artifactId>dlink-connector-jdbc-1.11</artifactId>
...
@@ -360,6 +365,11 @@
            <artifactId>dlink-flink-1.14</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>com.dlink</groupId>
            <artifactId>dlink-flink-1.15</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
...