Commit 522cefb0 authored by wenmo

[Feature-389][client,executor] Add OracleCDCSourceMerge

parent d4097462
package com.dlink.cdc;
import com.dlink.model.FlinkCDCConfig;
/**
 * AbstractCDCBuilder
 *
 * <p>Base class holding the shared {@link FlinkCDCConfig} for concrete CDC source builders.
 *
 * @author wenmo
 * @since 2022/4/12 21:28
 **/
public abstract class AbstractCDCBuilder {
// Connection and runtime settings shared by all builders; subclasses read this directly.
protected FlinkCDCConfig config;
// No-arg constructor used when registering builder prototypes in the factory.
public AbstractCDCBuilder() {
}
// Binds this builder to a concrete CDC configuration.
public AbstractCDCBuilder(FlinkCDCConfig config) {
this.config = config;
}
public FlinkCDCConfig getConfig() {
return config;
}
public void setConfig(FlinkCDCConfig config) {
this.config = config;
}
}
package com.dlink.cdc;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.model.FlinkCDCConfig;
/**
 * CDCBuilder
 *
 * <p>Contract for connector-specific CDC source builders resolved by type handle.
 *
 * @author wenmo
 * @since 2022/4/12 21:09
 **/
public interface CDCBuilder {

    /** Returns the connector type key this builder handles (e.g. "mysql-cdc"). */
    String getHandle();

    /** Returns a new builder instance bound to the given configuration. */
    CDCBuilder create(FlinkCDCConfig config);

    /**
     * Builds the CDC change-record stream on the given environment.
     * Parameterized as {@code String} (was a raw type) since every implementation
     * emits serialized change records.
     */
    DataStreamSource<String> build(StreamExecutionEnvironment env);
}
package com.dlink.cdc;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.mysql.MysqlCDCBuilder;
import com.dlink.exception.FlinkClientException;
import com.dlink.model.FlinkCDCConfig;
/**
 * CDCBuilderFactory
 *
 * <p>Resolves the {@link CDCBuilder} implementation matching a config's connector type.
 *
 * @author wenmo
 * @since 2022/4/12 21:12
 **/
public class CDCBuilderFactory {

    /** Registered builder prototypes; matched via {@link CDCBuilder#getHandle()}. */
    private static final CDCBuilder[] CDC_BUILDERS = {
        new MysqlCDCBuilder()
    };

    // Static utility class; not instantiable.
    private CDCBuilderFactory() {
    }

    /**
     * Creates a builder bound to the given config.
     *
     * @param config the CDC source configuration; must be non-null with a non-empty type
     * @return a builder instance for the configured connector type
     * @throws FlinkClientException if the type is missing or unsupported
     */
    public static CDCBuilder buildCDCBuilder(FlinkCDCConfig config) {
        if (Asserts.isNull(config) || Asserts.isNullString(config.getType())) {
            throw new FlinkClientException("请指定 CDC Source 类型。");
        }
        for (CDCBuilder builder : CDC_BUILDERS) {
            if (config.getType().equals(builder.getHandle())) {
                return builder.create(config);
            }
        }
        throw new FlinkClientException("未匹配到对应 CDC Source 类型的【" + config.getType() + "】。");
    }
}
......@@ -5,8 +5,6 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import com.dlink.assertion.Asserts;
import com.dlink.model.FlinkCDCConfig;
......@@ -25,20 +23,7 @@ public class FlinkCDCMergeBuilder {
if (Asserts.isNotNull(config.getCheckpoint())) {
env.enableCheckpointing(config.getCheckpoint());
}
MySQLSource.Builder<String> sourceBuilder = MySQLSource.<String>builder()
.hostname(config.getHostname())
.port(config.getPort())
.username(config.getUsername())
.password(config.getPassword());
if (Asserts.isNotNull(config.getDatabase()) && config.getDatabase().size() > 0) {
sourceBuilder.databaseList(config.getDatabase().toArray(new String[0]));
}
if (Asserts.isNotNull(config.getTable()) && config.getTable().size() > 0) {
sourceBuilder.tableList(config.getTable().toArray(new String[0]));
}
sourceBuilder
.deserializer(new StringDebeziumDeserializationSchema());
DataStreamSource<String> streamSource = env.addSource(sourceBuilder.build(), "MySQL CDC Source");
DataStreamSource<String> streamSource = CDCBuilderFactory.buildCDCBuilder(config).build(env);
streamSource.addSink(getKafkaProducer(config.getBrokers(), config.getTopic()));
}
......
package com.dlink.cdc.mysql;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;
/**
 * MysqlCDCBuilder
 *
 * <p>Builds a MySQL CDC {@link DataStreamSource} emitting Debezium change records as strings.
 *
 * @author wenmo
 * @since 2022/4/12 21:29
 **/
public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {

    /** Connector type handled by this builder (was a non-static instance field). */
    private static final String KEY_WORD = "mysql-cdc";

    public MysqlCDCBuilder() {
    }

    public MysqlCDCBuilder(FlinkCDCConfig config) {
        super(config);
    }

    @Override
    public String getHandle() {
        return KEY_WORD;
    }

    @Override
    public CDCBuilder create(FlinkCDCConfig config) {
        return new MysqlCDCBuilder(config);
    }

    /**
     * Builds the MySQL CDC source from the bound config.
     *
     * @param env the execution environment to attach the source to
     * @return a stream of raw Debezium change records
     */
    @Override
    public DataStreamSource<String> build(StreamExecutionEnvironment env) {
        MySQLSource.Builder<String> sourceBuilder = MySQLSource.<String>builder()
            .hostname(config.getHostname())
            .port(config.getPort())
            .username(config.getUsername())
            .password(config.getPassword());
        // Database/table filters are optional, FlinkParamConstant.SPLIT-separated lists.
        if (Asserts.isNotNullString(config.getDatabase())) {
            sourceBuilder.databaseList(config.getDatabase().split(FlinkParamConstant.SPLIT));
        }
        if (Asserts.isNotNullString(config.getTable())) {
            sourceBuilder.tableList(config.getTable().split(FlinkParamConstant.SPLIT));
        }
        sourceBuilder.deserializer(new StringDebeziumDeserializationSchema());
        return env.addSource(sourceBuilder.build(), "MySQL CDC Source");
    }
}
package com.dlink.cdc;
import com.dlink.model.FlinkCDCConfig;
/**
 * AbstractCDCBuilder
 *
 * <p>Base class holding the shared {@link FlinkCDCConfig} for concrete CDC source builders.
 *
 * @author wenmo
 * @since 2022/4/12 21:28
 **/
