Commit ea875c21 authored by coderTomato's avatar coderTomato

Merge remote-tracking branch 'origin/dev' into dev

parents b69ac7bd 5c6f8d22
...@@ -49,6 +49,7 @@ Dinky(原 Dlink): ...@@ -49,6 +49,7 @@ Dinky(原 Dlink):
| | | 新增 选中片段执行 | 0.4.0 | | | | 新增 选中片段执行 | 0.4.0 |
| | | 新增 布局拖拽 | 0.4.0 | | | | 新增 布局拖拽 | 0.4.0 |
| | | 新增 SQL导出 | 0.5.0 | | | | 新增 SQL导出 | 0.5.0 |
| | | 新增 快捷键保存、校验、美化 | 0.5.0 |
| | | 支持 local 模式下 FlinkSQL 提交 | 0.4.0 | | | | 支持 local 模式下 FlinkSQL 提交 | 0.4.0 |
| | | 支持 standalone 模式下 FlinkSQL 提交 | 0.4.0 | | | | 支持 standalone 模式下 FlinkSQL 提交 | 0.4.0 |
| | | 支持 yarn session 模式下 FlinkSQL 提交 | 0.4.0 | | | | 支持 yarn session 模式下 FlinkSQL 提交 | 0.4.0 |
...@@ -56,16 +57,19 @@ Dinky(原 Dlink): ...@@ -56,16 +57,19 @@ Dinky(原 Dlink):
| | | 支持 yarn application 模式下 FlinkSQL 提交 | 0.4.0 | | | | 支持 yarn application 模式下 FlinkSQL 提交 | 0.4.0 |
| | | 支持 kubernetes session 模式下 FlinkSQL 提交 | 0.5.0 | | | | 支持 kubernetes session 模式下 FlinkSQL 提交 | 0.5.0 |
| | | 支持 kubernetes application 模式下 FlinkSQL 提交 | 0.5.0 | | | | 支持 kubernetes application 模式下 FlinkSQL 提交 | 0.5.0 |
| | | 支持 UDF Java 方言Local模式在线编写、调试、动态加载 | 0.5.0 |
| | Flink 作业 | 支持 yarn application 模式下 Jar 提交 | 0.4.0 | | | Flink 作业 | 支持 yarn application 模式下 Jar 提交 | 0.4.0 |
| | | 支持 k8s application 模式下 Jar 提交 | 0.5.0 | | | | 支持 k8s application 模式下 Jar 提交 | 0.5.0 |
| | | 支持 作业 Cancel | 0.4.0 | | | | 支持 作业 Cancel | 0.4.0 |
| | | 支持 作业 SavePoint 的 Cancel、Stop、Trigger | 0.4.0 | | | | 支持 作业 SavePoint 的 Cancel、Stop、Trigger | 0.4.0 |
| | | 新增 作业自动从 SavePoint 恢复机制(包含最近、最早、指定一次) | 0.4.0 | | | | 新增 作业自动从 SavePoint 恢复机制(包含最近、最早、指定一次) | 0.4.0 |
| | | 新增 UDF java方言代码的开发 | 0.5.0 |
| | Flink 集群 | 支持 查看已注册集群的作业列表与运维 | 0.4.0 | | | Flink 集群 | 支持 查看已注册集群的作业列表与运维 | 0.4.0 |
| | | 新增 自动注册 Yarn 创建的集群 | 0.4.0 | | | | 新增 自动注册 Yarn 创建的集群 | 0.4.0 |
| | SQL | 新增 外部数据源的 SQL 校验 | 0.5.0 | | | SQL | 新增 外部数据源的 SQL 校验 | 0.5.0 |
| | | 新增 外部数据源的 SQL 执行与预览 | 0.5.0 | | | | 新增 外部数据源的 SQL 执行与预览 | 0.5.0 |
| | BI | 新增 折线图的渲染 | 0.5.0 |
| | | 新增 条形图图的渲染 | 0.5.0 |
| | | 新增 饼图的渲染 | 0.5.0 |
| | 元数据 | 新增 查询外部数据源的元数据信息 | 0.4.0 | | | 元数据 | 新增 查询外部数据源的元数据信息 | 0.4.0 |
| | 归档 | 新增 执行与提交历史 | 0.4.0 | | | 归档 | 新增 执行与提交历史 | 0.4.0 |
| 运维中心 | 暂无 | 暂无 | 0.4.0 | | 运维中心 | 暂无 | 暂无 | 0.4.0 |
......
...@@ -24,4 +24,6 @@ public interface TaskService extends ISuperService<Task> { ...@@ -24,4 +24,6 @@ public interface TaskService extends ISuperService<Task> {
List<Task> listFlinkSQLEnv(); List<Task> listFlinkSQLEnv();
String exportSql(Integer id); String exportSql(Integer id);
Task getUDFByClassName(String className);
} }
...@@ -88,6 +88,7 @@ public class CatalogueServiceImpl extends SuperServiceImpl<CatalogueMapper, Cata ...@@ -88,6 +88,7 @@ public class CatalogueServiceImpl extends SuperServiceImpl<CatalogueMapper, Cata
}else{ }else{
Task task = new Task(); Task task = new Task();
task.setId(oldCatalogue.getTaskId()); task.setId(oldCatalogue.getTaskId());
task.setName(catalogue.getName());
task.setAlias(catalogue.getName()); task.setAlias(catalogue.getName());
taskService.updateById(task); taskService.updateById(task);
this.updateById(catalogue); this.updateById(catalogue);
......
...@@ -27,6 +27,7 @@ import com.dlink.session.SessionConfig; ...@@ -27,6 +27,7 @@ import com.dlink.session.SessionConfig;
import com.dlink.session.SessionInfo; import com.dlink.session.SessionInfo;
import com.dlink.session.SessionPool; import com.dlink.session.SessionPool;
import com.dlink.utils.RunTimeUtil; import com.dlink.utils.RunTimeUtil;
import com.dlink.utils.UDFUtil;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
...@@ -89,6 +90,7 @@ public class StudioServiceImpl implements StudioService { ...@@ -89,6 +90,7 @@ public class StudioServiceImpl implements StudioService {
if(!config.isUseSession()) { if(!config.isUseSession()) {
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), studioExecuteDTO.getClusterId())); config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), studioExecuteDTO.getClusterId()));
} }
initUDF(config,studioExecuteDTO.getStatement());
JobManager jobManager = JobManager.build(config); JobManager jobManager = JobManager.build(config);
JobResult jobResult = jobManager.executeSql(studioExecuteDTO.getStatement()); JobResult jobResult = jobManager.executeSql(studioExecuteDTO.getStatement());
RunTimeUtil.recovery(jobManager); RunTimeUtil.recovery(jobManager);
...@@ -152,6 +154,7 @@ public class StudioServiceImpl implements StudioService { ...@@ -152,6 +154,7 @@ public class StudioServiceImpl implements StudioService {
if(!config.isUseSession()) { if(!config.isUseSession()) {
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), studioExecuteDTO.getClusterId())); config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), studioExecuteDTO.getClusterId()));
} }
initUDF(config,studioExecuteDTO.getStatement());
JobManager jobManager = JobManager.buildPlanMode(config); JobManager jobManager = JobManager.buildPlanMode(config);
return jobManager.explainSql(studioExecuteDTO.getStatement()).getSqlExplainResults(); return jobManager.explainSql(studioExecuteDTO.getStatement()).getSqlExplainResults();
} }
...@@ -317,4 +320,15 @@ public class StudioServiceImpl implements StudioService { ...@@ -317,4 +320,15 @@ public class StudioServiceImpl implements StudioService {
} }
return false; return false;
} }
private void initUDF(JobConfig config,String statement){
if(!GatewayType.LOCAL.equalsValue(config.getType())){
return;
}
List<String> udfClassNameList = JobManager.getUDFClassName(statement);
for(String item : udfClassNameList){
Task task = taskService.getUDFByClassName(item);
JobManager.initUDF(item,task.getStatement());
}
}
} }
...@@ -15,6 +15,7 @@ import com.dlink.job.JobResult; ...@@ -15,6 +15,7 @@ import com.dlink.job.JobResult;
import com.dlink.mapper.TaskMapper; import com.dlink.mapper.TaskMapper;
import com.dlink.model.*; import com.dlink.model.*;
import com.dlink.service.*; import com.dlink.service.*;
import com.dlink.utils.CustomStringJavaCompiler;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
...@@ -100,6 +101,11 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -100,6 +101,11 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Override @Override
public boolean saveOrUpdateTask(Task task) { public boolean saveOrUpdateTask(Task task) {
if(Asserts.isNotNullString(task.getDialect()) && Dialect.JAVA.equalsVal(task.getDialect())
&& Asserts.isNotNullString(task.getStatement()) ){
CustomStringJavaCompiler compiler = new CustomStringJavaCompiler(task.getStatement());
task.setSavePointPath(compiler.getFullClassName());
}
if (task.getId() != null) { if (task.getId() != null) {
this.updateById(task); this.updateById(task);
if (task.getStatement() != null) { if (task.getStatement() != null) {
...@@ -151,6 +157,14 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -151,6 +157,14 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
} }
} }
@Override
public Task getUDFByClassName(String className) {
Task task = getOne(new QueryWrapper<Task>().eq("dialect", "Java").eq("enabled", 1).eq("save_point_path", className));
Assert.check(task);
task.setStatement(statementService.getById(task.getId()).getStatement());
return task;
}
private JobConfig buildJobConfig(Task task){ private JobConfig buildJobConfig(Task task){
boolean isJarTask = isJarTask(task); boolean isJarTask = isJarTask(task);
if(!isJarTask&&Asserts.isNotNull(task.getEnvId())){ if(!isJarTask&&Asserts.isNotNull(task.getEnvId())){
......
package com.dlink.pool;
import com.dlink.assertion.Asserts;
import lombok.Getter;
import lombok.Setter;
/**
* ClassEntity
*
* @author wenmo
* @since 2022/1/12 23:52
*/
@Getter
@Setter
public class ClassEntity {
private String name;
private String code;
private byte[] classByte;
public ClassEntity(String name, String code) {
this.name = name;
this.code = code;
}
public ClassEntity(String name, String code, byte[] classByte) {
this.name = name;
this.code = code;
this.classByte = classByte;
}
public static ClassEntity build(String name, String code){
return new ClassEntity(name,code);
}
public boolean equals(ClassEntity entity) {
if (Asserts.isEquals(name, entity.getName()) && Asserts.isEquals(code, entity.getCode())){
return true;
}else{
return false;
}
}
}
package com.dlink.pool;
import java.util.List;
import java.util.Vector;
/**
* ClassPool
*
* @author wenmo
* @since 2022/1/12 23:52
*/
public class ClassPool {
private static volatile List<ClassEntity> classList = new Vector<>();
public static boolean exist(String name) {
for (ClassEntity executorEntity : classList) {
if (executorEntity.getName().equals(name)) {
return true;
}
}
return false;
}
public static boolean exist(ClassEntity entity) {
for (ClassEntity executorEntity : classList) {
if (executorEntity.equals(entity)) {
return true;
}
}
return false;
}
public static Integer push(ClassEntity executorEntity){
if(exist(executorEntity.getName())){
remove(executorEntity.getName());
}
classList.add(executorEntity);
return classList.size();
}
public static Integer remove(String name) {
int count = classList.size();
for (int i = 0; i < classList.size(); i++) {
if (name.equals(classList.get(i).getName())) {
classList.remove(i);
break;
}
}
return count - classList.size();
}
public static ClassEntity get(String name) {
for (ClassEntity executorEntity : classList) {
if (executorEntity.getName().equals(name)) {
return executorEntity;
}
}
return null;
}
}
...@@ -35,6 +35,11 @@ ...@@ -35,6 +35,11 @@
<groupId>cn.hutool</groupId> <groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId> <artifactId>hutool-all</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy</artifactId>
<version>3.0.9</version>
</dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>
<artifactId>junit</artifactId> <artifactId>junit</artifactId>
......
...@@ -9,6 +9,7 @@ import com.dlink.explainer.trans.TransGenerator; ...@@ -9,6 +9,7 @@ import com.dlink.explainer.trans.TransGenerator;
import com.dlink.interceptor.FlinkInterceptor; import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.job.JobParam; import com.dlink.job.JobParam;
import com.dlink.job.StatementParam; import com.dlink.job.StatementParam;
import com.dlink.model.SystemConfiguration;
import com.dlink.parser.SqlType; import com.dlink.parser.SqlType;
import com.dlink.result.ExplainResult; import com.dlink.result.ExplainResult;
import com.dlink.result.SqlExplainResult; import com.dlink.result.SqlExplainResult;
...@@ -162,7 +163,7 @@ public class Explainer { ...@@ -162,7 +163,7 @@ public class Explainer {
} }
} }
if (inserts.size() > 0) { if (inserts.size() > 0) {
String sqlSet = String.join(FlinkSQLConstant.SEPARATOR, inserts); String sqlSet = String.join(";\r\n ", inserts);
try { try {
record.setExplain(executor.explainStatementSet(inserts)); record.setExplain(executor.explainStatementSet(inserts));
record.setParseTrue(true); record.setParseTrue(true);
......
...@@ -19,6 +19,8 @@ import com.dlink.gateway.result.TestResult; ...@@ -19,6 +19,8 @@ import com.dlink.gateway.result.TestResult;
import com.dlink.interceptor.FlinkInterceptor; import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.model.SystemConfiguration; import com.dlink.model.SystemConfiguration;
import com.dlink.parser.SqlType; import com.dlink.parser.SqlType;
import com.dlink.pool.ClassEntity;
import com.dlink.pool.ClassPool;
import com.dlink.result.*; import com.dlink.result.*;
import com.dlink.session.ExecutorEntity; import com.dlink.session.ExecutorEntity;
import com.dlink.session.SessionConfig; import com.dlink.session.SessionConfig;
...@@ -26,6 +28,7 @@ import com.dlink.session.SessionInfo; ...@@ -26,6 +28,7 @@ import com.dlink.session.SessionInfo;
import com.dlink.session.SessionPool; import com.dlink.session.SessionPool;
import com.dlink.trans.Operations; import com.dlink.trans.Operations;
import com.dlink.utils.SqlUtil; import com.dlink.utils.SqlUtil;
import com.dlink.utils.UDFUtil;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.DeploymentOptions;
...@@ -43,6 +46,8 @@ import java.util.ArrayList; ...@@ -43,6 +46,8 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/** /**
* JobManager * JobManager
...@@ -491,4 +496,22 @@ public class JobManager { ...@@ -491,4 +496,22 @@ public class JobManager {
sb.append(statement); sb.append(statement);
return sb.toString(); return sb.toString();
} }
public static List<String> getUDFClassName(String statement){
Pattern pattern = Pattern.compile("function (.*?)'(.*?)'", Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(statement);
List<String> classNameList = new ArrayList<>();
while (matcher.find()) {
classNameList.add(matcher.group(2));
}
return classNameList;
}
public static void initUDF(String className,String code){
if(ClassPool.exist(ClassEntity.build(className,code))){
UDFUtil.initClassLoader(className);
}else{
UDFUtil.buildClass(code);
}
}
} }
package com.dlink.utils;
import javax.tools.*;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* CustomStringJavaCompiler
*
* @author wenmo
* @since 2021/12/28 22:46
*/
public class CustomStringJavaCompiler {
//类全名
private String fullClassName;
private String sourceCode;
//存放编译之后的字节码(key:类全名,value:编译之后输出的字节码)
private Map<String, ByteJavaFileObject> javaFileObjectMap = new ConcurrentHashMap<>();
//获取java的编译器
private JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
//存放编译过程中输出的信息
private DiagnosticCollector<JavaFileObject> diagnosticsCollector = new DiagnosticCollector<>();
//编译耗时(单位ms)
private long compilerTakeTime;
public String getFullClassName() {
return fullClassName;
}
public ByteJavaFileObject getJavaFileObjectMap(String name) {
return javaFileObjectMap.get(name);
}
public CustomStringJavaCompiler(String sourceCode) {
this.sourceCode = sourceCode;
this.fullClassName = getFullClassName(sourceCode);
}
/**
* 编译字符串源代码,编译失败在 diagnosticsCollector 中获取提示信息
*
* @return true:编译成功 false:编译失败
*/
public boolean compiler() {
long startTime = System.currentTimeMillis();
//标准的内容管理器,更换成自己的实现,覆盖部分方法
StandardJavaFileManager standardFileManager = compiler.getStandardFileManager(diagnosticsCollector, null, null);
JavaFileManager javaFileManager = new StringJavaFileManage(standardFileManager);
//构造源代码对象
JavaFileObject javaFileObject = new StringJavaFileObject(fullClassName, sourceCode);
//获取一个编译任务
JavaCompiler.CompilationTask task = compiler.getTask(null, javaFileManager, diagnosticsCollector, null, null, Arrays.asList(javaFileObject));
//设置编译耗时
compilerTakeTime = System.currentTimeMillis() - startTime;
return task.call();
}
/**
* @return 编译信息(错误 警告)
*/
public String getCompilerMessage() {
StringBuilder sb = new StringBuilder();
List<Diagnostic<? extends JavaFileObject>> diagnostics = diagnosticsCollector.getDiagnostics();
for (Diagnostic diagnostic : diagnostics) {
sb.append(diagnostic.toString()).append("\r\n");
}
return sb.toString();
}
public long getCompilerTakeTime() {
return compilerTakeTime;
}
/**
* 获取类的全名称
*
* @param sourceCode 源码
* @return 类的全名称
*/
public static String getFullClassName(String sourceCode) {
String className = "";
Pattern pattern = Pattern.compile("package\\s+\\S+\\s*;");
Matcher matcher = pattern.matcher(sourceCode);
if (matcher.find()) {
className = matcher.group().replaceFirst("package", "").replace(";", "").trim() + ".";
}
pattern = Pattern.compile("class\\s+(\\S+)\\s+");
matcher = pattern.matcher(sourceCode);
if (matcher.find()) {
className += matcher.group(1).trim();
}
return className;
}
/**
* 自定义一个字符串的源码对象
*/
private class StringJavaFileObject extends SimpleJavaFileObject {
//等待编译的源码字段
private String contents;
//java源代码 => StringJavaFileObject对象 的时候使用
public StringJavaFileObject(String className, String contents) {
super(URI.create("string:///" + className.replaceAll("\\.", "/") + Kind.SOURCE.extension), Kind.SOURCE);
this.contents = contents;
}
//字符串源码会调用该方法
@Override
public CharSequence getCharContent(boolean ignoreEncodingErrors) throws IOException {
return contents;
}
}
/**
* 自定义一个编译之后的字节码对象
*/
public class ByteJavaFileObject extends SimpleJavaFileObject {
//存放编译后的字节码
private ByteArrayOutputStream outPutStream;
public ByteJavaFileObject(String className, Kind kind) {
super(URI.create("string:///" + className.replaceAll("\\.", "/") + Kind.SOURCE.extension), kind);
}
//StringJavaFileManage 编译之后的字节码输出会调用该方法(把字节码输出到outputStream)
@Override
public OutputStream openOutputStream() {
outPutStream = new ByteArrayOutputStream();
return outPutStream;
}
//在类加载器加载的时候需要用到
public byte[] getCompiledBytes() {
return outPutStream.toByteArray();
}
}
/**
* 自定义一个JavaFileManage来控制编译之后字节码的输出位置
*/
private class StringJavaFileManage extends ForwardingJavaFileManager {
StringJavaFileManage(JavaFileManager fileManager) {
super(fileManager);
}
//获取输出的文件对象,它表示给定位置处指定类型的指定类。
@Override
public JavaFileObject getJavaFileForOutput(Location location, String className, JavaFileObject.Kind kind, FileObject sibling) throws IOException {
ByteJavaFileObject javaFileObject = new ByteJavaFileObject(className, kind);
javaFileObjectMap.put(className, javaFileObject);
return javaFileObject;
}
}
}
package com.dlink.utils;
import com.dlink.pool.ClassEntity;
import com.dlink.pool.ClassPool;
import groovy.lang.GroovyClassLoader;
import org.codehaus.groovy.control.CompilerConfiguration;
/**
* UDFUtil
*
* @author wenmo
* @since 2021/12/27 23:25
*/
public class UDFUtil {
public static void buildClass(String code){
CustomStringJavaCompiler compiler = new CustomStringJavaCompiler(code);
boolean res = compiler.compiler();
if (res) {
String className = compiler.getFullClassName();
byte[] compiledBytes = compiler.getJavaFileObjectMap(className).getCompiledBytes();
ClassPool.push(new ClassEntity(className,code,compiledBytes));
System.out.println("编译成功");
System.out.println("compilerTakeTime:" + compiler.getCompilerTakeTime());
initClassLoader(className);
} else {
System.out.println("编译失败");
System.out.println(compiler.getCompilerMessage());
}
}
public static void initClassLoader(String name){
ClassEntity classEntity = ClassPool.get(name);
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
CompilerConfiguration config = new CompilerConfiguration();
config.setSourceEncoding("UTF-8");
GroovyClassLoader groovyClassLoader = new GroovyClassLoader(contextClassLoader, config);
groovyClassLoader.setShouldRecompile(true);
groovyClassLoader.defineClass(classEntity.getName(),classEntity.getClassByte());
Thread.currentThread().setContextClassLoader(groovyClassLoader);
// Class<?> clazz = groovyClassLoader.parseClass(codeSource,"com.dlink.ud.udf.SubstringFunction");
}
}
...@@ -19,4 +19,8 @@ public interface FlinkSQLConstant { ...@@ -19,4 +19,8 @@ public interface FlinkSQLConstant {
* DML 类型 * DML 类型
*/ */
String DML = "DML"; String DML = "DML";
/**
* 片段 Fragments 标识
*/
String FRAGMENTS = ":=";
} }
package com.dlink.executor; package com.dlink.executor;
import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkSQLConstant;
import com.dlink.model.SystemConfiguration;
import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ExpressionParserException; import org.apache.flink.table.api.ExpressionParserException;
import org.apache.flink.table.api.Table; import org.apache.flink.table.api.Table;
...@@ -148,23 +151,24 @@ public final class SqlManager { ...@@ -148,23 +151,24 @@ public final class SqlManager {
* @throws ExpressionParserException if the name of the variable under the given sql failed. * @throws ExpressionParserException if the name of the variable under the given sql failed.
*/ */
public String parseVariable(String statement) { public String parseVariable(String statement) {
if (statement == null || "".equals(statement)) { if (Asserts.isNullString(statement)) {
return statement; return statement;
} }
String[] strs = statement.split(";"); String[] strs = statement.split(SystemConfiguration.getInstances().getSqlSeparator());
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
for (int i = 0; i < strs.length; i++) { for (int i = 0; i < strs.length; i++) {
String str = strs[i].trim(); String str = strs[i];
if (str.length() == 0) { if (str.trim().length() == 0) {
continue; continue;
} }
if (str.contains(":=")) { str = strs[i];
String[] strs2 = str.split(":="); if (str.contains(FlinkSQLConstant.FRAGMENTS)) {
String[] strs2 = str.split(FlinkSQLConstant.FRAGMENTS);
if (strs2.length >= 2) { if (strs2.length >= 2) {
if (strs2[0].length() == 0) { if (strs2[0].length() == 0) {
throw new ExpressionParserException("Illegal variable name."); throw new ExpressionParserException("Illegal variable name.");
} }
String valueString = str.substring(str.indexOf(":=") + 2); String valueString = str.substring(str.indexOf(FlinkSQLConstant.FRAGMENTS) + 2);
this.registerSqlFragment(strs2[0], replaceVariable(valueString)); this.registerSqlFragment(strs2[0], replaceVariable(valueString));
} else { } else {
throw new ExpressionParserException("Illegal variable definition."); throw new ExpressionParserException("Illegal variable definition.");
......
...@@ -24,7 +24,7 @@ ...@@ -24,7 +24,7 @@
<dependency> <dependency>
<groupId>mysql</groupId> <groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId> <artifactId>mysql-connector-java</artifactId>
<!-- <scope>test</scope>--> <scope>test</scope>
</dependency> </dependency>
</dependencies> </dependencies>
</project> </project>
...@@ -56,6 +56,7 @@ ...@@ -56,6 +56,7 @@
"dependencies": { "dependencies": {
"@ant-design/charts": "^1.3.4", "@ant-design/charts": "^1.3.4",
"@ant-design/icons": "^4.5.0", "@ant-design/icons": "^4.5.0",
"@ant-design/plots": "^1.0.7",
"@ant-design/pro-descriptions": "^1.6.8", "@ant-design/pro-descriptions": "^1.6.8",
"@ant-design/pro-form": "^1.18.3", "@ant-design/pro-form": "^1.18.3",
"@ant-design/pro-layout": "^6.18.0", "@ant-design/pro-layout": "^6.18.0",
......
@import '~antd/es/style/themes/default.less';
.form_setting{
padding-left: 10px;
}
.form_item{
margin-bottom: 5px;
}
import {Button, Tag, Row, Col, Form, Select, Empty, Switch} from "antd";
import {StateType} from "@/pages/FlinkSqlStudio/model";
import {connect} from "umi";
import styles from "./index.less";
import {FireOutlined, SearchOutlined, RedoOutlined, InfoCircleOutlined} from '@ant-design/icons';
import {useEffect, useState} from "react";
import React from "react";
const {Option} = Select;
export type BarChartConfig = {
isGroup: boolean,
isStack: boolean,
isPercent: boolean,
xField: string,
yField: string,
seriesField?: string,
label?: { },
};
export type BarChartProps = {
onChange: (values: Partial<BarChartConfig>) => void;
data: [];
column: [];
};
const BarChartSetting: React.FC<BarChartProps> = (props) => {
const {current,column,onChange: handleChange,dispatch} = props;
const [form] = Form.useForm();
useEffect(() => {
form.setFieldsValue(current.console.chart);
}, [current.console.chart]);
const onValuesChange = (change: any, all: any) => {
let config: BarChartConfig = {
isGroup: all.isGroup,
isStack: all.isStack,
isPercent: all.isPercent,
xField: all.xField?all.xField:column[0],
yField: all.yField?all.yField:column.length>1?column[1]:column[0],
label: {
position: 'middle',
content: (item) => {
return item[all.xField];
},
style: {
fill: '#fff',
},
},
};
if(all.seriesField){
config.seriesField = all.seriesField;
}
handleChange(config);
};
const getColumnOptions = () => {
const itemList = [];
for (const item of column) {
itemList.push(<Option key={item} value={item} label={item}>
{item}
</Option>)
}
return itemList;
};
return (
<>
<Form
form={form}
className={styles.form_setting}
onValuesChange={onValuesChange}
>
<Row>
<Col span={12}>
<Form.Item
label="x 轴" className={styles.form_item} name="xField"
>
{column&&column.length > 0 ? (
<Select allowClear showSearch
defaultValue={column[0]} value={column[0]}>
{getColumnOptions()}
</Select>):(<Select allowClear showSearch>
{column&&getColumnOptions()}
</Select>)}
</Form.Item>
</Col>
<Col span={12}>
<Form.Item
label="y 轴" className={styles.form_item} name="yField"
>
{column&&column.length > 1 ? (
<Select allowClear showSearch
defaultValue={column[1]} value={column[1]}>
{getColumnOptions()}
</Select>):(<Select allowClear showSearch>
{column&&getColumnOptions()}
</Select>)}
</Form.Item>
</Col>
</Row>
<Row>
<Col span={12}>
<Form.Item
label="分组字段" className={styles.form_item} name="seriesField"
>
{column&&column.length > 0 ? (
<Select allowClear showSearch>
{getColumnOptions()}
</Select>):(<Select allowClear showSearch>
{column&&getColumnOptions()}
</Select>)}
</Form.Item>
</Col>
<Col span={12}>
<Form.Item
label="分组" className={styles.form_item} name="isGroup" valuePropName="checked"
>
<Switch checkedChildren="启用" unCheckedChildren="禁用"
/>
</Form.Item>
</Col>
</Row>
<Row>
<Col span={12}>
<Form.Item
label="堆叠" className={styles.form_item} name="isStack" valuePropName="checked"
>
<Switch checkedChildren="启用" unCheckedChildren="禁用"
/>
</Form.Item>
</Col>
<Col span={12}>
<Form.Item
label="百分比" className={styles.form_item} name="isPercent" valuePropName="checked"
>
<Switch checkedChildren="启用" unCheckedChildren="禁用"
/>
</Form.Item>
</Col>
</Row>
</Form>
</>
);
};
export default connect(({ Studio }: { Studio: StateType }) => ({
current: Studio.current,
result: Studio.result,
}))(BarChartSetting);
@import '~antd/es/style/themes/default.less';
.form_setting{
padding-left: 10px;
}
.form_item{
margin-bottom: 5px;
}
import {Button, Tag, Row, Col, Form, Select, Empty, Switch} from "antd";
import {StateType} from "@/pages/FlinkSqlStudio/model";
import {connect} from "umi";
import styles from "./index.less";
import {FireOutlined, SearchOutlined, RedoOutlined, InfoCircleOutlined} from '@ant-design/icons';
import {useEffect, useState} from "react";
import React from "react";
const {Option} = Select;
export type LineChartConfig = {
padding: string,
xField: string,
yField: string,
seriesField?: string,
stepType?: string,
xAxis?: {
type?: string,
},
slider?: {},
};
export type LineChartProps = {
onChange: (values: Partial<LineChartConfig>) => void;
data: [];
column: [];
};
const LineChartSetting: React.FC<LineChartProps> = (props) => {
const {current,column,onChange: handleChange,dispatch} = props;
const [form] = Form.useForm();
useEffect(() => {
form.setFieldsValue(current.console.chart);
}, [current.console.chart]);
const onValuesChange = (change: any, all: any) => {
let config: LineChartConfig = {
padding: 'auto',
xField: all.xField?all.xField:column[0],
yField: all.yField?all.yField:column.length>1?column[1]:column[0],
};
if(all.seriesField){
config.seriesField = all.seriesField;
}
if(all.openStepType){
config.stepType = 'hv';
}
if(all.openSlider){
config.slider = {
start: 0,
end: 0.5,
};
}
handleChange(config);
};
const getColumnOptions = () => {
const itemList = [];
for (const item of column) {
itemList.push(<Option key={item} value={item} label={item}>
{item}
</Option>)
}
return itemList;
};
return (
<>
<Form
form={form}
className={styles.form_setting}
onValuesChange={onValuesChange}
>
<Row>
<Col span={12}>
<Form.Item
label="x 轴" className={styles.form_item} name="xField"
>
{column&&column.length > 0 ? (
<Select allowClear showSearch
defaultValue={column[0]} value={column[0]}>
{getColumnOptions()}
</Select>):(<Select allowClear showSearch>
{column&&getColumnOptions()}
</Select>)}
</Form.Item>
</Col>
<Col span={12}>
<Form.Item
label="y 轴" className={styles.form_item} name="yField"
>
{column&&column.length > 1 ? (
<Select allowClear showSearch
defaultValue={column[1]} value={column[1]}>
{getColumnOptions()}
</Select>):(<Select allowClear showSearch>
{column&&getColumnOptions()}
</Select>)}
</Form.Item>
</Col>
</Row>
<Row>
<Col span={12}>
<Form.Item
label="分组字段" className={styles.form_item} name="seriesField"
>
{column&&column.length > 0 ? (
<Select allowClear showSearch>
{getColumnOptions()}
</Select>):(<Select allowClear showSearch>
{column&&getColumnOptions()}
</Select>)}
</Form.Item>
</Col>
<Col span={12}>
<Form.Item
label="缩略轴" className={styles.form_item} name="openSlider" valuePropName="checked"
>
<Switch checkedChildren="启用" unCheckedChildren="禁用"
/>
</Form.Item>
</Col>
</Row>
<Row>
<Col span={12}>
<Form.Item
label="阶梯线" className={styles.form_item} name="openStepType" valuePropName="checked"
>
<Switch checkedChildren="启用" unCheckedChildren="禁用"
/>
</Form.Item>
</Col>
</Row>
</Form>
</>
);
};
export default connect(({ Studio }: { Studio: StateType }) => ({
current: Studio.current,
result: Studio.result,
}))(LineChartSetting);
@import '~antd/es/style/themes/default.less';
.form_setting{
padding-left: 10px;
}
.form_item{
margin-bottom: 5px;
}
import {Button, Tag, Row, Col, Form, Select, Empty, Switch} from "antd";
import {StateType} from "@/pages/FlinkSqlStudio/model";
import {connect} from "umi";
import styles from "./index.less";
import {FireOutlined, SearchOutlined, RedoOutlined, InfoCircleOutlined} from '@ant-design/icons';
import {useEffect, useState} from "react";
import React from "react";
const {Option} = Select;
export type PieChartConfig = {
angleField: string,
colorField: string,
label: {},
interactions: [],
};
export type PieChartProps = {
onChange: (values: Partial<PieChartConfig>) => void;
data: [];
column: [];
};
const PieChartSetting: React.FC<PieChartProps> = (props) => {
const {current,column,onChange: handleChange,dispatch} = props;
const [form] = Form.useForm();
useEffect(() => {
form.setFieldsValue(current.console.chart);
}, [current.console.chart]);
const onValuesChange = (change: any, all: any) => {
let config: PieChartConfig = {
angleField: all.angleField?all.angleField:column[0],
colorField: all.colorField?all.colorField:column.length>1?column[1]:column[0],
label: {
type: 'inner',
offset: '-30%',
content: ({ percent }) => `${(percent * 100).toFixed(0)}%`,
style: {
fontSize: 14,
textAlign: 'center',
},
},
interactions: [
{
type: 'element-active',
},
],
};
handleChange(config);
};
const getColumnOptions = () => {
const itemList = [];
for (const item of column) {
itemList.push(<Option key={item} value={item} label={item}>
{item}
</Option>)
}
return itemList;
};
return (
<>
<Form
form={form}
className={styles.form_setting}
onValuesChange={onValuesChange}
>
<Row>
<Col span={12}>
<Form.Item
label="弧轴" className={styles.form_item} name="angleField"
>
{column&&column.length > 0 ? (
<Select allowClear showSearch
defaultValue={column[0]} value={column[0]}>
{getColumnOptions()}
</Select>):(<Select allowClear showSearch>
{column&&getColumnOptions()}
</Select>)}
</Form.Item>
</Col>
<Col span={12}>
<Form.Item
label="颜色" className={styles.form_item} name="colorField"
>
{column&&column.length > 1 ? (
<Select allowClear showSearch
defaultValue={column[1]} value={column[1]}>
{getColumnOptions()}
</Select>):(<Select allowClear showSearch>
{column&&getColumnOptions()}
</Select>)}
</Form.Item>
</Col>
</Row>
</Form>
</>
);
};
export default connect(({ Studio }: { Studio: StateType }) => ({
current: Studio.current,
result: Studio.result,
}))(PieChartSetting);
@import '~antd/es/style/themes/default.less';
.form_setting{
padding-left: 10px;
}
.form_item{
margin-bottom: 5px;
}
import {Button, Tag,Row, Col,Form,Select, Empty} from "antd";
import {StateType} from "@/pages/FlinkSqlStudio/model";
import {connect} from "umi";
import styles from "./index.less";
import {RedoOutlined} from '@ant-design/icons';
import {CHART, isSql} from "@/components/Studio/conf";
import { Line,Bar,Pie } from '@ant-design/plots';
import React, {useEffect, useState} from "react";
import LineChartSetting from "./LineChartSetting";
import BarChartSetting from "./BarChartSetting";
import PieChartSetting from "./PieChartSetting";
import {showJobData} from "@/components/Studio/StudioEvent/DQL";
import {Dispatch} from "@@/plugin-dva/connect";
const {Option} = Select;
const Chart = (props:any) => {
const {current,result,height,dispatch} = props;
const [config, setConfig] = useState({});
const [type, setType] = useState<string>(CHART.LINE);
const [form] = Form.useForm();
useEffect(() => {
form.setFieldsValue(current.console.chart);
}, [current.console.chart]);
const toRebuild = () => {
if(!isSql(current.task.dialect)){
showJobData(current.console.result.jobId,dispatch);
}
};
const onValuesChange = (change: any, all: any) => {
if(change.type){
setType(change.type);
props.saveChart({type:change.type});
}
};
const renderChartSetting = () => {
if(!current.console.chart||!current.console.result.result){
return undefined;
}
switch (type){
case CHART.LINE:
return <LineChartSetting column={current.console.result.result.columns} onChange={(value) => {
setConfig(value);
props.saveChart({...value,type: current.console.chart.type});
}} />;
case CHART.BAR:
return <BarChartSetting column={current.console.result.result.columns} onChange={(value) => {
setConfig(value);
props.saveChart({...value,type: current.console.chart.type});
}} />;
case CHART.PIE:
return <PieChartSetting column={current.console.result.result.columns} onChange={(value) => {
setConfig(value);
props.saveChart({...value,type: current.console.chart.type});
}} />;
default:
return <LineChartSetting column={current.console.result.result.columns} onChange={(value) => {
setConfig(value);
props.saveChart({...value,type: current.console.chart.type});
}} />
}
};
const renderChartContent = () => {
if(!current.console.result.result||!current.console.result.result.columns){
return <Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />;
}
switch (current.console.chart.type){
case CHART.LINE:
return <Line data={current.console.result.result.rowData} {...config} />;
case CHART.BAR:
return <Bar data={current.console.result.result.rowData} {...config} />;
case CHART.PIE:
if(config.angleField){
return <Pie data={current.console.result.result.rowData} {...config} />;
} else {
return <Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />;
}
default:
return <Line data={current.console.result.result.rowData} {...config} />;
}
};
return (
<div style={{width: '100%'}}>
<Row>
<Col span={16} style={{padding:'20px'}}>
{renderChartContent()}
</Col>
<Col span={8}>
<Form
form={form}
className={styles.form_setting}
onValuesChange={onValuesChange}
>
<Row>
<Col span={12}>
<Form.Item
label="图形类型" className={styles.form_item} name="type"
>
<Select defaultValue={CHART.LINE} value={CHART.LINE}>
<Option value={CHART.LINE}>{CHART.LINE}</Option>
<Option value={CHART.BAR}>{CHART.BAR}</Option>
<Option value={CHART.PIE}>{CHART.PIE}</Option>
</Select>
</Form.Item>
</Col>
{ !isSql(current.task.dialect) ? <Col span={12}>
<Button type="primary" onClick={toRebuild} icon={<RedoOutlined />}>
刷新数据
</Button>
</Col>:undefined}
</Row>
</Form>
{renderChartSetting()}
</Col>
</Row>
</div>
);
};
const mapDispatchToProps = (dispatch: Dispatch)=>({
saveChart:(chart: any)=>dispatch({
type: "Studio/saveChart",
payload: chart,
}),
})
export default connect(({ Studio }: { Studio: StateType }) => ({
current: Studio.current,
result: Studio.result,
}),mapDispatchToProps)(Chart);
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment