Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in / Register
Toggle navigation
D
dlink
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
zhaowei
dlink
Commits
de28dc10
Commit
de28dc10
authored
May 03, 2022
by
wenmo
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
[Feature-469][client] Add MysqlCDCSource sync extended configuration
parent
2da5c901
Changes
5
Show whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
130 additions
and
12 deletions
+130
-12
MysqlCDCBuilder.java
...13/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java
+44
-1
MysqlCDCBuilder.java
...14/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java
+44
-1
FlinkCDCConfig.java
...nt-base/src/main/java/com/dlink/model/FlinkCDCConfig.java
+13
-2
CDCSource.java
...executor/src/main/java/com/dlink/trans/ddl/CDCSource.java
+28
-7
CreateCDCSourceOperation.java
...in/java/com/dlink/trans/ddl/CreateCDCSourceOperation.java
+1
-1
No files found.
dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java
View file @
de28dc10
...
@@ -4,6 +4,7 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy;
...
@@ -4,6 +4,7 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import
org.apache.flink.streaming.api.datastream.DataStreamSource
;
import
org.apache.flink.streaming.api.datastream.DataStreamSource
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
java.time.Duration
;
import
java.util.ArrayList
;
import
java.util.ArrayList
;
import
java.util.Collections
;
import
java.util.Collections
;
import
java.util.HashMap
;
import
java.util.HashMap
;
...
@@ -52,32 +53,45 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
...
@@ -52,32 +53,45 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
@Override
@Override
public
DataStreamSource
<
String
>
build
(
StreamExecutionEnvironment
env
)
{
public
DataStreamSource
<
String
>
build
(
StreamExecutionEnvironment
env
)
{
String
database
=
config
.
getDatabase
();
String
serverId
=
config
.
getSource
().
get
(
"server-id"
);
String
serverTimeZone
=
config
.
getSource
().
get
(
"server-time-zone"
);
String
fetchSize
=
config
.
getSource
().
get
(
"scan.snapshot.fetch.size"
);
String
connectTimeout
=
config
.
getSource
().
get
(
"connect.timeout"
);
String
connectMaxRetries
=
config
.
getSource
().
get
(
"connect.max-retries"
);
String
connectionPoolSize
=
config
.
getSource
().
get
(
"connection.pool.size"
);
String
heartbeatInterval
=
config
.
getSource
().
get
(
"heartbeat.interval"
);
Properties
properties
=
new
Properties
();
Properties
properties
=
new
Properties
();
for
(
Map
.
Entry
<
String
,
String
>
entry
:
config
.
getDebezium
().
entrySet
())
{
for
(
Map
.
Entry
<
String
,
String
>
entry
:
config
.
getDebezium
().
entrySet
())
{
if
(
Asserts
.
isNotNullString
(
entry
.
getKey
())
&&
Asserts
.
isNotNullString
(
entry
.
getValue
()))
{
if
(
Asserts
.
isNotNullString
(
entry
.
getKey
())
&&
Asserts
.
isNotNullString
(
entry
.
getValue
()))
{
properties
.
setProperty
(
entry
.
getKey
(),
entry
.
getValue
());
properties
.
setProperty
(
entry
.
getKey
(),
entry
.
getValue
());
}
}
}
}
MySqlSourceBuilder
<
String
>
sourceBuilder
=
MySqlSource
.<
String
>
builder
()
MySqlSourceBuilder
<
String
>
sourceBuilder
=
MySqlSource
.<
String
>
builder
()
.
hostname
(
config
.
getHostname
())
.
hostname
(
config
.
getHostname
())
.
port
(
config
.
getPort
())
.
port
(
config
.
getPort
())
.
username
(
config
.
getUsername
())
.
username
(
config
.
getUsername
())
.
password
(
config
.
getPassword
());
.
password
(
config
.
getPassword
());
String
database
=
config
.
getDatabase
();
if
(
Asserts
.
isNotNullString
(
database
))
{
if
(
Asserts
.
isNotNullString
(
database
))
{
String
[]
databases
=
database
.
split
(
FlinkParamConstant
.
SPLIT
);
String
[]
databases
=
database
.
split
(
FlinkParamConstant
.
SPLIT
);
sourceBuilder
.
databaseList
(
databases
);
sourceBuilder
.
databaseList
(
databases
);
}
else
{
}
else
{
sourceBuilder
.
databaseList
(
new
String
[
0
]);
sourceBuilder
.
databaseList
(
new
String
[
0
]);
}
}
List
<
String
>
schemaTableNameList
=
config
.
getSchemaTableNameList
();
List
<
String
>
schemaTableNameList
=
config
.
getSchemaTableNameList
();
if
(
Asserts
.
isNotNullCollection
(
schemaTableNameList
))
{
if
(
Asserts
.
isNotNullCollection
(
schemaTableNameList
))
{
sourceBuilder
.
tableList
(
schemaTableNameList
.
toArray
(
new
String
[
schemaTableNameList
.
size
()]));
sourceBuilder
.
tableList
(
schemaTableNameList
.
toArray
(
new
String
[
schemaTableNameList
.
size
()]));
}
else
{
}
else
{
sourceBuilder
.
tableList
(
new
String
[
0
]);
sourceBuilder
.
tableList
(
new
String
[
0
]);
}
}
sourceBuilder
.
deserializer
(
new
JsonDebeziumDeserializationSchema
());
sourceBuilder
.
deserializer
(
new
JsonDebeziumDeserializationSchema
());
sourceBuilder
.
debeziumProperties
(
properties
);
sourceBuilder
.
debeziumProperties
(
properties
);
if
(
Asserts
.
isNotNullString
(
config
.
getStartupMode
()))
{
if
(
Asserts
.
isNotNullString
(
config
.
getStartupMode
()))
{
switch
(
config
.
getStartupMode
().
toLowerCase
())
{
switch
(
config
.
getStartupMode
().
toLowerCase
())
{
case
"initial"
:
case
"initial"
:
...
@@ -90,6 +104,35 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
...
@@ -90,6 +104,35 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
}
else
{
}
else
{
sourceBuilder
.
startupOptions
(
StartupOptions
.
latest
());
sourceBuilder
.
startupOptions
(
StartupOptions
.
latest
());
}
}
if
(
Asserts
.
isNotNullString
(
serverId
))
{
sourceBuilder
.
serverId
(
serverId
);
}
if
(
Asserts
.
isNotNullString
(
serverTimeZone
))
{
sourceBuilder
.
serverTimeZone
(
serverTimeZone
);
}
if
(
Asserts
.
isNotNullString
(
fetchSize
))
{
sourceBuilder
.
fetchSize
(
Integer
.
valueOf
(
fetchSize
));
}
if
(
Asserts
.
isNotNullString
(
connectTimeout
))
{
sourceBuilder
.
connectTimeout
(
Duration
.
ofMillis
(
Long
.
valueOf
(
connectTimeout
)));
}
if
(
Asserts
.
isNotNullString
(
connectMaxRetries
))
{
sourceBuilder
.
connectMaxRetries
(
Integer
.
valueOf
(
connectMaxRetries
));
}
if
(
Asserts
.
isNotNullString
(
connectionPoolSize
))
{
sourceBuilder
.
connectionPoolSize
(
Integer
.
valueOf
(
connectionPoolSize
));
}
if
(
Asserts
.
isNotNullString
(
heartbeatInterval
))
{
sourceBuilder
.
heartbeatInterval
(
Duration
.
ofMillis
(
Long
.
valueOf
(
heartbeatInterval
)));
}
return
env
.
fromSource
(
sourceBuilder
.
build
(),
WatermarkStrategy
.
noWatermarks
(),
"MySQL CDC Source"
);
return
env
.
fromSource
(
sourceBuilder
.
build
(),
WatermarkStrategy
.
noWatermarks
(),
"MySQL CDC Source"
);
}
}
...
...
dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java
View file @
de28dc10
...
@@ -4,6 +4,7 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy;
...
@@ -4,6 +4,7 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import
org.apache.flink.streaming.api.datastream.DataStreamSource
;
import
org.apache.flink.streaming.api.datastream.DataStreamSource
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
java.time.Duration
;
import
java.util.ArrayList
;
import
java.util.ArrayList
;
import
java.util.Collections
;
import
java.util.Collections
;
import
java.util.HashMap
;
import
java.util.HashMap
;
...
@@ -52,32 +53,45 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
...
@@ -52,32 +53,45 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
@Override
@Override
public
DataStreamSource
<
String
>
build
(
StreamExecutionEnvironment
env
)
{
public
DataStreamSource
<
String
>
build
(
StreamExecutionEnvironment
env
)
{
String
database
=
config
.
getDatabase
();
String
serverId
=
config
.
getSource
().
get
(
"server-id"
);
String
serverTimeZone
=
config
.
getSource
().
get
(
"server-time-zone"
);
String
fetchSize
=
config
.
getSource
().
get
(
"scan.snapshot.fetch.size"
);
String
connectTimeout
=
config
.
getSource
().
get
(
"connect.timeout"
);
String
connectMaxRetries
=
config
.
getSource
().
get
(
"connect.max-retries"
);
String
connectionPoolSize
=
config
.
getSource
().
get
(
"connection.pool.size"
);
String
heartbeatInterval
=
config
.
getSource
().
get
(
"heartbeat.interval"
);
Properties
properties
=
new
Properties
();
Properties
properties
=
new
Properties
();
for
(
Map
.
Entry
<
String
,
String
>
entry
:
config
.
getDebezium
().
entrySet
())
{
for
(
Map
.
Entry
<
String
,
String
>
entry
:
config
.
getDebezium
().
entrySet
())
{
if
(
Asserts
.
isNotNullString
(
entry
.
getKey
())
&&
Asserts
.
isNotNullString
(
entry
.
getValue
()))
{
if
(
Asserts
.
isNotNullString
(
entry
.
getKey
())
&&
Asserts
.
isNotNullString
(
entry
.
getValue
()))
{
properties
.
setProperty
(
entry
.
getKey
(),
entry
.
getValue
());
properties
.
setProperty
(
entry
.
getKey
(),
entry
.
getValue
());
}
}
}
}
MySqlSourceBuilder
<
String
>
sourceBuilder
=
MySqlSource
.<
String
>
builder
()
MySqlSourceBuilder
<
String
>
sourceBuilder
=
MySqlSource
.<
String
>
builder
()
.
hostname
(
config
.
getHostname
())
.
hostname
(
config
.
getHostname
())
.
port
(
config
.
getPort
())
.
port
(
config
.
getPort
())
.
username
(
config
.
getUsername
())
.
username
(
config
.
getUsername
())
.
password
(
config
.
getPassword
());
.
password
(
config
.
getPassword
());
String
database
=
config
.
getDatabase
();
if
(
Asserts
.
isNotNullString
(
database
))
{
if
(
Asserts
.
isNotNullString
(
database
))
{
String
[]
databases
=
database
.
split
(
FlinkParamConstant
.
SPLIT
);
String
[]
databases
=
database
.
split
(
FlinkParamConstant
.
SPLIT
);
sourceBuilder
.
databaseList
(
databases
);
sourceBuilder
.
databaseList
(
databases
);
}
else
{
}
else
{
sourceBuilder
.
databaseList
(
new
String
[
0
]);
sourceBuilder
.
databaseList
(
new
String
[
0
]);
}
}
List
<
String
>
schemaTableNameList
=
config
.
getSchemaTableNameList
();
List
<
String
>
schemaTableNameList
=
config
.
getSchemaTableNameList
();
if
(
Asserts
.
isNotNullCollection
(
schemaTableNameList
))
{
if
(
Asserts
.
isNotNullCollection
(
schemaTableNameList
))
{
sourceBuilder
.
tableList
(
schemaTableNameList
.
toArray
(
new
String
[
schemaTableNameList
.
size
()]));
sourceBuilder
.
tableList
(
schemaTableNameList
.
toArray
(
new
String
[
schemaTableNameList
.
size
()]));
}
else
{
}
else
{
sourceBuilder
.
tableList
(
new
String
[
0
]);
sourceBuilder
.
tableList
(
new
String
[
0
]);
}
}
sourceBuilder
.
deserializer
(
new
JsonDebeziumDeserializationSchema
());
sourceBuilder
.
deserializer
(
new
JsonDebeziumDeserializationSchema
());
sourceBuilder
.
debeziumProperties
(
properties
);
sourceBuilder
.
debeziumProperties
(
properties
);
if
(
Asserts
.
isNotNullString
(
config
.
getStartupMode
()))
{
if
(
Asserts
.
isNotNullString
(
config
.
getStartupMode
()))
{
switch
(
config
.
getStartupMode
().
toLowerCase
())
{
switch
(
config
.
getStartupMode
().
toLowerCase
())
{
case
"initial"
:
case
"initial"
:
...
@@ -90,6 +104,35 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
...
@@ -90,6 +104,35 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
}
else
{
}
else
{
sourceBuilder
.
startupOptions
(
StartupOptions
.
latest
());
sourceBuilder
.
startupOptions
(
StartupOptions
.
latest
());
}
}
if
(
Asserts
.
isNotNullString
(
serverId
))
{
sourceBuilder
.
serverId
(
serverId
);
}
if
(
Asserts
.
isNotNullString
(
serverTimeZone
))
{
sourceBuilder
.
serverTimeZone
(
serverTimeZone
);
}
if
(
Asserts
.
isNotNullString
(
fetchSize
))
{
sourceBuilder
.
fetchSize
(
Integer
.
valueOf
(
fetchSize
));
}
if
(
Asserts
.
isNotNullString
(
connectTimeout
))
{
sourceBuilder
.
connectTimeout
(
Duration
.
ofMillis
(
Long
.
valueOf
(
connectTimeout
)));
}
if
(
Asserts
.
isNotNullString
(
connectMaxRetries
))
{
sourceBuilder
.
connectMaxRetries
(
Integer
.
valueOf
(
connectMaxRetries
));
}
if
(
Asserts
.
isNotNullString
(
connectionPoolSize
))
{
sourceBuilder
.
connectionPoolSize
(
Integer
.
valueOf
(
connectionPoolSize
));
}
if
(
Asserts
.
isNotNullString
(
heartbeatInterval
))
{
sourceBuilder
.
heartbeatInterval
(
Duration
.
ofMillis
(
Long
.
valueOf
(
heartbeatInterval
)));
}
return
env
.
fromSource
(
sourceBuilder
.
build
(),
WatermarkStrategy
.
noWatermarks
(),
"MySQL CDC Source"
);
return
env
.
fromSource
(
sourceBuilder
.
build
(),
WatermarkStrategy
.
noWatermarks
(),
"MySQL CDC Source"
);
}
}
...
...
dlink-client/dlink-client-base/src/main/java/com/dlink/model/FlinkCDCConfig.java
View file @
de28dc10
...
@@ -24,6 +24,7 @@ public class FlinkCDCConfig {
...
@@ -24,6 +24,7 @@ public class FlinkCDCConfig {
private
List
<
String
>
schemaTableNameList
;
private
List
<
String
>
schemaTableNameList
;
private
String
startupMode
;
private
String
startupMode
;
private
Map
<
String
,
String
>
debezium
;
private
Map
<
String
,
String
>
debezium
;
private
Map
<
String
,
String
>
source
;
private
Map
<
String
,
String
>
sink
;
private
Map
<
String
,
String
>
sink
;
private
List
<
Schema
>
schemaList
;
private
List
<
Schema
>
schemaList
;
private
String
schemaFieldName
;
private
String
schemaFieldName
;
...
@@ -31,8 +32,9 @@ public class FlinkCDCConfig {
...
@@ -31,8 +32,9 @@ public class FlinkCDCConfig {
public
FlinkCDCConfig
()
{
public
FlinkCDCConfig
()
{
}
}
public
FlinkCDCConfig
(
String
type
,
String
hostname
,
int
port
,
String
username
,
String
password
,
int
checkpoint
,
int
parallelism
,
String
database
,
String
schema
,
String
table
,
String
startupMode
,
public
FlinkCDCConfig
(
String
type
,
String
hostname
,
Integer
port
,
String
username
,
String
password
,
Integer
checkpoint
,
Integer
parallelism
,
String
database
,
String
schema
,
String
table
,
Map
<
String
,
String
>
debezium
,
Map
<
String
,
String
>
sink
)
{
String
startupMode
,
Map
<
String
,
String
>
debezium
,
Map
<
String
,
String
>
source
,
Map
<
String
,
String
>
sink
)
{
this
.
type
=
type
;
this
.
type
=
type
;
this
.
hostname
=
hostname
;
this
.
hostname
=
hostname
;
this
.
port
=
port
;
this
.
port
=
port
;
...
@@ -45,6 +47,7 @@ public class FlinkCDCConfig {
...
@@ -45,6 +47,7 @@ public class FlinkCDCConfig {
this
.
table
=
table
;
this
.
table
=
table
;
this
.
startupMode
=
startupMode
;
this
.
startupMode
=
startupMode
;
this
.
debezium
=
debezium
;
this
.
debezium
=
debezium
;
this
.
source
=
source
;
this
.
sink
=
sink
;
this
.
sink
=
sink
;
}
}
...
@@ -124,6 +127,14 @@ public class FlinkCDCConfig {
...
@@ -124,6 +127,14 @@ public class FlinkCDCConfig {
return
table
;
return
table
;
}
}
public
Map
<
String
,
String
>
getSource
()
{
return
source
;
}
public
void
setSource
(
Map
<
String
,
String
>
source
)
{
this
.
source
=
source
;
}
public
void
setTable
(
String
table
)
{
public
void
setTable
(
String
table
)
{
this
.
table
=
table
;
this
.
table
=
table
;
}
}
...
...
dlink-executor/src/main/java/com/dlink/trans/ddl/CDCSource.java
View file @
de28dc10
...
@@ -31,10 +31,11 @@ public class CDCSource {
...
@@ -31,10 +31,11 @@ public class CDCSource {
private
String
table
;
private
String
table
;
private
String
startupMode
;
private
String
startupMode
;
private
Map
<
String
,
String
>
debezium
;
private
Map
<
String
,
String
>
debezium
;
private
Map
<
String
,
String
>
source
;
private
Map
<
String
,
String
>
sink
;
private
Map
<
String
,
String
>
sink
;
public
CDCSource
(
String
connector
,
String
statement
,
String
name
,
String
hostname
,
Integer
port
,
String
username
,
String
password
,
Integer
checkpoint
,
Integer
parallelism
,
String
startupMode
,
public
CDCSource
(
String
connector
,
String
statement
,
String
name
,
String
hostname
,
Integer
port
,
String
username
,
String
password
,
Integer
checkpoint
,
Integer
parallelism
,
String
startupMode
,
Map
<
String
,
String
>
debezium
,
Map
<
String
,
String
>
sink
)
{
Map
<
String
,
String
>
debezium
,
Map
<
String
,
String
>
s
ource
,
Map
<
String
,
String
>
s
ink
)
{
this
.
connector
=
connector
;
this
.
connector
=
connector
;
this
.
statement
=
statement
;
this
.
statement
=
statement
;
this
.
name
=
name
;
this
.
name
=
name
;
...
@@ -46,6 +47,7 @@ public class CDCSource {
...
@@ -46,6 +47,7 @@ public class CDCSource {
this
.
parallelism
=
parallelism
;
this
.
parallelism
=
parallelism
;
this
.
startupMode
=
startupMode
;
this
.
startupMode
=
startupMode
;
this
.
debezium
=
debezium
;
this
.
debezium
=
debezium
;
this
.
source
=
source
;
this
.
sink
=
sink
;
this
.
sink
=
sink
;
}
}
...
@@ -62,6 +64,16 @@ public class CDCSource {
...
@@ -62,6 +64,16 @@ public class CDCSource {
}
}
}
}
}
}
Map
<
String
,
String
>
source
=
new
HashMap
<>();
for
(
Map
.
Entry
<
String
,
String
>
entry
:
config
.
entrySet
())
{
if
(
entry
.
getKey
().
startsWith
(
"source."
))
{
String
key
=
entry
.
getKey
();
key
=
key
.
replaceFirst
(
"source."
,
""
);
if
(!
source
.
containsKey
(
key
))
{
source
.
put
(
key
,
entry
.
getValue
());
}
}
}
Map
<
String
,
String
>
sink
=
new
HashMap
<>();
Map
<
String
,
String
>
sink
=
new
HashMap
<>();
for
(
Map
.
Entry
<
String
,
String
>
entry
:
config
.
entrySet
())
{
for
(
Map
.
Entry
<
String
,
String
>
entry
:
config
.
entrySet
())
{
if
(
entry
.
getKey
().
startsWith
(
"sink."
))
{
if
(
entry
.
getKey
().
startsWith
(
"sink."
))
{
...
@@ -84,16 +96,17 @@ public class CDCSource {
...
@@ -84,16 +96,17 @@ public class CDCSource {
Integer
.
valueOf
(
config
.
get
(
"parallelism"
)),
Integer
.
valueOf
(
config
.
get
(
"parallelism"
)),
config
.
get
(
"scan.startup.mode"
),
config
.
get
(
"scan.startup.mode"
),
debezium
,
debezium
,
source
,
sink
sink
);
);
if
(
Asserts
.
isNotNullString
(
config
.
get
(
"database"
)))
{
if
(
Asserts
.
isNotNullString
(
config
.
get
(
"database
-name
"
)))
{
cdcSource
.
setDatabase
(
config
.
get
(
"database"
));
cdcSource
.
setDatabase
(
config
.
get
(
"database
-name
"
));
}
}
if
(
Asserts
.
isNotNullString
(
config
.
get
(
"schema"
)))
{
if
(
Asserts
.
isNotNullString
(
config
.
get
(
"schema
-name
"
)))
{
cdcSource
.
setSchema
(
config
.
get
(
"schema"
));
cdcSource
.
setSchema
(
config
.
get
(
"schema
-name
"
));
}
}
if
(
Asserts
.
isNotNullString
(
config
.
get
(
"table"
)))
{
if
(
Asserts
.
isNotNullString
(
config
.
get
(
"table
-name
"
)))
{
cdcSource
.
setTable
(
config
.
get
(
"table"
));
cdcSource
.
setTable
(
config
.
get
(
"table
-name
"
));
}
}
return
cdcSource
;
return
cdcSource
;
}
}
...
@@ -229,4 +242,12 @@ public class CDCSource {
...
@@ -229,4 +242,12 @@ public class CDCSource {
public
void
setDebezium
(
Map
<
String
,
String
>
debezium
)
{
public
void
setDebezium
(
Map
<
String
,
String
>
debezium
)
{
this
.
debezium
=
debezium
;
this
.
debezium
=
debezium
;
}
}
public
Map
<
String
,
String
>
getSource
()
{
return
source
;
}
public
void
setSource
(
Map
<
String
,
String
>
source
)
{
this
.
source
=
source
;
}
}
}
dlink-executor/src/main/java/com/dlink/trans/ddl/CreateCDCSourceOperation.java
View file @
de28dc10
...
@@ -53,7 +53,7 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
...
@@ -53,7 +53,7 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
CDCSource
cdcSource
=
CDCSource
.
build
(
statement
);
CDCSource
cdcSource
=
CDCSource
.
build
(
statement
);
FlinkCDCConfig
config
=
new
FlinkCDCConfig
(
cdcSource
.
getConnector
(),
cdcSource
.
getHostname
(),
cdcSource
.
getPort
(),
cdcSource
.
getUsername
()
FlinkCDCConfig
config
=
new
FlinkCDCConfig
(
cdcSource
.
getConnector
(),
cdcSource
.
getHostname
(),
cdcSource
.
getPort
(),
cdcSource
.
getUsername
()
,
cdcSource
.
getPassword
(),
cdcSource
.
getCheckpoint
(),
cdcSource
.
getParallelism
(),
cdcSource
.
getDatabase
(),
cdcSource
.
getSchema
()
,
cdcSource
.
getPassword
(),
cdcSource
.
getCheckpoint
(),
cdcSource
.
getParallelism
(),
cdcSource
.
getDatabase
(),
cdcSource
.
getSchema
()
,
cdcSource
.
getTable
(),
cdcSource
.
getStartupMode
(),
cdcSource
.
getDebezium
(),
cdcSource
.
getSink
());
,
cdcSource
.
getTable
(),
cdcSource
.
getStartupMode
(),
cdcSource
.
getDebezium
(),
cdcSource
.
getS
ource
(),
cdcSource
.
getS
ink
());
try
{
try
{
CDCBuilder
cdcBuilder
=
CDCBuilderFactory
.
buildCDCBuilder
(
config
);
CDCBuilder
cdcBuilder
=
CDCBuilderFactory
.
buildCDCBuilder
(
config
);
Map
<
String
,
Map
<
String
,
String
>>
allConfigMap
=
cdcBuilder
.
parseMetaDataConfigs
();
Map
<
String
,
Map
<
String
,
String
>>
allConfigMap
=
cdcBuilder
.
parseMetaDataConfigs
();
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment