Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in / Register
Toggle navigation
D
dlink
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
zhaowei
dlink
Commits
25463348
Unverified
Commit
25463348
authored
Apr 19, 2022
by
aiwenmo
Committed by
GitHub
Apr 19, 2022
Browse files
Options
Browse Files
Download
Plain Diff
[Feature-429][*] OracleCDCSource sync kafka topics
[Feature-429][*] OracleCDCSource sync kafka topics
parents
fc646434
64b8cf25
Changes
21
Hide whitespace changes
Inline
Side-by-side
Showing
21 changed files
with
147 additions
and
17 deletions
+147
-17
AbstractCDCBuilder.java
...-1.11/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java
+4
-0
CDCBuilder.java
...k-client-1.11/src/main/java/com/dlink/cdc/CDCBuilder.java
+2
-0
KafkaSinkBuilder.java
...1/src/main/java/com/dlink/cdc/kafka/KafkaSinkBuilder.java
+2
-1
MysqlCDCBuilder.java
...11/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java
+9
-2
AbstractCDCBuilder.java
...-1.12/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java
+4
-0
CDCBuilder.java
...k-client-1.12/src/main/java/com/dlink/cdc/CDCBuilder.java
+2
-0
KafkaSinkBuilder.java
...2/src/main/java/com/dlink/cdc/kafka/KafkaSinkBuilder.java
+2
-1
MysqlCDCBuilder.java
...12/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java
+9
-2
AbstractCDCBuilder.java
...-1.13/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java
+4
-0
CDCBuilder.java
...k-client-1.13/src/main/java/com/dlink/cdc/CDCBuilder.java
+2
-0
KafkaSinkBuilder.java
...3/src/main/java/com/dlink/cdc/kafka/KafkaSinkBuilder.java
+2
-1
MysqlCDCBuilder.java
...13/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java
+13
-0
OracleCDCBuilder.java
.../src/main/java/com/dlink/cdc/oracle/OracleCDCBuilder.java
+8
-0
AbstractCDCBuilder.java
...-1.14/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java
+4
-0
CDCBuilder.java
...k-client-1.14/src/main/java/com/dlink/cdc/CDCBuilder.java
+2
-0
KafkaSinkBuilder.java
...4/src/main/java/com/dlink/cdc/kafka/KafkaSinkBuilder.java
+2
-1
MysqlCDCBuilder.java
...14/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java
+16
-2
OracleCDCBuilder.java
.../src/main/java/com/dlink/cdc/oracle/OracleCDCBuilder.java
+8
-0
FlinkCDCConfig.java
...nt-base/src/main/java/com/dlink/model/FlinkCDCConfig.java
+20
-1
CDCSource.java
...executor/src/main/java/com/dlink/trans/ddl/CDCSource.java
+22
-1
CreateCDCSourceOperation.java
...in/java/com/dlink/trans/ddl/CreateCDCSourceOperation.java
+10
-5
No files found.
dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java
View file @
25463348
...
...
@@ -63,4 +63,8 @@ public abstract class AbstractCDCBuilder {
Collections
.
addAll
(
tableList
,
tables
);
return
tableList
;
}
public
String
getSchemaFieldName
()
{
return
"db"
;
}
}
dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/CDCBuilder.java
View file @
25463348
...
...
@@ -30,4 +30,6 @@ public interface CDCBuilder {
Map
<
String
,
Map
<
String
,
String
>>
parseMetaDataConfigs
();
String
getInsertSQL
(
Table
table
,
String
sourceName
);
String
getSchemaFieldName
();
}
dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/kafka/KafkaSinkBuilder.java
View file @
25463348
...
...
@@ -57,6 +57,7 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
new
SimpleStringSchema
()));
}
else
{
final
List
<
Schema
>
schemaList
=
config
.
getSchemaList
();
final
String
schemaFieldName
=
config
.
getSchemaFieldName
();
if
(
Asserts
.
isNotNullCollection
(
schemaList
))
{
SingleOutputStreamOperator
<
Map
>
mapOperator
=
dataStreamSource
.
map
(
new
MapFunction
<
String
,
Map
>()
{
@Override
...
...
@@ -74,7 +75,7 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
public
boolean
filter
(
Map
value
)
throws
Exception
{
LinkedHashMap
source
=
(
LinkedHashMap
)
value
.
get
(
"source"
);
return
tableName
.
equals
(
source
.
get
(
"table"
).
toString
())
&&
schemaName
.
equals
(
source
.
get
(
"db"
).
toString
());
&&
schemaName
.
equals
(
source
.
get
(
schemaFieldName
).
toString
());
}
});
SingleOutputStreamOperator
<
String
>
stringOperator
=
filterOperator
.
map
(
new
MapFunction
<
Map
,
String
>()
{
...
...
dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java
View file @
25463348
...
...
@@ -8,6 +8,7 @@ import java.util.Collections;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Properties
;
import
com.alibaba.ververica.cdc.connectors.mysql.MySQLSource
;
import
com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema
;
...
...
@@ -49,6 +50,12 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
@Override
public
DataStreamSource
<
String
>
build
(
StreamExecutionEnvironment
env
)
{
Properties
properties
=
new
Properties
();
for
(
Map
.
Entry
<
String
,
String
>
entry
:
config
.
getDebezium
().
entrySet
())
{
if
(
Asserts
.
isNotNullString
(
entry
.
getKey
())
&&
Asserts
.
isNotNullString
(
entry
.
getValue
()))
{
properties
.
setProperty
(
entry
.
getKey
(),
entry
.
getValue
());
}
}
MySQLSource
.
Builder
<
String
>
sourceBuilder
=
MySQLSource
.<
String
>
builder
()
.
hostname
(
config
.
getHostname
())
.
port
(
config
.
getPort
())
...
...
@@ -60,8 +67,8 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
if
(
Asserts
.
isNotNullString
(
config
.
getTable
()))
{
sourceBuilder
.
tableList
(
config
.
getTable
().
split
(
FlinkParamConstant
.
SPLIT
));
}
sourceBuilder
.
deserializer
(
new
StringDebeziumDeserializationSchema
()
);
sourceBuilder
.
deserializer
(
new
StringDebeziumDeserializationSchema
());
sourceBuilder
.
debeziumProperties
(
properties
);
return
env
.
addSource
(
sourceBuilder
.
build
(),
"MySQL CDC Source"
);
}
...
...
dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java
View file @
25463348
...
...
@@ -63,4 +63,8 @@ public abstract class AbstractCDCBuilder {
Collections
.
addAll
(
tableList
,
tables
);
return
tableList
;
}
public
String
getSchemaFieldName
()
{
return
"db"
;
}
}
dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/CDCBuilder.java
View file @
25463348
...
...
@@ -30,4 +30,6 @@ public interface CDCBuilder {
Map
<
String
,
Map
<
String
,
String
>>
parseMetaDataConfigs
();
String
getInsertSQL
(
Table
table
,
String
sourceName
);
String
getSchemaFieldName
();
}
dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/kafka/KafkaSinkBuilder.java
View file @
25463348
...
...
@@ -57,6 +57,7 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
new
SimpleStringSchema
()));
}
else
{
final
List
<
Schema
>
schemaList
=
config
.
getSchemaList
();
final
String
schemaFieldName
=
config
.
getSchemaFieldName
();
if
(
Asserts
.
isNotNullCollection
(
schemaList
))
{
SingleOutputStreamOperator
<
Map
>
mapOperator
=
dataStreamSource
.
map
(
new
MapFunction
<
String
,
Map
>()
{
@Override
...
...
@@ -74,7 +75,7 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
public
boolean
filter
(
Map
value
)
throws
Exception
{
LinkedHashMap
source
=
(
LinkedHashMap
)
value
.
get
(
"source"
);
return
tableName
.
equals
(
source
.
get
(
"table"
).
toString
())
&&
schemaName
.
equals
(
source
.
get
(
"db"
).
toString
());
&&
schemaName
.
equals
(
source
.
get
(
schemaFieldName
).
toString
());
}
});
SingleOutputStreamOperator
<
String
>
stringOperator
=
filterOperator
.
map
(
new
MapFunction
<
Map
,
String
>()
{
...
...
dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java
View file @
25463348
...
...
@@ -8,6 +8,7 @@ import java.util.Collections;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Properties
;
import
com.alibaba.ververica.cdc.connectors.mysql.MySQLSource
;
import
com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions
;
...
...
@@ -50,6 +51,12 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
@Override
public
DataStreamSource
<
String
>
build
(
StreamExecutionEnvironment
env
)
{
Properties
properties
=
new
Properties
();
for
(
Map
.
Entry
<
String
,
String
>
entry
:
config
.
getDebezium
().
entrySet
())
{
if
(
Asserts
.
isNotNullString
(
entry
.
getKey
())
&&
Asserts
.
isNotNullString
(
entry
.
getValue
()))
{
properties
.
setProperty
(
entry
.
getKey
(),
entry
.
getValue
());
}
}
MySQLSource
.
Builder
<
String
>
sourceBuilder
=
MySQLSource
.<
String
>
builder
()
.
hostname
(
config
.
getHostname
())
.
port
(
config
.
getPort
())
...
...
@@ -61,8 +68,8 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
if
(
Asserts
.
isNotNullString
(
config
.
getTable
()))
{
sourceBuilder
.
tableList
(
config
.
getTable
().
split
(
FlinkParamConstant
.
SPLIT
));
}
sourceBuilder
.
deserializer
(
new
StringDebeziumDeserializationSchema
()
);
sourceBuilder
.
deserializer
(
new
StringDebeziumDeserializationSchema
());
sourceBuilder
.
debeziumProperties
(
properties
);
if
(
Asserts
.
isNotNullString
(
config
.
getStartupMode
()))
{
switch
(
config
.
getStartupMode
().
toUpperCase
())
{
case
"INITIAL"
:
...
...
dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java
View file @
25463348
...
...
@@ -63,4 +63,8 @@ public abstract class AbstractCDCBuilder {
Collections
.
addAll
(
tableList
,
tables
);
return
tableList
;
}
public
String
getSchemaFieldName
()
{
return
"schema"
;
}
}
dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/CDCBuilder.java
View file @
25463348
...
...
@@ -30,4 +30,6 @@ public interface CDCBuilder {
Map
<
String
,
Map
<
String
,
String
>>
parseMetaDataConfigs
();
String
getInsertSQL
(
Table
table
,
String
sourceName
);
String
getSchemaFieldName
();
}
dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/kafka/KafkaSinkBuilder.java
View file @
25463348
...
...
@@ -57,6 +57,7 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
new
SimpleStringSchema
()));
}
else
{
final
List
<
Schema
>
schemaList
=
config
.
getSchemaList
();
final
String
schemaFieldName
=
config
.
getSchemaFieldName
();
if
(
Asserts
.
isNotNullCollection
(
schemaList
))
{
SingleOutputStreamOperator
<
Map
>
mapOperator
=
dataStreamSource
.
map
(
new
MapFunction
<
String
,
Map
>()
{
@Override
...
...
@@ -74,7 +75,7 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
public
boolean
filter
(
Map
value
)
throws
Exception
{
LinkedHashMap
source
=
(
LinkedHashMap
)
value
.
get
(
"source"
);
return
tableName
.
equals
(
source
.
get
(
"table"
).
toString
())
&&
schemaName
.
equals
(
source
.
get
(
"db"
).
toString
());
&&
schemaName
.
equals
(
source
.
get
(
schemaFieldName
).
toString
());
}
});
SingleOutputStreamOperator
<
String
>
stringOperator
=
filterOperator
.
map
(
new
MapFunction
<
Map
,
String
>()
{
...
...
dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java
View file @
25463348
...
...
@@ -9,6 +9,7 @@ import java.util.Collections;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Properties
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.cdc.AbstractCDCBuilder
;
...
...
@@ -52,6 +53,12 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
@Override
public
DataStreamSource
<
String
>
build
(
StreamExecutionEnvironment
env
)
{
Properties
properties
=
new
Properties
();
for
(
Map
.
Entry
<
String
,
String
>
entry
:
config
.
getDebezium
().
entrySet
())
{
if
(
Asserts
.
isNotNullString
(
entry
.
getKey
())
&&
Asserts
.
isNotNullString
(
entry
.
getValue
()))
{
properties
.
setProperty
(
entry
.
getKey
(),
entry
.
getValue
());
}
}
MySqlSourceBuilder
<
String
>
sourceBuilder
=
MySqlSource
.<
String
>
builder
()
.
hostname
(
config
.
getHostname
())
.
port
(
config
.
getPort
())
...
...
@@ -67,6 +74,7 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
sourceBuilder
.
tableList
(
table
);
}
sourceBuilder
.
deserializer
(
new
JsonDebeziumDeserializationSchema
());
sourceBuilder
.
debeziumProperties
(
properties
);
if
(
Asserts
.
isNotNullString
(
config
.
getStartupMode
()))
{
switch
(
config
.
getStartupMode
().
toUpperCase
())
{
case
"INITIAL"
:
...
...
@@ -140,4 +148,9 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
sb
.
append
(
"'"
);
return
sb
.
toString
();
}
@Override
public
String
getSchemaFieldName
()
{
return
"db"
;
}
}
dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/oracle/OracleCDCBuilder.java
View file @
25463348
...
...
@@ -6,6 +6,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Properties
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.cdc.AbstractCDCBuilder
;
...
...
@@ -47,6 +48,12 @@ public class OracleCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
@Override
public
DataStreamSource
<
String
>
build
(
StreamExecutionEnvironment
env
)
{
Properties
properties
=
new
Properties
();
for
(
Map
.
Entry
<
String
,
String
>
entry
:
config
.
getDebezium
().
entrySet
())
{
if
(
Asserts
.
isNotNullString
(
entry
.
getKey
())
&&
Asserts
.
isNotNullString
(
entry
.
getValue
()))
{
properties
.
setProperty
(
entry
.
getKey
(),
entry
.
getValue
());
}
}
OracleSource
.
Builder
<
String
>
sourceBuilder
=
OracleSource
.<
String
>
builder
()
.
hostname
(
config
.
getHostname
())
.
port
(
config
.
getPort
())
...
...
@@ -62,6 +69,7 @@ public class OracleCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
sourceBuilder
.
tableList
(
table
);
}
sourceBuilder
.
deserializer
(
new
JsonDebeziumDeserializationSchema
());
sourceBuilder
.
debeziumProperties
(
properties
);
if
(
Asserts
.
isNotNullString
(
config
.
getStartupMode
()))
{
switch
(
config
.
getStartupMode
().
toUpperCase
())
{
case
"INITIAL"
:
...
...
dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java
View file @
25463348
...
...
@@ -63,4 +63,8 @@ public abstract class AbstractCDCBuilder {
Collections
.
addAll
(
tableList
,
tables
);
return
tableList
;
}
public
String
getSchemaFieldName
()
{
return
"schema"
;
}
}
dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/CDCBuilder.java
View file @
25463348
...
...
@@ -30,4 +30,6 @@ public interface CDCBuilder {
Map
<
String
,
Map
<
String
,
String
>>
parseMetaDataConfigs
();
String
getInsertSQL
(
Table
table
,
String
sourceName
);
String
getSchemaFieldName
();
}
dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/kafka/KafkaSinkBuilder.java
View file @
25463348
...
...
@@ -63,6 +63,7 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
.
build
());
}
else
{
final
List
<
Schema
>
schemaList
=
config
.
getSchemaList
();
final
String
schemaFieldName
=
config
.
getSchemaFieldName
();
if
(
Asserts
.
isNotNullCollection
(
schemaList
))
{
SingleOutputStreamOperator
<
Map
>
mapOperator
=
dataStreamSource
.
map
(
new
MapFunction
<
String
,
Map
>()
{
@Override
...
...
@@ -80,7 +81,7 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
public
boolean
filter
(
Map
value
)
throws
Exception
{
LinkedHashMap
source
=
(
LinkedHashMap
)
value
.
get
(
"source"
);
return
tableName
.
equals
(
source
.
get
(
"table"
).
toString
())
&&
schemaName
.
equals
(
source
.
get
(
"db"
).
toString
());
&&
schemaName
.
equals
(
source
.
get
(
schemaFieldName
).
toString
());
}
});
SingleOutputStreamOperator
<
String
>
stringOperator
=
filterOperator
.
map
(
new
MapFunction
<
Map
,
String
>()
{
...
...
dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java
View file @
25463348
...
...
@@ -9,6 +9,7 @@ import java.util.Collections;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Properties
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.cdc.AbstractCDCBuilder
;
...
...
@@ -52,6 +53,12 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
@Override
public
DataStreamSource
<
String
>
build
(
StreamExecutionEnvironment
env
)
{
Properties
properties
=
new
Properties
();
for
(
Map
.
Entry
<
String
,
String
>
entry
:
config
.
getDebezium
().
entrySet
())
{
if
(
Asserts
.
isNotNullString
(
entry
.
getKey
())
&&
Asserts
.
isNotNullString
(
entry
.
getValue
()))
{
properties
.
setProperty
(
entry
.
getKey
(),
entry
.
getValue
());
}
}
MySqlSourceBuilder
<
String
>
sourceBuilder
=
MySqlSource
.<
String
>
builder
()
.
hostname
(
config
.
getHostname
())
.
port
(
config
.
getPort
())
...
...
@@ -67,6 +74,7 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
sourceBuilder
.
tableList
(
table
);
}
sourceBuilder
.
deserializer
(
new
JsonDebeziumDeserializationSchema
());
sourceBuilder
.
debeziumProperties
(
properties
);
if
(
Asserts
.
isNotNullString
(
config
.
getStartupMode
()))
{
switch
(
config
.
getStartupMode
().
toUpperCase
())
{
case
"INITIAL"
:
...
...
@@ -133,11 +141,17 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
}
sb
.
append
(
" FROM "
);
sb
.
append
(
sourceName
);
/*
sb.append(" WHERE database_name = '");
sb
.
append
(
" WHERE database_name = '"
);
sb
.
append
(
table
.
getSchema
());
sb
.
append
(
"' and table_name = '"
);
sb
.
append
(
table
.
getName
());
sb.append("'");
*/
sb
.
append
(
"'"
);
return
sb
.
toString
();
}
@Override
public
String
getSchemaFieldName
()
{
return
"db"
;
}
}
dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/oracle/OracleCDCBuilder.java
View file @
25463348
...
...
@@ -6,6 +6,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Properties
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.cdc.AbstractCDCBuilder
;
...
...
@@ -48,6 +49,12 @@ public class OracleCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
@Override
public
DataStreamSource
<
String
>
build
(
StreamExecutionEnvironment
env
)
{
Properties
properties
=
new
Properties
();
for
(
Map
.
Entry
<
String
,
String
>
entry
:
config
.
getDebezium
().
entrySet
())
{
if
(
Asserts
.
isNotNullString
(
entry
.
getKey
())
&&
Asserts
.
isNotNullString
(
entry
.
getValue
()))
{
properties
.
setProperty
(
entry
.
getKey
(),
entry
.
getValue
());
}
}
OracleSource
.
Builder
<
String
>
sourceBuilder
=
OracleSource
.<
String
>
builder
()
.
hostname
(
config
.
getHostname
())
.
port
(
config
.
getPort
())
...
...
@@ -63,6 +70,7 @@ public class OracleCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
sourceBuilder
.
tableList
(
table
);
}
sourceBuilder
.
deserializer
(
new
JsonDebeziumDeserializationSchema
());
sourceBuilder
.
debeziumProperties
(
properties
);
if
(
Asserts
.
isNotNullString
(
config
.
getStartupMode
()))
{
switch
(
config
.
getStartupMode
().
toUpperCase
())
{
case
"INITIAL"
:
...
...
dlink-client/dlink-client-base/src/main/java/com/dlink/model/FlinkCDCConfig.java
View file @
25463348
...
...
@@ -22,14 +22,16 @@ public class FlinkCDCConfig {
private
String
schema
;
private
String
table
;
private
String
startupMode
;
private
Map
<
String
,
String
>
debezium
;
private
Map
<
String
,
String
>
sink
;
private
List
<
Schema
>
schemaList
;
private
String
schemaFieldName
;
public
FlinkCDCConfig
()
{
}
public
FlinkCDCConfig
(
String
type
,
String
hostname
,
int
port
,
String
username
,
String
password
,
int
checkpoint
,
int
parallelism
,
String
database
,
String
schema
,
String
table
,
String
startupMode
,
Map
<
String
,
String
>
sink
)
{
Map
<
String
,
String
>
debezium
,
Map
<
String
,
String
>
sink
)
{
this
.
type
=
type
;
this
.
hostname
=
hostname
;
this
.
port
=
port
;
...
...
@@ -41,6 +43,7 @@ public class FlinkCDCConfig {
this
.
schema
=
schema
;
this
.
table
=
table
;
this
.
startupMode
=
startupMode
;
this
.
debezium
=
debezium
;
this
.
sink
=
sink
;
}
...
...
@@ -147,4 +150,20 @@ public class FlinkCDCConfig {
public
void
setSchemaList
(
List
<
Schema
>
schemaList
)
{
this
.
schemaList
=
schemaList
;
}
public
String
getSchemaFieldName
()
{
return
schemaFieldName
;
}
public
void
setSchemaFieldName
(
String
schemaFieldName
)
{
this
.
schemaFieldName
=
schemaFieldName
;
}
public
Map
<
String
,
String
>
getDebezium
()
{
return
debezium
;
}
public
void
setDebezium
(
Map
<
String
,
String
>
debezium
)
{
this
.
debezium
=
debezium
;
}
}
dlink-executor/src/main/java/com/dlink/trans/ddl/CDCSource.java
View file @
25463348
...
...
@@ -30,10 +30,11 @@ public class CDCSource {
private
String
schema
;
private
String
table
;
private
String
startupMode
;
private
Map
<
String
,
String
>
debezium
;
private
Map
<
String
,
String
>
sink
;
public
CDCSource
(
String
type
,
String
statement
,
String
name
,
String
hostname
,
Integer
port
,
String
username
,
String
password
,
Integer
checkpoint
,
Integer
parallelism
,
String
startupMode
,
Map
<
String
,
String
>
sink
)
{
Map
<
String
,
String
>
debezium
,
Map
<
String
,
String
>
sink
)
{
this
.
type
=
type
;
this
.
statement
=
statement
;
this
.
name
=
name
;
...
...
@@ -44,12 +45,23 @@ public class CDCSource {
this
.
checkpoint
=
checkpoint
;
this
.
parallelism
=
parallelism
;
this
.
startupMode
=
startupMode
;
this
.
debezium
=
debezium
;
this
.
sink
=
sink
;
}
public
static
CDCSource
build
(
String
statement
)
{
Map
<
String
,
List
<
String
>>
map
=
SingleSqlParserFactory
.
generateParser
(
statement
);
Map
<
String
,
String
>
config
=
getKeyValue
(
map
.
get
(
"WITH"
));
Map
<
String
,
String
>
debezium
=
new
HashMap
<>();
for
(
Map
.
Entry
<
String
,
String
>
entry
:
config
.
entrySet
())
{
if
(
entry
.
getKey
().
startsWith
(
"debezium."
))
{
String
key
=
entry
.
getKey
();
key
=
key
.
replace
(
"debezium."
,
""
);
if
(!
debezium
.
containsKey
(
key
))
{
debezium
.
put
(
entry
.
getKey
().
replace
(
"debezium."
,
""
),
entry
.
getValue
());
}
}
}
Map
<
String
,
String
>
sink
=
new
HashMap
<>();
for
(
Map
.
Entry
<
String
,
String
>
entry
:
config
.
entrySet
())
{
if
(
entry
.
getKey
().
startsWith
(
"sink."
))
{
...
...
@@ -71,6 +83,7 @@ public class CDCSource {
Integer
.
valueOf
(
config
.
get
(
"checkpoint"
)),
Integer
.
valueOf
(
config
.
get
(
"parallelism"
)),
config
.
get
(
"startup"
),
debezium
,
sink
);
if
(
Asserts
.
isNotNullString
(
config
.
get
(
"database"
)))
{
...
...
@@ -208,4 +221,12 @@ public class CDCSource {
public
void
setStartupMode
(
String
startupMode
)
{
this
.
startupMode
=
startupMode
;
}
public
Map
<
String
,
String
>
getDebezium
()
{
return
debezium
;
}
public
void
setDebezium
(
Map
<
String
,
String
>
debezium
)
{
this
.
debezium
=
debezium
;
}
}
dlink-executor/src/main/java/com/dlink/trans/ddl/CreateCDCSourceOperation.java
View file @
25463348
...
...
@@ -53,10 +53,11 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
CDCSource
cdcSource
=
CDCSource
.
build
(
statement
);
FlinkCDCConfig
config
=
new
FlinkCDCConfig
(
cdcSource
.
getType
(),
cdcSource
.
getHostname
(),
cdcSource
.
getPort
(),
cdcSource
.
getUsername
()
,
cdcSource
.
getPassword
(),
cdcSource
.
getCheckpoint
(),
cdcSource
.
getParallelism
(),
cdcSource
.
getDatabase
(),
cdcSource
.
getSchema
()
,
cdcSource
.
getTable
(),
cdcSource
.
getStartupMode
(),
cdcSource
.
getSink
());
,
cdcSource
.
getTable
(),
cdcSource
.
getStartupMode
(),
cdcSource
.
getDebezium
(),
cdcSource
.
getSink
());
try
{
CDCBuilder
cdcBuilder
=
CDCBuilderFactory
.
buildCDCBuilder
(
config
);
Map
<
String
,
Map
<
String
,
String
>>
allConfigMap
=
cdcBuilder
.
parseMetaDataConfigs
();
config
.
setSchemaFieldName
(
cdcBuilder
.
getSchemaFieldName
());
List
<
Schema
>
schemaList
=
new
ArrayList
<>();
final
List
<
String
>
schemaNameList
=
cdcBuilder
.
getSchemaList
();
final
List
<
String
>
tableRegList
=
cdcBuilder
.
getTableList
();
...
...
@@ -69,11 +70,15 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
Driver
driver
=
Driver
.
build
(
driverConfig
);
final
List
<
Table
>
tables
=
driver
.
getTablesAndColumns
(
schemaName
);
for
(
Table
table
:
tables
)
{
for
(
String
tableReg
:
tableRegList
)
{
if
(
table
.
getSchemaTableName
().
matches
(
tableReg
)
&&
!
schema
.
getTables
().
contains
(
Table
.
build
(
table
.
getName
())))
{
schema
.
getTables
().
add
(
table
);
break
;
if
(
Asserts
.
isNotNullCollection
(
tableRegList
)){
for
(
String
tableReg
:
tableRegList
)
{
if
(
table
.
getSchemaTableName
().
matches
(
tableReg
)
&&
!
schema
.
getTables
().
contains
(
Table
.
build
(
table
.
getName
())))
{
schema
.
getTables
().
add
(
table
);
break
;
}
}
}
else
{
schema
.
getTables
().
add
(
table
);
}
}
schemaList
.
add
(
schema
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment