Unverified Commit 30202a79 authored by 谢帮桂's avatar 谢帮桂 Committed by GitHub

Create Flink_CDC_kafka_Multi_source_merger.md

parent 800b7d30
# Flink CDC和kafka进行多源合并和下游同步更新 # Flink CDC和kafka进行多源合并和下游同步更新
---
编辑:谢帮桂 编辑:谢帮桂
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
# 前言 # 前言
---
本文主要是针对 Flink SQL 使用 Flink CDC 无法实现多库多表的多源合并问题,以及多源合并后如何对下游 Kafka 同步更新的问题,因为目前 Flink SQL 也只能进行单表 Flink CDC 的作业操作,这会导致数据库 CDC 的连接数过多。 本文主要是针对 Flink SQL 使用 Flink CDC 无法实现多库多表的多源合并问题,以及多源合并后如何对下游 Kafka 同步更新的问题,因为目前 Flink SQL 也只能进行单表 Flink CDC 的作业操作,这会导致数据库 CDC 的连接数过多。
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
# 环境 # 环境
---
|Flink|1.13.3| |Flink|1.13.3|
...@@ -44,12 +44,14 @@ ConnectRecord{topic='mysql_binlog_source.gmall.spu_info', kafkaPartition=null, k ...@@ -44,12 +44,14 @@ ConnectRecord{topic='mysql_binlog_source.gmall.spu_info', kafkaPartition=null, k
# 查看文档 # 查看文档
---
图1
图2 ![image-20210615115042539](https://s4.ax1x.com/2022/01/29/HSLhNR.png)
![image-20210615115042539](https://s4.ax1x.com/2022/01/29/HSLfE9.png)
我们可以看到红框部分,基于 Debezium 格式的 json 可以在 Kafka connector 建表中可以实现表的 CRUD 同步操作。只要总线 Kafka 的 json 格式符合该模式就可以对下游 kafka 进行 CRUD 的同步更新,刚好 Flink CDC 也是基于 Debezium。 我们可以看到红框部分,基于 Debezium 格式的 json 可以在 Kafka connector 建表中可以实现表的 CRUD 同步操作。只要总线 Kafka 的 json 格式符合该模式就可以对下游 kafka 进行 CRUD 的同步更新,刚好 Flink CDC 也是基于 Debezium。
...@@ -60,7 +62,7 @@ ConnectRecord{topic='mysql_binlog_source.gmall.spu_info', kafkaPartition=null, k ...@@ -60,7 +62,7 @@ ConnectRecord{topic='mysql_binlog_source.gmall.spu_info', kafkaPartition=null, k
再往下翻文档: 再往下翻文档:
图3 ![image-20210615115042539](https://s4.ax1x.com/2022/01/29/HSLRHJ.png)
可以看到,基于 Debezium-json 格式,可以把上面的 schema 定义的 json 格式的元数据给取出来放在字段里。 可以看到,基于 Debezium-json 格式,可以把上面的 schema 定义的 json 格式的元数据给取出来放在字段里。
...@@ -99,7 +101,7 @@ select * from Kafka_Table where origin_database='gmall' and origin_table = 'spu_ ...@@ -99,7 +101,7 @@ select * from Kafka_Table where origin_database='gmall' and origin_table = 'spu_
# 新建Flink CDC的dataStream项目 # 新建Flink CDC的dataStream项目
---
```java ```java
...@@ -162,7 +164,7 @@ public class FlinkCDC { ...@@ -162,7 +164,7 @@ public class FlinkCDC {
# 自定义序列化类 # 自定义序列化类
---
```java ```java
...@@ -286,19 +288,19 @@ PS:没放 schema{}这个对象,看文档说加了识别会影响效率。 ...@@ -286,19 +288,19 @@ PS:没放 schema{}这个对象,看文档说加了识别会影响效率。
# 总线 Kafka # 总线 Kafka
---
图4
![image-20210615115042539](https://s4.ax1x.com/2022/01/29/HSL441.png)
# Dinky 里面进行建表,提交作业 # Dinky 里面进行建表,提交作业
---
图5
![image-20210615115042539](https://s4.ax1x.com/2022/01/29/HSL2B4.png)
PS:yarn-session 模式,记得开启预览结果和打印流,不然观察不到数据 changelog PS:yarn-session 模式,记得开启预览结果和打印流,不然观察不到数据 changelog
...@@ -306,28 +308,28 @@ PS:yarn-session 模式,记得开启预览结果和打印流,不然观察 ...@@ -306,28 +308,28 @@ PS:yarn-session 模式,记得开启预览结果和打印流,不然观察
# 查看结果 # 查看结果
---
图6
图7 ![image-20210615115042539](https://s4.ax1x.com/2022/01/29/HSLI9x.png)
![image-20210615115042539](https://s4.ax1x.com/2022/01/29/HSLo36.png)
可以看到在指定库和表中新增一条数据,在下游 kafka 作业中实现了同步更新,然后试试对数据库该表的记录进行 delete,效果如下: 可以看到在指定库和表中新增一条数据,在下游 kafka 作业中实现了同步更新,然后试试对数据库该表的记录进行 delete,效果如下:
图8 ![image-20210615115042539](https://s4.ax1x.com/2022/01/29/HSLTgK.png)
可以看到"是是是.."这条记录同步删除了。 可以看到"是是是.."这条记录同步删除了。
此时 Flink CDC 的记录是这样: 此时 Flink CDC 的记录是这样:
图9 ![image-20210615115042539](https://s4.ax1x.com/2022/01/29/HSL7jO.png)
原理主要是 op 去同步下游 kafka 的 changeLog 里的 op 原理主要是 op 去同步下游 kafka 的 changeLog 里的 op
我们浏览一下 changeLog:(Dinky 选中打印流即可) 我们浏览一下 changeLog:(Dinky 选中打印流即可)
图10 ![image-20210615115042539](https://s4.ax1x.com/2022/01/29/HSLbuD.png)
可以看到,op 自动识别总线 kafka 发来的 JSON 进行了同步来记录操作。 可以看到,op 自动识别总线 kafka 发来的 JSON 进行了同步来记录操作。
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment