Unverified commit 88b77052, authored by ZackYoung and committed by GitHub

[Fix][udf,core] Fix UDFs written in three languages not being usable at the same time (#1162)

* 修复udf不能三种语言代码同时使用

* 添加 python udf yarn-application使用
parent 35810a97
...@@ -31,6 +31,7 @@ import com.dlink.trans.Operations; ...@@ -31,6 +31,7 @@ import com.dlink.trans.Operations;
import com.dlink.utils.SqlUtil; import com.dlink.utils.SqlUtil;
import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.python.PythonOptions;
import java.io.IOException; import java.io.IOException;
import java.sql.SQLException; import java.sql.SQLException;
...@@ -52,6 +53,7 @@ import org.slf4j.LoggerFactory; ...@@ -52,6 +53,7 @@ import org.slf4j.LoggerFactory;
* @since 2021/10/27 * @since 2021/10/27
**/ **/
public class Submiter { public class Submiter {
private static final Logger logger = LoggerFactory.getLogger(Submiter.class); private static final Logger logger = LoggerFactory.getLogger(Submiter.class);
private static String getQuerySQL(Integer id) throws SQLException { private static String getQuerySQL(Integer id) throws SQLException {
...@@ -66,8 +68,8 @@ public class Submiter { ...@@ -66,8 +68,8 @@ public class Submiter {
throw new SQLException("请指定任务ID"); throw new SQLException("请指定任务ID");
} }
return "select id, name, alias as jobName, type,check_point as checkpoint," return "select id, name, alias as jobName, type,check_point as checkpoint,"
+ "save_point_path as savePointPath, parallelism,fragment as useSqlFragment,statement_set as useStatementSet,config_json as config," + "save_point_path as savePointPath, parallelism,fragment as useSqlFragment,statement_set as useStatementSet,config_json as config,"
+ " env_id as envId,batch_model AS useBatchModel from dlink_task where id = " + id; + " env_id as envId,batch_model AS useBatchModel from dlink_task where id = " + id;
} }
private static String getFlinkSQLStatement(Integer id, DBConfig config) { private static String getFlinkSQLStatement(Integer id, DBConfig config) {
...@@ -75,7 +77,8 @@ public class Submiter { ...@@ -75,7 +77,8 @@ public class Submiter {
try { try {
statement = DBUtil.getOneByID(getQuerySQL(id), config); statement = DBUtil.getOneByID(getQuerySQL(id), config);
} catch (IOException | SQLException e) { } catch (IOException | SQLException e) {
logger.error("{} --> 获取 FlinkSQL 配置异常,ID 为 {}, 连接信息为:{} ,异常信息为:{} ", LocalDateTime.now(), id, config.toString(), e.getMessage(), e); logger.error("{} --> 获取 FlinkSQL 配置异常,ID 为 {}, 连接信息为:{} ,异常信息为:{} ", LocalDateTime.now(), id,
config.toString(), e.getMessage(), e);
} }
return statement; return statement;
} }
...@@ -85,7 +88,8 @@ public class Submiter { ...@@ -85,7 +88,8 @@ public class Submiter {
try { try {
task = DBUtil.getMapByID(getTaskInfo(id), config); task = DBUtil.getMapByID(getTaskInfo(id), config);
} catch (IOException | SQLException e) { } catch (IOException | SQLException e) {
logger.error("{} --> 获取 FlinkSQL 配置异常,ID 为 {}, 连接信息为:{} ,异常信息为:{} ", LocalDateTime.now(), id, config.toString(), e.getMessage(), e); logger.error("{} --> 获取 FlinkSQL 配置异常,ID 为 {}, 连接信息为:{} ,异常信息为:{} ", LocalDateTime.now(), id,
config.toString(), e.getMessage(), e);
} }
return task; return task;
} }
...@@ -111,12 +115,13 @@ public class Submiter { ...@@ -111,12 +115,13 @@ public class Submiter {
String uuid = UUID.randomUUID().toString().replace("-", ""); String uuid = UUID.randomUUID().toString().replace("-", "");
if (executorSetting.getConfig().containsKey(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key())) { if (executorSetting.getConfig().containsKey(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key())) {
executorSetting.getConfig().put(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key(), executorSetting.getConfig().put(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key(),
executorSetting.getConfig().get(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key()) + "/" + uuid); executorSetting.getConfig().get(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key()) + "/" + uuid);
} }
if (executorSetting.getConfig().containsKey(CheckpointingOptions.SAVEPOINT_DIRECTORY.key())) { if (executorSetting.getConfig().containsKey(CheckpointingOptions.SAVEPOINT_DIRECTORY.key())) {
executorSetting.getConfig().put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), executorSetting.getConfig().put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
executorSetting.getConfig().get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key()) + "/" + uuid); executorSetting.getConfig().get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key()) + "/" + uuid);
} }
executorSetting.getConfig().put(PythonOptions.PYTHON_FILES.key(), "./python_udf.zip");
logger.info("作业配置如下: {}", executorSetting); logger.info("作业配置如下: {}", executorSetting);
Executor executor = Executor.buildAppStreamExecutor(executorSetting); Executor executor = Executor.buildAppStreamExecutor(executorSetting);
List<StatementParam> ddl = new ArrayList<>(); List<StatementParam> ddl = new ArrayList<>();
...@@ -184,6 +189,6 @@ public class Submiter { ...@@ -184,6 +189,6 @@ public class Submiter {
logger.error("执行失败, {}", e.getMessage(), e); logger.error("执行失败, {}", e.getMessage(), e);
} }
} }
logger.info("{}任务提交成功",LocalDateTime.now()); logger.info("{}任务提交成功", LocalDateTime.now());
} }
} }
...@@ -81,6 +81,8 @@ import org.slf4j.LoggerFactory; ...@@ -81,6 +81,8 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import cn.hutool.core.util.ArrayUtil;
/** /**
* JobManager * JobManager
* *
...@@ -164,7 +166,7 @@ public class JobManager { ...@@ -164,7 +166,7 @@ public class JobManager {
manager.executor.initPyUDF(config.getPyFiles()); manager.executor.initPyUDF(config.getPyFiles());
if (config.getGatewayConfig() != null) { if (config.getGatewayConfig() != null) {
config.getGatewayConfig().setJarPaths(config.getJarFiles()); config.getGatewayConfig().setJarPaths(ArrayUtil.append(config.getJarFiles(),config.getPyFiles()));
} }
return manager; return manager;
} }
......
...@@ -34,7 +34,20 @@ import lombok.Setter; ...@@ -34,7 +34,20 @@ import lombok.Setter;
@Setter @Setter
@Builder @Builder
public class UDF { public class UDF {
// Describes a single user-defined function (UDF): its name, class name,
// source language and source code.
/**
* Function name.
*/
String name;
/**
* Class name.
*/
String className; String className;
/**
* Language the UDF code is written in.
*/
FunctionLanguage functionLanguage; FunctionLanguage functionLanguage;
/**
* Source code of the UDF.
*/
String code; String code;
} }
...@@ -40,7 +40,6 @@ import java.util.Collections; ...@@ -40,7 +40,6 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
...@@ -53,7 +52,6 @@ import cn.hutool.core.convert.Convert; ...@@ -53,7 +52,6 @@ import cn.hutool.core.convert.Convert;
import cn.hutool.core.io.FileUtil; import cn.hutool.core.io.FileUtil;
import cn.hutool.core.lang.Dict; import cn.hutool.core.lang.Dict;
import cn.hutool.core.lang.Opt; import cn.hutool.core.lang.Opt;
import cn.hutool.core.lang.PatternPool;
import cn.hutool.core.map.MapUtil; import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ReUtil; import cn.hutool.core.util.ReUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
...@@ -77,6 +75,7 @@ public class UDFUtil { ...@@ -77,6 +75,7 @@ public class UDFUtil {
*/ */
protected static final Map<String, Integer> UDF_MD5_MAP = new HashMap<>(); protected static final Map<String, Integer> UDF_MD5_MAP = new HashMap<>();
private static final String FUNCTION_REGEX = "function (.*?)'(.*?)'"; private static final String FUNCTION_REGEX = "function (.*?)'(.*?)'";
private static final String FUNCTION_SQL_REGEX = "create\\s+.*function\\s+(.*)\\s+as\\s+'(.*)'(\\s+language (.*))?;";
private static final String LANGUAGE_REGEX = "language (.*);"; private static final String LANGUAGE_REGEX = "language (.*);";
public static final String PYTHON_UDF_ATTR = "(\\S)\\s+=\\s+ud(?:f|tf|af|taf)"; public static final String PYTHON_UDF_ATTR = "(\\S)\\s+=\\s+ud(?:f|tf|af|taf)";
public static final String PYTHON_UDF_DEF = "@ud(?:f|tf|af|taf).*\\n+def\\s+(.*)\\(.*\\):"; public static final String PYTHON_UDF_DEF = "@ud(?:f|tf|af|taf).*\\n+def\\s+(.*)\\(.*\\):";
...@@ -117,15 +116,18 @@ public class UDFUtil { ...@@ -117,15 +116,18 @@ public class UDFUtil {
public static List<UDF> getUDF(String statement) { public static List<UDF> getUDF(String statement) {
ProcessEntity process = ProcessContextHolder.getProcess(); ProcessEntity process = ProcessContextHolder.getProcess();
process.info("Parse UDF class name:"); process.info("Parse UDF class name:");
Pattern pattern = Pattern.compile(FUNCTION_REGEX, Pattern.CASE_INSENSITIVE); Pattern pattern = Pattern.compile(FUNCTION_SQL_REGEX, Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(statement); List<String> udfSqlList = ReUtil.findAllGroup0(pattern, statement);
List<UDF> udfList = new ArrayList<>(); List<UDF> udfList = udfSqlList.stream().map(sql -> {
while (matcher.find()) { List<String> groups = ReUtil.getAllGroups(pattern, sql);
UDF udf = UDF.builder().className(matcher.group(2)) String udfName = groups.get(1);
.functionLanguage(FunctionLanguage.valueOf(Opt.ofNullable(ReUtil.getGroup1(PatternPool.get(LANGUAGE_REGEX, Pattern.CASE_INSENSITIVE), statement)).orElse("JAVA").toUpperCase())) String className = groups.get(2);
return UDF.builder()
.name(udfName)
.className(className)
.functionLanguage(FunctionLanguage.valueOf(Opt.ofNullable(groups.get(4)).orElse("JAVA").toUpperCase()))
.build(); .build();
udfList.add(udf); }).collect(Collectors.toList());
}
List<String> classNameList = udfList.stream().map(UDF::getClassName).collect(Collectors.toList()); List<String> classNameList = udfList.stream().map(UDF::getClassName).collect(Collectors.toList());
process.info(StringUtils.join(",", classNameList)); process.info(StringUtils.join(",", classNameList));
process.info(StrUtil.format("A total of {} UDF have been Parsed.", classNameList.size())); process.info(StrUtil.format("A total of {} UDF have been Parsed.", classNameList.size()));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment