Unverified Commit 6a862447 authored by aiwenmo's avatar aiwenmo Committed by GitHub

[Feature-545][admin] Add Task Pool to solve frequent database writes

[Feature-545][admin] Add Task Pool to solve frequent database writes
parents 5de90732 3aaa5707
package com.dlink.controller;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.dlink.common.result.ProTableResult;
import com.dlink.common.result.Result;
import com.dlink.model.Jar;
import com.dlink.model.JobInstance;
import com.dlink.service.JobInstanceService;
import com.dlink.service.TaskService;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
/**
* JobInstanceController
......@@ -35,7 +42,7 @@ public class JobInstanceController {
*/
/**
 * Lists job instances as a paged ProTable result.
 *
 * @param para JSON query parameters (paging fields such as "current"/"pageSize" plus filters)
 * @return paged job instances for the front-end table
 */
@PostMapping
public ProTableResult<JobInstance> listJobInstances(@RequestBody JsonNode para) {
    // Delegates to the service-layer entry point, which overlays live status
    // from the in-memory task pool on top of the persisted rows.
    return jobInstanceService.listJobInstances(para);
}
/**
......
package com.dlink.job;
import java.time.Duration;
import java.time.LocalDateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.DependsOn;
import com.dlink.assertion.Asserts;
import com.dlink.context.SpringContextUtils;
import com.dlink.daemon.constant.FlinkTaskConstant;
......@@ -9,12 +16,6 @@ import com.dlink.daemon.task.DaemonTaskConfig;
import com.dlink.model.JobInstance;
import com.dlink.model.JobStatus;
import com.dlink.service.TaskService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.DependsOn;
import java.time.Duration;
import java.time.LocalDateTime;
@DependsOn("springContextUtils")
public class FlinkJobTask implements DaemonTask {
......@@ -54,8 +55,10 @@ public class FlinkJobTask implements DaemonTask {
preDealTime = System.currentTimeMillis();
JobInstance jobInstance = taskService.refreshJobInstance(config.getId(), false);
if ((!JobStatus.isDone(jobInstance.getStatus())) || (Asserts.isNotNull(jobInstance.getFinishTime())
&& Duration.between(jobInstance.getFinishTime(), LocalDateTime.now()).toMinutes() < 1)) {
&& Duration.between(jobInstance.getFinishTime(), LocalDateTime.now()).toMinutes() < 1)) {
DefaultThreadPool.getInstance().execute(this);
} else {
FlinkJobTaskPool.getInstance().remove(config.getId().toString());
}
}
}
package com.dlink.job;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.dlink.model.JobInfoDetail;
import com.dlink.pool.AbstractPool;
/**
 * FlinkJobTaskPool
 *
 * <p>Process-wide singleton cache of {@link JobInfoDetail} entries keyed by job
 * instance id (as a string). Used to throttle frequent database writes while
 * Flink jobs are being monitored.
 *
 * @author wenmo
 * @since 2022/5/28 16:39
 */
public class FlinkJobTaskPool extends AbstractPool<JobInfoDetail> {

    // Backing store shared by daemon threads and web requests; ConcurrentHashMap
    // provides the thread safety. The reference is assigned exactly once, so it
    // is declared final (the previous `volatile` suggested reassignment that
    // never happens).
    private static final Map<String, JobInfoDetail> flinkJobTaskEntityMap = new ConcurrentHashMap<>();

    // Eagerly-initialized singleton; class loading guarantees safe publication.
    private static final FlinkJobTaskPool instance = new FlinkJobTaskPool();

    public static FlinkJobTaskPool getInstance() {
        return instance;
    }

    @Override
    public Map<String, JobInfoDetail> getMap() {
        return flinkJobTaskEntityMap;
    }

    /** Refreshes the cached entity in place by advancing its refresh counter. */
    @Override
    public void refresh(JobInfoDetail entity) {
        entity.refresh();
    }
}
......@@ -14,9 +14,11 @@ public class JobInfoDetail {
private ClusterConfiguration clusterConfiguration;
private History history;
private JobHistory jobHistory;
private Integer refreshCount;
// Creates a detail holder for the given job instance id. The refresh counter
// starts at zero, which makes isNeedSave() true on the very first cycle
// (0 % 60 == 0), so the first refresh is persisted.
public JobInfoDetail(Integer id) {
this.id = id;
this.refreshCount = 0;
}
public Integer getId() {
......@@ -66,4 +68,15 @@ public class JobInfoDetail {
// Sets the associated Flink job history record for this detail entry.
public void setJobHistory(JobHistory jobHistory) {
this.jobHistory = jobHistory;
}
/**
 * Advances the refresh counter by one and wraps it back to zero once a
 * save-worthy cycle has been reached, keeping the counter bounded.
 */
public void refresh() {
    refreshCount++;
    if (isNeedSave()) {
        refreshCount = 0;
    }
}

/**
 * Whether the current refresh cycle should be written to the database:
 * true on every 60th refresh, including the very first (counter == 0).
 */
public boolean isNeedSave() {
    return (refreshCount % 60) == 0;
}
}
......@@ -15,5 +15,5 @@ public interface JobHistoryService extends ISuperService<JobHistory> {
JobHistory getJobHistoryInfo(JobHistory jobHistory);
JobHistory refreshJobHistory(Integer id, String jobManagerHost, String jobId);
JobHistory refreshJobHistory(Integer id, String jobManagerHost, String jobId, boolean needSave);
}
package com.dlink.service;
import java.util.List;
import com.dlink.common.result.ProTableResult;
import com.dlink.db.service.ISuperService;
import com.dlink.explainer.lineage.LineageResult;
import com.dlink.model.JobInfoDetail;
import com.dlink.model.JobInstance;
import com.dlink.model.JobInstanceStatus;
import java.util.List;
import com.fasterxml.jackson.databind.JsonNode;
/**
* JobInstanceService
......@@ -27,4 +29,6 @@ public interface JobInstanceService extends ISuperService<JobInstance> {
LineageResult getLineage(Integer id);
JobInstance getJobInstanceByTaskId(Integer id);
ProTableResult<JobInstance> listJobInstances(JsonNode para);
}
package com.dlink.service.impl;
import org.springframework.stereotype.Service;
import com.dlink.api.FlinkAPI;
import com.dlink.assertion.Asserts;
import com.dlink.db.service.impl.SuperServiceImpl;
......@@ -8,7 +10,6 @@ import com.dlink.model.JobHistory;
import com.dlink.service.JobHistoryService;
import com.dlink.utils.JSONUtil;
import com.fasterxml.jackson.databind.JsonNode;
import org.springframework.stereotype.Service;
/**
* JobHistoryServiceImpl
......@@ -64,7 +65,7 @@ public class JobHistoryServiceImpl extends SuperServiceImpl<JobHistoryMapper, Jo
}
@Override
public JobHistory refreshJobHistory(Integer id, String jobManagerHost, String jobId) {
public JobHistory refreshJobHistory(Integer id, String jobManagerHost, String jobId, boolean needSave) {
JobHistory jobHistory = new JobHistory();
jobHistory.setId(id);
try {
......@@ -78,10 +79,12 @@ public class JobHistoryServiceImpl extends SuperServiceImpl<JobHistoryMapper, Jo
jobHistory.setCheckpointsJson(JSONUtil.toJsonString(checkPoints));
jobHistory.setCheckpointsConfigJson(JSONUtil.toJsonString(checkPointsConfig));
jobHistory.setConfigJson(JSONUtil.toJsonString(jobsConfig));
if (Asserts.isNotNull(getById(id))) {
updateById(jobHistory);
} else {
save(jobHistory);
if (needSave) {
if (Asserts.isNotNull(getById(id))) {
updateById(jobHistory);
} else {
save(jobHistory);
}
}
} catch (Exception e) {
} finally {
......
package com.dlink.service.impl;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.dlink.assertion.Asserts;
import com.dlink.common.result.ProTableResult;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.db.util.ProTableUtil;
import com.dlink.explainer.lineage.LineageBuilder;
import com.dlink.explainer.lineage.LineageResult;
import com.dlink.job.FlinkJobTaskPool;
import com.dlink.mapper.JobInstanceMapper;
import com.dlink.model.*;
import com.dlink.service.*;
import com.dlink.model.History;
import com.dlink.model.JobInfoDetail;
import com.dlink.model.JobInstance;
import com.dlink.model.JobInstanceCount;
import com.dlink.model.JobInstanceStatus;
import com.dlink.model.JobStatus;
import com.dlink.service.ClusterConfigurationService;
import com.dlink.service.ClusterService;
import com.dlink.service.HistoryService;
import com.dlink.service.JobHistoryService;
import com.dlink.service.JobInstanceService;
import com.dlink.utils.JSONUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* JobInstanceServiceImpl
......@@ -99,15 +117,22 @@ public class JobInstanceServiceImpl extends SuperServiceImpl<JobInstanceMapper,
@Override
public JobInfoDetail getJobInfoDetailInfo(JobInstance jobInstance) {
Asserts.checkNull(jobInstance, "该任务实例不存在");
JobInfoDetail jobInfoDetail = new JobInfoDetail(jobInstance.getId());
jobInfoDetail.setInstance(jobInstance);
jobInfoDetail.setCluster(clusterService.getById(jobInstance.getClusterId()));
jobInfoDetail.setJobHistory(jobHistoryService.getJobHistory(jobInstance.getId()));
History history = historyService.getById(jobInstance.getHistoryId());
history.setConfig(JSONUtil.parseObject(history.getConfigJson()));
jobInfoDetail.setHistory(history);
if (Asserts.isNotNull(history) && Asserts.isNotNull(history.getClusterConfigurationId())) {
jobInfoDetail.setClusterConfiguration(clusterConfigurationService.getClusterConfigById(history.getClusterConfigurationId()));
JobInfoDetail jobInfoDetail;
FlinkJobTaskPool pool = FlinkJobTaskPool.getInstance();
String key = jobInstance.getId().toString();
if (pool.exist(key)) {
jobInfoDetail = pool.get(key);
} else {
jobInfoDetail = new JobInfoDetail(jobInstance.getId());
jobInfoDetail.setInstance(jobInstance);
jobInfoDetail.setCluster(clusterService.getById(jobInstance.getClusterId()));
jobInfoDetail.setJobHistory(jobHistoryService.getJobHistory(jobInstance.getId()));
History history = historyService.getById(jobInstance.getHistoryId());
history.setConfig(JSONUtil.parseObject(history.getConfigJson()));
jobInfoDetail.setHistory(history);
if (Asserts.isNotNull(history) && Asserts.isNotNull(history.getClusterConfigurationId())) {
jobInfoDetail.setClusterConfiguration(clusterConfigurationService.getClusterConfigById(history.getClusterConfigurationId()));
}
}
return jobInfoDetail;
}
......@@ -123,4 +148,27 @@ public class JobInstanceServiceImpl extends SuperServiceImpl<JobInstanceMapper,
return baseMapper.getJobInstanceByTaskId(id);
}
/**
 * Pages job instances for ProTable and overlays live status from the task
 * pool, so the list reflects in-memory state between throttled database saves.
 *
 * @param para JSON query parameters ("current"/"pageSize" plus ProTable filters)
 * @return paged result with pool-fresh status/time/error/duration fields
 */
@Override
public ProTableResult<JobInstance> listJobInstances(JsonNode para) {
    Integer current = para.has("current") ? para.get("current").asInt() : 1;
    Integer pageSize = para.has("pageSize") ? para.get("pageSize").asInt() : 10;
    QueryWrapper<JobInstance> queryWrapper = new QueryWrapper<>();
    ProTableUtil.autoQueryDefalut(para, queryWrapper);
    ObjectMapper mapper = new ObjectMapper();
    Map<String, Object> param = mapper.convertValue(para, Map.class);
    Page<JobInstance> page = new Page<>(current, pageSize);
    List<JobInstance> list = baseMapper.selectForProTable(page, queryWrapper, param);
    FlinkJobTaskPool pool = FlinkJobTaskPool.getInstance();
    for (JobInstance jobInstance : list) {
        String key = jobInstance.getId().toString();
        if (!pool.exist(key)) {
            continue;
        }
        // Single pool lookup per row instead of five repeated map gets.
        JobInstance cached = pool.get(key).getInstance();
        jobInstance.setStatus(cached.getStatus());
        jobInstance.setUpdateTime(cached.getUpdateTime());
        jobInstance.setFinishTime(cached.getFinishTime());
        jobInstance.setError(cached.getError());
        jobInstance.setDuration(cached.getDuration());
    }
    return ProTableResult.<JobInstance>builder()
            .success(true)
            .data(list)
            .total(page.getTotal())
            .current(current)
            .pageSize(pageSize)
            .build();
}
}
......@@ -35,6 +35,7 @@ import com.dlink.gateway.config.SavePointType;
import com.dlink.gateway.model.JobInfo;
import com.dlink.gateway.result.SavePointResult;
import com.dlink.job.FlinkJobTask;
import com.dlink.job.FlinkJobTaskPool;
import com.dlink.job.Job;
import com.dlink.job.JobConfig;
import com.dlink.job.JobManager;
......@@ -47,6 +48,7 @@ import com.dlink.model.AlertHistory;
import com.dlink.model.AlertInstance;
import com.dlink.model.Cluster;
import com.dlink.model.DataBase;
import com.dlink.model.History;
import com.dlink.model.Jar;
import com.dlink.model.JobHistory;
import com.dlink.model.JobInfoDetail;
......@@ -63,6 +65,7 @@ import com.dlink.service.AlertHistoryService;
import com.dlink.service.ClusterConfigurationService;
import com.dlink.service.ClusterService;
import com.dlink.service.DataBaseService;
import com.dlink.service.HistoryService;
import com.dlink.service.JarService;
import com.dlink.service.JobHistoryService;
import com.dlink.service.JobInstanceService;
......@@ -101,6 +104,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
private AlertGroupService alertGroupService;
@Autowired
private AlertHistoryService alertHistoryService;
@Autowired
private HistoryService historyService;
@Value("${spring.datasource.driver-class-name}")
private String driver;
......@@ -581,30 +586,53 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Override
public JobInstance refreshJobInstance(Integer id, boolean isCoercive) {
JobInstance jobInstance = jobInstanceService.getById(id);
Asserts.checkNull(jobInstance, "该任务实例不存在");
if (!isCoercive && !inRefreshPlan(jobInstance)) {
return jobInstance;
JobInfoDetail jobInfoDetail;
FlinkJobTaskPool pool = FlinkJobTaskPool.getInstance();
String key = id.toString();
if (pool.exist(key)) {
jobInfoDetail = pool.get(key);
} else {
jobInfoDetail = new JobInfoDetail(id);
JobInstance jobInstance = jobInstanceService.getById(id);
Asserts.checkNull(jobInstance, "该任务实例不存在");
jobInfoDetail.setInstance(jobInstance);
Cluster cluster = clusterService.getById(jobInstance.getClusterId());
jobInfoDetail.setCluster(cluster);
History history = historyService.getById(jobInstance.getHistoryId());
history.setConfig(JSONUtil.parseObject(history.getConfigJson()));
if (Asserts.isNotNull(history) && Asserts.isNotNull(history.getClusterConfigurationId())) {
jobInfoDetail.setClusterConfiguration(clusterConfigurationService.getClusterConfigById(history.getClusterConfigurationId()));
}
jobInfoDetail.setHistory(history);
pool.push(key, jobInfoDetail);
}
String status = jobInstance.getStatus();
Cluster cluster = clusterService.getById(jobInstance.getClusterId());
JobHistory jobHistoryJson = jobHistoryService.refreshJobHistory(id, cluster.getJobManagerHost(), jobInstance.getJid());
if (!isCoercive && !inRefreshPlan(jobInfoDetail.getInstance())) {
return jobInfoDetail.getInstance();
}
JobHistory jobHistoryJson = jobHistoryService.refreshJobHistory(id, jobInfoDetail.getCluster().getJobManagerHost(), jobInfoDetail.getInstance().getJid(), jobInfoDetail.isNeedSave());
JobHistory jobHistory = jobHistoryService.getJobHistoryInfo(jobHistoryJson);
if (Asserts.isNull(jobHistory.getJob()) || jobHistory.getJob().has(FlinkRestResultConstant.ERRORS)) {
jobInstance.setStatus(JobStatus.UNKNOWN.getValue());
jobInfoDetail.setJobHistory(jobHistory);
String status = jobInfoDetail.getInstance().getStatus();
boolean jobStatusChanged = false;
if (Asserts.isNull(jobInfoDetail.getJobHistory().getJob()) || jobInfoDetail.getJobHistory().getJob().has(FlinkRestResultConstant.ERRORS)) {
jobInfoDetail.getInstance().setStatus(JobStatus.UNKNOWN.getValue());
} else {
jobInstance.setDuration(jobHistory.getJob().get(FlinkRestResultConstant.JOB_DURATION).asLong() / 1000);
jobInstance.setStatus(jobHistory.getJob().get(FlinkRestResultConstant.JOB_STATE).asText());
jobInfoDetail.getInstance().setDuration(jobInfoDetail.getJobHistory().getJob().get(FlinkRestResultConstant.JOB_DURATION).asLong() / 1000);
jobInfoDetail.getInstance().setStatus(jobInfoDetail.getJobHistory().getJob().get(FlinkRestResultConstant.JOB_STATE).asText());
}
if (JobStatus.isDone(jobInstance.getStatus()) && !status.equals(jobInstance.getStatus())) {
jobInstance.setFinishTime(LocalDateTime.now());
handleJobDone(jobInstance);
if (JobStatus.isDone(jobInfoDetail.getInstance().getStatus()) && !status.equals(jobInfoDetail.getInstance().getStatus())) {
jobStatusChanged = true;
jobInfoDetail.getInstance().setFinishTime(LocalDateTime.now());
handleJobDone(jobInfoDetail.getInstance());
}
if (isCoercive) {
DaemonFactory.addTask(DaemonTaskConfig.build(FlinkJobTask.TYPE, jobInstance.getId()));
DaemonFactory.addTask(DaemonTaskConfig.build(FlinkJobTask.TYPE, jobInfoDetail.getInstance().getId()));
}
if (jobStatusChanged || jobInfoDetail.isNeedSave()) {
jobInstanceService.updateById(jobInfoDetail.getInstance());
}
jobInstanceService.updateById(jobInstance);
return jobInstance;
pool.refresh(jobInfoDetail);
return jobInfoDetail.getInstance();
}
private boolean inRefreshPlan(JobInstance jobInstance) {
......
package com.dlink.pool;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * AbstractPool
 *
 * <p>Minimal keyed cache abstraction. Subclasses supply the backing map
 * (expected to be thread-safe, e.g. a ConcurrentHashMap, since pools are
 * shared across threads — confirm per implementation).
 *
 * @author wenmo
 * @since 2022/5/28 19:40
 */
public abstract class AbstractPool<T> {

    /** @return the backing map used to store pooled entities */
    public abstract Map<String, T> getMap();

    /** Returns true when an entity is cached under the given key. */
    public boolean exist(String key) {
        // Direct delegation; the former if/return-true/return-false was redundant.
        return getMap().containsKey(key);
    }

    /** Stores the entity under the key and returns the resulting pool size. */
    public int push(String key, T entity) {
        getMap().put(key, entity);
        return getMap().size();
    }

    /** Removes the key (if present) and returns the resulting pool size. */
    public int remove(String key) {
        getMap().remove(key);
        return getMap().size();
    }

    /** Returns the cached entity, or null when the key is absent. */
    public T get(String key) {
        return getMap().get(key);
    }

    /** Refreshes the given entity in an implementation-specific way. */
    public abstract void refresh(T entity);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment