Commit af1f18f9 authored by wenmo's avatar wenmo

新增openAPI的执行Jar、任务停止和SavePoint接口

parent 60ca28fd
......@@ -319,6 +319,12 @@ AGG BY TOP2(value) as (value,rank);
[SpringBoot]()
## 致谢
感谢 [JetBrains](https://www.jetbrains.com/?from=dlink) 提供的免费开源 License 赞助
[![JetBrains](https://gitee.com/DataLinkDC/dlink/raw/main/dlink-doc/images/main/jetbrains.svg)](https://www.jetbrains.com/?from=dlink)
## 近期计划
1.支持同时托管多版本的Flink实例
......
......@@ -130,21 +130,6 @@
<groupId>com.dlink</groupId>
<artifactId>dlink-gateway</artifactId>
</dependency>
<!--<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-client-1.13</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-connector-jdbc-1.13</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-function</artifactId>
<scope>provided</scope>
</dependency>-->
</dependencies>
<build>
<plugins>
......
package com.dlink.controller;
import com.dlink.common.result.Result;
import com.dlink.dto.APIExecuteSqlDTO;
import com.dlink.dto.APIExplainSqlDTO;
import com.dlink.dto.*;
import com.dlink.service.APIService;
import com.dlink.service.StudioService;
import com.dlink.service.TaskService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
......@@ -24,6 +24,13 @@ public class APIController {
private APIService apiService;
@Autowired
private StudioService studioService;
@Autowired
private TaskService taskService;
@GetMapping(value = "/submitTask")
public Result submitTask(@RequestParam Integer id) {
return Result.succeed(taskService.submitByTaskId(id),"执行成功");
}
@PostMapping("/executeSql")
public Result executeSql(@RequestBody APIExecuteSqlDTO apiExecuteSqlDTO) {
......@@ -49,4 +56,19 @@ public class APIController {
public Result getJobData(@RequestParam String jobId) {
return Result.succeed(studioService.getJobData(jobId),"获取成功");
}
@PostMapping("/cancel")
public Result cancel(@RequestBody APICancelDTO apiCancelDTO) {
return Result.succeed(apiService.cancel(apiCancelDTO),"执行成功");
}
@PostMapping("/savepoint")
public Result savepoint(@RequestBody APISavePointDTO apiSavePointDTO) {
return Result.succeed(apiService.savepoint(apiSavePointDTO),"执行成功");
}
@PostMapping("/executeJar")
public Result executeJar(@RequestBody APIExecuteJarDTO apiExecuteJarDTO) {
return Result.succeed(apiService.executeJar(apiExecuteJarDTO),"执行成功");
}
}
......@@ -105,13 +105,5 @@ public class TaskController {
Task task = taskService.getTaskInfoById(id);
return Result.succeed(task,"获取成功");
}
/**
* 提交作业
*/
/*@GetMapping(value = "/submitApplication")
public Result submitApplicationByTaskId(@RequestParam Integer id) {
return taskService.submitApplicationByTaskId(id);
}*/
}
package com.dlink.dto;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.job.JobConfig;
import lombok.Getter;
import lombok.Setter;
/**
* APICancelDTO
*
* @author wenmo
* @since 2021/12/12 18:53
*/
@Getter
@Setter
public class APICancelDTO {
private String jobId;
private String address;
private GatewayConfig gatewayConfig;
public JobConfig getJobConfig() {
JobConfig config = new JobConfig();
config.setAddress(address);
config.setGatewayConfig(gatewayConfig);
return config;
}
}
package com.dlink.dto;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.gateway.config.SavePointStrategy;
import com.dlink.job.JobConfig;
import lombok.Getter;
import lombok.Setter;
/**
* APIExecuteJarDTO
*
* @author wenmo
* @since 2021/12/12 19:46
*/
@Getter
@Setter
public class APIExecuteJarDTO {
private String type;
private String jobName;
private String savePointPath;
private GatewayConfig gatewayConfig;
public JobConfig getJobConfig() {
JobConfig config = new JobConfig();
config.setType(type);
config.setJobName(jobName);
config.setSavePointStrategy(SavePointStrategy.CUSTOM);
config.setSavePointPath(savePointPath);
config.setGatewayConfig(gatewayConfig);
return config;
}
}
package com.dlink.dto;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.job.JobConfig;
import lombok.Getter;
import lombok.Setter;
/**
* APISavePointDTO
*
* @author wenmo
* @since 2021/12/12 19:09
*/
@Getter
@Setter
public class APISavePointDTO {
private String jobId;
private String savePointType;
private String savePoint;
private String address;
private GatewayConfig gatewayConfig;
public JobConfig getJobConfig() {
JobConfig config = new JobConfig();
config.setAddress(address);
config.setGatewayConfig(gatewayConfig);
return config;
}
}
package com.dlink.service;
import com.dlink.dto.APIExecuteSqlDTO;
import com.dlink.dto.APIExplainSqlDTO;
import com.dlink.dto.*;
import com.dlink.gateway.result.SavePointResult;
import com.dlink.result.APIJobResult;
import com.dlink.result.ExplainResult;
import com.fasterxml.jackson.databind.node.ObjectNode;
......@@ -21,4 +21,10 @@ public interface APIService {
ObjectNode getJobPlan(APIExplainSqlDTO apiExplainSqlDTO);
ObjectNode getStreamGraph(APIExplainSqlDTO apiExplainSqlDTO);
boolean cancel(APICancelDTO apiCancelDTO);
SavePointResult savepoint(APISavePointDTO apiSavePointDTO);
APIJobResult executeJar(APIExecuteJarDTO apiExecuteJarDTO);
}
package com.dlink.service.impl;
import com.dlink.dto.APIExecuteSqlDTO;
import com.dlink.dto.APIExplainSqlDTO;
import com.dlink.assertion.Asserts;
import com.dlink.dto.*;
import com.dlink.gateway.GatewayType;
import com.dlink.gateway.result.SavePointResult;
import com.dlink.job.JobConfig;
import com.dlink.job.JobManager;
import com.dlink.job.JobResult;
......@@ -15,6 +16,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.springframework.stereotype.Service;
import java.util.Map;
/**
* APIServiceImpl
*
......@@ -68,4 +71,32 @@ public class APIServiceImpl implements APIService {
RunTimeUtil.recovery(jobManager);
return streamGraph;
}
@Override
public boolean cancel(APICancelDTO apiCancelDTO) {
JobConfig jobConfig = apiCancelDTO.getJobConfig();
JobManager jobManager = JobManager.build(jobConfig);
boolean cancel = jobManager.cancel(apiCancelDTO.getJobId());
RunTimeUtil.recovery(jobManager);
return cancel;
}
@Override
public SavePointResult savepoint(APISavePointDTO apiSavePointDTO) {
JobConfig jobConfig = apiSavePointDTO.getJobConfig();
JobManager jobManager = JobManager.build(jobConfig);
SavePointResult savepoint = jobManager.savepoint(apiSavePointDTO.getJobId(), apiSavePointDTO.getSavePointType(), apiSavePointDTO.getSavePoint());
RunTimeUtil.recovery(jobManager);
return savepoint;
}
@Override
public APIJobResult executeJar(APIExecuteJarDTO apiExecuteJarDTO) {
JobConfig config = apiExecuteJarDTO.getJobConfig();
JobManager jobManager = JobManager.build(config);
JobResult jobResult = jobManager.executeJar();
APIJobResult apiJobResult = APIJobResult.build(jobResult);
RunTimeUtil.recovery(jobManager);
return apiJobResult;
}
}
......@@ -198,7 +198,6 @@ public class StudioServiceImpl implements StudioService {
Map<String, Object> gatewayConfig = clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
jobConfig.buildGatewayConfig(gatewayConfig);
}
jobConfig.setUseRestAPI(SystemConfiguration.getInstances().isUseRestAPI());
JobManager jobManager = JobManager.build(jobConfig);
return jobManager.cancel(jobId);
}
......@@ -216,10 +215,9 @@ public class StudioServiceImpl implements StudioService {
jobConfig.getGatewayConfig().getClusterConfig().setAppId(cluster.getName());
jobConfig.setTaskId(cluster.getTaskId());
}
jobConfig.setUseRestAPI(SystemConfiguration.getInstances().isUseRestAPI());
JobManager jobManager = JobManager.build(jobConfig);
jobManager.setUseGateway(true);
SavePointResult savePointResult = jobManager.savepoint(jobId, savePointType);
SavePointResult savePointResult = jobManager.savepoint(jobId, savePointType,null);
if(Asserts.isNotNull(savePointResult)){
for(JobInfo item : savePointResult.getJobInfos()){
if(Asserts.isEqualsIgnoreCase(jobId,item.getJobId())){
......
/* http://127.0.0.1:8888/openapi/explainSql */
{
/* required-start */
"jobId":"195352b0a4518e16699983a13205f059",
/* required-end */
/* custom-start */
"address":"127.0.0.1:8081",
"gatewayConfig":{
"clusterConfig":{
"appId":"application_1637739262398_0032",
"flinkConfigPath":"/opt/src/flink-1.13.3_conf/conf",
"flinkLibPath":"hdfs:///flink13/lib/flinklib",
"yarnConfigPath":"/usr/local/hadoop/hadoop-2.7.7/etc/hadoop"
}
}
/* custom-start */
}
\ No newline at end of file
/* http://127.0.0.1:8888/openapi/executeJar */
{
/* required-start */
"type":"yarn-application",
"gatewayConfig":{
"clusterConfig":{
"flinkConfigPath":"/opt/src/flink-1.13.3_conf/conf",
"flinkLibPath":"hdfs:///flink13/lib/flinklib",
"yarnConfigPath":"/usr/local/hadoop/hadoop-2.7.7/etc/hadoop"
},
"appConfig":{
"userJarPath":"hdfs:///flink12/jar/currencyAppJar.jar",
"userJarParas":["--id","2774,2775,2776"," --type","dwd"],
"userJarMainAppClass":"com.app.MainApp"
},
"flinkConfig": {
"configuration":{
"parallelism.default": 1
}
}
},
/* required-end */
/* custom-start */
"jobName":"openapitest",
"savePointPath":"hdfs://ns/flink/savepoints/savepoint-5f4b8c-4326844a6843"
/* custom-end */
}
\ No newline at end of file
/* http://127.0.0.1:8888/openapi/executeSql */
{
/* required-start */
"type":"local",
"statement":"CREATE TABLE Orders (\r\n order_number INT,\r\n price DECIMAL(32,2),\r\n order_time TIMESTAMP(3)\r\n) WITH (\r\n 'connector' = 'datagen',\r\n 'rows-per-second' = '1',\r\n 'fields.order_number.kind' = 'sequence',\r\n 'fields.order_number.start' = '1',\r\n 'fields.order_number.end' = '1000'\r\n);\r\nCREATE TABLE pt (\r\nordertotal INT,\r\nnumtotal INT\r\n) WITH (\r\n 'connector' = 'print'\r\n);\r\ninsert into pt select 1 as ordertotal ,sum(order_number)*2 as numtotal from Orders",
/* required-end */
/* default-start */
"useResult":false,
"useStatementSet":false,
"fragment":false,
"maxRowNum":100,
"checkPoint":0,
"parallelism":1,
/* default-start */
/* custom-start */
"jobName":"openapitest",
"savePointPath":"hdfs://ns/flink/savepoints/savepoint-5f4b8c-4326844a6843",
"configuration":{
"table.exec.resource.default-parallelism":2
}
/* custom-end */
}
\ No newline at end of file
/* http://127.0.0.1:8888/openapi/executeSql */
{
/* required-start */
"type":"standalone",
"address":"127.0.0.1:8081",
"statement":"CREATE TABLE Orders (\r\n order_number INT,\r\n price DECIMAL(32,2),\r\n order_time TIMESTAMP(3)\r\n) WITH (\r\n 'connector' = 'datagen',\r\n 'rows-per-second' = '1',\r\n 'fields.order_number.kind' = 'sequence',\r\n 'fields.order_number.start' = '1',\r\n 'fields.order_number.end' = '1000'\r\n);\r\nCREATE TABLE pt (\r\nordertotal INT,\r\nnumtotal INT\r\n) WITH (\r\n 'connector' = 'print'\r\n);\r\ninsert into pt select 1 as ordertotal ,sum(order_number)*2 as numtotal from Orders",
/* required-end */
/* default-start */
"useResult":false,
"useStatementSet":false,
"fragment":false,
"maxRowNum":100,
"checkPoint":0,
"parallelism":1,
/* default-start */
/* custom-start */
"jobName":"openapitest",
"savePointPath":"hdfs://ns/flink/savepoints/savepoint-5f4b8c-4326844a6843",
"configuration":{
"table.exec.resource.default-parallelism":2
}
/* custom-end */
}
\ No newline at end of file
/* http://127.0.0.1:8888/openapi/executeSql */
{
/* required-start */
"type":"yarn-session", // standalone|local
"address":"10.1.51.24:8081",
"type":"yarn-session",
"address":"127.0.0.1:8081",
"statement":"CREATE TABLE Orders (\r\n order_number INT,\r\n price DECIMAL(32,2),\r\n order_time TIMESTAMP(3)\r\n) WITH (\r\n 'connector' = 'datagen',\r\n 'rows-per-second' = '1',\r\n 'fields.order_number.kind' = 'sequence',\r\n 'fields.order_number.start' = '1',\r\n 'fields.order_number.end' = '1000'\r\n);\r\nCREATE TABLE pt (\r\nordertotal INT,\r\nnumtotal INT\r\n) WITH (\r\n 'connector' = 'print'\r\n);\r\ninsert into pt select 1 as ordertotal ,sum(order_number)*2 as numtotal from Orders",
/* required-end */
/* default-start */
......
/* http://127.0.0.1:8888/openapi/getJobData?jobId=195352b0a4518e16699983a13205f059 */
/* http://127.0.0.1:8888/openapi/explainSql */
{
/* required-start */
"jobId":"195352b0a4518e16699983a13205f059",
"savePointType":"trigger", // trigger | stop | cancel
/* required-end */
/* custom-start */
"savePoint":"195352b0a4518e16699983a13205f059",
"address":"127.0.0.1:8081",
"gatewayConfig":{
"clusterConfig":{
"appId":"application_1637739262398_0032",
"flinkConfigPath":"/opt/src/flink-1.13.3_conf/conf",
"flinkLibPath":"hdfs:///flink13/lib/flinklib",
"yarnConfigPath":"/usr/local/hadoop/hadoop-2.7.7/etc/hadoop"
}
}
/* custom-start */
}
\ No newline at end of file
/* http://127.0.0.1:8888/openapi/submitTask?id=1 */
\ No newline at end of file
......@@ -24,6 +24,7 @@ public class SystemConfiguration {
add(systemConfiguration.sqlSubmitJarParas);
add(systemConfiguration.sqlSubmitJarMainAppClass);
add(systemConfiguration.useRestAPI);
add(systemConfiguration.sqlSeparator);
}};
private Configuration sqlSubmitJarPath = new Configuration(
......@@ -54,6 +55,13 @@ public class SystemConfiguration {
true,
"在运维 Flink 任务时是否使用 RestAPI"
);
private Configuration sqlSeparator = new Configuration(
"sqlSeparator",
"FlinkSQL语句分割符",
ValueType.STRING,
";",
"Flink SQL 的语句分割符"
);
public void setConfiguration(JsonNode jsonNode){
for(Configuration item : CONFIGURATION_LIST){
......@@ -116,6 +124,14 @@ public class SystemConfiguration {
this.useRestAPI.setValue(useRestAPI);
}
public String getSqlSeparator() {
return sqlSeparator.getValue().toString();
}
public void setSqlSeparator(String sqlSeparator) {
this.sqlSeparator.setValue(sqlSeparator);
}
enum ValueType{
STRING,INT,DOUBLE,FLOAT,BOOLEAN,DATE
}
......
......@@ -35,23 +35,25 @@ public class Explainer {
private Executor executor;
private boolean useStatementSet;
private String sqlSeparator = FlinkSQLConstant.SEPARATOR;
private ObjectMapper mapper = new ObjectMapper();
public Explainer(Executor executor) {
this.executor = executor;
}
public Explainer(Executor executor, boolean useStatementSet) {
public Explainer(Executor executor, boolean useStatementSet,String sqlSeparator) {
this.executor = executor;
this.useStatementSet = useStatementSet;
this.sqlSeparator = sqlSeparator;
}
public static Explainer build(Executor executor){
return new Explainer(executor,false);
return new Explainer(executor,false,";");
}
public static Explainer build(Executor executor, boolean useStatementSet){
return new Explainer(executor,useStatementSet);
public static Explainer build(Executor executor, boolean useStatementSet, String sqlSeparator){
return new Explainer(executor,useStatementSet,sqlSeparator);
}
public JobParam pretreatStatements(String[] statements) {
......@@ -77,7 +79,7 @@ public class Explainer {
@Deprecated
public List<SqlExplainResult> explainSqlResult(String statement) {
String[] sqls = SqlUtil.getStatements(statement);
String[] sqls = SqlUtil.getStatements(statement,sqlSeparator);
List<SqlExplainResult> sqlExplainRecords = new ArrayList<>();
int index = 1;
for (String item : sqls) {
......@@ -121,7 +123,7 @@ public class Explainer {
}
public ExplainResult explainSql(String statement) {
JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement));
JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement,sqlSeparator));
List<SqlExplainResult> sqlExplainRecords = new ArrayList<>();
int index = 1;
boolean correct = true;
......@@ -211,7 +213,7 @@ public class Explainer {
for (SqlExplainResult item : sqlExplainRecords) {
if (Asserts.isNotNull(item.getType())
&& item.getType().contains(FlinkSQLConstant.DML)) {
String[] statements = SqlUtil.getStatements(item.getSql());
String[] statements = SqlUtil.getStatements(item.getSql(),sqlSeparator);
for(String str : statements){
strPlans.add(str);
}
......@@ -230,7 +232,7 @@ public class Explainer {
for (SqlExplainResult item : sqlExplainRecords) {
if (Asserts.isNotNull(item.getType())
&& item.getType().contains(FlinkSQLConstant.DML)) {
String[] statements = SqlUtil.getStatements(item.getSql());
String[] statements = SqlUtil.getStatements(item.getSql(),sqlSeparator);
for(String str : statements){
strPlans.add(str);
}
......
......@@ -42,7 +42,6 @@ public class JobConfig {
private SavePointStrategy savePointStrategy;
private String savePointPath;
private GatewayConfig gatewayConfig;
private boolean useRestAPI;
private Map<String,String> config;
......
......@@ -6,7 +6,6 @@ import com.dlink.constant.FlinkSQLConstant;
import com.dlink.executor.EnvironmentSetting;
import com.dlink.executor.Executor;
import com.dlink.executor.ExecutorSetting;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.explainer.Explainer;
import com.dlink.gateway.Gateway;
import com.dlink.gateway.GatewayType;
......@@ -17,6 +16,7 @@ import com.dlink.gateway.result.GatewayResult;
import com.dlink.gateway.result.SavePointResult;
import com.dlink.gateway.result.TestResult;
import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.model.SystemConfiguration;
import com.dlink.parser.SqlType;
import com.dlink.result.*;
import com.dlink.session.ExecutorEntity;
......@@ -27,14 +27,12 @@ import com.dlink.trans.Operations;
import com.dlink.utils.SqlUtil;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
......@@ -57,6 +55,7 @@ public class JobManager {
private boolean isPlanMode = false;
private boolean useStatementSet = false;
private boolean useRestAPI = false;
private String sqlSeparator = FlinkSQLConstant.SEPARATOR;
private GatewayType runMode = GatewayType.LOCAL;
public JobManager() {
......@@ -94,6 +93,14 @@ public class JobManager {
this.useRestAPI = useRestAPI;
}
public String getSqlSeparator() {
return sqlSeparator;
}
public void setSqlSeparator(String sqlSeparator) {
this.sqlSeparator = sqlSeparator;
}
public JobManager(JobConfig config) {
this.config = config;
}
......@@ -179,7 +186,8 @@ public class JobManager {
handler = JobHandler.build();
}
useStatementSet = config.isUseStatementSet();
useRestAPI = config.isUseRestAPI();
useRestAPI = SystemConfiguration.getInstances().isUseRestAPI();
sqlSeparator = SystemConfiguration.getInstances().getSqlSeparator();
initExecutorSetting();
createExecutorWithSession();
return false;
......@@ -211,7 +219,7 @@ public class JobManager {
ready();
String currentSql = "";
// JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement));
JobParam jobParam = Explainer.build(executor,useStatementSet).pretreatStatements(SqlUtil.getStatements(statement));
JobParam jobParam = Explainer.build(executor,useStatementSet,sqlSeparator).pretreatStatements(SqlUtil.getStatements(statement,sqlSeparator));
try {
for (StatementParam item : jobParam.getDdl()) {
currentSql = item.getValue();
......@@ -223,7 +231,7 @@ public class JobManager {
for (StatementParam item : jobParam.getTrans()) {
inserts.add(item.getValue());
}
currentSql = String.join(FlinkSQLConstant.SEPARATOR, inserts);
currentSql = String.join(sqlSeparator, inserts);
JobGraph jobGraph = executor.getJobGraphFromInserts(inserts);
GatewayResult gatewayResult = null;
if (GatewayType.YARN_APPLICATION.equals(runMode)) {
......@@ -242,7 +250,7 @@ public class JobManager {
}
}
if (inserts.size() > 0) {
currentSql = String.join(FlinkSQLConstant.SEPARATOR, inserts);
currentSql = String.join(sqlSeparator, inserts);
TableResult tableResult = executor.executeStatementSet(inserts);
if (tableResult.getJobClient().isPresent()) {
job.setJobId(tableResult.getJobClient().get().getJobID().toHexString());
......@@ -258,7 +266,7 @@ public class JobManager {
inserts.add(item.getValue());
break;
}
currentSql = String.join(FlinkSQLConstant.SEPARATOR, inserts);
currentSql = String.join(sqlSeparator, inserts);
JobGraph jobGraph = executor.getJobGraphFromInserts(inserts);
GatewayResult gatewayResult = null;
if (GatewayType.YARN_APPLICATION.equalsValue(config.getType())) {
......@@ -316,7 +324,7 @@ public class JobManager {
}
public IResult executeDDL(String statement) {
String[] statements = SqlUtil.getStatements(statement);
String[] statements = SqlUtil.getStatements(statement,sqlSeparator);
try {
for (String item : statements) {
String newStatement = executor.pretreatStatement(item);
......@@ -363,15 +371,15 @@ public class JobManager {
}
public ExplainResult explainSql(String statement) {
return Explainer.build(executor,useStatementSet).explainSql(statement);
return Explainer.build(executor,useStatementSet,sqlSeparator).explainSql(statement);
}
public ObjectNode getStreamGraph(String statement) {
return Explainer.build(executor,useStatementSet).getStreamGraph(statement);
return Explainer.build(executor,useStatementSet,sqlSeparator).getStreamGraph(statement);
}
public String getJobPlanJson(String statement) {
return Explainer.build(executor,useStatementSet).getJobPlanInfo(statement).getJsonPlan();
return Explainer.build(executor,useStatementSet,sqlSeparator).getJobPlanInfo(statement).getJsonPlan();
}
public boolean cancel(String jobId) {
......@@ -390,11 +398,11 @@ public class JobManager {
}
}
public SavePointResult savepoint(String jobId,String savePointType) {
public SavePointResult savepoint(String jobId,String savePointType,String savePoint) {
if (useGateway && !useRestAPI) {
config.getGatewayConfig().setFlinkConfig(FlinkConfig.build(jobId, ActionType.SAVEPOINT.getValue(),
savePointType, null));
return Gateway.build(config.getGatewayConfig()).savepointJob();
return Gateway.build(config.getGatewayConfig()).savepointJob(savePoint);
} else {
return FlinkAPI.build(config.getAddress()).savepoints(jobId,savePointType);
}
......
<svg xmlns="http://www.w3.org/2000/svg" width="120.1" height="130.2"><linearGradient id="a" gradientUnits="userSpaceOnUse" x1="31.841" y1="120.558" x2="110.24" y2="73.24"><stop offset="0" stop-color="#fcee39"/><stop offset="1" stop-color="#f37b3d"/></linearGradient><path d="M118.6 71.8c.9-.8 1.4-1.9 1.5-3.2.1-2.6-1.8-4.7-4.4-4.9-1.2-.1-2.4.4-3.3 1.1l-83.8 45.9c-1.9.8-3.6 2.2-4.7 4.1-2.9 4.8-1.3 11 3.6 13.9 3.4 2 7.5 1.8 10.7-.2.2-.2.5-.3.7-.5l78-54.8c.4-.3 1.5-1.1 1.7-1.4z" fill="url(#a)"/><linearGradient id="b" gradientUnits="userSpaceOnUse" x1="48.361" y1="6.908" x2="119.918" y2="69.555"><stop offset="0" stop-color="#ef5a6b"/><stop offset=".57" stop-color="#f26f4e"/><stop offset="1" stop-color="#f37b3d"/></linearGradient><path d="M118.8 65.1L55 2.5C53.6 1 51.6 0 49.3 0c-4.3 0-7.7 3.5-7.7 7.7 0 2.1.8 3.9 2.1 5.3.4.4.8.7 1.2 1l67.4 57.7c.8.7 1.8 1.2 3 1.3 2.6.1 4.7-1.8 4.9-4.4 0-1.3-.5-2.6-1.4-3.5z" fill="url(#b)"/><linearGradient id="c" gradientUnits="userSpaceOnUse" x1="52.947" y1="63.641" x2="10.538" y2="37.156"><stop offset="0" stop-color="#7c59a4"/><stop offset=".385" stop-color="#af4c92"/><stop offset=".765" stop-color="#dc4183"/><stop offset=".957" stop-color="#ed3d7d"/></linearGradient><path d="M57.1 59.5c-.1 0-39.4-31-40.2-31.5l-1.8-.9c-5.8-2.2-12.2.8-14.4 6.6-1.9 5.1.2 10.7 4.6 13.4.7.4 1.3.7 2 .9.4.2 45.4 18.8 45.4 18.8 1.8.8 3.9.3 5.1-1.2 1.5-1.9 1.2-4.6-.7-6.1z" fill="url(#c)"/><linearGradient id="d" gradientUnits="userSpaceOnUse" x1="52.174" y1="3.702" x2="10.771" y2="37.897"><stop offset="0" stop-color="#ef5a6b"/><stop offset=".364" stop-color="#ee4e72"/><stop offset="1" stop-color="#ed3d7d"/></linearGradient><path d="M49.3 0c-1.7 0-3.3.6-4.6 1.5L4.9 28.3c-.1.1-.2.1-.2.2h-.1c-1.7 1.2-3.1 3-3.9 5.1-2.2 5.8.8 12.3 6.6 14.4 3.6 1.4 7.5.7 10.4-1.4.7-.5 1.3-1 1.8-1.6l34.6-31.2c1.8-1.4 3-3.6 3-6.1 0-4.2-3.5-7.7-7.8-7.7z" fill="url(#d)"/><path d="M34.6 37.4h51v51h-51z"/><path fill="#fff" d="M39 78.8h19.1V82H39z"/><g fill="#fff"><path d="M38.8 50.8l1.5-1.4c.4.5.8.8 1.3.8.6 0 .9-.4.9-1.2v-5.3h2.3V49c0 1-.3 1.8-.8 2.3-.5.5-1.3.8-2.3.8-1.5.1-2.3-.5-2.9-1.3zm6.5-7H52v1.9h-4.4V47h4v1.8h-4v1.3h4.5v2h-6.7l-.1-8.3zm9.7 2h-2.5v-2h7.3v2h-2.5v6.3H55v-6.3zM39 54h4.3c1 0 1.8.3 2.3.7.3.3.5.8.5 1.4 0 1-.5 1.5-1.3 1.9 1 .3 1.6.9 1.6 2 0 1.4-1.2 2.3-3.1 2.3H39V54zm4.8 2.6c0-.5-.4-.7-1-.7h-1.5v1.5h1.4c.7-.1 1.1-.3 1.1-.8zM43 59h-1.8v1.5H43c.7 0 1.1-.3 1.1-.8s-.4-.7-1.1-.7zm3.8-5h3.9c1.3 0 2.1.3 2.7.9.5.5.7 1.1.7 1.9 0 1.3-.7 2.1-1.7 2.6l2 2.9h-2.6l-1.7-2.5h-1v2.5h-2.3V54zm3.8 4c.8 0 1.2-.4 1.2-1 0-.7-.5-1-1.2-1h-1.5v2h1.5z"/><path d="M56.8 54H59l3.5 8.4H60l-.6-1.5h-3.2l-.6 1.5h-2.4l3.6-8.4zm2 5l-.9-2.3L57 59h1.8zm4-5h2.3v8.3h-2.3V54zm2.9 0h2.1l3.4 4.4V54h2.3v8.3h-2L68 57.8v4.6h-2.3V54zm8 7.1l1.3-1.5c.8.7 1.7 1 2.7 1 .6 0 1-.2 1-.6 0-.4-.3-.5-1.4-.8-1.8-.4-3.1-.9-3.1-2.6 0-1.5 1.2-2.7 3.2-2.7 1.4 0 2.5.4 3.4 1.1l-1.2 1.6c-.8-.5-1.6-.8-2.3-.8-.6 0-.8.2-.8.5 0 .4.3.5 1.4.8 1.9.4 3.1 1 3.1 2.6 0 1.7-1.3 2.7-3.4 2.7-1.5.1-2.9-.4-3.9-1.3z"/></g></svg>
\ No newline at end of file
......@@ -11,11 +11,11 @@ import com.dlink.constant.FlinkSQLConstant;
*/
public class SqlUtil {
public static String[] getStatements(String sql){
public static String[] getStatements(String sql,String sqlSeparator){
if(Asserts.isNullString(sql)){
return new String[0];
}
return sql.split(FlinkSQLConstant.SEPARATOR);
return sql.split(sqlSeparator);
}
public static String removeNote(String sql){
......
......@@ -55,8 +55,12 @@ public interface Gateway {
SavePointResult savepointCluster();
SavePointResult savepointCluster(String savePoint);
SavePointResult savepointJob();
SavePointResult savepointJob(String savePoint);
TestResult test();
}
......@@ -93,6 +93,10 @@ public abstract class YarnGateway extends AbstractGateway {
}
public SavePointResult savepointCluster(){
return savepointCluster(null);
}
public SavePointResult savepointCluster(String savePoint){
if(Asserts.isNull(yarnClient)){
init();
}
......@@ -123,7 +127,7 @@ public abstract class YarnGateway extends AbstractGateway {
jobInfo.setStatus(JobInfo.JobStatus.RUN);
jobInfos.add(jobInfo);
}
runSavePointJob(jobInfos,clusterClient);
runSavePointJob(jobInfos,clusterClient,savePoint);
result.setJobInfos(jobInfos);
}catch (Exception e){
e.printStackTrace();
......@@ -134,6 +138,10 @@ public abstract class YarnGateway extends AbstractGateway {
}
public SavePointResult savepointJob(){
return savepointJob(null);
}
public SavePointResult savepointJob(String savePoint){
if(Asserts.isNull(yarnClient)){
init();
}
......@@ -163,7 +171,7 @@ public abstract class YarnGateway extends AbstractGateway {
applicationId).getClusterClient()){
List<JobInfo> jobInfos = new ArrayList<>();
jobInfos.add(new JobInfo(config.getFlinkConfig().getJobId(),JobInfo.JobStatus.FAIL));
runSavePointJob(jobInfos,clusterClient);
runSavePointJob(jobInfos,clusterClient,savePoint);
result.setJobInfos(jobInfos);
}catch (Exception e){
e.printStackTrace();
......@@ -173,18 +181,7 @@ public abstract class YarnGateway extends AbstractGateway {
return result;
}
private void runSavePointJob(List<JobInfo> jobInfos,ClusterClient<ApplicationId> clusterClient) throws Exception{
String savePoint = null;
/*String savePoint = FlinkConfig.DEFAULT_SAVEPOINT_PREFIX;
if(Asserts.isNotNullString(config.getFlinkConfig().getSavePoint())){
savePoint = config.getFlinkConfig().getSavePoint();
}
if(Asserts.isNotNull(config.getTaskId())){
if(savePoint.lastIndexOf("/")!=savePoint.length()){
savePoint = savePoint + "/";
}
savePoint = savePoint + config.getTaskId();
}*/
private void runSavePointJob(List<JobInfo> jobInfos,ClusterClient<ApplicationId> clusterClient,String savePoint) throws Exception{
for( JobInfo jobInfo: jobInfos){
if(ActionType.CANCEL== config.getFlinkConfig().getAction()){
clusterClient.cancel(JobID.fromHexString(jobInfo.getJobId()));
......
......@@ -9,12 +9,13 @@ type FlinkConfigProps = {
sqlSubmitJarParas: SettingsStateType['sqlSubmitJarParas'];
sqlSubmitJarMainAppClass: SettingsStateType['sqlSubmitJarMainAppClass'];
useRestAPI: SettingsStateType['useRestAPI'];
sqlSeparator: SettingsStateType['sqlSeparator'];
dispatch: any;
};
const FlinkConfigView: React.FC<FlinkConfigProps> = (props) => {
const {sqlSubmitJarPath, sqlSubmitJarParas, sqlSubmitJarMainAppClass,useRestAPI, dispatch} = props;
const {sqlSubmitJarPath, sqlSubmitJarParas, sqlSubmitJarMainAppClass,useRestAPI,sqlSeparator, dispatch} = props;
const [editName, setEditName] = useState<string>('');
const [formValues, setFormValues] = useState(props);
const [form] = Form.useForm();
......@@ -73,6 +74,18 @@ const FlinkConfigView: React.FC<FlinkConfigProps> = (props) => {
<Switch checkedChildren="启用" unCheckedChildren="禁用"
checked={useRestAPI}
/></Form.Item>],
},{
title: 'FlinkSQL语句分割符',
description: (
editName!='sqlSeparator'?
(sqlSeparator?sqlSeparator:'未设置'):(<Input
id='sqlSeparator'
defaultValue={sqlSeparator}
onChange={onChange}
placeholder=";" />)),
actions: editName!='sqlSeparator'?[<a onClick={({}) => handleEditClick('sqlSeparator')}>修改</a>]:
[<a onClick={({}) => handleSaveClick('sqlSeparator')}>保存</a>,
<a onClick={({}) => handleCancelClick()}>取消</a>],
},
];
......@@ -134,4 +147,5 @@ export default connect(({Settings}: { Settings: SettingsStateType }) => ({
sqlSubmitJarParas: Settings.sqlSubmitJarParas,
sqlSubmitJarMainAppClass: Settings.sqlSubmitJarMainAppClass,
useRestAPI: Settings.useRestAPI,
sqlSeparator: Settings.sqlSeparator,
}))(FlinkConfigView);
import {Effect, Reducer} from "umi";
export type SettingsStateType = {
sqlSubmitJarPath:string,
sqlSubmitJarParas:string,
sqlSubmitJarMainAppClass:string,
useRestAPI:boolean,
sqlSubmitJarPath: string,
sqlSubmitJarParas: string,
sqlSubmitJarMainAppClass: string,
useRestAPI: boolean,
sqlSeparator: string,
};
export type ModelType = {
......
......@@ -475,6 +475,9 @@ export default (): React.ReactNode => {
<li>
<Link>新增 OpenAPI 的执行sql、校验sql、获取计划图、获取StreamGraph、获取预览数据接口</Link>
</li>
<li>
<Link>新增 OpenAPI 的执行Jar、停止、SavePoint接口</Link>
</li>
</ul>
</Paragraph>
</Timeline.Item>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment