Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in / Register
Toggle navigation
D
dlink
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
zhaowei
dlink
Commits
e07cc3b7
Commit
e07cc3b7
authored
Jun 29, 2021
by
godkaikai
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
JobManager重构
parent
d9dff4a9
Changes
16
Hide whitespace changes
Inline
Side-by-side
Showing
16 changed files
with
279 additions
and
125 deletions
+279
-125
SpringContextUtils.java
...n/src/main/java/com/dlink/context/SpringContextUtils.java
+44
-0
StudioController.java
.../src/main/java/com/dlink/controller/StudioController.java
+2
-1
StudioExecuteDTO.java
...k-admin/src/main/java/com/dlink/dto/StudioExecuteDTO.java
+13
-4
Job2MysqlHandler.java
...k-admin/src/main/java/com/dlink/job/Job2MysqlHandler.java
+28
-3
History.java
dlink-admin/src/main/java/com/dlink/model/History.java
+3
-2
StudioService.java
...-admin/src/main/java/com/dlink/service/StudioService.java
+3
-0
StudioServiceImpl.java
...c/main/java/com/dlink/service/impl/StudioServiceImpl.java
+23
-0
HistoryMapper.xml
dlink-admin/src/main/resources/mapper/HistoryMapper.xml
+1
-4
Executor.java
dlink-core/src/main/java/com/dlink/executor/Executor.java
+2
-2
CABuilder.java
...-core/src/main/java/com/dlink/explainer/ca/CABuilder.java
+49
-0
Job.java
dlink-core/src/main/java/com/dlink/job/Job.java
+5
-14
JobConfig.java
dlink-core/src/main/java/com/dlink/job/JobConfig.java
+31
-5
JobManager.java
dlink-core/src/main/java/com/dlink/job/JobManager.java
+49
-50
RunTime.java
dlink-core/src/main/java/com/dlink/job/RunTime.java
+1
-1
dlink.sql
dlink-doc/sql/dlink.sql
+25
-0
dlink_flink_document.sql
dlink-doc/sql/dlink_flink_document.sql
+0
-39
No files found.
dlink-admin/src/main/java/com/dlink/context/SpringContextUtils.java
0 → 100644
View file @
e07cc3b7
package
com
.
dlink
.
context
;
import
org.springframework.beans.BeansException
;
import
org.springframework.context.ApplicationContext
;
import
org.springframework.context.ApplicationContextAware
;
import
org.springframework.stereotype.Component
;
/**
* SpringContextUtils
*
* @author qiwenkai
* @since 2021/6/29 15:36
**/
@Component
public
class
SpringContextUtils
implements
ApplicationContextAware
{
public
static
ApplicationContext
applicationContext
;
@Override
public
void
setApplicationContext
(
ApplicationContext
applicationContext
)
throws
BeansException
{
SpringContextUtils
.
applicationContext
=
applicationContext
;
}
public
static
Object
getBean
(
String
name
)
{
return
applicationContext
.
getBean
(
name
);
}
public
static
<
T
>
T
getBean
(
String
name
,
Class
<
T
>
requiredType
)
{
return
applicationContext
.
getBean
(
name
,
requiredType
);
}
public
static
boolean
containsBean
(
String
name
)
{
return
applicationContext
.
containsBean
(
name
);
}
public
static
boolean
isSingleton
(
String
name
)
{
return
applicationContext
.
isSingleton
(
name
);
}
public
static
Class
<?
extends
Object
>
getType
(
String
name
)
{
return
applicationContext
.
getType
(
name
);
}
}
dlink-admin/src/main/java/com/dlink/controller/StudioController.java
View file @
e07cc3b7
...
...
@@ -34,7 +34,8 @@ public class StudioController {
*/
@PostMapping
(
"/executeSql"
)
public
Result
executeSql
(
@RequestBody
StudioExecuteDTO
studioExecuteDTO
)
{
RunResult
runResult
=
studioService
.
executeSql
(
studioExecuteDTO
);
// RunResult runResult = studioService.executeSql(studioExecuteDTO);
Integer
runResult
=
studioService
.
executeSqlTest
(
studioExecuteDTO
);
return
Result
.
succeed
(
runResult
,
"执行成功"
);
}
...
...
dlink-admin/src/main/java/com/dlink/dto/StudioExecuteDTO.java
View file @
e07cc3b7
package
com
.
dlink
.
dto
;
import
com.dlink.job.JobConfig
;
import
lombok.Getter
;
import
lombok.Setter
;
...
...
@@ -12,13 +13,21 @@ import lombok.Setter;
@Getter
@Setter
public
class
StudioExecuteDTO
{
private
boolean
isResult
=
true
;
private
boolean
isSession
=
false
;
private
String
session
;
private
boolean
isRemote
=
false
;
private
Integer
clusterId
;
private
boolean
fragment
=
false
;
private
String
statement
;
private
Integer
clusterId
=
0
;
private
String
jobName
;
private
Integer
taskId
;
private
Integer
maxRowNum
=
100
;
private
Integer
checkPoint
=
0
;
private
Integer
parallelism
=
1
;
private
Integer
maxRowNum
=
100
;
private
boolean
fragment
=
false
;
private
String
savePointPath
;
private
String
jobName
;
public
JobConfig
getJobConfig
(){
return
new
JobConfig
(
isResult
,
isSession
,
session
,
isRemote
,
clusterId
,
taskId
,
jobName
,
fragment
,
maxRowNum
,
checkPoint
,
parallelism
,
savePointPath
);
}
}
dlink-admin/src/main/java/com/dlink/job/Job2MysqlHandler.java
View file @
e07cc3b7
package
com
.
dlink
.
job
;
import
cn.hutool.extra.spring.SpringUtil
;
import
cn.hutool.json.JSONUtil
;
import
com.dlink.context.SpringContextUtils
;
import
com.dlink.model.History
;
import
com.dlink.service.HistoryService
;
import
org.springframework.context.annotation.DependsOn
;
/**
* Job2MysqlHandler
...
...
@@ -10,8 +12,15 @@ import com.dlink.service.HistoryService;
* @author wenmo
* @since 2021/6/27 0:04
*/
@DependsOn
(
"springContextUtils"
)
public
class
Job2MysqlHandler
implements
JobHandler
{
private
static
HistoryService
historyService
;
static
{
historyService
=
SpringContextUtils
.
getBean
(
"historyServiceImpl"
,
HistoryService
.
class
);
}
@Override
public
boolean
init
()
{
Job
job
=
JobContextHolder
.
getJob
();
...
...
@@ -22,10 +31,10 @@ public class Job2MysqlHandler implements JobHandler {
history
.
setSession
(
job
.
getJobConfig
().
getSession
());
history
.
setStatus
(
job
.
getStatus
().
ordinal
());
history
.
setStartTime
(
job
.
getStartTime
());
history
.
setType
(
job
.
getType
().
ordinal
());
history
.
setTaskId
(
job
.
getJobConfig
().
getTaskId
());
HistoryService
historyService
=
SpringUtil
.
getBean
(
HistoryService
.
class
);
history
.
setConfig
(
JSONUtil
.
toJsonStr
(
job
.
getJobConfig
())
);
historyService
.
save
(
history
);
job
.
setId
(
history
.
getId
());
return
true
;
}
...
...
@@ -41,11 +50,27 @@ public class Job2MysqlHandler implements JobHandler {
@Override
public
boolean
success
()
{
Job
job
=
JobContextHolder
.
getJob
();
History
history
=
new
History
();
history
.
setId
(
job
.
getId
());
history
.
setJobId
(
job
.
getJobId
());
history
.
setStatus
(
job
.
getStatus
().
ordinal
());
history
.
setEndTime
(
job
.
getEndTime
());
history
.
setResult
(
JSONUtil
.
toJsonStr
(
job
.
getResult
()));
historyService
.
updateById
(
history
);
return
true
;
}
@Override
public
boolean
failed
()
{
Job
job
=
JobContextHolder
.
getJob
();
History
history
=
new
History
();
history
.
setId
(
job
.
getId
());
history
.
setJobId
(
job
.
getJobId
());
history
.
setStatus
(
job
.
getStatus
().
ordinal
());
history
.
setEndTime
(
job
.
getEndTime
());
history
.
setError
(
job
.
getError
());
historyService
.
updateById
(
history
);
return
true
;
}
...
...
dlink-admin/src/main/java/com/dlink/model/History.java
View file @
e07cc3b7
package
com
.
dlink
.
model
;
import
com.baomidou.mybatisplus.annotation.TableField
;
import
com.baomidou.mybatisplus.annotation.TableName
;
import
lombok.Data
;
import
lombok.EqualsAndHashCode
;
...
...
@@ -28,13 +29,13 @@ public class History implements Serializable {
private
String
jobManagerAddress
;
private
Integer
status
;
private
String
statement
;
private
Integer
type
;
private
String
error
;
private
String
result
;
private
String
config
;
private
LocalDate
startTime
;
private
LocalDate
endTime
;
private
String
msg
;
private
Integer
taskId
;
@TableField
(
exist
=
false
)
private
String
statusText
;
}
dlink-admin/src/main/java/com/dlink/service/StudioService.java
View file @
e07cc3b7
...
...
@@ -4,6 +4,7 @@ import com.dlink.dto.StudioDDLDTO;
import
com.dlink.dto.StudioExecuteDTO
;
import
com.dlink.explainer.ca.TableCANode
;
import
com.dlink.result.RunResult
;
import
org.apache.flink.table.planner.expressions.In
;
import
java.util.List
;
...
...
@@ -16,6 +17,8 @@ import java.util.List;
public
interface
StudioService
{
RunResult
executeSql
(
StudioExecuteDTO
studioExecuteDTO
);
Integer
executeSqlTest
(
StudioExecuteDTO
studioExecuteDTO
);
RunResult
executeDDL
(
StudioDDLDTO
studioDDLDTO
);
boolean
clearSession
(
String
session
);
...
...
dlink-admin/src/main/java/com/dlink/service/impl/StudioServiceImpl.java
View file @
e07cc3b7
...
...
@@ -5,10 +5,12 @@ import com.dlink.cluster.FlinkCluster;
import
com.dlink.dto.StudioDDLDTO
;
import
com.dlink.dto.StudioExecuteDTO
;
import
com.dlink.exception.BusException
;
import
com.dlink.exception.JobException
;
import
com.dlink.executor.Executor
;
import
com.dlink.executor.ExecutorSetting
;
import
com.dlink.explainer.ca.CABuilder
;
import
com.dlink.explainer.ca.TableCANode
;
import
com.dlink.job.JobConfig
;
import
com.dlink.job.JobManager
;
import
com.dlink.model.Cluster
;
import
com.dlink.result.RunResult
;
...
...
@@ -65,6 +67,27 @@ public class StudioServiceImpl implements StudioService {
return
jobManager
.
execute
(
studioExecuteDTO
.
getStatement
());
}
@Override
public
Integer
executeSqlTest
(
StudioExecuteDTO
studioExecuteDTO
)
{
studioExecuteDTO
.
setSession
(
studioExecuteDTO
.
getClusterId
()+
"_"
+
studioExecuteDTO
.
getSession
());
JobConfig
config
=
studioExecuteDTO
.
getJobConfig
();
Cluster
cluster
=
clusterService
.
getById
(
studioExecuteDTO
.
getClusterId
());
if
(
cluster
==
null
){
throw
new
JobException
(
"未获取到集群信息"
);
}
else
{
Assert
.
check
(
cluster
);
String
host
=
FlinkCluster
.
testFlinkJobManagerIP
(
cluster
.
getHosts
(),
cluster
.
getJobManagerHost
());
Assert
.
checkHost
(
host
);
if
(!
host
.
equals
(
cluster
.
getJobManagerHost
())){
cluster
.
setJobManagerHost
(
host
);
clusterService
.
updateById
(
cluster
);
}
config
.
setHost
(
host
);
}
JobManager
jobManager
=
JobManager
.
build
(
config
);
return
jobManager
.
executeSql
(
studioExecuteDTO
.
getStatement
());
}
@Override
public
RunResult
executeDDL
(
StudioDDLDTO
studioDDLDTO
)
{
studioDDLDTO
.
setSession
(
studioDDLDTO
.
getClusterId
()+
"_"
+
studioDDLDTO
.
getSession
());
...
...
dlink-admin/src/main/resources/mapper/HistoryMapper.xml
View file @
e07cc3b7
...
...
@@ -12,13 +12,11 @@
<result
column=
"job_manager_address"
property=
"jobManagerAddress"
/>
<result
column=
"status"
property=
"status"
/>
<result
column=
"statement"
property=
"statement"
/>
<result
column=
"type"
property=
"type"
/>
<result
column=
"error"
property=
"error"
/>
<result
column=
"result"
property=
"result"
/>
<result
column=
"config"
property=
"config"
/>
<result
column=
"start_time"
property=
"startTime"
/>
<result
column=
"end_time"
property=
"endTime"
/>
<result
column=
"msg"
property=
"msg"
/>
<result
column=
"task_id"
property=
"taskId"
/>
</resultMap>
...
...
@@ -26,8 +24,7 @@
<sql
id=
"Base_Column_List"
>
id,cluster_id,session,jod_id,job_name,
job_manager_address,status,statement,type, error,
result,result,config,start_time,end_time,
msg,task_id
result,config,start_time,end_time,task_id
</sql>
...
...
dlink-core/src/main/java/com/dlink/executor/Executor.java
View file @
e07cc3b7
...
...
@@ -58,8 +58,8 @@ public abstract class Executor {
return
environmentSetting
;
}
public
JobExecutionResult
execute
(
String
statement
)
throws
Exception
{
return
stEnvironment
.
execute
(
statement
);
public
JobExecutionResult
execute
(
String
jobName
)
throws
Exception
{
return
stEnvironment
.
execute
(
jobName
);
}
public
TableResult
executeSql
(
String
statement
){
...
...
dlink-core/src/main/java/com/dlink/explainer/ca/CABuilder.java
View file @
e07cc3b7
...
...
@@ -4,6 +4,7 @@ import com.dlink.plus.FlinkSqlPlus;
import
java.util.ArrayList
;
import
java.util.List
;
import
java.util.Set
;
/**
* CABuilder
...
...
@@ -51,4 +52,52 @@ public class CABuilder {
}
return
tableCANodes
;
}
public
static
List
<
ColumnCANode
>
getColumnCAByStatement
(
String
statement
){
List
<
ColumnCANode
>
columnCANodes
=
new
ArrayList
<>();
FlinkSqlPlus
plus
=
FlinkSqlPlus
.
build
();
List
<
ColumnCAResult
>
columnCAResults
=
plus
.
explainSqlColumnCA
(
statement
);
for
(
int
j
=
0
;
j
<
columnCAResults
.
size
();
j
++)
{
ColumnCAResult
result
=
columnCAResults
.
get
(
j
);
ColumnCANode
node
=
new
ColumnCANode
();
List
<
Integer
>
sinkColumns
=
result
.
getSinkColumns
();
for
(
int
k
=
0
;
k
<
sinkColumns
.
size
();
k
++)
{
ColumnCA
columnCA
=
(
ColumnCA
)
result
.
getColumnCASMaps
().
get
(
sinkColumns
.
get
(
k
));
node
.
setName
(
columnCA
.
getAlias
());
node
.
setType
(
columnCA
.
getType
());
node
.
setTitle
(
columnCA
.
getAlias
());
node
.
setOperation
(
columnCA
.
getOperation
());
List
<
ColumnCANode
>
children
=
new
ArrayList
<>();
buildColumnCANodeChildren
(
children
,
result
,
sinkColumns
.
get
(
k
),
columnCA
.
getOperation
());
node
.
setChildren
(
children
);
}
columnCANodes
.
add
(
node
);
}
return
columnCANodes
;
}
private
static
void
buildColumnCANodeChildren
(
List
<
ColumnCANode
>
children
,
ColumnCAResult
result
,
Integer
columnId
,
String
operation
){
Set
<
NodeRel
>
columnCASRel
=
result
.
getColumnCASRel
();
boolean
hasChildren
=
false
;
for
(
NodeRel
nodeRel
:
columnCASRel
)
{
if
(
columnId
==
nodeRel
.
getSufId
()){
ColumnCA
childca
=
(
ColumnCA
)
result
.
getColumnCASMaps
().
get
(
nodeRel
.
getPreId
());
// operation = operation.replaceAll(childca.getAlias().replaceAll("\\$","\\\\$"),childca.getOperation());
operation
=
operation
.
replaceAll
(
childca
.
getAlias
()
.
replaceAll
(
"\\)"
,
""
),
childca
.
getOperation
());
buildColumnCANodeChildren
(
children
,
result
,
nodeRel
.
getPreId
(),
operation
);
hasChildren
=
true
;
}
}
if
(!
hasChildren
){
ColumnCA
columnCA
=
(
ColumnCA
)
result
.
getColumnCASMaps
().
get
(
columnId
);
if
(
result
.
getSourColumns
().
contains
(
columnCA
.
getId
()))
{
ColumnCANode
columnCANode
=
new
ColumnCANode
();
columnCANode
.
setName
(
columnCA
.
getName
());
columnCANode
.
setTitle
(
columnCA
.
getName
());
columnCANode
.
setOperation
(
operation
);
children
.
add
(
columnCANode
);
}
}
}
}
dlink-core/src/main/java/com/dlink/job/Job.java
View file @
e07cc3b7
...
...
@@ -2,6 +2,7 @@ package com.dlink.job;
import
com.dlink.executor.Executor
;
import
com.dlink.executor.ExecutorSetting
;
import
com.dlink.result.IResult
;
import
lombok.Getter
;
import
lombok.Setter
;
...
...
@@ -16,26 +17,19 @@ import java.time.LocalDate;
@Getter
@Setter
public
class
Job
{
private
Integer
id
;
private
JobConfig
jobConfig
;
private
String
jobManagerAddress
;
private
boolean
isRemote
;
private
boolean
isSession
;
private
JobStatus
status
;
private
String
statement
;
private
JobType
type
;
private
String
jobId
;
private
String
error
;
private
String
result
;
private
IResult
result
;
private
ExecutorSetting
executorSetting
;
private
LocalDate
startTime
;
private
LocalDate
endTime
;
private
String
msg
;
private
Executor
executor
;
enum
JobType
{
EXECUTE
,
SUBMIT
}
enum
JobStatus
{
INITIALIZE
,
RUNNING
,
...
...
@@ -44,14 +38,11 @@ public class Job {
CANCEL
}
public
Job
(
JobConfig
jobConfig
,
String
jobManagerAddress
,
boolean
isRemote
,
boolean
isSession
,
JobStatus
status
,
String
statement
,
JobType
type
,
ExecutorSetting
executorSetting
,
LocalDate
startTime
,
Executor
executor
)
{
public
Job
(
JobConfig
jobConfig
,
String
jobManagerAddress
,
JobStatus
status
,
String
statement
,
ExecutorSetting
executorSetting
,
LocalDate
startTime
,
Executor
executor
)
{
this
.
jobConfig
=
jobConfig
;
this
.
jobManagerAddress
=
jobManagerAddress
;
this
.
isRemote
=
isRemote
;
this
.
isSession
=
isSession
;
this
.
status
=
status
;
this
.
statement
=
statement
;
this
.
type
=
type
;
this
.
executorSetting
=
executorSetting
;
this
.
startTime
=
startTime
;
this
.
executor
=
executor
;
...
...
dlink-core/src/main/java/com/dlink/job/JobConfig.java
View file @
e07cc3b7
package
com
.
dlink
.
job
;
import
com.dlink.executor.Executor
;
import
com.dlink.executor.ExecutorSetting
;
import
lombok.Getter
;
import
lombok.Setter
;
...
...
@@ -13,18 +14,43 @@ import lombok.Setter;
@Getter
@Setter
public
class
JobConfig
{
private
String
host
;
private
boolean
isResult
;
private
boolean
isSession
;
private
String
session
;
private
String
type
;
private
Integer
taskId
;
private
boolean
isRemote
;
private
Integer
clusterId
;
private
String
host
;
private
Integer
taskId
;
private
String
jobName
;
private
boolean
useSqlFragment
;
private
Integer
maxRowNum
;
private
Integer
checkpoint
;
private
Integer
parallelism
;
private
boolean
useSqlFragment
;
private
String
savePointPath
;
private
String
jobName
;
public
JobConfig
(
boolean
isResult
,
boolean
isSession
,
String
session
,
boolean
isRemote
,
Integer
clusterId
,
Integer
taskId
,
String
jobName
,
boolean
useSqlFragment
,
Integer
maxRowNum
,
Integer
checkpoint
,
Integer
parallelism
,
String
savePointPath
)
{
this
.
isResult
=
isResult
;
this
.
isSession
=
isSession
;
this
.
session
=
session
;
this
.
isRemote
=
isRemote
;
this
.
clusterId
=
clusterId
;
this
.
taskId
=
taskId
;
this
.
jobName
=
jobName
;
this
.
useSqlFragment
=
useSqlFragment
;
this
.
maxRowNum
=
maxRowNum
;
this
.
checkpoint
=
checkpoint
;
this
.
parallelism
=
parallelism
;
this
.
savePointPath
=
savePointPath
;
}
public
ExecutorSetting
getExecutorSetting
(){
String
type
=
Executor
.
LOCAL
;
if
(
isRemote
){
type
=
Executor
.
REMOTE
;
}
return
new
ExecutorSetting
(
host
,
type
,
checkpoint
,
parallelism
,
useSqlFragment
,
savePointPath
,
jobName
);
}
}
dlink-core/src/main/java/com/dlink/job/JobManager.java
View file @
e07cc3b7
...
...
@@ -28,13 +28,11 @@ import java.util.*;
**/
public
class
JobManager
extends
RunTime
{
private
JobHandler
handler
=
JobHandler
.
build
()
;
private
JobHandler
handler
;
private
String
flinkHost
;
private
String
jobManagerHost
;
private
Integer
jobManagerPort
;
private
Integer
port
;
private
boolean
isRemote
;
private
boolean
isSession
;
private
String
sessionId
;
private
Integer
maxRowNum
=
100
;
private
ExecutorSetting
executorSetting
;
...
...
@@ -93,21 +91,16 @@ public class JobManager extends RunTime {
}
private
Executor
createExecutor
()
{
if
(!
isSession
)
{
if
(
isRemote
)
{
executor
=
Executor
.
build
(
new
EnvironmentSetting
(
jobManagerHost
,
jobManagerPort
),
config
.
getExecutorSetting
());
return
executor
;
}
else
{
executor
=
Executor
.
build
(
null
,
executorSetting
);
return
executor
;
}
if
(
config
.
isRemote
())
{
executor
=
Executor
.
build
(
new
EnvironmentSetting
(
jobManagerHost
,
jobManagerPort
),
config
.
getExecutorSetting
());
return
executor
;
}
else
{
createExecutorWithSession
(
);
executor
=
Executor
.
build
(
null
,
executorSetting
);
return
executor
;
}
}
private
boolean
checkSession
()
{
/*
private boolean checkSession() {
if (config != null) {
String session = config.getSession();
if (session != null && !"".equals(session)) {
...
...
@@ -120,23 +113,28 @@ public class JobManager extends RunTime {
}
isSession = false;
return false;
}
}
*/
private
Executor
createExecutorWithSession
()
{
ExecutorEntity
executorEntity
=
SessionPool
.
get
(
config
.
getSession
());
if
(
executorEntity
!=
null
)
{
executor
=
executorEntity
.
getExecutor
();
}
else
{
if
(
config
.
isSession
())
{
ExecutorEntity
executorEntity
=
SessionPool
.
get
(
config
.
getSession
());
if
(
executorEntity
!=
null
)
{
executor
=
executorEntity
.
getExecutor
();
}
else
{
createExecutor
();
SessionPool
.
push
(
new
ExecutorEntity
(
config
.
getSession
(),
executor
));
}
}
else
{
createExecutor
();
SessionPool
.
push
(
new
ExecutorEntity
(
sessionId
,
executor
));
}
return
executor
;
}
@Override
public
boolean
init
()
{
handler
=
JobHandler
.
build
();
String
host
=
config
.
getHost
();
if
(
host
!=
null
&&
!(
""
).
equals
(
host
))
{
if
(
config
.
isRemote
()
&&
host
!=
null
&&
!(
""
).
equals
(
host
))
{
String
[]
strs
=
host
.
split
(
NetConstant
.
COLON
);
if
(
strs
.
length
>=
2
)
{
jobManagerHost
=
strs
[
0
];
...
...
@@ -145,12 +143,8 @@ public class JobManager extends RunTime {
jobManagerHost
=
strs
[
0
];
jobManagerPort
=
FlinkConstant
.
PORT
;
}
isRemote
=
true
;
}
else
{
isRemote
=
false
;
}
checkSession
();
createExecutor
();
createExecutorWithSession
();
return
false
;
}
...
...
@@ -161,16 +155,17 @@ public class JobManager extends RunTime {
@Override
public
boolean
success
()
{
return
false
;
return
handler
.
success
()
;
}
@Override
public
boolean
error
()
{
return
false
;
public
boolean
failed
()
{
return
handler
.
failed
()
;
}
@Override
public
boolean
close
()
{
JobContextHolder
.
clear
();
return
false
;
}
...
...
@@ -281,37 +276,37 @@ public class JobManager extends RunTime {
return
result
;
}
public
void
executeSql
(
String
statement
)
{
RunResult
runResult
=
new
RunResult
(
sessionId
,
statement
,
flinkHost
,
port
,
executorSetting
,
executorSetting
.
getJobName
());
Job
job
=
new
Job
(
config
,
jobManagerHost
+
NetConstant
.
COLON
+
jobManagerPort
,
isRemote
,
isSession
,
Job
.
JobStatus
.
INITIALIZE
,
statement
,
Job
.
JobType
.
EXECUTE
,
executorSetting
,
LocalDate
.
now
(),
executor
);
public
Integer
executeSql
(
String
statement
)
{
Job
job
=
new
Job
(
config
,
jobManagerHost
+
NetConstant
.
COLON
+
jobManagerPort
,
Job
.
JobStatus
.
INITIALIZE
,
statement
,
executorSetting
,
LocalDate
.
now
(),
executor
);
JobContextHolder
.
setJob
(
job
);
ready
();
String
[]
S
tatements
=
statement
.
split
(
";"
);
String
[]
s
tatements
=
statement
.
split
(
";"
);
int
currentIndex
=
0
;
try
{
for
(
String
item
:
S
tatements
)
{
for
(
String
item
:
s
tatements
)
{
if
(
item
.
trim
().
isEmpty
())
{
continue
;
}
currentIndex
++;
String
operationType
=
Operations
.
getOperationType
(
item
);
long
start
=
System
.
currentTimeMillis
();
CustomTableEnvironmentImpl
stEnvironment
=
executor
.
getCustomTableEnvironmentImpl
();
if
(!
FlinkInterceptor
.
build
(
stEnvironment
,
item
))
{
if
(!
FlinkInterceptor
.
build
(
executor
.
getCustomTableEnvironmentImpl
(),
item
))
{
TableResult
tableResult
=
executor
.
executeSql
(
item
);
if
(
tableResult
.
getJobClient
().
isPresent
())
{
runResult
.
setJobId
(
tableResult
.
getJobClient
().
get
().
getJobID
().
toHexString
());
job
.
setJobId
(
tableResult
.
getJobClient
().
get
().
getJobID
().
toHexString
());
}
if
(
config
.
isResult
())
{
IResult
result
=
ResultBuilder
.
build
(
operationType
,
maxRowNum
,
""
,
false
).
getResult
(
tableResult
);
job
.
setResult
(
result
);
}
IResult
result
=
ResultBuilder
.
build
(
operationType
,
maxRowNum
,
""
,
false
).
getResult
(
tableResult
);
runResult
.
setResult
(
result
);
}
long
finish
=
System
.
currentTimeMillis
();
long
timeElapsed
=
finish
-
start
;
runResult
.
setTime
(
timeElapsed
);
runResult
.
setFinishDate
(
LocalDateTime
.
now
());
runResult
.
setSuccess
(
true
);
if
(
FlinkSQLConstant
.
INSERT
.
equals
(
operationType
)||
FlinkSQLConstant
.
SELECT
.
equals
(
operationType
)){
break
;
}
}
job
.
setEndTime
(
LocalDate
.
now
());
job
.
setStatus
(
Job
.
JobStatus
.
SUCCESS
);
success
();
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
StackTraceElement
[]
trace
=
e
.
getStackTrace
();
...
...
@@ -319,11 +314,15 @@ public class JobManager extends RunTime {
for
(
StackTraceElement
s
:
trace
)
{
resMsg
.
append
(
" \n "
+
s
+
" "
);
}
runResult
.
setFinishDate
(
LocalDateTime
.
now
());
runResult
.
setSuccess
(
false
);
// runResult.setError(LocalDateTime.now().toString() + ":" + "运行第" + currentIndex + "行sql时出现异常:" + e.getMessage());
runResult
.
setError
(
LocalDateTime
.
now
().
toString
()
+
":"
+
"运行第"
+
currentIndex
+
"行sql时出现异常:"
+
e
.
getMessage
()
+
" \n >>>堆栈信息<<<"
+
resMsg
.
toString
());
// runResult.setError(LocalDateTime.now().toString() + ":" + "运行第" + currentIndex + "行sql时出现异常:" + e.getMessage() + "\n >>>异常原因<<< \n" + e.getCause().toString());
LocalDate
now
=
LocalDate
.
now
();
job
.
setEndTime
(
now
);
job
.
setStatus
(
Job
.
JobStatus
.
FAILED
);
String
error
=
now
.
toString
()
+
":"
+
"运行第"
+
currentIndex
+
"个sql时出现异常:"
+
e
.
getMessage
()
+
" \n >>>堆栈信息<<<"
+
resMsg
.
toString
();
job
.
setError
(
error
);
failed
();
close
();
}
close
();
return
job
.
getId
();
}
}
dlink-core/src/main/java/com/dlink/job/RunTime.java
View file @
e07cc3b7
...
...
@@ -14,7 +14,7 @@ public abstract class RunTime {
abstract
boolean
success
();
abstract
boolean
error
();
abstract
boolean
failed
();
abstract
boolean
close
();
}
dlink-doc/sql/dlink.sql
View file @
e07cc3b7
...
...
@@ -302,4 +302,29 @@ INSERT INTO `dlink_flink_document` VALUES (209, 'function', '内置函数', '聚
INSERT
INTO
`dlink_flink_document`
VALUES
(
210
,
'function'
,
'内置函数'
,
'列函数'
,
'withColumns(…)'
,
'选择的列'
,
'1.12'
,
0
,
1
,
'2021-02-22 15:46:48'
,
'2021-02-22 15:47:21'
);
INSERT
INTO
`dlink_flink_document`
VALUES
(
211
,
'function'
,
'内置函数'
,
'列函数'
,
'withoutColumns(…)'
,
'不选择的列'
,
'1.12'
,
0
,
1
,
'2021-02-22 15:46:48'
,
'2021-02-22 15:47:21'
);
INSERT
INTO
`dlink_flink_document`
VALUES
(
262
,
'function'
,
'UDF'
,
'表值聚合函数'
,
'TO_MAP(string1,object2[, string3])'
,
'将非规则一维表转化为规则二维表,string1是key。string2是value。string3为非必填项,表示key的值域(维度),用英文逗号分割。'
,
'1.12'
,
8
,
1
,
'2021-05-20 19:59:22'
,
'2021-05-20 20:00:54'
);
-- ----------------------------
-- Table structure for dlink_history
-- ----------------------------
DROP
TABLE
IF
EXISTS
`dlink_history`
;
CREATE
TABLE
`dlink_history`
(
`id`
int
(
11
)
NOT
NULL
AUTO_INCREMENT
COMMENT
'ID'
,
`cluster_id`
int
(
11
)
NOT
NULL
DEFAULT
0
COMMENT
'集群ID'
,
`session`
varchar
(
255
)
CHARACTER
SET
utf8
COLLATE
utf8_general_ci
NULL
DEFAULT
NULL
COMMENT
'会话'
,
`job_id`
varchar
(
50
)
NULL
DEFAULT
NULL
COMMENT
'JobID'
,
`job_name`
varchar
(
255
)
CHARACTER
SET
utf8
COLLATE
utf8_general_ci
NULL
DEFAULT
NULL
COMMENT
'作业名'
,
`job_manager_address`
varchar
(
255
)
CHARACTER
SET
utf8
COLLATE
utf8_general_ci
NULL
DEFAULT
NULL
COMMENT
'JM地址'
,
`status`
int
(
1
)
NOT
NULL
DEFAULT
0
COMMENT
'状态'
,
`statement`
text
CHARACTER
SET
utf8
COLLATE
utf8_general_ci
NULL
COMMENT
'语句集'
,
`error`
text
CHARACTER
SET
utf8
COLLATE
utf8_general_ci
NULL
COMMENT
'异常信息'
,
`result`
text
CHARACTER
SET
utf8
COLLATE
utf8_general_ci
NULL
COMMENT
'结果集'
,
`config`
text
CHARACTER
SET
utf8
COLLATE
utf8_general_ci
NULL
COMMENT
'配置'
,
`start_time`
datetime
(
0
)
NULL
DEFAULT
NULL
COMMENT
'开始时间'
,
`end_time`
datetime
(
0
)
NULL
DEFAULT
NULL
COMMENT
'结束时间'
,
`task_id`
int
(
11
)
NULL
DEFAULT
NULL
COMMENT
'作业ID'
,
PRIMARY
KEY
(
`id`
)
USING
BTREE
,
INDEX
`task_index`
(
`task_id`
)
USING
BTREE
,
INDEX
`cluster_index`
(
`cluster_id`
)
USING
BTREE
)
ENGINE
=
InnoDB
CHARACTER
SET
=
utf8
COLLATE
=
utf8_general_ci
COMMENT
=
'执行历史'
ROW_FORMAT
=
Dynamic
;
SET
FOREIGN_KEY_CHECKS
=
1
;
dlink-doc/sql/dlink_flink_document.sql
deleted
100644 → 0
View file @
d9dff4a9
/*
Navicat Premium Data Transfer
Source Server : hetl
Source Server Type : MySQL
Source Server Version : 80015
Source Host : localhost:3306
Source Schema : dlink
Target Server Type : MySQL
Target Server Version : 80015
File Encoding : 65001
Date: 27/06/2021 23:28:39
*/
SET
NAMES
utf8mb4
;
SET
FOREIGN_KEY_CHECKS
=
0
;
-- ----------------------------
-- Table structure for dlink_flink_document
-- ----------------------------
DROP
TABLE
IF
EXISTS
`dlink_flink_document`
;
CREATE
TABLE
`dlink_flink_document`
(
`id`
int
(
11
)
NOT
NULL
AUTO_INCREMENT
COMMENT
'主键'
,
`category`
varchar
(
255
)
CHARACTER
SET
utf8
COLLATE
utf8_general_ci
NULL
DEFAULT
NULL
COMMENT
'文档类型'
,
`type`
varchar
(
255
)
CHARACTER
SET
utf8
COLLATE
utf8_general_ci
NULL
DEFAULT
NULL
COMMENT
'类型'
,
`subtype`
varchar
(
255
)
CHARACTER
SET
utf8
COLLATE
utf8_general_ci
NULL
DEFAULT
NULL
COMMENT
'子类型'
,
`name`
varchar
(
255
)
CHARACTER
SET
utf8
COLLATE
utf8_general_ci
NULL
DEFAULT
NULL
COMMENT
'信息'
,
`description`
varchar
(
255
)
CHARACTER
SET
utf8
COLLATE
utf8_general_ci
NULL
DEFAULT
NULL
COMMENT
'描述'
,
`version`
varchar
(
255
)
CHARACTER
SET
utf8
COLLATE
utf8_general_ci
NULL
DEFAULT
NULL
COMMENT
'版本号'
,
`like_num`
int
(
255
)
NULL
DEFAULT
0
COMMENT
'喜爱值'
,
`enabled`
tinyint
(
1
)
NOT
NULL
DEFAULT
0
COMMENT
'是否启用'
,
`create_time`
datetime
(
0
)
NULL
DEFAULT
NULL
COMMENT
'创建时间'
,
`update_time`
datetime
(
0
)
NULL
DEFAULT
NULL
COMMENT
'更新时间'
,
PRIMARY
KEY
(
`id`
)
USING
BTREE
)
ENGINE
=
InnoDB
AUTO_INCREMENT
=
262
CHARACTER
SET
=
utf8
COLLATE
=
utf8_general_ci
COMMENT
=
'文档管理'
ROW_FORMAT
=
Dynamic
;
SET
FOREIGN_KEY_CHECKS
=
1
;
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment