Commit 64b8cf25 authored by wenmo

[Feature-429][*] OracleCDCSource sync kafka topics

parent 7a6749bf
@@ -63,4 +63,8 @@ public abstract class AbstractCDCBuilder {
         Collections.addAll(tableList, tables);
         return tableList;
     }
+
+    public String getSchemaFieldName() {
+        return "db";
+    }
 }
@@ -30,4 +30,6 @@ public interface CDCBuilder {
     Map<String, Map<String, String>> parseMetaDataConfigs();

     String getInsertSQL(Table table, String sourceName);
+
+    String getSchemaFieldName();
 }
@@ -57,6 +57,7 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
                 new SimpleStringSchema()));
         } else {
             final List<Schema> schemaList = config.getSchemaList();
+            final String schemaFieldName = config.getSchemaFieldName();
             if (Asserts.isNotNullCollection(schemaList)) {
                 SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(new MapFunction<String, Map>() {
                     @Override
@@ -74,7 +75,7 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
                     public boolean filter(Map value) throws Exception {
                         LinkedHashMap source = (LinkedHashMap) value.get("source");
                         return tableName.equals(source.get("table").toString())
-                            && schemaName.equals(source.get("db").toString());
+                            && schemaName.equals(source.get(schemaFieldName).toString());
                     }
                 });
                 SingleOutputStreamOperator<String> stringOperator = filterOperator.map(new MapFunction<Map, String>() {
......
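Why the new getSchemaFieldName() hook matters: in a Debezium change event the source block names the originating schema differently per connector (MySQL uses "db", Oracle uses "schema"), so the previously hard-coded source.get("db") dropped every Oracle record before it could reach its topic. A minimal, self-contained sketch of the filter logic; the database and table names are made up for illustration:

import java.util.LinkedHashMap;
import java.util.Map;

public class SchemaFieldNameDemo {

    // Mirrors the filter in KafkaSinkBuilder: read the schema from the
    // connector-specific field instead of the hard-coded "db".
    static boolean matches(Map<String, Object> event, String schemaFieldName,
                           String schemaName, String tableName) {
        LinkedHashMap<?, ?> source = (LinkedHashMap<?, ?>) event.get("source");
        return tableName.equals(source.get("table").toString())
            && schemaName.equals(source.get(schemaFieldName).toString());
    }

    public static void main(String[] args) {
        // A MySQL-style source block carries the schema under "db"...
        LinkedHashMap<String, Object> mysqlSource = new LinkedHashMap<>();
        mysqlSource.put("db", "inventory");
        mysqlSource.put("table", "orders");
        Map<String, Object> mysqlEvent = new LinkedHashMap<>();
        mysqlEvent.put("source", mysqlSource);

        // ...while an Oracle-style source block carries it under "schema".
        LinkedHashMap<String, Object> oracleSource = new LinkedHashMap<>();
        oracleSource.put("schema", "INVENTORY");
        oracleSource.put("table", "ORDERS");
        Map<String, Object> oracleEvent = new LinkedHashMap<>();
        oracleEvent.put("source", oracleSource);

        System.out.println(matches(mysqlEvent, "db", "inventory", "orders"));      // true
        System.out.println(matches(oracleEvent, "schema", "INVENTORY", "ORDERS")); // true
    }
}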
@@ -8,6 +8,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;

 import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
 import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
@@ -49,6 +50,12 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
     @Override
     public DataStreamSource<String> build(StreamExecutionEnvironment env) {
+        Properties properties = new Properties();
+        for (Map.Entry<String, String> entry : config.getDebezium().entrySet()) {
+            if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
+                properties.setProperty(entry.getKey(), entry.getValue());
+            }
+        }
         MySQLSource.Builder<String> sourceBuilder = MySQLSource.<String>builder()
             .hostname(config.getHostname())
             .port(config.getPort())
@@ -60,8 +67,8 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
         if (Asserts.isNotNullString(config.getTable())) {
             sourceBuilder.tableList(config.getTable().split(FlinkParamConstant.SPLIT));
         }
-        sourceBuilder
-            .deserializer(new StringDebeziumDeserializationSchema());
+        sourceBuilder.deserializer(new StringDebeziumDeserializationSchema());
+        sourceBuilder.debeziumProperties(properties);
         return env.addSource(sourceBuilder.build(), "MySQL CDC Source");
     }
......
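Review note on the Debezium pass-through added above (the same pattern recurs in every builder touched by this commit): entries from the new debezium map are copied into a java.util.Properties and handed to the source builder via debeziumProperties(...). A standalone sketch with hypothetical option keys; plain null/empty checks stand in for dlink's Asserts.isNotNullString:

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class DebeziumPropertiesDemo {
    public static void main(String[] args) {
        // Hypothetical entries, as they would arrive from FlinkCDCConfig#getDebezium().
        Map<String, String> debezium = new HashMap<>();
        debezium.put("snapshot.mode", "initial");
        debezium.put("decimal.handling.mode", "string");

        Properties properties = new Properties();
        for (Map.Entry<String, String> entry : debezium.entrySet()) {
            // Guard against null/blank keys and values, as Asserts.isNotNullString does.
            if (entry.getKey() != null && !entry.getKey().trim().isEmpty()
                    && entry.getValue() != null && !entry.getValue().trim().isEmpty()) {
                properties.setProperty(entry.getKey(), entry.getValue());
            }
        }

        // The builders then call sourceBuilder.debeziumProperties(properties),
        // so the options reach the embedded Debezium engine.
        System.out.println(properties); // e.g. {snapshot.mode=initial, decimal.handling.mode=string}
    }
}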
@@ -63,4 +63,8 @@ public abstract class AbstractCDCBuilder {
         Collections.addAll(tableList, tables);
         return tableList;
     }
+
+    public String getSchemaFieldName() {
+        return "db";
+    }
 }
@@ -30,4 +30,6 @@ public interface CDCBuilder {
     Map<String, Map<String, String>> parseMetaDataConfigs();

     String getInsertSQL(Table table, String sourceName);
+
+    String getSchemaFieldName();
 }
@@ -57,6 +57,7 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
                 new SimpleStringSchema()));
         } else {
             final List<Schema> schemaList = config.getSchemaList();
+            final String schemaFieldName = config.getSchemaFieldName();
             if (Asserts.isNotNullCollection(schemaList)) {
                 SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(new MapFunction<String, Map>() {
                     @Override
@@ -74,7 +75,7 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
                     public boolean filter(Map value) throws Exception {
                         LinkedHashMap source = (LinkedHashMap) value.get("source");
                         return tableName.equals(source.get("table").toString())
-                            && schemaName.equals(source.get("db").toString());
+                            && schemaName.equals(source.get(schemaFieldName).toString());
                     }
                 });
                 SingleOutputStreamOperator<String> stringOperator = filterOperator.map(new MapFunction<Map, String>() {
......
@@ -8,6 +8,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;

 import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
 import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
@@ -50,6 +51,12 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
     @Override
     public DataStreamSource<String> build(StreamExecutionEnvironment env) {
+        Properties properties = new Properties();
+        for (Map.Entry<String, String> entry : config.getDebezium().entrySet()) {
+            if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
+                properties.setProperty(entry.getKey(), entry.getValue());
+            }
+        }
         MySQLSource.Builder<String> sourceBuilder = MySQLSource.<String>builder()
             .hostname(config.getHostname())
             .port(config.getPort())
@@ -61,8 +68,8 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
         if (Asserts.isNotNullString(config.getTable())) {
             sourceBuilder.tableList(config.getTable().split(FlinkParamConstant.SPLIT));
         }
-        sourceBuilder
-            .deserializer(new StringDebeziumDeserializationSchema());
+        sourceBuilder.deserializer(new StringDebeziumDeserializationSchema());
+        sourceBuilder.debeziumProperties(properties);
         if (Asserts.isNotNullString(config.getStartupMode())) {
             switch (config.getStartupMode().toUpperCase()) {
                 case "INITIAL":
......
@@ -63,4 +63,8 @@ public abstract class AbstractCDCBuilder {
         Collections.addAll(tableList, tables);
         return tableList;
     }
+
+    public String getSchemaFieldName() {
+        return "schema";
+    }
 }
@@ -30,4 +30,6 @@ public interface CDCBuilder {
     Map<String, Map<String, String>> parseMetaDataConfigs();

     String getInsertSQL(Table table, String sourceName);
+
+    String getSchemaFieldName();
 }
@@ -57,6 +57,7 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
                 new SimpleStringSchema()));
         } else {
             final List<Schema> schemaList = config.getSchemaList();
+            final String schemaFieldName = config.getSchemaFieldName();
             if (Asserts.isNotNullCollection(schemaList)) {
                 SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(new MapFunction<String, Map>() {
                     @Override
@@ -74,7 +75,7 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
                     public boolean filter(Map value) throws Exception {
                         LinkedHashMap source = (LinkedHashMap) value.get("source");
                         return tableName.equals(source.get("table").toString())
-                            && schemaName.equals(source.get("db").toString());
+                            && schemaName.equals(source.get(schemaFieldName).toString());
                     }
                 });
                 SingleOutputStreamOperator<String> stringOperator = filterOperator.map(new MapFunction<Map, String>() {
......
@@ -9,6 +9,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;

 import com.dlink.assertion.Asserts;
 import com.dlink.cdc.AbstractCDCBuilder;
@@ -52,6 +53,12 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
     @Override
     public DataStreamSource<String> build(StreamExecutionEnvironment env) {
+        Properties properties = new Properties();
+        for (Map.Entry<String, String> entry : config.getDebezium().entrySet()) {
+            if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
+                properties.setProperty(entry.getKey(), entry.getValue());
+            }
+        }
         MySqlSourceBuilder<String> sourceBuilder = MySqlSource.<String>builder()
             .hostname(config.getHostname())
             .port(config.getPort())
@@ -67,6 +74,7 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
             sourceBuilder.tableList(table);
         }
         sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
+        sourceBuilder.debeziumProperties(properties);
         if (Asserts.isNotNullString(config.getStartupMode())) {
             switch (config.getStartupMode().toUpperCase()) {
                 case "INITIAL":
@@ -140,4 +148,9 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
         sb.append("'");
         return sb.toString();
     }
+
+    @Override
+    public String getSchemaFieldName() {
+        return "db";
+    }
 }
@@ -6,6 +6,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;

 import com.dlink.assertion.Asserts;
 import com.dlink.cdc.AbstractCDCBuilder;
@@ -47,6 +48,12 @@ public class OracleCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
     @Override
     public DataStreamSource<String> build(StreamExecutionEnvironment env) {
+        Properties properties = new Properties();
+        for (Map.Entry<String, String> entry : config.getDebezium().entrySet()) {
+            if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
+                properties.setProperty(entry.getKey(), entry.getValue());
+            }
+        }
         OracleSource.Builder<String> sourceBuilder = OracleSource.<String>builder()
             .hostname(config.getHostname())
             .port(config.getPort())
@@ -62,6 +69,7 @@ public class OracleCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
             sourceBuilder.tableList(table);
         }
         sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
+        sourceBuilder.debeziumProperties(properties);
         if (Asserts.isNotNullString(config.getStartupMode())) {
             switch (config.getStartupMode().toUpperCase()) {
                 case "INITIAL":
......
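For the Oracle source, the same pass-through lets a job tune connector-specific behavior. A hypothetical example; the option keys are Debezium Oracle connector settings rather than dlink API, and should be verified against the Debezium version actually bundled:

import java.util.Properties;

public class OracleDebeziumOptionsDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Assumed Debezium Oracle connector options (check your Debezium version).
        properties.setProperty("log.mining.strategy", "online_catalog");
        properties.setProperty("log.mining.continuous.mine", "true");
        // OracleCDCBuilder would hand these over via sourceBuilder.debeziumProperties(properties).
        System.out.println(properties);
    }
}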
@@ -63,4 +63,8 @@ public abstract class AbstractCDCBuilder {
         Collections.addAll(tableList, tables);
         return tableList;
     }
+
+    public String getSchemaFieldName() {
+        return "schema";
+    }
 }
@@ -30,4 +30,6 @@ public interface CDCBuilder {
     Map<String, Map<String, String>> parseMetaDataConfigs();

     String getInsertSQL(Table table, String sourceName);
+
+    String getSchemaFieldName();
 }
@@ -63,6 +63,7 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
                 .build());
         } else {
             final List<Schema> schemaList = config.getSchemaList();
+            final String schemaFieldName = config.getSchemaFieldName();
             if (Asserts.isNotNullCollection(schemaList)) {
                 SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(new MapFunction<String, Map>() {
                     @Override
@@ -80,7 +81,7 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
                     public boolean filter(Map value) throws Exception {
                         LinkedHashMap source = (LinkedHashMap) value.get("source");
                         return tableName.equals(source.get("table").toString())
-                            && schemaName.equals(source.get("db").toString());
+                            && schemaName.equals(source.get(schemaFieldName).toString());
                     }
                 });
                 SingleOutputStreamOperator<String> stringOperator = filterOperator.map(new MapFunction<Map, String>() {
......
@@ -9,6 +9,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;

 import com.dlink.assertion.Asserts;
 import com.dlink.cdc.AbstractCDCBuilder;
@@ -52,6 +53,12 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
     @Override
     public DataStreamSource<String> build(StreamExecutionEnvironment env) {
+        Properties properties = new Properties();
+        for (Map.Entry<String, String> entry : config.getDebezium().entrySet()) {
+            if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
+                properties.setProperty(entry.getKey(), entry.getValue());
+            }
+        }
         MySqlSourceBuilder<String> sourceBuilder = MySqlSource.<String>builder()
             .hostname(config.getHostname())
             .port(config.getPort())
@@ -67,6 +74,7 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
             sourceBuilder.tableList(table);
         }
         sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
+        sourceBuilder.debeziumProperties(properties);
         if (Asserts.isNotNullString(config.getStartupMode())) {
             switch (config.getStartupMode().toUpperCase()) {
                 case "INITIAL":
@@ -133,11 +141,17 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
         }
         sb.append(" FROM ");
         sb.append(sourceName);
-        /* sb.append(" WHERE database_name = '");
+        sb.append(" WHERE database_name = '");
         sb.append(table.getSchema());
         sb.append("' and table_name = '");
         sb.append(table.getName());
-        sb.append("'");*/
+        sb.append("'");
         return sb.toString();
     }
+
+    @Override
+    public String getSchemaFieldName() {
+        return "db";
+    }
 }
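With the WHERE clause restored above, getInsertSQL again emits a per-table predicate. A rough sketch of the tail it builds, using hypothetical values (sourceName "cdc_source", schema "inventory", table "orders"):

public class InsertSqlTailDemo {
    public static void main(String[] args) {
        StringBuilder sb = new StringBuilder();
        sb.append(" FROM ");
        sb.append("cdc_source");                    // stands in for sourceName
        sb.append(" WHERE database_name = '");
        sb.append("inventory");                     // stands in for table.getSchema()
        sb.append("' and table_name = '");
        sb.append("orders");                        // stands in for table.getName()
        sb.append("'");
        // -> " FROM cdc_source WHERE database_name = 'inventory' and table_name = 'orders'"
        System.out.println(sb);
    }
}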
@@ -6,6 +6,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;

 import com.dlink.assertion.Asserts;
 import com.dlink.cdc.AbstractCDCBuilder;
@@ -48,6 +49,12 @@ public class OracleCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
     @Override
     public DataStreamSource<String> build(StreamExecutionEnvironment env) {
+        Properties properties = new Properties();
+        for (Map.Entry<String, String> entry : config.getDebezium().entrySet()) {
+            if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
+                properties.setProperty(entry.getKey(), entry.getValue());
+            }
+        }
         OracleSource.Builder<String> sourceBuilder = OracleSource.<String>builder()
             .hostname(config.getHostname())
             .port(config.getPort())
@@ -63,6 +70,7 @@ public class OracleCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
             sourceBuilder.tableList(table);
         }
         sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
+        sourceBuilder.debeziumProperties(properties);
         if (Asserts.isNotNullString(config.getStartupMode())) {
             switch (config.getStartupMode().toUpperCase()) {
                 case "INITIAL":
......
@@ -22,14 +22,16 @@ public class FlinkCDCConfig {
     private String schema;
     private String table;
     private String startupMode;
+    private Map<String, String> debezium;
     private Map<String, String> sink;
     private List<Schema> schemaList;
+    private String schemaFieldName;

     public FlinkCDCConfig() {
     }

     public FlinkCDCConfig(String type, String hostname, int port, String username, String password, int checkpoint, int parallelism, String database, String schema, String table, String startupMode,
-                          Map<String, String> sink) {
+                          Map<String, String> debezium, Map<String, String> sink) {
         this.type = type;
         this.hostname = hostname;
         this.port = port;
@@ -41,6 +43,7 @@
         this.schema = schema;
         this.table = table;
         this.startupMode = startupMode;
+        this.debezium = debezium;
         this.sink = sink;
     }
@@ -147,4 +150,20 @@
     public void setSchemaList(List<Schema> schemaList) {
         this.schemaList = schemaList;
     }
+
+    public String getSchemaFieldName() {
+        return schemaFieldName;
+    }
+
+    public void setSchemaFieldName(String schemaFieldName) {
+        this.schemaFieldName = schemaFieldName;
+    }
+
+    public Map<String, String> getDebezium() {
+        return debezium;
+    }
+
+    public void setDebezium(Map<String, String> debezium) {
+        this.debezium = debezium;
+    }
 }
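Putting the two new fields together, a config would be constructed roughly as below. This is a sketch only: host, credentials, and option values are placeholders, and it assumes FlinkCDCConfig is on the classpath; the argument order follows the constructor shown above.

import java.util.HashMap;
import java.util.Map;

public class FlinkCDCConfigDemo {
    public static void main(String[] args) {
        Map<String, String> debezium = new HashMap<>();
        debezium.put("snapshot.mode", "initial");          // hypothetical Debezium option

        Map<String, String> sink = new HashMap<>();
        sink.put("brokers", "kafka:9092");                 // hypothetical sink keys
        sink.put("topic", "cdc_demo");

        // (type, hostname, port, username, password, checkpoint, parallelism,
        //  database, schema, table, startupMode, debezium, sink)
        FlinkCDCConfig config = new FlinkCDCConfig("mysql", "localhost", 3306, "root", "123456",
            60000, 1, "inventory", null, "inventory\\.orders", "initial", debezium, sink);

        // CreateCDCSourceOperation fills this in from the builder (e.g. "db" for MySQL).
        config.setSchemaFieldName("db");
    }
}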
@@ -30,10 +30,11 @@ public class CDCSource {
     private String schema;
     private String table;
     private String startupMode;
+    private Map<String, String> debezium;
     private Map<String, String> sink;

     public CDCSource(String type, String statement, String name, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String startupMode,
-                     Map<String, String> sink) {
+                     Map<String, String> debezium, Map<String, String> sink) {
         this.type = type;
         this.statement = statement;
         this.name = name;
@@ -44,12 +45,23 @@
         this.checkpoint = checkpoint;
         this.parallelism = parallelism;
         this.startupMode = startupMode;
+        this.debezium = debezium;
         this.sink = sink;
     }

     public static CDCSource build(String statement) {
         Map<String, List<String>> map = SingleSqlParserFactory.generateParser(statement);
         Map<String, String> config = getKeyValue(map.get("WITH"));
+        Map<String, String> debezium = new HashMap<>();
+        for (Map.Entry<String, String> entry : config.entrySet()) {
+            if (entry.getKey().startsWith("debezium.")) {
+                String key = entry.getKey();
+                key = key.replace("debezium.", "");
+                if (!debezium.containsKey(key)) {
+                    debezium.put(key, entry.getValue());
+                }
+            }
+        }
         Map<String, String> sink = new HashMap<>();
         for (Map.Entry<String, String> entry : config.entrySet()) {
             if (entry.getKey().startsWith("sink.")) {
@@ -71,6 +83,7 @@
             Integer.valueOf(config.get("checkpoint")),
             Integer.valueOf(config.get("parallelism")),
             config.get("startup"),
+            debezium,
             sink
         );
         if (Asserts.isNotNullString(config.get("database"))) {
@@ -208,4 +221,12 @@
     public void setStartupMode(String startupMode) {
         this.startupMode = startupMode;
     }
+
+    public Map<String, String> getDebezium() {
+        return debezium;
+    }
+
+    public void setDebezium(Map<String, String> debezium) {
+        this.debezium = debezium;
+    }
 }
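End to end, the new prefix parsing looks like this. The statement below is illustrative: only the keys visible in this diff (checkpoint, parallelism, startup, database, debezium.*, sink.*) come from the code; the EXECUTE CDCSOURCE syntax and the remaining key names are assumptions.

public class CdcSourceBuildDemo {
    public static void main(String[] args) {
        String statement =
            "EXECUTE CDCSOURCE demo WITH (\n"
            + "  'type' = 'mysql',\n"
            + "  'hostname' = 'localhost',\n"
            + "  'port' = '3306',\n"
            + "  'username' = 'root',\n"
            + "  'password' = '123456',\n"
            + "  'checkpoint' = '3000',\n"
            + "  'parallelism' = '1',\n"
            + "  'startup' = 'initial',\n"
            + "  'database' = 'inventory',\n"
            + "  'debezium.snapshot.mode' = 'initial',\n"
            + "  'debezium.decimal.handling.mode' = 'string',\n"
            + "  'sink.brokers' = 'kafka:9092',\n"
            + "  'sink.topic' = 'cdc_demo'\n"
            + ")";
        CDCSource cdcSource = CDCSource.build(statement);
        // The "debezium." prefix is stripped, so getDebezium() holds
        // {snapshot.mode=initial, decimal.handling.mode=string}.
        System.out.println(cdcSource.getDebezium());
    }
}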
@@ -53,10 +53,11 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Operation
         CDCSource cdcSource = CDCSource.build(statement);
         FlinkCDCConfig config = new FlinkCDCConfig(cdcSource.getType(), cdcSource.getHostname(), cdcSource.getPort(), cdcSource.getUsername()
             , cdcSource.getPassword(), cdcSource.getCheckpoint(), cdcSource.getParallelism(), cdcSource.getDatabase(), cdcSource.getSchema()
-            , cdcSource.getTable(), cdcSource.getStartupMode(), cdcSource.getSink());
+            , cdcSource.getTable(), cdcSource.getStartupMode(), cdcSource.getDebezium(), cdcSource.getSink());
         try {
             CDCBuilder cdcBuilder = CDCBuilderFactory.buildCDCBuilder(config);
             Map<String, Map<String, String>> allConfigMap = cdcBuilder.parseMetaDataConfigs();
+            config.setSchemaFieldName(cdcBuilder.getSchemaFieldName());
             List<Schema> schemaList = new ArrayList<>();
             final List<String> schemaNameList = cdcBuilder.getSchemaList();
             final List<String> tableRegList = cdcBuilder.getTableList();
@@ -69,11 +70,15 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Operation
                     Driver driver = Driver.build(driverConfig);
                     final List<Table> tables = driver.getTablesAndColumns(schemaName);
                     for (Table table : tables) {
-                        for (String tableReg : tableRegList) {
-                            if (table.getSchemaTableName().matches(tableReg) && !schema.getTables().contains(Table.build(table.getName()))) {
-                                schema.getTables().add(table);
-                                break;
+                        if (Asserts.isNotNullCollection(tableRegList)) {
+                            for (String tableReg : tableRegList) {
+                                if (table.getSchemaTableName().matches(tableReg) && !schema.getTables().contains(Table.build(table.getName()))) {
+                                    schema.getTables().add(table);
+                                    break;
+                                }
                             }
+                        } else {
+                            schema.getTables().add(table);
                         }
                     }
                     schemaList.add(schema);
......
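The guard added around tableRegList also changes behavior when no table patterns are configured: the old inner loop simply never added a table in that case (and a null list would throw), whereas the new else-branch syncs every table in the schema. The matching itself is plain String.matches against patterns such as the hypothetical one below:

public class TableRegDemo {
    public static void main(String[] args) {
        // Hypothetical pattern as produced by CDCBuilder#getTableList().
        String tableReg = "inventory\\.orders.*";
        System.out.println("inventory.orders_2022".matches(tableReg)); // true  -> table captured
        System.out.println("inventory.products".matches(tableReg));    // false -> table skipped
    }
}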