Commit 3072dc0e authored by godkaikai's avatar godkaikai

集群配置测试功能

parent 75bc069e
...@@ -2,12 +2,19 @@ package com.dlink.controller; ...@@ -2,12 +2,19 @@ package com.dlink.controller;
import com.dlink.common.result.ProTableResult; import com.dlink.common.result.ProTableResult;
import com.dlink.common.result.Result; import com.dlink.common.result.Result;
import com.dlink.gateway.result.TestResult;
import com.dlink.model.ClusterConfiguration; import com.dlink.model.ClusterConfiguration;
import com.dlink.service.ClusterConfigurationService; import com.dlink.service.ClusterConfigurationService;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
...@@ -30,6 +37,8 @@ public class ClusterConfigurationController { ...@@ -30,6 +37,8 @@ public class ClusterConfigurationController {
*/ */
@PutMapping @PutMapping
public Result saveOrUpdate(@RequestBody ClusterConfiguration clusterConfiguration) { public Result saveOrUpdate(@RequestBody ClusterConfiguration clusterConfiguration) {
TestResult testResult = clusterConfigurationService.testGateway(clusterConfiguration);
clusterConfiguration.setAvailable(testResult.isAvailable());
if(clusterConfigurationService.saveOrUpdate(clusterConfiguration)){ if(clusterConfigurationService.saveOrUpdate(clusterConfiguration)){
return Result.succeed("新增成功"); return Result.succeed("新增成功");
}else { }else {
...@@ -85,4 +94,17 @@ public class ClusterConfigurationController { ...@@ -85,4 +94,17 @@ public class ClusterConfigurationController {
List<ClusterConfiguration >clusters = clusterConfigurationService.listEnabledAll(); List<ClusterConfiguration >clusters = clusterConfigurationService.listEnabledAll();
return Result.succeed(clusters,"获取成功"); return Result.succeed(clusters,"获取成功");
} }
/**
* 测试
*/
@PostMapping("/testConnect")
public Result testConnect(@RequestBody ClusterConfiguration clusterConfiguration) {
TestResult testResult = clusterConfigurationService.testGateway(clusterConfiguration);
if(testResult.isAvailable()){
return Result.succeed("测试链接成功");
}else {
return Result.failed(testResult.getError());
}
}
} }
package com.dlink.service; package com.dlink.service;
import com.dlink.db.service.ISuperService; import com.dlink.db.service.ISuperService;
import com.dlink.gateway.result.TestResult;
import com.dlink.model.ClusterConfiguration; import com.dlink.model.ClusterConfiguration;
import java.util.List; import java.util.List;
...@@ -20,4 +21,5 @@ public interface ClusterConfigurationService extends ISuperService<ClusterConfig ...@@ -20,4 +21,5 @@ public interface ClusterConfigurationService extends ISuperService<ClusterConfig
Map<String,Object> getGatewayConfig(Integer id); Map<String,Object> getGatewayConfig(Integer id);
TestResult testGateway(ClusterConfiguration clusterConfiguration);
} }
package com.dlink.service.impl; package com.dlink.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.dlink.assertion.Assert;
import com.dlink.assertion.Asserts;
import com.dlink.db.service.impl.SuperServiceImpl; import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.gateway.GatewayType;
import com.dlink.gateway.config.AppConfig;
import com.dlink.gateway.config.ClusterConfig;
import com.dlink.gateway.config.FlinkConfig;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.gateway.result.TestResult;
import com.dlink.job.JobManager;
import com.dlink.mapper.ClusterConfigurationMapper; import com.dlink.mapper.ClusterConfigurationMapper;
import com.dlink.model.ClusterConfiguration; import com.dlink.model.ClusterConfiguration;
import com.dlink.model.Jar;
import com.dlink.model.SystemConfiguration;
import com.dlink.service.ClusterConfigurationService; import com.dlink.service.ClusterConfigurationService;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
...@@ -35,4 +46,21 @@ public class ClusterConfigurationServiceImpl extends SuperServiceImpl<ClusterCon ...@@ -35,4 +46,21 @@ public class ClusterConfigurationServiceImpl extends SuperServiceImpl<ClusterCon
ClusterConfiguration clusterConfiguration = this.getClusterConfigById(id); ClusterConfiguration clusterConfiguration = this.getClusterConfigById(id);
return clusterConfiguration.getConfig(); return clusterConfiguration.getConfig();
} }
@Override
public TestResult testGateway(ClusterConfiguration clusterConfiguration) {
clusterConfiguration.parseConfig();
Map<String, Object> config = clusterConfiguration.getConfig();
GatewayConfig gatewayConfig = new GatewayConfig();
gatewayConfig.setClusterConfig(ClusterConfig.build(config.get("flinkConfigPath").toString(),
config.get("flinkLibPath").toString(),
config.get("hadoopConfigPath").toString()));
if(config.containsKey("flinkConfig")){
gatewayConfig.setFlinkConfig(FlinkConfig.build((Map<String, String>)config.get("flinkConfig")));
}
if(Asserts.isEqualsIgnoreCase(clusterConfiguration.getType(),"Yarn")){
gatewayConfig.setType(GatewayType.YARN_PER_JOB);
}
return JobManager.testGateway(gatewayConfig);
}
} }
...@@ -13,9 +13,11 @@ import com.dlink.gateway.GatewayType; ...@@ -13,9 +13,11 @@ import com.dlink.gateway.GatewayType;
import com.dlink.gateway.config.ActionType; import com.dlink.gateway.config.ActionType;
import com.dlink.gateway.config.AppConfig; import com.dlink.gateway.config.AppConfig;
import com.dlink.gateway.config.FlinkConfig; import com.dlink.gateway.config.FlinkConfig;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.gateway.config.SavePointType; import com.dlink.gateway.config.SavePointType;
import com.dlink.gateway.result.GatewayResult; import com.dlink.gateway.result.GatewayResult;
import com.dlink.gateway.result.SavePointResult; import com.dlink.gateway.result.SavePointResult;
import com.dlink.gateway.result.TestResult;
import com.dlink.interceptor.FlinkInterceptor; import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.parser.SqlType; import com.dlink.parser.SqlType;
import com.dlink.result.*; import com.dlink.result.*;
...@@ -428,4 +430,8 @@ public class JobManager extends RunTime { ...@@ -428,4 +430,8 @@ public class JobManager extends RunTime {
close(); close();
return job.getJobResult(); return job.getJobResult();
} }
public static TestResult testGateway(GatewayConfig gatewayConfig){
return Gateway.build(gatewayConfig).test();
}
} }
...@@ -5,6 +5,7 @@ import com.dlink.gateway.config.GatewayConfig; ...@@ -5,6 +5,7 @@ import com.dlink.gateway.config.GatewayConfig;
import com.dlink.gateway.exception.GatewayException; import com.dlink.gateway.exception.GatewayException;
import com.dlink.gateway.result.GatewayResult; import com.dlink.gateway.result.GatewayResult;
import com.dlink.gateway.result.SavePointResult; import com.dlink.gateway.result.SavePointResult;
import com.dlink.gateway.result.TestResult;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
import java.util.Iterator; import java.util.Iterator;
...@@ -21,6 +22,7 @@ public interface Gateway { ...@@ -21,6 +22,7 @@ public interface Gateway {
static Optional<Gateway> get(GatewayConfig config){ static Optional<Gateway> get(GatewayConfig config){
Asserts.checkNotNull(config,"配置不能为空"); Asserts.checkNotNull(config,"配置不能为空");
Asserts.checkNotNull(config.getType(),"配置类型不能为空");
ServiceLoader<Gateway> loader = ServiceLoader.load(Gateway.class); ServiceLoader<Gateway> loader = ServiceLoader.load(Gateway.class);
Iterator<Gateway> iterator = loader.iterator(); Iterator<Gateway> iterator = loader.iterator();
while(iterator.hasNext()) { while(iterator.hasNext()) {
...@@ -55,4 +57,6 @@ public interface Gateway { ...@@ -55,4 +57,6 @@ public interface Gateway {
SavePointResult savepointJob(); SavePointResult savepointJob();
TestResult test();
} }
package com.dlink.gateway.result;
/**
* TestResult
*
* @author wenmo
* @since 2021/11/27 16:12
**/
public class TestResult {
private boolean isAvailable;
private String error;
public boolean isAvailable() {
return isAvailable;
}
public String getError() {
return error;
}
public TestResult(boolean isAvailable, String error) {
this.isAvailable = isAvailable;
this.error = error;
}
public static TestResult success(){
return new TestResult(true,null);
}
public static TestResult fail(String error){
return new TestResult(false,error);
}
}
...@@ -10,6 +10,7 @@ import com.dlink.gateway.exception.GatewayException; ...@@ -10,6 +10,7 @@ import com.dlink.gateway.exception.GatewayException;
import com.dlink.gateway.model.JobInfo; import com.dlink.gateway.model.JobInfo;
import com.dlink.gateway.result.GatewayResult; import com.dlink.gateway.result.GatewayResult;
import com.dlink.gateway.result.SavePointResult; import com.dlink.gateway.result.SavePointResult;
import com.dlink.gateway.result.TestResult;
import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
...@@ -23,6 +24,7 @@ import org.apache.flink.yarn.YarnClusterDescriptor; ...@@ -23,6 +24,7 @@ import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions; import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnLogConfigUtil; import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
...@@ -211,4 +213,22 @@ public abstract class YarnGateway extends AbstractGateway { ...@@ -211,4 +213,22 @@ public abstract class YarnGateway extends AbstractGateway {
} }
} }
} }
public TestResult test(){
try {
initConfig();
}catch (Exception e){
return TestResult.fail("测试 Flink 配置失败:"+e.getMessage());
}
try {
initYarnClient();
if(yarnClient.isInState(Service.STATE.STARTED)){
return TestResult.success();
}else{
return TestResult.fail("该配置无对应 Yarn 集群存在");
}
}catch (Exception e){
return TestResult.fail("测试 Yarn 配置失败:"+e.getMessage());
}
}
} }
...@@ -5,6 +5,7 @@ import {ClusterConfigurationTableListItem} from "@/pages/ClusterConfiguration/da ...@@ -5,6 +5,7 @@ import {ClusterConfigurationTableListItem} from "@/pages/ClusterConfiguration/da
import {getConfig, getConfigFormValues} from "@/pages/ClusterConfiguration/function"; import {getConfig, getConfigFormValues} from "@/pages/ClusterConfiguration/function";
import {FLINK_CONFIG_LIST, HADOOP_CONFIG_LIST} from "@/pages/ClusterConfiguration/conf"; import {FLINK_CONFIG_LIST, HADOOP_CONFIG_LIST} from "@/pages/ClusterConfiguration/conf";
import type {Config} from "@/pages/ClusterConfiguration/conf"; import type {Config} from "@/pages/ClusterConfiguration/conf";
import {testClusterConfigurationConnect} from "@/pages/ClusterConfiguration/service";
export type ClusterConfigurationFormProps = { export type ClusterConfigurationFormProps = {
onCancel: (flag?: boolean) => void; onCancel: (flag?: boolean) => void;
...@@ -203,10 +204,19 @@ const ClusterConfigurationForm: React.FC<ClusterConfigurationFormProps> = (props ...@@ -203,10 +204,19 @@ const ClusterConfigurationForm: React.FC<ClusterConfigurationFormProps> = (props
); );
}; };
const testForm = ()=>{
const fieldsValue = form.validateFields();
setFormVals({...formVals, ...fieldsValue});
testClusterConfigurationConnect(formVals);
};
const renderFooter = () => { const renderFooter = () => {
return ( return (
<> <>
<Button onClick={() => handleModalVisible(false)}>取消</Button> <Button onClick={() => handleModalVisible(false)}>取消</Button>
<Button type="primary" htmlType="button" onClick={testForm}>
测试
</Button>
<Button type="primary" onClick={() => submitForm()}> <Button type="primary" onClick={() => submitForm()}>
完成 完成
</Button> </Button>
......
import {postAll} from "@/components/Common/crud";
import {message} from "antd";
import {ClusterConfigurationTableListItem} from "@/pages/ClusterConfiguration/data";
export async function testClusterConfigurationConnect(clusterConfiguration: ClusterConfigurationTableListItem) {
const hide = message.loading('正在测试连接');
try {
const {code,msg} = await postAll('/api/clusterConfiguration/testConnect',clusterConfiguration);
hide();
code==0?message.success(msg):message.error(msg);
} catch (error) {
hide();
message.error('请求失败,请重试');
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment