Commit f1377fbb authored by wenmo's avatar wenmo

运行模式选择优化

parent 18aa6bbc
package com.dlink.assertion;
/**
* Tips
*
* @author wenmo
* @since 2021/12/9 23:19
*/
public class Tips {
public static final String TASK_NOT_EXIST = "作业不存在";
}
......@@ -20,6 +20,7 @@ import java.util.Map;
@Getter
@Setter
public class StudioExecuteDTO {
// RUN_MODE
private String type;
private boolean useResult;
private boolean useStatementSet;
......
......@@ -62,6 +62,7 @@ public class StudioServiceImpl implements StudioService {
@Override
public JobResult executeSql(StudioExecuteDTO studioExecuteDTO) {
JobConfig config = studioExecuteDTO.getJobConfig();
// If you are using a shared session, configure the current jobmanager address
if(!config.isUseSession()) {
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), studioExecuteDTO.getClusterId()));
}
......
......@@ -2,6 +2,7 @@ package com.dlink.service.impl;
import com.dlink.assertion.Assert;
import com.dlink.assertion.Asserts;
import com.dlink.assertion.Tips;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.gateway.GatewayType;
import com.dlink.job.JobConfig;
......@@ -52,13 +53,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Override
public JobResult submitByTaskId(Integer id) {
Task task = this.getTaskInfoById(id);
Assert.check(task);
Asserts.checkNull(task, Tips.TASK_NOT_EXIST);
boolean isJarTask = isJarTask(task);
/*Statement statement = null;
if(!isJarTask){
statement = statementService.getById(id);
Assert.check(statement);
}*/
JobConfig config = task.buildSubmitConfig();
if (!JobManager.useGateway(config.getType())) {
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), task.getClusterId()));
......
......@@ -104,7 +104,7 @@ public class JobConfig {
}
public ExecutorSetting getExecutorSetting(){
return new ExecutorSetting(checkpoint,parallelism,useSqlFragment,savePointPath,jobName,config);
return new ExecutorSetting(checkpoint,parallelism,useSqlFragment,useStatementSet,savePointPath,jobName,config);
}
public void setSessionConfig(SessionConfig sessionConfig){
......
......@@ -11,10 +11,8 @@ import com.dlink.explainer.Explainer;
import com.dlink.gateway.Gateway;
import com.dlink.gateway.GatewayType;
import com.dlink.gateway.config.ActionType;
import com.dlink.gateway.config.AppConfig;
import com.dlink.gateway.config.FlinkConfig;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.gateway.config.SavePointType;
import com.dlink.gateway.result.GatewayResult;
import com.dlink.gateway.result.SavePointResult;
import com.dlink.gateway.result.TestResult;
......@@ -49,13 +47,14 @@ public class JobManager extends RunTime {
private static final Logger logger = LoggerFactory.getLogger(JobManager.class);
private JobHandler handler;
private String sessionId;
private Integer maxRowNum = 100;
private EnvironmentSetting environmentSetting;
private ExecutorSetting executorSetting;
private JobConfig config;
private Executor executor;
private boolean useGateway = false;
private boolean isPlanMode = false;
private GatewayType runMode = GatewayType.LOCAL;
public JobManager() {
}
......@@ -64,20 +63,8 @@ public class JobManager extends RunTime {
this.useGateway = useGateway;
}
public JobManager(String address, ExecutorSetting executorSetting) {
if (address != null) {
this.environmentSetting = EnvironmentSetting.build(address);
this.executorSetting = executorSetting;
this.executor = createExecutor();
}
}
public JobManager(String address, String sessionId, Integer maxRowNum, ExecutorSetting executorSetting) {
this.environmentSetting = EnvironmentSetting.build(address);
this.sessionId = sessionId;
this.maxRowNum = maxRowNum;
this.executorSetting = executorSetting;
this.executor = createExecutorWithSession();
public void setPlanMode(boolean planMode) {
isPlanMode = planMode;
}
public JobManager(JobConfig config) {
......@@ -97,6 +84,13 @@ public class JobManager extends RunTime {
return manager;
}
public static JobManager buildPlanMode(JobConfig config) {
JobManager manager = new JobManager(config);
manager.setPlanMode(true);
manager.init();
return manager;
}
private static void initGatewayConfig(JobConfig config) {
if (useGateway(config.getType())) {
Asserts.checkNull(config.getGatewayConfig(), "GatewayConfig 不能为空");
......@@ -109,13 +103,12 @@ public class JobManager extends RunTime {
}
public static boolean useGateway(String type) {
return (GatewayType.YARN_PER_JOB.equalsValue(type) ||
GatewayType.YARN_APPLICATION.equalsValue(type));
return (GatewayType.YARN_PER_JOB.equalsValue(type) || GatewayType.YARN_APPLICATION.equalsValue(type));
}
private Executor createExecutor() {
initEnvironmentSetting();
if (!useGateway && config.isUseRemote() && config.getClusterId() != 0) {
if (!runMode.equals(GatewayType.LOCAL)&& !useGateway && config.isUseRemote()) {
executor = Executor.buildRemoteExecutor(environmentSetting, config.getExecutorSetting());
return executor;
} else {
......@@ -154,8 +147,11 @@ public class JobManager extends RunTime {
@Override
public boolean init() {
useGateway = useGateway(config.getType());
handler = JobHandler.build();
if(!isPlanMode) {
runMode = GatewayType.get(config.getType());
useGateway = useGateway(config.getType());
handler = JobHandler.build();
}
initExecutorSetting();
createExecutorWithSession();
return false;
......
......@@ -23,43 +23,6 @@ import java.util.List;
**/
public class JobManagerTest {
@Test
public void executeJobTest(){
ExecutorSetting setting = new ExecutorSetting(0,1,false,null);
JobManager jobManager = new JobManager("192.168.123.157:8081","test2",100, setting);
String sql1 ="CREATE TABLE student (\n" +
" sid INT,\n" +
" name STRING,\n" +
" PRIMARY KEY (sid) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://192.168.24.1:3306/data?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true',\n" +
" 'username'='datalink',\n" +
" 'password'='datalink',\n" +
" 'table-name' = 'student'\n" +
");";
String sql2 ="CREATE TABLE man (\n" +
" pid INT,\n" +
" name STRING,\n" +
" PRIMARY KEY (pid) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://192.168.24.1:3306/data?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true',\n" +
" 'username'='datalink',\n" +
" 'password'='datalink',\n" +
" 'table-name' = 'man'\n" +
");";
String sql3 = "SELECT sid as pid,name from student";
List<String> sqls = new ArrayList<>();
sqls.add(sql1);
sqls.add(sql2);
sqls.add(sql3);
String sql = sql1+sql2+sql3;
JobResult jobResult = jobManager.executeSql(sql);
System.out.println(jobResult.isSuccess());
}
@Test
public void cancelJobSelect(){
......
export const RUN_MODE = {
LOCAL:'local',
STANDALONE:'standalone',
YARN_SESSION:'yarn-session',
YARN_PER_JOB:'yarn-per-job',
YARN_APPLICATION:'yarn-application',
};
......@@ -3,10 +3,11 @@ import {StateType} from "@/pages/FlinkSqlStudio/model";
import {Form, InputNumber, Input, Switch, Select, Tag, Row, Col, Badge, Tooltip, Button, Typography, Space} from "antd";
import {InfoCircleOutlined, PlusOutlined, MinusSquareOutlined, MinusCircleOutlined} from "@ant-design/icons";
import styles from "./index.less";
import {useEffect, useState} from "react";
import {useEffect} from "react";
import {showTables} from "@/components/Studio/StudioEvent/DDL";
import {JarStateType} from "@/pages/Jar/model";
import {Scrollbars} from 'react-custom-scrollbars';
import {Scrollbars} from "react-custom-scrollbars";
import {RUN_MODE} from "@/components/Studio/StudioRightTool/StudioSetting/conf";
const {Option} = Select;
const {Text} = Typography;
......@@ -16,12 +17,13 @@ const StudioSetting = (props: any) => {
const {sessionCluster, clusterConfiguration, current, form, dispatch, tabs, currentSession, jars, toolHeight} = props;
const getClusterOptions = () => {
let itemList = [(<Option key={0} value={0} label={(<><Tag color="default">Local</Tag>本地环境</>)}>
/* const itemList = [(<Option key={0} value={0} label={(<><Tag color="default">Local</Tag>本地环境</>)}>
<Tag color="default">Local</Tag>
本地环境
</Option>)];
for (let item of sessionCluster) {
let tag = (<><Tag color={item.enabled ? "processing" : "error"}>{item.type}</Tag>{item.alias}</>);
</Option>)]; */
const itemList = [];
for (const item of sessionCluster) {
const tag = (<><Tag color={item.enabled ? "processing" : "error"}>{item.type}</Tag>{item.alias}</>);
itemList.push(<Option key={item.id} value={item.id} label={tag}>
{tag}
</Option>)
......@@ -30,9 +32,9 @@ const StudioSetting = (props: any) => {
};
const getClusterConfigurationOptions = () => {
let itemList = [];
for (let item of clusterConfiguration) {
let tag = (<><Tag color={item.enabled ? "processing" : "error"}>{item.type}</Tag>{item.alias}</>);
const itemList = [];
for (const item of clusterConfiguration) {
const tag = (<><Tag color={item.enabled ? "processing" : "error"}>{item.type}</Tag>{item.alias}</>);
itemList.push(<Option key={item.id} value={item.id} label={tag}>
{tag}
</Option>)
......@@ -41,9 +43,9 @@ const StudioSetting = (props: any) => {
};
const getJarOptions = () => {
let itemList = [];
for (let item of jars) {
let tag = (<><Tag color={item.enabled ? "processing" : "error"}>{item.type}</Tag>{item.alias}</>);
const itemList = [];
for (const item of jars) {
const tag = (<><Tag color={item.enabled ? "processing" : "error"}>{item.type}</Tag>{item.alias}</>);
itemList.push(<Option key={item.id} value={item.id} label={tag}>
{tag}
</Option>)
......@@ -57,16 +59,16 @@ const StudioSetting = (props: any) => {
const onValuesChange = (change: any, all: any) => {
let newTabs = tabs;
const newTabs = tabs;
for (let i = 0; i < newTabs.panes.length; i++) {
if (newTabs.panes[i].key == newTabs.activeKey) {
for (let key in change) {
if (newTabs.panes[i].key === newTabs.activeKey) {
for (const key in change) {
newTabs.panes[i].task[key] = all[key];
}
break;
}
}
dispatch && dispatch({
dispatch({
type: "Studio/saveTabs",
payload: newTabs,
});
......@@ -98,16 +100,17 @@ const StudioSetting = (props: any) => {
>
<Form.Item
label="执行模式" className={styles.form_item} name="type"
tooltip='指定 Flink 任务的执行模式,默认为 yarn-session'
tooltip='指定 Flink 任务的执行模式,默认为 Local'
>
<Select defaultValue="yarn-session" value="yarn-session">
<Option value="standalone">standalone</Option>
<Option value="yarn-session">yarn-session</Option>
<Option value="yarn-per-job">yarn-per-job</Option>
<Option value="yarn-application">yarn-application</Option>
<Select defaultValue={RUN_MODE.LOCAL} value={RUN_MODE.LOCAL}>
<Option value={RUN_MODE.LOCAL}>Local</Option>
<Option value={RUN_MODE.STANDALONE}>Standalone</Option>
<Option value={RUN_MODE.YARN_SESSION}>Yarn Session</Option>
<Option value={RUN_MODE.YARN_PER_JOB}>Yarn Per-Job</Option>
<Option value={RUN_MODE.YARN_APPLICATION}>Yarn Application</Option>
</Select>
</Form.Item>
{(current.task.type == 'yarn-session' || current.task.type == 'standalone') ? (
{(current.task.type === RUN_MODE.YARN_SESSION || current.task.type === RUN_MODE.STANDALONE) ? (
<Row>
<Col span={24}>
<Form.Item label="Flink集群" tooltip={`选择Flink集群进行 ${current.task.type} 模式的远程提交任务`} name="clusterId"
......@@ -130,8 +133,8 @@ const StudioSetting = (props: any) => {
}
</Form.Item>
</Col>
</Row>) : ''}
{(current.task.type == 'yarn-per-job' || current.task.type == 'yarn-application') ? (
</Row>) : undefined}
{(current.task.type === RUN_MODE.YARN_PER_JOB || current.task.type === RUN_MODE.YARN_APPLICATION) ? (
<Row>
<Col span={24}>
<Form.Item label="Flink集群配置" tooltip={`选择Flink集群配置进行 ${current.task.type} 模式的远程提交任务`}
......@@ -147,8 +150,8 @@ const StudioSetting = (props: any) => {
</Select>
</Form.Item>
</Col>
</Row>) : ''}
{(current.task.type == 'yarn-application') ? (
</Row>) : undefined}
{(current.task.type === RUN_MODE.YARN_APPLICATION) ? (
<Row>
<Col span={24}>
<Form.Item label="可执行 Jar"
......@@ -165,7 +168,7 @@ const StudioSetting = (props: any) => {
</Select>
</Form.Item>
</Col>
</Row>) : ''}
</Row>) : undefined}
<Form.Item
label="作业名" className={styles.form_item} name="jobName"
tooltip='设置任务名称,默认为作业名'
......@@ -219,10 +222,10 @@ const StudioSetting = (props: any) => {
<Option value={0}>禁用</Option>
<Option value={1}>最近一次</Option>
<Option value={2}>最早一次</Option>
<Option value={3}>自定义</Option>
<Option value={3}>指定一次</Option>
</Select>
</Form.Item>
{current.task.savePointStrategy == 3 ?
{current.task.savePointStrategy === 3 ?
(<Form.Item
label="SavePointPath" className={styles.form_item} name="savePointPath"
tooltip='从SavePointPath恢复Flink任务'
......@@ -244,7 +247,6 @@ const StudioSetting = (props: any) => {
<Form.Item
{...restField}
name={[name, 'key']}
// fieldKey={[fieldKey, 'key']}
style={{marginBottom: '5px'}}
>
<Input placeholder="参数"/>
......@@ -252,7 +254,6 @@ const StudioSetting = (props: any) => {
<Form.Item
{...restField}
name={[name, 'value']}
// fieldKey={[fieldKey, 'value']}
style={{marginBottom: '5px'}}
>
<Input placeholder="值"/>
......
import React, {useEffect, useState} from 'react';
import React, {useState} from 'react';
import {Form, Button, Input, Modal, Select,Divider,Space,Switch} from 'antd';
import { MinusCircleOutlined, PlusOutlined } from '@ant-design/icons';
import {ClusterConfigurationTableListItem} from "@/pages/ClusterConfiguration/data";
import type {ClusterConfigurationTableListItem} from "@/pages/ClusterConfiguration/data";
import {getConfig, getConfigFormValues} from "@/pages/ClusterConfiguration/function";
import {FLINK_CONFIG_LIST, HADOOP_CONFIG_LIST} from "@/pages/ClusterConfiguration/conf";
import type {Config} from "@/pages/ClusterConfiguration/conf";
......@@ -13,7 +13,7 @@ export type ClusterConfigurationFormProps = {
modalVisible: boolean;
values: Partial<ClusterConfigurationTableListItem>;
};
const Option = Select.Option;
const {Option} = Select;
const formLayout = {
labelCol: {span: 7},
......@@ -39,22 +39,22 @@ const ClusterConfigurationForm: React.FC<ClusterConfigurationFormProps> = (props
modalVisible,
} = props;
const buildConfig = (config:Config[]) =>{
let itemList = [];
for(let i in config){
const buildConfig = (config: Config[]) =>{
const itemList: JSX.Element[] = [];
config.forEach(configItem => {
itemList.push(<Form.Item
name={config[i].name}
label={config[i].lable}
name={configItem.name}
label={configItem.lable}
>
<Input placeholder={config[i].placeholder}/>
<Input placeholder={configItem.placeholder}/>
</Form.Item>)
}
});
return itemList;
};
const submitForm = async () => {
const fieldsValue = await form.validateFields();
let formValues = {
const formValues = {
id:formVals.id,
name:fieldsValue.name,
alias:fieldsValue.alias,
......@@ -67,7 +67,7 @@ const ClusterConfigurationForm: React.FC<ClusterConfigurationFormProps> = (props
handleSubmit(formValues);
};
const renderContent = (formVals) => {
const renderContent = (formValsPara: Partial<ClusterConfigurationTableListItem>) => {
return (
<>
<Form.Item
......@@ -200,7 +200,7 @@ const ClusterConfigurationForm: React.FC<ClusterConfigurationFormProps> = (props
name="enabled"
label="是否启用">
<Switch checkedChildren="启用" unCheckedChildren="禁用"
defaultChecked={formVals.enabled}/>
defaultChecked={formValsPara.enabled}/>
</Form.Item>
</>
);
......@@ -208,14 +208,15 @@ const ClusterConfigurationForm: React.FC<ClusterConfigurationFormProps> = (props
const testForm = async ()=>{
const fieldsValue = await form.validateFields();
let formValues = {
const formValues = {
id :formVals.id,
name:fieldsValue.name,
alias:fieldsValue.alias,
type:fieldsValue.type,
note:fieldsValue.note,
enabled:fieldsValue.enabled,
configJson:JSON.stringify(getConfig(fieldsValue)),
};
} as ClusterConfigurationTableListItem;
setFormVals(formValues);
testClusterConfigurationConnect(formValues);
};
......
......@@ -4,46 +4,50 @@ export type Config = {
placeholder: string
}
export function HADOOP_CONFIG_NAME_LIST () {
let list: string[] = [];
for (let i in HADOOP_CONFIG_LIST) {
list.push(HADOOP_CONFIG_LIST[i].name);
}
return list;
}
export function FLINK_CONFIG_NAME_LIST() {
let list: string[] = [];
for (let i in FLINK_CONFIG_LIST) {
list.push(FLINK_CONFIG_LIST[i].name);
}
return list;
}
export const HADOOP_CONFIG_LIST: Config[] = [{
name: 'ha.zookeeper.quorum',
lable: 'ha.zookeeper.quorum',
placeholder: '值如 192.168.123.1:2181,192.168.123.2:2181,192.168.123.3:2181',
placeholder: '192.168.123.1:2181,192.168.123.2:2181,192.168.123.3:2181',
}];
export const FLINK_CONFIG_LIST: Config[] = [{
name: 'jobmanager.memory.process.size',
lable: 'jobmanager.memory.process.size',
placeholder: '值如 1600m',
placeholder: '1600m',
}, {
name: 'taskmanager.memory.flink.size',
lable: 'taskmanager.memory.flink.size',
placeholder: '值如 2048m',
placeholder: '2048m',
}, {
name: 'taskmanager.memory.framework.heap.size',
lable: 'taskmanager.memory.framework.heap.size',
placeholder: '值如 1024m',
placeholder: '1024m',
}, {
name: 'taskmanager.numberOfTaskSlots',
lable: 'taskmanager.numberOfTaskSlots',
placeholder: '值如 4',
placeholder: '4',
}, {
name: 'parallelism.default',
lable: 'parallelism.default',
placeholder: '值如 4',
placeholder: '1',
}, {
name: 'state.savepoints.dir',
lable: 'state.savepoints.dir',
placeholder: 'hdfs:///flink/savepoints/',
}
];
export function HADOOP_CONFIG_NAME_LIST () {
const list: string[] = [];
HADOOP_CONFIG_LIST.forEach(item => {
list.push(item.name);
});
return list;
}
export function FLINK_CONFIG_NAME_LIST() {
const list: string[] = [];
FLINK_CONFIG_LIST.forEach(item => {
list.push(item.name);
});
return list;
}
......@@ -457,6 +457,12 @@ export default (): React.ReactNode => {
<li>
<Link>新增 SQLServer Jdbc Connector 的实现</Link>
</li>
<li>
<Link>修复编辑集群配置测试后保存会新建的bug</Link>
</li>
<li>
<Link>新增 Local 的运行模式选择并优化 JobManager</Link>
</li>
</ul>
</Paragraph>
</Timeline.Item>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment