Commit 745cfd8e authored by wenmo

Fix the CDC multi-source merge issue

parent e6641382
package com.dlink.exception;

import com.dlink.common.result.Result;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody;
@@ -15,6 +17,8 @@
@ResponseBody
public class WebExceptionHandler {

+    private static final Logger logger = LoggerFactory.getLogger(WebExceptionHandler.class);
+
    @ExceptionHandler
    public Result busException(BusException e) {
        return Result.failed(e.getMessage());
@@ -22,6 +26,7 @@
    @ExceptionHandler
    public Result unknownException(Exception e) {
-        return Result.failed("系统出现错误, 请联系网站管理员!");
+        logger.error("ERROR:", e);
+        return Result.failed("系统出现错误, 请联系平台管理员!");
    }
}
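A minimal sketch (not part of this commit, controller and route hypothetical) of what the advice does: any exception that escapes a handler method is routed through unknownException, so the caller gets a generic failed Result while the full stack trace now lands in the server log.

import com.dlink.common.result.Result;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class DemoController {
    @GetMapping("/demo")
    public Result demo() {
        // Routed to WebExceptionHandler.unknownException(...): the client receives
        // Result.failed("系统出现错误, 请联系平台管理员!") ("a system error occurred,
        // please contact the platform administrator"), and logger.error writes the
        // stack trace to the server log instead of leaking it to the caller.
        throw new IllegalStateException("boom");
    }
}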
package com.dlink.cdc;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * CustomerDeserialization
 *
 * @author wenmo
 * @since 2022/1/29 22:16
 */
public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        // 1. Create the map that holds the final JSON payload
        Map<String, Object> result = new HashMap<>();
        // 2. Extract the database and table name from the topic ("<server>.<database>.<table>") into "source"
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];
        Map<String, Object> source = new HashMap<>();
        source.put("database", database);
        source.put("table", tableName);
        Struct value = (Struct) sourceRecord.value();
        // 3. Extract the "before" row image (null for inserts)
        Struct before = value.getStruct("before");
        Map<String, Object> beforeJson = new HashMap<>();
        if (before != null) {
            Schema beforeSchema = before.schema();
            List<Field> beforeFields = beforeSchema.fields();
            for (Field field : beforeFields) {
                Object beforeValue = before.get(field);
                beforeJson.put(field.name(), beforeValue);
            }
        }
        // 4. Extract the "after" row image (null for deletes)
        Struct after = value.getStruct("after");
        Map<String, Object> afterJson = new HashMap<>();
        if (after != null) {
            Schema afterSchema = after.schema();
            List<Field> afterFields = afterSchema.fields();
            for (Field field : afterFields) {
                Object afterValue = after.get(field);
                afterJson.put(field.name(), afterValue);
            }
        }
        // 5. Map the operation type (CREATE / UPDATE / DELETE) to the single-letter Debezium "op" code
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("insert".equals(type) || "create".equals(type)) {
            type = "c";
        } else if ("update".equals(type)) {
            type = "u";
        } else if ("delete".equals(type)) {
            type = "d";
        }
        // 6. Assemble the JSON object
        result.put("source", source);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("op", type);
        // 7. Emit the record as a JSON string
        collector.collect(mapper.writeValueAsString(result));
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
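For reference, a record emitted by this schema for an UPDATE on a hypothetical table db.products would look roughly like this (field values invented for illustration):

{"source":{"database":"db","table":"products"},"before":{"id":1,"name":"old"},"after":{"id":1,"name":"new"},"op":"u"}

The built-in JsonDebeziumDeserializationSchema emits the full Debezium envelope, which carries the same before/after/source/op fields (plus extra metadata), which is what makes the swap in FlinkCDCMergeBuilder below a near drop-in change.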
@@ -5,6 +5,7 @@
import com.dlink.model.FlinkCDCConfig;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
+import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -37,9 +38,8 @@
        if (Asserts.isNotNull(config.getTable()) && config.getTable().size() > 0) {
            sourceBuilder.tableList(config.getTable().toArray(new String[0]));
        }
-        MySqlSource<String> sourceFunction = sourceBuilder.deserializer(new CustomerDeserialization()) // requires the custom deserialization format
-                // .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
-                // .deserializer(new StringDebeziumDeserializationSchema()) // the default deserialization format
+        MySqlSource<String> sourceFunction = sourceBuilder
+                .deserializer(new JsonDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.latest())
                .build();
        DataStreamSource<String> streamSource = env.fromSource(sourceFunction, WatermarkStrategy.noWatermarks(), "MySQL Source");
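A self-contained sketch of the wiring the builder now produces, with hard-coded, hypothetical connection settings standing in for the values FlinkCDCConfig supplies at runtime:

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CdcMergeDemo {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("127.0.0.1")                  // hypothetical connection settings
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("db1", "db2")             // multi-source merge: capture several databases at once
                .tableList("db1.orders", "db2.orders")  // optional narrowing, as in the config.getTable() branch
                .deserializer(new JsonDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.latest())
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .print();
        env.execute("CDC multi-source merge demo");
    }
}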
(The commit repeats the identical CustomerDeserialization.java listing and FlinkCDCMergeBuilder.java diff three more times, evidently once per Flink-version dlink-client module; the repeated copies match the listings above line for line.)