Unverified Commit 1fe5bb0c authored by aiwenmo's avatar aiwenmo Committed by GitHub

[Feature-469][client] Add MysqlCDCSource sync extended configuration

[Feature-469][client] Add MysqlCDCSource sync extended configuration
parents 2da5c901 de28dc10
......@@ -4,6 +4,7 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
......@@ -52,32 +53,45 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
@Override
public DataStreamSource<String> build(StreamExecutionEnvironment env) {
String database = config.getDatabase();
String serverId = config.getSource().get("server-id");
String serverTimeZone = config.getSource().get("server-time-zone");
String fetchSize = config.getSource().get("scan.snapshot.fetch.size");
String connectTimeout = config.getSource().get("connect.timeout");
String connectMaxRetries = config.getSource().get("connect.max-retries");
String connectionPoolSize = config.getSource().get("connection.pool.size");
String heartbeatInterval = config.getSource().get("heartbeat.interval");
Properties properties = new Properties();
for (Map.Entry<String, String> entry : config.getDebezium().entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
properties.setProperty(entry.getKey(), entry.getValue());
}
}
MySqlSourceBuilder<String> sourceBuilder = MySqlSource.<String>builder()
.hostname(config.getHostname())
.port(config.getPort())
.username(config.getUsername())
.password(config.getPassword());
String database = config.getDatabase();
if (Asserts.isNotNullString(database)) {
String[] databases = database.split(FlinkParamConstant.SPLIT);
sourceBuilder.databaseList(databases);
} else {
sourceBuilder.databaseList(new String[0]);
}
List<String> schemaTableNameList = config.getSchemaTableNameList();
if (Asserts.isNotNullCollection(schemaTableNameList)) {
sourceBuilder.tableList(schemaTableNameList.toArray(new String[schemaTableNameList.size()]));
} else {
sourceBuilder.tableList(new String[0]);
}
sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
sourceBuilder.debeziumProperties(properties);
if (Asserts.isNotNullString(config.getStartupMode())) {
switch (config.getStartupMode().toLowerCase()) {
case "initial":
......@@ -90,6 +104,35 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
} else {
sourceBuilder.startupOptions(StartupOptions.latest());
}
if (Asserts.isNotNullString(serverId)) {
sourceBuilder.serverId(serverId);
}
if (Asserts.isNotNullString(serverTimeZone)) {
sourceBuilder.serverTimeZone(serverTimeZone);
}
if (Asserts.isNotNullString(fetchSize)) {
sourceBuilder.fetchSize(Integer.valueOf(fetchSize));
}
if (Asserts.isNotNullString(connectTimeout)) {
sourceBuilder.connectTimeout(Duration.ofMillis(Long.valueOf(connectTimeout)));
}
if (Asserts.isNotNullString(connectMaxRetries)) {
sourceBuilder.connectMaxRetries(Integer.valueOf(connectMaxRetries));
}
if (Asserts.isNotNullString(connectionPoolSize)) {
sourceBuilder.connectionPoolSize(Integer.valueOf(connectionPoolSize));
}
if (Asserts.isNotNullString(heartbeatInterval)) {
sourceBuilder.heartbeatInterval(Duration.ofMillis(Long.valueOf(heartbeatInterval)));
}
return env.fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
}
......
......@@ -4,6 +4,7 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
......@@ -52,32 +53,45 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
@Override
public DataStreamSource<String> build(StreamExecutionEnvironment env) {
String database = config.getDatabase();
String serverId = config.getSource().get("server-id");
String serverTimeZone = config.getSource().get("server-time-zone");
String fetchSize = config.getSource().get("scan.snapshot.fetch.size");
String connectTimeout = config.getSource().get("connect.timeout");
String connectMaxRetries = config.getSource().get("connect.max-retries");
String connectionPoolSize = config.getSource().get("connection.pool.size");
String heartbeatInterval = config.getSource().get("heartbeat.interval");
Properties properties = new Properties();
for (Map.Entry<String, String> entry : config.getDebezium().entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
properties.setProperty(entry.getKey(), entry.getValue());
}
}
MySqlSourceBuilder<String> sourceBuilder = MySqlSource.<String>builder()
.hostname(config.getHostname())
.port(config.getPort())
.username(config.getUsername())
.password(config.getPassword());
String database = config.getDatabase();
if (Asserts.isNotNullString(database)) {
String[] databases = database.split(FlinkParamConstant.SPLIT);
sourceBuilder.databaseList(databases);
} else {
sourceBuilder.databaseList(new String[0]);
}
List<String> schemaTableNameList = config.getSchemaTableNameList();
if (Asserts.isNotNullCollection(schemaTableNameList)) {
sourceBuilder.tableList(schemaTableNameList.toArray(new String[schemaTableNameList.size()]));
} else {
sourceBuilder.tableList(new String[0]);
}
sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
sourceBuilder.debeziumProperties(properties);
if (Asserts.isNotNullString(config.getStartupMode())) {
switch (config.getStartupMode().toLowerCase()) {
case "initial":
......@@ -90,6 +104,35 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
} else {
sourceBuilder.startupOptions(StartupOptions.latest());
}
if (Asserts.isNotNullString(serverId)) {
sourceBuilder.serverId(serverId);
}
if (Asserts.isNotNullString(serverTimeZone)) {
sourceBuilder.serverTimeZone(serverTimeZone);
}
if (Asserts.isNotNullString(fetchSize)) {
sourceBuilder.fetchSize(Integer.valueOf(fetchSize));
}
if (Asserts.isNotNullString(connectTimeout)) {
sourceBuilder.connectTimeout(Duration.ofMillis(Long.valueOf(connectTimeout)));
}
if (Asserts.isNotNullString(connectMaxRetries)) {
sourceBuilder.connectMaxRetries(Integer.valueOf(connectMaxRetries));
}
if (Asserts.isNotNullString(connectionPoolSize)) {
sourceBuilder.connectionPoolSize(Integer.valueOf(connectionPoolSize));
}
if (Asserts.isNotNullString(heartbeatInterval)) {
sourceBuilder.heartbeatInterval(Duration.ofMillis(Long.valueOf(heartbeatInterval)));
}
return env.fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
}
......
......@@ -24,6 +24,7 @@ public class FlinkCDCConfig {
private List<String> schemaTableNameList;
private String startupMode;
private Map<String, String> debezium;
private Map<String, String> source;
private Map<String, String> sink;
private List<Schema> schemaList;
private String schemaFieldName;
......@@ -31,8 +32,9 @@ public class FlinkCDCConfig {
public FlinkCDCConfig() {
}
public FlinkCDCConfig(String type, String hostname, int port, String username, String password, int checkpoint, int parallelism, String database, String schema, String table, String startupMode,
Map<String, String> debezium, Map<String, String> sink) {
public FlinkCDCConfig(String type, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String database, String schema, String table,
String startupMode,
Map<String, String> debezium, Map<String, String> source, Map<String, String> sink) {
this.type = type;
this.hostname = hostname;
this.port = port;
......@@ -45,6 +47,7 @@ public class FlinkCDCConfig {
this.table = table;
this.startupMode = startupMode;
this.debezium = debezium;
this.source = source;
this.sink = sink;
}
......@@ -124,6 +127,14 @@ public class FlinkCDCConfig {
return table;
}
public Map<String, String> getSource() {
return source;
}
public void setSource(Map<String, String> source) {
this.source = source;
}
public void setTable(String table) {
this.table = table;
}
......
......@@ -31,10 +31,11 @@ public class CDCSource {
private String table;
private String startupMode;
private Map<String, String> debezium;
private Map<String, String> source;
private Map<String, String> sink;
public CDCSource(String connector, String statement, String name, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String startupMode,
Map<String, String> debezium, Map<String, String> sink) {
Map<String, String> debezium, Map<String, String> source, Map<String, String> sink) {
this.connector = connector;
this.statement = statement;
this.name = name;
......@@ -46,6 +47,7 @@ public class CDCSource {
this.parallelism = parallelism;
this.startupMode = startupMode;
this.debezium = debezium;
this.source = source;
this.sink = sink;
}
......@@ -62,6 +64,16 @@ public class CDCSource {
}
}
}
Map<String, String> source = new HashMap<>();
for (Map.Entry<String, String> entry : config.entrySet()) {
if (entry.getKey().startsWith("source.")) {
String key = entry.getKey();
key = key.replaceFirst("source.", "");
if (!source.containsKey(key)) {
source.put(key, entry.getValue());
}
}
}
Map<String, String> sink = new HashMap<>();
for (Map.Entry<String, String> entry : config.entrySet()) {
if (entry.getKey().startsWith("sink.")) {
......@@ -84,16 +96,17 @@ public class CDCSource {
Integer.valueOf(config.get("parallelism")),
config.get("scan.startup.mode"),
debezium,
source,
sink
);
if (Asserts.isNotNullString(config.get("database"))) {
cdcSource.setDatabase(config.get("database"));
if (Asserts.isNotNullString(config.get("database-name"))) {
cdcSource.setDatabase(config.get("database-name"));
}
if (Asserts.isNotNullString(config.get("schema"))) {
cdcSource.setSchema(config.get("schema"));
if (Asserts.isNotNullString(config.get("schema-name"))) {
cdcSource.setSchema(config.get("schema-name"));
}
if (Asserts.isNotNullString(config.get("table"))) {
cdcSource.setTable(config.get("table"));
if (Asserts.isNotNullString(config.get("table-name"))) {
cdcSource.setTable(config.get("table-name"));
}
return cdcSource;
}
......@@ -229,4 +242,12 @@ public class CDCSource {
public void setDebezium(Map<String, String> debezium) {
this.debezium = debezium;
}
public Map<String, String> getSource() {
return source;
}
public void setSource(Map<String, String> source) {
this.source = source;
}
}
......@@ -53,7 +53,7 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
CDCSource cdcSource = CDCSource.build(statement);
FlinkCDCConfig config = new FlinkCDCConfig(cdcSource.getConnector(), cdcSource.getHostname(), cdcSource.getPort(), cdcSource.getUsername()
, cdcSource.getPassword(), cdcSource.getCheckpoint(), cdcSource.getParallelism(), cdcSource.getDatabase(), cdcSource.getSchema()
, cdcSource.getTable(), cdcSource.getStartupMode(), cdcSource.getDebezium(), cdcSource.getSink());
, cdcSource.getTable(), cdcSource.getStartupMode(), cdcSource.getDebezium(), cdcSource.getSource(), cdcSource.getSink());
try {
CDCBuilder cdcBuilder = CDCBuilderFactory.buildCDCBuilder(config);
Map<String, Map<String, String>> allConfigMap = cdcBuilder.parseMetaDataConfigs();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment