Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in / Register
Toggle navigation
D
dlink
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
zhaowei
dlink
Commits
561f7f13
Commit
561f7f13
authored
Feb 10, 2022
by
wenmo
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
CDC多源合并扩展startup
'startup'='INITIAL|EARLIEST|LATEST'
parent
94960621
Changes
7
Show whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
110 additions
and
40 deletions
+110
-40
FlinkCDCMergeBuilder.java
....11/src/main/java/com/dlink/cdc/FlinkCDCMergeBuilder.java
+20
-4
FlinkCDCMergeBuilder.java
....12/src/main/java/com/dlink/cdc/FlinkCDCMergeBuilder.java
+20
-4
FlinkCDCMergeBuilder.java
....13/src/main/java/com/dlink/cdc/FlinkCDCMergeBuilder.java
+20
-4
FlinkCDCMergeBuilder.java
....14/src/main/java/com/dlink/cdc/FlinkCDCMergeBuilder.java
+20
-4
FlinkCDCConfig.java
...nt-base/src/main/java/com/dlink/model/FlinkCDCConfig.java
+17
-7
CDCSource.java
...executor/src/main/java/com/dlink/trans/ddl/CDCSource.java
+12
-16
CreateCDCSourceOperation.java
...in/java/com/dlink/trans/ddl/CreateCDCSourceOperation.java
+1
-1
No files found.
dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/FlinkCDCMergeBuilder.java
View file @
561f7f13
...
@@ -38,10 +38,26 @@ public class FlinkCDCMergeBuilder {
...
@@ -38,10 +38,26 @@ public class FlinkCDCMergeBuilder {
if
(
Asserts
.
isNotNull
(
config
.
getTable
())&&
config
.
getTable
().
size
()>
0
){
if
(
Asserts
.
isNotNull
(
config
.
getTable
())&&
config
.
getTable
().
size
()>
0
){
sourceBuilder
.
tableList
(
config
.
getTable
().
toArray
(
new
String
[
0
]));
sourceBuilder
.
tableList
(
config
.
getTable
().
toArray
(
new
String
[
0
]));
}
}
MySqlSource
<
String
>
sourceFunction
=
sourceBuilder
MySqlSourceBuilder
<
String
>
builder
=
sourceBuilder
.
deserializer
(
new
JsonDebeziumDeserializationSchema
())
.
deserializer
(
new
JsonDebeziumDeserializationSchema
());
.
startupOptions
(
StartupOptions
.
latest
())
if
(
Asserts
.
isNotNullString
(
config
.
getStartupMode
())){
.
build
();
switch
(
config
.
getStartupMode
().
toUpperCase
()){
case
"INITIAL"
:
builder
.
startupOptions
(
StartupOptions
.
initial
());
break
;
case
"EARLIEST"
:
builder
.
startupOptions
(
StartupOptions
.
earliest
());
break
;
case
"LATEST"
:
builder
.
startupOptions
(
StartupOptions
.
latest
());
break
;
default
:
builder
.
startupOptions
(
StartupOptions
.
latest
());
}
}
else
{
builder
.
startupOptions
(
StartupOptions
.
latest
());
}
MySqlSource
<
String
>
sourceFunction
=
builder
.
build
();
DataStreamSource
<
String
>
streamSource
=
env
.
fromSource
(
sourceFunction
,
WatermarkStrategy
.
noWatermarks
(),
"MySQL Source"
);
DataStreamSource
<
String
>
streamSource
=
env
.
fromSource
(
sourceFunction
,
WatermarkStrategy
.
noWatermarks
(),
"MySQL Source"
);
streamSource
.
addSink
(
getKafkaProducer
(
config
.
getBrokers
(),
config
.
getTopic
()));
streamSource
.
addSink
(
getKafkaProducer
(
config
.
getBrokers
(),
config
.
getTopic
()));
}
}
...
...
dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/FlinkCDCMergeBuilder.java
View file @
561f7f13
...
@@ -38,10 +38,26 @@ public class FlinkCDCMergeBuilder {
...
@@ -38,10 +38,26 @@ public class FlinkCDCMergeBuilder {
if
(
Asserts
.
isNotNull
(
config
.
getTable
())&&
config
.
getTable
().
size
()>
0
){
if
(
Asserts
.
isNotNull
(
config
.
getTable
())&&
config
.
getTable
().
size
()>
0
){
sourceBuilder
.
tableList
(
config
.
getTable
().
toArray
(
new
String
[
0
]));
sourceBuilder
.
tableList
(
config
.
getTable
().
toArray
(
new
String
[
0
]));
}
}
MySqlSource
<
String
>
sourceFunction
=
sourceBuilder
MySqlSourceBuilder
<
String
>
builder
=
sourceBuilder
.
deserializer
(
new
JsonDebeziumDeserializationSchema
())
.
deserializer
(
new
JsonDebeziumDeserializationSchema
());
.
startupOptions
(
StartupOptions
.
latest
())
if
(
Asserts
.
isNotNullString
(
config
.
getStartupMode
())){
.
build
();
switch
(
config
.
getStartupMode
().
toUpperCase
()){
case
"INITIAL"
:
builder
.
startupOptions
(
StartupOptions
.
initial
());
break
;
case
"EARLIEST"
:
builder
.
startupOptions
(
StartupOptions
.
earliest
());
break
;
case
"LATEST"
:
builder
.
startupOptions
(
StartupOptions
.
latest
());
break
;
default
:
builder
.
startupOptions
(
StartupOptions
.
latest
());
}
}
else
{
builder
.
startupOptions
(
StartupOptions
.
latest
());
}
MySqlSource
<
String
>
sourceFunction
=
builder
.
build
();
DataStreamSource
<
String
>
streamSource
=
env
.
fromSource
(
sourceFunction
,
WatermarkStrategy
.
noWatermarks
(),
"MySQL Source"
);
DataStreamSource
<
String
>
streamSource
=
env
.
fromSource
(
sourceFunction
,
WatermarkStrategy
.
noWatermarks
(),
"MySQL Source"
);
streamSource
.
addSink
(
getKafkaProducer
(
config
.
getBrokers
(),
config
.
getTopic
()));
streamSource
.
addSink
(
getKafkaProducer
(
config
.
getBrokers
(),
config
.
getTopic
()));
}
}
...
...
dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/FlinkCDCMergeBuilder.java
View file @
561f7f13
...
@@ -38,10 +38,26 @@ public class FlinkCDCMergeBuilder {
...
@@ -38,10 +38,26 @@ public class FlinkCDCMergeBuilder {
if
(
Asserts
.
isNotNull
(
config
.
getTable
())&&
config
.
getTable
().
size
()>
0
){
if
(
Asserts
.
isNotNull
(
config
.
getTable
())&&
config
.
getTable
().
size
()>
0
){
sourceBuilder
.
tableList
(
config
.
getTable
().
toArray
(
new
String
[
0
]));
sourceBuilder
.
tableList
(
config
.
getTable
().
toArray
(
new
String
[
0
]));
}
}
MySqlSource
<
String
>
sourceFunction
=
sourceBuilder
MySqlSourceBuilder
<
String
>
builder
=
sourceBuilder
.
deserializer
(
new
JsonDebeziumDeserializationSchema
())
.
deserializer
(
new
JsonDebeziumDeserializationSchema
());
.
startupOptions
(
StartupOptions
.
latest
())
if
(
Asserts
.
isNotNullString
(
config
.
getStartupMode
())){
.
build
();
switch
(
config
.
getStartupMode
().
toUpperCase
()){
case
"INITIAL"
:
builder
.
startupOptions
(
StartupOptions
.
initial
());
break
;
case
"EARLIEST"
:
builder
.
startupOptions
(
StartupOptions
.
earliest
());
break
;
case
"LATEST"
:
builder
.
startupOptions
(
StartupOptions
.
latest
());
break
;
default
:
builder
.
startupOptions
(
StartupOptions
.
latest
());
}
}
else
{
builder
.
startupOptions
(
StartupOptions
.
latest
());
}
MySqlSource
<
String
>
sourceFunction
=
builder
.
build
();
DataStreamSource
<
String
>
streamSource
=
env
.
fromSource
(
sourceFunction
,
WatermarkStrategy
.
noWatermarks
(),
"MySQL Source"
);
DataStreamSource
<
String
>
streamSource
=
env
.
fromSource
(
sourceFunction
,
WatermarkStrategy
.
noWatermarks
(),
"MySQL Source"
);
streamSource
.
addSink
(
getKafkaProducer
(
config
.
getBrokers
(),
config
.
getTopic
()));
streamSource
.
addSink
(
getKafkaProducer
(
config
.
getBrokers
(),
config
.
getTopic
()));
}
}
...
...
dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/FlinkCDCMergeBuilder.java
View file @
561f7f13
...
@@ -38,10 +38,26 @@ public class FlinkCDCMergeBuilder {
...
@@ -38,10 +38,26 @@ public class FlinkCDCMergeBuilder {
if
(
Asserts
.
isNotNull
(
config
.
getTable
())&&
config
.
getTable
().
size
()>
0
){
if
(
Asserts
.
isNotNull
(
config
.
getTable
())&&
config
.
getTable
().
size
()>
0
){
sourceBuilder
.
tableList
(
config
.
getTable
().
toArray
(
new
String
[
0
]));
sourceBuilder
.
tableList
(
config
.
getTable
().
toArray
(
new
String
[
0
]));
}
}
MySqlSource
<
String
>
sourceFunction
=
sourceBuilder
MySqlSourceBuilder
<
String
>
builder
=
sourceBuilder
.
deserializer
(
new
JsonDebeziumDeserializationSchema
())
.
deserializer
(
new
JsonDebeziumDeserializationSchema
());
.
startupOptions
(
StartupOptions
.
latest
())
if
(
Asserts
.
isNotNullString
(
config
.
getStartupMode
())){
.
build
();
switch
(
config
.
getStartupMode
().
toUpperCase
()){
case
"INITIAL"
:
builder
.
startupOptions
(
StartupOptions
.
initial
());
break
;
case
"EARLIEST"
:
builder
.
startupOptions
(
StartupOptions
.
earliest
());
break
;
case
"LATEST"
:
builder
.
startupOptions
(
StartupOptions
.
latest
());
break
;
default
:
builder
.
startupOptions
(
StartupOptions
.
latest
());
}
}
else
{
builder
.
startupOptions
(
StartupOptions
.
latest
());
}
MySqlSource
<
String
>
sourceFunction
=
builder
.
build
();
DataStreamSource
<
String
>
streamSource
=
env
.
fromSource
(
sourceFunction
,
WatermarkStrategy
.
noWatermarks
(),
"MySQL Source"
);
DataStreamSource
<
String
>
streamSource
=
env
.
fromSource
(
sourceFunction
,
WatermarkStrategy
.
noWatermarks
(),
"MySQL Source"
);
streamSource
.
addSink
(
getKafkaProducer
(
config
.
getBrokers
(),
config
.
getTopic
()));
streamSource
.
addSink
(
getKafkaProducer
(
config
.
getBrokers
(),
config
.
getTopic
()));
}
}
...
...
dlink-client/dlink-client-base/src/main/java/com/dlink/model/FlinkCDCConfig.java
View file @
561f7f13
...
@@ -18,13 +18,14 @@ public class FlinkCDCConfig {
...
@@ -18,13 +18,14 @@ public class FlinkCDCConfig {
private
Integer
parallelism
;
private
Integer
parallelism
;
private
List
<
String
>
database
;
private
List
<
String
>
database
;
private
List
<
String
>
table
;
private
List
<
String
>
table
;
private
String
startupMode
;
private
String
topic
;
private
String
topic
;
private
String
brokers
;
private
String
brokers
;
public
FlinkCDCConfig
()
{
public
FlinkCDCConfig
()
{
}
}
public
FlinkCDCConfig
(
String
hostname
,
int
port
,
String
username
,
String
password
,
int
checkpoint
,
int
parallelism
,
List
<
String
>
database
,
List
<
String
>
table
,
String
topic
,
String
brokers
)
{
public
FlinkCDCConfig
(
String
hostname
,
int
port
,
String
username
,
String
password
,
int
checkpoint
,
int
parallelism
,
List
<
String
>
database
,
List
<
String
>
table
,
String
startupMode
,
String
topic
,
String
brokers
)
{
this
.
hostname
=
hostname
;
this
.
hostname
=
hostname
;
this
.
port
=
port
;
this
.
port
=
port
;
this
.
username
=
username
;
this
.
username
=
username
;
...
@@ -33,6 +34,7 @@ public class FlinkCDCConfig {
...
@@ -33,6 +34,7 @@ public class FlinkCDCConfig {
this
.
parallelism
=
parallelism
;
this
.
parallelism
=
parallelism
;
this
.
database
=
database
;
this
.
database
=
database
;
this
.
table
=
table
;
this
.
table
=
table
;
this
.
startupMode
=
startupMode
;
this
.
topic
=
topic
;
this
.
topic
=
topic
;
this
.
brokers
=
brokers
;
this
.
brokers
=
brokers
;
}
}
...
@@ -45,11 +47,11 @@ public class FlinkCDCConfig {
...
@@ -45,11 +47,11 @@ public class FlinkCDCConfig {
this
.
hostname
=
hostname
;
this
.
hostname
=
hostname
;
}
}
public
int
getPort
()
{
public
Integer
getPort
()
{
return
port
;
return
port
;
}
}
public
void
setPort
(
int
port
)
{
public
void
setPort
(
Integer
port
)
{
this
.
port
=
port
;
this
.
port
=
port
;
}
}
...
@@ -69,19 +71,19 @@ public class FlinkCDCConfig {
...
@@ -69,19 +71,19 @@ public class FlinkCDCConfig {
this
.
password
=
password
;
this
.
password
=
password
;
}
}
public
int
getCheckpoint
()
{
public
Integer
getCheckpoint
()
{
return
checkpoint
;
return
checkpoint
;
}
}
public
void
setCheckpoint
(
int
checkpoint
)
{
public
void
setCheckpoint
(
Integer
checkpoint
)
{
this
.
checkpoint
=
checkpoint
;
this
.
checkpoint
=
checkpoint
;
}
}
public
int
getParallelism
()
{
public
Integer
getParallelism
()
{
return
parallelism
;
return
parallelism
;
}
}
public
void
setParallelism
(
int
parallelism
)
{
public
void
setParallelism
(
Integer
parallelism
)
{
this
.
parallelism
=
parallelism
;
this
.
parallelism
=
parallelism
;
}
}
...
@@ -116,4 +118,12 @@ public class FlinkCDCConfig {
...
@@ -116,4 +118,12 @@ public class FlinkCDCConfig {
public
void
setBrokers
(
String
brokers
)
{
public
void
setBrokers
(
String
brokers
)
{
this
.
brokers
=
brokers
;
this
.
brokers
=
brokers
;
}
}
public
String
getStartupMode
()
{
return
startupMode
;
}
public
void
setStartupMode
(
String
startupMode
)
{
this
.
startupMode
=
startupMode
;
}
}
}
dlink-executor/src/main/java/com/dlink/trans/ddl/CDCSource.java
View file @
561f7f13
...
@@ -29,10 +29,11 @@ public class CDCSource {
...
@@ -29,10 +29,11 @@ public class CDCSource {
private
Integer
parallelism
;
private
Integer
parallelism
;
private
List
<
String
>
database
;
private
List
<
String
>
database
;
private
List
<
String
>
table
;
private
List
<
String
>
table
;
private
String
startupMode
;
private
String
topic
;
private
String
topic
;
private
String
brokers
;
private
String
brokers
;
public
CDCSource
(
String
statement
,
String
name
,
String
hostname
,
Integer
port
,
String
username
,
String
password
,
Integer
checkpoint
,
Integer
parallelism
,
String
topic
,
String
brokers
)
{
public
CDCSource
(
String
statement
,
String
name
,
String
hostname
,
Integer
port
,
String
username
,
String
password
,
Integer
checkpoint
,
Integer
parallelism
,
String
startupMode
,
String
topic
,
String
brokers
)
{
this
.
statement
=
statement
;
this
.
statement
=
statement
;
this
.
name
=
name
;
this
.
name
=
name
;
this
.
hostname
=
hostname
;
this
.
hostname
=
hostname
;
...
@@ -41,21 +42,7 @@ public class CDCSource {
...
@@ -41,21 +42,7 @@ public class CDCSource {
this
.
password
=
password
;
this
.
password
=
password
;
this
.
checkpoint
=
checkpoint
;
this
.
checkpoint
=
checkpoint
;
this
.
parallelism
=
parallelism
;
this
.
parallelism
=
parallelism
;
this
.
topic
=
topic
;
this
.
startupMode
=
startupMode
;
this
.
brokers
=
brokers
;
}
public
CDCSource
(
String
statement
,
String
name
,
String
hostname
,
Integer
port
,
String
username
,
String
password
,
Integer
checkpoint
,
Integer
parallelism
,
List
<
String
>
database
,
List
<
String
>
table
,
String
topic
,
String
brokers
)
{
this
.
statement
=
statement
;
this
.
name
=
name
;
this
.
hostname
=
hostname
;
this
.
port
=
port
;
this
.
username
=
username
;
this
.
password
=
password
;
this
.
checkpoint
=
checkpoint
;
this
.
parallelism
=
parallelism
;
this
.
database
=
database
;
this
.
table
=
table
;
this
.
topic
=
topic
;
this
.
topic
=
topic
;
this
.
brokers
=
brokers
;
this
.
brokers
=
brokers
;
}
}
...
@@ -71,6 +58,7 @@ public class CDCSource {
...
@@ -71,6 +58,7 @@ public class CDCSource {
config
.
get
(
"password"
),
config
.
get
(
"password"
),
Integer
.
valueOf
(
config
.
get
(
"checkpoint"
)),
Integer
.
valueOf
(
config
.
get
(
"checkpoint"
)),
Integer
.
valueOf
(
config
.
get
(
"parallelism"
)),
Integer
.
valueOf
(
config
.
get
(
"parallelism"
)),
config
.
get
(
"startup"
),
config
.
get
(
"topic"
),
config
.
get
(
"topic"
),
config
.
get
(
"brokers"
)
config
.
get
(
"brokers"
)
);
);
...
@@ -190,4 +178,12 @@ public class CDCSource {
...
@@ -190,4 +178,12 @@ public class CDCSource {
public
void
setBrokers
(
String
brokers
)
{
public
void
setBrokers
(
String
brokers
)
{
this
.
brokers
=
brokers
;
this
.
brokers
=
brokers
;
}
}
public
String
getStartupMode
()
{
return
startupMode
;
}
public
void
setStartupMode
(
String
startupMode
)
{
this
.
startupMode
=
startupMode
;
}
}
}
dlink-executor/src/main/java/com/dlink/trans/ddl/CreateCDCSourceOperation.java
View file @
561f7f13
...
@@ -38,7 +38,7 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
...
@@ -38,7 +38,7 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
CDCSource
cdcSource
=
CDCSource
.
build
(
statement
);
CDCSource
cdcSource
=
CDCSource
.
build
(
statement
);
FlinkCDCConfig
config
=
new
FlinkCDCConfig
(
cdcSource
.
getHostname
(),
cdcSource
.
getPort
(),
cdcSource
.
getUsername
()
FlinkCDCConfig
config
=
new
FlinkCDCConfig
(
cdcSource
.
getHostname
(),
cdcSource
.
getPort
(),
cdcSource
.
getUsername
()
,
cdcSource
.
getPassword
(),
cdcSource
.
getCheckpoint
(),
cdcSource
.
getParallelism
(),
cdcSource
.
getDatabase
(),
cdcSource
.
getTable
()
,
cdcSource
.
getPassword
(),
cdcSource
.
getCheckpoint
(),
cdcSource
.
getParallelism
(),
cdcSource
.
getDatabase
(),
cdcSource
.
getTable
()
,
cdcSource
.
getTopic
(),
cdcSource
.
getBrokers
());
,
cdcSource
.
get
StartupMode
(),
cdcSource
.
get
Topic
(),
cdcSource
.
getBrokers
());
try
{
try
{
FlinkCDCMergeBuilder
.
buildMySqlCDC
(
executor
.
getStreamExecutionEnvironment
(),
config
);
FlinkCDCMergeBuilder
.
buildMySqlCDC
(
executor
.
getStreamExecutionEnvironment
(),
config
);
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment