Commit a5ba689d authored by wenmo

Optimize the execution logic and expose it through an openAPI

parent fdbab0fa
......@@ -19,6 +19,7 @@ public class SaTokenConfigure implements WebMvcConfigurer {
// 注册Sa-Token的路由拦截器
registry.addInterceptor(new SaRouteInterceptor())
.addPathPatterns("/**")
.excludePathPatterns("/api/login");
.excludePathPatterns("/api/login")
.excludePathPatterns("/openapi/**");
}
}
package com.dlink.controller;
import com.dlink.common.result.Result;
import com.dlink.dto.APIExecuteSqlDTO;
import com.dlink.job.JobResult;
import com.dlink.service.APIService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* APIController
*
* @author wenmo
* @since 2021/12/11 21:44
*/
@Slf4j
@RestController
@RequestMapping("/openapi")
public class APIController {
@Autowired
private APIService apiService;
@PostMapping("/executeSql")
public Result executeSql(@RequestBody APIExecuteSqlDTO apiExecuteSqlDTO) {
return Result.succeed(apiService.executeSql(apiExecuteSqlDTO), "Execution succeeded");
}
}
package com.dlink.dto;
import com.dlink.assertion.Asserts;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.job.JobConfig;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.Setter;
import java.util.HashMap;
import java.util.Map;
/**
* APIExecuteSqlDTO
*
* @author wenmo
* @since 2021/12/11 21:50
*/
@Getter
@Setter
public class APIExecuteSqlDTO {
// RUN_MODE
private String type;
private boolean useResult = false;
private boolean useStatementSet = false;
private String address;
private boolean fragment = false;
private String statement;
private String jobName;
private Integer maxRowNum = 100;
private Integer checkPoint = 0;
private Integer parallelism;
private String savePointPath;
private Map<String, String> configuration;
private GatewayConfig gatewayConfig;
private static final ObjectMapper mapper = new ObjectMapper();
public JobConfig getJobConfig() {
Integer savePointStrategy = 0;
if (Asserts.isNotNullString(savePointPath)) {
savePointStrategy = 3;
}
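// savePointStrategy: 0 presumably means "none" and 3 the custom-savepoint-path strategy (resolved via SavePointStrategy.get in JobConfig); both values are inferred from usage.
// In the constructor call below, the literals map onto the new JobConfig constructor added in this commit: useSession=false, session=null, useRemote=true.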
return new JobConfig(
type, useResult, false, null, true, address, jobName,
fragment, useStatementSet, maxRowNum, checkPoint, parallelism, savePointStrategy,
savePointPath, configuration, gatewayConfig);
}
}
package com.dlink.result;
import com.dlink.job.Job;
import com.dlink.job.JobResult;
import lombok.Getter;
import lombok.Setter;
import java.time.LocalDateTime;
/**
* APIJobResult
*
* @author wenmo
* @since 2021/12/11 22:49
*/
@Getter
@Setter
public class APIJobResult {
private String jobManagerAddress;
private Job.JobStatus status;
private boolean success;
private String jobId;
private String error;
private LocalDateTime startTime;
private LocalDateTime endTime;
public APIJobResult(String jobManagerAddress, Job.JobStatus status, boolean success, String jobId, String error, LocalDateTime startTime, LocalDateTime endTime) {
this.jobManagerAddress = jobManagerAddress;
this.status = status;
this.success = success;
this.jobId = jobId;
this.error = error;
this.startTime = startTime;
this.endTime = endTime;
}
public static APIJobResult build(JobResult jobResult){
return new APIJobResult(jobResult.getJobManagerAddress(),jobResult.getStatus(),jobResult.isSuccess(),
jobResult.getJobId(),jobResult.getError(),jobResult.getStartTime(),jobResult.getEndTime());
}
}
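For reference, a hypothetical JSON rendering of an APIJobResult as it would arrive inside the Result envelope returned by /openapi/executeSql. The datas field name mirrors the frontend usage later in this commit; the code/msg names, the LocalDateTime format (which needs Jackson's JavaTimeModule registered), and all values are illustrative assumptions:
{
"code": 0,
"msg": "Execution succeeded",
"datas": {
"jobManagerAddress": "10.1.51.24:8081",
"status": "SUCCESS",
"success": true,
"jobId": "8d3e5c0f4b2a",
"error": null,
"startTime": "2021-12-11T21:44:00",
"endTime": "2021-12-11T21:44:05"
}
}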
package com.dlink.service;
import com.dlink.dto.APIExecuteSqlDTO;
import com.dlink.result.APIJobResult;
/**
* APIService
*
* @author wenmo
* @since 2021/12/11 21:45
*/
public interface APIService {
APIJobResult executeSql(APIExecuteSqlDTO apiExecuteSqlDTO);
}
package com.dlink.service.impl;
import com.dlink.dto.APIExecuteSqlDTO;
import com.dlink.job.JobConfig;
import com.dlink.job.JobManager;
import com.dlink.job.JobResult;
import com.dlink.result.APIJobResult;
import com.dlink.service.APIService;
import com.dlink.utils.RunTimeUtil;
import org.springframework.stereotype.Service;
/**
* APIServiceImpl
*
* @author wenmo
* @since 2021/12/11 21:46
*/
@Service
public class APIServiceImpl implements APIService {
@Override
public APIJobResult executeSql(APIExecuteSqlDTO apiExecuteSqlDTO) {
JobConfig config = apiExecuteSqlDTO.getJobConfig();
JobManager jobManager = JobManager.build(config);
JobResult jobResult = jobManager.executeSql(apiExecuteSqlDTO.getStatement());
APIJobResult apiJobResult = APIJobResult.build(jobResult);
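// Presumably releases the runtime resources held by this JobManager once the result is captured; RunTimeUtil itself is not shown in this diff.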
RunTimeUtil.recovery(jobManager);
return apiJobResult;
}
}
......@@ -27,6 +27,7 @@ import com.dlink.service.StudioService;
import com.dlink.session.SessionConfig;
import com.dlink.session.SessionInfo;
import com.dlink.session.SessionPool;
import com.dlink.utils.RunTimeUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
......@@ -67,7 +68,9 @@ public class StudioServiceImpl implements StudioService {
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), studioExecuteDTO.getClusterId()));
}
JobManager jobManager = JobManager.build(config);
- return jobManager.executeSql(studioExecuteDTO.getStatement());
+ JobResult jobResult = jobManager.executeSql(studioExecuteDTO.getStatement());
+ RunTimeUtil.recovery(jobManager);
+ return jobResult;
}
@Override
......@@ -86,7 +89,7 @@ public class StudioServiceImpl implements StudioService {
if(!config.isUseSession()) {
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), studioExecuteDTO.getClusterId()));
}
- JobManager jobManager = JobManager.build(config);
+ JobManager jobManager = JobManager.buildPlanMode(config);
return jobManager.explainSql(studioExecuteDTO.getStatement());
}
......@@ -97,7 +100,7 @@ public class StudioServiceImpl implements StudioService {
if(!config.isUseSession()) {
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), studioExecuteDTO.getClusterId()));
}
- JobManager jobManager = JobManager.build(config);
+ JobManager jobManager = JobManager.buildPlanMode(config);
return jobManager.getStreamGraph(studioExecuteDTO.getStatement());
}
......@@ -108,7 +111,7 @@ public class StudioServiceImpl implements StudioService {
if(!config.isUseSession()) {
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), studioExecuteDTO.getClusterId()));
}
- JobManager jobManager = JobManager.build(config);
+ JobManager jobManager = JobManager.buildPlanMode(config);
String planJson = jobManager.getJobPlanJson(studioExecuteDTO.getStatement());
ObjectMapper mapper = new ObjectMapper();
ObjectNode objectNode = mapper.createObjectNode();
......
/* http://127.0.0.1:8888/openapi/executeSql */
{
/* required-start */
"type":"yarn-per-job",
"statement":"CREATE TABLE Orders (\r\n order_number INT,\r\n price DECIMAL(32,2),\r\n order_time TIMESTAMP(3)\r\n) WITH (\r\n 'connector' = 'datagen',\r\n 'rows-per-second' = '1',\r\n 'fields.order_number.kind' = 'sequence',\r\n 'fields.order_number.start' = '1',\r\n 'fields.order_number.end' = '1000'\r\n);\r\nCREATE TABLE pt (\r\nordertotal INT,\r\nnumtotal INT\r\n) WITH (\r\n 'connector' = 'print'\r\n);\r\ninsert into pt select 1 as ordertotal ,sum(order_number)*2 as numtotal from Orders",
"gatewayConfig":{
"clusterConfig":{
"flinkConfigPath":"/opt/src/flink-1.13.3_conf/conf",
"flinkLibPath":"hdfs:///flink13/lib/flinklib",
"yarnConfigPath":"/usr/local/hadoop/hadoop-2.7.7/etc/hadoop"
},
"flinkConfig": {
"configuration":{
"parallelism.default": 1
}
}
},
/* required-end */
/* default-start */
"useResult":false,
"useStatementSet":false,
"fragment":false,
"maxRowNum":100,
"checkPoint":0,
"parallelism":1,
/* default-end */
/* custom-start */
"jobName":"openapitest",
"savePointPath":"hdfs://ns/flink/savepoints/savepoint-5f4b8c-4326844a6843",
"configuration":{
"table.exec.resource.default-parallelism":2
}
/* custom-end */
}
\ No newline at end of file
/* http://127.0.0.1:8888/openapi/executeSql */
{
/* required-start */
"type":"yarn-session",
"address":"10.1.51.24:8081",
"statement":"CREATE TABLE Orders (\r\n order_number INT,\r\n price DECIMAL(32,2),\r\n order_time TIMESTAMP(3)\r\n) WITH (\r\n 'connector' = 'datagen',\r\n 'rows-per-second' = '1',\r\n 'fields.order_number.kind' = 'sequence',\r\n 'fields.order_number.start' = '1',\r\n 'fields.order_number.end' = '1000'\r\n);\r\nCREATE TABLE pt (\r\nordertotal INT,\r\nnumtotal INT\r\n) WITH (\r\n 'connector' = 'print'\r\n);\r\ninsert into pt select 1 as ordertotal ,sum(order_number)*2 as numtotal from Orders",
/* required-end */
/* default-start */
"useResult":false,
"useStatementSet":false,
"fragment":false,
"maxRowNum":100,
"checkPoint":0,
"parallelism":1,
/* default-end */
/* custom-start */
"jobName":"openapitest",
"savePointPath":"hdfs://ns/flink/savepoints/savepoint-5f4b8c-4326844a6843",
"configuration":{
"table.exec.resource.default-parallelism":2
}
/* custom-end */
}
\ No newline at end of file
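A minimal sketch of calling the endpoint with one of the sample payloads above, using the JDK 11+ java.net.http client. The host and port mirror the sample URL, the file name is hypothetical, and the /* ... */ section markers must be stripped from the sample before sending, since they are not valid JSON:
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

public class OpenApiExecuteSqlClient {
    public static void main(String[] args) throws Exception {
        // Request body: one of the sample payloads above, saved locally with the
        // /* ... */ markers removed (file name is hypothetical)
        String body = Files.readString(Path.of("executeSql-yarn-session.json"));
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://127.0.0.1:8888/openapi/executeSql"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        // The response wraps an APIJobResult in the Result envelope (see the sketch above)
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}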
......@@ -35,7 +35,7 @@ public class Job {
private Executor executor;
private boolean useGateway;
- enum JobStatus {
+ public enum JobStatus {
INITIALIZE,
RUNNING,
SUCCESS,
......
......@@ -22,6 +22,7 @@ import java.util.Map;
@Setter
public class JobConfig {
// flink run mode
private String type;
private boolean useResult;
private boolean useSession;
......@@ -50,8 +51,8 @@ public class JobConfig {
public JobConfig(String type, boolean useResult, boolean useSession, String session, boolean useRemote, Integer clusterId,
Integer clusterConfigurationId,Integer jarId, Integer taskId, String jobName, boolean useSqlFragment,
- boolean useStatementSet, Integer maxRowNum, Integer checkpoint,
- Integer parallelism, Integer savePointStrategyValue, String savePointPath, Map<String,String> config) {
+ boolean useStatementSet, Integer maxRowNum, Integer checkpoint, Integer parallelism,
+ Integer savePointStrategyValue, String savePointPath, Map<String,String> config) {
this.type = type;
this.useResult = useResult;
this.useSession = useSession;
......@@ -72,6 +73,28 @@ public class JobConfig {
this.config = config;
}
public JobConfig(String type, boolean useResult, boolean useSession, String session, boolean useRemote, String address,
String jobName, boolean useSqlFragment,
boolean useStatementSet, Integer maxRowNum, Integer checkpoint, Integer parallelism,
Integer savePointStrategyValue, String savePointPath, Map<String,String> config, GatewayConfig gatewayConfig) {
this.type = type;
this.useResult = useResult;
this.useSession = useSession;
this.session = session;
this.useRemote = useRemote;
this.address = address;
this.jobName = jobName;
this.useSqlFragment = useSqlFragment;
this.useStatementSet = useStatementSet;
this.maxRowNum = maxRowNum;
this.checkpoint = checkpoint;
this.parallelism = parallelism;
this.savePointStrategy = SavePointStrategy.get(savePointStrategyValue);
this.savePointPath = savePointPath;
this.config = config;
this.gatewayConfig = gatewayConfig;
}
public JobConfig(String type,boolean useResult, boolean useSession, String session, boolean useRemote, Integer clusterId) {
this.type = type;
this.useResult = useResult;
......
package com.dlink.utils;
import com.dlink.constant.FlinkHistoryConstant;
import java.util.Date;
import java.util.TimeZone;
public class DateFormatUtil {
/**
* Returns the 00:00:00 timestamp of the day containing the given date. The time must be later than 00:00:00, otherwise the previous day is returned.
*
* @param date the date whose day-start timestamp is computed
* @return the millisecond timestamp of 00:00:00 on that day
*/
public static long getZeroTimeStamp(Date date) {
return getZeroTimeStamp(date.getTime());
}
public static long getZeroTimeStamp(Long timestamp) {
timestamp += TimeZone.getDefault().getRawOffset();
return timestamp / FlinkHistoryConstant.ONE_DAY * FlinkHistoryConstant.ONE_DAY - TimeZone.getDefault().getRawOffset();
}
/**
* Returns the timestamp of the last second (23:59:59) of the day containing the given date. The time must be later than 00:00:00, otherwise the previous day is returned.
* @param date the date whose day-end timestamp is computed
* @return the millisecond timestamp of 23:59:59 on that day
*/
public static long getLastTimeStampOfOneday(Date date) {
return getLastTimeStampOfOneday(date.getTime());
}
public static long getLastTimeStampOfOneday(Long timestamp) {
timestamp += TimeZone.getDefault().getRawOffset();
// truncate to the start of the day, add one day, then step back one second (ONE_DAY is assumed to be in milliseconds)
return (timestamp / FlinkHistoryConstant.ONE_DAY * FlinkHistoryConstant.ONE_DAY + FlinkHistoryConstant.ONE_DAY - 1000) - TimeZone.getDefault().getRawOffset();
}
}
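A short usage sketch of the helpers above, assuming FlinkHistoryConstant.ONE_DAY is one day in milliseconds (86,400,000). Note that TimeZone.getRawOffset ignores daylight saving time, so results can be off by an hour around DST transitions:
import com.dlink.utils.DateFormatUtil;
import java.util.Date;

public class DateFormatUtilExample {
    public static void main(String[] args) {
        Date now = new Date();
        // 00:00:00 of the current day in the JVM's default time zone
        long dayStart = DateFormatUtil.getZeroTimeStamp(now);
        // 23:59:59 of the same day
        long dayEnd = DateFormatUtil.getLastTimeStampOfOneday(now);
        System.out.println(new Date(dayStart)); // e.g. Sat Dec 11 00:00:00 CST 2021
        System.out.println(new Date(dayEnd));   // e.g. Sat Dec 11 23:59:59 CST 2021
    }
}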
......@@ -8,6 +8,7 @@ import lombok.Getter;
import lombok.Setter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -25,7 +26,8 @@ public class FlinkConfig {
private ActionType action;
private SavePointType savePointType;
private String savePoint;
- private List<ConfigPara> configParas;
+ // private List<ConfigPara> configParas;
+ private Map<String, String> configuration;
private static final ObjectMapper mapper = new ObjectMapper();
......@@ -33,29 +35,30 @@ public class FlinkConfig {
public FlinkConfig() {
}
- public FlinkConfig(List<ConfigPara> configParas) {
- this.configParas = configParas;
+ public FlinkConfig(Map<String, String> configuration) {
+ this.configuration = configuration;
}
- public FlinkConfig(String jobName, String jobId, ActionType action, SavePointType savePointType, String savePoint, List<ConfigPara> configParas) {
+ public FlinkConfig(String jobName, String jobId, ActionType action, SavePointType savePointType, String savePoint, Map<String, String> configuration) {
this.jobName = jobName;
this.jobId = jobId;
this.action = action;
this.savePointType = savePointType;
this.savePoint = savePoint;
- this.configParas = configParas;
+ this.configuration = configuration;
}
public static FlinkConfig build(Map<String, String> paras){
- List<ConfigPara> configParasList = new ArrayList<>();
+ /*List<ConfigPara> configParasList = new ArrayList<>();
for (Map.Entry<String, String> entry : paras.entrySet()) {
configParasList.add(new ConfigPara(entry.getKey(),entry.getValue()));
}
return new FlinkConfig(configParasList);
+ }*/
+ return new FlinkConfig(paras);
}
public static FlinkConfig build(String jobName, String jobId, String actionStr, String savePointTypeStr, String savePoint, String configParasStr){
- List<ConfigPara> configParasList = new ArrayList<>();
+ // List<ConfigPara> configParasList = new ArrayList<>();
+ Map<String, String> configMap = new HashMap<>();
JsonNode paras = null;
if(Asserts.isNotNullString(configParasStr)) {
try {
......@@ -64,11 +67,12 @@ public class FlinkConfig {
e.printStackTrace();
}
paras.forEach((JsonNode node) -> {
- configParasList.add(new ConfigPara(node.get("key").asText(), node.get("value").asText()));
+ configMap.put(node.get("key").asText(),node.get("value").asText());
+ // configParasList.add(new ConfigPara(node.get("key").asText(), node.get("value").asText()));
}
);
}
- return new FlinkConfig(jobName,jobId,ActionType.get(actionStr),SavePointType.get(savePointTypeStr),savePoint,configParasList);
+ return new FlinkConfig(jobName,jobId,ActionType.get(actionStr),SavePointType.get(savePointTypeStr),savePoint,configMap);
}
public static FlinkConfig build(String jobId, String actionStr, String savePointTypeStr, String savePoint){
......
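For clarity, the configParasStr JSON that build(...) parses, and now flattens into the configuration map, has this key/value shape (the keys shown are ordinary Flink options; the values are illustrative):
[
{"key": "parallelism.default", "value": "1"},
{"key": "taskmanager.memory.process.size", "value": "2g"}
]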
......@@ -8,7 +8,9 @@ import lombok.Getter;
import lombok.Setter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* SubmitConfig
......@@ -66,13 +68,13 @@ public class GatewayConfig {
}
if(para.has("configParas")) {
try {
- List<ConfigPara> configParas = new ArrayList<>();
+ Map<String, String> configMap = new HashMap<>();
JsonNode paras = mapper.readTree(para.get("configParas").asText());
paras.forEach((JsonNode node)-> {
- configParas.add(new ConfigPara(node.get("key").asText(),node.get("value").asText()));
+ configMap.put(node.get("key").asText(),node.get("value").asText());
}
);
- config.getFlinkConfig().setConfigParas(configParas);
+ config.getFlinkConfig().setConfiguration(configMap);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
......
......@@ -32,10 +32,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import java.io.IOException;
import java.net.URI;
- import java.util.ArrayList;
- import java.util.Collection;
- import java.util.Collections;
- import java.util.List;
+ import java.util.*;
import java.util.concurrent.CompletableFuture;
/**
......@@ -63,8 +60,8 @@ public abstract class YarnGateway extends AbstractGateway {
private void initConfig(){
configuration = GlobalConfiguration.loadConfiguration(config.getClusterConfig().getFlinkConfigPath());
- if(Asserts.isNotNull(config.getFlinkConfig().getConfigParas())) {
- addConfigParas(config.getFlinkConfig().getConfigParas());
+ if(Asserts.isNotNull(config.getFlinkConfig().getConfiguration())) {
+ addConfigParas(config.getFlinkConfig().getConfiguration());
}
configuration.set(DeploymentOptions.TARGET, getType().getLongValue());
if(Asserts.isNotNullString(config.getFlinkConfig().getSavePoint())) {
......@@ -87,10 +84,10 @@ public abstract class YarnGateway extends AbstractGateway {
yarnClient.start();
}
- private void addConfigParas(List<ConfigPara> configParas){
- if(Asserts.isNotNull(configParas)) {
- for (ConfigPara configPara : configParas) {
- configuration.setString(configPara.getKey(), configPara.getValue());
+ private void addConfigParas(Map<String, String> configMap){
+ if(Asserts.isNotNull(configMap)) {
+ for (Map.Entry<String, String> entry : configMap.entrySet()) {
+ this.configuration.setString(entry.getKey(), entry.getValue());
}
}
}
......
......@@ -28,7 +28,7 @@ export async function getInitialState(): Promise<{
const fetchUserInfo = async () => {
try {
const result = await queryCurrentUser();
- const currentUser:API.CurrentUser = {
+ const currentUser: API.CurrentUser = {
name: result.datas.nickname,
avatar: result.datas.avatar?result.datas.avatar:'https://gw.alipayobjects.com/zos/antfincdn/XAosXuNZyF/BiazfanxmamNRoxxVxka.png',
userid: result.datas.username,
......@@ -86,7 +86,7 @@ export const request: RequestConfig = {
errorHandler: (error: ResponseError) => {
const { messages } = getIntl(getLocale());
const { request,response } = error;
- const writeUrl = ['/api-user/users/current','/api-uaa/oauth/token'];
+ const writeUrl = ['/api/current'];
if(writeUrl.indexOf(request.originUrl)>-1){
return;
}else {
......
......@@ -463,6 +463,18 @@ export default (): React.ReactNode => {
<li>
<Link>新增 Local 的运行模式选择并优化 JobManager</Link>
</li>
<li>
<Link>修复登录页报错弹框</Link>
</li>
<li>
<Link>优化所有模式的所有功能的执行逻辑</Link>
</li>
<li>
<Link>新增 trigger 的 restAPI 实现</Link>
</li>
<li>
<Link>新增 OpenAPI 的执行 sql 接口</Link>
</li>
</ul>
</Paragraph>
</Timeline.Item>
......