Commit e990a130 authored by walkhan's avatar walkhan

add chinese document

parent 9049f2d0
* [首页](/zh-CN/introduce.md)
* 文档
* [快速开始](/guide/quickstart.md)
* [发展计划](/guide/roadmap.md)
* [编译部署](/guide/deploy.md)
* [核心功能](/guide/functions.md)
* [开发调试](/guide/debug.md)
* 下载
* [扩展语法补全](/extend/completion.md)
* [扩展自定义函数](/extend/udf.md)
* [扩展连接器](/extend/connector.md)
* [扩展数据源](/extend/datasource.md)
* [扩展 Flink 版本](/extend/flinkversion.md)
* [Flink-CDC 集成](/extend/flinkcdc.md)
* [DolphinScheduler 集成](/extend/dolphinscheduler.md)
* [DataSphereStudio 集成](/extend/dataspherestudio.md)
* [Hive 集成](/extend/hive.md)
* [Doris 集成](/extend/doris.md)
* [Clickhouse 集成](/extend/clickhouse.md)
* [Hudi 集成](/extend/hudi.md)
* [Iceberg 集成](/extend/hudi.md)
* [Flink CDC和Kafka进行多源合并和下游同步更新](/extend/Flink_CDC_kafka_Multi_source_merger.md)
* 开发者
* [Dlink Yarn 的三种提交实践](/share/yarnsubmit.md)
* [Dlink AGGTABLE 表值聚合的实践](/share/aggtable.md)
* [Dlink 核心概念及实现原理详解](/share/principle.md)
* 用户案例
* [OpenAPI](/api/openapi.md)
* 语言
* [中文](/zh-CN/)
* [En](/en-US/)
<!-- docs/zh-CN/_sidebar.md -->
- [Dinky简介](/zh-CN/introduce.md)
- 概念和架构
- [系统架构](/zh-CN/architecture.md)
- [基本概念](/zh-CN/concept.md)
- [功能](/zh-CN/feature.md)
- 入门
- [下载](/zh-CN/quick_start/download.md)
- [编译](/zh-CN/quick_start/build.md)
- [部署](/zh-CN/quick_start/deploy.md)
- 基本使用指南
- 参考手册
- FlinkSQL Studio
- 作业和目录创建
- 作业开发
- 作业配置
- 作业管理
- 会话管理
- 注册中心
- 集群实例
- 集群配置
- jar管理
- 数据源管理
- 文档管理
- 系统设置
- 用户管理
- Flink设置
- 运维中心
- 生命周期管理
- 作业监控
- api
- 最佳实践
- [Yarn提交实践指南](/zh-CN/practice/yarnsubmit.md)
- [Dlink 核心概念及实现原理详解](/zh-CN/practice/principle.md)
- [AGGTABLE 表值聚合的实践](/zh-CN/practice/aggtable.md)
- 扩展
- 集成
- [Flink-CDC集成](/zh-CN/extend/flinkcdc.md)
- [Flink-CDC-Kafka多源合并](/zh-CN/extend/Flink_CDC_kafka_Multi_source_merger.md)
- [hive集成](/zh-CN/extend/hive.md)
- [clickhouse集成](/zh-CN/extend/clickhouse.md)
- [Doris集成](/zh-CN/extend/doris.md)
- [Hudi集成](/zh-CN/extend/hudi.md)
- [Iceberg集成](/zh-CN/extend/iceberg.md)
- [Flink UDF集成](/zh-CN/extend/udf.md)
- [DolphinScheduler集成](/zh-CN/extend/dolphinscheduler.md)
- [DataSphereStudio集成](/zh-CN/extend/dataspherestudio.md)
- 其他
- [扩展Flink版本](/zh-CN/extend/flinkversion.md)
- [扩展连接器](/zh-CN/extend/connector.md)
- [扩展数据源](/zh-CN/extend/datasource.md)
- [FlinkSQL 编辑器自动补全函数](/zh-CN/extend/completion.md)
- 开发者指南
- 常见问题
- 历史版本
- [近期计划](/zh-CN/others/plans.md)
- [致谢](/zh-CN/others/thanks.md)
- [交流与贡献](/zh-CN/others/comminicate.md)
- [roadmap](/zh-CN/roadmap.md)
\ No newline at end of file
## 敬请期待
\ No newline at end of file
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/main/dinky_principle.png)
<big>**JobManager**</big>
JobManager 作为 Dinky 的作业管理的统一入口,负责 Flink 的各种作业执行方式及其他功能的调度。
<big>**Executor**</big>
Executor 是 Dinky 定制的 FlinkSQL 执行器,来模拟真实的 Flink 执行环境,负责 FlinkSQL 的 Catalog 管理、UDF管理、片段管理、配置管理、语句集管理、语法校验、逻辑验证、计划优化、生成 JobGraph、本地执行、远程提交、SELECT 及 SHOW 预览等核心功能。
<big>**Interceptor**</big>
Interceptor 是 Dinky 的 Flink 执行拦截器,负责对其进行片段解析、UDF注册、SET 和 AGGTABLE 等增强语法解析。
<big>**Gateway**</big>
Gateway 并非是开源项目 flink-sql-gateway,而是 Dinky 自己定制的 Gateway,负责进行基于 Yarn 环境的任务提交与管理,主要有Yarn-Per-Job 和 Yarn-Application 的 FlinkSQL 提交、停止、SavePoint 以及配置测试,而 User Jar 目前只开放了 Yarn-Application 的提交。
<big>**Flink SDK**</big>
Dinky 主要通过调用 flink-client 和 flink-table 模块进行二次开发。
<big>**Yarn SDK**</big>
Dinky 通过调用 flink-yarn 模块进行二次开发。
<big>**Flink API**</big>
Dinky 也支持通过调用 JobManager 的 RestAPI 对任务进行管理等操作,系统配置可以控制开启和停用。
<big>**Yarn-Session**</big>
Dinky 通过已注册的 Flink Session 集群实例可以对 Standalone 和 Yarn-Session 两种集群进行 FlinkSQL 的提交、Catalog 的交互式管理以及对 SELECT 和 SHOW 等语句的执行结果预览。
<big>**Yarn-Per-Job**</big>
Dinky 通过已注册的集群配置来获取对应的 YarnClient 实例,然后将本地解析生产的 JobGraph 与 Configuration 提交至 Yarn 来创建 Flink-Per-Job 应用。
<big>**Yarn-Application**</big>
Dinky 通过已注册的集群配置来获取对应的 YarnClient 实例。对于 User Jar,将 Jar 相关配置与 Configuration 提交至 Yarn 来创建 Flink-Application 应用;对于 Flink SQL,Dinky 则将作业 ID 及数据库连接配置作为 Main 入参和 dlink-app.jar 以及 Configuration 提交至 Yarn 来创建 Flink-Application 应用。
\ No newline at end of file
## 前言
最近小伙伴们一直追问,如何在 IDEA 里去调试 Dlink。本文将指导大家可以成功地搭建调试环境并可以修改相关功能的代码,当然欢迎大家将相关问题修复及新功能的实现贡献到 dev 分支哦。那一起来看看吧!
## 准备
首先,请从 GitHub 中拉取 Dlink 源码,例如:
```bash
git clone https://github.com/DataLinkDC/dlink.git
```
## IntelliJ IDEA
该指南介绍了关于如何设置 IntelliJ IDEA 来进行 Dlink 前后端开发。Eclipse 不建议使用。
以下文档描述了 IntelliJ IDEA 2021.3 (https://www.jetbrains.com/idea/download/) 的设置步骤以及 Dlink 的导入步骤。
所以以下简称 IDEA 来表示 IntelliJ IDEA 。
### 安装 Lombok 插件
IDEA 提供了插件设置来安装 Lombok 插件。如果尚未安装,请在导入 Dlink 之前按照以下说明来进行操作以启用对 Lombok 注解的支持:
1. 转到 IDEA Settings → Plugins 并选择 Marketplace 。
2. 选择并安装 Lombok 插件。
3. 如果出现提示,请重启 IDEA 。
### 导入 Dlink
1. 启动 IDEA 并选择 Open。
2. 选择已克隆的 Dlink 存储库的根文件夹。
3. 等待项目加载完成。
4. 设置 JDK 1.8 和 Maven 3.6.0。
## 前端环境
### 安装 npm
可用版本 7.19.0,安装步骤详情百度。
### 安装 node.js
可用版本 14.17.0,安装步骤详情百度。
### 初始化依赖
```bash
npm install --force
```
## 编译项目
### 编译
IDEA 里 Build → Build Project 。
### 打包
```bash
mvn clean install -Dmaven.test.skip=true
```
打包最终位于根目录 build 下,`dlink-release-0.5.0-SNAPSHOT.tar.gz` 其大小约为 40 M。
### 问题
如果在打包 dlink-web 过程失败,请先单独打包前端进行问题排查。
```bash
npm build
```
## 修改 pom
### dlink-core.pom
```xml
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-client-1.13</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-executor</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-connector-jdbc-1.13</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-function</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-gateway</artifactId>
<scope>provided</scope>
</dependency>
```
把上述依赖的 scope 注释掉。
### dlink-admin.pom
可在该 pom 下按功能添加其他 Dlink 子组件依赖以及 Flink 和 Hadoop 的第三方依赖。
如使用 ClickHouse 数据源及元数据功能,则添加以下内容:
```xml
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-metadata-clickhouse</artifactId>
<version>0.5.0-SNAPSHOT</version>
</dependency>
```
如使用 Flink Hive 等其他连接器功能,则需要添加相关依赖。
## 修改配置文件
### application.yml
配置数据库连接信息:
```yaml
spring:
datasource:
url: jdbc:mysql://127.0.0.1:3306/dlink?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
username: dlink
password: dlink
driver-class-name: com.mysql.cj.jdbc.Driver
```
## 初始化数据库
创建 dlink 用户并在 dlink 数据库中执行 dlink-doc/sql/dlink.sql 文件。
此外 dlink-doc/sql/dlink_history.sql 标识了各版本的升级 sql。
## 启动后端服务
启动 dlink-admin 下的 Dlink 启动类,可见 8888 端口。
## 启动前端服务
```bash
npm start
```
等待几分钟,访问 127.0.0.1:8000 可见登录页。
输入 admin/admin 登录。
## 源码结构
```java
dlink -- 父项目
|-dlink-admin -- 管理中心
|-dlink-app -- Application Jar
|-dlink-assembly -- 打包配置
|-dlink-client -- Client 中心
| |-dlink-client-1.11 -- Client-1.11 实现
| |-dlink-client-1.12 -- Client-1.12 实现
| |-dlink-client-1.13 -- Client-1.13 实现
| |-dlink-client-1.14 -- Client-1.14 实现
|-dlink-common -- 通用中心
|-dlink-connectors -- Connectors 中心
| |-dlink-connector-jdbc -- Jdbc 扩展
|-dlink-core -- 执行中心
|-dlink-doc -- 文档
| |-bin -- 启动脚本
| |-bug -- bug 反馈
| |-config -- 配置文件
| |-doc -- 使用文档
| |-sql -- sql脚本
|-dlink-executor -- 执行中心
|-dlink-extends -- 扩展中心
|-dlink-function -- 函数中心
|-dlink-gateway -- Flink 网关中心
|-dlink-metadata -- 元数据中心
| |-dlink-metadata-base -- 元数据基础组件
| |-dlink-metadata-clickhouse -- 元数据- clickhouse 实现
| |-dlink-metadata-mysql -- 元数据- mysql 实现
| |-dlink-metadata-oracle -- 元数据- oracle 实现
| |-dlink-metadata-postgresql -- 元数据- postgresql 实现
|-dlink-web -- React 前端
|-docs -- 官网文档
```
### dlink-admin
Dlink 的管理中心,标准的 SpringBoot 应用,负责与前端 react 交互。
### dlink-app
Dlink 在 Yarn Application 模式所使用的简化解析包。
### dlink-assembly
项目打包配置,管理了最终 tar.gz 的打包内容。
### dlink-client
Dlink 定制的 Flink 运行环境的实现。用来桥接 Dlink 与不同版本的 Flink 运行环境。
### dlink-common
Dlink 的子项目的公用类及实现项目。
### dlink-connectors
Dlink 的 Connectors,目前实现了 Oracle、Clickhouse、SQLServer。此外 Dlink 可以直接使用 Flink 的所有连接器,在确保依赖不冲突的情况下。
### dlink-core
Dlink 的核心模块,内包含 Flink RestAPI 、集群、SQL解释器、Job统一调度器(JobManager)、会话管理等实现。
### dlink-doc
此模块为打包所需的资源模块,包含启动脚本、sql脚本、配置文件等。
### dlink-executor
Dlink 的执行模块,是从 dlink-core 中拆分出来,内含最核心的 Executor、Interceptor、Operation 等实现。
### dlink-extends
存放 Dlink 扩展其他生态的组件。
### dlink-function
Dlink 所额外提供的 Flink 各种自定义函数。
### dlink-gateway
Dlink 的任务网关,负责把实现不同执行模式的任务提交与管理,目前主要包含 Yarn PerJob 和 Application。
### dlink-metadata
Dlink 的元数据中心,用于实现各种外部数据源对接到 Dlink,以此使用其各种查询、执行等能力。未来用于 Flink Catalog 的预装载等。
### dlink-web
Dlink 的前端项目,基于 Ant Design Pro 5.0.0。Why Not Vue ? React Who Use Who Know。(中式英语 =。=)
Dlink 的前端架构与开发后续文章会详解,本文略。
### docs
Dlink 的官网实现,大佬们可以修改或贡献 markdown。多多分享,让社区发展更快,十分感谢。
## 任务执行路线
同步执行:三角号按钮。
异步提交:小火箭按钮。
### Local
同步执行/异步提交 ==> StudioService ==> JobManager ==> Executor ==> LocalStreamExecutor ==> CustomTableEnvironmentImpl ==> LocalEnvironment
### Standalone
注册集群实例 ==> 同步执行/异步提交 ==> StudioService ==> JobManager ==> Executor ==> RemoteStreamExecutor ==> CustomTableEnvironmentImpl ==> RemoteEnvironment ==> JobGraph ==> Flink Standalone Cluster
### Yarn Session
注册集群实例 ==> 同步执行/异步提交 ==> StudioService ==> JobManager ==> Executor ==> RemoteStreamExecutor ==> CustomTableEnvironmentImpl ==> RemoteEnvironment ==> JobGraph ==> Flink Yarn Session Cluster
### Yarn Per-Job
注册集群配置 ==> 异步提交 ==> StudioService ==> JobManager ==> Executor ==> JobGraph ==> Gateway ==> YarnPerJobGateway==> YarnClient ==> Flink Yarn Per-Job Cluster
### Yarn Application
注册集群配置 ==> 异步提交 ==> StudioService ==> JobManager ==> Executor ==> TaskId & JDBC ==> Gateway ==> YarnApplicationGateway==> YarnClient ==> dlink-app.jar ==> Executor ==> AppStreamExecutor ==> CustomTableEnvironmentImpl ==> LocalEnvironmentFlink Yarn Application Cluster
## 总结
以为内容为大家带来了 Dlink 的基本功能 IDEA 部署调试步骤,并简单介绍了各模块的作用,也清晰的描述了各执行模式下 FlinkSQL 实现免 Jar 提交的代码思路。在了解以上内容后,相信大家已经可以动手改造 Dlink 了,欢迎大家及时加入 Dlink 社区成为核心贡献者,共建共赢。
后续文章将指引大家如何快速拓展 Dlink 的功能组件,敬请期待。
# Flink CDC和kafka进行多源合并和下游同步更新
编辑:谢帮桂
# 前言
本文主要是针对 Flink SQL 使用 Flink CDC 无法实现多库多表的多源合并问题,以及多源合并后如何对下游 Kafka 同步更新的问题,因为目前 Flink SQL 也只能进行单表 Flink CDC 的作业操作,这会导致数据库 CDC 的连接数过多。
但是 Flink CDC 的 dataStream API 是可以进行多库多表的同步操作的,本文希望利用 Flink CDC 的 dataStream API 进行多源合并后导入一个总线 Kafka,下游只需连接总线 kafka 就可以实现 Flink SQL 的多源合并问题,资源复用。
# 环境
|Flink|1.13.3|
|:----|:----|
|Flink CDC|2.0|
|Kafka|2.13|
|Java|1.8|
|Dinky|5.0|
我们先打印一下 Flink CDC 默认的序列化 JSON 格式如下:
```json
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1643273051, file=mysql_bin.000002, pos=5348135, row=1, server_id=1, event=2}}
ConnectRecord{topic='mysql_binlog_source.gmall.spu_info', kafkaPartition=null, key=Struct{id=12}, keySchema=Schema{mysql_binlog_source.gmall.spu_info.Key:STRUCT}, value=Struct{before=Struct{id=12,spu_name=华为智慧屏 14222K1 全面屏智能电视机,description=华为智慧屏 4K 全面屏智能电视机,category3_id=86,tm_id=3},after=Struct{id=12,spu_name=华为智慧屏 2K 全面屏智能电视机,description=华为智慧屏 4K 全面屏智能电视机,category3_id=86,tm_id=3},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1643273051000,db=gmall,table=spu_info,server_id=1,file=mysql_bin.000002,pos=5348268,row=0,thread=3742},op=u,ts_ms=1643272979401}, valueSchema=Schema{mysql_binlog_source.gmall.spu_info.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
```
可以看到,这种格式的 JSON,传给下游有很大的问题,要实现多源合并和同步更新,我们要解决以下两个问题。
**①总线Kafka传来的json,无法识别源库和源表来进行具体的表创建操作,因为不是固定的json格式,建表WHIT配置里也无法指定具体的库和表。**
**②总线Kafka传来的json如何进行CRUD等事件对Kafka流的同步操作,特别是Delete,下游kafka如何感知来更新ChangeLog。**
# 查看文档
![image-20210615115042539](https://s4.ax1x.com/2022/01/29/HSLhNR.png)
![image-20210615115042539](https://s4.ax1x.com/2022/01/29/HSLfE9.png)
我们可以看到红框部分,基于 Debezium 格式的 json 可以在 Kafka connector 建表中可以实现表的 CRUD 同步操作。只要总线 Kafka 的 json 格式符合该模式就可以对下游 kafka 进行 CRUD 的同步更新,刚好 Flink CDC 也是基于 Debezium。
那这里就已经解决了问题②。
剩下问题①,如何解决传来的多库多表进行指定表和库的识别,毕竟建表语句没有进行where的设置参数。
再往下翻文档:
![image-20210615115042539](https://s4.ax1x.com/2022/01/29/HSLRHJ.png)
可以看到,基于 Debezium-json 格式,可以把上面的 schema 定义的 json 格式的元数据给取出来放在字段里。
比如,我把 table 和 database 给放在建表语句里,那样我就可以在 select 语句中进行库和表的过滤了。
如下:
```sql
CREATE TABLE Kafka_Table (
origin_database STRING METADATA FROM 'value.source.database' VIRTUAL, //schema 定义的 json 里的元数据字段
origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
`id` INT,
`spu_name` STRING,
`description` STRING,
`category3_id` INT,
`tm_id` INT
) WITH (
'connector' = 'kafka',
'topic' = 'input_kafka4',
'properties.group.id' = '57',
'properties.bootstrap.servers' = '10.1.64.156:9092',
'scan.startup.mode' = 'latest-offset',
'debezium-json.ignore-parse-errors' = 'true',
'format' = 'debezium-json'
);
select * from Kafka_Table where origin_database='gmall' and origin_table = 'spu_info'; //这里就实现了指定库和表的过滤操作
```
那这样问题②就解决了。
那我们现在就要做两个事情:
**①写一个Flink CDC的dataStream项目进行多库多表同步,传给总线Kafka。**
**②自定义总线Kafka的json格式。**
# 新建Flink CDC的dataStream项目
```java
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
public class FlinkCDC {
public static void main(String[] args) throws Exception {
//1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//1.1 设置 CK&状态后端
//略
//2.通过 FlinkCDC 构建 SourceFunction 并读取数据
DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
.hostname("10.1.64.157")
.port(3306)
.username("root")
.password("123456")
.databaseList("gmall") //这个注释,就是多库同步
//.tableList("gmall.spu_info") //这个注释,就是多表同步
.deserializer(new CustomerDeserialization()) //这里需要自定义序列化格式
//.deserializer(new StringDebeziumDeserializationSchema()) //默认是这个序列化格式
.startupOptions(StartupOptions.latest())
.build();
DataStreamSource<String> streamSource = env.addSource(sourceFunction);
//3.打印数据并将数据写入 Kafka
streamSource.print();
String sinkTopic = "input_kafka4";
streamSource.addSink(getKafkaProducer("10.1.64.156:9092",sinkTopic));
//4.启动任务
env.execute("FlinkCDC");
}
//kafka 生产者
public static FlinkKafkaProducer<String> getKafkaProducer(String brokers,String topic) {
return new FlinkKafkaProducer<String>(brokers,
topic,
new SimpleStringSchema());
}
}
```
##
# 自定义序列化类
```java
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.ArrayList;
import java.util.List;
public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
//1.创建 JSON 对象用于存储最终数据
JSONObject result = new JSONObject();
//2.获取库名&表名放入 source
String topic = sourceRecord.topic();
String[] fields = topic.split("\\.");
String database = fields[1];
String tableName = fields[2];
JSONObject source = new JSONObject();
source.put("database",database);
source.put("table",tableName);
Struct value = (Struct) sourceRecord.value();
//3.获取"before"数据
Struct before = value.getStruct("before");
JSONObject beforeJson = new JSONObject();
if (before != null) {
Schema beforeSchema = before.schema();
List<Field> beforeFields = beforeSchema.fields();
for (Field field : beforeFields) {
Object beforeValue = before.get(field);
beforeJson.put(field.name(), beforeValue);
}
}
//4.获取"after"数据
Struct after = value.getStruct("after");
JSONObject afterJson = new JSONObject();
if (after != null) {
Schema afterSchema = after.schema();
List<Field> afterFields = afterSchema.fields();
for (Field field : afterFields) {
Object afterValue = after.get(field);
afterJson.put(field.name(), afterValue);
}
}
//5.获取操作类型 CREATE UPDATE DELETE 进行符合 Debezium-op 的字母
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String type = operation.toString().toLowerCase();
if ("insert".equals(type)) {
type = "c";
}
if ("update".equals(type)) {
type = "u";
}
if ("delete".equals(type)) {
type = "d";
}
if ("create".equals(type)) {
type = "c";
}
//6.将字段写入 JSON 对象
result.put("source", source);
result.put("before", beforeJson);
result.put("after", afterJson);
result.put("op", type);
//7.输出数据
collector.collect(result.toJSONString());
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
```
OK,运行 flinkCDC 项目,同步的数据库表插入一条记录,得出以下自定义格式后的 JSON:
```json
{
"op": "u",
"before": {
"spu_name": "香奈儿(Chanel)女士香水 5 号香水 粉邂逅柔情淡香水 EDT ",
"tm_id": 11,
"description": "香奈儿(Chanel)女士香水 5 号香水 粉邂逅柔情淡香水 EDT 111",
"id": 11,
"category3_id": 473
},
"source": {
"database": "gmall",
"table": "spu_info"
},
"after": {
"spu_name": "香奈儿(Chanel)女士香水 5 号香水 粉邂逅柔情淡香水 EDTss ",
"tm_id": 11,
"description": "香奈儿(Chanel)女士香水 5 号香水 粉邂逅柔情淡香水 EDT 111",
"id": 11,
"category3_id": 473
}
}
```
PS:没放 schema{}这个对象,看文档说加了识别会影响效率。
# 总线 Kafka
![image-20210615115042539](https://s4.ax1x.com/2022/01/29/HSL441.png)
# Dinky 里面进行建表,提交作业
![image-20210615115042539](https://s4.ax1x.com/2022/01/29/HSL2B4.png)
PS:yarn-session 模式,记得开启预览结果和打印流,不然观察不到数据 changelog
# 查看结果
![image-20210615115042539](https://s4.ax1x.com/2022/01/29/HSLI9x.png)
![image-20210615115042539](https://s4.ax1x.com/2022/01/29/HSLo36.png)
可以看到在指定库和表中新增一条数据,在下游 kafka 作业中实现了同步更新,然后试试对数据库该表的记录进行 delete,效果如下:
![image-20210615115042539](https://s4.ax1x.com/2022/01/29/HSLTgK.png)
可以看到"是是是.."这条记录同步删除了。
此时 Flink CDC 的记录是这样:
![image-20210615115042539](https://s4.ax1x.com/2022/01/29/HSL7jO.png)
原理主要是 op 去同步下游 kafka 的 changeLog 里的 op
我们浏览一下 changeLog:(Dinky 选中打印流即可)
![image-20210615115042539](https://s4.ax1x.com/2022/01/29/HSLbuD.png)
可以看到,op 自动识别总线 kafka 发来的 JSON 进行了同步来记录操作。
后续我们就可以插入 upsert-kafka 表进行具体的表操作了。
**完成!这样只需建一个 DataStream 的总线 jar,在 Dinky 中进行提交,后续下游的作业只需要 kafka 去接总线 kafka 就可以进行 Flink CDC 在 Flink SQL 里的多源合并和同步更新。**
灵感和代码来自于尚硅谷,请支持 Dinky 和尚硅谷,另外是在测试环境进行,生产环境调优自行解决,如有更好的实践欢迎对文档进行 pr,感谢!
敬请期待
\ No newline at end of file
## FlinkSQL 编辑器自动补全函数
Dlink-0.3.2 版本上线了一个非常实用的功能——自动补全。
我们在使用 IDEA 等工具时,提示方法并补全、生成的功能大大提升了开发效率。而 Dlink 的目标便是让 FlinkSQL 更加丝滑,所以其提供了自定义的自动补全功能。对比传统的使用 `Java` 字符串来编写 FlinkSQL 的方式,Dlink 的优势是巨大。
在文档中心,我们可以根据自己的需要扩展相应的自动补全规则,如 `UDF``Connector With` 等 FlinkSQL 片段,甚至可以扩展任意可以想象到内容,如注释、模板、配置、算法等。
具体新增规则的示例请看下文描述。
## set 语法来设置执行环境参数
对于一个 FlinkSQL 的任务来说,除了 sql 口径,其任务配置也十分重要。所以 Dlink-0.3.2 版本中提供了 `sql-client``set` 语法,可以通过 `set` 关键字来指定任务的执行配置(如 “ `set table.exec.resource.default-parallelism=2;` ” ),其优先级要高于 Dlink 自身的任务配置(面板右侧)。
那么长的参数一般人谁记得住?等等,别忘了 Dlink 的新功能自动补全~
配置实现输入 `parallelism` 子字符串来自动补全 `table.exec.resource.default-parallelism=`
在文档中心中添加一个规则,名称为 `parallelism`,填充值为 `table.exec.resource.default-parallelism=`,其他内容随意。
保存之后,来到编辑器输入 `par` .
选中要补全的规则后,编辑器中自动补全了 `table.exec.resource.default-parallelism=`
至此,有些小伙伴发现,是不是可以直接定义 `pl2` 来自动生成 `set table.exec.resource.default-parallelism=2;`
当然可以的。
还有小伙伴问,可不可以定义 `pl` 生成 `set table.exec.resource.default-parallelism=;` 后,光标自动定位到 `=``;` 之间?
这个也可以的,只需要定义 `pl` 填充值为 `set table.exec.resource.default-parallelism=${1:};` ,即可实现。
所以说,只要能想象到的都可以定义,这样的 Dlink 你爱了吗?
嘘,还有点小 bug 后续修复呢。如果有什么建议及问题请及时指出哦。
\ No newline at end of file
## 扩展 Connector
将 Flink 集群上已扩展好的 Connector 直接放入 Dlink 的 lib 或者 plugins 下,然后重启即可。定制 Connector 过程同 Flink 官方一样。
## 扩展 Metadata
遵循SPI。
\ No newline at end of file
## 敬请期待
\ No newline at end of file
## 敬请期待
\ No newline at end of file
## 背景
Apache Doris是一个现代化的MPP分析型数据库产品。仅需亚秒级响应时间即可获得查询结果,有效地支持实时数据分析。例如固定历史报表,实时数据分析,交互式数据分析和探索式数据分析等。
目前 Doris 的生态正在建设中,本文将分享如何基于 Dlink 实现 Mysql 变动数据通过 Flink 实时入库 Doris。
## 准备
老规矩,列清各组件版本:
| 组件 | 版本 |
| :-------------: | :----------: |
| Flink | 1.13.3 |
| Flink-mysql-cdc | 2.1.0 |
| Doris | 0.15.1-rc09 |
| doris-flink | 1.0-SNAPSHOT |
| Mysql | 8.0.13 |
| Dlink | 0.4.0 |
需要注意的是,本文的 Doris 是基于 OracleJDK1.8 和 Scala 2.11 通过源码进行编译打包的,所以所有组件的 scala 均为 2.11,此处和 Doris 社区略有不同。
## 部署
本文的 Flink 和 Doris 的部署不做描述,详情请查阅官网。
[Doris]: https://doris.apache.org/master/zh-CN/extending-doris/flink-doris-connector.html#%E4%BD%BF%E7%94%A8%E6%96%B9%E6%B3%95 "Doris"
本文在 Dlink 部署成功的基础上进行,如需查看具体部署步骤,请阅读《flink sql 知其所以然(十六):flink sql 开发企业级利器之 Dlink》。
Dlink 的 plugins 下添加 `doris-flink-1.0-SNAPSHOT.jar``flink-sql-connector-mysql-cdc-2.1.0.jar` 。重启 Dlink。
```java
plugins/ -- Flink 相关扩展
|- doris-flink-1.0-SNAPSHOT.jar
|- flink-csv-1.13.3.jar
|- flink-dist_2.11-1.13.3.jar
|- flink-format-changelog-json-2.1.0.jar
|- flink-json-1.13.3.jar
|- flink-shaded-zookeeper-3.4.14.jar
|- flink-sql-connector-mysql-cdc-2.1.0.jar
|- flink-table_2.11-1.13.3.jar
|- flink-table-blink_2.11-1.13.3.jar
```
当然,如果您想直接使用 FLINK_HOME 的话,可以在 `auto.sh` 文件中 `SETTING` 变量添加`$FLINK_HOME/lib`
## 数据表
### 学生表 (student)
```sql
-- Mysql
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
`sid` int(11) NOT NULL,
`name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
PRIMARY KEY (`sid`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
INSERT INTO `student` VALUES (1, '小红');
INSERT INTO `student` VALUES (2, '小黑');
INSERT INTO `student` VALUES (3, '小黄');
```
### 成绩表(score)
```sql
-- Mysql
DROP TABLE IF EXISTS `score`;
CREATE TABLE `score` (
`cid` int(11) NOT NULL,
`sid` int(11) NULL DEFAULT NULL,
`cls` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`score` int(11) NULL DEFAULT NULL,
PRIMARY KEY (`cid`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
INSERT INTO `score` VALUES (1, 1, 'chinese', 90);
INSERT INTO `score` VALUES (2, 1, 'math', 95);
INSERT INTO `score` VALUES (3, 1, 'english', 93);
INSERT INTO `score` VALUES (4, 2, 'chinese', 92);
INSERT INTO `score` VALUES (5, 2, 'math', 75);
INSERT INTO `score` VALUES (6, 2, 'english', 80);
INSERT INTO `score` VALUES (7, 3, 'chinese', 100);
INSERT INTO `score` VALUES (8, 3, 'math', 60);
```
### 学生成绩宽表(scoreinfo)
```sql
-- Doris
CREATE TABLE scoreinfo
(
cid INT,
sid INT,
name VARCHAR(32),
cls VARCHAR(32),
score INT
)
UNIQUE KEY(cid)
DISTRIBUTED BY HASH(cid) BUCKETS 10
PROPERTIES("replication_num" = "1");
```
## FlinkSQL
```sql
CREATE TABLE student (
sid INT,
name STRING,
PRIMARY KEY (sid) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'database-name' = 'test',
'table-name' = 'student');
CREATE TABLE score (
cid INT,
sid INT,
cls STRING,
score INT,
PRIMARY KEY (cid) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'database-name' = 'test',
'table-name' = 'score');
CREATE TABLE scoreinfo (
cid INT,
sid INT,
name STRING,
cls STRING,
score INT,
PRIMARY KEY (cid) NOT ENFORCED
) WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030' ,
'table.identifier' = 'test.scoreinfo',
'username' = 'root',
'password'=''
);
insert into scoreinfo
select
a.cid,a.sid,b.name,a.cls,a.score
from score a
left join student b on a.sid = b.sid
```
## 调试
### 在 Dlink 中提交
本示例采用了 yarn-session 的方式进行提交。
![image-20211218134246511](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTp0RDozicic8lrYycianYsUN9ibG3QjNqVbMk7L41wHykKnkV0YxDCVSYj68HlFWylpYckkXicgnTDU7uQ/0?wx_fmt=png)
### FlinkWebUI
![image-20211218134439699](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTp0RDozicic8lrYycianYsUN9ibn1jXbvKznaF8Tm4AxxvYYDI0fEtXbGm0XUeXhGp44KMlPdoOzjvtHQ/0?wx_fmt=png)
上图可见,流任务已经成功被 Dlink 提交的远程集群了。
### Doris 查询
![image-20211218135404787](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTp0RDozicic8lrYycianYsUN9iblKTY6o9fWZxFDQYC19wKVFRGDuUBgNOZxm14sWjyr8tUY7RDeUiaEUw/0?wx_fmt=png)
上图可见,Doris 已经被写入了历史全量数据。
### 增量测试
在 Mysql 中执行新增语句:
```sql
INSERT INTO `score` VALUES (9, 3, 'english', 100);
```
Doris 成功被追加:
![image-20211218135545742](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTp0RDozicic8lrYycianYsUN9ibvE4qyQ9ttf2kNZ3raEgabvh442HfiaIfm2l5dhdFmWoGiaHMlvcQmocw/0?wx_fmt=png)
### 变动测试
在 Mysql 中执行新增语句:
```sql
update score set score = 100 where cid = 1
```
Doris 成功被修改:
![image-20211218135949764](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTp0RDozicic8lrYycianYsUN9ib3liaIvXQcCSboO4IoeJhtTRa38ukNogtFzwg31mNEFwRcJ1wGNIhQkQ/0?wx_fmt=png)
## 扩展 Flink-CDC
`flink-sql-connector-mysql-cdc-2.1.0.jar` 等 CDC 依赖加入到 Dlink 的 `plugins` 下即可。
\ No newline at end of file
## 扩展其他版本的 Flink
Flink 的版本取决于 lib 下的 dlink-client-1.13.jar。当前版本默认为 Flink 1.13.3 API。向其他版本的集群提交任务可能存在问题,已实现 1.11、1.12、1.13, 1.14,切换版本时只需要将对应依赖在lib下进行替换,然后重启即可。
切换版本时需要同时更新 plugins 下的 Flink 依赖。
\ No newline at end of file
## 前言
最近有很多小伙伴问,dlink 如何连接 Hive 进行数据开发?
关于 dlink 连接 Hive 的步骤同 Flink 的 `sql-client ` ,只不过它没有默认加载的配置文件。下文将详细讲述对 Hive 操作的全过程。
## 准备工作
由于搭建 Hive 的开发环境会涉及到重多组件和插件,那其版本对应问题也是至关重要,它能帮我们避免很多不必要的问题,当然小版本号之间具备一定的兼容性。
我们先来梳理下本教程的各个组件版本:
| 组件 | 版本 |
| :----: | :----: |
| Dlink | 0.3.2 |
| Flink | 1.12.4 |
| Hadoop | 2.7.7 |
| Hive | 2.3.6 |
| Mysql | 8.0.15 |
再来梳理下本教程的各个插件版本:
| 所属组件 | 插件 | 版本 |
| :-----------: | :------------------------: | :-------------------: |
| Dlink | dlink-client | 1.12 |
| Dlink & Flink | flink-sql-connector-hive | 2.3.6_2.11-1.12.3 |
| Dlink & Flink | flink-shaded-hadoop-3-uber | 3.1.1.7.2.8.0-224-9.0 |
## 部署扩展
部署扩展的工作非常简单(前提是 Dlink 部署完成并成功连接 Flink 集群,相关部署步骤请查看《Dlink实时计算平台——部署篇》),只需要把 `flink-sql-connector-hive-2.3.6_2.11-1.12.3.jar``flink-shaded-hadoop-3-uber-3.1.1.7.2.8.0-224-9.0.jar` 两个插件分别加入到 Dlink 的 plugins 目录与 Flink 的 lib 目录下即可,然后重启二者。当然,还需要放置 `hive-site.xml`,位置自定义,Dlink 可以访问到即可。
## 创建 Hive Catalog
已知,Hive 已经新建了一个数据库实例 `hdb` ,创建了一张表 `htest`,列为 `name``age`,存储位置默认为 `hdfs:///usr/local/hadoop/hive-2.3.9/warehouse/hdb.db` 。(此处为何 2.3.9 呢,因为 `flink-sql-connector-hive-2.3.6_2.11-1.12.3.jar` 只支持到最高版本 2.3.6,小编先装了个 2.3.9 后装了个 2.3.6,尴尬 > _ < ~)
```sql
CREATE CATALOG myhive WITH (
'type' = 'hive',
'default-database' = 'hdb',
'hive-conf-dir' = '/usr/local/dlink/hive-conf'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG myhive;
select * from htest
```
在 Dlink 编辑器中输入以上 sql ,创建 Hive Catalog,并查询一张表。
其中,`hive-conf-dir` 需要指定 `hive-site.xml` 的路径,其他同 Flink 官方解释。
执行查询后(记得选中执行配置的预览结果),可以从查询结果中查看到 htest 表中只有一条数据。(这是正确的,因为小编太懒了,只随手模拟了一条数据)
此时可以使用 FlinkSQL 愉快地操作 Hive 的数据了。
## 使用 Hive Dialect
很熟悉 Hive 的语法以及需要对 Hive 执行其自身特性的语句怎么办?
同 Flink 官方解释一样,只需要使用 `SET table.sql-dialect=hive` 来启用方言即可。注意有两种方言 `default``hive` ,它们的使用可以随意切换哦~
```sql
CREATE CATALOG myhive WITH (
'type' = 'hive',
'default-database' = 'hdb',
'hive-conf-dir' = '/usr/local/dlink/hive-conf'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG myhive;
-- set hive dialect
SET table.sql-dialect=hive;
-- alter table location
alter table htest set location 'hdfs:///usr/htest';
-- set default dialect
SET table.sql-dialect=default;
select * from htest;
```
上述 sql 中添加了 Hive Dialect 的使用,FlinkSQL 本身不支持 `alter table .. set location ..` 的语法,使用 Hive Dialect 则可以实现语法的切换。本 sql 内容对 htest 表进行存储位置的改变,将其更改为一个新的路径,然后再执行查询。
由上图可见,被更改过 location 的 htest 此时查询没有数据,是正确的。
然后将 location 更改为之前的路径,再执行查询,则可见原来的那条数据,如下图所示。
## 总结
由上所知,Dlink 以更加友好的交互方式展现了 Flink 集成 Hive 的部分功能,当然其他更多的 Hive 功能需要您自己在使用的过程中去体验与挖掘。
目前,Dlink 支持 Flink 绝大多数特性与功能,集成与拓展方式与 Flink 官方文档描述一致,只需要在 Dlink 的 plugins 目录下添加依赖即可。
\ No newline at end of file
## 背景资料
Apache hudi (发音为“ hoodie”)是下一代流式数据湖平台。Apache Hudi 将核心仓库和数据库功能直接引入到数据库中。Hudi 提供表、事务、高效的升级/删除、高级索引、流式摄入服务、数据集群/压缩优化和并发,同时保持数据以开放源码文件格式存储 , Apache Hudi 不仅非常适合流式工作负载,而且它还允许您创建高效的增量批处理管道。
实时数仓流批一体已经成为大势所趋。
为什么要使用 Hudi ?
1. 目前业务架构较为繁重
2. 维护多套框架
3. 数据更新频率较大
## 准备&&部署
| 组件 | 版本 | 备注 |
| ------------- | ----------------------------------- | ---------- |
| Flink | 1.13.5 | 集成到CM |
| Flink-SQL-CDC | 2.1.1 | |
| Hudi | 0.10.0-patch | 打过补丁的 |
| Mysql | 8.0.13 | 阿里云 |
| Dlink | dlink-release-0.5.0-SNAPSHOT.tar.gz | |
| Scala | 2.12 | |
### 1. 部署Flink1.13.5
flink 集成到CM中
此步骤略。
### 2. 集成Hudi 0.10.0
​ ①. 地址: https://github.com/danny0405/hudi/tree/010-patch 打过补丁的 大佬请忽略^_^
​ a. 下载压缩包 分支010-patch 不要下载 master 上传 解压
​ b. unzip 010-patch.zip
​ c. 找到 `packging--hudi-flink-bundle` 下的 `pom.xml`,更改 `flink-bundel-shade-hive2` 下的 `hive-version` 更改为 `2.1.1-chd6.3.2` 的版本。
```shell
vim pom.xml # 修改hive版本为 : 2.1.1-cdh6.3.2
```
​ d. 执行编译:
```shell
mvn clean install -DskipTests -DskipITs -Dcheckstyle.skip=true -Drat.skip=true -Dhadoop.version=3.0.0 -Pflink-bundle-shade-hive2 -Dscala-2.12
```
​ 因为 `chd6.3.0` 使用的是 `hadoop3.0.0` ,所以要指定 `hadoop` 的版本, `hive` 使用的是 `2.1.1` 的版本,也要指定 `hive` 的版本,不然使用 `sync to hive` 的时候,会报类的冲突问题。 `scala` 版本是 `2.12`
​ 同时 flink 集成到 cm 的时候也是 `scala2.12` 版本统一。
编译完成如下图:
![img](http://www.aiwenmo.com/dinky/hudi/hudill.png)
②. 把相关应的jar包 放到相对应的目录下
```shell
# hudi的包
ln -s /opt/module/hudi-0.10.0/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.10.0.jar /opt/cloudera/parcels/CDH/jars/
ln -s /opt/module/hudi-0.10.0/hudi-hive-sync-bundle/target/hudi-hive-sync-bundle-0.10.0.jar /opt/cloudera/parcels/CDH/jars/
ln -s /opt/module/hudi-0.10.0/hudi-hive-sync-bundle/target/hudi-hive-sync-bundle-0.10.0.jar /opt/cloudera/parcels/CDH/lib/hive/lib
# 同步sync to hive 每台节点都要放
cp /opt/module/hudi-0.10.0/hudi-flink-bundle/target/hudi-flink-bundle_2.12-0.10.0.jar /opt/cloudera/parcels/FLINK/lib/flink/lib/
# 以下三个jar 放到flink/lib 下 否则同步数据到hive的时候会报错
cp /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-mapreduce-client-core-3.0.0-cdh6.3.2.jar /opt/module/flink-1.13.5/lib/
cp /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-mapreduce-client-common-3.0.0-cdh6.3.2.jar /opt/module/flink-1.13.5/lib/
cp /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-mapreduce-client-jobclient-3.0.0-cdh6.3.2.jar /opt/module/flink-1.13.5/lib/
# 执行以下命令
cd /opt/module/flink-1.13.5/lib/
scp -r ./* cdh5:`pwd`
scp -r ./* cdh6:`pwd`
scp -r ./* cdh7:`pwd`
```
### 3. 安装 Dlink-0.5.0
a. github 地址: https://github.com/DataLinkDC/dlink
b. 部署步骤见 github-readme.md 传送门: https://github.com/DataLinkDC/dlink/blob/main/README.md
ps: 注意 还需要将 `hudi-flink-bundle_2.12-0.10.0.jar` 这个包放到 dlink的 `plugins` 下 。
`plugins` 下的包 如下图所示:
![img](http://www.aiwenmo.com/dinky/hudi/dlinkll.png)
c. 访问: [http://ip:port/#/user/login](http://cdh7.vision.com:8811/#/user/login) 默认用户: admin 密码: admin
d. 创建集群实例:
![img](http://www.aiwenmo.com/dinky/hudi/hudi_cluster.png)
![img](http://www.aiwenmo.com/dinky/hudi/hudi_clustertable.png)
## 数据表
### 1. DDL准备
(以下ddl 通过Python程序模板生成 大佬请略过! O(∩_∩)O )
```sql
------------- '订单表' order_mysql_goods_order -----------------
CREATE TABLE source_order_mysql_goods_order (
`goods_order_id` bigint COMMENT '自增主键id'
, `goods_order_uid` string COMMENT '订单uid'
, `customer_uid` string COMMENT '客户uid'
, `customer_name` string COMMENT '客户name'
, `student_uid` string COMMENT '学生uid'
, `order_status` bigint COMMENT '订单状态 1:待付款 2:部分付款 3:付款审核 4:已付款 5:已取消'
, `is_end` bigint COMMENT '订单是否完结 1.未完结 2.已完结'
, `discount_deduction` bigint COMMENT '优惠总金额(单位:分)'
, `contract_deduction` bigint COMMENT '老合同抵扣金额(单位:分)'
, `wallet_deduction` bigint COMMENT '钱包抵扣金额(单位:分)'
, `original_price` bigint COMMENT '订单原价(单位:分)'
, `real_price` bigint COMMENT '实付金额(单位:分)'
, `pay_success_time` timestamp(3) COMMENT '完全支付时间'
, `tags` string COMMENT '订单标签(1新签 2续费 3扩科 4报名-合新 5转班-合新 6续费-合新 7试听-合新)'
, `status` bigint COMMENT '是否有效(1.生效 2.失效 3.超时未付款)'
, `remark` string COMMENT '订单备注'
, `delete_flag` bigint COMMENT '是否删除(1.否,2.是)'
, `test_flag` bigint COMMENT '是否测试数据(1.否,2.是)'
, `create_time` timestamp(3) COMMENT '创建时间'
, `update_time` timestamp(3) COMMENT '更新时间'
, `create_by` string COMMENT '创建人uid(唯一标识)'
, `update_by` string COMMENT '更新人uid(唯一标识)'
,PRIMARY KEY(goods_order_id) NOT ENFORCED
) COMMENT '订单表'
WITH (
'connector' = 'mysql-cdc'
,'hostname' = 'rm-bp1t34384933232rds.aliyuncs.com'
,'port' = '3306'
,'username' = 'app_kfkdr'
,'password' = 'CV122fff0E40'
,'server-time-zone' = 'UTC'
,'scan.incremental.snapshot.enabled' = 'true'
,'debezium.snapshot.mode'='latest-offset' -- 或者key是scan.startup.mode,initial表示要历史数据,latest-offset表示不要历史数据
,'debezium.datetime.format.date'='yyyy-MM-dd'
,'debezium.datetime.format.time'='HH-mm-ss'
,'debezium.datetime.format.datetime'='yyyy-MM-dd HH-mm-ss'
,'debezium.datetime.format.timestamp'='yyyy-MM-dd HH-mm-ss'
,'debezium.datetime.format.timestamp.zone'='UTC+8'
,'database-name' = 'order'
,'table-name' = 'goods_order'
-- ,'server-id' = '2675788754-2675788754'
);
CREATE TABLE sink_order_mysql_goods_order(
`goods_order_id` bigint COMMENT '自增主键id'
, `goods_order_uid` string COMMENT '订单uid'
, `customer_uid` string COMMENT '客户uid'
, `customer_name` string COMMENT '客户name'
, `student_uid` string COMMENT '学生uid'
, `order_status` bigint COMMENT '订单状态 1:待付款 2:部分付款 3:付款审核 4:已付款 5:已取消'
, `is_end` bigint COMMENT '订单是否完结 1.未完结 2.已完结'
, `discount_deduction` bigint COMMENT '优惠总金额(单位:分)'
, `contract_deduction` bigint COMMENT '老合同抵扣金额(单位:分)'
, `wallet_deduction` bigint COMMENT '钱包抵扣金额(单位:分)'
, `original_price` bigint COMMENT '订单原价(单位:分)'
, `real_price` bigint COMMENT '实付金额(单位:分)'
, `pay_success_time` timestamp(3) COMMENT '完全支付时间'
, `tags` string COMMENT '订单标签(1新签 2续费 3扩科 4报名-合新 5转班-合新 6续费-合新 7试听-合新)'
, `status` bigint COMMENT '是否有效(1.生效 2.失效 3.超时未付款)'
, `remark` string COMMENT '订单备注'
, `delete_flag` bigint COMMENT '是否删除(1.否,2.是)'
, `test_flag` bigint COMMENT '是否测试数据(1.否,2.是)'
, `create_time` timestamp(3) COMMENT '创建时间'
, `update_time` timestamp(3) COMMENT '更新时间'
, `create_by` string COMMENT '创建人uid(唯一标识)'
, `update_by` string COMMENT '更新人uid(唯一标识)'
,PRIMARY KEY (goods_order_id) NOT ENFORCED
) COMMENT '订单表'
WITH (
'connector' = 'hudi'
, 'path' = 'hdfs://cluster1/data/bizdata/cdc/mysql/order/goods_order' -- 路径会自动创建
, 'hoodie.datasource.write.recordkey.field' = 'goods_order_id' -- 主键
, 'write.precombine.field' = 'update_time' -- 相同的键值时,取此字段最大值,默认ts字段
, 'read.streaming.skip_compaction' = 'true' -- 避免重复消费问题
, 'write.bucket_assign.tasks' = '2' -- 并发写的 bucekt 数
, 'write.tasks' = '2'
, 'compaction.tasks' = '1'
, 'write.operation' = 'upsert' -- UPSERT(插入更新)\INSERT(插入)\BULK_INSERT(批插入)(upsert性能会低些,不适合埋点上报)
, 'write.rate.limit' = '20000' -- 限制每秒多少条
, 'table.type' = 'COPY_ON_WRITE' -- 默认COPY_ON_WRITE ,
, 'compaction.async.enabled' = 'true' -- 在线压缩
, 'compaction.trigger.strategy' = 'num_or_time' -- 按次数压缩
, 'compaction.delta_commits' = '20' -- 默认为5
, 'compaction.delta_seconds' = '60' -- 默认为1小时
, 'hive_sync.enable' = 'true' -- 启用hive同步
, 'hive_sync.mode' = 'hms' -- 启用hive hms同步,默认jdbc
, 'hive_sync.metastore.uris' = 'thrift://cdh2.vision.com:9083' -- required, metastore的端口
, 'hive_sync.jdbc_url' = 'jdbc:hive2://cdh1.vision.com:10000' -- required, hiveServer地址
, 'hive_sync.table' = 'order_mysql_goods_order' -- required, hive 新建的表名 会自动同步hudi的表结构和数据到hive
, 'hive_sync.db' = 'cdc_ods' -- required, hive 新建的数据库名
, 'hive_sync.username' = 'hive' -- required, HMS 用户名
, 'hive_sync.password' = '123456' -- required, HMS 密码
, 'hive_sync.skip_ro_suffix' = 'true' -- 去除ro后缀
);
---------- source_order_mysql_goods_order=== TO ==>> sink_order_mysql_goods_order ------------
insert into sink_order_mysql_goods_order select * from source_order_mysql_goods_order;
```
## 调试
### 1.对上述SQL执行语法校验:
![img](http://www.aiwenmo.com/dinky/hudi/hudi_explainsql.png)
### 2. 获取JobPlan
![img](http://www.aiwenmo.com/dinky/hudi/hudi_jobplan.png)
### 3. 执行任务
![img](http://www.aiwenmo.com/dinky/hudi/hudi_executesql.png)
### 4. dlink 查看执行的任务
![img](http://www.aiwenmo.com/dinky/hudi/hudi_process.png)
### 5. Flink-webUI 查看 作业
![img](http://www.aiwenmo.com/dinky/hudi/hudi_flink.png)
### 6. 查看hdfs路径下数据
![img](http://www.aiwenmo.com/dinky/hudi/hudi_hdfs.png)
### 7. 查看hive表:
![img](http://www.aiwenmo.com/dinky/hudi/hudi_hive.png)
查看订单号对应的数据
![img](http://www.aiwenmo.com/dinky/hudi/hudi_hivedata.png)
### 8.更新数据操作
```sql
UPDATE `order`.`goods_order`
SET
`remark` = 'cdc_test update'
WHERE
`goods_order_id` = 73270;
```
再次查看 hive 数据 发现已经更新
![img](http://www.aiwenmo.com/dinky/hudi/hudi_hiveupdate.png)
### 9.删除数据操作
(内部业务中采用逻辑删除 不使用物理删除 此例仅演示/测试使用 谨慎操作)
```sql
delete from `order`.`goods_order` where goods_order_id='73270';
```
![img](http://www.aiwenmo.com/dinky/hudi/hudi_hivedelete.png)
### 10.将此数据再次插入
```sql
INSERT INTO `order`.`goods_order`(`goods_order_id`, `goods_order_uid`, `customer_uid`, `customer_name`, `student_uid`, `order_status`, `is_end`, `discount_deduction`, `contract_deduction`, `wallet_deduction`, `original_price`, `real_price`, `pay_success_time`, `tags`, `status`, `remark`, `delete_flag`, `test_flag`, `create_time`, `update_time`, `create_by`, `update_by`) VALUES (73270, '202112121667480848077045760', 'VA100002435', 'weweweywu', 'S100002435', 4, 1, 2000000, 0, 0, 2000000, 0, '2021-12-12 18:51:41', '1', 1, '', 1, 1, '2021-12-12 18:51:41', '2022-01-10 13:53:59', 'VA100681', 'VA100681');
```
再次查询hive数据 数据正常进入。
![img](http://www.aiwenmo.com/dinky/hudi/hudi_hiveinsert.png)
至此 Dlink在Flink-SQL-CDC 到Hudi Sync Hive 测试结束
## 结论
通过 Dlink + Flink-Mysql-CDC + Hudi 的方式大大降低了我们流式入湖的成本,其中 Flink-Mysql-CDC 简化了CDC的架构与成本,而 Hudi 高性能的读写更有利于变动数据的存储,最后 Dlink 则将整个数据开发过程 sql 平台化,使我们的开发运维更加专业且舒适,期待 Dlink 后续的发展。
## 作者
zhumingye
\ No newline at end of file
## 敬请期待
\ No newline at end of file
## 扩展 UDF
将 Flink 集群上已扩展好的 UDF 直接放入 Dlink 的 lib 或者 plugins 下,然后重启即可。定制 UDF 过程同 Flink 官方一样。
注意:以下功能均为对应版本已实现的功能,实测可用。
| 应用 | 方向 | 功能 | 进展 |
|:-------:|:----------:|-------------------------------------------|:-----:|
| 开发中心 | FlinkSQL | 支持 sql-client 所有语法 | 0.4.0 |
| | | 支持 Flink 所有 Configuration | 0.4.0 |
| | | 支持 Flink 所有 Connector | 0.4.0 |
| | | 支持 SELECT、SHOW 等查询实时预览 | 0.4.0 |
| | | 支持 INSERT 语句集 | 0.4.0 |
| | | 新增 SQL 片段语法 | 0.4.0 |
| | | 新增 AGGTABLE 表值聚合语法及 UDATF 支持 | 0.4.0 |
| | | 新增 CDCSOURCE 多源合并语法支持 | 0.6.0 |
| | | 新增 FlinkSQLEnv 执行环境复用 | 0.5.0 |
| | | 新增 Flink Catalog 交互查询 | 0.4.0 |
| | | 新增 执行环境的共享与私有会话机制 | 0.4.0 |
| | | 新增 多种方言的作业目录管理(FlinkSQL、SQL、Java) | 0.5.0 |
| | | 新增 作业配置与执行配置管理 | 0.4.0 |
| | | 新增 基于 Explain 的语法校验与逻辑解析 | 0.4.0 |
| | | 新增 JobPlan 图预览 | 0.5.0 |
| | | 新增 基于 StreamGraph 的表级血缘分析 | 0.4.0 |
| | | 新增 基于上下文元数据自动提示与补全 | 0.4.0 |
| | | 新增 自定义规则的自动提示与补全 | 0.4.0 |
| | | 新增 关键字高亮与代码缩略图 | 0.4.0 |
| | | 新增 选中片段执行 | 0.4.0 |
| | | 新增 布局拖拽 | 0.4.0 |
| | | 新增 SQL导出 | 0.5.0 |
| | | 新增 快捷键保存、校验、美化 | 0.5.0 |
| | | 支持 local 模式下 FlinkSQL 提交 | 0.4.0 |
| | | 支持 standalone 模式下 FlinkSQL 提交 | 0.4.0 |
| | | 支持 yarn session 模式下 FlinkSQL 提交 | 0.4.0 |
| | | 支持 yarn per-job 模式下 FlinkSQL 提交 | 0.4.0 |
| | | 支持 yarn application 模式下 FlinkSQL 提交 | 0.4.0 |
| | | 支持 kubernetes session 模式下 FlinkSQL 提交 | 0.5.0 |
| | | 支持 kubernetes application 模式下 FlinkSQL 提交 | 0.5.0 |
| | | 支持 UDF Java 方言Local模式在线编写、调试、动态加载 | 0.5.0 |
| | Flink 作业 | 支持 yarn application 模式下 Jar 提交 | 0.4.0 |
| | | 支持 k8s application 模式下 Jar 提交 | 0.5.0 |
| | | 支持 作业 Cancel | 0.4.0 |
| | | 支持 作业 SavePoint 的 Cancel、Stop、Trigger | 0.4.0 |
| | | 新增 作业自动从 SavePoint 恢复机制(包含最近、最早、指定一次) | 0.4.0 |
| | Flink 集群 | 支持 查看已注册集群的作业列表与运维 | 0.4.0 |
| | | 新增 自动注册 Yarn 创建的集群 | 0.4.0 |
| | SQL | 新增 外部数据源的 SQL 校验 | 0.5.0 |
| | | 新增 外部数据源的 SQL 执行与预览 | 0.5.0 |
| | BI | 新增 折线图的渲染 | 0.5.0 |
| | | 新增 条形图图的渲染 | 0.5.0 |
| | | 新增 饼图的渲染 | 0.5.0 |
| | 元数据 | 新增 查询外部数据源的元数据信息 | 0.4.0 |
| | | 新增 FlinkSQL 和 SQL 的自动生成 | 0.6.0 |
| | 归档 | 新增 执行与提交历史 | 0.4.0 |
| 运维中心 | 暂无 | 暂无 | 0.4.0 |
| 注册中心 | Flink 集群实例 | 新增 外部 Flink 集群实例注册 | 0.4.0 |
| | | 新增 外部 Flink 集群实例心态检测与版本获取 | 0.4.0 |
| | | 新增 外部 Flink 集群手动一键回收 | 0.4.0 |
| | Flink 集群配置 | 新增 Flink On Yarn 集群配置注册及测试 | 0.4.0 |
| | User Jar | 新增 外部 User Jar 注册 | 0.4.0 |
| | 数据源 | 新增 Mysql 数据源注册及测试 | 0.4.0 |
| | | 新增 Oracle 数据源注册及测试 | 0.4.0 |
| | | 新增 postgreSql 数据源注册及测试 | 0.4.0 |
| | | 新增 ClickHouse 数据源注册及测试 | 0.4.0 |
| OpenApi | 调度 | 新增 submitTask 调度接口 | 0.5.0 |
| | FlinkSQL | 新增 executeSql 提交接口 | 0.5.0 |
| | | 新增 explainSql 验证接口 | 0.5.0 |
| | | 新增 getJobPlan 计划接口 | 0.5.0 |
| | | 新增 getStreamGraph 计划接口 | 0.5.0 |
| | | 新增 getJobData 数据接口 | 0.5.0 |
| | Flink | 新增 executeJar 提交接口 | 0.5.0 |
| | | 新增 cancel 停止接口 | 0.5.0 |
| | | 新增 savepoint 触发接口 | 0.5.0 |
| 关于 | 关于 Dinky | 版本更新记录
\ No newline at end of file
## Dinky介绍
实时即未来,Dinky 为 Apache Flink 而生,让 Flink SQL 纵享丝滑,并致力于实时计算平台建设。
Dinky 架构于 Apache Flink,增强 Flink 的应用与体验,探索流式数仓。即站在巨人肩膀上创新与实践,Dinky 在未来批流一体的发展趋势下潜力无限。
最后,Dinky 的发展皆归功于 Apache Flink 等其他优秀的开源项目的指导与成果。
## Dinky由来
Dinky(原 Dlink):
1.Dinky 英译为 “ 小巧而精致的 ” ,最直观的表明了它的特征:轻量级但又具备复杂的大数据开发能力。
2.为 “ Data Integrate No Knotty ” 的首字母组合,英译 “ 数据整合不难 ”,寓意 “ 易于建设批流一体平台及应用 ”。
3.从 Dlink 改名为 Dinky 过渡平滑,更加形象的阐明了开源项目的目标,始终指引参与者们 “不忘初心,方得始终 ”。
\ No newline at end of file
欢迎您加入社区交流分享,也欢迎您为社区贡献自己的力量。
在此非常感谢大家的支持~
QQ社区群:**543709668**,申请备注 “ Dinky ”,不写不批
微信社区群(推荐):添加微信号 wenmo_ai 邀请进群,申请备注 “ Dinky + 企业名 + 职位”,不写不批
公众号(最新消息获取建议关注):[DataLink数据中台](https://mmbiz.qpic.cn/mmbiz_jpg/dyicwnSlTFTp6w4PuJruFaLV6uShCJDkzqwtnbQJrQ90yKDuuIC8tyMU5DK69XZibibx7EPPBRQ3ic81se5UQYs21g/0?wx_fmt=jpeg)
\ No newline at end of file
1.任务生命周期管理
2.作业监控及运维
3.流作业自动恢复
4.作业日志查看
5.钉钉报警和推送
\ No newline at end of file
[Apache Flink](https://github.com/apache/flink)
[Mybatis Plus](https://github.com/baomidou/mybatis-plus)
[ant-design-pro](https://github.com/ant-design/ant-design-pro)
[Monaco Editor](https://github.com/Microsoft/monaco-editor)
[SpringBoot]()
[docsify](https://github.com/docsifyjs/docsify/)
此外,感谢 [JetBrains](https://www.jetbrains.com/?from=dlink) 提供的免费开源 License 赞助
[![JetBrains](https://github.com/DataLinkDC/dlink/raw/main/dlink-doc/images/main/jetbrains.svg)](https://www.jetbrains.com/?from=dlink)
\ No newline at end of file
## 摘要
本文讲述了 Dlink 对 Flink 的表值聚合功能的应用与增强。增强主要在于定义了 AGGTABLE 来通过 FlinkSql 进行表值聚合的实现,以下将通过两个示例 top2 与 to_map 进行讲解。
## 准备工作
### 准备测试表
#### 学生表(student)
```sql
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
`sid` int(11) NOT NULL,
`name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
PRIMARY KEY (`sid`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
INSERT INTO `student` VALUES (1, '小明');
INSERT INTO `student` VALUES (2, '小红');
INSERT INTO `student` VALUES (3, '小黑');
INSERT INTO `student` VALUES (4, '小白');
```
#### 一维成绩表(score)
```sql
DROP TABLE IF EXISTS `score`;
CREATE TABLE `score` (
`cid` int(11) NOT NULL,
`sid` int(11) NULL DEFAULT NULL,
`cls` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`score` int(11) NULL DEFAULT NULL,
PRIMARY KEY (`cid`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
INSERT INTO `score` VALUES (1, 1, 'chinese', 90);
INSERT INTO `score` VALUES (2, 1, 'math', 100);
INSERT INTO `score` VALUES (3, 1, 'english', 95);
INSERT INTO `score` VALUES (4, 2, 'chinese', 98);
INSERT INTO `score` VALUES (5, 2, 'english', 99);
INSERT INTO `score` VALUES (6, 3, 'chinese', 99);
INSERT INTO `score` VALUES (7, 3, 'english', 100);
```
#### 前二排名表(scoretop2)
```sql
DROP TABLE IF EXISTS `scoretop2`;
CREATE TABLE `scoretop2` (
`cls` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`score` int(11) NULL DEFAULT NULL,
`rank` int(11) NOT NULL,
PRIMARY KEY (`cls`, `rank`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
```
#### 二维成绩单表(studentscore)
```sql
DROP TABLE IF EXISTS `studentscore`;
CREATE TABLE `studentscore` (
`sid` int(11) NOT NULL,
`name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`chinese` int(11) NULL DEFAULT NULL,
`math` int(11) NULL DEFAULT NULL,
`english` int(11) NULL DEFAULT NULL,
PRIMARY KEY (`sid`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
```
### 问题提出
#### 输出各科成绩前二的分数
要求输出已有学科排名前二的分数到scoretop2表中。
#### 输出二维成绩单
要求将一维成绩表转化为二维成绩单,其中不存在的成绩得分为0,并输出至studentscore表中。
## Dlink 的 AGGTABLE
​ 本文以 Flink 官方的 Table Aggregate Functions 示例 Top2 为例进行比较说明,传送门 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/#table-aggregate-functions
### 官方 Table API 实现
```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.*;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;
import static org.apache.flink.table.api.Expressions.*;
// mutable accumulator of structured type for the aggregate function
public static class Top2Accumulator {
public Integer first;
public Integer second;
}
// function that takes (value INT), stores intermediate results in a structured
// type of Top2Accumulator, and returns the result as a structured type of Tuple2<Integer, Integer>
// for value and rank
public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accumulator> {
@Override
public Top2Accumulator createAccumulator() {
Top2Accumulator acc = new Top2Accumulator();
acc.first = Integer.MIN_VALUE;
acc.second = Integer.MIN_VALUE;
return acc;
}
public void accumulate(Top2Accumulator acc, Integer value) {
if (value > acc.first) {
acc.second = acc.first;
acc.first = value;
} else if (value > acc.second) {
acc.second = value;
}
}
public void merge(Top2Accumulator acc, Iterable<Top2Accumulator> it) {
for (Top2Accumulator otherAcc : it) {
accumulate(acc, otherAcc.first);
accumulate(acc, otherAcc.second);
}
}
public void emitValue(Top2Accumulator acc, Collector<Tuple2<Integer, Integer>> out) {
// emit the value and rank
if (acc.first != Integer.MIN_VALUE) {
out.collect(Tuple2.of(acc.first, 1));
}
if (acc.second != Integer.MIN_VALUE) {
out.collect(Tuple2.of(acc.second, 2));
}
}
}
TableEnvironment env = TableEnvironment.create(...);
// call function "inline" without registration in Table API
env
.from("MyTable")
.groupBy($("myField"))
.flatAggregate(call(Top2.class, $("value")))
.select($("myField"), $("f0"), $("f1"));
// call function "inline" without registration in Table API
// but use an alias for a better naming of Tuple2's fields
env
.from("MyTable")
.groupBy($("myField"))
.flatAggregate(call(Top2.class, $("value")).as("value", "rank"))
.select($("myField"), $("value"), $("rank"));
// register function
env.createTemporarySystemFunction("Top2", Top2.class);
// call registered function in Table API
env
.from("MyTable")
.groupBy($("myField"))
.flatAggregate(call("Top2", $("value")).as("value", "rank"))
.select($("myField"), $("value"), $("rank"));
```
### Dlink FlinkSql 实现
#### 示例
```sql
CREATE AGGTABLE aggdemo AS
SELECT myField,value,rank
FROM MyTable
GROUP BY myField
AGG BY TOP2(value) as (value,rank);
```
#### 优势
​ 可以通过 FlinkSql 来实现表值聚合的需求,降低了开发与维护成本。
#### 缺点
​ 语法固定,示例关键字必须存在并进行描述,where 可以加在 FROM 和 GROUP BY 之间。
## Dlink 本地实现各科成绩前二
​ 本示例通过 Dlink 的本地环境进行演示实现。
### 进入Dlink
![image-20210615115042539](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTrkkX1Jsib7GxQY7tpiciaNdkGIAFicLZ3bwSawOianJQnNWuKAvZJ3Bb00DiaBxtxvnXgToGibPAwMFhs6A/0?wx_fmt=png)
​ 只有版本号大于等于 0.2.2-rc1 的 Dlink 才支持本文 AGGTABLE 的使用。
### 编写 FlinkSQL
```sql
jdbcconfig:='connector' = 'jdbc',
'url' = 'jdbc:mysql://127.0.0.1:3306/data?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true',
'username'='dlink',
'password'='dlink',;
CREATE TABLE student (
sid INT,
name STRING,
PRIMARY KEY (sid) NOT ENFORCED
) WITH (
${jdbcconfig}
'table-name' = 'student'
);
CREATE TABLE score (
cid INT,
sid INT,
cls STRING,
score INT,
PRIMARY KEY (cid) NOT ENFORCED
) WITH (
${jdbcconfig}
'table-name' = 'score'
);
CREATE TABLE scoretop2 (
cls STRING,
score INT,
`rank` INT,
PRIMARY KEY (cls,`rank`) NOT ENFORCED
) WITH (
${jdbcconfig}
'table-name' = 'scoretop2'
);
CREATE AGGTABLE aggscore AS
SELECT cls,score,rank
FROM score
GROUP BY cls
AGG BY TOP2(score) as (score,rank);
insert into scoretop2
select
b.cls,b.score,b.`rank`
from aggscore b
```
​ 本 Sql 使用了 Dlink 的增强特性 Fragment 机制,对 jdbc的配置进行了定义。
### 维护 FlinkSQL 及配置
![image-20210615115521967](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTrkkX1Jsib7GxQY7tpiciaNdkGeibmfcst4hHVTqzFmX6LvBXqgPTFcCOWHuIxEcbNHgfnUc0mhPm1eFw/0?wx_fmt=png)
​ 编写 FlinkSQL ,配置开启 Fragment 机制,设置 Flink 集群为本地执行。点击保存。
### 同步执行INSERT
![image-20210615115714713](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTrkkX1Jsib7GxQY7tpiciaNdkGApFiacyxkKERLE9FhsteTeTovcjTQHiaPKcxY6YqSukkVYZWVFGxPJibQ/0?wx_fmt=png)
​ 点击同步执行按钮运行当前编辑器中的 FlinkSQL 语句集。弹出提示信息,等待执行完成后自动关闭并刷新信息和结果。
​ 当前版本使用异步提交功能将直接提交任务到集群,Studio 不负责执行结果的记录。提交任务前请保存 FlinkSQL 和配置,否则将提交旧的语句和配置。
### 执行反馈
![image-20210615115913647](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTrkkX1Jsib7GxQY7tpiciaNdkGL7Wv8Tefsn0h1USWf2VLXB2Tb3yx4K2QksiaFplehnrvz25cE0nQnlA/0?wx_fmt=png)
​ 本地执行成功,“0_admin” 为本地会话,里面存储了 Catalog。
### 同步执行SELECT查看中间过程
![image-20210615120129426](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTrkkX1Jsib7GxQY7tpiciaNdkGXkEXFib5ic21kOemq6ib8kWAdLCBicicjBxU9oibmaSs4Hru8EccxKe5z0dg/0?wx_fmt=png)
​ 由于当前会话中已经存储了表的定义,此时直接选中 select 语句点击同步执行可以重新计算并展示其计算过程中产生的结果,由于 Flink 表值聚合操作机制,该结果非最终结果。
### 同步执行SELECT查看最终结果
![image-20210615121542233](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTrkkX1Jsib7GxQY7tpiciaNdkG5mNQFZp4YIuwIrh6cJteFIwsbomibSk32hWbFqlt887F9lee9NYT8fQ/0?wx_fmt=png)
​ 在草稿的页面使用相同的会话可以共享 Catalog,此时只需要执行 select 查询 sink 表就可以预览最终的统计结果。
### 查看Mysql表的数据
![image-20210615120738413](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTrkkX1Jsib7GxQY7tpiciaNdkGerEdvQLXGNqfm7KZT7ARaNBV0mlrUdah69JAB3miaBFBgUU3iaaowcLg/0?wx_fmt=png)
​ sink 表中只有五条数据,结果是正确的。
## Dlink 远程实现二维成绩单
​ 本示例通过 Dlink 控制远程集群来实现。
​ 远程集群的 lib 中需要上传 dlink-function.jar 。
### 编写FlinkSQL
```sql
jdbcconfig:='connector' = 'jdbc',
'url' = 'jdbc:mysql://127.0.0.1:3306/data?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true',
'username'='dlink',
'password'='dlink',;
CREATE TABLE student (
sid INT,
name STRING,
PRIMARY KEY (sid) NOT ENFORCED
) WITH (
${jdbcconfig}
'table-name' = 'student'
);
CREATE TABLE score (
cid INT,
sid INT,
cls STRING,
score INT,
PRIMARY KEY (cid) NOT ENFORCED
) WITH (
${jdbcconfig}
'table-name' = 'score'
);
CREATE TABLE studentscore (
sid INT,
name STRING,
chinese INT,
math INT,
english INT,
PRIMARY KEY (sid) NOT ENFORCED
) WITH (
${jdbcconfig}
'table-name' = 'studentscore'
);
CREATE AGGTABLE aggscore2 AS
SELECT sid,data
FROM score
GROUP BY sid
AGG BY TO_MAP(cls,score) as (data);
insert into studentscore
select
a.sid,a.name,
cast(GET_KEY(b.data,'chinese','0') as int),
cast(GET_KEY(b.data,'math','0') as int),
cast(GET_KEY(b.data,'english','0') as int)
from student a
left join aggscore2 b on a.sid=b.sid
```
​ 本实例通过表值聚合将分组后的多行转单列然后通过 GET_KEY 取值的思路来实现。同时,也使用了 Fragment 机制。
### 同步执行
![image-20210615131731449](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTrkkX1Jsib7GxQY7tpiciaNdkGxHX5T3C2vr2CF9LicZicBnGZOYmpXVq343zYFPjXsae0icQ1mTVWcsugQ/0?wx_fmt=png)
​ 与示例一相似,不同点在于需要更改集群配置为 远程集群。远程集群的注册在集群中心注册,Hosts 需要填写 JobManager 的地址,HA模式则使用英文逗号分割可能出现的地址,如“127.0.0.1:8081,127.0.0.2:8081,127.0.0.3:8081”。心跳监测正常的集群实例即可用于任务执行或提交。
### Flink UI
![image-20210615131931183](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTrkkX1Jsib7GxQY7tpiciaNdkGCSGp5fSGaRz0PgvFlEmWSRdiaZZHbmicvYWXnLzoNL3HWEc3mL1W2jPA/0?wx_fmt=png)
​ 打开集群的 Flink UI 可以发现刚刚提交的批任务,此时可以发现集群版本号为 1.12.2 ,而 Dlink 默认版本为 1.12.4 ,所以一般大版本内可以互相兼容。
### 查看Mysql表的数据
![image-20210615132004925](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTrkkX1Jsib7GxQY7tpiciaNdkGc9NX5IzQ6Kog5oYPiaaELmCYzh3vpdUaK40hNuFPrlAWY1jlZd7QbtQ/0?wx_fmt=png)
​ 查看 Mysql 表的最终数据,发现存在四条结果,且也符合问题的要求。
## Dlink 是什么
Dlink 是一个基于 Apache Flink 二次开发的网页版的 FlinkSQL Studio,可以连接多个 Flink 集群实例,并在线开发、执行、提交 FlinkSQL 语句以及预览其运行结果,支持 Flink 官方所有语法并进行了些许增强。
## 与 Flink 的不同
Dlink 基于 Flink 源码二次开发,主要应用于 SQL 任务的管理与执行。以下将介绍 Dlink-0.2.3 与 Flink 的不同。
### Dlink 的原理
![](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTo5cwuZy7GSLibw5J7Lx6cicx0vXaDHqn5VrrDJ9d3hcEicbEVO77NcP6bOylC9bOpuibM08JJ8bh8XQQ/0?wx_fmt=png)
### Dlink 的 FlinkSQL 执行原理
![](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqVImq5JvQzZ7oMqpnQ2NVHdmM6Pfib63atzoWNIqK7Ue6p9KfdibV889sOcZJ1Olw3kLHLmHZiab3Tg/0?wx_fmt=png)
### Connector 的扩展
Dlink 的 Connector 的使用与扩展同 Flink 的完全一致,即当使用 Flink 集成 Dlink 时,只需要将 Flink 扩展的依赖加入 Dlink 的 lib 下即可。
当然,Dlink 自身源码也提供了一些 Connector ,它们遵循 Flink 的扩展要求,可以直接被加入到 Flink 的 lib 下进行使用。
### 多版本支持
Dlink 的单机版只能同时稳定连接同一大版本号下的不同版本的 Flink 集群实例,连接其他大版本号的集群实例在提交任务时可能存在问题;而 DataLink 中的 Dlink 微服务版可以同时稳定连接所有版本号的 Flink 集群实例。
Dlink 提供了多版本的 `dlink-client.jar`,根据需求选择对应版本的依赖加入到 lib 下即可稳定连接该版本的 Flink 集群实例。
### Catalog 共享
Dlink 提供了共享会话对 Flink 的 Catalog、环境配置等进行了长期管理,可以实现团队开发共享 Catalog 的效果。
### Sql 语法增强
Dlink 对 FlinkSQL 的语法进行增强,主要表现为 Sql 片段与表值聚合 Sql 化。
#### Sql 片段
```sql
sf:=select * from;tb:=student;
${sf} ${tb}
## 效果等同于
select * from student
```
#### 表值聚合
```sql
CREATE AGGTABLE aggdemo AS
SELECT myField,value,rank
FROM MyTable
GROUP BY myField
AGG BY TOP2(value) as (value,rank);
```
### 同步执行结果预览
Dlink 可以对同步执行的 FlinkSQL 进行运行完成的结果预览,同 `sql-client`
## 概念原理
在 Dlink 中具有六个概念,当熟悉他们的原理时,可以搭配出更强大的使用效果。
### 本地环境
本地环境即为`LocalEnvironment`,是在本地模式运行 Flink 程序的句柄,在本地的 JVM (standalone 或嵌入其他程序)里运行程序,通过调用`ExecutionEnvironment.createLocalEnvironment()`方法来实现。
Dlink 通过本地环境来实现隔离调试,本地环境执行时所需要的 `connector` 等资源在 `lib` 目录下引入。本地环境执行过程包含完整的 sql 执行过程。
### 远程环境
远程环境即为`RemoteEnvironment`,是在远程模式中向指定集群提交 Flink 程序的句柄,在目标集群的环境里运行程序,通过调用`ExecutionEnvironment.createRemoteEnvironment(host,port)`方法来实现,其中 host 为 `rest.address` ,port 为 `rest.port`
Dlink 可以对任意 standalone、on yarn等运行模式的远程集群进行 sql 提交。远程环境执行过程只包含 sql 任务的准备工作,即解析、优化、转化物理执行计划、生成算子、提交作业执行图。所以远程环境执行时所需要的 connector 等资源也需要在 lib 目录下引入。
### 共享会话
共享会话为用户与执行环境的操作会话,主要包含 Catalog、片段、执行环境配置等内容。可以认为官方的 `sql-client` 是一个会话,保留了本次命令窗口的操作结果,当退出 `sql-client` 后,会话结束。
Dlink 的共享会话相当于可以启动多个 `sql-client` 来进行会话操作,并且其他用户可以使用您的会话 key ,在对应环境中共享您的会话的所有信息。例如,通过执行环境 + 共享会话可以确定唯一的 Catalog。
### 临时会话
临时会话指不启用共享会话,您每次交互执行操作时,都会创建临时的独立的会话,操作解释后立即释放,适合作业解耦处理。
Dlink 的临时会话相当于只启动一个 `sql-client` ,执行完语句后立即关闭再启动。
### 同步执行
同步执行指通过 Studio 进行操作时为同步等待,当语句运行完成后返回运行结果。
Dlink 的语句与官方语句一致,并做了些许增强。Dlink 将所有语句划分为三种类型,即 `DDL``DQL``DML` 。对于同步执行来说, `DDL``DQL` 均为等待语句执行完成后返回运行结果,而 `DML` 语句则立即返回异步提交操作的执行结果。
### 异步提交
异步提交指通过 Studio 进行操作时为异步操作,当语句被执行后立马返回操作执行结果。
对于三种语句类型,Dlink 的异步提交均立即返回异步操作的执行结果。当前版本的 Dlink 的异步提交不进行历史记录。
### 搭配使用
| 执行环境 | 会话 | 运行方式 | 适用场景 |
| -------- | -------- | -------- | ------------------------------------------------------------ |
| 本地环境 | 临时会话 | 同步执行 | 无集群或集群不可用的情况下单独开发FlinkSQL作业,需要查看运行结果 |
| 本地环境 | 共享会话 | 同步执行 | 无集群或集群不可用的情况下复用Catalog或让同事排查bug,需要查看运行结果 |
| 本地环境 | 临时会话 | 异步提交 | 无集群或集群不可用的情况下快速启动一个作业,不需要查看运行结果 |
| 本地环境 | 共享会话 | 异步提交 | 共享会话效果无效 |
| 远程环境 | 临时会话 | 同步执行 | 依靠集群单独开发FlinkSQL作业,需要查看运行结果 |
| 远程环境 | 共享会话 | 同步执行 | 依靠集群复用Catalog或让同事排查bug,需要查看运行结果 |
| 远程环境 | 临时会话 | 异步提交 | 快速向集群提交任务,不需要查看运行结果 |
| 远程环境 | 共享会话 | 异步提交 | 共享会话效果无效 |
## 源码扩展
Dlink 的源码是非常简单的, Spring Boot 项目轻松上手。
### 项目结构
```java
dlink -- 父项目
|-dlink-admin -- 管理中心
|-dlink-client -- Client 中心
| |-dlink-client-1.12 -- Client-1.12 实现
| |-dlink-client-1.13 -- Client-1.13 实现
|-dlink-connectors -- Connectors 中心
| |-dlink-connector-jdbc -- Jdbc 扩展
|-dlink-core -- 执行中心
|-dlink-doc -- 文档
| |-bin -- 启动脚本
| |-bug -- bug 反馈
| |-config -- 配置文件
| |-doc -- 使用文档
| |-sql -- sql脚本
|-dlink-function -- 函数中心
|-dlink-web -- React 前端
```
### 模块介绍
#### dlink-admin
该模块为管理模块,基于 `Spring Boot + MybatisPlus` 框架开发,目前版本对作业、目录、文档、集群、语句等功能模块进行管理。
#### dlink-client
该模块为 Client 的封装模块,依赖了 `flink-client`,并自定义了新功能的实现如 `CustomTableEnvironmentImpl``SqlManager ` 等。
通过该模块完成对不同版本的 Flink 集群的适配工作。
#### dlink-connectors
该模块为 Connector 的封装模块,用于扩展 Flink 的 `Connector`
#### dlink-core
该模块为 Dlink 的核心处理模块,里面涉及了共享会话、拦截器、执行器等任务执行过程使用到的功能。
#### dlink-doc
该模块为 Dlink 的文档模块,部署相关资源以及使用文档等资料都在该模块下。
#### dlink-function
该模块为 UDF 的封装模块,用于扩展 Flink 的 `UDF`
#### dlink-web
该模块为 Dlink 的前端工程,基于 `Ant Design Pro` 开发,属于 `React` 技术栈,其中的 Sql 在线编辑器是基于 `Monaco Editor` 开发。
## Yarn-Session 实践
### 注册 Session 集群
![image-20211128225423360](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6iaAIees3BsUONXbIocdRUI0WWVSzibPpltibBbMmWgfWJ0AklUlPF9Ugw/0?wx_fmt=png)
进入集群中心进行远程集群的注册。点击新建按钮配置远程集群的参数。图中示例配置了一个 Flink on Yarn 的高可用集群,其中 JobManager HA 地址需要填写集群中所有可能被作为 JobManager 的 RestAPI 地址,多个地址间使用英文逗号分隔。表单提交时可能需要较长时间的等待,因为 dlink 正在努力的计算当前活跃的 JobManager 地址。
保存成功后,页面将展示出当前的 JobManager 地址以及被注册集群的版本号,状态为正常时表示可用。
注意:只有具备 JobManager 实例的 Flink 集群才可以被成功注册到 dlink 中。( Yarn-Per-Job 和 Yarn-Application 也具有 JobManager,当然也可以手动注册,但无法提交任务)
如状态异常时,请检查被注册的 Flink 集群地址是否能正常访问,默认端口号为8081,可能更改配置后发生了变化,查看位置为 Flink Web 的 JobManager 的 Configuration 中的 rest 相关属性。
### 执行 Hello World
万物都具有 Hello World 的第一步,当然 dlink 也是具有的。我们选取了基于 datagen 的流查询作为第一行 Flink Sql。具体如下:
```sql
CREATE TABLE Orders (
order_number BIGINT,
price DECIMAL(32,2),
buyer ROW<first_name STRING, last_name STRING>,
order_time TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1'
);
select order_number,price,order_time from Orders
```
该例子使用到了 datagen,需要在 dlink 的 plugins 目录下添加 flink-table.jar。
点击 Flink Sql Studio 进入开发页面:
![image-20211128230416447](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6RuS3kibG0jJnDjoUicX7sN63UN6j7Osg4lWh5SelOw0hp4Vj6icFAuDkA/0?wx_fmt=png)
在中央的编辑器中编辑 Flink Sql。
右边作业配置:
1. 执行模式:选中 yarn-session;
2. Flink 集群:选中上文注册的测试集群;
3. SavePoint 策略:选中禁用;
4. 按需进行其他配置。
右边执行配置:
1. 预览结果:启用;
2. 远程执行:启用。
点击快捷操作栏的三角号按钮同步执行该 FlinkSQL 任务。
### 预览数据
![image-20211128231230335](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6leQx4mfqsdwVftlEUXSFWnEzOTJGrCGHUKo98SpIn11WkZquEwwjpg/0?wx_fmt=png)
切换到历史选项卡点击刷新可以查看提交进度。切换到结果选项卡,等待片刻点击获取最新数据即可预览 SELECT。
### 停止任务
![image-20211128231523703](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6koYZDQqzsawPOCTP64ycdUZlib5oJA9vo9fpX43DNTmibY60ojZv44zQ/0?wx_fmt=png)
切换到进程选项卡,选则对应的集群实例,查询当前任务,可执行停止操作。
## Yarn-Per-Job 实践
### 注册集群配置
进入集群中心——集群配置,注册配置。
![image-20211128231914983](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6Vcw98k4yfgR2dSo3BUhxdtpRdd8A7NLyXkZhFibhiciarp9DTY415UehQ/0?wx_fmt=png)
1. Hadoop 配置文件路径:指定配置文件路径(末尾无/),需要包含以下文件:core-site.xml,hdfs-site.xml,yarn-site.xml;
2. Flink 配置 lib 路径:指定 lib 的 hdfs 路径(末尾无/),需要包含 Flink 运行时的所有依赖,即 flink 的 lib 目录下的所有 jar;
3. Flink 配置文件路径:指定配置文件 flink-conf.yaml 的具体路径(末尾无/);
4. 按需配置其他参数(重写效果);
5. 配置基本信息(标识、名称等);
6. 点击测试或者保存。
### 执行升级版 Hello World
之前的 hello world 是个 SELECT 任务,改良下变为 INSERT 任务:
```sql
CREATE TABLE Orders (
order_number INT,
price DECIMAL(32,2),
order_time TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.order_number.kind' = 'sequence',
'fields.order_number.start' = '1',
'fields.order_number.end' = '1000'
);
CREATE TABLE pt (
ordertotal INT,
numtotal INT
) WITH (
'connector' = 'print'
);
insert into pt select 1 as ordertotal ,sum(order_number)*2 as numtotal from Orders
```
![image-20211128232734409](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6HU0ics1Er4CibiaRc8jndmc3rHX9J3ArSp4QF2qyARR46hPzOh0kDbYYQ/0?wx_fmt=png)
编写 Flink SQL;
作业配置:
1. 执行模式:选中 yarn-per-job ;
2. Flink 集群配置:选中刚刚注册的配置;
3. SavePoint 策略:选中最近一次。
快捷操作栏:
1. 点击保存按钮保存当前所有配置;
2. 点击小火箭异步提交作业。
![image-20211128233423276](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6OsQDhibZQbh1nXrx9d0OBiaiaibnIMnLcSfT3MNxhY9CEyAxrA9NLczohA/0?wx_fmt=png)
注意,执行历史需要手动刷新。
### 自动注册集群
点击集群中心——集群实例,即可发现自动注册的 Per-Job 集群。
![image-20211128234056101](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6zCpJ7Knv1WYHTUxHt8iagpNNjRAPsPiaqBoD2xkLupcM7Op1Q48tb6ibQ/0?wx_fmt=png)
### 查看 Flink Web UI
提交成功后,点击历史的蓝色地址即可快速打开 Flink Web UI地址。
![image-20211128233735071](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6AQq5qVMAkwUM8WiaLnTrzDnSlLicTJZjWxowdW9dKUibNp33nrnHpL2Ng/0?wx_fmt=png)
### 从 Savepoint 处停止
在进程选项卡中选择自动注册的 Per-Job 集群,查看任务并 SavePoint-Cancel。
![image-20211128234321656](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6jDvzsoHias24jAY6ElCFj6lFCX1jjdGmoGGa5vI5xgNSEUzyEiaZEZNA/0?wx_fmt=png)
在右侧保存点选项卡可以查看该任务的所有 SavePoint 记录。
![image-20211128234619397](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6TGv1Gun2rB35Upv8hnsoeHzeDdFT4Ryag8icHz7BrzhE4YELiaMk7KYw/0?wx_fmt=png)
### 从 SavePoint 处启动
再次点击小火箭提交任务。
![image-20211128235430989](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6WTchGqUeBiaNY5hT6tjQ611UaQA3jawOS4uQHrN65icia3d4qLAfZMEibg/0?wx_fmt=png)
查看对应 Flink Web UI,从 Stdout 输出中证实 SavePoint 恢复成功。
## Yarn-Application 实践
### 注册集群配置
使用之前注册的集群配置即可。
### 上传 dlink-app.jar
第一次使用时,需要将 dlink-app.jar 上传到 hdfs 指定目录,目录可修改如下:
![image-20211128235914006](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6loq9rmu2tlkDvgSoD6WSlNBrniabV7MibNtSQrA2wKnCjKOzUGGiawW4g/0?wx_fmt=png)
50070 端口 浏览文件系统如下:
![image-20211129000047789](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X68kEAFYLBQpGcamP5djEaj9LiaLqlQCxVIXrbdFbgCb4Ct25HTAHCRIw/0?wx_fmt=png)
### 执行升级版 Hello World
作业配置:
1. 执行模式:选中 yarn-application ;
快捷操作栏:
1. 点击保存按钮保存当前所有配置;
2. 点击小火箭异步提交作业。
### 其他同 Per-Job
其他操作同 yarn-per-job ,本文不再做描述。
### 提交 User Jar
作业中心—— Jar 管理,注册 User Jar 配置。
![image-20211129000804888](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6lGkXGSXSP9JyWTxc5rrh8zD9y5XYR9HkKxRRDicbUQicSFhaAAR0Ulxw/0?wx_fmt=png)
右边作业配置的可执行 Jar 选择刚刚注册的 Jar 配置,保存后点击小火箭提交作业。
![image-20211129000933320](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6qyajpuR1OPlsFynwibdSRX3ECcRGGJPmutqyaibJbFS8HYCYic0rswuiaw/0?wx_fmt=png)
由于提交了个批作业,Yarn 可以发现已经执行完成并销毁集群了。
![image-20211129001241101](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6UJbUIgHTSJHN29zicKuf761ERGnZHibMMURhuFpL2Iiah9LSiceTXIrAyg/0?wx_fmt=png)
### 任务生命周期管理
FlinkSQL 生命周期:创建、开发、调试、发布、上线、注销。
Dlink 的 FlinkSQL Studio 负责 FlinkSQL 的开发和调试,在确定最终的 SQL 口径及任务配置后,可通过任务发布功能自动地在运维中心注册测试或生产环境下的最终任务,同时具备版本的管理,将开发与运维分离,保证生产环境的稳定性。
在运维中心可以上线已发布的任务,或者将已上线的任务进行下线,然后可以通过维护功能将任务重新进入开发和调试的进度。
最后,可以在运维中心注销已经不需要或者错误的任务,将被彻底删除。
### 元数据管理
Dlink 目前支持对外部元数据的采集功能,将建设统一的元数据管理,使其可以不需要依赖第三方元数据平台,独自进行更加适应实时数仓的元数据消费操作,统一规范拥有大量数据表、复杂关系的建设需求。
元数据主要包含采集、构建、管理、同步功能。
采集:Dlink 通过 SPI 来扩展实现更多数据源的元数据采集功能,使其可以轻松对接第三方存储库、元数据平台等,甚至可以将消息队列的元数据采集进行扩展,以便于洞悉实时数仓的流数据结构。
构建:Dlink 提供构建逻辑表、字段、关系的能力,解耦外部存储层。通过词根维护来规范命名定义。
管理:Dlink 支持对逻辑表和物理表的结构的可视化管理能力,可添加物理表不支持的信息如标签、分类、注释、权限等。
同步:Dlink 支持自动或手动地将元数据变动同步至对应数据源,或根据逻辑表在数据源上创建物理表。
### 血缘和影响分析
Dlink 目前具备任务表级的 FlinkSQL 血缘分析,通过 FlinkSQL 解析并构造后的 StreamGraph 来获取血缘关系,规避了冗余 Create Table 等的影响,同时支持多 Create View 的语句,使 FlinkSQL 结构更加清晰明了易于维护。
FlinkSQL 任务被发布到运维中心时,会自动生成血缘关系,与元数据管理的元数据信息做对应,进而形成全局的数据链路关系,便同时得到了影响分析。拥有了血缘和影响分析,便更加方便的管理和优化所有的数据任务。
处在 Studio 开发环节的任务,则可以根据已发布的任务构成的数据链路关系来获取自身的全局血缘及影响分析。
单从血缘分析来说,含有表级、字段级、记录级。Dlink 将完善字段级血缘并开放,记录级则是未来探索的一个方向,记录级的血缘将会更直观地展现出数据的治理过程,便于排查数据内容问题。
### 集群运维
Dlink 目前的 FlinkSQL 敏捷需要提取部署好外部的环境才能使用,而该过程目前是通过人工手动进行,需要进行复杂的运维操作,此外还要解决因依赖导致的各种问题。
Dlink 将对集群环境的搭建和启停等操作进行自动化地支持。
首先配置免密通信集群的节点信息,将部署资源提前放到 Dlink 目录下或通过镜像地址进行下载,通过集群模板的配置来分发和部署所使用的 Flink 资源及其他资源,若为 K8S 环境则打包镜像并装载至容器。资源到位后可直接通过 Dlink 启动对应集群如 Standalone 、Yarn-Session 和 K8S-Session等。做到集群部署运维托管 Dlink 。
### 运行监控
Dlink 需要对集群资源及 Flink 作业进行时序监控,支持外部对接 Prometheus 消费定制化的时序数据。
Dlink 通过 SPI 的方式来实现自定义监控接口实现,使其可以插件化地管理不同的中间件的不同的 Metrics 的实现或者对接外部 Metrics 采集组件。
Dlink 通过 JobManager 对 Flink 作业进行状态监控,反馈异常的指标,辅助用户对作业进行口径或者参数优化。
### 报警推送
Dlink 通过 SPI 来扩展报警方式,将先实现钉钉的报警插件,后续企业号、邮箱等留给社区贡献开发。
Dlink 通过自定义报警规则及内容模板来触发报警或者推送报表,使用户第一时间知晓生产环境的异常状况以及其自定义的报表及推送信息如批流任务启停、依赖任务启停、集群操作推送等。
### 依赖调度
Dlink 定位是批流一体平台,不排除用户存在大量的复杂依赖关系的调度需求。
Dlink 提供依赖调度引擎,通过全局的数据链路关系自动获得任务的 DAG 图,根据指定的依赖调度作业参数手动或定时拉起守护线程 Daemon,Daemon 通过子调度组、 DAG 及节点权重、并行度、黑名单、超时时间、异常处理策略、任务历史执行信息、运行监控反馈的资源信息等来通过 SDJF(短依赖作业优先)算法进行大量依赖作业的动态调度编排,合理充分利用资源的同时缩短整个数仓的数据同步周期。Daemon 触发报警规则或异常时会进行报警,执行完所有的任务后会触发推送,并根据后驱依赖调度组配置进行递归调度。
在容错方面,Daemon 可以在异常任务处跳过当前节点或后续影响节点,也可触发停止并报警。当 Daemon 因异常原因停止后,由于其自身状态信息根据归档周期进行持久化存储,所以可以从最新的快照恢复 Daemon ,从而恢复后续任务的正常执行。当然可以对Daemon进行暂停、或停止操作,进行作业维护,维护成功后可以恢复执行。
以上的特性将使用户无需梳理复杂的依赖关系或者手动配置 DAG,也不需要估测调度间隔或者长期观察任务执行情况进行手动优化。由于 Daemon 依据任务历史执行数据作为调度影响因子,随着时间的推移会自动编排出最合适的并行调度计划(类似于机器学习)。此外由于子依赖调度组的设计可以在执行前合并子组的 DAG,使用户可以将大量任务以业务主题划分调度组,更有利于作业的维护,而其后驱依赖调度组的设计则可以以时序的方式隔离两个调度组,实现隔离 DAG 调度。
### 作业自动恢复
Dlink 批流一体的发展趋势必然会出现越来越多的流或批流一体任务。
而其守护线程 Daemon 分为两者,一种是上文说到的依赖调度守护线程,另一种则是实时任务守护线程。在实时任务守护线程下,Daemon 支持根据 savepoint 周期配置项来周期性地进行 savepoint 的触发,满足在任务异常失败后自动从 savepoint 恢复的机制,checkpoint 则依赖 Flink 自身的恢复能力自动从 checkpoint 恢复任务,当然也可以通过 RocksDB 管理 checkpoint 并存储至文件系统,Daemon 在任务异常失败后自动从 checkpoint 恢复。可见两种恢复机制的成本不一样,根据具体需求选择。周期性的备份状态自然会造成大量的冗余文件,可以配置保留的备份次数,自动清除过期状态。当作业超过失败重启次数后,Daemon 会自动报警;当满足推送周期可自动推送任务的运行信息。
### 守护进程
在RPC版本发布前,仍为守护线程,上文谈到了 Daemon 的两种线程分类,此外还一种守护进程,位于 RPC 版本。
在 RPC 版本中,上文所说的两种 Daemon 主线程会在运行期间周期地及手动触发地发送自身信息给 Daemon 进程,当 Daemon 在预计的延时内未接受到 Daemon 主线程的信息,会认为该线程异常中断,便远程通信使其自动从快照恢复。
守护进程 Daemon 还管理作业执行等线程,Dlink 的 FlinkSQL 作业提交看似简单,但其后台进行了复杂的多步处理如:准备执行环境、解析增强语法、组装语句集、解析翻译优化得到 JobGraph、获取 yarnClient、提交JobGraph、等待响应。提交线程将其进度以及需要持久化到数据库的信息发送给 Daemon,Daemon 负责管理以及委托持久化。当然也可以通过 Daemon 来中断提交线程。
此外 Daemon 也负责 dlink-client 、dlink-server 与 dlink-admin三个进程的实例管理,配合 dubbo 来治理服务及扩展新服务。
### 库表数据同步
Dlink 将提供基于 Flink 引擎的可视化构建库表数据同步任务的功能。
离线方面,Dlink 通过界面配置库表同步的作业配置,作业启动后,Dlink 从配置中获取数据源信息及库表选择信息等其他配置项,自动构建 Flink 批作业并交由 Daemon 依赖调度托管大量任务的有序稳定执行。
实时方面,Dlink 则根据配置信息自动构建 FlinkCDC 无锁作业,并交由 Daemon 实时任务守护进行流任务托管。
批流一体方面,Dlink 则将由上述两个 Daemon 协作完成,后者启动流任务后,前者通过批任务完成历史数据合并,或直接使用 FlinkCDC自带的批流一体读取来实现同步,具体按需求选择。
以上数据同步任务的定义将提供 SQL 语句 create datasync 来实现一句 SQL 定义任务的效果。
### 企业级功能
Dlink 将提供轻量的企业管理能力,如多租户、项目、角色、权限、审计。
此外 Dlink 将重新设计后台架构,使其更加解耦且插件化,基于服务的治理来满足大型场景的建设需求。
### 多版本 Flink-Client Server
在单机版本中,dlink-client 的执行环境所需要的依赖均从项目的 lib 和 plugins 目录下加载,一个 Dlink 实例只能部署一个版本的 Flink 环境。
在 RPC 版本中,将通过服务治理来同时支持不同版本的 dlink-client 任务提交。dlink-admin 管理 Flink-Client Server,通知 dlink-server 来启动 dlink-client,dlink-client 可以根据指定的依赖启动对应的 Flink Client环境并久驻,也可以根据环境变量来作为插件部署到 Flink 集群直接启动对应的 Flink Client环境并久驻。
Dlink 的任务在提交时,会根据指定集群实例或集群配置来获取对应版本号或者指定的 Flink-Client Server 来选择对应的 Flink-Client Server 进行任务的提交等其他操作。
### Flink StreamGraph 和 JobGraph 的可视化修改
Dlink 将提供 StreamGraph 和 JobGraph 两种状态下的任务计划可视化修改功能,如修改 StreamGraph 的算子并行度、自动追加 Sink 等。还支持将 Jar 提交任务在 dlink-client 转换成 StreamGraph 和 JobGraph ,然后进行分析、修改及统一提交,这样 Jar 任务也将可以得到血缘分析,进而可以被合并到数据链路图,被依赖调度一起托管。
### Flink 自动化动态扩缩容
Flink 流任务的动态扩缩容是个降本增效的好措施,Dlink 将提供自动化的自动动态扩缩容来应对 Reactive Mode 和非 Reactive Mode 两种场景。
首先 Dlink 会通过运行监控接口获取流作业的时序资源占用数据,以天级别或周级别甚至月级别来计算和评估资源的占用模型。
对于 Reactive Mode ,即 Flink 1.13 之后的 Standalone Application Mode 模式下,可通过 Kubernetes Horizontal Pod Autoscaler 进行自动扩缩容。
而对于非 Reactive Mode ,Dlink 将通过 Daemon 依据资源预测模型进行周期性的作业调整并行度等其他优化配置和重启作业来完成较高成本的自动化动态扩缩容。
### FlinkSQL OLAP & BI
Dlink 将投入更多精力来优化基于 FlinkSQL 来进行 OLAP 查询和查询结果BI化,使其可以通过柱状图、折线图、饼图等直观地展现出数据特征。
在 FlinkSQL OLAP 方面,一是,Dlink 将优化 Session 模式的作业提交效率与作业配置,逐步减少整个查询请求的响应时间;二是,Dlink 将自动装载指定数据源的元数据到对应会话中,使其 SQL 开发只需要关注 select 的口径,无需再次编写 set 和 create。
在 BI 方面,Dlink 将 FlinkSQL 及其他查询引擎如 jdbc 的查询结果进行自动化的转换,将表格数据转换为柱状图、折线图、饼图等其他图形所需要的数据格式,并进行渲染,便于数据科学家更值观地分析数据。
### FlinkSQL 翻译及生成
Dlink 将提供 FlinkSQL 翻译功能,该功能可以将传统 SQL 如 Mysql、Oracle 等 DDL 、DQL 语句翻译为 FlinkSQL 语句,便于作业迁移和降低门槛。通过SPI来扩展其他 Dialect 的转换。
Dlink也将提供 FlinkSQL 生成功能,通过元数据来生成 DDL,自动对齐 insert into select 等,使 FlinkSQL 开发更加便捷。
### Dlink-Jdbc
Dlink 将提供自身 jdbc 组件来便捷基于 Dlink 引擎的 FlinkSQL 任务提交。第三方系统(业务系统、数据库工具、调度平台、BI平台等等使用 jdbc 的系统)通过引入 dlink-jdbc.jar,如同开发 Mysql 的 jdbc 应用操作来执行 FlinkSQL,与 dlink-server 进行通信,dlink-server 根据 url 参数配置在对应版本的 dlink-client 上执行其 FlinkSQL。
### FlinkSQL Studio 交互优化
Dlink 目前提供了简陋的 Studio ,虽然可以满足基本的开发需求,但 Studio 其他功能同样对开发调试具有重大影响,如项目导入导出、文件导入导出、开发Demo、配置模板、执行日志、SQL 对比等功能。
Dlink 除了将逐步完成以上功能外,还要进行交互上的优化,使其更加接近专业的 IDE,如风格切换、面板调整、定时保存、History对比和恢复等。
### 实践分享
Dlink 将投入更多精力围绕业界主流的存储架构、平台等进行应用实践分享。
Dlink 通过用户在生产上对接各种生态的实践进行总结和整理,最终在公众号、官网中分享各实践主题下的用户经验与操作说明,如 FlinkCDC、Hive、ClickHouse、Doris、Hudi、Iceberg 等基于 Dlink 快速落地的经验。
Dlink 也将积极对接其他开源平台如 Linkis、AirFlow、DolphinScheduler、DataSphere Studio 等,使其可以为各平台在 Flink 支持上提供更多一种的选择,也将实现对应的批量作业导入功能,使其可以低成本地迁移作业。
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment