Commit c7fc038f authored by wenmo's avatar wenmo

新增changlog和table查询方式

parent 708b9189
......@@ -20,7 +20,7 @@ public class BannerInitializer implements ApplicationContextInitializer<Configur
@Override
public void initialize(ConfigurableApplicationContext applicationContext) {
if (!(applicationContext instanceof AnnotationConfigApplicationContext)) {
LogoBanner logoBanner = new LogoBanner(BannerInitializer.class, "/dlink/logo.txt", "Welcome to Dlink", 5, 6, new Color[5], true);
LogoBanner logoBanner = new LogoBanner(BannerInitializer.class, "/dlink/logo.txt", "Welcome to Dinky", 5, 7, new Color[5], true);
CustomBanner.show(logoBanner, new Description(BannerConstant.VERSION + ":", CommonConstant.PROJECT_VERSION, 0, 1)
, new Description("Github:", "https://github.com/DataLinkDC/dlink", 0, 1)
, new Description("公众号:", "DataLink数据中台", 0, 1)
......
......@@ -24,6 +24,8 @@ public class APIExecuteSqlDTO extends AbstractStatementDTO{
// RUN_MODE
private String type;
private boolean useResult = false;
private boolean useChangeLog = false;
private boolean useAutoCancel = false;
private boolean useStatementSet = false;
private String address;
private boolean fragment = false;
......@@ -42,7 +44,7 @@ public class APIExecuteSqlDTO extends AbstractStatementDTO{
savePointStrategy = 3;
}
return new JobConfig(
type, useResult, false, null, true, address, jobName,
type, useResult,useChangeLog, useChangeLog, false, null, true, address, jobName,
fragment, useStatementSet, maxRowNum, checkPoint, parallelism, savePointStrategy,
savePointPath, configuration, gatewayConfig);
}
......
......@@ -20,9 +20,10 @@ public class StudioDDLDTO {
private boolean useRemote;
private Integer clusterId;
private String statement;
private Integer maxRowNum = 10000;
public JobConfig getJobConfig() {
return new JobConfig(type,useResult, useSession, session, useRemote, clusterId);
return new JobConfig(type,useResult, useSession, session, useRemote, clusterId,maxRowNum);
}
}
......@@ -24,6 +24,8 @@ public class StudioExecuteDTO extends AbstractStatementDTO{
private String type;
private String dialect;
private boolean useResult;
private boolean useChangeLog;
private boolean useAutoCancel;
private boolean statementSet;
private boolean useSession;
private String session;
......@@ -59,7 +61,7 @@ public class StudioExecuteDTO extends AbstractStatementDTO{
);
}
return new JobConfig(
type,useResult, useSession, session, useRemote, clusterId,
type,useResult,useChangeLog,useAutoCancel, useSession, session, useRemote, clusterId,
clusterConfigurationId,jarId, taskId, jobName, fragment,statementSet,
maxRowNum, checkPoint, parallelism,savePointStrategy, savePointPath,config);
}
......
____ __ _ __
/ __ \ / /(_)___ / /__
/ / / // // / __ \/ //_/
/ /_/ // // / / / / <
/_____//__/_/_/ /_/_/|_|
______ _ __
|_ _ `. (_) [ | _
| | `. \ __ _ .--. | | / ] _ __
| | | |[ | [ `.-. | | '' < [ \ [ ]
_| |_.' / | | | | | | | |`\ \ \ '/ /
|______.' [___][___||__][__| \_][\_: /
\__.'
/* http://127.0.0.1:8888/openapi/executeSql */
{
/* required-start */
"type":"kubernetes-session",
"address":"127.0.0.1:8081",
"statement":"CREATE TABLE Orders (\r\n order_number INT,\r\n price DECIMAL(32,2),\r\n order_time TIMESTAMP(3)\r\n) WITH (\r\n 'connector' = 'datagen',\r\n 'rows-per-second' = '1',\r\n 'fields.order_number.kind' = 'sequence',\r\n 'fields.order_number.start' = '1',\r\n 'fields.order_number.end' = '1000'\r\n);\r\nCREATE TABLE pt (\r\nordertotal INT,\r\nnumtotal INT\r\n) WITH (\r\n 'connector' = 'print'\r\n);\r\ninsert into pt select 1 as ordertotal ,sum(order_number)*2 as numtotal from Orders",
/* required-end */
/* default-start */
"useResult":false,
"useStatementSet":false,
"useChangeLog":false,
"useAutoCancel":false,
"fragment":false,
"maxRowNum":100,
"checkPoint":0,
"parallelism":1,
/* default-start */
/* custom-start */
"jobName":"openapitest",
"savePointPath":"hdfs://ns/flink/savepoints/savepoint-5f4b8c-4326844a6843",
"configuration":{
"table.exec.resource.default-parallelism":2
}
/* custom-end */
}
\ No newline at end of file
......@@ -6,6 +6,8 @@
/* required-end */
/* default-start */
"useResult":false,
"useChangeLog":false,
"useAutoCancel":false,
"useStatementSet":false,
"fragment":false,
"maxRowNum":100,
......
......@@ -8,6 +8,8 @@
/* default-start */
"useResult":false,
"useStatementSet":false,
"useChangeLog":false,
"useAutoCancel":false,
"fragment":false,
"maxRowNum":100,
"checkPoint":0,
......
......@@ -8,6 +8,8 @@
/* default-start */
"useResult":false,
"useStatementSet":false,
"useChangeLog":false,
"useAutoCancel":false,
"fragment":false,
"maxRowNum":100,
"checkPoint":0,
......
......@@ -27,6 +27,8 @@ public class JobConfig {
// flink run mode
private String type;
private boolean useResult;
private boolean useChangeLog;
private boolean useAutoCancel;
private boolean useSession;
private String session;
private boolean useRemote;
......@@ -61,12 +63,14 @@ public class JobConfig {
this.config = config;
}
public JobConfig(String type, boolean useResult, boolean useSession, String session, boolean useRemote, Integer clusterId,
public JobConfig(String type, boolean useResult, boolean useChangeLog, boolean useAutoCancel, boolean useSession, String session, boolean useRemote, Integer clusterId,
Integer clusterConfigurationId, Integer jarId, Integer taskId, String jobName, boolean useSqlFragment,
boolean useStatementSet, Integer maxRowNum, Integer checkpoint, Integer parallelism,
Integer savePointStrategyValue, String savePointPath, Map<String,String> config) {
this.type = type;
this.useResult = useResult;
this.useChangeLog = useChangeLog;
this.useAutoCancel = useAutoCancel;
this.useSession = useSession;
this.session = session;
this.useRemote = useRemote;
......@@ -85,12 +89,14 @@ public class JobConfig {
this.config = config;
}
public JobConfig(String type, boolean useResult, boolean useSession, String session, boolean useRemote, String address,
public JobConfig(String type, boolean useResult, boolean useChangeLog,boolean useAutoCancel, boolean useSession, String session, boolean useRemote, String address,
String jobName, boolean useSqlFragment,
boolean useStatementSet, Integer maxRowNum, Integer checkpoint, Integer parallelism,
Integer savePointStrategyValue, String savePointPath, Map<String,String> config, GatewayConfig gatewayConfig) {
this.type = type;
this.useResult = useResult;
this.useChangeLog = useChangeLog;
this.useAutoCancel = useAutoCancel;
this.useSession = useSession;
this.session = session;
this.useRemote = useRemote;
......@@ -107,13 +113,14 @@ public class JobConfig {
this.gatewayConfig = gatewayConfig;
}
public JobConfig(String type,boolean useResult, boolean useSession, String session, boolean useRemote, Integer clusterId) {
public JobConfig(String type,boolean useResult, boolean useSession, String session, boolean useRemote, Integer clusterId, Integer maxRowNum) {
this.type = type;
this.useResult = useResult;
this.useSession = useSession;
this.session = session;
this.useRemote = useRemote;
this.clusterId = clusterId;
this.maxRowNum = maxRowNum;
}
public JobConfig(String type,boolean useResult, boolean useSession, boolean useRemote, Integer clusterId,
......
......@@ -55,7 +55,6 @@ public class JobManager {
private static final Logger logger = LoggerFactory.getLogger(JobManager.class);
private JobHandler handler;
private Integer maxRowNum = 100;
private EnvironmentSetting environmentSetting;
private ExecutorSetting executorSetting;
private JobConfig config;
......@@ -268,7 +267,7 @@ public class JobManager {
job.setJobId(tableResult.getJobClient().get().getJobID().toHexString());
}
if (config.isUseResult()) {
IResult result = ResultBuilder.build(SqlType.INSERT, maxRowNum, true).getResult(tableResult);
IResult result = ResultBuilder.build(SqlType.INSERT, config.getMaxRowNum(), config.isUseChangeLog(),config.isUseAutoCancel()).getResult(tableResult);
job.setResult(result);
}
}
......@@ -300,7 +299,7 @@ public class JobManager {
job.setJobId(tableResult.getJobClient().get().getJobID().toHexString());
}
if (config.isUseResult()) {
IResult result = ResultBuilder.build(item.getType(), maxRowNum, true).getResult(tableResult);
IResult result = ResultBuilder.build(item.getType(), config.getMaxRowNum(), config.isUseChangeLog(),config.isUseAutoCancel()).getResult(tableResult);
job.setResult(result);
}
}
......@@ -351,7 +350,7 @@ public class JobManager {
}
LocalDateTime startTime = LocalDateTime.now();
TableResult tableResult = executor.executeSql(newStatement);
IResult result = ResultBuilder.build(operationType, maxRowNum, false).getResult(tableResult);
IResult result = ResultBuilder.build(operationType, config.getMaxRowNum(), false,false).getResult(tableResult);
result.setStartTime(startTime);
return result;
}
......
......@@ -11,10 +11,10 @@ import org.apache.flink.table.api.TableResult;
**/
public interface ResultBuilder {
static ResultBuilder build(SqlType operationType, Integer maxRowNum, boolean isChangeLog){
static ResultBuilder build(SqlType operationType, Integer maxRowNum, boolean isChangeLog, boolean isAutoCancel){
switch (operationType){
case SELECT:
return new SelectResultBuilder(maxRowNum,isChangeLog);
return new SelectResultBuilder(maxRowNum,isChangeLog,isAutoCancel);
case SHOW:
case DESCRIBE:
return new ShowResultBuilder(false);
......
......@@ -20,12 +20,14 @@ public class ResultRunnable implements Runnable {
private TableResult tableResult;
private Integer maxRowNum;
private boolean isChangeLog;
private boolean isAutoCancel;
private String nullColumn = "";
public ResultRunnable(TableResult tableResult, Integer maxRowNum, boolean isChangeLog) {
public ResultRunnable(TableResult tableResult, Integer maxRowNum, boolean isChangeLog, boolean isAutoCancel) {
this.tableResult = tableResult;
this.maxRowNum = maxRowNum;
this.isChangeLog = isChangeLog;
this.isAutoCancel = isAutoCancel;
}
@Override
......@@ -36,9 +38,9 @@ public class ResultRunnable implements Runnable {
ResultPool.put(new SelectResult(jobId, new ArrayList<>(), new LinkedHashSet<>()));
}
try {
if(isChangeLog) {
if (isChangeLog) {
catchChangLog(ResultPool.get(jobId));
}else{
} else {
catchData(ResultPool.get(jobId));
}
} catch (Exception e) {
......@@ -56,6 +58,9 @@ public class ResultRunnable implements Runnable {
Iterator<Row> it = tableResult.collect();
while (it.hasNext()) {
if (rows.size() >= maxRowNum) {
if (isAutoCancel && tableResult.getJobClient().isPresent()) {
tableResult.getJobClient().get().cancel();
}
break;
}
Map<String, Object> map = new LinkedHashMap<>();
......@@ -64,9 +69,9 @@ public class ResultRunnable implements Runnable {
for (int i = 0; i < row.getArity(); ++i) {
Object field = row.getField(i);
if (field == null) {
map.put(columns.get(i+1), nullColumn);
map.put(columns.get(i + 1), nullColumn);
} else {
map.put(columns.get(i+1), StringUtils.arrayAwareToString(field));
map.put(columns.get(i + 1), StringUtils.arrayAwareToString(field));
}
}
rows.add(map);
......@@ -95,7 +100,7 @@ public class ResultRunnable implements Runnable {
}
if (RowKind.UPDATE_BEFORE == row.getKind() || RowKind.DELETE == row.getKind()) {
rows.remove(map);
}else {
} else {
rows.add(map);
}
}
......
......@@ -19,17 +19,19 @@ public class SelectResultBuilder implements ResultBuilder {
private Integer maxRowNum;
private boolean isChangeLog;
private boolean isAutoCancel;
public SelectResultBuilder(Integer maxRowNum, boolean isChangeLog) {
public SelectResultBuilder(Integer maxRowNum, boolean isChangeLog, boolean isAutoCancel) {
this.maxRowNum = maxRowNum;
this.isChangeLog = isChangeLog;
this.isAutoCancel = isAutoCancel;
}
@Override
public IResult getResult(TableResult tableResult) {
if (tableResult.getJobClient().isPresent()) {
String jobId = tableResult.getJobClient().get().getJobID().toHexString();
ResultRunnable runnable = new ResultRunnable(tableResult, maxRowNum, isChangeLog);
ResultRunnable runnable = new ResultRunnable(tableResult, maxRowNum, isChangeLog,isAutoCancel);
Thread thread = new Thread(runnable, jobId);
thread.start();
return SelectResult.buildSuccess(jobId);
......
......@@ -26,7 +26,7 @@ public class JobManagerTest {
@Test
public void cancelJobSelect(){
JobConfig config = new JobConfig("session-yarn",true, true, "s1", true, 2,
JobConfig config = new JobConfig("session-yarn",true,true, true,true, "s1", true, 2,
null, null,null, "测试", false,false, 100, 0,
1, 0,null,new HashMap<>());
if(config.isUseRemote()) {
......
......@@ -12,7 +12,8 @@ public enum GatewayType {
LOCAL("l","local"),STANDALONE("s","standalone"),
YARN_SESSION("ys","yarn-session"),YARN_APPLICATION("ya","yarn-application"),
YARN_PER_JOB("ypj","yarn-per-job"),KUBERNETES_APPLICATION("ka","kubernetes-application");
YARN_PER_JOB("ypj","yarn-per-job"),KUBERNETES_SESSION("ks","kubernetes-session")
,KUBERNETES_APPLICATION("ka","kubernetes-application");
private String value;
private String longValue;
......
import {connect} from "umi";
import {StateType} from "@/pages/FlinkSqlStudio/model";
import {
Form, InputNumber, Input, Switch, Select, Tag, Row, Col, Divider, Tooltip, Button, Badge,
Form, InputNumber,Switch, Row, Col, Tooltip, Button, Badge,
Typography
} from "antd";
import {InfoCircleOutlined,PlusOutlined,MinusSquareOutlined} from "@ant-design/icons";
import {InfoCircleOutlined,MinusSquareOutlined} from "@ant-design/icons";
import styles from "./index.less";
import {useEffect, useState} from "react";
import {showTables} from "@/components/Studio/StudioEvent/DDL";
import { Scrollbars } from 'react-custom-scrollbars';
const { Option } = Select;
const { Text } = Typography;
const StudioConfig = (props: any) => {
......@@ -68,6 +65,17 @@ const StudioConfig = (props: any) => {
/>
</Form.Item>
</Col>
<Col span={12}>
<Form.Item
label="打印流" className={styles.form_item} name="useChangeLog" valuePropName="checked"
tooltip={{ title: '开启打印流,将同步运行并返回含有 op 信息的 ChangeLog,默认不开启且返回最终结果 Table', icon: <InfoCircleOutlined /> }}
>
<Switch checkedChildren="启用" unCheckedChildren="禁用"
/>
</Form.Item>
</Col>
</Row>
<Row>
<Col span={12}>
<Form.Item
label="最大行数" className={styles.form_item} name="maxRowNum"
......@@ -76,6 +84,15 @@ const StudioConfig = (props: any) => {
<InputNumber min={1} max={9999} defaultValue={100} />
</Form.Item>
</Col>
<Col span={12}>
<Form.Item
label="自动停止" className={styles.form_item} name="useAutoCancel" valuePropName="checked"
tooltip={{ title: '开启自动停止,将在捕获最大行数记录后自动停止任务', icon: <InfoCircleOutlined /> }}
>
<Switch checkedChildren="启用" unCheckedChildren="禁用"
/>
</Form.Item>
</Col>
</Row>
<Form.Item
label="远程执行" className={styles.form_item} name="useRemote" valuePropName="checked"
......
......@@ -154,6 +154,8 @@ const StudioTree: React.FC<StudioTreeProps> = (props) => {
maxRowNum: 100,
jobName:node!.name,
useResult:false,
useChangeLog:false,
useAutoCancel:false,
useSession:false,
useRemote:true,
...result.datas,
......
......@@ -88,6 +88,8 @@ export type TaskType = {
maxRowNum: number;
jobName: string;
useResult: boolean;
useChangeLog: boolean;
useAutoCancel: boolean;
useSession: boolean;
useRemote: boolean;
};
......@@ -229,6 +231,8 @@ const Model: ModelType = {
alias: '草稿',
dialect: 'FlinkSql',
useResult:true,
useChangeLog:false,
useAutoCancel:false,
useSession:false,
useRemote:false,
},
......@@ -273,6 +277,8 @@ const Model: ModelType = {
alias: '草稿',
dialect: 'FlinkSql',
useResult:true,
useChangeLog:false,
useAutoCancel:false,
useSession:false,
useRemote:false,
},
......
......@@ -536,7 +536,7 @@ export default (): React.ReactNode => {
<Link>新增 Yarn 的 Kerboros 验证</Link>
</li>
<li>
<Link>新增 ChangLog 和 Table 的查询实现</Link>
<Link>新增 ChangLog 和 Table 的查询及自动停止实现</Link>
</li>
<li>
<Link>修改项目名为 Dinky 以及图标</Link>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment