Unverified Commit 4947b119 authored by ZackYoung, committed by GitHub

[Feature]python udf online development (#1138)

* add python udf backend

* add python udf frontend

* Adjust flink-python dependency: Flink 1.15 only supports Scala 2.12

* add required file

* add required file

* Modify code structure

* Modify code structure;
Fix retrieval of the Python UDF function or attribute name
parent 2e37c4d1
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.model;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
/**
* @author ZackYoung
* @since 0.6.8
*/
@Getter
@Setter
@Builder
public class UDFPath {
String[] jarPaths;
String[] pyPaths;
}
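For illustration, a minimal usage sketch of the new model, assuming it is consumed the way the service hunks below show (the artifact paths are invented):

import com.dlink.model.UDFPath;

public class UDFPathUsageSketch {
    public static void main(String[] args) {
        // Invented paths; in the real flow they are produced by UDFService.initUDF(...).
        UDFPath udfPath = UDFPath.builder()
                .jarPaths(new String[] {"/tmp/dlink/udf/udf-1.jar"})
                .pyPaths(new String[] {"/tmp/dlink/udf/python/python_udf.zip"})
                .build();
        // StudioServiceImpl / TaskServiceImpl forward both arrays to the JobConfig:
        //   config.setJarFiles(udfPath.getJarPaths());
        //   config.setPyFiles(udfPath.getPyPaths());
        System.out.println(String.join(",", udfPath.getJarPaths()));
        System.out.println(String.join(",", udfPath.getPyPaths()));
    }
}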
......@@ -20,6 +20,7 @@
package com.dlink.service;
import com.dlink.gateway.GatewayType;
import com.dlink.model.UDFPath;
/**
* @author ZackYoung
......@@ -27,5 +28,5 @@ import com.dlink.gateway.GatewayType;
*/
public interface UDFService {
String[] initUDF(String statement, GatewayType gatewayType);
UDFPath initUDF(String statement, GatewayType gatewayType);
}
......@@ -31,6 +31,7 @@ import com.dlink.dto.StudioExecuteDTO;
import com.dlink.dto.StudioMetaStoreDTO;
import com.dlink.explainer.lineage.LineageBuilder;
import com.dlink.explainer.lineage.LineageResult;
import com.dlink.gateway.GatewayType;
import com.dlink.gateway.model.JobInfo;
import com.dlink.gateway.result.SavePointResult;
import com.dlink.job.JobConfig;
......@@ -47,6 +48,7 @@ import com.dlink.model.Schema;
import com.dlink.model.SystemConfiguration;
import com.dlink.model.Table;
import com.dlink.model.Task;
import com.dlink.model.UDFPath;
import com.dlink.process.context.ProcessContextHolder;
import com.dlink.process.model.ProcessEntity;
import com.dlink.process.model.ProcessType;
......@@ -175,7 +177,9 @@ public class StudioServiceImpl implements StudioService {
JobConfig config = studioExecuteDTO.getJobConfig();
buildSession(config);
// To initialize the Java UDF, but it only supports local mode.
config.setJarFiles(udfService.initUDF(studioExecuteDTO.getStatement(), config.getGatewayConfig().getType()));
UDFPath udfPath = udfService.initUDF(studioExecuteDTO.getStatement(), config.getGatewayConfig() == null ? null : config.getGatewayConfig().getType());
config.setJarFiles(udfPath.getJarPaths());
config.setPyFiles(udfPath.getPyPaths());
JobManager jobManager = JobManager.build(config);
JobResult jobResult = jobManager.executeSql(studioExecuteDTO.getStatement());
RunTimeUtil.recovery(jobManager);
......@@ -259,7 +263,9 @@ public class StudioServiceImpl implements StudioService {
buildSession(config);
process.infoSuccess();
// To initialize the Java UDF, but it currently has a bug in the production environment.
config.setJarFiles(udfService.initUDF(studioExecuteDTO.getStatement(), config.getGatewayConfig().getType()));
UDFPath udfPath = udfService.initUDF(studioExecuteDTO.getStatement(), GatewayType.get(config.getType()));
config.setJarFiles(udfPath.getJarPaths());
config.setPyFiles(udfPath.getPyPaths());
process.start();
JobManager jobManager = JobManager.buildPlanMode(config);
List<SqlExplainResult> sqlExplainResults =
......@@ -311,7 +317,9 @@ public class StudioServiceImpl implements StudioService {
// If you are using explainSql | getStreamGraph | getJobPlan, change the dialect to local.
config.buildLocal();
buildSession(config);
config.setJarFiles(udfService.initUDF(studioExecuteDTO.getStatement(), config.getGatewayConfig().getType()));
UDFPath udfPath = udfService.initUDF(studioExecuteDTO.getStatement(), GatewayType.get(config.getType()));
config.setJarFiles(udfPath.getJarPaths());
config.setPyFiles(udfPath.getPyPaths());
JobManager jobManager = JobManager.buildPlanMode(config);
String planJson = jobManager.getJobPlanJson(studioExecuteDTO.getStatement());
ObjectMapper mapper = new ObjectMapper();
......
......@@ -73,6 +73,7 @@ import com.dlink.model.Task;
import com.dlink.model.TaskOperatingSavepointSelect;
import com.dlink.model.TaskOperatingStatus;
import com.dlink.model.TaskVersion;
import com.dlink.model.UDFPath;
import com.dlink.result.SqlExplainResult;
import com.dlink.result.TaskOperatingResult;
import com.dlink.service.AlertGroupService;
......@@ -93,6 +94,7 @@ import com.dlink.service.TaskVersionService;
import com.dlink.service.UDFService;
import com.dlink.utils.CustomStringJavaCompiler;
import com.dlink.utils.JSONUtil;
import com.dlink.utils.UDFUtil;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
......@@ -190,7 +192,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
private String buildParas(Integer id) {
return "--id " + id + " --driver " + driver + " --url " + url + " --username " + username + " --password "
+ password;
+ password;
}
@Override
......@@ -199,10 +201,12 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
Asserts.checkNull(task, Tips.TASK_NOT_EXIST);
if (Dialect.notFlinkSql(task.getDialect())) {
return executeCommonSql(SqlDTO.build(task.getStatement(),
task.getDatabaseId(), null));
task.getDatabaseId(), null));
}
JobConfig config = buildJobConfig(task);
config.setJarFiles(udfService.initUDF(task.getStatement(), config.getGatewayConfig().getType()));
UDFPath udfPath = udfService.initUDF(task.getStatement(), config.getGatewayConfig().getType());
config.setJarFiles(udfPath.getJarPaths());
config.setPyFiles(udfPath.getPyPaths());
JobManager jobManager = JobManager.build(config);
if (!config.isJarTask()) {
return jobManager.executeSql(task.getStatement());
......@@ -218,7 +222,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
task.setStep(JobLifeCycle.ONLINE.getValue());
if (Dialect.notFlinkSql(task.getDialect())) {
return executeCommonSql(SqlDTO.build(task.getStatement(),
task.getDatabaseId(), null));
task.getDatabaseId(), null));
}
JobConfig config = buildJobConfig(task);
JobManager jobManager = JobManager.build(config);
......@@ -238,7 +242,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
}
if (Dialect.notFlinkSql(task.getDialect())) {
return executeCommonSql(SqlDTO.build(task.getStatement(),
task.getDatabaseId(), null));
task.getDatabaseId(), null));
}
if (StringUtils.isBlank(savePointPath)) {
task.setSavePointStrategy(SavePointStrategy.LATEST.getValue());
......@@ -359,17 +363,19 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
public boolean saveOrUpdateTask(Task task) {
// to compile the Java UDF
if (Asserts.isNotNullString(task.getDialect()) && Dialect.JAVA.equalsVal(task.getDialect())
&& Asserts.isNotNullString(task.getStatement())) {
&& Asserts.isNotNullString(task.getStatement())) {
CustomStringJavaCompiler compiler = new CustomStringJavaCompiler(task.getStatement());
task.setSavePointPath(compiler.getFullClassName());
} else if (Dialect.PYTHON.equalsVal(task.getDialect())) {
task.setSavePointPath(task.getName() + "." + UDFUtil.getPyUDFAttr(task.getStatement()));
}
// if the task already exists, modify it; otherwise create it
if (task.getId() != null) {
Task taskInfo = getById(task.getId());
Assert.check(taskInfo);
if (JobLifeCycle.RELEASE.equalsValue(taskInfo.getStep())
|| JobLifeCycle.ONLINE.equalsValue(taskInfo.getStep())
|| JobLifeCycle.CANCEL.equalsValue(taskInfo.getStep())) {
|| JobLifeCycle.ONLINE.equalsValue(taskInfo.getStep())
|| JobLifeCycle.CANCEL.equalsValue(taskInfo.getStep())) {
throw new BusException("该作业已" + JobLifeCycle.get(taskInfo.getStep()).getLabel() + ",禁止修改!");
}
task.setStep(JobLifeCycle.DEVELOP.getValue());
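For illustration, a sketch of how the Python branch above derives save_point_path, assuming the PYTHON_UDF_ATTR / PYTHON_UDF_DEF regexes defined in UDFUtil further down; the task name and the Python source are invented:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PyUdfNameSketch {
    // Copied from UDFUtil.PYTHON_UDF_ATTR and UDFUtil.PYTHON_UDF_DEF.
    static final Pattern ATTR = Pattern.compile("(\\S)\\s+=\\s+ud(?:f|tf|af|taf)");
    static final Pattern DEF = Pattern.compile("@ud(?:f|tf|af|taf).*\\n+def\\s+(.*)\\(.*\\):");

    public static void main(String[] args) {
        // Invented statement of a Python-dialect task.
        String code = "@udf(result_type=DataTypes.STRING())\n"
                + "def sub(a, b):\n"
                + "    return a - b\n";
        // Mirrors getPyUDFAttr: try the attribute form first, then the decorated def form.
        String attr = firstGroup(ATTR, code);
        String name = attr != null ? attr : firstGroup(DEF, code);
        // With a task named "my_py_task", save_point_path becomes "my_py_task.sub".
        System.out.println("my_py_task." + name);
    }

    private static String firstGroup(Pattern pattern, String code) {
        Matcher matcher = pattern.matcher(code);
        return matcher.find() ? matcher.group(1) : null;
    }
}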
......@@ -478,7 +484,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Override
public Task getUDFByClassName(String className) {
Task task = getOne(
new QueryWrapper<Task>().eq("dialect", "Java").eq("enabled", 1).eq("save_point_path", className));
new QueryWrapper<Task>().in("dialect", "Java", "Python").eq("enabled", 1).eq("save_point_path", className));
Assert.check(task);
task.setStatement(statementService.getById(task.getId()).getStatement());
return task;
......@@ -487,7 +493,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Override
public List<Task> getAllUDF() {
List<Task> tasks =
list(new QueryWrapper<Task>().eq("dialect", "Java").eq("enabled", 1).isNotNull("save_point_path"));
list(new QueryWrapper<Task>().in("dialect", "Java", "Python").eq("enabled", 1).isNotNull("save_point_path"));
return tasks.stream().peek(task -> {
Assert.check(task);
task.setStatement(statementService.getById(task.getId()).getStatement());
......@@ -496,6 +502,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Override
public Result releaseTask(Integer id) {
Task task = getTaskInfoById(id);
Assert.check(task);
if (JobLifeCycle.DEVELOP.equalsValue(task.getStep())) {
......@@ -523,7 +530,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
List<TaskVersion> taskVersions = taskVersionService.getTaskVersionByTaskId(task.getId());
List<Integer> versionIds = taskVersions.stream().map(TaskVersion::getVersionId).collect(Collectors.toList());
Map<Integer, TaskVersion> versionMap =
taskVersions.stream().collect(Collectors.toMap(TaskVersion::getVersionId, t -> t));
taskVersions.stream().collect(Collectors.toMap(TaskVersion::getVersionId, t -> t));
TaskVersion taskVersion = new TaskVersion();
BeanUtil.copyProperties(task, taskVersion);
......@@ -562,14 +569,14 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
Task taskInfo = getTaskInfoById(dto.getId());
if (JobLifeCycle.RELEASE.equalsValue(taskInfo.getStep())
|| JobLifeCycle.ONLINE.equalsValue(taskInfo.getStep())
|| JobLifeCycle.CANCEL.equalsValue(taskInfo.getStep())) {
|| JobLifeCycle.ONLINE.equalsValue(taskInfo.getStep())
|| JobLifeCycle.CANCEL.equalsValue(taskInfo.getStep())) {
// throw new BusException("该作业已" + JobLifeCycle.get(taskInfo.getStep()).getLabel() + ",禁止回滚!");
return Result.failed("该作业已" + JobLifeCycle.get(taskInfo.getStep()).getLabel() + ",禁止回滚!");
}
LambdaQueryWrapper<TaskVersion> queryWrapper = new LambdaQueryWrapper<TaskVersion>()
.eq(TaskVersion::getTaskId, dto.getId()).eq(TaskVersion::getVersionId, dto.getVersionId());
.eq(TaskVersion::getTaskId, dto.getId()).eq(TaskVersion::getVersionId, dto.getVersionId());
TaskVersion taskVersion = taskVersionService.getOne(queryWrapper);
......@@ -716,7 +723,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
}
if (Asserts.isNotNull(cluster.getClusterConfigurationId())) {
Map<String, Object> gatewayConfig =
clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
jobConfig.buildGatewayConfig(gatewayConfig);
jobConfig.getGatewayConfig().getClusterConfig().setAppId(cluster.getName());
useGateway = true;
......@@ -752,7 +759,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
private JobConfig buildJobConfig(Task task) {
boolean isJarTask = Dialect.FLINKJAR.equalsVal(task.getDialect())
|| Dialect.KUBERNETES_APPLICATION.equalsVal(task.getDialect());
|| Dialect.KUBERNETES_APPLICATION.equalsVal(task.getDialect());
if (!isJarTask && Asserts.isNotNull(task.getFragment()) ? task.getFragment() : false) {
String flinkWithSql = dataBaseService.getEnabledFlinkWithSql();
if (Asserts.isNotNullString(flinkWithSql)) {
......@@ -772,20 +779,20 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
}
// support custom K8s app submit, rather than clusterConfiguration
else if (Dialect.KUBERNETES_APPLICATION.equalsVal(task.getDialect())
&& GatewayType.KUBERNETES_APPLICATION.equalsValue(config.getType())) {
&& GatewayType.KUBERNETES_APPLICATION.equalsValue(config.getType())) {
Map<String, Object> gatewayConfig = JSONUtil.toMap(task.getStatement(), String.class, Object.class);
config.buildGatewayConfig(gatewayConfig);
} else {
Map<String, Object> gatewayConfig =
clusterConfigurationService.getGatewayConfig(task.getClusterConfigurationId());
clusterConfigurationService.getGatewayConfig(task.getClusterConfigurationId());
// submit application type with clusterConfiguration
if (GatewayType.YARN_APPLICATION.equalsValue(config.getType())
|| GatewayType.KUBERNETES_APPLICATION.equalsValue(config.getType())) {
|| GatewayType.KUBERNETES_APPLICATION.equalsValue(config.getType())) {
if (!isJarTask) {
SystemConfiguration systemConfiguration = SystemConfiguration.getInstances();
gatewayConfig.put("userJarPath", systemConfiguration.getSqlSubmitJarPath());
gatewayConfig.put("userJarParas",
systemConfiguration.getSqlSubmitJarParas() + buildParas(config.getTaskId()));
systemConfiguration.getSqlSubmitJarParas() + buildParas(config.getTaskId()));
gatewayConfig.put("userJarMainAppClass", systemConfiguration.getSqlSubmitJarMainAppClass());
} else {
Jar jar = jarService.getById(task.getJarId());
......@@ -842,7 +849,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
history.setConfig(JSONUtil.parseObject(history.getConfigJson()));
if (Asserts.isNotNull(history) && Asserts.isNotNull(history.getClusterConfigurationId())) {
jobInfoDetail.setClusterConfiguration(
clusterConfigurationService.getClusterConfigById(history.getClusterConfigurationId()));
clusterConfigurationService.getClusterConfigById(history.getClusterConfigurationId()));
}
jobInfoDetail.setHistory(history);
jobInfoDetail.setJobHistory(jobHistoryService.getJobHistory(id));
......@@ -852,12 +859,12 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
return jobInfoDetail.getInstance();
}
JobHistory jobHistoryJson =
jobHistoryService.refreshJobHistory(id, jobInfoDetail.getCluster().getJobManagerHost(),
jobInfoDetail.getInstance().getJid(), jobInfoDetail.isNeedSave());
jobHistoryService.refreshJobHistory(id, jobInfoDetail.getCluster().getJobManagerHost(),
jobInfoDetail.getInstance().getJid(), jobInfoDetail.isNeedSave());
JobHistory jobHistory = jobHistoryService.getJobHistoryInfo(jobHistoryJson);
jobInfoDetail.setJobHistory(jobHistory);
if (JobStatus.isDone(jobInfoDetail.getInstance().getStatus())
&& (Asserts.isNull(jobHistory.getJob()) || jobHistory.isError())) {
&& (Asserts.isNull(jobHistory.getJob()) || jobHistory.isError())) {
return jobInfoDetail.getInstance();
}
String status = jobInfoDetail.getInstance().getStatus();
......@@ -866,12 +873,12 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
jobInfoDetail.getInstance().setStatus(JobStatus.UNKNOWN.getValue());
} else {
jobInfoDetail.getInstance().setDuration(
jobInfoDetail.getJobHistory().getJob().get(FlinkRestResultConstant.JOB_DURATION).asLong() / 1000);
jobInfoDetail.getJobHistory().getJob().get(FlinkRestResultConstant.JOB_DURATION).asLong() / 1000);
jobInfoDetail.getInstance()
.setStatus(jobInfoDetail.getJobHistory().getJob().get(FlinkRestResultConstant.JOB_STATE).asText());
.setStatus(jobInfoDetail.getJobHistory().getJob().get(FlinkRestResultConstant.JOB_STATE).asText());
}
if (JobStatus.isDone(jobInfoDetail.getInstance().getStatus())
&& !status.equals(jobInfoDetail.getInstance().getStatus())) {
&& !status.equals(jobInfoDetail.getInstance().getStatus())) {
jobStatusChanged = true;
jobInfoDetail.getInstance().setFinishTime(LocalDateTime.now());
// handleJobDone(jobInfoDetail.getInstance());
......@@ -888,7 +895,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
private boolean inRefreshPlan(JobInstance jobInstance) {
if ((!JobStatus.isDone(jobInstance.getStatus())) || (Asserts.isNotNull(jobInstance.getFinishTime())
&& Duration.between(jobInstance.getFinishTime(), LocalDateTime.now()).toMinutes() < 1)) {
&& Duration.between(jobInstance.getFinishTime(), LocalDateTime.now()).toMinutes() < 1)) {
return true;
} else {
return false;
......@@ -935,9 +942,9 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
// clusterConfigurationName
if (Asserts.isNotNull(task.getClusterConfigurationId())) {
ClusterConfiguration clusterConfiguration =
clusterConfigurationService.getById(task.getClusterConfigurationId());
clusterConfigurationService.getById(task.getClusterConfigurationId());
((ObjectNode) jsonNode).put("clusterConfigurationName",
Asserts.isNotNull(clusterConfiguration) ? clusterConfiguration.getName() : null);
Asserts.isNotNull(clusterConfiguration) ? clusterConfiguration.getName() : null);
}
// databaseName
if (Asserts.isNotNull(task.getDatabaseId())) {
......@@ -1013,15 +1020,15 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
}
if (Asserts.isNotNull(task.getClusterConfigurationName())) {
ClusterConfiguration clusterConfiguration = clusterConfigurationService
.getOne(new QueryWrapper<ClusterConfiguration>().eq("name",
task.getClusterConfigurationName()));
.getOne(new QueryWrapper<ClusterConfiguration>().eq("name",
task.getClusterConfigurationName()));
if (Asserts.isNotNull(clusterConfiguration)) {
task.setClusterConfigurationId(clusterConfiguration.getId());
}
}
if (Asserts.isNotNull(task.getDatabaseName())) {
DataBase dataBase =
dataBaseService.getOne(new QueryWrapper<DataBase>().eq("name", task.getDatabaseName()));
dataBaseService.getOne(new QueryWrapper<DataBase>().eq("name", task.getDatabaseName()));
if (Asserts.isNotNull(dataBase)) {
task.setDatabaseId(dataBase.getId());
}
......@@ -1038,7 +1045,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
*/
if (Asserts.isNotNull(task.getAlertGroupName())) {
AlertGroup alertGroup =
alertGroupService.getOne(new QueryWrapper<AlertGroup>().eq("name", task.getAlertGroupName()));
alertGroupService.getOne(new QueryWrapper<AlertGroup>().eq("name", task.getAlertGroupName()));
if (Asserts.isNotNull(alertGroup)) {
task.setAlertGroupId(alertGroup.getId());
}
......@@ -1074,7 +1081,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
return Result.failed("一共" + jsonNodes.size() + "个作业,全部导入失败");
} else if (errorNumber > 0) {
return Result.failed("一共" + jsonNodes.size() + "个作业,其中成功导入" + (jsonNode.size() - errorNumber) + "个,失败"
+ errorNumber + "个");
+ errorNumber + "个");
}
return Result.succeed("成功导入" + jsonNodes.size() + "个作业");
}
......@@ -1125,7 +1132,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
long minutes = ChronoUnit.MINUTES.between(startTime, endTime);
long seconds = ChronoUnit.SECONDS.between(startTime, endTime);
String duration = days + "天 " + (hours - (days * 24)) + "小时 " + (minutes - (hours * 60)) + "分 "
+ (seconds - (minutes * 60)) + "秒";
+ (seconds - (minutes * 60)) + "秒";
return duration;
}
......@@ -1205,7 +1212,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
private void sendAlert(AlertInstance alertInstance, JobInstance jobInstance, Task task, AlertMsg alertMsg) {
AlertConfig alertConfig = AlertConfig.build(alertInstance.getName(), alertInstance.getType(),
JSONUtil.toMap(alertInstance.getParams()));
JSONUtil.toMap(alertInstance.getParams()));
Alert alert = Alert.build(alertConfig);
String title = "任务【" + task.getAlias() + "】:" + jobInstance.getStatus();
String content = alertMsg.toString();
......@@ -1224,10 +1231,10 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Override
public Result queryAllCatalogue() {
final LambdaQueryWrapper<Catalogue> queryWrapper = new LambdaQueryWrapper<Catalogue>()
.select(Catalogue::getId, Catalogue::getName, Catalogue::getParentId)
.eq(Catalogue::getIsLeaf, 0)
.eq(Catalogue::getEnabled, 1)
.isNull(Catalogue::getTaskId);
.select(Catalogue::getId, Catalogue::getName, Catalogue::getParentId)
.eq(Catalogue::getIsLeaf, 0)
.eq(Catalogue::getEnabled, 1)
.isNull(Catalogue::getTaskId);
final List<Catalogue> catalogueList = catalogueService.list(queryWrapper);
return Result.succeed(TreeUtil.build(dealWithCatalogue(catalogueList), -1).get(0));
}
......@@ -1253,7 +1260,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
public Result<List<Task>> queryOnLineTaskByDoneStatus(List<JobLifeCycle> jobLifeCycle, List<JobStatus> jobStatuses,
boolean includeNull, Integer catalogueId) {
final Tree<Integer> node = ((Tree<Integer>) queryAllCatalogue().getDatas())
.getNode(Objects.isNull(catalogueId) ? 0 : catalogueId);
.getNode(Objects.isNull(catalogueId) ? 0 : catalogueId);
final List<Integer> parentIds = new ArrayList<>(0);
parentIds.add(node.getId());
childrenNodeParse(node, parentIds);
......@@ -1264,8 +1271,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
private List<Task> getTasks(List<JobLifeCycle> jobLifeCycle, List<JobStatus> jobStatuses, boolean includeNull,
List<Integer> parentIds) {
return this.baseMapper.queryOnLineTaskByDoneStatus(parentIds,
jobLifeCycle.stream().filter(Objects::nonNull).map(JobLifeCycle::getValue).collect(Collectors.toList()),
includeNull, jobStatuses.stream().map(JobStatus::name).collect(Collectors.toList()));
jobLifeCycle.stream().filter(Objects::nonNull).map(JobLifeCycle::getValue).collect(Collectors.toList()),
includeNull, jobStatuses.stream().map(JobStatus::name).collect(Collectors.toList()));
}
private void childrenNodeParse(Tree<Integer> node, List<Integer> parentIds) {
......@@ -1285,7 +1292,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Override
public void selectSavepointOnLineTask(TaskOperatingResult taskOperatingResult) {
final JobInstance jobInstanceByTaskId =
jobInstanceService.getJobInstanceByTaskId(taskOperatingResult.getTask().getId());
jobInstanceService.getJobInstanceByTaskId(taskOperatingResult.getTask().getId());
if (jobInstanceByTaskId == null) {
startGoingLiveTask(taskOperatingResult, null);
return;
......@@ -1304,8 +1311,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
private void findTheConditionSavePointToOnline(TaskOperatingResult taskOperatingResult,
JobInstance jobInstanceByTaskId) {
final LambdaQueryWrapper<JobHistory> queryWrapper = new LambdaQueryWrapper<JobHistory>()
.select(JobHistory::getId, JobHistory::getCheckpointsJson)
.eq(JobHistory::getId, jobInstanceByTaskId.getId());
.select(JobHistory::getId, JobHistory::getCheckpointsJson)
.eq(JobHistory::getId, jobInstanceByTaskId.getId());
final JobHistory jobHistory = jobHistoryService.getOne(queryWrapper);
if (jobHistory != null && StringUtils.isNotBlank(jobHistory.getCheckpointsJson())) {
final ObjectNode jsonNodes = JSONUtil.parseObject(jobHistory.getCheckpointsJson());
......
......@@ -22,23 +22,27 @@ package com.dlink.service.impl;
import com.dlink.constant.PathConstant;
import com.dlink.exception.BusException;
import com.dlink.gateway.GatewayType;
import com.dlink.job.JobManager;
import com.dlink.model.Task;
import com.dlink.model.UDFPath;
import com.dlink.process.context.ProcessContextHolder;
import com.dlink.process.model.ProcessEntity;
import com.dlink.service.TaskService;
import com.dlink.service.UDFService;
import com.dlink.udf.UDF;
import com.dlink.utils.UDFUtil;
import org.apache.flink.table.catalog.FunctionLanguage;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.springframework.stereotype.Service;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Opt;
import cn.hutool.core.map.MapUtil;
......@@ -62,24 +66,44 @@ public class UDFServiceImpl implements UDFService {
TaskService taskService;
@Override
public String[] initUDF(String statement, GatewayType gatewayType) {
public UDFPath initUDF(String statement, GatewayType gatewayType) {
if (gatewayType == GatewayType.KUBERNETES_APPLICATION) {
throw new BusException("udf 暂不支持k8s application");
}
ProcessEntity process = ProcessContextHolder.getProcess();
process.info("Initializing Flink UDF...Start");
Opt<String> udfJarPath = Opt.empty();
List<String> udfClassNameList = UDFUtil.getUDFClassName(statement);
List<String> codeList = CollUtil.map(udfClassNameList, x -> {
Task task = taskService.getUDFByClassName(x);
JobManager.initMustSuccessUDF(x, task.getStatement());
return task.getStatement();
}, true);
if (codeList.size() > 0) {
udfJarPath = Opt.ofBlankAble(UDFUtil.getUdfNameAndBuildJar(codeList));
} else {
if (gatewayType == GatewayType.KUBERNETES_APPLICATION) {
throw new BusException("udf 暂不支持k8s application");
List<UDF> udfList = UDFUtil.getUDF(statement);
List<UDF> javaUdf = new ArrayList<>();
List<UDF> pythonUdf = new ArrayList<>();
udfList.forEach(udf -> {
Task task = taskService.getUDFByClassName(udf.getClassName());
udf.setCode(task.getStatement());
if (udf.getFunctionLanguage() == FunctionLanguage.PYTHON) {
pythonUdf.add(udf);
} else {
javaUdf.add(udf);
}
}
});
String[] javaUDFPath = initJavaUDF(javaUdf);
String[] pythonUDFPath = initPythonUDF(pythonUdf);
process.info("Initializing Flink UDF...Finish");
return UDFPath.builder().jarPaths(javaUDFPath).pyPaths(pythonUDFPath).build();
}
private static String[] initPythonUDF(List<UDF> udfList) {
return new String[] {UDFUtil.buildPy(udfList)};
}
private static String[] initJavaUDF(List<UDF> udfList) {
Opt<String> udfJarPath = Opt.empty();
if (udfList.size() > 0) {
List<String> codeList = udfList.stream().map(UDF::getCode).collect(Collectors.toList());
udfJarPath = Opt.ofBlankAble(UDFUtil.getUdfFileAndBuildJar(codeList));
}
if (udfJarPath.isPresent()) {
return new String[] {PathConstant.UDF_PATH + udfJarPath.get()};
} else {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.catalog;
/** Categorizes the language semantics of a {@link CatalogFunction}. */
public enum FunctionLanguage {
JAVA,
SCALA,
PYTHON
}
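For illustration, a sketch of how a registration statement is mapped onto this enum, assuming the FUNCTION_REGEX / LANGUAGE_REGEX patterns introduced in UDFUtil further down; the statement itself is invented:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.flink.table.catalog.FunctionLanguage;

public class UdfStatementParseSketch {
    // Copied from UDFUtil.FUNCTION_REGEX and UDFUtil.LANGUAGE_REGEX.
    static final Pattern FUNCTION = Pattern.compile("function (.*?)'(.*?)'", Pattern.CASE_INSENSITIVE);
    static final Pattern LANGUAGE = Pattern.compile("language (.*);", Pattern.CASE_INSENSITIVE);

    public static void main(String[] args) {
        String statement = "create temporary function sub_fun as 'sub_udf.sub' language python;";
        Matcher function = FUNCTION.matcher(statement);
        while (function.find()) {
            String className = function.group(2); // "sub_udf.sub"
            Matcher language = LANGUAGE.matcher(statement);
            // Default to JAVA when no LANGUAGE clause is present, as getUDF does.
            FunctionLanguage lang = language.find()
                    ? FunctionLanguage.valueOf(language.group(1).toUpperCase())
                    : FunctionLanguage.JAVA;
            System.out.println(className + " -> " + lang); // sub_udf.sub -> PYTHON
        }
    }
}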
......@@ -29,7 +29,7 @@ import com.dlink.assertion.Asserts;
**/
public enum Dialect {
FLINKSQL("FlinkSql"), FLINKJAR("FlinkJar"), FLINKSQLENV("FlinkSqlEnv"), SQL("Sql"), JAVA("Java"),
FLINKSQL("FlinkSql"), FLINKJAR("FlinkJar"), FLINKSQLENV("FlinkSqlEnv"), SQL("Sql"), JAVA("Java"),PYTHON("PYTHON"),
MYSQL("Mysql"), ORACLE("Oracle"), SQLSERVER("SqlServer"), POSTGRESQL("PostgreSql"), CLICKHOUSE("ClickHouse"),
DORIS("Doris"), PHOENIX("Phoenix"), HIVE("Hive"), STARROCKS("StarRocks"), KUBERNETES_APPLICATION("KubernetesApplaction");
......
......@@ -23,6 +23,7 @@ import java.io.File;
/**
* File path constants
*
* @author ZackYoung
* @since 0.6.8
*/
......@@ -40,6 +41,7 @@ public class PathConstant {
* UDF path
*/
public static final String UDF_PATH = TMP_PATH + "udf" + File.separator;
public static final String UDF_PYTHON_PATH = TMP_PATH + "udf" + File.separator + "python" + File.separator;
/**
* UDF jar naming rule
*/
......
......@@ -65,6 +65,7 @@ public class JobConfig {
private String address;
private Integer taskId;
private String[] jarFiles;
private String[] pyFiles;
private String jobName;
private boolean useSqlFragment;
private boolean useStatementSet;
......
......@@ -161,7 +161,11 @@ public class JobManager {
JobManager manager = new JobManager(config);
manager.init();
manager.executor.initUDF(config.getJarFiles());
config.getGatewayConfig().setJarPaths(config.getJarFiles());
manager.executor.initPyUDF(config.getPyFiles());
if (config.getGatewayConfig() != null) {
config.getGatewayConfig().setJarPaths(config.getJarFiles());
}
return manager;
}
......@@ -187,7 +191,7 @@ public class JobManager {
public static boolean useGateway(String type) {
return (GatewayType.YARN_PER_JOB.equalsValue(type) || GatewayType.YARN_APPLICATION.equalsValue(type)
|| GatewayType.KUBERNETES_APPLICATION.equalsValue(type));
|| GatewayType.KUBERNETES_APPLICATION.equalsValue(type));
}
private Executor createExecutor() {
......@@ -270,7 +274,7 @@ public class JobManager {
ready();
String currentSql = "";
JobParam jobParam = Explainer.build(executor, useStatementSet, sqlSeparator)
.pretreatStatements(SqlUtil.getStatements(statement, sqlSeparator));
.pretreatStatements(SqlUtil.getStatements(statement, sqlSeparator));
try {
for (StatementParam item : jobParam.getDdl()) {
currentSql = item.getValue();
......@@ -315,10 +319,10 @@ public class JobManager {
if (config.isUseResult()) {
// Build insert result.
IResult result =
ResultBuilder
.build(SqlType.INSERT, config.getMaxRowNum(), config.isUseChangeLog(),
config.isUseAutoCancel(), executor.getTimeZone())
.getResult(tableResult);
ResultBuilder
.build(SqlType.INSERT, config.getMaxRowNum(), config.isUseChangeLog(),
config.isUseAutoCancel(), executor.getTimeZone())
.getResult(tableResult);
job.setResult(result);
}
}
......@@ -339,13 +343,13 @@ public class JobManager {
for (StatementParam item : jobParam.getTrans()) {
currentSql = item.getValue();
FlinkInterceptorResult flinkInterceptorResult =
FlinkInterceptor.build(executor, item.getValue());
FlinkInterceptor.build(executor, item.getValue());
if (Asserts.isNotNull(flinkInterceptorResult.getTableResult())) {
if (config.isUseResult()) {
IResult result = ResultBuilder
.build(item.getType(), config.getMaxRowNum(), config.isUseChangeLog(),
config.isUseAutoCancel(), executor.getTimeZone())
.getResult(flinkInterceptorResult.getTableResult());
.build(item.getType(), config.getMaxRowNum(), config.isUseChangeLog(),
config.isUseAutoCancel(), executor.getTimeZone())
.getResult(flinkInterceptorResult.getTableResult());
job.setResult(result);
}
} else {
......@@ -362,9 +366,9 @@ public class JobManager {
}
if (config.isUseResult()) {
IResult result =
ResultBuilder.build(item.getType(), config.getMaxRowNum(),
config.isUseChangeLog(), config.isUseAutoCancel(),
executor.getTimeZone()).getResult(tableResult);
ResultBuilder.build(item.getType(), config.getMaxRowNum(),
config.isUseChangeLog(), config.isUseAutoCancel(),
executor.getTimeZone()).getResult(tableResult);
job.setResult(result);
}
}
......@@ -392,7 +396,7 @@ public class JobManager {
JobGraph jobGraph = streamGraph.getJobGraph();
if (Asserts.isNotNullString(config.getSavePointPath())) {
jobGraph.setSavepointRestoreSettings(
SavepointRestoreSettings.forPath(config.getSavePointPath(), true));
SavepointRestoreSettings.forPath(config.getSavePointPath(), true));
}
gatewayResult = Gateway.build(config.getGatewayConfig()).submitJobGraph(jobGraph);
}
......@@ -419,8 +423,8 @@ public class JobManager {
}
if (config.isUseResult()) {
IResult result =
ResultBuilder.build(SqlType.EXECUTE, config.getMaxRowNum(), config.isUseChangeLog(),
config.isUseAutoCancel(), executor.getTimeZone()).getResult(null);
ResultBuilder.build(SqlType.EXECUTE, config.getMaxRowNum(), config.isUseChangeLog(),
config.isUseAutoCancel(), executor.getTimeZone()).getResult(null);
job.setResult(result);
}
}
......@@ -485,7 +489,7 @@ public class JobManager {
LocalDateTime startTime = LocalDateTime.now();
TableResult tableResult = executor.executeSql(newStatement);
result = ResultBuilder.build(operationType, config.getMaxRowNum(), false, false, executor.getTimeZone())
.getResult(tableResult);
.getResult(tableResult);
result.setStartTime(startTime);
}
return result;
......@@ -506,12 +510,12 @@ public class JobManager {
Executor sessionExecutor = null;
if (sessionConfig.isUseRemote()) {
sessionExecutor = Executor.buildRemoteExecutor(EnvironmentSetting.build(sessionConfig.getAddress()),
ExecutorSetting.DEFAULT);
ExecutorSetting.DEFAULT);
} else {
sessionExecutor = Executor.buildLocalExecutor(sessionConfig.getExecutorSetting());
}
ExecutorEntity executorEntity =
new ExecutorEntity(session, sessionConfig, createUser, LocalDateTime.now(), sessionExecutor);
new ExecutorEntity(session, sessionConfig, createUser, LocalDateTime.now(), sessionExecutor);
SessionPool.push(executorEntity);
return SessionInfo.build(executorEntity);
}
......@@ -535,7 +539,7 @@ public class JobManager {
public boolean cancel(String jobId) {
if (useGateway && !useRestAPI) {
config.getGatewayConfig().setFlinkConfig(FlinkConfig.build(jobId, ActionType.CANCEL.getValue(),
null, null));
null, null));
Gateway.build(config.getGatewayConfig()).savepointJob();
return true;
} else {
......@@ -551,7 +555,7 @@ public class JobManager {
public SavePointResult savepoint(String jobId, String savePointType, String savePoint) {
if (useGateway && !useRestAPI) {
config.getGatewayConfig().setFlinkConfig(FlinkConfig.build(jobId, ActionType.SAVEPOINT.getValue(),
savePointType, null));
savePointType, null));
return Gateway.build(config.getGatewayConfig()).savepointJob(savePoint);
} else {
return FlinkAPI.build(config.getAddress()).savepoints(jobId, savePointType);
......@@ -573,7 +577,7 @@ public class JobManager {
success();
} catch (Exception e) {
String error = LogUtil.getError(
"Exception in executing Jar:\n" + config.getGatewayConfig().getAppConfig().getUserJarPath(), e);
"Exception in executing Jar:\n" + config.getGatewayConfig().getAppConfig().getUserJarPath(), e);
job.setEndTime(LocalDateTime.now());
job.setStatus(Job.JobStatus.FAILED);
job.setError(error);
......@@ -599,15 +603,15 @@ public class JobManager {
}
if (Asserts.isNotNull(config.getCheckpoint())) {
sb.append("set " + ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL.key() + " = "
+ config.getCheckpoint() + ";\r\n");
+ config.getCheckpoint() + ";\r\n");
}
if (Asserts.isNotNullString(config.getSavePointPath())) {
sb.append("set " + SavepointConfigOptions.SAVEPOINT_PATH + " = " + config.getSavePointPath() + ";\r\n");
}
if (Asserts.isNotNull(config.getGatewayConfig())
&& Asserts.isNotNull(config.getGatewayConfig().getFlinkConfig().getConfiguration())) {
&& Asserts.isNotNull(config.getGatewayConfig().getFlinkConfig().getConfiguration())) {
for (Map.Entry<String, String> entry : config.getGatewayConfig().getFlinkConfig().getConfiguration()
.entrySet()) {
.entrySet()) {
sb.append("set " + entry.getKey() + " = " + entry.getValue() + ";\r\n");
}
}
......@@ -616,16 +620,16 @@ public class JobManager {
case YARN_PER_JOB:
case YARN_APPLICATION:
sb.append("set " + DeploymentOptions.TARGET.key() + " = "
+ GatewayType.get(config.getType()).getLongValue() + ";\r\n");
+ GatewayType.get(config.getType()).getLongValue() + ";\r\n");
if (Asserts.isNotNull(config.getGatewayConfig())) {
sb.append("set " + YarnConfigOptions.PROVIDED_LIB_DIRS.key() + " = "
+ Collections.singletonList(config.getGatewayConfig().getClusterConfig().getFlinkLibPath())
+ ";\r\n");
+ Collections.singletonList(config.getGatewayConfig().getClusterConfig().getFlinkLibPath())
+ ";\r\n");
}
if (Asserts.isNotNull(config.getGatewayConfig())
&& Asserts.isNotNullString(config.getGatewayConfig().getFlinkConfig().getJobName())) {
&& Asserts.isNotNullString(config.getGatewayConfig().getFlinkConfig().getJobName())) {
sb.append("set " + YarnConfigOptions.APPLICATION_NAME.key() + " = "
+ config.getGatewayConfig().getFlinkConfig().getJobName() + ";\r\n");
+ config.getGatewayConfig().getFlinkConfig().getJobName() + ";\r\n");
}
break;
default:
......
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.udf;
import org.apache.flink.table.catalog.FunctionLanguage;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
/**
* @author ZackYoung
* @since 0.6.8
*/
@Getter
@Setter
@Builder
public class UDF {
String className;
FunctionLanguage functionLanguage;
String code;
}
......@@ -24,9 +24,12 @@ import com.dlink.pool.ClassEntity;
import com.dlink.pool.ClassPool;
import com.dlink.process.context.ProcessContextHolder;
import com.dlink.process.model.ProcessEntity;
import com.dlink.udf.UDF;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.catalog.FunctionLanguage;
import java.io.File;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
......@@ -36,6 +39,7 @@ import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.codehaus.groovy.control.CompilerConfiguration;
import org.slf4j.Logger;
......@@ -44,6 +48,8 @@ import org.slf4j.LoggerFactory;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.lang.Opt;
import cn.hutool.core.lang.PatternPool;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ReUtil;
import cn.hutool.core.util.StrUtil;
......@@ -64,19 +70,51 @@ public class UDFUtil {
*/
protected static final Map<String, Integer> UDF_MD5_MAP = new HashMap<>();
private static final String FUNCTION_REGEX = "function (.*?)'(.*?)'";
private static final String LANGUAGE_REGEX = "language (.*);";
public static final String PYTHON_UDF_ATTR = "(\\S)\\s+=\\s+ud(?:f|tf|af|taf)";
public static final String PYTHON_UDF_DEF = "@ud(?:f|tf|af|taf).*\\n+def\\s+(.*)\\(.*\\):";
public static List<String> getUDFClassName(String statement) {
public static List<UDF> getUDF(String statement) {
ProcessEntity process = ProcessContextHolder.getProcess();
process.info("Parse UDF class name:");
Pattern pattern = Pattern.compile(FUNCTION_REGEX, Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(statement);
List<String> classNameList = new ArrayList<>();
List<UDF> udfList = new ArrayList<>();
while (matcher.find()) {
classNameList.add(matcher.group(2));
UDF udf = UDF.builder().className(matcher.group(2))
.functionLanguage(FunctionLanguage.valueOf(Opt.ofNullable(ReUtil.getGroup1(PatternPool.get(LANGUAGE_REGEX, Pattern.CASE_INSENSITIVE), statement)).orElse("JAVA").toUpperCase()))
.build();
udfList.add(udf);
}
List<String> classNameList = udfList.stream().map(UDF::getClassName).collect(Collectors.toList());
process.info(StringUtils.join(",", classNameList));
process.info(StrUtil.format("A total of {} UDF have been Parsed.", classNameList.size()));
return classNameList;
return udfList;
}
public static String buildPy(List<UDF> udfList) {
InputStream[] inputStreams = udfList.stream().map(x -> {
String s = buildPy(x);
return FileUtil.getInputStream(s);
}).toArray(InputStream[]::new);
String[] paths = udfList.stream().map(x -> StrUtil.split(x.getClassName(), ".").get(0) + ".py").toArray(String[]::new);
File file = FileUtil.file(PathConstant.UDF_PYTHON_PATH + "python_udf.zip");
FileUtil.del(file);
try (ZipUtils zipWriter = new ZipUtils(file, Charset.defaultCharset())) {
zipWriter.add(paths, inputStreams);
}
return file.getAbsolutePath();
}
public static String buildPy(UDF udf) {
File file = FileUtil.writeUtf8String(udf.getCode(), PathConstant.UDF_PYTHON_PATH + StrUtil.split(udf.getClassName(), ".").get(0) + ".py");
return file.getAbsolutePath();
}
public static String getPyUDFAttr(String code) {
return Opt.ofBlankAble(ReUtil.getGroup1(UDFUtil.PYTHON_UDF_ATTR, code))
.orElse(ReUtil.getGroup1(UDFUtil.PYTHON_UDF_DEF, code));
}
public static Boolean buildClass(String code) {
......@@ -129,17 +167,17 @@ public class UDFUtil {
}
});
String[] clazzs = successList.stream().map(className -> StrUtil.replace(className, ".", "/") + ".class")
.toArray(String[]::new);
.toArray(String[]::new);
InputStream[] fileInputStreams =
successList.stream().map(className -> tmpPath + StrUtil.replace(className, ".", "/") + ".class")
.map(FileUtil::getInputStream).toArray(InputStream[]::new);
successList.stream().map(className -> tmpPath + StrUtil.replace(className, ".", "/") + ".class")
.map(FileUtil::getInputStream).toArray(InputStream[]::new);
// package the compiled class files into a jar
try (ZipUtils zipWriter = new ZipUtils(FileUtil.file(udfJarPath), Charset.defaultCharset())) {
zipWriter.add(clazzs, fileInputStreams);
}
String md5 = md5sum(udfJarPath);
return MapUtil.builder("success", successList).put("failed", failedList)
.put("md5", Collections.singletonList(md5)).build();
.put("md5", Collections.singletonList(md5)).build();
}
/**
......@@ -148,7 +186,7 @@ public class UDFUtil {
* @param codeList list of UDF source code
* @return {@link java.lang.String}
*/
public static String getUdfNameAndBuildJar(List<String> codeList) {
public static String getUdfFileAndBuildJar(List<String> codeList) {
// 1. Check the versions of all existing jars, usually named udf-${version}.jar (e.g. udf-1.jar); skip this step if the directory does not exist
String md5 = buildJar(codeList).get("md5").get(0);
if (!FileUtil.exist(PathConstant.UDF_PATH)) {
......@@ -184,10 +222,10 @@ public class UDFUtil {
private static void scanUDFMD5() {
List<String> fileList = FileUtil.listFileNames(PathConstant.UDF_PATH);
fileList.stream().filter(fileName -> ReUtil.isMatch(PathConstant.UDF_JAR_RULE, fileName)).distinct()
.forEach(fileName -> {
Integer version = Convert.toInt(ReUtil.getGroup0(PathConstant.UDF_VERSION_RULE, fileName));
UDF_MD5_MAP.put(md5sum(PathConstant.UDF_PATH + fileName), version);
});
.forEach(fileName -> {
Integer version = Convert.toInt(ReUtil.getGroup0(PathConstant.UDF_VERSION_RULE, fileName));
UDF_MD5_MAP.put(md5sum(PathConstant.UDF_PATH + fileName), version);
});
}
private static String md5sum(String filePath) {
......
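A short sketch of the module-file naming used by buildPy above: the segment of the registered class name before the first dot becomes the .py entry inside python_udf.zip, while the segment after it is the callable resolved via getPyUDFAttr. The class names here are invented:

import java.util.Arrays;
import java.util.List;

public class PyModuleNamingSketch {
    public static void main(String[] args) {
        // e.g. registered as: create temporary function f as 'sub_udf.sub' language python;
        List<String> classNames = Arrays.asList("sub_udf.sub", "upper_udf.to_upper");
        for (String className : classNames) {
            // Mirrors StrUtil.split(className, ".").get(0) + ".py" in buildPy().
            String moduleFile = className.split("\\.")[0] + ".py";
            System.out.println(className + " -> " + moduleFile); // sub_udf.py, upper_udf.py
        }
    }
}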
......@@ -30,6 +30,7 @@ import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
......@@ -268,6 +269,14 @@ public abstract class Executor {
JarUtils.getJarFiles(udfFilePath).forEach(Executor::loadJar);
}
public void initPyUDF(String... udfPyFilePath) {
Map<String, String> config = executorSetting.getConfig();
if (Asserts.isAllNotNullString(udfPyFilePath) && Asserts.isNotNull(config)) {
config.put(PythonOptions.PYTHON_FILES.key(), String.join(",", udfPyFilePath));
}
update(executorSetting);
}
private static void loadJar(final URL jarUrl) {
// obtain the addURL method from the URLClassLoader class loader
Method method = null;
......
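For reference, a minimal sketch of what initPyUDF above amounts to on the Flink side, assuming PythonOptions.PYTHON_FILES (key "python.files") from flink-python; the zip path is invented:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.PythonOptions;

public class PyFilesConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Point the Python runtime at the packaged UDF sources, comma-separated if there are several.
        conf.setString(PythonOptions.PYTHON_FILES, "/tmp/dlink/udf/python/python_udf.zip");
        System.out.println(conf.getString(PythonOptions.PYTHON_FILES, ""));
    }
}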
......@@ -39,6 +39,11 @@
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-python_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
......
......@@ -39,6 +39,11 @@
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-python_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
......
......@@ -39,6 +39,11 @@
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-python_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
......
......@@ -122,6 +122,11 @@
<artifactId>flink-kubernetes_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-python_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
......
......@@ -41,6 +41,11 @@
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-python_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
......
......@@ -57,6 +57,9 @@ const StudioRightTool = (props: any) => {
if (DIALECT.JAVA === current.task.dialect) {
return renderUDFContent();
}
if (DIALECT.PYTHON === current.task.dialect) {
return renderUDFContent();
}
if (DIALECT.KUBERNETES_APPLICATION === current.task.dialect) {
return renderKubernetesContent();
}
......
......@@ -131,11 +131,22 @@ const EditorTabs = (props: any) => {
// sqlMetaData={pane.sqlMetaData}
height={height ? height : (toolHeight - 32)}
width={width}
language={current.task.dialect === DIALECT.JAVA ? 'java' : 'sql'}
language={getLanguage(current.task.dialect)}
/>
</TabPane>)
}
}
const getLanguage = (dialect: string) => {
switch (dialect) {
case DIALECT.JAVA:
return DIALECT.JAVA.toLowerCase()
case DIALECT.PYTHON:
return DIALECT.PYTHON.toLowerCase()
default:
return DIALECT.SQL.toLowerCase()
}
}
return (
<>
......
......@@ -90,6 +90,7 @@ const SimpleTaskForm: React.FC<UpdateFormProps> = (props) => {
<Option value={DIALECT.PHOENIX}>{DIALECT.PHOENIX}</Option>
<Option value={DIALECT.STARROCKS}>{DIALECT.STARROCKS}</Option>
<Option value={DIALECT.JAVA}>{DIALECT.JAVA}</Option>
<Option value={DIALECT.PYTHON}>{DIALECT.PYTHON}</Option>
<Option value={DIALECT.SQL}>{DIALECT.SQL}</Option>
</Select>
</Form.Item>) : undefined}
......
......@@ -44,6 +44,7 @@ export const DIALECT = {
STARROCKS: 'StarRocks',
KUBERNETES_APPLICATION: 'KubernetesApplaction',
JAVA: 'Java',
PYTHON: 'Python',
};
export const CHART = {
......
......@@ -49,6 +49,8 @@ export const getIcon = (type: string) => {
return (<Icon component={StarRocksSvg}/>);
case DIALECT.JAVA:
return (<Icon component={JavaSvg}/>);
case DIALECT.PYTHON:
return (<Icon component={PythonSvg}/>);
default:
return (<Icon component={FlinkSQLSvg}/>);
}
......@@ -387,6 +389,17 @@ export const JavaSvg = () => (
fill="#6699FF" p-id="19722"></path>
</svg>
);
export const PythonSvg = () => (
<svg t="1666454409766" className="icon" viewBox="0 0 1024 1024" version="1.1" xmlns="http://www.w3.org/2000/svg"
p-id="2303" width={svgSize} height={svgSize}>
<path
d="M366.635375 495.627875c8.93024999-1.488375 17.8605-2.480625 26.79075-2.48062499h-7.44187499 241.61287499c10.418625 0 20.341125-1.488375 30.26362501-3.969 44.65124999-12.403125 77.3955-52.093125 77.3955-101.20950002V185.053625c0-57.5505-49.116375-101.2095-107.65912501-110.63587501-37.209375-5.9535-91.287-8.93024999-128.00025-8.93024999-36.71325001 0-71.938125 3.472875-103.194 8.93024999C305.115875 90.29374999 288.74374999 123.534125 288.74374999 185.053625v66.48075h223.25625001V288.74374999H216.3095C133.456625 288.74374999 65.983625 387.96874999 65.4875 510.0155v1.9845c0 22.325625 1.9845 43.659 6.449625 63.504C90.29374999 667.78325001 147.84424999 735.25625001 216.3095 735.25625001h35.224875v-106.66687501c0-62.51174999 46.63574999-120.558375 115.101-132.9615z m23.814-283.7835c-22.325625 0-40.68225001-18.356625-40.18612499-40.68225 0-22.325625 17.8605-40.68225001 40.18612499-40.68225s40.68225001 18.356625 40.68225 40.68225c-0.496125 22.82175001-18.356625 40.68225001-40.68225 40.68225z"
fill="#0075AA" p-id="2304"></path>
<path
d="M949.086125 434.108375C927.75275001 349.271 872.682875 288.74374999 807.6905 288.74374999h-35.224875v94.75987501c0 78.883875-51.597 135.93825001-115.101 145.86075-6.449625 0.99224999-12.89925001 1.488375-19.34887499 1.48837501H396.402875c-10.418625 0-20.341125 1.488375-30.26362499 3.969-44.65124999 11.907-77.3955 48.62025001-77.3955 96.74437499V834.48125001c0 57.5505 58.046625 91.783125 115.10099998 108.15524999 67.969125 19.845 142.387875 23.317875 224.24850002 0 54.077625-15.379875 107.163-46.63574999 107.16299998-108.15525001v-61.5195h-223.25624999V735.25625001h295.6905c58.54275001 0 109.643625-49.6125 134.449875-122.04675001 10.418625-30.263625 16.372125-64.49625001 16.372125-101.2095 0-27.286875-3.472875-53.5815-9.426375-77.891625z m-316.52775 372.58987501c22.325625 0 40.186125 18.356625 40.186125 40.68224999 0 22.325625-18.356625 40.68225001-40.186125 40.68225001-22.325625 0-40.68225001-18.356625-40.68225-40.68225001 0.496125-22.325625 18.356625-40.68225001 40.68225-40.68225001z"
fill="#FFD400" p-id="2305"></path>
</svg>
)
export const UDFSvg = () => (
<svg t="1640792304032" className="icon" viewBox="0 0 1024 1024" version="1.1" xmlns="http://www.w3.org/2000/svg"
p-id="28067" width={svgSize} height={svgSize}>
......