public abstract class AbstractCDCBuilder {
// Connection and runtime settings shared by all builders; subclasses read this directly.
protected FlinkCDCConfig config;
// No-arg constructor used when registering builder prototypes in the factory.
public AbstractCDCBuilder() {
}
// Binds this builder to a concrete CDC configuration.
public AbstractCDCBuilder(FlinkCDCConfig config) {
this.config = config;
}
public FlinkCDCConfig getConfig() {
return config;
}
public void setConfig(FlinkCDCConfig config) {
this.config = config;
}
}
package com.dlink.cdc;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.model.FlinkCDCConfig;
/**
 * CDCBuilder
 *
 * <p>Contract for connector-specific CDC source builders resolved by type handle.
 *
 * @author wenmo
 * @since 2022/4/12 21:09
 **/
public interface CDCBuilder {

    /** Returns the connector type key this builder handles (e.g. "mysql-cdc"). */
    String getHandle();

    /** Returns a new builder instance bound to the given configuration. */
    CDCBuilder create(FlinkCDCConfig config);

    /**
     * Builds the CDC change-record stream on the given environment.
     * Parameterized as {@code String} (was a raw type) since every implementation
     * emits serialized change records.
     */
    DataStreamSource<String> build(StreamExecutionEnvironment env);
}
package com.dlink.cdc;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.mysql.MysqlCDCBuilder;
import com.dlink.exception.FlinkClientException;
import com.dlink.model.FlinkCDCConfig;
/**
 * CDCBuilderFactory
 *
 * <p>Resolves the {@link CDCBuilder} implementation matching a config's connector type.
 *
 * @author wenmo
 * @since 2022/4/12 21:12
 **/
public class CDCBuilderFactory {

    /** Registered builder prototypes; matched via {@link CDCBuilder#getHandle()}. */
    private static final CDCBuilder[] CDC_BUILDERS = {
        new MysqlCDCBuilder()
    };

    // Static utility class; not instantiable.
    private CDCBuilderFactory() {
    }

    /**
     * Creates a builder bound to the given config.
     *
     * @param config the CDC source configuration; must be non-null with a non-empty type
     * @return a builder instance for the configured connector type
     * @throws FlinkClientException if the type is missing or unsupported
     */
    public static CDCBuilder buildCDCBuilder(FlinkCDCConfig config) {
        if (Asserts.isNull(config) || Asserts.isNullString(config.getType())) {
            throw new FlinkClientException("请指定 CDC Source 类型。");
        }
        for (CDCBuilder builder : CDC_BUILDERS) {
            if (config.getType().equals(builder.getHandle())) {
                return builder.create(config);
            }
        }
        throw new FlinkClientException("未匹配到对应 CDC Source 类型的【" + config.getType() + "】。");
    }
}
......@@ -9,6 +9,7 @@ import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;
/**
......@@ -26,37 +27,7 @@ public class FlinkCDCMergeBuilder {
if (Asserts.isNotNull(config.getCheckpoint())) {
env.enableCheckpointing(config.getCheckpoint());
}
MySQLSource.Builder<String> sourceBuilder = MySQLSource.<String>builder()
.hostname(config.getHostname())
.port(config.getPort())
.username(config.getUsername())
.password(config.getPassword());
if (Asserts.isNotNull(config.getDatabase()) && config.getDatabase().size() > 0) {
sourceBuilder.databaseList(config.getDatabase().toArray(new String[0]));
}
if (Asserts.isNotNull(config.getTable()) && config.getTable().size() > 0) {
sourceBuilder.tableList(config.getTable().toArray(new String[0]));
}
sourceBuilder
.deserializer(new StringDebeziumDeserializationSchema());
if (Asserts.isNotNullString(config.getStartupMode())) {
switch (config.getStartupMode().toUpperCase()) {
case "INITIAL":
sourceBuilder.startupOptions(StartupOptions.initial());
break;
case "EARLIEST":
sourceBuilder.startupOptions(StartupOptions.earliest());
break;
case "LATEST":
sourceBuilder.startupOptions(StartupOptions.latest());
break;
default:
sourceBuilder.startupOptions(StartupOptions.latest());
}
} else {
sourceBuilder.startupOptions(StartupOptions.latest());
}
DataStreamSource<String> streamSource = env.addSource(sourceBuilder.build(), "MySQL CDC Source");
DataStreamSource<String> streamSource = CDCBuilderFactory.buildCDCBuilder(config).build(env);
streamSource.addSink(getKafkaProducer(config.getBrokers(), config.getTopic()));
}
......
package com.dlink.cdc.mysql;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;
/**
 * MysqlCDCBuilder
 *
 * <p>Builds a MySQL CDC {@link DataStreamSource} emitting Debezium change records as strings.
 *
 * @author wenmo
 * @since 2022/4/12 21:29
 **/
public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {

    /** Connector type handled by this builder (was a non-static instance field). */
    private static final String KEY_WORD = "mysql-cdc";

    public MysqlCDCBuilder() {
    }

    public MysqlCDCBuilder(FlinkCDCConfig config) {
        super(config);
    }

    @Override
    public String getHandle() {
        return KEY_WORD;
    }

    @Override
    public CDCBuilder create(FlinkCDCConfig config) {
        return new MysqlCDCBuilder(config);
    }

    /**
     * Builds the MySQL CDC source from the bound config.
     *
     * @param env the execution environment to attach the source to
     * @return a stream of raw Debezium change records
     */
    @Override
    public DataStreamSource<String> build(StreamExecutionEnvironment env) {
        MySQLSource.Builder<String> sourceBuilder = MySQLSource.<String>builder()
            .hostname(config.getHostname())
            .port(config.getPort())
            .username(config.getUsername())
            .password(config.getPassword());
        // Database/table filters are optional, FlinkParamConstant.SPLIT-separated lists.
        if (Asserts.isNotNullString(config.getDatabase())) {
            sourceBuilder.databaseList(config.getDatabase().split(FlinkParamConstant.SPLIT));
        }
        if (Asserts.isNotNullString(config.getTable())) {
            sourceBuilder.tableList(config.getTable().split(FlinkParamConstant.SPLIT));
        }
        sourceBuilder.deserializer(new StringDebeziumDeserializationSchema());
        // Default to LATEST when startup mode is absent or unrecognized
        // (LATEST and the default branch were previously duplicated).
        if (Asserts.isNotNullString(config.getStartupMode())) {
            switch (config.getStartupMode().toUpperCase()) {
                case "INITIAL":
                    sourceBuilder.startupOptions(StartupOptions.initial());
                    break;
                case "EARLIEST":
                    sourceBuilder.startupOptions(StartupOptions.earliest());
                    break;
                case "LATEST":
                default:
                    sourceBuilder.startupOptions(StartupOptions.latest());
            }
        } else {
            sourceBuilder.startupOptions(StartupOptions.latest());
        }
        return env.addSource(sourceBuilder.build(), "MySQL CDC Source");
    }
}
......@@ -110,6 +110,11 @@
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flinkcdc.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-oracle-cdc</artifactId>
<version>${flinkcdc.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
......
package com.dlink.cdc;
import com.dlink.model.FlinkCDCConfig;
/**
 * AbstractCDCBuilder
 *
 * <p>Base class holding the shared {@link FlinkCDCConfig} for concrete CDC source builders.
 *
 * @author wenmo
 * @since 2022/4/12 21:28
 **/
public abstract class AbstractCDCBuilder {
// Connection and runtime settings shared by all builders; subclasses read this directly.
protected FlinkCDCConfig config;
// No-arg constructor used when registering builder prototypes in the factory.
public AbstractCDCBuilder() {
}
// Binds this builder to a concrete CDC configuration.
public AbstractCDCBuilder(FlinkCDCConfig config) {
this.config = config;
}
public FlinkCDCConfig getConfig() {
return config;
}
public void setConfig(FlinkCDCConfig config) {
this.config = config;
}
}
package com.dlink.cdc;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.model.FlinkCDCConfig;
/**
 * CDCBuilder
 *
 * <p>Contract for connector-specific CDC source builders resolved by type handle.
 *
 * @author wenmo
 * @since 2022/4/12 21:09
 **/
public interface CDCBuilder {

    /** Returns the connector type key this builder handles (e.g. "mysql-cdc"). */
    String getHandle();

    /** Returns a new builder instance bound to the given configuration. */
    CDCBuilder create(FlinkCDCConfig config);

    /**
     * Builds the CDC change-record stream on the given environment.
     * Parameterized as {@code String} (was a raw type) since every implementation
     * emits serialized change records.
     */
    DataStreamSource<String> build(StreamExecutionEnvironment env);
}
package com.dlink.cdc;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.mysql.MysqlCDCBuilder;
import com.dlink.cdc.oracle.OracleCDCBuilder;
import com.dlink.exception.FlinkClientException;
import com.dlink.model.FlinkCDCConfig;
/**
 * CDCBuilderFactory
 *
 * <p>Resolves the {@link CDCBuilder} implementation matching a config's connector type.
 *
 * @author wenmo
 * @since 2022/4/12 21:12
 **/
public class CDCBuilderFactory {

    /** Registered builder prototypes; matched via {@link CDCBuilder#getHandle()}. */
    private static final CDCBuilder[] CDC_BUILDERS = {
        new MysqlCDCBuilder(),
        new OracleCDCBuilder()
    };

    // Static utility class; not instantiable.
    private CDCBuilderFactory() {
    }

    /**
     * Creates a builder bound to the given config.
     *
     * @param config the CDC source configuration; must be non-null with a non-empty type
     * @return a builder instance for the configured connector type
     * @throws FlinkClientException if the type is missing or unsupported
     */
    public static CDCBuilder buildCDCBuilder(FlinkCDCConfig config) {
        if (Asserts.isNull(config) || Asserts.isNullString(config.getType())) {
            throw new FlinkClientException("请指定 CDC Source 类型。");
        }
        for (CDCBuilder builder : CDC_BUILDERS) {
            if (config.getType().equals(builder.getHandle())) {
                return builder.create(config);
            }
        }
        throw new FlinkClientException("未匹配到对应 CDC Source 类型的【" + config.getType() + "】。");
    }
}
package com.dlink.cdc;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
......@@ -8,10 +7,6 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import com.dlink.assertion.Asserts;
import com.dlink.model.FlinkCDCConfig;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
/**
* FlinkCDCMergeBuilder
......@@ -28,36 +23,7 @@ public class FlinkCDCMergeBuilder {
if (Asserts.isNotNull(config.getCheckpoint())) {
env.enableCheckpointing(config.getCheckpoint());
}
MySqlSourceBuilder<String> sourceBuilder = MySqlSource.<String>builder()
.hostname(config.getHostname())
.port(config.getPort())
.username(config.getUsername())
.password(config.getPassword());
if (Asserts.isNotNull(config.getDatabase()) && config.getDatabase().size() > 0) {
sourceBuilder.databaseList(config.getDatabase().toArray(new String[0]));
}
if (Asserts.isNotNull(config.getTable()) && config.getTable().size() > 0) {
sourceBuilder.tableList(config.getTable().toArray(new String[0]));
}
sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
if (Asserts.isNotNullString(config.getStartupMode())) {
switch (config.getStartupMode().toUpperCase()) {
case "INITIAL":
sourceBuilder.startupOptions(StartupOptions.initial());
break;
case "EARLIEST":
sourceBuilder.startupOptions(StartupOptions.earliest());
break;
case "LATEST":
sourceBuilder.startupOptions(StartupOptions.latest());
break;
default:
sourceBuilder.startupOptions(StartupOptions.latest());
}
} else {
sourceBuilder.startupOptions(StartupOptions.latest());
}
DataStreamSource<String> streamSource = env.fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
DataStreamSource<String> streamSource = CDCBuilderFactory.buildCDCBuilder(config).build(env);
streamSource.addSink(getKafkaProducer(config.getBrokers(), config.getTopic()));
}
......
package com.dlink.cdc.mysql;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
/**
 * MysqlCDCBuilder
 *
 * <p>Builds a MySQL CDC {@link DataStreamSource} emitting JSON-serialized change records.
 *
 * @author wenmo
 * @since 2022/4/12 21:29
 **/
public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {

    /** Connector type handled by this builder (was a non-static instance field). */
    private static final String KEY_WORD = "mysql-cdc";

    public MysqlCDCBuilder() {
    }

    public MysqlCDCBuilder(FlinkCDCConfig config) {
        super(config);
    }

    @Override
    public String getHandle() {
        return KEY_WORD;
    }

    @Override
    public CDCBuilder create(FlinkCDCConfig config) {
        return new MysqlCDCBuilder(config);
    }

    /**
     * Builds the MySQL CDC source from the bound config.
     *
     * @param env the execution environment to attach the source to
     * @return a stream of JSON-serialized Debezium change records
     */
    @Override
    public DataStreamSource<String> build(StreamExecutionEnvironment env) {
        MySqlSourceBuilder<String> sourceBuilder = MySqlSource.<String>builder()
            .hostname(config.getHostname())
            .port(config.getPort())
            .username(config.getUsername())
            .password(config.getPassword());
        String database = config.getDatabase();
        if (Asserts.isNotNullString(database)) {
            sourceBuilder.databaseList(database.split(FlinkParamConstant.SPLIT));
        }
        String table = config.getTable();
        if (Asserts.isNotNullString(table)) {
            // Split multi-table values the same way as databases; previously the raw
            // string was passed through, breaking configs that list several tables.
            sourceBuilder.tableList(table.split(FlinkParamConstant.SPLIT));
        }
        sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
        // Default to LATEST when startup mode is absent or unrecognized
        // (LATEST and the default branch were previously duplicated).
        if (Asserts.isNotNullString(config.getStartupMode())) {
            switch (config.getStartupMode().toUpperCase()) {
                case "INITIAL":
                    sourceBuilder.startupOptions(StartupOptions.initial());
                    break;
                case "EARLIEST":
                    sourceBuilder.startupOptions(StartupOptions.earliest());
                    break;
                case "LATEST":
                default:
                    sourceBuilder.startupOptions(StartupOptions.latest());
            }
        } else {
            sourceBuilder.startupOptions(StartupOptions.latest());
        }
        return env.fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
    }
}
package com.dlink.cdc.oracle;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.model.FlinkCDCConfig;
import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.connectors.oracle.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
/**
 * OracleCDCBuilder
 *
 * <p>Builds an Oracle CDC {@link DataStreamSource} emitting JSON-serialized change records.
 * (Javadoc previously mis-named this class "MysqlCDCBuilder".)
 *
 * @author wenmo
 * @since 2022/4/12 21:29
 **/
public class OracleCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {

    /** Connector type handled by this builder (was a non-static instance field). */
    private static final String KEY_WORD = "oracle-cdc";

    public OracleCDCBuilder() {
    }

    public OracleCDCBuilder(FlinkCDCConfig config) {
        super(config);
    }

    @Override
    public String getHandle() {
        return KEY_WORD;
    }

    @Override
    public CDCBuilder create(FlinkCDCConfig config) {
        return new OracleCDCBuilder(config);
    }

    /**
     * Builds the Oracle CDC source from the bound config.
     *
     * @param env the execution environment to attach the source to
     * @return a stream of JSON-serialized Debezium change records
     */
    @Override
    public DataStreamSource<String> build(StreamExecutionEnvironment env) {
        OracleSource.Builder<String> sourceBuilder = OracleSource.<String>builder()
            .hostname(config.getHostname())
            .port(config.getPort())
            .username(config.getUsername())
            .password(config.getPassword())
            .database(config.getDatabase());
        String schema = config.getSchema();
        if (Asserts.isNotNullString(schema)) {
            sourceBuilder.schemaList(schema);
        }
        String table = config.getTable();
        if (Asserts.isNotNullString(table)) {
            sourceBuilder.tableList(table);
        }
        sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
        // Only INITIAL and LATEST are mapped here; anything else falls back to LATEST.
        if (Asserts.isNotNullString(config.getStartupMode())) {
            switch (config.getStartupMode().toUpperCase()) {
                case "INITIAL":
                    sourceBuilder.startupOptions(StartupOptions.initial());
                    break;
                case "LATEST":
                default:
                    sourceBuilder.startupOptions(StartupOptions.latest());
            }
        } else {
            sourceBuilder.startupOptions(StartupOptions.latest());
        }
        return env.addSource(sourceBuilder.build(), "Oracle CDC Source");
    }
}
......@@ -95,6 +95,11 @@
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flinkcdc.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-oracle-cdc</artifactId>
<version>${flinkcdc.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
......
package com.dlink.cdc;
import com.dlink.model.FlinkCDCConfig;
/**
 * AbstractCDCBuilder
 *
 * <p>Base class holding the shared {@link FlinkCDCConfig} for concrete CDC source builders.
 *
 * @author wenmo
 * @since 2022/4/12 21:28
 **/
public abstract class AbstractCDCBuilder {
// Connection and runtime settings shared by all builders; subclasses read this directly.
protected FlinkCDCConfig config;
// No-arg constructor used when registering builder prototypes in the factory.
public AbstractCDCBuilder() {
}
// Binds this builder to a concrete CDC configuration.
public AbstractCDCBuilder(FlinkCDCConfig config) {
this.config = config;
}
public FlinkCDCConfig getConfig() {
return config;
}
public void setConfig(FlinkCDCConfig config) {
this.config = config;
}
}
package com.dlink.cdc;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.model.FlinkCDCConfig;
/**
 * CDCBuilder
 *
 * <p>Contract for connector-specific CDC source builders resolved by type handle.
 *
 * @author wenmo
 * @since 2022/4/12 21:09
 **/
public interface CDCBuilder {

    /** Returns the connector type key this builder handles (e.g. "mysql-cdc"). */
    String getHandle();

    /** Returns a new builder instance bound to the given configuration. */
    CDCBuilder create(FlinkCDCConfig config);

    /**
     * Builds the CDC change-record stream on the given environment.
     * Parameterized as {@code String} (was a raw type) since every implementation
     * emits serialized change records.
     */
    DataStreamSource<String> build(StreamExecutionEnvironment env);
}
package com.dlink.cdc;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.mysql.MysqlCDCBuilder;
import com.dlink.cdc.oracle.OracleCDCBuilder;
import com.dlink.exception.FlinkClientException;
import com.dlink.model.FlinkCDCConfig;
/**
 * CDCBuilderFactory
 *
 * <p>Resolves the {@link CDCBuilder} implementation matching a config's connector type.
 *
 * @author wenmo
 * @since 2022/4/12 21:12
 **/
public class CDCBuilderFactory {

    /** Registered builder prototypes; matched via {@link CDCBuilder#getHandle()}. */
    private static final CDCBuilder[] CDC_BUILDERS = {
        new MysqlCDCBuilder(),
        new OracleCDCBuilder()
    };

    // Static utility class; not instantiable.
    private CDCBuilderFactory() {
    }

    /**
     * Creates a builder bound to the given config.
     *
     * @param config the CDC source configuration; must be non-null with a non-empty type
     * @return a builder instance for the configured connector type
     * @throws FlinkClientException if the type is missing or unsupported
     */
    public static CDCBuilder buildCDCBuilder(FlinkCDCConfig config) {
        if (Asserts.isNull(config) || Asserts.isNullString(config.getType())) {
            throw new FlinkClientException("请指定 CDC Source 类型。");
        }
        for (CDCBuilder builder : CDC_BUILDERS) {
            if (config.getType().equals(builder.getHandle())) {
                return builder.create(config);
            }
        }
        throw new FlinkClientException("未匹配到对应 CDC Source 类型的【" + config.getType() + "】。");
    }
}
package com.dlink.cdc;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
......@@ -9,10 +8,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.assertion.Asserts;
import com.dlink.model.FlinkCDCConfig;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
/**
* FlinkCDCMergeBuilder
......@@ -29,36 +24,7 @@ public class FlinkCDCMergeBuilder {
if (Asserts.isNotNull(config.getCheckpoint())) {
env.enableCheckpointing(config.getCheckpoint());
}
MySqlSourceBuilder<String> sourceBuilder = MySqlSource.<String>builder()
.hostname(config.getHostname())
.port(config.getPort())
.username(config.getUsername())
.password(config.getPassword());
if (Asserts.isNotNull(config.getDatabase()) && config.getDatabase().size() > 0) {
sourceBuilder.databaseList(config.getDatabase().toArray(new String[0]));
}
if (Asserts.isNotNull(config.getTable()) && config.getTable().size() > 0) {
sourceBuilder.tableList(config.getTable().toArray(new String[0]));
}
sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
if (Asserts.isNotNullString(config.getStartupMode())) {
switch (config.getStartupMode().toUpperCase()) {
case "INITIAL":
sourceBuilder.startupOptions(StartupOptions.initial());
break;
case "EARLIEST":
sourceBuilder.startupOptions(StartupOptions.earliest());
break;
case "LATEST":
sourceBuilder.startupOptions(StartupOptions.latest());
break;
default:
sourceBuilder.startupOptions(StartupOptions.latest());
}
} else {
sourceBuilder.startupOptions(StartupOptions.latest());
}
DataStreamSource<String> streamSource = env.fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
DataStreamSource<String> streamSource = CDCBuilderFactory.buildCDCBuilder(config).build(env);
streamSource.sinkTo(getKafkaProducer(config.getBrokers(), config.getTopic()));
}
......
package com.dlink.cdc.mysql;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
/**
 * MysqlCDCBuilder
 *
 * <p>Builds a MySQL CDC {@link DataStreamSource} emitting JSON-serialized change records.
 *
 * @author wenmo
 * @since 2022/4/12 21:29
 **/
public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {

    /** Connector type handled by this builder (was a non-static instance field). */
    private static final String KEY_WORD = "mysql-cdc";

    public MysqlCDCBuilder() {
    }

    public MysqlCDCBuilder(FlinkCDCConfig config) {
        super(config);
    }

    @Override
    public String getHandle() {
        return KEY_WORD;
    }

    @Override
    public CDCBuilder create(FlinkCDCConfig config) {
        return new MysqlCDCBuilder(config);
    }

    /**
     * Builds the MySQL CDC source from the bound config.
     *
     * @param env the execution environment to attach the source to
     * @return a stream of JSON-serialized Debezium change records
     */
    @Override
    public DataStreamSource<String> build(StreamExecutionEnvironment env) {
        MySqlSourceBuilder<String> sourceBuilder = MySqlSource.<String>builder()
            .hostname(config.getHostname())
            .port(config.getPort())
            .username(config.getUsername())
            .password(config.getPassword());
        String database = config.getDatabase();
        if (Asserts.isNotNullString(database)) {
            sourceBuilder.databaseList(database.split(FlinkParamConstant.SPLIT));
        }
        String table = config.getTable();
        if (Asserts.isNotNullString(table)) {
            // Split multi-table values the same way as databases; previously the raw
            // string was passed through, breaking configs that list several tables.
            sourceBuilder.tableList(table.split(FlinkParamConstant.SPLIT));
        }
        sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
        // Default to LATEST when startup mode is absent or unrecognized
        // (LATEST and the default branch were previously duplicated).
        if (Asserts.isNotNullString(config.getStartupMode())) {
            switch (config.getStartupMode().toUpperCase()) {
                case "INITIAL":
                    sourceBuilder.startupOptions(StartupOptions.initial());
                    break;
                case "EARLIEST":
                    sourceBuilder.startupOptions(StartupOptions.earliest());
                    break;
                case "LATEST":
                default:
                    sourceBuilder.startupOptions(StartupOptions.latest());
            }
        } else {
            sourceBuilder.startupOptions(StartupOptions.latest());
        }
        return env.fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
    }
}
package com.dlink.cdc.oracle;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.model.FlinkCDCConfig;
import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.connectors.oracle.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
/**
 * OracleCDCBuilder
 *
 * <p>Builds an Oracle CDC {@link DataStreamSource} emitting JSON-serialized change records.
 * (Javadoc previously mis-named this class "MysqlCDCBuilder".)
 *
 * @author wenmo
 * @since 2022/4/12 21:29
 **/
public class OracleCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {

    /** Connector type handled by this builder (was a non-static instance field). */
    private static final String KEY_WORD = "oracle-cdc";

    public OracleCDCBuilder() {
    }

    public OracleCDCBuilder(FlinkCDCConfig config) {
        super(config);
    }

    @Override
    public String getHandle() {
        return KEY_WORD;
    }

    @Override
    public CDCBuilder create(FlinkCDCConfig config) {
        return new OracleCDCBuilder(config);
    }

    /**
     * Builds the Oracle CDC source from the bound config.
     *
     * @param env the execution environment to attach the source to
     * @return a stream of JSON-serialized Debezium change records
     */
    @Override
    public DataStreamSource<String> build(StreamExecutionEnvironment env) {
        OracleSource.Builder<String> sourceBuilder = OracleSource.<String>builder()
            .hostname(config.getHostname())
            .port(config.getPort())
            .username(config.getUsername())
            .password(config.getPassword())
            .database(config.getDatabase());
        String schema = config.getSchema();
        if (Asserts.isNotNullString(schema)) {
            sourceBuilder.schemaList(schema);
        }
        String table = config.getTable();
        if (Asserts.isNotNullString(table)) {
            sourceBuilder.tableList(table);
        }
        sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
        // Only INITIAL and LATEST are mapped here; anything else falls back to LATEST.
        if (Asserts.isNotNullString(config.getStartupMode())) {
            switch (config.getStartupMode().toUpperCase()) {
                case "INITIAL":
                    sourceBuilder.startupOptions(StartupOptions.initial());
                    break;
                case "LATEST":
                default:
                    sourceBuilder.startupOptions(StartupOptions.latest());
            }
        } else {
            sourceBuilder.startupOptions(StartupOptions.latest());
        }
        return env.addSource(sourceBuilder.build(), "Oracle CDC Source");
    }
}
......@@ -12,4 +12,6 @@ public final class FlinkParamConstant {
public static final String URL = "url";
public static final String USERNAME = "username";
public static final String PASSWORD = "password";
public static final String SPLIT = ";";
}
package com.dlink.exception;
/**
 * FlinkClientException
 *
 * <p>Unchecked exception raised by the Flink client layer, e.g. when a CDC source
 * type is missing or no matching builder is registered.
 *
 * @author wenmo
 * @since 2022/4/12 21:21
 **/
public class FlinkClientException extends RuntimeException {

    /** Creates an exception with a detail message only. */
    public FlinkClientException(String message) {
        super(message);
    }

    /** Creates an exception with a detail message and an underlying cause. */
    public FlinkClientException(String message, Throwable cause) {
        super(message, cause);
    }
}
package com.dlink.model;
import java.util.List;
/**
* FlinkCDCConfig
*
......@@ -10,14 +8,16 @@ import java.util.List;
*/
public class FlinkCDCConfig {
private String type;
private String hostname;
private Integer port;
private String username;
private String password;
private Integer checkpoint;
private Integer parallelism;
private List<String> database;
private List<String> table;
private String database;
private String schema;
private String table;
private String startupMode;
private String topic;
private String brokers;
......@@ -25,7 +25,9 @@ public class FlinkCDCConfig {
public FlinkCDCConfig() {
}
public FlinkCDCConfig(String hostname, int port, String username, String password, int checkpoint, int parallelism, List<String> database, List<String> table, String startupMode, String topic, String brokers) {
public FlinkCDCConfig(String type, String hostname, int port, String username, String password, int checkpoint, int parallelism, String database, String schema, String table, String startupMode,
String topic, String brokers) {
this.type = type;
this.hostname = hostname;
this.port = port;
this.username = username;
......@@ -33,12 +35,21 @@ public class FlinkCDCConfig {
this.checkpoint = checkpoint;
this.parallelism = parallelism;
this.database = database;
this.schema = schema;
this.table = table;
this.startupMode = startupMode;
this.topic = topic;
this.brokers = brokers;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getHostname() {
return hostname;
}
......@@ -87,19 +98,27 @@ public class FlinkCDCConfig {
this.parallelism = parallelism;
}
public List<String> getDatabase() {
public String getDatabase() {
return database;
}
public void setDatabase(List<String> database) {
public void setDatabase(String database) {
this.database = database;
}
public List<String> getTable() {
public String getSchema() {
return schema;
}
public void setSchema(String schema) {
this.schema = schema;
}
public String getTable() {
return table;
}
public void setTable(List<String> table) {
public void setTable(String table) {
this.table = table;
}
......
package com.dlink.core;
import com.dlink.parser.SingleSqlParserFactory;
import org.apache.commons.lang3.StringUtils;
import org.junit.Test;
import java.util.List;
import java.util.Map;
import org.junit.Test;
import com.dlink.parser.SingleSqlParserFactory;
/**
* SqlParserTest
*
......@@ -25,17 +27,17 @@ public class SqlParserTest {
@Test
public void createAggTableTest() {
String sql = "CREATE AGGTABLE agg1 AS \n" +
"SELECT sid,data\n" +
"FROM score\n" +
"WHERE cls = 1\n" +
"GROUP BY sid\n" +
"AGG BY toMap(cls,score) as (data)";
"SELECT sid,data\n" +
"FROM score\n" +
"WHERE cls = 1\n" +
"GROUP BY sid\n" +
"AGG BY toMap(cls,score) as (data)";
String sql2 = "\r\n" +
"CREATE AGGTABLE aggscore AS \r\n" +
"SELECT cls,score,rank\r\n" +
"FROM score\r\n" +
"GROUP BY cls\r\n" +
"AGG BY TOP2(score) as (score,rank)";
"CREATE AGGTABLE aggscore AS \r\n" +
"SELECT cls,score,rank\r\n" +
"FROM score\r\n" +
"GROUP BY cls\r\n" +
"AGG BY TOP2(score) as (score,rank)";
//sql=sql.replace("\n"," ");
Map<String, List<String>> lists = SingleSqlParserFactory.generateParser(sql2);
System.out.println(lists.toString());
......@@ -52,11 +54,11 @@ public class SqlParserTest {
@Test
public void regTest() {
String sql = "--并行度\n" +
"CREATE TABLE student (\n" +
" sid INT,\n" +
" name STRING,\n" +
" PRIMARY KEY (sid) NOT ENFORCED\n" +
") WITH ${tb}";
"CREATE TABLE student (\n" +
" sid INT,\n" +
" name STRING,\n" +
" PRIMARY KEY (sid) NOT ENFORCED\n" +
") WITH ${tb}";
sql = sql.replaceAll("--([^'\r\n]{0,}('[^'\r\n]{0,}'){0,1}[^'\r\n]{0,}){0,}", "").trim();
System.out.println(sql);
}
......@@ -64,17 +66,18 @@ public class SqlParserTest {
@Test
public void createCDCSourceTest() {
String sql = "EXECUTE CDCSOURCE demo WITH (\n" +
" 'hostname'='127.0.0.1',\n" +
" 'port'='3306',\n" +
" 'password'='dlink',\n" +
" 'hostname'='dlink',\n" +
" 'checkpoint'='3000',\n" +
" 'parallelism'='1',\n" +
" 'database'='dlink:test',\n" +
" 'table'='',\n" +
" 'topic'='dlinkcdc',\n" +
" 'brokers'='127.0.0.1:9092'\n" +
");";
" 'type'='mysql-cdc',\n" +
" 'hostname'='127.0.0.1',\n" +
" 'port'='3306',\n" +
" 'password'='dlink',\n" +
" 'hostname'='dlink',\n" +
" 'checkpoint'='3000',\n" +
" 'parallelism'='1',\n" +
" 'database'='dlink,test',\n" +
" 'table'='',\n" +
" 'topic'='dlinkcdc',\n" +
" 'brokers'='127.0.0.1:9092'\n" +
");";
Map<String, List<String>> lists = SingleSqlParserFactory.generateParser(sql);
System.out.println(lists.toString());
}
......
......@@ -15,6 +15,6 @@ public class CreateCDCSourceSqlParser extends BaseSingleSqlParser {
/**
 * Registers the segments recognised in an EXECUTE CDCSOURCE statement.
 */
@Override
protected void initializeSegments() {
    // CDCSOURCE: the source name between "EXECUTE CDCSOURCE" and "WITH (".
    segments.add(new SqlSegment("CDCSOURCE", "(execute\\s+cdcsource\\s+)(.+)(\\s+with\\s+\\()", "[,]"));
    // WITH: the option list. Entries are split on "'," (not a bare comma) —
    // presumably paired with getKeyValue re-appending the trailing quote so
    // commas inside quoted values do not split a pair; confirm against it.
    segments.add(new SqlSegment("WITH", "(with\\s+\\()(.+)(\\))", "',"));
}
}
package com.dlink.trans.ddl;
import com.dlink.assertion.Asserts;
import com.dlink.parser.SingleSqlParserFactory;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.dlink.assertion.Asserts;
import com.dlink.parser.SingleSqlParserFactory;
/**
* TODO
* CDCSource
*
* @author wenmo
* @since 2022/1/29 23:30
*/
public class CDCSource {
private String type;
private String statement;
private String name;
private String hostname;
......@@ -26,13 +26,16 @@ public class CDCSource {
private String password;
private Integer checkpoint;
private Integer parallelism;
private List<String> database;
private List<String> table;
private String database;
private String schema;
private String table;
private String startupMode;
private String topic;
private String brokers;
public CDCSource(String statement, String name, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String startupMode, String topic, String brokers) {
public CDCSource(String type, String statement, String name, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String startupMode,
String topic, String brokers) {
this.type = type;
this.statement = statement;
this.name = name;
this.hostname = hostname;
......@@ -49,23 +52,28 @@ public class CDCSource {
public static CDCSource build(String statement) {
Map<String, List<String>> map = SingleSqlParserFactory.generateParser(statement);
Map<String, String> config = getKeyValue(map.get("WITH"));
CDCSource cdcSource = new CDCSource(statement,
map.get("CDCSOURCE").toString(),
config.get("hostname"),
Integer.valueOf(config.get("port")),
config.get("username"),
config.get("password"),
Integer.valueOf(config.get("checkpoint")),
Integer.valueOf(config.get("parallelism")),
config.get("startup"),
config.get("topic"),
config.get("brokers")
CDCSource cdcSource = new CDCSource(
config.get("type"),
statement,
map.get("CDCSOURCE").toString(),
config.get("hostname"),
Integer.valueOf(config.get("port")),
config.get("username"),
config.get("password"),
Integer.valueOf(config.get("checkpoint")),
Integer.valueOf(config.get("parallelism")),
config.get("startup"),
config.get("topic"),
config.get("brokers")
);
if (Asserts.isNotNullString(config.get("database"))) {
cdcSource.setDatabase(Arrays.asList(config.get("database").split(":")));
cdcSource.setDatabase(config.get("database"));
}
if (Asserts.isNotNullString(config.get("schema"))) {
cdcSource.setSchema(config.get("schema"));
}
if (Asserts.isNotNullString(config.get("table"))) {
cdcSource.setTable(Arrays.asList(config.get("table").split(":")));
cdcSource.setTable(config.get("table"));
}
return cdcSource;
}
......@@ -74,7 +82,7 @@ public class CDCSource {
Map<String, String> map = new HashMap<>();
Pattern p = Pattern.compile("'(.*?)'\\s*=\\s*'(.*?)'");
for (int i = 0; i < list.size(); i++) {
Matcher m = p.matcher(list.get(i));
Matcher m = p.matcher(list.get(i) + "'");
if (m.find()) {
map.put(m.group(1), m.group(2));
}
......@@ -82,6 +90,14 @@ public class CDCSource {
return map;
}
/**
 * @return the CDC connector type parsed from the statement
 */
public String getType() {
    return this.type;
}
/**
 * Sets the CDC connector type.
 *
 * @param type the connector type identifier
 */
public void setType(String type) {
this.type = type;
}
/**
 * @return the original EXECUTE CDCSOURCE statement text
 */
public String getStatement() {
    return this.statement;
}
......@@ -146,19 +162,27 @@ public class CDCSource {
this.parallelism = parallelism;
}
public List<String> getDatabase() {
public String getDatabase() {
return database;
}
public void setDatabase(List<String> database) {
public void setDatabase(String database) {
this.database = database;
}
public List<String> getTable() {
public String getSchema() {
return schema;
}
/**
 * Sets the schema selector.
 *
 * @param schema the schema expression
 */
public void setSchema(String schema) {
this.schema = schema;
}
/**
 * @return the table selector parsed from the WITH clause
 */
public String getTable() {
    return this.table;
}
public void setTable(List<String> table) {
public void setTable(String table) {
this.table = table;
}
......
package com.dlink.trans.ddl;
import org.apache.flink.table.api.TableResult;
import com.dlink.cdc.FlinkCDCMergeBuilder;
import com.dlink.executor.Executor;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.trans.AbstractOperation;
import com.dlink.trans.Operation;
import org.apache.flink.table.api.TableResult;
/**
* CreateCDCSourceOperation
......@@ -37,9 +38,9 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
// Builds the Flink CDC merge pipeline described by an EXECUTE CDCSOURCE
// statement against the executor's StreamExecutionEnvironment.
// NOTE(review): both the older FlinkCDCConfig construction and the updated
// type/schema-aware one appear below (diff residue); only the latter belongs
// in the resolved file. The catch body and method close are elided by the
// surrounding diff.
@Override
public TableResult build(Executor executor) {
CDCSource cdcSource = CDCSource.build(statement);
FlinkCDCConfig config = new FlinkCDCConfig(cdcSource.getHostname(), cdcSource.getPort(), cdcSource.getUsername()
, cdcSource.getPassword(), cdcSource.getCheckpoint(), cdcSource.getParallelism(), cdcSource.getDatabase(), cdcSource.getTable()
, cdcSource.getStartupMode(), cdcSource.getTopic(), cdcSource.getBrokers());
FlinkCDCConfig config = new FlinkCDCConfig(cdcSource.getType(), cdcSource.getHostname(), cdcSource.getPort(), cdcSource.getUsername()
, cdcSource.getPassword(), cdcSource.getCheckpoint(), cdcSource.getParallelism(), cdcSource.getDatabase(), cdcSource.getSchema()
, cdcSource.getTable(), cdcSource.getStartupMode(), cdcSource.getTopic(), cdcSource.getBrokers());
try {
FlinkCDCMergeBuilder.buildMySqlCDC(executor.getStreamExecutionEnvironment(), config);
} catch (Exception e) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment