Commit 0fb8e527 authored by wenmo

Job overview

parent e9be4cb1
@@ -81,4 +81,12 @@ public class JobInstanceController {
public Result getJobInfoDetail(@RequestParam Integer id) {
return Result.succeed(jobInstanceService.getJobInfoDetail(id), "获取成功");
}
/**
* Refresh all information of a Job instance
*/
@GetMapping("/refreshJobInfoDetail")
public Result refreshJobInfoDetail(@RequestParam Integer id) {
return Result.succeed(jobInstanceService.refreshJobInfoDetail(id), "刷新成功");
}
}
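For reference, a minimal sketch of calling the new refresh endpoint directly over HTTP (the host, port, and id value here are assumptions for a local deployment; the response follows the project's Result envelope with a datas payload and a msg field):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RefreshJobInfoDetailExample {
    public static void main(String[] args) throws Exception {
        // Assumed host and port of a locally running dlink backend; adjust to your deployment.
        String url = "http://127.0.0.1:8888/api/jobInstance/refreshJobInfoDetail?id=1";
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // Expected shape: {"datas": { ...refreshed job info detail... }, "msg": "刷新成功", ...}
        System.out.println(response.body());
    }
}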
package com.dlink.job;
import cn.hutool.json.JSONUtil;
import com.dlink.assertion.Asserts;
import com.dlink.context.SpringContextUtils;
import com.dlink.model.*;
import com.dlink.service.*;
import com.dlink.utils.JSONUtil;
import com.dlink.model.JobStatus;
import com.dlink.service.ClusterService;
import com.dlink.service.HistoryService;
import com.dlink.service.JobInstanceService;
import org.springframework.context.annotation.DependsOn;
import java.time.LocalDateTime;
@@ -25,12 +20,18 @@ public class Job2MysqlHandler implements JobHandler {
private static HistoryService historyService;
private static ClusterService clusterService;
private static ClusterConfigurationService clusterConfigurationService;
private static JarService jarService;
private static JobInstanceService jobInstanceService;
private static JobHistoryService jobHistoryService;
static {
historyService = SpringContextUtils.getBean("historyServiceImpl", HistoryService.class);
clusterService = SpringContextUtils.getBean("clusterServiceImpl", ClusterService.class);
clusterConfigurationService = SpringContextUtils.getBean("clusterConfigurationServiceImpl", ClusterConfigurationService.class);
jarService = SpringContextUtils.getBean("jarServiceImpl", JarService.class);
jobInstanceService = SpringContextUtils.getBean("jobInstanceServiceImpl", JobInstanceService.class);
jobHistoryService = SpringContextUtils.getBean("jobHistoryServiceImpl", JobHistoryService.class);
}
@Override
@@ -50,7 +51,7 @@ public class Job2MysqlHandler implements JobHandler {
history.setStatement(job.getStatement());
history.setStartTime(job.getStartTime());
history.setTaskId(job.getJobConfig().getTaskId());
history.setConfigJson(JSONUtil.toJsonString(job.getJobConfig()));
historyService.save(history);
job.setId(history.getId());
return true;
@@ -81,16 +82,27 @@ public class Job2MysqlHandler implements JobHandler {
history.setJobManagerAddress(job.getJobManagerAddress());
}
Integer clusterId = job.getJobConfig().getClusterId();
Cluster cluster = null;
if (job.isUseGateway()) {
cluster = clusterService.registersCluster(Cluster.autoRegistersCluster(job.getJobManagerAddress(),
job.getJobId(), job.getJobConfig().getJobName() + LocalDateTime.now(), job.getType().getLongValue(),
job.getJobConfig().getClusterConfigurationId(), job.getJobConfig().getTaskId()));
if (Asserts.isNotNull(cluster)) {
clusterId = cluster.getId();
}
}else{
cluster = clusterService.getById(clusterId);
}
history.setClusterId(clusterId);
historyService.updateById(history);
ClusterConfiguration clusterConfiguration = null;
if(Asserts.isNotNull(job.getJobConfig().getClusterConfigurationId())){
clusterConfiguration = clusterConfigurationService.getClusterConfigById(job.getJobConfig().getClusterConfigurationId());
}
Jar jar = null;
if(Asserts.isNotNull(job.getJobConfig().getJarId())){
jar = jarService.getById(job.getJobConfig().getJarId());
}
if (Asserts.isNotNullCollection(job.getJids())) {
for (String jid : job.getJids()) {
JobInstance jobInstance = history.buildJobInstance();
@@ -101,6 +113,12 @@ public class Job2MysqlHandler implements JobHandler {
jobInstance.setJid(jid);
jobInstance.setStatus(JobStatus.INITIALIZING.getValue());
jobInstanceService.save(jobInstance);
JobHistory jobHistory = new JobHistory();
jobHistory.setId(jobInstance.getId());
jobHistory.setJarJson(JSONUtil.toJsonString(jar));
jobHistory.setClusterJson(JSONUtil.toJsonString(cluster));
jobHistory.setClusterConfigurationJson(JSONUtil.toJsonString(clusterConfiguration));
jobHistoryService.save(jobHistory);
}
}
return true;
......
package com.dlink.mapper;
import com.dlink.db.mapper.SuperMapper;
import com.dlink.model.JobHistory;
import org.apache.ibatis.annotations.Mapper;
/**
* JobHistoryMapper
*
* @author wenmo
* @since 2022/3/2 19:50
**/
@Mapper
public interface JobHistoryMapper extends SuperMapper<JobHistory> {
int insert(JobHistory jobHistory);
}
@@ -2,6 +2,7 @@ package com.dlink.model;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.Data;
import lombok.EqualsAndHashCode;
@@ -33,7 +34,9 @@ public class History implements Serializable {
private String type;
private String error;
private String result;
@TableField(exist = false)
private ObjectNode config;
private String configJson;
private LocalDateTime startTime;
private LocalDateTime endTime;
private Integer taskId;
......
package com.dlink.model;
import com.baomidou.mybatisplus.annotation.FieldFill;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* JobHistory
*
* @author wenmo
* @since 2022/3/2 19:48
**/
@Data
@EqualsAndHashCode(callSuper = false)
@TableName("dlink_job_history")
public class JobHistory implements Serializable {
private static final long serialVersionUID = 4984787372340047250L;
private Integer id;
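// Each *_json field below holds the raw JSON string persisted in the dlink_job_history table;
// the paired ObjectNode field (marked @TableField(exist = false)) is populated at read time
// by JobHistoryServiceImpl.getJobHistoryInfo for API consumers.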
@TableField(exist = false)
private ObjectNode job;
private String jobJson;
@TableField(exist = false)
private ObjectNode exceptions;
private String exceptionsJson;
@TableField(exist = false)
private ObjectNode checkpoints;
private String checkpointsJson;
@TableField(exist = false)
private ObjectNode checkpointsConfig;
private String checkpointsConfigJson;
@TableField(exist = false)
private ObjectNode config;
private String configJson;
@TableField(exist = false)
private ObjectNode jar;
private String jarJson;
@TableField(exist = false)
private ObjectNode cluster;
private String clusterJson;
@TableField(exist = false)
private ObjectNode clusterConfiguration;
private String clusterConfigurationJson;
@TableField(fill = FieldFill.INSERT_UPDATE)
private LocalDateTime updateTime;
}
@@ -14,6 +14,7 @@ public class JobInfoDetail {
private ClusterConfiguration clusterConfiguration;
private Task task;
private History history;
private JobHistory jobHistory;
public JobInfoDetail(Integer id) {
this.id = id;
@@ -66,4 +67,12 @@ public class JobInfoDetail {
public void setHistory(History history) {
this.history = history;
}
public JobHistory getJobHistory() {
return jobHistory;
}
public void setJobHistory(JobHistory jobHistory) {
this.jobHistory = jobHistory;
}
}
package com.dlink.service;
import com.dlink.db.service.ISuperService;
import com.dlink.model.JobHistory;
/**
* JobHistoryService
*
* @author wenmo
* @since 2022/3/2 19:55
**/
public interface JobHistoryService extends ISuperService<JobHistory> {
JobHistory getJobHistory(Integer id);
JobHistory getJobHistoryInfo(JobHistory jobHistory);
JobHistory refreshJobHistory(Integer id,String jobManagerHost,String jobId);
}
@@ -16,4 +16,10 @@ public interface JobInstanceService extends ISuperService<JobInstance> {
JobInstanceStatus getStatusCount();
JobInfoDetail getJobInfoDetail(Integer id);
JobInfoDetail getJobInfoDetailInfo(JobInstance jobInstance);
JobInstance refreshJobInstance(Integer id);
JobInfoDetail refreshJobInfoDetail(Integer id);
}
package com.dlink.service.impl;
import com.dlink.api.FlinkAPI;
import com.dlink.assertion.Asserts;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.mapper.JobHistoryMapper;
import com.dlink.model.JobHistory;
import com.dlink.service.JobHistoryService;
import com.dlink.utils.JSONUtil;
import com.fasterxml.jackson.databind.JsonNode;
import org.springframework.stereotype.Service;
/**
* JobHistoryServiceImpl
*
* @author wenmo
* @since 2022/3/2 20:00
**/
@Service
public class JobHistoryServiceImpl extends SuperServiceImpl<JobHistoryMapper, JobHistory> implements JobHistoryService {
@Override
public JobHistory getJobHistory(Integer id) {
return getJobHistoryInfo(getById(id));
}
@Override
public JobHistory getJobHistoryInfo(JobHistory jobHistory) {
if(Asserts.isNotNull(jobHistory)){
if(Asserts.isNotNullString(jobHistory.getJobJson())){
jobHistory.setJob(JSONUtil.parseObject(jobHistory.getJobJson()));
jobHistory.setJobJson(null);
}
if(Asserts.isNotNullString(jobHistory.getExceptionsJson())){
jobHistory.setExceptions(JSONUtil.parseObject(jobHistory.getExceptionsJson()));
jobHistory.setExceptionsJson(null);
}
if(Asserts.isNotNullString(jobHistory.getCheckpointsJson())){
jobHistory.setCheckpoints(JSONUtil.parseObject(jobHistory.getCheckpointsJson()));
jobHistory.setCheckpointsJson(null);
}
if(Asserts.isNotNullString(jobHistory.getCheckpointsConfigJson())){
jobHistory.setCheckpointsConfig(JSONUtil.parseObject(jobHistory.getCheckpointsConfigJson()));
jobHistory.setCheckpointsConfigJson(null);
}
if(Asserts.isNotNullString(jobHistory.getConfigJson())){
jobHistory.setConfig(JSONUtil.parseObject(jobHistory.getConfigJson()));
jobHistory.setConfigJson(null);
}
if(Asserts.isNotNullString(jobHistory.getJarJson())){
jobHistory.setJar(JSONUtil.parseObject(jobHistory.getJarJson()));
jobHistory.setJarJson(null);
}
if(Asserts.isNotNullString(jobHistory.getClusterJson())){
jobHistory.setCluster(JSONUtil.parseObject(jobHistory.getClusterJson()));
jobHistory.setClusterJson(null);
}
if(Asserts.isNotNullString(jobHistory.getClusterConfigurationJson())){
jobHistory.setClusterConfiguration(JSONUtil.parseObject(jobHistory.getClusterConfigurationJson()));
jobHistory.setClusterConfigurationJson(null);
}
}
return jobHistory;
}
@Override
public JobHistory refreshJobHistory(Integer id, String jobManagerHost, String jobId) {
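// Fetch the latest job, exception, checkpoint and configuration snapshots from the Flink
// JobManager REST API and persist them as JSON, inserting or updating by instance id.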
JsonNode jobInfo = FlinkAPI.build(jobManagerHost).getJobInfo(jobId);
JsonNode exception = FlinkAPI.build(jobManagerHost).getException(jobId);
JsonNode checkPoints = FlinkAPI.build(jobManagerHost).getCheckPoints(jobId);
JsonNode checkPointsConfig = FlinkAPI.build(jobManagerHost).getCheckPointsConfig(jobId);
JsonNode jobsConfig = FlinkAPI.build(jobManagerHost).getJobsConfig(jobId);
JobHistory jobHistory = new JobHistory();
jobHistory.setId(id);
jobHistory.setJobJson(JSONUtil.toJsonString(jobInfo));
jobHistory.setExceptionsJson(JSONUtil.toJsonString(exception));
jobHistory.setCheckpointsJson(JSONUtil.toJsonString(checkPoints));
jobHistory.setCheckpointsConfigJson(JSONUtil.toJsonString(checkPointsConfig));
jobHistory.setConfigJson(JSONUtil.toJsonString(jobsConfig));
if (Asserts.isNotNull(getById(id))) {
updateById(jobHistory);
} else {
save(jobHistory);
}
return jobHistory;
}
}
package com.dlink.service.impl;
import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkRestResultConstant;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.mapper.JobInstanceMapper;
import com.dlink.model.Cluster;
import com.dlink.model.History;
import com.dlink.model.JobHistory;
import com.dlink.model.JobInfoDetail;
import com.dlink.model.JobInstance;
import com.dlink.model.JobInstanceCount;
@@ -12,8 +15,10 @@ import com.dlink.model.JobStatus;
import com.dlink.service.ClusterConfigurationService;
import com.dlink.service.ClusterService;
import com.dlink.service.HistoryService;
import com.dlink.service.JobHistoryService;
import com.dlink.service.JobInstanceService;
import com.dlink.service.TaskService;
import com.dlink.utils.JSONUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -36,6 +41,8 @@ public class JobInstanceServiceImpl extends SuperServiceImpl<JobInstanceMapper,
private ClusterService clusterService;
@Autowired
private ClusterConfigurationService clusterConfigurationService;
@Autowired
private JobHistoryService jobHistoryService;
@Override
public JobInstanceStatus getStatusCount() {
@@ -68,17 +75,44 @@ public class JobInstanceServiceImpl extends SuperServiceImpl<JobInstanceMapper,
@Override
public JobInfoDetail getJobInfoDetail(Integer id) {
return getJobInfoDetailInfo(getById(id));
}
@Override
public JobInfoDetail getJobInfoDetailInfo(JobInstance jobInstance) {
Asserts.checkNull(jobInstance, "该任务实例不存在");
JobInfoDetail jobInfoDetail = new JobInfoDetail(jobInstance.getId());
jobInfoDetail.setInstance(jobInstance);
jobInfoDetail.setTask(taskService.getTaskInfoById(jobInstance.getTaskId()));
jobInfoDetail.setCluster(clusterService.getById(jobInstance.getClusterId()));
jobInfoDetail.setJobHistory(jobHistoryService.getJobHistory(jobInstance.getId()));
History history = historyService.getById(jobInstance.getHistoryId());
history.setConfig(JSONUtil.parseObject(history.getConfigJson()));
jobInfoDetail.setHistory(history);
if (Asserts.isNotNull(history) && Asserts.isNotNull(history.getClusterConfigurationId())) {
jobInfoDetail.setClusterConfiguration(clusterConfigurationService.getClusterConfigById(history.getClusterConfigurationId()));
}
return jobInfoDetail;
}
@Override
public JobInstance refreshJobInstance(Integer id) {
JobInstance jobInstance = getById(id);
Asserts.checkNull(jobInstance, "该任务实例不存在");
if(JobStatus.isDone(jobInstance.getStatus())){
return jobInstance;
}
Cluster cluster = clusterService.getById(jobInstance.getClusterId());
JobHistory jobHistoryJson = jobHistoryService.refreshJobHistory(id, cluster.getJobManagerHost(), jobInstance.getJid());
JobHistory jobHistory = jobHistoryService.getJobHistoryInfo(jobHistoryJson);
jobInstance.setDuration(jobHistory.getJob().get(FlinkRestResultConstant.JOB_DURATION).asLong()/1000);
jobInstance.setStatus(jobHistory.getJob().get(FlinkRestResultConstant.JOB_STATE).asText());
updateById(jobInstance);
return jobInstance;
}
@Override
public JobInfoDetail refreshJobInfoDetail(Integer id) {
return getJobInfoDetailInfo(refreshJobInstance(id));
}
}
@@ -2,34 +2,6 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.dlink.mapper.HistoryMapper">
<!-- Generic query result mapping -->
<resultMap id="BaseResultMap" type="com.dlink.model.History">
<id column="id" property="id"/>
<result column="cluster_id" property="clusterId"/>
<result column="cluster_configuration_id" property="clusterConfigurationId"/>
<result column="session" property="session"/>
<result column="jod_id" property="jobId"/>
<result column="job_name" property="jobName"/>
<result column="job_manager_address" property="jobManagerAddress"/>
<result column="status" property="status"/>
<result column="statement" property="statement"/>
<result column="type" property="type"/>
<result column="error" property="error"/>
<result column="result" property="result"/>
<result column="config" property="config"/>
<result column="start_time" property="startTime"/>
<result column="end_time" property="endTime"/>
<result column="task_id" property="taskId"/>
</resultMap>
<!-- Generic query result columns -->
<sql id="Base_Column_List">
id,cluster_id,cluster_configuration_id,session,jod_id,job_name,
job_manager_address,status,statement,type, error,
result,config,start_time,end_time,task_id
</sql>
<select id="selectForProTable" resultType="com.dlink.model.History"> <select id="selectForProTable" resultType="com.dlink.model.History">
select select
a.*, a.*,
......
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.dlink.mapper.JobHistoryMapper">
<insert id="insert">
insert into dlink_job_history (id,job_json,exceptions_json,checkpoints_json,checkpoints_config_json,config_json,
jar_json,cluster_json,cluster_configuration_json,update_time)
values (#{id},#{jobJson},#{exceptionsJson},#{checkpointsJson},#{checkpointsConfigJson},#{configJson},
#{jarJson},#{clusterJson},#{clusterConfigurationJson},#{updateTime})
</insert>
<select id="selectForProTable" resultType="com.dlink.model.JobHistory">
select
a.*
from
dlink_job_history a
<where>
1=1
<if test='param.id!=null and param.id!=""'>
and a.id = #{param.id}
</if>
<if test='ew.sqlSegment!=null and ew.sqlSegment!="" and !ew.sqlSegment.startsWith(" ORDER BY")'>
and
</if>
<if test='ew.sqlSegment!=null and ew.sqlSegment!=""'>
${ew.sqlSegment}
</if>
</where>
</select>
</mapper>
@@ -81,12 +81,23 @@ public enum JobStatus {
return value;
}
public static JobStatus get(String value) {
for (JobStatus type : JobStatus.values()) {
if (Asserts.isEqualsIgnoreCase(type.getValue(), value)) {
return type;
}
}
return JobStatus.UNKNOWN;
}
public static boolean isDone(String value) {
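// A job counts as done once it has reached a terminal state, so callers can skip further refreshes.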
switch (get(value)) {
case FAILED:
case CANCELED:
case FINISHED:
return true;
default:
return false;
}
}
}
@@ -124,6 +124,9 @@ public class JSONUtil {
}
public static String toJsonString(Object object) {
if(Asserts.isNull(object)){
return null;
}
try {
return objectMapper.writeValueAsString(object);
} catch (Exception e) {
......
@@ -17,7 +17,7 @@
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<!-- `provided` for production environment, `compile` for dev environment -->
<scope.runtime>compile</scope.runtime>
</properties>
<dependencies>
......
@@ -139,7 +139,27 @@ public class FlinkAPI {
}
public String getVersion() {
JsonNode result = get(FlinkRestAPIConstant.FLINK_CONFIG);
return result.get("flink-version").asText();
}
public JsonNode getJobInfo(String jobId) {
return get(FlinkRestAPIConstant.JOBS+jobId);
}
public JsonNode getException(String jobId) {
return get(FlinkRestAPIConstant.JOBS+jobId+FlinkRestAPIConstant.EXCEPTIONS);
}
public JsonNode getCheckPoints(String jobId) {
return get(FlinkRestAPIConstant.JOBS+jobId+FlinkRestAPIConstant.CHECKPOINTS);
}
public JsonNode getCheckPointsConfig(String jobId) {
return get(FlinkRestAPIConstant.JOBS+jobId+FlinkRestAPIConstant.CHECKPOINTS_CONFIG);
}
public JsonNode getJobsConfig(String jobId) {
return get(FlinkRestAPIConstant.JOBS+jobId+FlinkRestAPIConstant.CONFIG);
}
}
@@ -6,29 +6,26 @@ package com.dlink.constant;
 * @author wenmo
 * @since 2021/6/24 14:04
 **/
public interface FlinkRestAPIConstant {
/**
 * config
 */
String CONFIG = "config";
/**
 * jobs
 */
String JOBS = "jobs/";
/**
 * JOBSLIST
 */
String JOBSLIST = "jobs/overview";
/**
 * cancel
 */
String CANCEL = "/yarn-cancel";
/**
 * savepoints
 */
String SAVEPOINTS = "/savepoints";
/**
 * stop
 */
String STOP = "/stop";
}
public final class FlinkRestAPIConstant {
public static final String FLINK_CONFIG = "config";
public static final String CONFIG = "/config";
public static final String JOBS = "jobs/";
public static final String JOBSLIST = "jobs/overview";
public static final String CANCEL = "/yarn-cancel";
public static final String CHECKPOINTS = "/checkpoints";
public static final String CHECKPOINTS_CONFIG = "/checkpoints/config";
public static final String SAVEPOINTS = "/savepoints";
public static final String STOP = "/stop";
public static final String EXCEPTIONS = "/exceptions?maxExceptions=10";
}
package com.dlink.constant;
/**
* FlinkRestResultConstant
*
* @author wenmo
* @since 2022/3/2 20:04
**/
public final class FlinkRestResultConstant {
public static final String JOB_DURATION = "duration";
public static final String JOB_STATE = "state";
}
@@ -165,7 +165,7 @@ CREATE TABLE `dlink_history` (
`statement` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '语句集',
`error` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '异常信息',
`result` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '结果集',
`config_json` json NULL COMMENT '配置JSON',
`start_time` datetime(0) NULL DEFAULT NULL COMMENT '开始时间',
`end_time` datetime(0) NULL DEFAULT NULL COMMENT '结束时间',
`task_id` int(11) NULL DEFAULT NULL COMMENT '作业ID',
@@ -372,11 +372,14 @@ create table dlink_job_history
(
id int comment '实例主键'
primary key,
job_json json null comment 'Job信息',
exceptions_json json null comment '异常日志',
checkpoints_json json null comment '保存点',
checkpoints_config_json json null comment '保存点配置',
config_json json null comment '配置',
jar_json json null comment 'Jar配置',
cluster_json json null comment '集群实例',
cluster_configuration_json json null comment '集群配置',
update_time datetime null comment '更新时间'
)
comment 'Job历史详情';
......
@@ -605,13 +605,23 @@ create table dlink_job_history
(
id int comment '实例主键'
primary key,
job_json json null comment 'Job信息',
exceptions_json json null comment '异常日志',
checkpoints_json json null comment '保存点',
checkpoints_config_json json null comment '保存点配置',
config_json json null comment '配置',
jar_json json null comment 'Jar配置',
cluster_json json null comment '集群实例',
cluster_configuration_json json null comment '集群配置',
update_time datetime null comment '更新时间'
)
comment 'Job历史详情';
-- ----------------------------
-- 0.4.0 2022-03-02
-- ----------------------------
ALTER TABLE `dlink_history`
CHANGE COLUMN `config` `config_json` json NULL COMMENT '配置JSON' AFTER `result`;
SET FOREIGN_KEY_CHECKS = 1;
@@ -60,7 +60,7 @@ const DTable = (props: any) => {
}
useEffect(() => {
if(dataSource&&dataSource.url){
refreshData();
}
}, []);
@@ -69,7 +69,7 @@ const DTable = (props: any) => {
<ProTable
columns={buildColumn()}
style={{width: '100%'}}
dataSource={dataSource?(dataSource.url?data:dataSource):[]}
rowKey="name"
pagination={{
pageSize: 10,
......
export function parseByteStr(limit: number) {
let size = "";
if (limit < 0.1 * 1024) { // less than 0.1 KB: keep it in B
size = limit.toFixed(2) + "B"
} else if (limit < 0.1 * 1024 * 1024) { // less than 0.1 MB: convert to KB
size = (limit / 1024).toFixed(2) + "KB"
} else if (limit < 0.1 * 1024 * 1024 * 1024) { // less than 0.1 GB: convert to MB
size = (limit / (1024 * 1024)).toFixed(2) + "MB"
} else { // otherwise convert to GB
size = (limit / (1024 * 1024 * 1024)).toFixed(2) + "GB"
}
let sizeStr = size + ""; // convert to string
let index = sizeStr.indexOf("."); // index of the decimal point
let dou = sizeStr.substr(index + 1, 2) // the two digits after the decimal point
if (dou == "00") { // if the two decimals are 00, drop them
return sizeStr.substring(0, index) + sizeStr.substr(index + 3, 2)
}
return size;
}
export function parseNumStr(num: number) {
let c = (num.toString().indexOf('.') !== -1) ? num.toLocaleString() : num.toString().replace(/(\d)(?=(\d{3})+$)/g, '$1,');
return c;
}
export function parseMilliSecondStr(second_time: number) {
return parseSecondStr(second_time/1000);
}
export function parseSecondStr(second_time: number) {
second_time = Math.floor(second_time);
let time = second_time + "秒";
if (second_time > 60) {
let second = second_time % 60;
let min = Math.floor(second_time / 60);
time = min + "分" + second + "秒";
if (min > 60) {
min = Math.floor(second_time / 60) % 60;
let hour = Math.floor(Math.floor(second_time / 60) / 60);
time = hour + "小时" + min + "分" + second + "秒";
if (hour > 24) {
hour = Math.floor(Math.floor(second_time / 60) / 60) % 24;
let day = Math.floor(Math.floor(Math.floor(second_time / 60) / 60) / 24);
time = day + "天" + hour + "小时" + min + "分" + second + "秒";
}
}
}
return time;
}
@@ -11,7 +11,8 @@ export type HistoryItem = {
statement: string;
error: string;
result: string;
config: any;
configJson: string;
type: string;
startTime: string;
endTime: string;
......
@@ -34,6 +34,7 @@ type HistoryConfig={
jobName: string;
useSqlFragment: boolean;
useStatementSet: boolean;
useBatchModel: boolean;
checkpoint: number;
parallelism: number;
savePointPath: string;
@@ -53,7 +54,7 @@ const StudioHistory = (props: any) => {
setRow(row);
setModalVisit(true);
setType(type);
setConfig(JSON.parse(row.configJson));
if(type===3){
// showJobData(row.jobId,dispatch)
const res = getJobData(row.jobId);
......
@@ -17,10 +17,10 @@ const StudioGuide = (props: any) => {
<Paragraph>
<ul>
<li>
<Link onClick={()=>{history.push('/registration/cluster/clusterInstance')}}>注册集群实例</Link>
</li>
<li>
<Link onClick={()=>{history.push('/registration/cluster/clusterConfiguration')}}>注册集群配置</Link>
</li>
<li>
<Link onClick={()=>{history.push('/registration/jar')}}>注册 Jar</Link>
......
import {Descriptions, Typography} from 'antd';
import StatusCounts from "@/components/Common/StatusCounts";
import ProTable from '@ant-design/pro-table';
import {ProColumns} from "@ant-design/pro-table";
import {VerticesTableListItem} from "@/pages/DevOps/data";
import JobStatus from "@/components/Common/JobStatus";
import {parseByteStr, parseMilliSecondStr, parseNumStr, parseSecondStr} from "@/components/Common/function";
const { Text } = Typography;
const BaseInfo = (props: any) => {
const {job} = props;
const columns: ProColumns<VerticesTableListItem>[] = [
{
title: '名称',
dataIndex: 'name',
render: (dom, entity) => {
return <Text style={{ width: 500 }} ellipsis={{ tooltip:entity.name }}>{entity.name}</Text>;
},
},
{
title: '状态',
dataIndex: 'status',
sorter: true,
render: (dom, entity) => {
return <JobStatus status={entity.status}/>;
},
},
{
title: '接收字节',
render: (dom, entity) => {
return parseByteStr(entity.metrics['read-bytes']);
},
},
{
title: '接收记录',
render: (dom, entity) => {
return parseNumStr(entity.metrics['read-records']);
},
},
{
title: '发送字节',
render: (dom, entity) => {
return parseByteStr(entity.metrics['write-bytes']);
},
},
{
title: '发送记录',
render: (dom, entity) => {
return parseNumStr(entity.metrics['write-records']);
},
},
{
title: '并行度',
sorter: true,
dataIndex: 'parallelism',
},
{
title: '开始时间',
dataIndex: 'start-time',
valueType: 'dateTime',
},
{
title: '耗时',
render: (dom, entity) => {
return parseMilliSecondStr(entity.duration);
},
},
{
title: '结束时间',
dataIndex: 'end-time',
valueType: 'dateTime',
},
{
title: '算子',
render: (dom, entity) => {
return <StatusCounts statusCounts={entity.tasks}/>;
},
},
];
return (<>
<Descriptions bordered size="small">
<Descriptions.Item label="作业状态">
{job?.jobHistory?.job?<StatusCounts statusCounts={job?.jobHistory?.job['status-counts']}/>:undefined}
</Descriptions.Item>
<Descriptions.Item label="重启次数">{job?.instance?.failedRestartCount?job?.instance?.failedRestartCount:0}</Descriptions.Item>
<Descriptions.Item label="耗时">{parseSecondStr(job?.instance?.duration)}</Descriptions.Item>
<Descriptions.Item label="启动时间">{job?.instance?.createTime}</Descriptions.Item>
<Descriptions.Item label="更新时间">{job?.instance?.updateTime}</Descriptions.Item>
<Descriptions.Item label="完成时间">{job?.instance?.finishTime}</Descriptions.Item>
</Descriptions>
{job?.jobHistory?.job?
<ProTable
columns={columns}
style={{width: '100%'}}
dataSource={job?.jobHistory?.job.vertices}
rowKey="name"
pagination={{
pageSize: 10,
}}
toolBarRender={false}
search={false}
size="small"
/>:undefined}
</>)
};
export default BaseInfo;
import {Descriptions, Typography, Tag} from 'antd';
import {
RocketOutlined
} from '@ant-design/icons';
const {Link} = Typography;
const Config = (props: any) => {
const {job} = props;
return (<>
<Descriptions bordered size="small">
<Descriptions.Item label="执行模式">{job?.history?.type ? (
<Tag color="blue" key={job?.history?.type}>
<RocketOutlined/> {job?.history?.type}
</Tag>
) : undefined}
</Descriptions.Item>
<Descriptions.Item label="集群实例">
{job?.cluster?.alias?<Link>{job?.cluster?.alias}</Link>:'-'}
</Descriptions.Item>
<Descriptions.Item label="集群配置">
{job?.clusterConfiguration?.alias?<Link>{job?.clusterConfiguration?.alias}</Link>:'-'}
</Descriptions.Item>
<Descriptions.Item label="共享会话">
{job?.history?.session?<Link>{job?.history?.session}</Link>:'禁用'}
</Descriptions.Item>
<Descriptions.Item label="片段机制">{job?.history?.config.useSqlFragment?'启用':'禁用'}</Descriptions.Item>
<Descriptions.Item label="语句集">{job?.history?.config.useStatementSet?'启用':'禁用'}</Descriptions.Item>
<Descriptions.Item label="批模式">{job?.history?.config.useBatchModel?'启用':'禁用'}</Descriptions.Item>
<Descriptions.Item label="SavePoint机制">
{job?.history?.config.savePointStrategy?'启用':'禁用'}
</Descriptions.Item>
</Descriptions>
</>)
};
export default Config;
@@ -8,9 +8,10 @@ import {Button, Dropdown, Menu, Tag, Space, Typography} from 'antd';
import {PageContainer} from '@ant-design/pro-layout';
import ProCard from '@ant-design/pro-card';
import {JobInfoDetail} from "@/pages/DevOps/data";
import {getJobInfoDetail, refreshJobInfoDetail} from "@/pages/DevOps/service";
import moment from "moment";
import BaseInfo from "@/pages/DevOps/JobInfo/BaseInfo";
import Config from "@/pages/DevOps/JobInfo/Config";
import JobStatus from "@/components/Common/JobStatus";
const {Link} = Typography;
@@ -24,7 +25,7 @@ const JobInfo = (props: any) => {
const [time, setTime] = useState(() => Date.now());
const [tabKey, setTabKey] = useState<string>('base');
const handleGetJobInfoDetail = () => {
const res = getJobInfoDetail(id);
res.then((result) => {
setJob(result.datas);
@@ -33,13 +34,21 @@ const JobInfo = (props: any) => {
};
useEffect(() => {
handleGetJobInfoDetail();
let dataPolling = setInterval(handleGetJobInfoDetail, 3000);
return () => {
clearInterval(dataPolling);
};
}, []);
const handleRefreshJobInfoDetail = () => {
const res = refreshJobInfoDetail(id);
res.then((result) => {
setJob(result.datas);
setTime(Date.now());
});
};
const handleBack = () => {
history.goBack();
};
@@ -47,13 +56,13 @@ const JobInfo = (props: any) => {
return (
<PageContainer
header={{
title: `${job?.instance?.name}`,
ghost: true,
extra: [
<Button key="back" type="dashed" onClick={handleBack}>返回</Button>,
<Button key="refresh" icon={<RedoOutlined/>} onClick={handleRefreshJobInfoDetail}/>,
<Button key="flinkwebui">
<Link href={`http://${job?.history?.jobManagerAddress}/#/job/${job?.instance?.jid}/overview`} target="_blank">
FlinkWebUI
</Link></Button>,
<Button key="autorestart" type="primary">智能重启</Button>,
@@ -77,20 +86,20 @@ const JobInfo = (props: any) => {
}}
content={<>
<Space size={0}>
{job?.instance?.jid ? (
<Tag color="blue" key={job?.instance?.jid}>
<FireOutlined/> {job?.instance?.jid}
</Tag>
) : undefined}
<JobStatus status={job?.instance?.status}/>
{job?.history?.type ? (
<Tag color="blue" key={job?.history?.type}>
<RocketOutlined/> {job?.history?.type}
</Tag>
) : undefined}
{job?.cluster?.alias ? (
<Tag color="green" key={job?.cluster?.alias}>
<ClusterOutlined/> {job?.cluster?.alias}
</Tag>
) : (<Tag color="green" key='local'>
<ClusterOutlined/> 本地环境
@@ -136,6 +145,7 @@ const JobInfo = (props: any) => {
>
<ProCard>
{tabKey === 'base' ? <BaseInfo job={job}/> : undefined}
{tabKey === 'config' ? <Config job={job}/> : undefined}
</ProCard>
</PageContainer>
);
......
@@ -38,3 +38,14 @@ export type JobInfoDetail = {
task: TaskTableListItem,
history: HistoryItem
}
export type VerticesTableListItem = {
name: string,
status: string,
metrics: any,
parallelism: number,
startTime: string,
duration: number,
endTime: string,
tasks:any,
}
@@ -7,3 +7,7 @@ export function getStatusCount() {
export function getJobInfoDetail(id:number) {
return getData("api/jobInstance/getJobInfoDetail",{id});
}
export function refreshJobInfoDetail(id:number) {
return getData("api/jobInstance/refreshJobInfoDetail",{id});
}