Commit 0ece94bc authored by wenmo's avatar wenmo

执行重构

parent e07cc3b7
......@@ -13,25 +13,25 @@ import com.dlink.model.Task;
*/
public interface Assert {
static void check(Cluster cluster){
if(cluster.getId()==null){
throw new BusException("Flink集群不存在");
static void check(Cluster cluster) {
if (cluster.getId() == null) {
throw new BusException("Flink 集群【" + cluster.getId() + "】不存在");
}
}
static void check(Task task){
static void check(Task task) {
if (task == null) {
throw new BusException("作业不存在");
}
}
static void check(Statement statement){
static void check(Statement statement) {
if (statement == null) {
throw new BusException("FlinkSql语句不存在");
}
}
static void checkHost(String host){
static void checkHost(String host) {
if (host == null || "".equals(host)) {
throw new BusException("集群地址暂不可用");
}
......
......@@ -4,7 +4,9 @@ import com.dlink.common.result.Result;
import com.dlink.dto.StudioCADTO;
import com.dlink.dto.StudioDDLDTO;
import com.dlink.dto.StudioExecuteDTO;
import com.dlink.job.JobResult;
import com.dlink.model.Task;
import com.dlink.result.IResult;
import com.dlink.result.RunResult;
import com.dlink.service.StudioService;
import com.fasterxml.jackson.databind.JsonNode;
......@@ -34,9 +36,8 @@ public class StudioController {
*/
@PostMapping("/executeSql")
public Result executeSql(@RequestBody StudioExecuteDTO studioExecuteDTO) {
// RunResult runResult = studioService.executeSql(studioExecuteDTO);
Integer runResult = studioService.executeSqlTest(studioExecuteDTO);
return Result.succeed(runResult,"执行成功");
JobResult jobResult = studioService.executeSql(studioExecuteDTO);
return Result.succeed(jobResult,"执行成功");
}
/**
......@@ -44,8 +45,8 @@ public class StudioController {
*/
@PostMapping("/executeDDL")
public Result executeDDL(@RequestBody StudioDDLDTO studioDDLDTO) {
RunResult runResult = studioService.executeDDL(studioDDLDTO);
return Result.succeed(runResult,"执行成功");
IResult result = studioService.executeDDL(studioDDLDTO);
return Result.succeed(result,"执行成功");
}
/**
......
package com.dlink.dto;
import com.dlink.job.JobConfig;
import lombok.Getter;
import lombok.Setter;
......@@ -12,7 +13,22 @@ import lombok.Setter;
@Getter
@Setter
public class StudioDDLDTO {
private boolean isResult;
private boolean isSession;
private String session;
private boolean isRemote;
private Integer clusterId;
private String statement;
private Integer clusterId=0;
public JobConfig getJobConfig() {
return new JobConfig(isResult, isSession, getSession(), isRemote, clusterId);
}
public String getSession() {
if(isRemote) {
return clusterId + "_" + session;
}else{
return "0_" + session;
}
}
}
......@@ -13,21 +13,29 @@ import lombok.Setter;
@Getter
@Setter
public class StudioExecuteDTO {
private boolean isResult=true;
private boolean isSession=false;
private boolean isResult;
private boolean isSession;
private String session;
private boolean isRemote=false;
private boolean isRemote;
private Integer clusterId;
private boolean fragment=false;
private boolean fragment;
private String statement;
private String jobName;
private Integer taskId;
private Integer maxRowNum=100;
private Integer checkPoint=0;
private Integer parallelism=1;
private Integer maxRowNum;
private Integer checkPoint;
private Integer parallelism;
private String savePointPath;
public JobConfig getJobConfig(){
return new JobConfig(isResult,isSession,session,isRemote,clusterId,taskId,jobName,fragment,maxRowNum,checkPoint,parallelism,savePointPath);
public JobConfig getJobConfig() {
return new JobConfig(isResult, isSession, getSession(), isRemote, clusterId, taskId, jobName, fragment, maxRowNum, checkPoint, parallelism, savePointPath);
}
public String getSession() {
if(isRemote) {
return clusterId + "_" + session;
}else{
return "0_" + session;
}
}
}
......@@ -28,7 +28,7 @@ public class Job2MysqlHandler implements JobHandler {
history.setClusterId(job.getJobConfig().getClusterId());
history.setJobManagerAddress(job.getJobManagerAddress());
history.setJobName(job.getJobConfig().getJobName());
history.setSession(job.getJobConfig().getSession());
history.setSession(job.getJobConfig().getSessionKey());
history.setStatus(job.getStatus().ordinal());
history.setStartTime(job.getStartTime());
history.setTaskId(job.getJobConfig().getTaskId());
......
......@@ -15,5 +15,7 @@ public interface ClusterService extends ISuperService<Cluster> {
String checkHeartBeat(String hosts,String host);
String getJobManagerAddress(Cluster cluster);
List<Cluster> listEnabledAll();
}
......@@ -3,6 +3,8 @@ package com.dlink.service;
import com.dlink.dto.StudioDDLDTO;
import com.dlink.dto.StudioExecuteDTO;
import com.dlink.explainer.ca.TableCANode;
import com.dlink.job.JobResult;
import com.dlink.result.IResult;
import com.dlink.result.RunResult;
import org.apache.flink.table.planner.expressions.In;
......@@ -15,11 +17,10 @@ import java.util.List;
* @since 2021/5/30 11:07
*/
public interface StudioService {
RunResult executeSql(StudioExecuteDTO studioExecuteDTO);
Integer executeSqlTest(StudioExecuteDTO studioExecuteDTO);
JobResult executeSql(StudioExecuteDTO studioExecuteDTO);
RunResult executeDDL(StudioDDLDTO studioDDLDTO);
IResult executeDDL(StudioDDLDTO studioDDLDTO);
boolean clearSession(String session);
......
package com.dlink.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.dlink.assertion.Assert;
import com.dlink.cluster.FlinkCluster;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.mapper.ClusterMapper;
......@@ -24,6 +25,18 @@ public class ClusterServiceImpl extends SuperServiceImpl<ClusterMapper, Cluster>
return FlinkCluster.testFlinkJobManagerIP(hosts,host);
}
@Override
public String getJobManagerAddress(Cluster cluster) {
Assert.check(cluster);
String host = FlinkCluster.testFlinkJobManagerIP(cluster.getHosts(), cluster.getJobManagerHost());
Assert.checkHost(host);
if(!host.equals(cluster.getJobManagerHost())){
cluster.setJobManagerHost(host);
updateById(cluster);
}
return host;
}
@Override
public List<Cluster> listEnabledAll() {
return this.list(new QueryWrapper<Cluster>().eq("enabled",1));
......
......@@ -12,7 +12,9 @@ import com.dlink.explainer.ca.CABuilder;
import com.dlink.explainer.ca.TableCANode;
import com.dlink.job.JobConfig;
import com.dlink.job.JobManager;
import com.dlink.job.JobResult;
import com.dlink.model.Cluster;
import com.dlink.result.IResult;
import com.dlink.result.RunResult;
import com.dlink.service.ClusterService;
import com.dlink.service.StudioService;
......@@ -34,7 +36,7 @@ public class StudioServiceImpl implements StudioService {
@Autowired
private ClusterService clusterService;
@Override
/*@Override
public RunResult executeSql(StudioExecuteDTO studioExecuteDTO) {
studioExecuteDTO.setSession(studioExecuteDTO.getClusterId()+"_"+studioExecuteDTO.getSession());
String ExecuteType = Executor.REMOTE;
......@@ -65,53 +67,31 @@ public class StudioServiceImpl implements StudioService {
studioExecuteDTO.getSavePointPath(),
studioExecuteDTO.getJobName()));
return jobManager.execute(studioExecuteDTO.getStatement());
}
}*/
@Override
public Integer executeSqlTest(StudioExecuteDTO studioExecuteDTO) {
studioExecuteDTO.setSession(studioExecuteDTO.getClusterId()+"_"+studioExecuteDTO.getSession());
public JobResult executeSql(StudioExecuteDTO studioExecuteDTO) {
JobConfig config = studioExecuteDTO.getJobConfig();
Cluster cluster = clusterService.getById(studioExecuteDTO.getClusterId());
if(cluster==null){
throw new JobException("未获取到集群信息");
}else {
Assert.check(cluster);
String host = FlinkCluster.testFlinkJobManagerIP(cluster.getHosts(), cluster.getJobManagerHost());
Assert.checkHost(host);
if(!host.equals(cluster.getJobManagerHost())){
cluster.setJobManagerHost(host);
clusterService.updateById(cluster);
}
config.setHost(host);
if(config.isRemote()) {
config.setHost(clusterService.getJobManagerAddress(
clusterService.getById(studioExecuteDTO.getClusterId())
));
}
JobManager jobManager = JobManager.build(config);
return jobManager.executeSql(studioExecuteDTO.getStatement());
}
@Override
public RunResult executeDDL(StudioDDLDTO studioDDLDTO) {
studioDDLDTO.setSession(studioDDLDTO.getClusterId()+"_"+studioDDLDTO.getSession());
String ExecuteType = Executor.REMOTE;
String host =null;
Cluster cluster = clusterService.getById(studioDDLDTO.getClusterId());
if(studioDDLDTO.getClusterId()==0&&cluster==null){
ExecuteType = Executor.LOCAL;
}else {
Assert.check(cluster);
host = FlinkCluster.testFlinkJobManagerIP(cluster.getHosts(), cluster.getJobManagerHost());
Assert.checkHost(host);
if(!host.equals(cluster.getJobManagerHost())){
cluster.setJobManagerHost(host);
clusterService.updateById(cluster);
}
public IResult executeDDL(StudioDDLDTO studioDDLDTO) {
JobConfig config = studioDDLDTO.getJobConfig();
if(config.isRemote()) {
config.setHost(clusterService.getJobManagerAddress(
clusterService.getById(studioDDLDTO.getClusterId())
));
}
JobManager jobManager = new JobManager(
host,
studioDDLDTO.getSession(),
1000,
new ExecutorSetting(ExecuteType));
return jobManager.execute(studioDDLDTO.getStatement());
}
JobManager jobManager = JobManager.build(config);
return jobManager.executeDDL(studioDDLDTO.getStatement());
}
@Override
public boolean clearSession(String session) {
......
......@@ -86,4 +86,5 @@ public class FlinkCluster {
}
return null;
}
}
......@@ -31,6 +31,10 @@ public interface FlinkSQLConstant {
* show操作
*/
String SHOW = "SHOW";
/**
* DESCRIBE
*/
String DESCRIBE = "DESCRIBE";
/**
* 未知操作类型
*/
......
......@@ -47,4 +47,8 @@ public class Job {
this.startTime = startTime;
this.executor = executor;
}
public JobResult getJobResult(){
return new JobResult(id,jobConfig,jobManagerAddress,status,statement,jobId,error,result,executorSetting,startTime,endTime);
}
}
......@@ -17,7 +17,7 @@ public class JobConfig {
private boolean isResult;
private boolean isSession;
private String session;
private String sessionKey;
private boolean isRemote;
private Integer clusterId;
private String host;
......@@ -29,12 +29,12 @@ public class JobConfig {
private Integer parallelism;
private String savePointPath;
public JobConfig(boolean isResult, boolean isSession, String session, boolean isRemote, Integer clusterId,
public JobConfig(boolean isResult, boolean isSession, String sessionKey, boolean isRemote, Integer clusterId,
Integer taskId, String jobName, boolean useSqlFragment, Integer maxRowNum, Integer checkpoint,
Integer parallelism, String savePointPath) {
this.isResult = isResult;
this.isSession = isSession;
this.session = session;
this.sessionKey = sessionKey;
this.isRemote = isRemote;
this.clusterId = clusterId;
this.taskId = taskId;
......@@ -46,6 +46,14 @@ public class JobConfig {
this.savePointPath = savePointPath;
}
public JobConfig(boolean isResult, boolean isSession, String sessionKey, boolean isRemote, Integer clusterId) {
this.isResult = isResult;
this.isSession = isSession;
this.sessionKey = sessionKey;
this.isRemote = isRemote;
this.clusterId = clusterId;
}
public ExecutorSetting getExecutorSetting(){
String type = Executor.LOCAL;
if(isRemote){
......
......@@ -95,7 +95,7 @@ public class JobManager extends RunTime {
executor = Executor.build(new EnvironmentSetting(jobManagerHost, jobManagerPort), config.getExecutorSetting());
return executor;
} else {
executor = Executor.build(null, executorSetting);
executor = Executor.build(null, config.getExecutorSetting());
return executor;
}
}
......@@ -117,12 +117,12 @@ public class JobManager extends RunTime {
private Executor createExecutorWithSession() {
if(config.isSession()) {
ExecutorEntity executorEntity = SessionPool.get(config.getSession());
ExecutorEntity executorEntity = SessionPool.get(config.getSessionKey());
if (executorEntity != null) {
executor = executorEntity.getExecutor();
} else {
createExecutor();
SessionPool.push(new ExecutorEntity(config.getSession(), executor));
SessionPool.push(new ExecutorEntity(config.getSessionKey(), executor));
}
}else {
createExecutor();
......@@ -239,7 +239,7 @@ public class JobManager extends RunTime {
JobID jobID = tableResult.getJobClient().get().getJobID();
long finish = System.currentTimeMillis();
long timeElapsed = finish - start;
InsertResult insertResult = new InsertResult(sqlText, (jobID == null ? "" : jobID.toHexString()), true, timeElapsed, LocalDateTime.now());
InsertResult insertResult = new InsertResult((jobID == null ? "" : jobID.toHexString()), true);
result.setResult(insertResult);
result.setJobId((jobID == null ? "" : jobID.toHexString()));
result.setTime(timeElapsed);
......@@ -276,7 +276,7 @@ public class JobManager extends RunTime {
return result;
}
public Integer executeSql(String statement) {
public JobResult executeSql(String statement) {
Job job = new Job(config,jobManagerHost+NetConstant.COLON+jobManagerPort,
Job.JobStatus.INITIALIZE,statement,executorSetting, LocalDate.now(),executor);
JobContextHolder.setJob(job);
......@@ -323,6 +323,29 @@ public class JobManager extends RunTime {
close();
}
close();
return job.getId();
return job.getJobResult();
}
public IResult executeDDL(String statement) {
String[] statements = statement.split(";");
try {
for (String item : statements) {
if (item.trim().isEmpty()) {
continue;
}
String operationType = Operations.getOperationType(item);
if(FlinkSQLConstant.INSERT.equals(operationType)||FlinkSQLConstant.SELECT.equals(operationType)){
continue;
}
LocalDateTime startTime = LocalDateTime.now();
TableResult tableResult = executor.executeSql(item);
IResult result = ResultBuilder.build(operationType, maxRowNum, "", false).getResult(tableResult);
result.setStartTime(startTime);
return result;
}
} catch (Exception e) {
e.printStackTrace();
}
return new ErrorResult();
}
}
package com.dlink.job;
import com.dlink.executor.ExecutorSetting;
import com.dlink.result.IResult;
import lombok.Getter;
import lombok.Setter;
import java.time.LocalDate;
/**
* JobResult
*
* @author wenmo
* @since 2021/6/29 23:56
*/
@Getter
@Setter
public class JobResult {
private Integer id;
private JobConfig jobConfig;
private String jobManagerAddress;
private Job.JobStatus status;
private String statement;
private String jobId;
private String error;
private IResult result;
private ExecutorSetting executorSetting;
private LocalDate startTime;
private LocalDate endTime;
public JobResult(Integer id, JobConfig jobConfig, String jobManagerAddress, Job.JobStatus status, String statement, String jobId, String error, IResult result, ExecutorSetting executorSetting, LocalDate startTime, LocalDate endTime) {
this.id = id;
this.jobConfig = jobConfig;
this.jobManagerAddress = jobManagerAddress;
this.status = status;
this.statement = statement;
this.jobId = jobId;
this.error = error;
this.result = result;
this.executorSetting = executorSetting;
this.startTime = startTime;
this.endTime = endTime;
}
}
package com.dlink.result;
/**
* AbstractBuilder
*
* @author wenmo
* @since 2021/5/25 16:11
**/
public class AbstractBuilder {
protected String operationType;
protected Integer maxRowNum;
protected boolean printRowKind;
protected String nullColumn;
}
package com.dlink.result;
import java.time.LocalDateTime;
/**
* AbstractResult
*
* @author wenmo
* @since 2021/6/29 22:49
*/
public class AbstractResult {
protected boolean success;
protected LocalDateTime startTime;
protected LocalDateTime endTime;
public void setStartTime(LocalDateTime startTime){
this.startTime = startTime;
}
}
package com.dlink.result;
import lombok.Getter;
import lombok.Setter;
import java.time.LocalDateTime;
/**
* DDLResult
*
* @author wenmo
* @since 2021/6/29 22:06
*/
@Setter
@Getter
public class DDLResult extends AbstractResult implements IResult {
public DDLResult(boolean success) {
this.success = success;
this.endTime = LocalDateTime.now();
}
}
package com.dlink.result;
import org.apache.flink.table.api.TableResult;
/**
* DDLResultBuilder
*
* @author wenmo
* @since 2021/6/29 22:43
*/
public class DDLResultBuilder implements ResultBuilder {
@Override
public IResult getResult(TableResult tableResult) {
return new DDLResult(true);
}
}
package com.dlink.result;
import java.time.LocalDateTime;
/**
* ErrorResult
*
* @author wenmo
* @since 2021/6/29 22:57
*/
public class ErrorResult extends AbstractResult implements IResult {
public ErrorResult() {
this.success = false;
this.endTime = LocalDateTime.now();
}
}
package com.dlink.result;
import java.time.LocalDateTime;
/**
* IResult
*
......@@ -7,4 +9,6 @@ package com.dlink.result;
* @since 2021/5/25 16:22
**/
public interface IResult {
void setStartTime(LocalDateTime startTime);
}
package com.dlink.result;
import lombok.Getter;
import lombok.Setter;
import java.time.LocalDateTime;
/**
......@@ -8,58 +11,15 @@ import java.time.LocalDateTime;
* @author wenmo
* @since 2021/5/25 19:08
**/
public class InsertResult implements IResult {
private String statement;
private String jobID;
private boolean success;
private long time;
private LocalDateTime finishDate;
public InsertResult(String statement, String jobID, boolean success, long time, LocalDateTime finishDate) {
this.statement = statement;
this.jobID = jobID;
this.success = success;
this.time = time;
this.finishDate = finishDate;
}
@Getter
@Setter
public class InsertResult extends AbstractResult implements IResult {
public String getStatement() {
return statement;
}
public void setStatement(String statement) {
this.statement = statement;
}
public String getJobID() {
return jobID;
}
private String jobID;
public void setJobID(String jobID) {
public InsertResult(String jobID, boolean success) {
this.jobID = jobID;
}
public boolean isSuccess() {
return success;
}
public void setSuccess(boolean success) {
this.success = success;
}
public long getTime() {
return time;
}
public void setTime(long time) {
this.time = time;
}
public LocalDateTime getFinishDate() {
return finishDate;
}
public void setFinishDate(LocalDateTime finishDate) {
this.finishDate = finishDate;
this.endTime = LocalDateTime.now();
}
}
package com.dlink.result;
import com.dlink.constant.FlinkSQLConstant;
import org.apache.flink.table.api.TableResult;
/**
* InsertBuilder
*
* @author wenmo
* @since 2021/6/29 22:23
*/
public class InsertResultBuilder implements ResultBuilder {
public static final String OPERATION_TYPE = FlinkSQLConstant.INSERT;
@Override
public IResult getResult(TableResult tableResult) {
if(tableResult.getJobClient().isPresent()){
String jobId = tableResult.getJobClient().get().getJobID().toHexString();
return new InsertResult(jobId,true);
}else{
return new InsertResult(null,false);
}
}
}
package com.dlink.result;
import com.dlink.constant.FlinkSQLConstant;
import org.apache.flink.table.api.TableResult;
/**
......@@ -12,10 +13,14 @@ public interface ResultBuilder {
static ResultBuilder build(String operationType, Integer maxRowNum, String nullColumn, boolean printRowKind){
switch (operationType.toUpperCase()){
case SelectBuilder.OPERATION_TYPE:
return new SelectBuilder(operationType,maxRowNum,nullColumn,printRowKind);
case SelectResultBuilder.OPERATION_TYPE:
case FlinkSQLConstant.SHOW:
case FlinkSQLConstant.DESCRIBE:
return new SelectResultBuilder(maxRowNum,nullColumn,printRowKind);
case InsertResultBuilder.OPERATION_TYPE:
return new InsertResultBuilder();
default:
return new SelectBuilder(operationType,maxRowNum,nullColumn,printRowKind);
return new DDLResultBuilder();
}
}
......
package com.dlink.result;
import lombok.Getter;
import lombok.Setter;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Set;
......@@ -10,49 +14,25 @@ import java.util.Set;
* @author wenmo
* @since 2021/5/25 16:01
**/
public class SelectResult implements IResult{
@Setter
@Getter
public class SelectResult extends AbstractResult implements IResult{
private String jobID;
private List<Map<String,Object>> rowData;
private Integer total;
private Integer currentCount;
private Set<String> columns;
public SelectResult(List<Map<String, Object>> rowData, Integer total, Integer currentCount, Set<String> columns) {
public SelectResult(List<Map<String, Object>> rowData, Integer total, Integer currentCount, Set<String> columns,
String jobID,boolean success) {
this.rowData = rowData;
this.total = total;
this.currentCount = currentCount;
this.columns = columns;
this.jobID = jobID;
this.success = success;
this.endTime = LocalDateTime.now();
}
public List<Map<String, Object>> getRowData() {
return rowData;
}
public void setRowData(List<Map<String, Object>> rowData) {
this.rowData = rowData;
}
public Integer getTotal() {
return total;
}
public void setTotal(Integer total) {
this.total = total;
}
public Integer getCurrentCount() {
return currentCount;
}
public void setCurrentCount(Integer currentCount) {
this.currentCount = currentCount;
}
public Set<String> getColumns() {
return columns;
}
public void setColumns(Set<String> columns) {
this.columns = columns;
}
}
package com.dlink.result;
import com.dlink.constant.FlinkSQLConstant;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
......@@ -14,12 +15,15 @@ import java.util.stream.Stream;
* @author wenmo
* @since 2021/5/25 16:03
**/
public class SelectBuilder extends AbstractBuilder implements ResultBuilder {
public class SelectResultBuilder implements ResultBuilder {
public static final String OPERATION_TYPE = "SELECT";
public static final String OPERATION_TYPE = FlinkSQLConstant.SELECT;
public SelectBuilder(String operationType, Integer maxRowNum,String nullColumn,boolean printRowKind) {
this.operationType = operationType;
private Integer maxRowNum;
private boolean printRowKind;
private String nullColumn;
public SelectResultBuilder(Integer maxRowNum, String nullColumn, boolean printRowKind) {
this.maxRowNum = maxRowNum;
this.printRowKind = printRowKind;
this.nullColumn = nullColumn;
......@@ -27,25 +31,28 @@ public class SelectBuilder extends AbstractBuilder implements ResultBuilder {
@Override
public IResult getResult(TableResult tableResult) {
String jobId = null;
if(tableResult.getJobClient().isPresent()) {
jobId = tableResult.getJobClient().get().getJobID().toHexString();
}
List<TableColumn> columns = tableResult.getTableSchema().getTableColumns();
int totalCount = 0;
Set<String> column = new LinkedHashSet();
String[] columnNames = (String[]) columns.stream().map(TableColumn::getName).map(s -> s.replace(" ","")).toArray((x$0) -> {
String[] columnNames = columns.stream().map(TableColumn::getName).map(s -> s.replace(" ", "")).toArray((x$0) -> {
return (new String[x$0]);
});
if (printRowKind) {
columnNames = (String[]) Stream.concat(Stream.of("op"), Arrays.stream(columnNames)).toArray((x$0) -> {
columnNames = Stream.concat(Stream.of("op"), Arrays.stream(columnNames)).toArray((x$0) -> {
return new String[x$0];
});
}
long numRows = 0L;
List<Map<String,Object>> rows = new ArrayList<>();
List<Map<String, Object>> rows = new ArrayList<>();
Iterator<Row> it = tableResult.collect();
while(it.hasNext()){
// for (numRows = 0L; it.hasNext() ; ++numRows) {
while (it.hasNext()) {
if (numRows < maxRowNum) {
String[] cols = rowToString((Row) it.next());
Map<String,Object> row = new HashMap<>();
String[] cols = rowToString(it.next());
Map<String, Object> row = new HashMap<>();
for (int i = 0; i < cols.length; i++) {
if (i > columnNames.length) {
column.add("UKN" + i);
......@@ -56,14 +63,13 @@ public class SelectBuilder extends AbstractBuilder implements ResultBuilder {
}
}
rows.add(row);
}else {
} else {
break;
// it.next();
}
numRows++;
totalCount++;
}
return new SelectResult(rows,totalCount,rows.size(),column);
return new SelectResult(rows, totalCount, rows.size(), column, jobId, true);
}
public String[] rowToString(Row row) {
......
@import '~antd/es/style/themes/default.less';
.form_setting{
padding-left: 10px;
}
.form_item{
margin-bottom: 5px;
}
import {connect} from "umi";
import {StateType} from "@/pages/FlinkSqlStudio/model";
import {Form, InputNumber,Input,Switch,Select,Tag,Row,Col,Divider,Tooltip,Button} from "antd";
import {InfoCircleOutlined,PlusOutlined,MinusSquareOutlined} from "@ant-design/icons";
import styles from "./index.less";
import {useEffect, useState} from "react";
import {showTables} from "@/components/Studio/StudioEvent/DDL";
const { Option } = Select;
const StudioConfig = (props: any) => {
const {current,form,dispatch,tabs,session} = props;
const [newSesstion, setNewSesstion] = useState<string>('');
form.setFieldsValue(current.task);
const addSession = ()=>{
if(newSesstion!='') {
dispatch && dispatch({
type: "Studio/saveSession",
payload: newSesstion,
});
setNewSesstion('');
}
};
const onValuesChange = (change:any,all:any)=>{
let newTabs = tabs;
for(let i=0;i<newTabs.panes.length;i++){
if(newTabs.panes[i].key==newTabs.activeKey){
for(let key in change){
newTabs.panes[i].task[key]=change[key];
}
break;
}
}
dispatch&&dispatch({
type: "Studio/saveTabs",
payload: newTabs,
});
};
const onChangeClusterSession = ()=>{
showTables(current.task,dispatch);
};
return (
<>
<Row>
<Col span={24}>
<div style={{float: "right"}}>
<Tooltip title="最小化">
<Button
type="text"
icon={<MinusSquareOutlined />}
/>
</Tooltip>
</div>
</Col>
</Row>
<Form
form={form}
layout="vertical"
className={styles.form_setting}
onValuesChange={onValuesChange}
>
<Form.Item
label="作业名" className={styles.form_item} name="jobName"
tooltip='设置任务名称,默认为作业名'
>
<Input placeholder="自定义作业名" />
</Form.Item>
<Row>
<Col span={12}>
<Form.Item
label="预览结果" className={styles.form_item} name="isResult" valuePropName="checked"
tooltip={{ title: '开启预览结果,将同步运行并返回数据结果', icon: <InfoCircleOutlined /> }}
>
<Switch checkedChildren="启用" unCheckedChildren="禁用"
/>
</Form.Item>
</Col>
<Col span={12}>
<Form.Item
label="最大行数" className={styles.form_item} name="maxRowNum"
tooltip='预览数据的最大行数'
>
<InputNumber min={1} max={9999} defaultValue={100} />
</Form.Item>
</Col>
</Row>
<Form.Item
label="远程执行" className={styles.form_item} name="isRemote" valuePropName="checked"
tooltip={{ title: '开启远程执行,将在远程集群进行任务执行', icon: <InfoCircleOutlined /> }}
>
<Switch checkedChildren="启用" unCheckedChildren="禁用"
/>
</Form.Item>
<Row>
<Col span={10}>
<Form.Item
label="共享会话" className={styles.form_item} name="isSession" valuePropName="checked"
tooltip={{ title: '开启共享会话,将进行 Flink Catalog 的共享', icon: <InfoCircleOutlined /> }}
>
<Switch checkedChildren="启用" unCheckedChildren="禁用"
/>
</Form.Item>
</Col>
<Col span={14}>
<Form.Item
label="会话 Key" tooltip="设置共享会话的 Key" name="session"
className={styles.form_item}>
<Select
placeholder="选择会话"
allowClear
onChange={onChangeClusterSession}
dropdownRender={menu => (
<div>
{menu}
<Divider style={{ margin: '4px 0' }} />
<div style={{ display: 'flex', flexWrap: 'nowrap', padding: 8 }}>
<Input style={{ flex: 'auto' }} value={newSesstion}
onChange={(e)=>{
setNewSesstion(e.target.value);
}}
/>
<a
style={{ flex: 'none', padding: '8px', display: 'block', cursor: 'pointer' }}
onClick={addSession}
>
<PlusOutlined />
</a>
</div>
</div>
)}
>
{session.map(item => (
<Option key={item}>{item}</Option>
))}
</Select>
</Form.Item>
</Col>
</Row>
</Form>
</>
);
};
export default connect(({Studio}: { Studio: StateType }) => ({
cluster: Studio.cluster,
current: Studio.current,
tabs: Studio.tabs,
session: Studio.session,
}))(StudioConfig);
......@@ -136,7 +136,7 @@ const StudioConnector = (props:any) => {
};
const getTables = () => {
showTables(current.task.clusterId,current.task.clusterName,current.task.session,dispatch);
showTables(current.task,dispatch);
};
const clearSession = () => {
......
......@@ -38,6 +38,9 @@ export type StudioParam = {
clusterId: number,
session: string,
maxRowNum?: number,
isResult:boolean;
isSession:boolean;
isRemote:boolean;
}
export type CAParam = {
statement: string,
......
import {executeDDL} from "@/pages/FlinkSqlStudio/service";
import FlinkSQL from "./FlinkSQL";
import {TaskType} from "@/pages/FlinkSqlStudio/model";
export function showTables(clusterId:number,clusterName:string,session:string,dispatch:any) {
export function showTables(task:TaskType,dispatch:any) {
const res = executeDDL({
statement:FlinkSQL.SHOW_TABLES,
clusterId: clusterId,
session:session,
clusterId: task.clusterId,
session:task.session,
isRemote:task.isRemote,
isSession:task.isSession,
isResult:true,
});
res.then((result)=>{
let tableData = [];
if(result.datas.result.rowData.length>0){
tableData = result.datas.result.rowData;
if(result.datas.rowData.length>0){
tableData = result.datas.rowData;
}
dispatch&&dispatch({
type: "Studio/refreshCurrentSessionCluster",
payload: {
session: session,
clusterId: clusterId,
clusterName: clusterName,
session: task.session,
clusterId: task.clusterId,
clusterName: task.clusterName,
connectors: tableData,
},
});
......
......@@ -44,6 +44,9 @@ const StudioMenu = (props: any) => {
fragment:current.task.fragment,
savePointPath:current.task.savePointPath,
jobName:current.task.jobName,
isResult:current.task.isResult,
isSession:current.task.isSession,
remote:current.task.isRemote,
};
const key = current.key;
const taskKey = (Math.random()*1000)+'';
......@@ -77,7 +80,7 @@ const StudioMenu = (props: any) => {
type: "Studio/saveTabs",
payload: newTabs,
});
showTables(current.task.clusterId,current.task.clusterName,current.task.session,dispatch);
showTables(current.task,dispatch);
})
};
......
......@@ -10,9 +10,8 @@ const { Option } = Select;
const StudioSetting = (props: any) => {
const {cluster,current,form,dispatch,tabs,session} = props;
const {cluster,current,form,dispatch,tabs} = props;
const [clusterOption, setClusterOption] = useState<[]>();
const [newSesstion, setNewSesstion] = useState<string>('');
const getCluster = ()=>{
......@@ -34,16 +33,6 @@ const StudioSetting = (props: any) => {
getCluster();
}, []);
const addSession = ()=>{
if(newSesstion!='') {
dispatch && dispatch({
type: "Studio/saveSession",
payload: newSesstion,
});
setNewSesstion('');
}
};
const onValuesChange = (change:any,all:any)=>{
let newTabs = tabs;
for(let i=0;i<newTabs.panes.length;i++){
......@@ -62,7 +51,7 @@ const StudioSetting = (props: any) => {
};
const onChangeClusterSession = ()=>{
showTables(current.task.clusterId,current.task.clusterName,current.task.session,dispatch);
showTables(current.task,dispatch);
};
return (
<>
......@@ -82,15 +71,8 @@ const StudioSetting = (props: any) => {
form={form}
layout="vertical"
className={styles.form_setting}
//initialValues={current.task}
onValuesChange={onValuesChange}
>
<Form.Item
label="jobName" className={styles.form_item} name="jobName"
tooltip='设置任务名称,默认为作业名'
>
<Input placeholder="hdfs://..." />
</Form.Item>
<Row>
<Col span={12}>
<Form.Item label="CheckPoint" tooltip="设置Flink任务的检查点步长,0 代表不启用" name="checkPoint"
......@@ -115,14 +97,6 @@ const StudioSetting = (props: any) => {
>
<Switch checkedChildren="启用" unCheckedChildren="禁用"
/>
</Form.Item>
</Col>
<Col span={12}>
<Form.Item
label="MaxRowNum" className={styles.form_item} name="maxRowNum"
tooltip='预览数据的最大行数'
>
<InputNumber min={1} max={9999} defaultValue={100} />
</Form.Item>
</Col>
</Row>
......@@ -152,43 +126,6 @@ const StudioSetting = (props: any) => {
</Form.Item>
</Col>
</Row>
<Row>
<Col span={24}>
<Form.Item
label="共享会话" tooltip="选择会话进行 Flink Catalog 的共享,当未选择时默认禁用该功能" name="session"
className={styles.form_item}>
<Select
placeholder="选择会话"
// defaultValue='admin'
allowClear
onChange={onChangeClusterSession}
dropdownRender={menu => (
<div>
{menu}
<Divider style={{ margin: '4px 0' }} />
<div style={{ display: 'flex', flexWrap: 'nowrap', padding: 8 }}>
<Input style={{ flex: 'auto' }} value={newSesstion}
onChange={(e)=>{
setNewSesstion(e.target.value);
}}
/>
<a
style={{ flex: 'none', padding: '8px', display: 'block', cursor: 'pointer' }}
onClick={addSession}
>
<PlusOutlined />
</a>
</div>
</div>
)}
>
{session.map(item => (
<Option key={item}>{item}</Option>
))}
</Select>
</Form.Item>
</Col>
</Row>
</Form>
</>
);
......
......@@ -139,6 +139,9 @@ const StudioTree: React.FC<StudioTreeProps> = (props) => {
session:'',
maxRowNum: 100,
jobName:node.name,
isResult:false,
isSession:false,
isRemote:true,
...result.datas
},
console:{
......
......@@ -14,6 +14,7 @@ import StudioConsole from "./StudioConsole";
import StudioSetting from "./StudioSetting";
import StudioEdit from "./StudioEdit";
import StudioConnector from "./StudioConnector";
import StudioConfig from "./StudioConfig";
const {TabPane} = Tabs;
......@@ -77,13 +78,16 @@ const Studio: React.FC<StudioProps> = (props) => {
</Col>
<Col span={4} className={styles["vertical-tabs"]}>
<Tabs defaultActiveKey="1" size="small" tabPosition="right" style={{ height: "100%",border: "1px solid #f0f0f0"}}>
<TabPane tab={<span><SettingOutlined /> 配置</span>} key="1" >
<TabPane tab={<span><SettingOutlined /> 作业配置</span>} key="StudioSetting" >
<StudioSetting form={form} />
</TabPane>
<TabPane tab={<span><ScheduleOutlined /> 详情</span>} key="2" >
<TabPane tab={<span><ScheduleOutlined /> 执行配置</span>} key="StudioConfig" >
<StudioConfig form={form}/>
</TabPane>
<TabPane tab={<span><ScheduleOutlined /> 详情</span>} key="3" >
<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />
</TabPane>
<TabPane tab={<span><AuditOutlined /> 审计</span>} key="3" >
<TabPane tab={<span><AuditOutlined /> 审计</span>} key="4" >
<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />
</TabPane>
</Tabs>
......
......@@ -41,6 +41,9 @@ export type TaskType = {
session: string;
maxRowNum: number;
jobName: string;
isResult:boolean;
isSession:boolean;
isRemote:boolean;
};
export type ConsoleType = {
......@@ -148,6 +151,9 @@ const Model: ModelType = {
maxRowNum: 100,
session: '',
alias: '草稿',
isResult:true,
isSession:false,
isRemote:false,
},
console: {
result: [],
......@@ -176,6 +182,9 @@ const Model: ModelType = {
session: '',
maxRowNum: 100,
alias: '草稿',
isResult:true,
isSession:false,
isRemote:false,
},
console: {
result: [],
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment