Commit 6e231b00 authored by wenmo's avatar wenmo

自定义jar提交

parent 6ab20c80
......@@ -2,6 +2,7 @@ package com.dlink.assertion;
import com.dlink.exception.BusException;
import com.dlink.model.Cluster;
import com.dlink.model.Jar;
import com.dlink.model.Statement;
import com.dlink.model.Task;
......@@ -40,4 +41,10 @@ public interface Assert {
throw new BusException("集群地址暂不可用");
}
}
static void check(Jar jar) {
if (jar == null) {
throw new BusException("自定义Jar不存在");
}
}
}
......@@ -82,4 +82,13 @@ public class JarController {
jar = jarService.getById(jar.getId());
return Result.succeed(jar,"获取成功");
}
/**
* 获取可用的jar列表
*/
@GetMapping("/listEnabledAll")
public Result listEnabledAll() {
List<Jar >jars = jarService.listEnabledAll();
return Result.succeed(jars,"获取成功");
}
}
......@@ -28,6 +28,7 @@ public class StudioExecuteDTO {
private boolean useRemote;
private Integer clusterId;
private Integer clusterConfigurationId;
private Integer jarId;
private boolean fragment;
private String statement;
private String jobName;
......@@ -56,7 +57,7 @@ public class StudioExecuteDTO {
}
return new JobConfig(
type,useResult, useSession, session, useRemote, clusterId,
clusterConfigurationId, taskId, jobName, fragment,useStatementSet,
clusterConfigurationId,jarId, taskId, jobName, fragment,useStatementSet,
maxRowNum, checkPoint, parallelism,savePointStrategy, savePointPath,config);
}
}
......@@ -44,6 +44,8 @@ public class Task extends SuperEntity{
private Integer clusterConfigurationId;
private Integer jarId;
private String config;
private String note;
......@@ -70,7 +72,7 @@ public class Task extends SuperEntity{
if(clusterId==null||clusterId==0){
useRemote = false;
}
return new JobConfig(type,false,false,useRemote,clusterId,clusterConfigurationId,getId(),alias,fragment,statementSet,checkPoint,parallelism,savePointStrategy,savePointPath);
return new JobConfig(type,false,false,useRemote,clusterId,clusterConfigurationId,jarId,getId(),alias,fragment,statementSet,checkPoint,parallelism,savePointStrategy,savePointPath);
}
}
......@@ -3,6 +3,8 @@ package com.dlink.service;
import com.dlink.db.service.ISuperService;
import com.dlink.model.Jar;
import java.util.List;
/**
* JarService
*
......@@ -11,4 +13,5 @@ import com.dlink.model.Jar;
**/
public interface JarService extends ISuperService<Jar> {
List<Jar> listEnabledAll();
}
package com.dlink.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.mapper.JarMapper;
import com.dlink.model.Jar;
import com.dlink.service.JarService;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* JarServiceImpl
*
......@@ -14,4 +17,8 @@ import org.springframework.stereotype.Service;
**/
@Service
public class JarServiceImpl extends SuperServiceImpl<JarMapper, Jar> implements JarService {
@Override
public List<Jar> listEnabledAll() {
return list(new QueryWrapper<Jar>().eq("enabled",1));
}
}
......@@ -3,15 +3,12 @@ package com.dlink.service.impl;
import com.dlink.assertion.Assert;
import com.dlink.assertion.Asserts;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.gateway.GatewayType;
import com.dlink.job.JobConfig;
import com.dlink.job.JobManager;
import com.dlink.job.JobResult;
import com.dlink.mapper.TaskMapper;
import com.dlink.model.Cluster;
import com.dlink.model.Savepoints;
import com.dlink.model.Statement;
import com.dlink.model.SystemConfiguration;
import com.dlink.model.Task;
import com.dlink.model.*;
import com.dlink.service.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
......@@ -36,6 +33,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
private ClusterConfigurationService clusterConfigurationService;
@Autowired
private SavepointsService savepointsService;
@Autowired
private JarService jarService;
@Value("${spring.datasource.driver-class-name}")
private String driver;
......@@ -54,18 +53,30 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
public JobResult submitByTaskId(Integer id) {
Task task = this.getById(id);
Assert.check(task);
Statement statement = statementService.getById(id);
Assert.check(statement);
boolean isJarTask = isJarTask(task);
Statement statement = null;
if(!isJarTask){
statement = statementService.getById(id);
Assert.check(statement);
}
JobConfig config = task.buildSubmitConfig();
if (!JobManager.useGateway(config.getType())) {
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), task.getClusterId()));
} else {
Map<String, Object> gatewayConfig = clusterConfigurationService.getGatewayConfig(task.getClusterConfigurationId());
if ("yarn-application".equals(config.getType()) || "ya".equals(config.getType())) {
SystemConfiguration systemConfiguration = SystemConfiguration.getInstances();
gatewayConfig.put("userJarPath", systemConfiguration.getSqlSubmitJarPath());
gatewayConfig.put("userJarParas", systemConfiguration.getSqlSubmitJarParas() + buildParas(config.getTaskId()));
gatewayConfig.put("userJarMainAppClass", systemConfiguration.getSqlSubmitJarMainAppClass());
if (GatewayType.YARN_APPLICATION.equals(config.getType())) {
if(!isJarTask) {
SystemConfiguration systemConfiguration = SystemConfiguration.getInstances();
gatewayConfig.put("userJarPath", systemConfiguration.getSqlSubmitJarPath());
gatewayConfig.put("userJarParas", systemConfiguration.getSqlSubmitJarParas() + buildParas(config.getTaskId()));
gatewayConfig.put("userJarMainAppClass", systemConfiguration.getSqlSubmitJarMainAppClass());
}else{
Jar jar = jarService.getById(task.getJarId());
Assert.check(jar);
gatewayConfig.put("userJarPath", jar.getPath());
gatewayConfig.put("userJarParas", jar.getParas());
gatewayConfig.put("userJarMainAppClass", jar.getMainClass());
}
}
config.buildGatewayConfig(gatewayConfig);
}
......@@ -88,7 +99,15 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
config.setSavePointPath(null);
}
JobManager jobManager = JobManager.build(config);
return jobManager.executeSql(statement.getStatement());
if(!isJarTask) {
return jobManager.executeSql(statement.getStatement());
}else{
return jobManager.executeJar();
}
}
private boolean isJarTask(Task task){
return GatewayType.YARN_APPLICATION.equalsValue(task.getType())&&Asserts.isNotNull(task.getJarId());
}
@Override
......
......@@ -16,6 +16,7 @@
<result column="statement_set" property="statementSet" />
<result column="cluster_id" property="clusterId" />
<result column="cluster_configuration_id" property="clusterConfigurationId" />
<result column="jar_id" property="jarId" />
<result column="config" property="config" />
<result column="note" property="note" />
<result column="enabled" property="enabled" />
......@@ -25,7 +26,7 @@
<!-- 通用查询结果列 -->
<sql id="Base_Column_List">
id, name, alias, type,check_point,save_point_strategy,save_point_path, parallelism,fragment,statement_set,cluster_id,cluster_configuration_id,config,note, enabled, create_time, update_time
id, name, alias, type,check_point,save_point_strategy,save_point_path, parallelism,fragment,statement_set,cluster_id,cluster_configuration_id,jar_id,config,note, enabled, create_time, update_time
</sql>
......
......@@ -29,6 +29,7 @@ public class JobConfig {
private boolean useRemote;
private Integer clusterId;
private Integer clusterConfigurationId;
private Integer jarId;
private String address;
private Integer taskId;
private String jobName;
......@@ -48,7 +49,7 @@ public class JobConfig {
}
public JobConfig(String type, boolean useResult, boolean useSession, String session, boolean useRemote, Integer clusterId,
Integer clusterConfigurationId, Integer taskId, String jobName, boolean useSqlFragment,
Integer clusterConfigurationId,Integer jarId, Integer taskId, String jobName, boolean useSqlFragment,
boolean useStatementSet, Integer maxRowNum, Integer checkpoint,
Integer parallelism, Integer savePointStrategyValue, String savePointPath, Map<String,String> config) {
this.type = type;
......@@ -58,6 +59,7 @@ public class JobConfig {
this.useRemote = useRemote;
this.clusterId = clusterId;
this.clusterConfigurationId = clusterConfigurationId;
this.jarId = jarId;
this.taskId = taskId;
this.jobName = jobName;
this.useSqlFragment = useSqlFragment;
......@@ -80,7 +82,7 @@ public class JobConfig {
}
public JobConfig(String type,boolean useResult, boolean useSession, boolean useRemote, Integer clusterId,
Integer clusterConfigurationId, Integer taskId, String jobName, boolean useSqlFragment,
Integer clusterConfigurationId, Integer jarId, Integer taskId, String jobName, boolean useSqlFragment,
boolean useStatementSet,Integer checkpoint, Integer parallelism, Integer savePointStrategyValue, String savePointPath) {
this.type = type;
this.useResult = useResult;
......@@ -88,6 +90,7 @@ public class JobConfig {
this.useRemote = useRemote;
this.clusterId = clusterId;
this.clusterConfigurationId = clusterConfigurationId;
this.jarId = jarId;
this.taskId = taskId;
this.jobName = jobName;
this.useSqlFragment = useSqlFragment;
......
......@@ -398,4 +398,34 @@ public class JobManager extends RunTime {
}
}
public JobResult executeJar() {
Job job = Job.init(GatewayType.get(config.getType()), config, executorSetting, executor, null, useGateway);
JobContextHolder.setJob(job);
ready();
try {
GatewayResult gatewayResult = Gateway.build(config.getGatewayConfig()).submitJar();
job.setResult(InsertResult.success(gatewayResult.getAppId()));
job.setJobId(gatewayResult.getAppId());
job.setJobManagerAddress(formatAddress(gatewayResult.getWebURL()));
job.setEndTime(LocalDateTime.now());
job.setStatus(Job.JobStatus.SUCCESS);
success();
} catch (Exception e) {
e.printStackTrace();
StackTraceElement[] trace = e.getStackTrace();
StringBuffer resMsg = new StringBuffer("");
for (StackTraceElement s : trace) {
resMsg.append(" \n " + s + " ");
}
LocalDateTime now = LocalDateTime.now();
job.setEndTime(now);
job.setStatus(Job.JobStatus.FAILED);
String error = now.toString() + ":" + "运行Jar:\n" + config.getGatewayConfig().getAppConfig().getUserJarPath() + " \n时出现异常:" + e.getMessage() + " \n >>>堆栈信息<<<" + resMsg.toString();
job.setError(error);
failed();
close();
}
close();
return job.getJobResult();
}
}
......@@ -64,7 +64,7 @@ public class JobManagerTest {
public void cancelJobSelect(){
JobConfig config = new JobConfig("session-yarn",true, true, "s1", true, 2,
null, null, "测试", false,false, 100, 0,
null, null,null, "测试", false,false, 100, 0,
1, 0,null,new HashMap<>());
if(config.isUseRemote()) {
config.setAddress("192.168.123.157:8081");
......
......@@ -230,6 +230,7 @@ CREATE TABLE `dlink_task` (
`statement_set` tinyint(1) NULL DEFAULT NULL COMMENT '启用语句集',
`cluster_id` int(11) NULL DEFAULT NULL COMMENT 'Flink集群ID',
`cluster_configuration_id` int(11) NULL DEFAULT NULL COMMENT '集群配置ID',
`jar_id` int(11) NULL DEFAULT NULL COMMENT 'jarID',
`config` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '配置',
`note` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '注释',
`enabled` tinyint(1) NOT NULL DEFAULT 1 COMMENT '是否启用',
......
......@@ -440,4 +440,10 @@ CREATE TABLE `dlink_savepoints` (
ALTER TABLE `dlink_task`
ADD COLUMN `save_point_strategy` int(1) NULL COMMENT 'SavePoint策略' AFTER `check_point`;
-- ----------------------------
-- 0.4.0-SNAPSHOT 2021-11-24
-- ----------------------------
ALTER TABLE `dlink_task`
ADD COLUMN `jar_id` int(11) NULL COMMENT 'JarID' AFTER `cluster_configuration_id`;
SET FOREIGN_KEY_CHECKS = 1;
......@@ -144,6 +144,16 @@ export function showDataBase(dispatch: any) {
});
});
}
/*--- 刷新 自定义Jar ---*/
export function showJars(dispatch: any) {
const res = getData('api/jar/listEnabledAll');
res.then((result) => {
result.datas && dispatch && dispatch({
type: "Jar/saveJars",
payload: result.datas,
});
});
}
/*--- 刷新 元数据表 ---*/
export function showMetaDataTable(id:number) {
return getData('api/database/getSchemasAndTables',{id:id});
......
......@@ -5,13 +5,14 @@ import {InfoCircleOutlined,PlusOutlined,MinusSquareOutlined,MinusCircleOutlined}
import styles from "./index.less";
import {useEffect, useState} from "react";
import { showTables} from "@/components/Studio/StudioEvent/DDL";
import {JarStateType} from "@/pages/Jar/model";
const { Option } = Select;
const { Text } = Typography;
const StudioSetting = (props: any) => {
const {sessionCluster,clusterConfiguration,current,form,dispatch,tabs,currentSession} = props;
const {sessionCluster,clusterConfiguration,current,form,dispatch,tabs,currentSession,jars} = props;
const getClusterOptions = ()=>{
let itemList = [(<Option key={0} value={0} label={(<><Tag color="default">Local</Tag>本地环境</>)}>
......@@ -38,6 +39,17 @@ const StudioSetting = (props: any) => {
return itemList;
};
const getJarOptions = ()=>{
let itemList = [];
for(let item of jars){
let tag =(<><Tag color={item.enabled?"processing":"error"}>{item.type}</Tag>{item.alias}</>);
itemList.push(<Option key={item.id} value={item.id} label={tag}>
{tag}
</Option>)
}
return itemList;
};
useEffect(()=>{
form.setFieldsValue(current.task);
},[current.task]);
......@@ -132,6 +144,22 @@ const StudioSetting = (props: any) => {
</Form.Item>
</Col>
</Row>):''}
{(current.task.type=='yarn-application')?(
<Row>
<Col span={24}>
<Form.Item label="可执行 Jar" tooltip={`选择可执行 Jar 进行 ${current.task.type} 模式的远程提交 Jar 任务。当该参数项存在值时,将只提交可执行 Jar.`} name="jarId"
className={styles.form_item}>
<Select
style={{ width: '100%' }}
placeholder="选择可执行Jar,非必填"
allowClear
optionLabelProp="label"
>
{getJarOptions()}
</Select>
</Form.Item>
</Col>
</Row>):''}
<Form.Item
label="作业名" className={styles.form_item} name="jobName"
tooltip='设置任务名称,默认为作业名'
......@@ -237,11 +265,12 @@ const StudioSetting = (props: any) => {
);
};
export default connect(({Studio}: { Studio: StateType }) => ({
export default connect(({Studio,Jar}: { Studio: StateType,Jar: JarStateType }) => ({
sessionCluster: Studio.sessionCluster,
clusterConfiguration: Studio.clusterConfiguration,
current: Studio.current,
tabs: Studio.tabs,
session: Studio.session,
currentSession: Studio.currentSession,
jars: Jar.jars,
}))(StudioSetting);
......@@ -13,7 +13,7 @@ import StudioLeftTool from "./StudioLeftTool";
import StudioRightTool from "./StudioRightTool";
import {
listSession, showCluster, showDataBase, getFillAllByVersion,
showClusterConfiguration,showSessionCluster
showClusterConfiguration, showSessionCluster, showJars
} from "@/components/Studio/StudioEvent/DDL";
import {loadSettings} from "@/pages/Settings/function";
......@@ -33,6 +33,7 @@ const Studio: React.FC<StudioProps> = (props) => {
showClusterConfiguration(dispatch);
showDataBase(dispatch);
listSession(dispatch);
showJars(dispatch);
const onClick=()=>{
if(rightClickMenu){
......
......@@ -65,8 +65,9 @@ export type TaskType = {
config?: [],
clusterId?: any,
clusterName?: string,
clusterConfigurationId?: string,
clusterConfigurationId?: number,
clusterConfigurationName?: string,
jarId?:number,
note?: string,
enabled?: boolean,
createTime?: Date,
......@@ -195,6 +196,7 @@ const Model: ModelType = {
clusterName: "本地环境",
clusterConfigurationId:undefined,
clusterConfigurationName:undefined,
jarId:undefined,
maxRowNum: 100,
config: [],
session: '',
......@@ -234,6 +236,7 @@ const Model: ModelType = {
clusterName: "本地环境",
clusterConfigurationId:undefined,
clusterConfigurationName:undefined,
jarId:undefined,
session: '',
config: [],
maxRowNum: 100,
......
import {Effect, Reducer} from "umi";
import {JarTableListItem} from "@/pages/Jar/data";
export type JarStateType = {
jars:JarTableListItem[],
};
export type ModelType = {
namespace: string;
state: JarStateType;
effects: {
};
reducers: {
saveJars: Reducer<JarStateType>;
};
};
const JarModel: ModelType = {
namespace: 'Jar',
state: {
jars:[],
},
effects: {
},
reducers: {
saveJars(state, {payload}) {
return {
...state,
jars: payload,
};
},
},
};
export default JarModel;
......@@ -411,6 +411,9 @@ export default (): React.ReactNode => {
<li>
<Link>优化项目结构与打包结构</Link>
</li>
<li>
<Link>新增yarn-application 的自定义Jar提交</Link>
</li>
</ul>
</Paragraph>
</Timeline.Item>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment