Commit a56503a5 authored by Zack Young's avatar Zack Young

为mysql source cdc添加jdbc连接参数

parent c1227bdc
...@@ -62,10 +62,22 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder { ...@@ -62,10 +62,22 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
String connectionPoolSize = config.getSource().get("connection.pool.size"); String connectionPoolSize = config.getSource().get("connection.pool.size");
String heartbeatInterval = config.getSource().get("heartbeat.interval"); String heartbeatInterval = config.getSource().get("heartbeat.interval");
Properties properties = new Properties(); Properties debeziumProperties = new Properties();
// 为部分转换添加默认值
debeziumProperties.setProperty("bigint.unsigned.handling.mode","long");
debeziumProperties.setProperty("decimal.handling.mode","string");
for (Map.Entry<String, String> entry : config.getDebezium().entrySet()) { for (Map.Entry<String, String> entry : config.getDebezium().entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) { if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
properties.setProperty(entry.getKey(), entry.getValue()); debeziumProperties.setProperty(entry.getKey(), entry.getValue());
}
}
// 添加jdbc参数注入
Properties jdbcProperties = new Properties();
for (Map.Entry<String, String> entry : config.getJdbc().entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
jdbcProperties.setProperty(entry.getKey(), entry.getValue());
} }
} }
...@@ -90,7 +102,8 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder { ...@@ -90,7 +102,8 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
} }
sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema()); sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
sourceBuilder.debeziumProperties(properties); sourceBuilder.debeziumProperties(debeziumProperties);
sourceBuilder.jdbcProperties(jdbcProperties);
if (Asserts.isNotNullString(config.getStartupMode())) { if (Asserts.isNotNullString(config.getStartupMode())) {
switch (config.getStartupMode().toLowerCase()) { switch (config.getStartupMode().toLowerCase()) {
......
...@@ -62,10 +62,22 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder { ...@@ -62,10 +62,22 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
String connectionPoolSize = config.getSource().get("connection.pool.size"); String connectionPoolSize = config.getSource().get("connection.pool.size");
String heartbeatInterval = config.getSource().get("heartbeat.interval"); String heartbeatInterval = config.getSource().get("heartbeat.interval");
Properties properties = new Properties(); Properties debeziumProperties = new Properties();
// 为部分转换添加默认值
debeziumProperties.setProperty("bigint.unsigned.handling.mode","long");
debeziumProperties.setProperty("decimal.handling.mode","string");
for (Map.Entry<String, String> entry : config.getDebezium().entrySet()) { for (Map.Entry<String, String> entry : config.getDebezium().entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) { if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
properties.setProperty(entry.getKey(), entry.getValue()); debeziumProperties.setProperty(entry.getKey(), entry.getValue());
}
}
// 添加jdbc参数注入
Properties jdbcProperties = new Properties();
for (Map.Entry<String, String> entry : config.getJdbc().entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
jdbcProperties.setProperty(entry.getKey(), entry.getValue());
} }
} }
...@@ -90,7 +102,8 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder { ...@@ -90,7 +102,8 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
} }
sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema()); sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
sourceBuilder.debeziumProperties(properties); sourceBuilder.debeziumProperties(debeziumProperties);
sourceBuilder.jdbcProperties(jdbcProperties);
if (Asserts.isNotNullString(config.getStartupMode())) { if (Asserts.isNotNullString(config.getStartupMode())) {
switch (config.getStartupMode().toLowerCase()) { switch (config.getStartupMode().toLowerCase()) {
......
...@@ -62,10 +62,22 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder { ...@@ -62,10 +62,22 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
String connectionPoolSize = config.getSource().get("connection.pool.size"); String connectionPoolSize = config.getSource().get("connection.pool.size");
String heartbeatInterval = config.getSource().get("heartbeat.interval"); String heartbeatInterval = config.getSource().get("heartbeat.interval");
Properties properties = new Properties(); Properties debeziumProperties = new Properties();
// 为部分转换添加默认值
debeziumProperties.setProperty("bigint.unsigned.handling.mode","long");
debeziumProperties.setProperty("decimal.handling.mode","string");
for (Map.Entry<String, String> entry : config.getDebezium().entrySet()) { for (Map.Entry<String, String> entry : config.getDebezium().entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) { if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
properties.setProperty(entry.getKey(), entry.getValue()); debeziumProperties.setProperty(entry.getKey(), entry.getValue());
}
}
// 添加jdbc参数注入
Properties jdbcProperties = new Properties();
for (Map.Entry<String, String> entry : config.getJdbc().entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
jdbcProperties.setProperty(entry.getKey(), entry.getValue());
} }
} }
...@@ -90,7 +102,8 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder { ...@@ -90,7 +102,8 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
} }
sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema()); sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
sourceBuilder.debeziumProperties(properties); sourceBuilder.debeziumProperties(debeziumProperties);
sourceBuilder.jdbcProperties(jdbcProperties);
if (Asserts.isNotNullString(config.getStartupMode())) { if (Asserts.isNotNullString(config.getStartupMode())) {
switch (config.getStartupMode().toLowerCase()) { switch (config.getStartupMode().toLowerCase()) {
......
...@@ -25,6 +25,7 @@ public class FlinkCDCConfig { ...@@ -25,6 +25,7 @@ public class FlinkCDCConfig {
private String startupMode; private String startupMode;
private Map<String, String> debezium; private Map<String, String> debezium;
private Map<String, String> source; private Map<String, String> source;
private Map<String, String> jdbc;
private Map<String, String> sink; private Map<String, String> sink;
private List<Schema> schemaList; private List<Schema> schemaList;
private String schemaFieldName; private String schemaFieldName;
...@@ -34,7 +35,7 @@ public class FlinkCDCConfig { ...@@ -34,7 +35,7 @@ public class FlinkCDCConfig {
public FlinkCDCConfig(String type, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String database, String schema, String table, public FlinkCDCConfig(String type, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String database, String schema, String table,
String startupMode, String startupMode,
Map<String, String> debezium, Map<String, String> source, Map<String, String> sink) { Map<String, String> debezium, Map<String, String> source, Map<String, String> sink,Map<String, String> jdbc) {
this.type = type; this.type = type;
this.hostname = hostname; this.hostname = hostname;
this.port = port; this.port = port;
...@@ -49,6 +50,7 @@ public class FlinkCDCConfig { ...@@ -49,6 +50,7 @@ public class FlinkCDCConfig {
this.debezium = debezium; this.debezium = debezium;
this.source = source; this.source = source;
this.sink = sink; this.sink = sink;
this.jdbc = jdbc;
} }
public String getType() { public String getType() {
...@@ -217,6 +219,14 @@ public class FlinkCDCConfig { ...@@ -217,6 +219,14 @@ public class FlinkCDCConfig {
return debezium; return debezium;
} }
public Map<String, String> getJdbc() {
return jdbc;
}
public void setJdbc(Map<String, String> jdbc) {
this.jdbc = jdbc;
}
public void setDebezium(Map<String, String> debezium) { public void setDebezium(Map<String, String> debezium) {
this.debezium = debezium; this.debezium = debezium;
} }
......
...@@ -31,11 +31,12 @@ public class CDCSource { ...@@ -31,11 +31,12 @@ public class CDCSource {
private String table; private String table;
private String startupMode; private String startupMode;
private Map<String, String> debezium; private Map<String, String> debezium;
private Map<String, String> jdbc;
private Map<String, String> source; private Map<String, String> source;
private Map<String, String> sink; private Map<String, String> sink;
public CDCSource(String connector, String statement, String name, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String startupMode, public CDCSource(String connector, String statement, String name, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String startupMode,
Map<String, String> debezium, Map<String, String> source, Map<String, String> sink) { Map<String, String> debezium, Map<String, String> source, Map<String, String> sink, Map<String, String> jdbc) {
this.connector = connector; this.connector = connector;
this.statement = statement; this.statement = statement;
this.name = name; this.name = name;
...@@ -47,6 +48,7 @@ public class CDCSource { ...@@ -47,6 +48,7 @@ public class CDCSource {
this.parallelism = parallelism; this.parallelism = parallelism;
this.startupMode = startupMode; this.startupMode = startupMode;
this.debezium = debezium; this.debezium = debezium;
this.jdbc = jdbc;
this.source = source; this.source = source;
this.sink = sink; this.sink = sink;
} }
...@@ -74,6 +76,18 @@ public class CDCSource { ...@@ -74,6 +76,18 @@ public class CDCSource {
} }
} }
} }
// jdbc参数(jdbc.properties.*)
Map<String, String> jdbc = new HashMap<>();
for (Map.Entry<String, String> entry : config.entrySet()) {
if (entry.getKey().startsWith("jdbc.properties.")) {
String key = entry.getKey();
key = key.replaceFirst("jdbc.properties.", "");
if (!jdbc.containsKey(key)) {
jdbc.put(key, entry.getValue());
}
}
}
Map<String, String> sink = new HashMap<>(); Map<String, String> sink = new HashMap<>();
for (Map.Entry<String, String> entry : config.entrySet()) { for (Map.Entry<String, String> entry : config.entrySet()) {
if (entry.getKey().startsWith("sink.")) { if (entry.getKey().startsWith("sink.")) {
...@@ -85,19 +99,20 @@ public class CDCSource { ...@@ -85,19 +99,20 @@ public class CDCSource {
} }
} }
CDCSource cdcSource = new CDCSource( CDCSource cdcSource = new CDCSource(
config.get("connector"), config.get("connector"),
statement, statement,
map.get("CDCSOURCE").toString(), map.get("CDCSOURCE").toString(),
config.get("hostname"), config.get("hostname"),
Integer.valueOf(config.get("port")), Integer.valueOf(config.get("port")),
config.get("username"), config.get("username"),
config.get("password"), config.get("password"),
Integer.valueOf(config.get("checkpoint")), Integer.valueOf(config.get("checkpoint")),
Integer.valueOf(config.get("parallelism")), Integer.valueOf(config.get("parallelism")),
config.get("scan.startup.mode"), config.get("scan.startup.mode"),
debezium, debezium,
source, source,
sink sink,
jdbc
); );
if (Asserts.isNotNullString(config.get("database-name"))) { if (Asserts.isNotNullString(config.get("database-name"))) {
cdcSource.setDatabase(config.get("database-name")); cdcSource.setDatabase(config.get("database-name"));
...@@ -250,4 +265,12 @@ public class CDCSource { ...@@ -250,4 +265,12 @@ public class CDCSource {
public void setSource(Map<String, String> source) { public void setSource(Map<String, String> source) {
this.source = source; this.source = source;
} }
public Map<String, String> getJdbc() {
return jdbc;
}
public void setJdbc(Map<String, String> jdbc) {
this.jdbc = jdbc;
}
} }
...@@ -54,7 +54,7 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera ...@@ -54,7 +54,7 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
CDCSource cdcSource = CDCSource.build(statement); CDCSource cdcSource = CDCSource.build(statement);
FlinkCDCConfig config = new FlinkCDCConfig(cdcSource.getConnector(), cdcSource.getHostname(), cdcSource.getPort(), cdcSource.getUsername() FlinkCDCConfig config = new FlinkCDCConfig(cdcSource.getConnector(), cdcSource.getHostname(), cdcSource.getPort(), cdcSource.getUsername()
, cdcSource.getPassword(), cdcSource.getCheckpoint(), cdcSource.getParallelism(), cdcSource.getDatabase(), cdcSource.getSchema() , cdcSource.getPassword(), cdcSource.getCheckpoint(), cdcSource.getParallelism(), cdcSource.getDatabase(), cdcSource.getSchema()
, cdcSource.getTable(), cdcSource.getStartupMode(), cdcSource.getDebezium(), cdcSource.getSource(), cdcSource.getSink()); , cdcSource.getTable(), cdcSource.getStartupMode(), cdcSource.getDebezium(), cdcSource.getSource(), cdcSource.getSink(),cdcSource.getJdbc());
try { try {
CDCBuilder cdcBuilder = CDCBuilderFactory.buildCDCBuilder(config); CDCBuilder cdcBuilder = CDCBuilderFactory.buildCDCBuilder(config);
Map<String, Map<String, String>> allConfigMap = cdcBuilder.parseMetaDataConfigs(); Map<String, Map<String, String>> allConfigMap = cdcBuilder.parseMetaDataConfigs();
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
<java.version>1.8</java.version> <java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.13.6</flink.version> <flink.version>1.13.6</flink.version>
<flinkcdc.version>2.2.0</flinkcdc.version> <flinkcdc.version>2.2.1</flinkcdc.version>
<maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.target>1.8</maven.compiler.target>
<junit.version>4.12</junit.version> <junit.version>4.12</junit.version>
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
<java.version>1.8</java.version> <java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.14.4</flink.version> <flink.version>1.14.4</flink.version>
<flinkcdc.version>2.2.0</flinkcdc.version> <flinkcdc.version>2.2.1</flinkcdc.version>
<commons.version>1.3.1</commons.version> <commons.version>1.3.1</commons.version>
<maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.target>1.8</maven.compiler.target>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment