Commit 1f48eb8e authored by wenmo's avatar wenmo

upload docs

parent 8aedb681
This diff is collapsed.
![logo](_media/dlinklogo.png)
![logo](_media/dlink.svg)
# Dlink <small>0.4</small>
> Dlink 为 Apache Flink 而生,让 Flink SQL 更加丝滑。
> Dlink 为 Apache Flink 而生,让 Flink SQL 纵享丝滑。
- 交互式 FlinkSQL Studio
......@@ -11,4 +11,4 @@
[GitHub](https://github.com/DataLinkDC/dlink)
[Gitee](https://gitee.com/DataLinkDC/dlink)
[Get Started](/quickstart)
[Get Started](/guide/quickstart)
<?xml version="1.0" standalone="no"?><!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd"><svg t="1639891966613" class="icon" viewBox="0 0 1024 1024" version="1.1" xmlns="http://www.w3.org/2000/svg" p-id="4315" xmlns:xlink="http://www.w3.org/1999/xlink" width="200" height="200"><defs><style type="text/css"></style></defs><path d="M828.163241 196.040908l-0.226151 0C747.136833 115.146507 635.360074 65.11502 511.867482 65.11502c-122.740462 0-234.780211 50.031487-315.318502 130.925888l-0.488117 0c-81.026408 80.538291-130.813325 192.446034-130.813325 316.426743 0 122.872468 49.78794 234.649228 130.813325 315.187519 81.026408 81.513502 192.57804 131.301442 315.806619 131.301442 123.492592 0 235.269351-49.786916 315.807642-131.301442l0.488117 0c80.801281-80.538291 130.813325-192.316074 130.813325-315.187519C958.976566 388.486942 909.188626 276.579199 828.163241 196.040908M779.352559 779.331581 779.352559 779.331581l0 0.26299c-68.335365 68.335365-162.766061 110.538559-267.485077 110.538559-104.455004 0-199.148689-42.202171-266.995937-110.538559l-0.488117-0.26299c-68.335365-68.335365-110.312408-162.898067-110.312408-266.864954 0-104.943121 41.977044-199.767789 110.312408-267.972171l0.488117-0.243547c67.847248-68.222801 162.540933-110.687962 266.995937-110.687962 104.719017 0 199.148689 42.466184 267.485077 110.687962l0 0.243547c68.823482 68.204381 110.800525 163.02905 110.800525 267.972171C890.153084 616.434537 848.175017 710.996217 779.352559 779.331581" p-id="4316" fill="#BF73F2" data-spm-anchor-id="a313x.7781069.0.i7" class="selected"></path><path d="M741.278405 530.395976c-2.177596 0-4.618181-0.225127-6.832615-0.488117l-6.082532-0.976234-430.776094-66.007343c-0.713244-0.243547-1.464351-0.243547-1.952468-0.243547-25.118074-4.749164-41.488927-27.690666-37.809117-52.227502 0.225127-3.284813 1.201361-6.477528 2.665713-10.250458 0.488117-0.731664 0.976234-1.707898 1.464351-2.684132 0.976234-2.553149 1.952468-4.28049 3.416819-6.100952 40.024575-68.955488 127.171378-112.62201 225.995127-113.598244 0.976234 0 33.19196-3.547803 45.393863-34.542724 0-0.243547 11.489682-20.98801 24.405853-20.98801 5.858428 0 12.69002 6.232958 13.666254 11.713786 0.264013 1.464351 7.096628 47.458895 34.656311 62.966077 90.074482 30.506804 158.409846 97.997941 176.206164 174.987407 0.48914 1.219781 0.976234 4.03592 1.239224 6.214539l0 1.238201 0.226151 0c0.26299 4.26207 0.26299 8.035-0.48914 12.07092C783.256472 513.443885 763.995803 530.395976 741.278405 530.395976" p-id="4317" fill="#E65270" data-spm-anchor-id="a313x.7781069.0.i8" class=""></path><path d="M476.010833 804.224528c-11.489682 0-20.500916-6.570649-21.252023-7.320733l-17.797341-14.381545c-106.408495-30.262234-175.983083-119.473045-162.315806-208.42189l12.465916-83.95511c1.201361-6.833639 7.058766-10.738575 13.892405-10.250458l52.227502 7.677866c6.833639 0.976234 11.226692 7.078209 10.250458 13.798261l-7.546883 60.525492c0 0-1.464351 25.118074 5.594415 49.89948 1.238201-14.042831 3.415796-31.238468 6.608511-50.275034l9.011234-59.436694c0.750083-3.284813 2.440585-6.214539 5.14416-7.67889 2.440585-2.308579 5.857405-3.059686 8.786107-2.571568l336.044546 53.204759c6.607488 0.976234 11.000542 7.320733 10.024308 13.797237l-12.69002 82.490759c-13.44215 88.705298-102.014418 151.934365-210.149231 151.934365l-6.345522 0-20.012799 8.785084C484.045833 803.512307 480.140896 804.224528 476.010833 804.224528" p-id="4318" fill="#FAA100" data-spm-anchor-id="a313x.7781069.0.i6" class=""></path></svg>
\ No newline at end of file
This diff is collapsed.
* [首页](/)
* 指南
* [快速开始](quickstart.md)
* [编译部署](api.md)
* [核心功能](api.md)
* [开发调试](api.md)
* [快速开始](/guide/quickstart.md)
* [编译部署](/guide/deploy.md)
* [核心功能](/guide/functions.md)
* [开发调试](/guide/debug.md)
* 扩展
* [扩展语法补全](api.md)
* [扩展自定义函数](api.md)
* [扩展连接器](api.md)
* [扩展数据源](api.md)
* [扩展 Flink 版本](api.md)
* [Flink-CDC 集成](api.md)
* [DolphinScheduler 集成](api.md)
* [DataSphereStudio 集成](api.md)
* [Hive 集成](api.md)
* [Doris 集成](api.md)
* [Clickhouse 集成](api.md)
* [扩展语法补全](/extend/completion.md)
* [扩展自定义函数](/extend/udf.md)
* [扩展连接器](/extend/connector.md)
* [扩展数据源](/extend/datasource.md)
* [扩展 Flink 版本](/extend/flinkversion.md)
* [Flink-CDC 集成](/extend/flinkcdc.md)
* [DolphinScheduler 集成](/extend/dolphinscheduler.md)
* [DataSphereStudio 集成](/extend/dataspherestudio.md)
* [Hive 集成](/extend/hive.md)
* [Doris 集成](/extend/doris.md)
* [Clickhouse 集成](/extend/clickhouse.md)
* 分享
* [Dlink Yarn 的三种提交实践](/share/yarnsubmit.md)
* [Dlink AGGTABLE 表值聚合的实践](/share/aggtable.md)
* [Dlink 核心概念及实现原理详解](/share/principle.md)
* API
* [OpenAPI](api.md)
* [OpenAPI](/api/openapi.md)
* 语言
* [中文](/)
......
# 1
\ No newline at end of file
## 敬请期待
\ No newline at end of file
![logo](_media/dlinklogo.png)
![logo](../_media/dlink.svg)
# Dlink <small>0.4</small>
......@@ -11,4 +11,4 @@
[GitHub](https://github.com/DataLinkDC/dlink)
[Gitee](https://gitee.com/DataLinkDC/dlink)
[Get Started](/en-US/guide)
\ No newline at end of file
[Get Started](/en-US/guide/quickstart)
\ No newline at end of file
* [Home](/en-US/)
* Guide
* [Guide](/en-US/quickstart.md)
* [Quick Start](/en-US/guide/quickstart.md)
* [Deploy](/en-US/guide/deploy.md)
* [Functions](/en-US/guide/functions.md)
* [Debug](/en-US/guide/debug.md)
* Extend
* [Completion](/en-US/extend/completion.md)
* [UDF](/en-US/extend/udf.md)
* [Connector](/en-US/extend/connector.md)
* [Datasource](/en-US/extend/datasource.md)
* [Flink version](/en-US/extend/flinkversion.md)
* [Flink-CDC](/en-US/extend/flinkcdc.md)
* [DolphinScheduler](/en-US/extend/dolphinscheduler.md)
* [DataSphereStudio](/en-US/extend/dataspherestudio.md)
* [Hive](/en-US/extend/hive.md)
* [Doris](/en-US/extend/doris.md)
* [Clickhouse](/en-US/extend/clickhouse.md)
* Share
* [Dlink Yarn](/en-US/share/yarnsubmit.md)
* [Dlink AGGTABLE](/en-US/share/aggtable.md)
* [Dlink Principle](/en-US/share/principle.md)
* API
* [API](/en-US/api.md)
* [OpenAPI](/en-US/api/openapi.md)
* Language
* [中文](/)
* [En](/en-US/)
\ No newline at end of file
# 1
\ No newline at end of file
## 敬请期待
\ No newline at end of file
## 敬请期待
\ No newline at end of file
## FlinkSQL 编辑器自动补全函数
Dlink-0.3.2 版本上线了一个非常实用的功能——自动补全。
我们在使用 IDEA 等工具时,提示方法并补全、生成的功能大大提升了开发效率。而 Dlink 的目标便是让 FlinkSQL 更加丝滑,所以其提供了自定义的自动补全功能。对比传统的使用 `Java` 字符串来编写 FlinkSQL 的方式,Dlink 的优势是巨大。
在文档中心,我们可以根据自己的需要扩展相应的自动补全规则,如 `UDF``Connector With` 等 FlinkSQL 片段,甚至可以扩展任意可以想象到内容,如注释、模板、配置、算法等。
具体新增规则的示例请看下文描述。
## set 语法来设置执行环境参数
对于一个 FlinkSQL 的任务来说,除了 sql 口径,其任务配置也十分重要。所以 Dlink-0.3.2 版本中提供了 `sql-client``set` 语法,可以通过 `set` 关键字来指定任务的执行配置(如 “ `set table.exec.resource.default-parallelism=2;` ” ),其优先级要高于 Dlink 自身的任务配置(面板右侧)。
那么长的参数一般人谁记得住?等等,别忘了 Dlink 的新功能自动补全~
配置实现输入 `parallelism` 子字符串来自动补全 `table.exec.resource.default-parallelism=`
在文档中心中添加一个规则,名称为 `parallelism`,填充值为 `table.exec.resource.default-parallelism=`,其他内容随意。
保存之后,来到编辑器输入 `par` .
选中要补全的规则后,编辑器中自动补全了 `table.exec.resource.default-parallelism=`
至此,有些小伙伴发现,是不是可以直接定义 `pl2` 来自动生成 `set table.exec.resource.default-parallelism=2;`
当然可以的。
还有小伙伴问,可不可以定义 `pl` 生成 `set table.exec.resource.default-parallelism=;` 后,光标自动定位到 `=``;` 之间?
这个也可以的,只需要定义 `pl` 填充值为 `set table.exec.resource.default-parallelism=${1:};` ,即可实现。
所以说,只要能想象到的都可以定义,这样的 Dlink 你爱了吗?
嘘,还有点小 bug 后续修复呢。如果有什么建议及问题请及时指出哦。
\ No newline at end of file
## 扩展 Connector
将 Flink 集群上已扩展好的 Connector 直接放入 Dlink 的 lib 或者 plugins 下,然后重启即可。定制 Connector 过程同 Flink 官方一样。
## 扩展 Metadata
遵循SPI。
\ No newline at end of file
## 敬请期待
\ No newline at end of file
## 敬请期待
\ No newline at end of file
## 背景
Apache Doris是一个现代化的MPP分析型数据库产品。仅需亚秒级响应时间即可获得查询结果,有效地支持实时数据分析。例如固定历史报表,实时数据分析,交互式数据分析和探索式数据分析等。
目前 Doris 的生态正在建设中,本文将分享如何基于 Dlink 实现 Mysql 变动数据通过 Flink 实时入库 Doris。
## 准备
老规矩,列清各组件版本:
| 组件 | 版本 |
| :-------------: | :----------: |
| Flink | 1.13.3 |
| Flink-mysql-cdc | 2.1.0 |
| Doris | 0.15.1-rc09 |
| doris-flink | 1.0-SNAPSHOT |
| Mysql | 8.0.13 |
| Dlink | 0.4.0 |
需要注意的是,本文的 Doris 是基于 OracleJDK1.8 和 Scala 2.11 通过源码进行编译打包的,所以所有组件的 scala 均为 2.11,此处和 Doris 社区略有不同。
## 部署
本文的 Flink 和 Doris 的部署不做描述,详情请查阅官网。
[Doris]: https://doris.apache.org/master/zh-CN/extending-doris/flink-doris-connector.html#%E4%BD%BF%E7%94%A8%E6%96%B9%E6%B3%95 "Doris"
本文在 Dlink 部署成功的基础上进行,如需查看具体部署步骤,请阅读《flink sql 知其所以然(十六):flink sql 开发企业级利器之 Dlink》。
Dlink 的 plugins 下添加 `doris-flink-1.0-SNAPSHOT.jar``flink-sql-connector-mysql-cdc-2.1.0.jar` 。重启 Dlink。
```java
plugins/ -- Flink 相关扩展
|- doris-flink-1.0-SNAPSHOT.jar
|- flink-csv-1.13.3.jar
|- flink-dist_2.11-1.13.3.jar
|- flink-format-changelog-json-2.1.0.jar
|- flink-json-1.13.3.jar
|- flink-shaded-zookeeper-3.4.14.jar
|- flink-sql-connector-mysql-cdc-2.1.0.jar
|- flink-table_2.11-1.13.3.jar
|- flink-table-blink_2.11-1.13.3.jar
```
当然,如果您想直接使用 FLINK_HOME 的话,可以在 `auto.sh` 文件中 `SETTING` 变量添加`$FLINK_HOME/lib`
## 数据表
### 学生表 (student)
```sql
-- Mysql
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
`sid` int(11) NOT NULL,
`name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
PRIMARY KEY (`sid`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
INSERT INTO `student` VALUES (1, '小红');
INSERT INTO `student` VALUES (2, '小黑');
INSERT INTO `student` VALUES (3, '小黄');
```
### 成绩表(score)
```sql
-- Mysql
DROP TABLE IF EXISTS `score`;
CREATE TABLE `score` (
`cid` int(11) NOT NULL,
`sid` int(11) NULL DEFAULT NULL,
`cls` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`score` int(11) NULL DEFAULT NULL,
PRIMARY KEY (`cid`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
INSERT INTO `score` VALUES (1, 1, 'chinese', 90);
INSERT INTO `score` VALUES (2, 1, 'math', 95);
INSERT INTO `score` VALUES (3, 1, 'english', 93);
INSERT INTO `score` VALUES (4, 2, 'chinese', 92);
INSERT INTO `score` VALUES (5, 2, 'math', 75);
INSERT INTO `score` VALUES (6, 2, 'english', 80);
INSERT INTO `score` VALUES (7, 3, 'chinese', 100);
INSERT INTO `score` VALUES (8, 3, 'math', 60);
```
### 学生成绩宽表(scoreinfo)
```sql
-- Doris
CREATE TABLE scoreinfo
(
cid INT,
sid INT,
name VARCHAR(32),
cls VARCHAR(32),
score INT
)
UNIQUE KEY(cid)
DISTRIBUTED BY HASH(cid) BUCKETS 10
PROPERTIES("replication_num" = "1");
```
## FlinkSQL
```sql
CREATE TABLE student (
sid INT,
name STRING,
PRIMARY KEY (sid) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'database-name' = 'test',
'table-name' = 'student');
CREATE TABLE score (
cid INT,
sid INT,
cls STRING,
score INT,
PRIMARY KEY (cid) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'database-name' = 'test',
'table-name' = 'score');
CREATE TABLE scoreinfo (
cid INT,
sid INT,
name STRING,
cls STRING,
score INT,
PRIMARY KEY (cid) NOT ENFORCED
) WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030' ,
'table.identifier' = 'test.scoreinfo',
'username' = 'root',
'password'=''
);
insert into scoreinfo
select
a.cid,a.sid,b.name,a.cls,a.score
from score a
left join student b on a.sid = b.sid
```
## 调试
### 在 Dlink 中提交
本示例采用了 yarn-session 的方式进行提交。
![image-20211218134246511](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTp0RDozicic8lrYycianYsUN9ibG3QjNqVbMk7L41wHykKnkV0YxDCVSYj68HlFWylpYckkXicgnTDU7uQ/0?wx_fmt=png)
### FlinkWebUI
![image-20211218134439699](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTp0RDozicic8lrYycianYsUN9ibn1jXbvKznaF8Tm4AxxvYYDI0fEtXbGm0XUeXhGp44KMlPdoOzjvtHQ/0?wx_fmt=png)
上图可见,流任务已经成功被 Dlink 提交的远程集群了。
### Doris 查询
![image-20211218135404787](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTp0RDozicic8lrYycianYsUN9iblKTY6o9fWZxFDQYC19wKVFRGDuUBgNOZxm14sWjyr8tUY7RDeUiaEUw/0?wx_fmt=png)
上图可见,Doris 已经被写入了历史全量数据。
### 增量测试
在 Mysql 中执行新增语句:
```sql
INSERT INTO `score` VALUES (9, 3, 'english', 100);
```
Doris 成功被追加:
![image-20211218135545742](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTp0RDozicic8lrYycianYsUN9ibvE4qyQ9ttf2kNZ3raEgabvh442HfiaIfm2l5dhdFmWoGiaHMlvcQmocw/0?wx_fmt=png)
### 变动测试
在 Mysql 中执行新增语句:
```sql
update score set score = 100 where cid = 1
```
Doris 成功被修改:
![image-20211218135949764](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTp0RDozicic8lrYycianYsUN9ib3liaIvXQcCSboO4IoeJhtTRa38ukNogtFzwg31mNEFwRcJ1wGNIhQkQ/0?wx_fmt=png)
## 扩展 Flink-CDC
`flink-sql-connector-mysql-cdc-2.1.0.jar` 等 CDC 依赖加入到 Dlink 的 `plugins` 下即可。
\ No newline at end of file
## 扩展其他版本的 Flink
Flink 的版本取决于 lib 下的 dlink-client-1.13.jar。当前版本默认为 Flink 1.13.3 API。向其他版本的集群提交任务可能存在问题,已实现 1.11、1.12、1.13, 1.14,切换版本时只需要将对应依赖在lib下进行替换,然后重启即可。
切换版本时需要同时更新 plugins 下的 Flink 依赖。
\ No newline at end of file
## 前言
最近有很多小伙伴问,dlink 如何连接 Hive 进行数据开发?
关于 dlink 连接 Hive 的步骤同 Flink 的 `sql-client ` ,只不过它没有默认加载的配置文件。下文将详细讲述对 Hive 操作的全过程。
## 准备工作
由于搭建 Hive 的开发环境会涉及到重多组件和插件,那其版本对应问题也是至关重要,它能帮我们避免很多不必要的问题,当然小版本号之间具备一定的兼容性。
我们先来梳理下本教程的各个组件版本:
| 组件 | 版本 |
| :----: | :----: |
| Dlink | 0.3.2 |
| Flink | 1.12.4 |
| Hadoop | 2.7.7 |
| Hive | 2.3.6 |
| Mysql | 8.0.15 |
再来梳理下本教程的各个插件版本:
| 所属组件 | 插件 | 版本 |
| :-----------: | :------------------------: | :-------------------: |
| Dlink | dlink-client | 1.12 |
| Dlink & Flink | flink-sql-connector-hive | 2.3.6_2.11-1.12.3 |
| Dlink & Flink | flink-shaded-hadoop-3-uber | 3.1.1.7.2.8.0-224-9.0 |
## 部署扩展
部署扩展的工作非常简单(前提是 Dlink 部署完成并成功连接 Flink 集群,相关部署步骤请查看《Dlink实时计算平台——部署篇》),只需要把 `flink-sql-connector-hive-2.3.6_2.11-1.12.3.jar``flink-shaded-hadoop-3-uber-3.1.1.7.2.8.0-224-9.0.jar` 两个插件分别加入到 Dlink 的 plugins 目录与 Flink 的 lib 目录下即可,然后重启二者。当然,还需要放置 `hive-site.xml`,位置自定义,Dlink 可以访问到即可。
## 创建 Hive Catalog
已知,Hive 已经新建了一个数据库实例 `hdb` ,创建了一张表 `htest`,列为 `name``age`,存储位置默认为 `hdfs:///usr/local/hadoop/hive-2.3.9/warehouse/hdb.db` 。(此处为何 2.3.9 呢,因为 `flink-sql-connector-hive-2.3.6_2.11-1.12.3.jar` 只支持到最高版本 2.3.6,小编先装了个 2.3.9 后装了个 2.3.6,尴尬 > _ < ~)
```sql
CREATE CATALOG myhive WITH (
'type' = 'hive',
'default-database' = 'hdb',
'hive-conf-dir' = '/usr/local/dlink/hive-conf'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG myhive;
select * from htest
```
在 Dlink 编辑器中输入以上 sql ,创建 Hive Catalog,并查询一张表。
其中,`hive-conf-dir` 需要指定 `hive-site.xml` 的路径,其他同 Flink 官方解释。
执行查询后(记得选中执行配置的预览结果),可以从查询结果中查看到 htest 表中只有一条数据。(这是正确的,因为小编太懒了,只随手模拟了一条数据)
此时可以使用 FlinkSQL 愉快地操作 Hive 的数据了。
## 使用 Hive Dialect
很熟悉 Hive 的语法以及需要对 Hive 执行其自身特性的语句怎么办?
同 Flink 官方解释一样,只需要使用 `SET table.sql-dialect=hive` 来启用方言即可。注意有两种方言 `default``hive` ,它们的使用可以随意切换哦~
```sql
CREATE CATALOG myhive WITH (
'type' = 'hive',
'default-database' = 'hdb',
'hive-conf-dir' = '/usr/local/dlink/hive-conf'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG myhive;
-- set hive dialect
SET table.sql-dialect=hive;
-- alter table location
alter table htest set location 'hdfs:///usr/htest';
-- set default dialect
SET table.sql-dialect=default;
select * from htest;
```
上述 sql 中添加了 Hive Dialect 的使用,FlinkSQL 本身不支持 `alter table .. set location ..` 的语法,使用 Hive Dialect 则可以实现语法的切换。本 sql 内容对 htest 表进行存储位置的改变,将其更改为一个新的路径,然后再执行查询。
由上图可见,被更改过 location 的 htest 此时查询没有数据,是正确的。
然后将 location 更改为之前的路径,再执行查询,则可见原来的那条数据,如下图所示。
## 总结
由上所知,Dlink 以更加友好的交互方式展现了 Flink 集成 Hive 的部分功能,当然其他更多的 Hive 功能需要您自己在使用的过程中去体验与挖掘。
目前,Dlink 支持 Flink 绝大多数特性与功能,集成与拓展方式与 Flink 官方文档描述一致,只需要在 Dlink 的 plugins 目录下添加依赖即可。
\ No newline at end of file
## 扩展 UDF
将 Flink 集群上已扩展好的 UDF 直接放入 Dlink 的 lib 或者 plugins 下,然后重启即可。定制 UDF 过程同 Flink 官方一样。
## 敬请期待
\ No newline at end of file
### 从安装包开始
Dlink 可以部署在 Flink 及 Hadoop 集群之外的任意节点,只要网络通就可;部署在集群内部也可。
```
config/ -- 配置文件
|- application.yml
extends/ -- 扩展
|- dlink-client-1.11.jar
|- dlink-client-1.12.jar
|- dlink-client-1.14.jar
html/ -- 前端编译产物,用于Nginx
jar/ -- dlink application模式提交sql用到的jar
lib/ -- 内部组件
|- dlink-client-1.13.jar -- 必需
|- dlink-connector-jdbc.jar
|- dlink-function.jar
|- dlink-metadata-clickhouse.jar
|- dlink-metadata-mysql.jar
|- dlink-metadata-oracle.jar
|- dlink-metadata-postgresql.jar
plugins/
|- flink-connector-jdbc_2.11-1.13.3.jar
|- flink-csv-1.13.3.jar
|- flink-dist_2.11-1.13.3.jar
|- flink-json-1.13.3.jar
|- flink-shaded-hadoop-3-uber-3.1.1.7.2.1.0-327-9.0.jar
|- flink-shaded-zookeeper-3.4.14.jar
|- flink-table-blink_2.11-1.13.3.jar
|- flink-table_2.11-1.13.3.jar
|- mysql-connector-java-8.0.21.jar
sql/
|- dlink.sql --Mysql初始化脚本
auto.sh --启动停止脚本
dlink-admin.jar --程序包
```
解压后结构如上所示,修改配置文件内容。lib 文件夹下存放 dlink 自身的扩展文件,plugins 文件夹下存放 flink 及 hadoop 的官方扩展文件。其中 plugins 中的所有 jar 需要根据版本号自行下载并添加,才能体验完整功能,当然也可以放自己修改的 Flink 源码编译包。当然,如果您硬要使用 FLINK_HOME 的话,可以在 `auto.sh` 文件中 `SETTING` 变量添加`$FLINK_HOME/lib` 。extends 文件夹只作为扩展插件的备份管理,不会被 dlink 加载。
在Mysql数据库中创建数据库并执行初始化脚本。
执行以下命令管理应用。
```shell
sh auto.sh start
sh auto.sh stop
sh auto.sh restart
sh auto.sh status
```
前端 Nginx 部署:
将 html 文件夹上传至 nginx 的 html 文件夹下,修改 nginx 配置文件并重启。如果 Nginx 和 Dlink 在同一节点,则可以指定静态资源的绝对路径到 Dlink 的 html 目录。
```shell
server {
listen 9999;
server_name localhost;
# gzip config
gzip on;
gzip_min_length 1k;
gzip_comp_level 9;
gzip_types text/plain application/javascript application/x-javascript text/css application/xml text/javascript application/x-httpd-php image/jpeg image/gif image/png;
gzip_vary on;
gzip_disable "MSIE [1-6]\.";
#charset koi8-r;
#access_log logs/host.access.log main;
location / {
root html;
index index.html index.htm;
try_files $uri $uri/ /index.html;
}
#error_page 404 /404.html;
# redirect server error pages to the static page /50x.html
#
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
location ^~ /api {
proxy_pass http://127.0.0.1:8888;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_set_header X-Real-IP $remote_addr;
}
}
```
1. server.listen 填写前端访问端口,例如 9999
2. proxy_pass 填写后端地址如 http://127.0.0.1:8888
3. 将 html 文件夹下打包好的前端资源上传到 nginx 的 html 文件夹中
4. 重载配置文件:nginx -s reload
5. 直接访问前端端口,例如 9999
### 从源码编译
#### 项目目录
```java
dlink -- 父项目
|-dlink-admin -- 管理中心
|-dlink-app -- Application Jar
|-dlink-assembly -- 打包配置
|-dlink-client -- Client 中心
| |-dlink-client-1.11 -- Client-1.11 实现
| |-dlink-client-1.12 -- Client-1.12 实现
| |-dlink-client-1.13 -- Client-1.13 实现
| |-dlink-client-1.14 -- Client-1.14 实现
|-dlink-common -- 通用中心
|-dlink-connectors -- Connectors 中心
| |-dlink-connector-jdbc -- Jdbc 扩展
|-dlink-core -- 执行中心
|-dlink-doc -- 文档
| |-bin -- 启动脚本
| |-bug -- bug 反馈
| |-config -- 配置文件
| |-doc -- 使用文档
| |-sql -- sql脚本
|-dlink-executor -- 执行中心
|-dlink-extends -- 扩展中心
|-dlink-function -- 函数中心
|-dlink-gateway -- Flink 网关中心
|-dlink-metadata -- 元数据中心
| |-dlink-metadata-base -- 元数据基础组件
| |-dlink-metadata-clickhouse -- 元数据- clickhouse 实现
| |-dlink-metadata-mysql -- 元数据- mysql 实现
| |-dlink-metadata-oracle -- 元数据- oracle 实现
| |-dlink-metadata-postgresql -- 元数据- postgresql 实现
|-dlink-web -- React 前端
|-docs -- 官网 md
```
#### 编译打包
以下环境版本实测编译成功:
| 环境 | 版本 |
| :-----: | :-------: |
| npm | 7.19.0 |
| node.js | 14.17.0 |
| jdk | 1.8.0_201 |
| maven | 3.6.0 |
| lombok | 1.18.16 |
| mysql | 5.7+ |
```shell
mvn clean install -Dmaven.test.skip=true
```
#### 登录
超级管理员:admin/admin;
新增用户:默认密码 123456。
#### 集群中心
注册 Flink 集群地址时,格式为 host:port ,用英文逗号分隔。即添加 Flink 集群的 JobManager 的 RestApi 地址。当 HA 模式时,地址间用英文逗号分隔,例如:192.168.123.101:8081,192.168.123.102:8081,192.168.123.103:8081。新增和修改的等待时间较长,是因为需要检测最新的 JobManager 地址。心跳检测为手动触发,会更新集群状态与 JobManager 地址。
#### Studio
1. 在左侧目录区域创建文件夹或任务。
2. 在中间编辑区编写 FlinkSQL 。
3. 在右侧配置作业配置和执行参数。
4. Fragment 开启后,可以使用增强的 sql 片段语法:
```sql
sf:=select * from;tb:=student;
${sf} ${tb}
##效果等同于
select * from student
```
5. 内置 sql 增强语法-表值聚合:
```sql
CREATE AGGTABLE aggdemo AS
SELECT myField,value,rank
FROM MyTable
GROUP BY myField
AGG BY TOP2(value) as (value,rank);
```
6. MaxRowNum 为批流(Session模式下)执行Select时预览查询结果的最大集合长度,默认 100,最大 9999。
7. SavePoint策略支持最近一次、最早一次、指定一次三种策略。
8. Flink 共享会话共享 Catalog ,会话的使用需要在左侧会话选项卡手动创建并维护。
9. 连接器为 Catalog 里的表信息,清空按钮会销毁当前会话。
10. Local 模式主要用于语法校验、血缘分析、执行图预览等功能,当然也可执行任务,但目前版本建议请使用远程集群来执行任务。
11. 执行 SQL 时,如果您选中了部分 SQL,则会执行选中的内容,否则执行全部内容。
12. 小火箭的提交功能是异步提交当前任务已保存的 FlinkSQL 及配置到集群。由于适用于快速提交稳定的任务,所以无法提交草稿,且无法预览数据。
13. 执行信息或者历史中那个很长很长的就是集群上的 JobId 或者 APPID,任务历史可以查看执行过的任务的数据回放。
14. 草稿是无法被异步远程提交的,只能同步执行,且无法保存。
15. Studio 的布局可以随意拖动,但由于是实时计算,联动较多,请温柔些。
16. 同步执行时可以自由指定任务名,异步提交默认为作业名。
17. 支持 set 语法设置 Flink 的执行配置,其优先级大于右侧的配置。
18. 支持远程集群查看、SavePoint 及停止任务。
19. 支持自定义及上下文的 sql 函数或片段的自动补全,通过函数文档维护。
20. 支持 Flink 所有官方的连接器及插件的扩展,但需注意版本号适配。
21. 使用 IDEA 进行源码调试时,需要在 admin 及 core 下修改相应 pom 依赖的引入来完成功能的加载。
22. 支持基于 StreamGraph 的可执行 FlinkSql (Insert into)的血缘分析,无论你的 sql 有多复杂或者多 view。
23. Dlink 目前提交方式支持 Standalone 、Yarn Session、Yarn PerJob、Yarn Application,K8S 后续支持。
24. Dlink 目前对于 Flink 多版本的支持只能一个 Dlink 实例支持一个 Flink 版本,未来将开源同时支持多版本的能力。
25. 使用 Yarn PerJob、Yarn Application 需要配置集群配置,且其自动注册的集群实例需要手动点击回收。
26. 其他内容后续更新。。。
## 运行截图
> 登录页
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/040/login.png)
> 首页
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/040/welcome.png)
> Studio SQL 开发提示与补全
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/040/sqldev.png)
> Studio 语法和逻辑检查
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/040/sqlcheck.png)
> Studio 批流SELECT预览
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/040/selectpreview.png)
> Studio 异常反馈
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/040/sqlerror.png)
> Studio 进程监控
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/040/process.png)
> Studio 执行历史
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/040/history.png)
> Studio 数据回放
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/040/datashow.png)
> Studio SavePoint 管理
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/040/savepoint.png)
> Studio 血缘分析
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/040/ca.png)
> Studio 函数浏览
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/040/function.png)
> Studio 共享会话
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/040/session.png)
> 集群管理
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/040/cluster.png)
> 集群配置管理
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/040/clusterconfiguration.png)
> 数据源管理
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/040/db.png)
This diff is collapsed.
# 1
\ No newline at end of file
This diff is collapsed.
## Yarn-Session 实践
### 注册 Session 集群
![image-20211128225423360](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6iaAIees3BsUONXbIocdRUI0WWVSzibPpltibBbMmWgfWJ0AklUlPF9Ugw/0?wx_fmt=png)
进入集群中心进行远程集群的注册。点击新建按钮配置远程集群的参数。图中示例配置了一个 Flink on Yarn 的高可用集群,其中 JobManager HA 地址需要填写集群中所有可能被作为 JobManager 的 RestAPI 地址,多个地址间使用英文逗号分隔。表单提交时可能需要较长时间的等待,因为 dlink 正在努力的计算当前活跃的 JobManager 地址。
保存成功后,页面将展示出当前的 JobManager 地址以及被注册集群的版本号,状态为正常时表示可用。
注意:只有具备 JobManager 实例的 Flink 集群才可以被成功注册到 dlink 中。( Yarn-Per-Job 和 Yarn-Application 也具有 JobManager,当然也可以手动注册,但无法提交任务)
如状态异常时,请检查被注册的 Flink 集群地址是否能正常访问,默认端口号为8081,可能更改配置后发生了变化,查看位置为 Flink Web 的 JobManager 的 Configuration 中的 rest 相关属性。
### 执行 Hello World
万物都具有 Hello World 的第一步,当然 dlink 也是具有的。我们选取了基于 datagen 的流查询作为第一行 Flink Sql。具体如下:
```sql
CREATE TABLE Orders (
order_number BIGINT,
price DECIMAL(32,2),
buyer ROW<first_name STRING, last_name STRING>,
order_time TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1'
);
select order_number,price,order_time from Orders
```
该例子使用到了 datagen,需要在 dlink 的 plugins 目录下添加 flink-table.jar。
点击 Flink Sql Studio 进入开发页面:
![image-20211128230416447](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6RuS3kibG0jJnDjoUicX7sN63UN6j7Osg4lWh5SelOw0hp4Vj6icFAuDkA/0?wx_fmt=png)
在中央的编辑器中编辑 Flink Sql。
右边作业配置:
1. 执行模式:选中 yarn-session;
2. Flink 集群:选中上文注册的测试集群;
3. SavePoint 策略:选中禁用;
4. 按需进行其他配置。
右边执行配置:
1. 预览结果:启用;
2. 远程执行:启用。
点击快捷操作栏的三角号按钮同步执行该 FlinkSQL 任务。
### 预览数据
![image-20211128231230335](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6leQx4mfqsdwVftlEUXSFWnEzOTJGrCGHUKo98SpIn11WkZquEwwjpg/0?wx_fmt=png)
切换到历史选项卡点击刷新可以查看提交进度。切换到结果选项卡,等待片刻点击获取最新数据即可预览 SELECT。
### 停止任务
![image-20211128231523703](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6koYZDQqzsawPOCTP64ycdUZlib5oJA9vo9fpX43DNTmibY60ojZv44zQ/0?wx_fmt=png)
切换到进程选项卡,选则对应的集群实例,查询当前任务,可执行停止操作。
## Yarn-Per-Job 实践
### 注册集群配置
进入集群中心——集群配置,注册配置。
![image-20211128231914983](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6Vcw98k4yfgR2dSo3BUhxdtpRdd8A7NLyXkZhFibhiciarp9DTY415UehQ/0?wx_fmt=png)
1. Hadoop 配置文件路径:指定配置文件路径(末尾无/),需要包含以下文件:core-site.xml,hdfs-site.xml,yarn-site.xml;
2. Flink 配置 lib 路径:指定 lib 的 hdfs 路径(末尾无/),需要包含 Flink 运行时的所有依赖,即 flink 的 lib 目录下的所有 jar;
3. Flink 配置文件路径:指定配置文件 flink-conf.yaml 的具体路径(末尾无/);
4. 按需配置其他参数(重写效果);
5. 配置基本信息(标识、名称等);
6. 点击测试或者保存。
### 执行升级版 Hello World
之前的 hello world 是个 SELECT 任务,改良下变为 INSERT 任务:
```sql
CREATE TABLE Orders (
order_number INT,
price DECIMAL(32,2),
order_time TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.order_number.kind' = 'sequence',
'fields.order_number.start' = '1',
'fields.order_number.end' = '1000'
);
CREATE TABLE pt (
ordertotal INT,
numtotal INT
) WITH (
'connector' = 'print'
);
insert into pt select 1 as ordertotal ,sum(order_number)*2 as numtotal from Orders
```
![image-20211128232734409](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6HU0ics1Er4CibiaRc8jndmc3rHX9J3ArSp4QF2qyARR46hPzOh0kDbYYQ/0?wx_fmt=png)
编写 Flink SQL;
作业配置:
1. 执行模式:选中 yarn-per-job ;
2. Flink 集群配置:选中刚刚注册的配置;
3. SavePoint 策略:选中最近一次。
快捷操作栏:
1. 点击保存按钮保存当前所有配置;
2. 点击小火箭异步提交作业。
![image-20211128233423276](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6OsQDhibZQbh1nXrx9d0OBiaiaibnIMnLcSfT3MNxhY9CEyAxrA9NLczohA/0?wx_fmt=png)
注意,执行历史需要手动刷新。
### 自动注册集群
点击集群中心——集群实例,即可发现自动注册的 Per-Job 集群。
![image-20211128234056101](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6zCpJ7Knv1WYHTUxHt8iagpNNjRAPsPiaqBoD2xkLupcM7Op1Q48tb6ibQ/0?wx_fmt=png)
### 查看 Flink Web UI
提交成功后,点击历史的蓝色地址即可快速打开 Flink Web UI地址。
![image-20211128233735071](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6AQq5qVMAkwUM8WiaLnTrzDnSlLicTJZjWxowdW9dKUibNp33nrnHpL2Ng/0?wx_fmt=png)
### 从 Savepoint 处停止
在进程选项卡中选择自动注册的 Per-Job 集群,查看任务并 SavePoint-Cancel。
![image-20211128234321656](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6jDvzsoHias24jAY6ElCFj6lFCX1jjdGmoGGa5vI5xgNSEUzyEiaZEZNA/0?wx_fmt=png)
在右侧保存点选项卡可以查看该任务的所有 SavePoint 记录。
![image-20211128234619397](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6TGv1Gun2rB35Upv8hnsoeHzeDdFT4Ryag8icHz7BrzhE4YELiaMk7KYw/0?wx_fmt=png)
### 从 SavePoint 处启动
再次点击小火箭提交任务。
![image-20211128235430989](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6WTchGqUeBiaNY5hT6tjQ611UaQA3jawOS4uQHrN65icia3d4qLAfZMEibg/0?wx_fmt=png)
查看对应 Flink Web UI,从 Stdout 输出中证实 SavePoint 恢复成功。
## Yarn-Application 实践
### 注册集群配置
使用之前注册的集群配置即可。
### 上传 dlink-app.jar
第一次使用时,需要将 dlink-app.jar 上传到 hdfs 指定目录,目录可修改如下:
![image-20211128235914006](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6loq9rmu2tlkDvgSoD6WSlNBrniabV7MibNtSQrA2wKnCjKOzUGGiawW4g/0?wx_fmt=png)
50070 端口 浏览文件系统如下:
![image-20211129000047789](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X68kEAFYLBQpGcamP5djEaj9LiaLqlQCxVIXrbdFbgCb4Ct25HTAHCRIw/0?wx_fmt=png)
### 执行升级版 Hello World
作业配置:
1. 执行模式:选中 yarn-application ;
快捷操作栏:
1. 点击保存按钮保存当前所有配置;
2. 点击小火箭异步提交作业。
### 其他同 Per-Job
其他操作同 yarn-per-job ,本文不再做描述。
### 提交 User Jar
作业中心—— Jar 管理,注册 User Jar 配置。
![image-20211129000804888](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6lGkXGSXSP9JyWTxc5rrh8zD9y5XYR9HkKxRRDicbUQicSFhaAAR0Ulxw/0?wx_fmt=png)
右边作业配置的可执行 Jar 选择刚刚注册的 Jar 配置,保存后点击小火箭提交作业。
![image-20211129000933320](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6qyajpuR1OPlsFynwibdSRX3ECcRGGJPmutqyaibJbFS8HYCYic0rswuiaw/0?wx_fmt=png)
由于提交了个批作业,Yarn 可以发现已经执行完成并销毁集群了。
![image-20211129001241101](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6UJbUIgHTSJHN29zicKuf761ERGnZHibMMURhuFpL2Iiah9LSiceTXIrAyg/0?wx_fmt=png)
## 敬请期待
\ No newline at end of file
## FlinkSQL 编辑器自动补全函数
Dlink-0.3.2 版本上线了一个非常实用的功能——自动补全。
我们在使用 IDEA 等工具时,提示方法并补全、生成的功能大大提升了开发效率。而 Dlink 的目标便是让 FlinkSQL 更加丝滑,所以其提供了自定义的自动补全功能。对比传统的使用 `Java` 字符串来编写 FlinkSQL 的方式,Dlink 的优势是巨大。
在文档中心,我们可以根据自己的需要扩展相应的自动补全规则,如 `UDF``Connector With` 等 FlinkSQL 片段,甚至可以扩展任意可以想象到内容,如注释、模板、配置、算法等。
具体新增规则的示例请看下文描述。
## set 语法来设置执行环境参数
对于一个 FlinkSQL 的任务来说,除了 sql 口径,其任务配置也十分重要。所以 Dlink-0.3.2 版本中提供了 `sql-client``set` 语法,可以通过 `set` 关键字来指定任务的执行配置(如 “ `set table.exec.resource.default-parallelism=2;` ” ),其优先级要高于 Dlink 自身的任务配置(面板右侧)。
那么长的参数一般人谁记得住?等等,别忘了 Dlink 的新功能自动补全~
配置实现输入 `parallelism` 子字符串来自动补全 `table.exec.resource.default-parallelism=`
在文档中心中添加一个规则,名称为 `parallelism`,填充值为 `table.exec.resource.default-parallelism=`,其他内容随意。
保存之后,来到编辑器输入 `par` .
选中要补全的规则后,编辑器中自动补全了 `table.exec.resource.default-parallelism=`
至此,有些小伙伴发现,是不是可以直接定义 `pl2` 来自动生成 `set table.exec.resource.default-parallelism=2;`
当然可以的。
还有小伙伴问,可不可以定义 `pl` 生成 `set table.exec.resource.default-parallelism=;` 后,光标自动定位到 `=``;` 之间?
这个也可以的,只需要定义 `pl` 填充值为 `set table.exec.resource.default-parallelism=${1:};` ,即可实现。
所以说,只要能想象到的都可以定义,这样的 Dlink 你爱了吗?
嘘,还有点小 bug 后续修复呢。如果有什么建议及问题请及时指出哦。
\ No newline at end of file
## 扩展 Connector
将 Flink 集群上已扩展好的 Connector 直接放入 Dlink 的 lib 或者 plugins 下,然后重启即可。定制 Connector 过程同 Flink 官方一样。
## 扩展 Metadata
遵循SPI。
\ No newline at end of file
## 敬请期待
\ No newline at end of file
## 敬请期待
\ No newline at end of file
## 背景
Apache Doris是一个现代化的MPP分析型数据库产品。仅需亚秒级响应时间即可获得查询结果,有效地支持实时数据分析。例如固定历史报表,实时数据分析,交互式数据分析和探索式数据分析等。
目前 Doris 的生态正在建设中,本文将分享如何基于 Dlink 实现 Mysql 变动数据通过 Flink 实时入库 Doris。
## 准备
老规矩,列清各组件版本:
| 组件 | 版本 |
| :-------------: | :----------: |
| Flink | 1.13.3 |
| Flink-mysql-cdc | 2.1.0 |
| Doris | 0.15.1-rc09 |
| doris-flink | 1.0-SNAPSHOT |
| Mysql | 8.0.13 |
| Dlink | 0.4.0 |
需要注意的是,本文的 Doris 是基于 OracleJDK1.8 和 Scala 2.11 通过源码进行编译打包的,所以所有组件的 scala 均为 2.11,此处和 Doris 社区略有不同。
## 部署
本文的 Flink 和 Doris 的部署不做描述,详情请查阅官网。
[Doris]: https://doris.apache.org/master/zh-CN/extending-doris/flink-doris-connector.html#%E4%BD%BF%E7%94%A8%E6%96%B9%E6%B3%95 "Doris"
本文在 Dlink 部署成功的基础上进行,如需查看具体部署步骤,请阅读《flink sql 知其所以然(十六):flink sql 开发企业级利器之 Dlink》。
Dlink 的 plugins 下添加 `doris-flink-1.0-SNAPSHOT.jar``flink-sql-connector-mysql-cdc-2.1.0.jar` 。重启 Dlink。
```java
plugins/ -- Flink 相关扩展
|- doris-flink-1.0-SNAPSHOT.jar
|- flink-csv-1.13.3.jar
|- flink-dist_2.11-1.13.3.jar
|- flink-format-changelog-json-2.1.0.jar
|- flink-json-1.13.3.jar
|- flink-shaded-zookeeper-3.4.14.jar
|- flink-sql-connector-mysql-cdc-2.1.0.jar
|- flink-table_2.11-1.13.3.jar
|- flink-table-blink_2.11-1.13.3.jar
```
当然,如果您想直接使用 FLINK_HOME 的话,可以在 `auto.sh` 文件中 `SETTING` 变量添加`$FLINK_HOME/lib`
## 数据表
### 学生表 (student)
```sql
-- Mysql
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
`sid` int(11) NOT NULL,
`name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
PRIMARY KEY (`sid`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
INSERT INTO `student` VALUES (1, '小红');
INSERT INTO `student` VALUES (2, '小黑');
INSERT INTO `student` VALUES (3, '小黄');
```
### 成绩表(score)
```sql
-- Mysql
DROP TABLE IF EXISTS `score`;
CREATE TABLE `score` (
`cid` int(11) NOT NULL,
`sid` int(11) NULL DEFAULT NULL,
`cls` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`score` int(11) NULL DEFAULT NULL,
PRIMARY KEY (`cid`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
INSERT INTO `score` VALUES (1, 1, 'chinese', 90);
INSERT INTO `score` VALUES (2, 1, 'math', 95);
INSERT INTO `score` VALUES (3, 1, 'english', 93);
INSERT INTO `score` VALUES (4, 2, 'chinese', 92);
INSERT INTO `score` VALUES (5, 2, 'math', 75);
INSERT INTO `score` VALUES (6, 2, 'english', 80);
INSERT INTO `score` VALUES (7, 3, 'chinese', 100);
INSERT INTO `score` VALUES (8, 3, 'math', 60);
```
### 学生成绩宽表(scoreinfo)
```sql
-- Doris
CREATE TABLE scoreinfo
(
cid INT,
sid INT,
name VARCHAR(32),
cls VARCHAR(32),
score INT
)
UNIQUE KEY(cid)
DISTRIBUTED BY HASH(cid) BUCKETS 10
PROPERTIES("replication_num" = "1");
```
## FlinkSQL
```sql
CREATE TABLE student (
sid INT,
name STRING,
PRIMARY KEY (sid) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'database-name' = 'test',
'table-name' = 'student');
CREATE TABLE score (
cid INT,
sid INT,
cls STRING,
score INT,
PRIMARY KEY (cid) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'database-name' = 'test',
'table-name' = 'score');
CREATE TABLE scoreinfo (
cid INT,
sid INT,
name STRING,
cls STRING,
score INT,
PRIMARY KEY (cid) NOT ENFORCED
) WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030' ,
'table.identifier' = 'test.scoreinfo',
'username' = 'root',
'password'=''
);
insert into scoreinfo
select
a.cid,a.sid,b.name,a.cls,a.score
from score a
left join student b on a.sid = b.sid
```
## 调试
### 在 Dlink 中提交
本示例采用了 yarn-session 的方式进行提交。
![image-20211218134246511](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTp0RDozicic8lrYycianYsUN9ibG3QjNqVbMk7L41wHykKnkV0YxDCVSYj68HlFWylpYckkXicgnTDU7uQ/0?wx_fmt=png)
### FlinkWebUI
![image-20211218134439699](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTp0RDozicic8lrYycianYsUN9ibn1jXbvKznaF8Tm4AxxvYYDI0fEtXbGm0XUeXhGp44KMlPdoOzjvtHQ/0?wx_fmt=png)
上图可见,流任务已经成功被 Dlink 提交的远程集群了。
### Doris 查询
![image-20211218135404787](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTp0RDozicic8lrYycianYsUN9iblKTY6o9fWZxFDQYC19wKVFRGDuUBgNOZxm14sWjyr8tUY7RDeUiaEUw/0?wx_fmt=png)
上图可见,Doris 已经被写入了历史全量数据。
### 增量测试
在 Mysql 中执行新增语句:
```sql
INSERT INTO `score` VALUES (9, 3, 'english', 100);
```
Doris 成功被追加:
![image-20211218135545742](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTp0RDozicic8lrYycianYsUN9ibvE4qyQ9ttf2kNZ3raEgabvh442HfiaIfm2l5dhdFmWoGiaHMlvcQmocw/0?wx_fmt=png)
### 变动测试
在 Mysql 中执行新增语句:
```sql
update score set score = 100 where cid = 1
```
Doris 成功被修改:
![image-20211218135949764](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTp0RDozicic8lrYycianYsUN9ib3liaIvXQcCSboO4IoeJhtTRa38ukNogtFzwg31mNEFwRcJ1wGNIhQkQ/0?wx_fmt=png)
## 扩展 Flink-CDC
`flink-sql-connector-mysql-cdc-2.1.0.jar` 等 CDC 依赖加入到 Dlink 的 `plugins` 下即可。
\ No newline at end of file
## 扩展其他版本的 Flink
Flink 的版本取决于 lib 下的 dlink-client-1.13.jar。当前版本默认为 Flink 1.13.3 API。向其他版本的集群提交任务可能存在问题,已实现 1.11、1.12、1.13, 1.14,切换版本时只需要将对应依赖在lib下进行替换,然后重启即可。
切换版本时需要同时更新 plugins 下的 Flink 依赖。
\ No newline at end of file
## 前言
最近有很多小伙伴问,dlink 如何连接 Hive 进行数据开发?
关于 dlink 连接 Hive 的步骤同 Flink 的 `sql-client ` ,只不过它没有默认加载的配置文件。下文将详细讲述对 Hive 操作的全过程。
## 准备工作
由于搭建 Hive 的开发环境会涉及到重多组件和插件,那其版本对应问题也是至关重要,它能帮我们避免很多不必要的问题,当然小版本号之间具备一定的兼容性。
我们先来梳理下本教程的各个组件版本:
| 组件 | 版本 |
| :----: | :----: |
| Dlink | 0.3.2 |
| Flink | 1.12.4 |
| Hadoop | 2.7.7 |
| Hive | 2.3.6 |
| Mysql | 8.0.15 |
再来梳理下本教程的各个插件版本:
| 所属组件 | 插件 | 版本 |
| :-----------: | :------------------------: | :-------------------: |
| Dlink | dlink-client | 1.12 |
| Dlink & Flink | flink-sql-connector-hive | 2.3.6_2.11-1.12.3 |
| Dlink & Flink | flink-shaded-hadoop-3-uber | 3.1.1.7.2.8.0-224-9.0 |
## 部署扩展
部署扩展的工作非常简单(前提是 Dlink 部署完成并成功连接 Flink 集群,相关部署步骤请查看《Dlink实时计算平台——部署篇》),只需要把 `flink-sql-connector-hive-2.3.6_2.11-1.12.3.jar``flink-shaded-hadoop-3-uber-3.1.1.7.2.8.0-224-9.0.jar` 两个插件分别加入到 Dlink 的 plugins 目录与 Flink 的 lib 目录下即可,然后重启二者。当然,还需要放置 `hive-site.xml`,位置自定义,Dlink 可以访问到即可。
## 创建 Hive Catalog
已知,Hive 已经新建了一个数据库实例 `hdb` ,创建了一张表 `htest`,列为 `name``age`,存储位置默认为 `hdfs:///usr/local/hadoop/hive-2.3.9/warehouse/hdb.db` 。(此处为何 2.3.9 呢,因为 `flink-sql-connector-hive-2.3.6_2.11-1.12.3.jar` 只支持到最高版本 2.3.6,小编先装了个 2.3.9 后装了个 2.3.6,尴尬 > _ < ~)
```sql
CREATE CATALOG myhive WITH (
'type' = 'hive',
'default-database' = 'hdb',
'hive-conf-dir' = '/usr/local/dlink/hive-conf'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG myhive;
select * from htest
```
在 Dlink 编辑器中输入以上 sql ,创建 Hive Catalog,并查询一张表。
其中,`hive-conf-dir` 需要指定 `hive-site.xml` 的路径,其他同 Flink 官方解释。
执行查询后(记得选中执行配置的预览结果),可以从查询结果中查看到 htest 表中只有一条数据。(这是正确的,因为小编太懒了,只随手模拟了一条数据)
此时可以使用 FlinkSQL 愉快地操作 Hive 的数据了。
## 使用 Hive Dialect
很熟悉 Hive 的语法以及需要对 Hive 执行其自身特性的语句怎么办?
同 Flink 官方解释一样,只需要使用 `SET table.sql-dialect=hive` 来启用方言即可。注意有两种方言 `default``hive` ,它们的使用可以随意切换哦~
```sql
CREATE CATALOG myhive WITH (
'type' = 'hive',
'default-database' = 'hdb',
'hive-conf-dir' = '/usr/local/dlink/hive-conf'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG myhive;
-- set hive dialect
SET table.sql-dialect=hive;
-- alter table location
alter table htest set location 'hdfs:///usr/htest';
-- set default dialect
SET table.sql-dialect=default;
select * from htest;
```
上述 sql 中添加了 Hive Dialect 的使用,FlinkSQL 本身不支持 `alter table .. set location ..` 的语法,使用 Hive Dialect 则可以实现语法的切换。本 sql 内容对 htest 表进行存储位置的改变,将其更改为一个新的路径,然后再执行查询。
由上图可见,被更改过 location 的 htest 此时查询没有数据,是正确的。
然后将 location 更改为之前的路径,再执行查询,则可见原来的那条数据,如下图所示。
## 总结
由上所知,Dlink 以更加友好的交互方式展现了 Flink 集成 Hive 的部分功能,当然其他更多的 Hive 功能需要您自己在使用的过程中去体验与挖掘。
目前,Dlink 支持 Flink 绝大多数特性与功能,集成与拓展方式与 Flink 官方文档描述一致,只需要在 Dlink 的 plugins 目录下添加依赖即可。
\ No newline at end of file
## 扩展 UDF
将 Flink 集群上已扩展好的 UDF 直接放入 Dlink 的 lib 或者 plugins 下,然后重启即可。定制 UDF 过程同 Flink 官方一样。
## 敬请期待
\ No newline at end of file
### 从安装包开始
Dlink 可以部署在 Flink 及 Hadoop 集群之外的任意节点,只要网络通就可;部署在集群内部也可。
```
config/ -- 配置文件
|- application.yml
extends/ -- 扩展
|- dlink-client-1.11.jar
|- dlink-client-1.12.jar
|- dlink-client-1.14.jar
html/ -- 前端编译产物,用于Nginx
jar/ -- dlink application模式提交sql用到的jar
lib/ -- 内部组件
|- dlink-client-1.13.jar -- 必需
|- dlink-connector-jdbc.jar
|- dlink-function.jar
|- dlink-metadata-clickhouse.jar
|- dlink-metadata-mysql.jar
|- dlink-metadata-oracle.jar
|- dlink-metadata-postgresql.jar
plugins/
|- flink-connector-jdbc_2.11-1.13.3.jar
|- flink-csv-1.13.3.jar
|- flink-dist_2.11-1.13.3.jar
|- flink-json-1.13.3.jar
|- flink-shaded-hadoop-3-uber-3.1.1.7.2.1.0-327-9.0.jar
|- flink-shaded-zookeeper-3.4.14.jar
|- flink-table-blink_2.11-1.13.3.jar
|- flink-table_2.11-1.13.3.jar
|- mysql-connector-java-8.0.21.jar
sql/
|- dlink.sql --Mysql初始化脚本
auto.sh --启动停止脚本
dlink-admin.jar --程序包
```
解压后结构如上所示,修改配置文件内容。lib 文件夹下存放 dlink 自身的扩展文件,plugins 文件夹下存放 flink 及 hadoop 的官方扩展文件。其中 plugins 中的所有 jar 需要根据版本号自行下载并添加,才能体验完整功能,当然也可以放自己修改的 Flink 源码编译包。当然,如果您硬要使用 FLINK_HOME 的话,可以在 `auto.sh` 文件中 `SETTING` 变量添加`$FLINK_HOME/lib` 。extends 文件夹只作为扩展插件的备份管理,不会被 dlink 加载。
在Mysql数据库中创建数据库并执行初始化脚本。
执行以下命令管理应用。
```shell
sh auto.sh start
sh auto.sh stop
sh auto.sh restart
sh auto.sh status
```
前端 Nginx 部署:
将 html 文件夹上传至 nginx 的 html 文件夹下,修改 nginx 配置文件并重启。如果 Nginx 和 Dlink 在同一节点,则可以指定静态资源的绝对路径到 Dlink 的 html 目录。
```shell
server {
listen 9999;
server_name localhost;
# gzip config
gzip on;
gzip_min_length 1k;
gzip_comp_level 9;
gzip_types text/plain application/javascript application/x-javascript text/css application/xml text/javascript application/x-httpd-php image/jpeg image/gif image/png;
gzip_vary on;
gzip_disable "MSIE [1-6]\.";
#charset koi8-r;
#access_log logs/host.access.log main;
location / {
root html;
index index.html index.htm;
try_files $uri $uri/ /index.html;
}
#error_page 404 /404.html;
# redirect server error pages to the static page /50x.html
#
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
location ^~ /api {
proxy_pass http://127.0.0.1:8888;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_set_header X-Real-IP $remote_addr;
}
}
```
1. server.listen 填写前端访问端口,例如 9999
2. proxy_pass 填写后端地址如 http://127.0.0.1:8888
3. 将 html 文件夹下打包好的前端资源上传到 nginx 的 html 文件夹中
4. 重载配置文件:nginx -s reload
5. 直接访问前端端口,例如 9999
### 从源码编译
#### 项目目录
```java
dlink -- 父项目
|-dlink-admin -- 管理中心
|-dlink-app -- Application Jar
|-dlink-assembly -- 打包配置
|-dlink-client -- Client 中心
| |-dlink-client-1.11 -- Client-1.11 实现
| |-dlink-client-1.12 -- Client-1.12 实现
| |-dlink-client-1.13 -- Client-1.13 实现
| |-dlink-client-1.14 -- Client-1.14 实现
|-dlink-common -- 通用中心
|-dlink-connectors -- Connectors 中心
| |-dlink-connector-jdbc -- Jdbc 扩展
|-dlink-core -- 执行中心
|-dlink-doc -- 文档
| |-bin -- 启动脚本
| |-bug -- bug 反馈
| |-config -- 配置文件
| |-doc -- 使用文档
| |-sql -- sql脚本
|-dlink-executor -- 执行中心
|-dlink-extends -- 扩展中心
|-dlink-function -- 函数中心
|-dlink-gateway -- Flink 网关中心
|-dlink-metadata -- 元数据中心
| |-dlink-metadata-base -- 元数据基础组件
| |-dlink-metadata-clickhouse -- 元数据- clickhouse 实现
| |-dlink-metadata-mysql -- 元数据- mysql 实现
| |-dlink-metadata-oracle -- 元数据- oracle 实现
| |-dlink-metadata-postgresql -- 元数据- postgresql 实现
|-dlink-web -- React 前端
|-docs -- 官网 md
```
#### 编译打包
以下环境版本实测编译成功:
| 环境 | 版本 |
| :-----: | :-------: |
| npm | 7.19.0 |
| node.js | 14.17.0 |
| jdk | 1.8.0_201 |
| maven | 3.6.0 |
| lombok | 1.18.16 |
| mysql | 5.7+ |
```shell
mvn clean install -Dmaven.test.skip=true
```
#### 登录
超级管理员:admin/admin;
新增用户:默认密码 123456。
#### 集群中心
注册 Flink 集群地址时,格式为 host:port ,用英文逗号分隔。即添加 Flink 集群的 JobManager 的 RestApi 地址。当 HA 模式时,地址间用英文逗号分隔,例如:192.168.123.101:8081,192.168.123.102:8081,192.168.123.103:8081。新增和修改的等待时间较长,是因为需要检测最新的 JobManager 地址。心跳检测为手动触发,会更新集群状态与 JobManager 地址。
#### Studio
1. 在左侧目录区域创建文件夹或任务。
2. 在中间编辑区编写 FlinkSQL 。
3. 在右侧配置作业配置和执行参数。
4. Fragment 开启后,可以使用增强的 sql 片段语法:
```sql
sf:=select * from;tb:=student;
${sf} ${tb}
##效果等同于
select * from student
```
5. 内置 sql 增强语法-表值聚合:
```sql
CREATE AGGTABLE aggdemo AS
SELECT myField,value,rank
FROM MyTable
GROUP BY myField
AGG BY TOP2(value) as (value,rank);
```
6. MaxRowNum 为批流(Session模式下)执行Select时预览查询结果的最大集合长度,默认 100,最大 9999。
7. SavePoint策略支持最近一次、最早一次、指定一次三种策略。
8. Flink 共享会话共享 Catalog ,会话的使用需要在左侧会话选项卡手动创建并维护。
9. 连接器为 Catalog 里的表信息,清空按钮会销毁当前会话。
10. Local 模式主要用于语法校验、血缘分析、执行图预览等功能,当然也可执行任务,但目前版本建议请使用远程集群来执行任务。
11. 执行 SQL 时,如果您选中了部分 SQL,则会执行选中的内容,否则执行全部内容。
12. 小火箭的提交功能是异步提交当前任务已保存的 FlinkSQL 及配置到集群。由于适用于快速提交稳定的任务,所以无法提交草稿,且无法预览数据。
13. 执行信息或者历史中那个很长很长的就是集群上的 JobId 或者 APPID,任务历史可以查看执行过的任务的数据回放。
14. 草稿是无法被异步远程提交的,只能同步执行,且无法保存。
15. Studio 的布局可以随意拖动,但由于是实时计算,联动较多,请温柔些。
16. 同步执行时可以自由指定任务名,异步提交默认为作业名。
17. 支持 set 语法设置 Flink 的执行配置,其优先级大于右侧的配置。
18. 支持远程集群查看、SavePoint 及停止任务。
19. 支持自定义及上下文的 sql 函数或片段的自动补全,通过函数文档维护。
20. 支持 Flink 所有官方的连接器及插件的扩展,但需注意版本号适配。
21. 使用 IDEA 进行源码调试时,需要在 admin 及 core 下修改相应 pom 依赖的引入来完成功能的加载。
22. 支持基于 StreamGraph 的可执行 FlinkSql (Insert into)的血缘分析,无论你的 sql 有多复杂或者多 view。
23. Dlink 目前提交方式支持 Standalone 、Yarn Session、Yarn PerJob、Yarn Application,K8S 后续支持。
24. Dlink 目前对于 Flink 多版本的支持只能一个 Dlink 实例支持一个 Flink 版本,未来将开源同时支持多版本的能力。
25. 使用 Yarn PerJob、Yarn Application 需要配置集群配置,且其自动注册的集群实例需要手动点击回收。
26. 其他内容后续更新。。。
## 运行截图
> 登录页
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/040/login.png)
> 首页
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/040/welcome.png)
> Studio SQL 开发提示与补全
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/040/sqldev.png)
> Studio 语法和逻辑检查
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/040/sqlcheck.png)
> Studio 批流SELECT预览
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/040/selectpreview.png)
> Studio 异常反馈
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/040/sqlerror.png)
> Studio 进程监控
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/040/process.png)
> Studio 执行历史
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/040/history.png)
> Studio 数据回放
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/040/datashow.png)
> Studio SavePoint 管理
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/040/savepoint.png)
> Studio 血缘分析
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/040/ca.png)
> Studio 函数浏览
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/040/function.png)
> Studio 共享会话
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/040/session.png)
> 集群管理
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/040/cluster.png)
> 集群配置管理
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/040/clusterconfiguration.png)
> 数据源管理
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/040/db.png)
This diff is collapsed.
......@@ -6,6 +6,7 @@
<meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1" />
<meta name="description" content="Description">
<meta name="viewport" content="width=device-width, initial-scale=1.0, minimum-scale=1.0">
<link rel="icon" type="image/x-ico" href="./_media/dlink.png" />
<link rel="stylesheet" href="//cdn.jsdelivr.net/npm/docsify@4/lib/themes/vue.css">
</head>
<body>
......@@ -22,7 +23,6 @@
onlyCover: true,
relativePath: true,
homepage: '_coverpage.md',
// loadSidebar: true,
autoHeader: true,
formatUpdated: '{MM}/{DD} {HH}:{mm}',
ext: '.md',
......@@ -31,5 +31,6 @@
</script>
<!-- Docsify v4 -->
<script src="//cdn.jsdelivr.net/npm/docsify@4"></script>
<script src="//cdn.jsdelivr.net/npm/docsify-copy-code/dist/docsify-copy-code.min.js"></script>
</body>
</html>
## 简介
Dlink 为 Apache Flink 而生,让 Flink SQL 更加丝滑。它是一个交互式的 FlinkSQL Studio,可以在线开发、补全、校验 、执行、预览 FlinkSQL,支持 Flink 官方所有语法及其增强语法,并且可以同时对多 Flink 集群实例进行提交、停止、SavePoint 等运维操作,如同您的 IntelliJ IDEA For Flink SQL。
需要注意的是,Dlink 它更专注于 FlinkSQL 的应用,而不是 DataStream。在开发过程中您不会看到任何一句 java、scala 或者 python。所以,它的目标是基于 100% FlinkSQL 来实现批流一体的实时计算平台。
值得惊喜的是,Dlink 的实现基于最新 Flink 源码二次开发,而交互更加贴近 Flink 的功能与体验,并且紧随官方社区发展。即站在巨人肩膀上开发与创新,Dlink 在未来批流一体的发展趋势下潜力无限。
## 原理
![](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/040/Dlink_principle.png)
## 功能
注意:只表明核心功能,不包括细节。
| 域 | 概要 | 进展 |
| :-----------------: | :----------------------------------: | :----: |
| 基本管理 | 作业及 Savepoint 管理 | 已实现 |
| | FlinkSQL 及执行配置管理 | 已实现 |
| | Flink 集群及配置管理 | 已实现 |
| | 数据源管理 | 已实现 |
| | 文档管理 | 已实现 |
| | 系统配置 | 已实现 |
| | 用户管理 | 已实现 |
| FlinkSQL 语法增强 | SQL 片段语法 | 已实现 |
| | AGGTABLE 语法 | 已实现 |
| | 语句集 | 已实现 |
| | 支持 sql-client 所有语法 | 已实现 |
| | 支持 Flink 所有 Configuration | 已实现 |
| FlinkSQL 交互式开发 | 会话的 connector 查询 | 已实现 |
| | 语法检查 | 已实现 |
| | 执行图校验 | 已实现 |
| | 上下文元数据自动提示与补全 | 已实现 |
| | 自定义代码补全 | 已实现 |
| | 关键字高亮 | 已实现 |
| | 结构折叠与缩略图 | 已实现 |
| | 支持选中提交 | 已实现 |
| | 布局拖拽 | 已实现 |
| | SELECT、SHOW等语法数据预览 | 已实现 |
| | JobPlanGraph 预览 | 已实现 |
| Flink 任务运维 | standalone SQL提交 | 已实现 |
| | yarn session SQL提交 | 已实现 |
| | yarn per-job SQL提交 | 已实现 |
| | yarn application SQL提交 | 已实现 |
| | yarn application Jar提交 | 已实现 |
| | 作业 Cancel | 已实现 |
| | 作业 SavePoint Cancel,Stop,Trigger | 已实现 |
| | 作业从 SavePoint 恢复 | 已实现 |
| 元数据功能 | Flink Catelog 浏览(connector) | 已实现 |
| | 外部数据源元数据浏览 | 已实现 |
| 共享会话 | 支持 Session 集群 Catelog 持久与浏览 | 已实现 |
| | 支持共享与私有会话 | 已实现 |
| Flink 集群中心 | 手动注册 Session 集群 | 已实现 |
| | 自动注册 per-job 和 application 集群 | 已实现 |
## 近期计划
1.支持同时托管多版本的Flink实例
2.支持K8S多种运行模式
3.支持多种任务调度框架接口
4.支持UDF动态加载
5.完善Studio交互功能
## 技术栈
[Apache Flink](https://github.com/apache/flink)
[Mybatis Plus](https://github.com/baomidou/mybatis-plus)
[ant-design-pro](https://github.com/ant-design/ant-design-pro)
[Monaco Editor](https://github.com/Microsoft/monaco-editor)
[SpringBoot]()
## 致谢
感谢 [JetBrains](https://www.jetbrains.com/?from=dlink) 提供的免费开源 License 赞助
[![JetBrains](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/main/jetbrains.svg)](https://www.jetbrains.com/?from=dlink)
## 交流与贡献
欢迎您加入社区交流分享,也欢迎您为社区贡献自己的力量。
在此有意向参与代码及文档贡献或积极测试者可以私信我加入 Dlink Contributors 群聊进一步了解。
dlink将正式开始社区积极的发展阶段,社区的主旨是开放、交流、创新、共赢,dlink的核心理念是创新,即不受思想约束地勇于尝试。dlink本就是一个创新型的解决方案,而不是模仿已有产品的思路按部就班,一味模仿对于社区及所有人的发展意义并不大,积极创新才可能独树一帜,并为大家带来更大的利益。无论您是否已经建成了自己的FlinkSQL平台,相信它一定会在创新的方向上为您带来些许启发。
在此非常感谢大家的支持~
QQ社区群:**543709668**,申请备注 “ Dlink ”,不写不批
微信社区群(推荐,大佬云集):添加微信号 wenmo_ai 邀请进群,申请备注 “ Dlink ”,不写不批
公众号(最新消息获取建议关注):[DataLink数据中台](https://mmbiz.qpic.cn/mmbiz_jpg/dyicwnSlTFTp6w4PuJruFaLV6uShCJDkzqwtnbQJrQ90yKDuuIC8tyMU5DK69XZibibx7EPPBRQ3ic81se5UQYs21g/0?wx_fmt=jpeg)
163 邮箱:aiwenmo@163.com
QQ 邮箱:809097465@qq.com
\ No newline at end of file
This diff is collapsed.
## Yarn-Session 实践
### 注册 Session 集群
![image-20211128225423360](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6iaAIees3BsUONXbIocdRUI0WWVSzibPpltibBbMmWgfWJ0AklUlPF9Ugw/0?wx_fmt=png)
进入集群中心进行远程集群的注册。点击新建按钮配置远程集群的参数。图中示例配置了一个 Flink on Yarn 的高可用集群,其中 JobManager HA 地址需要填写集群中所有可能被作为 JobManager 的 RestAPI 地址,多个地址间使用英文逗号分隔。表单提交时可能需要较长时间的等待,因为 dlink 正在努力的计算当前活跃的 JobManager 地址。
保存成功后,页面将展示出当前的 JobManager 地址以及被注册集群的版本号,状态为正常时表示可用。
注意:只有具备 JobManager 实例的 Flink 集群才可以被成功注册到 dlink 中。( Yarn-Per-Job 和 Yarn-Application 也具有 JobManager,当然也可以手动注册,但无法提交任务)
如状态异常时,请检查被注册的 Flink 集群地址是否能正常访问,默认端口号为8081,可能更改配置后发生了变化,查看位置为 Flink Web 的 JobManager 的 Configuration 中的 rest 相关属性。
### 执行 Hello World
万物都具有 Hello World 的第一步,当然 dlink 也是具有的。我们选取了基于 datagen 的流查询作为第一行 Flink Sql。具体如下:
```sql
CREATE TABLE Orders (
order_number BIGINT,
price DECIMAL(32,2),
buyer ROW<first_name STRING, last_name STRING>,
order_time TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1'
);
select order_number,price,order_time from Orders
```
该例子使用到了 datagen,需要在 dlink 的 plugins 目录下添加 flink-table.jar。
点击 Flink Sql Studio 进入开发页面:
![image-20211128230416447](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6RuS3kibG0jJnDjoUicX7sN63UN6j7Osg4lWh5SelOw0hp4Vj6icFAuDkA/0?wx_fmt=png)
在中央的编辑器中编辑 Flink Sql。
右边作业配置:
1. 执行模式:选中 yarn-session;
2. Flink 集群:选中上文注册的测试集群;
3. SavePoint 策略:选中禁用;
4. 按需进行其他配置。
右边执行配置:
1. 预览结果:启用;
2. 远程执行:启用。
点击快捷操作栏的三角号按钮同步执行该 FlinkSQL 任务。
### 预览数据
![image-20211128231230335](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6leQx4mfqsdwVftlEUXSFWnEzOTJGrCGHUKo98SpIn11WkZquEwwjpg/0?wx_fmt=png)
切换到历史选项卡点击刷新可以查看提交进度。切换到结果选项卡,等待片刻点击获取最新数据即可预览 SELECT。
### 停止任务
![image-20211128231523703](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6koYZDQqzsawPOCTP64ycdUZlib5oJA9vo9fpX43DNTmibY60ojZv44zQ/0?wx_fmt=png)
切换到进程选项卡,选则对应的集群实例,查询当前任务,可执行停止操作。
## Yarn-Per-Job 实践
### 注册集群配置
进入集群中心——集群配置,注册配置。
![image-20211128231914983](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6Vcw98k4yfgR2dSo3BUhxdtpRdd8A7NLyXkZhFibhiciarp9DTY415UehQ/0?wx_fmt=png)
1. Hadoop 配置文件路径:指定配置文件路径(末尾无/),需要包含以下文件:core-site.xml,hdfs-site.xml,yarn-site.xml;
2. Flink 配置 lib 路径:指定 lib 的 hdfs 路径(末尾无/),需要包含 Flink 运行时的所有依赖,即 flink 的 lib 目录下的所有 jar;
3. Flink 配置文件路径:指定配置文件 flink-conf.yaml 的具体路径(末尾无/);
4. 按需配置其他参数(重写效果);
5. 配置基本信息(标识、名称等);
6. 点击测试或者保存。
### 执行升级版 Hello World
之前的 hello world 是个 SELECT 任务,改良下变为 INSERT 任务:
```sql
CREATE TABLE Orders (
order_number INT,
price DECIMAL(32,2),
order_time TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.order_number.kind' = 'sequence',
'fields.order_number.start' = '1',
'fields.order_number.end' = '1000'
);
CREATE TABLE pt (
ordertotal INT,
numtotal INT
) WITH (
'connector' = 'print'
);
insert into pt select 1 as ordertotal ,sum(order_number)*2 as numtotal from Orders
```
![image-20211128232734409](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6HU0ics1Er4CibiaRc8jndmc3rHX9J3ArSp4QF2qyARR46hPzOh0kDbYYQ/0?wx_fmt=png)
编写 Flink SQL;
作业配置:
1. 执行模式:选中 yarn-per-job ;
2. Flink 集群配置:选中刚刚注册的配置;
3. SavePoint 策略:选中最近一次。
快捷操作栏:
1. 点击保存按钮保存当前所有配置;
2. 点击小火箭异步提交作业。
![image-20211128233423276](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6OsQDhibZQbh1nXrx9d0OBiaiaibnIMnLcSfT3MNxhY9CEyAxrA9NLczohA/0?wx_fmt=png)
注意,执行历史需要手动刷新。
### 自动注册集群
点击集群中心——集群实例,即可发现自动注册的 Per-Job 集群。
![image-20211128234056101](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6zCpJ7Knv1WYHTUxHt8iagpNNjRAPsPiaqBoD2xkLupcM7Op1Q48tb6ibQ/0?wx_fmt=png)
### 查看 Flink Web UI
提交成功后,点击历史的蓝色地址即可快速打开 Flink Web UI地址。
![image-20211128233735071](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6AQq5qVMAkwUM8WiaLnTrzDnSlLicTJZjWxowdW9dKUibNp33nrnHpL2Ng/0?wx_fmt=png)
### 从 Savepoint 处停止
在进程选项卡中选择自动注册的 Per-Job 集群,查看任务并 SavePoint-Cancel。
![image-20211128234321656](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6jDvzsoHias24jAY6ElCFj6lFCX1jjdGmoGGa5vI5xgNSEUzyEiaZEZNA/0?wx_fmt=png)
在右侧保存点选项卡可以查看该任务的所有 SavePoint 记录。
![image-20211128234619397](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6TGv1Gun2rB35Upv8hnsoeHzeDdFT4Ryag8icHz7BrzhE4YELiaMk7KYw/0?wx_fmt=png)
### 从 SavePoint 处启动
再次点击小火箭提交任务。
![image-20211128235430989](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6WTchGqUeBiaNY5hT6tjQ611UaQA3jawOS4uQHrN65icia3d4qLAfZMEibg/0?wx_fmt=png)
查看对应 Flink Web UI,从 Stdout 输出中证实 SavePoint 恢复成功。
## Yarn-Application 实践
### 注册集群配置
使用之前注册的集群配置即可。
### 上传 dlink-app.jar
第一次使用时,需要将 dlink-app.jar 上传到 hdfs 指定目录,目录可修改如下:
![image-20211128235914006](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6loq9rmu2tlkDvgSoD6WSlNBrniabV7MibNtSQrA2wKnCjKOzUGGiawW4g/0?wx_fmt=png)
50070 端口 浏览文件系统如下:
![image-20211129000047789](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X68kEAFYLBQpGcamP5djEaj9LiaLqlQCxVIXrbdFbgCb4Ct25HTAHCRIw/0?wx_fmt=png)
### 执行升级版 Hello World
作业配置:
1. 执行模式:选中 yarn-application ;
快捷操作栏:
1. 点击保存按钮保存当前所有配置;
2. 点击小火箭异步提交作业。
### 其他同 Per-Job
其他操作同 yarn-per-job ,本文不再做描述。
### 提交 User Jar
作业中心—— Jar 管理,注册 User Jar 配置。
![image-20211129000804888](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6lGkXGSXSP9JyWTxc5rrh8zD9y5XYR9HkKxRRDicbUQicSFhaAAR0Ulxw/0?wx_fmt=png)
右边作业配置的可执行 Jar 选择刚刚注册的 Jar 配置,保存后点击小火箭提交作业。
![image-20211129000933320](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6qyajpuR1OPlsFynwibdSRX3ECcRGGJPmutqyaibJbFS8HYCYic0rswuiaw/0?wx_fmt=png)
由于提交了个批作业,Yarn 可以发现已经执行完成并销毁集群了。
![image-20211129001241101](https://mmbiz.qpic.cn/mmbiz_png/dyicwnSlTFTqSas1xKRbibnZdGY1iagC3X6UJbUIgHTSJHN29zicKuf761ERGnZHibMMURhuFpL2Iiah9LSiceTXIrAyg/0?wx_fmt=png)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment