Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in / Register
Toggle navigation
D
dlink
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
zhaowei
dlink
Commits
df3c6187
Commit
df3c6187
authored
Mar 18, 2022
by
wenmo
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
实时监控容错机制
parent
b7d9b0dc
Changes
11
Hide whitespace changes
Inline
Side-by-side
Showing
11 changed files
with
146 additions
and
73 deletions
+146
-73
TaskController.java
...in/src/main/java/com/dlink/controller/TaskController.java
+6
-3
FlinkJobTask.java
dlink-admin/src/main/java/com/dlink/job/FlinkJobTask.java
+7
-2
JobInstanceMapper.java
...min/src/main/java/com/dlink/mapper/JobInstanceMapper.java
+3
-0
JobInstanceService.java
...n/src/main/java/com/dlink/service/JobInstanceService.java
+2
-0
TaskService.java
dlink-admin/src/main/java/com/dlink/service/TaskService.java
+3
-1
JobInstanceServiceImpl.java
...n/java/com/dlink/service/impl/JobInstanceServiceImpl.java
+5
-0
TaskServiceImpl.java
...src/main/java/com/dlink/service/impl/TaskServiceImpl.java
+65
-26
JobInstanceMapper.xml
dlink-admin/src/main/resources/mapper/JobInstanceMapper.xml
+21
-19
JobStatus.tsx
dlink-web/src/components/Common/JobStatus.tsx
+3
-0
index.tsx
dlink-web/src/pages/DevOps/JobInfo/index.tsx
+25
-22
Welcome.tsx
dlink-web/src/pages/Welcome.tsx
+6
-0
No files found.
dlink-admin/src/main/java/com/dlink/controller/TaskController.java
View file @
df3c6187
...
@@ -4,7 +4,6 @@ import com.dlink.common.result.ProTableResult;
...
@@ -4,7 +4,6 @@ import com.dlink.common.result.ProTableResult;
import
com.dlink.common.result.Result
;
import
com.dlink.common.result.Result
;
import
com.dlink.job.JobResult
;
import
com.dlink.job.JobResult
;
import
com.dlink.model.Task
;
import
com.dlink.model.Task
;
import
com.dlink.result.SubmitResult
;
import
com.dlink.service.TaskService
;
import
com.dlink.service.TaskService
;
import
com.fasterxml.jackson.databind.JsonNode
;
import
com.fasterxml.jackson.databind.JsonNode
;
import
lombok.extern.slf4j.Slf4j
;
import
lombok.extern.slf4j.Slf4j
;
...
@@ -174,8 +173,12 @@ public class TaskController {
...
@@ -174,8 +173,12 @@ public class TaskController {
* 重启任务
* 重启任务
*/
*/
@GetMapping
(
value
=
"/restartTask"
)
@GetMapping
(
value
=
"/restartTask"
)
public
Result
restartTask
(
@RequestParam
Integer
id
)
{
public
Result
restartTask
(
@RequestParam
Integer
id
,
@RequestParam
Boolean
isOnLine
)
{
return
Result
.
succeed
(
taskService
.
restartTask
(
id
),
"操作成功"
);
if
(
isOnLine
)
{
return
taskService
.
reOnLineTask
(
id
);
}
else
{
return
Result
.
succeed
(
taskService
.
restartTask
(
id
),
"重启成功"
);
}
}
}
}
}
dlink-admin/src/main/java/com/dlink/job/FlinkJobTask.java
View file @
df3c6187
package
com
.
dlink
.
job
;
package
com
.
dlink
.
job
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.context.SpringContextUtils
;
import
com.dlink.context.SpringContextUtils
;
import
com.dlink.daemon.constant.FlinkTaskConstant
;
import
com.dlink.daemon.constant.FlinkTaskConstant
;
import
com.dlink.daemon.pool.DefaultThreadPool
;
import
com.dlink.daemon.pool.DefaultThreadPool
;
...
@@ -12,6 +13,9 @@ import org.slf4j.Logger;
...
@@ -12,6 +13,9 @@ import org.slf4j.Logger;
import
org.slf4j.LoggerFactory
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.context.annotation.DependsOn
;
import
org.springframework.context.annotation.DependsOn
;
import
java.time.Duration
;
import
java.time.LocalDateTime
;
@DependsOn
(
"springContextUtils"
)
@DependsOn
(
"springContextUtils"
)
public
class
FlinkJobTask
implements
DaemonTask
{
public
class
FlinkJobTask
implements
DaemonTask
{
...
@@ -48,8 +52,9 @@ public class FlinkJobTask implements DaemonTask {
...
@@ -48,8 +52,9 @@ public class FlinkJobTask implements DaemonTask {
}
}
}
}
preDealTime
=
System
.
currentTimeMillis
();
preDealTime
=
System
.
currentTimeMillis
();
JobInstance
jobInstance
=
taskService
.
refreshJobInstance
(
config
.
getId
());
JobInstance
jobInstance
=
taskService
.
refreshJobInstance
(
config
.
getId
(),
false
);
if
(!
JobStatus
.
isDone
(
jobInstance
.
getStatus
()))
{
if
((!
JobStatus
.
isDone
(
jobInstance
.
getStatus
()))
||
(
Asserts
.
isNotNull
(
jobInstance
.
getFinishTime
())
&&
Duration
.
between
(
jobInstance
.
getFinishTime
(),
LocalDateTime
.
now
()).
toMinutes
()
<
1
))
{
DefaultThreadPool
.
getInstance
().
execute
(
this
);
DefaultThreadPool
.
getInstance
().
execute
(
this
);
}
}
}
}
...
...
dlink-admin/src/main/java/com/dlink/mapper/JobInstanceMapper.java
View file @
df3c6187
...
@@ -21,4 +21,7 @@ public interface JobInstanceMapper extends SuperMapper<JobInstance> {
...
@@ -21,4 +21,7 @@ public interface JobInstanceMapper extends SuperMapper<JobInstance> {
List
<
JobInstanceCount
>
countHistoryStatus
();
List
<
JobInstanceCount
>
countHistoryStatus
();
List
<
JobInstance
>
listJobInstanceActive
();
List
<
JobInstance
>
listJobInstanceActive
();
JobInstance
getJobInstanceByTaskId
(
Integer
id
);
}
}
dlink-admin/src/main/java/com/dlink/service/JobInstanceService.java
View file @
df3c6187
...
@@ -25,4 +25,6 @@ public interface JobInstanceService extends ISuperService<JobInstance> {
...
@@ -25,4 +25,6 @@ public interface JobInstanceService extends ISuperService<JobInstance> {
JobInfoDetail
getJobInfoDetailInfo
(
JobInstance
jobInstance
);
JobInfoDetail
getJobInfoDetailInfo
(
JobInstance
jobInstance
);
LineageResult
getLineage
(
Integer
id
);
LineageResult
getLineage
(
Integer
id
);
JobInstance
getJobInstanceByTaskId
(
Integer
id
);
}
}
dlink-admin/src/main/java/com/dlink/service/TaskService.java
View file @
df3c6187
...
@@ -43,6 +43,8 @@ public interface TaskService extends ISuperService<Task> {
...
@@ -43,6 +43,8 @@ public interface TaskService extends ISuperService<Task> {
Result
onLineTask
(
Integer
id
);
Result
onLineTask
(
Integer
id
);
Result
reOnLineTask
(
Integer
id
);
Result
offLineTask
(
Integer
id
,
String
type
);
Result
offLineTask
(
Integer
id
,
String
type
);
Result
cancelTask
(
Integer
id
);
Result
cancelTask
(
Integer
id
);
...
@@ -51,7 +53,7 @@ public interface TaskService extends ISuperService<Task> {
...
@@ -51,7 +53,7 @@ public interface TaskService extends ISuperService<Task> {
boolean
savepointTask
(
Integer
taskId
,
String
savePointType
);
boolean
savepointTask
(
Integer
taskId
,
String
savePointType
);
JobInstance
refreshJobInstance
(
Integer
id
);
JobInstance
refreshJobInstance
(
Integer
id
,
boolean
isCoercive
);
JobInfoDetail
refreshJobInfoDetail
(
Integer
id
);
JobInfoDetail
refreshJobInfoDetail
(
Integer
id
);
}
}
dlink-admin/src/main/java/com/dlink/service/impl/JobInstanceServiceImpl.java
View file @
df3c6187
...
@@ -117,4 +117,9 @@ public class JobInstanceServiceImpl extends SuperServiceImpl<JobInstanceMapper,
...
@@ -117,4 +117,9 @@ public class JobInstanceServiceImpl extends SuperServiceImpl<JobInstanceMapper,
return
LineageBuilder
.
getLineage
(
getJobInfoDetail
(
id
).
getHistory
().
getStatement
());
return
LineageBuilder
.
getLineage
(
getJobInfoDetail
(
id
).
getHistory
().
getStatement
());
}
}
@Override
public
JobInstance
getJobInstanceByTaskId
(
Integer
id
)
{
return
baseMapper
.
getJobInstanceByTaskId
(
id
);
}
}
}
dlink-admin/src/main/java/com/dlink/service/impl/TaskServiceImpl.java
View file @
df3c6187
...
@@ -11,6 +11,8 @@ import com.dlink.assertion.Tips;
...
@@ -11,6 +11,8 @@ import com.dlink.assertion.Tips;
import
com.dlink.common.result.Result
;
import
com.dlink.common.result.Result
;
import
com.dlink.config.Dialect
;
import
com.dlink.config.Dialect
;
import
com.dlink.constant.FlinkRestResultConstant
;
import
com.dlink.constant.FlinkRestResultConstant
;
import
com.dlink.daemon.task.DaemonFactory
;
import
com.dlink.daemon.task.DaemonTaskConfig
;
import
com.dlink.db.service.impl.SuperServiceImpl
;
import
com.dlink.db.service.impl.SuperServiceImpl
;
import
com.dlink.dto.SqlDTO
;
import
com.dlink.dto.SqlDTO
;
import
com.dlink.exception.BusException
;
import
com.dlink.exception.BusException
;
...
@@ -19,10 +21,7 @@ import com.dlink.gateway.config.SavePointStrategy;
...
@@ -19,10 +21,7 @@ import com.dlink.gateway.config.SavePointStrategy;
import
com.dlink.gateway.config.SavePointType
;
import
com.dlink.gateway.config.SavePointType
;
import
com.dlink.gateway.model.JobInfo
;
import
com.dlink.gateway.model.JobInfo
;
import
com.dlink.gateway.result.SavePointResult
;
import
com.dlink.gateway.result.SavePointResult
;
import
com.dlink.job.Job
;
import
com.dlink.job.*
;
import
com.dlink.job.JobConfig
;
import
com.dlink.job.JobManager
;
import
com.dlink.job.JobResult
;
import
com.dlink.mapper.TaskMapper
;
import
com.dlink.mapper.TaskMapper
;
import
com.dlink.metadata.driver.Driver
;
import
com.dlink.metadata.driver.Driver
;
import
com.dlink.metadata.result.JdbcSelectResult
;
import
com.dlink.metadata.result.JdbcSelectResult
;
...
@@ -35,6 +34,7 @@ import org.springframework.beans.factory.annotation.Autowired;
...
@@ -35,6 +34,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.stereotype.Service
;
import
org.springframework.stereotype.Service
;
import
java.time.Duration
;
import
java.time.LocalDateTime
;
import
java.time.LocalDateTime
;
import
java.util.ArrayList
;
import
java.util.ArrayList
;
import
java.util.List
;
import
java.util.List
;
...
@@ -123,7 +123,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
...
@@ -123,7 +123,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
Task
task
=
this
.
getTaskInfoById
(
id
);
Task
task
=
this
.
getTaskInfoById
(
id
);
Asserts
.
checkNull
(
task
,
Tips
.
TASK_NOT_EXIST
);
Asserts
.
checkNull
(
task
,
Tips
.
TASK_NOT_EXIST
);
if
(
Asserts
.
isNotNull
(
task
.
getJobInstanceId
())
&&
task
.
getJobInstanceId
()
!=
0
)
{
if
(
Asserts
.
isNotNull
(
task
.
getJobInstanceId
())
&&
task
.
getJobInstanceId
()
!=
0
)
{
savepoint
Task
(
task
,
SavePointType
.
CANCEL
.
getValue
());
savepoint
JobInstance
(
task
.
getJobInstanceId
()
,
SavePointType
.
CANCEL
.
getValue
());
}
}
if
(
Dialect
.
isSql
(
task
.
getDialect
()))
{
if
(
Dialect
.
isSql
(
task
.
getDialect
()))
{
return
executeCommonSql
(
SqlDTO
.
build
(
task
.
getStatement
(),
return
executeCommonSql
(
SqlDTO
.
build
(
task
.
getStatement
(),
...
@@ -222,8 +222,13 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
...
@@ -222,8 +222,13 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
if
(
statement
!=
null
)
{
if
(
statement
!=
null
)
{
task
.
setStatement
(
statement
.
getStatement
());
task
.
setStatement
(
statement
.
getStatement
());
}
}
if
(
Asserts
.
isNull
(
task
.
getJobInstanceId
())
||
task
.
getJobInstanceId
()
==
0
)
{
JobInstance
jobInstance
=
jobInstanceService
.
getJobInstanceByTaskId
(
id
);
if
(
Asserts
.
isNotNull
(
jobInstance
)
&&
!
JobStatus
.
isDone
(
jobInstance
.
getStatus
()))
{
task
.
setJobInstanceId
(
jobInstance
.
getId
());
}
}
}
}
return
task
;
return
task
;
}
}
...
@@ -306,7 +311,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
...
@@ -306,7 +311,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Override
@Override
public
Result
releaseTask
(
Integer
id
)
{
public
Result
releaseTask
(
Integer
id
)
{
Task
task
=
getById
(
id
);
Task
task
=
get
TaskInfo
ById
(
id
);
Assert
.
check
(
task
);
Assert
.
check
(
task
);
if
(
JobLifeCycle
.
DEVELOP
.
equalsValue
(
task
.
getStep
()))
{
if
(
JobLifeCycle
.
DEVELOP
.
equalsValue
(
task
.
getStep
()))
{
List
<
SqlExplainResult
>
sqlExplainResults
=
explainTask
(
id
);
List
<
SqlExplainResult
>
sqlExplainResults
=
explainTask
(
id
);
...
@@ -327,7 +332,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
...
@@ -327,7 +332,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Override
@Override
public
boolean
developTask
(
Integer
id
)
{
public
boolean
developTask
(
Integer
id
)
{
Task
task
=
getById
(
id
);
Task
task
=
get
TaskInfo
ById
(
id
);
Assert
.
check
(
task
);
Assert
.
check
(
task
);
if
(
JobLifeCycle
.
RELEASE
.
equalsValue
(
task
.
getStep
()))
{
if
(
JobLifeCycle
.
RELEASE
.
equalsValue
(
task
.
getStep
()))
{
task
.
setStep
(
JobLifeCycle
.
DEVELOP
.
getValue
());
task
.
setStep
(
JobLifeCycle
.
DEVELOP
.
getValue
());
...
@@ -338,7 +343,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
...
@@ -338,7 +343,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Override
@Override
public
Result
onLineTask
(
Integer
id
)
{
public
Result
onLineTask
(
Integer
id
)
{
Task
task
=
getById
(
id
);
Task
task
=
get
TaskInfo
ById
(
id
);
Assert
.
check
(
task
);
Assert
.
check
(
task
);
if
(
JobLifeCycle
.
RELEASE
.
equalsValue
(
task
.
getStep
()))
{
if
(
JobLifeCycle
.
RELEASE
.
equalsValue
(
task
.
getStep
()))
{
if
(
Asserts
.
isNotNull
(
task
.
getJobInstanceId
())
&&
task
.
getJobInstanceId
()
!=
0
)
{
if
(
Asserts
.
isNotNull
(
task
.
getJobInstanceId
())
&&
task
.
getJobInstanceId
()
!=
0
)
{
...
@@ -360,9 +365,30 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
...
@@ -360,9 +365,30 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
return
Result
.
failed
(
"上线失败,作业不存在。"
);
return
Result
.
failed
(
"上线失败,作业不存在。"
);
}
}
@Override
public
Result
reOnLineTask
(
Integer
id
)
{
Task
task
=
this
.
getTaskInfoById
(
id
);
Asserts
.
checkNull
(
task
,
Tips
.
TASK_NOT_EXIST
);
if
(
Asserts
.
isNotNull
(
task
.
getJobInstanceId
())
&&
task
.
getJobInstanceId
()
!=
0
)
{
savepointJobInstance
(
task
.
getJobInstanceId
(),
SavePointType
.
CANCEL
.
getValue
());
}
JobResult
jobResult
=
submitTaskToOnline
(
id
);
if
(
Job
.
JobStatus
.
SUCCESS
==
jobResult
.
getStatus
())
{
task
.
setStep
(
JobLifeCycle
.
ONLINE
.
getValue
());
task
.
setJobInstanceId
(
jobResult
.
getJobInstanceId
());
if
(
updateById
(
task
))
{
return
Result
.
succeed
(
jobResult
,
"重新上线成功"
);
}
else
{
return
Result
.
failed
(
"由于未知原因,重新上线失败"
);
}
}
else
{
return
Result
.
failed
(
"重新上线失败,原因:"
+
jobResult
.
getError
());
}
}
@Override
@Override
public
Result
offLineTask
(
Integer
id
,
String
type
)
{
public
Result
offLineTask
(
Integer
id
,
String
type
)
{
Task
task
=
getById
(
id
);
Task
task
=
get
TaskInfo
ById
(
id
);
Assert
.
check
(
task
);
Assert
.
check
(
task
);
if
(
Asserts
.
isNullString
(
type
))
{
if
(
Asserts
.
isNullString
(
type
))
{
type
=
SavePointType
.
CANCEL
.
getValue
();
type
=
SavePointType
.
CANCEL
.
getValue
();
...
@@ -384,7 +410,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
...
@@ -384,7 +410,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Override
@Override
public
Result
cancelTask
(
Integer
id
)
{
public
Result
cancelTask
(
Integer
id
)
{
Task
task
=
getById
(
id
);
Task
task
=
get
TaskInfo
ById
(
id
);
Assert
.
check
(
task
);
Assert
.
check
(
task
);
if
(
JobLifeCycle
.
ONLINE
!=
JobLifeCycle
.
get
(
task
.
getStep
()))
{
if
(
JobLifeCycle
.
ONLINE
!=
JobLifeCycle
.
get
(
task
.
getStep
()))
{
if
(
Asserts
.
isNotNull
(
task
.
getJobInstanceId
())
&&
task
.
getJobInstanceId
()
!=
0
)
{
if
(
Asserts
.
isNotNull
(
task
.
getJobInstanceId
())
&&
task
.
getJobInstanceId
()
!=
0
)
{
...
@@ -402,7 +428,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
...
@@ -402,7 +428,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Override
@Override
public
boolean
recoveryTask
(
Integer
id
)
{
public
boolean
recoveryTask
(
Integer
id
)
{
Task
task
=
getById
(
id
);
Task
task
=
get
TaskInfo
ById
(
id
);
Assert
.
check
(
task
);
Assert
.
check
(
task
);
if
(
JobLifeCycle
.
CANCEL
==
JobLifeCycle
.
get
(
task
.
getStep
()))
{
if
(
JobLifeCycle
.
CANCEL
==
JobLifeCycle
.
get
(
task
.
getStep
()))
{
task
.
setStep
(
JobLifeCycle
.
DEVELOP
.
getValue
());
task
.
setStep
(
JobLifeCycle
.
DEVELOP
.
getValue
());
...
@@ -411,15 +437,13 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
...
@@ -411,15 +437,13 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
return
false
;
return
false
;
}
}
private
boolean
savepointTask
(
Task
task
,
String
savePointType
)
{
private
boolean
savepointJobInstance
(
Integer
jobInstanceId
,
String
savePointType
)
{
Asserts
.
checkNotNull
(
task
,
"该任务不存在"
);
JobInstance
jobInstance
=
jobInstanceService
.
getById
(
jobInstanceId
);
Cluster
cluster
=
clusterService
.
getById
(
task
.
getClusterId
());
Asserts
.
checkNotNull
(
cluster
,
"该集群不存在"
);
Asserts
.
checkNotNull
(
task
.
getJobInstanceId
(),
"无任务需要SavePoint"
);
JobInstance
jobInstance
=
jobInstanceService
.
getById
(
task
.
getJobInstanceId
());
if
(
Asserts
.
isNull
(
jobInstance
))
{
if
(
Asserts
.
isNull
(
jobInstance
))
{
return
true
;
return
true
;
}
}
Cluster
cluster
=
clusterService
.
getById
(
jobInstance
.
getClusterId
());
Asserts
.
checkNotNull
(
cluster
,
"该集群不存在"
);
String
jobId
=
jobInstance
.
getJid
();
String
jobId
=
jobInstance
.
getJid
();
boolean
useGateway
=
false
;
boolean
useGateway
=
false
;
JobConfig
jobConfig
=
new
JobConfig
();
JobConfig
jobConfig
=
new
JobConfig
();
...
@@ -431,7 +455,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
...
@@ -431,7 +455,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
jobConfig
.
getGatewayConfig
().
getClusterConfig
().
setAppId
(
cluster
.
getName
());
jobConfig
.
getGatewayConfig
().
getClusterConfig
().
setAppId
(
cluster
.
getName
());
useGateway
=
true
;
useGateway
=
true
;
}
}
jobConfig
.
setTaskId
(
task
.
get
Id
());
jobConfig
.
setTaskId
(
jobInstance
.
getTask
Id
());
JobManager
jobManager
=
JobManager
.
build
(
jobConfig
);
JobManager
jobManager
=
JobManager
.
build
(
jobConfig
);
jobManager
.
setUseGateway
(
useGateway
);
jobManager
.
setUseGateway
(
useGateway
);
if
(
"canceljob"
.
equals
(
savePointType
))
{
if
(
"canceljob"
.
equals
(
savePointType
))
{
...
@@ -456,7 +480,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
...
@@ -456,7 +480,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Override
@Override
public
boolean
savepointTask
(
Integer
taskId
,
String
savePointType
)
{
public
boolean
savepointTask
(
Integer
taskId
,
String
savePointType
)
{
return
savepointTask
(
getById
(
taskId
),
savePointType
);
Task
task
=
getTaskInfoById
(
taskId
);
return
savepointJobInstance
(
task
.
getJobInstanceId
(),
savePointType
);
}
}
private
JobConfig
buildJobConfig
(
Task
task
)
{
private
JobConfig
buildJobConfig
(
Task
task
)
{
...
@@ -518,12 +543,13 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
...
@@ -518,12 +543,13 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
}
}
@Override
@Override
public
JobInstance
refreshJobInstance
(
Integer
id
)
{
public
JobInstance
refreshJobInstance
(
Integer
id
,
boolean
isCoercive
)
{
JobInstance
jobInstance
=
jobInstanceService
.
getById
(
id
);
JobInstance
jobInstance
=
jobInstanceService
.
getById
(
id
);
Asserts
.
checkNull
(
jobInstance
,
"该任务实例不存在"
);
Asserts
.
checkNull
(
jobInstance
,
"该任务实例不存在"
);
if
(
JobStatus
.
isDone
(
jobInstance
.
getStatus
()
))
{
if
(
!
isCoercive
&&
!
inRefreshPlan
(
jobInstance
))
{
return
jobInstance
;
return
jobInstance
;
}
}
String
status
=
jobInstance
.
getStatus
();
Cluster
cluster
=
clusterService
.
getById
(
jobInstance
.
getClusterId
());
Cluster
cluster
=
clusterService
.
getById
(
jobInstance
.
getClusterId
());
JobHistory
jobHistoryJson
=
jobHistoryService
.
refreshJobHistory
(
id
,
cluster
.
getJobManagerHost
(),
jobInstance
.
getJid
());
JobHistory
jobHistoryJson
=
jobHistoryService
.
refreshJobHistory
(
id
,
cluster
.
getJobManagerHost
(),
jobInstance
.
getJid
());
JobHistory
jobHistory
=
jobHistoryService
.
getJobHistoryInfo
(
jobHistoryJson
);
JobHistory
jobHistory
=
jobHistoryService
.
getJobHistoryInfo
(
jobHistoryJson
);
...
@@ -533,16 +559,29 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
...
@@ -533,16 +559,29 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
jobInstance
.
setDuration
(
jobHistory
.
getJob
().
get
(
FlinkRestResultConstant
.
JOB_DURATION
).
asLong
()
/
1000
);
jobInstance
.
setDuration
(
jobHistory
.
getJob
().
get
(
FlinkRestResultConstant
.
JOB_DURATION
).
asLong
()
/
1000
);
jobInstance
.
setStatus
(
jobHistory
.
getJob
().
get
(
FlinkRestResultConstant
.
JOB_STATE
).
asText
());
jobInstance
.
setStatus
(
jobHistory
.
getJob
().
get
(
FlinkRestResultConstant
.
JOB_STATE
).
asText
());
}
}
jobInstanceService
.
updateById
(
jobInstance
);
if
(
JobStatus
.
isDone
(
jobInstance
.
getStatus
())
&&
!
status
.
equals
(
jobInstance
.
getStatus
()))
{
if
(
JobStatus
.
isDone
(
jobInstance
.
getStatus
()))
{
jobInstance
.
setFinishTime
(
LocalDateTime
.
now
());
handleJobDone
(
jobInstance
);
handleJobDone
(
jobInstance
);
}
}
if
(
isCoercive
)
{
DaemonFactory
.
addTask
(
DaemonTaskConfig
.
build
(
FlinkJobTask
.
TYPE
,
jobInstance
.
getId
()));
}
jobInstanceService
.
updateById
(
jobInstance
);
return
jobInstance
;
return
jobInstance
;
}
}
private
boolean
inRefreshPlan
(
JobInstance
jobInstance
)
{
if
((!
JobStatus
.
isDone
(
jobInstance
.
getStatus
()))
||
(
Asserts
.
isNotNull
(
jobInstance
.
getFinishTime
())
&&
Duration
.
between
(
jobInstance
.
getFinishTime
(),
LocalDateTime
.
now
()).
toMinutes
()
<
1
))
{
return
true
;
}
else
{
return
false
;
}
}
@Override
@Override
public
JobInfoDetail
refreshJobInfoDetail
(
Integer
id
)
{
public
JobInfoDetail
refreshJobInfoDetail
(
Integer
id
)
{
return
jobInstanceService
.
getJobInfoDetailInfo
(
refreshJobInstance
(
id
));
return
jobInstanceService
.
getJobInfoDetailInfo
(
refreshJobInstance
(
id
,
true
));
}
}
private
void
handleJobDone
(
JobInstance
jobInstance
)
{
private
void
handleJobDone
(
JobInstance
jobInstance
)
{
...
@@ -553,7 +592,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
...
@@ -553,7 +592,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
Task
updateTask
=
new
Task
();
Task
updateTask
=
new
Task
();
updateTask
.
setId
(
jobInstance
.
getTaskId
());
updateTask
.
setId
(
jobInstance
.
getTaskId
());
updateTask
.
setJobInstanceId
(
0
);
updateTask
.
setJobInstanceId
(
0
);
if
(!
JobLifeCycle
.
ONLINE
.
equalsValue
(
task
.
getStep
()))
{
if
(!
JobLifeCycle
.
ONLINE
.
equalsValue
(
jobInstance
.
getStep
()))
{
updateById
(
updateTask
);
updateById
(
updateTask
);
return
;
return
;
}
}
...
...
dlink-admin/src/main/resources/mapper/JobInstanceMapper.xml
View file @
df3c6187
...
@@ -51,33 +51,35 @@
...
@@ -51,33 +51,35 @@
</select>
</select>
<select
id=
"countStatus"
resultType=
"com.dlink.model.JobInstanceCount"
>
<select
id=
"countStatus"
resultType=
"com.dlink.model.JobInstanceCount"
>
select
select a.status,
a.status,
count(1) as counts
count(1) as counts
from dlink_job_instance a
from
inner join (
dlink_job_instance a
select max(ji.id) as id
inner join (
from dlink_job_instance ji
select max(ji.id) as id from dlink_job_instance ji
group by ji.task_id
group by ji.task_id
) snap on snap.id = a.id
) snap on snap.id = a.id
group by status
group by status
</select>
</select>
<select
id=
"countHistoryStatus"
resultType=
"com.dlink.model.JobInstanceCount"
>
<select
id=
"countHistoryStatus"
resultType=
"com.dlink.model.JobInstanceCount"
>
select
select status,
status,
count(1) as counts
count(1) as counts
from dlink_job_instance
from
dlink_job_instance
group by status
group by status
</select>
</select>
<select
id=
"listJobInstanceActive"
resultType=
"com.dlink.model.JobInstance"
>
<select
id=
"listJobInstanceActive"
resultType=
"com.dlink.model.JobInstance"
>
select
select *
*
from dlink_job_instance
from
where status not in ('FAILED', 'CANCELED', 'FINISHED', 'UNKNOWN')
dlink_job_instance
where status not in ('FAILED','CANCELED','FINISHED','UNKNOWN')
order by id desc
order by id desc
</select>
</select>
<select
id=
"getJobInstanceByTaskId"
resultType=
"com.dlink.model.JobInstance"
>
select *
from dlink_job_instance
where task_id = #{id}
order by id desc limit 1
</select>
</mapper>
</mapper>
dlink-web/src/components/Common/JobStatus.tsx
View file @
df3c6187
...
@@ -64,6 +64,9 @@ const JobStatus = (props: JobStatusFormProps) => {
...
@@ -64,6 +64,9 @@ const JobStatus = (props: JobStatusFormProps) => {
</
Tag
>)
:
(
status
===
'RESTARTING'
)
?
</
Tag
>)
:
(
status
===
'RESTARTING'
)
?
(<
Tag
icon=
{
<
ClockCircleOutlined
/>
}
color=
"default"
>
(<
Tag
icon=
{
<
ClockCircleOutlined
/>
}
color=
"default"
>
RESTARTING
RESTARTING
</
Tag
>)
:
(
status
===
'CREATED'
)
?
(<
Tag
icon=
{
<
ClockCircleOutlined
/>
}
color=
"default"
>
CREATED
</
Tag
>)
:
</
Tag
>)
:
(<
Tag
icon=
{
<
QuestionCircleOutlined
/>
}
color=
"default"
>
(<
Tag
icon=
{
<
QuestionCircleOutlined
/>
}
color=
"default"
>
UNKNOWEN
UNKNOWEN
...
...
dlink-web/src/pages/DevOps/JobInfo/index.tsx
View file @
df3c6187
...
@@ -15,7 +15,7 @@ import Config from "@/pages/DevOps/JobInfo/Config";
...
@@ -15,7 +15,7 @@ import Config from "@/pages/DevOps/JobInfo/Config";
import
JobStatus
,
{
isStatusDone
}
from
"@/components/Common/JobStatus"
;
import
JobStatus
,
{
isStatusDone
}
from
"@/components/Common/JobStatus"
;
import
{
cancelJob
,
offLineTask
,
restartJob
}
from
"@/components/Studio/StudioEvent/DDL"
;
import
{
cancelJob
,
offLineTask
,
restartJob
}
from
"@/components/Studio/StudioEvent/DDL"
;
import
{
CODE
}
from
"@/components/Common/crud"
;
import
{
CODE
}
from
"@/components/Common/crud"
;
import
JobLifeCycle
from
"@/components/Common/JobLifeCycle"
;
import
JobLifeCycle
,
{
JOB_LIFE_CYCLE
}
from
"@/components/Common/JobLifeCycle"
;
import
Exception
from
"@/pages/DevOps/JobInfo/Exception"
;
import
Exception
from
"@/pages/DevOps/JobInfo/Exception"
;
import
FlinkSQL
from
"@/pages/DevOps/JobInfo/FlinkSQL"
;
import
FlinkSQL
from
"@/pages/DevOps/JobInfo/FlinkSQL"
;
import
Alert
from
"@/pages/DevOps/JobInfo/Alert"
;
import
Alert
from
"@/pages/DevOps/JobInfo/Alert"
;
...
@@ -61,7 +61,7 @@ const JobInfo = (props: any) => {
...
@@ -61,7 +61,7 @@ const JobInfo = (props: any) => {
};
};
const
handleSavepoint
=
(
key
:
string
)
=>
{
const
handleSavepoint
=
(
key
:
string
)
=>
{
if
(
key
==
'canceljob'
)
{
if
(
key
==
'canceljob'
)
{
Modal
.
confirm
({
Modal
.
confirm
({
title
:
'停止任务'
,
title
:
'停止任务'
,
content
:
`确定只停止该作业,不进行 SavePoint 操作吗?`
,
content
:
`确定只停止该作业,不进行 SavePoint 操作吗?`
,
...
@@ -72,10 +72,10 @@ const JobInfo = (props: any) => {
...
@@ -72,10 +72,10 @@ const JobInfo = (props: any) => {
const
res
=
cancelJob
(
job
?.
cluster
?.
id
,
job
?.
instance
?.
jid
);
const
res
=
cancelJob
(
job
?.
cluster
?.
id
,
job
?.
instance
?.
jid
);
res
.
then
((
result
)
=>
{
res
.
then
((
result
)
=>
{
if
(
result
.
code
==
CODE
.
SUCCESS
)
{
if
(
result
.
code
==
CODE
.
SUCCESS
)
{
message
.
success
(
key
+
"成功"
);
message
.
success
(
key
+
"成功"
);
handleGetJobInfoDetail
();
handleGetJobInfoDetail
();
}
else
{
}
else
{
message
.
error
(
key
+
"失败"
);
message
.
error
(
key
+
"失败"
);
}
}
});
});
}
}
...
@@ -83,19 +83,19 @@ const JobInfo = (props: any) => {
...
@@ -83,19 +83,19 @@ const JobInfo = (props: any) => {
return
;
return
;
}
}
Modal
.
confirm
({
Modal
.
confirm
({
title
:
key
+
'任务'
,
title
:
key
+
'任务'
,
content
:
`确定
${
key
}
该作业吗?`
,
content
:
`确定
${
key
}
该作业吗?`
,
okText
:
'确认'
,
okText
:
'确认'
,
cancelText
:
'取消'
,
cancelText
:
'取消'
,
onOk
:
async
()
=>
{
onOk
:
async
()
=>
{
if
(
!
job
?.
cluster
?.
id
)
return
;
if
(
!
job
?.
cluster
?.
id
)
return
;
const
res
=
offLineTask
(
job
?.
instance
?.
taskId
,
key
);
const
res
=
offLineTask
(
job
?.
instance
?.
taskId
,
key
);
res
.
then
((
result
)
=>
{
res
.
then
((
result
)
=>
{
if
(
result
.
code
==
CODE
.
SUCCESS
)
{
if
(
result
.
code
==
CODE
.
SUCCESS
)
{
message
.
success
(
key
+
"成功"
);
message
.
success
(
key
+
"成功"
);
handleGetJobInfoDetail
();
handleGetJobInfoDetail
();
}
else
{
}
else
{
message
.
error
(
key
+
"失败"
);
message
.
error
(
key
+
"失败"
);
}
}
});
});
}
}
...
@@ -110,7 +110,7 @@ const JobInfo = (props: any) => {
...
@@ -110,7 +110,7 @@ const JobInfo = (props: any) => {
cancelText
:
'取消'
,
cancelText
:
'取消'
,
onOk
:
async
()
=>
{
onOk
:
async
()
=>
{
if
(
!
job
?.
cluster
?.
id
)
return
;
if
(
!
job
?.
cluster
?.
id
)
return
;
const
res
=
restartJob
(
job
?.
instance
?.
taskId
);
const
res
=
restartJob
(
job
?.
instance
?.
taskId
,
job
?.
instance
?.
step
==
JOB_LIFE_CYCLE
.
ONLINE
);
res
.
then
((
result
)
=>
{
res
.
then
((
result
)
=>
{
if
(
result
.
code
==
CODE
.
SUCCESS
)
{
if
(
result
.
code
==
CODE
.
SUCCESS
)
{
message
.
success
(
"重新上线成功"
);
message
.
success
(
"重新上线成功"
);
...
@@ -126,16 +126,19 @@ const JobInfo = (props: any) => {
...
@@ -126,16 +126,19 @@ const JobInfo = (props: any) => {
let
buttons
=
[
let
buttons
=
[
<
Button
key=
"back"
type=
"dashed"
onClick=
{
handleBack
}
>
返回
</
Button
>,
<
Button
key=
"back"
type=
"dashed"
onClick=
{
handleBack
}
>
返回
</
Button
>,
];
];
if
(
!
isStatusDone
(
job
?.
instance
?.
status
as
string
)){
buttons
.
push
(<
Button
key=
"refresh"
icon=
{
<
RedoOutlined
/>
}
onClick=
{
handleRefreshJobInfoDetail
}
/>);
buttons
.
push
(<
Button
key=
"refresh"
icon=
{
<
RedoOutlined
/>
}
onClick=
{
handleRefreshJobInfoDetail
}
/>);
if
(
!
isStatusDone
(
job
?.
instance
?.
status
as
string
))
{
buttons
.
push
(<
Button
key=
"flinkwebui"
>
buttons
.
push
(<
Button
key=
"flinkwebui"
>
<
Link
href=
{
`http://${job?.history?.jobManagerAddress}/#/job/${job?.instance?.jid}/overview`
}
target=
"_blank"
>
<
Link
href=
{
`http://${job?.history?.jobManagerAddress}/#/job/${job?.instance?.jid}/overview`
}
target=
"_blank"
>
FlinkWebUI
FlinkWebUI
</
Link
></
Button
>);
</
Link
></
Button
>);
}
}
buttons
.
push
(<
Button
key=
"autorestart"
type=
"primary"
onClick=
{
handleRestart
}
>
重新
{
job
?.
instance
?.
step
==
5
?
'上线'
:
'启动'
}
</
Button
>);
buttons
.
push
(<
Button
key=
"autorestart"
type=
"primary"
if
(
!
isStatusDone
(
job
?.
instance
?.
status
as
string
)){
onClick=
{
handleRestart
}
>
重新
{
job
?.
instance
?.
step
==
5
?
'上线'
:
'启动'
}
</
Button
>);
buttons
.
push
(<
Button
key=
"autostop"
type=
"primary"
danger
onClick=
{
()
=>
{
handleSavepoint
(
'cancel'
)}
}
>
{
job
?.
instance
?.
step
==
5
?
'下线'
:
'智能停止'
}
</
Button
>);
if
(
!
isStatusDone
(
job
?.
instance
?.
status
as
string
))
{
buttons
.
push
(<
Button
key=
"autostop"
type=
"primary"
danger
onClick=
{
()
=>
{
handleSavepoint
(
'cancel'
)
}
}
>
{
job
?.
instance
?.
step
==
5
?
'下线'
:
'智能停止'
}
</
Button
>);
buttons
.
push
(<
Dropdown
buttons
.
push
(<
Dropdown
key=
"dropdown"
key=
"dropdown"
trigger=
{
[
'click'
]
}
trigger=
{
[
'click'
]
}
...
@@ -256,16 +259,16 @@ const JobInfo = (props: any) => {
...
@@ -256,16 +259,16 @@ const JobInfo = (props: any) => {
<
ProCard
>
<
ProCard
>
{
tabKey
===
'base'
?
<
BaseInfo
job=
{
job
}
/>
:
undefined
}
{
tabKey
===
'base'
?
<
BaseInfo
job=
{
job
}
/>
:
undefined
}
{
tabKey
===
'config'
?
<
Config
job=
{
job
}
/>
:
undefined
}
{
tabKey
===
'config'
?
<
Config
job=
{
job
}
/>
:
undefined
}
{
tabKey
===
'cluster'
?
<
Empty
image=
{
Empty
.
PRESENTED_IMAGE_SIMPLE
}
/>
:
undefined
}
{
tabKey
===
'cluster'
?
<
Empty
image=
{
Empty
.
PRESENTED_IMAGE_SIMPLE
}
/>
:
undefined
}
{
tabKey
===
'snapshot'
?
<
Empty
image=
{
Empty
.
PRESENTED_IMAGE_SIMPLE
}
/>
:
undefined
}
{
tabKey
===
'snapshot'
?
<
Empty
image=
{
Empty
.
PRESENTED_IMAGE_SIMPLE
}
/>
:
undefined
}
{
tabKey
===
'exception'
?
<
Exception
job=
{
job
}
/>
:
undefined
}
{
tabKey
===
'exception'
?
<
Exception
job=
{
job
}
/>
:
undefined
}
{
tabKey
===
'log'
?
<
Empty
image=
{
Empty
.
PRESENTED_IMAGE_SIMPLE
}
/>
:
undefined
}
{
tabKey
===
'log'
?
<
Empty
image=
{
Empty
.
PRESENTED_IMAGE_SIMPLE
}
/>
:
undefined
}
{
tabKey
===
'optimize'
?
<
Empty
image=
{
Empty
.
PRESENTED_IMAGE_SIMPLE
}
/>
:
undefined
}
{
tabKey
===
'optimize'
?
<
Empty
image=
{
Empty
.
PRESENTED_IMAGE_SIMPLE
}
/>
:
undefined
}
{
tabKey
===
'flinksql'
?
<
FlinkSQL
job=
{
job
}
/>
:
undefined
}
{
tabKey
===
'flinksql'
?
<
FlinkSQL
job=
{
job
}
/>
:
undefined
}
{
tabKey
===
'datamap'
?
<
DataMap
job=
{
job
}
/>
:
undefined
}
{
tabKey
===
'datamap'
?
<
DataMap
job=
{
job
}
/>
:
undefined
}
{
tabKey
===
'olap'
?
<
Empty
image=
{
Empty
.
PRESENTED_IMAGE_SIMPLE
}
/>
:
undefined
}
{
tabKey
===
'olap'
?
<
Empty
image=
{
Empty
.
PRESENTED_IMAGE_SIMPLE
}
/>
:
undefined
}
{
tabKey
===
'version'
?
<
Empty
image=
{
Empty
.
PRESENTED_IMAGE_SIMPLE
}
/>
:
undefined
}
{
tabKey
===
'version'
?
<
Empty
image=
{
Empty
.
PRESENTED_IMAGE_SIMPLE
}
/>
:
undefined
}
{
tabKey
===
'alert'
?
<
Alert
job=
{
job
}
/>
:
undefined
}
{
tabKey
===
'alert'
?
<
Alert
job=
{
job
}
/>
:
undefined
}
</
ProCard
>
</
ProCard
>
</
PageContainer
>
</
PageContainer
>
);
);
...
...
dlink-web/src/pages/Welcome.tsx
View file @
df3c6187
...
@@ -773,6 +773,12 @@ export default (): React.ReactNode => {
...
@@ -773,6 +773,12 @@ export default (): React.ReactNode => {
<
li
>
<
li
>
<
Link
>
新增 Hive 数据源注册、元数据、查询和执行
</
Link
>
<
Link
>
新增 Hive 数据源注册、元数据、查询和执行
</
Link
>
</
li
>
</
li
>
<
li
>
<
Link
>
新增 作业剪切和粘贴
</
Link
>
</
li
>
<
li
>
<
Link
>
新增 实时任务监控容错机制
</
Link
>
</
li
>
</
ul
>
</
ul
>
</
Paragraph
>
</
Paragraph
>
</
Timeline
.
Item
>
</
Timeline
.
Item
>
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment