Unverified Commit 0459b324 authored by Licho, committed by GitHub

refactor: improve code readability and eliminate hidden hazards; the rest waits for 0.7 (#1155)

* [WIP] refactor: improve code readability.

* refactor: have constructors delegate to other constructors.

* refactor: simplify and reformat Asserts.java.

* refactor: simplify code.

* refactor: simplify code.

* chore: reformat code.

* chore: remove unused imports.

* feat: make the concurrency handling of the static pool fields stricter (volatile mutable fields become final).

* feat: encapsulate the start/end time setters.

* feat: add AutoCloseable to the Driver interface.

* refactor: remove double-brace initialization; it is a hidden hazard (see the sketch below).
parent 30fad324
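
On the last point: double-brace initialization declares an anonymous inner class per call site that captures a hidden reference to the enclosing instance, which can leak memory, break serialization, and confuse equals(). A minimal sketch of the hazard and of the Collections.singletonList replacement this commit switches to:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class DoubleBraceDemo {

        // Double-brace init: defines an anonymous ArrayList subclass whose
        // instance initializer runs add(); every instance silently retains a
        // reference to the enclosing DoubleBraceDemo object.
        public List<String> withDoubleBrace() {
            return new ArrayList<String>() {
                {
                    add("only element");
                }
            };
        }

        // Replacement: no anonymous class and no captured outer reference;
        // the returned single-element list is immutable.
        public List<String> withSingletonList() {
            return Collections.singletonList("only element");
        }
    }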
@@ -31,6 +31,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.Getter;
 import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;

 /**
  * StudioExecuteDTO
@@ -40,6 +41,7 @@ import lombok.Setter;
  */
 @Getter
 @Setter
+@Slf4j
 public class StudioExecuteDTO extends AbstractStatementDTO {
     // RUN_MODE
     private String type;
@@ -67,10 +69,9 @@ public class StudioExecuteDTO extends AbstractStatementDTO {
     public JobConfig getJobConfig() {
         Map<String, String> config = new HashMap<>();
-        JsonNode paras = null;
         if (Asserts.isNotNullString(configJson)) {
             try {
-                paras = mapper.readTree(configJson);
+                JsonNode paras = mapper.readTree(configJson);
                 paras.forEach((JsonNode node) -> {
                     if (!node.isNull()) {
                         config.put(node.get("key").asText(), node.get("value").asText());
@@ -78,7 +79,7 @@ public class StudioExecuteDTO extends AbstractStatementDTO {
                     }
                 );
             } catch (JsonProcessingException e) {
-                e.printStackTrace();
+                log.error(e.getMessage());
             }
         }
         return new JobConfig(
......
@@ -171,9 +171,8 @@ public class DataBaseServiceImpl extends SuperServiceImpl<DataBaseMapper, DataBa
     @Override
     public List<String> listEnabledFlinkWith() {
-        List<DataBase> dataBases = listEnabledAll();
         List<String> list = new ArrayList<>();
-        for (DataBase dataBase : dataBases) {
+        for (DataBase dataBase : listEnabledAll()) {
             if (Asserts.isNotNullString(dataBase.getFlinkConfig())) {
                 list.add(dataBase.getName() + ":=" + dataBase.getFlinkConfig() + "\n;\n");
             }
......
@@ -48,11 +48,10 @@ public class FragmentVariableServiceImpl extends SuperServiceImpl<FragmentVariab
     @Override
     public Map<String, String> listEnabledVariables() {
-        List<FragmentVariable> fragmentVariables = listEnabledAll();
         Map<String, String> variables = new LinkedHashMap<>();
-        for (FragmentVariable fragmentVariable : fragmentVariables) {
+        for (FragmentVariable fragmentVariable : listEnabledAll()) {
             variables.put(fragmentVariable.getName(), fragmentVariable.getFragmentValue());
         }
         return variables;
     }
 }
\ No newline at end of file
@@ -52,8 +52,6 @@ import com.dlink.model.UDFPath;
 import com.dlink.process.context.ProcessContextHolder;
 import com.dlink.process.model.ProcessEntity;
 import com.dlink.process.model.ProcessType;
-import com.dlink.process.pool.ConsolePool;
-import com.dlink.process.pool.ProcessPool;
 import com.dlink.result.DDLResult;
 import com.dlink.result.IResult;
 import com.dlink.result.SelectResult;
@@ -72,15 +70,13 @@ import com.dlink.session.SessionPool;
 import com.dlink.sql.FlinkQuery;
 import com.dlink.utils.RunTimeUtil;

-import java.time.LocalDateTime;
 import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;

 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -101,22 +97,29 @@ public class StudioServiceImpl implements StudioService {
     private static final Logger logger = LoggerFactory.getLogger(StudioServiceImpl.class);

-    @Autowired
-    private ClusterService clusterService;
-    @Autowired
-    private ClusterConfigurationService clusterConfigurationService;
-    @Autowired
-    private SavepointsService savepointsService;
-    @Autowired
-    private DataBaseService dataBaseService;
-    @Autowired
-    private TaskService taskService;
-    @Autowired
-    private FragmentVariableService fragmentVariableService;
-    @Autowired
-    private UDFService udfService;
+    private final ClusterService clusterService;
+    private final ClusterConfigurationService clusterConfigurationService;
+    private final SavepointsService savepointsService;
+    private final DataBaseService dataBaseService;
+    private final TaskService taskService;
+    private final FragmentVariableService fragmentVariableService;
+    private final UDFService udfService;
+
+    public StudioServiceImpl(ClusterService clusterService,
+                             ClusterConfigurationService clusterConfigurationService,
+                             SavepointsService savepointsService,
+                             DataBaseService dataBaseService,
+                             TaskService taskService,
+                             FragmentVariableService fragmentVariableService,
+                             UDFService udfService) {
+        this.clusterService = clusterService;
+        this.clusterConfigurationService = clusterConfigurationService;
+        this.savepointsService = savepointsService;
+        this.dataBaseService = dataBaseService;
+        this.taskService = taskService;
+        this.fragmentVariableService = fragmentVariableService;
+        this.udfService = udfService;
+    }
     private void addFlinkSQLEnv(AbstractStatementDTO statementDTO) {
         ProcessEntity process = ProcessContextHolder.getProcess();
@@ -156,7 +159,7 @@ public class StudioServiceImpl implements StudioService {
     }

     private void buildSession(JobConfig config) {
-        // If you are using a shared session, configure the current jobmanager address
+        // If you are using a shared session, configure the current jobManager address
         if (!config.isUseSession()) {
             config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), config.getClusterId()));
         }
@@ -165,8 +168,10 @@ public class StudioServiceImpl implements StudioService {
     @Override
     public JobResult executeSql(StudioExecuteDTO studioExecuteDTO) {
         if (Dialect.notFlinkSql(studioExecuteDTO.getDialect())) {
-            return executeCommonSql(SqlDTO.build(studioExecuteDTO.getStatement(),
-                    studioExecuteDTO.getDatabaseId(), studioExecuteDTO.getMaxRowNum()));
+            return executeCommonSql(SqlDTO.build(
+                    studioExecuteDTO.getStatement(),
+                    studioExecuteDTO.getDatabaseId(),
+                    studioExecuteDTO.getMaxRowNum()));
         } else {
             return executeFlinkSql(studioExecuteDTO);
         }
@@ -176,6 +181,7 @@ public class StudioServiceImpl implements StudioService {
         addFlinkSQLEnv(studioExecuteDTO);
         JobConfig config = studioExecuteDTO.getJobConfig();
         buildSession(config);
+
         // To initialize java udf, but it only support local mode.
         UDFPath udfPath = udfService.initUDF(studioExecuteDTO.getStatement(), GatewayType.get(config.getType()));
         config.setJarFiles(udfPath.getJarPaths());
@@ -199,33 +205,36 @@ public class StudioServiceImpl implements StudioService {
     public JobResult executeCommonSql(SqlDTO sqlDTO) {
         JobResult result = new JobResult();
         result.setStatement(sqlDTO.getStatement());
-        result.setStartTime(LocalDateTime.now());
+        result.setStartTimeNow();
         if (Asserts.isNull(sqlDTO.getDatabaseId())) {
             result.setSuccess(false);
             result.setError("请指定数据源");
-            result.setEndTime(LocalDateTime.now());
+            result.setEndTimeNow();
             return result;
-        } else {
-            DataBase dataBase = dataBaseService.getById(sqlDTO.getDatabaseId());
-            if (Asserts.isNull(dataBase)) {
-                result.setSuccess(false);
-                result.setError("数据源不存在");
-                result.setEndTime(LocalDateTime.now());
-                return result;
-            }
-            Driver driver = Driver.build(dataBase.getDriverConfig());
-            JdbcSelectResult selectResult = driver.executeSql(sqlDTO.getStatement(), sqlDTO.getMaxRowNum());
-            driver.close();
-            result.setResult(selectResult);
-            if (selectResult.isSuccess()) {
-                result.setSuccess(true);
-            } else {
-                result.setSuccess(false);
-                result.setError(selectResult.getError());
-            }
-            result.setEndTime(LocalDateTime.now());
-            return result;
-        }
+        }
+
+        DataBase dataBase = dataBaseService.getById(sqlDTO.getDatabaseId());
+        if (Asserts.isNull(dataBase)) {
+            result.setSuccess(false);
+            result.setError("数据源不存在");
+            result.setEndTimeNow();
+            return result;
+        }
+
+        JdbcSelectResult selectResult;
+        try (Driver driver = Driver.build(dataBase.getDriverConfig())) {
+            selectResult = driver.executeSql(sqlDTO.getStatement(), sqlDTO.getMaxRowNum());
+        }
+        result.setResult(selectResult);
+        if (selectResult.isSuccess()) {
+            result.setSuccess(true);
+        } else {
+            result.setSuccess(false);
+            result.setError(selectResult.getError());
+        }
+        result.setEndTimeNow();
+        return result;
     }
     @Override
@@ -233,7 +242,7 @@ public class StudioServiceImpl implements StudioService {
         JobConfig config = studioDDLDTO.getJobConfig();
         if (!config.isUseSession()) {
             config.setAddress(
-                clusterService.buildEnvironmentAddress(config.isUseRemote(), studioDDLDTO.getClusterId()));
+                    clusterService.buildEnvironmentAddress(config.isUseRemote(), studioDDLDTO.getClusterId()));
         }
         JobManager jobManager = JobManager.build(config);
         return jobManager.executeDDL(studioDDLDTO.getStatement());
@@ -249,10 +258,8 @@ public class StudioServiceImpl implements StudioService {
     }

     private List<SqlExplainResult> explainFlinkSql(StudioExecuteDTO studioExecuteDTO) {
-        Map<String, ProcessEntity> map = ProcessPool.getInstance().getMap();
-        Map<String, StringBuilder> map2 = ConsolePool.getInstance().getMap();
         ProcessEntity process = ProcessContextHolder.registerProcess(
-            ProcessEntity.init(ProcessType.FLINKEXPLAIN, SaManager.getStpLogic(null).getLoginIdAsInt(), "admin"));
+                ProcessEntity.init(ProcessType.FLINKEXPLAIN, SaManager.getStpLogic(null).getLoginIdAsInt(), "admin"));
         addFlinkSQLEnv(studioExecuteDTO);
@@ -262,40 +269,32 @@ public class StudioServiceImpl implements StudioService {
         config.buildLocal();
         buildSession(config);
         process.infoSuccess();
         // To initialize java udf, but it has a bug in the product environment now.
         UDFPath udfPath = udfService.initUDF(studioExecuteDTO.getStatement(), GatewayType.get(config.getType()));
         config.setJarFiles(udfPath.getJarPaths());
         config.setPyFiles(udfPath.getPyPaths());
         process.start();
         JobManager jobManager = JobManager.buildPlanMode(config);
         List<SqlExplainResult> sqlExplainResults =
                 jobManager.explainSql(studioExecuteDTO.getStatement()).getSqlExplainResults();
         process.finish();
         return sqlExplainResults;
     }

     private List<SqlExplainResult> explainCommonSql(StudioExecuteDTO studioExecuteDTO) {
         if (Asserts.isNull(studioExecuteDTO.getDatabaseId())) {
-            return new ArrayList<SqlExplainResult>() {
-                {
-                    add(SqlExplainResult.fail(studioExecuteDTO.getStatement(), "请指定数据源"));
-                }
-            };
-        } else {
-            DataBase dataBase = dataBaseService.getById(studioExecuteDTO.getDatabaseId());
-            if (Asserts.isNull(dataBase)) {
-                return new ArrayList<SqlExplainResult>() {
-                    {
-                        add(SqlExplainResult.fail(studioExecuteDTO.getStatement(), "数据源不存在"));
-                    }
-                };
-            }
-            Driver driver = Driver.build(dataBase.getDriverConfig());
-            List<SqlExplainResult> sqlExplainResults = driver.explain(studioExecuteDTO.getStatement());
-            driver.close();
-            return sqlExplainResults;
-        }
+            return Collections.singletonList(SqlExplainResult.fail(studioExecuteDTO.getStatement(), "请指定数据源"));
+        }
+
+        DataBase dataBase = dataBaseService.getById(studioExecuteDTO.getDatabaseId());
+        if (Asserts.isNull(dataBase)) {
+            return Collections.singletonList(SqlExplainResult.fail(studioExecuteDTO.getStatement(), "数据源不存在"));
+        }
+
+        try (Driver driver = Driver.build(dataBase.getDriverConfig())) {
+            return driver.explain(studioExecuteDTO.getStatement());
+        }
     }
@@ -343,15 +342,15 @@ public class StudioServiceImpl implements StudioService {
         if (sessionDTO.isUseRemote()) {
             Cluster cluster = clusterService.getById(sessionDTO.getClusterId());
             SessionConfig sessionConfig = SessionConfig.build(
-                sessionDTO.getType(), true,
-                cluster.getId(), cluster.getAlias(),
-                clusterService.buildEnvironmentAddress(true, sessionDTO.getClusterId()));
+                    sessionDTO.getType(), true,
+                    cluster.getId(), cluster.getAlias(),
+                    clusterService.buildEnvironmentAddress(true, sessionDTO.getClusterId()));
             return JobManager.createSession(sessionDTO.getSession(), sessionConfig, createUser);
         } else {
             SessionConfig sessionConfig = SessionConfig.build(
-                sessionDTO.getType(), false,
-                null, null,
-                clusterService.buildEnvironmentAddress(false, null));
+                    sessionDTO.getType(), false,
+                    null, null,
+                    clusterService.buildEnvironmentAddress(false, null));
             return JobManager.createSession(sessionDTO.getSession(), sessionConfig, createUser);
         }
     }
@@ -369,7 +368,7 @@ public class StudioServiceImpl implements StudioService {
     @Override
     public LineageResult getLineage(StudioCADTO studioCADTO) {
         if (Asserts.isNotNullString(studioCADTO.getDialect())
-            && !studioCADTO.getDialect().equalsIgnoreCase("flinksql")) {
+                && !studioCADTO.getDialect().equalsIgnoreCase("flinksql")) {
             if (Asserts.isNull(studioCADTO.getDatabaseId())) {
                 return null;
             }
@@ -379,10 +378,10 @@ public class StudioServiceImpl implements StudioService {
             }
             if (studioCADTO.getDialect().equalsIgnoreCase("doris")) {
                 return com.dlink.explainer.sqllineage.LineageBuilder.getSqlLineage(studioCADTO.getStatement(), "mysql",
-                    dataBase.getDriverConfig());
+                        dataBase.getDriverConfig());
             } else {
                 return com.dlink.explainer.sqllineage.LineageBuilder.getSqlLineage(studioCADTO.getStatement(),
-                    studioCADTO.getDialect().toLowerCase(), dataBase.getDriverConfig());
+                        studioCADTO.getDialect().toLowerCase(), dataBase.getDriverConfig());
             }
         } else {
             addFlinkSQLEnv(studioCADTO);
@@ -414,7 +413,7 @@ public class StudioServiceImpl implements StudioService {
         jobConfig.setAddress(cluster.getJobManagerHost());
         if (Asserts.isNotNull(cluster.getClusterConfigurationId())) {
             Map<String, Object> gatewayConfig =
-                clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
+                    clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
             jobConfig.buildGatewayConfig(gatewayConfig);
         }
         JobManager jobManager = JobManager.build(jobConfig);
@@ -430,17 +429,15 @@ public class StudioServiceImpl implements StudioService {
         JobConfig jobConfig = new JobConfig();
         jobConfig.setAddress(cluster.getJobManagerHost());
         jobConfig.setType(cluster.getType());
-        // If the user hosts the cluster information on the dlink platform, the job must have been submitted from dlink
         if (Asserts.isNotNull(cluster.getClusterConfigurationId())) {
-            Map<String, Object> gatewayConfig =
-                    clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
+            // If the user hosts the cluster information on the dlink platform, the job must have been submitted from dlink
+            Map<String, Object> gatewayConfig = clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
             jobConfig.buildGatewayConfig(gatewayConfig);
             jobConfig.getGatewayConfig().getClusterConfig().setAppId(cluster.getName());
             jobConfig.setTaskId(cluster.getTaskId());
             useGateway = true;
-        }
-        // The user hosts the cluster information on an external platform, so jobs on the cluster were not necessarily submitted through dlink
-        else {
+        } else {
+            // The user hosts the cluster information on an external platform, so jobs on the cluster were not necessarily submitted through dlink
             jobConfig.setTaskId(taskId);
         }
         JobManager jobManager = JobManager.build(jobConfig);
@@ -451,6 +448,7 @@ public class StudioServiceImpl implements StudioService {
         if (jobConfig.getTaskId().equals(0)) {
             return true;
         }
+
         for (JobInfo item : savePointResult.getJobInfos()) {
             if (Asserts.isEqualsIgnoreCase(jobId, item.getJobId()) && Asserts.isNotNull(jobConfig.getTaskId())) {
                 Savepoints savepoints = new Savepoints();
@@ -480,34 +478,29 @@ public class StudioServiceImpl implements StudioService {
         } else {
             studioMetaStoreDTO.setStatement(FlinkQuery.showCatalogs());
             IResult result = executeMSFlinkSql(studioMetaStoreDTO);
             if (result instanceof DDLResult) {
                 DDLResult ddlResult = (DDLResult) result;
-                Iterator<String> iterator = ddlResult.getColumns().iterator();
-                if (iterator.hasNext()) {
-                    String key = iterator.next();
-                    List<Map<String, Object>> rowData = ddlResult.getRowData();
-                    for (Map<String, Object> item : rowData) {
-                        catalogs.add(Catalog.build(item.get(key).toString()));
-                    }
-                }
+                ddlResult.getColumns().stream().findFirst().ifPresent(key -> {
+                    for (Map<String, Object> item : ddlResult.getRowData()) {
+                        catalogs.add(Catalog.build(item.get(key).toString()));
+                    }
+                });
                 for (Catalog catalog : catalogs) {
-                    String statement = FlinkQuery.useCatalog(catalog.getName()) + FlinkQuery.separator()
-                            + FlinkQuery.showDatabases();
+                    String statement = FlinkQuery.useCatalog(catalog.getName()) + FlinkQuery.separator() + FlinkQuery.showDatabases();
                     studioMetaStoreDTO.setStatement(statement);
                     IResult tableResult = executeMSFlinkSql(studioMetaStoreDTO);
-                    if (result instanceof DDLResult) {
-                        DDLResult tableDDLResult = (DDLResult) tableResult;
-                        Iterator<String> tableIterator = tableDDLResult.getColumns().iterator();
-                        if (tableIterator.hasNext()) {
-                            String key = tableIterator.next();
-                            List<Map<String, Object>> rowData = tableDDLResult.getRowData();
-                            List<Schema> schemas = new ArrayList<>();
-                            for (Map<String, Object> item : rowData) {
-                                schemas.add(Schema.build(item.get(key).toString()));
-                            }
-                            catalog.setSchemas(schemas);
-                        }
-                    }
+                    DDLResult tableDDLResult = (DDLResult) tableResult;
+                    tableDDLResult.getColumns().stream().findFirst().ifPresent(key -> {
+                        List<Map<String, Object>> rowData = tableDDLResult.getRowData();
+                        List<Schema> schemas = new ArrayList<>();
+                        for (Map<String, Object> item : rowData) {
+                            schemas.add(Schema.build(item.get(key).toString()));
+                        }
+                        catalog.setSchemas(schemas);
+                    });
                 }
             }
         }
@@ -525,24 +518,25 @@ public class StudioServiceImpl implements StudioService {
                 tables.addAll(driver.listTables(studioMetaStoreDTO.getDatabase()));
             }
         } else {
-            String baseStatement = FlinkQuery.useCatalog(studioMetaStoreDTO.getCatalog()) + FlinkQuery.separator()
-                    + FlinkQuery.useDatabase(studioMetaStoreDTO.getDatabase()) + FlinkQuery.separator();
+            String baseStatement = FlinkQuery.useCatalog(studioMetaStoreDTO.getCatalog())
+                    + FlinkQuery.separator()
+                    + FlinkQuery.useDatabase(studioMetaStoreDTO.getDatabase())
+                    + FlinkQuery.separator();
             // show tables
             String tableStatement = baseStatement + FlinkQuery.showTables();
             studioMetaStoreDTO.setStatement(tableStatement);
             IResult result = executeMSFlinkSql(studioMetaStoreDTO);
             if (result instanceof DDLResult) {
                 DDLResult ddlResult = (DDLResult) result;
-                Iterator<String> iterator = ddlResult.getColumns().iterator();
-                if (iterator.hasNext()) {
-                    String key = iterator.next();
-                    List<Map<String, Object>> rowData = ddlResult.getRowData();
-                    for (Map<String, Object> item : rowData) {
-                        Table table = Table.build(item.get(key).toString(), studioMetaStoreDTO.getDatabase());
-                        table.setCatalog(studioMetaStoreDTO.getCatalog());
-                        tables.add(table);
-                    }
-                }
+                ddlResult.getColumns().stream().findFirst().ifPresent(key -> {
+                    List<Map<String, Object>> rowData = ddlResult.getRowData();
+                    for (Map<String, Object> item : rowData) {
+                        Table table = Table.build(item.get(key).toString(), studioMetaStoreDTO.getDatabase());
+                        table.setCatalog(studioMetaStoreDTO.getCatalog());
+                        tables.add(table);
+                    }
+                });
             }
             // show views
             schema.setViews(showInfo(studioMetaStoreDTO, baseStatement, FlinkQuery.showViews()));
@@ -560,11 +554,13 @@ public class StudioServiceImpl implements StudioService {
     @Override
     public List<FlinkColumn> getMSFlinkColumns(StudioMetaStoreDTO studioMetaStoreDTO) {
         List<FlinkColumn> columns = new ArrayList<>();
-        if (Dialect.notFlinkSql(studioMetaStoreDTO.getDialect())) {
-            // nothing to do
-        } else {
-            String baseStatement = FlinkQuery.useCatalog(studioMetaStoreDTO.getCatalog()) + FlinkQuery.separator()
-                    + FlinkQuery.useDatabase(studioMetaStoreDTO.getDatabase()) + FlinkQuery.separator();
+        if (!Dialect.notFlinkSql(studioMetaStoreDTO.getDialect())) {
+            String baseStatement = FlinkQuery.useCatalog(
+                    studioMetaStoreDTO.getCatalog())
+                    + FlinkQuery.separator()
+                    + FlinkQuery.useDatabase(studioMetaStoreDTO.getDatabase())
+                    + FlinkQuery.separator();
             // desc tables
             String tableStatement = baseStatement + FlinkQuery.descTable(studioMetaStoreDTO.getTable());
             studioMetaStoreDTO.setStatement(tableStatement);
@@ -591,19 +587,15 @@ public class StudioServiceImpl implements StudioService {
     private List<String> showInfo(StudioMetaStoreDTO studioMetaStoreDTO, String baseStatement, String statement) {
         List<String> infos = new ArrayList<>();
-        String tableStatement = baseStatement + statement;
-        studioMetaStoreDTO.setStatement(tableStatement);
+        studioMetaStoreDTO.setStatement(baseStatement + statement);
         IResult result = executeMSFlinkSql(studioMetaStoreDTO);
         if (result instanceof DDLResult) {
             DDLResult ddlResult = (DDLResult) result;
-            Iterator<String> iterator = ddlResult.getColumns().iterator();
-            if (iterator.hasNext()) {
-                String key = iterator.next();
-                List<Map<String, Object>> rowData = ddlResult.getRowData();
-                for (Map<String, Object> item : rowData) {
-                    infos.add(item.get(key).toString());
-                }
-            }
+            ddlResult.getColumns().stream().findFirst().ifPresent(key -> {
+                for (Map<String, Object> item : ddlResult.getRowData()) {
+                    infos.add(item.get(key).toString());
+                }
+            });
         }
         return infos;
     }
......
@@ -21,8 +21,10 @@ package com.dlink.assertion;
 import com.dlink.exception.RunTimeException;

+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
+import java.util.Objects;

 /**
  * Asserts
@@ -44,17 +46,11 @@ public class Asserts {
     }

     public static boolean isNullString(String str) {
-        return isNull(str) || "".equals(str);
+        return isNull(str) || str.isEmpty();
     }

     public static boolean isAllNullString(String... str) {
-        boolean isNull = true;
-        for (String item : str) {
-            if (isNotNullString(item)) {
-                isNull = false;
-            }
-        }
-        return isNull;
+        return Arrays.stream(str).allMatch(Asserts::isNullString);
     }

     public static boolean isNotNullString(String str) {
@@ -62,33 +58,15 @@ public class Asserts {
     }

     public static boolean isAllNotNullString(String... str) {
-        boolean isNotNull = true;
-        for (String item : str) {
-            if (isNullString(item)) {
-                isNotNull = false;
-            }
-        }
-        return isNotNull;
+        return Arrays.stream(str).noneMatch(Asserts::isNullString);
     }

     public static boolean isEquals(String str1, String str2) {
-        if (isNull(str1) && isNull(str2)) {
-            return true;
-        } else if (isNull(str1) || isNull(str2)) {
-            return false;
-        } else {
-            return str1.equals(str2);
-        }
+        return Objects.equals(str1, str2);
     }

     public static boolean isEqualsIgnoreCase(String str1, String str2) {
-        if (isNull(str1) && isNull(str2)) {
-            return true;
-        } else if (isNull(str1) || isNull(str2)) {
-            return false;
-        } else {
-            return str1.equalsIgnoreCase(str2);
-        }
+        return (str1 == null && str2 == null) || (str1 != null && str1.equalsIgnoreCase(str2));
     }

     public static boolean isNullCollection(Collection<?> collection) {
@@ -100,7 +78,7 @@ public class Asserts {
     }

     public static boolean isNullMap(Map<?, ?> map) {
-        return isNull(map) || map.size() == 0;
+        return isNull(map) || map.isEmpty();
     }

     public static boolean isNotNullMap(Map<?, ?> map) {
......
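
The stream rewrites above keep the original semantics: isAllNullString is true only when every element is null or empty, and isAllNotNullString is true only when none are. A small self-contained sketch that re-declares the helpers locally for illustration (com.dlink.assertion.Asserts is not assumed to be on the classpath here):

    import java.util.Arrays;

    public class AssertsSketch {

        static boolean isNullString(String s) {
            return s == null || s.isEmpty();
        }

        static boolean isAllNullString(String... str) {
            // true only when every element is null or empty
            return Arrays.stream(str).allMatch(AssertsSketch::isNullString);
        }

        public static void main(String[] args) {
            System.out.println(isAllNullString("", null));    // true
            System.out.println(isAllNullString("", "dinky")); // false
        }
    }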
@@ -66,4 +66,12 @@ public class JobResult {
         this.startTime = startTime;
         this.endTime = endTime;
     }
+
+    public void setStartTimeNow() {
+        this.setStartTime(LocalDateTime.now());
+    }
+
+    public void setEndTimeNow() {
+        this.setEndTime(LocalDateTime.now());
+    }
 }
@@ -53,6 +53,7 @@ import cn.hutool.core.io.FileUtil;
 import cn.hutool.core.lang.Dict;
 import cn.hutool.core.lang.Opt;
 import cn.hutool.core.map.MapUtil;
+import cn.hutool.core.text.CharSequenceUtil;
 import cn.hutool.core.util.ReUtil;
 import cn.hutool.core.util.StrUtil;
 import cn.hutool.crypto.digest.MD5;
@@ -69,6 +70,9 @@ import groovy.lang.GroovyClassLoader;
  */
 public class UDFUtil {

+    private UDFUtil() {
+    }
+
     protected static final Logger log = LoggerFactory.getLogger(UDFUtil.class);

     /**
      * Stores the k,v pairs that map a udf md5 to its version
@@ -130,7 +134,7 @@ public class UDFUtil {
         }).collect(Collectors.toList());
         List<String> classNameList = udfList.stream().map(UDF::getClassName).collect(Collectors.toList());
         process.info(StringUtils.join(",", classNameList));
-        process.info(StrUtil.format("A total of {} UDF have been Parsed.", classNameList.size()));
+        process.info(CharSequenceUtil.format("A total of {} UDF have been Parsed.", classNameList.size()));
         return udfList;
     }
......
@@ -41,7 +41,7 @@ import java.util.Set;
  * @author wenmo
  * @since 2021/7/19 23:15
  */
-public interface Driver {
+public interface Driver extends AutoCloseable {

     static Optional<Driver> get(DriverConfig config) {
         Asserts.checkNotNull(config, "数据源配置不能为空");
......
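
A note on the Driver change: AutoCloseable.close() is declared as "throws Exception", so the try-with-resources call sites above compile without a catch block only if Driver narrows the signature. The diff does not show Driver's close() declaration, so the following override is an assumption about how that would look:

    public interface Driver extends AutoCloseable {

        // Narrowing the inherited "void close() throws Exception" to an
        // unchecked signature lets callers write
        //     try (Driver driver = Driver.build(config)) { ... }
        // without handling a checked exception. (Assumed declaration; not
        // shown in the diff above.)
        @Override
        void close();
    }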
@@ -27,7 +27,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;

-import cn.hutool.core.util.StrUtil;
+import cn.hutool.core.text.CharSequenceUtil;

 /**
  * Process
@@ -121,7 +121,7 @@ public class ProcessEntity {
         if (isNullProcess()) {
             return;
         }
-        String message = StrUtil.format("\n[{}] {} CONFIG: {}", type.getValue(), LocalDateTime.now(), str);
+        String message = CharSequenceUtil.format("\n[{}] {} CONFIG: {}", type.getValue(), LocalDateTime.now(), str);
         steps.get(stepIndex - 1).appendInfo(message);
         ConsolePool.write(message, userId);
     }
@@ -130,7 +130,7 @@ public class ProcessEntity {
         if (isNullProcess()) {
             return;
         }
-        String message = StrUtil.format("\n[{}] {} INFO: {}", type.getValue(), LocalDateTime.now(), str);
+        String message = CharSequenceUtil.format("\n[{}] {} INFO: {}", type.getValue(), LocalDateTime.now(), str);
         steps.get(stepIndex - 1).appendInfo(message);
         ConsolePool.write(message, userId);
     }
@@ -155,7 +155,7 @@ public class ProcessEntity {
         if (isNullProcess()) {
             return;
         }
-        String message = StrUtil.format("\n[{}] {} ERROR: {}", type.getValue(), LocalDateTime.now(), str);
+        String message = CharSequenceUtil.format("\n[{}] {} ERROR: {}", type.getValue(), LocalDateTime.now(), str);
         steps.get(stepIndex - 1).appendInfo(message);
         steps.get(stepIndex - 1).appendError(message);
         ConsolePool.write(message, userId);
......
@@ -41,11 +41,10 @@ public class ProcessStep {
     }

     public ProcessStep(ProcessStatus stepStatus, LocalDateTime startTime) {
-        this.stepStatus = stepStatus;
-        this.startTime = startTime;
+        this(stepStatus, startTime, null, 0, null, null);
     }

-    public ProcessStep(int index, ProcessStatus stepStatus, LocalDateTime startTime, LocalDateTime endTime, long time,
+    public ProcessStep(ProcessStatus stepStatus, LocalDateTime startTime, LocalDateTime endTime, long time,
             StringBuilder info, StringBuilder error) {
         this.stepStatus = stepStatus;
         this.startTime = startTime;
......
@@ -32,9 +32,9 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 public class ConsolePool extends AbstractPool<StringBuilder> {

-    private static volatile Map<String, StringBuilder> consoleEntityMap = new ConcurrentHashMap<>();
+    private static final Map<String, StringBuilder> consoleEntityMap = new ConcurrentHashMap<>();

-    private static ConsolePool instance = new ConsolePool();
+    private static final ConsolePool instance = new ConsolePool();

     public static ConsolePool getInstance() {
         return instance;
@@ -51,13 +51,8 @@ public class ConsolePool extends AbstractPool<StringBuilder> {
     }

     public static void write(String str, Integer userId) {
-        String user = userId.toString();
-        if (consoleEntityMap.containsKey(user)) {
-            consoleEntityMap.get(user).append(str);
-        } else {
-            StringBuilder sb = new StringBuilder("Dinky User Console:");
-            consoleEntityMap.put(user, sb.append(str));
-        }
+        String user = String.valueOf(userId);
+        consoleEntityMap.getOrDefault(user, new StringBuilder("Dinky User Console:")).append(str);
    }
 }
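
One caveat on the write() rewrite above: Map.getOrDefault returns the fallback value without inserting it into the map, so the first write for a user appends to a StringBuilder that is immediately discarded, whereas the old containsKey branch stored it. If insert-on-absence is the intent, computeIfAbsent preserves the original behavior and is atomic on a ConcurrentHashMap; a sketch of that alternative:

    public static void write(String str, Integer userId) {
        String user = String.valueOf(userId);
        // computeIfAbsent stores the new buffer in the map before appending,
        // so the first write for a user is not lost.
        consoleEntityMap.computeIfAbsent(user, k -> new StringBuilder("Dinky User Console:")).append(str);
    }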
@@ -33,9 +33,9 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 public class ProcessPool extends AbstractPool<ProcessEntity> {

-    private static volatile Map<String, ProcessEntity> processEntityMap = new ConcurrentHashMap<>();
+    private static final Map<String, ProcessEntity> processEntityMap = new ConcurrentHashMap<>();

-    private static ProcessPool instance = new ProcessPool();
+    private static final ProcessPool instance = new ProcessPool();

     public static ProcessPool getInstance() {
         return instance;
......