Commit 18f1c9be authored by wenmo's avatar wenmo

会话创建与管理

parent 9542a3f6
......@@ -36,7 +36,9 @@ public class StudioServiceImpl implements StudioService {
@Override
public JobResult executeSql(StudioExecuteDTO studioExecuteDTO) {
JobConfig config = studioExecuteDTO.getJobConfig();
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(),studioExecuteDTO.getClusterId()));
if(!config.isUseSession()) {
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), studioExecuteDTO.getClusterId()));
}
JobManager jobManager = JobManager.build(config);
return jobManager.executeSql(studioExecuteDTO.getStatement());
}
......@@ -44,7 +46,9 @@ public class StudioServiceImpl implements StudioService {
@Override
public IResult executeDDL(StudioDDLDTO studioDDLDTO) {
JobConfig config = studioDDLDTO.getJobConfig();
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(),studioDDLDTO.getClusterId()));
if(!config.isUseSession()) {
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), studioDDLDTO.getClusterId()));
}
JobManager jobManager = JobManager.build(config);
return jobManager.executeDDL(studioDDLDTO.getStatement());
}
......
......@@ -2,6 +2,7 @@ package com.dlink.job;
import com.dlink.executor.Executor;
import com.dlink.executor.ExecutorSetting;
import com.dlink.session.SessionConfig;
import lombok.Getter;
import lombok.Setter;
......@@ -77,4 +78,12 @@ public class JobConfig {
Integer parallelism, String savePointPath){
return new JobConfig(false,false,false,clusterId,taskId,jobName,useSqlFragment,checkpoint,parallelism,savePointPath);
}
public void setSessionConfig(SessionConfig sessionConfig){
if(sessionConfig!=null) {
address = sessionConfig.getAddress();
clusterId = sessionConfig.getClusterId();
useRemote = sessionConfig.isUseRemote();
}
}
}
......@@ -72,6 +72,7 @@ public class JobManager extends RunTime {
}
private Executor createExecutor() {
initEnvironmentSetting();
if (config.isUseRemote()) {
executor = Executor.buildRemoteExecutor(environmentSetting, config.getExecutorSetting());
return executor;
......@@ -86,6 +87,8 @@ public class JobManager extends RunTime {
ExecutorEntity executorEntity = SessionPool.get(config.getSession());
if (executorEntity != null) {
executor = executorEntity.getExecutor();
config.setSessionConfig(executorEntity.getSessionConfig());
initEnvironmentSetting();
} else {
createExecutor();
SessionPool.push(new ExecutorEntity(config.getSession(), executor));
......@@ -96,11 +99,13 @@ public class JobManager extends RunTime {
return executor;
}
private void initEnvironmentSetting(){
environmentSetting = EnvironmentSetting.build(config.getAddress());
}
@Override
public boolean init() {
handler = JobHandler.build();
String address = config.getAddress();
environmentSetting = EnvironmentSetting.build(address);
createExecutorWithSession();
return false;
}
......
import {executeDDL} from "@/pages/FlinkSqlStudio/service";
import FlinkSQL from "./FlinkSQL";
import {TaskType} from "@/pages/FlinkSqlStudio/model";
import {SessionType, TaskType} from "@/pages/FlinkSqlStudio/model";
import {Modal} from "antd";
import {getData, handleRemove} from "@/components/Common/crud";
export function showTables(task: TaskType, dispatch: any) {
export function changeSession(session: SessionType, dispatch: any) {
dispatch && dispatch({
type: "Studio/refreshCurrentSession",
payload: session,
});
setTimeout(function () {
showTables(session.session,dispatch);
},200);
}
export function quitSession( dispatch: any) {
dispatch && dispatch({
type: "Studio/quitCurrentSession",
});
}
export function showTables(session: string, dispatch: any) {
if(session==null||session==''){
return;
}
const res = executeDDL({
statement: FlinkSQL.SHOW_TABLES,
clusterId: task.clusterId,
session: task.session,
useSession: task.useSession,
session: session,
useSession: true,
useResult: true,
});
res.then((result) => {
......@@ -18,18 +36,15 @@ export function showTables(task: TaskType, dispatch: any) {
tableData = result.datas.rowData;
}
dispatch && dispatch({
type: "Studio/refreshCurrentSessionCluster",
type: "Studio/refreshCurrentSession",
payload: {
session: task.session,
clusterId: task.clusterId,
clusterName: task.clusterName,
connectors: tableData,
connectors:tableData
},
});
});
}
export function removeTable(tablename: string, task: TaskType, dispatch: any) {
export function removeTable(tablename: string, session: string, dispatch: any) {
Modal.confirm({
title: '确定删除表【' + tablename + '】吗?',
okText: '确认',
......@@ -37,19 +52,18 @@ export function removeTable(tablename: string, task: TaskType, dispatch: any) {
onOk: async () => {
const res = executeDDL({
statement: "drop table " + tablename,
clusterId: task.clusterId,
session: task.session,
useSession: task.useSession,
session: session,
useSession: true,
useResult: true,
});
res.then((result) => {
showTables(task, dispatch);
showTables(session, dispatch);
});
}
});
}
export function clearSession(session: string, task: TaskType, dispatch: any) {
export function clearSession(session: string, dispatch: any) {
Modal.confirm({
title: '确认清空会话【' + session + '】?',
okText: '确认',
......@@ -60,7 +74,7 @@ export function clearSession(session: string, task: TaskType, dispatch: any) {
};
const res = handleRemove('/api/studio/clearSession', [para]);
res.then((result) => {
showTables(task, dispatch);
showTables(session, dispatch);
});
}
});
......
......@@ -91,11 +91,10 @@ const SessionForm: React.FC<UpdateFormProps> = (props) => {
<Item
name="clusterId"
label="集群"
rules={[{required: true, message: '请输入别名!'}]}>
>
<Select
style={{ width: '100%' }}
placeholder="选择Flink集群"
defaultValue={0}
optionLabelProp="label"
>
{getClusterOptions()}
......
import {message, Input, Button, Space, Table, Dropdown, Menu, Empty,Divider,
Tooltip,Breadcrumb} from "antd";
Tooltip} from "antd";
import {StateType} from "@/pages/FlinkSqlStudio/model";
import {connect} from "umi";
import {useState} from "react";
import styles from "./index.less";
import { SearchOutlined,DownOutlined,DeleteOutlined,CommentOutlined ,MessageOutlined,PlusOutlined} from '@ant-design/icons';
import { SearchOutlined,DownOutlined,DeleteOutlined,CommentOutlined ,PoweroffOutlined,PlusOutlined} from '@ant-design/icons';
import React from "react";
import {removeTable, showTables,clearSession} from "@/components/Studio/StudioEvent/DDL";
import {removeTable, showTables, clearSession, changeSession, quitSession} from "@/components/Studio/StudioEvent/DDL";
import {
ModalForm,
} from '@ant-design/pro-form';
import ProDescriptions from '@ant-design/pro-descriptions';
import ProTable from '@ant-design/pro-table';
import {getData, handleAddOrUpdate} from "@/components/Common/crud";
import SessionForm from "@/components/Studio/StudioLeftTool/StudioConnector/components/SessionForm";
const StudioConnector = (props:any) => {
const {current,dispatch,currentSessionCluster} = props;
const {current,dispatch,currentSession} = props;
const [tableData,setTableData] = useState<[]>([]);
const [loadings,setLoadings] = useState<boolean[]>([]);
const [searchText,setSearchText] = useState<string>('');
......@@ -112,7 +111,7 @@ const StudioConnector = (props:any) => {
const keyEvent=(key, item)=>{
if(key=='delete'){
removeTable(item.tablename,current.task,dispatch);
removeTable(item.tablename,currentSession.session,dispatch);
}else{
message.warn("敬请期待");
}
......@@ -122,14 +121,16 @@ const StudioConnector = (props:any) => {
if(key=='delete'){
clearSession(item.session,current.task,dispatch);
}else if(key=='connect'){
changeSession(item,dispatch);
message.success('连接共享会话【'+item.session+'】成功!');
setModalVisit(false);
}else{
message.warn("敬请期待");
}
};
const getTables = () => {
showTables(current.task,dispatch);
showTables(currentSession.session,dispatch);
};
const onClearSession = () => {
......@@ -189,14 +190,6 @@ const StudioConnector = (props:any) => {
render:function(text, record, index) {
return record.sessionConfig.clusterName;
}
},{
title: "JobManager地址",
key: "address",
sorter: true,
...getColumnSearchProps("address"),
render:function(text, record, index) {
return record.sessionConfig.address;
}
},{
title: "创建人",
dataIndex: "createUser",
......@@ -215,7 +208,7 @@ const StudioConnector = (props:any) => {
render: (_, record) => [
<a
onClick={() => {
message.warn('敬请期待');
keySessionsEvent('connect',record);
}}
>
连接
......@@ -244,6 +237,11 @@ const StudioConnector = (props:any) => {
});
};
const quitSessions=()=>{
quitSession(dispatch);
message.success('退出共享会话成功!');
};
return (
<>
<div style={{float: "right"}}>
......@@ -254,6 +252,13 @@ const StudioConnector = (props:any) => {
onClick={showSessions}
/>
</Tooltip>
<Tooltip title="退出会话">
<Button
type="text"
icon={<PoweroffOutlined />}
onClick={quitSessions}
/>
</Tooltip>
<Tooltip title="新建会话">
<Button
type="text"
......@@ -268,7 +273,7 @@ const StudioConnector = (props:any) => {
onClick={getTables}
/>
</Tooltip>
<Tooltip title="清空连接器">
<Tooltip title="注销会话">
<Button
type="text"
icon={<DeleteOutlined />}
......@@ -276,12 +281,7 @@ const StudioConnector = (props:any) => {
/>
</Tooltip>
</div>
<Breadcrumb className={styles["session-path"]}>
<MessageOutlined />
<Divider type="vertical" />
<Breadcrumb.Item>{currentSessionCluster.session}</Breadcrumb.Item>
</Breadcrumb>
{currentSessionCluster.connectors&&currentSessionCluster.connectors.length>0?(<Table dataSource={currentSessionCluster.connectors} columns={getColumns()} size="small" />):(<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />)}
{currentSession.connectors&&currentSession.connectors.length>0?(<Table dataSource={currentSession.connectors} columns={getColumns()} size="small" />):(<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />)}
<ModalForm
// title="新建表单"
visible={modalVisit}
......@@ -323,7 +323,11 @@ const StudioConnector = (props:any) => {
handleCreateSessionModalVisible(false);
}}
updateModalVisible={createSessionModalVisible}
values={{}}
values={{
session:'',
type:'PUBLIC',
useRemote:false,
}}
/>
</>
);
......@@ -331,5 +335,5 @@ const StudioConnector = (props:any) => {
export default connect(({ Studio }: { Studio: StateType }) => ({
current: Studio.current,
currentSessionCluster: Studio.currentSessionCluster,
currentSession: Studio.currentSession,
}))(StudioConnector);
......@@ -2,7 +2,7 @@ import styles from "./index.less";
import {Menu, Dropdown, Tooltip, Row, Col, Popconfirm, notification, Modal,message} from "antd";
import {PauseCircleTwoTone, CopyTwoTone, DeleteTwoTone,PlayCircleTwoTone,DiffTwoTone,
FileAddTwoTone,FolderOpenTwoTone,SafetyCertificateTwoTone,SaveTwoTone,FlagTwoTone,
EnvironmentOutlined,SmileOutlined,RocketTwoTone,QuestionCircleTwoTone} from "@ant-design/icons";
EnvironmentOutlined,SmileOutlined,RocketTwoTone,QuestionCircleTwoTone,MessageOutlined} from "@ant-design/icons";
import Space from "antd/es/space";
import Divider from "antd/es/divider";
import Button from "antd/es/button/button";
......@@ -23,7 +23,7 @@ const menu = (
const StudioMenu = (props: any) => {
const {tabs,current,currentPath,form,refs,dispatch} = props;
const {tabs,current,currentPath,form,refs,dispatch,currentSession} = props;
const execute = () => {
let selectsql =null;
......@@ -34,10 +34,10 @@ const StudioMenu = (props: any) => {
if(selectsql==null||selectsql==''){
selectsql=current.value;
}
let useSession = current.task.useSession;
let useSession = !!currentSession.session;
let param ={
useSession:useSession,
session:current.task.session,
session:currentSession.session,
useRemote:current.task.useRemote,
clusterId:current.task.clusterId,
useResult:current.task.useResult,
......@@ -85,7 +85,7 @@ const StudioMenu = (props: any) => {
type: "Studio/saveTabs",
payload: newTabs,
});
useSession&&showTables(current.task,dispatch);
useSession&&showTables(currentSession.session,dispatch);
})
};
......@@ -203,6 +203,14 @@ const StudioMenu = (props: any) => {
<EnvironmentOutlined />
<Divider type="vertical" />
{getPathItem(currentPath)}
{currentSession.session&&
(<>
<Divider type="vertical" />
<MessageOutlined />
<Divider type="vertical" />
<Breadcrumb.Item>{currentSession.session}</Breadcrumb.Item>
</>)
}
</Breadcrumb>
</Col>
<Col span={8} offset={8}>
......@@ -293,5 +301,5 @@ export default connect(({Studio}: { Studio: StateType }) => ({
currentPath: Studio.currentPath,
tabs: Studio.tabs,
refs: Studio.refs,
// monaco: Studio.monaco,
currentSession: Studio.currentSession,
}))(StudioMenu);
......@@ -10,20 +10,10 @@ const { Option } = Select;
const StudioConfig = (props: any) => {
const {current,form,dispatch,tabs,session} = props;
const [newSesstion, setNewSesstion] = useState<string>('');
const {current,form,dispatch,tabs,currentSession} = props;
form.setFieldsValue(current.task);
const addSession = ()=>{
if(newSesstion!='') {
dispatch && dispatch({
type: "Studio/saveSession",
payload: newSesstion,
});
setNewSesstion('');
}
};
const onValuesChange = (change:any,all:any)=>{
let newTabs = tabs;
......@@ -42,9 +32,6 @@ const StudioConfig = (props: any) => {
});
};
const onChangeClusterSession = ()=>{
showTables(current.task,dispatch);
};
return (
<>
<Row>
......@@ -65,51 +52,6 @@ const StudioConfig = (props: any) => {
className={styles.form_setting}
onValuesChange={onValuesChange}
>
<Row>
<Col span={10}>
<Form.Item
label="共享会话" className={styles.form_item} name="useSession" valuePropName="checked"
tooltip={{ title: '开启共享会话,将进行 Flink Catalog 的共享', icon: <InfoCircleOutlined /> }}
>
<Switch checkedChildren="启用" unCheckedChildren="禁用"
/>
</Form.Item>
</Col>
<Col span={14}>
<Form.Item
label="会话 Key" tooltip="设置共享会话的 Key" name="session"
className={styles.form_item}>
<Select
placeholder="选择会话"
allowClear
onChange={onChangeClusterSession}
dropdownRender={menu => (
<div>
{menu}
<Divider style={{ margin: '4px 0' }} />
<div style={{ display: 'flex', flexWrap: 'nowrap', padding: 8 }}>
<Input style={{ flex: 'auto' }} value={newSesstion}
onChange={(e)=>{
setNewSesstion(e.target.value);
}}
/>
<a
style={{ flex: 'none', padding: '8px', display: 'block', cursor: 'pointer' }}
onClick={addSession}
>
<PlusOutlined />
</a>
</div>
</div>
)}
>
{session.map(item => (
<Option key={item}>{item}</Option>
))}
</Select>
</Form.Item>
</Col>
</Row>
<Row>
<Col span={12}>
<Form.Item
......@@ -144,6 +86,6 @@ const StudioConfig = (props: any) => {
export default connect(({Studio}: { Studio: StateType }) => ({
cluster: Studio.cluster,
current: Studio.current,
currentSession: Studio.currentSession,
tabs: Studio.tabs,
session: Studio.session,
}))(StudioConfig);
......@@ -10,7 +10,7 @@ const { Option } = Select;
const StudioSetting = (props: any) => {
const {cluster,current,form,dispatch,tabs} = props;
const {cluster,current,form,dispatch,tabs,currentSession} = props;
const getClusterOptions = ()=>{
let itemList = [(<Option value={0} label={(<><Tag color="default">Local</Tag>本地环境</>)}>
......@@ -46,7 +46,7 @@ const StudioSetting = (props: any) => {
};
const onChangeClusterSession = ()=>{
showTables(current.task,dispatch);
showTables(currentSession.session,dispatch);
};
return (
<>
......@@ -133,4 +133,5 @@ export default connect(({Studio}: { Studio: StateType }) => ({
current: Studio.current,
tabs: Studio.tabs,
session: Studio.session,
currentSession: Studio.currentSession,
}))(StudioSetting);
......@@ -72,6 +72,7 @@ export type RightClickMenu = {
export type ConnectorType = {
tablename: string;
}
export type SessionType = {
session?: string;
type?: string;
......@@ -85,7 +86,7 @@ export type SessionType = {
}
export type StateType = {
cluster?: ClusterType[];
currentSessionCluster?: SessionType;
currentSession?: SessionType;
current?: TabsItemType;
sql?: string;
monaco?: any;
......@@ -114,7 +115,8 @@ export type ModelType = {
saveTaskData: Reducer<StateType>;
saveSession: Reducer<StateType>;
showRightClickMenu: Reducer<StateType>;
refreshCurrentSessionCluster: Reducer<StateType>;
refreshCurrentSession: Reducer<StateType>;
quitCurrentSession: Reducer<StateType>;
saveResult: Reducer<StateType>;
saveCluster: Reducer<StateType>;
};
......@@ -124,7 +126,7 @@ const Model: ModelType = {
namespace: 'Studio',
state: {
cluster: [],
currentSessionCluster: {
currentSession: {
connectors: [],
},
current: {
......@@ -336,12 +338,21 @@ const Model: ModelType = {
rightClickMenu: payload,
};
},
refreshCurrentSessionCluster(state, {payload}) {
refreshCurrentSession(state, {payload}) {
return {
...state,
currentSessionCluster: {
currentSession: {
...state?.currentSession,
...payload
},
}
};
},
quitCurrentSession(state) {
return {
...state,
currentSession: {
connectors: [],
}
};
},
saveResult(state, {payload}) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment