Unverified Commit ca30c57b authored by ZackYoung's avatar ZackYoung Committed by GitHub

add Flink UDF Template (#1121)

* add flink udf template(backed)

* add flink udf template(front end)

* add sql file
parent ab68f5d2
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.controller;
import com.dlink.common.result.ProTableResult;
import com.dlink.common.result.Result;
import com.dlink.model.UDFTemplate;
import com.dlink.service.UDFTemplateService;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Resource;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
/**
* @author ZackYoung
* @since 0.6.8
*/
@Slf4j
@RestController
@RequestMapping("/api/udf")
public class UDFController {
@Resource
UDFTemplateService udfTemplateService;
@PostMapping("/template/list")
public ProTableResult<UDFTemplate> listUdfTemplates(@RequestBody JsonNode para) {
return udfTemplateService.selectForProTable(para);
}
@PutMapping("/template/")
public Result<String> addTemplate(@RequestBody UDFTemplate udfTemplate) {
return udfTemplateService.save(udfTemplate) ? Result.succeed("操作成功") : Result.failed("操作失败");
}
@DeleteMapping("/template/list")
public Result deleteMul(@RequestBody JsonNode para) {
if (para.size() > 0) {
List<Integer> error = new ArrayList<>();
for (final JsonNode item : para) {
Integer id = item.asInt();
if (!udfTemplateService.removeById(id)) {
error.add(id);
}
}
if (error.size() == 0) {
return Result.succeed("删除成功");
} else {
return Result.succeed("删除部分成功,但" + error.toString() + "删除失败,共" + error.size() + "次失败。");
}
} else {
return Result.failed("请选择要删除的记录");
}
}
}
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.mapper;
import com.dlink.db.mapper.SuperMapper;
import com.dlink.model.UDFTemplate;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
import java.util.List;
import java.util.Map;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/**
* @author ZackYoung
* @since 0.6.8
* udf 模板mapper
*/
@Mapper
public interface UDFTemplateMapper extends SuperMapper<UDFTemplate> {
@Override
@Select("select * from dlink_udf_template")
List<UDFTemplate> selectForProTable(Page<UDFTemplate> page, Wrapper<UDFTemplate> queryWrapper, Map<String, Object> param);
}
\ No newline at end of file
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.model;
import com.dlink.db.model.SuperEntity;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* AlertGroup
*
* @author wenmo
* @since 2022/2/24 19:58
**/
@Data
@EqualsAndHashCode(callSuper = false)
@TableName("dlink_udf_template")
public class UDFTemplate extends SuperEntity {
private static final long serialVersionUID = 7027411154789682344L;
private String codeType;
private String functionType;
private String templateCode;
}
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.service;
import com.dlink.db.service.ISuperService;
import com.dlink.model.UDFTemplate;
/**
* @author ZackYoung
* @since 0.6.8
*/
public interface UDFTemplateService extends ISuperService<UDFTemplate> {
/**
* 保存
*
* @param udfTemplate udf模板
* @return boolean
*/
boolean save(UDFTemplate udfTemplate);
}
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.service.impl;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.exception.BusException;
import com.dlink.mapper.UDFTemplateMapper;
import com.dlink.model.UDFTemplate;
import com.dlink.service.UDFTemplateService;
import org.springframework.stereotype.Service;
/**
* @author ZackYoung
* @since 0.6.8
*/
@Service
public class UDFTemplateServiceImpl extends SuperServiceImpl<UDFTemplateMapper, UDFTemplate> implements UDFTemplateService {
@Override
public boolean save(UDFTemplate udfTemplate) {
UDFTemplate selectOne = query().eq("name", udfTemplate.getName()).one();
if (udfTemplate.getId() == 0) {
// 添加
udfTemplate.setId(null);
if ((selectOne != null)) {
throw new BusException("模板名已经存在");
}
} else {
// 修改
if (selectOne == null) {
return saveOrUpdate(udfTemplate);
}
if (!udfTemplate.getId().equals(selectOne.getId())) {
throw new BusException("模板名已经存在");
}
}
return saveOrUpdate(udfTemplate);
}
}
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
DROP TABLE IF EXISTS `dlink_udf`;
CREATE TABLE `dlink_udf` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(50) DEFAULT NULL COMMENT 'udf名',
`class_name` varchar(50) DEFAULT NULL COMMENT '完整的类名',
`source_code` text COMMENT '源码',
`compiler_code` binary(255) DEFAULT NULL COMMENT '编译产物',
`version_id` int(11) DEFAULT NULL COMMENT '版本',
`version_description` varchar(50) DEFAULT NULL COMMENT '版本描述',
`is_default` tinyint(1) DEFAULT NULL COMMENT '是否默认',
`document_id` int(11) DEFAULT NULL COMMENT '对应文档id',
`from_version_id` int(11) DEFAULT NULL COMMENT '基于udf版本id',
`code_md5` varchar(50) DEFAULT NULL COMMENT '源码',
`dialect` varchar(50) DEFAULT NULL COMMENT '方言',
`type` varchar(50) DEFAULT NULL COMMENT '类型',
`step` int(255) DEFAULT NULL COMMENT '作业生命周期',
`enable` tinyint(1) DEFAULT NULL COMMENT '是否启用',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`update_time` datetime DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- ----------------------------
-- Table structure for dlink_udf_template
-- ----------------------------
DROP TABLE IF EXISTS `dlink_udf_template`;
CREATE TABLE `dlink_udf_template` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(20) DEFAULT NULL COMMENT '模板名称',
`code_type` varchar(10) DEFAULT NULL COMMENT '代码类型',
`function_type` varchar(10) DEFAULT NULL COMMENT '函数类型',
`template_code` text COMMENT '模板代码',
`enabled` tinyint(1) DEFAULT NULL,
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`update_time` datetime DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4;
-- ----------------------------
-- Records of dlink_udf_template
-- ----------------------------
BEGIN;
INSERT INTO `dlink_udf_template` (`id`, `name`, `code_type`, `function_type`, `template_code`, `enabled`, `create_time`, `update_time`) VALUES (1, 'java_udf', 'java', 'UDF', 'package ${package};\n\nimport org.apache.flink.table.functions.ScalarFunction;\n\npublic class ${className} extends ScalarFunction {\n public String eval(String s) {\n return null;\n }\n}', NULL, '2022-10-19 09:17:37', '2022-10-19 09:17:37');
INSERT INTO `dlink_udf_template` (`id`, `name`, `code_type`, `function_type`, `template_code`, `enabled`, `create_time`, `update_time`) VALUES (2, 'java_udtf', 'java', 'UDTF', 'package ${package};\n\nimport org.apache.flink.table.functions.ScalarFunction;\n\n@FunctionHint(output = @DataTypeHint(\"ROW<word STRING, length INT>\"))\npublic static class ${className} extends TableFunction<Row> {\n\n public void eval(String str) {\n for (String s : str.split(\" \")) {\n // use collect(...) to emit a row\n collect(Row.of(s, s.length()));\n }\n }\n}', NULL, '2022-10-19 09:22:58', '2022-10-19 10:01:57');
COMMIT;
SET FOREIGN_KEY_CHECKS = 1;
\ No newline at end of file
...@@ -176,6 +176,12 @@ export default [ ...@@ -176,6 +176,12 @@ export default [
icon: 'setting', icon: 'setting',
component: './SettingCenter/FlinkSettings', component: './SettingCenter/FlinkSettings',
}, },
{
path: '/settingcenter/udfTemplate',
name: 'udfTemplate',
icon: 'setting',
component: './SettingCenter/UDFTemplate',
},
{ {
path: '/settingcenter/systeminfo', path: '/settingcenter/systeminfo',
name: 'systemInfo', name: 'systemInfo',
......
...@@ -102,6 +102,7 @@ export default { ...@@ -102,6 +102,7 @@ export default {
'menu.settings': 'Setting Center', 'menu.settings': 'Setting Center',
'menu.settings.flinkConfig': 'Flink Settings', 'menu.settings.flinkConfig': 'Flink Settings',
'menu.settings.udfTemplate': 'UDF Template Settings',
'menu.settings.systemInfo': 'System Info', 'menu.settings.systemInfo': 'System Info',
'menu.settings.processList': 'Process List', 'menu.settings.processList': 'Process List',
......
...@@ -106,6 +106,7 @@ export default { ...@@ -106,6 +106,7 @@ export default {
'menu.settings': '配置中心', 'menu.settings': '配置中心',
'menu.settings.flinkConfig': 'Flink 配置', 'menu.settings.flinkConfig': 'Flink 配置',
'menu.settings.udfTemplate': 'udf模板配置',
'menu.settings.systemInfo': '系统信息', 'menu.settings.systemInfo': '系统信息',
'menu.settings.processList': '进程列表', 'menu.settings.processList': '进程列表',
......
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
export type UDFTemplateItem = {
id: number,
name: string,
codeType: string,
functionType: string,
templateCode: string
};
.site-form-in-drawer-wrapper {
position: absolute;
right: 0px;
bottom: 0px;
width: 100%;
padding: 10px 16px;
text-align: right;
background: #fff;
border-top: 1px solid #e9e9e9;
}
[data-theme="dark"] .site-form-in-drawer-wrapper {
border-top: 1px solid #303030;
background: #1f1f1f;
}
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import React, {useRef, useState} from "react";
import ProTable, {ActionType, ProColumns} from "@ant-design/pro-table";
import {PageContainer} from '@ant-design/pro-layout';
import {UDFTemplateItem} from "@/pages/SettingCenter/UDFTemplate/data";
import {addTemplate, deleteTemplate, getTemplate} from "@/pages/SettingCenter/UDFTemplate/service";
import {Button, Col, Drawer, Form, Input, Row, Select, Space} from "antd";
import {DeleteOutlined, FormOutlined, PlusOutlined} from "@ant-design/icons";
import 'antd/dist/antd.css';
import './index.css';
import {ProCoreActionType} from "@ant-design/pro-utils/lib/typing";
const {Option} = Select;
const UDFTemplate: React.FC<{}> = (props) => {
const [open, setOpen] = useState(false);
const initData: UDFTemplateItem = {
id: 0,
name: "",
codeType: "java",
functionType: "UDF",
templateCode: ""
}
const [tModel, setTModel] = useState<UDFTemplateItem>(initData);
const actionRef = useRef<ActionType>();
const addModel = () => {
setTModel({
id: 0,
name: "",
codeType: "java",
functionType: "UDF",
templateCode: ""
})
}
const changeModel = (record: UDFTemplateItem) => {
setTModel(record)
setOpen(true)
}
const showDrawer = () => {
addModel()
setOpen(true);
};
const onClose = () => {
actionRef.current.reload()
setOpen(false);
};
const Box = () => {
const [form] = Form.useForm();
const add = async () => {
try {
const values = await form.validateFields();
values["id"] = tModel.id
await addTemplate(values)
onClose()
} catch (errorInfo) {
console.log('Failed:', errorInfo);
}
};
return <Drawer
visible={open}
title="添加或修改UDF模板"
width={720}
onClose={onClose}
open={open}
bodyStyle={{paddingBottom: 80}}
extra={
<Space>
<Button onClick={onClose}>取消</Button>
<Button onClick={add} type="primary">
提交
</Button>
</Space>
}
>
<Form layout="vertical" form={form} initialValues={tModel}>
<Row gutter={16}>
<Col span={12}>
<Form.Item
name="name"
label="模板名"
rules={[{required: true, message: '模板名称'}]}
>
<Input placeholder="请输入模板名"/>
</Form.Item>
</Col>
</Row>
<Row gutter={16}>
<Col span={12}>
<Form.Item
name="codeType"
label="代码类型"
rules={[{required: true, message: '请选择代码类型'}]}
>
<Select placeholder="请选择代码类型">
<Option value="java">java</Option>
<Option value="python">python</Option>
<Option value="scala">scala</Option>
</Select>
</Form.Item>
</Col>
<Col span={12}>
<Form.Item
name="functionType"
label="函数类型"
rules={[{required: true, message: '请选择函数类型'}]}
>
<Select placeholder="Please choose the type">
<Option value="UDF">UDF</Option>
<Option value="UDAF">UDAF</Option>
<Option value="UDTF">UDTF</Option>
</Select>
</Form.Item>
</Col>
</Row>
<Row gutter={16}>
<Col span={24}>
<Form.Item
name="templateCode"
label="模板代码"
rules={[
{
required: true,
message: '请编辑模板代码',
},
]}
>
<Input.TextArea rows={20} placeholder="请编辑模板代码"/>
</Form.Item>
</Col>
</Row>
</Form>
</Drawer>
}
const columns: ProColumns<UDFTemplateItem>[] = [
{
title: '模板名',
sorter: true,
dataIndex: 'name',
},
{
title: '代码类型',
sorter: true,
dataIndex: 'codeType',
filters: [
{
text: 'java',
value: 'java',
}, {
text: 'scala',
value: 'scala',
}, {
text: 'python',
value: 'python',
},
],
valueEnum: {
'java': {text: 'java'},
'scala': {text: 'scala'},
'python': {text: 'python'},
},
onFilter: true
}, {
title: '函数类型',
sorter: true,
dataIndex: 'functionType',
filters: [
{
text: 'UDF',
value: 'UDF',
}, {
text: 'UDTF',
value: 'UDTF',
}, {
text: 'UDAF',
value: 'UDAF',
},
],
valueEnum: {
'UDF': {text: 'UDF'},
'UDTF': {text: 'UDTF'},
'UDAF': {text: 'UDAF'},
},
onFilter: true
}, {
title: '操作',
key: 'action',
render: (text, record,_,action) => (
<Space size="middle">
<Button type="primary" icon={<FormOutlined/>} onClick={() => changeModel(record)}></Button>
<Button type="primary" icon={<DeleteOutlined/>} onClick={() => {deleteTemplate(record.id);action?.reload()}}></Button>
</Space>
),
}
];
return (
<PageContainer title={false}>
{<Box/>}
<ProTable
request={(params, sorter, filter) => getTemplate()}
columns={columns}
search={false}
toolBarRender={() => [
<Button type="primary" onClick={showDrawer} icon={<PlusOutlined/>}>
新建
</Button>
]}
actionRef={actionRef}
/>
</PageContainer>
);
};
export default UDFTemplate;
import {handleRemoveById, postAll, handleAddOrUpdate} from "@/components/Common/crud";
import {UDFTemplateItem} from "@/pages/SettingCenter/UDFTemplate/data";
const url = '/api/udf/template/list';
const addUrl = '/api/udf/template/';
export async function getTemplate() {
return await postAll(url);
}
export function deleteTemplate(id: number) {
handleRemoveById(url, id)
}
export function addTemplate(params: UDFTemplateItem) {
return handleAddOrUpdate(addUrl, params)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment