Commit b23cc76b authored by godkaikai's avatar godkaikai

FlinkSQLEnv实现

parent af3b7796
...@@ -105,5 +105,13 @@ public class TaskController { ...@@ -105,5 +105,13 @@ public class TaskController {
Task task = taskService.getTaskInfoById(id); Task task = taskService.getTaskInfoById(id);
return Result.succeed(task,"获取成功"); return Result.succeed(task,"获取成功");
} }
/**
* 获取所有可用的 FlinkSQLEnv
*/
@GetMapping(value = "/listFlinkSQLEnv")
public Result listFlinkSQLEnv() {
return Result.succeed(taskService.listFlinkSQLEnv(),"获取成功");
}
} }
...@@ -20,14 +20,14 @@ import java.util.Map; ...@@ -20,14 +20,14 @@ import java.util.Map;
*/ */
@Getter @Getter
@Setter @Setter
public class APIExecuteSqlDTO { public class APIExecuteSqlDTO extends AbstractStatementDTO{
// RUN_MODE // RUN_MODE
private String type; private String type;
private boolean useResult = false; private boolean useResult = false;
private boolean useStatementSet = false; private boolean useStatementSet = false;
private String address; private String address;
private boolean fragment = false; private boolean fragment = false;
private String statement; // private String statement;
private String jobName; private String jobName;
private Integer maxRowNum = 100; private Integer maxRowNum = 100;
private Integer checkPoint = 0; private Integer checkPoint = 0;
......
...@@ -17,10 +17,10 @@ import java.util.Map; ...@@ -17,10 +17,10 @@ import java.util.Map;
*/ */
@Getter @Getter
@Setter @Setter
public class APIExplainSqlDTO { public class APIExplainSqlDTO extends AbstractStatementDTO{
private boolean useStatementSet = false; private boolean useStatementSet = false;
private boolean fragment = false; private boolean fragment = false;
private String statement; // private String statement;
private Integer parallelism; private Integer parallelism;
private Map<String, String> configuration; private Map<String, String> configuration;
......
package com.dlink.dto;
/**
* AbstractStatementDTO
*
* @author wenmo
* @since 2021/12/29
**/
public class AbstractStatementDTO {
private String statement;
private Integer envId;
public String getStatement() {
return statement;
}
public void setStatement(String statement) {
this.statement = statement;
}
public Integer getEnvId() {
return envId;
}
public void setEnvId(Integer envId) {
this.envId = envId;
}
}
...@@ -11,8 +11,8 @@ import lombok.Setter; ...@@ -11,8 +11,8 @@ import lombok.Setter;
**/ **/
@Getter @Getter
@Setter @Setter
public class StudioCADTO { public class StudioCADTO extends AbstractStatementDTO{
private String statement; // private String statement;
/* 1:单表表级血缘 /* 1:单表表级血缘
* 2:单表字段血缘 * 2:单表字段血缘
* 3.全局表级血缘 * 3.全局表级血缘
......
...@@ -19,7 +19,7 @@ import java.util.Map; ...@@ -19,7 +19,7 @@ import java.util.Map;
*/ */
@Getter @Getter
@Setter @Setter
public class StudioExecuteDTO { public class StudioExecuteDTO extends AbstractStatementDTO{
// RUN_MODE // RUN_MODE
private String type; private String type;
private String dialect; private String dialect;
...@@ -33,7 +33,7 @@ public class StudioExecuteDTO { ...@@ -33,7 +33,7 @@ public class StudioExecuteDTO {
private Integer databaseId; private Integer databaseId;
private Integer jarId; private Integer jarId;
private boolean fragment; private boolean fragment;
private String statement; // private String statement;
private String jobName; private String jobName;
private Integer taskId; private Integer taskId;
private Integer maxRowNum; private Integer maxRowNum;
......
...@@ -56,6 +56,8 @@ public class Task extends SuperEntity{ ...@@ -56,6 +56,8 @@ public class Task extends SuperEntity{
private Integer jarId; private Integer jarId;
private Integer envId;
private String configJson; private String configJson;
private String note; private String note;
......
...@@ -5,6 +5,8 @@ import com.dlink.db.service.ISuperService; ...@@ -5,6 +5,8 @@ import com.dlink.db.service.ISuperService;
import com.dlink.job.JobResult; import com.dlink.job.JobResult;
import com.dlink.model.Task; import com.dlink.model.Task;
import java.util.List;
/** /**
* 作业 服务类 * 作业 服务类
* *
...@@ -18,4 +20,6 @@ public interface TaskService extends ISuperService<Task> { ...@@ -18,4 +20,6 @@ public interface TaskService extends ISuperService<Task> {
Task getTaskInfoById(Integer id); Task getTaskInfoById(Integer id);
boolean saveOrUpdateTask(Task task); boolean saveOrUpdateTask(Task task);
List<Task> listFlinkSQLEnv();
} }
...@@ -3,6 +3,7 @@ package com.dlink.service.impl; ...@@ -3,6 +3,7 @@ package com.dlink.service.impl;
import com.dlink.api.FlinkAPI; import com.dlink.api.FlinkAPI;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.config.Dialect; import com.dlink.config.Dialect;
import com.dlink.dto.AbstractStatementDTO;
import com.dlink.dto.SessionDTO; import com.dlink.dto.SessionDTO;
import com.dlink.dto.StudioDDLDTO; import com.dlink.dto.StudioDDLDTO;
import com.dlink.dto.StudioExecuteDTO; import com.dlink.dto.StudioExecuteDTO;
...@@ -20,6 +21,7 @@ import com.dlink.metadata.result.JdbcSelectResult; ...@@ -20,6 +21,7 @@ import com.dlink.metadata.result.JdbcSelectResult;
import com.dlink.model.Cluster; import com.dlink.model.Cluster;
import com.dlink.model.DataBase; import com.dlink.model.DataBase;
import com.dlink.model.Savepoints; import com.dlink.model.Savepoints;
import com.dlink.model.Task;
import com.dlink.result.IResult; import com.dlink.result.IResult;
import com.dlink.result.SelectResult; import com.dlink.result.SelectResult;
import com.dlink.result.SqlExplainResult; import com.dlink.result.SqlExplainResult;
...@@ -61,6 +63,17 @@ public class StudioServiceImpl implements StudioService { ...@@ -61,6 +63,17 @@ public class StudioServiceImpl implements StudioService {
private SavepointsService savepointsService; private SavepointsService savepointsService;
@Autowired @Autowired
private DataBaseService dataBaseService; private DataBaseService dataBaseService;
@Autowired
private TaskService taskService;
private void addFlinkSQLEnv(AbstractStatementDTO statementDTO){
if(Asserts.isNotNull(statementDTO.getEnvId())){
Task task = taskService.getTaskInfoById(statementDTO.getEnvId());
if(Asserts.isNotNull(task)&&Asserts.isNotNullString(task.getStatement())) {
statementDTO.setStatement(task.getStatement() + "\r\n" + statementDTO.getStatement());
}
}
}
@Override @Override
public JobResult executeSql(StudioExecuteDTO studioExecuteDTO) { public JobResult executeSql(StudioExecuteDTO studioExecuteDTO) {
...@@ -72,6 +85,7 @@ public class StudioServiceImpl implements StudioService { ...@@ -72,6 +85,7 @@ public class StudioServiceImpl implements StudioService {
} }
private JobResult executeFlinkSql(StudioExecuteDTO studioExecuteDTO) { private JobResult executeFlinkSql(StudioExecuteDTO studioExecuteDTO) {
addFlinkSQLEnv(studioExecuteDTO);
JobConfig config = studioExecuteDTO.getJobConfig(); JobConfig config = studioExecuteDTO.getJobConfig();
// If you are using a shared session, configure the current jobmanager address // If you are using a shared session, configure the current jobmanager address
if(!config.isUseSession()) { if(!config.isUseSession()) {
...@@ -135,6 +149,7 @@ public class StudioServiceImpl implements StudioService { ...@@ -135,6 +149,7 @@ public class StudioServiceImpl implements StudioService {
} }
private List<SqlExplainResult> explainFlinkSql(StudioExecuteDTO studioExecuteDTO) { private List<SqlExplainResult> explainFlinkSql(StudioExecuteDTO studioExecuteDTO) {
addFlinkSQLEnv(studioExecuteDTO);
JobConfig config = studioExecuteDTO.getJobConfig(); JobConfig config = studioExecuteDTO.getJobConfig();
if(!config.isUseSession()) { if(!config.isUseSession()) {
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), studioExecuteDTO.getClusterId())); config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), studioExecuteDTO.getClusterId()));
...@@ -164,6 +179,7 @@ public class StudioServiceImpl implements StudioService { ...@@ -164,6 +179,7 @@ public class StudioServiceImpl implements StudioService {
@Override @Override
public ObjectNode getStreamGraph(StudioExecuteDTO studioExecuteDTO) { public ObjectNode getStreamGraph(StudioExecuteDTO studioExecuteDTO) {
addFlinkSQLEnv(studioExecuteDTO);
JobConfig config = studioExecuteDTO.getJobConfig(); JobConfig config = studioExecuteDTO.getJobConfig();
config.setType(GatewayType.LOCAL.getLongValue()); config.setType(GatewayType.LOCAL.getLongValue());
if(!config.isUseSession()) { if(!config.isUseSession()) {
...@@ -175,6 +191,7 @@ public class StudioServiceImpl implements StudioService { ...@@ -175,6 +191,7 @@ public class StudioServiceImpl implements StudioService {
@Override @Override
public ObjectNode getJobPlan(StudioExecuteDTO studioExecuteDTO) { public ObjectNode getJobPlan(StudioExecuteDTO studioExecuteDTO) {
addFlinkSQLEnv(studioExecuteDTO);
JobConfig config = studioExecuteDTO.getJobConfig(); JobConfig config = studioExecuteDTO.getJobConfig();
config.setType(GatewayType.LOCAL.getLongValue()); config.setType(GatewayType.LOCAL.getLongValue());
if(!config.isUseSession()) { if(!config.isUseSession()) {
......
package com.dlink.service.impl; package com.dlink.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.dlink.assertion.Assert; import com.dlink.assertion.Assert;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.assertion.Tips; import com.dlink.assertion.Tips;
import com.dlink.config.Dialect;
import com.dlink.db.service.impl.SuperServiceImpl; import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.gateway.GatewayType; import com.dlink.gateway.GatewayType;
import com.dlink.job.JobConfig; import com.dlink.job.JobConfig;
...@@ -56,6 +58,12 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -56,6 +58,12 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
Task task = this.getTaskInfoById(id); Task task = this.getTaskInfoById(id);
Asserts.checkNull(task, Tips.TASK_NOT_EXIST); Asserts.checkNull(task, Tips.TASK_NOT_EXIST);
boolean isJarTask = isJarTask(task); boolean isJarTask = isJarTask(task);
if(!isJarTask&&Asserts.isNotNull(task.getEnvId())){
Task envTask = getTaskInfoById(task.getEnvId());
if(Asserts.isNotNull(envTask)&&Asserts.isNotNullString(envTask.getStatement())) {
task.setStatement(envTask.getStatement() + "\r\n" + task.getStatement());
}
}
JobConfig config = task.buildSubmitConfig(); JobConfig config = task.buildSubmitConfig();
if (!JobManager.useGateway(config.getType())) { if (!JobManager.useGateway(config.getType())) {
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), task.getClusterId())); config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), task.getClusterId()));
...@@ -160,4 +168,9 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -160,4 +168,9 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
return true; return true;
} }
@Override
public List<Task> listFlinkSQLEnv() {
return this.list(new QueryWrapper<Task>().eq("dialect", Dialect.FLINKSQLENV).eq("enabled",1));
}
} }
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
<result column="cluster_configuration_id" property="clusterConfigurationId" /> <result column="cluster_configuration_id" property="clusterConfigurationId" />
<result column="database_id" property="databaseId" /> <result column="database_id" property="databaseId" />
<result column="jar_id" property="jarId" /> <result column="jar_id" property="jarId" />
<result column="env_id" property="envId" />
<result column="config_json" property="configJson" /> <result column="config_json" property="configJson" />
<result column="note" property="note" /> <result column="note" property="note" />
<result column="enabled" property="enabled" /> <result column="enabled" property="enabled" />
...@@ -28,7 +29,7 @@ ...@@ -28,7 +29,7 @@
<!-- 通用查询结果列 --> <!-- 通用查询结果列 -->
<sql id="Base_Column_List"> <sql id="Base_Column_List">
id, name, alias,dialect, type,check_point,save_point_strategy,save_point_path, parallelism,fragment,statement_set,cluster_id,cluster_configuration_id,database_id,jar_id,config_json,note, enabled, create_time, update_time id, name, alias,dialect, type,check_point,save_point_strategy,save_point_path, parallelism,fragment,statement_set,cluster_id,cluster_configuration_id,database_id,jar_id,env_id,config_json,note, enabled, create_time, update_time
</sql> </sql>
......
...@@ -2,6 +2,7 @@ package com.dlink.app.flinksql; ...@@ -2,6 +2,7 @@ package com.dlink.app.flinksql;
import com.dlink.app.db.DBConfig; import com.dlink.app.db.DBConfig;
import com.dlink.app.db.DBUtil; import com.dlink.app.db.DBUtil;
import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkSQLConstant; import com.dlink.constant.FlinkSQLConstant;
import com.dlink.executor.Executor; import com.dlink.executor.Executor;
import com.dlink.executor.ExecutorSetting; import com.dlink.executor.ExecutorSetting;
...@@ -38,8 +39,8 @@ public class Submiter { ...@@ -38,8 +39,8 @@ public class Submiter {
throw new SQLException("请指定任务ID"); throw new SQLException("请指定任务ID");
} }
return "select id, name, alias as jobName, type,check_point as checkpoint," + return "select id, name, alias as jobName, type,check_point as checkpoint," +
"save_point_path as savePointPath, parallelism,fragment as useSqlFragment,statement_set as useStatementSet,config_json as config" + "save_point_path as savePointPath, parallelism,fragment as useSqlFragment,statement_set as useStatementSet,config_json as config," +
" from dlink_task where id = " + id; " env_id as envId from dlink_task where id = " + id;
} }
private static String getFlinkSQLStatement(Integer id, DBConfig config) { private static String getFlinkSQLStatement(Integer id, DBConfig config) {
...@@ -68,13 +69,20 @@ public class Submiter { ...@@ -68,13 +69,20 @@ public class Submiter {
return task; return task;
} }
public static List<String> getStatements(Integer id, DBConfig config){ public static List<String> getStatements(String sql){
return Arrays.asList(getFlinkSQLStatement(id, config).split(FlinkSQLConstant.SEPARATOR)); return Arrays.asList(sql.split(FlinkSQLConstant.SEPARATOR));
} }
public static void submit(Integer id,DBConfig dbConfig){ public static void submit(Integer id,DBConfig dbConfig){
logger.info(LocalDateTime.now() + "开始提交作业 -- "+id); logger.info(LocalDateTime.now() + "开始提交作业 -- "+id);
List<String> statements = Submiter.getStatements(id, dbConfig); StringBuilder sb = new StringBuilder();
Map<String, String> taskConfig = Submiter.getTaskConfig(id, dbConfig);
if(Asserts.isNotNull(taskConfig.get("envId"))){
sb.append(getFlinkSQLStatement(Integer.valueOf(taskConfig.get("envId")), dbConfig));
sb.append("\r\n");
}
sb.append(getFlinkSQLStatement(id, dbConfig));
List<String> statements = Submiter.getStatements(sb.toString());
ExecutorSetting executorSetting = ExecutorSetting.build(Submiter.getTaskConfig(id,dbConfig)); ExecutorSetting executorSetting = ExecutorSetting.build(Submiter.getTaskConfig(id,dbConfig));
logger.info("作业配置如下: "+executorSetting.toString()); logger.info("作业配置如下: "+executorSetting.toString());
Executor executor = Executor.buildAppStreamExecutor(executorSetting); Executor executor = Executor.buildAppStreamExecutor(executorSetting);
......
...@@ -10,7 +10,7 @@ import com.dlink.assertion.Asserts; ...@@ -10,7 +10,7 @@ import com.dlink.assertion.Asserts;
**/ **/
public enum Dialect { public enum Dialect {
FLINKSQL("FlinkSql"),SQL("Sql"),JAVA("Java"); FLINKSQL("FlinkSql"),FLINKSQLENV("FlinkSqlEnv"),SQL("Sql"),JAVA("Java");
private String value; private String value;
......
...@@ -233,6 +233,7 @@ CREATE TABLE `dlink_task` ( ...@@ -233,6 +233,7 @@ CREATE TABLE `dlink_task` (
`cluster_configuration_id` int(11) NULL DEFAULT NULL COMMENT '集群配置ID', `cluster_configuration_id` int(11) NULL DEFAULT NULL COMMENT '集群配置ID',
`database_id` int(11) NULL DEFAULT NULL COMMENT '数据源ID', `database_id` int(11) NULL DEFAULT NULL COMMENT '数据源ID',
`jar_id` int(11) NULL DEFAULT NULL COMMENT 'jarID', `jar_id` int(11) NULL DEFAULT NULL COMMENT 'jarID',
`env_id` int(11) NULL DEFAULT NULL COMMENT '环境ID',
`config_json` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '配置JSON', `config_json` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '配置JSON',
`note` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '注释', `note` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '注释',
`enabled` tinyint(1) NOT NULL DEFAULT 1 COMMENT '是否启用', `enabled` tinyint(1) NOT NULL DEFAULT 1 COMMENT '是否启用',
......
...@@ -479,4 +479,10 @@ ADD COLUMN `dialect` varchar(50) NULL COMMENT '方言' AFTER `alias`; ...@@ -479,4 +479,10 @@ ADD COLUMN `dialect` varchar(50) NULL COMMENT '方言' AFTER `alias`;
ALTER TABLE `dlink_task` ALTER TABLE `dlink_task`
ADD COLUMN `database_id` int(11) NULL COMMENT '数据源ID' AFTER `cluster_configuration_id`; ADD COLUMN `database_id` int(11) NULL COMMENT '数据源ID' AFTER `cluster_configuration_id`;
-- ----------------------------
-- 0.5.0-SNAPSHOT 2021-12-29
-- ----------------------------
ALTER TABLE `dlink_task`
ADD COLUMN `env_id` int(11) NULL COMMENT '环境ID' AFTER `jar_id`;
SET FOREIGN_KEY_CHECKS = 1; SET FOREIGN_KEY_CHECKS = 1;
...@@ -144,6 +144,16 @@ export function showDataBase(dispatch: any) { ...@@ -144,6 +144,16 @@ export function showDataBase(dispatch: any) {
}); });
}); });
} }
/*--- 刷新 执行环境 ---*/
export function showEnv(dispatch: any) {
const res = getData('api/task/listFlinkSQLEnv');
res.then((result) => {
result.datas && dispatch && dispatch({
type: "Studio/saveEnv",
payload: result.datas,
});
});
}
/*--- 刷新 自定义Jar ---*/ /*--- 刷新 自定义Jar ---*/
export function showJars(dispatch: any) { export function showJars(dispatch: any) {
const res = getData('api/jar/listEnabledAll'); const res = getData('api/jar/listEnabledAll');
......
...@@ -235,8 +235,7 @@ const StudioMenu = (props: any) => { ...@@ -235,8 +235,7 @@ const StudioMenu = (props: any) => {
} }
} }
return result; return result;
} };
const getTextWidth = (text:string, font:string) => { const getTextWidth = (text:string, font:string) => {
var canvas = getTextWidth.canvas || (getTextWidth.canvas = document.createElement("canvas")); var canvas = getTextWidth.canvas || (getTextWidth.canvas = document.createElement("canvas"));
...@@ -245,6 +244,7 @@ const StudioMenu = (props: any) => { ...@@ -245,6 +244,7 @@ const StudioMenu = (props: any) => {
var metrics = context.measureText(text); var metrics = context.measureText(text);
return metrics.width; return metrics.width;
} }
const escape2Html = (str:string) => { const escape2Html = (str:string) => {
let arrEntities={'lt':'<','gt':'>','nbsp':' ','amp':'&','quot':'"'}; let arrEntities={'lt':'<','gt':'>','nbsp':' ','amp':'&','quot':'"'};
return str.replace(/&(lt|gt|nbsp|amp|quot);/ig,function(all,t){return arrEntities[t];}); return str.replace(/&(lt|gt|nbsp|amp|quot);/ig,function(all,t){return arrEntities[t];});
...@@ -358,13 +358,15 @@ const StudioMenu = (props: any) => { ...@@ -358,13 +358,15 @@ const StudioMenu = (props: any) => {
onClick={onCheckSql} onClick={onCheckSql}
/> />
</Tooltip> </Tooltip>
{current.task.dialect == DIALECT.FLINKSQL &&(
<Tooltip title="获取当前的 FlinkSql 的执行图"> <Tooltip title="获取当前的 FlinkSql 的执行图">
<Button <Button
type="text" type="text"
icon={<FlagTwoTone/>} icon={<FlagTwoTone/>}
onClick={onGetStreamGraph} onClick={onGetStreamGraph}
/> />
</Tooltip> </Tooltip>)}
{(current.task.dialect == DIALECT.FLINKSQL||current.task.dialect == DIALECT.SQL) &&(
<Tooltip title="执行当前的 FlinkSql"> <Tooltip title="执行当前的 FlinkSql">
<Button <Button
type="text" type="text"
...@@ -372,7 +374,8 @@ const StudioMenu = (props: any) => { ...@@ -372,7 +374,8 @@ const StudioMenu = (props: any) => {
//loading={loadings[2]} //loading={loadings[2]}
onClick={execute} onClick={execute}
/> />
</Tooltip> </Tooltip>)}
{current.task.dialect == DIALECT.FLINKSQL &&(<>
<Tooltip title="提交当前的作业到集群"> <Tooltip title="提交当前的作业到集群">
<Button <Button
type="text" type="text"
...@@ -393,7 +396,7 @@ const StudioMenu = (props: any) => { ...@@ -393,7 +396,7 @@ const StudioMenu = (props: any) => {
icon={<PauseCircleTwoTone twoToneColor="#ddd"/>} icon={<PauseCircleTwoTone twoToneColor="#ddd"/>}
/> />
</Tooltip> </Tooltip>
</Popconfirm> </Popconfirm></>)}
<Divider type="vertical"/> <Divider type="vertical"/>
<Button <Button
type="text" type="text"
......
@import '~antd/es/style/themes/default.less';
.form_setting{
padding-left: 10px;
}
.form_item{
margin-bottom: 5px;
}
import {connect} from "umi";
import {StateType} from "@/pages/FlinkSqlStudio/model";
import {Form, Switch, Row, Col,Tooltip, Button} from "antd";
import {InfoCircleOutlined,MinusSquareOutlined} from "@ant-design/icons";
import styles from "./index.less";
import {useEffect} from "react";
import {JarStateType} from "@/pages/Jar/model";
import {Scrollbars} from "react-custom-scrollbars";
const StudioEnvSetting = (props: any) => {
const { current, form, dispatch, tabs, toolHeight} = props;
useEffect(() => {
form.setFieldsValue(current.task);
}, [current.task]);
const onValuesChange = (change: any, all: any) => {
const newTabs = tabs;
for (let i = 0; i < newTabs.panes.length; i++) {
if (newTabs.panes[i].key === newTabs.activeKey) {
for (const key in change) {
newTabs.panes[i].task[key] = all[key];
}
break;
}
}
dispatch({
type: "Studio/saveTabs",
payload: newTabs,
});
};
return (
<>
<Row>
<Col span={24}>
<div style={{float: "right"}}>
<Tooltip title="最小化">
<Button
type="text"
icon={<MinusSquareOutlined/>}
/>
</Tooltip>
</div>
</Col>
</Row>
<Scrollbars style={{height: (toolHeight - 32)}}>
<Form
form={form}
layout="vertical"
className={styles.form_setting}
onValuesChange={onValuesChange}
>
<Row>
<Col span={12}>
<Form.Item
label="Fragment" className={styles.form_item} name="fragment" valuePropName="checked"
tooltip={{title: '【增强特性】 开启FlinkSql片段机制,使用“:=”进行定义(以“;”结束),“${}”进行调用', icon: <InfoCircleOutlined/>}}
>
<Switch checkedChildren="启用" unCheckedChildren="禁用"
/>
</Form.Item>
</Col>
</Row>
</Form>
</Scrollbars>
</>
);
};
export default connect(({Studio, Jar}: { Studio: StateType, Jar: JarStateType }) => ({
current: Studio.current,
tabs: Studio.tabs,
toolHeight: Studio.toolHeight,
jars: Jar.jars,
}))(StudioEnvSetting);
import {connect} from "umi"; import {connect} from "umi";
import {StateType} from "@/pages/FlinkSqlStudio/model"; import {StateType} from "@/pages/FlinkSqlStudio/model";
import {Form, InputNumber, Input, Switch, Select, Tag, Row, Col, Badge, Tooltip, Button, Typography, Space} from "antd"; import {Form, InputNumber, Input, Switch, Select, Tag, Row, Col, Badge, Tooltip, Button, Typography, Space} from "antd";
import {InfoCircleOutlined, PlusOutlined, MinusSquareOutlined, MinusCircleOutlined} from "@ant-design/icons"; import {InfoCircleOutlined, PlusOutlined, MinusSquareOutlined, MinusCircleOutlined,PaperClipOutlined} from "@ant-design/icons";
import styles from "./index.less"; import styles from "./index.less";
import {useEffect} from "react"; import {useEffect} from "react";
import {showTables} from "@/components/Studio/StudioEvent/DDL"; import {showTables} from "@/components/Studio/StudioEvent/DDL";
...@@ -14,13 +14,9 @@ const {Text} = Typography; ...@@ -14,13 +14,9 @@ const {Text} = Typography;
const StudioSetting = (props: any) => { const StudioSetting = (props: any) => {
const {sessionCluster, clusterConfiguration, current, form, dispatch, tabs, currentSession, jars, toolHeight} = props; const {sessionCluster, clusterConfiguration, current, form, dispatch, tabs, currentSession, jars,env, toolHeight} = props;
const getClusterOptions = () => { const getClusterOptions = () => {
/* const itemList = [(<Option key={0} value={0} label={(<><Tag color="default">Local</Tag>本地环境</>)}>
<Tag color="default">Local</Tag>
本地环境
</Option>)]; */
const itemList = []; const itemList = [];
for (const item of sessionCluster) { for (const item of sessionCluster) {
const tag = (<><Tag color={item.enabled ? "processing" : "error"}>{item.type}</Tag>{item.alias}</>); const tag = (<><Tag color={item.enabled ? "processing" : "error"}>{item.type}</Tag>{item.alias}</>);
...@@ -53,6 +49,18 @@ const StudioSetting = (props: any) => { ...@@ -53,6 +49,18 @@ const StudioSetting = (props: any) => {
return itemList; return itemList;
}; };
const getEnvOptions = () => {
const itemList = [];
for (const item of env) {
const tag = (<>{item.enabled ? <Badge status="success"/> : <Badge status="error"/>}
{item.fragment ? <PaperClipOutlined /> : undefined}{item.alias}</>);
itemList.push(<Option key={item.id} value={item.id} label={tag}>
{tag}
</Option>)
}
return itemList;
};
useEffect(() => { useEffect(() => {
form.setFieldsValue(current.task); form.setFieldsValue(current.task);
}, [current.task]); }, [current.task]);
...@@ -152,8 +160,6 @@ const StudioSetting = (props: any) => { ...@@ -152,8 +160,6 @@ const StudioSetting = (props: any) => {
</Col> </Col>
</Row>) : undefined} </Row>) : undefined}
{(current.task.type === RUN_MODE.YARN_APPLICATION || current.task.type === RUN_MODE.KUBERNETES_APPLICATION) ? ( {(current.task.type === RUN_MODE.YARN_APPLICATION || current.task.type === RUN_MODE.KUBERNETES_APPLICATION) ? (
<Row>
<Col span={24}>
<Form.Item label="可执行 Jar" <Form.Item label="可执行 Jar"
tooltip={`选择可执行 Jar 进行 ${current.task.type} 模式的远程提交 Jar 任务。当该参数项存在值时,将只提交可执行 Jar.`} tooltip={`选择可执行 Jar 进行 ${current.task.type} 模式的远程提交 Jar 任务。当该参数项存在值时,将只提交可执行 Jar.`}
name="jarId" name="jarId"
...@@ -166,15 +172,26 @@ const StudioSetting = (props: any) => { ...@@ -166,15 +172,26 @@ const StudioSetting = (props: any) => {
> >
{getJarOptions()} {getJarOptions()}
</Select> </Select>
</Form.Item> </Form.Item>) : undefined}
</Col>
</Row>) : undefined}
<Form.Item <Form.Item
label="作业名" className={styles.form_item} name="jobName" label="作业名" className={styles.form_item} name="jobName"
tooltip='设置任务名称,默认为作业名' tooltip='设置任务名称,默认为作业名'
> >
<Input placeholder="自定义作业名"/> <Input placeholder="自定义作业名"/>
</Form.Item> </Form.Item>
<Form.Item label="FlinkSQL 环境"
tooltip={`选择当前任务的 FlinkSQL 执行环境,会提前执行环境语句,默认无。`}
name="envId"
className={styles.form_item}>
<Select
style={{width: '100%'}}
placeholder="选择 FlinkSQL 环境,非必填"
allowClear
optionLabelProp="label"
>
{getEnvOptions()}
</Select>
</Form.Item>
<Row> <Row>
<Col span={12}> <Col span={12}>
<Form.Item label="CheckPoint" tooltip="设置Flink任务的检查点步长,0 代表不启用" name="checkPoint" <Form.Item label="CheckPoint" tooltip="设置Flink任务的检查点步长,0 代表不启用" name="checkPoint"
...@@ -285,4 +302,5 @@ export default connect(({Studio, Jar}: { Studio: StateType, Jar: JarStateType }) ...@@ -285,4 +302,5 @@ export default connect(({Studio, Jar}: { Studio: StateType, Jar: JarStateType })
currentSession: Studio.currentSession, currentSession: Studio.currentSession,
toolHeight: Studio.toolHeight, toolHeight: Studio.toolHeight,
jars: Jar.jars, jars: Jar.jars,
env: Studio.env,
}))(StudioSetting); }))(StudioSetting);
...@@ -6,6 +6,7 @@ import styles from "./index.less"; ...@@ -6,6 +6,7 @@ import styles from "./index.less";
import StudioConfig from "./StudioConfig"; import StudioConfig from "./StudioConfig";
import StudioSetting from "./StudioSetting"; import StudioSetting from "./StudioSetting";
import StudioSavePoint from "./StudioSavePoint"; import StudioSavePoint from "./StudioSavePoint";
import StudioEnvSetting from "./StudioEnvSetting";
import StudioSqlConfig from "./StudioSqlConfig"; import StudioSqlConfig from "./StudioSqlConfig";
import {DIALECT} from "@/components/Studio/conf"; import {DIALECT} from "@/components/Studio/conf";
...@@ -17,14 +18,30 @@ const StudioRightTool = (props:any) => { ...@@ -17,14 +18,30 @@ const StudioRightTool = (props:any) => {
const {current,form,toolHeight} = props; const {current,form,toolHeight} = props;
const renderContent = () => {
switch (current.task.dialect){
case DIALECT.SQL: return renderSqlContent();
case DIALECT.FLINKSQLENV: return renderEnvContent();
default: return renderFlinkSqlContent();
}
};
const renderSqlContent = () => { const renderSqlContent = () => {
return (<> return (<>
<TabPane tab={<span><ScheduleOutlined /> 执行配置</span>} key="StudioConfig" > <TabPane tab={<span><ScheduleOutlined /> 执行配置</span>} key="StudioSqlConfig" >
<StudioSqlConfig form={form}/> <StudioSqlConfig form={form}/>
</TabPane> </TabPane>
</>) </>)
}; };
const renderEnvContent = () => {
return (<>
<TabPane tab={<span><SettingOutlined /> 作业配置</span>} key="StudioEnvSetting" >
<StudioEnvSetting form={form}/>
</TabPane>
</>)
};
const renderFlinkSqlContent = () => { const renderFlinkSqlContent = () => {
return (<><TabPane tab={<span><SettingOutlined /> 作业配置</span>} key="StudioSetting" > return (<><TabPane tab={<span><SettingOutlined /> 作业配置</span>} key="StudioSetting" >
<StudioSetting form={form} /> <StudioSetting form={form} />
...@@ -32,17 +49,17 @@ const StudioRightTool = (props:any) => { ...@@ -32,17 +49,17 @@ const StudioRightTool = (props:any) => {
<TabPane tab={<span><ScheduleOutlined /> 执行配置</span>} key="StudioConfig" > <TabPane tab={<span><ScheduleOutlined /> 执行配置</span>} key="StudioConfig" >
<StudioConfig form={form}/> <StudioConfig form={form}/>
</TabPane> </TabPane>
<TabPane tab={<span><ScheduleOutlined /> 保存点</span>} key="3" > <TabPane tab={<span><ScheduleOutlined /> 保存点</span>} key="StudioSavePoint" >
<StudioSavePoint /> <StudioSavePoint />
</TabPane> </TabPane>
<TabPane tab={<span><AuditOutlined /> 审计</span>} key="4" > <TabPane tab={<span><AuditOutlined /> 审计</span>} key="Other" >
<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} /> <Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />
</TabPane></>) </TabPane></>)
}; };
return ( return (
<Tabs defaultActiveKey="1" size="small" tabPosition="right" style={{ height: toolHeight}}> <Tabs defaultActiveKey="1" size="small" tabPosition="right" style={{ height: toolHeight}}>
{current.task.dialect === DIALECT.SQL ? renderSqlContent(): renderFlinkSqlContent()} {renderContent()}
</Tabs> </Tabs>
); );
}; };
......
...@@ -52,6 +52,7 @@ const SimpleTaskForm: React.FC<UpdateFormProps> = (props) => { ...@@ -52,6 +52,7 @@ const SimpleTaskForm: React.FC<UpdateFormProps> = (props) => {
> >
<Select defaultValue={DIALECT.FLINKSQL} value={DIALECT.FLINKSQL}> <Select defaultValue={DIALECT.FLINKSQL} value={DIALECT.FLINKSQL}>
<Option value={DIALECT.FLINKSQL}>FlinkSql</Option> <Option value={DIALECT.FLINKSQL}>FlinkSql</Option>
<Option value={DIALECT.FLINKSQLENV}>FlinkSql 环境</Option>
<Option value={DIALECT.SQL}>Sql</Option> <Option value={DIALECT.SQL}>Sql</Option>
<Option value={DIALECT.JAVA}>Java</Option> <Option value={DIALECT.JAVA}>Java</Option>
</Select> </Select>
......
...@@ -10,6 +10,7 @@ export const RUN_MODE = { ...@@ -10,6 +10,7 @@ export const RUN_MODE = {
export const DIALECT = { export const DIALECT = {
FLINKSQL:'FlinkSql', FLINKSQL:'FlinkSql',
FLINKSQLENV:'FlinkSqlEnv',
SQL:'Sql', SQL:'Sql',
JAVA:'Java', JAVA:'Java',
}; };
...@@ -11,7 +11,7 @@ import StudioLeftTool from "./StudioLeftTool"; ...@@ -11,7 +11,7 @@ import StudioLeftTool from "./StudioLeftTool";
import StudioRightTool from "./StudioRightTool"; import StudioRightTool from "./StudioRightTool";
import { import {
listSession, showCluster, showDataBase, getFillAllByVersion, listSession, showCluster, showDataBase, getFillAllByVersion,
showClusterConfiguration, showSessionCluster, showJars showClusterConfiguration, showSessionCluster, showJars, showEnv
} from "@/components/Studio/StudioEvent/DDL"; } from "@/components/Studio/StudioEvent/DDL";
import {loadSettings} from "@/pages/Settings/function"; import {loadSettings} from "@/pages/Settings/function";
import DraggleLayout from "@/components/DraggleLayout"; import DraggleLayout from "@/components/DraggleLayout";
...@@ -62,6 +62,7 @@ const Studio: React.FC<StudioProps> = (props) => { ...@@ -62,6 +62,7 @@ const Studio: React.FC<StudioProps> = (props) => {
showDataBase(dispatch); showDataBase(dispatch);
listSession(dispatch); listSession(dispatch);
showJars(dispatch); showJars(dispatch);
showEnv(dispatch);
const onClick = () => { const onClick = () => {
if (rightClickMenu) { if (rightClickMenu) {
......
...@@ -50,6 +50,13 @@ export type DataBaseType = { ...@@ -50,6 +50,13 @@ export type DataBaseType = {
updateTime: Date, updateTime: Date,
}; };
export type EnvType = {
id?: number,
name?: string,
alias?: string,
fragment?: boolean,
};
export type TaskType = { export type TaskType = {
id?: number, id?: number,
catalogueId?: number, catalogueId?: number,
...@@ -71,6 +78,7 @@ export type TaskType = { ...@@ -71,6 +78,7 @@ export type TaskType = {
databaseId?: number, databaseId?: number,
databaseName?: string, databaseName?: string,
jarId?: number, jarId?: number,
envId?: number,
note?: string, note?: string,
enabled?: boolean, enabled?: boolean,
createTime?: Date, createTime?: Date,
...@@ -132,6 +140,7 @@ export type StateType = { ...@@ -132,6 +140,7 @@ export type StateType = {
sessionCluster?: ClusterType[]; sessionCluster?: ClusterType[];
clusterConfiguration?: ClusterConfigurationType[]; clusterConfiguration?: ClusterConfigurationType[];
database?: DataBaseType[]; database?: DataBaseType[];
env?: EnvType[];
currentSession?: SessionType; currentSession?: SessionType;
current?: TabsItemType; current?: TabsItemType;
sql?: string; sql?: string;
...@@ -172,6 +181,7 @@ export type ModelType = { ...@@ -172,6 +181,7 @@ export type ModelType = {
saveSessionCluster: Reducer<StateType>; saveSessionCluster: Reducer<StateType>;
saveClusterConfiguration: Reducer<StateType>; saveClusterConfiguration: Reducer<StateType>;
saveDataBase: Reducer<StateType>; saveDataBase: Reducer<StateType>;
saveEnv: Reducer<StateType>;
}; };
}; };
...@@ -185,6 +195,7 @@ const Model: ModelType = { ...@@ -185,6 +195,7 @@ const Model: ModelType = {
sessionCluster: [], sessionCluster: [],
clusterConfiguration: [], clusterConfiguration: [],
database: [], database: [],
env: [],
currentSession: { currentSession: {
connectors: [], connectors: [],
}, },
...@@ -211,6 +222,7 @@ const Model: ModelType = { ...@@ -211,6 +222,7 @@ const Model: ModelType = {
databaseId:undefined, databaseId:undefined,
databaseName:undefined, databaseName:undefined,
jarId:undefined, jarId:undefined,
envId:undefined,
maxRowNum: 100, maxRowNum: 100,
config: [], config: [],
session: '', session: '',
...@@ -254,6 +266,7 @@ const Model: ModelType = { ...@@ -254,6 +266,7 @@ const Model: ModelType = {
databaseId:undefined, databaseId:undefined,
databaseName:undefined, databaseName:undefined,
jarId:undefined, jarId:undefined,
envId:undefined,
session: '', session: '',
config: [], config: [],
maxRowNum: 100, maxRowNum: 100,
...@@ -492,6 +505,11 @@ const Model: ModelType = { ...@@ -492,6 +505,11 @@ const Model: ModelType = {
...state, ...state,
database: payload, database: payload,
}; };
},saveEnv(state, {payload}) {
return {
...state,
env: payload,
};
}, },
}, },
}; };
......
...@@ -508,6 +508,9 @@ export default (): React.ReactNode => { ...@@ -508,6 +508,9 @@ export default (): React.ReactNode => {
<li> <li>
<Link>支持 Kubernetes Session 和 Application 模式提交任务</Link> <Link>支持 Kubernetes Session 和 Application 模式提交任务</Link>
</li> </li>
<li>
<Link>新增 FlinkSQL 执行环境方言及其应用功能</Link>
</li>
</ul> </ul>
</Paragraph> </Paragraph>
</Timeline.Item> </Timeline.Item>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment