Commit 4a977449 authored by wenmo

Remove redundant code

parent 03a17f8d
@@ -19,7 +19,7 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
  */
 public class FlinkCDCMergeBuilder {
-    public static void buildMySqlCDC(StreamExecutionEnvironment env, FlinkCDCConfig config) throws Exception {
+    public static void buildMySqlCDC(StreamExecutionEnvironment env, FlinkCDCConfig config) {
         if(Asserts.isNotNull(config.getParallelism())){
             env.setParallelism(config.getParallelism());
         }
@@ -43,10 +43,7 @@ public class FlinkCDCMergeBuilder {
             .startupOptions(StartupOptions.latest())
             .build();
         DataStreamSource<String> streamSource = env.fromSource(sourceFunction, WatermarkStrategy.noWatermarks(), "MySQL Source");
-        streamSource.print();
         streamSource.addSink(getKafkaProducer(config.getBrokers(),config.getTopic()));
-//        JobExecutionResult jobExecutionResult = env.execute(config.getJobName());
-//        return jobExecutionResult.getJobID().toHexString();
     }
     private static FlinkKafkaProducer<String> getKafkaProducer(String brokers, String topic) {
...
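For reference, a minimal sketch of how a caller might drive the revised builder after this change. Since buildMySqlCDC no longer declares throws Exception and the commented-out env.execute(...) lines were deleted, job submission presumably moves to the caller. The launcher class, the loadConfig helper, and the job name below are hypothetical illustrations, not part of this commit.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CdcJobLauncher { // hypothetical caller, for illustration only
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Hypothetical helper: how FlinkCDCConfig is constructed is not shown in this commit.
        FlinkCDCConfig config = loadConfig(args);
        // Wires the MySQL CDC source to the Kafka sink on the given environment.
        FlinkCDCMergeBuilder.buildMySqlCDC(env, config);
        // Execution now happens in the caller; "mysql-cdc-to-kafka" is an assumed job name.
        env.execute("mysql-cdc-to-kafka");
    }
}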