Commit caaa087c authored by wenmo's avatar wenmo

去除 远程执行配置

parent 745cfd8e
...@@ -29,13 +29,11 @@ public class StudioExecuteDTO extends AbstractStatementDTO{ ...@@ -29,13 +29,11 @@ public class StudioExecuteDTO extends AbstractStatementDTO{
private boolean statementSet; private boolean statementSet;
private boolean useSession; private boolean useSession;
private String session; private String session;
private boolean useRemote;
private Integer clusterId; private Integer clusterId;
private Integer clusterConfigurationId; private Integer clusterConfigurationId;
private Integer databaseId; private Integer databaseId;
private Integer jarId; private Integer jarId;
private boolean fragment; private boolean fragment;
// private String statement;
private String jobName; private String jobName;
private Integer taskId; private Integer taskId;
private Integer maxRowNum; private Integer maxRowNum;
...@@ -61,7 +59,7 @@ public class StudioExecuteDTO extends AbstractStatementDTO{ ...@@ -61,7 +59,7 @@ public class StudioExecuteDTO extends AbstractStatementDTO{
); );
} }
return new JobConfig( return new JobConfig(
type,useResult,useChangeLog,useAutoCancel, useSession, session, useRemote, clusterId, type,useResult,useChangeLog,useAutoCancel, useSession, session, clusterId,
clusterConfigurationId,jarId, taskId, jobName, fragment,statementSet, clusterConfigurationId,jarId, taskId, jobName, fragment,statementSet,
maxRowNum, checkPoint, parallelism,savePointStrategy, savePointPath,config); maxRowNum, checkPoint, parallelism,savePointStrategy, savePointPath,config);
} }
......
...@@ -70,6 +70,13 @@ public class StudioServiceImpl implements StudioService { ...@@ -70,6 +70,13 @@ public class StudioServiceImpl implements StudioService {
} }
} }
private void buildSession(JobConfig config){
// If you are using a shared session, configure the current jobmanager address
if(!config.isUseSession()) {
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), config.getClusterId()));
}
}
@Override @Override
public JobResult executeSql(StudioExecuteDTO studioExecuteDTO) { public JobResult executeSql(StudioExecuteDTO studioExecuteDTO) {
if(Dialect.isSql(studioExecuteDTO.getDialect())){ if(Dialect.isSql(studioExecuteDTO.getDialect())){
...@@ -83,13 +90,7 @@ public class StudioServiceImpl implements StudioService { ...@@ -83,13 +90,7 @@ public class StudioServiceImpl implements StudioService {
private JobResult executeFlinkSql(StudioExecuteDTO studioExecuteDTO) { private JobResult executeFlinkSql(StudioExecuteDTO studioExecuteDTO) {
addFlinkSQLEnv(studioExecuteDTO); addFlinkSQLEnv(studioExecuteDTO);
JobConfig config = studioExecuteDTO.getJobConfig(); JobConfig config = studioExecuteDTO.getJobConfig();
// If you are using a shared session, configure the current jobmanager address buildSession(config);
if(!config.isUseSession()) {
if(GatewayType.LOCAL.equalsValue(config.getType())){
config.setUseRemote(false);
}
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), studioExecuteDTO.getClusterId()));
}
// To initialize java udf, but it has a bug in the product environment now. // To initialize java udf, but it has a bug in the product environment now.
// initUDF(config,studioExecuteDTO.getStatement()); // initUDF(config,studioExecuteDTO.getStatement());
JobManager jobManager = JobManager.build(config); JobManager jobManager = JobManager.build(config);
...@@ -134,9 +135,6 @@ public class StudioServiceImpl implements StudioService { ...@@ -134,9 +135,6 @@ public class StudioServiceImpl implements StudioService {
public IResult executeDDL(StudioDDLDTO studioDDLDTO) { public IResult executeDDL(StudioDDLDTO studioDDLDTO) {
JobConfig config = studioDDLDTO.getJobConfig(); JobConfig config = studioDDLDTO.getJobConfig();
if(!config.isUseSession()) { if(!config.isUseSession()) {
if(GatewayType.LOCAL.equalsValue(config.getType())){
config.setUseRemote(false);
}
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), studioDDLDTO.getClusterId())); config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), studioDDLDTO.getClusterId()));
} }
JobManager jobManager = JobManager.build(config); JobManager jobManager = JobManager.build(config);
...@@ -155,12 +153,9 @@ public class StudioServiceImpl implements StudioService { ...@@ -155,12 +153,9 @@ public class StudioServiceImpl implements StudioService {
private List<SqlExplainResult> explainFlinkSql(StudioExecuteDTO studioExecuteDTO) { private List<SqlExplainResult> explainFlinkSql(StudioExecuteDTO studioExecuteDTO) {
addFlinkSQLEnv(studioExecuteDTO); addFlinkSQLEnv(studioExecuteDTO);
JobConfig config = studioExecuteDTO.getJobConfig(); JobConfig config = studioExecuteDTO.getJobConfig();
if(!config.isUseSession()) { // If you are using explainSql | getStreamGraph | getJobPlan, make the dialect change to local.
if(GatewayType.LOCAL.equalsValue(config.getType())){ config.buildLocal();
config.setUseRemote(false); buildSession(config);
}
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), studioExecuteDTO.getClusterId()));
}
// To initialize java udf, but it has a bug in the product environment now. // To initialize java udf, but it has a bug in the product environment now.
// initUDF(config,studioExecuteDTO.getStatement()); // initUDF(config,studioExecuteDTO.getStatement());
JobManager jobManager = JobManager.buildPlanMode(config); JobManager jobManager = JobManager.buildPlanMode(config);
...@@ -190,13 +185,9 @@ public class StudioServiceImpl implements StudioService { ...@@ -190,13 +185,9 @@ public class StudioServiceImpl implements StudioService {
public ObjectNode getStreamGraph(StudioExecuteDTO studioExecuteDTO) { public ObjectNode getStreamGraph(StudioExecuteDTO studioExecuteDTO) {
addFlinkSQLEnv(studioExecuteDTO); addFlinkSQLEnv(studioExecuteDTO);
JobConfig config = studioExecuteDTO.getJobConfig(); JobConfig config = studioExecuteDTO.getJobConfig();
config.setType(GatewayType.LOCAL.getLongValue()); // If you are using explainSql | getStreamGraph | getJobPlan, make the dialect change to local.
if(!config.isUseSession()) { config.buildLocal();
if(GatewayType.LOCAL.equalsValue(config.getType())){ buildSession(config);
config.setUseRemote(false);
}
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), studioExecuteDTO.getClusterId()));
}
JobManager jobManager = JobManager.buildPlanMode(config); JobManager jobManager = JobManager.buildPlanMode(config);
return jobManager.getStreamGraph(studioExecuteDTO.getStatement()); return jobManager.getStreamGraph(studioExecuteDTO.getStatement());
} }
...@@ -205,13 +196,9 @@ public class StudioServiceImpl implements StudioService { ...@@ -205,13 +196,9 @@ public class StudioServiceImpl implements StudioService {
public ObjectNode getJobPlan(StudioExecuteDTO studioExecuteDTO) { public ObjectNode getJobPlan(StudioExecuteDTO studioExecuteDTO) {
addFlinkSQLEnv(studioExecuteDTO); addFlinkSQLEnv(studioExecuteDTO);
JobConfig config = studioExecuteDTO.getJobConfig(); JobConfig config = studioExecuteDTO.getJobConfig();
config.setType(GatewayType.LOCAL.getLongValue()); // If you are using explainSql | getStreamGraph | getJobPlan, make the dialect change to local.
if(!config.isUseSession()) { config.buildLocal();
if(GatewayType.LOCAL.equalsValue(config.getType())){ buildSession(config);
config.setUseRemote(false);
}
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), studioExecuteDTO.getClusterId()));
}
JobManager jobManager = JobManager.buildPlanMode(config); JobManager jobManager = JobManager.buildPlanMode(config);
String planJson = jobManager.getJobPlanJson(studioExecuteDTO.getStatement()); String planJson = jobManager.getJobPlanJson(studioExecuteDTO.getStatement());
ObjectMapper mapper = new ObjectMapper(); ObjectMapper mapper = new ObjectMapper();
......
...@@ -285,9 +285,6 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -285,9 +285,6 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
JobConfig config = task.buildSubmitConfig(); JobConfig config = task.buildSubmitConfig();
config.setJarTask(isJarTask); config.setJarTask(isJarTask);
if (!JobManager.useGateway(config.getType())) { if (!JobManager.useGateway(config.getType())) {
if (GatewayType.LOCAL.equalsValue(config.getType())) {
config.setUseRemote(false);
}
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), task.getClusterId())); config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), task.getClusterId()));
} else { } else {
Map<String, Object> gatewayConfig = clusterConfigurationService.getGatewayConfig(task.getClusterConfigurationId()); Map<String, Object> gatewayConfig = clusterConfigurationService.getGatewayConfig(task.getClusterConfigurationId());
......
...@@ -2,6 +2,7 @@ package com.dlink.job; ...@@ -2,6 +2,7 @@ package com.dlink.job;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.executor.ExecutorSetting; import com.dlink.executor.ExecutorSetting;
import com.dlink.gateway.GatewayType;
import com.dlink.gateway.config.AppConfig; import com.dlink.gateway.config.AppConfig;
import com.dlink.gateway.config.ClusterConfig; import com.dlink.gateway.config.ClusterConfig;
import com.dlink.gateway.config.FlinkConfig; import com.dlink.gateway.config.FlinkConfig;
...@@ -63,7 +64,7 @@ public class JobConfig { ...@@ -63,7 +64,7 @@ public class JobConfig {
this.config = config; this.config = config;
} }
public JobConfig(String type, boolean useResult, boolean useChangeLog, boolean useAutoCancel, boolean useSession, String session, boolean useRemote, Integer clusterId, public JobConfig(String type, boolean useResult, boolean useChangeLog, boolean useAutoCancel, boolean useSession, String session, Integer clusterId,
Integer clusterConfigurationId, Integer jarId, Integer taskId, String jobName, boolean useSqlFragment, Integer clusterConfigurationId, Integer jarId, Integer taskId, String jobName, boolean useSqlFragment,
boolean useStatementSet, Integer maxRowNum, Integer checkpoint, Integer parallelism, boolean useStatementSet, Integer maxRowNum, Integer checkpoint, Integer parallelism,
Integer savePointStrategyValue, String savePointPath, Map<String,String> config) { Integer savePointStrategyValue, String savePointPath, Map<String,String> config) {
...@@ -73,7 +74,7 @@ public class JobConfig { ...@@ -73,7 +74,7 @@ public class JobConfig {
this.useAutoCancel = useAutoCancel; this.useAutoCancel = useAutoCancel;
this.useSession = useSession; this.useSession = useSession;
this.session = session; this.session = session;
this.useRemote = useRemote; this.useRemote = true;
this.clusterId = clusterId; this.clusterId = clusterId;
this.clusterConfigurationId = clusterConfigurationId; this.clusterConfigurationId = clusterConfigurationId;
this.jarId = jarId; this.jarId = jarId;
...@@ -213,4 +214,12 @@ public class JobConfig { ...@@ -213,4 +214,12 @@ public class JobConfig {
gatewayConfig.getFlinkConfig().getConfiguration().put(entry.getKey(), (String) entry.getValue()); gatewayConfig.getFlinkConfig().getConfiguration().put(entry.getKey(), (String) entry.getValue());
} }
} }
public boolean isUseRemote() {
return !GatewayType.LOCAL.equalsValue(type);
}
public void buildLocal(){
type = GatewayType.LOCAL.getLongValue();
}
} }
...@@ -26,7 +26,7 @@ public class JobManagerTest { ...@@ -26,7 +26,7 @@ public class JobManagerTest {
@Test @Test
public void cancelJobSelect(){ public void cancelJobSelect(){
JobConfig config = new JobConfig("session-yarn",true,true, true,true, "s1", true, 2, JobConfig config = new JobConfig("session-yarn",true,true, true,true, "s1", 2,
null, null,null, "测试", false,false, 100, 0, null, null,null, "测试", false,false, 100, 0,
1, 0,null,new HashMap<>()); 1, 0,null,new HashMap<>());
if(config.isUseRemote()) { if(config.isUseRemote()) {
......
import {connect} from "umi"; import {connect} from "umi";
import {StateType} from "@/pages/FlinkSqlStudio/model"; import {StateType} from "@/pages/FlinkSqlStudio/model";
import { import {
Form, InputNumber,Switch, Row, Col, Tooltip, Button, Badge, Form, InputNumber,Switch, Row, Col, Tooltip, Button,
Typography
} from "antd"; } from "antd";
import {InfoCircleOutlined,MinusSquareOutlined} from "@ant-design/icons"; import {InfoCircleOutlined,MinusSquareOutlined} from "@ant-design/icons";
import styles from "./index.less"; import styles from "./index.less";
import { Scrollbars } from 'react-custom-scrollbars'; import { Scrollbars } from 'react-custom-scrollbars';
const { Text } = Typography;
const StudioConfig = (props: any) => { const StudioConfig = (props: any) => {
const {current,form,dispatch,tabs,currentSession,toolHeight} = props; const {current,form,dispatch,tabs,toolHeight} = props;
form.setFieldsValue(current.task); form.setFieldsValue(current.task);
...@@ -94,16 +91,6 @@ const StudioConfig = (props: any) => { ...@@ -94,16 +91,6 @@ const StudioConfig = (props: any) => {
</Form.Item> </Form.Item>
</Col> </Col>
</Row> </Row>
<Form.Item
label="远程执行" className={styles.form_item} name="useRemote" valuePropName="checked"
tooltip={{ title: '开启远程执行,将在远程集群进行任务执行', icon: <InfoCircleOutlined /> }}
>
{
currentSession.session?
(currentSession.sessionConfig&&currentSession.sessionConfig.useRemote?(<><Badge status="success"/><Text type="success">已启用</Text></>):(<><Badge status="error"/><Text type="danger">已禁用</Text></>)
):(<Switch checkedChildren="启用" unCheckedChildren="禁用"/>)
}
</Form.Item>
</Form> </Form>
</Scrollbars> </Scrollbars>
</> </>
......
...@@ -91,7 +91,6 @@ export type TaskType = { ...@@ -91,7 +91,6 @@ export type TaskType = {
useChangeLog: boolean; useChangeLog: boolean;
useAutoCancel: boolean; useAutoCancel: boolean;
useSession: boolean; useSession: boolean;
useRemote: boolean;
}; };
export type ConsoleType = { export type ConsoleType = {
...@@ -125,7 +124,6 @@ export type SessionType = { ...@@ -125,7 +124,6 @@ export type SessionType = {
session?: string; session?: string;
sessionConfig?: { sessionConfig?: {
type?: string; type?: string;
useRemote?: boolean;
clusterId?: number; clusterId?: number;
clusterName?: string; clusterName?: string;
address?: string; address?: string;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment