Commit 5c4dd254 authored by wenmo's avatar wenmo

FlinkSQL及SQL导出

parent df19373b
......@@ -113,5 +113,13 @@ public class TaskController {
/**
 * Lists all enabled FlinkSQL environment tasks.
 *
 * @return a successful Result wrapping the environment task list
 */
public Result listFlinkSQLEnv() {
    return Result.succeed(taskService.listFlinkSQLEnv(),"获取成功");
}
/**
 * Exports the sql of a task: for plain SQL dialects the raw statement,
 * for FlinkSQL tasks the statement prefixed with its SET configuration
 * clauses (see TaskService#exportSql).
 *
 * @param id the id of the task to export
 * @return a successful Result wrapping the exported sql text
 */
@GetMapping(value = "/exportSql")
public Result exportSql(@RequestParam Integer id) {
    return Result.succeed(taskService.exportSql(id),"获取成功");
}
}
......@@ -22,4 +22,6 @@ public interface TaskService extends ISuperService<Task> {
/** Saves a new task or updates an existing one. */
boolean saveOrUpdateTask(Task task);

/** Lists all enabled FlinkSQL environment tasks. */
List<Task> listFlinkSQLEnv();

/**
 * Exports the sql of the given task; FlinkSQL tasks are prefixed with
 * their SET configuration clauses, jar tasks export an empty string.
 */
String exportSql(Integer id);
}
......@@ -8,6 +8,7 @@ import com.dlink.config.Dialect;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.dto.SqlDTO;
import com.dlink.gateway.GatewayType;
import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.job.JobConfig;
import com.dlink.job.JobManager;
import com.dlink.job.JobResult;
......@@ -64,55 +65,9 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
return studioService.executeCommonSql(SqlDTO.build(task.getStatement(),
task.getDatabaseId(),null));
}
boolean isJarTask = isJarTask(task);
if(!isJarTask&&Asserts.isNotNull(task.getEnvId())){
Task envTask = getTaskInfoById(task.getEnvId());
if(Asserts.isNotNull(envTask)&&Asserts.isNotNullString(envTask.getStatement())) {
task.setStatement(envTask.getStatement() + "\r\n" + task.getStatement());
}
}
JobConfig config = task.buildSubmitConfig();
if (!JobManager.useGateway(config.getType())) {
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), task.getClusterId()));
} else {
Map<String, Object> gatewayConfig = clusterConfigurationService.getGatewayConfig(task.getClusterConfigurationId());
if (GatewayType.YARN_APPLICATION.equalsValue(config.getType())||GatewayType.KUBERNETES_APPLICATION.equalsValue(config.getType())) {
if(!isJarTask) {
SystemConfiguration systemConfiguration = SystemConfiguration.getInstances();
gatewayConfig.put("userJarPath", systemConfiguration.getSqlSubmitJarPath());
gatewayConfig.put("userJarParas", systemConfiguration.getSqlSubmitJarParas() + buildParas(config.getTaskId()));
gatewayConfig.put("userJarMainAppClass", systemConfiguration.getSqlSubmitJarMainAppClass());
}else{
Jar jar = jarService.getById(task.getJarId());
Assert.check(jar);
gatewayConfig.put("userJarPath", jar.getPath());
gatewayConfig.put("userJarParas", jar.getParas());
gatewayConfig.put("userJarMainAppClass", jar.getMainClass());
}
}
config.buildGatewayConfig(gatewayConfig);
config.addGatewayConfig(task.parseConfig());
}
switch (config.getSavePointStrategy()) {
case LATEST:
Savepoints latestSavepoints = savepointsService.getLatestSavepointByTaskId(id);
if (Asserts.isNotNull(latestSavepoints)) {
config.setSavePointPath(latestSavepoints.getPath());
}
break;
case EARLIEST:
Savepoints earliestSavepoints = savepointsService.getEarliestSavepointByTaskId(id);
if (Asserts.isNotNull(earliestSavepoints)) {
config.setSavePointPath(earliestSavepoints.getPath());
}
break;
case CUSTOM:
break;
default:
config.setSavePointPath(null);
}
JobConfig config = buildJobConfig(task);
JobManager jobManager = JobManager.build(config);
if(!isJarTask) {
if(!config.isJarTask()) {
return jobManager.executeSql(task.getStatement());
}else{
return jobManager.executeJar();
......@@ -180,4 +135,71 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
return this.list(new QueryWrapper<Task>().eq("dialect", Dialect.FLINKSQLENV).eq("enabled",1));
}
/**
 * Exports the sql of a task. Plain SQL dialects are returned verbatim;
 * FlinkSQL tasks are rendered by JobManager together with their
 * effective configuration; jar tasks have no sql to export.
 *
 * @param id the id of the task to export
 * @return the exported sql text (empty string for jar tasks)
 */
@Override
public String exportSql(Integer id) {
    Task task = getTaskInfoById(id);
    Asserts.checkNull(task, Tips.TASK_NOT_EXIST);
    // Non-Flink dialects need no configuration — hand back the raw statement.
    if (Dialect.isSql(task.getDialect())) {
        return task.getStatement();
    }
    JobConfig jobConfig = buildJobConfig(task);
    JobManager jobManager = JobManager.build(jobConfig);
    // A jar task carries no sql statement that could be exported.
    if (jobConfig.isJarTask()) {
        return "";
    }
    return jobManager.exportSql(task.getStatement());
}
/**
 * Assembles the JobConfig used to submit or export a task.
 *
 * Steps: prepend the environment task's statement (non-jar tasks only),
 * build the base config from the task, resolve either a direct cluster
 * address or a gateway configuration, and finally apply the savepoint
 * strategy.
 *
 * NOTE(review): mutates {@code task} (setStatement) as a side effect —
 * callers relying on the original statement should be aware.
 *
 * @param task the task to build a submit/export configuration for
 * @return the fully populated JobConfig
 */
private JobConfig buildJobConfig(Task task){
    boolean isJarTask = isJarTask(task);
    // Non-jar FlinkSQL tasks may reference an environment task whose
    // statement (catalogs, session config, ...) is prepended to the sql.
    if(!isJarTask&&Asserts.isNotNull(task.getEnvId())){
        Task envTask = getTaskInfoById(task.getEnvId());
        if(Asserts.isNotNull(envTask)&&Asserts.isNotNullString(envTask.getStatement())) {
            task.setStatement(envTask.getStatement() + "\r\n" + task.getStatement());
        }
    }
    JobConfig config = task.buildSubmitConfig();
    config.setJarTask(isJarTask);
    if (!JobManager.useGateway(config.getType())) {
        // Non-gateway deployment: resolve the cluster's JobManager address directly.
        config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), task.getClusterId()));
    } else {
        Map<String, Object> gatewayConfig = clusterConfigurationService.getGatewayConfig(task.getClusterConfigurationId());
        // Application-mode deployments must ship a user jar: either the
        // dlink sql-submit jar (FlinkSQL task) or the task's own jar.
        if (GatewayType.YARN_APPLICATION.equalsValue(config.getType())||GatewayType.KUBERNETES_APPLICATION.equalsValue(config.getType())) {
            if(!isJarTask) {
                SystemConfiguration systemConfiguration = SystemConfiguration.getInstances();
                gatewayConfig.put("userJarPath", systemConfiguration.getSqlSubmitJarPath());
                gatewayConfig.put("userJarParas", systemConfiguration.getSqlSubmitJarParas() + buildParas(config.getTaskId()));
                gatewayConfig.put("userJarMainAppClass", systemConfiguration.getSqlSubmitJarMainAppClass());
            }else{
                Jar jar = jarService.getById(task.getJarId());
                Assert.check(jar);
                gatewayConfig.put("userJarPath", jar.getPath());
                gatewayConfig.put("userJarParas", jar.getParas());
                gatewayConfig.put("userJarMainAppClass", jar.getMainClass());
            }
        }
        config.buildGatewayConfig(gatewayConfig);
        // Task-level config entries override/extend the gateway config.
        config.addGatewayConfig(task.parseConfig());
    }
    // Resolve the savepoint path according to the task's strategy;
    // CUSTOM keeps whatever path the submit config already carries.
    switch (config.getSavePointStrategy()) {
        case LATEST:
            Savepoints latestSavepoints = savepointsService.getLatestSavepointByTaskId(task.getId());
            if (Asserts.isNotNull(latestSavepoints)) {
                config.setSavePointPath(latestSavepoints.getPath());
            }
            break;
        case EARLIEST:
            Savepoints earliestSavepoints = savepointsService.getEarliestSavepointByTaskId(task.getId());
            if (Asserts.isNotNull(earliestSavepoints)) {
                config.setSavePointPath(earliestSavepoints.getPath());
            }
            break;
        case CUSTOM:
            break;
        default:
            config.setSavePointPath(null);
    }
    return config;
}
}
......@@ -19,7 +19,7 @@ public class SqlUtil {
/**
 * Strips single-line "--" comments from a SQL statement, collapses runs
 * of line breaks into a single CRLF, and trims the result.
 *
 * @param sql the raw SQL text (may be null or blank)
 * @return the cleaned SQL, or the original value when null/blank
 */
public static String removeNote(String sql){
    if(Asserts.isNotNullString(sql)) {
        // BUG FIX: the previous version ran the same NBSP/comment pass twice
        // (a stale pre-refactor line was left in place); one pass suffices.
        // NBSP -> space, drop "--" line comments (quote-aware best effort),
        // then collapse consecutive line breaks and trim.
        sql = sql.replaceAll("\u00A0", " ")
                .replaceAll("--([^'\r\n]{0,}('[^'\r\n]{0,}'){0,1}[^'\r\n]{0,}){0,}", "")
                .replaceAll("[\r\n]+", "\r\n")
                .trim();
    }
    return sql;
}
......
......@@ -33,6 +33,7 @@ public class JobConfig {
private Integer clusterId;
private Integer clusterConfigurationId;
private Integer jarId;
private boolean isJarTask=false;
private String address;
private Integer taskId;
private String jobName;
......
......@@ -2,6 +2,7 @@ package com.dlink.job;
import com.dlink.api.FlinkAPI;
import com.dlink.assertion.Asserts;
import com.dlink.config.Dialect;
import com.dlink.constant.FlinkSQLConstant;
import com.dlink.executor.EnvironmentSetting;
import com.dlink.executor.Executor;
......@@ -26,14 +27,22 @@ import com.dlink.session.SessionPool;
import com.dlink.trans.Operations;
import com.dlink.utils.SqlUtil;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* JobManager
......@@ -447,4 +456,38 @@ public class JobManager {
public static TestResult testGateway(GatewayConfig gatewayConfig){
return Gateway.build(gatewayConfig).test();
}
/**
 * Renders the job's effective configuration as executable Flink SQL
 * "SET" statements followed by the pretreated user statement, so the
 * task can be exported and replayed outside of dlink.
 *
 * @param sql the raw FlinkSQL statement of the task
 * @return the statement prefixed with the relevant SET clauses
 */
public String exportSql(String sql){
    String statement = executor.pretreatStatement(sql);
    StringBuilder sb = new StringBuilder();
    if(Asserts.isNotNullString(config.getJobName())){
        appendSetClause(sb, PipelineOptions.NAME.key(), config.getJobName());
    }
    if(Asserts.isNotNull(config.getParallelism())){
        appendSetClause(sb, CoreOptions.DEFAULT_PARALLELISM.key(), config.getParallelism());
    }
    if(Asserts.isNotNull(config.getCheckpoint())){
        appendSetClause(sb, ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL.key(), config.getCheckpoint());
    }
    if(Asserts.isNotNullString(config.getSavePointPath())){
        // BUG FIX: use .key() — appending the ConfigOption itself emits its
        // toString(), not the configuration key "execution.savepoint.path".
        appendSetClause(sb, SavepointConfigOptions.SAVEPOINT_PATH.key(), config.getSavePointPath());
    }
    // BUG FIX: the gateway config is only populated for gateway deployments;
    // guard the whole chain to avoid an NPE when exporting a local/remote task.
    if(Asserts.isNotNull(config.getGatewayConfig())
            && Asserts.isNotNull(config.getGatewayConfig().getFlinkConfig())
            && Asserts.isNotNull(config.getGatewayConfig().getFlinkConfig().getConfiguration())) {
        for (Map.Entry<String, String> entry : config.getGatewayConfig().getFlinkConfig().getConfiguration().entrySet()) {
            appendSetClause(sb, entry.getKey(), entry.getValue());
        }
    }
    switch (GatewayType.get(config.getType())){
        case YARN_PER_JOB:
        case YARN_APPLICATION:
            appendSetClause(sb, DeploymentOptions.TARGET.key(), GatewayType.get(config.getType()).getLongValue());
            if(Asserts.isNotNull(config.getGatewayConfig())
                    && Asserts.isNotNull(config.getGatewayConfig().getClusterConfig())) {
                // NOTE(review): singletonList renders as "[path]" — verify Flink
                // accepts the bracketed form for provided lib dirs in a SET clause.
                appendSetClause(sb, YarnConfigOptions.PROVIDED_LIB_DIRS.key(),
                        Collections.singletonList(config.getGatewayConfig().getClusterConfig().getFlinkLibPath()));
            }
            if(Asserts.isNotNull(config.getGatewayConfig())
                    && Asserts.isNotNull(config.getGatewayConfig().getFlinkConfig())
                    && Asserts.isNotNullString(config.getGatewayConfig().getFlinkConfig().getJobName())) {
                appendSetClause(sb, YarnConfigOptions.APPLICATION_NAME.key(), config.getGatewayConfig().getFlinkConfig().getJobName());
            }
            break;
        default:
            // Non-YARN deployments export no deployment-target clauses.
    }
    sb.append(statement);
    return sb.toString();
}

/** Appends one CRLF-terminated "set key = value;" clause to the export buffer. */
private static void appendSetClause(StringBuilder sb, String key, Object value){
    sb.append("set ").append(key).append(" = ").append(value).append(";\r\n");
}
}
import styles from "./index.less";
import {Menu, Dropdown, Tooltip, Row, Col, Popconfirm, notification, Modal, message} from "antd";
import {
PauseCircleTwoTone, CopyTwoTone, DeleteTwoTone, PlayCircleTwoTone, DiffTwoTone,
PauseCircleTwoTone, CopyTwoTone, DeleteTwoTone, PlayCircleTwoTone, DiffTwoTone,SnippetsTwoTone,
FileAddTwoTone, FolderOpenTwoTone, SafetyCertificateTwoTone, SaveTwoTone, FlagTwoTone,
EnvironmentOutlined, SmileOutlined, RocketTwoTone, QuestionCircleTwoTone, MessageOutlined, ClusterOutlined
} from "@ant-design/icons";
......@@ -19,6 +19,10 @@ import {showCluster, showTables, saveTask} from "@/components/Studio/StudioEvent
import {useEffect, useState} from "react";
import StudioExplain from "../StudioConsole/StudioExplain";
import {DIALECT, isSql} from "@/components/Studio/conf";
import {
ModalForm,
} from '@ant-design/pro-form';
import SqlExport from "@/pages/FlinkSqlStudio/SqlExport";
const menu = (
<Menu>
......@@ -31,6 +35,7 @@ const StudioMenu = (props: any) => {
const {tabs, current, currentPath, form, refs, dispatch, currentSession} = props;
const [modalVisible, handleModalVisible] = useState<boolean>(false);
const [exportModalVisible, handleExportModalVisible] = useState<boolean>(false);
const [graphModalVisible, handleGraphModalVisible] = useState<boolean>(false);
const [explainData, setExplainData] = useState([]);
const [graphData, setGraphData] = useState();
......@@ -248,6 +253,10 @@ const StudioMenu = (props: any) => {
saveTask(current,dispatch);
};
const exportSql = () => {
handleExportModalVisible(true);
};
const runMenu = (
<Menu>
<Menu.Item onClick={execute}>同步执行</Menu.Item>
......@@ -344,6 +353,13 @@ const StudioMenu = (props: any) => {
onClick={saveSqlAndSettingToTask}
/>
</Tooltip>
<Tooltip title="导出当前的 Sql 及配置">
<Button
type="text"
icon={<SnippetsTwoTone />}
onClick={exportSql}
/>
</Tooltip>
<Divider type="vertical"/>
<Tooltip title="检查当前的 FlinkSql">
<Button
......@@ -432,6 +448,27 @@ const StudioMenu = (props: any) => {
>
<StudioGraph data={graphData} />
</Modal>
<ModalForm
title={`${current.task.alias} 的 ${current.task.dialect} 导出`}
visible={exportModalVisible}
width={1000}
modalProps={{
maskClosable:false,
bodyStyle:{
padding: '5px'
}
}}
onVisibleChange={handleExportModalVisible}
submitter={{
submitButtonProps: {
style: {
display: 'none',
},
},
}}
>
<SqlExport id={current.task.id} />
</ModalForm>
</Row>
);
};
......
......@@ -3,7 +3,7 @@ import {connect} from "umi";
import styles from './index.less';
import {} from "@ant-design/icons";
import StudioMenu from "./StudioMenu";
import {Row, Col, Card, Form, BackTop} from "antd";
import {Row, Col, Card, Form} from "antd";
import StudioTabs from "./StudioTabs";
import {StateType} from "@/pages/FlinkSqlStudio/model";
import StudioConsole from "./StudioConsole";
......@@ -151,7 +151,6 @@ const Studio: React.FC<StudioProps> = (props) => {
</Row>
</DraggleVerticalLayout>
</Card>
<BackTop/>
</div>
)
};
......
import {Typography} from 'antd';
import {useEffect, useState} from "react";
import {getData} from "@/components/Common/crud";
const {Text, Paragraph} = Typography;
/**
 * Read-only viewer for a task's exported sql. Fetches the export from
 * the backend for the given task id and renders it with a copy action.
 *
 * props.id — the task id to export (required).
 */
const SqlExport = (props: any) => {

  const {id} = props;
  // Exported sql text from the backend; undefined until the fetch resolves.
  const [statement, setStatement] = useState<string>();

  const refreshStatement = async () => {
    const msg = await getData('api/task/exportSql', {id: id});
    // NOTE(review): assumes the crud wrapper returns the payload in `datas` —
    // consistent with other getData callers; confirm against crud.ts.
    setStatement(msg.datas);
  };

  useEffect(() => {
    refreshStatement();
    // BUG FIX: depend on `id` so switching tasks re-fetches the export
    // instead of showing the previous task's sql.
  }, [id]);

  return (<>
    <Paragraph copyable={{text: statement}}>
    </Paragraph>
    <Paragraph>
      <pre style={{height: '300px'}}>{statement}</pre>
    </Paragraph></>)
};

export default SqlExport;
......@@ -517,6 +517,9 @@ export default (): React.ReactNode => {
<li>
<Link>新增 元数据查看表和字段信息</Link>
</li>
<li>
<Link>新增 FlinkSQL 及 SQL 导出</Link>
</li>
</ul>
</Paragraph>
</Timeline.Item>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment