Commit a5426c4e authored by wenmo's avatar wenmo

会话管理

parent fee2a695
package com.dlink.controller; package com.dlink.controller;
import com.dlink.common.result.Result; import com.dlink.common.result.Result;
import com.dlink.dto.SessionDTO;
import com.dlink.dto.StudioCADTO; import com.dlink.dto.StudioCADTO;
import com.dlink.dto.StudioDDLDTO; import com.dlink.dto.StudioDDLDTO;
import com.dlink.dto.StudioExecuteDTO; import com.dlink.dto.StudioExecuteDTO;
...@@ -66,6 +67,14 @@ public class StudioController { ...@@ -66,6 +67,14 @@ public class StudioController {
} }
} }
/**
* 创建session
*/
@PostMapping("/createSession")
public Result createSession(@RequestBody SessionDTO sessionDTO) {
return Result.succeed(studioService.createSession(sessionDTO,"admin"),"创建成功");
}
/** /**
* 清除指定session * 清除指定session
*/ */
...@@ -88,4 +97,12 @@ public class StudioController { ...@@ -88,4 +97,12 @@ public class StudioController {
return Result.failed("请选择要清除的记录"); return Result.failed("请选择要清除的记录");
} }
} }
/**
* 获取session列表
*/
@GetMapping("/listSession")
public Result listSession() {
return Result.succeed(studioService.listSession("admin"),"获取成功");
}
} }
package com.dlink.dto;
import com.dlink.session.SessionConfig;
import lombok.Getter;
import lombok.Setter;
/**
* SessionDTO
*
* @author wenmo
* @since 2021/7/6 22:10
*/
@Getter
@Setter
public class SessionDTO {
private String session;
private String type;
private boolean useRemote;
private Integer clusterId;
}
package com.dlink.service; package com.dlink.service;
import com.dlink.dto.SessionDTO;
import com.dlink.dto.StudioDDLDTO; import com.dlink.dto.StudioDDLDTO;
import com.dlink.dto.StudioExecuteDTO; import com.dlink.dto.StudioExecuteDTO;
import com.dlink.explainer.ca.TableCANode; import com.dlink.explainer.ca.TableCANode;
import com.dlink.job.JobResult; import com.dlink.job.JobResult;
import com.dlink.result.IResult; import com.dlink.result.IResult;
import com.dlink.result.SelectResult; import com.dlink.result.SelectResult;
import com.dlink.session.SessionInfo;
import java.util.List; import java.util.List;
...@@ -23,8 +25,12 @@ public interface StudioService { ...@@ -23,8 +25,12 @@ public interface StudioService {
SelectResult getJobData(String jobId); SelectResult getJobData(String jobId);
boolean createSession(SessionDTO sessionDTO,String createUser);
boolean clearSession(String session); boolean clearSession(String session);
List<SessionInfo> listSession(String createUser);
List<TableCANode> getOneTableCAByStatement(String statement); List<TableCANode> getOneTableCAByStatement(String statement);
List<TableCANode> getOneTableColumnCAByStatement(String statement); List<TableCANode> getOneTableColumnCAByStatement(String statement);
......
...@@ -43,7 +43,6 @@ public class ClusterServiceImpl extends SuperServiceImpl<ClusterMapper, Cluster> ...@@ -43,7 +43,6 @@ public class ClusterServiceImpl extends SuperServiceImpl<ClusterMapper, Cluster>
@Override @Override
public String buildEnvironmentAddress(boolean useRemote, Integer id) { public String buildEnvironmentAddress(boolean useRemote, Integer id) {
String address = FlinkConstant.LOCAL_HOST;
if(useRemote) { if(useRemote) {
return getJobManagerAddress(getById(id)); return getJobManagerAddress(getById(id));
}else{ }else{
...@@ -56,7 +55,7 @@ public class ClusterServiceImpl extends SuperServiceImpl<ClusterMapper, Cluster> ...@@ -56,7 +55,7 @@ public class ClusterServiceImpl extends SuperServiceImpl<ClusterMapper, Cluster>
e.printStackTrace(); e.printStackTrace();
} }
} }
return address; return FlinkConstant.LOCAL_HOST;
} }
@Override @Override
......
...@@ -3,6 +3,7 @@ package com.dlink.service.impl; ...@@ -3,6 +3,7 @@ package com.dlink.service.impl;
import com.dlink.assertion.Assert; import com.dlink.assertion.Assert;
import com.dlink.cluster.FlinkCluster; import com.dlink.cluster.FlinkCluster;
import com.dlink.constant.FlinkConstant; import com.dlink.constant.FlinkConstant;
import com.dlink.dto.SessionDTO;
import com.dlink.dto.StudioDDLDTO; import com.dlink.dto.StudioDDLDTO;
import com.dlink.dto.StudioExecuteDTO; import com.dlink.dto.StudioExecuteDTO;
import com.dlink.exception.BusException; import com.dlink.exception.BusException;
...@@ -20,6 +21,8 @@ import com.dlink.result.RunResult; ...@@ -20,6 +21,8 @@ import com.dlink.result.RunResult;
import com.dlink.result.SelectResult; import com.dlink.result.SelectResult;
import com.dlink.service.ClusterService; import com.dlink.service.ClusterService;
import com.dlink.service.StudioService; import com.dlink.service.StudioService;
import com.dlink.session.SessionConfig;
import com.dlink.session.SessionInfo;
import com.dlink.session.SessionPool; import com.dlink.session.SessionPool;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
...@@ -61,6 +64,16 @@ public class StudioServiceImpl implements StudioService { ...@@ -61,6 +64,16 @@ public class StudioServiceImpl implements StudioService {
return JobManager.getJobData(jobId); return JobManager.getJobData(jobId);
} }
@Override
public boolean createSession(SessionDTO sessionDTO,String createUser) {
Cluster cluster = clusterService.getById(sessionDTO.getClusterId());
SessionConfig sessionConfig = SessionConfig.build(
sessionDTO.getType(), sessionDTO.isUseRemote(),
cluster.getId(),cluster.getAlias(),
clusterService.buildEnvironmentAddress(sessionDTO.isUseRemote(),sessionDTO.getClusterId()));
return JobManager.createSession(sessionDTO.getSession(),sessionConfig,createUser);
}
@Override @Override
public boolean clearSession(String session) { public boolean clearSession(String session) {
if(SessionPool.remove(session)>0){ if(SessionPool.remove(session)>0){
...@@ -70,6 +83,11 @@ public class StudioServiceImpl implements StudioService { ...@@ -70,6 +83,11 @@ public class StudioServiceImpl implements StudioService {
} }
} }
@Override
public List<SessionInfo> listSession(String createUser) {
return JobManager.listSession(createUser);
}
@Override @Override
public List<TableCANode> getOneTableCAByStatement(String statement) { public List<TableCANode> getOneTableCAByStatement(String statement) {
return CABuilder.getOneTableCAByStatement(statement); return CABuilder.getOneTableCAByStatement(statement);
......
...@@ -20,7 +20,7 @@ public class ExecutorSetting { ...@@ -20,7 +20,7 @@ public class ExecutorSetting {
private String savePointPath; private String savePointPath;
private String jobName; private String jobName;
private Map<String,String> config; private Map<String,String> config;
public static final ExecutorSetting DEFAULT = new ExecutorSetting(false); public static final ExecutorSetting DEFAULT = new ExecutorSetting(true);
public ExecutorSetting(boolean useSqlFragment) { public ExecutorSetting(boolean useSqlFragment) {
this.useSqlFragment = useSqlFragment; this.useSqlFragment = useSqlFragment;
......
...@@ -9,6 +9,8 @@ import com.dlink.interceptor.FlinkInterceptor; ...@@ -9,6 +9,8 @@ import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.parser.SqlType; import com.dlink.parser.SqlType;
import com.dlink.result.*; import com.dlink.result.*;
import com.dlink.session.ExecutorEntity; import com.dlink.session.ExecutorEntity;
import com.dlink.session.SessionConfig;
import com.dlink.session.SessionInfo;
import com.dlink.session.SessionPool; import com.dlink.session.SessionPool;
import com.dlink.trans.Operations; import com.dlink.trans.Operations;
import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobID;
...@@ -264,7 +266,21 @@ public class JobManager extends RunTime { ...@@ -264,7 +266,21 @@ public class JobManager extends RunTime {
return ResultPool.get(jobId); return ResultPool.get(jobId);
} }
public static void createSession(String session){ public static boolean createSession(String session, SessionConfig sessionConfig,String createUser){
if(SessionPool.exist(session)){
return false;
}
Executor sessionExecutor = null;
if (sessionConfig.isUseRemote()) {
sessionExecutor = Executor.buildRemoteExecutor(EnvironmentSetting.build(sessionConfig.getAddress()), ExecutorSetting.DEFAULT);
} else {
sessionExecutor = Executor.buildLocalExecutor(sessionConfig.getExecutorSetting());
}
SessionPool.push(new ExecutorEntity(session,sessionConfig,createUser,LocalDateTime.now(), sessionExecutor));
return true;
}
public static List<SessionInfo> listSession(String createUser){
return SessionPool.filter(createUser);
} }
} }
...@@ -16,31 +16,21 @@ import java.time.LocalDateTime; ...@@ -16,31 +16,21 @@ import java.time.LocalDateTime;
@Getter @Getter
public class ExecutorEntity { public class ExecutorEntity {
private String sessionId; private String sessionId;
private SessionType type; private SessionConfig sessionConfig;
private boolean useRemote;
private String address;
private String createUser; private String createUser;
private LocalDateTime createTime; private LocalDateTime createTime;
private Executor executor; private Executor executor;
public enum SessionType{
PUBLIC,
PRIVATE
}
public ExecutorEntity(String sessionId, Executor executor) { public ExecutorEntity(String sessionId, Executor executor) {
this.sessionId = sessionId; this.sessionId = sessionId;
this.executor = executor; this.executor = executor;
} }
public ExecutorEntity(String sessionId, SessionType type, boolean useRemote, String address, String createUser, LocalDateTime createTime, Executor executor) { public ExecutorEntity(String sessionId, SessionConfig sessionConfig, String createUser, LocalDateTime createTime, Executor executor) {
this.sessionId = sessionId; this.sessionId = sessionId;
this.type = type; this.sessionConfig = sessionConfig;
this.useRemote = useRemote;
this.address = address;
this.createUser = createUser; this.createUser = createUser;
this.createTime = createTime; this.createTime = createTime;
this.executor = executor; this.executor = executor;
} }
} }
package com.dlink.session;
import com.dlink.executor.ExecutorSetting;
import lombok.Getter;
import lombok.Setter;
/**
* SessionConfig
*
* @author wenmo
* @since 2021/7/6 21:59
*/
@Getter
@Setter
public class SessionConfig {
private SessionType type;
private boolean useRemote;
private Integer clusterId;
private String clusterName;
private String address;
public enum SessionType{
PUBLIC,
PRIVATE
}
public SessionConfig(SessionType type, boolean useRemote, Integer clusterId, String clusterName, String address) {
this.type = type;
this.useRemote = useRemote;
this.clusterId = clusterId;
this.clusterName = clusterName;
this.address = address;
}
public static SessionConfig build(String type, boolean useRemote, Integer clusterId, String clusterName, String address){
return new SessionConfig(SessionType.valueOf(type),useRemote,clusterId,clusterName,address);
}
public ExecutorSetting getExecutorSetting(){
return new ExecutorSetting(true);
}
}
package com.dlink.session;
import lombok.Getter;
import lombok.Setter;
import java.time.LocalDateTime;
/**
* SessionInfo
*
* @author wenmo
* @since 2021/7/6 22:22
*/
@Setter
@Getter
public class SessionInfo {
private String session;
private SessionConfig sessionConfig;
private String createUser;
private LocalDateTime createTime;
public SessionInfo(String session, SessionConfig sessionConfig, String createUser, LocalDateTime createTime) {
this.session = session;
this.sessionConfig = sessionConfig;
this.createUser = createUser;
this.createTime = createTime;
}
public static SessionInfo build(ExecutorEntity executorEntity){
return new SessionInfo(executorEntity.getSessionId(),executorEntity.getSessionConfig(),executorEntity.getCreateUser(),executorEntity.getCreateTime());
}
}
...@@ -2,6 +2,7 @@ package com.dlink.session; ...@@ -2,6 +2,7 @@ package com.dlink.session;
import com.dlink.constant.FlinkConstant; import com.dlink.constant.FlinkConstant;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Vector; import java.util.Vector;
...@@ -14,7 +15,16 @@ import java.util.Vector; ...@@ -14,7 +15,16 @@ import java.util.Vector;
public class SessionPool { public class SessionPool {
private static volatile List<ExecutorEntity> executorList = new Vector<>(FlinkConstant.DEFAULT_SESSION_COUNT); private static volatile List<ExecutorEntity> executorList = new Vector<>(FlinkConstant.DEFAULT_SESSION_COUNT);
public static boolean exist(String sessionId) {
for (ExecutorEntity executorEntity : executorList) {
if (executorEntity.getSessionId().equals(sessionId)) {
return true;
}
}
return false;
}
public static Integer push(ExecutorEntity executorEntity){ public static Integer push(ExecutorEntity executorEntity){
if (executorList.size() >= FlinkConstant.DEFAULT_SESSION_COUNT * FlinkConstant.DEFAULT_FACTOR) { if (executorList.size() >= FlinkConstant.DEFAULT_SESSION_COUNT * FlinkConstant.DEFAULT_FACTOR) {
executorList.remove(0); executorList.remove(0);
...@@ -48,4 +58,18 @@ public class SessionPool { ...@@ -48,4 +58,18 @@ public class SessionPool {
public static List<ExecutorEntity> list(){ public static List<ExecutorEntity> list(){
return executorList; return executorList;
} }
public static List<SessionInfo> filter(String createUser){
List<SessionInfo> sessionInfos = new ArrayList<>();
for (ExecutorEntity item : executorList) {
if(item.getSessionConfig().getType()== SessionConfig.SessionType.PUBLIC){
sessionInfos.add(SessionInfo.build(item));
}else{
if(createUser!=null&&createUser.equals(item.getCreateUser())){
sessionInfos.add(SessionInfo.build(item));
}
}
}
return sessionInfos;
}
} }
import React, {useEffect, useState} from 'react';
import {Form, Button, Input, Modal} from 'antd';
import type {SessionItem} from '../data.d';
export type UpdateFormProps = {
onCancel: (flag?: boolean, formVals?: Partial<SessionItem>) => void;
onSubmit: (values: Partial<SessionItem>) => void;
updateModalVisible: boolean;
isCreate: boolean;
values: Partial<SessionItem>;
};
const FormItem = Form.Item;
const formLayout = {
labelCol: {span: 7},
wrapperCol: {span: 13},
};
const SessionForm: React.FC<UpdateFormProps> = (props) => {
const [formVals, setFormVals] = useState<Partial<SessionItem>>({
session: props.values.session,
type: props.values.sessionConfig?.type,
useRemote: props.values.sessionConfig?.useRemote,
address: props.values.sessionConfig?.address,
createUser: props.values.createUser,
createTime: props.values.createTime,
});
const [form] = Form.useForm();
const {
onSubmit: handleUpdate,
onCancel: handleUpdateModalVisible,
updateModalVisible,
values,
isCreate,
} = props;
const submitForm = async () => {
const fieldsValue = await form.validateFields();
setFormVals({...formVals, ...fieldsValue});
handleUpdate({...formVals, ...fieldsValue});
};
const renderContent = () => {
return (
<>
<FormItem
name="session"
label="名称"
rules={[{required: true, message: '请输入唯一名称!'}]}>
<Input placeholder="请输入"/>
</FormItem>
<FormItem
name="alias"
label="别名"
rules={[{required: true, message: '请输入别名!'}]}>
<Input placeholder="请输入"/>
</FormItem>
</>
);
};
const renderFooter = () => {
return (
<>
<Button onClick={() => handleUpdateModalVisible(false, values)}>取消</Button>
<Button type="primary" onClick={() => submitForm()}>
完成
</Button>
</>
);
};
return (
<Modal
width={640}
bodyStyle={{padding: '32px 40px 48px'}}
destroyOnClose
title={isCreate ? '创建新作业' : ('重命名作业-' + formVals.name)}
visible={updateModalVisible}
footer={renderFooter()}
onCancel={() => handleUpdateModalVisible()}
>
<Form
{...formLayout}
form={form}
initialValues={{
id: formVals.id,
name: formVals.name,
alias: formVals.alias,
parentId: formVals.parentId,
}}
>
{renderContent()}
</Form>
</Modal>
);
};
export default SessionForm;
export type SessionItem = {
session: string,
sessionConfig: {
type:string,
useRemote:boolean,
clusterId:number,
},
createUser: string,
createTime: string,
};
...@@ -4,7 +4,7 @@ import {StateType} from "@/pages/FlinkSqlStudio/model"; ...@@ -4,7 +4,7 @@ import {StateType} from "@/pages/FlinkSqlStudio/model";
import {connect} from "umi"; import {connect} from "umi";
import {useState} from "react"; import {useState} from "react";
import styles from "./index.less"; import styles from "./index.less";
import { SearchOutlined,DownOutlined,DeleteOutlined,CommentOutlined ,MessageOutlined} from '@ant-design/icons'; import { SearchOutlined,DownOutlined,DeleteOutlined,CommentOutlined ,MessageOutlined,PlusOutlined} from '@ant-design/icons';
import React from "react"; import React from "react";
import {removeTable, showTables,clearSession} from "@/components/Studio/StudioEvent/DDL"; import {removeTable, showTables,clearSession} from "@/components/Studio/StudioEvent/DDL";
import { import {
...@@ -203,6 +203,13 @@ const StudioConnector = (props:any) => { ...@@ -203,6 +203,13 @@ const StudioConnector = (props:any) => {
onClick={showSessions} onClick={showSessions}
/> />
</Tooltip> </Tooltip>
<Tooltip title="新建会话">
<Button
type="text"
icon={<PlusOutlined />}
onClick={showSessions}
/>
</Tooltip>
<Tooltip title="刷新连接器"> <Tooltip title="刷新连接器">
<Button <Button
type="text" type="text"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment