Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in / Register
Toggle navigation
D
dlink
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
zhaowei
dlink
Commits
f5742858
Commit
f5742858
authored
Dec 12, 2021
by
coderTomato
Browse files
Options
Browse Files
Download
Plain Diff
Merge remote-tracking branch 'origin/dev' into dev
parents
a975d2cd
af1f18f9
Changes
29
Hide whitespace changes
Inline
Side-by-side
Showing
29 changed files
with
361 additions
and
78 deletions
+361
-78
README.md
README.md
+6
-0
pom.xml
dlink-admin/pom.xml
+0
-15
APIController.java
...min/src/main/java/com/dlink/controller/APIController.java
+24
-2
TaskController.java
...in/src/main/java/com/dlink/controller/TaskController.java
+0
-8
APICancelDTO.java
dlink-admin/src/main/java/com/dlink/dto/APICancelDTO.java
+27
-0
APIExecuteJarDTO.java
...k-admin/src/main/java/com/dlink/dto/APIExecuteJarDTO.java
+32
-0
APISavePointDTO.java
dlink-admin/src/main/java/com/dlink/dto/APISavePointDTO.java
+29
-0
APIService.java
dlink-admin/src/main/java/com/dlink/service/APIService.java
+8
-2
APIServiceImpl.java
.../src/main/java/com/dlink/service/impl/APIServiceImpl.java
+33
-2
StudioServiceImpl.java
...c/main/java/com/dlink/service/impl/StudioServiceImpl.java
+1
-3
openapi_cancel.json
dlink-admin/src/main/resources/json/openapi_cancel.json
+17
-0
openapi_executeJar_application.json
...c/main/resources/json/openapi_executeJar_application.json
+27
-0
openapi_executesql_local.json
...min/src/main/resources/json/openapi_executesql_local.json
+22
-0
openapi_executesql_standalone.json
...rc/main/resources/json/openapi_executesql_standalone.json
+23
-0
openapi_executesql_yarnsession.json
...c/main/resources/json/openapi_executesql_yarnsession.json
+2
-2
openapi_getjobdata.json
dlink-admin/src/main/resources/json/openapi_getjobdata.json
+1
-0
openapi_savepoint.json
dlink-admin/src/main/resources/json/openapi_savepoint.json
+19
-0
openapi_submittask.json
dlink-admin/src/main/resources/json/openapi_submittask.json
+1
-0
SystemConfiguration.java
...on/src/main/java/com/dlink/model/SystemConfiguration.java
+16
-0
Explainer.java
dlink-core/src/main/java/com/dlink/explainer/Explainer.java
+10
-8
JobConfig.java
dlink-core/src/main/java/com/dlink/job/JobConfig.java
+0
-1
JobManager.java
dlink-core/src/main/java/com/dlink/job/JobManager.java
+22
-14
jetbrains.svg
dlink-doc/images/main/jetbrains.svg
+1
-0
SqlUtil.java
dlink-executor/src/main/java/com/dlink/utils/SqlUtil.java
+2
-2
Gateway.java
dlink-gateway/src/main/java/com/dlink/gateway/Gateway.java
+4
-0
YarnGateway.java
...way/src/main/java/com/dlink/gateway/yarn/YarnGateway.java
+11
-14
flinkConfig.tsx
dlink-web/src/pages/Settings/components/flinkConfig.tsx
+15
-1
model.ts
dlink-web/src/pages/Settings/model.ts
+5
-4
Welcome.tsx
dlink-web/src/pages/Welcome.tsx
+3
-0
No files found.
README.md
View file @
f5742858
...
@@ -319,6 +319,12 @@ AGG BY TOP2(value) as (value,rank);
...
@@ -319,6 +319,12 @@ AGG BY TOP2(value) as (value,rank);
[
SpringBoot
](
)
[
SpringBoot
](
)
## 致谢
感谢
[
JetBrains
](
https://www.jetbrains.com/?from=dlink
)
提供的免费开源 License 赞助
[

](https://www.jetbrains.com/?from=dlink)
## 近期计划
## 近期计划
1.
支持同时托管多版本的Flink实例
1.
支持同时托管多版本的Flink实例
...
...
dlink-admin/pom.xml
View file @
f5742858
...
@@ -130,21 +130,6 @@
...
@@ -130,21 +130,6 @@
<groupId>
com.dlink
</groupId>
<groupId>
com.dlink
</groupId>
<artifactId>
dlink-gateway
</artifactId>
<artifactId>
dlink-gateway
</artifactId>
</dependency>
</dependency>
<!--<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-client-1.13</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-connector-jdbc-1.13</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-function</artifactId>
<scope>provided</scope>
</dependency>-->
</dependencies>
</dependencies>
<build>
<build>
<plugins>
<plugins>
...
...
dlink-admin/src/main/java/com/dlink/controller/APIController.java
View file @
f5742858
package
com
.
dlink
.
controller
;
package
com
.
dlink
.
controller
;
import
com.dlink.common.result.Result
;
import
com.dlink.common.result.Result
;
import
com.dlink.dto.APIExecuteSqlDTO
;
import
com.dlink.dto.*
;
import
com.dlink.dto.APIExplainSqlDTO
;
import
com.dlink.service.APIService
;
import
com.dlink.service.APIService
;
import
com.dlink.service.StudioService
;
import
com.dlink.service.StudioService
;
import
com.dlink.service.TaskService
;
import
lombok.extern.slf4j.Slf4j
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.web.bind.annotation.*
;
import
org.springframework.web.bind.annotation.*
;
...
@@ -24,6 +24,13 @@ public class APIController {
...
@@ -24,6 +24,13 @@ public class APIController {
private
APIService
apiService
;
private
APIService
apiService
;
@Autowired
@Autowired
private
StudioService
studioService
;
private
StudioService
studioService
;
@Autowired
private
TaskService
taskService
;
@GetMapping
(
value
=
"/submitTask"
)
public
Result
submitTask
(
@RequestParam
Integer
id
)
{
return
Result
.
succeed
(
taskService
.
submitByTaskId
(
id
),
"执行成功"
);
}
@PostMapping
(
"/executeSql"
)
@PostMapping
(
"/executeSql"
)
public
Result
executeSql
(
@RequestBody
APIExecuteSqlDTO
apiExecuteSqlDTO
)
{
public
Result
executeSql
(
@RequestBody
APIExecuteSqlDTO
apiExecuteSqlDTO
)
{
...
@@ -49,4 +56,19 @@ public class APIController {
...
@@ -49,4 +56,19 @@ public class APIController {
public
Result
getJobData
(
@RequestParam
String
jobId
)
{
public
Result
getJobData
(
@RequestParam
String
jobId
)
{
return
Result
.
succeed
(
studioService
.
getJobData
(
jobId
),
"获取成功"
);
return
Result
.
succeed
(
studioService
.
getJobData
(
jobId
),
"获取成功"
);
}
}
@PostMapping
(
"/cancel"
)
public
Result
cancel
(
@RequestBody
APICancelDTO
apiCancelDTO
)
{
return
Result
.
succeed
(
apiService
.
cancel
(
apiCancelDTO
),
"执行成功"
);
}
@PostMapping
(
"/savepoint"
)
public
Result
savepoint
(
@RequestBody
APISavePointDTO
apiSavePointDTO
)
{
return
Result
.
succeed
(
apiService
.
savepoint
(
apiSavePointDTO
),
"执行成功"
);
}
@PostMapping
(
"/executeJar"
)
public
Result
executeJar
(
@RequestBody
APIExecuteJarDTO
apiExecuteJarDTO
)
{
return
Result
.
succeed
(
apiService
.
executeJar
(
apiExecuteJarDTO
),
"执行成功"
);
}
}
}
dlink-admin/src/main/java/com/dlink/controller/TaskController.java
View file @
f5742858
...
@@ -105,13 +105,5 @@ public class TaskController {
...
@@ -105,13 +105,5 @@ public class TaskController {
Task
task
=
taskService
.
getTaskInfoById
(
id
);
Task
task
=
taskService
.
getTaskInfoById
(
id
);
return
Result
.
succeed
(
task
,
"获取成功"
);
return
Result
.
succeed
(
task
,
"获取成功"
);
}
}
/**
* 提交作业
*/
/*@GetMapping(value = "/submitApplication")
public Result submitApplicationByTaskId(@RequestParam Integer id) {
return taskService.submitApplicationByTaskId(id);
}*/
}
}
dlink-admin/src/main/java/com/dlink/dto/APICancelDTO.java
0 → 100644
View file @
f5742858
package
com
.
dlink
.
dto
;
import
com.dlink.gateway.config.GatewayConfig
;
import
com.dlink.job.JobConfig
;
import
lombok.Getter
;
import
lombok.Setter
;
/**
* APICancelDTO
*
* @author wenmo
* @since 2021/12/12 18:53
*/
@Getter
@Setter
public
class
APICancelDTO
{
private
String
jobId
;
private
String
address
;
private
GatewayConfig
gatewayConfig
;
public
JobConfig
getJobConfig
()
{
JobConfig
config
=
new
JobConfig
();
config
.
setAddress
(
address
);
config
.
setGatewayConfig
(
gatewayConfig
);
return
config
;
}
}
dlink-admin/src/main/java/com/dlink/dto/APIExecuteJarDTO.java
0 → 100644
View file @
f5742858
package
com
.
dlink
.
dto
;
import
com.dlink.gateway.config.GatewayConfig
;
import
com.dlink.gateway.config.SavePointStrategy
;
import
com.dlink.job.JobConfig
;
import
lombok.Getter
;
import
lombok.Setter
;
/**
* APIExecuteJarDTO
*
* @author wenmo
* @since 2021/12/12 19:46
*/
@Getter
@Setter
public
class
APIExecuteJarDTO
{
private
String
type
;
private
String
jobName
;
private
String
savePointPath
;
private
GatewayConfig
gatewayConfig
;
public
JobConfig
getJobConfig
()
{
JobConfig
config
=
new
JobConfig
();
config
.
setType
(
type
);
config
.
setJobName
(
jobName
);
config
.
setSavePointStrategy
(
SavePointStrategy
.
CUSTOM
);
config
.
setSavePointPath
(
savePointPath
);
config
.
setGatewayConfig
(
gatewayConfig
);
return
config
;
}
}
dlink-admin/src/main/java/com/dlink/dto/APISavePointDTO.java
0 → 100644
View file @
f5742858
package
com
.
dlink
.
dto
;
import
com.dlink.gateway.config.GatewayConfig
;
import
com.dlink.job.JobConfig
;
import
lombok.Getter
;
import
lombok.Setter
;
/**
* APISavePointDTO
*
* @author wenmo
* @since 2021/12/12 19:09
*/
@Getter
@Setter
public
class
APISavePointDTO
{
private
String
jobId
;
private
String
savePointType
;
private
String
savePoint
;
private
String
address
;
private
GatewayConfig
gatewayConfig
;
public
JobConfig
getJobConfig
()
{
JobConfig
config
=
new
JobConfig
();
config
.
setAddress
(
address
);
config
.
setGatewayConfig
(
gatewayConfig
);
return
config
;
}
}
dlink-admin/src/main/java/com/dlink/service/APIService.java
View file @
f5742858
package
com
.
dlink
.
service
;
package
com
.
dlink
.
service
;
import
com.dlink.dto.
APIExecuteSqlDTO
;
import
com.dlink.dto.
*
;
import
com.dlink.
dto.APIExplainSqlDTO
;
import
com.dlink.
gateway.result.SavePointResult
;
import
com.dlink.result.APIJobResult
;
import
com.dlink.result.APIJobResult
;
import
com.dlink.result.ExplainResult
;
import
com.dlink.result.ExplainResult
;
import
com.fasterxml.jackson.databind.node.ObjectNode
;
import
com.fasterxml.jackson.databind.node.ObjectNode
;
...
@@ -21,4 +21,10 @@ public interface APIService {
...
@@ -21,4 +21,10 @@ public interface APIService {
ObjectNode
getJobPlan
(
APIExplainSqlDTO
apiExplainSqlDTO
);
ObjectNode
getJobPlan
(
APIExplainSqlDTO
apiExplainSqlDTO
);
ObjectNode
getStreamGraph
(
APIExplainSqlDTO
apiExplainSqlDTO
);
ObjectNode
getStreamGraph
(
APIExplainSqlDTO
apiExplainSqlDTO
);
boolean
cancel
(
APICancelDTO
apiCancelDTO
);
SavePointResult
savepoint
(
APISavePointDTO
apiSavePointDTO
);
APIJobResult
executeJar
(
APIExecuteJarDTO
apiExecuteJarDTO
);
}
}
dlink-admin/src/main/java/com/dlink/service/impl/APIServiceImpl.java
View file @
f5742858
package
com
.
dlink
.
service
.
impl
;
package
com
.
dlink
.
service
.
impl
;
import
com.dlink.
dto.APIExecuteSqlDTO
;
import
com.dlink.
assertion.Asserts
;
import
com.dlink.dto.
APIExplainSqlDTO
;
import
com.dlink.dto.
*
;
import
com.dlink.gateway.GatewayType
;
import
com.dlink.gateway.GatewayType
;
import
com.dlink.gateway.result.SavePointResult
;
import
com.dlink.job.JobConfig
;
import
com.dlink.job.JobConfig
;
import
com.dlink.job.JobManager
;
import
com.dlink.job.JobManager
;
import
com.dlink.job.JobResult
;
import
com.dlink.job.JobResult
;
...
@@ -15,6 +16,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
...
@@ -15,6 +16,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import
com.fasterxml.jackson.databind.node.ObjectNode
;
import
com.fasterxml.jackson.databind.node.ObjectNode
;
import
org.springframework.stereotype.Service
;
import
org.springframework.stereotype.Service
;
import
java.util.Map
;
/**
/**
* APIServiceImpl
* APIServiceImpl
*
*
...
@@ -68,4 +71,32 @@ public class APIServiceImpl implements APIService {
...
@@ -68,4 +71,32 @@ public class APIServiceImpl implements APIService {
RunTimeUtil
.
recovery
(
jobManager
);
RunTimeUtil
.
recovery
(
jobManager
);
return
streamGraph
;
return
streamGraph
;
}
}
@Override
public
boolean
cancel
(
APICancelDTO
apiCancelDTO
)
{
JobConfig
jobConfig
=
apiCancelDTO
.
getJobConfig
();
JobManager
jobManager
=
JobManager
.
build
(
jobConfig
);
boolean
cancel
=
jobManager
.
cancel
(
apiCancelDTO
.
getJobId
());
RunTimeUtil
.
recovery
(
jobManager
);
return
cancel
;
}
@Override
public
SavePointResult
savepoint
(
APISavePointDTO
apiSavePointDTO
)
{
JobConfig
jobConfig
=
apiSavePointDTO
.
getJobConfig
();
JobManager
jobManager
=
JobManager
.
build
(
jobConfig
);
SavePointResult
savepoint
=
jobManager
.
savepoint
(
apiSavePointDTO
.
getJobId
(),
apiSavePointDTO
.
getSavePointType
(),
apiSavePointDTO
.
getSavePoint
());
RunTimeUtil
.
recovery
(
jobManager
);
return
savepoint
;
}
@Override
public
APIJobResult
executeJar
(
APIExecuteJarDTO
apiExecuteJarDTO
)
{
JobConfig
config
=
apiExecuteJarDTO
.
getJobConfig
();
JobManager
jobManager
=
JobManager
.
build
(
config
);
JobResult
jobResult
=
jobManager
.
executeJar
();
APIJobResult
apiJobResult
=
APIJobResult
.
build
(
jobResult
);
RunTimeUtil
.
recovery
(
jobManager
);
return
apiJobResult
;
}
}
}
dlink-admin/src/main/java/com/dlink/service/impl/StudioServiceImpl.java
View file @
f5742858
...
@@ -198,7 +198,6 @@ public class StudioServiceImpl implements StudioService {
...
@@ -198,7 +198,6 @@ public class StudioServiceImpl implements StudioService {
Map
<
String
,
Object
>
gatewayConfig
=
clusterConfigurationService
.
getGatewayConfig
(
cluster
.
getClusterConfigurationId
());
Map
<
String
,
Object
>
gatewayConfig
=
clusterConfigurationService
.
getGatewayConfig
(
cluster
.
getClusterConfigurationId
());
jobConfig
.
buildGatewayConfig
(
gatewayConfig
);
jobConfig
.
buildGatewayConfig
(
gatewayConfig
);
}
}
jobConfig
.
setUseRestAPI
(
SystemConfiguration
.
getInstances
().
isUseRestAPI
());
JobManager
jobManager
=
JobManager
.
build
(
jobConfig
);
JobManager
jobManager
=
JobManager
.
build
(
jobConfig
);
return
jobManager
.
cancel
(
jobId
);
return
jobManager
.
cancel
(
jobId
);
}
}
...
@@ -216,10 +215,9 @@ public class StudioServiceImpl implements StudioService {
...
@@ -216,10 +215,9 @@ public class StudioServiceImpl implements StudioService {
jobConfig
.
getGatewayConfig
().
getClusterConfig
().
setAppId
(
cluster
.
getName
());
jobConfig
.
getGatewayConfig
().
getClusterConfig
().
setAppId
(
cluster
.
getName
());
jobConfig
.
setTaskId
(
cluster
.
getTaskId
());
jobConfig
.
setTaskId
(
cluster
.
getTaskId
());
}
}
jobConfig
.
setUseRestAPI
(
SystemConfiguration
.
getInstances
().
isUseRestAPI
());
JobManager
jobManager
=
JobManager
.
build
(
jobConfig
);
JobManager
jobManager
=
JobManager
.
build
(
jobConfig
);
jobManager
.
setUseGateway
(
true
);
jobManager
.
setUseGateway
(
true
);
SavePointResult
savePointResult
=
jobManager
.
savepoint
(
jobId
,
savePointType
);
SavePointResult
savePointResult
=
jobManager
.
savepoint
(
jobId
,
savePointType
,
null
);
if
(
Asserts
.
isNotNull
(
savePointResult
)){
if
(
Asserts
.
isNotNull
(
savePointResult
)){
for
(
JobInfo
item
:
savePointResult
.
getJobInfos
()){
for
(
JobInfo
item
:
savePointResult
.
getJobInfos
()){
if
(
Asserts
.
isEqualsIgnoreCase
(
jobId
,
item
.
getJobId
())){
if
(
Asserts
.
isEqualsIgnoreCase
(
jobId
,
item
.
getJobId
())){
...
...
dlink-admin/src/main/resources/json/openapi_cancel.json
0 → 100644
View file @
f5742858
/*
http
:
//
127.0
.
0.1
:
8888
/openapi/explainSql
*/
{
/*
required-start
*/
"jobId"
:
"195352b0a4518e16699983a13205f059"
,
/*
required-end
*/
/*
custom-start
*/
"address"
:
"127.0.0.1:8081"
,
"gatewayConfig"
:{
"clusterConfig"
:{
"appId"
:
"application_1637739262398_0032"
,
"flinkConfigPath"
:
"/opt/src/flink-1.13.3_conf/conf"
,
"flinkLibPath"
:
"hdfs:///flink13/lib/flinklib"
,
"yarnConfigPath"
:
"/usr/local/hadoop/hadoop-2.7.7/etc/hadoop"
}
}
/*
custom-start
*/
}
\ No newline at end of file
dlink-admin/src/main/resources/json/openapi_executeJar_application.json
0 → 100644
View file @
f5742858
/*
http
:
//
127.0
.
0.1
:
8888
/openapi/executeJar
*/
{
/*
required-start
*/
"type"
:
"yarn-application"
,
"gatewayConfig"
:{
"clusterConfig"
:{
"flinkConfigPath"
:
"/opt/src/flink-1.13.3_conf/conf"
,
"flinkLibPath"
:
"hdfs:///flink13/lib/flinklib"
,
"yarnConfigPath"
:
"/usr/local/hadoop/hadoop-2.7.7/etc/hadoop"
},
"appConfig"
:{
"userJarPath"
:
"hdfs:///flink12/jar/currencyAppJar.jar"
,
"userJarParas"
:[
"--id"
,
"2774,2775,2776"
,
" --type"
,
"dwd"
],
"userJarMainAppClass"
:
"com.app.MainApp"
},
"flinkConfig"
:
{
"configuration"
:{
"parallelism.default"
:
1
}
}
},
/*
required-end
*/
/*
custom-start
*/
"jobName"
:
"openapitest"
,
"savePointPath"
:
"hdfs://ns/flink/savepoints/savepoint-5f4b8c-4326844a6843"
/*
custom-end
*/
}
\ No newline at end of file
dlink-admin/src/main/resources/json/openapi_executesql_local.json
0 → 100644
View file @
f5742858
/*
http
:
//
127.0
.
0.1
:
8888
/openapi/executeSql
*/
{
/*
required-start
*/
"type"
:
"local"
,
"statement"
:
"CREATE TABLE Orders (
\r\n
order_number INT,
\r\n
price DECIMAL(32,2),
\r\n
order_time TIMESTAMP(3)
\r\n
) WITH (
\r\n
'connector' = 'datagen',
\r\n
'rows-per-second' = '1',
\r\n
'fields.order_number.kind' = 'sequence',
\r\n
'fields.order_number.start' = '1',
\r\n
'fields.order_number.end' = '1000'
\r\n
);
\r\n
CREATE TABLE pt (
\r\n
ordertotal INT,
\r\n
numtotal INT
\r\n
) WITH (
\r\n
'connector' = 'print'
\r\n
);
\r\n
insert into pt select 1 as ordertotal ,sum(order_number)*2 as numtotal from Orders"
,
/*
required-end
*/
/*
default-start
*/
"useResult"
:
false
,
"useStatementSet"
:
false
,
"fragment"
:
false
,
"maxRowNum"
:
100
,
"checkPoint"
:
0
,
"parallelism"
:
1
,
/*
default-start
*/
/*
custom-start
*/
"jobName"
:
"openapitest"
,
"savePointPath"
:
"hdfs://ns/flink/savepoints/savepoint-5f4b8c-4326844a6843"
,
"configuration"
:{
"table.exec.resource.default-parallelism"
:
2
}
/*
custom-end
*/
}
\ No newline at end of file
dlink-admin/src/main/resources/json/openapi_executesql_standalone.json
0 → 100644
View file @
f5742858
/*
http
:
//
127.0
.
0.1
:
8888
/openapi/executeSql
*/
{
/*
required-start
*/
"type"
:
"standalone"
,
"address"
:
"127.0.0.1:8081"
,
"statement"
:
"CREATE TABLE Orders (
\r\n
order_number INT,
\r\n
price DECIMAL(32,2),
\r\n
order_time TIMESTAMP(3)
\r\n
) WITH (
\r\n
'connector' = 'datagen',
\r\n
'rows-per-second' = '1',
\r\n
'fields.order_number.kind' = 'sequence',
\r\n
'fields.order_number.start' = '1',
\r\n
'fields.order_number.end' = '1000'
\r\n
);
\r\n
CREATE TABLE pt (
\r\n
ordertotal INT,
\r\n
numtotal INT
\r\n
) WITH (
\r\n
'connector' = 'print'
\r\n
);
\r\n
insert into pt select 1 as ordertotal ,sum(order_number)*2 as numtotal from Orders"
,
/*
required-end
*/
/*
default-start
*/
"useResult"
:
false
,
"useStatementSet"
:
false
,
"fragment"
:
false
,
"maxRowNum"
:
100
,
"checkPoint"
:
0
,
"parallelism"
:
1
,
/*
default-start
*/
/*
custom-start
*/
"jobName"
:
"openapitest"
,
"savePointPath"
:
"hdfs://ns/flink/savepoints/savepoint-5f4b8c-4326844a6843"
,
"configuration"
:{
"table.exec.resource.default-parallelism"
:
2
}
/*
custom-end
*/
}
\ No newline at end of file
dlink-admin/src/main/resources/json/openapi_executesql_yarnsession.json
View file @
f5742858
/*
http
:
//
127.0
.
0.1
:
8888
/openapi/executeSql
*/
/*
http
:
//
127.0
.
0.1
:
8888
/openapi/executeSql
*/
{
{
/*
required-start
*/
/*
required-start
*/
"type"
:
"yarn-session"
,
//
standalone|local
"type"
:
"yarn-session"
,
"address"
:
"1
0.1.51.24
:8081"
,
"address"
:
"1
27.0.0.1
:8081"
,
"statement"
:
"CREATE TABLE Orders (
\r\n
order_number INT,
\r\n
price DECIMAL(32,2),
\r\n
order_time TIMESTAMP(3)
\r\n
) WITH (
\r\n
'connector' = 'datagen',
\r\n
'rows-per-second' = '1',
\r\n
'fields.order_number.kind' = 'sequence',
\r\n
'fields.order_number.start' = '1',
\r\n
'fields.order_number.end' = '1000'
\r\n
);
\r\n
CREATE TABLE pt (
\r\n
ordertotal INT,
\r\n
numtotal INT
\r\n
) WITH (
\r\n
'connector' = 'print'
\r\n
);
\r\n
insert into pt select 1 as ordertotal ,sum(order_number)*2 as numtotal from Orders"
,
"statement"
:
"CREATE TABLE Orders (
\r\n
order_number INT,
\r\n
price DECIMAL(32,2),
\r\n
order_time TIMESTAMP(3)
\r\n
) WITH (
\r\n
'connector' = 'datagen',
\r\n
'rows-per-second' = '1',
\r\n
'fields.order_number.kind' = 'sequence',
\r\n
'fields.order_number.start' = '1',
\r\n
'fields.order_number.end' = '1000'
\r\n
);
\r\n
CREATE TABLE pt (
\r\n
ordertotal INT,
\r\n
numtotal INT
\r\n
) WITH (
\r\n
'connector' = 'print'
\r\n
);
\r\n
insert into pt select 1 as ordertotal ,sum(order_number)*2 as numtotal from Orders"
,
/*
required-end
*/
/*
required-end
*/
/*
default-start
*/
/*
default-start
*/
...
...
dlink-admin/src/main/resources/json/openapi_getjobdata.json
0 → 100644
View file @
f5742858
/*
http
:
//
127.0
.
0.1
:
8888
/openapi/getJobData?jobId=
195352
b
0
a
4518e16699983
a
13205
f
059
*/
dlink-admin/src/main/resources/json/openapi_savepoint.json
0 → 100644
View file @
f5742858
/*
http
:
//
127.0
.
0.1
:
8888
/openapi/explainSql
*/
{
/*
required-start
*/
"jobId"
:
"195352b0a4518e16699983a13205f059"
,
"savePointType"
:
"trigger"
,
//
trigger
|
stop
|
cancel
/*
required-end
*/
/*
custom-start
*/
"savePoint"
:
"195352b0a4518e16699983a13205f059"
,
"address"
:
"127.0.0.1:8081"
,
"gatewayConfig"
:{
"clusterConfig"
:{
"appId"
:
"application_1637739262398_0032"
,
"flinkConfigPath"
:
"/opt/src/flink-1.13.3_conf/conf"
,
"flinkLibPath"
:
"hdfs:///flink13/lib/flinklib"
,
"yarnConfigPath"
:
"/usr/local/hadoop/hadoop-2.7.7/etc/hadoop"
}
}
/*
custom-start
*/
}
\ No newline at end of file
dlink-admin/src/main/resources/json/openapi_submittask.json
0 → 100644
View file @
f5742858
/*
http
:
//
127.0
.
0.1
:
8888
/openapi/submitTask?id=
1
*/
\ No newline at end of file
dlink-common/src/main/java/com/dlink/model/SystemConfiguration.java
View file @
f5742858
...
@@ -24,6 +24,7 @@ public class SystemConfiguration {
...
@@ -24,6 +24,7 @@ public class SystemConfiguration {
add
(
systemConfiguration
.
sqlSubmitJarParas
);
add
(
systemConfiguration
.
sqlSubmitJarParas
);
add
(
systemConfiguration
.
sqlSubmitJarMainAppClass
);
add
(
systemConfiguration
.
sqlSubmitJarMainAppClass
);
add
(
systemConfiguration
.
useRestAPI
);
add
(
systemConfiguration
.
useRestAPI
);
add
(
systemConfiguration
.
sqlSeparator
);
}};
}};
private
Configuration
sqlSubmitJarPath
=
new
Configuration
(
private
Configuration
sqlSubmitJarPath
=
new
Configuration
(
...
@@ -54,6 +55,13 @@ public class SystemConfiguration {
...
@@ -54,6 +55,13 @@ public class SystemConfiguration {
true
,
true
,
"在运维 Flink 任务时是否使用 RestAPI"
"在运维 Flink 任务时是否使用 RestAPI"
);
);
private
Configuration
sqlSeparator
=
new
Configuration
(
"sqlSeparator"
,
"FlinkSQL语句分割符"
,
ValueType
.
STRING
,
";"
,
"Flink SQL 的语句分割符"
);
public
void
setConfiguration
(
JsonNode
jsonNode
){
public
void
setConfiguration
(
JsonNode
jsonNode
){
for
(
Configuration
item
:
CONFIGURATION_LIST
){
for
(
Configuration
item
:
CONFIGURATION_LIST
){
...
@@ -116,6 +124,14 @@ public class SystemConfiguration {
...
@@ -116,6 +124,14 @@ public class SystemConfiguration {
this
.
useRestAPI
.
setValue
(
useRestAPI
);
this
.
useRestAPI
.
setValue
(
useRestAPI
);
}
}
public
String
getSqlSeparator
()
{
return
sqlSeparator
.
getValue
().
toString
();
}
public
void
setSqlSeparator
(
String
sqlSeparator
)
{
this
.
sqlSeparator
.
setValue
(
sqlSeparator
);
}
enum
ValueType
{
enum
ValueType
{
STRING
,
INT
,
DOUBLE
,
FLOAT
,
BOOLEAN
,
DATE
STRING
,
INT
,
DOUBLE
,
FLOAT
,
BOOLEAN
,
DATE
}
}
...
...
dlink-core/src/main/java/com/dlink/explainer/Explainer.java
View file @
f5742858
...
@@ -35,23 +35,25 @@ public class Explainer {
...
@@ -35,23 +35,25 @@ public class Explainer {
private
Executor
executor
;
private
Executor
executor
;
private
boolean
useStatementSet
;
private
boolean
useStatementSet
;
private
String
sqlSeparator
=
FlinkSQLConstant
.
SEPARATOR
;
private
ObjectMapper
mapper
=
new
ObjectMapper
();
private
ObjectMapper
mapper
=
new
ObjectMapper
();
public
Explainer
(
Executor
executor
)
{
public
Explainer
(
Executor
executor
)
{
this
.
executor
=
executor
;
this
.
executor
=
executor
;
}
}
public
Explainer
(
Executor
executor
,
boolean
useStatementSet
)
{
public
Explainer
(
Executor
executor
,
boolean
useStatementSet
,
String
sqlSeparator
)
{
this
.
executor
=
executor
;
this
.
executor
=
executor
;
this
.
useStatementSet
=
useStatementSet
;
this
.
useStatementSet
=
useStatementSet
;
this
.
sqlSeparator
=
sqlSeparator
;
}
}
public
static
Explainer
build
(
Executor
executor
){
public
static
Explainer
build
(
Executor
executor
){
return
new
Explainer
(
executor
,
false
);
return
new
Explainer
(
executor
,
false
,
";"
);
}
}
public
static
Explainer
build
(
Executor
executor
,
boolean
useStatementSet
){
public
static
Explainer
build
(
Executor
executor
,
boolean
useStatementSet
,
String
sqlSeparator
){
return
new
Explainer
(
executor
,
useStatementSet
);
return
new
Explainer
(
executor
,
useStatementSet
,
sqlSeparator
);
}
}
public
JobParam
pretreatStatements
(
String
[]
statements
)
{
public
JobParam
pretreatStatements
(
String
[]
statements
)
{
...
@@ -77,7 +79,7 @@ public class Explainer {
...
@@ -77,7 +79,7 @@ public class Explainer {
@Deprecated
@Deprecated
public
List
<
SqlExplainResult
>
explainSqlResult
(
String
statement
)
{
public
List
<
SqlExplainResult
>
explainSqlResult
(
String
statement
)
{
String
[]
sqls
=
SqlUtil
.
getStatements
(
statement
);
String
[]
sqls
=
SqlUtil
.
getStatements
(
statement
,
sqlSeparator
);
List
<
SqlExplainResult
>
sqlExplainRecords
=
new
ArrayList
<>();
List
<
SqlExplainResult
>
sqlExplainRecords
=
new
ArrayList
<>();
int
index
=
1
;
int
index
=
1
;
for
(
String
item
:
sqls
)
{
for
(
String
item
:
sqls
)
{
...
@@ -121,7 +123,7 @@ public class Explainer {
...
@@ -121,7 +123,7 @@ public class Explainer {
}
}
public
ExplainResult
explainSql
(
String
statement
)
{
public
ExplainResult
explainSql
(
String
statement
)
{
JobParam
jobParam
=
pretreatStatements
(
SqlUtil
.
getStatements
(
statement
));
JobParam
jobParam
=
pretreatStatements
(
SqlUtil
.
getStatements
(
statement
,
sqlSeparator
));
List
<
SqlExplainResult
>
sqlExplainRecords
=
new
ArrayList
<>();
List
<
SqlExplainResult
>
sqlExplainRecords
=
new
ArrayList
<>();
int
index
=
1
;
int
index
=
1
;
boolean
correct
=
true
;
boolean
correct
=
true
;
...
@@ -211,7 +213,7 @@ public class Explainer {
...
@@ -211,7 +213,7 @@ public class Explainer {
for
(
SqlExplainResult
item
:
sqlExplainRecords
)
{
for
(
SqlExplainResult
item
:
sqlExplainRecords
)
{
if
(
Asserts
.
isNotNull
(
item
.
getType
())
if
(
Asserts
.
isNotNull
(
item
.
getType
())
&&
item
.
getType
().
contains
(
FlinkSQLConstant
.
DML
))
{
&&
item
.
getType
().
contains
(
FlinkSQLConstant
.
DML
))
{
String
[]
statements
=
SqlUtil
.
getStatements
(
item
.
getSql
());
String
[]
statements
=
SqlUtil
.
getStatements
(
item
.
getSql
()
,
sqlSeparator
);
for
(
String
str
:
statements
){
for
(
String
str
:
statements
){
strPlans
.
add
(
str
);
strPlans
.
add
(
str
);
}
}
...
@@ -230,7 +232,7 @@ public class Explainer {
...
@@ -230,7 +232,7 @@ public class Explainer {
for
(
SqlExplainResult
item
:
sqlExplainRecords
)
{
for
(
SqlExplainResult
item
:
sqlExplainRecords
)
{
if
(
Asserts
.
isNotNull
(
item
.
getType
())
if
(
Asserts
.
isNotNull
(
item
.
getType
())
&&
item
.
getType
().
contains
(
FlinkSQLConstant
.
DML
))
{
&&
item
.
getType
().
contains
(
FlinkSQLConstant
.
DML
))
{
String
[]
statements
=
SqlUtil
.
getStatements
(
item
.
getSql
());
String
[]
statements
=
SqlUtil
.
getStatements
(
item
.
getSql
()
,
sqlSeparator
);
for
(
String
str
:
statements
){
for
(
String
str
:
statements
){
strPlans
.
add
(
str
);
strPlans
.
add
(
str
);
}
}
...
...
dlink-core/src/main/java/com/dlink/job/JobConfig.java
View file @
f5742858
...
@@ -42,7 +42,6 @@ public class JobConfig {
...
@@ -42,7 +42,6 @@ public class JobConfig {
private
SavePointStrategy
savePointStrategy
;
private
SavePointStrategy
savePointStrategy
;
private
String
savePointPath
;
private
String
savePointPath
;
private
GatewayConfig
gatewayConfig
;
private
GatewayConfig
gatewayConfig
;
private
boolean
useRestAPI
;
private
Map
<
String
,
String
>
config
;
private
Map
<
String
,
String
>
config
;
...
...
dlink-core/src/main/java/com/dlink/job/JobManager.java
View file @
f5742858
...
@@ -6,7 +6,6 @@ import com.dlink.constant.FlinkSQLConstant;
...
@@ -6,7 +6,6 @@ import com.dlink.constant.FlinkSQLConstant;
import
com.dlink.executor.EnvironmentSetting
;
import
com.dlink.executor.EnvironmentSetting
;
import
com.dlink.executor.Executor
;
import
com.dlink.executor.Executor
;
import
com.dlink.executor.ExecutorSetting
;
import
com.dlink.executor.ExecutorSetting
;
import
com.dlink.executor.custom.CustomTableEnvironmentImpl
;
import
com.dlink.explainer.Explainer
;
import
com.dlink.explainer.Explainer
;
import
com.dlink.gateway.Gateway
;
import
com.dlink.gateway.Gateway
;
import
com.dlink.gateway.GatewayType
;
import
com.dlink.gateway.GatewayType
;
...
@@ -17,6 +16,7 @@ import com.dlink.gateway.result.GatewayResult;
...
@@ -17,6 +16,7 @@ import com.dlink.gateway.result.GatewayResult;
import
com.dlink.gateway.result.SavePointResult
;
import
com.dlink.gateway.result.SavePointResult
;
import
com.dlink.gateway.result.TestResult
;
import
com.dlink.gateway.result.TestResult
;
import
com.dlink.interceptor.FlinkInterceptor
;
import
com.dlink.interceptor.FlinkInterceptor
;
import
com.dlink.model.SystemConfiguration
;
import
com.dlink.parser.SqlType
;
import
com.dlink.parser.SqlType
;
import
com.dlink.result.*
;
import
com.dlink.result.*
;
import
com.dlink.session.ExecutorEntity
;
import
com.dlink.session.ExecutorEntity
;
...
@@ -27,14 +27,12 @@ import com.dlink.trans.Operations;
...
@@ -27,14 +27,12 @@ import com.dlink.trans.Operations;
import
com.dlink.utils.SqlUtil
;
import
com.dlink.utils.SqlUtil
;
import
com.fasterxml.jackson.databind.node.ObjectNode
;
import
com.fasterxml.jackson.databind.node.ObjectNode
;
import
org.apache.flink.runtime.jobgraph.JobGraph
;
import
org.apache.flink.runtime.jobgraph.JobGraph
;
import
org.apache.flink.table.api.StatementSet
;
import
org.apache.flink.table.api.TableResult
;
import
org.apache.flink.table.api.TableResult
;
import
org.slf4j.Logger
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.slf4j.LoggerFactory
;
import
java.time.LocalDateTime
;
import
java.time.LocalDateTime
;
import
java.util.ArrayList
;
import
java.util.ArrayList
;
import
java.util.Date
;
import
java.util.List
;
import
java.util.List
;
/**
/**
...
@@ -57,6 +55,7 @@ public class JobManager {
...
@@ -57,6 +55,7 @@ public class JobManager {
private
boolean
isPlanMode
=
false
;
private
boolean
isPlanMode
=
false
;
private
boolean
useStatementSet
=
false
;
private
boolean
useStatementSet
=
false
;
private
boolean
useRestAPI
=
false
;
private
boolean
useRestAPI
=
false
;
private
String
sqlSeparator
=
FlinkSQLConstant
.
SEPARATOR
;
private
GatewayType
runMode
=
GatewayType
.
LOCAL
;
private
GatewayType
runMode
=
GatewayType
.
LOCAL
;
public
JobManager
()
{
public
JobManager
()
{
...
@@ -94,6 +93,14 @@ public class JobManager {
...
@@ -94,6 +93,14 @@ public class JobManager {
this
.
useRestAPI
=
useRestAPI
;
this
.
useRestAPI
=
useRestAPI
;
}
}
public
String
getSqlSeparator
()
{
return
sqlSeparator
;
}
public
void
setSqlSeparator
(
String
sqlSeparator
)
{
this
.
sqlSeparator
=
sqlSeparator
;
}
public
JobManager
(
JobConfig
config
)
{
public
JobManager
(
JobConfig
config
)
{
this
.
config
=
config
;
this
.
config
=
config
;
}
}
...
@@ -179,7 +186,8 @@ public class JobManager {
...
@@ -179,7 +186,8 @@ public class JobManager {
handler
=
JobHandler
.
build
();
handler
=
JobHandler
.
build
();
}
}
useStatementSet
=
config
.
isUseStatementSet
();
useStatementSet
=
config
.
isUseStatementSet
();
useRestAPI
=
config
.
isUseRestAPI
();
useRestAPI
=
SystemConfiguration
.
getInstances
().
isUseRestAPI
();
sqlSeparator
=
SystemConfiguration
.
getInstances
().
getSqlSeparator
();
initExecutorSetting
();
initExecutorSetting
();
createExecutorWithSession
();
createExecutorWithSession
();
return
false
;
return
false
;
...
@@ -211,7 +219,7 @@ public class JobManager {
...
@@ -211,7 +219,7 @@ public class JobManager {
ready
();
ready
();
String
currentSql
=
""
;
String
currentSql
=
""
;
// JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement));
// JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement));
JobParam
jobParam
=
Explainer
.
build
(
executor
,
useStatementSet
).
pretreatStatements
(
SqlUtil
.
getStatements
(
statement
));
JobParam
jobParam
=
Explainer
.
build
(
executor
,
useStatementSet
,
sqlSeparator
).
pretreatStatements
(
SqlUtil
.
getStatements
(
statement
,
sqlSeparator
));
try
{
try
{
for
(
StatementParam
item
:
jobParam
.
getDdl
())
{
for
(
StatementParam
item
:
jobParam
.
getDdl
())
{
currentSql
=
item
.
getValue
();
currentSql
=
item
.
getValue
();
...
@@ -223,7 +231,7 @@ public class JobManager {
...
@@ -223,7 +231,7 @@ public class JobManager {
for
(
StatementParam
item
:
jobParam
.
getTrans
())
{
for
(
StatementParam
item
:
jobParam
.
getTrans
())
{
inserts
.
add
(
item
.
getValue
());
inserts
.
add
(
item
.
getValue
());
}
}
currentSql
=
String
.
join
(
FlinkSQLConstant
.
SEPARATOR
,
inserts
);
currentSql
=
String
.
join
(
sqlSeparator
,
inserts
);
JobGraph
jobGraph
=
executor
.
getJobGraphFromInserts
(
inserts
);
JobGraph
jobGraph
=
executor
.
getJobGraphFromInserts
(
inserts
);
GatewayResult
gatewayResult
=
null
;
GatewayResult
gatewayResult
=
null
;
if
(
GatewayType
.
YARN_APPLICATION
.
equals
(
runMode
))
{
if
(
GatewayType
.
YARN_APPLICATION
.
equals
(
runMode
))
{
...
@@ -242,7 +250,7 @@ public class JobManager {
...
@@ -242,7 +250,7 @@ public class JobManager {
}
}
}
}
if
(
inserts
.
size
()
>
0
)
{
if
(
inserts
.
size
()
>
0
)
{
currentSql
=
String
.
join
(
FlinkSQLConstant
.
SEPARATOR
,
inserts
);
currentSql
=
String
.
join
(
sqlSeparator
,
inserts
);
TableResult
tableResult
=
executor
.
executeStatementSet
(
inserts
);
TableResult
tableResult
=
executor
.
executeStatementSet
(
inserts
);
if
(
tableResult
.
getJobClient
().
isPresent
())
{
if
(
tableResult
.
getJobClient
().
isPresent
())
{
job
.
setJobId
(
tableResult
.
getJobClient
().
get
().
getJobID
().
toHexString
());
job
.
setJobId
(
tableResult
.
getJobClient
().
get
().
getJobID
().
toHexString
());
...
@@ -258,7 +266,7 @@ public class JobManager {
...
@@ -258,7 +266,7 @@ public class JobManager {
inserts
.
add
(
item
.
getValue
());
inserts
.
add
(
item
.
getValue
());
break
;
break
;
}
}
currentSql
=
String
.
join
(
FlinkSQLConstant
.
SEPARATOR
,
inserts
);
currentSql
=
String
.
join
(
sqlSeparator
,
inserts
);
JobGraph
jobGraph
=
executor
.
getJobGraphFromInserts
(
inserts
);
JobGraph
jobGraph
=
executor
.
getJobGraphFromInserts
(
inserts
);
GatewayResult
gatewayResult
=
null
;
GatewayResult
gatewayResult
=
null
;
if
(
GatewayType
.
YARN_APPLICATION
.
equalsValue
(
config
.
getType
()))
{
if
(
GatewayType
.
YARN_APPLICATION
.
equalsValue
(
config
.
getType
()))
{
...
@@ -316,7 +324,7 @@ public class JobManager {
...
@@ -316,7 +324,7 @@ public class JobManager {
}
}
public
IResult
executeDDL
(
String
statement
)
{
public
IResult
executeDDL
(
String
statement
)
{
String
[]
statements
=
SqlUtil
.
getStatements
(
statement
);
String
[]
statements
=
SqlUtil
.
getStatements
(
statement
,
sqlSeparator
);
try
{
try
{
for
(
String
item
:
statements
)
{
for
(
String
item
:
statements
)
{
String
newStatement
=
executor
.
pretreatStatement
(
item
);
String
newStatement
=
executor
.
pretreatStatement
(
item
);
...
@@ -363,15 +371,15 @@ public class JobManager {
...
@@ -363,15 +371,15 @@ public class JobManager {
}
}
public
ExplainResult
explainSql
(
String
statement
)
{
public
ExplainResult
explainSql
(
String
statement
)
{
return
Explainer
.
build
(
executor
,
useStatementSet
).
explainSql
(
statement
);
return
Explainer
.
build
(
executor
,
useStatementSet
,
sqlSeparator
).
explainSql
(
statement
);
}
}
public
ObjectNode
getStreamGraph
(
String
statement
)
{
public
ObjectNode
getStreamGraph
(
String
statement
)
{
return
Explainer
.
build
(
executor
,
useStatementSet
).
getStreamGraph
(
statement
);
return
Explainer
.
build
(
executor
,
useStatementSet
,
sqlSeparator
).
getStreamGraph
(
statement
);
}
}
public
String
getJobPlanJson
(
String
statement
)
{
public
String
getJobPlanJson
(
String
statement
)
{
return
Explainer
.
build
(
executor
,
useStatementSet
).
getJobPlanInfo
(
statement
).
getJsonPlan
();
return
Explainer
.
build
(
executor
,
useStatementSet
,
sqlSeparator
).
getJobPlanInfo
(
statement
).
getJsonPlan
();
}
}
public
boolean
cancel
(
String
jobId
)
{
public
boolean
cancel
(
String
jobId
)
{
...
@@ -390,11 +398,11 @@ public class JobManager {
...
@@ -390,11 +398,11 @@ public class JobManager {
}
}
}
}
public
SavePointResult
savepoint
(
String
jobId
,
String
savePointType
)
{
public
SavePointResult
savepoint
(
String
jobId
,
String
savePointType
,
String
savePoint
)
{
if
(
useGateway
&&
!
useRestAPI
)
{
if
(
useGateway
&&
!
useRestAPI
)
{
config
.
getGatewayConfig
().
setFlinkConfig
(
FlinkConfig
.
build
(
jobId
,
ActionType
.
SAVEPOINT
.
getValue
(),
config
.
getGatewayConfig
().
setFlinkConfig
(
FlinkConfig
.
build
(
jobId
,
ActionType
.
SAVEPOINT
.
getValue
(),
savePointType
,
null
));
savePointType
,
null
));
return
Gateway
.
build
(
config
.
getGatewayConfig
()).
savepointJob
();
return
Gateway
.
build
(
config
.
getGatewayConfig
()).
savepointJob
(
savePoint
);
}
else
{
}
else
{
return
FlinkAPI
.
build
(
config
.
getAddress
()).
savepoints
(
jobId
,
savePointType
);
return
FlinkAPI
.
build
(
config
.
getAddress
()).
savepoints
(
jobId
,
savePointType
);
}
}
...
...
dlink-doc/images/main/jetbrains.svg
0 → 100644
View file @
f5742858
<svg
xmlns=
"http://www.w3.org/2000/svg"
width=
"120.1"
height=
"130.2"
><linearGradient
id=
"a"
gradientUnits=
"userSpaceOnUse"
x1=
"31.841"
y1=
"120.558"
x2=
"110.24"
y2=
"73.24"
><stop
offset=
"0"
stop-color=
"#fcee39"
/><stop
offset=
"1"
stop-color=
"#f37b3d"
/></linearGradient><path
d=
"M118.6 71.8c.9-.8 1.4-1.9 1.5-3.2.1-2.6-1.8-4.7-4.4-4.9-1.2-.1-2.4.4-3.3 1.1l-83.8 45.9c-1.9.8-3.6 2.2-4.7 4.1-2.9 4.8-1.3 11 3.6 13.9 3.4 2 7.5 1.8 10.7-.2.2-.2.5-.3.7-.5l78-54.8c.4-.3 1.5-1.1 1.7-1.4z"
fill=
"url(#a)"
/><linearGradient
id=
"b"
gradientUnits=
"userSpaceOnUse"
x1=
"48.361"
y1=
"6.908"
x2=
"119.918"
y2=
"69.555"
><stop
offset=
"0"
stop-color=
"#ef5a6b"
/><stop
offset=
".57"
stop-color=
"#f26f4e"
/><stop
offset=
"1"
stop-color=
"#f37b3d"
/></linearGradient><path
d=
"M118.8 65.1L55 2.5C53.6 1 51.6 0 49.3 0c-4.3 0-7.7 3.5-7.7 7.7 0 2.1.8 3.9 2.1 5.3.4.4.8.7 1.2 1l67.4 57.7c.8.7 1.8 1.2 3 1.3 2.6.1 4.7-1.8 4.9-4.4 0-1.3-.5-2.6-1.4-3.5z"
fill=
"url(#b)"
/><linearGradient
id=
"c"
gradientUnits=
"userSpaceOnUse"
x1=
"52.947"
y1=
"63.641"
x2=
"10.538"
y2=
"37.156"
><stop
offset=
"0"
stop-color=
"#7c59a4"
/><stop
offset=
".385"
stop-color=
"#af4c92"
/><stop
offset=
".765"
stop-color=
"#dc4183"
/><stop
offset=
".957"
stop-color=
"#ed3d7d"
/></linearGradient><path
d=
"M57.1 59.5c-.1 0-39.4-31-40.2-31.5l-1.8-.9c-5.8-2.2-12.2.8-14.4 6.6-1.9 5.1.2 10.7 4.6 13.4.7.4 1.3.7 2 .9.4.2 45.4 18.8 45.4 18.8 1.8.8 3.9.3 5.1-1.2 1.5-1.9 1.2-4.6-.7-6.1z"
fill=
"url(#c)"
/><linearGradient
id=
"d"
gradientUnits=
"userSpaceOnUse"
x1=
"52.174"
y1=
"3.702"
x2=
"10.771"
y2=
"37.897"
><stop
offset=
"0"
stop-color=
"#ef5a6b"
/><stop
offset=
".364"
stop-color=
"#ee4e72"
/><stop
offset=
"1"
stop-color=
"#ed3d7d"
/></linearGradient><path
d=
"M49.3 0c-1.7 0-3.3.6-4.6 1.5L4.9 28.3c-.1.1-.2.1-.2.2h-.1c-1.7 1.2-3.1 3-3.9 5.1-2.2 5.8.8 12.3 6.6 14.4 3.6 1.4 7.5.7 10.4-1.4.7-.5 1.3-1 1.8-1.6l34.6-31.2c1.8-1.4 3-3.6 3-6.1 0-4.2-3.5-7.7-7.8-7.7z"
fill=
"url(#d)"
/><path
d=
"M34.6 37.4h51v51h-51z"
/><path
fill=
"#fff"
d=
"M39 78.8h19.1V82H39z"
/><g
fill=
"#fff"
><path
d=
"M38.8 50.8l1.5-1.4c.4.5.8.8 1.3.8.6 0 .9-.4.9-1.2v-5.3h2.3V49c0 1-.3 1.8-.8 2.3-.5.5-1.3.8-2.3.8-1.5.1-2.3-.5-2.9-1.3zm6.5-7H52v1.9h-4.4V47h4v1.8h-4v1.3h4.5v2h-6.7l-.1-8.3zm9.7 2h-2.5v-2h7.3v2h-2.5v6.3H55v-6.3zM39 54h4.3c1 0 1.8.3 2.3.7.3.3.5.8.5 1.4 0 1-.5 1.5-1.3 1.9 1 .3 1.6.9 1.6 2 0 1.4-1.2 2.3-3.1 2.3H39V54zm4.8 2.6c0-.5-.4-.7-1-.7h-1.5v1.5h1.4c.7-.1 1.1-.3 1.1-.8zM43 59h-1.8v1.5H43c.7 0 1.1-.3 1.1-.8s-.4-.7-1.1-.7zm3.8-5h3.9c1.3 0 2.1.3 2.7.9.5.5.7 1.1.7 1.9 0 1.3-.7 2.1-1.7 2.6l2 2.9h-2.6l-1.7-2.5h-1v2.5h-2.3V54zm3.8 4c.8 0 1.2-.4 1.2-1 0-.7-.5-1-1.2-1h-1.5v2h1.5z"
/><path
d=
"M56.8 54H59l3.5 8.4H60l-.6-1.5h-3.2l-.6 1.5h-2.4l3.6-8.4zm2 5l-.9-2.3L57 59h1.8zm4-5h2.3v8.3h-2.3V54zm2.9 0h2.1l3.4 4.4V54h2.3v8.3h-2L68 57.8v4.6h-2.3V54zm8 7.1l1.3-1.5c.8.7 1.7 1 2.7 1 .6 0 1-.2 1-.6 0-.4-.3-.5-1.4-.8-1.8-.4-3.1-.9-3.1-2.6 0-1.5 1.2-2.7 3.2-2.7 1.4 0 2.5.4 3.4 1.1l-1.2 1.6c-.8-.5-1.6-.8-2.3-.8-.6 0-.8.2-.8.5 0 .4.3.5 1.4.8 1.9.4 3.1 1 3.1 2.6 0 1.7-1.3 2.7-3.4 2.7-1.5.1-2.9-.4-3.9-1.3z"
/></g></svg>
\ No newline at end of file
dlink-executor/src/main/java/com/dlink/utils/SqlUtil.java
View file @
f5742858
...
@@ -11,11 +11,11 @@ import com.dlink.constant.FlinkSQLConstant;
...
@@ -11,11 +11,11 @@ import com.dlink.constant.FlinkSQLConstant;
*/
*/
public
class
SqlUtil
{
public
class
SqlUtil
{
public
static
String
[]
getStatements
(
String
sql
){
public
static
String
[]
getStatements
(
String
sql
,
String
sqlSeparator
){
if
(
Asserts
.
isNullString
(
sql
)){
if
(
Asserts
.
isNullString
(
sql
)){
return
new
String
[
0
];
return
new
String
[
0
];
}
}
return
sql
.
split
(
FlinkSQLConstant
.
SEPARATOR
);
return
sql
.
split
(
sqlSeparator
);
}
}
public
static
String
removeNote
(
String
sql
){
public
static
String
removeNote
(
String
sql
){
...
...
dlink-gateway/src/main/java/com/dlink/gateway/Gateway.java
View file @
f5742858
...
@@ -55,8 +55,12 @@ public interface Gateway {
...
@@ -55,8 +55,12 @@ public interface Gateway {
SavePointResult
savepointCluster
();
SavePointResult
savepointCluster
();
SavePointResult
savepointCluster
(
String
savePoint
);
SavePointResult
savepointJob
();
SavePointResult
savepointJob
();
SavePointResult
savepointJob
(
String
savePoint
);
TestResult
test
();
TestResult
test
();
}
}
dlink-gateway/src/main/java/com/dlink/gateway/yarn/YarnGateway.java
View file @
f5742858
...
@@ -93,6 +93,10 @@ public abstract class YarnGateway extends AbstractGateway {
...
@@ -93,6 +93,10 @@ public abstract class YarnGateway extends AbstractGateway {
}
}
public
SavePointResult
savepointCluster
(){
public
SavePointResult
savepointCluster
(){
return
savepointCluster
(
null
);
}
public
SavePointResult
savepointCluster
(
String
savePoint
){
if
(
Asserts
.
isNull
(
yarnClient
)){
if
(
Asserts
.
isNull
(
yarnClient
)){
init
();
init
();
}
}
...
@@ -123,7 +127,7 @@ public abstract class YarnGateway extends AbstractGateway {
...
@@ -123,7 +127,7 @@ public abstract class YarnGateway extends AbstractGateway {
jobInfo
.
setStatus
(
JobInfo
.
JobStatus
.
RUN
);
jobInfo
.
setStatus
(
JobInfo
.
JobStatus
.
RUN
);
jobInfos
.
add
(
jobInfo
);
jobInfos
.
add
(
jobInfo
);
}
}
runSavePointJob
(
jobInfos
,
clusterClient
);
runSavePointJob
(
jobInfos
,
clusterClient
,
savePoint
);
result
.
setJobInfos
(
jobInfos
);
result
.
setJobInfos
(
jobInfos
);
}
catch
(
Exception
e
){
}
catch
(
Exception
e
){
e
.
printStackTrace
();
e
.
printStackTrace
();
...
@@ -134,6 +138,10 @@ public abstract class YarnGateway extends AbstractGateway {
...
@@ -134,6 +138,10 @@ public abstract class YarnGateway extends AbstractGateway {
}
}
public
SavePointResult
savepointJob
(){
public
SavePointResult
savepointJob
(){
return
savepointJob
(
null
);
}
public
SavePointResult
savepointJob
(
String
savePoint
){
if
(
Asserts
.
isNull
(
yarnClient
)){
if
(
Asserts
.
isNull
(
yarnClient
)){
init
();
init
();
}
}
...
@@ -163,7 +171,7 @@ public abstract class YarnGateway extends AbstractGateway {
...
@@ -163,7 +171,7 @@ public abstract class YarnGateway extends AbstractGateway {
applicationId
).
getClusterClient
()){
applicationId
).
getClusterClient
()){
List
<
JobInfo
>
jobInfos
=
new
ArrayList
<>();
List
<
JobInfo
>
jobInfos
=
new
ArrayList
<>();
jobInfos
.
add
(
new
JobInfo
(
config
.
getFlinkConfig
().
getJobId
(),
JobInfo
.
JobStatus
.
FAIL
));
jobInfos
.
add
(
new
JobInfo
(
config
.
getFlinkConfig
().
getJobId
(),
JobInfo
.
JobStatus
.
FAIL
));
runSavePointJob
(
jobInfos
,
clusterClient
);
runSavePointJob
(
jobInfos
,
clusterClient
,
savePoint
);
result
.
setJobInfos
(
jobInfos
);
result
.
setJobInfos
(
jobInfos
);
}
catch
(
Exception
e
){
}
catch
(
Exception
e
){
e
.
printStackTrace
();
e
.
printStackTrace
();
...
@@ -173,18 +181,7 @@ public abstract class YarnGateway extends AbstractGateway {
...
@@ -173,18 +181,7 @@ public abstract class YarnGateway extends AbstractGateway {
return
result
;
return
result
;
}
}
private
void
runSavePointJob
(
List
<
JobInfo
>
jobInfos
,
ClusterClient
<
ApplicationId
>
clusterClient
)
throws
Exception
{
private
void
runSavePointJob
(
List
<
JobInfo
>
jobInfos
,
ClusterClient
<
ApplicationId
>
clusterClient
,
String
savePoint
)
throws
Exception
{
String
savePoint
=
null
;
/*String savePoint = FlinkConfig.DEFAULT_SAVEPOINT_PREFIX;
if(Asserts.isNotNullString(config.getFlinkConfig().getSavePoint())){
savePoint = config.getFlinkConfig().getSavePoint();
}
if(Asserts.isNotNull(config.getTaskId())){
if(savePoint.lastIndexOf("/")!=savePoint.length()){
savePoint = savePoint + "/";
}
savePoint = savePoint + config.getTaskId();
}*/
for
(
JobInfo
jobInfo:
jobInfos
){
for
(
JobInfo
jobInfo:
jobInfos
){
if
(
ActionType
.
CANCEL
==
config
.
getFlinkConfig
().
getAction
()){
if
(
ActionType
.
CANCEL
==
config
.
getFlinkConfig
().
getAction
()){
clusterClient
.
cancel
(
JobID
.
fromHexString
(
jobInfo
.
getJobId
()));
clusterClient
.
cancel
(
JobID
.
fromHexString
(
jobInfo
.
getJobId
()));
...
...
dlink-web/src/pages/Settings/components/flinkConfig.tsx
View file @
f5742858
...
@@ -9,12 +9,13 @@ type FlinkConfigProps = {
...
@@ -9,12 +9,13 @@ type FlinkConfigProps = {
sqlSubmitJarParas
:
SettingsStateType
[
'sqlSubmitJarParas'
];
sqlSubmitJarParas
:
SettingsStateType
[
'sqlSubmitJarParas'
];
sqlSubmitJarMainAppClass
:
SettingsStateType
[
'sqlSubmitJarMainAppClass'
];
sqlSubmitJarMainAppClass
:
SettingsStateType
[
'sqlSubmitJarMainAppClass'
];
useRestAPI
:
SettingsStateType
[
'useRestAPI'
];
useRestAPI
:
SettingsStateType
[
'useRestAPI'
];
sqlSeparator
:
SettingsStateType
[
'sqlSeparator'
];
dispatch
:
any
;
dispatch
:
any
;
};
};
const
FlinkConfigView
:
React
.
FC
<
FlinkConfigProps
>
=
(
props
)
=>
{
const
FlinkConfigView
:
React
.
FC
<
FlinkConfigProps
>
=
(
props
)
=>
{
const
{
sqlSubmitJarPath
,
sqlSubmitJarParas
,
sqlSubmitJarMainAppClass
,
useRestAPI
,
dispatch
}
=
props
;
const
{
sqlSubmitJarPath
,
sqlSubmitJarParas
,
sqlSubmitJarMainAppClass
,
useRestAPI
,
sqlSeparator
,
dispatch
}
=
props
;
const
[
editName
,
setEditName
]
=
useState
<
string
>
(
''
);
const
[
editName
,
setEditName
]
=
useState
<
string
>
(
''
);
const
[
formValues
,
setFormValues
]
=
useState
(
props
);
const
[
formValues
,
setFormValues
]
=
useState
(
props
);
const
[
form
]
=
Form
.
useForm
();
const
[
form
]
=
Form
.
useForm
();
...
@@ -73,6 +74,18 @@ const FlinkConfigView: React.FC<FlinkConfigProps> = (props) => {
...
@@ -73,6 +74,18 @@ const FlinkConfigView: React.FC<FlinkConfigProps> = (props) => {
<
Switch
checkedChildren=
"启用"
unCheckedChildren=
"禁用"
<
Switch
checkedChildren=
"启用"
unCheckedChildren=
"禁用"
checked=
{
useRestAPI
}
checked=
{
useRestAPI
}
/></
Form
.
Item
>],
/></
Form
.
Item
>],
},{
title
:
'FlinkSQL语句分割符'
,
description
:
(
editName
!=
'sqlSeparator'
?
(
sqlSeparator
?
sqlSeparator
:
'未设置'
):(<
Input
id=
'sqlSeparator'
defaultValue=
{
sqlSeparator
}
onChange=
{
onChange
}
placeholder=
";"
/>)),
actions
:
editName
!=
'sqlSeparator'
?[<
a
onClick=
{
({})
=>
handleEditClick
(
'sqlSeparator'
)
}
>
修改
</
a
>]:
[<
a
onClick=
{
({})
=>
handleSaveClick
(
'sqlSeparator'
)
}
>
保存
</
a
>,
<
a
onClick=
{
({})
=>
handleCancelClick
()
}
>
取消
</
a
>],
},
},
];
];
...
@@ -134,4 +147,5 @@ export default connect(({Settings}: { Settings: SettingsStateType }) => ({
...
@@ -134,4 +147,5 @@ export default connect(({Settings}: { Settings: SettingsStateType }) => ({
sqlSubmitJarParas
:
Settings
.
sqlSubmitJarParas
,
sqlSubmitJarParas
:
Settings
.
sqlSubmitJarParas
,
sqlSubmitJarMainAppClass
:
Settings
.
sqlSubmitJarMainAppClass
,
sqlSubmitJarMainAppClass
:
Settings
.
sqlSubmitJarMainAppClass
,
useRestAPI
:
Settings
.
useRestAPI
,
useRestAPI
:
Settings
.
useRestAPI
,
sqlSeparator
:
Settings
.
sqlSeparator
,
}))(
FlinkConfigView
);
}))(
FlinkConfigView
);
dlink-web/src/pages/Settings/model.ts
View file @
f5742858
import
{
Effect
,
Reducer
}
from
"umi"
;
import
{
Effect
,
Reducer
}
from
"umi"
;
export
type
SettingsStateType
=
{
export
type
SettingsStateType
=
{
sqlSubmitJarPath
:
string
,
sqlSubmitJarPath
:
string
,
sqlSubmitJarParas
:
string
,
sqlSubmitJarParas
:
string
,
sqlSubmitJarMainAppClass
:
string
,
sqlSubmitJarMainAppClass
:
string
,
useRestAPI
:
boolean
,
useRestAPI
:
boolean
,
sqlSeparator
:
string
,
};
};
export
type
ModelType
=
{
export
type
ModelType
=
{
...
...
dlink-web/src/pages/Welcome.tsx
View file @
f5742858
...
@@ -475,6 +475,9 @@ export default (): React.ReactNode => {
...
@@ -475,6 +475,9 @@ export default (): React.ReactNode => {
<
li
>
<
li
>
<
Link
>
新增 OpenAPI 的执行sql、校验sql、获取计划图、获取StreamGraph、获取预览数据接口
</
Link
>
<
Link
>
新增 OpenAPI 的执行sql、校验sql、获取计划图、获取StreamGraph、获取预览数据接口
</
Link
>
</
li
>
</
li
>
<
li
>
<
Link
>
新增 OpenAPI 的执行Jar、停止、SavePoint接口
</
Link
>
</
li
>
</
ul
>
</
ul
>
</
Paragraph
>
</
Paragraph
>
</
Timeline
.
Item
>
</
Timeline
.
Item
>
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment