Unverified commit f53833af authored by ZackYoung, committed by GitHub

add udf yarn-application; (#1131)

change code style
parent 6ba6cdb0
......@@ -19,11 +19,13 @@
package com.dlink.service;
+import com.dlink.gateway.GatewayType;
/**
* @author ZackYoung
* @since 0.6.8
*/
public interface UDFService {
-    String[] initUDF(String statement);
+    String[] initUDF(String statement, GatewayType gatewayType);
}
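For orientation, the new second argument lets the implementation branch on where the job will run. A minimal caller sketch, assuming the JobConfig/GatewayConfig accessors used at the call sites later in this diff; the GatewayType.LOCAL fallback is an assumption for runs that never configure a gateway:

    // hypothetical null-safe lookup; GatewayType.LOCAL is assumed, adjust to the project's default
    GatewayType type = config.getGatewayConfig() == null
            ? GatewayType.LOCAL
            : config.getGatewayConfig().getType();
    config.setJarFiles(udfService.initUDF(statement, type));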
......@@ -164,7 +164,7 @@ public class StudioServiceImpl implements StudioService {
public JobResult executeSql(StudioExecuteDTO studioExecuteDTO) {
if (Dialect.notFlinkSql(studioExecuteDTO.getDialect())) {
return executeCommonSql(SqlDTO.build(studioExecuteDTO.getStatement(),
                    studioExecuteDTO.getDatabaseId(), studioExecuteDTO.getMaxRowNum()));
} else {
return executeFlinkSql(studioExecuteDTO);
}
......@@ -175,7 +175,7 @@ public class StudioServiceImpl implements StudioService {
JobConfig config = studioExecuteDTO.getJobConfig();
buildSession(config);
        // Initialize the Java UDFs; note that this only supports local mode.
-        config.setJarFiles(udfService.initUDF(studioExecuteDTO.getStatement()));
+        config.setJarFiles(udfService.initUDF(studioExecuteDTO.getStatement(), config.getGatewayConfig().getType()));
JobManager jobManager = JobManager.build(config);
JobResult jobResult = jobManager.executeSql(studioExecuteDTO.getStatement());
RunTimeUtil.recovery(jobManager);
......@@ -229,7 +229,7 @@ public class StudioServiceImpl implements StudioService {
JobConfig config = studioDDLDTO.getJobConfig();
if (!config.isUseSession()) {
config.setAddress(
                    clusterService.buildEnvironmentAddress(config.isUseRemote(), studioDDLDTO.getClusterId()));
}
JobManager jobManager = JobManager.build(config);
return jobManager.executeDDL(studioDDLDTO.getStatement());
......@@ -248,7 +248,7 @@ public class StudioServiceImpl implements StudioService {
Map<String, ProcessEntity> map = ProcessPool.getInstance().getMap();
Map<String, StringBuilder> map2 = ConsolePool.getInstance().getMap();
ProcessEntity process = ProcessContextHolder.registerProcess(
                ProcessEntity.init(ProcessType.FLINKEXPLAIN, SaManager.getStpLogic(null).getLoginIdAsInt(), "admin"));
addFlinkSQLEnv(studioExecuteDTO);
......@@ -259,11 +259,11 @@ public class StudioServiceImpl implements StudioService {
buildSession(config);
process.infoSuccess();
        // Initialize the Java UDFs; note that this currently has a bug in the production environment.
-        config.setJarFiles(udfService.initUDF(studioExecuteDTO.getStatement()));
+        config.setJarFiles(udfService.initUDF(studioExecuteDTO.getStatement(), config.getGatewayConfig().getType()));
process.start();
JobManager jobManager = JobManager.buildPlanMode(config);
List<SqlExplainResult> sqlExplainResults =
                jobManager.explainSql(studioExecuteDTO.getStatement()).getSqlExplainResults();
process.finish();
return sqlExplainResults;
}
......@@ -311,7 +311,7 @@ public class StudioServiceImpl implements StudioService {
        // When using explainSql | getStreamGraph | getJobPlan, switch the dialect to local.
config.buildLocal();
buildSession(config);
-        config.setJarFiles(udfService.initUDF(studioExecuteDTO.getStatement()));
+        config.setJarFiles(udfService.initUDF(studioExecuteDTO.getStatement(), config.getGatewayConfig().getType()));
JobManager jobManager = JobManager.buildPlanMode(config);
String planJson = jobManager.getJobPlanJson(studioExecuteDTO.getStatement());
ObjectMapper mapper = new ObjectMapper();
......@@ -335,15 +335,15 @@ public class StudioServiceImpl implements StudioService {
if (sessionDTO.isUseRemote()) {
Cluster cluster = clusterService.getById(sessionDTO.getClusterId());
SessionConfig sessionConfig = SessionConfig.build(
                    sessionDTO.getType(), true,
                    cluster.getId(), cluster.getAlias(),
                    clusterService.buildEnvironmentAddress(true, sessionDTO.getClusterId()));
return JobManager.createSession(sessionDTO.getSession(), sessionConfig, createUser);
} else {
SessionConfig sessionConfig = SessionConfig.build(
                    sessionDTO.getType(), false,
                    null, null,
                    clusterService.buildEnvironmentAddress(false, null));
return JobManager.createSession(sessionDTO.getSession(), sessionConfig, createUser);
}
}
......@@ -361,7 +361,7 @@ public class StudioServiceImpl implements StudioService {
@Override
public LineageResult getLineage(StudioCADTO studioCADTO) {
if (Asserts.isNotNullString(studioCADTO.getDialect())
&& !studioCADTO.getDialect().equalsIgnoreCase("flinksql")) {
&& !studioCADTO.getDialect().equalsIgnoreCase("flinksql")) {
if (Asserts.isNull(studioCADTO.getDatabaseId())) {
return null;
}
......@@ -371,10 +371,10 @@ public class StudioServiceImpl implements StudioService {
}
if (studioCADTO.getDialect().equalsIgnoreCase("doris")) {
return com.dlink.explainer.sqllineage.LineageBuilder.getSqlLineage(studioCADTO.getStatement(), "mysql",
                        dataBase.getDriverConfig());
} else {
return com.dlink.explainer.sqllineage.LineageBuilder.getSqlLineage(studioCADTO.getStatement(),
                        studioCADTO.getDialect().toLowerCase(), dataBase.getDriverConfig());
}
} else {
addFlinkSQLEnv(studioCADTO);
......@@ -406,7 +406,7 @@ public class StudioServiceImpl implements StudioService {
jobConfig.setAddress(cluster.getJobManagerHost());
if (Asserts.isNotNull(cluster.getClusterConfigurationId())) {
Map<String, Object> gatewayConfig =
                    clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
jobConfig.buildGatewayConfig(gatewayConfig);
}
JobManager jobManager = JobManager.build(jobConfig);
......@@ -425,7 +425,7 @@ public class StudioServiceImpl implements StudioService {
        // If the user chooses to host cluster information on the dlink platform, the task must have been submitted from dlink.
if (Asserts.isNotNull(cluster.getClusterConfigurationId())) {
Map<String, Object> gatewayConfig =
                    clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
jobConfig.buildGatewayConfig(gatewayConfig);
jobConfig.getGatewayConfig().getClusterConfig().setAppId(cluster.getName());
jobConfig.setTaskId(cluster.getTaskId());
......@@ -484,7 +484,7 @@ public class StudioServiceImpl implements StudioService {
}
for (Catalog catalog : catalogs) {
String statement = FlinkQuery.useCatalog(catalog.getName()) + FlinkQuery.separator()
                    + FlinkQuery.showDatabases();
studioMetaStoreDTO.setStatement(statement);
IResult tableResult = executeMSFlinkSql(studioMetaStoreDTO);
if (result instanceof DDLResult) {
......@@ -518,7 +518,7 @@ public class StudioServiceImpl implements StudioService {
}
} else {
String baseStatement = FlinkQuery.useCatalog(studioMetaStoreDTO.getCatalog()) + FlinkQuery.separator()
                    + FlinkQuery.useDatabase(studioMetaStoreDTO.getDatabase()) + FlinkQuery.separator();
// show tables
String tableStatement = baseStatement + FlinkQuery.showTables();
studioMetaStoreDTO.setStatement(tableStatement);
......@@ -556,7 +556,7 @@ public class StudioServiceImpl implements StudioService {
// nothing to do
} else {
String baseStatement = FlinkQuery.useCatalog(studioMetaStoreDTO.getCatalog()) + FlinkQuery.separator()
                    + FlinkQuery.useDatabase(studioMetaStoreDTO.getDatabase()) + FlinkQuery.separator();
// desc tables
String tableStatement = baseStatement + FlinkQuery.descTable(studioMetaStoreDTO.getTable());
studioMetaStoreDTO.setStatement(tableStatement);
......@@ -567,12 +567,12 @@ public class StudioServiceImpl implements StudioService {
int i = 1;
for (Map<String, Object> item : rowData) {
FlinkColumn column = FlinkColumn.build(i,
                        item.get(FlinkQuery.columnName()).toString(),
                        item.get(FlinkQuery.columnType()).toString(),
                        item.get(FlinkQuery.columnKey()).toString(),
                        item.get(FlinkQuery.columnNull()).toString(),
                        item.get(FlinkQuery.columnExtras()).toString(),
                        item.get(FlinkQuery.columnWatermark()).toString());
columns.add(column);
i++;
}
......
......@@ -202,7 +202,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
task.getDatabaseId(), null));
}
JobConfig config = buildJobConfig(task);
-        config.setJarFiles(udfService.initUDF(task.getStatement()));
+        config.setJarFiles(udfService.initUDF(task.getStatement(), config.getGatewayConfig().getType()));
JobManager jobManager = JobManager.build(config);
if (!config.isJarTask()) {
return jobManager.executeSql(task.getStatement());
......
......@@ -20,6 +20,7 @@
package com.dlink.service.impl;
import com.dlink.constant.PathConstant;
+import com.dlink.exception.BusException;
import com.dlink.gateway.GatewayType;
import com.dlink.job.JobManager;
import com.dlink.model.Task;
......@@ -53,15 +54,15 @@ public class UDFServiceImpl implements UDFService {
 * Quick lookup for session- and application-style gateway types, to reduce conditional checks.
*/
private static final Map<String, List<GatewayType>> GATEWAY_TYPE_MAP = MapUtil
.builder("session",
Arrays.asList(GatewayType.YARN_SESSION, GatewayType.KUBERNETES_SESSION, GatewayType.STANDALONE))
.build();
.builder("session",
Arrays.asList(GatewayType.YARN_SESSION, GatewayType.KUBERNETES_SESSION, GatewayType.STANDALONE))
.build();
@Resource
TaskService taskService;
@Override
-    public String[] initUDF(String statement) {
+    public String[] initUDF(String statement, GatewayType gatewayType) {
ProcessEntity process = ProcessContextHolder.getProcess();
process.info("Initializing Flink UDF...Start");
Opt<String> udfJarPath = Opt.empty();
......@@ -73,10 +74,14 @@ public class UDFServiceImpl implements UDFService {
}, true);
if (codeList.size() > 0) {
udfJarPath = Opt.ofBlankAble(UDFUtil.getUdfNameAndBuildJar(codeList));
} else {
if (gatewayType == GatewayType.KUBERNETES_APPLICATION) {
throw new BusException("udf 暂不支持k8s application");
}
}
process.info("Initializing Flink UDF...Finish");
if (udfJarPath.isPresent()) {
            return new String[] {PathConstant.UDF_PATH + udfJarPath.get()};
} else {
return new String[0];
}
......
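For callers, the resulting contract appears to be: a one-element array pointing at the built jar when UDF code was found, otherwise an empty array, never null. A usage sketch under that assumption, reusing the task/config names from the call sites above:

    String[] jars = udfService.initUDF(task.getStatement(), GatewayType.YARN_APPLICATION);
    if (jars.length > 0) {
        config.setJarFiles(jars); // later copied into GatewayConfig.jarPaths by JobManager.build
    }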
......@@ -72,20 +72,15 @@ import org.apache.flink.yarn.configuration.YarnConfigOptions;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.URLUtil;
/**
* JobManager
*
......@@ -166,6 +161,7 @@ public class JobManager {
JobManager manager = new JobManager(config);
manager.init();
manager.executor.initUDF(config.getJarFiles());
+        config.getGatewayConfig().setJarPaths(config.getJarFiles());
return manager;
}
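One thing to watch in build(): setJarPaths is invoked unconditionally, while getGatewayConfig() may be null for local executions that never configure a gateway. A null-guarded variant (a defensive sketch, not the committed behavior):

    if (config.getGatewayConfig() != null) {
        config.getGatewayConfig().setJarPaths(config.getJarFiles());
    }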
......@@ -398,7 +394,6 @@ public class JobManager {
jobGraph.setSavepointRestoreSettings(
SavepointRestoreSettings.forPath(config.getSavePointPath(), true));
}
-            jobGraph.addJars(Arrays.stream(config.getJarFiles()).map(path -> URLUtil.getURL(FileUtil.file(path))).collect(Collectors.toList()));
gatewayResult = Gateway.build(config.getGatewayConfig()).submitJobGraph(jobGraph);
}
job.setResult(InsertResult.success(gatewayResult.getAppId()));
......@@ -461,7 +456,6 @@ public class JobManager {
jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(config.getSavePointPath(), true));
}
            // Per-job mode needs to submit the job graph.
-            jobGraph.addJars(Arrays.stream(config.getJarFiles()).map(path -> URLUtil.getURL(FileUtil.file(path))).collect(Collectors.toList()));
gatewayResult = Gateway.build(config.getGatewayConfig()).submitJobGraph(jobGraph);
}
return gatewayResult;
......
......@@ -42,6 +42,7 @@ import lombok.Setter;
public class GatewayConfig {
private Integer taskId;
+    private String[] jarPaths;
private GatewayType type;
private ClusterConfig clusterConfig;
private FlinkConfig flinkConfig;
......
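Taken together, the jar hand-off now works like this: JobManager.build copies JobConfig.jarFiles into GatewayConfig.jarPaths, and each YARN gateway consumes them in its own way; application mode ships the jar as a YARN resource so it lands in the container's working directory, while per-job mode registers it on the JobGraph so it is uploaded with the job. A condensed sketch using only calls that appear in this diff:

    // in JobManager.build(config): publish the UDF jars to the gateway layer
    config.getGatewayConfig().setJarPaths(config.getJarFiles());

    // YarnApplicationGateway: ship the jars to the cluster alongside the application
    yarnClusterDescriptor.addShipFiles(
            Arrays.stream(config.getJarPaths()).map(FileUtil::file).collect(Collectors.toList()));

    // YarnPerJobGateway: attach the jars to the job graph so they are uploaded with it
    jobGraph.addJars(
            Arrays.stream(config.getJarPaths()).map(path -> URLUtil.getURL(FileUtil.file(path))).collect(Collectors.toList()));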
......@@ -43,9 +43,13 @@ import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import cn.hutool.core.io.FileUtil;
/**
* YarnApplicationGateway
......@@ -98,6 +102,7 @@ public class YarnApplicationGateway extends YarnGateway {
if (configuration.contains(TaskManagerOptions.NUM_TASK_SLOTS)) {
clusterSpecificationBuilder.setSlotsPerTaskManager(configuration.get(TaskManagerOptions.NUM_TASK_SLOTS)).createClusterSpecification();
}
+        yarnClusterDescriptor.addShipFiles(Arrays.stream(config.getJarPaths()).map(FileUtil::file).collect(Collectors.toList()));
try {
ClusterClientProvider<ApplicationId> clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(
......
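A caveat for the addShipFiles line above: getJarPaths() can plausibly be null (or empty) when no UDF jar was built, which would make the Arrays.stream call throw. A guarded sketch, assuming that failure mode:

    String[] jarPaths = config.getJarPaths();
    if (jarPaths != null && jarPaths.length > 0) {
        yarnClusterDescriptor.addShipFiles(
                Arrays.stream(jarPaths).map(FileUtil::file).collect(Collectors.toList()));
    }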
......@@ -40,8 +40,13 @@ import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.URLUtil;
/**
 * YarnPerJobGateway
......@@ -83,6 +88,8 @@ public class YarnPerJobGateway extends YarnGateway {
clusterSpecificationBuilder.setSlotsPerTaskManager(configuration.get(TaskManagerOptions.NUM_TASK_SLOTS)).createClusterSpecification();
}
+        jobGraph.addJars(Arrays.stream(config.getJarPaths()).map(path -> URLUtil.getURL(FileUtil.file(path))).collect(Collectors.toList()));
try {
ClusterClientProvider<ApplicationId> clusterClientProvider = yarnClusterDescriptor.deployJobCluster(
clusterSpecificationBuilder.createClusterSpecification(), jobGraph, true);
......