Commit 41369531 authored by wenmo

Add switching between job instances and job history in the DevOps center

parent be79fe47
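
In short, this commit makes the status-count endpoint return two sets of counts, one computed over every recorded run and one over only the latest instance of each task, and lets the DevOps page switch between the two views. Below is a commented restatement of the new controller method, as a sketch with imports and class scaffolding omitted; every name used here appears in the diff that follows.

@GetMapping("/getStatusCount")
public Result getStatusCount() {
    // The same service method serves both views; the boolean flag picks the data set.
    HashMap<String, Object> result = new HashMap<>();
    result.put("history", jobInstanceService.getStatusCount(true));   // counts over every recorded run
    result.put("instance", jobInstanceService.getStatusCount(false)); // counts over the latest run per task only
    return Result.succeed(result, "获取成功"); // the React page reads result.datas.history / result.datas.instance
}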
......@@ -12,6 +12,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/**
......@@ -74,7 +75,10 @@ public class JobInstanceController {
*/
@GetMapping("/getStatusCount")
public Result getStatusCount() {
return Result.succeed(jobInstanceService.getStatusCount(), "获取成功");
HashMap<String, Object> result = new HashMap<>();
result.put("history", jobInstanceService.getStatusCount(true));
result.put("instance", jobInstanceService.getStatusCount(false));
return Result.succeed(result, "获取成功");
}
/**
......
......@@ -18,5 +18,7 @@ public interface JobInstanceMapper extends SuperMapper<JobInstance> {
List<JobInstanceCount> countStatus();
List<JobInstanceCount> countHistoryStatus();
List<JobInstance> listJobInstanceActive();
}
......@@ -15,7 +15,7 @@ import java.util.List;
*/
public interface JobInstanceService extends ISuperService<JobInstance> {
JobInstanceStatus getStatusCount();
JobInstanceStatus getStatusCount(boolean isHistory);
List<JobInstance> listJobInstanceActive();
......
......@@ -44,8 +44,13 @@ public class JobInstanceServiceImpl extends SuperServiceImpl<JobInstanceMapper,
private JobHistoryService jobHistoryService;
@Override
public JobInstanceStatus getStatusCount() {
List<JobInstanceCount> jobInstanceCounts = baseMapper.countStatus();
public JobInstanceStatus getStatusCount(boolean isHistory) {
List<JobInstanceCount> jobInstanceCounts = null;
if (isHistory) {
jobInstanceCounts = baseMapper.countHistoryStatus();
} else {
jobInstanceCounts = baseMapper.countStatus();
}
JobInstanceStatus jobInstanceStatus = new JobInstanceStatus();
Integer total = 0;
for (JobInstanceCount item : jobInstanceCounts) {
......
......@@ -9,6 +9,12 @@
(select dc.alias FROM dlink_cluster dc where dc.id=a.cluster_id) as clusterAlias
from
dlink_job_instance a
<if test='!(param.isHistory!=null and param.isHistory==true)'>
inner join (
select max(ji.id) as id from dlink_job_instance ji
group by ji.task_id
) snap on snap.id = a.id
</if>
left join dlink_history dh on a.history_id = dh.id
<where>
1=1
......@@ -45,6 +51,19 @@
</select>
<select id="countStatus" resultType="com.dlink.model.JobInstanceCount">
select
a.status,
count(1) as counts
from
dlink_job_instance a
inner join (
select max(ji.id) as id from dlink_job_instance ji
group by ji.task_id
) snap on snap.id = a.id
group by status
</select>
<select id="countHistoryStatus" resultType="com.dlink.model.JobInstanceCount">
select
status,
count(1) as counts
......
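
For context on the two count queries above: countStatus joins against the per-task maximum id, so each task contributes only its newest instance, while countHistoryStatus aggregates over every row of dlink_job_instance; the listing query's new <if> block earlier in this file applies the same snapshot join whenever the instance view rather than the history view is requested. A hedged in-memory illustration of that distinction follows; the real counting happens in SQL, and the accessors getTaskId()/getId() on JobInstance are assumptions made for this sketch.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: mirrors what the snapshot join computes.
static Map<Integer, JobInstance> latestInstancePerTask(List<JobInstance> allInstances) {
    Map<Integer, JobInstance> latest = new HashMap<>();
    for (JobInstance ji : allInstances) {
        latest.merge(ji.getTaskId(), ji,
                (a, b) -> a.getId() >= b.getId() ? a : b); // keep the max-id row per task_id
    }
    return latest; // countStatus groups these rows by status; countHistoryStatus groups allInstances
}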
......@@ -46,17 +46,18 @@ public class FlinkAPI {
return result;
}
private JsonNode get(String route){
private JsonNode get(String route) {
String res = HttpUtil.get(NetConstant.HTTP + address + NetConstant.SLASH + route, NetConstant.SERVER_TIME_OUT_ACTIVE);
return parse(res);
}
/**
* GET request that fetches the JobManager/TaskManager log (the result is a plain string, not JSON)
*
* @param route REST path appended to the cluster address
* @return the raw response body as a string
*/
private String getResult(String route){
private String getResult(String route) {
String res = HttpUtil.get(NetConstant.HTTP + address + NetConstant.SLASH + route, NetConstant.SERVER_TIME_OUT_ACTIVE);
return res;
}
......@@ -67,7 +68,7 @@ public class FlinkAPI {
}
private JsonNode patch(String route, String body) {
String res = HttpUtil.createRequest(Method.PATCH,NetConstant.HTTP + address + NetConstant.SLASH + route).timeout(NetConstant.SERVER_TIME_OUT_ACTIVE).body(body).execute().body();
String res = HttpUtil.createRequest(Method.PATCH, NetConstant.HTTP + address + NetConstant.SLASH + route).timeout(NetConstant.SERVER_TIME_OUT_ACTIVE).body(body).execute().body();
return parse(res);
}
......@@ -83,27 +84,27 @@ public class FlinkAPI {
return joblist;
}
public boolean stop(String jobId){
get(FlinkRestAPIConstant.JOBS+jobId+FlinkRestAPIConstant.CANCEL);
public boolean stop(String jobId) {
get(FlinkRestAPIConstant.JOBS + jobId + FlinkRestAPIConstant.CANCEL);
return true;
}
public SavePointResult savepoints(String jobId, String savePointType){
public SavePointResult savepoints(String jobId, String savePointType) {
SavePointType type = SavePointType.get(savePointType);
String paramType = null;
SavePointResult result = SavePointResult.build(GatewayType.YARN_PER_JOB);
JobInfo jobInfo = new JobInfo(jobId);
Map<String, Object> paramMap = new HashMap<>();
switch (type){
switch (type) {
case CANCEL:
jobInfo.setStatus(JobInfo.JobStatus.CANCEL);
case STOP:
paramMap.put("drain",false);
paramMap.put("drain", false);
paramType = FlinkRestAPIConstant.STOP;
jobInfo.setStatus(JobInfo.JobStatus.STOP);
break;
case TRIGGER:
paramMap.put("cancel-job",false);
paramMap.put("cancel-job", false);
//paramMap.put("target-directory","hdfs:///flink13/ss1");
paramType = FlinkRestAPIConstant.SAVEPOINTS;
jobInfo.setStatus(JobInfo.JobStatus.RUN);
......@@ -122,17 +123,17 @@ public class FlinkAPI {
Thread.sleep(1000);
JsonNode node = get(FlinkRestAPIConstant.JOBS + jobId + FlinkRestAPIConstant.SAVEPOINTS + NetConstant.SLASH + triggerid);
String status = node.get("status").get("id").asText();
if(Asserts.isEquals(status,"IN_PROGRESS")){
if (Asserts.isEquals(status, "IN_PROGRESS")) {
continue;
}
if(node.get("operation").has("failure-cause")) {
if (node.get("operation").has("failure-cause")) {
String failureCause = node.get("operation").get("failure-cause").asText();
if (Asserts.isNotNullString(failureCause)) {
result.fail(failureCause);
break;
}
}
if(node.get("operation").has("location")) {
if (node.get("operation").has("location")) {
String location = node.get("operation").get("location").asText();
List<JobInfo> jobInfos = new ArrayList<>();
jobInfo.setSavePoint(location);
......@@ -154,24 +155,28 @@ public class FlinkAPI {
return result.get("flink-version").asText();
}
public JsonNode getOverview() {
return get(FlinkRestAPIConstant.OVERVIEW);
}
public JsonNode getJobInfo(String jobId) {
return get(FlinkRestAPIConstant.JOBS+jobId);
return get(FlinkRestAPIConstant.JOBS + jobId);
}
public JsonNode getException(String jobId) {
return get(FlinkRestAPIConstant.JOBS+jobId+FlinkRestAPIConstant.EXCEPTIONS);
return get(FlinkRestAPIConstant.JOBS + jobId + FlinkRestAPIConstant.EXCEPTIONS);
}
public JsonNode getCheckPoints(String jobId) {
return get(FlinkRestAPIConstant.JOBS+jobId+FlinkRestAPIConstant.CHECKPOINTS);
return get(FlinkRestAPIConstant.JOBS + jobId + FlinkRestAPIConstant.CHECKPOINTS);
}
public JsonNode getCheckPointsConfig(String jobId) {
return get(FlinkRestAPIConstant.JOBS+jobId+FlinkRestAPIConstant.CHECKPOINTS_CONFIG);
return get(FlinkRestAPIConstant.JOBS + jobId + FlinkRestAPIConstant.CHECKPOINTS_CONFIG);
}
public JsonNode getJobsConfig(String jobId) {
return get(FlinkRestAPIConstant.JOBS+jobId+FlinkRestAPIConstant.CONFIG);
return get(FlinkRestAPIConstant.JOBS + jobId + FlinkRestAPIConstant.CONFIG);
}
public JsonNode getJobManagerMetrics() {
......@@ -202,7 +207,6 @@ public class FlinkAPI {
return get(FlinkRestAPIConstant.TASK_MANAGER + containerId + FlinkRestAPIConstant.METRICS);
}
public String getTaskManagerLog(String containerId) {
return getResult(FlinkRestAPIConstant.TASK_MANAGER + containerId + FlinkRestAPIConstant.LOG);
}
......@@ -218,6 +222,4 @@ public class FlinkAPI {
public JsonNode getTaskManagerThreadDump(String containerId) {
return get(FlinkRestAPIConstant.TASK_MANAGER + containerId + FlinkRestAPIConstant.THREAD_DUMP);
}
}
......@@ -8,6 +8,8 @@ package com.dlink.constant;
**/
public final class FlinkRestAPIConstant {
public static final String OVERVIEW = "overview";
public static final String FLINK_CONFIG = "config";
public static final String CONFIG = "/config";
......
......@@ -387,4 +387,6 @@ create table dlink_job_history
)
comment 'Job历史详情';
CREATE INDEX dlink_job_instance_task_id_IDX USING BTREE ON dlink_job_instance (task_id);
SET FOREIGN_KEY_CHECKS = 1;
......@@ -634,5 +634,9 @@ ALTER TABLE `dlink_task`
-- ----------------------------
ALTER TABLE `dlink_job_instance`
ADD COLUMN `step` INT NULL COMMENT '生命周期' AFTER `task_id`;
-- ----------------------------
-- 0.6.0-SNAPSHOT 2022-03-15
-- ----------------------------
CREATE INDEX dlink_job_instance_task_id_IDX USING BTREE ON dlink_job_instance (task_id);
SET FOREIGN_KEY_CHECKS = 1;
import { history } from 'umi';
import {queryData} from "@/components/Common/crud";
import React, {useState} from "react";
import type { ProColumns } from '@ant-design/pro-table';
import React, {useState, useRef, useEffect} from "react";
import type { ProColumns, ActionType } from '@ant-design/pro-table';
import ProTable from "@ant-design/pro-table";
import {JobInstanceTableListItem} from "@/pages/DevOps/data";
import moment from 'moment';
......@@ -14,6 +14,11 @@ const JobInstanceTable = (props: any) => {
const {status, activeKey, isHistory, dispatch} = props;
const [time, setTime] = useState(() => Date.now());
const ref = useRef<ActionType>();
useEffect(() => {
ref?.current?.reload();
}, [isHistory]);
const getColumns = () => {
const columns: ProColumns<JobInstanceTableListItem>[] = [{
......@@ -133,6 +138,7 @@ const JobInstanceTable = (props: any) => {
return (
<><ProTable
actionRef={ref}
request={(params, sorter, filter) => {
setTime(Date.now());
return queryData(url, {...params, status, isHistory, sorter: {id: 'descend'}, filter});
......
......@@ -37,12 +37,29 @@ const DevOps = (props:any) => {
{ key: JOB_STATUS.UNKNOWN, status: 'default', title: '未知', value: 0 },
];
const [statusCount, setStatusCount] = useState<any[]>(statusCountDefault);
const [statusHistoryCount, setStatusHistoryCount] = useState<any[]>(statusCountDefault);
const [activeKey, setActiveKey] = useState<string>('');
const refreshStatusCount = () => {
const res = getStatusCount();
res.then((result)=>{
const statusCountData: StatusCount = result.datas;
const statusHistoryCountData: StatusCount = result.datas.history;
const historyItems: any = [
{ key: '', title: renderSwitch(), value: statusHistoryCountData.all, total: true },
{ key: JOB_STATUS.CREATED, status: 'default', title: '已创建', value: statusHistoryCountData.created },
{ key: JOB_STATUS.INITIALIZING, status: 'default', title: '初始化', value: statusHistoryCountData.initializing },
{ key: JOB_STATUS.RUNNING, status: 'success', title: '运行中', value: statusHistoryCountData.running },
{ key: JOB_STATUS.FINISHED, status: 'processing', title: '已完成', value: statusHistoryCountData.finished },
{ key: JOB_STATUS.FAILING, status: 'error', title: '异常中', value: statusHistoryCountData.failing },
{ key: JOB_STATUS.FAILED, status: 'error', title: '已异常', value: statusHistoryCountData.failed },
{ key: JOB_STATUS.SUSPENDED, status: 'warning', title: '已暂停', value: statusHistoryCountData.suspended },
{ key: JOB_STATUS.CANCELLING, status: 'warning', title: '停止中', value: statusHistoryCountData.cancelling },
{ key: JOB_STATUS.CANCELED, status: 'warning', title: '停止', value: statusHistoryCountData.canceled },
{ key: JOB_STATUS.RESTARTING, status: 'default', title: '重启中', value: statusHistoryCountData.restarting },
{ key: JOB_STATUS.UNKNOWN, status: 'default', title: '未知', value: statusHistoryCountData.unknown },
];
setStatusHistoryCount(historyItems);
const statusCountData: StatusCount = result.datas.instance;
const items: any = [
{ key: '', title: renderSwitch(), value: statusCountData.all, total: true },
{ key: JOB_STATUS.CREATED, status: 'default', title: '已创建', value: statusCountData.created },
......@@ -77,7 +94,7 @@ const DevOps = (props:any) => {
},
}}
>
{statusCount.map((item) => (
{(isHistory ? statusHistoryCount : statusCount).map((item) => (
<ProCard.TabPane
style={{ width: '100%' }}
key={item.key}
......
......@@ -749,6 +749,9 @@ export default (): React.ReactNode => {
<li>
<Link>修复 用户逻辑删除bug</Link>
</li>
<li>
<Link>新增 运维中心的作业实例与历史切换</Link>
</li>
</ul>
</Paragraph>
</Timeline.Item>
......