Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in / Register
Toggle navigation
D
dlink
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
zhaowei
dlink
Commits
1436b525
Unverified
Commit
1436b525
authored
May 03, 2022
by
George zhao
Committed by
GitHub
May 03, 2022
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'DataLinkDC:dev' into dev
parents
1039be7f
1fe5bb0c
Changes
5
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
130 additions
and
12 deletions
+130
-12
MysqlCDCBuilder.java
...13/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java
+44
-1
MysqlCDCBuilder.java
...14/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java
+44
-1
FlinkCDCConfig.java
...nt-base/src/main/java/com/dlink/model/FlinkCDCConfig.java
+13
-2
CDCSource.java
...executor/src/main/java/com/dlink/trans/ddl/CDCSource.java
+28
-7
CreateCDCSourceOperation.java
...in/java/com/dlink/trans/ddl/CreateCDCSourceOperation.java
+1
-1
No files found.
dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java
View file @
1436b525
...
@@ -4,6 +4,7 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy;
...
@@ -4,6 +4,7 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import
org.apache.flink.streaming.api.datastream.DataStreamSource
;
import
org.apache.flink.streaming.api.datastream.DataStreamSource
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
java.time.Duration
;
import
java.util.ArrayList
;
import
java.util.ArrayList
;
import
java.util.Collections
;
import
java.util.Collections
;
import
java.util.HashMap
;
import
java.util.HashMap
;
...
@@ -52,32 +53,45 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
...
@@ -52,32 +53,45 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
@Override
@Override
public
DataStreamSource
<
String
>
build
(
StreamExecutionEnvironment
env
)
{
public
DataStreamSource
<
String
>
build
(
StreamExecutionEnvironment
env
)
{
String
database
=
config
.
getDatabase
();
String
serverId
=
config
.
getSource
().
get
(
"server-id"
);
String
serverTimeZone
=
config
.
getSource
().
get
(
"server-time-zone"
);
String
fetchSize
=
config
.
getSource
().
get
(
"scan.snapshot.fetch.size"
);
String
connectTimeout
=
config
.
getSource
().
get
(
"connect.timeout"
);
String
connectMaxRetries
=
config
.
getSource
().
get
(
"connect.max-retries"
);
String
connectionPoolSize
=
config
.
getSource
().
get
(
"connection.pool.size"
);
String
heartbeatInterval
=
config
.
getSource
().
get
(
"heartbeat.interval"
);
Properties
properties
=
new
Properties
();
Properties
properties
=
new
Properties
();
for
(
Map
.
Entry
<
String
,
String
>
entry
:
config
.
getDebezium
().
entrySet
())
{
for
(
Map
.
Entry
<
String
,
String
>
entry
:
config
.
getDebezium
().
entrySet
())
{
if
(
Asserts
.
isNotNullString
(
entry
.
getKey
())
&&
Asserts
.
isNotNullString
(
entry
.
getValue
()))
{
if
(
Asserts
.
isNotNullString
(
entry
.
getKey
())
&&
Asserts
.
isNotNullString
(
entry
.
getValue
()))
{
properties
.
setProperty
(
entry
.
getKey
(),
entry
.
getValue
());
properties
.
setProperty
(
entry
.
getKey
(),
entry
.
getValue
());
}
}
}
}
MySqlSourceBuilder
<
String
>
sourceBuilder
=
MySqlSource
.<
String
>
builder
()
MySqlSourceBuilder
<
String
>
sourceBuilder
=
MySqlSource
.<
String
>
builder
()
.
hostname
(
config
.
getHostname
())
.
hostname
(
config
.
getHostname
())
.
port
(
config
.
getPort
())
.
port
(
config
.
getPort
())
.
username
(
config
.
getUsername
())
.
username
(
config
.
getUsername
())
.
password
(
config
.
getPassword
());
.
password
(
config
.
getPassword
());
String
database
=
config
.
getDatabase
();
if
(
Asserts
.
isNotNullString
(
database
))
{
if
(
Asserts
.
isNotNullString
(
database
))
{
String
[]
databases
=
database
.
split
(
FlinkParamConstant
.
SPLIT
);
String
[]
databases
=
database
.
split
(
FlinkParamConstant
.
SPLIT
);
sourceBuilder
.
databaseList
(
databases
);
sourceBuilder
.
databaseList
(
databases
);
}
else
{
}
else
{
sourceBuilder
.
databaseList
(
new
String
[
0
]);
sourceBuilder
.
databaseList
(
new
String
[
0
]);
}
}
List
<
String
>
schemaTableNameList
=
config
.
getSchemaTableNameList
();
List
<
String
>
schemaTableNameList
=
config
.
getSchemaTableNameList
();
if
(
Asserts
.
isNotNullCollection
(
schemaTableNameList
))
{
if
(
Asserts
.
isNotNullCollection
(
schemaTableNameList
))
{
sourceBuilder
.
tableList
(
schemaTableNameList
.
toArray
(
new
String
[
schemaTableNameList
.
size
()]));
sourceBuilder
.
tableList
(
schemaTableNameList
.
toArray
(
new
String
[
schemaTableNameList
.
size
()]));
}
else
{
}
else
{
sourceBuilder
.
tableList
(
new
String
[
0
]);
sourceBuilder
.
tableList
(
new
String
[
0
]);
}
}
sourceBuilder
.
deserializer
(
new
JsonDebeziumDeserializationSchema
());
sourceBuilder
.
deserializer
(
new
JsonDebeziumDeserializationSchema
());
sourceBuilder
.
debeziumProperties
(
properties
);
sourceBuilder
.
debeziumProperties
(
properties
);
if
(
Asserts
.
isNotNullString
(
config
.
getStartupMode
()))
{
if
(
Asserts
.
isNotNullString
(
config
.
getStartupMode
()))
{
switch
(
config
.
getStartupMode
().
toLowerCase
())
{
switch
(
config
.
getStartupMode
().
toLowerCase
())
{
case
"initial"
:
case
"initial"
:
...
@@ -90,6 +104,35 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
...
@@ -90,6 +104,35 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
}
else
{
}
else
{
sourceBuilder
.
startupOptions
(
StartupOptions
.
latest
());
sourceBuilder
.
startupOptions
(
StartupOptions
.
latest
());
}
}
if
(
Asserts
.
isNotNullString
(
serverId
))
{
sourceBuilder
.
serverId
(
serverId
);
}
if
(
Asserts
.
isNotNullString
(
serverTimeZone
))
{
sourceBuilder
.
serverTimeZone
(
serverTimeZone
);
}
if
(
Asserts
.
isNotNullString
(
fetchSize
))
{
sourceBuilder
.
fetchSize
(
Integer
.
valueOf
(
fetchSize
));
}
if
(
Asserts
.
isNotNullString
(
connectTimeout
))
{
sourceBuilder
.
connectTimeout
(
Duration
.
ofMillis
(
Long
.
valueOf
(
connectTimeout
)));
}
if
(
Asserts
.
isNotNullString
(
connectMaxRetries
))
{
sourceBuilder
.
connectMaxRetries
(
Integer
.
valueOf
(
connectMaxRetries
));
}
if
(
Asserts
.
isNotNullString
(
connectionPoolSize
))
{
sourceBuilder
.
connectionPoolSize
(
Integer
.
valueOf
(
connectionPoolSize
));
}
if
(
Asserts
.
isNotNullString
(
heartbeatInterval
))
{
sourceBuilder
.
heartbeatInterval
(
Duration
.
ofMillis
(
Long
.
valueOf
(
heartbeatInterval
)));
}
return
env
.
fromSource
(
sourceBuilder
.
build
(),
WatermarkStrategy
.
noWatermarks
(),
"MySQL CDC Source"
);
return
env
.
fromSource
(
sourceBuilder
.
build
(),
WatermarkStrategy
.
noWatermarks
(),
"MySQL CDC Source"
);
}
}
...
...
dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java
View file @
1436b525
...
@@ -4,6 +4,7 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy;
...
@@ -4,6 +4,7 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import
org.apache.flink.streaming.api.datastream.DataStreamSource
;
import
org.apache.flink.streaming.api.datastream.DataStreamSource
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
java.time.Duration
;
import
java.util.ArrayList
;
import
java.util.ArrayList
;
import
java.util.Collections
;
import
java.util.Collections
;
import
java.util.HashMap
;
import
java.util.HashMap
;
...
@@ -52,32 +53,45 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
...
@@ -52,32 +53,45 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
@Override
@Override
public
DataStreamSource
<
String
>
build
(
StreamExecutionEnvironment
env
)
{
public
DataStreamSource
<
String
>
build
(
StreamExecutionEnvironment
env
)
{
String
database
=
config
.
getDatabase
();
String
serverId
=
config
.
getSource
().
get
(
"server-id"
);
String
serverTimeZone
=
config
.
getSource
().
get
(
"server-time-zone"
);
String
fetchSize
=
config
.
getSource
().
get
(
"scan.snapshot.fetch.size"
);
String
connectTimeout
=
config
.
getSource
().
get
(
"connect.timeout"
);
String
connectMaxRetries
=
config
.
getSource
().
get
(
"connect.max-retries"
);
String
connectionPoolSize
=
config
.
getSource
().
get
(
"connection.pool.size"
);
String
heartbeatInterval
=
config
.
getSource
().
get
(
"heartbeat.interval"
);
Properties
properties
=
new
Properties
();
Properties
properties
=
new
Properties
();
for
(
Map
.
Entry
<
String
,
String
>
entry
:
config
.
getDebezium
().
entrySet
())
{
for
(
Map
.
Entry
<
String
,
String
>
entry
:
config
.
getDebezium
().
entrySet
())
{
if
(
Asserts
.
isNotNullString
(
entry
.
getKey
())
&&
Asserts
.
isNotNullString
(
entry
.
getValue
()))
{
if
(
Asserts
.
isNotNullString
(
entry
.
getKey
())
&&
Asserts
.
isNotNullString
(
entry
.
getValue
()))
{
properties
.
setProperty
(
entry
.
getKey
(),
entry
.
getValue
());
properties
.
setProperty
(
entry
.
getKey
(),
entry
.
getValue
());
}
}
}
}
MySqlSourceBuilder
<
String
>
sourceBuilder
=
MySqlSource
.<
String
>
builder
()
MySqlSourceBuilder
<
String
>
sourceBuilder
=
MySqlSource
.<
String
>
builder
()
.
hostname
(
config
.
getHostname
())
.
hostname
(
config
.
getHostname
())
.
port
(
config
.
getPort
())
.
port
(
config
.
getPort
())
.
username
(
config
.
getUsername
())
.
username
(
config
.
getUsername
())
.
password
(
config
.
getPassword
());
.
password
(
config
.
getPassword
());
String
database
=
config
.
getDatabase
();
if
(
Asserts
.
isNotNullString
(
database
))
{
if
(
Asserts
.
isNotNullString
(
database
))
{
String
[]
databases
=
database
.
split
(
FlinkParamConstant
.
SPLIT
);
String
[]
databases
=
database
.
split
(
FlinkParamConstant
.
SPLIT
);
sourceBuilder
.
databaseList
(
databases
);
sourceBuilder
.
databaseList
(
databases
);
}
else
{
}
else
{
sourceBuilder
.
databaseList
(
new
String
[
0
]);
sourceBuilder
.
databaseList
(
new
String
[
0
]);
}
}
List
<
String
>
schemaTableNameList
=
config
.
getSchemaTableNameList
();
List
<
String
>
schemaTableNameList
=
config
.
getSchemaTableNameList
();
if
(
Asserts
.
isNotNullCollection
(
schemaTableNameList
))
{
if
(
Asserts
.
isNotNullCollection
(
schemaTableNameList
))
{
sourceBuilder
.
tableList
(
schemaTableNameList
.
toArray
(
new
String
[
schemaTableNameList
.
size
()]));
sourceBuilder
.
tableList
(
schemaTableNameList
.
toArray
(
new
String
[
schemaTableNameList
.
size
()]));
}
else
{
}
else
{
sourceBuilder
.
tableList
(
new
String
[
0
]);
sourceBuilder
.
tableList
(
new
String
[
0
]);
}
}
sourceBuilder
.
deserializer
(
new
JsonDebeziumDeserializationSchema
());
sourceBuilder
.
deserializer
(
new
JsonDebeziumDeserializationSchema
());
sourceBuilder
.
debeziumProperties
(
properties
);
sourceBuilder
.
debeziumProperties
(
properties
);
if
(
Asserts
.
isNotNullString
(
config
.
getStartupMode
()))
{
if
(
Asserts
.
isNotNullString
(
config
.
getStartupMode
()))
{
switch
(
config
.
getStartupMode
().
toLowerCase
())
{
switch
(
config
.
getStartupMode
().
toLowerCase
())
{
case
"initial"
:
case
"initial"
:
...
@@ -90,6 +104,35 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
...
@@ -90,6 +104,35 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
}
else
{
}
else
{
sourceBuilder
.
startupOptions
(
StartupOptions
.
latest
());
sourceBuilder
.
startupOptions
(
StartupOptions
.
latest
());
}
}
if
(
Asserts
.
isNotNullString
(
serverId
))
{
sourceBuilder
.
serverId
(
serverId
);
}
if
(
Asserts
.
isNotNullString
(
serverTimeZone
))
{
sourceBuilder
.
serverTimeZone
(
serverTimeZone
);
}
if
(
Asserts
.
isNotNullString
(
fetchSize
))
{
sourceBuilder
.
fetchSize
(
Integer
.
valueOf
(
fetchSize
));
}
if
(
Asserts
.
isNotNullString
(
connectTimeout
))
{
sourceBuilder
.
connectTimeout
(
Duration
.
ofMillis
(
Long
.
valueOf
(
connectTimeout
)));
}
if
(
Asserts
.
isNotNullString
(
connectMaxRetries
))
{
sourceBuilder
.
connectMaxRetries
(
Integer
.
valueOf
(
connectMaxRetries
));
}
if
(
Asserts
.
isNotNullString
(
connectionPoolSize
))
{
sourceBuilder
.
connectionPoolSize
(
Integer
.
valueOf
(
connectionPoolSize
));
}
if
(
Asserts
.
isNotNullString
(
heartbeatInterval
))
{
sourceBuilder
.
heartbeatInterval
(
Duration
.
ofMillis
(
Long
.
valueOf
(
heartbeatInterval
)));
}
return
env
.
fromSource
(
sourceBuilder
.
build
(),
WatermarkStrategy
.
noWatermarks
(),
"MySQL CDC Source"
);
return
env
.
fromSource
(
sourceBuilder
.
build
(),
WatermarkStrategy
.
noWatermarks
(),
"MySQL CDC Source"
);
}
}
...
...
dlink-client/dlink-client-base/src/main/java/com/dlink/model/FlinkCDCConfig.java
View file @
1436b525
...
@@ -24,6 +24,7 @@ public class FlinkCDCConfig {
...
@@ -24,6 +24,7 @@ public class FlinkCDCConfig {
private
List
<
String
>
schemaTableNameList
;
private
List
<
String
>
schemaTableNameList
;
private
String
startupMode
;
private
String
startupMode
;
private
Map
<
String
,
String
>
debezium
;
private
Map
<
String
,
String
>
debezium
;
private
Map
<
String
,
String
>
source
;
private
Map
<
String
,
String
>
sink
;
private
Map
<
String
,
String
>
sink
;
private
List
<
Schema
>
schemaList
;
private
List
<
Schema
>
schemaList
;
private
String
schemaFieldName
;
private
String
schemaFieldName
;
...
@@ -31,8 +32,9 @@ public class FlinkCDCConfig {
...
@@ -31,8 +32,9 @@ public class FlinkCDCConfig {
public
FlinkCDCConfig
()
{
public
FlinkCDCConfig
()
{
}
}
public
FlinkCDCConfig
(
String
type
,
String
hostname
,
int
port
,
String
username
,
String
password
,
int
checkpoint
,
int
parallelism
,
String
database
,
String
schema
,
String
table
,
String
startupMode
,
public
FlinkCDCConfig
(
String
type
,
String
hostname
,
Integer
port
,
String
username
,
String
password
,
Integer
checkpoint
,
Integer
parallelism
,
String
database
,
String
schema
,
String
table
,
Map
<
String
,
String
>
debezium
,
Map
<
String
,
String
>
sink
)
{
String
startupMode
,
Map
<
String
,
String
>
debezium
,
Map
<
String
,
String
>
source
,
Map
<
String
,
String
>
sink
)
{
this
.
type
=
type
;
this
.
type
=
type
;
this
.
hostname
=
hostname
;
this
.
hostname
=
hostname
;
this
.
port
=
port
;
this
.
port
=
port
;
...
@@ -45,6 +47,7 @@ public class FlinkCDCConfig {
...
@@ -45,6 +47,7 @@ public class FlinkCDCConfig {
this
.
table
=
table
;
this
.
table
=
table
;
this
.
startupMode
=
startupMode
;
this
.
startupMode
=
startupMode
;
this
.
debezium
=
debezium
;
this
.
debezium
=
debezium
;
this
.
source
=
source
;
this
.
sink
=
sink
;
this
.
sink
=
sink
;
}
}
...
@@ -124,6 +127,14 @@ public class FlinkCDCConfig {
...
@@ -124,6 +127,14 @@ public class FlinkCDCConfig {
return
table
;
return
table
;
}
}
public
Map
<
String
,
String
>
getSource
()
{
return
source
;
}
public
void
setSource
(
Map
<
String
,
String
>
source
)
{
this
.
source
=
source
;
}
public
void
setTable
(
String
table
)
{
public
void
setTable
(
String
table
)
{
this
.
table
=
table
;
this
.
table
=
table
;
}
}
...
...
dlink-executor/src/main/java/com/dlink/trans/ddl/CDCSource.java
View file @
1436b525
...
@@ -31,10 +31,11 @@ public class CDCSource {
...
@@ -31,10 +31,11 @@ public class CDCSource {
private
String
table
;
private
String
table
;
private
String
startupMode
;
private
String
startupMode
;
private
Map
<
String
,
String
>
debezium
;
private
Map
<
String
,
String
>
debezium
;
private
Map
<
String
,
String
>
source
;
private
Map
<
String
,
String
>
sink
;
private
Map
<
String
,
String
>
sink
;
public
CDCSource
(
String
connector
,
String
statement
,
String
name
,
String
hostname
,
Integer
port
,
String
username
,
String
password
,
Integer
checkpoint
,
Integer
parallelism
,
String
startupMode
,
public
CDCSource
(
String
connector
,
String
statement
,
String
name
,
String
hostname
,
Integer
port
,
String
username
,
String
password
,
Integer
checkpoint
,
Integer
parallelism
,
String
startupMode
,
Map
<
String
,
String
>
debezium
,
Map
<
String
,
String
>
sink
)
{
Map
<
String
,
String
>
debezium
,
Map
<
String
,
String
>
s
ource
,
Map
<
String
,
String
>
s
ink
)
{
this
.
connector
=
connector
;
this
.
connector
=
connector
;
this
.
statement
=
statement
;
this
.
statement
=
statement
;
this
.
name
=
name
;
this
.
name
=
name
;
...
@@ -46,6 +47,7 @@ public class CDCSource {
...
@@ -46,6 +47,7 @@ public class CDCSource {
this
.
parallelism
=
parallelism
;
this
.
parallelism
=
parallelism
;
this
.
startupMode
=
startupMode
;
this
.
startupMode
=
startupMode
;
this
.
debezium
=
debezium
;
this
.
debezium
=
debezium
;
this
.
source
=
source
;
this
.
sink
=
sink
;
this
.
sink
=
sink
;
}
}
...
@@ -62,6 +64,16 @@ public class CDCSource {
...
@@ -62,6 +64,16 @@ public class CDCSource {
}
}
}
}
}
}
Map
<
String
,
String
>
source
=
new
HashMap
<>();
for
(
Map
.
Entry
<
String
,
String
>
entry
:
config
.
entrySet
())
{
if
(
entry
.
getKey
().
startsWith
(
"source."
))
{
String
key
=
entry
.
getKey
();
key
=
key
.
replaceFirst
(
"source."
,
""
);
if
(!
source
.
containsKey
(
key
))
{
source
.
put
(
key
,
entry
.
getValue
());
}
}
}
Map
<
String
,
String
>
sink
=
new
HashMap
<>();
Map
<
String
,
String
>
sink
=
new
HashMap
<>();
for
(
Map
.
Entry
<
String
,
String
>
entry
:
config
.
entrySet
())
{
for
(
Map
.
Entry
<
String
,
String
>
entry
:
config
.
entrySet
())
{
if
(
entry
.
getKey
().
startsWith
(
"sink."
))
{
if
(
entry
.
getKey
().
startsWith
(
"sink."
))
{
...
@@ -84,16 +96,17 @@ public class CDCSource {
...
@@ -84,16 +96,17 @@ public class CDCSource {
Integer
.
valueOf
(
config
.
get
(
"parallelism"
)),
Integer
.
valueOf
(
config
.
get
(
"parallelism"
)),
config
.
get
(
"scan.startup.mode"
),
config
.
get
(
"scan.startup.mode"
),
debezium
,
debezium
,
source
,
sink
sink
);
);
if
(
Asserts
.
isNotNullString
(
config
.
get
(
"database"
)))
{
if
(
Asserts
.
isNotNullString
(
config
.
get
(
"database
-name
"
)))
{
cdcSource
.
setDatabase
(
config
.
get
(
"database"
));
cdcSource
.
setDatabase
(
config
.
get
(
"database
-name
"
));
}
}
if
(
Asserts
.
isNotNullString
(
config
.
get
(
"schema"
)))
{
if
(
Asserts
.
isNotNullString
(
config
.
get
(
"schema
-name
"
)))
{
cdcSource
.
setSchema
(
config
.
get
(
"schema"
));
cdcSource
.
setSchema
(
config
.
get
(
"schema
-name
"
));
}
}
if
(
Asserts
.
isNotNullString
(
config
.
get
(
"table"
)))
{
if
(
Asserts
.
isNotNullString
(
config
.
get
(
"table
-name
"
)))
{
cdcSource
.
setTable
(
config
.
get
(
"table"
));
cdcSource
.
setTable
(
config
.
get
(
"table
-name
"
));
}
}
return
cdcSource
;
return
cdcSource
;
}
}
...
@@ -229,4 +242,12 @@ public class CDCSource {
...
@@ -229,4 +242,12 @@ public class CDCSource {
public
void
setDebezium
(
Map
<
String
,
String
>
debezium
)
{
public
void
setDebezium
(
Map
<
String
,
String
>
debezium
)
{
this
.
debezium
=
debezium
;
this
.
debezium
=
debezium
;
}
}
public
Map
<
String
,
String
>
getSource
()
{
return
source
;
}
public
void
setSource
(
Map
<
String
,
String
>
source
)
{
this
.
source
=
source
;
}
}
}
dlink-executor/src/main/java/com/dlink/trans/ddl/CreateCDCSourceOperation.java
View file @
1436b525
...
@@ -53,7 +53,7 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
...
@@ -53,7 +53,7 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
CDCSource
cdcSource
=
CDCSource
.
build
(
statement
);
CDCSource
cdcSource
=
CDCSource
.
build
(
statement
);
FlinkCDCConfig
config
=
new
FlinkCDCConfig
(
cdcSource
.
getConnector
(),
cdcSource
.
getHostname
(),
cdcSource
.
getPort
(),
cdcSource
.
getUsername
()
FlinkCDCConfig
config
=
new
FlinkCDCConfig
(
cdcSource
.
getConnector
(),
cdcSource
.
getHostname
(),
cdcSource
.
getPort
(),
cdcSource
.
getUsername
()
,
cdcSource
.
getPassword
(),
cdcSource
.
getCheckpoint
(),
cdcSource
.
getParallelism
(),
cdcSource
.
getDatabase
(),
cdcSource
.
getSchema
()
,
cdcSource
.
getPassword
(),
cdcSource
.
getCheckpoint
(),
cdcSource
.
getParallelism
(),
cdcSource
.
getDatabase
(),
cdcSource
.
getSchema
()
,
cdcSource
.
getTable
(),
cdcSource
.
getStartupMode
(),
cdcSource
.
getDebezium
(),
cdcSource
.
getSink
());
,
cdcSource
.
getTable
(),
cdcSource
.
getStartupMode
(),
cdcSource
.
getDebezium
(),
cdcSource
.
getS
ource
(),
cdcSource
.
getS
ink
());
try
{
try
{
CDCBuilder
cdcBuilder
=
CDCBuilderFactory
.
buildCDCBuilder
(
config
);
CDCBuilder
cdcBuilder
=
CDCBuilderFactory
.
buildCDCBuilder
(
config
);
Map
<
String
,
Map
<
String
,
String
>>
allConfigMap
=
cdcBuilder
.
parseMetaDataConfigs
();
Map
<
String
,
Map
<
String
,
String
>>
allConfigMap
=
cdcBuilder
.
parseMetaDataConfigs
();
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment