Commit f56c7c52 authored Dec 18, 2021 by wenmo
upload doc
parent ab4f2429
Showing 2 changed files with 288 additions and 0 deletions (+288 −0)
dlink-doc/doc/Dlink在Flink-mysql-cdc到Doris的实践.md (+202 −0)
...ion/src/main/java/com/dlink/ud/udtaf/Top2WithRetract.java (+86 −0)
dlink-doc/doc/Dlink在Flink-mysql-cdc到Doris的实践.md (new file, mode 100644)
# Dlink in Practice: Flink-mysql-cdc to Doris
## Background
Apache Doris is a modern MPP analytical database. It returns query results with sub-second response times and effectively supports real-time data analysis, such as fixed historical reports, real-time analytics, interactive analysis, and exploratory analysis.
The Doris ecosystem is still being built out. This article shares how to use Dlink to stream MySQL change data into Doris in real time through Flink.
## Preparation
As usual, here are the component versions:
| 组件 | 版本 |
| :-------------: | :----------: |
| Flink | 1.13.3 |
| Flink-mysql-cdc | 2.1.0 |
| Doris | 0.15.1-rc09 |
| doris-flink | 1.0-SNAPSHOT |
| Mysql | 8.0.13 |
| Dlink | 0.4.0 |
Note that the Doris used in this article was compiled and packaged from source with OracleJDK 1.8 and Scala 2.11, so every component here uses Scala 2.11, which differs slightly from the Doris community builds.
## Deployment
The deployment of Flink and Doris is not covered in this article; see the official [Doris] documentation for details.
[Doris]: https://doris.apache.org/master/zh-CN/extending-doris/flink-doris-connector.html#%E4%BD%BF%E7%94%A8%E6%96%B9%E6%B3%95 "Doris"
This article assumes that Dlink has already been deployed successfully. For the concrete deployment steps, please read 《flink sql 知其所以然(十六):flink sql 开发企业级利器之 Dlink》.
Add `doris-flink-1.0-SNAPSHOT.jar` and `flink-sql-connector-mysql-cdc-2.1.0.jar` to Dlink's plugins directory, then restart Dlink.
```
plugins/  -- Flink-related extensions
|- doris-flink-1.0-SNAPSHOT.jar
|- flink-csv-1.13.3.jar
|- flink-dist_2.11-1.13.3.jar
|- flink-format-changelog-json-2.1.0.jar
|- flink-json-1.13.3.jar
|- flink-shaded-zookeeper-3.4.14.jar
|- flink-sql-connector-mysql-cdc-2.1.0.jar
|- flink-table_2.11-1.13.3.jar
|- flink-table-blink_2.11-1.13.3.jar
```
Of course, if you prefer to use FLINK_HOME directly, you can add `$FLINK_HOME/lib` to the `SETTING` variable in the `auto.sh` file.
## Data Tables
### Student table (student)
```sql
-- Mysql
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
  `sid` int(11) NOT NULL,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`sid`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
INSERT INTO `student` VALUES (1, '小红');
INSERT INTO `student` VALUES (2, '小黑');
INSERT INTO `student` VALUES (3, '小黄');
```
### Score table (score)
```sql
-- Mysql
DROP TABLE IF EXISTS `score`;
CREATE TABLE `score` (
  `cid` int(11) NOT NULL,
  `sid` int(11) NULL DEFAULT NULL,
  `cls` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `score` int(11) NULL DEFAULT NULL,
  PRIMARY KEY (`cid`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
INSERT INTO `score` VALUES (1, 1, 'chinese', 90);
INSERT INTO `score` VALUES (2, 1, 'math', 95);
INSERT INTO `score` VALUES (3, 1, 'english', 93);
INSERT INTO `score` VALUES (4, 2, 'chinese', 92);
INSERT INTO `score` VALUES (5, 2, 'math', 75);
INSERT INTO `score` VALUES (6, 2, 'english', 80);
INSERT INTO `score` VALUES (7, 3, 'chinese', 100);
INSERT INTO `score` VALUES (8, 3, 'math', 60);
```
### Student score wide table (scoreinfo)
```sql
-- Doris
CREATE TABLE scoreinfo (
    cid INT,
    sid INT,
    name VARCHAR(32),
    cls VARCHAR(32),
    score INT
)
UNIQUE KEY(cid)
DISTRIBUTED BY HASH(cid) BUCKETS 10
PROPERTIES("replication_num" = "1");
```
## FlinkSQL
```sql
CREATE TABLE student (
    sid INT,
    name STRING,
    PRIMARY KEY (sid) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '127.0.0.1',
    'port' = '3306',
    'username' = 'test',
    'password' = '123456',
    'database-name' = 'test',
    'table-name' = 'student'
);
CREATE TABLE score (
    cid INT,
    sid INT,
    cls STRING,
    score INT,
    PRIMARY KEY (cid) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '127.0.0.1',
    'port' = '3306',
    'username' = 'test',
    'password' = '123456',
    'database-name' = 'test',
    'table-name' = 'score'
);
CREATE TABLE scoreinfo (
    cid INT,
    sid INT,
    name STRING,
    cls STRING,
    score INT,
    PRIMARY KEY (cid) NOT ENFORCED
) WITH (
    'connector' = 'doris',
    'fenodes' = '127.0.0.1:8030',
    'table.identifier' = 'test.scoreinfo',
    'username' = 'root',
    'password' = ''
);
insert into scoreinfo
select a.cid, a.sid, b.name, a.cls, a.score
from score a
left join student b on a.sid = b.sid
```
## Debugging
### Submitting in Dlink
This example submits the job in yarn-session mode.

### FlinkWebUI

As shown above, the streaming job has been successfully submitted by Dlink to the remote cluster.
### Doris Query

As shown above, the full historical data has been written into Doris.
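If the screenshot is not visible, the same check can be made by querying the wide table directly in the Doris MySQL client. This is a minimal sketch, assuming the `test.scoreinfo` table created above:
```sql
-- minimal verification query against the Doris wide table (assumes the DDL above)
SELECT * FROM test.scoreinfo ORDER BY cid;
-- with the sample data, this should return 8 rows, one per score record,
-- each joined with the matching student name
```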
### Incremental Test
Execute an insert statement in Mysql:
```sql
INSERT INTO `score` VALUES (9, 3, 'english', 100);
```
The new row is successfully appended in Doris:

### Update Test
Execute an update statement in Mysql:
```sql
update score set score = 100 where cid = 1
```
The row is successfully updated in Doris:

dlink-function/src/main/java/com/dlink/ud/udtaf/Top2WithRetract.java (new file, mode 100644)
package com.dlink.ud.udtaf;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

/**
 * Top2WithRetract
 *
 * @author wenmo
 * @since 2021/12/17 18:55
 */
public class Top2WithRetract
        extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2WithRetract.Top2WithRetractAccumulator> {

    public static class Top2WithRetractAccumulator {
        public Integer first;
        public Integer second;
        public Integer oldFirst;
        public Integer oldSecond;
    }

    @Override
    public Top2WithRetractAccumulator createAccumulator() {
        Top2WithRetractAccumulator acc = new Top2WithRetractAccumulator();
        acc.first = Integer.MIN_VALUE;
        acc.second = Integer.MIN_VALUE;
        acc.oldFirst = Integer.MIN_VALUE;
        acc.oldSecond = Integer.MIN_VALUE;
        return acc;
    }

    public void accumulate(Top2WithRetractAccumulator acc, Integer v) {
        if (v > acc.first) {
            acc.second = acc.first;
            acc.first = v;
        } else if (v > acc.second) {
            acc.second = v;
        }
    }

    public void retract(Top2WithRetractAccumulator acc, Integer v) {
        if (v == acc.first) {
            acc.oldFirst = acc.first;
            acc.oldSecond = acc.second;
            acc.first = acc.second;
            acc.second = Integer.MIN_VALUE;
        } else if (v == acc.second) {
            acc.oldSecond = acc.second;
            acc.second = Integer.MIN_VALUE;
        }
    }

    public void emitValue(Top2WithRetractAccumulator acc, Collector<Tuple2<Integer, Integer>> out) {
        // emit the value and rank
        if (acc.first != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.first, 1));
        }
        if (acc.second != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.second, 2));
        }
    }

    public void emitUpdateWithRetract(Top2WithRetractAccumulator acc, RetractableCollector<Tuple2<Integer, Integer>> out) {
        if (!acc.first.equals(acc.oldFirst)) {
            // if there is an update, retract the old value then emit a new value
            if (acc.oldFirst != Integer.MIN_VALUE) {
                out.retract(Tuple2.of(acc.oldFirst, 1));
            }
            out.collect(Tuple2.of(acc.first, 1));
            acc.oldFirst = acc.first;
        }
        if (!acc.second.equals(acc.oldSecond)) {
            // if there is an update, retract the old value then emit a new value
            if (acc.oldSecond != Integer.MIN_VALUE) {
                out.retract(Tuple2.of(acc.oldSecond, 2));
            }
            out.collect(Tuple2.of(acc.second, 2));
            acc.oldSecond = acc.second;
        }
    }
}