zhaowei / dlink · Commits · Commit 2d03d13b (unverified)

[Feature-389][client,executor] Add OracleCDCSourceMerge

Authored Apr 12, 2022 by aiwenmo; committed Apr 12, 2022 by GitHub.
Parents: 3ad56871, 522cefb0

Showing 31 changed files with 921 additions and 185 deletions (+921 -185).
Changed files:

...-1.11/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java   +29 -0
...k-client-1.11/src/main/java/com/dlink/cdc/CDCBuilder.java   +21 -0
...t-1.11/src/main/java/com/dlink/cdc/CDCBuilderFactory.java   +31 -0
....11/src/main/java/com/dlink/cdc/FlinkCDCMergeBuilder.java   +1 -16
...11/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java   +58 -0
...-1.12/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java   +29 -0
...k-client-1.12/src/main/java/com/dlink/cdc/CDCBuilder.java   +21 -0
...t-1.12/src/main/java/com/dlink/cdc/CDCBuilderFactory.java   +31 -0
....12/src/main/java/com/dlink/cdc/FlinkCDCMergeBuilder.java   +2 -31
...12/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java   +76 -0
dlink-client/dlink-client-1.13/pom.xml                         +5 -0
...-1.13/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java   +29 -0
...k-client-1.13/src/main/java/com/dlink/cdc/CDCBuilder.java   +21 -0
...t-1.13/src/main/java/com/dlink/cdc/CDCBuilderFactory.java   +33 -0
....13/src/main/java/com/dlink/cdc/FlinkCDCMergeBuilder.java   +1 -35
...13/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java   +80 -0
.../src/main/java/com/dlink/cdc/oracle/OracleCDCBuilder.java   +74 -0
dlink-client/dlink-client-1.14/pom.xml                         +5 -0
...-1.14/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java   +29 -0
...k-client-1.14/src/main/java/com/dlink/cdc/CDCBuilder.java   +21 -0
...t-1.14/src/main/java/com/dlink/cdc/CDCBuilderFactory.java   +33 -0
....14/src/main/java/com/dlink/cdc/FlinkCDCMergeBuilder.java   +1 -35
...14/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java   +80 -0
.../src/main/java/com/dlink/cdc/oracle/OracleCDCBuilder.java   +74 -0
.../src/main/java/com/dlink/constant/FlinkParamConstant.java   +2 -0
...c/main/java/com/dlink/exception/FlinkClientException.java   +19 -0
...nt-base/src/main/java/com/dlink/model/FlinkCDCConfig.java   +28 -9
dlink-core/src/test/java/com/dlink/core/SqlParserTest.java     +31 -28
.../main/java/com/dlink/parser/CreateCDCSourceSqlParser.java   +1 -1
...executor/src/main/java/com/dlink/trans/ddl/CDCSource.java   +50 -26
...in/java/com/dlink/trans/ddl/CreateCDCSourceOperation.java   +5 -4
dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java (new file, 0 → 100644)

package com.dlink.cdc;

import com.dlink.model.FlinkCDCConfig;

/**
 * AbstractCDCBuilder
 *
 * @author wenmo
 * @since 2022/4/12 21:28
 **/
public abstract class AbstractCDCBuilder {

    protected FlinkCDCConfig config;

    public AbstractCDCBuilder() {
    }

    public AbstractCDCBuilder(FlinkCDCConfig config) {
        this.config = config;
    }

    public FlinkCDCConfig getConfig() {
        return config;
    }

    public void setConfig(FlinkCDCConfig config) {
        this.config = config;
    }
}
dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/CDCBuilder.java (new file, 0 → 100644)

package com.dlink.cdc;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.dlink.model.FlinkCDCConfig;

/**
 * CDCBuilder
 *
 * @author wenmo
 * @since 2022/4/12 21:09
 **/
public interface CDCBuilder {

    String getHandle();

    CDCBuilder create(FlinkCDCConfig config);

    DataStreamSource build(StreamExecutionEnvironment env);
}
dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/CDCBuilderFactory.java (new file, 0 → 100644)

package com.dlink.cdc;

import com.dlink.assertion.Asserts;
import com.dlink.cdc.mysql.MysqlCDCBuilder;
import com.dlink.exception.FlinkClientException;
import com.dlink.model.FlinkCDCConfig;

/**
 * CDCBuilderFactory
 *
 * @author wenmo
 * @since 2022/4/12 21:12
 **/
public class CDCBuilderFactory {

    private static CDCBuilder[] cdcBuilders = {
        new MysqlCDCBuilder()
    };

    public static CDCBuilder buildCDCBuilder(FlinkCDCConfig config) {
        if (Asserts.isNull(config) || Asserts.isNullString(config.getType())) {
            throw new FlinkClientException("请指定 CDC Source 类型。");
        }
        for (int i = 0; i < cdcBuilders.length; i++) {
            if (config.getType().equals(cdcBuilders[i].getHandle())) {
                return cdcBuilders[i].create(config);
            }
        }
        throw new FlinkClientException("未匹配到对应 CDC Source 类型的【" + config.getType() + "】。");
    }
}
dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/FlinkCDCMergeBuilder.java

@@ -5,8 +5,6 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
-import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
-import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
 import com.dlink.assertion.Asserts;
 import com.dlink.model.FlinkCDCConfig;
@@ -25,20 +23,7 @@ public class FlinkCDCMergeBuilder {
         if (Asserts.isNotNull(config.getCheckpoint())) {
             env.enableCheckpointing(config.getCheckpoint());
         }
-        MySQLSource.Builder<String> sourceBuilder = MySQLSource.<String>builder()
-                .hostname(config.getHostname())
-                .port(config.getPort())
-                .username(config.getUsername())
-                .password(config.getPassword());
-        if (Asserts.isNotNull(config.getDatabase()) && config.getDatabase().size() > 0) {
-            sourceBuilder.databaseList(config.getDatabase().toArray(new String[0]));
-        }
-        if (Asserts.isNotNull(config.getTable()) && config.getTable().size() > 0) {
-            sourceBuilder.tableList(config.getTable().toArray(new String[0]));
-        }
-        sourceBuilder.deserializer(new StringDebeziumDeserializationSchema());
-        DataStreamSource<String> streamSource = env.addSource(sourceBuilder.build(), "MySQL CDC Source");
+        DataStreamSource<String> streamSource = CDCBuilderFactory.buildCDCBuilder(config).build(env);
         streamSource.addSink(getKafkaProducer(config.getBrokers(), config.getTopic()));
     }
dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java (new file, 0 → 100644)

package com.dlink.cdc.mysql;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;

/**
 * MysqlCDCBuilder
 *
 * @author wenmo
 * @since 2022/4/12 21:29
 **/
public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {

    private String KEY_WORD = "mysql-cdc";

    public MysqlCDCBuilder() {
    }

    public MysqlCDCBuilder(FlinkCDCConfig config) {
        super(config);
    }

    @Override
    public String getHandle() {
        return KEY_WORD;
    }

    @Override
    public CDCBuilder create(FlinkCDCConfig config) {
        return new MysqlCDCBuilder(config);
    }

    @Override
    public DataStreamSource<String> build(StreamExecutionEnvironment env) {
        MySQLSource.Builder<String> sourceBuilder = MySQLSource.<String>builder()
                .hostname(config.getHostname())
                .port(config.getPort())
                .username(config.getUsername())
                .password(config.getPassword());
        if (Asserts.isNotNullString(config.getDatabase())) {
            sourceBuilder.databaseList(config.getDatabase().split(FlinkParamConstant.SPLIT));
        }
        if (Asserts.isNotNullString(config.getTable())) {
            sourceBuilder.tableList(config.getTable().split(FlinkParamConstant.SPLIT));
        }
        sourceBuilder.deserializer(new StringDebeziumDeserializationSchema());
        return env.addSource(sourceBuilder.build(), "MySQL CDC Source");
    }
}
dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java (new file, 0 → 100644)

Same content as the dlink-client-1.11 AbstractCDCBuilder.java above.
dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/CDCBuilder.java (new file, 0 → 100644)

Same content as the dlink-client-1.11 CDCBuilder.java above.
dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/CDCBuilderFactory.java (new file, 0 → 100644)

Same content as the dlink-client-1.11 CDCBuilderFactory.java above (registers MysqlCDCBuilder only).
dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/FlinkCDCMergeBuilder.java

@@ -9,6 +9,7 @@ import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
 import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
 import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
 import com.dlink.assertion.Asserts;
+import com.dlink.constant.FlinkParamConstant;
 import com.dlink.model.FlinkCDCConfig;

 /**
@@ -26,37 +27,7 @@ public class FlinkCDCMergeBuilder {
         if (Asserts.isNotNull(config.getCheckpoint())) {
             env.enableCheckpointing(config.getCheckpoint());
         }
-        MySQLSource.Builder<String> sourceBuilder = MySQLSource.<String>builder()
-                .hostname(config.getHostname())
-                .port(config.getPort())
-                .username(config.getUsername())
-                .password(config.getPassword());
-        if (Asserts.isNotNull(config.getDatabase()) && config.getDatabase().size() > 0) {
-            sourceBuilder.databaseList(config.getDatabase().toArray(new String[0]));
-        }
-        if (Asserts.isNotNull(config.getTable()) && config.getTable().size() > 0) {
-            sourceBuilder.tableList(config.getTable().toArray(new String[0]));
-        }
-        sourceBuilder.deserializer(new StringDebeziumDeserializationSchema());
-        if (Asserts.isNotNullString(config.getStartupMode())) {
-            switch (config.getStartupMode().toUpperCase()) {
-                case "INITIAL":
-                    sourceBuilder.startupOptions(StartupOptions.initial());
-                    break;
-                case "EARLIEST":
-                    sourceBuilder.startupOptions(StartupOptions.earliest());
-                    break;
-                case "LATEST":
-                    sourceBuilder.startupOptions(StartupOptions.latest());
-                    break;
-                default:
-                    sourceBuilder.startupOptions(StartupOptions.latest());
-            }
-        } else {
-            sourceBuilder.startupOptions(StartupOptions.latest());
-        }
-        DataStreamSource<String> streamSource = env.addSource(sourceBuilder.build(), "MySQL CDC Source");
+        DataStreamSource<String> streamSource = CDCBuilderFactory.buildCDCBuilder(config).build(env);
         streamSource.addSink(getKafkaProducer(config.getBrokers(), config.getTopic()));
     }
dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java (new file, 0 → 100644)

package com.dlink.cdc.mysql;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;

/**
 * MysqlCDCBuilder
 *
 * @author wenmo
 * @since 2022/4/12 21:29
 **/
public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {

    private String KEY_WORD = "mysql-cdc";

    public MysqlCDCBuilder() {
    }

    public MysqlCDCBuilder(FlinkCDCConfig config) {
        super(config);
    }

    @Override
    public String getHandle() {
        return KEY_WORD;
    }

    @Override
    public CDCBuilder create(FlinkCDCConfig config) {
        return new MysqlCDCBuilder(config);
    }

    @Override
    public DataStreamSource<String> build(StreamExecutionEnvironment env) {
        MySQLSource.Builder<String> sourceBuilder = MySQLSource.<String>builder()
                .hostname(config.getHostname())
                .port(config.getPort())
                .username(config.getUsername())
                .password(config.getPassword());
        if (Asserts.isNotNullString(config.getDatabase())) {
            sourceBuilder.databaseList(config.getDatabase().split(FlinkParamConstant.SPLIT));
        }
        if (Asserts.isNotNullString(config.getTable())) {
            sourceBuilder.tableList(config.getTable().split(FlinkParamConstant.SPLIT));
        }
        sourceBuilder.deserializer(new StringDebeziumDeserializationSchema());
        if (Asserts.isNotNullString(config.getStartupMode())) {
            switch (config.getStartupMode().toUpperCase()) {
                case "INITIAL":
                    sourceBuilder.startupOptions(StartupOptions.initial());
                    break;
                case "EARLIEST":
                    sourceBuilder.startupOptions(StartupOptions.earliest());
                    break;
                case "LATEST":
                    sourceBuilder.startupOptions(StartupOptions.latest());
                    break;
                default:
                    sourceBuilder.startupOptions(StartupOptions.latest());
            }
        } else {
            sourceBuilder.startupOptions(StartupOptions.latest());
        }
        return env.addSource(sourceBuilder.build(), "MySQL CDC Source");
    }
}
dlink-client/dlink-client-1.13/pom.xml

@@ -110,6 +110,11 @@
             <artifactId>flink-connector-mysql-cdc</artifactId>
             <version>${flinkcdc.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-connector-oracle-cdc</artifactId>
+            <version>${flinkcdc.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java (new file, 0 → 100644)

Same content as the dlink-client-1.11 AbstractCDCBuilder.java above.
dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/CDCBuilder.java (new file, 0 → 100644)

Same content as the dlink-client-1.11 CDCBuilder.java above.
dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/CDCBuilderFactory.java (new file, 0 → 100644)

package com.dlink.cdc;

import com.dlink.assertion.Asserts;
import com.dlink.cdc.mysql.MysqlCDCBuilder;
import com.dlink.cdc.oracle.OracleCDCBuilder;
import com.dlink.exception.FlinkClientException;
import com.dlink.model.FlinkCDCConfig;

/**
 * CDCBuilderFactory
 *
 * @author wenmo
 * @since 2022/4/12 21:12
 **/
public class CDCBuilderFactory {

    private static CDCBuilder[] cdcBuilders = {
        new MysqlCDCBuilder(),
        new OracleCDCBuilder()
    };

    public static CDCBuilder buildCDCBuilder(FlinkCDCConfig config) {
        if (Asserts.isNull(config) || Asserts.isNullString(config.getType())) {
            throw new FlinkClientException("请指定 CDC Source 类型。");
        }
        for (int i = 0; i < cdcBuilders.length; i++) {
            if (config.getType().equals(cdcBuilders[i].getHandle())) {
                return cdcBuilders[i].create(config);
            }
        }
        throw new FlinkClientException("未匹配到对应 CDC Source 类型的【" + config.getType() + "】。");
    }
}
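The factory matches FlinkCDCConfig.getType() against each registered builder's getHandle() and delegates creation to the first match. As a rough illustration of the extension point this creates, the sketch below shows a hypothetical PostgresCDCBuilder; the class, package and "postgres-cdc" handle are invented for illustration and are not part of this commit. A real builder would additionally be added to the cdcBuilders array above.

// Hypothetical sketch only: not part of this commit. It mirrors the pattern
// used by MysqlCDCBuilder and OracleCDCBuilder in this change set.
package com.dlink.cdc.postgres;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.model.FlinkCDCConfig;

public class PostgresCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {

    // Value matched by CDCBuilderFactory against FlinkCDCConfig.getType().
    private String KEY_WORD = "postgres-cdc";

    public PostgresCDCBuilder() {
    }

    public PostgresCDCBuilder(FlinkCDCConfig config) {
        super(config);
    }

    @Override
    public String getHandle() {
        return KEY_WORD;
    }

    @Override
    public CDCBuilder create(FlinkCDCConfig config) {
        return new PostgresCDCBuilder(config);
    }

    @Override
    public DataStreamSource build(StreamExecutionEnvironment env) {
        // A real builder would construct its connector-specific source here,
        // analogous to MysqlCDCBuilder.build(env) above.
        throw new UnsupportedOperationException("sketch only");
    }
}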
dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/FlinkCDCMergeBuilder.java

 package com.dlink.cdc;

-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -8,10 +7,6 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import com.dlink.assertion.Asserts;
 import com.dlink.model.FlinkCDCConfig;
-import com.ververica.cdc.connectors.mysql.source.MySqlSource;
-import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
-import com.ververica.cdc.connectors.mysql.table.StartupOptions;
-import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

 /**
  * FlinkCDCMergeBuilder
@@ -28,36 +23,7 @@ public class FlinkCDCMergeBuilder {
         if (Asserts.isNotNull(config.getCheckpoint())) {
             env.enableCheckpointing(config.getCheckpoint());
         }
-        MySqlSourceBuilder<String> sourceBuilder = MySqlSource.<String>builder()
-                .hostname(config.getHostname())
-                .port(config.getPort())
-                .username(config.getUsername())
-                .password(config.getPassword());
-        if (Asserts.isNotNull(config.getDatabase()) && config.getDatabase().size() > 0) {
-            sourceBuilder.databaseList(config.getDatabase().toArray(new String[0]));
-        }
-        if (Asserts.isNotNull(config.getTable()) && config.getTable().size() > 0) {
-            sourceBuilder.tableList(config.getTable().toArray(new String[0]));
-        }
-        sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
-        if (Asserts.isNotNullString(config.getStartupMode())) {
-            switch (config.getStartupMode().toUpperCase()) {
-                case "INITIAL":
-                    sourceBuilder.startupOptions(StartupOptions.initial());
-                    break;
-                case "EARLIEST":
-                    sourceBuilder.startupOptions(StartupOptions.earliest());
-                    break;
-                case "LATEST":
-                    sourceBuilder.startupOptions(StartupOptions.latest());
-                    break;
-                default:
-                    sourceBuilder.startupOptions(StartupOptions.latest());
-            }
-        } else {
-            sourceBuilder.startupOptions(StartupOptions.latest());
-        }
-        DataStreamSource<String> streamSource = env.fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
+        DataStreamSource<String> streamSource = CDCBuilderFactory.buildCDCBuilder(config).build(env);
         streamSource.addSink(getKafkaProducer(config.getBrokers(), config.getTopic()));
     }
dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java (new file, 0 → 100644)

package com.dlink.cdc.mysql;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

/**
 * MysqlCDCBuilder
 *
 * @author wenmo
 * @since 2022/4/12 21:29
 **/
public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {

    private String KEY_WORD = "mysql-cdc";

    public MysqlCDCBuilder() {
    }

    public MysqlCDCBuilder(FlinkCDCConfig config) {
        super(config);
    }

    @Override
    public String getHandle() {
        return KEY_WORD;
    }

    @Override
    public CDCBuilder create(FlinkCDCConfig config) {
        return new MysqlCDCBuilder(config);
    }

    @Override
    public DataStreamSource<String> build(StreamExecutionEnvironment env) {
        MySqlSourceBuilder<String> sourceBuilder = MySqlSource.<String>builder()
                .hostname(config.getHostname())
                .port(config.getPort())
                .username(config.getUsername())
                .password(config.getPassword());
        String database = config.getDatabase();
        if (Asserts.isNotNullString(database)) {
            String[] databases = database.split(FlinkParamConstant.SPLIT);
            sourceBuilder.databaseList(databases);
        }
        String table = config.getTable();
        if (Asserts.isNotNullString(table)) {
            sourceBuilder.tableList(table);
        }
        sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
        if (Asserts.isNotNullString(config.getStartupMode())) {
            switch (config.getStartupMode().toUpperCase()) {
                case "INITIAL":
                    sourceBuilder.startupOptions(StartupOptions.initial());
                    break;
                case "EARLIEST":
                    sourceBuilder.startupOptions(StartupOptions.earliest());
                    break;
                case "LATEST":
                    sourceBuilder.startupOptions(StartupOptions.latest());
                    break;
                default:
                    sourceBuilder.startupOptions(StartupOptions.latest());
            }
        } else {
            sourceBuilder.startupOptions(StartupOptions.latest());
        }
        return env.fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
    }
}
dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/oracle/OracleCDCBuilder.java (new file, 0 → 100644)

package com.dlink.cdc.oracle;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.model.FlinkCDCConfig;
import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.connectors.oracle.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

/**
 * MysqlCDCBuilder
 *
 * @author wenmo
 * @since 2022/4/12 21:29
 **/
public class OracleCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {

    private String KEY_WORD = "oracle-cdc";

    public OracleCDCBuilder() {
    }

    public OracleCDCBuilder(FlinkCDCConfig config) {
        super(config);
    }

    @Override
    public String getHandle() {
        return KEY_WORD;
    }

    @Override
    public CDCBuilder create(FlinkCDCConfig config) {
        return new OracleCDCBuilder(config);
    }

    @Override
    public DataStreamSource<String> build(StreamExecutionEnvironment env) {
        OracleSource.Builder<String> sourceBuilder = OracleSource.<String>builder()
                .hostname(config.getHostname())
                .port(config.getPort())
                .username(config.getUsername())
                .password(config.getPassword())
                .database(config.getDatabase());
        String schema = config.getSchema();
        if (Asserts.isNotNullString(schema)) {
            sourceBuilder.schemaList(schema);
        }
        String table = config.getTable();
        if (Asserts.isNotNullString(table)) {
            sourceBuilder.tableList(table);
        }
        sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
        if (Asserts.isNotNullString(config.getStartupMode())) {
            switch (config.getStartupMode().toUpperCase()) {
                case "INITIAL":
                    sourceBuilder.startupOptions(StartupOptions.initial());
                    break;
                case "LATEST":
                    sourceBuilder.startupOptions(StartupOptions.latest());
                    break;
                default:
                    sourceBuilder.startupOptions(StartupOptions.latest());
            }
        } else {
            sourceBuilder.startupOptions(StartupOptions.latest());
        }
        return env.addSource(sourceBuilder.build(), "Oracle CDC Source");
    }
}
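A minimal usage sketch, not part of this commit, showing how the new Oracle path can be reached through CDCBuilderFactory. The connection values, topic, brokers and the example class itself are placeholders; the FlinkCDCConfig argument order follows the constructor updated later in this commit (type, hostname, port, username, password, checkpoint, parallelism, database, schema, table, startupMode, topic, brokers).

// Hypothetical usage sketch; all connection values are placeholders.
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.dlink.cdc.CDCBuilderFactory;
import com.dlink.model.FlinkCDCConfig;

public class OracleCDCSourceMergeExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Argument order per the FlinkCDCConfig constructor changed in this commit.
        FlinkCDCConfig config = new FlinkCDCConfig("oracle-cdc", "127.0.0.1", 1521,
                "flinkuser", "flinkpw", 3000, 1,
                "ORCLCDB", "FLINKUSER", "FLINKUSER.PRODUCTS",
                "initial", "dlinkcdc", "127.0.0.1:9092");
        // "oracle-cdc" equals OracleCDCBuilder.getHandle(), so the factory returns
        // an OracleCDCBuilder and build(env) produces the Oracle CDC stream above.
        DataStreamSource streamSource = CDCBuilderFactory.buildCDCBuilder(config).build(env);
        streamSource.print();
        env.execute("Oracle CDC Source Merge Example");
    }
}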
dlink-client/dlink-client-1.14/pom.xml

@@ -95,6 +95,11 @@
             <artifactId>flink-connector-mysql-cdc</artifactId>
             <version>${flinkcdc.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-connector-oracle-cdc</artifactId>
+            <version>${flinkcdc.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java (new file, 0 → 100644)

Same content as the dlink-client-1.11 AbstractCDCBuilder.java above.
dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/CDCBuilder.java (new file, 0 → 100644)

Same content as the dlink-client-1.11 CDCBuilder.java above.
dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/CDCBuilderFactory.java (new file, 0 → 100644)

Same content as the dlink-client-1.13 CDCBuilderFactory.java above (registers MysqlCDCBuilder and OracleCDCBuilder).
dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/FlinkCDCMergeBuilder.java

 package com.dlink.cdc;

-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
@@ -9,10 +8,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import com.dlink.assertion.Asserts;
 import com.dlink.model.FlinkCDCConfig;
-import com.ververica.cdc.connectors.mysql.source.MySqlSource;
-import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
-import com.ververica.cdc.connectors.mysql.table.StartupOptions;
-import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

 /**
  * FlinkCDCMergeBuilder
@@ -29,36 +24,7 @@ public class FlinkCDCMergeBuilder {
         if (Asserts.isNotNull(config.getCheckpoint())) {
             env.enableCheckpointing(config.getCheckpoint());
         }
-        MySqlSourceBuilder<String> sourceBuilder = MySqlSource.<String>builder()
-                .hostname(config.getHostname())
-                .port(config.getPort())
-                .username(config.getUsername())
-                .password(config.getPassword());
-        if (Asserts.isNotNull(config.getDatabase()) && config.getDatabase().size() > 0) {
-            sourceBuilder.databaseList(config.getDatabase().toArray(new String[0]));
-        }
-        if (Asserts.isNotNull(config.getTable()) && config.getTable().size() > 0) {
-            sourceBuilder.tableList(config.getTable().toArray(new String[0]));
-        }
-        sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
-        if (Asserts.isNotNullString(config.getStartupMode())) {
-            switch (config.getStartupMode().toUpperCase()) {
-                case "INITIAL":
-                    sourceBuilder.startupOptions(StartupOptions.initial());
-                    break;
-                case "EARLIEST":
-                    sourceBuilder.startupOptions(StartupOptions.earliest());
-                    break;
-                case "LATEST":
-                    sourceBuilder.startupOptions(StartupOptions.latest());
-                    break;
-                default:
-                    sourceBuilder.startupOptions(StartupOptions.latest());
-            }
-        } else {
-            sourceBuilder.startupOptions(StartupOptions.latest());
-        }
-        DataStreamSource<String> streamSource = env.fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
+        DataStreamSource<String> streamSource = CDCBuilderFactory.buildCDCBuilder(config).build(env);
         streamSource.sinkTo(getKafkaProducer(config.getBrokers(), config.getTopic()));
     }
dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java (new file, 0 → 100644)

Same content as the dlink-client-1.13 MysqlCDCBuilder.java above.
dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/oracle/OracleCDCBuilder.java (new file, 0 → 100644)

Same content as the dlink-client-1.13 OracleCDCBuilder.java above.
dlink-client/dlink-client-base/src/main/java/com/dlink/constant/FlinkParamConstant.java

@@ -12,4 +12,6 @@ public final class FlinkParamConstant {
     public static final String URL = "url";
     public static final String USERNAME = "username";
     public static final String PASSWORD = "password";
+
+    public static final String SPLIT = ";";
 }
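The new SPLIT constant is the separator the MysqlCDCBuilder classes above apply to the configured database value before calling databaseList. A small sketch of that split, using a placeholder value that is not taken from this commit:

// Sketch only: demonstrates the split applied to config.getDatabase().
import com.dlink.constant.FlinkParamConstant;

public class SplitExample {
    public static void main(String[] args) {
        String database = "db1;db2"; // placeholder value
        // Same call the builders make before sourceBuilder.databaseList(databases).
        String[] databases = database.split(FlinkParamConstant.SPLIT); // SPLIT == ";"
        for (String db : databases) {
            System.out.println(db); // prints db1, then db2
        }
    }
}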
dlink-client/dlink-client-base/src/main/java/com/dlink/exception/FlinkClientException.java (new file, 0 → 100644)

package com.dlink.exception;

/**
 * FlinkClientException
 *
 * @author wenmo
 * @since 2022/4/12 21:21
 **/
public class FlinkClientException extends RuntimeException {

    public FlinkClientException(String message, Throwable cause) {
        super(message, cause);
    }

    public FlinkClientException(String message) {
        super(message);
    }
}
dlink-client/dlink-client-base/src/main/java/com/dlink/model/FlinkCDCConfig.java

 package com.dlink.model;

-import java.util.List;
-
 /**
  * FlinkCDCConfig
  *
@@ -10,14 +8,16 @@ import java.util.List;
  */
 public class FlinkCDCConfig {

+    private String type;
     private String hostname;
     private Integer port;
     private String username;
     private String password;
     private Integer checkpoint;
     private Integer parallelism;
-    private List<String> database;
-    private List<String> table;
+    private String database;
+    private String schema;
+    private String table;
     private String startupMode;
     private String topic;
     private String brokers;
@@ -25,7 +25,9 @@ public class FlinkCDCConfig {
     public FlinkCDCConfig() {
     }

-    public FlinkCDCConfig(String hostname, int port, String username, String password, int checkpoint, int parallelism, List<String> database, List<String> table, String startupMode, String topic, String brokers) {
+    public FlinkCDCConfig(String type, String hostname, int port, String username, String password, int checkpoint, int parallelism, String database, String schema, String table, String startupMode, String topic, String brokers) {
+        this.type = type;
         this.hostname = hostname;
         this.port = port;
         this.username = username;
@@ -33,12 +35,21 @@ public class FlinkCDCConfig {
         this.checkpoint = checkpoint;
         this.parallelism = parallelism;
         this.database = database;
+        this.schema = schema;
         this.table = table;
         this.startupMode = startupMode;
         this.topic = topic;
         this.brokers = brokers;
     }

+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
     public String getHostname() {
         return hostname;
     }
@@ -87,19 +98,27 @@ public class FlinkCDCConfig {
         this.parallelism = parallelism;
     }

-    public List<String> getDatabase() {
+    public String getDatabase() {
         return database;
     }

-    public void setDatabase(List<String> database) {
+    public void setDatabase(String database) {
         this.database = database;
     }

-    public List<String> getTable() {
+    public String getSchema() {
+        return schema;
+    }
+
+    public void setSchema(String schema) {
+        this.schema = schema;
+    }
+
+    public String getTable() {
         return table;
     }

-    public void setTable(List<String> table) {
+    public void setTable(String table) {
         this.table = table;
     }
dlink-core/src/test/java/com/dlink/core/SqlParserTest.java

 package com.dlink.core;

-import com.dlink.parser.SingleSqlParserFactory;
 import org.apache.commons.lang3.StringUtils;
-import org.junit.Test;

 import java.util.List;
 import java.util.Map;

+import org.junit.Test;
+
+import com.dlink.parser.SingleSqlParserFactory;
+
 /**
  * SqlParserTest
  *
@@ -64,13 +66,14 @@ public class SqlParserTest {
     @Test
     public void createCDCSourceTest() {
         String sql = "EXECUTE CDCSOURCE demo WITH (\n" +
+                " 'type'='mysql-cdc',\n" +
                 " 'hostname'='127.0.0.1',\n" +
                 " 'port'='3306',\n" +
                 " 'password'='dlink',\n" +
                 " 'hostname'='dlink',\n" +
                 " 'checkpoint'='3000',\n" +
                 " 'parallelism'='1',\n" +
-                " 'database'='dlink:test',\n" +
+                " 'database'='dlink,test',\n" +
                 " 'table'='',\n" +
                 " 'topic'='dlinkcdc',\n" +
                 " 'brokers'='127.0.0.1:9092'\n" +
dlink-executor/src/main/java/com/dlink/parser/CreateCDCSourceSqlParser.java

@@ -15,6 +15,6 @@ public class CreateCDCSourceSqlParser extends BaseSingleSqlParser {
     @Override
     protected void initializeSegments() {
         segments.add(new SqlSegment("CDCSOURCE", "(execute\\s+cdcsource\\s+)(.+)(\\s+with\\s+\\()", "[,]"));
-        segments.add(new SqlSegment("WITH", "(with\\s+\\()(.+)(\\))", "[,]"));
+        segments.add(new SqlSegment("WITH", "(with\\s+\\()(.+)(\\))", "',"));
     }
 }
dlink-executor/src/main/java/com/dlink/trans/ddl/CDCSource.java

 package com.dlink.trans.ddl;

-import com.dlink.assertion.Asserts;
-import com.dlink.parser.SingleSqlParserFactory;
-
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;

+import com.dlink.assertion.Asserts;
+import com.dlink.parser.SingleSqlParserFactory;
+
 /**
- * TODO
+ * CDCSource
  *
  * @author wenmo
  * @since 2022/1/29 23:30
  */
 public class CDCSource {

+    private String type;
     private String statement;
     private String name;
     private String hostname;
@@ -26,13 +26,16 @@ public class CDCSource {
     private String password;
     private Integer checkpoint;
     private Integer parallelism;
-    private List<String> database;
-    private List<String> table;
+    private String database;
+    private String schema;
+    private String table;
     private String startupMode;
     private String topic;
     private String brokers;

-    public CDCSource(String statement, String name, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String startupMode, String topic, String brokers) {
+    public CDCSource(String type, String statement, String name, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String startupMode, String topic, String brokers) {
+        this.type = type;
         this.statement = statement;
         this.name = name;
         this.hostname = hostname;
@@ -49,7 +52,9 @@ public class CDCSource {
     public static CDCSource build(String statement) {
         Map<String, List<String>> map = SingleSqlParserFactory.generateParser(statement);
         Map<String, String> config = getKeyValue(map.get("WITH"));
-        CDCSource cdcSource = new CDCSource(statement,
+        CDCSource cdcSource = new CDCSource(
+            config.get("type"),
+            statement,
             map.get("CDCSOURCE").toString(),
             config.get("hostname"),
             Integer.valueOf(config.get("port")),
@@ -62,10 +67,13 @@ public class CDCSource {
             config.get("brokers")
         );
         if (Asserts.isNotNullString(config.get("database"))) {
-            cdcSource.setDatabase(Arrays.asList(config.get("database").split(":")));
+            cdcSource.setDatabase(config.get("database"));
+        }
+        if (Asserts.isNotNullString(config.get("schema"))) {
+            cdcSource.setSchema(config.get("schema"));
         }
         if (Asserts.isNotNullString(config.get("table"))) {
-            cdcSource.setTable(Arrays.asList(config.get("table").split(":")));
+            cdcSource.setTable(config.get("table"));
         }
         return cdcSource;
     }
@@ -74,7 +82,7 @@ public class CDCSource {
         Map<String, String> map = new HashMap<>();
         Pattern p = Pattern.compile("'(.*?)'\\s*=\\s*'(.*?)'");
         for (int i = 0; i < list.size(); i++) {
-            Matcher m = p.matcher(list.get(i));
+            Matcher m = p.matcher(list.get(i) + "'");
             if (m.find()) {
                 map.put(m.group(1), m.group(2));
             }
@@ -82,6 +90,14 @@ public class CDCSource {
         return map;
     }

+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
     public String getStatement() {
         return statement;
     }
@@ -146,19 +162,27 @@ public class CDCSource {
         this.parallelism = parallelism;
     }

-    public List<String> getDatabase() {
+    public String getDatabase() {
         return database;
     }

-    public void setDatabase(List<String> database) {
+    public void setDatabase(String database) {
         this.database = database;
     }

-    public List<String> getTable() {
+    public String getSchema() {
+        return schema;
+    }
+
+    public void setSchema(String schema) {
+        this.schema = schema;
+    }
+
+    public String getTable() {
         return table;
     }

-    public void setTable(List<String> table) {
+    public void setTable(String table) {
         this.table = table;
     }
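For orientation, a sketch of driving the reworked CDCSource parsing with an Oracle-flavoured statement. The statement mirrors the shape used in SqlParserTest; the exact key set accepted for oracle-cdc (including 'schema' and 'username') is inferred from the changes above rather than documented in this commit, and all values are placeholders.

// Hypothetical sketch; statement and values are placeholders.
import com.dlink.trans.ddl.CDCSource;

public class CDCSourceParseExample {

    public static void main(String[] args) {
        String statement = "EXECUTE CDCSOURCE demo WITH (\n"
                + " 'type'='oracle-cdc',\n"
                + " 'hostname'='127.0.0.1',\n"
                + " 'port'='1521',\n"
                + " 'username'='flinkuser',\n"
                + " 'password'='flinkpw',\n"
                + " 'checkpoint'='3000',\n"
                + " 'parallelism'='1',\n"
                + " 'database'='ORCLCDB',\n"
                + " 'schema'='FLINKUSER',\n"
                + " 'table'='FLINKUSER.PRODUCTS',\n"
                + " 'topic'='dlinkcdc',\n"
                + " 'brokers'='127.0.0.1:9092'\n"
                + ")";
        CDCSource cdcSource = CDCSource.build(statement);
        // The getters added in this commit expose the parsed values.
        System.out.println(cdcSource.getType());   // expected: oracle-cdc
        System.out.println(cdcSource.getSchema()); // expected: FLINKUSER
        System.out.println(cdcSource.getTable());  // expected: FLINKUSER.PRODUCTS
    }
}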
dlink-executor/src/main/java/com/dlink/trans/ddl/CreateCDCSourceOperation.java

 package com.dlink.trans.ddl;

+import org.apache.flink.table.api.TableResult;
+
 import com.dlink.cdc.FlinkCDCMergeBuilder;
 import com.dlink.executor.Executor;
 import com.dlink.model.FlinkCDCConfig;
 import com.dlink.trans.AbstractOperation;
 import com.dlink.trans.Operation;
-import org.apache.flink.table.api.TableResult;

 /**
  * CreateCDCSourceOperation
@@ -37,9 +38,9 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
     @Override
     public TableResult build(Executor executor) {
         CDCSource cdcSource = CDCSource.build(statement);
-        FlinkCDCConfig config = new FlinkCDCConfig(cdcSource.getHostname(), cdcSource.getPort(), cdcSource.getUsername()
-            , cdcSource.getPassword(), cdcSource.getCheckpoint(), cdcSource.getParallelism(), cdcSource.getDatabase(), cdcSource.getTable()
+        FlinkCDCConfig config = new FlinkCDCConfig(cdcSource.getType(), cdcSource.getHostname(), cdcSource.getPort(), cdcSource.getUsername()
+            , cdcSource.getPassword(), cdcSource.getCheckpoint(), cdcSource.getParallelism(), cdcSource.getDatabase(), cdcSource.getSchema()
+            , cdcSource.getTable()
             , cdcSource.getStartupMode(), cdcSource.getTopic(), cdcSource.getBrokers());
         try {
             FlinkCDCMergeBuilder.buildMySqlCDC(executor.getStreamExecutionEnvironment(), config);
         } catch (Exception e) {