Unverified Commit 56f235d3 authored by aiwenmo, committed by GitHub

[Feature-915][admin,core] Make global variables take effect in flinksql (#916)

Co-authored-by: wenmo <32723967+wenmo@users.noreply.github.com>
parent ee28a13b
@@ -19,6 +19,8 @@
package com.dlink.dto;
import java.util.Map;
/**
* AbstractStatementDTO
*
@@ -30,6 +32,7 @@ public class AbstractStatementDTO {
private String statement;
private Integer envId;
private boolean fragment = false;
private Map<String, String> variables;
public String getStatement() {
return statement;
@@ -54,4 +57,12 @@
public void setFragment(boolean fragment) {
this.fragment = fragment;
}
public Map<String, String> getVariables() {
return variables;
}
public void setVariables(Map<String, String> variables) {
this.variables = variables;
}
}
@@ -19,12 +19,11 @@
package com.dlink.dto;
import java.util.HashMap;
import java.util.Map;
import com.dlink.assertion.Asserts;
import com.dlink.job.JobConfig;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -84,6 +83,6 @@ public class StudioExecuteDTO extends AbstractStatementDTO {
return new JobConfig(
type, useResult, useChangeLog, useAutoCancel, useSession, session, clusterId,
clusterConfigurationId, jarId, taskId, jobName, isFragment(), statementSet, batchModel,
maxRowNum, checkPoint, parallelism, savePointStrategy, savePointPath, getVariables(), config);
}
}
@@ -45,6 +45,6 @@ public class StudioMetaStoreDTO extends AbstractStatementDTO {
GatewayType.LOCAL.getLongValue(), true, false, false, false,
null, null, null, null, null,
null, isFragment(), false, false, 0,
null, null, null, null, null, null);
}
}
@@ -20,6 +20,9 @@
package com.dlink.service;
import java.util.List;
import java.util.Map;
import com.dlink.db.service.ISuperService;
import com.dlink.model.FragmentVariable;
@@ -31,4 +34,7 @@ import com.dlink.model.FragmentVariable;
*/
public interface FragmentVariableService extends ISuperService<FragmentVariable> {
List<FragmentVariable> listEnabledAll();
Map<String, String> listEnabledVariables();
}
@@ -19,11 +19,17 @@
package com.dlink.service.impl;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.springframework.stereotype.Service;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.mapper.FragmentVariableMapper;
import com.dlink.model.FragmentVariable;
import com.dlink.service.FragmentVariableService;
/**
@@ -34,4 +40,18 @@ import org.springframework.stereotype.Service;
*/
@Service
public class FragmentVariableServiceImpl extends SuperServiceImpl<FragmentVariableMapper, FragmentVariable> implements FragmentVariableService {
@Override
public List<FragmentVariable> listEnabledAll() {
return list(new QueryWrapper<FragmentVariable>().eq("enabled", 1));
}
@Override
public Map<String, String> listEnabledVariables() {
List<FragmentVariable> fragmentVariables = listEnabledAll();
Map<String, String> variables = new LinkedHashMap<>();
for (FragmentVariable fragmentVariable : fragmentVariables) {
variables.put(fragmentVariable.getName(), fragmentVariable.getFragmentValue());
}
return variables;
}
}
\ No newline at end of file
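The implementation gathers every enabled row into a LinkedHashMap, so callers iterate the variables in the order the database returns them. A minimal consumption sketch, given an injected FragmentVariableService; the example variable "env -> prod" is illustrative, not part of this commit:
// Illustrative sketch: print the enabled variables in insertion order.
Map<String, String> variables = fragmentVariableService.listEnabledVariables();
for (Map.Entry<String, String> entry : variables.entrySet()) {
    System.out.println(entry.getKey() + " -> " + entry.getValue()); // e.g. "env -> prod"
}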
@@ -19,6 +19,17 @@
package com.dlink.service.impl;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.dlink.api.FlinkAPI;
import com.dlink.assertion.Asserts;
import com.dlink.config.Dialect;
@@ -54,6 +65,7 @@ import com.dlink.result.SqlExplainResult;
import com.dlink.service.ClusterConfigurationService;
import com.dlink.service.ClusterService;
import com.dlink.service.DataBaseService;
import com.dlink.service.FragmentVariableService;
import com.dlink.service.SavepointsService;
import com.dlink.service.StudioService;
import com.dlink.service.TaskService;
@@ -62,18 +74,6 @@ import com.dlink.session.SessionInfo;
import com.dlink.session.SessionPool;
import com.dlink.sql.FlinkQuery;
import com.dlink.utils.RunTimeUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -100,8 +100,11 @@ public class StudioServiceImpl implements StudioService {
private DataBaseService dataBaseService;
@Autowired
private TaskService taskService;
@Autowired
private FragmentVariableService fragmentVariableService;
private void addFlinkSQLEnv(AbstractStatementDTO statementDTO) {
statementDTO.setVariables(fragmentVariableService.listEnabledVariables());
String flinkWithSql = dataBaseService.getEnabledFlinkWithSql();
if (statementDTO.isFragment() && Asserts.isNotNullString(flinkWithSql)) {
statementDTO.setStatement(flinkWithSql + "\r\n" + statementDTO.getStatement());
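addFlinkSQLEnv now seeds every statement DTO with the enabled global variables before any environment SQL is prepended. A hedged walk-through of the intended flow; getJobConfig() as the DTO's JobConfig factory and the ${...} reference syntax are assumptions drawn from the surrounding code, not lines of this diff:
// Hypothetical end-to-end path of one global variable.
StudioExecuteDTO dto = new StudioExecuteDTO();
dto.setStatement("select * from ${sourceTable}"); // references a global variable
dto.setVariables(fragmentVariableService.listEnabledVariables()); // what addFlinkSQLEnv does
JobConfig config = dto.getJobConfig(); // variables passed through to JobConfig
JobManager manager = JobManager.build(config); // registered on the executor's SqlManager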
@@ -218,14 +221,14 @@ public class StudioServiceImpl implements StudioService {
private List<SqlExplainResult> explainCommonSql(StudioExecuteDTO studioExecuteDTO) {
if (Asserts.isNull(studioExecuteDTO.getDatabaseId())) {
return new ArrayList<SqlExplainResult>() {{
add(SqlExplainResult.fail(studioExecuteDTO.getStatement(), "请指定数据源"));
}};
} else {
DataBase dataBase = dataBaseService.getById(studioExecuteDTO.getDatabaseId());
if (Asserts.isNull(dataBase)) {
return new ArrayList<SqlExplainResult>() {{
add(SqlExplainResult.fail(studioExecuteDTO.getStatement(), "数据源不存在"));
}};
}
Driver driver = Driver.build(dataBase.getDriverConfig());
List<SqlExplainResult> sqlExplainResults = driver.explain(studioExecuteDTO.getStatement());
......
@@ -81,6 +81,7 @@ import com.dlink.service.CatalogueService;
import com.dlink.service.ClusterConfigurationService;
import com.dlink.service.ClusterService;
import com.dlink.service.DataBaseService;
import com.dlink.service.FragmentVariableService;
import com.dlink.service.HistoryService;
import com.dlink.service.JarService;
import com.dlink.service.JobHistoryService;
@@ -169,6 +170,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implements TaskService {
private TaskVersionService taskVersionService;
@Autowired
private CatalogueService catalogueService;
@Autowired
private FragmentVariableService fragmentVariableService;
@Value("${spring.datasource.driver-class-name}")
private String driver;
@@ -756,6 +759,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implements TaskService {
default:
config.setSavePointPath(null);
}
config.setVariables(fragmentVariableService.listEnabledVariables());
return config;
}
......
@@ -20,18 +20,22 @@
package com.dlink.job;
import java.util.List;
import java.util.Map;
import com.dlink.assertion.Asserts;
import com.dlink.executor.ExecutorSetting;
import com.dlink.gateway.GatewayType;
import com.dlink.gateway.config.AppConfig;
import com.dlink.gateway.config.ClusterConfig;
import com.dlink.gateway.config.FlinkConfig;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.gateway.config.SavePointStrategy;
import com.dlink.session.SessionConfig;
import lombok.Getter;
import lombok.Setter;
/**
* JobConfig
*
@@ -68,7 +72,7 @@ public class JobConfig {
private SavePointStrategy savePointStrategy;
private String savePointPath;
private GatewayConfig gatewayConfig;
private Map<String, String> variables;
private Map<String, String> config;
public JobConfig() {
@@ -87,7 +91,7 @@ public class JobConfig {
public JobConfig(String type, boolean useResult, boolean useChangeLog, boolean useAutoCancel, boolean useSession, String session, Integer clusterId,
Integer clusterConfigurationId, Integer jarId, Integer taskId, String jobName, boolean useSqlFragment,
boolean useStatementSet, boolean useBatchModel, Integer maxRowNum, Integer checkpoint, Integer parallelism,
Integer savePointStrategyValue, String savePointPath, Map<String, String> variables, Map<String, String> config) {
this.type = type;
this.useResult = useResult;
this.useChangeLog = useChangeLog;
@@ -108,6 +112,7 @@
this.parallelism = parallelism;
this.savePointStrategy = SavePointStrategy.get(savePointStrategyValue);
this.savePointPath = savePointPath;
this.variables = variables;
this.config = config;
}
@@ -205,10 +210,10 @@
gatewayConfig.setFlinkConfig(FlinkConfig.build((Map<String, String>) config.get("flinkConfig")));
}
if (config.containsKey("kubernetesConfig")) {
Map<String, Object> kubernetesConfig = (Map<String, Object>) config.get("kubernetesConfig");
// When building the GatewayConfig, load the k8s cluster's default configuration and custom parameters into the FlinkConfig.
for (Map.Entry<String, Object> entry : kubernetesConfig.entrySet()) {
gatewayConfig.getFlinkConfig().getConfiguration().put(entry.getKey(), entry.getValue().toString());
}
}
}
......
@@ -212,6 +212,7 @@ public class JobManager {
} else {
createExecutor();
}
executor.getSqlManager().registerSqlFragment(config.getVariables());
return executor;
}
......
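With the variables registered on the SqlManager whenever an executor is created, any statement run through this JobManager can reference a global variable. An illustrative sketch, assuming an enabled variable named rps with value 1, SQL fragments enabled on the job, and ${...} as the fragment reference syntax:
// Illustrative: "rps" is a hypothetical enabled global variable whose value is "1".
JobManager jobManager = JobManager.build(config);
String sql = "CREATE TABLE Orders (order_number BIGINT) WITH (\n"
        + " 'connector' = 'datagen',\n"
        + " 'rows-per-second' = '${rps}'\n" // substituted from the registered fragments
        + ");";
JobResult result = jobManager.executeSql(sql);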
@@ -20,14 +20,15 @@
package com.dlink.core;
import java.util.HashMap;
import org.junit.Test;
import com.dlink.job.JobConfig;
import com.dlink.job.JobManager;
import com.dlink.job.JobResult;
import com.dlink.result.ResultPool;
import com.dlink.result.SelectResult;
/**
* JobManagerTest
@@ -41,20 +42,20 @@ public class JobManagerTest {
public void cancelJobSelect() {
JobConfig config = new JobConfig("session-yarn", true, true, true, true, "s1", 2,
null, null, null, "测试", false, false, false, 100, 0,
1, 0, null, new HashMap<>());
null, null, null, "测试", false, false, false, 100, 0,
1, 0, null, new HashMap<>(), new HashMap<>());
if (config.isUseRemote()) {
config.setAddress("192.168.123.157:8081");
}
JobManager jobManager = JobManager.build(config);
String sql1 = "CREATE TABLE Orders (\n" +
" order_number BIGINT,\n" +
" price DECIMAL(32,2),\n" +
" order_time TIMESTAMP(3)\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second' = '1'\n" +
");";
" order_number BIGINT,\n" +
" price DECIMAL(32,2),\n" +
" order_time TIMESTAMP(3)\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second' = '1'\n" +
");";
String sql3 = "select order_number,price,order_time from Orders";
String sql = sql1 + sql3;
JobResult result = jobManager.executeSql(sql);
......
@@ -20,9 +20,10 @@
package com.dlink.executor;
import static java.lang.String.format;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ExpressionParserException;
import org.apache.flink.table.api.Table;
@@ -31,13 +32,18 @@ import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.types.Row;
import org.apache.flink.util.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkSQLConstant;
import com.dlink.model.SystemConfiguration;
/**
* Flink Sql Fragment Manager
@@ -73,18 +79,31 @@ public final class SqlManager {
*/
public void registerSqlFragment(String sqlFragmentName, String sqlFragment) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
"sql fragment name cannot be null or empty.");
checkNotNull(sqlFragment, "sql fragment cannot be null");
if (sqlFragments.containsKey(sqlFragmentName)) {
throw new CatalogException(
format("The fragment of sql %s already exists.", sqlFragmentName));
format("The fragment of sql %s already exists.", sqlFragmentName));
}
sqlFragments.put(sqlFragmentName, sqlFragment);
}
/**
* Registers a fragment map of sql. Existing fragments with the same names are
* overwritten, and a null map is ignored.
*
* @param sqlFragmentMap a fragment map of sql to register
*/
public void registerSqlFragment(Map<String, String> sqlFragmentMap) {
if (Asserts.isNotNull(sqlFragmentMap)) {
sqlFragments.putAll(sqlFragmentMap);
}
}
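A minimal usage sketch of the bulk registration; the no-arg SqlManager constructor and the fragment name sourceTable are assumptions for illustration:
// Sketch: bulk-register fragments, then resolve one by name.
SqlManager sqlManager = new SqlManager();
Map<String, String> fragments = new LinkedHashMap<>();
fragments.put("sourceTable", "default_catalog.default_database.orders");
sqlManager.registerSqlFragment(fragments); // putAll into the fragment map
sqlManager.registerSqlFragment((Map<String, String>) null); // a null map is a no-op thanks to the Asserts.isNotNull guard
String value = sqlManager.getSqlFragment("sourceTable"); // returns the registered value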
/**
* Unregisters a fragment of sql under the given name. The sql fragment name must be existed.
*
@@ -96,14 +115,14 @@
*/
public void unregisterSqlFragment(String sqlFragmentName, boolean ignoreIfNotExists) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
"sql fragmentName name cannot be null or empty.");
if (sqlFragments.containsKey(sqlFragmentName)) {
sqlFragments.remove(sqlFragmentName);
} else if (!ignoreIfNotExists) {
throw new CatalogException(
format("The fragment of sql %s does not exist.", sqlFragmentName));
format("The fragment of sql %s does not exist.", sqlFragmentName));
}
}
@@ -116,14 +135,14 @@
*/
public String getSqlFragment(String sqlFragmentName) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
"sql fragmentName name cannot be null or empty.");
if (sqlFragments.containsKey(sqlFragmentName)) {
return sqlFragments.get(sqlFragmentName);
} else {
throw new CatalogException(
format("The fragment of sql %s does not exist.", sqlFragmentName));
format("The fragment of sql %s does not exist.", sqlFragmentName));
}
}
......