Unverified commit fff25af9, authored by aiwenmo, committed by GitHub

[Optimization-384][client] Optimization CDCSourceMerge

parents b8743111 b1455045
... @@ -14,7 +14,7 @@
     <java.version>1.8</java.version>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <flink.version>1.11.6</flink.version>
-    <flinkcdc.version>2.1.1</flinkcdc.version>
+    <flinkcdc.version>1.1.0</flinkcdc.version>
     <maven.compiler.source>1.8</maven.compiler.source>
     <maven.compiler.target>1.8</maven.compiler.target>
     <junit.version>4.12</junit.version>
... @@ -90,7 +90,7 @@
         <version>${flink.version}</version>
     </dependency>
     <dependency>
-        <groupId>com.ververica</groupId>
+        <groupId>com.alibaba.ververica</groupId>
         <artifactId>flink-connector-mysql-cdc</artifactId>
         <version>${flinkcdc.version}</version>
     </dependency>
...
 package com.dlink.cdc;
 
-import com.dlink.assertion.Asserts;
-import com.dlink.model.FlinkCDCConfig;
-import com.ververica.cdc.connectors.mysql.source.MySqlSource;
-import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
-import com.ververica.cdc.connectors.mysql.table.StartupOptions;
-import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+
+import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
+import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
+import com.dlink.assertion.Asserts;
+import com.dlink.model.FlinkCDCConfig;
 
 /**
  * FlinkCDCMergeBuilder
  *
... @@ -27,44 +25,26 @@ public class FlinkCDCMergeBuilder {
         if (Asserts.isNotNull(config.getCheckpoint())) {
             env.enableCheckpointing(config.getCheckpoint());
         }
-        MySqlSourceBuilder<String> sourceBuilder = MySqlSource.<String>builder()
+        MySQLSource.Builder<String> sourceBuilder = MySQLSource.<String>builder()
                 .hostname(config.getHostname())
                 .port(config.getPort())
                 .username(config.getUsername())
                 .password(config.getPassword());
         if (Asserts.isNotNull(config.getDatabase()) && config.getDatabase().size() > 0) {
             sourceBuilder.databaseList(config.getDatabase().toArray(new String[0]));
         }
         if (Asserts.isNotNull(config.getTable()) && config.getTable().size() > 0) {
             sourceBuilder.tableList(config.getTable().toArray(new String[0]));
         }
-        MySqlSourceBuilder<String> builder = sourceBuilder
-                .deserializer(new JsonDebeziumDeserializationSchema());
-        if (Asserts.isNotNullString(config.getStartupMode())) {
-            switch (config.getStartupMode().toUpperCase()) {
-                case "INITIAL":
-                    builder.startupOptions(StartupOptions.initial());
-                    break;
-                case "EARLIEST":
-                    builder.startupOptions(StartupOptions.earliest());
-                    break;
-                case "LATEST":
-                    builder.startupOptions(StartupOptions.latest());
-                    break;
-                default:
-                    builder.startupOptions(StartupOptions.latest());
-            }
-        } else {
-            builder.startupOptions(StartupOptions.latest());
-        }
-        MySqlSource<String> sourceFunction = builder.build();
-        DataStreamSource<String> streamSource = env.fromSource(sourceFunction, WatermarkStrategy.noWatermarks(), "MySQL Source");
+        sourceBuilder
+                .deserializer(new StringDebeziumDeserializationSchema());
+        DataStreamSource<String> streamSource = env.addSource(sourceBuilder.build(), "MySQL CDC Source");
         streamSource.addSink(getKafkaProducer(config.getBrokers(), config.getTopic()));
     }
 
     private static FlinkKafkaProducer<String> getKafkaProducer(String brokers, String topic) {
         return new FlinkKafkaProducer<String>(brokers,
                 topic,
                 new SimpleStringSchema());
     }
 }
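For context: flink-cdc 1.x releases were published under the com.alibaba.ververica groupId and expose a SourceFunction, so the module pinned to Flink 1.11.6 falls back to env.addSource and drops the StartupOptions switch, which this connector version does not offer. Below is a minimal, self-contained sketch of that legacy pattern, not part of this commit; the host, credentials, and database are placeholder values.

// Minimal sketch of the legacy flink-cdc 1.x (com.alibaba.ververica) pattern
// used by the Flink 1.11 client above. All connection values are placeholders.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;

public class LegacyMySqlCdcDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // flink-cdc 1.x builds a SourceFunction, so it attaches via addSource.
        DebeziumSourceFunction<String> source = MySQLSource.<String>builder()
                .hostname("localhost")   // placeholder host
                .port(3306)              // placeholder port
                .username("root")        // placeholder user
                .password("123456")      // placeholder password
                .databaseList("test")    // placeholder database
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();
        env.addSource(source, "MySQL CDC Source").print();
        env.execute("Legacy CDC demo");
    }
}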
... @@ -15,7 +15,7 @@
     <java.version>1.8</java.version>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <flink.version>1.12.7</flink.version>
-    <flinkcdc.version>2.1.1</flinkcdc.version>
+    <flinkcdc.version>1.3.0</flinkcdc.version>
     <maven.compiler.source>1.8</maven.compiler.source>
     <maven.compiler.target>1.8</maven.compiler.target>
     <junit.version>4.12</junit.version>
... @@ -91,7 +91,7 @@
         <version>${flink.version}</version>
     </dependency>
     <dependency>
-        <groupId>com.ververica</groupId>
+        <groupId>com.alibaba.ververica</groupId>
         <artifactId>flink-connector-mysql-cdc</artifactId>
         <version>${flinkcdc.version}</version>
     </dependency>
...
 package com.dlink.cdc;
 
-import com.dlink.assertion.Asserts;
-import com.dlink.model.FlinkCDCConfig;
-import com.ververica.cdc.connectors.mysql.source.MySqlSource;
-import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
-import com.ververica.cdc.connectors.mysql.table.StartupOptions;
-import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+
+import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
+import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
+import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
+import com.dlink.assertion.Asserts;
+import com.dlink.model.FlinkCDCConfig;
 
 /**
  * FlinkCDCMergeBuilder
  *
... @@ -27,44 +26,43 @@ public class FlinkCDCMergeBuilder {
         if (Asserts.isNotNull(config.getCheckpoint())) {
             env.enableCheckpointing(config.getCheckpoint());
         }
-        MySqlSourceBuilder<String> sourceBuilder = MySqlSource.<String>builder()
+        MySQLSource.Builder<String> sourceBuilder = MySQLSource.<String>builder()
                 .hostname(config.getHostname())
                 .port(config.getPort())
                 .username(config.getUsername())
                 .password(config.getPassword());
         if (Asserts.isNotNull(config.getDatabase()) && config.getDatabase().size() > 0) {
             sourceBuilder.databaseList(config.getDatabase().toArray(new String[0]));
         }
         if (Asserts.isNotNull(config.getTable()) && config.getTable().size() > 0) {
             sourceBuilder.tableList(config.getTable().toArray(new String[0]));
         }
-        MySqlSourceBuilder<String> builder = sourceBuilder
-                .deserializer(new JsonDebeziumDeserializationSchema());
+        sourceBuilder
+                .deserializer(new StringDebeziumDeserializationSchema());
         if (Asserts.isNotNullString(config.getStartupMode())) {
             switch (config.getStartupMode().toUpperCase()) {
                 case "INITIAL":
-                    builder.startupOptions(StartupOptions.initial());
+                    sourceBuilder.startupOptions(StartupOptions.initial());
                     break;
                 case "EARLIEST":
-                    builder.startupOptions(StartupOptions.earliest());
+                    sourceBuilder.startupOptions(StartupOptions.earliest());
                     break;
                 case "LATEST":
-                    builder.startupOptions(StartupOptions.latest());
+                    sourceBuilder.startupOptions(StartupOptions.latest());
                     break;
                 default:
-                    builder.startupOptions(StartupOptions.latest());
+                    sourceBuilder.startupOptions(StartupOptions.latest());
             }
         } else {
-            builder.startupOptions(StartupOptions.latest());
+            sourceBuilder.startupOptions(StartupOptions.latest());
         }
-        MySqlSource<String> sourceFunction = builder.build();
-        DataStreamSource<String> streamSource = env.fromSource(sourceFunction, WatermarkStrategy.noWatermarks(), "MySQL Source");
+        DataStreamSource<String> streamSource = env.addSource(sourceBuilder.build(), "MySQL CDC Source");
         streamSource.addSink(getKafkaProducer(config.getBrokers(), config.getTopic()));
     }
 
     private static FlinkKafkaProducer<String> getKafkaProducer(String brokers, String topic) {
         return new FlinkKafkaProducer<String>(brokers,
                 topic,
                 new SimpleStringSchema());
     }
 }
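Unlike the 1.11 variant, flink-cdc 1.3.0 does support StartupOptions, so the Flink 1.12 client keeps the startup-mode switch on the legacy builder. As a side note, that repeated switch could be condensed into a small helper like the hypothetical sketch below; this is not part of the commit, merely one way to factor the same mapping, including its fall-through to latest() (the null check stands in for Asserts.isNotNullString).

// Hypothetical helper (not in this commit): maps the config's startup-mode
// string onto flink-cdc 1.3.0's StartupOptions, defaulting to latest().
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;

public final class StartupModes {
    private StartupModes() {
    }

    public static StartupOptions parse(String mode) {
        if (mode == null) {
            return StartupOptions.latest();  // same default as the diff's else-branch
        }
        switch (mode.toUpperCase()) {
            case "INITIAL":
                return StartupOptions.initial();
            case "EARLIEST":
                return StartupOptions.earliest();
            case "LATEST":
            default:
                return StartupOptions.latest();
        }
    }
}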
... @@ -15,7 +15,7 @@
     <java.version>1.8</java.version>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <flink.version>1.13.6</flink.version>
-    <flinkcdc.version>2.1.1</flinkcdc.version>
+    <flinkcdc.version>2.2.0</flinkcdc.version>
     <maven.compiler.source>1.8</maven.compiler.source>
     <maven.compiler.target>1.8</maven.compiler.target>
     <junit.version>4.12</junit.version>
...
 package com.dlink.cdc;
 
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import com.dlink.assertion.Asserts;
 import com.dlink.model.FlinkCDCConfig;
 import com.ververica.cdc.connectors.mysql.source.MySqlSource;
 import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
 import com.ververica.cdc.connectors.mysql.table.StartupOptions;
 import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 
 /**
  * FlinkCDCMergeBuilder
... @@ -28,43 +29,41 @@ public class FlinkCDCMergeBuilder {
             env.enableCheckpointing(config.getCheckpoint());
         }
         MySqlSourceBuilder<String> sourceBuilder = MySqlSource.<String>builder()
                 .hostname(config.getHostname())
                 .port(config.getPort())
                 .username(config.getUsername())
                 .password(config.getPassword());
         if (Asserts.isNotNull(config.getDatabase()) && config.getDatabase().size() > 0) {
             sourceBuilder.databaseList(config.getDatabase().toArray(new String[0]));
         }
         if (Asserts.isNotNull(config.getTable()) && config.getTable().size() > 0) {
             sourceBuilder.tableList(config.getTable().toArray(new String[0]));
         }
-        MySqlSourceBuilder<String> builder = sourceBuilder
-                .deserializer(new JsonDebeziumDeserializationSchema());
+        sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
         if (Asserts.isNotNullString(config.getStartupMode())) {
             switch (config.getStartupMode().toUpperCase()) {
                 case "INITIAL":
-                    builder.startupOptions(StartupOptions.initial());
+                    sourceBuilder.startupOptions(StartupOptions.initial());
                     break;
                 case "EARLIEST":
-                    builder.startupOptions(StartupOptions.earliest());
+                    sourceBuilder.startupOptions(StartupOptions.earliest());
                     break;
                 case "LATEST":
-                    builder.startupOptions(StartupOptions.latest());
+                    sourceBuilder.startupOptions(StartupOptions.latest());
                     break;
                 default:
-                    builder.startupOptions(StartupOptions.latest());
+                    sourceBuilder.startupOptions(StartupOptions.latest());
             }
         } else {
-            builder.startupOptions(StartupOptions.latest());
+            sourceBuilder.startupOptions(StartupOptions.latest());
         }
-        MySqlSource<String> sourceFunction = builder.build();
-        DataStreamSource<String> streamSource = env.fromSource(sourceFunction, WatermarkStrategy.noWatermarks(), "MySQL Source");
+        DataStreamSource<String> streamSource = env.fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
         streamSource.addSink(getKafkaProducer(config.getBrokers(), config.getTopic()));
     }
 
     private static FlinkKafkaProducer<String> getKafkaProducer(String brokers, String topic) {
         return new FlinkKafkaProducer<String>(brokers,
                 topic,
                 new SimpleStringSchema());
     }
 }
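The Flink 1.13 client stays on the com.ververica 2.x connector, whose MySqlSource implements the unified Source interface and therefore attaches via env.fromSource with an explicit WatermarkStrategy. A minimal, self-contained sketch of that pattern follows; it is not part of the commit, and all connection values and the table name are placeholders.

// Minimal sketch of the unified Source API used by the Flink 1.13 client.
// All connection values are placeholders.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class UnifiedMySqlCdcDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("localhost")     // placeholder host
                .port(3306)                // placeholder port
                .username("root")          // placeholder user
                .password("123456")        // placeholder password
                .databaseList("test")      // placeholder database
                .tableList("test.orders")  // placeholder "db.table" pattern
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
        // CDC records carry no event-time semantics here, hence noWatermarks().
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
                .print();
        env.execute("Unified CDC demo");
    }
}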
... @@ -14,7 +14,7 @@
     <java.version>1.8</java.version>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <flink.version>1.14.3</flink.version>
-    <flinkcdc.version>2.1.1</flinkcdc.version>
+    <flinkcdc.version>2.2.0</flinkcdc.version>
     <commons.version>1.3.1</commons.version>
     <maven.compiler.source>1.8</maven.compiler.source>
     <maven.compiler.target>1.8</maven.compiler.target>
...
 package com.dlink.cdc;
 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
 import com.dlink.assertion.Asserts;
 import com.dlink.model.FlinkCDCConfig;
 import com.ververica.cdc.connectors.mysql.source.MySqlSource;
 import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
 import com.ververica.cdc.connectors.mysql.table.StartupOptions;
 import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
-
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 
 /**
  * FlinkCDCMergeBuilder
... @@ -28,43 +30,46 @@ public class FlinkCDCMergeBuilder {
             env.enableCheckpointing(config.getCheckpoint());
         }
         MySqlSourceBuilder<String> sourceBuilder = MySqlSource.<String>builder()
                 .hostname(config.getHostname())
                 .port(config.getPort())
                 .username(config.getUsername())
                 .password(config.getPassword());
         if (Asserts.isNotNull(config.getDatabase()) && config.getDatabase().size() > 0) {
             sourceBuilder.databaseList(config.getDatabase().toArray(new String[0]));
         }
         if (Asserts.isNotNull(config.getTable()) && config.getTable().size() > 0) {
             sourceBuilder.tableList(config.getTable().toArray(new String[0]));
         }
-        MySqlSourceBuilder<String> builder = sourceBuilder
-                .deserializer(new JsonDebeziumDeserializationSchema());
+        sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
         if (Asserts.isNotNullString(config.getStartupMode())) {
             switch (config.getStartupMode().toUpperCase()) {
                 case "INITIAL":
-                    builder.startupOptions(StartupOptions.initial());
+                    sourceBuilder.startupOptions(StartupOptions.initial());
                     break;
                 case "EARLIEST":
-                    builder.startupOptions(StartupOptions.earliest());
+                    sourceBuilder.startupOptions(StartupOptions.earliest());
                     break;
                 case "LATEST":
-                    builder.startupOptions(StartupOptions.latest());
+                    sourceBuilder.startupOptions(StartupOptions.latest());
                     break;
                 default:
-                    builder.startupOptions(StartupOptions.latest());
+                    sourceBuilder.startupOptions(StartupOptions.latest());
             }
         } else {
-            builder.startupOptions(StartupOptions.latest());
+            sourceBuilder.startupOptions(StartupOptions.latest());
         }
-        MySqlSource<String> sourceFunction = builder.build();
-        DataStreamSource<String> streamSource = env.fromSource(sourceFunction, WatermarkStrategy.noWatermarks(), "MySQL Source");
-        streamSource.addSink(getKafkaProducer(config.getBrokers(), config.getTopic()));
+        DataStreamSource<String> streamSource = env.fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
+        streamSource.sinkTo(getKafkaProducer(config.getBrokers(), config.getTopic()));
     }
 
-    private static FlinkKafkaProducer<String> getKafkaProducer(String brokers, String topic) {
-        return new FlinkKafkaProducer<String>(brokers,
-                topic,
-                new SimpleStringSchema());
+    private static KafkaSink<String> getKafkaProducer(String brokers, String topic) {
+        return KafkaSink.<String>builder()
+                .setBootstrapServers(brokers)
+                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
+                        .setTopic(topic)
+                        .setValueSerializationSchema(new SimpleStringSchema())
+                        .build()
+                )
+                .build();
     }
 }
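The Flink 1.14 module additionally replaces the legacy FlinkKafkaProducer with the KafkaSink from the unified Sink API, which attaches via sinkTo rather than addSink. Below is a minimal, runnable sketch of that same wiring in isolation; it is not part of the commit, and the broker address and topic are placeholders.

// Minimal sketch of the Flink 1.14 KafkaSink wiring used above.
// Broker address and topic name are placeholders.
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")  // placeholder broker
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("cdc-topic")          // placeholder topic
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .build();
        // KafkaSink implements the unified Sink API, so it attaches via sinkTo.
        env.fromElements("a", "b", "c").sinkTo(sink);
        env.execute("KafkaSink demo");
    }
}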