Unverified Commit 7d66ec37 authored by ZackYoung's avatar ZackYoung Committed by GitHub

Udf template use (#1154)

* 添加数据开发 udf与udf模板使用

* 添加数据开发 udf与udf模板使用

* change sql

* 去掉库名
parent c841efa4
......@@ -25,7 +25,10 @@ import com.dlink.model.UDFTemplate;
import com.dlink.service.UDFTemplateService;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Resource;
......@@ -38,6 +41,9 @@ import org.springframework.web.bind.annotation.RestController;
import com.fasterxml.jackson.databind.JsonNode;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Dict;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
/**
......@@ -52,6 +58,37 @@ public class UDFController {
@Resource
UDFTemplateService udfTemplateService;
@PostMapping("/template/tree")
public Result<List<Object>> listUdfTemplates() {
List<UDFTemplate> list = udfTemplateService.list();
Map<String, Dict> one = new HashMap<>(3);
Map<String, Dict> two = new HashMap<>(3);
Map<String, Dict> three = new HashMap<>(3);
Map<String, Object> result = new HashMap<>(3);
list.forEach(t -> {
one.putIfAbsent(t.getCodeType(), Dict.create().set("label", t.getCodeType()).set("value", t.getCodeType()));
two.putIfAbsent(t.getCodeType() + t.getFunctionType(), Dict.create().set("label", t.getFunctionType()).set("value", t.getFunctionType()));
three.putIfAbsent(t.getCodeType() + t.getFunctionType() + t.getId(), Dict.create().set("label", t.getName()).set("value", t.getId()));
});
Set<String> twoKeys = two.keySet();
Set<String> threeKeys = three.keySet();
one.forEach((k1, v1) -> {
result.put(k1, v1);
ArrayList<Dict> c1 = new ArrayList<>();
v1.put("children", c1);
twoKeys.stream().filter(x -> x.contains(k1)).map(x -> StrUtil.strip(x, k1)).forEach(k2 -> {
Dict v2 = two.get(k1 + k2);
c1.add(v2);
ArrayList<Dict> c2 = new ArrayList<>();
v2.put("children", c2);
threeKeys.stream().filter(x -> x.contains(k1 + k2)).map(x -> StrUtil.strip(x, k1 + k2)).forEach(k3 -> {
c2.add(three.get(k1 + k2 + k3));
});
});
});
return Result.succeed(CollUtil.newArrayList(result.values()));
}
@PostMapping("/template/list")
public ProTableResult<UDFTemplate> listUdfTemplates(@RequestBody JsonNode para) {
return udfTemplateService.selectForProTable(para);
......
......@@ -21,6 +21,9 @@ package com.dlink.dto;
import com.dlink.config.Dialect;
import java.util.HashMap;
import java.util.Map;
import lombok.Getter;
import lombok.Setter;
......@@ -40,4 +43,5 @@ public class CatalogueTaskDTO {
private String name;
private String alias;
private String dialect = Dialect.DEFAULT.getValue();
private Map<String,String> config = new HashMap<>();
}
......@@ -36,6 +36,7 @@ import com.dlink.service.StatementService;
import com.dlink.service.TaskService;
import com.dlink.service.TaskVersionService;
import java.util.Collections;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -83,6 +84,7 @@ public class CatalogueServiceImpl extends SuperServiceImpl<CatalogueMapper, Cata
task.setName(catalogueTaskDTO.getName());
task.setAlias(catalogueTaskDTO.getAlias());
task.setDialect(catalogueTaskDTO.getDialect());
task.setConfig(Collections.singletonList(catalogueTaskDTO.getConfig()));
taskService.saveOrUpdateTask(task);
Catalogue catalogue = new Catalogue();
catalogue.setTenantId(catalogueTaskDTO.getTenantId());
......
......@@ -74,6 +74,7 @@ import com.dlink.model.TaskOperatingSavepointSelect;
import com.dlink.model.TaskOperatingStatus;
import com.dlink.model.TaskVersion;
import com.dlink.model.UDFPath;
import com.dlink.model.UDFTemplate;
import com.dlink.result.SqlExplainResult;
import com.dlink.result.TaskOperatingResult;
import com.dlink.service.AlertGroupService;
......@@ -92,6 +93,7 @@ import com.dlink.service.StatementService;
import com.dlink.service.TaskService;
import com.dlink.service.TaskVersionService;
import com.dlink.service.UDFService;
import com.dlink.service.UDFTemplateService;
import com.dlink.utils.CustomStringJavaCompiler;
import com.dlink.utils.JSONUtil;
import com.dlink.utils.UDFUtil;
......@@ -134,6 +136,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.tree.Tree;
import cn.hutool.core.lang.tree.TreeNode;
import cn.hutool.core.lang.tree.TreeUtil;
......@@ -175,6 +178,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
private CatalogueService catalogueService;
@Autowired
private FragmentVariableService fragmentVariableService;
@Autowired
private UDFTemplateService udfTemplateService;
@Value("${spring.datasource.driver-class-name}")
private String driver;
......@@ -361,7 +366,17 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Override
public boolean saveOrUpdateTask(Task task) {
// to compiler java udf
if (Dialect.isUDF(task.getDialect())) {
if (CollUtil.isNotEmpty(task.getConfig()) && Asserts.isNullString(task.getStatement())) {
Map<String, String> config = task.getConfig().get(0);
UDFTemplate template = udfTemplateService.getById(config.get("templateId"));
if (template != null) {
String code = UDFUtil.templateParse(task.getDialect(), template.getTemplateCode(), config.get("className"));
task.setStatement(code);
}
}
}
// to compiler udf
if (Asserts.isNotNullString(task.getDialect()) && Dialect.JAVA.equalsVal(task.getDialect())
&& Asserts.isNotNullString(task.getStatement())) {
CustomStringJavaCompiler compiler = new CustomStringJavaCompiler(task.getStatement());
......@@ -371,6 +386,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
} else {
task.setSavePointPath(UDFUtil.getScalaFullClassName(task.getStatement()));
}
// if modify task else create task
if (task.getId() != null) {
Task taskInfo = getById(task.getId());
......@@ -486,7 +502,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Override
public Task getUDFByClassName(String className) {
Task task = getOne(
new QueryWrapper<Task>().in("dialect", "Java", "Python", "Scala").eq("enabled", 1).eq("save_point_path", className));
new QueryWrapper<Task>().in("dialect", Dialect.JAVA, Dialect.SCALA, Dialect.PYTHON).eq("enabled", 1).eq("save_point_path", className));
Assert.check(task);
task.setStatement(statementService.getById(task.getId()).getStatement());
return task;
......@@ -495,7 +511,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Override
public List<Task> getAllUDF() {
List<Task> tasks =
list(new QueryWrapper<Task>().in("dialect", "Java", "Python", "Scala").eq("enabled", 1).isNotNull("save_point_path"));
list(new QueryWrapper<Task>().in("dialect", Dialect.JAVA, Dialect.SCALA, Dialect.PYTHON).eq("enabled", 1).isNotNull("save_point_path"));
return tasks.stream().peek(task -> {
Assert.check(task);
task.setStatement(statementService.getById(task.getId()).getStatement());
......
......@@ -38,6 +38,10 @@
<groupId>com.dlink</groupId>
<artifactId>dlink-common</artifactId>
</dependency>
<dependency>
<groupId>org.freemarker</groupId>
<artifactId>freemarker</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
......
......@@ -28,8 +28,8 @@ import com.dlink.assertion.Asserts;
* @since 2021/12/13
**/
public enum Dialect {
FLINKSQL("FlinkSql"), FLINKJAR("FlinkJar"), FLINKSQLENV("FlinkSqlEnv"), SQL("Sql"), JAVA("Java"),PYTHON("PYTHON"),
//
FLINKSQL("FlinkSql"), FLINKJAR("FlinkJar"), FLINKSQLENV("FlinkSqlEnv"), SQL("Sql"), JAVA("Java"), PYTHON("Python"), SCALA("Scala"),
MYSQL("Mysql"), ORACLE("Oracle"), SQLSERVER("SqlServer"), POSTGRESQL("PostgreSql"), CLICKHOUSE("ClickHouse"),
DORIS("Doris"), PHOENIX("Phoenix"), HIVE("Hive"), STARROCKS("StarRocks"), KUBERNETES_APPLICATION("KubernetesApplaction");
......@@ -82,5 +82,17 @@ public enum Dialect {
return false;
}
}
public static boolean isUDF(String value) {
Dialect dialect = Dialect.get(value);
switch (dialect) {
case JAVA:
case SCALA:
case PYTHON:
return true;
default:
return false;
}
}
}
......@@ -19,6 +19,8 @@
package com.dlink.utils;
import com.dlink.assertion.Asserts;
import com.dlink.config.Dialect;
import com.dlink.constant.PathConstant;
import com.dlink.pool.ClassEntity;
import com.dlink.pool.ClassPool;
......@@ -49,12 +51,16 @@ import org.slf4j.LoggerFactory;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.lang.Dict;
import cn.hutool.core.lang.Opt;
import cn.hutool.core.lang.PatternPool;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ReUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.crypto.digest.MD5;
import cn.hutool.extra.template.TemplateConfig;
import cn.hutool.extra.template.TemplateEngine;
import cn.hutool.extra.template.TemplateUtil;
import groovy.lang.GroovyClassLoader;
/**
......@@ -76,6 +82,37 @@ public class UDFUtil {
public static final String PYTHON_UDF_DEF = "@ud(?:f|tf|af|taf).*\\n+def\\s+(.*)\\(.*\\):";
public static final String SCALA_UDF_CLASS = "class\\s+(\\w+)(\\s*\\(.*\\)){0,1}\\s+extends";
public static final String SCALA_UDF_PACKAGE = "package\\s+(.*);";
private static final TemplateEngine ENGINE = TemplateUtil.createEngine(new TemplateConfig());
/**
* 模板解析
*
* @param dialect 方言
* @param template 模板
* @param className 类名
* @return {@link String}
*/
public static String templateParse(String dialect, String template, String className) {
List<String> split = StrUtil.split(className, ".");
switch (Dialect.get(dialect)) {
case JAVA:
case SCALA:
String clazz = CollUtil.getLast(split);
String packageName = StrUtil.strip(className, clazz);
Dict data = Dict.create()
.set("className", clazz)
.set("package", Asserts.isNullString(packageName) ? "" : StrUtil.strip(packageName, "."));
return ENGINE.getTemplate(template).render(data);
case PYTHON:
default:
String clazzName = split.get(0);
Dict data2 = Dict.create()
.set("className", clazzName)
.set("attr", split.size() > 1 ? split.get(1) : null);
return ENGINE.getTemplate(template).render(data2);
}
}
public static List<UDF> getUDF(String statement) {
ProcessEntity process = ProcessContextHolder.getProcess();
......
......@@ -58,9 +58,17 @@ VALUES (1, 1, 1, current_time, current_time);
-- Records of dlink_udf_template
-- ----------------------------
INSERT INTO `dlink_udf_template` (`id`, `name`, `code_type`, `function_type`, `template_code`, `enabled`, `create_time`, `update_time`)
VALUES (1, 'java_udf', 'java', 'UDF', 'package ${package};\n\nimport org.apache.flink.table.functions.ScalarFunction;\n\npublic class ${className} extends ScalarFunction {\n public String eval(String s) {\n return null;\n }\n}', NULL, '2022-10-19 09:17:37', '2022-10-19 09:17:37');
VALUES (1, 'java_udf', 'Java', 'UDF', '${(package==\'\')?string(\'\',\'package \'+package+\';\')}\n\nimport org.apache.flink.table.functions.ScalarFunction;\n\npublic class ${className} extends ScalarFunction {\n public String eval(String s) {\n return null;\n }\n}', NULL, '2022-10-19 09:17:37', '2022-10-25 17:45:57');
INSERT INTO `dlink_udf_template` (`id`, `name`, `code_type`, `function_type`, `template_code`, `enabled`, `create_time`, `update_time`)
VALUES (2, 'java_udtf', 'java', 'UDTF', 'package ${package};\n\nimport org.apache.flink.table.functions.ScalarFunction;\n\n@FunctionHint(output = @DataTypeHint(\"ROW<word STRING, length INT>\"))\npublic static class ${className} extends TableFunction<Row> {\n\n public void eval(String str) {\n for (String s : str.split(\" \")) {\n // use collect(...) to emit a row\n collect(Row.of(s, s.length()));\n }\n }\n}', NULL, '2022-10-19 09:22:58', '2022-10-19 10:01:57');
VALUES (2, 'java_udtf', 'Java', 'UDTF', '${(package==\'\')?string(\'\',\'package \'+package+\';\')}\n\nimport org.apache.flink.table.functions.ScalarFunction;\n\n@FunctionHint(output = @DataTypeHint(\"ROW<word STRING, length INT>\"))\npublic static class ${className} extends TableFunction<Row> {\n\n public void eval(String str) {\n for (String s : str.split(\" \")) {\n // use collect(...) to emit a row\n collect(Row.of(s, s.length()));\n }\n }\n}', NULL, '2022-10-19 09:22:58', '2022-10-25 17:49:30');
INSERT INTO `dlink_udf_template` (`id`, `name`, `code_type`, `function_type`, `template_code`, `enabled`, `create_time`, `update_time`)
VALUES (3, 'scala_udf', 'Scala', 'UDF', '${(package==\'\')?string(\'\',\'package \'+package+\';\')}\n\nimport org.apache.flink.table.api._\nimport org.apache.flink.table.functions.ScalarFunction\n\n// 定义可参数化的函数逻辑\nclass ${className} extends ScalarFunction {\n def eval(s: String, begin: Integer, end: Integer): String = {\n \"this is scala\"\n }\n}', NULL, '2022-10-25 09:21:32', '2022-10-25 17:49:46');
INSERT INTO `dlink_udf_template` (`id`, `name`, `code_type`, `function_type`, `template_code`, `enabled`, `create_time`, `update_time`)
VALUES (4, 'python_udf_1', 'Python', 'UDF', 'from pyflink.table import ScalarFunction, DataTypes\nfrom pyflink.table.udf import udf\n\nclass ${className}(ScalarFunction):\n def __init__(self):\n pass\n\n def eval(self, variable):\n return str(variable)\n\n\n${attr!\'f\'} = udf(HashCode(), result_type=DataTypes.STRING())', NULL, '2022-10-25 09:23:07', '2022-10-25 09:34:01');
INSERT INTO `dlink_udf_template` (`id`, `name`, `code_type`, `function_type`, `template_code`, `enabled`, `create_time`, `update_time`)
VALUES (5, 'python_udf_2', 'Python', 'UDF', 'from pyflink.table import DataTypes\nfrom pyflink.table.udf import udf\n\n@udf(result_type=DataTypes.STRING())\ndef ${className}(variable1:string):\n return \'\'', NULL, '2022-10-25 09:25:13', '2022-10-25 09:34:47');
......@@ -18,12 +18,13 @@
*/
import React, {useState} from 'react';
import {Button, Form, Input, Modal, Select} from 'antd';
import React, {useEffect, useState} from 'react';
import {Button, Form, Input, Modal, Select, Cascader} from 'antd';
import type {TaskTableListItem} from '../data.d';
import {DIALECT} from "@/components/Studio/conf";
import {useIntl} from "umi";
import {postAll, postDataArray} from "@/components/Common/crud";
const {Option} = Select;
......@@ -32,6 +33,7 @@ export type UpdateFormProps = {
onSubmit: (values: Partial<TaskTableListItem>) => void;
updateModalVisible: boolean;
isCreate: boolean;
dialect: string;
values: Partial<TaskTableListItem>;
};
......@@ -39,21 +41,37 @@ const formLayout = {
labelCol: {span: 7},
wrapperCol: {span: 13},
};
const isUDF = (dialect: string) => {
return (dialect == DIALECT.SCALA || dialect == DIALECT.PYTHON || dialect == DIALECT.JAVA)
}
const SimpleTaskForm: React.FC<UpdateFormProps> = (props) => {
const intl = useIntl();
const l = (id: string, defaultMessage?: string, value?: {}) => intl.formatMessage({id, defaultMessage}, value);
useEffect(() => {
getTemplateTreeData()
}, [])
const [formVals, setFormVals] = useState<Partial<TaskTableListItem>>({
id: props.values.id,
name: props.values.name,
alias: props.values.alias,
parentId: props.values.parentId,
config: props.values.config,
});
const [dialect, setDialect] = useState<string>('')
const [templateTree, setTemplateTree] = useState<Array<Object>>([])
const [templateData, setTemplateData] = useState<Array<Object>>([])
const [defaultTemplateData, setDefaultTemplateData] = useState<Array<Object>>([])
const [form] = Form.useForm();
const getTemplateTreeData = async () => {
const resp = await postAll("/api/udf/template/tree")
setTemplateTree(resp.datas)
}
const {
onSubmit: handleUpdate,
onCancel: handleUpdateModalVisible,
......@@ -62,11 +80,31 @@ const SimpleTaskForm: React.FC<UpdateFormProps> = (props) => {
isCreate,
} = props;
const submitForm = async () => {
const fieldsValue = await form.validateFields();
setFormVals({...formVals, ...fieldsValue});
handleUpdate({...formVals, ...fieldsValue});
const data = {...formVals, ...fieldsValue};
try {
data.config = {
templateId: String(data['config.templateId'][1]),
className: data['config.className'],
}
}catch (e) {
}
setFormVals(data);
handleUpdate(data);
};
const handlerSetDialect = (value: string) => {
setDialect(value)
if (isUDF(value)) {
templateTree.map(x => {
if (x.label == value) {
setTemplateData(x.children)
setDefaultTemplateData([x.children[0].label, x.children[0].children[0].label])
}
})
}
}
const renderContent = () => {
return (
......@@ -75,7 +113,7 @@ const SimpleTaskForm: React.FC<UpdateFormProps> = (props) => {
label="作业类型" name="dialect"
tooltip='指定作业类型,默认为 FlinkSql'
>
<Select defaultValue={DIALECT.FLINKSQL} value={DIALECT.FLINKSQL}>
<Select defaultValue={DIALECT.FLINKSQL} value={DIALECT.FLINKSQL} onChange={handlerSetDialect}>
<Option value={DIALECT.FLINKSQL}>{DIALECT.FLINKSQL}</Option>
<Option value={DIALECT.KUBERNETES_APPLICATION}>{DIALECT.KUBERNETES_APPLICATION}</Option>
<Option value={DIALECT.FLINKJAR}>{DIALECT.FLINKJAR}</Option>
......@@ -89,9 +127,9 @@ const SimpleTaskForm: React.FC<UpdateFormProps> = (props) => {
<Option value={DIALECT.HIVE}>{DIALECT.HIVE}</Option>
<Option value={DIALECT.PHOENIX}>{DIALECT.PHOENIX}</Option>
<Option value={DIALECT.STARROCKS}>{DIALECT.STARROCKS}</Option>
<Option value={DIALECT.JAVA}>{DIALECT.JAVA}</Option>
<Option value={DIALECT.SCALA}>{DIALECT.SCALA}</Option>
<Option value={DIALECT.PYTHON}>{DIALECT.PYTHON}</Option>
<Option key={DIALECT.JAVA} value={DIALECT.JAVA}>{DIALECT.JAVA}</Option>
<Option key={DIALECT.SCALA} value={DIALECT.SCALA}>{DIALECT.SCALA}</Option>
<Option key={DIALECT.PYTHON} value={DIALECT.PYTHON}>{DIALECT.PYTHON}</Option>
<Option value={DIALECT.SQL}>{DIALECT.SQL}</Option>
</Select>
</Form.Item>) : undefined}
......@@ -107,6 +145,23 @@ const SimpleTaskForm: React.FC<UpdateFormProps> = (props) => {
rules={[{required: true, message: '请输入别名!'}]}>
<Input placeholder="请输入"/>
</Form.Item>
{isUDF(dialect) ? (<>
<Form.Item
name="config.templateId"
label="udf 模板"
rules={[{required: true, message: '请选择udf模板!'}]}>
{<Cascader
value={defaultTemplateData}
options={templateData}
/>}
</Form.Item>
<Form.Item
name="config.className"
label="类名或方法名"
rules={[{required: true, message: '请输入类名或方法名!'}]}>
<Input placeholder="请输入"/>
</Form.Item>
</>) : undefined}
</>
);
};
......
......@@ -31,5 +31,6 @@ export type TaskTableListItem = {
name: string,
alias: string,
dialect: string,
config: Object<string, object>,
parentId: number,
};
......@@ -28,10 +28,11 @@ import 'antd/dist/antd.css';
import './index.css';
import CodeEdit from "@/components/Common/CodeEdit";
import {useIntl} from "umi";
import { DIALECT } from "@/components/Studio/conf";
const {Option} = Select;
const UDFTemplate: React.FC<{}> = (props) => {
const UDFTemplate: React.FC<{}> = () => {
const [open, setOpen] = useState(false);
......@@ -89,7 +90,6 @@ const UDFTemplate: React.FC<{}> = (props) => {
title={(tModel.id ? '修改' : '添加') + "UDF模板"}
width={720}
onClose={onClose}
open={open}
extra={
<Space>
<Button onClick={onClose}>{l('button.cancel')}</Button>
......@@ -119,9 +119,9 @@ const UDFTemplate: React.FC<{}> = (props) => {
rules={[{required: true, message: '请选择代码类型'}]}
>
<Select placeholder="请选择代码类型">
<Option value="java">java</Option>
<Option value="python">python</Option>
<Option value="scala">scala</Option>
<Option value={DIALECT.JAVA}>{DIALECT.JAVA}</Option>
<Option value={DIALECT.SCALA}>{DIALECT.SCALA}</Option>
<Option value={DIALECT.PYTHON}>{DIALECT.PYTHON}</Option>
</Select>
</Form.Item>
</Col>
......@@ -166,8 +166,8 @@ const UDFTemplate: React.FC<{}> = (props) => {
const deleteUDFTemplate = (id: number) => {
Modal.confirm({
title: '删除集群',
content: '确定删除该集群吗?',
title: '删除模板',
content: '确定删除该模板吗?',
okText: l('button.confirm'),
cancelText: l('button.cancel'),
onOk: async () => {
......@@ -230,7 +230,7 @@ const UDFTemplate: React.FC<{}> = (props) => {
}, {
title: l('global.table.operate'),
key: 'action',
render: (text, record, _, action) => (
render: (text, record) => (
<Space size="middle">
<Button type="primary" icon={<FormOutlined/>} onClick={() => changeModel(record)}></Button>
<Button type="primary" icon={<DeleteOutlined/>} onClick={() => {
......@@ -245,7 +245,7 @@ const UDFTemplate: React.FC<{}> = (props) => {
<PageContainer title={false}>
{<Box/>}
<ProTable
request={(params, sorter, filter) => getTemplate()}
request={() => getTemplate()}
columns={columns}
search={false}
toolBarRender={() => [
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment