Unverified Commit 4aed0227 authored by aiwenmo's avatar aiwenmo Committed by GitHub

[Feature-1110][admin,web,core,process] Add process manager and remove root log (#1111)

Co-authored-by: wenmo <32723967+wenmo@users.noreply.github.com>
parent 0ee0a855
......@@ -199,11 +199,14 @@
<artifactId>dlink-client-hadoop</artifactId>
<scope>${scope.runtime}</scope>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-scheduler</artifactId>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-process</artifactId>
</dependency>
</dependencies>
<build>
......
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.controller;
import com.dlink.common.result.ProTableResult;
import com.dlink.process.model.ProcessEntity;
import com.dlink.service.ProcessService;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoint exposing the in-memory process manager.
 *
 * @author wenmo
 * @since 2022/10/16 22:53
 */
@RestController
@RequestMapping("/api/process")
public class ProcessController {

    @Autowired
    private ProcessService processService;

    /**
     * Lists processes known to the process manager.
     *
     * @param active when true, only processes currently marked active are returned
     * @return a successful {@link ProTableResult} wrapping the process list
     */
    @GetMapping("/listAllProcess")
    public ProTableResult<ProcessEntity> listAllProcess(@RequestParam boolean active) {
        final List<ProcessEntity> processes = processService.listAllProcess(active);
        return ProTableResult.<ProcessEntity>builder()
                .success(true)
                .data(processes)
                .build();
    }
}
......@@ -46,6 +46,7 @@ public class FlinkJobTaskPool extends AbstractPool<JobInfoDetail> {
return flinkJobTaskEntityMap;
}
@Override
public void refresh(JobInfoDetail entity) {
    // Delegate to the pooled entity itself: it knows how to refresh its own runtime state.
    entity.refresh();
}
......
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.service;
import com.dlink.process.model.ProcessEntity;
import java.util.List;
/**
 * ProcessService
 *
 * <p>Read-only access to the processes tracked by the process manager.
 *
 * @author wenmo
 * @since 2022/10/16 22:05
 */
public interface ProcessService {

    /**
     * Lists tracked processes.
     *
     * @param active when true, restrict the result to processes that are currently active
     * @return the matching processes; ordering is implementation-defined
     */
    List<ProcessEntity> listAllProcess(boolean active);
}
......@@ -19,12 +19,11 @@
package com.dlink.service;
import com.dlink.job.JobConfig;
/**
* @author ZackYoung
* @since 0.6.8
*/
public interface UDFService {
void initUDF(JobConfig config, String statement);
String[] initUDF(String statement);
}
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.service.impl;
import com.dlink.process.model.ProcessEntity;
import com.dlink.process.pool.ProcessPool;
import com.dlink.service.ProcessService;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.springframework.stereotype.Service;
/**
 * Default {@link ProcessService} backed by the singleton {@code ProcessPool}.
 *
 * @author wenmo
 * @since 2022/10/16 22:45
 */
@Service
public class ProcessServiceImpl implements ProcessService {

    /**
     * Lists pooled processes, newest start time first.
     *
     * @param active when true, only processes reporting themselves active are included
     * @return processes sorted by start time in descending order
     */
    @Override
    public List<ProcessEntity> listAllProcess(boolean active) {
        // Snapshot of every registered process, keyed by process id.
        Map<String, ProcessEntity> pool = ProcessPool.getInstance().getMap();
        // Shared ordering: most recently started process first.
        Comparator<ProcessEntity> newestFirst =
                Comparator.comparing(ProcessEntity::getStartTime).reversed();
        if (active) {
            return pool.values().stream()
                    .filter(ProcessEntity::isActiveProcess)
                    .sorted(newestFirst)
                    .collect(Collectors.toList());
        }
        return pool.values().stream()
                .sorted(newestFirst)
                .collect(Collectors.toList());
    }
}
......@@ -47,6 +47,10 @@ import com.dlink.model.Schema;
import com.dlink.model.SystemConfiguration;
import com.dlink.model.Table;
import com.dlink.model.Task;
import com.dlink.process.context.ProcessContextHolder;
import com.dlink.process.model.ProcessEntity;
import com.dlink.process.model.ProcessType;
import com.dlink.process.pool.ProcessPool;
import com.dlink.result.DDLResult;
import com.dlink.result.IResult;
import com.dlink.result.SelectResult;
......@@ -81,7 +85,6 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* StudioServiceImpl
*
......@@ -111,20 +114,40 @@ public class StudioServiceImpl implements StudioService {
private UDFService udfService;
private void addFlinkSQLEnv(AbstractStatementDTO statementDTO) {
ProcessEntity process = ProcessContextHolder.getProcess();
process.info("Start Initialize FlinkSQLEnv:");
if (statementDTO.isFragment()) {
process.config("Variable opened.");
// initialize global variables
process.info("Initializing Global Variables...");
statementDTO.setVariables(fragmentVariableService.listEnabledVariables());
process.infoSuccess();
// initialize database variables
process.info("Initializing Database Variables...");
String flinkWithSql = dataBaseService.getEnabledFlinkWithSql();
if (statementDTO.isFragment() && Asserts.isNotNullString(flinkWithSql)) {
statementDTO.setStatement(flinkWithSql + "\r\n" + statementDTO.getStatement());
if (Asserts.isNotNullString(flinkWithSql)) {
statementDTO.setStatement(flinkWithSql + "\n" + statementDTO.getStatement());
process.infoSuccess();
} else {
process.info("No variables are loaded.");
}
}
// initialize flinksql environment, such as flink catalog
if (Asserts.isNotNull(statementDTO.getEnvId()) && !statementDTO.getEnvId().equals(0)) {
process.config("FlinkSQLEnv opened.");
process.info("Initializing FlinkSQLEnv...");
Task task = taskService.getTaskInfoById(statementDTO.getEnvId());
if (Asserts.isNotNull(task) && Asserts.isNotNullString(task.getStatement())) {
statementDTO.setStatement(task.getStatement() + "\r\n" + statementDTO.getStatement());
statementDTO.setStatement(task.getStatement() + "\n" + statementDTO.getStatement());
process.infoSuccess();
} else {
process.info("No FlinkSQLEnv are loaded.");
}
}
process.info("Finish Initialize FlinkSQLEnv.");
}
private void buildSession(JobConfig config) {
......@@ -149,7 +172,7 @@ public class StudioServiceImpl implements StudioService {
JobConfig config = studioExecuteDTO.getJobConfig();
buildSession(config);
// To initialize java udf, but it only support local mode.
udfService.initUDF(config, studioExecuteDTO.getStatement());
config.setJarFiles(udfService.initUDF(studioExecuteDTO.getStatement()));
JobManager jobManager = JobManager.build(config);
JobResult jobResult = jobManager.executeSql(studioExecuteDTO.getStatement());
RunTimeUtil.recovery(jobManager);
......@@ -202,7 +225,8 @@ public class StudioServiceImpl implements StudioService {
public IResult executeDDL(StudioDDLDTO studioDDLDTO) {
JobConfig config = studioDDLDTO.getJobConfig();
if (!config.isUseSession()) {
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), studioDDLDTO.getClusterId()));
config.setAddress(
clusterService.buildEnvironmentAddress(config.isUseRemote(), studioDDLDTO.getClusterId()));
}
JobManager jobManager = JobManager.build(config);
return jobManager.executeDDL(studioDDLDTO.getStatement());
......@@ -218,20 +242,32 @@ public class StudioServiceImpl implements StudioService {
}
private List<SqlExplainResult> explainFlinkSql(StudioExecuteDTO studioExecuteDTO) {
Map<String, ProcessEntity> map = ProcessPool.getInstance().getMap();
ProcessEntity process = ProcessContextHolder.registerProcess(
ProcessEntity.init(ProcessType.FLINKEXPLAIN, 1, "admin"));
addFlinkSQLEnv(studioExecuteDTO);
process.info("Initializing Flink Job Config...");
JobConfig config = studioExecuteDTO.getJobConfig();
// If you are using explainSql | getStreamGraph | getJobPlan, make the dialect change to local.
config.buildLocal();
buildSession(config);
process.infoSuccess();
// To initialize java udf, but it has a bug in the product environment now.
// initUDF(config,studioExecuteDTO.getStatement());
process.start();
JobManager jobManager = JobManager.buildPlanMode(config);
return jobManager.explainSql(studioExecuteDTO.getStatement()).getSqlExplainResults();
List<SqlExplainResult> sqlExplainResults =
jobManager.explainSql(studioExecuteDTO.getStatement()).getSqlExplainResults();
process.finish();
return sqlExplainResults;
}
private List<SqlExplainResult> explainCommonSql(StudioExecuteDTO studioExecuteDTO) {
if (Asserts.isNull(studioExecuteDTO.getDatabaseId())) {
return new ArrayList<SqlExplainResult>() {
{
add(SqlExplainResult.fail(studioExecuteDTO.getStatement(), "请指定数据源"));
}
......@@ -240,6 +276,7 @@ public class StudioServiceImpl implements StudioService {
DataBase dataBase = dataBaseService.getById(studioExecuteDTO.getDatabaseId());
if (Asserts.isNull(dataBase)) {
return new ArrayList<SqlExplainResult>() {
{
add(SqlExplainResult.fail(studioExecuteDTO.getStatement(), "数据源不存在"));
}
......@@ -318,7 +355,8 @@ public class StudioServiceImpl implements StudioService {
@Override
public LineageResult getLineage(StudioCADTO studioCADTO) {
if (Asserts.isNotNullString(studioCADTO.getDialect()) && !studioCADTO.getDialect().equalsIgnoreCase("flinksql")) {
if (Asserts.isNotNullString(studioCADTO.getDialect())
&& !studioCADTO.getDialect().equalsIgnoreCase("flinksql")) {
if (Asserts.isNull(studioCADTO.getDatabaseId())) {
return null;
}
......@@ -327,9 +365,11 @@ public class StudioServiceImpl implements StudioService {
return null;
}
if (studioCADTO.getDialect().equalsIgnoreCase("doris")) {
return com.dlink.explainer.sqllineage.LineageBuilder.getSqlLineage(studioCADTO.getStatement(), "mysql", dataBase.getDriverConfig());
return com.dlink.explainer.sqllineage.LineageBuilder.getSqlLineage(studioCADTO.getStatement(), "mysql",
dataBase.getDriverConfig());
} else {
return com.dlink.explainer.sqllineage.LineageBuilder.getSqlLineage(studioCADTO.getStatement(), studioCADTO.getDialect().toLowerCase(), dataBase.getDriverConfig());
return com.dlink.explainer.sqllineage.LineageBuilder.getSqlLineage(studioCADTO.getStatement(),
studioCADTO.getDialect().toLowerCase(), dataBase.getDriverConfig());
}
} else {
addFlinkSQLEnv(studioCADTO);
......@@ -360,7 +400,8 @@ public class StudioServiceImpl implements StudioService {
JobConfig jobConfig = new JobConfig();
jobConfig.setAddress(cluster.getJobManagerHost());
if (Asserts.isNotNull(cluster.getClusterConfigurationId())) {
Map<String, Object> gatewayConfig = clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
Map<String, Object> gatewayConfig =
clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
jobConfig.buildGatewayConfig(gatewayConfig);
}
JobManager jobManager = JobManager.build(jobConfig);
......@@ -378,7 +419,8 @@ public class StudioServiceImpl implements StudioService {
jobConfig.setType(cluster.getType());
// 如果用户选择用dlink平台来托管集群信息 说明任务一定是从dlink发起提交的
if (Asserts.isNotNull(cluster.getClusterConfigurationId())) {
Map<String, Object> gatewayConfig = clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
Map<String, Object> gatewayConfig =
clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
jobConfig.buildGatewayConfig(gatewayConfig);
jobConfig.getGatewayConfig().getClusterConfig().setAppId(cluster.getName());
jobConfig.setTaskId(cluster.getTaskId());
......@@ -436,7 +478,8 @@ public class StudioServiceImpl implements StudioService {
}
}
for (Catalog catalog : catalogs) {
String statement = FlinkQuery.useCatalog(catalog.getName()) + FlinkQuery.separator() + FlinkQuery.showDatabases();
String statement = FlinkQuery.useCatalog(catalog.getName()) + FlinkQuery.separator()
+ FlinkQuery.showDatabases();
studioMetaStoreDTO.setStatement(statement);
IResult tableResult = executeMSFlinkSql(studioMetaStoreDTO);
if (result instanceof DDLResult) {
......@@ -524,8 +567,7 @@ public class StudioServiceImpl implements StudioService {
item.get(FlinkQuery.columnKey()).toString(),
item.get(FlinkQuery.columnNull()).toString(),
item.get(FlinkQuery.columnExtras()).toString(),
item.get(FlinkQuery.columnWatermark()).toString()
);
item.get(FlinkQuery.columnWatermark()).toString());
columns.add(column);
i++;
}
......
......@@ -189,7 +189,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
private UDFService udfService;
private String buildParas(Integer id) {
return "--id " + id + " --driver " + driver + " --url " + url + " --username " + username + " --password " + password;
return "--id " + id + " --driver " + driver + " --url " + url + " --username " + username + " --password "
+ password;
}
@Override
......@@ -201,7 +202,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
task.getDatabaseId(), null));
}
JobConfig config = buildJobConfig(task);
udfService.initUDF(config, task.getStatement());
config.setJarFiles(udfService.initUDF(task.getStatement()));
JobManager jobManager = JobManager.build(config);
if (!config.isJarTask()) {
return jobManager.executeSql(task.getStatement());
......@@ -307,6 +308,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
private List<SqlExplainResult> explainCommonSqlTask(Task task) {
if (Asserts.isNull(task.getDatabaseId())) {
return new ArrayList<SqlExplainResult>() {
{
add(SqlExplainResult.fail(task.getStatement(), "请指定数据源"));
}
......@@ -315,6 +317,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
DataBase dataBase = dataBaseService.getById(task.getDatabaseId());
if (Asserts.isNull(dataBase)) {
return new ArrayList<SqlExplainResult>() {
{
add(SqlExplainResult.fail(task.getStatement(), "数据源不存在"));
}
......@@ -474,7 +477,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Override
public Task getUDFByClassName(String className) {
Task task = getOne(new QueryWrapper<Task>().eq("dialect", "Java").eq("enabled", 1).eq("save_point_path", className));
Task task = getOne(
new QueryWrapper<Task>().eq("dialect", "Java").eq("enabled", 1).eq("save_point_path", className));
Assert.check(task);
task.setStatement(statementService.getById(task.getId()).getStatement());
return task;
......@@ -482,7 +486,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Override
public List<Task> getAllUDF() {
List<Task> tasks = list(new QueryWrapper<Task>().eq("dialect", "Java").eq("enabled", 1).isNotNull("save_point_path"));
List<Task> tasks =
list(new QueryWrapper<Task>().eq("dialect", "Java").eq("enabled", 1).isNotNull("save_point_path"));
return tasks.stream().peek(task -> {
Assert.check(task);
task.setStatement(statementService.getById(task.getId()).getStatement());
......@@ -517,7 +522,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
public Task createTaskVersionSnapshot(Task task) {
List<TaskVersion> taskVersions = taskVersionService.getTaskVersionByTaskId(task.getId());
List<Integer> versionIds = taskVersions.stream().map(TaskVersion::getVersionId).collect(Collectors.toList());
Map<Integer, TaskVersion> versionMap = taskVersions.stream().collect(Collectors.toMap(TaskVersion::getVersionId, t -> t));
Map<Integer, TaskVersion> versionMap =
taskVersions.stream().collect(Collectors.toMap(TaskVersion::getVersionId, t -> t));
TaskVersion taskVersion = new TaskVersion();
BeanUtil.copyProperties(task, taskVersion);
......@@ -539,7 +545,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
version.setId(null);
if (versionIds.contains(task.getVersionId()) && !taskVersion.equals(version)) {
//|| !versionIds.contains(task.getVersionId()) && !taskVersion.equals(version)
// || !versionIds.contains(task.getVersionId()) && !taskVersion.equals(version)
taskVersion.setVersionId(Collections.max(versionIds) + 1);
task.setVersionId(Collections.max(versionIds) + 1);
taskVersionService.save(taskVersion);
......@@ -562,9 +568,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
return Result.failed("该作业已" + JobLifeCycle.get(taskInfo.getStep()).getLabel() + ",禁止回滚!");
}
LambdaQueryWrapper<TaskVersion> queryWrapper = new LambdaQueryWrapper<TaskVersion>().
eq(TaskVersion::getTaskId, dto.getId()).
eq(TaskVersion::getVersionId, dto.getVersionId());
LambdaQueryWrapper<TaskVersion> queryWrapper = new LambdaQueryWrapper<TaskVersion>()
.eq(TaskVersion::getTaskId, dto.getId()).eq(TaskVersion::getVersionId, dto.getVersionId());
TaskVersion taskVersion = taskVersionService.getOne(queryWrapper);
......@@ -710,7 +715,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
useGateway = true;
}
if (Asserts.isNotNull(cluster.getClusterConfigurationId())) {
Map<String, Object> gatewayConfig = clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
Map<String, Object> gatewayConfig =
clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
jobConfig.buildGatewayConfig(gatewayConfig);
jobConfig.getGatewayConfig().getClusterConfig().setAppId(cluster.getName());
useGateway = true;
......@@ -770,13 +776,16 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
Map<String, Object> gatewayConfig = JSONUtil.toMap(task.getStatement(), String.class, Object.class);
config.buildGatewayConfig(gatewayConfig);
} else {
Map<String, Object> gatewayConfig = clusterConfigurationService.getGatewayConfig(task.getClusterConfigurationId());
Map<String, Object> gatewayConfig =
clusterConfigurationService.getGatewayConfig(task.getClusterConfigurationId());
// submit application type with clusterConfiguration
if (GatewayType.YARN_APPLICATION.equalsValue(config.getType()) || GatewayType.KUBERNETES_APPLICATION.equalsValue(config.getType())) {
if (GatewayType.YARN_APPLICATION.equalsValue(config.getType())
|| GatewayType.KUBERNETES_APPLICATION.equalsValue(config.getType())) {
if (!isJarTask) {
SystemConfiguration systemConfiguration = SystemConfiguration.getInstances();
gatewayConfig.put("userJarPath", systemConfiguration.getSqlSubmitJarPath());
gatewayConfig.put("userJarParas", systemConfiguration.getSqlSubmitJarParas() + buildParas(config.getTaskId()));
gatewayConfig.put("userJarParas",
systemConfiguration.getSqlSubmitJarParas() + buildParas(config.getTaskId()));
gatewayConfig.put("userJarMainAppClass", systemConfiguration.getSqlSubmitJarMainAppClass());
} else {
Jar jar = jarService.getById(task.getJarId());
......@@ -832,7 +841,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
History history = historyService.getById(jobInstance.getHistoryId());
history.setConfig(JSONUtil.parseObject(history.getConfigJson()));
if (Asserts.isNotNull(history) && Asserts.isNotNull(history.getClusterConfigurationId())) {
jobInfoDetail.setClusterConfiguration(clusterConfigurationService.getClusterConfigById(history.getClusterConfigurationId()));
jobInfoDetail.setClusterConfiguration(
clusterConfigurationService.getClusterConfigById(history.getClusterConfigurationId()));
}
jobInfoDetail.setHistory(history);
jobInfoDetail.setJobHistory(jobHistoryService.getJobHistory(id));
......@@ -841,10 +851,13 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
if (!isCoercive && !inRefreshPlan(jobInfoDetail.getInstance())) {
return jobInfoDetail.getInstance();
}
JobHistory jobHistoryJson = jobHistoryService.refreshJobHistory(id, jobInfoDetail.getCluster().getJobManagerHost(), jobInfoDetail.getInstance().getJid(), jobInfoDetail.isNeedSave());
JobHistory jobHistoryJson =
jobHistoryService.refreshJobHistory(id, jobInfoDetail.getCluster().getJobManagerHost(),
jobInfoDetail.getInstance().getJid(), jobInfoDetail.isNeedSave());
JobHistory jobHistory = jobHistoryService.getJobHistoryInfo(jobHistoryJson);
jobInfoDetail.setJobHistory(jobHistory);
if (JobStatus.isDone(jobInfoDetail.getInstance().getStatus()) && (Asserts.isNull(jobHistory.getJob()) || jobHistory.isError())) {
if (JobStatus.isDone(jobInfoDetail.getInstance().getStatus())
&& (Asserts.isNull(jobHistory.getJob()) || jobHistory.isError())) {
return jobInfoDetail.getInstance();
}
String status = jobInfoDetail.getInstance().getStatus();
......@@ -852,10 +865,13 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
if (Asserts.isNull(jobInfoDetail.getJobHistory().getJob()) || jobInfoDetail.getJobHistory().isError()) {
jobInfoDetail.getInstance().setStatus(JobStatus.UNKNOWN.getValue());
} else {
jobInfoDetail.getInstance().setDuration(jobInfoDetail.getJobHistory().getJob().get(FlinkRestResultConstant.JOB_DURATION).asLong() / 1000);
jobInfoDetail.getInstance().setStatus(jobInfoDetail.getJobHistory().getJob().get(FlinkRestResultConstant.JOB_STATE).asText());
jobInfoDetail.getInstance().setDuration(
jobInfoDetail.getJobHistory().getJob().get(FlinkRestResultConstant.JOB_DURATION).asLong() / 1000);
jobInfoDetail.getInstance()
.setStatus(jobInfoDetail.getJobHistory().getJob().get(FlinkRestResultConstant.JOB_STATE).asText());
}
if (JobStatus.isDone(jobInfoDetail.getInstance().getStatus()) && !status.equals(jobInfoDetail.getInstance().getStatus())) {
if (JobStatus.isDone(jobInfoDetail.getInstance().getStatus())
&& !status.equals(jobInfoDetail.getInstance().getStatus())) {
jobStatusChanged = true;
jobInfoDetail.getInstance().setFinishTime(LocalDateTime.now());
// handleJobDone(jobInfoDetail.getInstance());
......@@ -918,8 +934,10 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
((ObjectNode) jsonNode).put("path", getTaskPathByTaskId(taskId));
// clusterConfigurationName
if (Asserts.isNotNull(task.getClusterConfigurationId())) {
ClusterConfiguration clusterConfiguration = clusterConfigurationService.getById(task.getClusterConfigurationId());
((ObjectNode) jsonNode).put("clusterConfigurationName", Asserts.isNotNull(clusterConfiguration) ? clusterConfiguration.getName() : null);
ClusterConfiguration clusterConfiguration =
clusterConfigurationService.getById(task.getClusterConfigurationId());
((ObjectNode) jsonNode).put("clusterConfigurationName",
Asserts.isNotNull(clusterConfiguration) ? clusterConfiguration.getName() : null);
}
// databaseName
if (Asserts.isNotNull(task.getDatabaseId())) {
......@@ -975,11 +993,9 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
List<JsonNode> jsonNodes = new ArrayList<>();
if (jsonNode.isArray()) {
for (JsonNode a : jsonNode) {
/*if(a.get("dialect").asText().equals("FlinkSqlEnv")){
jsonNodes.add(0,a);
}else{
jsonNodes.add(a);
}*/
/*
* if(a.get("dialect").asText().equals("FlinkSqlEnv")){ jsonNodes.add(0,a); }else{ jsonNodes.add(a); }
*/
jsonNodes.add(a);
}
} else {
......@@ -997,13 +1013,15 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
}
if (Asserts.isNotNull(task.getClusterConfigurationName())) {
ClusterConfiguration clusterConfiguration = clusterConfigurationService
.getOne(new QueryWrapper<ClusterConfiguration>().eq("name", task.getClusterConfigurationName()));
.getOne(new QueryWrapper<ClusterConfiguration>().eq("name",
task.getClusterConfigurationName()));
if (Asserts.isNotNull(clusterConfiguration)) {
task.setClusterConfigurationId(clusterConfiguration.getId());
}
}
if (Asserts.isNotNull(task.getDatabaseName())) {
DataBase dataBase = dataBaseService.getOne(new QueryWrapper<DataBase>().eq("name", task.getDatabaseName()));
DataBase dataBase =
dataBaseService.getOne(new QueryWrapper<DataBase>().eq("name", task.getDatabaseName()));
if (Asserts.isNotNull(dataBase)) {
task.setDatabaseId(dataBase.getId());
}
......@@ -1014,14 +1032,13 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
task.setJarId(jar.getId());
}
}
/*if(Asserts.isNotNull(task.getEnvName())){
Task task1 = getOne(new QueryWrapper<Task>().eq("name", task.getEnvName()));
if(Asserts.isNotNull(task1)){
task.setEnvId(task1.getId());
}
}*/
/*
* if(Asserts.isNotNull(task.getEnvName())){ Task task1 = getOne(new QueryWrapper<Task>().eq("name",
* task.getEnvName())); if(Asserts.isNotNull(task1)){ task.setEnvId(task1.getId()); } }
*/
if (Asserts.isNotNull(task.getAlertGroupName())) {
AlertGroup alertGroup = alertGroupService.getOne(new QueryWrapper<AlertGroup>().eq("name", task.getAlertGroupName()));
AlertGroup alertGroup =
alertGroupService.getOne(new QueryWrapper<AlertGroup>().eq("name", task.getAlertGroupName()));
if (Asserts.isNotNull(alertGroup)) {
task.setAlertGroupId(alertGroup.getId());
}
......@@ -1056,7 +1073,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
if (errorNumber > 0 && errorNumber == jsonNodes.size()) {
return Result.failed("一共" + jsonNodes.size() + "个作业,全部导入失败");
} else if (errorNumber > 0) {
return Result.failed("一共" + jsonNodes.size() + "个作业,其中成功导入" + (jsonNode.size() - errorNumber) + "个,失败" + errorNumber + "个");
return Result.failed("一共" + jsonNodes.size() + "个作业,其中成功导入" + (jsonNode.size() - errorNumber) + "个,失败"
+ errorNumber + "个");
}
return Result.succeed("成功导入" + jsonNodes.size() + "个作业");
}
......@@ -1106,7 +1124,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
long hours = ChronoUnit.HOURS.between(startTime, endTime);
long minutes = ChronoUnit.MINUTES.between(startTime, endTime);
long seconds = ChronoUnit.SECONDS.between(startTime, endTime);
String duration = days + "天 " + (hours - (days * 24)) + "小时 " + (minutes - (hours * 60)) + "分 " + (seconds - (minutes * 60)) + "秒";
String duration = days + "天 " + (hours - (days * 24)) + "小时 " + (minutes - (hours * 60)) + "分 "
+ (seconds - (minutes * 60)) + "秒";
return duration;
}
......@@ -1140,8 +1159,9 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
String startTime = dateFormat.format(asLongStartTime);
String endTime = dateFormat.format(asLongEndTime);
// Long duration = jsonNodes.get("duration").asLong();
String duration = getDuration(asLongStartTime, asLongEndTime); // 获取任务的 duration 使用的是 start-time 和 end-time 计算 不采用 duration 字段
String duration = getDuration(asLongStartTime, asLongEndTime);
// 获取任务的 duration 使用的是 start-time 和 end-time 计算
// 不采用 duration 字段
String clusterJson = jobHistory.getClusterJson(); // 获取任务历史信息的clusterJson 主要获取 jobManagerHost
ObjectNode clusterJsonNodes = JSONUtil.parseObject(clusterJson);
String jobManagerHost = clusterJsonNodes.get("jobManagerHost").asText();
......@@ -1184,7 +1204,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
}
private void sendAlert(AlertInstance alertInstance, JobInstance jobInstance, Task task, AlertMsg alertMsg) {
AlertConfig alertConfig = AlertConfig.build(alertInstance.getName(), alertInstance.getType(), JSONUtil.toMap(alertInstance.getParams()));
AlertConfig alertConfig = AlertConfig.build(alertInstance.getName(), alertInstance.getType(),
JSONUtil.toMap(alertInstance.getParams()));
Alert alert = Alert.build(alertConfig);
String title = "任务【" + task.getAlias() + "】:" + jobInstance.getStatus();
String content = alertMsg.toString();
......@@ -1229,8 +1250,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
}
@Override
public Result<List<Task>> queryOnLineTaskByDoneStatus(List<JobLifeCycle> jobLifeCycle, List<JobStatus> jobStatuses
, boolean includeNull, Integer catalogueId) {
public Result<List<Task>> queryOnLineTaskByDoneStatus(List<JobLifeCycle> jobLifeCycle, List<JobStatus> jobStatuses,
boolean includeNull, Integer catalogueId) {
final Tree<Integer> node = ((Tree<Integer>) queryAllCatalogue().getDatas())
.getNode(Objects.isNull(catalogueId) ? 0 : catalogueId);
final List<Integer> parentIds = new ArrayList<>(0);
......@@ -1240,12 +1261,11 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
return Result.succeed(taskList);
}
private List<Task> getTasks(List<JobLifeCycle> jobLifeCycle, List<JobStatus> jobStatuses
, boolean includeNull, List<Integer> parentIds) {
return this.baseMapper.queryOnLineTaskByDoneStatus(parentIds
, jobLifeCycle.stream().filter(Objects::nonNull).map(JobLifeCycle::getValue).collect(Collectors.toList())
, includeNull
, jobStatuses.stream().map(JobStatus::name).collect(Collectors.toList()));
private List<Task> getTasks(List<JobLifeCycle> jobLifeCycle, List<JobStatus> jobStatuses, boolean includeNull,
List<Integer> parentIds) {
return this.baseMapper.queryOnLineTaskByDoneStatus(parentIds,
jobLifeCycle.stream().filter(Objects::nonNull).map(JobLifeCycle::getValue).collect(Collectors.toList()),
includeNull, jobStatuses.stream().map(JobStatus::name).collect(Collectors.toList()));
}
private void childrenNodeParse(Tree<Integer> node, List<Integer> parentIds) {
......@@ -1264,7 +1284,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Override
public void selectSavepointOnLineTask(TaskOperatingResult taskOperatingResult) {
final JobInstance jobInstanceByTaskId = jobInstanceService.getJobInstanceByTaskId(taskOperatingResult.getTask().getId());
final JobInstance jobInstanceByTaskId =
jobInstanceService.getJobInstanceByTaskId(taskOperatingResult.getTask().getId());
if (jobInstanceByTaskId == null) {
startGoingLiveTask(taskOperatingResult, null);
return;
......@@ -1280,7 +1301,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
findTheConditionSavePointToOnline(taskOperatingResult, jobInstanceByTaskId);
}
private void findTheConditionSavePointToOnline(TaskOperatingResult taskOperatingResult, JobInstance jobInstanceByTaskId) {
private void findTheConditionSavePointToOnline(TaskOperatingResult taskOperatingResult,
JobInstance jobInstanceByTaskId) {
final LambdaQueryWrapper<JobHistory> queryWrapper = new LambdaQueryWrapper<JobHistory>()
.select(JobHistory::getId, JobHistory::getCheckpointsJson)
.eq(JobHistory::getId, jobInstanceByTaskId.getId());
......
......@@ -21,9 +21,10 @@ package com.dlink.service.impl;
import com.dlink.constant.PathConstant;
import com.dlink.gateway.GatewayType;
import com.dlink.job.JobConfig;
import com.dlink.job.JobManager;
import com.dlink.model.Task;
import com.dlink.process.context.ProcessContextHolder;
import com.dlink.process.model.ProcessEntity;
import com.dlink.service.TaskService;
import com.dlink.service.UDFService;
import com.dlink.utils.UDFUtil;
......@@ -40,29 +41,31 @@ import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Opt;
import cn.hutool.core.map.MapUtil;
/**
* @author ZackYoung
* @since 0.6.8
*/
@Service
public class UDFServiceImpl implements UDFService {
    /**
     * Gateway type map.
     * Groups gateway types into categories such as "session" and "application"
     * so a gateway's category can be looked up directly instead of via repeated checks.
     */
private static final Map<String, List<GatewayType>> GATEWAY_TYPE_MAP = MapUtil
.builder("session", Arrays.asList(GatewayType.YARN_SESSION, GatewayType.KUBERNETES_SESSION, GatewayType.STANDALONE))
.builder("session",
Arrays.asList(GatewayType.YARN_SESSION, GatewayType.KUBERNETES_SESSION, GatewayType.STANDALONE))
.build();
@Resource
TaskService taskService;
@Override
public void initUDF(JobConfig config, String statement) {
public String[] initUDF(String statement) {
ProcessEntity process = ProcessContextHolder.getProcess();
process.info("Initializing Flink UDF...Start");
Opt<String> udfJarPath = Opt.empty();
List<String> udfClassNameList = JobManager.getUDFClassName(statement);
List<String> udfClassNameList = UDFUtil.getUDFClassName(statement);
List<String> codeList = CollUtil.map(udfClassNameList, x -> {
Task task = taskService.getUDFByClassName(x);
JobManager.initMustSuccessUDF(x, task.getStatement());
......@@ -71,7 +74,11 @@ public class UDFServiceImpl implements UDFService {
if (codeList.size() > 0) {
udfJarPath = Opt.ofBlankAble(UDFUtil.getUdfNameAndBuildJar(codeList));
}
udfJarPath.ifPresent(jarPath -> config.setJarFiles(new String[]{PathConstant.UDF_PATH + jarPath}));
process.info("Initializing Flink UDF...Finish");
if (udfJarPath.isPresent()) {
return new String[]{PathConstant.UDF_PATH + udfJarPath.get()};
} else {
return new String[0];
}
}
}
......@@ -15,18 +15,16 @@
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>dlink</artifactId>
<groupId>com.dlink</groupId>
<artifactId>dlink</artifactId>
<version>0.6.8-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<artifactId>dlink-core</artifactId>
<packaging>jar</packaging>
<properties>
<java.version>1.8</java.version>
......@@ -176,5 +174,9 @@
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-process</artifactId>
</dependency>
</dependencies>
</project>
......@@ -38,6 +38,8 @@ import com.dlink.job.StatementParam;
import com.dlink.model.LineageRel;
import com.dlink.model.SystemConfiguration;
import com.dlink.parser.SqlType;
import com.dlink.process.context.ProcessContextHolder;
import com.dlink.process.model.ProcessEntity;
import com.dlink.result.ExplainResult;
import com.dlink.result.SqlExplainResult;
import com.dlink.trans.Operations;
......@@ -56,6 +58,8 @@ import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import cn.hutool.core.util.StrUtil;
/**
* Explainer
*
......@@ -110,7 +114,8 @@ public class Explainer {
continue;
}
SqlType operationType = Operations.getOperationType(statement);
if (operationType.equals(SqlType.INSERT) || operationType.equals(SqlType.SELECT) || operationType.equals(SqlType.SHOW)
if (operationType.equals(SqlType.INSERT) || operationType.equals(SqlType.SELECT)
|| operationType.equals(SqlType.SHOW)
|| operationType.equals(SqlType.DESCRIBE) || operationType.equals(SqlType.DESC)) {
trans.add(new StatementParam(statement, operationType));
statementList.add(statement);
......@@ -172,6 +177,8 @@ public class Explainer {
}
public ExplainResult explainSql(String statement) {
ProcessEntity process = ProcessContextHolder.getProcess();
process.info("Start explain FlinkSQL...");
JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement, sqlSeparator));
List<SqlExplainResult> sqlExplainRecords = new ArrayList<>();
int index = 1;
......@@ -185,13 +192,15 @@ public class Explainer {
}
executor.executeSql(item.getValue());
} catch (Exception e) {
record.setError(LogUtil.getError(e));
String error = LogUtil.getError(e);
record.setError(error);
record.setExplainTrue(false);
record.setExplainTime(LocalDateTime.now());
record.setSql(item.getValue());
record.setIndex(index);
sqlExplainRecords.add(record);
correct = false;
process.error(error);
break;
}
record.setExplainTrue(true);
......@@ -216,10 +225,12 @@ public class Explainer {
record.setParseTrue(true);
record.setExplainTrue(true);
} catch (Exception e) {
record.setError(LogUtil.getError(e));
String error = LogUtil.getError(e);
record.setError(error);
record.setParseTrue(false);
record.setExplainTrue(false);
correct = false;
process.error(error);
} finally {
record.setType("Modify DML");
record.setExplainTime(LocalDateTime.now());
......@@ -236,10 +247,12 @@ public class Explainer {
record.setParseTrue(true);
record.setExplainTrue(true);
} catch (Exception e) {
record.setError(LogUtil.getError(e));
String error = LogUtil.getError(e);
record.setError(error);
record.setParseTrue(false);
record.setExplainTrue(false);
correct = false;
process.error(error);
} finally {
record.setType("Modify DML");
record.setExplainTime(LocalDateTime.now());
......@@ -262,13 +275,15 @@ public class Explainer {
record.setType("DATASTREAM");
record.setParseTrue(true);
} catch (Exception e) {
record.setError(LogUtil.getError(e));
String error = LogUtil.getError(e);
record.setError(error);
record.setExplainTrue(false);
record.setExplainTime(LocalDateTime.now());
record.setSql(item.getValue());
record.setIndex(index);
sqlExplainRecords.add(record);
correct = false;
process.error(error);
break;
}
record.setExplainTrue(true);
......@@ -277,6 +292,7 @@ public class Explainer {
record.setIndex(index++);
sqlExplainRecords.add(record);
}
process.info(StrUtil.format("A total of {} FlinkSQL have been Explained.", sqlExplainRecords.size()));
return new ExplainResult(correct, sqlExplainRecords.size(), sqlExplainRecords);
}
......@@ -345,7 +361,8 @@ public class Explainer {
for (int i = 0; i < results.size(); i++) {
TableCA sinkTableCA = (TableCA) results.get(i).getSinkTableCA();
if (Asserts.isNotNull(sinkTableCA)) {
sinkTableCA.setFields(FlinkUtil.getFieldNamesFromCatalogManager(catalogManager, sinkTableCA.getCatalog(), sinkTableCA.getDatabase(), sinkTableCA.getTable()));
sinkTableCA.setFields(FlinkUtil.getFieldNamesFromCatalogManager(catalogManager,
sinkTableCA.getCatalog(), sinkTableCA.getDatabase(), sinkTableCA.getTable()));
}
}
}
......@@ -364,7 +381,8 @@ public class Explainer {
List<SqlExplainResult> sqlExplainRecords = explainSqlResult(statement);
List<String> strPlans = new ArrayList<>();
for (int i = 0; i < sqlExplainRecords.size(); i++) {
if (Asserts.isNotNull(sqlExplainRecords.get(i).getType()) && sqlExplainRecords.get(i).getType().contains("DML")) {
if (Asserts.isNotNull(sqlExplainRecords.get(i).getType())
&& sqlExplainRecords.get(i).getType().contains("DML")) {
strPlans.add(sqlExplainRecords.get(i).getSql());
}
}
......@@ -400,7 +418,8 @@ public class Explainer {
private void correctColumn(ColumnCAResult columnCAResult) {
for (TableCA tableCA : columnCAResult.getTableCAS()) {
CatalogManager catalogManager = executor.getCatalogManager();
List<String> columnList = FlinkUtil.getFieldNamesFromCatalogManager(catalogManager, tableCA.getCatalog(), tableCA.getDatabase(), tableCA.getTable());
List<String> columnList = FlinkUtil.getFieldNamesFromCatalogManager(catalogManager, tableCA.getCatalog(),
tableCA.getDatabase(), tableCA.getTable());
List<String> fields = tableCA.getFields();
List<String> oldFields = new ArrayList<>();
oldFields.addAll(fields);
......@@ -410,7 +429,8 @@ public class Explainer {
if (!sinkColumnName.equals(oldFields.get(i))) {
for (Map.Entry<Integer, ColumnCA> item : columnCAResult.getColumnCASMaps().entrySet()) {
ColumnCA columnCA = item.getValue();
if (columnCA.getTableId().equals(tableCA.getId()) && columnCA.getName().equals(oldFields.get(i))) {
if (columnCA.getTableId().equals(tableCA.getId())
&& columnCA.getName().equals(oldFields.get(i))) {
columnCA.setName(sinkColumnName);
fields.set(i, sinkColumnName);
}
......@@ -421,14 +441,16 @@ public class Explainer {
}
for (TableCA tableCA : columnCAResult.getTableCAS()) {
CatalogManager catalogManager = executor.getCatalogManager();
List<String> columnList = FlinkUtil.getFieldNamesFromCatalogManager(catalogManager, tableCA.getCatalog(), tableCA.getDatabase(), tableCA.getTable());
List<String> columnList = FlinkUtil.getFieldNamesFromCatalogManager(catalogManager, tableCA.getCatalog(),
tableCA.getDatabase(), tableCA.getTable());
List<String> fields = tableCA.getFields();
int i = 0;
List<Integer> idList = new ArrayList<>();
while (i < fields.size()) {
if (!columnList.contains(fields.get(i))) {
for (Map.Entry<Integer, ColumnCA> item : columnCAResult.getColumnCASMaps().entrySet()) {
if (item.getValue().getName().equals(fields.get(i)) && item.getValue().getTableId().equals(tableCA.getId())) {
if (item.getValue().getName().equals(fields.get(i))
&& item.getValue().getTableId().equals(tableCA.getId())) {
idList.add(item.getValue().getId());
break;
}
......@@ -463,9 +485,14 @@ public class Explainer {
if (columnCAResult.getColumnCASMaps().containsKey(nodeRel2.getSufId())
&& columnCAResult.getColumnCASMaps().containsKey(nodeRel2.getPreId())
&& columnCAResult.getColumnCASMaps().containsKey(nodeRel.getSufId())
&& columnCAResult.getColumnCASMaps().get(nodeRel2.getSufId()).getTableId().equals(columnCAResult.getColumnCASMaps().get(nodeRel.getSufId()).getTableId())
&& columnCAResult.getColumnCASMaps().get(nodeRel2.getSufId()).getName().equals(columnCAResult.getColumnCASMaps().get(nodeRel.getSufId()).getName())
&& !columnCAResult.getColumnCASMaps().get(nodeRel2.getPreId()).getType().equals("Data Sink")) {
&& columnCAResult.getColumnCASMaps().get(nodeRel2.getSufId()).getTableId()
.equals(columnCAResult.getColumnCASMaps().get(nodeRel.getSufId())
.getTableId())
&& columnCAResult.getColumnCASMaps().get(nodeRel2.getSufId()).getName()
.equals(columnCAResult.getColumnCASMaps().get(nodeRel.getSufId())
.getName())
&& !columnCAResult.getColumnCASMaps().get(nodeRel2.getPreId()).getType()
.equals("Data Sink")) {
addNodeRels.add(new NodeRel(nodeRel2.getPreId(), nodeRel.getPreId()));
}
}
......
......@@ -40,6 +40,7 @@ import com.dlink.model.SystemConfiguration;
import com.dlink.parser.SqlType;
import com.dlink.pool.ClassEntity;
import com.dlink.pool.ClassPool;
import com.dlink.process.context.ProcessContextHolder;
import com.dlink.result.ErrorResult;
import com.dlink.result.ExplainResult;
import com.dlink.result.IResult;
......@@ -74,8 +75,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -104,8 +103,6 @@ public class JobManager {
private String sqlSeparator = FlinkSQLConstant.SEPARATOR;
private GatewayType runMode = GatewayType.LOCAL;
private static final String FUNCTION_REGEX = "function (.*?)'(.*?)'";
public JobManager() {
}
......@@ -171,6 +168,7 @@ public class JobManager {
JobManager manager = new JobManager(config);
manager.setPlanMode(true);
manager.init();
ProcessContextHolder.getProcess().info("Build Flink plan mode success.");
return manager;
}
......@@ -269,7 +267,8 @@ public class JobManager {
JobContextHolder.setJob(job);
ready();
String currentSql = "";
JobParam jobParam = Explainer.build(executor, useStatementSet, sqlSeparator).pretreatStatements(SqlUtil.getStatements(statement, sqlSeparator));
JobParam jobParam = Explainer.build(executor, useStatementSet, sqlSeparator)
.pretreatStatements(SqlUtil.getStatements(statement, sqlSeparator));
try {
for (StatementParam item : jobParam.getDdl()) {
currentSql = item.getValue();
......@@ -305,6 +304,7 @@ public class JobManager {
if (tableResult.getJobClient().isPresent()) {
job.setJobId(tableResult.getJobClient().get().getJobID().toHexString());
job.setJids(new ArrayList<String>() {
{
add(job.getJobId());
}
......@@ -313,7 +313,10 @@ public class JobManager {
if (config.isUseResult()) {
// Build insert result.
IResult result =
ResultBuilder.build(SqlType.INSERT, config.getMaxRowNum(), config.isUseChangeLog(), config.isUseAutoCancel(), executor.getTimeZone()).getResult(tableResult);
ResultBuilder
.build(SqlType.INSERT, config.getMaxRowNum(), config.isUseChangeLog(),
config.isUseAutoCancel(), executor.getTimeZone())
.getResult(tableResult);
job.setResult(result);
}
}
......@@ -333,10 +336,13 @@ public class JobManager {
} else {
for (StatementParam item : jobParam.getTrans()) {
currentSql = item.getValue();
FlinkInterceptorResult flinkInterceptorResult = FlinkInterceptor.build(executor, item.getValue());
FlinkInterceptorResult flinkInterceptorResult =
FlinkInterceptor.build(executor, item.getValue());
if (Asserts.isNotNull(flinkInterceptorResult.getTableResult())) {
if (config.isUseResult()) {
IResult result = ResultBuilder.build(item.getType(), config.getMaxRowNum(), config.isUseChangeLog(), config.isUseAutoCancel(), executor.getTimeZone())
IResult result = ResultBuilder
.build(item.getType(), config.getMaxRowNum(), config.isUseChangeLog(),
config.isUseAutoCancel(), executor.getTimeZone())
.getResult(flinkInterceptorResult.getTableResult());
job.setResult(result);
}
......@@ -346,6 +352,7 @@ public class JobManager {
if (tableResult.getJobClient().isPresent()) {
job.setJobId(tableResult.getJobClient().get().getJobID().toHexString());
job.setJids(new ArrayList<String>() {
{
add(job.getJobId());
}
......@@ -353,8 +360,9 @@ public class JobManager {
}
if (config.isUseResult()) {
IResult result =
ResultBuilder.build(item.getType(), config.getMaxRowNum(), config.isUseChangeLog()
, config.isUseAutoCancel(), executor.getTimeZone()).getResult(tableResult);
ResultBuilder.build(item.getType(), config.getMaxRowNum(),
config.isUseChangeLog(), config.isUseAutoCancel(),
executor.getTimeZone()).getResult(tableResult);
job.setResult(result);
}
}
......@@ -381,7 +389,8 @@ public class JobManager {
streamGraph.setJobName(config.getJobName());
JobGraph jobGraph = streamGraph.getJobGraph();
if (Asserts.isNotNullString(config.getSavePointPath())) {
jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(config.getSavePointPath(), true));
jobGraph.setSavepointRestoreSettings(
SavepointRestoreSettings.forPath(config.getSavePointPath(), true));
}
gatewayResult = Gateway.build(config.getGatewayConfig()).submitJobGraph(jobGraph);
}
......@@ -400,13 +409,16 @@ public class JobManager {
if (Asserts.isNotNull(jobClient)) {
job.setJobId(jobClient.getJobID().toHexString());
job.setJids(new ArrayList<String>() {
{
add(job.getJobId());
}
});
}
if (config.isUseResult()) {
IResult result = ResultBuilder.build(SqlType.EXECUTE, config.getMaxRowNum(), config.isUseChangeLog(), config.isUseAutoCancel(), executor.getTimeZone()).getResult(null);
IResult result =
ResultBuilder.build(SqlType.EXECUTE, config.getMaxRowNum(), config.isUseChangeLog(),
config.isUseAutoCancel(), executor.getTimeZone()).getResult(null);
job.setResult(result);
}
}
......@@ -470,7 +482,8 @@ public class JobManager {
}
LocalDateTime startTime = LocalDateTime.now();
TableResult tableResult = executor.executeSql(newStatement);
result = ResultBuilder.build(operationType, config.getMaxRowNum(), false, false, executor.getTimeZone()).getResult(tableResult);
result = ResultBuilder.build(operationType, config.getMaxRowNum(), false, false, executor.getTimeZone())
.getResult(tableResult);
result.setStartTime(startTime);
}
return result;
......@@ -490,11 +503,13 @@ public class JobManager {
}
Executor sessionExecutor = null;
if (sessionConfig.isUseRemote()) {
sessionExecutor = Executor.buildRemoteExecutor(EnvironmentSetting.build(sessionConfig.getAddress()), ExecutorSetting.DEFAULT);
sessionExecutor = Executor.buildRemoteExecutor(EnvironmentSetting.build(sessionConfig.getAddress()),
ExecutorSetting.DEFAULT);
} else {
sessionExecutor = Executor.buildLocalExecutor(sessionConfig.getExecutorSetting());
}
ExecutorEntity executorEntity = new ExecutorEntity(session, sessionConfig, createUser, LocalDateTime.now(), sessionExecutor);
ExecutorEntity executorEntity =
new ExecutorEntity(session, sessionConfig, createUser, LocalDateTime.now(), sessionExecutor);
SessionPool.push(executorEntity);
return SessionInfo.build(executorEntity);
}
......@@ -555,7 +570,8 @@ public class JobManager {
job.setStatus(Job.JobStatus.SUCCESS);
success();
} catch (Exception e) {
String error = LogUtil.getError("Exception in executing Jar:\n" + config.getGatewayConfig().getAppConfig().getUserJarPath(), e);
String error = LogUtil.getError(
"Exception in executing Jar:\n" + config.getGatewayConfig().getAppConfig().getUserJarPath(), e);
job.setEndTime(LocalDateTime.now());
job.setStatus(Job.JobStatus.FAILED);
job.setError(error);
......@@ -580,13 +596,16 @@ public class JobManager {
sb.append("set " + CoreOptions.DEFAULT_PARALLELISM.key() + " = " + config.getParallelism() + ";\r\n");
}
if (Asserts.isNotNull(config.getCheckpoint())) {
sb.append("set " + ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL.key() + " = " + config.getCheckpoint() + ";\r\n");
sb.append("set " + ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL.key() + " = "
+ config.getCheckpoint() + ";\r\n");
}
if (Asserts.isNotNullString(config.getSavePointPath())) {
sb.append("set " + SavepointConfigOptions.SAVEPOINT_PATH + " = " + config.getSavePointPath() + ";\r\n");
}
if (Asserts.isNotNull(config.getGatewayConfig()) && Asserts.isNotNull(config.getGatewayConfig().getFlinkConfig().getConfiguration())) {
for (Map.Entry<String, String> entry : config.getGatewayConfig().getFlinkConfig().getConfiguration().entrySet()) {
if (Asserts.isNotNull(config.getGatewayConfig())
&& Asserts.isNotNull(config.getGatewayConfig().getFlinkConfig().getConfiguration())) {
for (Map.Entry<String, String> entry : config.getGatewayConfig().getFlinkConfig().getConfiguration()
.entrySet()) {
sb.append("set " + entry.getKey() + " = " + entry.getValue() + ";\r\n");
}
}
......@@ -594,12 +613,17 @@ public class JobManager {
switch (GatewayType.get(config.getType())) {
case YARN_PER_JOB:
case YARN_APPLICATION:
sb.append("set " + DeploymentOptions.TARGET.key() + " = " + GatewayType.get(config.getType()).getLongValue() + ";\r\n");
sb.append("set " + DeploymentOptions.TARGET.key() + " = "
+ GatewayType.get(config.getType()).getLongValue() + ";\r\n");
if (Asserts.isNotNull(config.getGatewayConfig())) {
sb.append("set " + YarnConfigOptions.PROVIDED_LIB_DIRS.key() + " = " + Collections.singletonList(config.getGatewayConfig().getClusterConfig().getFlinkLibPath()) + ";\r\n");
sb.append("set " + YarnConfigOptions.PROVIDED_LIB_DIRS.key() + " = "
+ Collections.singletonList(config.getGatewayConfig().getClusterConfig().getFlinkLibPath())
+ ";\r\n");
}
if (Asserts.isNotNull(config.getGatewayConfig()) && Asserts.isNotNullString(config.getGatewayConfig().getFlinkConfig().getJobName())) {
sb.append("set " + YarnConfigOptions.APPLICATION_NAME.key() + " = " + config.getGatewayConfig().getFlinkConfig().getJobName() + ";\r\n");
if (Asserts.isNotNull(config.getGatewayConfig())
&& Asserts.isNotNullString(config.getGatewayConfig().getFlinkConfig().getJobName())) {
sb.append("set " + YarnConfigOptions.APPLICATION_NAME.key() + " = "
+ config.getGatewayConfig().getFlinkConfig().getJobName() + ";\r\n");
}
break;
default:
......@@ -608,16 +632,6 @@ public class JobManager {
return sb.toString();
}
public static List<String> getUDFClassName(String statement) {
Pattern pattern = Pattern.compile(FUNCTION_REGEX, Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(statement);
List<String> classNameList = new ArrayList<>();
while (matcher.find()) {
classNameList.add(matcher.group(2));
}
return classNameList;
}
public static void initUDF(String className, String code) {
if (ClassPool.exist(ClassEntity.build(className, code))) {
UDFUtil.initClassLoader(className);
......
......@@ -19,13 +19,13 @@
package com.dlink.utils;
import static com.dlink.constant.PathConstant.UDF_JAR_RULE;
import static com.dlink.constant.PathConstant.UDF_JAR_TMP_PATH;
import static com.dlink.constant.PathConstant.UDF_VERSION_RULE;
import com.dlink.constant.PathConstant;
import com.dlink.pool.ClassEntity;
import com.dlink.pool.ClassPool;
import com.dlink.process.context.ProcessContextHolder;
import com.dlink.process.model.ProcessEntity;
import org.apache.commons.lang3.StringUtils;
import java.io.InputStream;
import java.nio.charset.Charset;
......@@ -34,6 +34,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.codehaus.groovy.control.CompilerConfiguration;
import org.slf4j.Logger;
......@@ -55,11 +57,27 @@ import groovy.lang.GroovyClassLoader;
* @since 2021/12/27 23:25
*/
public class UDFUtil {
protected static final Logger log = LoggerFactory.getLogger(UDFUtil.class);
    /**
     * Maps the md5 checksum of a built UDF jar to its version number.
     */
protected static final Map<String, Integer> UDF_MD5_MAP = new HashMap<>();
private static final String FUNCTION_REGEX = "function (.*?)'(.*?)'";
public static List<String> getUDFClassName(String statement) {
ProcessEntity process = ProcessContextHolder.getProcess();
process.info("Parse UDF class name:");
Pattern pattern = Pattern.compile(FUNCTION_REGEX, Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(statement);
List<String> classNameList = new ArrayList<>();
while (matcher.find()) {
classNameList.add(matcher.group(2));
}
process.info(StringUtils.join(",", classNameList));
process.info(StrUtil.format("A total of {} UDF have been Parsed.", classNameList.size()));
return classNameList;
}
public static Boolean buildClass(String code) {
CustomStringJavaCompiler compiler = new CustomStringJavaCompiler(code);
......@@ -93,8 +111,8 @@ public class UDFUtil {
List<String> successList = new ArrayList<>();
List<String> failedList = new ArrayList<>();
String tmpPath = PathConstant.UDF_PATH;
String udfJarPath = UDF_JAR_TMP_PATH;
//删除jar缓存
String udfJarPath = PathConstant.UDF_JAR_TMP_PATH;
// 删除jar缓存
FileUtil.del(udfJarPath);
codeList.forEach(code -> {
CustomStringJavaCompiler compiler = new CustomStringJavaCompiler(code);
......@@ -110,14 +128,18 @@ public class UDFUtil {
failedList.add(className);
}
});
String[] clazzs = successList.stream().map(className -> StrUtil.replace(className, ".", "/") + ".class").toArray(String[]::new);
InputStream[] fileInputStreams = successList.stream().map(className -> tmpPath + StrUtil.replace(className, ".", "/") + ".class").map(FileUtil::getInputStream).toArray(InputStream[]::new);
String[] clazzs = successList.stream().map(className -> StrUtil.replace(className, ".", "/") + ".class")
.toArray(String[]::new);
InputStream[] fileInputStreams =
successList.stream().map(className -> tmpPath + StrUtil.replace(className, ".", "/") + ".class")
.map(FileUtil::getInputStream).toArray(InputStream[]::new);
// 编译好的文件打包jar
try (ZipUtils zipWriter = new ZipUtils(FileUtil.file(udfJarPath), Charset.defaultCharset())) {
zipWriter.add(clazzs, fileInputStreams);
}
String md5 = md5sum(udfJarPath);
return MapUtil.builder("success", successList).put("failed", failedList).put("md5", Collections.singletonList(md5)).build();
return MapUtil.builder("success", successList).put("failed", failedList)
.put("md5", Collections.singletonList(md5)).build();
}
/**
......@@ -138,16 +160,16 @@ public class UDFUtil {
if (UDF_MD5_MAP.isEmpty()) {
scanUDFMD5();
}
//2. 如果有匹配的,返回对应udf 版本,没有则构建jar,对应信息写入 jar
// 2. 如果有匹配的,返回对应udf 版本,没有则构建jar,对应信息写入 jar
if (UDF_MD5_MAP.containsKey(md5)) {
FileUtil.del(UDF_JAR_TMP_PATH);
FileUtil.del(PathConstant.UDF_JAR_TMP_PATH);
return StrUtil.format("udf-{}.jar", UDF_MD5_MAP.get(md5));
}
// 3. 生成新版本jar
Integer newVersion = UDF_MD5_MAP.values().size() > 0 ? CollUtil.max(UDF_MD5_MAP.values()) + 1 : 1;
String jarName = StrUtil.format("udf-{}.jar", newVersion);
String newName = PathConstant.UDF_PATH + jarName;
FileUtil.rename(FileUtil.file(UDF_JAR_TMP_PATH), newName, true);
FileUtil.rename(FileUtil.file(PathConstant.UDF_JAR_TMP_PATH), newName, true);
UDF_MD5_MAP.put(md5, newVersion);
return jarName;
} catch (Exception e) {
......@@ -161,8 +183,9 @@ public class UDFUtil {
*/
private static void scanUDFMD5() {
List<String> fileList = FileUtil.listFileNames(PathConstant.UDF_PATH);
fileList.stream().filter(fileName -> ReUtil.isMatch(UDF_JAR_RULE, fileName)).distinct().forEach(fileName -> {
Integer version = Convert.toInt(ReUtil.getGroup0(UDF_VERSION_RULE, fileName));
fileList.stream().filter(fileName -> ReUtil.isMatch(PathConstant.UDF_JAR_RULE, fileName)).distinct()
.forEach(fileName -> {
Integer version = Convert.toInt(ReUtil.getGroup0(PathConstant.UDF_VERSION_RULE, fileName));
UDF_MD5_MAP.put(md5sum(PathConstant.UDF_PATH + fileName), version);
});
}
......
......@@ -55,7 +55,7 @@ start() {
pid=$(cat ${PIDPATH}/${PIDFILE})
if [ -z $pid ]; then
nohup java -Ddruid.mysql.usePingMethod=false -Xms512M -Xmx2048M -XX:PermSize=512M -XX:MaxPermSize=1024M -XX:+HeapDumpOnOutOfMemoryError -Xverify:none -cp ${CLASS_PATH} com.dlink.Dlink >dlink.log 2>&1 &
nohup java -Ddruid.mysql.usePingMethod=false -Xms512M -Xmx2048M -XX:PermSize=512M -XX:MaxPermSize=1024M -XX:+HeapDumpOnOutOfMemoryError -Xverify:none -cp ${CLASS_PATH} com.dlink.Dlink >/dev/null 2>&1 &
echo $! >${PIDPATH}/${PIDFILE}
echo "........................................Start Dlink Successfully........................................"
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.dlink</groupId>
<artifactId>dlink</artifactId>
<version>0.6.8-SNAPSHOT</version>
</parent>
<artifactId>dlink-process</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-common</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
</dependency>
</dependencies>
</project>
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.process.context;
import com.dlink.assertion.Asserts;
import com.dlink.process.model.ProcessEntity;
import com.dlink.process.pool.ProcessPool;
/**
* ProcessContextHolder
*
* @author wenmo
* @since 2022/10/16 16:57
*/
/**
 * Holds the {@link ProcessEntity} bound to the current thread, so deeply nested
 * code can append log messages to the active process without threading it
 * through every call.
 */
public class ProcessContextHolder {

    private static final ThreadLocal<ProcessEntity> PROCESS_CONTEXT = new ThreadLocal<>();

    /** Bind the given process to the current thread. */
    public static void setProcess(ProcessEntity process) {
        PROCESS_CONTEXT.set(process);
    }

    /**
     * Return the process bound to this thread, or {@link ProcessEntity#NULL_PROCESS}
     * (whose logging methods are no-ops) when none has been registered.
     */
    public static ProcessEntity getProcess() {
        ProcessEntity current = PROCESS_CONTEXT.get();
        if (Asserts.isNull(current)) {
            return ProcessEntity.NULL_PROCESS;
        }
        return current;
    }

    /** Unbind any process from the current thread. */
    public static void clear() {
        PROCESS_CONTEXT.remove();
    }

    /**
     * Bind the process to this thread and publish it to the global process pool.
     *
     * @param process the process to register; must not be null
     * @return the same process, for call chaining
     */
    public static ProcessEntity registerProcess(ProcessEntity process) {
        Asserts.checkNull(process, "Process can not be null.");
        setProcess(process);
        ProcessPool.getInstance().push(process.getName(), process);
        return process;
    }
}
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.process.model;
import com.dlink.assertion.Asserts;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import cn.hutool.core.util.StrUtil;
/**
* Process
*
* @author wenmo
* @since 2022/10/16 16:30
*/
public class ProcessEntity {
private String pid;
private String name;
private Integer taskId;
private ProcessType type;
private ProcessStatus status;
private LocalDateTime startTime;
private LocalDateTime endTime;
private long time;
private int stepIndex = 0;
private List<ProcessStep> steps;
private Integer userId;
private String userName;
public static final ProcessEntity NULL_PROCESS = new ProcessEntity();
    /** No-arg constructor; also backs the shared {@code NULL_PROCESS} sentinel. */
    public ProcessEntity() {
    }

    /**
     * Create a new process identified by a pid.
     *
     * @param pid      unique process id (a UUID string for real processes)
     * @param name     display name of the process
     * @param taskId   id of the task this process belongs to; may be null
     * @param type     the kind of work this process performs
     * @param userId   id of the user who started the process
     * @param userName name of the user who started the process
     */
    public ProcessEntity(String pid, String name, Integer taskId, ProcessType type, Integer userId, String userName) {
        this.pid = pid;
        this.name = name;
        this.taskId = taskId;
        this.type = type;
        this.userId = userId;
        this.userName = userName;
    }

    /**
     * Create a fully populated process. Note this constructor does not set
     * {@code pid}, so the result reads as a null process until a pid is assigned.
     */
    public ProcessEntity(String name, Integer taskId, ProcessType type, ProcessStatus status, LocalDateTime startTime,
                         LocalDateTime endTime, long time,
                         List<ProcessStep> steps, Integer userId, String userName) {
        this.name = name;
        this.taskId = taskId;
        this.type = type;
        this.status = status;
        this.startTime = startTime;
        this.endTime = endTime;
        this.time = time;
        this.steps = steps;
        this.userId = userId;
        this.userName = userName;
    }
    /** Create a process with a temporary name (no task bound yet). */
    public static ProcessEntity init(ProcessType type, Integer userId, String userName) {
        return init(type.getValue() + "_TEMP", null, type, userId, userName);
    }

    /** Create a process named after its type and task id. */
    public static ProcessEntity init(Integer taskId, ProcessType type, Integer userId, String userName) {
        return init(type.getValue() + taskId, taskId, type, userId, userName);
    }

    /**
     * Create and initialize a process: assigns a random pid, marks it
     * INITIALIZING, records the start time and opens the first step.
     *
     * @param name     display name of the process
     * @param taskId   id of the task this process belongs to; may be null
     * @param type     the kind of work this process performs
     * @param userId   id of the user who started the process
     * @param userName name of the user who started the process
     * @return the initialized process
     */
    public static ProcessEntity init(String name, Integer taskId, ProcessType type, Integer userId, String userName) {
        ProcessEntity process = new ProcessEntity(UUID.randomUUID().toString(), name, taskId, type, userId, userName);
        process.setStatus(ProcessStatus.INITIALIZING);
        process.setStartTime(LocalDateTime.now());
        process.setSteps(new ArrayList<>());
        process.getSteps().add(ProcessStep.init());
        // stepIndex becomes 1, so steps.get(stepIndex - 1) is the step just added.
        process.nextStep();
        return process;
    }
public void start() {
if (isNullProcess()) {
return;
}
steps.get(stepIndex - 1).setEndTime(LocalDateTime.now());
setStatus(ProcessStatus.RUNNING);
steps.add(ProcessStep.run());
nextStep();
}
public void finish() {
if (isNullProcess()) {
return;
}
steps.get(stepIndex - 1).setEndTime(LocalDateTime.now());
setStatus(ProcessStatus.FINISHED);
setEndTime(LocalDateTime.now());
setTime(getEndTime().compareTo(getStartTime()));
}
public void config(String str) {
if (isNullProcess()) {
return;
}
steps.get(stepIndex - 1).appendInfo(
StrUtil.format("\n[{}] {} CONFIG: {}", type.getValue(), LocalDateTime.now(), str));
}
public void info(String str) {
if (isNullProcess()) {
return;
}
steps.get(stepIndex - 1).appendInfo(
StrUtil.format("\n[{}] {} INFO: {}", type.getValue(), LocalDateTime.now(), str));
}
public void infoSuccess() {
if (isNullProcess()) {
return;
}
steps.get(stepIndex - 1).appendInfo("Success.");
}
public void infoFail() {
if (isNullProcess()) {
return;
}
steps.get(stepIndex - 1).appendInfo("Fail.");
}
public void error(String str) {
if (isNullProcess()) {
return;
}
steps.get(stepIndex - 1).appendInfo(
StrUtil.format("\n[{}] {} ERROR: {}", type.getValue(), LocalDateTime.now(), str));
steps.get(stepIndex - 1).appendError(
StrUtil.format("\n[{}] {} ERROR: {}", type.getValue(), LocalDateTime.now(), str));
}
public void nextStep() {
if (isNullProcess()) {
return;
}
stepIndex++;
}
public boolean isNullProcess() {
return Asserts.isNullString(pid);
}
public boolean isActiveProcess() {
return status.isActiveStatus();
}
public String getPid() {
return pid;
}
public void setPid(String pid) {
this.pid = pid;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getTaskId() {
return taskId;
}
public void setTaskId(Integer taskId) {
this.taskId = taskId;
}
public ProcessType getType() {
return type;
}
public void setType(ProcessType type) {
this.type = type;
}
public ProcessStatus getStatus() {
return status;
}
public void setStatus(ProcessStatus status) {
this.status = status;
}
public LocalDateTime getStartTime() {
return startTime;
}
public void setStartTime(LocalDateTime startTime) {
this.startTime = startTime;
}
public LocalDateTime getEndTime() {
return endTime;
}
public void setEndTime(LocalDateTime endTime) {
this.endTime = endTime;
}
public long getTime() {
return time;
}
public void setTime(long time) {
this.time = time;
}
public Integer getUserId() {
return userId;
}
public void setUserId(Integer userId) {
this.userId = userId;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public List<ProcessStep> getSteps() {
return steps;
}
public void setSteps(List<ProcessStep> steps) {
this.steps = steps;
}
public int getStepIndex() {
return stepIndex;
}
public void setStepIndex(int stepIndex) {
this.stepIndex = stepIndex;
}
}
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.process.model;
import com.dlink.assertion.Asserts;
/**
* ProcessStatus
*
* @author wenmo
* @since 2022/10/16 16:33
*/
/**
 * Lifecycle status of a {@link ProcessEntity}. INITIALIZING and RUNNING are the
 * "active" states; everything else is terminal (or UNKNOWN for unparsable input).
 *
 * @author wenmo
 * @since 2022/10/16 16:33
 */
public enum ProcessStatus {

    INITIALIZING("INITIALIZING"),
    RUNNING("RUNNING"),
    FAILED("FAILED"),
    CANCELED("CANCELED"),
    FINISHED("FINISHED"),
    UNKNOWN("UNKNOWN");

    // String form used for (de)serialization and lookups; never reassigned.
    private final String value;

    ProcessStatus(String value) {
        this.value = value;
    }

    public String getValue() {
        return value;
    }

    /**
     * Resolves a status from its string value.
     *
     * @return the matching status, or {@link #UNKNOWN} when nothing matches
     */
    public static ProcessStatus get(String value) {
        for (ProcessStatus type : ProcessStatus.values()) {
            if (Asserts.isEquals(type.getValue(), value)) {
                return type;
            }
        }
        return ProcessStatus.UNKNOWN;
    }

    /** True when this status's string value equals the given string. */
    public boolean equalsValue(String type) {
        // Simplified from if/return-true/return-false to a single boolean expression.
        return Asserts.isEquals(value, type);
    }

    /** True for the non-terminal states (INITIALIZING, RUNNING). */
    public boolean isActiveStatus() {
        switch (this) {
            case INITIALIZING:
            case RUNNING:
                return true;
            default:
                return false;
        }
    }
}
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.process.model;
import java.time.LocalDateTime;
/**
* ProcessStep
*
* @author wenmo
* @since 2022/10/16 16:46
*/
/**
 * One step of a {@link ProcessEntity}: carries its own status, start/end time,
 * elapsed time, and separate info/error log buffers.
 *
 * @author wenmo
 * @since 2022/10/16 16:46
 */
public class ProcessStep {

    private ProcessStatus stepStatus;
    private LocalDateTime startTime;
    private LocalDateTime endTime;
    // Elapsed time in milliseconds, computed when the end time is set.
    private long time;
    private StringBuilder info = new StringBuilder();
    private StringBuilder error = new StringBuilder();
    // Flipped to true the first time anything is appended to the error buffer.
    private boolean isError = false;

    public ProcessStep() {
    }

    public ProcessStep(ProcessStatus stepStatus, LocalDateTime startTime) {
        this.stepStatus = stepStatus;
        this.startTime = startTime;
    }

    // NOTE(review): the 'index' parameter is unused but kept to preserve the
    // existing constructor signature for callers.
    public ProcessStep(int index, ProcessStatus stepStatus, LocalDateTime startTime, LocalDateTime endTime, long time,
                       StringBuilder info, StringBuilder error) {
        this.stepStatus = stepStatus;
        this.startTime = startTime;
        this.endTime = endTime;
        this.time = time;
        this.info = info;
        this.error = error;
    }

    /** Creates a step in INITIALIZING state starting now. */
    public static ProcessStep init() {
        return new ProcessStep(ProcessStatus.INITIALIZING, LocalDateTime.now());
    }

    /** Creates a step in RUNNING state starting now. */
    public static ProcessStep run() {
        return new ProcessStep(ProcessStatus.RUNNING, LocalDateTime.now());
    }

    public void appendInfo(String str) {
        info.append(str);
    }

    /** Appends to the error buffer and marks this step as failed. */
    public void appendError(String str) {
        error.append(str);
        isError = true;
    }

    public ProcessStatus getStepStatus() {
        return stepStatus;
    }

    public void setStepStatus(ProcessStatus stepStatus) {
        this.stepStatus = stepStatus;
    }

    public LocalDateTime getStartTime() {
        return startTime;
    }

    public void setStartTime(LocalDateTime startTime) {
        this.startTime = startTime;
    }

    public LocalDateTime getEndTime() {
        return endTime;
    }

    /** Sets the end time and recomputes the elapsed time from the start time. */
    public void setEndTime(LocalDateTime endTime) {
        this.endTime = endTime;
        // BUGFIX: compareTo() returns only the sign (-1/0/1), not a duration;
        // store the actual elapsed time in milliseconds.
        this.time = java.time.Duration.between(startTime, endTime).toMillis();
    }

    public long getTime() {
        return time;
    }

    public void setTime(long time) {
        this.time = time;
    }

    public StringBuilder getInfo() {
        return info;
    }

    public void setInfo(StringBuilder info) {
        this.info = info;
    }

    public StringBuilder getError() {
        return error;
    }

    public void setError(StringBuilder error) {
        this.error = error;
    }

    public boolean isError() {
        return isError;
    }

    public void setError(boolean error) {
        isError = error;
    }
}
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.process.model;
import com.dlink.assertion.Asserts;
/**
* ProcessType
*
* @author wenmo
* @since 2022/10/16 16:33
*/
/**
 * Category of work a {@link ProcessEntity} represents (explain vs. submit, for
 * Flink jobs and plain SQL).
 *
 * @author wenmo
 * @since 2022/10/16 16:33
 */
public enum ProcessType {

    FLINKEXPLAIN("FlinkExplain"),
    FLINKSUBMIT("FlinkSubmit"),
    SQLEXPLAIN("SQLExplain"),
    // NOTE(review): constant name has a typo (SQK vs SQL); kept as-is because
    // renaming the enum constant would break existing callers. The string value
    // "SQLSubmit" is correct and is what the frontend matches on.
    SQKSUBMIT("SQLSubmit"),
    UNKNOWN("Unknown");

    // String form used for (de)serialization and lookups; never reassigned.
    private final String value;

    ProcessType(String value) {
        this.value = value;
    }

    public String getValue() {
        return value;
    }

    /**
     * Resolves a type from its string value.
     *
     * @return the matching type, or {@link #UNKNOWN} when nothing matches
     */
    public static ProcessType get(String value) {
        for (ProcessType type : ProcessType.values()) {
            if (Asserts.isEquals(type.getValue(), value)) {
                return type;
            }
        }
        return ProcessType.UNKNOWN;
    }

    /** True when this type's string value equals the given string. */
    public boolean equalsValue(String type) {
        // Simplified from if/return-true/return-false to a single boolean expression.
        return Asserts.isEquals(value, type);
    }
}
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.process.pool;
import com.dlink.pool.AbstractPool;
import com.dlink.process.model.ProcessEntity;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* ProcessPool
*
* @author wenmo
* @since 2022/10/16 17:00
*/
/**
 * Process-wide singleton pool of {@link ProcessEntity} instances, backed by a
 * {@link ConcurrentHashMap} so it can be shared across request threads.
 *
 * @author wenmo
 * @since 2022/10/16 17:00
 */
public class ProcessPool extends AbstractPool<ProcessEntity> {

    // final: the map reference is never reassigned (a stronger guarantee than the
    // previous volatile); ConcurrentHashMap handles concurrent access internally.
    private static final Map<String, ProcessEntity> processEntityMap = new ConcurrentHashMap<>();

    private static final ProcessPool instance = new ProcessPool();

    public static ProcessPool getInstance() {
        return instance;
    }

    @Override
    public Map<String, ProcessEntity> getMap() {
        return processEntityMap;
    }

    @Override
    public void refresh(ProcessEntity entity) {
        // Intentionally a no-op: pooled entities are mutated in place, so there is
        // nothing to refresh.
    }
}
......@@ -182,6 +182,12 @@ export default [
icon: 'desktop',
component: './SettingCenter/SystemInfo',
},
{
path: '/settingcenter/processList',
name: 'processList',
icon: 'desktop',
component: './SettingCenter/ProcessList',
},
],
},
{
......
......@@ -102,6 +102,7 @@ export default {
'menu.settings': 'Setting Center',
'menu.settings.flinkConfig': 'Flink Settings',
'menu.settings.systemInfo': 'System Info',
'menu.settings.processList': 'Process List',
'menu.about': 'About',
};
......@@ -107,6 +107,7 @@ export default {
'menu.settings': '配置中心',
'menu.settings.flinkConfig': 'Flink 配置',
'menu.settings.systemInfo': '系统信息',
'menu.settings.processList': '进程列表',
'menu.about': '关于',
};
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/**
 * A process entry returned by /api/process/listAllProcess.
 * Mirrors the backend ProcessEntity model (com.dlink.process.model.ProcessEntity).
 */
export type ProcessItem = {
  // Unique process id (UUID string from the backend).
  pid: string,
  name: string,
  taskId: number,
  // Backend ProcessType string value, e.g. "FlinkExplain", "SQLSubmit".
  type: string,
  // Backend ProcessStatus string value, e.g. "RUNNING", "FINISHED".
  status: string,
  startTime: Date,
  endTime: Date,
  // Elapsed time as reported by the backend.
  time: number,
  steps: ProcessStep[],
  userId: number,
  userName: string,
};
/**
 * One step of a process; mirrors the backend ProcessStep model, including its
 * separate info/error log buffers.
 */
export type ProcessStep = {
  // Backend ProcessStatus string value for this step.
  stepStatus: string,
  startTime: Date,
  endTime: Date,
  // Elapsed time as reported by the backend.
  time: number,
  // Accumulated info log text.
  info: string,
  // Accumulated error log text.
  error: string,
  isError: boolean,
}
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import React, {useRef, useState} from "react";
import ProTable, {ActionType, ProColumns} from "@ant-design/pro-table";
import {Drawer} from 'antd';
import {PageContainer} from '@ant-design/pro-layout';
import ProDescriptions from '@ant-design/pro-descriptions';
import {getData} from "@/components/Common/crud";
import {ProcessItem} from "@/pages/SettingCenter/ProcessList/data";
const url = '/api/process/listAllProcess';
/**
 * Setting-center page that lists backend processes in a ProTable; clicking a
 * pid opens a read-only detail drawer for that row.
 */
const ProcessList: React.FC<{}> = () => {
  // Removed the unused `const {dispatch} = props;` — dispatch was never read
  // and the component declares an empty props type.
  const [row, setRow] = useState<ProcessItem>();
  const actionRef = useRef<ActionType>();

  const columns: ProColumns<ProcessItem>[] = [
    {
      title: '进程ID',
      dataIndex: 'pid',
      sorter: true,
      // The pid cell doubles as the trigger for the detail drawer.
      render: (dom, entity) => {
        return <a onClick={() => setRow(entity)}>{dom}</a>;
      },
    },
    {
      title: '进程名',
      sorter: true,
      dataIndex: 'name',
    },
    {
      title: '任务ID',
      sorter: true,
      dataIndex: 'taskId',
    },
    {
      title: '类型',
      sorter: true,
      dataIndex: 'type',
      // Values mirror the backend ProcessType string values.
      filters: [
        {
          text: 'FlinkExplain',
          value: 'FlinkExplain',
        }, {
          text: 'FlinkSubmit',
          value: 'FlinkSubmit',
        }, {
          text: 'SQLExplain',
          value: 'SQLExplain',
        }, {
          text: 'SQLSubmit',
          value: 'SQLSubmit',
        }, {
          text: 'Unknown',
          value: 'Unknown',
        },
      ],
      filterMultiple: false,
      valueEnum: {
        'FlinkExplain': {text: 'FlinkExplain'},
        'FlinkSubmit': {text: 'FlinkSubmit'},
        'SQLExplain': {text: 'SQLExplain'},
        'SQLSubmit': {text: 'SQLSubmit'},
        'Unknown': {text: 'Unknown'},
      },
    }, {
      title: '状态',
      sorter: true,
      dataIndex: 'status',
      // Values mirror the backend ProcessStatus string values.
      filters: [
        {
          text: 'INITIALIZING',
          value: 'INITIALIZING',
        }, {
          text: 'RUNNING',
          value: 'RUNNING',
        }, {
          text: 'FAILED',
          value: 'FAILED',
        }, {
          text: 'CANCELED',
          value: 'CANCELED',
        }, {
          text: 'FINISHED',
          value: 'FINISHED',
        }, {
          text: 'UNKNOWN',
          value: 'UNKNOWN',
        },
      ],
      filterMultiple: false,
      valueEnum: {
        'INITIALIZING': {text: 'INITIALIZING'},
        'RUNNING': {text: 'RUNNING'},
        'FAILED': {text: 'FAILED'},
        'CANCELED': {text: 'CANCELED'},
        'FINISHED': {text: 'FINISHED'},
        'UNKNOWN': {text: 'UNKNOWN'},
      },
    },
    {
      title: '开始时间',
      dataIndex: 'startTime',
      sorter: true,
      valueType: 'dateTime',
    },
    {
      title: '结束时间',
      dataIndex: 'endTime',
      sorter: true,
      valueType: 'dateTime',
    }, {
      title: '耗时',
      sorter: true,
      dataIndex: 'time',
    }, {
      title: '操作人ID',
      sorter: true,
      dataIndex: 'userId',
    }, {
      title: '操作人名',
      sorter: true,
      dataIndex: 'userName',
    }
  ];

  return (
    <PageContainer title={false}>
      <ProTable
        actionRef={actionRef}
        rowKey="pid"
        // NOTE(review): params/sorter/filter are ignored — the request always
        // fetches all processes with {active: false}; confirm server-side
        // sorting/filtering is intentionally unsupported here.
        request={(params, sorter, filter) => getData(url, {active: false})}
        columns={columns}
        search={false}
      />
      <Drawer
        width={600}
        visible={!!row}
        onClose={() => {
          setRow(undefined);
        }}
        closable={false}
      >
        {row?.pid && (
          <ProDescriptions<ProcessItem>
            column={2}
            title={row?.pid}
            request={async () => ({
              data: row || {},
            })}
            params={{
              pid: row?.pid,
            }}
            columns={columns}
          />
        )}
      </Drawer>
    </PageContainer>
  );
};

export default ProcessList;
......@@ -43,6 +43,7 @@
<module>dlink-web</module>
<module>dlink-admin</module>
<module>dlink-assembly</module>
<module>dlink-process</module>
</modules>
<properties>
......@@ -452,6 +453,11 @@
<artifactId>dlink-scheduler</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-process</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment