Unverified Commit c8aa3d12 authored by 第一片心意's avatar 第一片心意 Committed by GitHub

[bug and doc] Fix yarn per job resource release (#863)

* [文档]:
1. 补充部署细节。
2. 增加一些 flink sql 案例。

* [bug修复]:
修复 yarn per job 、 batch 模式下任务完成后,yarn 不自动释放 flink JM 角色资源的问题,任务以 detached 模式运行。
parent 8efccada
...@@ -70,7 +70,7 @@ public class YarnPerJobGateway extends YarnGateway { ...@@ -70,7 +70,7 @@ public class YarnPerJobGateway extends YarnGateway {
YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor( YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
configuration, yarnConfiguration, yarnClient, YarnClientYarnClusterInformationRetriever.create(yarnClient), true); configuration, yarnConfiguration, yarnClient, YarnClientYarnClusterInformationRetriever.create(yarnClient), true);
try { try {
ClusterClientProvider<ApplicationId> clusterClientProvider = yarnClusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, false); ClusterClientProvider<ApplicationId> clusterClientProvider = yarnClusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, true);
ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient(); ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
ApplicationId applicationId = clusterClient.getClusterId(); ApplicationId applicationId = clusterClient.getClusterId();
result.setAppId(applicationId.toString()); result.setAppId(applicationId.toString());
......
...@@ -11,7 +11,7 @@ title: 部署 ...@@ -11,7 +11,7 @@ title: 部署
Dinky 不依赖任何外部的 Hadoop 或者 Flink 环境,可以单独部署在 flink、 hadoop 和 K8S 集群之外,完全解耦,支持同时连接多个不同的集群实例进行运维。 Dinky 不依赖任何外部的 Hadoop 或者 Flink 环境,可以单独部署在 flink、 hadoop 和 K8S 集群之外,完全解耦,支持同时连接多个不同的集群实例进行运维。
``` ```shell
tar -zxvf dlink-release-{version}.tar.gz tar -zxvf dlink-release-{version}.tar.gz
mv dlink-release-{version} dlink mv dlink-release-{version} dlink
cd dlink cd dlink
...@@ -21,7 +21,7 @@ cd dlink ...@@ -21,7 +21,7 @@ cd dlink
Dinky 采用 mysql 作为后端的存储库,mysql 支持 5.7+。这里假设你已经安装了 mysql 。首先需要创建 Dinky 的后端数据库,这里以配置文件中默认库创建。 Dinky 采用 mysql 作为后端的存储库,mysql 支持 5.7+。这里假设你已经安装了 mysql 。首先需要创建 Dinky 的后端数据库,这里以配置文件中默认库创建。
``` ```sql
#登录mysql #登录mysql
mysql -uroot -proot@123 mysql -uroot -proot@123
#授权并创建数据库 #授权并创建数据库
...@@ -33,20 +33,27 @@ mysql -h fdw1 -udlink -pdlink ...@@ -33,20 +33,27 @@ mysql -h fdw1 -udlink -pdlink
mysql> create database dlink; mysql> create database dlink;
``` ```
在 Dinky 根目录 sql 文件夹下有 2 个 sql 文件,分别是 dlink.sql 和 dlink_history.sql。如果第一次部署,可以直接将 dlink.sql 文件在 dlink 数据库下执行。(如果之前已经建立了 dlink 的数据库,那 dlink_history.sql 存放了各版本的升级 sql ,根据版本号及日期按需执行即可) 在 Dinky 根目录 sql 文件夹下有 3 个 sql 文件,分别是 dlink.sql 、 dlink_history.sql 和 dlinkmysqlcatalog.sql。如果第一次部署,可以直接将 dlink.sql 文件在 dlink 数据库下执行。(如果之前已经建立了 dlink 的数据库,那 dlink_history.sql 存放了各版本的升级 sql ,根据版本号及日期按需执行即可)
``` ```sql
#首先登录 mysql #首先登录 mysql
mysql -h fdw1 -udlink -pdlink mysql -h fdw1 -udlink -pdlink
mysql> use dlink; mysql> use dlink;
mysql> source /opt/dlink/sql/dlink.sql mysql> source /opt/dlink/sql/dlink.sql
``` ```
平台默认有两种 catalog 实现,一种是基于内存的,一种是基于平台 mysql 的,如果想要使用平台内置的 mysql catalog,
需要手动执行一下 dlinkmysqlcatalog.sql 脚本,以初始化平台内置 catalog 数据库表。
```sql
mysql> source /opt/dlink/sql/dlinkmysqlcatalog.sql
```
### 配置文件 ### 配置文件
创建好数据库后,修改 Dinky 连接 mysql 的配置文件。 创建好数据库后,修改 Dinky 连接 mysql 的配置文件。
``` ```shell
#切换目录 #切换目录
cd /opt/dlink/config/ cd /opt/dlink/config/
vim application.yml vim application.yml
...@@ -56,7 +63,7 @@ vim application.yml ...@@ -56,7 +63,7 @@ vim application.yml
在 linux,首先要配置好相应的 yum 库,因为在安装过程中没有配置,这里可以大概讲述下步骤,可以选择连接网络或者本地 yum 源都可以,这里选择连接网络方式配置。 在 linux,首先要配置好相应的 yum 库,因为在安装过程中没有配置,这里可以大概讲述下步骤,可以选择连接网络或者本地 yum 源都可以,这里选择连接网络方式配置。
``` ```shell
#下载yum源 #下载yum源
wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo
#清除缓存 #清除缓存
...@@ -78,7 +85,7 @@ ps -ef|grep nginx ...@@ -78,7 +85,7 @@ ps -ef|grep nginx
如果是 yum 源安装的 nginx,配置文件在 etc 下,如果是源码包安装,请自行找到配置文件 如果是 yum 源安装的 nginx,配置文件在 etc 下,如果是源码包安装,请自行找到配置文件
``` ```shell
#切换到nginx配置目录 #切换到nginx配置目录
cd /etc/nginx/ cd /etc/nginx/
``` ```
...@@ -130,7 +137,7 @@ vim /etc/nginx/nginx.conf 打开配置文件,修改 server 中的内容,其 ...@@ -130,7 +137,7 @@ vim /etc/nginx/nginx.conf 打开配置文件,修改 server 中的内容,其
配置完成后,保存退出。并重启 nginx 并重新加载生效 配置完成后,保存退出。并重启 nginx 并重新加载生效
``` ```shell
$systemctl restart nginx.service $systemctl restart nginx.service
$systemctl reload nginx.service $systemctl reload nginx.service
#查看nginx是否配置成功 #查看nginx是否配置成功
...@@ -147,12 +154,14 @@ Dinky 当前版本的 yarn 的 perjob 与 application 执行模式依赖 flink-s ...@@ -147,12 +154,14 @@ Dinky 当前版本的 yarn 的 perjob 与 application 执行模式依赖 flink-s
当然如果你的 Hadoop 为 3+ 也可以自行编译对于版本的 dlink-client-hadoop.jar 以替代 uber 包, 当然如果你的 Hadoop 为 3+ 也可以自行编译对于版本的 dlink-client-hadoop.jar 以替代 uber 包,
::: :::
``` ```shell
#创建目录 #创建目录
cd /opt/dlink/ cd /opt/dlink/
mkdir plugins mkdir plugins
``` ```
将 flink-shade-hadoop 上传到到 plugins 文件目录下,使用 flink-shade-hadoop-3 地址如下: 将 flink-shade-hadoop 上传到到 plugins 文件目录下,使用 flink-shade-hadoop-3 地址如下:
``` ```
https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-3-uber?repo=cloudera-repos https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-3-uber?repo=cloudera-repos
``` ```
...@@ -161,7 +170,7 @@ https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-3-uber?r ...@@ -161,7 +170,7 @@ https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-3-uber?r
最终项目根目录如下,仅供参考: 最终项目根目录如下,仅供参考:
```shell ```
config/ -- 配置文件 config/ -- 配置文件
|- application.yml |- application.yml
extends/ -- 扩展 extends/ -- 扩展
...@@ -184,7 +193,7 @@ lib/ -- 内部组件 ...@@ -184,7 +193,7 @@ lib/ -- 内部组件
|- dlink-alert-feishu.jar |- dlink-alert-feishu.jar
|- dlink-alert-wechat.jar |- dlink-alert-wechat.jar
|- dlink-client-1.13.jar -- 适配 Flink1.13.x,默认 |- dlink-client-1.13.jar -- 适配 Flink1.13.x,默认
|- dlink-catalog-mysql.jar -- dlink 的 catalog实现 |- dlink-catalog-mysql.jar -- dlink 的 catalog 实现
|- dlink-connector-jdbc.jar |- dlink-connector-jdbc.jar
|- dlink-function.jar |- dlink-function.jar
|- dlink-metadata-clickhouse.jar |- dlink-metadata-clickhouse.jar
...@@ -213,9 +222,13 @@ auto.sh --启动停止脚本 ...@@ -213,9 +222,13 @@ auto.sh --启动停止脚本
dlink-admin.jar --主程序包 dlink-admin.jar --主程序包
``` ```
#### flink 版本适配
lib 目录下默认的 flink-client 版本为 **1.13** ,如果你配置的 flink 版本不是 **1.13**,则需要删除 lib 目录下的 flink-client 包,然后从 extends 目录下找到合适的包,拷贝到 lib 目录下。
### 启动 Dinky ### 启动 Dinky
``` ```shell
#启动 #启动
$sh auto.sh start $sh auto.sh start
#停止 #停止
...@@ -229,12 +242,11 @@ $sh auto.sh status ...@@ -229,12 +242,11 @@ $sh auto.sh status
默认用户名/密码: admin/admin 默认用户名/密码: admin/admin
:::tip 说明 :::tip 说明
Dinky 部署需要 MySQL5.7 以上版本 Dinky 部署需要 MySQL5.7 以上版本
Dinky 不依赖于 Nginx, Nginx 可选 Dinky 不依赖于 Nginx, Nginx 可选
::: :::
---
sidebar_position: 17
id: cross_join
title: cross join
---
## 列转行
也就是将数组展开,一行变多行,使用到 `cross join unnest()` 语句。
读取 hive 表数据,然后写入 hive 表。
### source
`source_table` 表信息如下
```sql
CREATE TABLE `test.source_table`(
`col1` string,
`col2` array<string> COMMENT '数组类型的字段')
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs://hadoopCluster/user/hive/warehouse/test.db/source_table'
TBLPROPERTIES (
'transient_lastDdlTime'='1659261419')
;
```
`source_table` 表数据如下
![img.png](http://www.aiwenmo.com/dinky/docs/zh-CN/sql_development_guide/example/hive_to_hive_explode_source_table_data.png)
### sink
`sink_table` 表信息如下
```sql
CREATE TABLE `test.sink_table`(
`col1` string,
`col2` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs://hadoopCluster/user/hive/warehouse/test.db/sink_table'
TBLPROPERTIES (
'transient_lastDdlTime'='1659261915')
;
```
`sink_table` 表数据如下
![img.png](http://www.aiwenmo.com/dinky/docs/zh-CN/sql_development_guide/example/hive_to_hive_explode_sink_table_data.png)
### flink sql 语句
下面将使用两种方言演示如何将数组中的数据展开
#### 使用flink方言
```sql
set 'table.local-time-zone' = 'GMT+08:00';
-- 在需要读取hive或者是写入hive表时,必须创建hive catalog。
-- 创建catalog
create catalog hive with (
'type' = 'hive',
'hadoop-conf-dir' = '/data/soft/dlink-0.6.6/hadoop-conf',
'hive-conf-dir' = '/data/soft/dlink-0.6.6/hadoop-conf'
)
;
use catalog hive;
insert overwrite test.sink_table
select col1, a.col
from test.source_table
cross join unnest(col2) as a (col)
;
```
#### 使用hive方言
```sql
set 'table.local-time-zone' = 'GMT+08:00';
-- 在需要读取hive或者是写入hive表时,必须创建hive catalog。
-- 创建catalog
create catalog hive with (
'type' = 'hive',
'hadoop-conf-dir' = '/data/soft/dlink-0.6.6/hadoop-conf',
'hive-conf-dir' = '/data/soft/dlink-0.6.6/hadoop-conf'
)
;
use catalog hive;
load module hive;
set 'table.sql-dialect' = 'hive';
insert overwrite table test.sink_table
select col1, a.col
from test.source_table
lateral view explode(col2) a as col
;
```
---
sidebar_position: 14
id: kafka_to_hive
title: kafka写入hive
---
## 写入无分区表
下面的案例演示的是将 kafka 表中的数据,经过处理之后,直接写入 hive 无分区表,具体 hive 表中的数据什么时候可见,具体请查看 `insert` 语句中对 hive 表使用的 sql 提示。
### hive 表
```sql
CREATE TABLE `test.order_info`(
`id` int COMMENT '订单id',
`product_count` int COMMENT '购买商品数量',
`one_price` double COMMENT '单个商品价格')
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs://hadoopCluster/user/hive/warehouse/test.db/order_info'
TBLPROPERTIES (
'transient_lastDdlTime'='1659250044')
;
```
### flink sql 语句
```sql
-- 如果是 flink-1.13.x ,则需要手动设置该参数
set 'table.dynamic-table-options.enabled' = 'true';
-- 在需要读取hive或者是写入hive表时,必须创建hive catalog。
-- 创建catalog
create catalog hive with (
'type' = 'hive',
'hadoop-conf-dir' = '/data/soft/dlink-0.6.6/hadoop-conf',
'hive-conf-dir' = '/data/soft/dlink-0.6.6/hadoop-conf'
)
;
use catalog hive;
-- 创建连接 kafka 的虚拟表作为 source,此处使用 temporary ,是为了不让创建的虚拟表元数据保存到 hive,可以让任务重启是不出错。
-- 如果想让虚拟表元数据保存到 hive ,则可以在创建语句中加入 if not exist 语句。
CREATE temporary TABLE source_kafka(
id integer comment '订单id',
product_count integer comment '购买商品数量',
one_price double comment '单个商品价格'
) WITH (
'connector' = 'kafka',
'topic' = 'data_gen_source',
'properties.bootstrap.servers' = 'node01:9092,node02:9092,node03:9092',
'properties.group.id' = 'for_source_test',
'scan.startup.mode' = 'latest-offset',
'format' = 'csv',
'csv.field-delimiter' = ' '
)
;
insert into test.order_info
-- 下面的语法是 flink sql 提示,用于在语句中使用到表时手动设置一些临时的参数
/*+
OPTIONS(
-- 设置写入的文件滚动时间间隔
'sink.rolling-policy.rollover-interval' = '10 s',
-- 设置检查文件是否需要滚动的时间间隔
'sink.rolling-policy.check-interval' = '1 s',
-- sink 并行度
'sink.parallelism' = '1'
)
*/
select id, product_count, one_price
from source_kafka
;
```
flink sql 写入 hive ,依赖的是 fileSystem 连接器,该连接器写入到文件系统的文件可见性,依赖于 flink 任务的 checkpoint ,
所以 dlink 界面提交任务时,一定要开启 checkpoint ,也就是设置 checkpoint 的时间间隔参数 `execution.checkpointing.interval` ,如下图所示,设置为 10000 毫秒。
![img.png](http://www.aiwenmo.com/dinky/docs/zh-CN/sql_development_guide/example/kafka_to_hive_dlink_ui.jpg)
任务运行之后,就可以看到如下的 fink ui 界面了
![img.png](http://www.aiwenmo.com/dinky/docs/zh-CN/sql_development_guide/example/kafka_to_hive_flink_ui.png)
本案例使用 streaming 方式运行, checkpoint 时间为 10 s,文件滚动时间为 10 s,在配置的时间过后,就可以看到 hive 中的数据了
![img.png](http://www.aiwenmo.com/dinky/docs/zh-CN/sql_development_guide/example/kafka_to_hive_hive_data.png)
从 hdfs 上查看 hive 表对应文件的数据,如下图所示
![img.png](http://www.aiwenmo.com/dinky/docs/zh-CN/sql_development_guide/example/kafka_to_hive_hive_table_hdfs_file.png)
可以看到,1 分钟滚动生成了 6 个文件,最新文件为 .part 开头的文件,在 hdfs 中,以 `.` 开头的文件,是不可见的,说明这个文件是由于我关闭了 flink sql 任务,然后文件无法滚动造成的。
有关读写 hive 的一些配置和读写 hive 表时其数据的可见性,可以看考[读写hive](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/hive/hive_read_write/)页面。
## 写入分区表
### hive 表
```sql
CREATE TABLE `test.order_info_have_partition`(
`product_count` int COMMENT '购买商品数量',
`one_price` double COMMENT '单个商品价格')
PARTITIONED BY (
`minute` string COMMENT '订单时间,分钟级别',
`order_id` int COMMENT '订单id')
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs://hadoopCluster/user/hive/warehouse/test.db/order_info_have_partition'
TBLPROPERTIES (
'transient_lastDdlTime'='1659254559')
;
```
### flink sql 语句
```sql
-- 如果是 flink-1.13.x ,则需要手动设置该参数
set 'table.dynamic-table-options.enabled' = 'true';
-- 在需要读取hive或者是写入hive表时,必须创建hive catalog。
-- 创建catalog
create catalog hive with (
'type' = 'hive',
'hadoop-conf-dir' = '/data/soft/dlink-0.6.6/hadoop-conf',
'hive-conf-dir' = '/data/soft/dlink-0.6.6/hadoop-conf'
)
;
use catalog hive;
-- 创建连接 kafka 的虚拟表作为 source,此处使用 temporary ,是为了不让创建的虚拟表元数据保存到 hive,可以让任务重启是不出错。
-- 如果想让虚拟表元数据保存到 hive ,则可以在创建语句中加入 if not exist 语句。
CREATE temporary TABLE source_kafka(
event_time TIMESTAMP(3) METADATA FROM 'timestamp',
id integer comment '订单id',
product_count integer comment '购买商品数量',
one_price double comment '单个商品价格'
) WITH (
'connector' = 'kafka',
'topic' = 'data_gen_source',
'properties.bootstrap.servers' = 'node01:9092,node02:9092,node03:9092',
'properties.group.id' = 'for_source_test',
'scan.startup.mode' = 'latest-offset',
'format' = 'csv',
'csv.field-delimiter' = ' '
)
;
insert into test.order_info_have_partition
-- 下面的语法是 flink sql 提示,用于在语句中使用到表时手动设置一些临时的参数
/*+
OPTIONS(
-- 设置分区提交触发器为分区时间
'sink.partition-commit.trigger' = 'partition-time',
-- 'partition.time-extractor.timestamp-pattern' = '$year-$month-$day $hour:$minute',
-- 设置时间提取器的时间格式,要和分区字段值的格式保持一直
'partition.time-extractor.timestamp-formatter' = 'yyyy-MM-dd_HH:mm',
-- 设置分区提交延迟时间,这儿设置 1 分钟,是因为分区时间为 1 分钟间隔
'sink.partition-commit.delay' = '1 m',
-- 设置水印时区
'sink.partition-commit.watermark-time-zone' = 'GMT+08:00',
-- 设置分区提交策略,这儿是将分区提交到元数据存储,并且在分区目录下生成 success 文件
'sink.partition-commit.policy.kind' = 'metastore,success-file',
-- sink 并行度
'sink.parallelism' = '1'
)
*/
select
product_count,
one_price,
-- 不要让分区值中带有空格,分区值最后会变成目录名,有空格的话,可能会有一些未知问题
date_format(event_time, 'yyyy-MM-dd_HH:mm') as `minute`,
id as order_id
from source_kafka
;
```
flink sql 任务运行的 UI 界面如下
![img.png](http://www.aiwenmo.com/dinky/docs/zh-CN/sql_development_guide/example/kafka_to_hive_partition_table_flink_ui.png)
1 分钟之后查看 hive 表中数据,如下
![img.png](http://www.aiwenmo.com/dinky/docs/zh-CN/sql_development_guide/example/kafka_to_hive_partition_table_data.png)
查看 hive 表对应 hdfs 上的文件,可以看到
![img.png](http://www.aiwenmo.com/dinky/docs/zh-CN/sql_development_guide/example/kafka_to_hive_partition_table_hdfs_file.png)
从上图可以看到,具体的分区目录下生成了 `_SUCCESS` 文件,表示该分区提交成功。
---
sidebar_position: 15
id: lookup_join
title: lookup join
---
## lookup join
该例中,将 mysql 表作为维表,里面保存订单信息,之后去关联订单流水表,最后输出完整的订单流水信息数据到 kafka。
### kafka 主题 (order_water)
订单流水表读取的是 kafka `order_water` 主题中的数据,数据内容如下
![img.png](http://www.aiwenmo.com/dinky/docs/zh-CN/sql_development_guide/example/kafka_order_water_data.png)
### mysql 表 (dim.order_info)
**表结构**
```sql
CREATE TABLE `order_info` (
`id` int(11) NOT NULL COMMENT '订单id',
`user_name` varchar(50) DEFAULT NULL COMMENT '订单所属用户',
`order_source` varchar(50) DEFAULT NULL COMMENT '订单所属来源',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
```
**数据**
![img.png](http://www.aiwenmo.com/dinky/docs/zh-CN/sql_development_guide/example/kafka_lookup_join_mysql_mysql_data1.png)
### flink sql 语句
```sql
set 'table.local-time-zone' = 'GMT+08:00';
-- 订单流水
CREATE temporary TABLE order_flow(
id int comment '订单id',
product_count int comment '购买商品数量',
one_price double comment '单个商品价格',
-- 一定要添加处理时间字段,lookup join 需要该字段
proc_time as proctime()
) WITH (
'connector' = 'kafka',
'topic' = 'order_water',
'properties.bootstrap.servers' = 'node01:9092,node02:9092,node03:9092',
'properties.group.id' = 'for_source_test',
'scan.startup.mode' = 'latest-offset',
'format' = 'csv',
'csv.field-delimiter' = ' '
)
;
-- 订单信息
create table order_info (
id int PRIMARY KEY NOT ENFORCED comment '订单id',
user_name string comment '订单所属用户',
order_source string comment '订单所属来源'
) with (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://node01:3306/dim?useSSL=false',
'table-name' = 'order_info',
'username' = 'root',
'password' = 'root'
)
;
-- 创建连接 kafka 的虚拟表作为 sink
create table sink_kafka(
id int PRIMARY KEY NOT ENFORCED comment '订单id',
user_name string comment '订单所属用户',
order_source string comment '订单所属来源',
product_count int comment '购买商品数量',
one_price double comment '单个商品价格',
total_price double comment '总价格'
) with (
'connector' = 'upsert-kafka',
'topic' = 'for_sink',
'properties.bootstrap.servers' = 'node01:9092,node02:9092,node03:9092',
'key.format' = 'csv',
'value.format' = 'csv',
'value.csv.field-delimiter' = ' '
)
;
-- 真正要执行的任务
insert into sink_kafka
select
a.id,
b.user_name,
b.order_source,
a.product_count,
a.one_price,
a.product_count * a.one_price as total_price
from order_flow as a
-- 一定要添加 for system_time as of 语句,否则读取 mysql 的子任务会被认为是有界流,只读取一次,之后 mysql 维表中变化后的数据无法被读取
left join order_info for system_time as of a.proc_time as b
on a.id = b.id
;
```
flink sql 任务运行之后,flink UI 界面显示为
![img.png](http://www.aiwenmo.com/dinky/docs/zh-CN/sql_development_guide/example/kafka_lookup_join_mysql_flink_ui.png)
最后查看写入 kafka 中的数据为
![img.png](http://www.aiwenmo.com/dinky/docs/zh-CN/sql_development_guide/example/kafka_lookup_join_mysql_sink_kafka_data1.png)
此时,修改 mysql 中的数据,修改之后为
![img.png](http://www.aiwenmo.com/dinky/docs/zh-CN/sql_development_guide/example/kafka_lookup_join_mysql_mysql_data2.png)
再查看写入 kafka 中的数据为
![img.png](http://www.aiwenmo.com/dinky/docs/zh-CN/sql_development_guide/example/kafka_lookup_join_mysql_sink_kafka_data2.png)
**其他**
如果 kafka 中的订单流数据中的某个订单 id 在维表 mysql 中找不到,而且 flink sql 任务中使用的是 left join 连接,
则匹配不到的订单中的 user_name 和 product_count 字段将为空字符串,具体如下图所示
![img.png](http://www.aiwenmo.com/dinky/docs/zh-CN/sql_development_guide/example/kafka_lookup_join_mysql_sink_kafka_data3.png)
\ No newline at end of file
This diff is collapsed.
...@@ -11,7 +11,7 @@ title: 部署 ...@@ -11,7 +11,7 @@ title: 部署
Dinky 不依赖任何外部的 Hadoop 或者 Flink 环境,可以单独部署在 flink、 hadoop 和 K8S 集群之外,完全解耦,支持同时连接多个不同的集群实例进行运维。 Dinky 不依赖任何外部的 Hadoop 或者 Flink 环境,可以单独部署在 flink、 hadoop 和 K8S 集群之外,完全解耦,支持同时连接多个不同的集群实例进行运维。
``` ```shell
tar -zxvf dlink-release-{version}.tar.gz tar -zxvf dlink-release-{version}.tar.gz
mv dlink-release-{version} dlink mv dlink-release-{version} dlink
cd dlink cd dlink
...@@ -21,7 +21,7 @@ cd dlink ...@@ -21,7 +21,7 @@ cd dlink
Dinky 采用 mysql 作为后端的存储库,mysql 支持 5.7+。这里假设你已经安装了 mysql 。首先需要创建 Dinky 的后端数据库,这里以配置文件中默认库创建。 Dinky 采用 mysql 作为后端的存储库,mysql 支持 5.7+。这里假设你已经安装了 mysql 。首先需要创建 Dinky 的后端数据库,这里以配置文件中默认库创建。
``` ```sql
#登录mysql #登录mysql
mysql -uroot -proot@123 mysql -uroot -proot@123
#授权并创建数据库 #授权并创建数据库
...@@ -33,20 +33,27 @@ mysql -h fdw1 -udlink -pdlink ...@@ -33,20 +33,27 @@ mysql -h fdw1 -udlink -pdlink
mysql> create database dlink; mysql> create database dlink;
``` ```
在 Dinky 根目录 sql 文件夹下有 2 个 sql 文件,分别是 dlink.sql 和 dlink_history.sql。如果第一次部署,可以直接将 dlink.sql 文件在 dlink 数据库下执行。(如果之前已经建立了 dlink 的数据库,那 dlink_history.sql 存放了各版本的升级 sql ,根据版本号及日期按需执行即可) 在 Dinky 根目录 sql 文件夹下有 3 个 sql 文件,分别是 dlink.sql 、 dlink_history.sql 和 dlinkmysqlcatalog.sql。如果第一次部署,可以直接将 dlink.sql 文件在 dlink 数据库下执行。(如果之前已经建立了 dlink 的数据库,那 dlink_history.sql 存放了各版本的升级 sql ,根据版本号及日期按需执行即可)
``` ```sql
#首先登录 mysql #首先登录 mysql
mysql -h fdw1 -udlink -pdlink mysql -h fdw1 -udlink -pdlink
mysql> use dlink; mysql> use dlink;
mysql> source /opt/dlink/sql/dlink.sql mysql> source /opt/dlink/sql/dlink.sql
``` ```
平台默认有两种 catalog 实现,一种是基于内存的,一种是基于平台 mysql 的,如果想要使用平台内置的 mysql catalog,
需要手动执行一下 dlinkmysqlcatalog.sql 脚本,以初始化平台内置 catalog 数据库表。
```sql
mysql> source /opt/dlink/sql/dlinkmysqlcatalog.sql
```
### 配置文件 ### 配置文件
创建好数据库后,修改 Dinky 连接 mysql 的配置文件。 创建好数据库后,修改 Dinky 连接 mysql 的配置文件。
``` ```shell
#切换目录 #切换目录
cd /opt/dlink/config/ cd /opt/dlink/config/
vim application.yml vim application.yml
...@@ -56,7 +63,7 @@ vim application.yml ...@@ -56,7 +63,7 @@ vim application.yml
在 linux,首先要配置好相应的 yum 库,因为在安装过程中没有配置,这里可以大概讲述下步骤,可以选择连接网络或者本地 yum 源都可以,这里选择连接网络方式配置。 在 linux,首先要配置好相应的 yum 库,因为在安装过程中没有配置,这里可以大概讲述下步骤,可以选择连接网络或者本地 yum 源都可以,这里选择连接网络方式配置。
``` ```shell
#下载yum源 #下载yum源
wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo
#清除缓存 #清除缓存
...@@ -78,7 +85,7 @@ ps -ef|grep nginx ...@@ -78,7 +85,7 @@ ps -ef|grep nginx
如果是 yum 源安装的 nginx,配置文件在 etc 下,如果是源码包安装,请自行找到配置文件 如果是 yum 源安装的 nginx,配置文件在 etc 下,如果是源码包安装,请自行找到配置文件
``` ```shell
#切换到nginx配置目录 #切换到nginx配置目录
cd /etc/nginx/ cd /etc/nginx/
``` ```
...@@ -130,7 +137,7 @@ vim /etc/nginx/nginx.conf 打开配置文件,修改 server 中的内容,其 ...@@ -130,7 +137,7 @@ vim /etc/nginx/nginx.conf 打开配置文件,修改 server 中的内容,其
配置完成后,保存退出。并重启 nginx 并重新加载生效 配置完成后,保存退出。并重启 nginx 并重新加载生效
``` ```shell
$systemctl restart nginx.service $systemctl restart nginx.service
$systemctl reload nginx.service $systemctl reload nginx.service
#查看nginx是否配置成功 #查看nginx是否配置成功
...@@ -147,12 +154,14 @@ Dinky 当前版本的 yarn 的 perjob 与 application 执行模式依赖 flink-s ...@@ -147,12 +154,14 @@ Dinky 当前版本的 yarn 的 perjob 与 application 执行模式依赖 flink-s
当然如果你的 Hadoop 为 3+ 也可以自行编译对于版本的 dlink-client-hadoop.jar 以替代 uber 包, 当然如果你的 Hadoop 为 3+ 也可以自行编译对于版本的 dlink-client-hadoop.jar 以替代 uber 包,
::: :::
``` ```shell
#创建目录 #创建目录
cd /opt/dlink/ cd /opt/dlink/
mkdir plugins mkdir plugins
``` ```
将 flink-shade-hadoop 上传到到 plugins 文件目录下,使用 flink-shade-hadoop-3 地址如下: 将 flink-shade-hadoop 上传到到 plugins 文件目录下,使用 flink-shade-hadoop-3 地址如下:
``` ```
https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-3-uber?repo=cloudera-repos https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-3-uber?repo=cloudera-repos
``` ```
...@@ -161,7 +170,7 @@ https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-3-uber?r ...@@ -161,7 +170,7 @@ https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-3-uber?r
最终项目根目录如下,仅供参考: 最终项目根目录如下,仅供参考:
```shell ```
config/ -- 配置文件 config/ -- 配置文件
|- application.yml |- application.yml
extends/ -- 扩展 extends/ -- 扩展
...@@ -213,9 +222,13 @@ auto.sh --启动停止脚本 ...@@ -213,9 +222,13 @@ auto.sh --启动停止脚本
dlink-admin.jar --主程序包 dlink-admin.jar --主程序包
``` ```
#### flink 版本适配
lib 目录下默认的 flink-client 版本为 **1.13** ,如果你配置的 flink 版本不是 **1.13**,则需要删除 lib 目录下的 flink-client 包,然后从 extends 目录下找到合适的包,拷贝到 lib 目录下。
### 启动 Dinky ### 启动 Dinky
``` ```shell
#启动 #启动
$sh auto.sh start $sh auto.sh start
#停止 #停止
......
---
sidebar_position: 17
id: cross_join
title: cross join
---
## 列转行
也就是将数组展开,一行变多行,使用到 `cross join unnest()` 语句。
读取 hive 表数据,然后写入 hive 表。
### source
`source_table` 表信息如下
```sql
CREATE TABLE `test.source_table`(
`col1` string,
`col2` array<string> COMMENT '数组类型的字段')
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs://hadoopCluster/user/hive/warehouse/test.db/source_table'
TBLPROPERTIES (
'transient_lastDdlTime'='1659261419')
;
```
`source_table` 表数据如下
![img.png](http://www.aiwenmo.com/dinky/docs/zh-CN/sql_development_guide/example/hive_to_hive_explode_source_table_data.png)
### sink
`sink_table` 表信息如下
```sql
CREATE TABLE `test.sink_table`(
`col1` string,
`col2` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs://hadoopCluster/user/hive/warehouse/test.db/sink_table'
TBLPROPERTIES (
'transient_lastDdlTime'='1659261915')
;
```
`sink_table` 表数据如下
![img.png](http://www.aiwenmo.com/dinky/docs/zh-CN/sql_development_guide/example/hive_to_hive_explode_sink_table_data.png)
### flink sql 语句
下面将使用两种方言演示如何将数组中的数据展开
#### 使用flink方言
```sql
set 'table.local-time-zone' = 'GMT+08:00';
-- 在需要读取hive或者是写入hive表时,必须创建hive catalog。
-- 创建catalog
create catalog hive with (
'type' = 'hive',
'hadoop-conf-dir' = '/data/soft/dlink-0.6.6/hadoop-conf',
'hive-conf-dir' = '/data/soft/dlink-0.6.6/hadoop-conf'
)
;
use catalog hive;
insert overwrite test.sink_table
select col1, a.col
from test.source_table
cross join unnest(col2) as a (col)
;
```
#### 使用hive方言
```sql
set 'table.local-time-zone' = 'GMT+08:00';
-- 在需要读取hive或者是写入hive表时,必须创建hive catalog。
-- 创建catalog
create catalog hive with (
'type' = 'hive',
'hadoop-conf-dir' = '/data/soft/dlink-0.6.6/hadoop-conf',
'hive-conf-dir' = '/data/soft/dlink-0.6.6/hadoop-conf'
)
;
use catalog hive;
load module hive;
set 'table.sql-dialect' = 'hive';
insert overwrite table test.sink_table
select col1, a.col
from test.source_table
lateral view explode(col2) a as col
;
```
---
sidebar_position: 14
id: kafka_to_hive
title: kafka写入hive
---
## 写入无分区表
下面的案例演示的是将 kafka 表中的数据,经过处理之后,直接写入 hive 无分区表,具体 hive 表中的数据什么时候可见,具体请查看 `insert` 语句中对 hive 表使用的 sql 提示。
### hive 表
```sql
CREATE TABLE `test.order_info`(
`id` int COMMENT '订单id',
`product_count` int COMMENT '购买商品数量',
`one_price` double COMMENT '单个商品价格')
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs://hadoopCluster/user/hive/warehouse/test.db/order_info'
TBLPROPERTIES (
'transient_lastDdlTime'='1659250044')
;
```
### flink sql 语句
```sql
-- 如果是 flink-1.13.x ,则需要手动设置该参数
set 'table.dynamic-table-options.enabled' = 'true';
-- 在需要读取hive或者是写入hive表时,必须创建hive catalog。
-- 创建catalog
create catalog hive with (
'type' = 'hive',
'hadoop-conf-dir' = '/data/soft/dlink-0.6.6/hadoop-conf',
'hive-conf-dir' = '/data/soft/dlink-0.6.6/hadoop-conf'
)
;
use catalog hive;
-- 创建连接 kafka 的虚拟表作为 source,此处使用 temporary ,是为了不让创建的虚拟表元数据保存到 hive,可以让任务重启是不出错。
-- 如果想让虚拟表元数据保存到 hive ,则可以在创建语句中加入 if not exist 语句。
CREATE temporary TABLE source_kafka(
id integer comment '订单id',
product_count integer comment '购买商品数量',
one_price double comment '单个商品价格'
) WITH (
'connector' = 'kafka',
'topic' = 'data_gen_source',
'properties.bootstrap.servers' = 'node01:9092,node02:9092,node03:9092',
'properties.group.id' = 'for_source_test',
'scan.startup.mode' = 'latest-offset',
'format' = 'csv',
'csv.field-delimiter' = ' '
)
;
insert into test.order_info
-- 下面的语法是 flink sql 提示,用于在语句中使用到表时手动设置一些临时的参数
/*+
OPTIONS(
-- 设置写入的文件滚动时间间隔
'sink.rolling-policy.rollover-interval' = '10 s',
-- 设置检查文件是否需要滚动的时间间隔
'sink.rolling-policy.check-interval' = '1 s',
-- sink 并行度
'sink.parallelism' = '1'
)
*/
select id, product_count, one_price
from source_kafka
;
```
flink sql 写入 hive ,依赖的是 fileSystem 连接器,该连接器写入到文件系统的文件可见性,依赖于 flink 任务的 checkpoint ,
所以 dlink 界面提交任务时,一定要开启 checkpoint ,也就是设置 checkpoint 的时间间隔参数 `execution.checkpointing.interval` ,如下图所示,设置为 10000 毫秒。
![img.png](http://www.aiwenmo.com/dinky/docs/zh-CN/sql_development_guide/example/kafka_to_hive_dlink_ui.jpg)
任务运行之后,就可以看到如下的 fink ui 界面了
![img.png](http://www.aiwenmo.com/dinky/docs/zh-CN/sql_development_guide/example/kafka_to_hive_flink_ui.png)
本案例使用 streaming 方式运行, checkpoint 时间为 10 s,文件滚动时间为 10 s,在配置的时间过后,就可以看到 hive 中的数据了
![img.png](http://www.aiwenmo.com/dinky/docs/zh-CN/sql_development_guide/example/kafka_to_hive_hive_data.png)
从 hdfs 上查看 hive 表对应文件的数据,如下图所示
![img.png](http://www.aiwenmo.com/dinky/docs/zh-CN/sql_development_guide/example/kafka_to_hive_hive_table_hdfs_file.png)
可以看到,1 分钟滚动生成了 6 个文件,最新文件为 .part 开头的文件,在 hdfs 中,以 `.` 开头的文件,是不可见的,说明这个文件是由于我关闭了 flink sql 任务,然后文件无法滚动造成的。
有关读写 hive 的一些配置和读写 hive 表时其数据的可见性,可以看考[读写hive](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/hive/hive_read_write/)页面。
## 写入分区表
### hive 表
```sql
CREATE TABLE `test.order_info_have_partition`(
`product_count` int COMMENT '购买商品数量',
`one_price` double COMMENT '单个商品价格')
PARTITIONED BY (
`minute` string COMMENT '订单时间,分钟级别',
`order_id` int COMMENT '订单id')
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs://hadoopCluster/user/hive/warehouse/test.db/order_info_have_partition'
TBLPROPERTIES (
'transient_lastDdlTime'='1659254559')
;
```
### flink sql 语句
```sql
-- 如果是 flink-1.13.x ,则需要手动设置该参数
set 'table.dynamic-table-options.enabled' = 'true';
-- 在需要读取hive或者是写入hive表时,必须创建hive catalog。
-- 创建catalog
create catalog hive with (
'type' = 'hive',
'hadoop-conf-dir' = '/data/soft/dlink-0.6.6/hadoop-conf',
'hive-conf-dir' = '/data/soft/dlink-0.6.6/hadoop-conf'
)
;
use catalog hive;
-- 创建连接 kafka 的虚拟表作为 source,此处使用 temporary ,是为了不让创建的虚拟表元数据保存到 hive,可以让任务重启是不出错。
-- 如果想让虚拟表元数据保存到 hive ,则可以在创建语句中加入 if not exist 语句。
CREATE temporary TABLE source_kafka(
event_time TIMESTAMP(3) METADATA FROM 'timestamp',
id integer comment '订单id',
product_count integer comment '购买商品数量',
one_price double comment '单个商品价格'
) WITH (
'connector' = 'kafka',
'topic' = 'data_gen_source',
'properties.bootstrap.servers' = 'node01:9092,node02:9092,node03:9092',
'properties.group.id' = 'for_source_test',
'scan.startup.mode' = 'latest-offset',
'format' = 'csv',
'csv.field-delimiter' = ' '
)
;
insert into test.order_info_have_partition
-- 下面的语法是 flink sql 提示,用于在语句中使用到表时手动设置一些临时的参数
/*+
OPTIONS(
-- 设置分区提交触发器为分区时间
'sink.partition-commit.trigger' = 'partition-time',
-- 'partition.time-extractor.timestamp-pattern' = '$year-$month-$day $hour:$minute',
-- 设置时间提取器的时间格式,要和分区字段值的格式保持一直
'partition.time-extractor.timestamp-formatter' = 'yyyy-MM-dd_HH:mm',
-- 设置分区提交延迟时间,这儿设置 1 分钟,是因为分区时间为 1 分钟间隔
'sink.partition-commit.delay' = '1 m',
-- 设置水印时区
'sink.partition-commit.watermark-time-zone' = 'GMT+08:00',
-- 设置分区提交策略,这儿是将分区提交到元数据存储,并且在分区目录下生成 success 文件
'sink.partition-commit.policy.kind' = 'metastore,success-file',
-- sink 并行度
'sink.parallelism' = '1'
)
*/
select
product_count,
one_price,
-- 不要让分区值中带有空格,分区值最后会变成目录名,有空格的话,可能会有一些未知问题
date_format(event_time, 'yyyy-MM-dd_HH:mm') as `minute`,
id as order_id
from source_kafka
;
```
flink sql 任务运行的 UI 界面如下
![img.png](http://www.aiwenmo.com/dinky/docs/zh-CN/sql_development_guide/example/kafka_to_hive_partition_table_flink_ui.png)
1 分钟之后查看 hive 表中数据,如下
![img.png](http://www.aiwenmo.com/dinky/docs/zh-CN/sql_development_guide/example/kafka_to_hive_partition_table_data.png)
查看 hive 表对应 hdfs 上的文件,可以看到
![img.png](http://www.aiwenmo.com/dinky/docs/zh-CN/sql_development_guide/example/kafka_to_hive_partition_table_hdfs_file.png)
从上图可以看到,具体的分区目录下生成了 `_SUCCESS` 文件,表示该分区提交成功。
---
sidebar_position: 15
id: lookup_join
title: lookup join
---
## lookup join
该例中,将 mysql 表作为维表,里面保存订单信息,之后去关联订单流水表,最后输出完整的订单流水信息数据到 kafka。
### kafka 主题 (order_water)
订单流水表读取的是 kafka `order_water` 主题中的数据,数据内容如下
![img.png](http://www.aiwenmo.com/dinky/docs/zh-CN/sql_development_guide/example/kafka_order_water_data.png)
### mysql 表 (dim.order_info)
**表结构**
```sql
CREATE TABLE `order_info` (
`id` int(11) NOT NULL COMMENT '订单id',
`user_name` varchar(50) DEFAULT NULL COMMENT '订单所属用户',
`order_source` varchar(50) DEFAULT NULL COMMENT '订单所属来源',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
```
**数据**
![img.png](http://www.aiwenmo.com/dinky/docs/zh-CN/sql_development_guide/example/kafka_lookup_join_mysql_mysql_data1.png)
### flink sql 语句
```sql
set 'table.local-time-zone' = 'GMT+08:00';
-- 订单流水
CREATE temporary TABLE order_flow(
id int comment '订单id',
product_count int comment '购买商品数量',
one_price double comment '单个商品价格',
-- 一定要添加处理时间字段,lookup join 需要该字段
proc_time as proctime()
) WITH (
'connector' = 'kafka',
'topic' = 'order_water',
'properties.bootstrap.servers' = 'node01:9092,node02:9092,node03:9092',
'properties.group.id' = 'for_source_test',
'scan.startup.mode' = 'latest-offset',
'format' = 'csv',
'csv.field-delimiter' = ' '
)
;
-- 订单信息
create table order_info (
id int PRIMARY KEY NOT ENFORCED comment '订单id',
user_name string comment '订单所属用户',
order_source string comment '订单所属来源'
) with (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://node01:3306/dim?useSSL=false',
'table-name' = 'order_info',
'username' = 'root',
'password' = 'root'
)
;
-- 创建连接 kafka 的虚拟表作为 sink
create table sink_kafka(
id int PRIMARY KEY NOT ENFORCED comment '订单id',
user_name string comment '订单所属用户',
order_source string comment '订单所属来源',
product_count int comment '购买商品数量',
one_price double comment '单个商品价格',
total_price double comment '总价格'
) with (
'connector' = 'upsert-kafka',
'topic' = 'for_sink',
'properties.bootstrap.servers' = 'node01:9092,node02:9092,node03:9092',
'key.format' = 'csv',
'value.format' = 'csv',
'value.csv.field-delimiter' = ' '
)
;
-- 真正要执行的任务
insert into sink_kafka
select
a.id,
b.user_name,
b.order_source,
a.product_count,
a.one_price,
a.product_count * a.one_price as total_price
from order_flow as a
-- 一定要添加 for system_time as of 语句,否则读取 mysql 的子任务会被认为是有界流,只读取一次,之后 mysql 维表中变化后的数据无法被读取
left join order_info for system_time as of a.proc_time as b
on a.id = b.id
;
```
flink sql 任务运行之后,flink UI 界面显示为
![img.png](http://www.aiwenmo.com/dinky/docs/zh-CN/sql_development_guide/example/kafka_lookup_join_mysql_flink_ui.png)
最后查看写入 kafka 中的数据为
![img.png](http://www.aiwenmo.com/dinky/docs/zh-CN/sql_development_guide/example/kafka_lookup_join_mysql_sink_kafka_data1.png)
此时,修改 mysql 中的数据,修改之后为
![img.png](http://www.aiwenmo.com/dinky/docs/zh-CN/sql_development_guide/example/kafka_lookup_join_mysql_mysql_data2.png)
再查看写入 kafka 中的数据为
![img.png](http://www.aiwenmo.com/dinky/docs/zh-CN/sql_development_guide/example/kafka_lookup_join_mysql_sink_kafka_data2.png)
**其他**
如果 kafka 中的订单流数据中的某个订单 id 在维表 mysql 中找不到,而且 flink sql 任务中使用的是 left join 连接,
则匹配不到的订单中的 user_name 和 product_count 字段将为空字符串,具体如下图所示
![img.png](http://www.aiwenmo.com/dinky/docs/zh-CN/sql_development_guide/example/kafka_lookup_join_mysql_sink_kafka_data3.png)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment