Unverified Commit 6ae871f2 authored by xiaoguaiguai's avatar xiaoguaiguai Committed by GitHub

[Feature-649][admin,web] update taskManager tableForm of DevOps (#657)

* add taskManagers tableForm of DevOps

* Fixed the problem that when the job is not running, the cluster configuration is still obtained through flinkapi, resulting in an error
parent 4ce997d6
package com.dlink.job;
import com.dlink.api.FlinkAPI;
import com.dlink.assertion.Asserts;
import com.dlink.model.JobManagerConfiguration;
import com.dlink.model.TaskContainerConfigInfo;
import com.dlink.model.TaskManagerConfiguration;
import com.dlink.utils.JSONUtil;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.*;
/**
* @program: dlink
* @description:
* @author: zhumingye
* @create: 2022-06-28 19:00
*/
public class BuildConfiguration {
/**
* @return void
* @Author: zhumingye
* @date: 2022/6/27
* @Description: buildJobManagerConfiguration
* @Params: [jobManagerConfiguration, flinkAPI]
*/
public static void buildJobManagerConfiguration(JobManagerConfiguration jobManagerConfiguration, FlinkAPI flinkAPI) {
Map<String, String> jobManagerMetricsMap = new HashMap<String, String>(); //获取jobManager metrics
List<LinkedHashMap> jobManagerMetricsItemsList = JSONUtil.toList(JSONUtil.toJsonString(flinkAPI.getJobManagerMetrics()), LinkedHashMap.class);
jobManagerMetricsItemsList.forEach(mapItems -> {
String configKey = (String) mapItems.get("id");
String configValue = (String) mapItems.get("value");
if (Asserts.isNotNullString(configKey) && Asserts.isNotNullString(configValue)) {
jobManagerMetricsMap.put(configKey, configValue);
}
});
Map<String, String> jobManagerConfigMap = new HashMap<String, String>();//获取jobManager配置信息
List<LinkedHashMap> jobManagerConfigMapItemsList = JSONUtil.toList(JSONUtil.toJsonString(flinkAPI.getJobManagerConfig()), LinkedHashMap.class);
jobManagerConfigMapItemsList.forEach(mapItems -> {
String configKey = (String) mapItems.get("key");
String configValue = (String) mapItems.get("value");
if (Asserts.isNotNullString(configKey) && Asserts.isNotNullString(configValue)) {
jobManagerConfigMap.put(configKey, configValue);
}
});
String jobMangerLog = flinkAPI.getJobManagerLog(); //获取jobManager日志
String jobManagerStdOut = flinkAPI.getJobManagerStdOut(); //获取jobManager标准输出日志
jobManagerConfiguration.setMetrics(jobManagerMetricsMap);
jobManagerConfiguration.setJobManagerConfig(jobManagerConfigMap);
jobManagerConfiguration.setJobManagerLog(jobMangerLog);
jobManagerConfiguration.setJobManagerStdout(jobManagerStdOut);
}
/**
* @Author: zhumingye
* @date: 2022/6/27
* @Description: buildTaskManagerConfiguration
* @Params: [taskManagerConfigurationList, flinkAPI, taskManagerContainers]
* @return void
*/
public static void buildTaskManagerConfiguration(Set<TaskManagerConfiguration> taskManagerConfigurationList, FlinkAPI flinkAPI, JsonNode taskManagerContainers) {
if (Asserts.isNotNull(taskManagerContainers)) {
JsonNode taskmanagers = taskManagerContainers.get("taskmanagers");
for (JsonNode taskManagers : taskmanagers) {
TaskManagerConfiguration taskManagerConfiguration = new TaskManagerConfiguration();
/**
* 解析 taskManager 的配置信息
*/
String containerId = taskManagers.get("id").asText();// 获取container id
String containerPath = taskManagers.get("path").asText(); // 获取container path
Integer dataPort = taskManagers.get("dataPort").asInt(); // 获取container dataPort
Integer jmxPort =taskManagers.get("jmxPort").asInt(); // 获取container jmxPort
Long timeSinceLastHeartbeat =taskManagers.get("timeSinceLastHeartbeat").asLong(); // 获取container timeSinceLastHeartbeat
Integer slotsNumber =taskManagers.get("slotsNumber").asInt(); // 获取container slotsNumber
Integer freeSlots = taskManagers.get("freeSlots").asInt(); // 获取container freeSlots
String totalResource = JSONUtil.toJsonString(taskManagers.get("totalResource")); // 获取container totalResource
String freeResource = JSONUtil.toJsonString(taskManagers.get("freeResource") ); // 获取container freeResource
String hardware = JSONUtil.toJsonString(taskManagers.get("hardware") ); // 获取container hardware
String memoryConfiguration = JSONUtil.toJsonString(taskManagers.get("memoryConfiguration") ); // 获取container memoryConfiguration
Asserts.checkNull(containerId, "获取不到 containerId , containerId不能为空");
JsonNode taskManagerMetrics = flinkAPI.getTaskManagerMetrics(containerId);//获取taskManager metrics
String taskManagerLog = flinkAPI.getTaskManagerLog(containerId);//获取taskManager日志
String taskManagerThreadDumps = JSONUtil.toJsonString(flinkAPI.getTaskManagerThreadDump(containerId).get("threadInfos"));//获取taskManager线程dumps
String taskManagerStdOut = flinkAPI.getTaskManagerStdOut(containerId);//获取taskManager标准输出日志
Map<String, String> taskManagerMetricsMap = new HashMap<String, String>(); //获取taskManager metrics
List<LinkedHashMap> taskManagerMetricsItemsList = JSONUtil.toList(JSONUtil.toJsonString(taskManagerMetrics), LinkedHashMap.class);
taskManagerMetricsItemsList.forEach(mapItems -> {
String configKey = (String) mapItems.get("id");
String configValue = (String) mapItems.get("value");
if (Asserts.isNotNullString(configKey) && Asserts.isNotNullString(configValue)) {
taskManagerMetricsMap.put(configKey, configValue);
}
});
/**
* TaskManagerConfiguration 赋值
*/
taskManagerConfiguration.setContainerId(containerId);
taskManagerConfiguration.setContainerPath(containerPath);
taskManagerConfiguration.setDataPort(dataPort);
taskManagerConfiguration.setJmxPort(jmxPort);
taskManagerConfiguration.setTimeSinceLastHeartbeat(timeSinceLastHeartbeat);
taskManagerConfiguration.setSlotsNumber(slotsNumber);
taskManagerConfiguration.setFreeSlots(freeSlots);
taskManagerConfiguration.setTotalResource(totalResource);
taskManagerConfiguration.setFreeResource(freeResource);
taskManagerConfiguration.setHardware(hardware);
taskManagerConfiguration.setMemoryConfiguration(memoryConfiguration);
/**
* TaskContainerConfigInfo 赋值
*/
TaskContainerConfigInfo taskContainerConfigInfo = new TaskContainerConfigInfo();
taskContainerConfigInfo.setMetrics(taskManagerMetricsMap);
taskContainerConfigInfo.setTaskManagerLog(taskManagerLog);
taskContainerConfigInfo.setTaskManagerThreadDump(taskManagerThreadDumps);
taskContainerConfigInfo.setTaskManagerStdout(taskManagerStdOut);
taskManagerConfiguration.setTaskContainerConfigInfo(taskContainerConfigInfo);
// 将taskManagerConfiguration添加到set集合中
taskManagerConfigurationList.add(taskManagerConfiguration);
}
}
}
}
package com.dlink.service.impl;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.dlink.api.FlinkAPI;
import com.dlink.assertion.Asserts;
import com.dlink.common.result.ProTableResult;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.db.util.ProTableUtil;
import com.dlink.explainer.lineage.LineageBuilder;
import com.dlink.explainer.lineage.LineageResult;
import com.dlink.job.BuildConfiguration;
import com.dlink.job.FlinkJobTaskPool;
import com.dlink.mapper.JobInstanceMapper;
import com.dlink.model.History;
import com.dlink.model.JobInfoDetail;
import com.dlink.model.JobInstance;
import com.dlink.model.JobInstanceCount;
import com.dlink.model.JobInstanceStatus;
import com.dlink.model.JobStatus;
import com.dlink.service.ClusterConfigurationService;
import com.dlink.service.ClusterService;
import com.dlink.service.HistoryService;
import com.dlink.service.JobHistoryService;
import com.dlink.service.JobInstanceService;
import com.dlink.model.*;
import com.dlink.service.*;
import com.dlink.utils.JSONUtil;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* JobInstanceServiceImpl
......@@ -120,9 +114,7 @@ public class JobInstanceServiceImpl extends SuperServiceImpl<JobInstanceMapper,
JobInfoDetail jobInfoDetail;
FlinkJobTaskPool pool = FlinkJobTaskPool.getInstance();
String key = jobInstance.getId().toString();
if (pool.exist(key)) {
jobInfoDetail = pool.get(key);
} else {
jobInfoDetail = new JobInfoDetail(jobInstance.getId());
jobInfoDetail.setInstance(jobInstance);
jobInfoDetail.setCluster(clusterService.getById(jobInstance.getClusterId()));
......@@ -133,10 +125,33 @@ public class JobInstanceServiceImpl extends SuperServiceImpl<JobInstanceMapper,
if (Asserts.isNotNull(history) && Asserts.isNotNull(history.getClusterConfigurationId())) {
jobInfoDetail.setClusterConfiguration(clusterConfigurationService.getClusterConfigById(history.getClusterConfigurationId()));
}
JobManagerConfiguration jobManagerConfiguration = new JobManagerConfiguration();
Set<TaskManagerConfiguration> taskManagerConfigurationList = new HashSet<>();
if (Asserts.isNotNullString(history.getJobManagerAddress()) && JobStatus.RUNNING.getValue().equals(jobInfoDetail.getInstance().getStatus())) { // 如果有jobManager地址,则使用该地址
FlinkAPI flinkAPI = FlinkAPI.build(history.getJobManagerAddress());
// 获取jobManager的配置信息 开始
BuildConfiguration.buildJobManagerConfiguration(jobManagerConfiguration, flinkAPI);
// 获取jobManager的配置信息 结束
// 获取taskManager的配置信息 开始
JsonNode taskManagerContainers = flinkAPI.getTaskManagers(); //获取taskManager列表
BuildConfiguration.buildTaskManagerConfiguration(taskManagerConfigurationList, flinkAPI, taskManagerContainers);
// 获取taskManager的配置信息 结束
}
jobInfoDetail.setJobManagerConfiguration(jobManagerConfiguration);
jobInfoDetail.setTaskManagerConfiguration(taskManagerConfigurationList);
if (pool.exist(key)) {
pool.refresh(jobInfoDetail);;
} else {
pool.push(key,jobInfoDetail);
}
return jobInfoDetail;
}
@Override
public LineageResult getLineage(Integer id) {
History history = getJobInfoDetail(id).getHistory();
......
......@@ -576,17 +576,17 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
Set<TaskManagerConfiguration> taskManagerConfigurationList = new HashSet<>();
if (Asserts.isNotNullString(history.getJobManagerAddress())) { // 如果有jobManager地址,则使用该地址
if (Asserts.isNotNullString(history.getJobManagerAddress()) && JobStatus.RUNNING.getValue().equals(jobInfoDetail.getInstance().getStatus())) { // 如果有jobManager地址,则使用该地址
FlinkAPI flinkAPI = FlinkAPI.build(history.getJobManagerAddress());
// 获取jobManager的配置信息 开始
buildJobManagerConfiguration(jobManagerConfiguration, flinkAPI);
BuildConfiguration.buildJobManagerConfiguration(jobManagerConfiguration, flinkAPI);
jobInfoDetail.setJobManagerConfiguration(jobManagerConfiguration);
// 获取jobManager的配置信息 结束
// 获取taskManager的配置信息 开始
JsonNode taskManagerContainers = flinkAPI.getTaskManagers(); //获取taskManager列表
buildTaskManagerConfiguration(taskManagerConfigurationList, flinkAPI, taskManagerContainers);
BuildConfiguration.buildTaskManagerConfiguration(taskManagerConfigurationList, flinkAPI, taskManagerContainers);
jobInfoDetail.setTaskManagerConfiguration(taskManagerConfigurationList);
// 获取taskManager的配置信息 结束
......@@ -631,119 +631,6 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
return jobInfoDetail.getInstance();
}
/**
* @Author: zhumingye
* @date: 2022/6/27
* @Description: buildTaskManagerConfiguration
* @Params: [taskManagerConfigurationList, flinkAPI, taskManagerContainers]
* @return void
*/
private void buildTaskManagerConfiguration(Set<TaskManagerConfiguration> taskManagerConfigurationList, FlinkAPI flinkAPI, JsonNode taskManagerContainers) {
if (Asserts.isNotNull(taskManagerContainers)) {
JsonNode taskmanagers = taskManagerContainers.get("taskmanagers");
for (JsonNode taskManagers : taskmanagers) {
TaskManagerConfiguration taskManagerConfiguration = new TaskManagerConfiguration();
/**
* 解析 taskManager 的配置信息
*/
String containerId = taskManagers.get("id").asText();// 获取container id
String containerPath = taskManagers.get("path").asText(); // 获取container path
Integer dataPort = taskManagers.get("dataPort").asInt(); // 获取container dataPort
Integer jmxPort =taskManagers.get("jmxPort").asInt(); // 获取container jmxPort
Long timeSinceLastHeartbeat =taskManagers.get("timeSinceLastHeartbeat").asLong(); // 获取container timeSinceLastHeartbeat
Integer slotsNumber =taskManagers.get("slotsNumber").asInt(); // 获取container slotsNumber
Integer freeSlots = taskManagers.get("freeSlots").asInt(); // 获取container freeSlots
String totalResource = JSONUtil.toJsonString(taskManagers.get("totalResource")); // 获取container totalResource
String freeResource = JSONUtil.toJsonString(taskManagers.get("freeResource") ); // 获取container freeResource
String hardware = JSONUtil.toJsonString(taskManagers.get("hardware") ); // 获取container hardware
String memoryConfiguration = JSONUtil.toJsonString(taskManagers.get("memoryConfiguration") ); // 获取container memoryConfiguration
Asserts.checkNull(containerId, "获取不到 containerId , containerId不能为空");
JsonNode taskManagerMetrics = flinkAPI.getTaskManagerMetrics(containerId);//获取taskManager metrics
String taskManagerLog = flinkAPI.getTaskManagerLog(containerId);//获取taskManager日志
String taskManagerThreadDumps = JSONUtil.toJsonString(flinkAPI.getTaskManagerThreadDump(containerId).get("threadInfos"));//获取taskManager线程dumps
String taskManagerStdOut = flinkAPI.getTaskManagerStdOut(containerId);//获取taskManager标准输出日志
Map<String, String> taskManagerMetricsMap = new HashMap<String, String>(); //获取taskManager metrics
List<LinkedHashMap> taskManagerMetricsItemsList = JSONUtil.toList(JSONUtil.toJsonString(taskManagerMetrics), LinkedHashMap.class);
taskManagerMetricsItemsList.forEach(mapItems -> {
String configKey = (String) mapItems.get("id");
String configValue = (String) mapItems.get("value");
if (Asserts.isNotNullString(configKey) && Asserts.isNotNullString(configValue)) {
taskManagerMetricsMap.put(configKey, configValue);
}
});
/**
* TaskManagerConfiguration 赋值
*/
taskManagerConfiguration.setContainerId(containerId);
taskManagerConfiguration.setContainerPath(containerPath);
taskManagerConfiguration.setDataPort(dataPort);
taskManagerConfiguration.setJmxPort(jmxPort);
taskManagerConfiguration.setTimeSinceLastHeartbeat(timeSinceLastHeartbeat);
taskManagerConfiguration.setSlotsNumber(slotsNumber);
taskManagerConfiguration.setFreeSlots(freeSlots);
taskManagerConfiguration.setTotalResource(totalResource);
taskManagerConfiguration.setFreeResource(freeResource);
taskManagerConfiguration.setHardware(hardware);
taskManagerConfiguration.setMemoryConfiguration(memoryConfiguration);
/**
* TaskContainerConfigInfo 赋值
*/
TaskContainerConfigInfo taskContainerConfigInfo = new TaskContainerConfigInfo();
taskContainerConfigInfo.setMetrics(taskManagerMetricsMap);
taskContainerConfigInfo.setTaskManagerLog(taskManagerLog);
taskContainerConfigInfo.setTaskManagerThreadDump(taskManagerThreadDumps);
taskContainerConfigInfo.setTaskManagerStdout(taskManagerStdOut);
taskManagerConfiguration.setTaskContainerConfigInfo(taskContainerConfigInfo);
// 将taskManagerConfiguration添加到set集合中
taskManagerConfigurationList.add(taskManagerConfiguration);
}
}
}
/**
* @Author: zhumingye
* @date: 2022/6/27
* @Description: buildJobManagerConfiguration
* @Params: [jobManagerConfiguration, flinkAPI]
* @return void
*/
private void buildJobManagerConfiguration(JobManagerConfiguration jobManagerConfiguration, FlinkAPI flinkAPI) {
Map<String, String> jobManagerMetricsMap = new HashMap<String, String>(); //获取jobManager metrics
List<LinkedHashMap> jobManagerMetricsItemsList = JSONUtil.toList(JSONUtil.toJsonString(flinkAPI.getJobManagerMetrics()), LinkedHashMap.class);
jobManagerMetricsItemsList.forEach(mapItems -> {
String configKey = (String) mapItems.get("id");
String configValue = (String) mapItems.get("value");
if (Asserts.isNotNullString(configKey) && Asserts.isNotNullString(configValue)) {
jobManagerMetricsMap.put(configKey, configValue);
}
});
Map<String, String> jobManagerConfigMap = new HashMap<String, String>();//获取jobManager配置信息
List<LinkedHashMap> jobManagerConfigMapItemsList = JSONUtil.toList(JSONUtil.toJsonString(flinkAPI.getJobManagerConfig()), LinkedHashMap.class);
jobManagerConfigMapItemsList.forEach(mapItems -> {
String configKey = (String) mapItems.get("key");
String configValue = (String) mapItems.get("value");
if (Asserts.isNotNullString(configKey) && Asserts.isNotNullString(configValue)) {
jobManagerConfigMap.put(configKey, configValue);
}
});
String jobMangerLog = flinkAPI.getJobManagerLog(); //获取jobManager日志
String jobManagerStdOut = flinkAPI.getJobManagerStdOut(); //获取jobManager标准输出日志
jobManagerConfiguration.setMetrics(jobManagerMetricsMap);
jobManagerConfiguration.setJobManagerConfig(jobManagerConfigMap);
jobManagerConfiguration.setJobManagerLog(jobMangerLog);
jobManagerConfiguration.setJobManagerStdout(jobManagerStdOut);
}
private boolean inRefreshPlan(JobInstance jobInstance) {
if ((!JobStatus.isDone(jobInstance.getStatus())) || (Asserts.isNotNull(jobInstance.getFinishTime())
&& Duration.between(jobInstance.getFinishTime(), LocalDateTime.now()).toMinutes() < 1)) {
......
......@@ -3,7 +3,6 @@ import CodeShow from "@/components/Common/CodeShow";
const {TabPane} = Tabs;
// TODO: 此页面需要根据设置的3秒刷新时间,自动刷新数据
const JobManagerConfiguration = (props: any) => {
const {job} = props;
......
......@@ -9,7 +9,6 @@ import {HomeOutlined} from "@ant-design/icons";
const {TabPane} = Tabs;
// TODO: 此页面需要根据设置的3秒刷新时间,自动刷新数据
const TaskManagerConfigurationForm = (props: any) => {
const {job} = props;
const actionRef = useRef<ActionType>();
......
......@@ -3,11 +3,15 @@ import {PageContainer} from '@ant-design/pro-layout';
import ProCard from '@ant-design/pro-card';
import JobManagerConfiguration from "@/pages/DevOps/JobInfo/ClusterConfiguration/JobManager";
import TaskManagerConfigurationForm from "@/pages/DevOps/JobInfo/ClusterConfiguration/TaskManager";
import {JOB_STATUS} from "@/components/Common/JobStatus";
import {Empty} from "antd";
const ClusterConfiguration = (props: any) => {
const {job} = props;
const [tabKey, setTabKey] = useState<string>('jobmanager');
return (
<>
{job?.instance?.status === JOB_STATUS.RUNNING ?
<PageContainer
header={{title: undefined}}
tabList={[
......@@ -26,11 +30,14 @@ const ClusterConfiguration = (props: any) => {
setTabKey(key);
}}
>
<ProCard >
<ProCard>
{tabKey === 'jobmanager' ? <JobManagerConfiguration job={job}/> : undefined}
{tabKey === 'taskmanager' ? <TaskManagerConfigurationForm job={job}/> : undefined}
</ProCard>
</PageContainer>
</PageContainer> : <Empty image={Empty.PRESENTED_IMAGE_SIMPLE}/>
}
</>
);
};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment