Unverified Commit 48420487 authored by aiwenmo's avatar aiwenmo Committed by GitHub

[Feature-570][client] Add CDCSOURCE jdbc properties and upgrade flink cdc version

为mysql source cdc添加jdbc连接参数;升级1.13和1.14 cdc连接版本(2.2.0->2.2.1)
parents c1227bdc a56503a5
......@@ -62,10 +62,22 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
String connectionPoolSize = config.getSource().get("connection.pool.size");
String heartbeatInterval = config.getSource().get("heartbeat.interval");
Properties properties = new Properties();
Properties debeziumProperties = new Properties();
// 为部分转换添加默认值
debeziumProperties.setProperty("bigint.unsigned.handling.mode","long");
debeziumProperties.setProperty("decimal.handling.mode","string");
for (Map.Entry<String, String> entry : config.getDebezium().entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
properties.setProperty(entry.getKey(), entry.getValue());
debeziumProperties.setProperty(entry.getKey(), entry.getValue());
}
}
// 添加jdbc参数注入
Properties jdbcProperties = new Properties();
for (Map.Entry<String, String> entry : config.getJdbc().entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
jdbcProperties.setProperty(entry.getKey(), entry.getValue());
}
}
......@@ -90,7 +102,8 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
}
sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
sourceBuilder.debeziumProperties(properties);
sourceBuilder.debeziumProperties(debeziumProperties);
sourceBuilder.jdbcProperties(jdbcProperties);
if (Asserts.isNotNullString(config.getStartupMode())) {
switch (config.getStartupMode().toLowerCase()) {
......
......@@ -62,10 +62,22 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
String connectionPoolSize = config.getSource().get("connection.pool.size");
String heartbeatInterval = config.getSource().get("heartbeat.interval");
Properties properties = new Properties();
Properties debeziumProperties = new Properties();
// 为部分转换添加默认值
debeziumProperties.setProperty("bigint.unsigned.handling.mode","long");
debeziumProperties.setProperty("decimal.handling.mode","string");
for (Map.Entry<String, String> entry : config.getDebezium().entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
properties.setProperty(entry.getKey(), entry.getValue());
debeziumProperties.setProperty(entry.getKey(), entry.getValue());
}
}
// 添加jdbc参数注入
Properties jdbcProperties = new Properties();
for (Map.Entry<String, String> entry : config.getJdbc().entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
jdbcProperties.setProperty(entry.getKey(), entry.getValue());
}
}
......@@ -90,7 +102,8 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
}
sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
sourceBuilder.debeziumProperties(properties);
sourceBuilder.debeziumProperties(debeziumProperties);
sourceBuilder.jdbcProperties(jdbcProperties);
if (Asserts.isNotNullString(config.getStartupMode())) {
switch (config.getStartupMode().toLowerCase()) {
......
......@@ -62,10 +62,22 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
String connectionPoolSize = config.getSource().get("connection.pool.size");
String heartbeatInterval = config.getSource().get("heartbeat.interval");
Properties properties = new Properties();
Properties debeziumProperties = new Properties();
// 为部分转换添加默认值
debeziumProperties.setProperty("bigint.unsigned.handling.mode","long");
debeziumProperties.setProperty("decimal.handling.mode","string");
for (Map.Entry<String, String> entry : config.getDebezium().entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
properties.setProperty(entry.getKey(), entry.getValue());
debeziumProperties.setProperty(entry.getKey(), entry.getValue());
}
}
// 添加jdbc参数注入
Properties jdbcProperties = new Properties();
for (Map.Entry<String, String> entry : config.getJdbc().entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
jdbcProperties.setProperty(entry.getKey(), entry.getValue());
}
}
......@@ -90,7 +102,8 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
}
sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
sourceBuilder.debeziumProperties(properties);
sourceBuilder.debeziumProperties(debeziumProperties);
sourceBuilder.jdbcProperties(jdbcProperties);
if (Asserts.isNotNullString(config.getStartupMode())) {
switch (config.getStartupMode().toLowerCase()) {
......
......@@ -25,6 +25,7 @@ public class FlinkCDCConfig {
private String startupMode;
private Map<String, String> debezium;
private Map<String, String> source;
private Map<String, String> jdbc;
private Map<String, String> sink;
private List<Schema> schemaList;
private String schemaFieldName;
......@@ -34,7 +35,7 @@ public class FlinkCDCConfig {
public FlinkCDCConfig(String type, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String database, String schema, String table,
String startupMode,
Map<String, String> debezium, Map<String, String> source, Map<String, String> sink) {
Map<String, String> debezium, Map<String, String> source, Map<String, String> sink,Map<String, String> jdbc) {
this.type = type;
this.hostname = hostname;
this.port = port;
......@@ -49,6 +50,7 @@ public class FlinkCDCConfig {
this.debezium = debezium;
this.source = source;
this.sink = sink;
this.jdbc = jdbc;
}
public String getType() {
......@@ -217,6 +219,14 @@ public class FlinkCDCConfig {
return debezium;
}
public Map<String, String> getJdbc() {
return jdbc;
}
public void setJdbc(Map<String, String> jdbc) {
this.jdbc = jdbc;
}
public void setDebezium(Map<String, String> debezium) {
this.debezium = debezium;
}
......
......@@ -31,11 +31,12 @@ public class CDCSource {
private String table;
private String startupMode;
private Map<String, String> debezium;
private Map<String, String> jdbc;
private Map<String, String> source;
private Map<String, String> sink;
public CDCSource(String connector, String statement, String name, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String startupMode,
Map<String, String> debezium, Map<String, String> source, Map<String, String> sink) {
Map<String, String> debezium, Map<String, String> source, Map<String, String> sink, Map<String, String> jdbc) {
this.connector = connector;
this.statement = statement;
this.name = name;
......@@ -47,6 +48,7 @@ public class CDCSource {
this.parallelism = parallelism;
this.startupMode = startupMode;
this.debezium = debezium;
this.jdbc = jdbc;
this.source = source;
this.sink = sink;
}
......@@ -74,6 +76,18 @@ public class CDCSource {
}
}
}
// jdbc参数(jdbc.properties.*)
Map<String, String> jdbc = new HashMap<>();
for (Map.Entry<String, String> entry : config.entrySet()) {
if (entry.getKey().startsWith("jdbc.properties.")) {
String key = entry.getKey();
key = key.replaceFirst("jdbc.properties.", "");
if (!jdbc.containsKey(key)) {
jdbc.put(key, entry.getValue());
}
}
}
Map<String, String> sink = new HashMap<>();
for (Map.Entry<String, String> entry : config.entrySet()) {
if (entry.getKey().startsWith("sink.")) {
......@@ -85,19 +99,20 @@ public class CDCSource {
}
}
CDCSource cdcSource = new CDCSource(
config.get("connector"),
statement,
map.get("CDCSOURCE").toString(),
config.get("hostname"),
Integer.valueOf(config.get("port")),
config.get("username"),
config.get("password"),
Integer.valueOf(config.get("checkpoint")),
Integer.valueOf(config.get("parallelism")),
config.get("scan.startup.mode"),
debezium,
source,
sink
config.get("connector"),
statement,
map.get("CDCSOURCE").toString(),
config.get("hostname"),
Integer.valueOf(config.get("port")),
config.get("username"),
config.get("password"),
Integer.valueOf(config.get("checkpoint")),
Integer.valueOf(config.get("parallelism")),
config.get("scan.startup.mode"),
debezium,
source,
sink,
jdbc
);
if (Asserts.isNotNullString(config.get("database-name"))) {
cdcSource.setDatabase(config.get("database-name"));
......@@ -250,4 +265,12 @@ public class CDCSource {
public void setSource(Map<String, String> source) {
this.source = source;
}
public Map<String, String> getJdbc() {
return jdbc;
}
public void setJdbc(Map<String, String> jdbc) {
this.jdbc = jdbc;
}
}
......@@ -54,7 +54,7 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
CDCSource cdcSource = CDCSource.build(statement);
FlinkCDCConfig config = new FlinkCDCConfig(cdcSource.getConnector(), cdcSource.getHostname(), cdcSource.getPort(), cdcSource.getUsername()
, cdcSource.getPassword(), cdcSource.getCheckpoint(), cdcSource.getParallelism(), cdcSource.getDatabase(), cdcSource.getSchema()
, cdcSource.getTable(), cdcSource.getStartupMode(), cdcSource.getDebezium(), cdcSource.getSource(), cdcSource.getSink());
, cdcSource.getTable(), cdcSource.getStartupMode(), cdcSource.getDebezium(), cdcSource.getSource(), cdcSource.getSink(),cdcSource.getJdbc());
try {
CDCBuilder cdcBuilder = CDCBuilderFactory.buildCDCBuilder(config);
Map<String, Map<String, String>> allConfigMap = cdcBuilder.parseMetaDataConfigs();
......
......@@ -15,7 +15,7 @@
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.13.6</flink.version>
<flinkcdc.version>2.2.0</flinkcdc.version>
<flinkcdc.version>2.2.1</flinkcdc.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<junit.version>4.12</junit.version>
......
......@@ -15,7 +15,7 @@
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.14.4</flink.version>
<flinkcdc.version>2.2.0</flinkcdc.version>
<flinkcdc.version>2.2.1</flinkcdc.version>
<commons.version>1.3.1</commons.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment