Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in / Register
Toggle navigation
D
dlink
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
zhaowei
dlink
Commits
b1455045
Commit
b1455045
authored
Apr 10, 2022
by
wenmo
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
[Optimization-384][client] Optimization CDCSourceMerge
parent
d88154a7
Changes
8
Hide whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
89 additions
and
107 deletions
+89
-107
pom.xml
dlink-client/dlink-client-1.11/pom.xml
+2
-2
FlinkCDCMergeBuilder.java
....11/src/main/java/com/dlink/cdc/FlinkCDCMergeBuilder.java
+15
-35
pom.xml
dlink-client/dlink-client-1.12/pom.xml
+2
-2
FlinkCDCMergeBuilder.java
....12/src/main/java/com/dlink/cdc/FlinkCDCMergeBuilder.java
+21
-23
pom.xml
dlink-client/dlink-client-1.13/pom.xml
+1
-1
FlinkCDCMergeBuilder.java
....13/src/main/java/com/dlink/cdc/FlinkCDCMergeBuilder.java
+19
-20
pom.xml
dlink-client/dlink-client-1.14/pom.xml
+1
-1
FlinkCDCMergeBuilder.java
....14/src/main/java/com/dlink/cdc/FlinkCDCMergeBuilder.java
+28
-23
No files found.
dlink-client/dlink-client-1.11/pom.xml
View file @
b1455045
...
...
@@ -14,7 +14,7 @@
<java.version>
1.8
</java.version>
<project.build.sourceEncoding>
UTF-8
</project.build.sourceEncoding>
<flink.version>
1.11.6
</flink.version>
<flinkcdc.version>
2.1.1
</flinkcdc.version>
<flinkcdc.version>
1.1.0
</flinkcdc.version>
<maven.compiler.source>
1.8
</maven.compiler.source>
<maven.compiler.target>
1.8
</maven.compiler.target>
<junit.version>
4.12
</junit.version>
...
...
@@ -90,7 +90,7 @@
<version>
${flink.version}
</version>
</dependency>
<dependency>
<groupId>
com.ververica
</groupId>
<groupId>
com.
alibaba.
ververica
</groupId>
<artifactId>
flink-connector-mysql-cdc
</artifactId>
<version>
${flinkcdc.version}
</version>
</dependency>
...
...
dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/FlinkCDCMergeBuilder.java
View file @
b1455045
package
com
.
dlink
.
cdc
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.model.FlinkCDCConfig
;
import
com.ververica.cdc.connectors.mysql.source.MySqlSource
;
import
com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder
;
import
com.ververica.cdc.connectors.mysql.table.StartupOptions
;
import
com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
;
import
org.apache.flink.api.common.eventtime.WatermarkStrategy
;
import
org.apache.flink.api.common.serialization.SimpleStringSchema
;
import
org.apache.flink.streaming.api.datastream.DataStreamSource
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
;
import
com.alibaba.ververica.cdc.connectors.mysql.MySQLSource
;
import
com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.model.FlinkCDCConfig
;
/**
* FlinkCDCMergeBuilder
*
...
...
@@ -27,44 +25,26 @@ public class FlinkCDCMergeBuilder {
if
(
Asserts
.
isNotNull
(
config
.
getCheckpoint
()))
{
env
.
enableCheckpointing
(
config
.
getCheckpoint
());
}
MyS
qlSourceBuilder
<
String
>
sourceBuilder
=
MySql
Source
.<
String
>
builder
()
.
hostname
(
config
.
getHostname
())
.
port
(
config
.
getPort
())
.
username
(
config
.
getUsername
())
.
password
(
config
.
getPassword
());
MyS
QLSource
.
Builder
<
String
>
sourceBuilder
=
MySQL
Source
.<
String
>
builder
()
.
hostname
(
config
.
getHostname
())
.
port
(
config
.
getPort
())
.
username
(
config
.
getUsername
())
.
password
(
config
.
getPassword
());
if
(
Asserts
.
isNotNull
(
config
.
getDatabase
())
&&
config
.
getDatabase
().
size
()
>
0
)
{
sourceBuilder
.
databaseList
(
config
.
getDatabase
().
toArray
(
new
String
[
0
]));
}
if
(
Asserts
.
isNotNull
(
config
.
getTable
())
&&
config
.
getTable
().
size
()
>
0
)
{
sourceBuilder
.
tableList
(
config
.
getTable
().
toArray
(
new
String
[
0
]));
}
MySqlSourceBuilder
<
String
>
builder
=
sourceBuilder
.
deserializer
(
new
JsonDebeziumDeserializationSchema
());
if
(
Asserts
.
isNotNullString
(
config
.
getStartupMode
()))
{
switch
(
config
.
getStartupMode
().
toUpperCase
())
{
case
"INITIAL"
:
builder
.
startupOptions
(
StartupOptions
.
initial
());
break
;
case
"EARLIEST"
:
builder
.
startupOptions
(
StartupOptions
.
earliest
());
break
;
case
"LATEST"
:
builder
.
startupOptions
(
StartupOptions
.
latest
());
break
;
default
:
builder
.
startupOptions
(
StartupOptions
.
latest
());
}
}
else
{
builder
.
startupOptions
(
StartupOptions
.
latest
());
}
MySqlSource
<
String
>
sourceFunction
=
builder
.
build
();
DataStreamSource
<
String
>
streamSource
=
env
.
fromSource
(
sourceFunction
,
WatermarkStrategy
.
noWatermarks
(),
"MySQL Source"
);
sourceBuilder
.
deserializer
(
new
StringDebeziumDeserializationSchema
());
DataStreamSource
<
String
>
streamSource
=
env
.
addSource
(
sourceBuilder
.
build
(),
"MySQL CDC Source"
);
streamSource
.
addSink
(
getKafkaProducer
(
config
.
getBrokers
(),
config
.
getTopic
()));
}
private
static
FlinkKafkaProducer
<
String
>
getKafkaProducer
(
String
brokers
,
String
topic
)
{
return
new
FlinkKafkaProducer
<
String
>(
brokers
,
topic
,
new
SimpleStringSchema
());
topic
,
new
SimpleStringSchema
());
}
}
dlink-client/dlink-client-1.12/pom.xml
View file @
b1455045
...
...
@@ -15,7 +15,7 @@
<java.version>
1.8
</java.version>
<project.build.sourceEncoding>
UTF-8
</project.build.sourceEncoding>
<flink.version>
1.12.7
</flink.version>
<flinkcdc.version>
2.1.1
</flinkcdc.version>
<flinkcdc.version>
1.3.0
</flinkcdc.version>
<maven.compiler.source>
1.8
</maven.compiler.source>
<maven.compiler.target>
1.8
</maven.compiler.target>
<junit.version>
4.12
</junit.version>
...
...
@@ -91,7 +91,7 @@
<version>
${flink.version}
</version>
</dependency>
<dependency>
<groupId>
com.ververica
</groupId>
<groupId>
com.
alibaba.
ververica
</groupId>
<artifactId>
flink-connector-mysql-cdc
</artifactId>
<version>
${flinkcdc.version}
</version>
</dependency>
...
...
dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/FlinkCDCMergeBuilder.java
View file @
b1455045
package
com
.
dlink
.
cdc
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.model.FlinkCDCConfig
;
import
com.ververica.cdc.connectors.mysql.source.MySqlSource
;
import
com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder
;
import
com.ververica.cdc.connectors.mysql.table.StartupOptions
;
import
com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
;
import
org.apache.flink.api.common.eventtime.WatermarkStrategy
;
import
org.apache.flink.api.common.serialization.SimpleStringSchema
;
import
org.apache.flink.streaming.api.datastream.DataStreamSource
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
;
import
com.alibaba.ververica.cdc.connectors.mysql.MySQLSource
;
import
com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions
;
import
com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.model.FlinkCDCConfig
;
/**
* FlinkCDCMergeBuilder
*
...
...
@@ -27,44 +26,43 @@ public class FlinkCDCMergeBuilder {
if
(
Asserts
.
isNotNull
(
config
.
getCheckpoint
()))
{
env
.
enableCheckpointing
(
config
.
getCheckpoint
());
}
MyS
qlSourceBuilder
<
String
>
sourceBuilder
=
MySql
Source
.<
String
>
builder
()
.
hostname
(
config
.
getHostname
())
.
port
(
config
.
getPort
())
.
username
(
config
.
getUsername
())
.
password
(
config
.
getPassword
());
MyS
QLSource
.
Builder
<
String
>
sourceBuilder
=
MySQL
Source
.<
String
>
builder
()
.
hostname
(
config
.
getHostname
())
.
port
(
config
.
getPort
())
.
username
(
config
.
getUsername
())
.
password
(
config
.
getPassword
());
if
(
Asserts
.
isNotNull
(
config
.
getDatabase
())
&&
config
.
getDatabase
().
size
()
>
0
)
{
sourceBuilder
.
databaseList
(
config
.
getDatabase
().
toArray
(
new
String
[
0
]));
}
if
(
Asserts
.
isNotNull
(
config
.
getTable
())
&&
config
.
getTable
().
size
()
>
0
)
{
sourceBuilder
.
tableList
(
config
.
getTable
().
toArray
(
new
String
[
0
]));
}
MySqlSourceBuilder
<
String
>
builder
=
sourceBuilder
.
deserializer
(
new
Json
DebeziumDeserializationSchema
());
sourceBuilder
.
deserializer
(
new
String
DebeziumDeserializationSchema
());
if
(
Asserts
.
isNotNullString
(
config
.
getStartupMode
()))
{
switch
(
config
.
getStartupMode
().
toUpperCase
())
{
case
"INITIAL"
:
b
uilder
.
startupOptions
(
StartupOptions
.
initial
());
sourceB
uilder
.
startupOptions
(
StartupOptions
.
initial
());
break
;
case
"EARLIEST"
:
b
uilder
.
startupOptions
(
StartupOptions
.
earliest
());
sourceB
uilder
.
startupOptions
(
StartupOptions
.
earliest
());
break
;
case
"LATEST"
:
b
uilder
.
startupOptions
(
StartupOptions
.
latest
());
sourceB
uilder
.
startupOptions
(
StartupOptions
.
latest
());
break
;
default
:
b
uilder
.
startupOptions
(
StartupOptions
.
latest
());
sourceB
uilder
.
startupOptions
(
StartupOptions
.
latest
());
}
}
else
{
b
uilder
.
startupOptions
(
StartupOptions
.
latest
());
sourceB
uilder
.
startupOptions
(
StartupOptions
.
latest
());
}
MySqlSource
<
String
>
sourceFunction
=
builder
.
build
();
DataStreamSource
<
String
>
streamSource
=
env
.
fromSource
(
sourceFunction
,
WatermarkStrategy
.
noWatermarks
(),
"MySQL Source"
);
DataStreamSource
<
String
>
streamSource
=
env
.
addSource
(
sourceBuilder
.
build
(),
"MySQL CDC Source"
);
streamSource
.
addSink
(
getKafkaProducer
(
config
.
getBrokers
(),
config
.
getTopic
()));
}
private
static
FlinkKafkaProducer
<
String
>
getKafkaProducer
(
String
brokers
,
String
topic
)
{
return
new
FlinkKafkaProducer
<
String
>(
brokers
,
topic
,
new
SimpleStringSchema
());
topic
,
new
SimpleStringSchema
());
}
}
dlink-client/dlink-client-1.13/pom.xml
View file @
b1455045
...
...
@@ -15,7 +15,7 @@
<java.version>
1.8
</java.version>
<project.build.sourceEncoding>
UTF-8
</project.build.sourceEncoding>
<flink.version>
1.13.6
</flink.version>
<flinkcdc.version>
2.
1.1
</flinkcdc.version>
<flinkcdc.version>
2.
2.0
</flinkcdc.version>
<maven.compiler.source>
1.8
</maven.compiler.source>
<maven.compiler.target>
1.8
</maven.compiler.target>
<junit.version>
4.12
</junit.version>
...
...
dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/FlinkCDCMergeBuilder.java
View file @
b1455045
package
com
.
dlink
.
cdc
;
import
org.apache.flink.api.common.eventtime.WatermarkStrategy
;
import
org.apache.flink.api.common.serialization.SimpleStringSchema
;
import
org.apache.flink.streaming.api.datastream.DataStreamSource
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.model.FlinkCDCConfig
;
import
com.ververica.cdc.connectors.mysql.source.MySqlSource
;
import
com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder
;
import
com.ververica.cdc.connectors.mysql.table.StartupOptions
;
import
com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
;
import
org.apache.flink.api.common.eventtime.WatermarkStrategy
;
import
org.apache.flink.api.common.serialization.SimpleStringSchema
;
import
org.apache.flink.streaming.api.datastream.DataStreamSource
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
;
/**
* FlinkCDCMergeBuilder
...
...
@@ -28,43 +29,41 @@ public class FlinkCDCMergeBuilder {
env
.
enableCheckpointing
(
config
.
getCheckpoint
());
}
MySqlSourceBuilder
<
String
>
sourceBuilder
=
MySqlSource
.<
String
>
builder
()
.
hostname
(
config
.
getHostname
())
.
port
(
config
.
getPort
())
.
username
(
config
.
getUsername
())
.
password
(
config
.
getPassword
());
.
hostname
(
config
.
getHostname
())
.
port
(
config
.
getPort
())
.
username
(
config
.
getUsername
())
.
password
(
config
.
getPassword
());
if
(
Asserts
.
isNotNull
(
config
.
getDatabase
())
&&
config
.
getDatabase
().
size
()
>
0
)
{
sourceBuilder
.
databaseList
(
config
.
getDatabase
().
toArray
(
new
String
[
0
]));
}
if
(
Asserts
.
isNotNull
(
config
.
getTable
())
&&
config
.
getTable
().
size
()
>
0
)
{
sourceBuilder
.
tableList
(
config
.
getTable
().
toArray
(
new
String
[
0
]));
}
MySqlSourceBuilder
<
String
>
builder
=
sourceBuilder
.
deserializer
(
new
JsonDebeziumDeserializationSchema
());
sourceBuilder
.
deserializer
(
new
JsonDebeziumDeserializationSchema
());
if
(
Asserts
.
isNotNullString
(
config
.
getStartupMode
()))
{
switch
(
config
.
getStartupMode
().
toUpperCase
())
{
case
"INITIAL"
:
b
uilder
.
startupOptions
(
StartupOptions
.
initial
());
sourceB
uilder
.
startupOptions
(
StartupOptions
.
initial
());
break
;
case
"EARLIEST"
:
b
uilder
.
startupOptions
(
StartupOptions
.
earliest
());
sourceB
uilder
.
startupOptions
(
StartupOptions
.
earliest
());
break
;
case
"LATEST"
:
b
uilder
.
startupOptions
(
StartupOptions
.
latest
());
sourceB
uilder
.
startupOptions
(
StartupOptions
.
latest
());
break
;
default
:
b
uilder
.
startupOptions
(
StartupOptions
.
latest
());
sourceB
uilder
.
startupOptions
(
StartupOptions
.
latest
());
}
}
else
{
b
uilder
.
startupOptions
(
StartupOptions
.
latest
());
sourceB
uilder
.
startupOptions
(
StartupOptions
.
latest
());
}
MySqlSource
<
String
>
sourceFunction
=
builder
.
build
();
DataStreamSource
<
String
>
streamSource
=
env
.
fromSource
(
sourceFunction
,
WatermarkStrategy
.
noWatermarks
(),
"MySQL Source"
);
DataStreamSource
<
String
>
streamSource
=
env
.
fromSource
(
sourceBuilder
.
build
(),
WatermarkStrategy
.
noWatermarks
(),
"MySQL CDC Source"
);
streamSource
.
addSink
(
getKafkaProducer
(
config
.
getBrokers
(),
config
.
getTopic
()));
}
private
static
FlinkKafkaProducer
<
String
>
getKafkaProducer
(
String
brokers
,
String
topic
)
{
return
new
FlinkKafkaProducer
<
String
>(
brokers
,
topic
,
new
SimpleStringSchema
());
topic
,
new
SimpleStringSchema
());
}
}
dlink-client/dlink-client-1.14/pom.xml
View file @
b1455045
...
...
@@ -14,7 +14,7 @@
<java.version>
1.8
</java.version>
<project.build.sourceEncoding>
UTF-8
</project.build.sourceEncoding>
<flink.version>
1.14.3
</flink.version>
<flinkcdc.version>
2.
1.1
</flinkcdc.version>
<flinkcdc.version>
2.
2.0
</flinkcdc.version>
<commons.version>
1.3.1
</commons.version>
<maven.compiler.source>
1.8
</maven.compiler.source>
<maven.compiler.target>
1.8
</maven.compiler.target>
...
...
dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/FlinkCDCMergeBuilder.java
View file @
b1455045
package
com
.
dlink
.
cdc
;
import
org.apache.flink.api.common.eventtime.WatermarkStrategy
;
import
org.apache.flink.api.common.serialization.SimpleStringSchema
;
import
org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
;
import
org.apache.flink.connector.kafka.sink.KafkaSink
;
import
org.apache.flink.streaming.api.datastream.DataStreamSource
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.model.FlinkCDCConfig
;
import
com.ververica.cdc.connectors.mysql.source.MySqlSource
;
import
com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder
;
import
com.ververica.cdc.connectors.mysql.table.StartupOptions
;
import
com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
;
import
org.apache.flink.api.common.eventtime.WatermarkStrategy
;
import
org.apache.flink.api.common.serialization.SimpleStringSchema
;
import
org.apache.flink.streaming.api.datastream.DataStreamSource
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
;
/**
* FlinkCDCMergeBuilder
...
...
@@ -28,43 +30,46 @@ public class FlinkCDCMergeBuilder {
env
.
enableCheckpointing
(
config
.
getCheckpoint
());
}
MySqlSourceBuilder
<
String
>
sourceBuilder
=
MySqlSource
.<
String
>
builder
()
.
hostname
(
config
.
getHostname
())
.
port
(
config
.
getPort
())
.
username
(
config
.
getUsername
())
.
password
(
config
.
getPassword
());
.
hostname
(
config
.
getHostname
())
.
port
(
config
.
getPort
())
.
username
(
config
.
getUsername
())
.
password
(
config
.
getPassword
());
if
(
Asserts
.
isNotNull
(
config
.
getDatabase
())
&&
config
.
getDatabase
().
size
()
>
0
)
{
sourceBuilder
.
databaseList
(
config
.
getDatabase
().
toArray
(
new
String
[
0
]));
}
if
(
Asserts
.
isNotNull
(
config
.
getTable
())
&&
config
.
getTable
().
size
()
>
0
)
{
sourceBuilder
.
tableList
(
config
.
getTable
().
toArray
(
new
String
[
0
]));
}
MySqlSourceBuilder
<
String
>
builder
=
sourceBuilder
.
deserializer
(
new
JsonDebeziumDeserializationSchema
());
sourceBuilder
.
deserializer
(
new
JsonDebeziumDeserializationSchema
());
if
(
Asserts
.
isNotNullString
(
config
.
getStartupMode
()))
{
switch
(
config
.
getStartupMode
().
toUpperCase
())
{
case
"INITIAL"
:
b
uilder
.
startupOptions
(
StartupOptions
.
initial
());
sourceB
uilder
.
startupOptions
(
StartupOptions
.
initial
());
break
;
case
"EARLIEST"
:
b
uilder
.
startupOptions
(
StartupOptions
.
earliest
());
sourceB
uilder
.
startupOptions
(
StartupOptions
.
earliest
());
break
;
case
"LATEST"
:
b
uilder
.
startupOptions
(
StartupOptions
.
latest
());
sourceB
uilder
.
startupOptions
(
StartupOptions
.
latest
());
break
;
default
:
b
uilder
.
startupOptions
(
StartupOptions
.
latest
());
sourceB
uilder
.
startupOptions
(
StartupOptions
.
latest
());
}
}
else
{
b
uilder
.
startupOptions
(
StartupOptions
.
latest
());
sourceB
uilder
.
startupOptions
(
StartupOptions
.
latest
());
}
MySqlSource
<
String
>
sourceFunction
=
builder
.
build
();
DataStreamSource
<
String
>
streamSource
=
env
.
fromSource
(
sourceFunction
,
WatermarkStrategy
.
noWatermarks
(),
"MySQL Source"
);
streamSource
.
addSink
(
getKafkaProducer
(
config
.
getBrokers
(),
config
.
getTopic
()));
DataStreamSource
<
String
>
streamSource
=
env
.
fromSource
(
sourceBuilder
.
build
(),
WatermarkStrategy
.
noWatermarks
(),
"MySQL CDC Source"
);
streamSource
.
sinkTo
(
getKafkaProducer
(
config
.
getBrokers
(),
config
.
getTopic
()));
}
private
static
FlinkKafkaProducer
<
String
>
getKafkaProducer
(
String
brokers
,
String
topic
)
{
return
new
FlinkKafkaProducer
<
String
>(
brokers
,
topic
,
new
SimpleStringSchema
());
private
static
KafkaSink
<
String
>
getKafkaProducer
(
String
brokers
,
String
topic
)
{
return
KafkaSink
.<
String
>
builder
()
.
setBootstrapServers
(
brokers
)
.
setRecordSerializer
(
KafkaRecordSerializationSchema
.
builder
()
.
setTopic
(
topic
)
.
setValueSerializationSchema
(
new
SimpleStringSchema
())
.
build
()
)
.
build
();
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment