Commit 2a2d4f6b authored by wenmo's avatar wenmo

UDFJava 调试与加载

parent f9cfbc03
...@@ -49,7 +49,7 @@ Dinky(原 Dlink): ...@@ -49,7 +49,7 @@ Dinky(原 Dlink):
| | | 新增 选中片段执行 | 0.4.0 | | | | 新增 选中片段执行 | 0.4.0 |
| | | 新增 布局拖拽 | 0.4.0 | | | | 新增 布局拖拽 | 0.4.0 |
| | | 新增 SQL导出 | 0.5.0 | | | | 新增 SQL导出 | 0.5.0 |
| | | 新增 快捷键保存、校验、美化 | 0.5.0 | | | | 新增 快捷键保存、校验、美化 | 0.5.0 |
| | | 支持 local 模式下 FlinkSQL 提交 | 0.4.0 | | | | 支持 local 模式下 FlinkSQL 提交 | 0.4.0 |
| | | 支持 standalone 模式下 FlinkSQL 提交 | 0.4.0 | | | | 支持 standalone 模式下 FlinkSQL 提交 | 0.4.0 |
| | | 支持 yarn session 模式下 FlinkSQL 提交 | 0.4.0 | | | | 支持 yarn session 模式下 FlinkSQL 提交 | 0.4.0 |
...@@ -57,12 +57,12 @@ Dinky(原 Dlink): ...@@ -57,12 +57,12 @@ Dinky(原 Dlink):
| | | 支持 yarn application 模式下 FlinkSQL 提交 | 0.4.0 | | | | 支持 yarn application 模式下 FlinkSQL 提交 | 0.4.0 |
| | | 支持 kubernetes session 模式下 FlinkSQL 提交 | 0.5.0 | | | | 支持 kubernetes session 模式下 FlinkSQL 提交 | 0.5.0 |
| | | 支持 kubernetes application 模式下 FlinkSQL 提交 | 0.5.0 | | | | 支持 kubernetes application 模式下 FlinkSQL 提交 | 0.5.0 |
| | | 支持 UDF Java 方言Local模式在线编写、调试、动态加载 | 0.5.0 |
| | Flink 作业 | 支持 yarn application 模式下 Jar 提交 | 0.4.0 | | | Flink 作业 | 支持 yarn application 模式下 Jar 提交 | 0.4.0 |
| | | 支持 k8s application 模式下 Jar 提交 | 0.5.0 | | | | 支持 k8s application 模式下 Jar 提交 | 0.5.0 |
| | | 支持 作业 Cancel | 0.4.0 | | | | 支持 作业 Cancel | 0.4.0 |
| | | 支持 作业 SavePoint 的 Cancel、Stop、Trigger | 0.4.0 | | | | 支持 作业 SavePoint 的 Cancel、Stop、Trigger | 0.4.0 |
| | | 新增 作业自动从 SavePoint 恢复机制(包含最近、最早、指定一次) | 0.4.0 | | | | 新增 作业自动从 SavePoint 恢复机制(包含最近、最早、指定一次) | 0.4.0 |
| | | 新增 UDF java方言代码的开发 | 0.5.0 |
| | Flink 集群 | 支持 查看已注册集群的作业列表与运维 | 0.4.0 | | | Flink 集群 | 支持 查看已注册集群的作业列表与运维 | 0.4.0 |
| | | 新增 自动注册 Yarn 创建的集群 | 0.4.0 | | | | 新增 自动注册 Yarn 创建的集群 | 0.4.0 |
| | SQL | 新增 外部数据源的 SQL 校验 | 0.5.0 | | | SQL | 新增 外部数据源的 SQL 校验 | 0.5.0 |
......
...@@ -24,4 +24,6 @@ public interface TaskService extends ISuperService<Task> { ...@@ -24,4 +24,6 @@ public interface TaskService extends ISuperService<Task> {
List<Task> listFlinkSQLEnv(); List<Task> listFlinkSQLEnv();
String exportSql(Integer id); String exportSql(Integer id);
Task getUDFByClassName(String className);
} }
...@@ -27,6 +27,7 @@ import com.dlink.session.SessionConfig; ...@@ -27,6 +27,7 @@ import com.dlink.session.SessionConfig;
import com.dlink.session.SessionInfo; import com.dlink.session.SessionInfo;
import com.dlink.session.SessionPool; import com.dlink.session.SessionPool;
import com.dlink.utils.RunTimeUtil; import com.dlink.utils.RunTimeUtil;
import com.dlink.utils.UDFUtil;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
...@@ -89,6 +90,7 @@ public class StudioServiceImpl implements StudioService { ...@@ -89,6 +90,7 @@ public class StudioServiceImpl implements StudioService {
if(!config.isUseSession()) { if(!config.isUseSession()) {
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), studioExecuteDTO.getClusterId())); config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), studioExecuteDTO.getClusterId()));
} }
initUDF(config,studioExecuteDTO.getStatement());
JobManager jobManager = JobManager.build(config); JobManager jobManager = JobManager.build(config);
JobResult jobResult = jobManager.executeSql(studioExecuteDTO.getStatement()); JobResult jobResult = jobManager.executeSql(studioExecuteDTO.getStatement());
RunTimeUtil.recovery(jobManager); RunTimeUtil.recovery(jobManager);
...@@ -152,6 +154,7 @@ public class StudioServiceImpl implements StudioService { ...@@ -152,6 +154,7 @@ public class StudioServiceImpl implements StudioService {
if(!config.isUseSession()) { if(!config.isUseSession()) {
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), studioExecuteDTO.getClusterId())); config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), studioExecuteDTO.getClusterId()));
} }
initUDF(config,studioExecuteDTO.getStatement());
JobManager jobManager = JobManager.buildPlanMode(config); JobManager jobManager = JobManager.buildPlanMode(config);
return jobManager.explainSql(studioExecuteDTO.getStatement()).getSqlExplainResults(); return jobManager.explainSql(studioExecuteDTO.getStatement()).getSqlExplainResults();
} }
...@@ -317,4 +320,15 @@ public class StudioServiceImpl implements StudioService { ...@@ -317,4 +320,15 @@ public class StudioServiceImpl implements StudioService {
} }
return false; return false;
} }
private void initUDF(JobConfig config,String statement){
if(!GatewayType.LOCAL.equalsValue(config.getType())){
return;
}
List<String> udfClassNameList = JobManager.getUDFClassName(statement);
for(String item : udfClassNameList){
Task task = taskService.getUDFByClassName(item);
JobManager.initUDF(item,task.getStatement());
}
}
} }
...@@ -15,6 +15,7 @@ import com.dlink.job.JobResult; ...@@ -15,6 +15,7 @@ import com.dlink.job.JobResult;
import com.dlink.mapper.TaskMapper; import com.dlink.mapper.TaskMapper;
import com.dlink.model.*; import com.dlink.model.*;
import com.dlink.service.*; import com.dlink.service.*;
import com.dlink.utils.CustomStringJavaCompiler;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
...@@ -100,6 +101,11 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -100,6 +101,11 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Override @Override
public boolean saveOrUpdateTask(Task task) { public boolean saveOrUpdateTask(Task task) {
if(Asserts.isNotNullString(task.getDialect()) && Dialect.JAVA.equalsVal(task.getDialect())
&& Asserts.isNotNullString(task.getStatement()) ){
CustomStringJavaCompiler compiler = new CustomStringJavaCompiler(task.getStatement());
task.setSavePointPath(compiler.getFullClassName());
}
if (task.getId() != null) { if (task.getId() != null) {
this.updateById(task); this.updateById(task);
if (task.getStatement() != null) { if (task.getStatement() != null) {
...@@ -151,6 +157,14 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -151,6 +157,14 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
} }
} }
@Override
public Task getUDFByClassName(String className) {
Task task = getOne(new QueryWrapper<Task>().eq("dialect", "Java").eq("enabled", 1).eq("save_point_path", className));
Assert.check(task);
task.setStatement(statementService.getById(task.getId()).getStatement());
return task;
}
private JobConfig buildJobConfig(Task task){ private JobConfig buildJobConfig(Task task){
boolean isJarTask = isJarTask(task); boolean isJarTask = isJarTask(task);
if(!isJarTask&&Asserts.isNotNull(task.getEnvId())){ if(!isJarTask&&Asserts.isNotNull(task.getEnvId())){
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.types.extraction;
import com.dlink.pool.ClassPool;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.StructuredType;
import org.apache.flink.shaded.asm7.org.objectweb.asm.ClassReader;
import org.apache.flink.shaded.asm7.org.objectweb.asm.ClassVisitor;
import org.apache.flink.shaded.asm7.org.objectweb.asm.Label;
import org.apache.flink.shaded.asm7.org.objectweb.asm.MethodVisitor;
import org.apache.flink.shaded.asm7.org.objectweb.asm.Opcodes;
import javax.annotation.Nullable;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Constructor;
import java.lang.reflect.Executable;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Parameter;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.shaded.asm7.org.objectweb.asm.Type.getConstructorDescriptor;
import static org.apache.flink.shaded.asm7.org.objectweb.asm.Type.getMethodDescriptor;
/** Utilities for performing reflection tasks. */
@Internal
public final class ExtractionUtils {
// --------------------------------------------------------------------------------------------
// Methods shared across packages
// --------------------------------------------------------------------------------------------
/** Collects methods of the given name. */
public static List<Method> collectMethods(Class<?> function, String methodName) {
return Arrays.stream(function.getMethods())
.filter(method -> method.getName().equals(methodName))
.sorted(Comparator.comparing(Method::toString)) // for deterministic order
.collect(Collectors.toList());
}
/**
* Checks whether a method/constructor can be called with the given argument classes. This
* includes type widening and vararg. {@code null} is a wildcard.
*
* <p>E.g., {@code (int.class, int.class)} matches {@code f(Object...), f(int, int), f(Integer,
* Object)} and so forth.
*/
public static boolean isInvokable(Executable executable, Class<?>... classes) {
final int m = executable.getModifiers();
if (!Modifier.isPublic(m)) {
return false;
}
final int paramCount = executable.getParameterCount();
final int classCount = classes.length;
// check for enough classes for each parameter
if ((!executable.isVarArgs() && classCount != paramCount)
|| (executable.isVarArgs() && classCount < paramCount - 1)) {
return false;
}
int currentClass = 0;
for (int currentParam = 0; currentParam < paramCount; currentParam++) {
final Class<?> param = executable.getParameterTypes()[currentParam];
// last parameter is a vararg that needs to consume remaining classes
if (currentParam == paramCount - 1 && executable.isVarArgs()) {
final Class<?> paramComponent =
executable.getParameterTypes()[currentParam].getComponentType();
// we have more than 1 classes left so the vararg needs to consume them all
if (classCount - currentClass > 1) {
while (currentClass < classCount
&& ExtractionUtils.isAssignable(
classes[currentClass], paramComponent, true)) {
currentClass++;
}
} else if (currentClass < classCount
&& (parameterMatches(classes[currentClass], param)
|| parameterMatches(classes[currentClass], paramComponent))) {
currentClass++;
}
}
// entire parameter matches
else if (parameterMatches(classes[currentClass], param)) {
currentClass++;
}
}
// check if all classes have been consumed
return currentClass == classCount;
}
private static boolean parameterMatches(Class<?> clz, Class<?> param) {
return clz == null || ExtractionUtils.isAssignable(clz, param, true);
}
/** Creates a method signature string like {@code int eval(Integer, String)}. */
public static String createMethodSignatureString(
String methodName, Class<?>[] parameters, @Nullable Class<?> returnType) {
final StringBuilder builder = new StringBuilder();
if (returnType != null) {
builder.append(returnType.getCanonicalName()).append(" ");
}
builder.append(methodName)
.append(
Stream.of(parameters)
.map(
parameter -> {
// in case we don't know the parameter at this location
// (i.e. for accumulators)
if (parameter == null) {
return "_";
} else {
return parameter.getCanonicalName();
}
})
.collect(Collectors.joining(", ", "(", ")")));
return builder.toString();
}
/**
* Validates the characteristics of a class for a {@link StructuredType} such as accessibility.
*/
public static void validateStructuredClass(Class<?> clazz) {
final int m = clazz.getModifiers();
if (Modifier.isAbstract(m)) {
throw extractionError("Class '%s' must not be abstract.", clazz.getName());
}
if (!Modifier.isPublic(m)) {
throw extractionError("Class '%s' is not public.", clazz.getName());
}
if (clazz.getEnclosingClass() != null
&& (clazz.getDeclaringClass() == null || !Modifier.isStatic(m))) {
throw extractionError(
"Class '%s' is a not a static, globally accessible class.", clazz.getName());
}
}
/**
* Returns the field of a structured type. The logic is as broad as possible to support both
* Java and Scala in different flavors.
*/
public static Field getStructuredField(Class<?> clazz, String fieldName) {
final String normalizedFieldName = fieldName.toUpperCase();
final List<Field> fields = collectStructuredFields(clazz);
for (Field field : fields) {
if (field.getName().toUpperCase().equals(normalizedFieldName)) {
return field;
}
}
throw extractionError(
"Could not to find a field named '%s' in class '%s' for structured type.",
fieldName, clazz.getName());
}
/**
* Checks for a field getter of a structured type. The logic is as broad as possible to support
* both Java and Scala in different flavors.
*/
public static Optional<Method> getStructuredFieldGetter(Class<?> clazz, Field field) {
final String normalizedFieldName = field.getName().toUpperCase();
final List<Method> methods = collectStructuredMethods(clazz);
for (Method method : methods) {
// check name:
// get<Name>()
// is<Name>()
// <Name>() for Scala
final String normalizedMethodName = method.getName().toUpperCase();
final boolean hasName =
normalizedMethodName.equals("GET" + normalizedFieldName)
|| normalizedMethodName.equals("IS" + normalizedFieldName)
|| normalizedMethodName.equals(normalizedFieldName);
if (!hasName) {
continue;
}
// check return type:
// equal to field type
final Type returnType = method.getGenericReturnType();
final boolean hasReturnType = returnType.equals(field.getGenericType());
if (!hasReturnType) {
continue;
}
// check parameters:
// no parameters
final boolean hasNoParameters = method.getParameterCount() == 0;
if (!hasNoParameters) {
continue;
}
// matching getter found
return Optional.of(method);
}
// no getter found
return Optional.empty();
}
/**
* Checks for a field setters of a structured type. The logic is as broad as possible to support
* both Java and Scala in different flavors.
*/
public static Optional<Method> getStructuredFieldSetter(Class<?> clazz, Field field) {
final String normalizedFieldName = field.getName().toUpperCase();
final List<Method> methods = collectStructuredMethods(clazz);
for (Method method : methods) {
// check name:
// set<Name>(type)
// <Name>(type)
// <Name>_$eq(type) for Scala
final String normalizedMethodName = method.getName().toUpperCase();
final boolean hasName =
normalizedMethodName.equals("SET" + normalizedFieldName)
|| normalizedMethodName.equals(normalizedFieldName)
|| normalizedMethodName.equals(normalizedFieldName + "_$EQ");
if (!hasName) {
continue;
}
// check return type:
// void or the declaring class
final Class<?> returnType = method.getReturnType();
final boolean hasReturnType = returnType == Void.TYPE || returnType == clazz;
if (!hasReturnType) {
continue;
}
// check parameters:
// one parameter that has the same (or primitive) type of the field
final boolean hasParameter =
method.getParameterCount() == 1
&& (method.getGenericParameterTypes()[0].equals(field.getGenericType())
|| primitiveToWrapper(method.getGenericParameterTypes()[0])
.equals(field.getGenericType()));
if (!hasParameter) {
continue;
}
// matching setter found
return Optional.of(method);
}
// no setter found
return Optional.empty();
}
/**
* Checks for an invokable constructor matching the given arguments.
*
* @see #isInvokable(Executable, Class[])
*/
public static boolean hasInvokableConstructor(Class<?> clazz, Class<?>... classes) {
for (Constructor<?> constructor : clazz.getDeclaredConstructors()) {
if (isInvokable(constructor, classes)) {
return true;
}
}
return false;
}
/** Checks whether a field is directly readable without a getter. */
public static boolean isStructuredFieldDirectlyReadable(Field field) {
final int m = field.getModifiers();
// field is directly readable
return Modifier.isPublic(m);
}
/** Checks whether a field is directly writable without a setter or constructor. */
public static boolean isStructuredFieldDirectlyWritable(Field field) {
final int m = field.getModifiers();
// field is immutable
if (Modifier.isFinal(m)) {
return false;
}
// field is directly writable
return Modifier.isPublic(m);
}
// --------------------------------------------------------------------------------------------
// Methods intended for this package
// --------------------------------------------------------------------------------------------
/** Helper method for creating consistent exceptions during extraction. */
static ValidationException extractionError(String message, Object... args) {
return extractionError(null, message, args);
}
/** Helper method for creating consistent exceptions during extraction. */
static ValidationException extractionError(Throwable cause, String message, Object... args) {
return new ValidationException(String.format(message, args), cause);
}
/**
* Collects the partially ordered type hierarchy (i.e. all involved super classes and super
* interfaces) of the given type.
*/
static List<Type> collectTypeHierarchy(Type type) {
Type currentType = type;
Class<?> currentClass = toClass(type);
final List<Type> typeHierarchy = new ArrayList<>();
while (currentClass != null) {
// collect type
typeHierarchy.add(currentType);
// collect super interfaces
for (Type genericInterface : currentClass.getGenericInterfaces()) {
final Class<?> interfaceClass = toClass(genericInterface);
if (interfaceClass != null) {
typeHierarchy.addAll(collectTypeHierarchy(genericInterface));
}
}
currentType = currentClass.getGenericSuperclass();
currentClass = toClass(currentType);
}
return typeHierarchy;
}
/** Converts a {@link Type} to {@link Class} if possible, {@code null} otherwise. */
static @Nullable Class<?> toClass(Type type) {
if (type instanceof Class) {
return (Class<?>) type;
} else if (type instanceof ParameterizedType) {
// this is always a class
return (Class<?>) ((ParameterizedType) type).getRawType();
}
// unsupported: generic arrays, type variables, wildcard types
return null;
}
/** Creates a raw data type. */
@SuppressWarnings({"unchecked", "rawtypes"})
static DataType createRawType(
DataTypeFactory typeFactory,
@Nullable Class<? extends TypeSerializer<?>> rawSerializer,
@Nullable Class<?> conversionClass) {
if (rawSerializer != null) {
return DataTypes.RAW(
(Class) createConversionClass(conversionClass),
instantiateRawSerializer(rawSerializer));
}
return typeFactory.createRawDataType(createConversionClass(conversionClass));
}
static Class<?> createConversionClass(@Nullable Class<?> conversionClass) {
if (conversionClass != null) {
return conversionClass;
}
return Object.class;
}
private static TypeSerializer<?> instantiateRawSerializer(
Class<? extends TypeSerializer<?>> rawSerializer) {
try {
return rawSerializer.newInstance();
} catch (Exception e) {
throw extractionError(
e,
"Cannot instantiate type serializer '%s' for RAW type. "
+ "Make sure the class is publicly accessible and has a default constructor.",
rawSerializer.getName());
}
}
/** Resolves a {@link TypeVariable} using the given type hierarchy if possible. */
static Type resolveVariable(List<Type> typeHierarchy, TypeVariable<?> variable) {
// iterate through hierarchy from top to bottom until type variable gets a non-variable
// assigned
for (int i = typeHierarchy.size() - 1; i >= 0; i--) {
final Type currentType = typeHierarchy.get(i);
if (currentType instanceof ParameterizedType) {
final Type resolvedType =
resolveVariableInParameterizedType(
variable, (ParameterizedType) currentType);
if (resolvedType instanceof TypeVariable) {
// follow type variables transitively
variable = (TypeVariable<?>) resolvedType;
} else if (resolvedType != null) {
return resolvedType;
}
}
}
// unresolved variable
return variable;
}
private static @Nullable Type resolveVariableInParameterizedType(
TypeVariable<?> variable, ParameterizedType currentType) {
final Class<?> currentRaw = (Class<?>) currentType.getRawType();
final TypeVariable<?>[] currentVariables = currentRaw.getTypeParameters();
// search for matching type variable
for (int paramPos = 0; paramPos < currentVariables.length; paramPos++) {
if (typeVariableEquals(variable, currentVariables[paramPos])) {
return currentType.getActualTypeArguments()[paramPos];
}
}
return null;
}
private static boolean typeVariableEquals(
TypeVariable<?> variable, TypeVariable<?> currentVariable) {
return currentVariable.getGenericDeclaration().equals(variable.getGenericDeclaration())
&& currentVariable.getName().equals(variable.getName());
}
/**
* Validates if a given type is not already contained in the type hierarchy of a structured
* type.
*
* <p>Otherwise this would lead to infinite data type extraction cycles.
*/
static void validateStructuredSelfReference(Type t, List<Type> typeHierarchy) {
final Class<?> clazz = toClass(t);
if (clazz != null
&& !clazz.isInterface()
&& clazz != Object.class
&& typeHierarchy.contains(t)) {
throw extractionError(
"Cyclic reference detected for class '%s'. Attributes of structured types must not "
+ "(transitively) reference the structured type itself.",
clazz.getName());
}
}
/** Returns the fields of a class for a {@link StructuredType}. */
static List<Field> collectStructuredFields(Class<?> clazz) {
final List<Field> fields = new ArrayList<>();
while (clazz != Object.class) {
final Field[] declaredFields = clazz.getDeclaredFields();
Stream.of(declaredFields)
.filter(
field -> {
final int m = field.getModifiers();
return !Modifier.isStatic(m) && !Modifier.isTransient(m);
})
.forEach(fields::add);
clazz = clazz.getSuperclass();
}
return fields;
}
/** Validates if a field is properly readable either directly or through a getter. */
static void validateStructuredFieldReadability(Class<?> clazz, Field field) {
// field is accessible
if (isStructuredFieldDirectlyReadable(field)) {
return;
}
// field needs a getter
if (!getStructuredFieldGetter(clazz, field).isPresent()) {
throw extractionError(
"Field '%s' of class '%s' is neither publicly accessible nor does it have "
+ "a corresponding getter method.",
field.getName(), clazz.getName());
}
}
/**
* Checks if a field is mutable or immutable. Returns {@code true} if the field is properly
* mutable. Returns {@code false} if it is properly immutable.
*/
static boolean isStructuredFieldMutable(Class<?> clazz, Field field) {
final int m = field.getModifiers();
// field is immutable
if (Modifier.isFinal(m)) {
return false;
}
// field is directly mutable
if (Modifier.isPublic(m)) {
return true;
}
// field has setters by which it is mutable
if (getStructuredFieldSetter(clazz, field).isPresent()) {
return true;
}
throw extractionError(
"Field '%s' of class '%s' is mutable but is neither publicly accessible nor does it have "
+ "a corresponding setter method.",
field.getName(), clazz.getName());
}
/** Returns the boxed type of a primitive type. */
static Type primitiveToWrapper(Type type) {
if (type instanceof Class) {
return primitiveToWrapper((Class<?>) type);
}
return type;
}
/** Collects all methods that qualify as methods of a {@link StructuredType}. */
static List<Method> collectStructuredMethods(Class<?> clazz) {
final List<Method> methods = new ArrayList<>();
while (clazz != Object.class) {
final Method[] declaredMethods = clazz.getDeclaredMethods();
Stream.of(declaredMethods)
.filter(
field -> {
final int m = field.getModifiers();
return Modifier.isPublic(m)
&& !Modifier.isNative(m)
&& !Modifier.isAbstract(m);
})
.forEach(methods::add);
clazz = clazz.getSuperclass();
}
return methods;
}
/**
* Collects all annotations of the given type defined in the current class or superclasses.
* Duplicates are ignored.
*/
static <T extends Annotation> Set<T> collectAnnotationsOfClass(
Class<T> annotation, Class<?> annotatedClass) {
final List<Class<?>> classHierarchy = new ArrayList<>();
Class<?> currentClass = annotatedClass;
while (currentClass != null) {
classHierarchy.add(currentClass);
currentClass = currentClass.getSuperclass();
}
// convert to top down
Collections.reverse(classHierarchy);
return classHierarchy.stream()
.flatMap(c -> Stream.of(c.getAnnotationsByType(annotation)))
.collect(Collectors.toCollection(LinkedHashSet::new));
}
/**
* Collects all annotations of the given type defined in the given method. Duplicates are
* ignored.
*/
static <T extends Annotation> Set<T> collectAnnotationsOfMethod(
Class<T> annotation, Method annotatedMethod) {
return new LinkedHashSet<>(Arrays.asList(annotatedMethod.getAnnotationsByType(annotation)));
}
// --------------------------------------------------------------------------------------------
// Parameter Extraction Utilities
// --------------------------------------------------------------------------------------------
/** Result of the extraction in {@link #extractAssigningConstructor(Class, List)}. */
static class AssigningConstructor {
public final Constructor<?> constructor;
public final List<String> parameterNames;
private AssigningConstructor(Constructor<?> constructor, List<String> parameterNames) {
this.constructor = constructor;
this.parameterNames = parameterNames;
}
}
/**
* Checks whether the given constructor takes all of the given fields with matching (possibly
* primitive) type and name. An assigning constructor can define the order of fields.
*/
static @Nullable AssigningConstructor extractAssigningConstructor(
Class<?> clazz, List<Field> fields) {
AssigningConstructor foundConstructor = null;
for (Constructor<?> constructor : clazz.getDeclaredConstructors()) {
final boolean qualifyingConstructor =
Modifier.isPublic(constructor.getModifiers())
&& constructor.getParameterTypes().length == fields.size();
if (!qualifyingConstructor) {
continue;
}
final List<String> parameterNames =
extractConstructorParameterNames(constructor, fields);
if (parameterNames != null) {
if (foundConstructor != null) {
throw extractionError(
"Multiple constructors found that assign all fields for class '%s'.",
clazz.getName());
}
foundConstructor = new AssigningConstructor(constructor, parameterNames);
}
}
return foundConstructor;
}
/** Extracts the parameter names of a method if possible. */
static @Nullable List<String> extractMethodParameterNames(Method method) {
return extractExecutableNames(method);
}
/**
* Extracts ordered parameter names from a constructor that takes all of the given fields with
* matching (possibly primitive) type and name.
*/
private static @Nullable List<String> extractConstructorParameterNames(
Constructor<?> constructor, List<Field> fields) {
final Type[] parameterTypes = constructor.getGenericParameterTypes();
List<String> parameterNames = extractExecutableNames(constructor);
if (parameterNames == null) {
return null;
}
final Map<String, Type> fieldMap =
fields.stream().collect(Collectors.toMap(Field::getName, Field::getGenericType));
// check that all fields are represented in the parameters of the constructor
for (int i = 0; i < parameterNames.size(); i++) {
final String parameterName = parameterNames.get(i);
final Type fieldType = fieldMap.get(parameterName); // might be null
final Type parameterType = parameterTypes[i];
// we are tolerant here because frameworks such as Avro accept a boxed type even though
// the field is primitive
if (!primitiveToWrapper(parameterType).equals(primitiveToWrapper(fieldType))) {
return null;
}
}
return parameterNames;
}
private static @Nullable List<String> extractExecutableNames(Executable executable) {
final int offset;
if (!Modifier.isStatic(executable.getModifiers())) {
// remove "this" as first parameter
offset = 1;
} else {
offset = 0;
}
// by default parameter names are "arg0, arg1, arg2, ..." if compiler flag is not set
// so we need to extract them manually if possible
List<String> parameterNames =
Stream.of(executable.getParameters())
.map(Parameter::getName)
.collect(Collectors.toList());
if (parameterNames.stream().allMatch(n -> n.startsWith("arg"))) {
final ParameterExtractor extractor;
if (executable instanceof Constructor) {
extractor = new ParameterExtractor((Constructor<?>) executable);
} else {
extractor = new ParameterExtractor((Method) executable);
}
getClassReader(executable.getDeclaringClass()).accept(extractor, 0);
final List<String> extractedNames = extractor.getParameterNames();
if (extractedNames.size() == 0) {
return null;
}
// remove "this" and additional local variables
// select less names if class file has not the required information
parameterNames =
extractedNames.subList(
offset,
Math.min(
executable.getParameterCount() + offset,
extractedNames.size()));
}
if (parameterNames.size() != executable.getParameterCount()) {
return null;
}
return parameterNames;
}
private static ClassReader getClassReader(Class<?> cls) {
final String className = cls.getName().replaceFirst("^.*\\.", "") + ".class";
if(ClassPool.exist(cls.getName())){
return new ClassReader(ClassPool.get(cls.getName()).getClassByte());
}
try {
return new ClassReader(cls.getResourceAsStream(className));
} catch (IOException e) {
throw new IllegalStateException("Could not instantiate ClassReader.", e);
}
}
/**
* Extracts the parameter names and descriptors from a constructor or method. Assuming the
* existence of a local variable table.
*
* <p>For example:
*
* <pre>{@code
* public WC(java.lang.String arg0, long arg1) { // <init> //(Ljava/lang/String;J)V
* <localVar:index=0 , name=this , desc=Lorg/apache/flink/WC;, sig=null, start=L1, end=L2>
* <localVar:index=1 , name=word , desc=Ljava/lang/String;, sig=null, start=L1, end=L2>
* <localVar:index=2 , name=frequency , desc=J, sig=null, start=L1, end=L2>
* <localVar:index=2 , name=otherLocal , desc=J, sig=null, start=L1, end=L2>
* <localVar:index=2 , name=otherLocal2 , desc=J, sig=null, start=L1, end=L2>
* }
* }</pre>
*/
private static class ParameterExtractor extends ClassVisitor {
private static final int OPCODE = Opcodes.ASM7;
private final String methodDescriptor;
private final List<String> parameterNames = new ArrayList<>();
ParameterExtractor(Constructor<?> constructor) {
super(OPCODE);
methodDescriptor = getConstructorDescriptor(constructor);
}
ParameterExtractor(Method method) {
super(OPCODE);
methodDescriptor = getMethodDescriptor(method);
}
List<String> getParameterNames() {
return parameterNames;
}
@Override
public MethodVisitor visitMethod(
int access, String name, String descriptor, String signature, String[] exceptions) {
if (descriptor.equals(methodDescriptor)) {
return new MethodVisitor(OPCODE) {
@Override
public void visitLocalVariable(
String name,
String descriptor,
String signature,
Label start,
Label end,
int index) {
parameterNames.add(name);
}
};
}
return super.visitMethod(access, name, descriptor, signature, exceptions);
}
}
// --------------------------------------------------------------------------------------------
// Class Assignment and Boxing
//
// copied from o.a.commons.lang3.ClassUtils (commons-lang3:3.3.2)
// --------------------------------------------------------------------------------------------
/**
* Checks if one {@code Class} can be assigned to a variable of another {@code Class}.
*
* <p>Unlike the {@link Class#isAssignableFrom(java.lang.Class)} method, this method takes into
* account widenings of primitive classes and {@code null}s.
*
* <p>Primitive widenings allow an int to be assigned to a long, float or double. This method
* returns the correct result for these cases.
*
* <p>{@code Null} may be assigned to any reference type. This method will return {@code true}
* if {@code null} is passed in and the toClass is non-primitive.
*
* <p>Specifically, this method tests whether the type represented by the specified {@code
* Class} parameter can be converted to the type represented by this {@code Class} object via an
* identity conversion widening primitive or widening reference conversion. See <em><a
* href="http://docs.oracle.com/javase/specs/">The Java Language Specification</a></em>,
* sections 5.1.1, 5.1.2 and 5.1.4 for details.
*
* @param cls the Class to check, may be null
* @param toClass the Class to try to assign into, returns false if null
* @param autoboxing whether to use implicit autoboxing/unboxing between primitives and wrappers
* @return {@code true} if assignment possible
*/
public static boolean isAssignable(
Class<?> cls, final Class<?> toClass, final boolean autoboxing) {
if (toClass == null) {
return false;
}
// have to check for null, as isAssignableFrom doesn't
if (cls == null) {
return !toClass.isPrimitive();
}
// autoboxing:
if (autoboxing) {
if (cls.isPrimitive() && !toClass.isPrimitive()) {
cls = primitiveToWrapper(cls);
if (cls == null) {
return false;
}
}
if (toClass.isPrimitive() && !cls.isPrimitive()) {
cls = wrapperToPrimitive(cls);
if (cls == null) {
return false;
}
}
}
if (cls.equals(toClass)) {
return true;
}
if (cls.isPrimitive()) {
if (!toClass.isPrimitive()) {
return false;
}
if (Integer.TYPE.equals(cls)) {
return Long.TYPE.equals(toClass)
|| Float.TYPE.equals(toClass)
|| Double.TYPE.equals(toClass);
}
if (Long.TYPE.equals(cls)) {
return Float.TYPE.equals(toClass) || Double.TYPE.equals(toClass);
}
if (Boolean.TYPE.equals(cls)) {
return false;
}
if (Double.TYPE.equals(cls)) {
return false;
}
if (Float.TYPE.equals(cls)) {
return Double.TYPE.equals(toClass);
}
if (Character.TYPE.equals(cls)) {
return Integer.TYPE.equals(toClass)
|| Long.TYPE.equals(toClass)
|| Float.TYPE.equals(toClass)
|| Double.TYPE.equals(toClass);
}
if (Short.TYPE.equals(cls)) {
return Integer.TYPE.equals(toClass)
|| Long.TYPE.equals(toClass)
|| Float.TYPE.equals(toClass)
|| Double.TYPE.equals(toClass);
}
if (Byte.TYPE.equals(cls)) {
return Short.TYPE.equals(toClass)
|| Integer.TYPE.equals(toClass)
|| Long.TYPE.equals(toClass)
|| Float.TYPE.equals(toClass)
|| Double.TYPE.equals(toClass);
}
// should never get here
return false;
}
return toClass.isAssignableFrom(cls);
}
/** Maps primitive {@code Class}es to their corresponding wrapper {@code Class}. */
private static final Map<Class<?>, Class<?>> primitiveWrapperMap = new HashMap<>();
static {
primitiveWrapperMap.put(Boolean.TYPE, Boolean.class);
primitiveWrapperMap.put(Byte.TYPE, Byte.class);
primitiveWrapperMap.put(Character.TYPE, Character.class);
primitiveWrapperMap.put(Short.TYPE, Short.class);
primitiveWrapperMap.put(Integer.TYPE, Integer.class);
primitiveWrapperMap.put(Long.TYPE, Long.class);
primitiveWrapperMap.put(Double.TYPE, Double.class);
primitiveWrapperMap.put(Float.TYPE, Float.class);
primitiveWrapperMap.put(Void.TYPE, Void.TYPE);
}
/** Maps wrapper {@code Class}es to their corresponding primitive types. */
private static final Map<Class<?>, Class<?>> wrapperPrimitiveMap = new HashMap<>();
static {
for (final Class<?> primitiveClass : primitiveWrapperMap.keySet()) {
final Class<?> wrapperClass = primitiveWrapperMap.get(primitiveClass);
if (!primitiveClass.equals(wrapperClass)) {
wrapperPrimitiveMap.put(wrapperClass, primitiveClass);
}
}
}
/**
* Converts the specified primitive Class object to its corresponding wrapper Class object.
*
* <p>NOTE: From v2.2, this method handles {@code Void.TYPE}, returning {@code Void.TYPE}.
*
* @param cls the class to convert, may be null
* @return the wrapper class for {@code cls} or {@code cls} if {@code cls} is not a primitive.
* {@code null} if null input.
* @since 2.1
*/
public static Class<?> primitiveToWrapper(final Class<?> cls) {
Class<?> convertedClass = cls;
if (cls != null && cls.isPrimitive()) {
convertedClass = primitiveWrapperMap.get(cls);
}
return convertedClass;
}
/**
* Converts the specified wrapper class to its corresponding primitive class.
*
* <p>This method is the counter part of {@code primitiveToWrapper()}. If the passed in class is
* a wrapper class for a primitive type, this primitive type will be returned (e.g. {@code
* Integer.TYPE} for {@code Integer.class}). For other classes, or if the parameter is
* <b>null</b>, the return value is <b>null</b>.
*
* @param cls the class to convert, may be <b>null</b>
* @return the corresponding primitive type if {@code cls} is a wrapper class, <b>null</b>
* otherwise
* @see #primitiveToWrapper(Class)
* @since 2.4
*/
public static Class<?> wrapperToPrimitive(final Class<?> cls) {
return wrapperPrimitiveMap.get(cls);
}
// --------------------------------------------------------------------------------------------
private ExtractionUtils() {
// no instantiation
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.types.extraction;
import com.dlink.pool.ClassPool;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.StructuredType;
import org.apache.flink.shaded.asm7.org.objectweb.asm.ClassReader;
import org.apache.flink.shaded.asm7.org.objectweb.asm.ClassVisitor;
import org.apache.flink.shaded.asm7.org.objectweb.asm.Label;
import org.apache.flink.shaded.asm7.org.objectweb.asm.MethodVisitor;
import org.apache.flink.shaded.asm7.org.objectweb.asm.Opcodes;
import javax.annotation.Nullable;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Constructor;
import java.lang.reflect.Executable;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Parameter;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.shaded.asm7.org.objectweb.asm.Type.getConstructorDescriptor;
import static org.apache.flink.shaded.asm7.org.objectweb.asm.Type.getMethodDescriptor;
/** Utilities for performing reflection tasks. */
@Internal
public final class ExtractionUtils {
// --------------------------------------------------------------------------------------------
// Methods shared across packages
// --------------------------------------------------------------------------------------------
/** Collects methods of the given name. */
public static List<Method> collectMethods(Class<?> function, String methodName) {
return Arrays.stream(function.getMethods())
.filter(method -> method.getName().equals(methodName))
.sorted(Comparator.comparing(Method::toString)) // for deterministic order
.collect(Collectors.toList());
}
/**
* Checks whether a method/constructor can be called with the given argument classes. This
* includes type widening and vararg. {@code null} is a wildcard.
*
* <p>E.g., {@code (int.class, int.class)} matches {@code f(Object...), f(int, int), f(Integer,
* Object)} and so forth.
*/
public static boolean isInvokable(Executable executable, Class<?>... classes) {
final int m = executable.getModifiers();
if (!Modifier.isPublic(m)) {
return false;
}
final int paramCount = executable.getParameterCount();
final int classCount = classes.length;
// check for enough classes for each parameter
if ((!executable.isVarArgs() && classCount != paramCount)
|| (executable.isVarArgs() && classCount < paramCount - 1)) {
return false;
}
int currentClass = 0;
for (int currentParam = 0; currentParam < paramCount; currentParam++) {
final Class<?> param = executable.getParameterTypes()[currentParam];
// last parameter is a vararg that needs to consume remaining classes
if (currentParam == paramCount - 1 && executable.isVarArgs()) {
final Class<?> paramComponent =
executable.getParameterTypes()[currentParam].getComponentType();
// we have more than 1 classes left so the vararg needs to consume them all
if (classCount - currentClass > 1) {
while (currentClass < classCount
&& ExtractionUtils.isAssignable(
classes[currentClass], paramComponent, true)) {
currentClass++;
}
} else if (currentClass < classCount
&& (parameterMatches(classes[currentClass], param)
|| parameterMatches(classes[currentClass], paramComponent))) {
currentClass++;
}
}
// entire parameter matches
else if (parameterMatches(classes[currentClass], param)) {
currentClass++;
}
}
// check if all classes have been consumed
return currentClass == classCount;
}
private static boolean parameterMatches(Class<?> clz, Class<?> param) {
return clz == null || ExtractionUtils.isAssignable(clz, param, true);
}
/** Creates a method signature string like {@code int eval(Integer, String)}. */
public static String createMethodSignatureString(
String methodName, Class<?>[] parameters, @Nullable Class<?> returnType) {
final StringBuilder builder = new StringBuilder();
if (returnType != null) {
builder.append(returnType.getCanonicalName()).append(" ");
}
builder.append(methodName)
.append(
Stream.of(parameters)
.map(
parameter -> {
// in case we don't know the parameter at this location
// (i.e. for accumulators)
if (parameter == null) {
return "_";
} else {
return parameter.getCanonicalName();
}
})
.collect(Collectors.joining(", ", "(", ")")));
return builder.toString();
}
/**
* Validates the characteristics of a class for a {@link StructuredType} such as accessibility.
*/
public static void validateStructuredClass(Class<?> clazz) {
final int m = clazz.getModifiers();
if (Modifier.isAbstract(m)) {
throw extractionError("Class '%s' must not be abstract.", clazz.getName());
}
if (!Modifier.isPublic(m)) {
throw extractionError("Class '%s' is not public.", clazz.getName());
}
if (clazz.getEnclosingClass() != null
&& (clazz.getDeclaringClass() == null || !Modifier.isStatic(m))) {
throw extractionError(
"Class '%s' is a not a static, globally accessible class.", clazz.getName());
}
}
/**
* Returns the field of a structured type. The logic is as broad as possible to support both
* Java and Scala in different flavors.
*/
public static Field getStructuredField(Class<?> clazz, String fieldName) {
final String normalizedFieldName = fieldName.toUpperCase();
final List<Field> fields = collectStructuredFields(clazz);
for (Field field : fields) {
if (field.getName().toUpperCase().equals(normalizedFieldName)) {
return field;
}
}
throw extractionError(
"Could not to find a field named '%s' in class '%s' for structured type.",
fieldName, clazz.getName());
}
/**
* Checks for a field getter of a structured type. The logic is as broad as possible to support
* both Java and Scala in different flavors.
*/
public static Optional<Method> getStructuredFieldGetter(Class<?> clazz, Field field) {
final String normalizedFieldName = field.getName().toUpperCase();
final List<Method> methods = collectStructuredMethods(clazz);
for (Method method : methods) {
// check name:
// get<Name>()
// is<Name>()
// <Name>() for Scala
final String normalizedMethodName = method.getName().toUpperCase();
final boolean hasName =
normalizedMethodName.equals("GET" + normalizedFieldName)
|| normalizedMethodName.equals("IS" + normalizedFieldName)
|| normalizedMethodName.equals(normalizedFieldName);
if (!hasName) {
continue;
}
// check return type:
// equal to field type
final Type returnType = method.getGenericReturnType();
final boolean hasReturnType = returnType.equals(field.getGenericType());
if (!hasReturnType) {
continue;
}
// check parameters:
// no parameters
final boolean hasNoParameters = method.getParameterCount() == 0;
if (!hasNoParameters) {
continue;
}
// matching getter found
return Optional.of(method);
}
// no getter found
return Optional.empty();
}
/**
* Checks for a field setters of a structured type. The logic is as broad as possible to support
* both Java and Scala in different flavors.
*/
public static Optional<Method> getStructuredFieldSetter(Class<?> clazz, Field field) {
final String normalizedFieldName = field.getName().toUpperCase();
final List<Method> methods = collectStructuredMethods(clazz);
for (Method method : methods) {
// check name:
// set<Name>(type)
// <Name>(type)
// <Name>_$eq(type) for Scala
final String normalizedMethodName = method.getName().toUpperCase();
final boolean hasName =
normalizedMethodName.equals("SET" + normalizedFieldName)
|| normalizedMethodName.equals(normalizedFieldName)
|| normalizedMethodName.equals(normalizedFieldName + "_$EQ");
if (!hasName) {
continue;
}
// check return type:
// void or the declaring class
final Class<?> returnType = method.getReturnType();
final boolean hasReturnType = returnType == Void.TYPE || returnType == clazz;
if (!hasReturnType) {
continue;
}
// check parameters:
// one parameter that has the same (or primitive) type of the field
final boolean hasParameter =
method.getParameterCount() == 1
&& (method.getGenericParameterTypes()[0].equals(field.getGenericType())
|| primitiveToWrapper(method.getGenericParameterTypes()[0])
.equals(field.getGenericType()));
if (!hasParameter) {
continue;
}
// matching setter found
return Optional.of(method);
}
// no setter found
return Optional.empty();
}
/**
* Checks for an invokable constructor matching the given arguments.
*
* @see #isInvokable(Executable, Class[])
*/
public static boolean hasInvokableConstructor(Class<?> clazz, Class<?>... classes) {
for (Constructor<?> constructor : clazz.getDeclaredConstructors()) {
if (isInvokable(constructor, classes)) {
return true;
}
}
return false;
}
/** Checks whether a field is directly readable without a getter. */
public static boolean isStructuredFieldDirectlyReadable(Field field) {
final int m = field.getModifiers();
// field is directly readable
return Modifier.isPublic(m);
}
/** Checks whether a field is directly writable without a setter or constructor. */
public static boolean isStructuredFieldDirectlyWritable(Field field) {
final int m = field.getModifiers();
// field is immutable
if (Modifier.isFinal(m)) {
return false;
}
// field is directly writable
return Modifier.isPublic(m);
}
// --------------------------------------------------------------------------------------------
// Methods intended for this package
// --------------------------------------------------------------------------------------------
/** Helper method for creating consistent exceptions during extraction. */
static ValidationException extractionError(String message, Object... args) {
return extractionError(null, message, args);
}
/** Helper method for creating consistent exceptions during extraction. */
static ValidationException extractionError(Throwable cause, String message, Object... args) {
return new ValidationException(String.format(message, args), cause);
}
/**
* Collects the partially ordered type hierarchy (i.e. all involved super classes and super
* interfaces) of the given type.
*/
static List<Type> collectTypeHierarchy(Type type) {
Type currentType = type;
Class<?> currentClass = toClass(type);
final List<Type> typeHierarchy = new ArrayList<>();
while (currentClass != null) {
// collect type
typeHierarchy.add(currentType);
// collect super interfaces
for (Type genericInterface : currentClass.getGenericInterfaces()) {
final Class<?> interfaceClass = toClass(genericInterface);
if (interfaceClass != null) {
typeHierarchy.addAll(collectTypeHierarchy(genericInterface));
}
}
currentType = currentClass.getGenericSuperclass();
currentClass = toClass(currentType);
}
return typeHierarchy;
}
/** Converts a {@link Type} to {@link Class} if possible, {@code null} otherwise. */
static @Nullable Class<?> toClass(Type type) {
if (type instanceof Class) {
return (Class<?>) type;
} else if (type instanceof ParameterizedType) {
// this is always a class
return (Class<?>) ((ParameterizedType) type).getRawType();
}
// unsupported: generic arrays, type variables, wildcard types
return null;
}
/** Creates a raw data type. */
@SuppressWarnings({"unchecked", "rawtypes"})
static DataType createRawType(
DataTypeFactory typeFactory,
@Nullable Class<? extends TypeSerializer<?>> rawSerializer,
@Nullable Class<?> conversionClass) {
if (rawSerializer != null) {
return DataTypes.RAW(
(Class) createConversionClass(conversionClass),
instantiateRawSerializer(rawSerializer));
}
return typeFactory.createRawDataType(createConversionClass(conversionClass));
}
static Class<?> createConversionClass(@Nullable Class<?> conversionClass) {
if (conversionClass != null) {
return conversionClass;
}
return Object.class;
}
private static TypeSerializer<?> instantiateRawSerializer(
Class<? extends TypeSerializer<?>> rawSerializer) {
try {
return rawSerializer.newInstance();
} catch (Exception e) {
throw extractionError(
e,
"Cannot instantiate type serializer '%s' for RAW type. "
+ "Make sure the class is publicly accessible and has a default constructor.",
rawSerializer.getName());
}
}
/** Resolves a {@link TypeVariable} using the given type hierarchy if possible. */
static Type resolveVariable(List<Type> typeHierarchy, TypeVariable<?> variable) {
// iterate through hierarchy from top to bottom until type variable gets a non-variable
// assigned
for (int i = typeHierarchy.size() - 1; i >= 0; i--) {
final Type currentType = typeHierarchy.get(i);
if (currentType instanceof ParameterizedType) {
final Type resolvedType =
resolveVariableInParameterizedType(
variable, (ParameterizedType) currentType);
if (resolvedType instanceof TypeVariable) {
// follow type variables transitively
variable = (TypeVariable<?>) resolvedType;
} else if (resolvedType != null) {
return resolvedType;
}
}
}
// unresolved variable
return variable;
}
private static @Nullable Type resolveVariableInParameterizedType(
TypeVariable<?> variable, ParameterizedType currentType) {
final Class<?> currentRaw = (Class<?>) currentType.getRawType();
final TypeVariable<?>[] currentVariables = currentRaw.getTypeParameters();
// search for matching type variable
for (int paramPos = 0; paramPos < currentVariables.length; paramPos++) {
if (typeVariableEquals(variable, currentVariables[paramPos])) {
return currentType.getActualTypeArguments()[paramPos];
}
}
return null;
}
private static boolean typeVariableEquals(
TypeVariable<?> variable, TypeVariable<?> currentVariable) {
return currentVariable.getGenericDeclaration().equals(variable.getGenericDeclaration())
&& currentVariable.getName().equals(variable.getName());
}
/**
* Validates if a given type is not already contained in the type hierarchy of a structured
* type.
*
* <p>Otherwise this would lead to infinite data type extraction cycles.
*/
static void validateStructuredSelfReference(Type t, List<Type> typeHierarchy) {
final Class<?> clazz = toClass(t);
if (clazz != null
&& !clazz.isInterface()
&& clazz != Object.class
&& typeHierarchy.contains(t)) {
throw extractionError(
"Cyclic reference detected for class '%s'. Attributes of structured types must not "
+ "(transitively) reference the structured type itself.",
clazz.getName());
}
}
/** Returns the fields of a class for a {@link StructuredType}. */
static List<Field> collectStructuredFields(Class<?> clazz) {
final List<Field> fields = new ArrayList<>();
while (clazz != Object.class) {
final Field[] declaredFields = clazz.getDeclaredFields();
Stream.of(declaredFields)
.filter(
field -> {
final int m = field.getModifiers();
return !Modifier.isStatic(m) && !Modifier.isTransient(m);
})
.forEach(fields::add);
clazz = clazz.getSuperclass();
}
return fields;
}
/** Validates if a field is properly readable either directly or through a getter. */
static void validateStructuredFieldReadability(Class<?> clazz, Field field) {
// field is accessible
if (isStructuredFieldDirectlyReadable(field)) {
return;
}
// field needs a getter
if (!getStructuredFieldGetter(clazz, field).isPresent()) {
throw extractionError(
"Field '%s' of class '%s' is neither publicly accessible nor does it have "
+ "a corresponding getter method.",
field.getName(), clazz.getName());
}
}
/**
* Checks if a field is mutable or immutable. Returns {@code true} if the field is properly
* mutable. Returns {@code false} if it is properly immutable.
*/
static boolean isStructuredFieldMutable(Class<?> clazz, Field field) {
final int m = field.getModifiers();
// field is immutable
if (Modifier.isFinal(m)) {
return false;
}
// field is directly mutable
if (Modifier.isPublic(m)) {
return true;
}
// field has setters by which it is mutable
if (getStructuredFieldSetter(clazz, field).isPresent()) {
return true;
}
throw extractionError(
"Field '%s' of class '%s' is mutable but is neither publicly accessible nor does it have "
+ "a corresponding setter method.",
field.getName(), clazz.getName());
}
/** Returns the boxed type of a primitive type. */
static Type primitiveToWrapper(Type type) {
if (type instanceof Class) {
return primitiveToWrapper((Class<?>) type);
}
return type;
}
/** Collects all methods that qualify as methods of a {@link StructuredType}. */
static List<Method> collectStructuredMethods(Class<?> clazz) {
final List<Method> methods = new ArrayList<>();
while (clazz != Object.class) {
final Method[] declaredMethods = clazz.getDeclaredMethods();
Stream.of(declaredMethods)
.filter(
field -> {
final int m = field.getModifiers();
return Modifier.isPublic(m)
&& !Modifier.isNative(m)
&& !Modifier.isAbstract(m);
})
.forEach(methods::add);
clazz = clazz.getSuperclass();
}
return methods;
}
/**
* Collects all annotations of the given type defined in the current class or superclasses.
* Duplicates are ignored.
*/
static <T extends Annotation> Set<T> collectAnnotationsOfClass(
Class<T> annotation, Class<?> annotatedClass) {
final List<Class<?>> classHierarchy = new ArrayList<>();
Class<?> currentClass = annotatedClass;
while (currentClass != null) {
classHierarchy.add(currentClass);
currentClass = currentClass.getSuperclass();
}
// convert to top down
Collections.reverse(classHierarchy);
return classHierarchy.stream()
.flatMap(c -> Stream.of(c.getAnnotationsByType(annotation)))
.collect(Collectors.toCollection(LinkedHashSet::new));
}
/**
* Collects all annotations of the given type defined in the given method. Duplicates are
* ignored.
*/
static <T extends Annotation> Set<T> collectAnnotationsOfMethod(
Class<T> annotation, Method annotatedMethod) {
return new LinkedHashSet<>(Arrays.asList(annotatedMethod.getAnnotationsByType(annotation)));
}
// --------------------------------------------------------------------------------------------
// Parameter Extraction Utilities
// --------------------------------------------------------------------------------------------
/** Result of the extraction in {@link #extractAssigningConstructor(Class, List)}. */
static class AssigningConstructor {
public final Constructor<?> constructor;
public final List<String> parameterNames;
private AssigningConstructor(Constructor<?> constructor, List<String> parameterNames) {
this.constructor = constructor;
this.parameterNames = parameterNames;
}
}
/**
* Checks whether the given constructor takes all of the given fields with matching (possibly
* primitive) type and name. An assigning constructor can define the order of fields.
*/
static @Nullable AssigningConstructor extractAssigningConstructor(
Class<?> clazz, List<Field> fields) {
AssigningConstructor foundConstructor = null;
for (Constructor<?> constructor : clazz.getDeclaredConstructors()) {
final boolean qualifyingConstructor =
Modifier.isPublic(constructor.getModifiers())
&& constructor.getParameterTypes().length == fields.size();
if (!qualifyingConstructor) {
continue;
}
final List<String> parameterNames =
extractConstructorParameterNames(constructor, fields);
if (parameterNames != null) {
if (foundConstructor != null) {
throw extractionError(
"Multiple constructors found that assign all fields for class '%s'.",
clazz.getName());
}
foundConstructor = new AssigningConstructor(constructor, parameterNames);
}
}
return foundConstructor;
}
/** Extracts the parameter names of a method if possible. */
static @Nullable List<String> extractMethodParameterNames(Method method) {
return extractExecutableNames(method);
}
/**
* Extracts ordered parameter names from a constructor that takes all of the given fields with
* matching (possibly primitive) type and name.
*/
private static @Nullable List<String> extractConstructorParameterNames(
Constructor<?> constructor, List<Field> fields) {
final Type[] parameterTypes = constructor.getGenericParameterTypes();
List<String> parameterNames = extractExecutableNames(constructor);
if (parameterNames == null) {
return null;
}
final Map<String, Type> fieldMap =
fields.stream().collect(Collectors.toMap(Field::getName, Field::getGenericType));
// check that all fields are represented in the parameters of the constructor
for (int i = 0; i < parameterNames.size(); i++) {
final String parameterName = parameterNames.get(i);
final Type fieldType = fieldMap.get(parameterName); // might be null
final Type parameterType = parameterTypes[i];
// we are tolerant here because frameworks such as Avro accept a boxed type even though
// the field is primitive
if (!primitiveToWrapper(parameterType).equals(primitiveToWrapper(fieldType))) {
return null;
}
}
return parameterNames;
}
private static @Nullable List<String> extractExecutableNames(Executable executable) {
final int offset;
if (!Modifier.isStatic(executable.getModifiers())) {
// remove "this" as first parameter
offset = 1;
} else {
offset = 0;
}
// by default parameter names are "arg0, arg1, arg2, ..." if compiler flag is not set
// so we need to extract them manually if possible
List<String> parameterNames =
Stream.of(executable.getParameters())
.map(Parameter::getName)
.collect(Collectors.toList());
if (parameterNames.stream().allMatch(n -> n.startsWith("arg"))) {
final ParameterExtractor extractor;
if (executable instanceof Constructor) {
extractor = new ParameterExtractor((Constructor<?>) executable);
} else {
extractor = new ParameterExtractor((Method) executable);
}
getClassReader(executable.getDeclaringClass()).accept(extractor, 0);
final List<String> extractedNames = extractor.getParameterNames();
if (extractedNames.size() == 0) {
return null;
}
// remove "this" and additional local variables
// select less names if class file has not the required information
parameterNames =
extractedNames.subList(
offset,
Math.min(
executable.getParameterCount() + offset,
extractedNames.size()));
}
if (parameterNames.size() != executable.getParameterCount()) {
return null;
}
return parameterNames;
}
private static ClassReader getClassReader(Class<?> cls) {
final String className = cls.getName().replaceFirst("^.*\\.", "") + ".class";
if(ClassPool.exist(cls.getName())){
return new ClassReader(ClassPool.get(cls.getName()).getClassByte());
}
try {
return new ClassReader(cls.getResourceAsStream(className));
} catch (IOException e) {
throw new IllegalStateException("Could not instantiate ClassReader.", e);
}
}
/**
* Extracts the parameter names and descriptors from a constructor or method. Assuming the
* existence of a local variable table.
*
* <p>For example:
*
* <pre>{@code
* public WC(java.lang.String arg0, long arg1) { // <init> //(Ljava/lang/String;J)V
* <localVar:index=0 , name=this , desc=Lorg/apache/flink/WC;, sig=null, start=L1, end=L2>
* <localVar:index=1 , name=word , desc=Ljava/lang/String;, sig=null, start=L1, end=L2>
* <localVar:index=2 , name=frequency , desc=J, sig=null, start=L1, end=L2>
* <localVar:index=2 , name=otherLocal , desc=J, sig=null, start=L1, end=L2>
* <localVar:index=2 , name=otherLocal2 , desc=J, sig=null, start=L1, end=L2>
* }
* }</pre>
*/
private static class ParameterExtractor extends ClassVisitor {
private static final int OPCODE = Opcodes.ASM7;
private final String methodDescriptor;
private final List<String> parameterNames = new ArrayList<>();
ParameterExtractor(Constructor<?> constructor) {
super(OPCODE);
methodDescriptor = getConstructorDescriptor(constructor);
}
ParameterExtractor(Method method) {
super(OPCODE);
methodDescriptor = getMethodDescriptor(method);
}
List<String> getParameterNames() {
return parameterNames;
}
@Override
public MethodVisitor visitMethod(
int access, String name, String descriptor, String signature, String[] exceptions) {
if (descriptor.equals(methodDescriptor)) {
return new MethodVisitor(OPCODE) {
@Override
public void visitLocalVariable(
String name,
String descriptor,
String signature,
Label start,
Label end,
int index) {
parameterNames.add(name);
}
};
}
return super.visitMethod(access, name, descriptor, signature, exceptions);
}
}
// --------------------------------------------------------------------------------------------
// Class Assignment and Boxing
//
// copied from o.a.commons.lang3.ClassUtils (commons-lang3:3.3.2)
// --------------------------------------------------------------------------------------------
/**
* Checks if one {@code Class} can be assigned to a variable of another {@code Class}.
*
* <p>Unlike the {@link Class#isAssignableFrom(java.lang.Class)} method, this method takes into
* account widenings of primitive classes and {@code null}s.
*
* <p>Primitive widenings allow an int to be assigned to a long, float or double. This method
* returns the correct result for these cases.
*
* <p>{@code Null} may be assigned to any reference type. This method will return {@code true}
* if {@code null} is passed in and the toClass is non-primitive.
*
* <p>Specifically, this method tests whether the type represented by the specified {@code
* Class} parameter can be converted to the type represented by this {@code Class} object via an
* identity conversion widening primitive or widening reference conversion. See <em><a
* href="http://docs.oracle.com/javase/specs/">The Java Language Specification</a></em>,
* sections 5.1.1, 5.1.2 and 5.1.4 for details.
*
* @param cls the Class to check, may be null
* @param toClass the Class to try to assign into, returns false if null
* @param autoboxing whether to use implicit autoboxing/unboxing between primitives and wrappers
* @return {@code true} if assignment possible
*/
public static boolean isAssignable(
Class<?> cls, final Class<?> toClass, final boolean autoboxing) {
if (toClass == null) {
return false;
}
// have to check for null, as isAssignableFrom doesn't
if (cls == null) {
return !toClass.isPrimitive();
}
// autoboxing:
if (autoboxing) {
if (cls.isPrimitive() && !toClass.isPrimitive()) {
cls = primitiveToWrapper(cls);
if (cls == null) {
return false;
}
}
if (toClass.isPrimitive() && !cls.isPrimitive()) {
cls = wrapperToPrimitive(cls);
if (cls == null) {
return false;
}
}
}
if (cls.equals(toClass)) {
return true;
}
if (cls.isPrimitive()) {
if (!toClass.isPrimitive()) {
return false;
}
if (Integer.TYPE.equals(cls)) {
return Long.TYPE.equals(toClass)
|| Float.TYPE.equals(toClass)
|| Double.TYPE.equals(toClass);
}
if (Long.TYPE.equals(cls)) {
return Float.TYPE.equals(toClass) || Double.TYPE.equals(toClass);
}
if (Boolean.TYPE.equals(cls)) {
return false;
}
if (Double.TYPE.equals(cls)) {
return false;
}
if (Float.TYPE.equals(cls)) {
return Double.TYPE.equals(toClass);
}
if (Character.TYPE.equals(cls)) {
return Integer.TYPE.equals(toClass)
|| Long.TYPE.equals(toClass)
|| Float.TYPE.equals(toClass)
|| Double.TYPE.equals(toClass);
}
if (Short.TYPE.equals(cls)) {
return Integer.TYPE.equals(toClass)
|| Long.TYPE.equals(toClass)
|| Float.TYPE.equals(toClass)
|| Double.TYPE.equals(toClass);
}
if (Byte.TYPE.equals(cls)) {
return Short.TYPE.equals(toClass)
|| Integer.TYPE.equals(toClass)
|| Long.TYPE.equals(toClass)
|| Float.TYPE.equals(toClass)
|| Double.TYPE.equals(toClass);
}
// should never get here
return false;
}
return toClass.isAssignableFrom(cls);
}
/** Maps primitive {@code Class}es to their corresponding wrapper {@code Class}. */
private static final Map<Class<?>, Class<?>> primitiveWrapperMap = new HashMap<>();
static {
primitiveWrapperMap.put(Boolean.TYPE, Boolean.class);
primitiveWrapperMap.put(Byte.TYPE, Byte.class);
primitiveWrapperMap.put(Character.TYPE, Character.class);
primitiveWrapperMap.put(Short.TYPE, Short.class);
primitiveWrapperMap.put(Integer.TYPE, Integer.class);
primitiveWrapperMap.put(Long.TYPE, Long.class);
primitiveWrapperMap.put(Double.TYPE, Double.class);
primitiveWrapperMap.put(Float.TYPE, Float.class);
primitiveWrapperMap.put(Void.TYPE, Void.TYPE);
}
/** Maps wrapper {@code Class}es to their corresponding primitive types. */
private static final Map<Class<?>, Class<?>> wrapperPrimitiveMap = new HashMap<>();
static {
for (final Class<?> primitiveClass : primitiveWrapperMap.keySet()) {
final Class<?> wrapperClass = primitiveWrapperMap.get(primitiveClass);
if (!primitiveClass.equals(wrapperClass)) {
wrapperPrimitiveMap.put(wrapperClass, primitiveClass);
}
}
}
/**
* Converts the specified primitive Class object to its corresponding wrapper Class object.
*
* <p>NOTE: From v2.2, this method handles {@code Void.TYPE}, returning {@code Void.TYPE}.
*
* @param cls the class to convert, may be null
* @return the wrapper class for {@code cls} or {@code cls} if {@code cls} is not a primitive.
* {@code null} if null input.
* @since 2.1
*/
public static Class<?> primitiveToWrapper(final Class<?> cls) {
Class<?> convertedClass = cls;
if (cls != null && cls.isPrimitive()) {
convertedClass = primitiveWrapperMap.get(cls);
}
return convertedClass;
}
/**
* Converts the specified wrapper class to its corresponding primitive class.
*
* <p>This method is the counter part of {@code primitiveToWrapper()}. If the passed in class is
* a wrapper class for a primitive type, this primitive type will be returned (e.g. {@code
* Integer.TYPE} for {@code Integer.class}). For other classes, or if the parameter is
* <b>null</b>, the return value is <b>null</b>.
*
* @param cls the class to convert, may be <b>null</b>
* @return the corresponding primitive type if {@code cls} is a wrapper class, <b>null</b>
* otherwise
* @see #primitiveToWrapper(Class)
* @since 2.4
*/
public static Class<?> wrapperToPrimitive(final Class<?> cls) {
return wrapperPrimitiveMap.get(cls);
}
// --------------------------------------------------------------------------------------------
private ExtractionUtils() {
// no instantiation
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.types.extraction;
import com.dlink.pool.ClassPool;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.StructuredType;
import org.apache.flink.shaded.asm7.org.objectweb.asm.ClassReader;
import org.apache.flink.shaded.asm7.org.objectweb.asm.ClassVisitor;
import org.apache.flink.shaded.asm7.org.objectweb.asm.Label;
import org.apache.flink.shaded.asm7.org.objectweb.asm.MethodVisitor;
import org.apache.flink.shaded.asm7.org.objectweb.asm.Opcodes;
import javax.annotation.Nullable;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Constructor;
import java.lang.reflect.Executable;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Parameter;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.shaded.asm7.org.objectweb.asm.Type.getConstructorDescriptor;
import static org.apache.flink.shaded.asm7.org.objectweb.asm.Type.getMethodDescriptor;
/** Utilities for performing reflection tasks. */
@Internal
public final class ExtractionUtils {
// --------------------------------------------------------------------------------------------
// Methods shared across packages
// --------------------------------------------------------------------------------------------
/** Collects methods of the given name. */
public static List<Method> collectMethods(Class<?> function, String methodName) {
return Arrays.stream(function.getMethods())
.filter(method -> method.getName().equals(methodName))
.sorted(Comparator.comparing(Method::toString)) // for deterministic order
.collect(Collectors.toList());
}
/**
* Checks whether a method/constructor can be called with the given argument classes. This
* includes type widening and vararg. {@code null} is a wildcard.
*
* <p>E.g., {@code (int.class, int.class)} matches {@code f(Object...), f(int, int), f(Integer,
* Object)} and so forth.
*/
public static boolean isInvokable(Executable executable, Class<?>... classes) {
final int m = executable.getModifiers();
if (!Modifier.isPublic(m)) {
return false;
}
final int paramCount = executable.getParameterCount();
final int classCount = classes.length;
// check for enough classes for each parameter
if ((!executable.isVarArgs() && classCount != paramCount)
|| (executable.isVarArgs() && classCount < paramCount - 1)) {
return false;
}
int currentClass = 0;
for (int currentParam = 0; currentParam < paramCount; currentParam++) {
final Class<?> param = executable.getParameterTypes()[currentParam];
// last parameter is a vararg that needs to consume remaining classes
if (currentParam == paramCount - 1 && executable.isVarArgs()) {
final Class<?> paramComponent =
executable.getParameterTypes()[currentParam].getComponentType();
// we have more than 1 classes left so the vararg needs to consume them all
if (classCount - currentClass > 1) {
while (currentClass < classCount
&& ExtractionUtils.isAssignable(
classes[currentClass], paramComponent, true)) {
currentClass++;
}
} else if (currentClass < classCount
&& (parameterMatches(classes[currentClass], param)
|| parameterMatches(classes[currentClass], paramComponent))) {
currentClass++;
}
}
// entire parameter matches
else if (parameterMatches(classes[currentClass], param)) {
currentClass++;
}
}
// check if all classes have been consumed
return currentClass == classCount;
}
private static boolean parameterMatches(Class<?> clz, Class<?> param) {
return clz == null || ExtractionUtils.isAssignable(clz, param, true);
}
/** Creates a method signature string like {@code int eval(Integer, String)}. */
public static String createMethodSignatureString(
String methodName, Class<?>[] parameters, @Nullable Class<?> returnType) {
final StringBuilder builder = new StringBuilder();
if (returnType != null) {
builder.append(returnType.getCanonicalName()).append(" ");
}
builder.append(methodName)
.append(
Stream.of(parameters)
.map(
parameter -> {
// in case we don't know the parameter at this location
// (i.e. for accumulators)
if (parameter == null) {
return "_";
} else {
return parameter.getCanonicalName();
}
})
.collect(Collectors.joining(", ", "(", ")")));
return builder.toString();
}
/**
* Validates the characteristics of a class for a {@link StructuredType} such as accessibility.
*/
public static void validateStructuredClass(Class<?> clazz) {
final int m = clazz.getModifiers();
if (Modifier.isAbstract(m)) {
throw extractionError("Class '%s' must not be abstract.", clazz.getName());
}
if (!Modifier.isPublic(m)) {
throw extractionError("Class '%s' is not public.", clazz.getName());
}
if (clazz.getEnclosingClass() != null
&& (clazz.getDeclaringClass() == null || !Modifier.isStatic(m))) {
throw extractionError(
"Class '%s' is a not a static, globally accessible class.", clazz.getName());
}
}
/**
* Returns the field of a structured type. The logic is as broad as possible to support both
* Java and Scala in different flavors.
*/
public static Field getStructuredField(Class<?> clazz, String fieldName) {
final String normalizedFieldName = fieldName.toUpperCase();
final List<Field> fields = collectStructuredFields(clazz);
for (Field field : fields) {
if (field.getName().toUpperCase().equals(normalizedFieldName)) {
return field;
}
}
throw extractionError(
"Could not find a field named '%s' in class '%s' for structured type.",
fieldName, clazz.getName());
}
/**
* Checks for a field getter of a structured type. The logic is as broad as possible to support
* both Java and Scala in different flavors.
*/
public static Optional<Method> getStructuredFieldGetter(Class<?> clazz, Field field) {
final String normalizedFieldName = normalizeAccessorName(field.getName());
final List<Method> methods = collectStructuredMethods(clazz);
for (Method method : methods) {
// check name:
// get<Name>()
// is<Name>()
// <Name>() for Scala
final String normalizedMethodName = normalizeAccessorName(method.getName());
final boolean hasName =
normalizedMethodName.equals("GET" + normalizedFieldName)
|| normalizedMethodName.equals("IS" + normalizedFieldName)
|| normalizedMethodName.equals(normalizedFieldName);
if (!hasName) {
continue;
}
// check return type:
// equal to field type
final Type returnType = method.getGenericReturnType();
final boolean hasReturnType = returnType.equals(field.getGenericType());
if (!hasReturnType) {
continue;
}
// check parameters:
// no parameters
final boolean hasNoParameters = method.getParameterCount() == 0;
if (!hasNoParameters) {
continue;
}
// matching getter found
return Optional.of(method);
}
// no getter found
return Optional.empty();
}
/**
* Checks for a field setters of a structured type. The logic is as broad as possible to support
* both Java and Scala in different flavors.
*/
public static Optional<Method> getStructuredFieldSetter(Class<?> clazz, Field field) {
final String normalizedFieldName = normalizeAccessorName(field.getName());
final List<Method> methods = collectStructuredMethods(clazz);
for (Method method : methods) {
// check name:
// set<Name>(type)
// <Name>(type)
// <Name>_$eq(type) for Scala
final String normalizedMethodName = normalizeAccessorName(method.getName());
final boolean hasName =
normalizedMethodName.equals("SET" + normalizedFieldName)
|| normalizedMethodName.equals(normalizedFieldName)
|| normalizedMethodName.equals(normalizedFieldName + "$EQ");
if (!hasName) {
continue;
}
// check return type:
// void or the declaring class
final Class<?> returnType = method.getReturnType();
final boolean hasReturnType = returnType == Void.TYPE || returnType == clazz;
if (!hasReturnType) {
continue;
}
// check parameters:
// one parameter that has the same (or primitive) type of the field
final boolean hasParameter =
method.getParameterCount() == 1
&& (method.getGenericParameterTypes()[0].equals(field.getGenericType())
|| primitiveToWrapper(method.getGenericParameterTypes()[0])
.equals(field.getGenericType()));
if (!hasParameter) {
continue;
}
// matching setter found
return Optional.of(method);
}
// no setter found
return Optional.empty();
}
private static String normalizeAccessorName(String name) {
return name.toUpperCase().replaceAll(Pattern.quote("_"), "");
}
/**
* Checks for an invokable constructor matching the given arguments.
*
* @see #isInvokable(Executable, Class[])
*/
public static boolean hasInvokableConstructor(Class<?> clazz, Class<?>... classes) {
for (Constructor<?> constructor : clazz.getDeclaredConstructors()) {
if (isInvokable(constructor, classes)) {
return true;
}
}
return false;
}
/** Checks whether a field is directly readable without a getter. */
public static boolean isStructuredFieldDirectlyReadable(Field field) {
final int m = field.getModifiers();
// field is directly readable
return Modifier.isPublic(m);
}
/** Checks whether a field is directly writable without a setter or constructor. */
public static boolean isStructuredFieldDirectlyWritable(Field field) {
final int m = field.getModifiers();
// field is immutable
if (Modifier.isFinal(m)) {
return false;
}
// field is directly writable
return Modifier.isPublic(m);
}
/**
* A minimal version to extract a generic parameter from a given class.
*
* <p>This method should only be used for very specific use cases, in most cases {@link
* DataTypeExtractor#extractFromGeneric(DataTypeFactory, Class, int, Type)} should be more
* appropriate.
*/
public static Optional<Class<?>> extractSimpleGeneric(
Class<?> baseClass, Class<?> clazz, int pos) {
try {
if (clazz.getSuperclass() != baseClass) {
return Optional.empty();
}
final Type t =
((ParameterizedType) clazz.getGenericSuperclass())
.getActualTypeArguments()[pos];
return Optional.ofNullable(toClass(t));
} catch (Exception unused) {
return Optional.empty();
}
}
// --------------------------------------------------------------------------------------------
// Methods intended for this package
// --------------------------------------------------------------------------------------------
/** Helper method for creating consistent exceptions during extraction. */
static ValidationException extractionError(String message, Object... args) {
return extractionError(null, message, args);
}
/** Helper method for creating consistent exceptions during extraction. */
static ValidationException extractionError(Throwable cause, String message, Object... args) {
return new ValidationException(String.format(message, args), cause);
}
/**
* Collects the partially ordered type hierarchy (i.e. all involved super classes and super
* interfaces) of the given type.
*/
static List<Type> collectTypeHierarchy(Type type) {
Type currentType = type;
Class<?> currentClass = toClass(type);
final List<Type> typeHierarchy = new ArrayList<>();
while (currentClass != null) {
// collect type
typeHierarchy.add(currentType);
// collect super interfaces
for (Type genericInterface : currentClass.getGenericInterfaces()) {
final Class<?> interfaceClass = toClass(genericInterface);
if (interfaceClass != null) {
typeHierarchy.addAll(collectTypeHierarchy(genericInterface));
}
}
currentType = currentClass.getGenericSuperclass();
currentClass = toClass(currentType);
}
return typeHierarchy;
}
/** Converts a {@link Type} to {@link Class} if possible, {@code null} otherwise. */
static @Nullable Class<?> toClass(Type type) {
if (type instanceof Class) {
return (Class<?>) type;
} else if (type instanceof ParameterizedType) {
// this is always a class
return (Class<?>) ((ParameterizedType) type).getRawType();
}
// unsupported: generic arrays, type variables, wildcard types
return null;
}
/** Creates a raw data type. */
@SuppressWarnings({"unchecked", "rawtypes"})
static DataType createRawType(
DataTypeFactory typeFactory,
@Nullable Class<? extends TypeSerializer<?>> rawSerializer,
@Nullable Class<?> conversionClass) {
if (rawSerializer != null) {
return DataTypes.RAW(
(Class) createConversionClass(conversionClass),
instantiateRawSerializer(rawSerializer));
}
return typeFactory.createRawDataType(createConversionClass(conversionClass));
}
static Class<?> createConversionClass(@Nullable Class<?> conversionClass) {
if (conversionClass != null) {
return conversionClass;
}
return Object.class;
}
private static TypeSerializer<?> instantiateRawSerializer(
Class<? extends TypeSerializer<?>> rawSerializer) {
try {
return rawSerializer.newInstance();
} catch (Exception e) {
throw extractionError(
e,
"Cannot instantiate type serializer '%s' for RAW type. "
+ "Make sure the class is publicly accessible and has a default constructor.",
rawSerializer.getName());
}
}
/** Resolves a {@link TypeVariable} using the given type hierarchy if possible. */
static Type resolveVariable(List<Type> typeHierarchy, TypeVariable<?> variable) {
// iterate through hierarchy from top to bottom until type variable gets a non-variable
// assigned
for (int i = typeHierarchy.size() - 1; i >= 0; i--) {
final Type currentType = typeHierarchy.get(i);
if (currentType instanceof ParameterizedType) {
final Type resolvedType =
resolveVariableInParameterizedType(
variable, (ParameterizedType) currentType);
if (resolvedType instanceof TypeVariable) {
// follow type variables transitively
variable = (TypeVariable<?>) resolvedType;
} else if (resolvedType != null) {
return resolvedType;
}
}
}
// unresolved variable
return variable;
}
private static @Nullable Type resolveVariableInParameterizedType(
TypeVariable<?> variable, ParameterizedType currentType) {
final Class<?> currentRaw = (Class<?>) currentType.getRawType();
final TypeVariable<?>[] currentVariables = currentRaw.getTypeParameters();
// search for matching type variable
for (int paramPos = 0; paramPos < currentVariables.length; paramPos++) {
if (typeVariableEquals(variable, currentVariables[paramPos])) {
return currentType.getActualTypeArguments()[paramPos];
}
}
return null;
}
private static boolean typeVariableEquals(
TypeVariable<?> variable, TypeVariable<?> currentVariable) {
return currentVariable.getGenericDeclaration().equals(variable.getGenericDeclaration())
&& currentVariable.getName().equals(variable.getName());
}
/**
* Validates if a given type is not already contained in the type hierarchy of a structured
* type.
*
* <p>Otherwise this would lead to infinite data type extraction cycles.
*/
static void validateStructuredSelfReference(Type t, List<Type> typeHierarchy) {
final Class<?> clazz = toClass(t);
if (clazz != null
&& !clazz.isInterface()
&& clazz != Object.class
&& typeHierarchy.contains(t)) {
throw extractionError(
"Cyclic reference detected for class '%s'. Attributes of structured types must not "
+ "(transitively) reference the structured type itself.",
clazz.getName());
}
}
/** Returns the fields of a class for a {@link StructuredType}. */
static List<Field> collectStructuredFields(Class<?> clazz) {
final List<Field> fields = new ArrayList<>();
while (clazz != Object.class) {
final Field[] declaredFields = clazz.getDeclaredFields();
Stream.of(declaredFields)
.filter(
field -> {
final int m = field.getModifiers();
return !Modifier.isStatic(m) && !Modifier.isTransient(m);
})
.forEach(fields::add);
clazz = clazz.getSuperclass();
}
return fields;
}
/** Validates if a field is properly readable either directly or through a getter. */
static void validateStructuredFieldReadability(Class<?> clazz, Field field) {
// field is accessible
if (isStructuredFieldDirectlyReadable(field)) {
return;
}
// field needs a getter
if (!getStructuredFieldGetter(clazz, field).isPresent()) {
throw extractionError(
"Field '%s' of class '%s' is neither publicly accessible nor does it have "
+ "a corresponding getter method.",
field.getName(), clazz.getName());
}
}
/**
* Checks if a field is mutable or immutable. Returns {@code true} if the field is properly
* mutable. Returns {@code false} if it is properly immutable.
*/
static boolean isStructuredFieldMutable(Class<?> clazz, Field field) {
final int m = field.getModifiers();
// field is immutable
if (Modifier.isFinal(m)) {
return false;
}
// field is directly mutable
if (Modifier.isPublic(m)) {
return true;
}
// field has setters by which it is mutable
if (getStructuredFieldSetter(clazz, field).isPresent()) {
return true;
}
throw extractionError(
"Field '%s' of class '%s' is mutable but is neither publicly accessible nor does it have "
+ "a corresponding setter method.",
field.getName(), clazz.getName());
}
/** Returns the boxed type of a primitive type. */
static Type primitiveToWrapper(Type type) {
if (type instanceof Class) {
return primitiveToWrapper((Class<?>) type);
}
return type;
}
/** Collects all methods that qualify as methods of a {@link StructuredType}. */
static List<Method> collectStructuredMethods(Class<?> clazz) {
final List<Method> methods = new ArrayList<>();
while (clazz != Object.class) {
final Method[] declaredMethods = clazz.getDeclaredMethods();
Stream.of(declaredMethods)
.filter(
field -> {
final int m = field.getModifiers();
return Modifier.isPublic(m)
&& !Modifier.isNative(m)
&& !Modifier.isAbstract(m);
})
.forEach(methods::add);
clazz = clazz.getSuperclass();
}
return methods;
}
/**
* Collects all annotations of the given type defined in the current class or superclasses.
* Duplicates are ignored.
*/
static <T extends Annotation> Set<T> collectAnnotationsOfClass(
Class<T> annotation, Class<?> annotatedClass) {
final List<Class<?>> classHierarchy = new ArrayList<>();
Class<?> currentClass = annotatedClass;
while (currentClass != null) {
classHierarchy.add(currentClass);
currentClass = currentClass.getSuperclass();
}
// convert to top down
Collections.reverse(classHierarchy);
return classHierarchy.stream()
.flatMap(c -> Stream.of(c.getAnnotationsByType(annotation)))
.collect(Collectors.toCollection(LinkedHashSet::new));
}
/**
* Collects all annotations of the given type defined in the given method. Duplicates are
* ignored.
*/
static <T extends Annotation> Set<T> collectAnnotationsOfMethod(
Class<T> annotation, Method annotatedMethod) {
return new LinkedHashSet<>(Arrays.asList(annotatedMethod.getAnnotationsByType(annotation)));
}
// --------------------------------------------------------------------------------------------
// Parameter Extraction Utilities
// --------------------------------------------------------------------------------------------
/** Result of the extraction in {@link #extractAssigningConstructor(Class, List)}. */
public static class AssigningConstructor {
public final Constructor<?> constructor;
public final List<String> parameterNames;
private AssigningConstructor(Constructor<?> constructor, List<String> parameterNames) {
this.constructor = constructor;
this.parameterNames = parameterNames;
}
}
/**
* Checks whether the given constructor takes all of the given fields with matching (possibly
* primitive) type and name. An assigning constructor can define the order of fields.
*/
public static @Nullable AssigningConstructor extractAssigningConstructor(
Class<?> clazz, List<Field> fields) {
AssigningConstructor foundConstructor = null;
for (Constructor<?> constructor : clazz.getDeclaredConstructors()) {
final boolean qualifyingConstructor =
Modifier.isPublic(constructor.getModifiers())
&& constructor.getParameterTypes().length == fields.size();
if (!qualifyingConstructor) {
continue;
}
final List<String> parameterNames =
extractConstructorParameterNames(constructor, fields);
if (parameterNames != null) {
if (foundConstructor != null) {
throw extractionError(
"Multiple constructors found that assign all fields for class '%s'.",
clazz.getName());
}
foundConstructor = new AssigningConstructor(constructor, parameterNames);
}
}
return foundConstructor;
}
/** Extracts the parameter names of a method if possible. */
static @Nullable List<String> extractMethodParameterNames(Method method) {
return extractExecutableNames(method);
}
/**
* Extracts ordered parameter names from a constructor that takes all of the given fields with
* matching (possibly primitive and lenient) type and name.
*/
private static @Nullable List<String> extractConstructorParameterNames(
Constructor<?> constructor, List<Field> fields) {
final Type[] parameterTypes = constructor.getGenericParameterTypes();
List<String> parameterNames = extractExecutableNames(constructor);
if (parameterNames == null) {
return null;
}
final Map<String, Field> fieldMap =
fields.stream()
.collect(
Collectors.toMap(
f -> normalizeAccessorName(f.getName()),
Function.identity()));
// check that all fields are represented in the parameters of the constructor
final List<String> fieldNames = new ArrayList<>();
for (int i = 0; i < parameterNames.size(); i++) {
final String parameterName = normalizeAccessorName(parameterNames.get(i));
final Field field = fieldMap.get(parameterName);
if (field == null) {
return null;
}
final Type fieldType = field.getGenericType();
final Type parameterType = parameterTypes[i];
// we are tolerant here because frameworks such as Avro accept a boxed type even though
// the field is primitive
if (!primitiveToWrapper(parameterType).equals(primitiveToWrapper(fieldType))) {
return null;
}
fieldNames.add(field.getName());
}
return fieldNames;
}
private static @Nullable List<String> extractExecutableNames(Executable executable) {
final int offset;
if (!Modifier.isStatic(executable.getModifiers())) {
// remove "this" as first parameter
offset = 1;
} else {
offset = 0;
}
// by default parameter names are "arg0, arg1, arg2, ..." if compiler flag is not set
// so we need to extract them manually if possible
List<String> parameterNames =
Stream.of(executable.getParameters())
.map(Parameter::getName)
.collect(Collectors.toList());
if (parameterNames.stream().allMatch(n -> n.startsWith("arg"))) {
final ParameterExtractor extractor;
if (executable instanceof Constructor) {
extractor = new ParameterExtractor((Constructor<?>) executable);
} else {
extractor = new ParameterExtractor((Method) executable);
}
getClassReader(executable.getDeclaringClass()).accept(extractor, 0);
final List<String> extractedNames = extractor.getParameterNames();
if (extractedNames.size() == 0) {
return null;
}
// remove "this" and additional local variables
// select less names if class file has not the required information
parameterNames =
extractedNames.subList(
offset,
Math.min(
executable.getParameterCount() + offset,
extractedNames.size()));
}
if (parameterNames.size() != executable.getParameterCount()) {
return null;
}
return parameterNames;
}
private static ClassReader getClassReader(Class<?> cls) {
final String className = cls.getName().replaceFirst("^.*\\.", "") + ".class";
if(ClassPool.exist(cls.getName())){
return new ClassReader(ClassPool.get(cls.getName()).getClassByte());
}
try {
return new ClassReader(cls.getResourceAsStream(className));
} catch (IOException e) {
throw new IllegalStateException("Could not instantiate ClassReader.", e);
}
}
/**
* Extracts the parameter names and descriptors from a constructor or method. Assuming the
* existence of a local variable table.
*
* <p>For example:
*
* <pre>{@code
* public WC(java.lang.String arg0, long arg1) { // <init> //(Ljava/lang/String;J)V
* <localVar:index=0 , name=this , desc=Lorg/apache/flink/WC;, sig=null, start=L1, end=L2>
* <localVar:index=1 , name=word , desc=Ljava/lang/String;, sig=null, start=L1, end=L2>
* <localVar:index=2 , name=frequency , desc=J, sig=null, start=L1, end=L2>
* <localVar:index=2 , name=otherLocal , desc=J, sig=null, start=L1, end=L2>
* <localVar:index=2 , name=otherLocal2 , desc=J, sig=null, start=L1, end=L2>
* }
* }</pre>
*/
private static class ParameterExtractor extends ClassVisitor {
private static final int OPCODE = Opcodes.ASM7;
private final String methodDescriptor;
private final List<String> parameterNames = new ArrayList<>();
ParameterExtractor(Constructor<?> constructor) {
super(OPCODE);
methodDescriptor = getConstructorDescriptor(constructor);
}
ParameterExtractor(Method method) {
super(OPCODE);
methodDescriptor = getMethodDescriptor(method);
}
List<String> getParameterNames() {
return parameterNames;
}
@Override
public MethodVisitor visitMethod(
int access, String name, String descriptor, String signature, String[] exceptions) {
if (descriptor.equals(methodDescriptor)) {
return new MethodVisitor(OPCODE) {
@Override
public void visitLocalVariable(
String name,
String descriptor,
String signature,
Label start,
Label end,
int index) {
parameterNames.add(name);
}
};
}
return super.visitMethod(access, name, descriptor, signature, exceptions);
}
}
// --------------------------------------------------------------------------------------------
// Class Assignment and Boxing
//
// copied from o.a.commons.lang3.ClassUtils (commons-lang3:3.3.2)
// --------------------------------------------------------------------------------------------
/**
* Checks if one {@code Class} can be assigned to a variable of another {@code Class}.
*
* <p>Unlike the {@link Class#isAssignableFrom(java.lang.Class)} method, this method takes into
* account widenings of primitive classes and {@code null}s.
*
* <p>Primitive widenings allow an int to be assigned to a long, float or double. This method
* returns the correct result for these cases.
*
* <p>{@code Null} may be assigned to any reference type. This method will return {@code true}
* if {@code null} is passed in and the toClass is non-primitive.
*
* <p>Specifically, this method tests whether the type represented by the specified {@code
* Class} parameter can be converted to the type represented by this {@code Class} object via an
* identity conversion widening primitive or widening reference conversion. See <em><a
* href="http://docs.oracle.com/javase/specs/">The Java Language Specification</a></em>,
* sections 5.1.1, 5.1.2 and 5.1.4 for details.
*
* @param cls the Class to check, may be null
* @param toClass the Class to try to assign into, returns false if null
* @param autoboxing whether to use implicit autoboxing/unboxing between primitives and wrappers
* @return {@code true} if assignment possible
*/
public static boolean isAssignable(
Class<?> cls, final Class<?> toClass, final boolean autoboxing) {
if (toClass == null) {
return false;
}
// have to check for null, as isAssignableFrom doesn't
if (cls == null) {
return !toClass.isPrimitive();
}
// autoboxing:
if (autoboxing) {
if (cls.isPrimitive() && !toClass.isPrimitive()) {
cls = primitiveToWrapper(cls);
if (cls == null) {
return false;
}
}
if (toClass.isPrimitive() && !cls.isPrimitive()) {
cls = wrapperToPrimitive(cls);
if (cls == null) {
return false;
}
}
}
if (cls.equals(toClass)) {
return true;
}
if (cls.isPrimitive()) {
if (!toClass.isPrimitive()) {
return false;
}
if (Integer.TYPE.equals(cls)) {
return Long.TYPE.equals(toClass)
|| Float.TYPE.equals(toClass)
|| Double.TYPE.equals(toClass);
}
if (Long.TYPE.equals(cls)) {
return Float.TYPE.equals(toClass) || Double.TYPE.equals(toClass);
}
if (Boolean.TYPE.equals(cls)) {
return false;
}
if (Double.TYPE.equals(cls)) {
return false;
}
if (Float.TYPE.equals(cls)) {
return Double.TYPE.equals(toClass);
}
if (Character.TYPE.equals(cls)) {
return Integer.TYPE.equals(toClass)
|| Long.TYPE.equals(toClass)
|| Float.TYPE.equals(toClass)
|| Double.TYPE.equals(toClass);
}
if (Short.TYPE.equals(cls)) {
return Integer.TYPE.equals(toClass)
|| Long.TYPE.equals(toClass)
|| Float.TYPE.equals(toClass)
|| Double.TYPE.equals(toClass);
}
if (Byte.TYPE.equals(cls)) {
return Short.TYPE.equals(toClass)
|| Integer.TYPE.equals(toClass)
|| Long.TYPE.equals(toClass)
|| Float.TYPE.equals(toClass)
|| Double.TYPE.equals(toClass);
}
// should never get here
return false;
}
return toClass.isAssignableFrom(cls);
}
/** Maps primitive {@code Class}es to their corresponding wrapper {@code Class}. */
private static final Map<Class<?>, Class<?>> primitiveWrapperMap = new HashMap<>();
static {
primitiveWrapperMap.put(Boolean.TYPE, Boolean.class);
primitiveWrapperMap.put(Byte.TYPE, Byte.class);
primitiveWrapperMap.put(Character.TYPE, Character.class);
primitiveWrapperMap.put(Short.TYPE, Short.class);
primitiveWrapperMap.put(Integer.TYPE, Integer.class);
primitiveWrapperMap.put(Long.TYPE, Long.class);
primitiveWrapperMap.put(Double.TYPE, Double.class);
primitiveWrapperMap.put(Float.TYPE, Float.class);
primitiveWrapperMap.put(Void.TYPE, Void.TYPE);
}
/** Maps wrapper {@code Class}es to their corresponding primitive types. */
private static final Map<Class<?>, Class<?>> wrapperPrimitiveMap = new HashMap<>();
static {
for (final Class<?> primitiveClass : primitiveWrapperMap.keySet()) {
final Class<?> wrapperClass = primitiveWrapperMap.get(primitiveClass);
if (!primitiveClass.equals(wrapperClass)) {
wrapperPrimitiveMap.put(wrapperClass, primitiveClass);
}
}
}
/**
* Converts the specified primitive Class object to its corresponding wrapper Class object.
*
* <p>NOTE: From v2.2, this method handles {@code Void.TYPE}, returning {@code Void.TYPE}.
*
* @param cls the class to convert, may be null
* @return the wrapper class for {@code cls} or {@code cls} if {@code cls} is not a primitive.
* {@code null} if null input.
* @since 2.1
*/
public static Class<?> primitiveToWrapper(final Class<?> cls) {
Class<?> convertedClass = cls;
if (cls != null && cls.isPrimitive()) {
convertedClass = primitiveWrapperMap.get(cls);
}
return convertedClass;
}
/**
* Converts the specified wrapper class to its corresponding primitive class.
*
* <p>This method is the counter part of {@code primitiveToWrapper()}. If the passed in class is
* a wrapper class for a primitive type, this primitive type will be returned (e.g. {@code
* Integer.TYPE} for {@code Integer.class}). For other classes, or if the parameter is
* <b>null</b>, the return value is <b>null</b>.
*
* @param cls the class to convert, may be <b>null</b>
* @return the corresponding primitive type if {@code cls} is a wrapper class, <b>null</b>
* otherwise
* @see #primitiveToWrapper(Class)
* @since 2.4
*/
public static Class<?> wrapperToPrimitive(final Class<?> cls) {
return wrapperPrimitiveMap.get(cls);
}
// --------------------------------------------------------------------------------------------
private ExtractionUtils() {
// no instantiation
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.types.extraction;
import com.dlink.pool.ClassPool;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.StructuredType;
import org.apache.flink.shaded.asm7.org.objectweb.asm.ClassReader;
import org.apache.flink.shaded.asm7.org.objectweb.asm.ClassVisitor;
import org.apache.flink.shaded.asm7.org.objectweb.asm.Label;
import org.apache.flink.shaded.asm7.org.objectweb.asm.MethodVisitor;
import org.apache.flink.shaded.asm7.org.objectweb.asm.Opcodes;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.lang.annotation.Annotation;
import java.lang.reflect.Constructor;
import java.lang.reflect.Executable;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Parameter;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.shaded.asm7.org.objectweb.asm.Type.getConstructorDescriptor;
import static org.apache.flink.shaded.asm7.org.objectweb.asm.Type.getMethodDescriptor;
/** Utilities for performing reflection tasks. */
@Internal
public final class ExtractionUtils {
// --------------------------------------------------------------------------------------------
// Methods shared across packages
// --------------------------------------------------------------------------------------------
/** Collects methods of the given name. */
public static List<Method> collectMethods(Class<?> function, String methodName) {
return Arrays.stream(function.getMethods())
.filter(method -> method.getName().equals(methodName))
.sorted(Comparator.comparing(Method::toString)) // for deterministic order
.collect(Collectors.toList());
}
/**
* Checks whether a method/constructor can be called with the given argument classes. This
* includes type widening and vararg. {@code null} is a wildcard.
*
* <p>E.g., {@code (int.class, int.class)} matches {@code f(Object...), f(int, int), f(Integer,
* Object)} and so forth.
*/
public static boolean isInvokable(Executable executable, Class<?>... classes) {
final int m = executable.getModifiers();
if (!Modifier.isPublic(m)) {
return false;
}
final int paramCount = executable.getParameterCount();
final int classCount = classes.length;
// check for enough classes for each parameter
if ((!executable.isVarArgs() && classCount != paramCount)
|| (executable.isVarArgs() && classCount < paramCount - 1)) {
return false;
}
int currentClass = 0;
for (int currentParam = 0; currentParam < paramCount; currentParam++) {
final Class<?> param = executable.getParameterTypes()[currentParam];
// last parameter is a vararg that needs to consume remaining classes
if (currentParam == paramCount - 1 && executable.isVarArgs()) {
final Class<?> paramComponent =
executable.getParameterTypes()[currentParam].getComponentType();
// we have more than 1 classes left so the vararg needs to consume them all
if (classCount - currentClass > 1) {
while (currentClass < classCount
&& ExtractionUtils.isAssignable(
classes[currentClass], paramComponent, true)) {
currentClass++;
}
} else if (currentClass < classCount
&& (parameterMatches(classes[currentClass], param)
|| parameterMatches(classes[currentClass], paramComponent))) {
currentClass++;
}
}
// entire parameter matches
else if (parameterMatches(classes[currentClass], param)) {
currentClass++;
}
}
// check if all classes have been consumed
return currentClass == classCount;
}
private static boolean parameterMatches(Class<?> clz, Class<?> param) {
return clz == null || ExtractionUtils.isAssignable(clz, param, true);
}
/** Creates a method signature string like {@code int eval(Integer, String)}. */
public static String createMethodSignatureString(
String methodName, Class<?>[] parameters, @Nullable Class<?> returnType) {
final StringBuilder builder = new StringBuilder();
if (returnType != null) {
builder.append(returnType.getCanonicalName()).append(" ");
}
builder.append(methodName)
.append(
Stream.of(parameters)
.map(
parameter -> {
// in case we don't know the parameter at this location
// (i.e. for accumulators)
if (parameter == null) {
return "_";
} else {
return parameter.getCanonicalName();
}
})
.collect(Collectors.joining(", ", "(", ")")));
return builder.toString();
}
/**
* Validates the characteristics of a class for a {@link StructuredType} such as accessibility.
*/
public static void validateStructuredClass(Class<?> clazz) {
final int m = clazz.getModifiers();
if (Modifier.isAbstract(m)) {
throw extractionError("Class '%s' must not be abstract.", clazz.getName());
}
if (!Modifier.isPublic(m)) {
throw extractionError("Class '%s' is not public.", clazz.getName());
}
if (clazz.getEnclosingClass() != null
&& (clazz.getDeclaringClass() == null || !Modifier.isStatic(m))) {
throw extractionError(
"Class '%s' is a not a static, globally accessible class.", clazz.getName());
}
}
/**
* Returns the field of a structured type. The logic is as broad as possible to support both
* Java and Scala in different flavors.
*/
public static Field getStructuredField(Class<?> clazz, String fieldName) {
final String normalizedFieldName = fieldName.toUpperCase();
final List<Field> fields = collectStructuredFields(clazz);
for (Field field : fields) {
if (field.getName().toUpperCase().equals(normalizedFieldName)) {
return field;
}
}
throw extractionError(
"Could not find a field named '%s' in class '%s' for structured type.",
fieldName, clazz.getName());
}
/**
* Checks for a field getter of a structured type. The logic is as broad as possible to support
* both Java and Scala in different flavors.
*/
public static Optional<Method> getStructuredFieldGetter(Class<?> clazz, Field field) {
final String normalizedFieldName = normalizeAccessorName(field.getName());
final List<Method> methods = collectStructuredMethods(clazz);
for (Method method : methods) {
// check name:
// get<Name>()
// is<Name>()
// <Name>() for Scala
final String normalizedMethodName = normalizeAccessorName(method.getName());
final boolean hasName =
normalizedMethodName.equals("GET" + normalizedFieldName)
|| normalizedMethodName.equals("IS" + normalizedFieldName)
|| normalizedMethodName.equals(normalizedFieldName);
if (!hasName) {
continue;
}
// check return type:
// equal to field type
final Type returnType = method.getGenericReturnType();
final boolean hasReturnType = returnType.equals(field.getGenericType());
if (!hasReturnType) {
continue;
}
// check parameters:
// no parameters
final boolean hasNoParameters = method.getParameterCount() == 0;
if (!hasNoParameters) {
continue;
}
// matching getter found
return Optional.of(method);
}
// no getter found
return Optional.empty();
}
/**
* Checks for a field setters of a structured type. The logic is as broad as possible to support
* both Java and Scala in different flavors.
*/
public static Optional<Method> getStructuredFieldSetter(Class<?> clazz, Field field) {
final String normalizedFieldName = normalizeAccessorName(field.getName());
final List<Method> methods = collectStructuredMethods(clazz);
for (Method method : methods) {
// check name:
// set<Name>(type)
// <Name>(type)
// <Name>_$eq(type) for Scala
final String normalizedMethodName = normalizeAccessorName(method.getName());
final boolean hasName =
normalizedMethodName.equals("SET" + normalizedFieldName)
|| normalizedMethodName.equals(normalizedFieldName)
|| normalizedMethodName.equals(normalizedFieldName + "$EQ");
if (!hasName) {
continue;
}
// check return type:
// void or the declaring class
final Class<?> returnType = method.getReturnType();
final boolean hasReturnType = returnType == Void.TYPE || returnType == clazz;
if (!hasReturnType) {
continue;
}
// check parameters:
// one parameter that has the same (or primitive) type of the field
final boolean hasParameter =
method.getParameterCount() == 1
&& (method.getGenericParameterTypes()[0].equals(field.getGenericType())
|| primitiveToWrapper(method.getGenericParameterTypes()[0])
.equals(field.getGenericType()));
if (!hasParameter) {
continue;
}
// matching setter found
return Optional.of(method);
}
// no setter found
return Optional.empty();
}
private static String normalizeAccessorName(String name) {
return name.toUpperCase().replaceAll(Pattern.quote("_"), "");
}
/**
* Checks for an invokable constructor matching the given arguments.
*
* @see #isInvokable(Executable, Class[])
*/
public static boolean hasInvokableConstructor(Class<?> clazz, Class<?>... classes) {
for (Constructor<?> constructor : clazz.getDeclaredConstructors()) {
if (isInvokable(constructor, classes)) {
return true;
}
}
return false;
}
/** Checks whether a field is directly readable without a getter. */
public static boolean isStructuredFieldDirectlyReadable(Field field) {
final int m = field.getModifiers();
// field is directly readable
return Modifier.isPublic(m);
}
/** Checks whether a field is directly writable without a setter or constructor. */
public static boolean isStructuredFieldDirectlyWritable(Field field) {
final int m = field.getModifiers();
// field is immutable
if (Modifier.isFinal(m)) {
return false;
}
// field is directly writable
return Modifier.isPublic(m);
}
/**
* A minimal version to extract a generic parameter from a given class.
*
* <p>This method should only be used for very specific use cases, in most cases {@link
* DataTypeExtractor#extractFromGeneric(DataTypeFactory, Class, int, Type)} should be more
* appropriate.
*/
public static Optional<Class<?>> extractSimpleGeneric(
Class<?> baseClass, Class<?> clazz, int pos) {
try {
if (clazz.getSuperclass() != baseClass) {
return Optional.empty();
}
final Type t =
((ParameterizedType) clazz.getGenericSuperclass())
.getActualTypeArguments()[pos];
return Optional.ofNullable(toClass(t));
} catch (Exception unused) {
return Optional.empty();
}
}
// --------------------------------------------------------------------------------------------
// Methods intended for this package
// --------------------------------------------------------------------------------------------
/** Helper method for creating consistent exceptions during extraction. */
static ValidationException extractionError(String message, Object... args) {
return extractionError(null, message, args);
}
/** Helper method for creating consistent exceptions during extraction. */
static ValidationException extractionError(Throwable cause, String message, Object... args) {
return new ValidationException(String.format(message, args), cause);
}
/**
* Collects the partially ordered type hierarchy (i.e. all involved super classes and super
* interfaces) of the given type.
*/
static List<Type> collectTypeHierarchy(Type type) {
Type currentType = type;
Class<?> currentClass = toClass(type);
final List<Type> typeHierarchy = new ArrayList<>();
while (currentClass != null) {
// collect type
typeHierarchy.add(currentType);
// collect super interfaces
for (Type genericInterface : currentClass.getGenericInterfaces()) {
final Class<?> interfaceClass = toClass(genericInterface);
if (interfaceClass != null) {
typeHierarchy.addAll(collectTypeHierarchy(genericInterface));
}
}
currentType = currentClass.getGenericSuperclass();
currentClass = toClass(currentType);
}
return typeHierarchy;
}
/** Converts a {@link Type} to {@link Class} if possible, {@code null} otherwise. */
static @Nullable Class<?> toClass(Type type) {
if (type instanceof Class) {
return (Class<?>) type;
} else if (type instanceof ParameterizedType) {
// this is always a class
return (Class<?>) ((ParameterizedType) type).getRawType();
}
// unsupported: generic arrays, type variables, wildcard types
return null;
}
/** Creates a raw data type. */
@SuppressWarnings({"unchecked", "rawtypes"})
static DataType createRawType(
DataTypeFactory typeFactory,
@Nullable Class<? extends TypeSerializer<?>> rawSerializer,
@Nullable Class<?> conversionClass) {
if (rawSerializer != null) {
return DataTypes.RAW(
(Class) createConversionClass(conversionClass),
instantiateRawSerializer(rawSerializer));
}
return typeFactory.createRawDataType(createConversionClass(conversionClass));
}
static Class<?> createConversionClass(@Nullable Class<?> conversionClass) {
if (conversionClass != null) {
return conversionClass;
}
return Object.class;
}
private static TypeSerializer<?> instantiateRawSerializer(
Class<? extends TypeSerializer<?>> rawSerializer) {
try {
return rawSerializer.newInstance();
} catch (Exception e) {
throw extractionError(
e,
"Cannot instantiate type serializer '%s' for RAW type. "
+ "Make sure the class is publicly accessible and has a default constructor.",
rawSerializer.getName());
}
}
/** Resolves a {@link TypeVariable} using the given type hierarchy if possible. */
static Type resolveVariable(List<Type> typeHierarchy, TypeVariable<?> variable) {
// iterate through hierarchy from top to bottom until type variable gets a non-variable
// assigned
for (int i = typeHierarchy.size() - 1; i >= 0; i--) {
final Type currentType = typeHierarchy.get(i);
if (currentType instanceof ParameterizedType) {
final Type resolvedType =
resolveVariableInParameterizedType(
variable, (ParameterizedType) currentType);
if (resolvedType instanceof TypeVariable) {
// follow type variables transitively
variable = (TypeVariable<?>) resolvedType;
} else if (resolvedType != null) {
return resolvedType;
}
}
}
// unresolved variable
return variable;
}
private static @Nullable Type resolveVariableInParameterizedType(
TypeVariable<?> variable, ParameterizedType currentType) {
final Class<?> currentRaw = (Class<?>) currentType.getRawType();
final TypeVariable<?>[] currentVariables = currentRaw.getTypeParameters();
// search for matching type variable
for (int paramPos = 0; paramPos < currentVariables.length; paramPos++) {
if (typeVariableEquals(variable, currentVariables[paramPos])) {
return currentType.getActualTypeArguments()[paramPos];
}
}
return null;
}
private static boolean typeVariableEquals(
TypeVariable<?> variable, TypeVariable<?> currentVariable) {
return currentVariable.getGenericDeclaration().equals(variable.getGenericDeclaration())
&& currentVariable.getName().equals(variable.getName());
}
/**
* Validates if a given type is not already contained in the type hierarchy of a structured
* type.
*
* <p>Otherwise this would lead to infinite data type extraction cycles.
*/
static void validateStructuredSelfReference(Type t, List<Type> typeHierarchy) {
final Class<?> clazz = toClass(t);
if (clazz != null
&& !clazz.isInterface()
&& clazz != Object.class
&& typeHierarchy.contains(t)) {
throw extractionError(
"Cyclic reference detected for class '%s'. Attributes of structured types must not "
+ "(transitively) reference the structured type itself.",
clazz.getName());
}
}
/** Returns the fields of a class for a {@link StructuredType}. */
static List<Field> collectStructuredFields(Class<?> clazz) {
final List<Field> fields = new ArrayList<>();
while (clazz != Object.class) {
final Field[] declaredFields = clazz.getDeclaredFields();
Stream.of(declaredFields)
.filter(
field -> {
final int m = field.getModifiers();
return !Modifier.isStatic(m) && !Modifier.isTransient(m);
})
.forEach(fields::add);
clazz = clazz.getSuperclass();
}
return fields;
}
/** Validates if a field is properly readable either directly or through a getter. */
static void validateStructuredFieldReadability(Class<?> clazz, Field field) {
// field is accessible
if (isStructuredFieldDirectlyReadable(field)) {
return;
}
// field needs a getter
if (!getStructuredFieldGetter(clazz, field).isPresent()) {
throw extractionError(
"Field '%s' of class '%s' is neither publicly accessible nor does it have "
+ "a corresponding getter method.",
field.getName(), clazz.getName());
}
}
/**
* Checks if a field is mutable or immutable. Returns {@code true} if the field is properly
* mutable. Returns {@code false} if it is properly immutable.
*/
static boolean isStructuredFieldMutable(Class<?> clazz, Field field) {
final int m = field.getModifiers();
// field is immutable
if (Modifier.isFinal(m)) {
return false;
}
// field is directly mutable
if (Modifier.isPublic(m)) {
return true;
}
// field has setters by which it is mutable
if (getStructuredFieldSetter(clazz, field).isPresent()) {
return true;
}
throw extractionError(
"Field '%s' of class '%s' is mutable but is neither publicly accessible nor does it have "
+ "a corresponding setter method.",
field.getName(), clazz.getName());
}
/** Returns the boxed type of a primitive type. */
static Type primitiveToWrapper(Type type) {
if (type instanceof Class) {
return primitiveToWrapper((Class<?>) type);
}
return type;
}
/** Collects all methods that qualify as methods of a {@link StructuredType}. */
static List<Method> collectStructuredMethods(Class<?> clazz) {
final List<Method> methods = new ArrayList<>();
while (clazz != Object.class) {
final Method[] declaredMethods = clazz.getDeclaredMethods();
Stream.of(declaredMethods)
.filter(
field -> {
final int m = field.getModifiers();
return Modifier.isPublic(m)
&& !Modifier.isNative(m)
&& !Modifier.isAbstract(m);
})
.forEach(methods::add);
clazz = clazz.getSuperclass();
}
return methods;
}
/**
* Collects all annotations of the given type defined in the current class or superclasses.
* Duplicates are ignored.
*/
static <T extends Annotation> Set<T> collectAnnotationsOfClass(
Class<T> annotation, Class<?> annotatedClass) {
final List<Class<?>> classHierarchy = new ArrayList<>();
Class<?> currentClass = annotatedClass;
while (currentClass != null) {
classHierarchy.add(currentClass);
currentClass = currentClass.getSuperclass();
}
// convert to top down
Collections.reverse(classHierarchy);
return classHierarchy.stream()
.flatMap(c -> Stream.of(c.getAnnotationsByType(annotation)))
.collect(Collectors.toCollection(LinkedHashSet::new));
}
/**
* Collects all annotations of the given type defined in the given method. Duplicates are
* ignored.
*/
static <T extends Annotation> Set<T> collectAnnotationsOfMethod(
Class<T> annotation, Method annotatedMethod) {
return new LinkedHashSet<>(Arrays.asList(annotatedMethod.getAnnotationsByType(annotation)));
}
// --------------------------------------------------------------------------------------------
// Parameter Extraction Utilities
// --------------------------------------------------------------------------------------------
/** Result of the extraction in {@link #extractAssigningConstructor(Class, List)}. */
public static class AssigningConstructor {
public final Constructor<?> constructor;
public final List<String> parameterNames;
private AssigningConstructor(Constructor<?> constructor, List<String> parameterNames) {
this.constructor = constructor;
this.parameterNames = parameterNames;
}
}
/**
* Checks whether the given constructor takes all of the given fields with matching (possibly
* primitive) type and name. An assigning constructor can define the order of fields.
*/
public static @Nullable AssigningConstructor extractAssigningConstructor(
Class<?> clazz, List<Field> fields) {
AssigningConstructor foundConstructor = null;
for (Constructor<?> constructor : clazz.getDeclaredConstructors()) {
final boolean qualifyingConstructor =
Modifier.isPublic(constructor.getModifiers())
&& constructor.getParameterTypes().length == fields.size();
if (!qualifyingConstructor) {
continue;
}
final List<String> parameterNames =
extractConstructorParameterNames(constructor, fields);
if (parameterNames != null) {
if (foundConstructor != null) {
throw extractionError(
"Multiple constructors found that assign all fields for class '%s'.",
clazz.getName());
}
foundConstructor = new AssigningConstructor(constructor, parameterNames);
}
}
return foundConstructor;
}
/** Extracts the parameter names of a method if possible. */
static @Nullable List<String> extractMethodParameterNames(Method method) {
return extractExecutableNames(method);
}
/**
* Extracts ordered parameter names from a constructor that takes all of the given fields with
* matching (possibly primitive and lenient) type and name.
*/
private static @Nullable List<String> extractConstructorParameterNames(
Constructor<?> constructor, List<Field> fields) {
final Type[] parameterTypes = constructor.getGenericParameterTypes();
List<String> parameterNames = extractExecutableNames(constructor);
if (parameterNames == null) {
return null;
}
final Map<String, Field> fieldMap =
fields.stream()
.collect(
Collectors.toMap(
f -> normalizeAccessorName(f.getName()),
Function.identity()));
// check that all fields are represented in the parameters of the constructor
final List<String> fieldNames = new ArrayList<>();
for (int i = 0; i < parameterNames.size(); i++) {
final String parameterName = normalizeAccessorName(parameterNames.get(i));
final Field field = fieldMap.get(parameterName);
if (field == null) {
return null;
}
final Type fieldType = field.getGenericType();
final Type parameterType = parameterTypes[i];
// we are tolerant here because frameworks such as Avro accept a boxed type even though
// the field is primitive
if (!primitiveToWrapper(parameterType).equals(primitiveToWrapper(fieldType))) {
return null;
}
fieldNames.add(field.getName());
}
return fieldNames;
}
private static @Nullable List<String> extractExecutableNames(Executable executable) {
final int offset;
if (!Modifier.isStatic(executable.getModifiers())) {
// remove "this" as first parameter
offset = 1;
} else {
offset = 0;
}
// by default parameter names are "arg0, arg1, arg2, ..." if compiler flag is not set
// so we need to extract them manually if possible
List<String> parameterNames =
Stream.of(executable.getParameters())
.map(Parameter::getName)
.collect(Collectors.toList());
if (parameterNames.stream().allMatch(n -> n.startsWith("arg"))) {
final ParameterExtractor extractor;
if (executable instanceof Constructor) {
extractor = new ParameterExtractor((Constructor<?>) executable);
} else {
extractor = new ParameterExtractor((Method) executable);
}
getClassReader(executable.getDeclaringClass()).accept(extractor, 0);
final List<String> extractedNames = extractor.getParameterNames();
if (extractedNames.size() == 0) {
return null;
}
// remove "this" and additional local variables
// select less names if class file has not the required information
parameterNames =
extractedNames.subList(
offset,
Math.min(
executable.getParameterCount() + offset,
extractedNames.size()));
}
if (parameterNames.size() != executable.getParameterCount()) {
return null;
}
return parameterNames;
}
private static ClassReader getClassReader(Class<?> cls) {
final String className = cls.getName().replaceFirst("^.*\\.", "") + ".class";
if(ClassPool.exist(cls.getName())){
return new ClassReader(ClassPool.get(cls.getName()).getClassByte());
}
try (InputStream i = cls.getResourceAsStream(className)) {
return new ClassReader(i);
} catch (IOException e) {
throw new IllegalStateException("Could not instantiate ClassReader.", e);
}
}
/**
* Extracts the parameter names and descriptors from a constructor or method. Assuming the
* existence of a local variable table.
*
* <p>For example:
*
* <pre>{@code
* public WC(java.lang.String arg0, long arg1) { // <init> //(Ljava/lang/String;J)V
* <localVar:index=0 , name=this , desc=Lorg/apache/flink/WC;, sig=null, start=L1, end=L2>
* <localVar:index=1 , name=word , desc=Ljava/lang/String;, sig=null, start=L1, end=L2>
* <localVar:index=2 , name=frequency , desc=J, sig=null, start=L1, end=L2>
* <localVar:index=2 , name=otherLocal , desc=J, sig=null, start=L1, end=L2>
* <localVar:index=2 , name=otherLocal2 , desc=J, sig=null, start=L1, end=L2>
* }
* }</pre>
*/
private static class ParameterExtractor extends ClassVisitor {
private static final int OPCODE = Opcodes.ASM7;
private final String methodDescriptor;
private final List<String> parameterNames = new ArrayList<>();
ParameterExtractor(Constructor<?> constructor) {
super(OPCODE);
methodDescriptor = getConstructorDescriptor(constructor);
}
ParameterExtractor(Method method) {
super(OPCODE);
methodDescriptor = getMethodDescriptor(method);
}
List<String> getParameterNames() {
return parameterNames;
}
@Override
public MethodVisitor visitMethod(
int access, String name, String descriptor, String signature, String[] exceptions) {
if (descriptor.equals(methodDescriptor)) {
return new MethodVisitor(OPCODE) {
@Override
public void visitLocalVariable(
String name,
String descriptor,
String signature,
Label start,
Label end,
int index) {
parameterNames.add(name);
}
};
}
return super.visitMethod(access, name, descriptor, signature, exceptions);
}
}
// --------------------------------------------------------------------------------------------
// Class Assignment and Boxing
//
// copied from o.a.commons.lang3.ClassUtils (commons-lang3:3.3.2)
// --------------------------------------------------------------------------------------------
/**
* Checks if one {@code Class} can be assigned to a variable of another {@code Class}.
*
* <p>Unlike the {@link Class#isAssignableFrom(java.lang.Class)} method, this method takes into
* account widenings of primitive classes and {@code null}s.
*
* <p>Primitive widenings allow an int to be assigned to a long, float or double. This method
* returns the correct result for these cases.
*
* <p>{@code Null} may be assigned to any reference type. This method will return {@code true}
* if {@code null} is passed in and the toClass is non-primitive.
*
* <p>Specifically, this method tests whether the type represented by the specified {@code
* Class} parameter can be converted to the type represented by this {@code Class} object via an
* identity conversion widening primitive or widening reference conversion. See <em><a
* href="http://docs.oracle.com/javase/specs/">The Java Language Specification</a></em>,
* sections 5.1.1, 5.1.2 and 5.1.4 for details.
*
* @param cls the Class to check, may be null
* @param toClass the Class to try to assign into, returns false if null
* @param autoboxing whether to use implicit autoboxing/unboxing between primitives and wrappers
* @return {@code true} if assignment possible
*/
public static boolean isAssignable(
Class<?> cls, final Class<?> toClass, final boolean autoboxing) {
if (toClass == null) {
return false;
}
// have to check for null, as isAssignableFrom doesn't
if (cls == null) {
return !toClass.isPrimitive();
}
// autoboxing:
if (autoboxing) {
if (cls.isPrimitive() && !toClass.isPrimitive()) {
cls = primitiveToWrapper(cls);
if (cls == null) {
return false;
}
}
if (toClass.isPrimitive() && !cls.isPrimitive()) {
cls = wrapperToPrimitive(cls);
if (cls == null) {
return false;
}
}
}
if (cls.equals(toClass)) {
return true;
}
if (cls.isPrimitive()) {
if (!toClass.isPrimitive()) {
return false;
}
if (Integer.TYPE.equals(cls)) {
return Long.TYPE.equals(toClass)
|| Float.TYPE.equals(toClass)
|| Double.TYPE.equals(toClass);
}
if (Long.TYPE.equals(cls)) {
return Float.TYPE.equals(toClass) || Double.TYPE.equals(toClass);
}
if (Boolean.TYPE.equals(cls)) {
return false;
}
if (Double.TYPE.equals(cls)) {
return false;
}
if (Float.TYPE.equals(cls)) {
return Double.TYPE.equals(toClass);
}
if (Character.TYPE.equals(cls)) {
return Integer.TYPE.equals(toClass)
|| Long.TYPE.equals(toClass)
|| Float.TYPE.equals(toClass)
|| Double.TYPE.equals(toClass);
}
if (Short.TYPE.equals(cls)) {
return Integer.TYPE.equals(toClass)
|| Long.TYPE.equals(toClass)
|| Float.TYPE.equals(toClass)
|| Double.TYPE.equals(toClass);
}
if (Byte.TYPE.equals(cls)) {
return Short.TYPE.equals(toClass)
|| Integer.TYPE.equals(toClass)
|| Long.TYPE.equals(toClass)
|| Float.TYPE.equals(toClass)
|| Double.TYPE.equals(toClass);
}
// should never get here
return false;
}
return toClass.isAssignableFrom(cls);
}
/** Maps primitive {@code Class}es to their corresponding wrapper {@code Class}. */
private static final Map<Class<?>, Class<?>> primitiveWrapperMap = new HashMap<>();
static {
primitiveWrapperMap.put(Boolean.TYPE, Boolean.class);
primitiveWrapperMap.put(Byte.TYPE, Byte.class);
primitiveWrapperMap.put(Character.TYPE, Character.class);
primitiveWrapperMap.put(Short.TYPE, Short.class);
primitiveWrapperMap.put(Integer.TYPE, Integer.class);
primitiveWrapperMap.put(Long.TYPE, Long.class);
primitiveWrapperMap.put(Double.TYPE, Double.class);
primitiveWrapperMap.put(Float.TYPE, Float.class);
primitiveWrapperMap.put(Void.TYPE, Void.TYPE);
}
/** Maps wrapper {@code Class}es to their corresponding primitive types. */
private static final Map<Class<?>, Class<?>> wrapperPrimitiveMap = new HashMap<>();
static {
for (final Class<?> primitiveClass : primitiveWrapperMap.keySet()) {
final Class<?> wrapperClass = primitiveWrapperMap.get(primitiveClass);
if (!primitiveClass.equals(wrapperClass)) {
wrapperPrimitiveMap.put(wrapperClass, primitiveClass);
}
}
}
/**
* Converts the specified primitive Class object to its corresponding wrapper Class object.
*
* <p>NOTE: From v2.2, this method handles {@code Void.TYPE}, returning {@code Void.TYPE}.
*
* @param cls the class to convert, may be null
* @return the wrapper class for {@code cls} or {@code cls} if {@code cls} is not a primitive.
* {@code null} if null input.
* @since 2.1
*/
public static Class<?> primitiveToWrapper(final Class<?> cls) {
Class<?> convertedClass = cls;
if (cls != null && cls.isPrimitive()) {
convertedClass = primitiveWrapperMap.get(cls);
}
return convertedClass;
}
/**
* Converts the specified wrapper class to its corresponding primitive class.
*
* <p>This method is the counter part of {@code primitiveToWrapper()}. If the passed in class is
* a wrapper class for a primitive type, this primitive type will be returned (e.g. {@code
* Integer.TYPE} for {@code Integer.class}). For other classes, or if the parameter is
* <b>null</b>, the return value is <b>null</b>.
*
* @param cls the class to convert, may be <b>null</b>
* @return the corresponding primitive type if {@code cls} is a wrapper class, <b>null</b>
* otherwise
* @see #primitiveToWrapper(Class)
* @since 2.4
*/
public static Class<?> wrapperToPrimitive(final Class<?> cls) {
return wrapperPrimitiveMap.get(cls);
}
// --------------------------------------------------------------------------------------------
private ExtractionUtils() {
// no instantiation
}
}
package com.dlink.pool;
import com.dlink.assertion.Asserts;
import lombok.Getter;
import lombok.Setter;
/**
* ClassEntity
*
* @author wenmo
* @since 2022/1/12 23:52
*/
@Getter
@Setter
public class ClassEntity {
private String name;
private String code;
private byte[] classByte;
public ClassEntity(String name, String code) {
this.name = name;
this.code = code;
}
public ClassEntity(String name, String code, byte[] classByte) {
this.name = name;
this.code = code;
this.classByte = classByte;
}
public static ClassEntity build(String name, String code){
return new ClassEntity(name,code);
}
public boolean equals(ClassEntity entity) {
if (Asserts.isEquals(name, entity.getName()) && Asserts.isEquals(code, entity.getCode())){
return true;
}else{
return false;
}
}
}
package com.dlink.pool;
import java.util.List;
import java.util.Vector;
/**
* ClassPool
*
* @author wenmo
* @since 2022/1/12 23:52
*/
public class ClassPool {
private static volatile List<ClassEntity> classList = new Vector<>();
public static boolean exist(String name) {
for (ClassEntity executorEntity : classList) {
if (executorEntity.getName().equals(name)) {
return true;
}
}
return false;
}
public static boolean exist(ClassEntity entity) {
for (ClassEntity executorEntity : classList) {
if (executorEntity.equals(entity)) {
return true;
}
}
return false;
}
public static Integer push(ClassEntity executorEntity){
if(exist(executorEntity.getName())){
remove(executorEntity.getName());
}
classList.add(executorEntity);
return classList.size();
}
public static Integer remove(String name) {
int count = classList.size();
for (int i = 0; i < classList.size(); i++) {
if (name.equals(classList.get(i).getName())) {
classList.remove(i);
break;
}
}
return count - classList.size();
}
public static ClassEntity get(String name) {
for (ClassEntity executorEntity : classList) {
if (executorEntity.getName().equals(name)) {
return executorEntity;
}
}
return null;
}
}
...@@ -35,6 +35,11 @@ ...@@ -35,6 +35,11 @@
<groupId>cn.hutool</groupId> <groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId> <artifactId>hutool-all</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy</artifactId>
<version>3.0.9</version>
</dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>
<artifactId>junit</artifactId> <artifactId>junit</artifactId>
......
...@@ -19,6 +19,8 @@ import com.dlink.gateway.result.TestResult; ...@@ -19,6 +19,8 @@ import com.dlink.gateway.result.TestResult;
import com.dlink.interceptor.FlinkInterceptor; import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.model.SystemConfiguration; import com.dlink.model.SystemConfiguration;
import com.dlink.parser.SqlType; import com.dlink.parser.SqlType;
import com.dlink.pool.ClassEntity;
import com.dlink.pool.ClassPool;
import com.dlink.result.*; import com.dlink.result.*;
import com.dlink.session.ExecutorEntity; import com.dlink.session.ExecutorEntity;
import com.dlink.session.SessionConfig; import com.dlink.session.SessionConfig;
...@@ -26,6 +28,7 @@ import com.dlink.session.SessionInfo; ...@@ -26,6 +28,7 @@ import com.dlink.session.SessionInfo;
import com.dlink.session.SessionPool; import com.dlink.session.SessionPool;
import com.dlink.trans.Operations; import com.dlink.trans.Operations;
import com.dlink.utils.SqlUtil; import com.dlink.utils.SqlUtil;
import com.dlink.utils.UDFUtil;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.DeploymentOptions;
...@@ -43,6 +46,8 @@ import java.util.ArrayList; ...@@ -43,6 +46,8 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/** /**
* JobManager * JobManager
...@@ -491,4 +496,22 @@ public class JobManager { ...@@ -491,4 +496,22 @@ public class JobManager {
sb.append(statement); sb.append(statement);
return sb.toString(); return sb.toString();
} }
public static List<String> getUDFClassName(String statement){
Pattern pattern = Pattern.compile("function (.*?)'(.*?)'", Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(statement);
List<String> classNameList = new ArrayList<>();
while (matcher.find()) {
classNameList.add(matcher.group(2));
}
return classNameList;
}
public static void initUDF(String className,String code){
if(ClassPool.exist(ClassEntity.build(className,code))){
UDFUtil.initClassLoader(className);
}else{
UDFUtil.buildClass(code);
}
}
} }
package com.dlink.utils;
import javax.tools.*;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* CustomStringJavaCompiler
*
* @author wenmo
* @since 2021/12/28 22:46
*/
public class CustomStringJavaCompiler {
//类全名
private String fullClassName;
private String sourceCode;
//存放编译之后的字节码(key:类全名,value:编译之后输出的字节码)
private Map<String, ByteJavaFileObject> javaFileObjectMap = new ConcurrentHashMap<>();
//获取java的编译器
private JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
//存放编译过程中输出的信息
private DiagnosticCollector<JavaFileObject> diagnosticsCollector = new DiagnosticCollector<>();
//编译耗时(单位ms)
private long compilerTakeTime;
public String getFullClassName() {
return fullClassName;
}
public ByteJavaFileObject getJavaFileObjectMap(String name) {
return javaFileObjectMap.get(name);
}
public CustomStringJavaCompiler(String sourceCode) {
this.sourceCode = sourceCode;
this.fullClassName = getFullClassName(sourceCode);
}
/**
* 编译字符串源代码,编译失败在 diagnosticsCollector 中获取提示信息
*
* @return true:编译成功 false:编译失败
*/
public boolean compiler() {
long startTime = System.currentTimeMillis();
//标准的内容管理器,更换成自己的实现,覆盖部分方法
StandardJavaFileManager standardFileManager = compiler.getStandardFileManager(diagnosticsCollector, null, null);
JavaFileManager javaFileManager = new StringJavaFileManage(standardFileManager);
//构造源代码对象
JavaFileObject javaFileObject = new StringJavaFileObject(fullClassName, sourceCode);
//获取一个编译任务
JavaCompiler.CompilationTask task = compiler.getTask(null, javaFileManager, diagnosticsCollector, null, null, Arrays.asList(javaFileObject));
//设置编译耗时
compilerTakeTime = System.currentTimeMillis() - startTime;
return task.call();
}
/**
* @return 编译信息(错误 警告)
*/
public String getCompilerMessage() {
StringBuilder sb = new StringBuilder();
List<Diagnostic<? extends JavaFileObject>> diagnostics = diagnosticsCollector.getDiagnostics();
for (Diagnostic diagnostic : diagnostics) {
sb.append(diagnostic.toString()).append("\r\n");
}
return sb.toString();
}
public long getCompilerTakeTime() {
return compilerTakeTime;
}
/**
* 获取类的全名称
*
* @param sourceCode 源码
* @return 类的全名称
*/
public static String getFullClassName(String sourceCode) {
String className = "";
Pattern pattern = Pattern.compile("package\\s+\\S+\\s*;");
Matcher matcher = pattern.matcher(sourceCode);
if (matcher.find()) {
className = matcher.group().replaceFirst("package", "").replace(";", "").trim() + ".";
}
pattern = Pattern.compile("class\\s+(\\S+)\\s+");
matcher = pattern.matcher(sourceCode);
if (matcher.find()) {
className += matcher.group(1).trim();
}
return className;
}
/**
* 自定义一个字符串的源码对象
*/
private class StringJavaFileObject extends SimpleJavaFileObject {
//等待编译的源码字段
private String contents;
//java源代码 => StringJavaFileObject对象 的时候使用
public StringJavaFileObject(String className, String contents) {
super(URI.create("string:///" + className.replaceAll("\\.", "/") + Kind.SOURCE.extension), Kind.SOURCE);
this.contents = contents;
}
//字符串源码会调用该方法
@Override
public CharSequence getCharContent(boolean ignoreEncodingErrors) throws IOException {
return contents;
}
}
/**
* 自定义一个编译之后的字节码对象
*/
public class ByteJavaFileObject extends SimpleJavaFileObject {
//存放编译后的字节码
private ByteArrayOutputStream outPutStream;
public ByteJavaFileObject(String className, Kind kind) {
super(URI.create("string:///" + className.replaceAll("\\.", "/") + Kind.SOURCE.extension), kind);
}
//StringJavaFileManage 编译之后的字节码输出会调用该方法(把字节码输出到outputStream)
@Override
public OutputStream openOutputStream() {
outPutStream = new ByteArrayOutputStream();
return outPutStream;
}
//在类加载器加载的时候需要用到
public byte[] getCompiledBytes() {
return outPutStream.toByteArray();
}
}
/**
* 自定义一个JavaFileManage来控制编译之后字节码的输出位置
*/
private class StringJavaFileManage extends ForwardingJavaFileManager {
StringJavaFileManage(JavaFileManager fileManager) {
super(fileManager);
}
//获取输出的文件对象,它表示给定位置处指定类型的指定类。
@Override
public JavaFileObject getJavaFileForOutput(Location location, String className, JavaFileObject.Kind kind, FileObject sibling) throws IOException {
ByteJavaFileObject javaFileObject = new ByteJavaFileObject(className, kind);
javaFileObjectMap.put(className, javaFileObject);
return javaFileObject;
}
}
}
package com.dlink.utils;
import com.dlink.pool.ClassEntity;
import com.dlink.pool.ClassPool;
import groovy.lang.GroovyClassLoader;
import org.codehaus.groovy.control.CompilerConfiguration;
/**
* UDFUtil
*
* @author wenmo
* @since 2021/12/27 23:25
*/
public class UDFUtil {
public static void buildClass(String code){
CustomStringJavaCompiler compiler = new CustomStringJavaCompiler(code);
boolean res = compiler.compiler();
if (res) {
String className = compiler.getFullClassName();
byte[] compiledBytes = compiler.getJavaFileObjectMap(className).getCompiledBytes();
ClassPool.push(new ClassEntity(className,code,compiledBytes));
System.out.println("编译成功");
System.out.println("compilerTakeTime:" + compiler.getCompilerTakeTime());
initClassLoader(className);
} else {
System.out.println("编译失败");
System.out.println(compiler.getCompilerMessage());
}
}
public static void initClassLoader(String name){
ClassEntity classEntity = ClassPool.get(name);
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
CompilerConfiguration config = new CompilerConfiguration();
config.setSourceEncoding("UTF-8");
GroovyClassLoader groovyClassLoader = new GroovyClassLoader(contextClassLoader, config);
groovyClassLoader.setShouldRecompile(true);
groovyClassLoader.defineClass(classEntity.getName(),classEntity.getClassByte());
Thread.currentThread().setContextClassLoader(groovyClassLoader);
// Class<?> clazz = groovyClassLoader.parseClass(codeSource,"com.dlink.ud.udf.SubstringFunction");
}
}
@import '~antd/es/style/themes/default.less';
.form_setting{
padding-left: 10px;
}
.form_item{
margin-bottom: 5px;
}
import {connect} from "umi";
import {StateType} from "@/pages/FlinkSqlStudio/model";
import {Form, Switch, Row, Col, Tooltip, Button, Input} from "antd";
import {InfoCircleOutlined,MinusSquareOutlined} from "@ant-design/icons";
import styles from "./index.less";
import {useEffect} from "react";
import {JarStateType} from "@/pages/Jar/model";
import {Scrollbars} from "react-custom-scrollbars";
const StudioUDFInfo = (props: any) => {
const { current, form, toolHeight} = props;
useEffect(() => {
form.setFieldsValue(current.task);
}, [current.task]);
return (
<>
<Row>
<Col span={24}>
<div style={{float: "right"}}>
<Tooltip title="最小化">
<Button
type="text"
icon={<MinusSquareOutlined/>}
/>
</Tooltip>
</div>
</Col>
</Row>
<Scrollbars style={{height: (toolHeight - 32)}}>
<Form
form={form}
layout="vertical"
className={styles.form_setting}
>
<Row>
<Col span={24}>
<Form.Item
label="类名" className={styles.form_item} name="savePointPath"
>
<Input readOnly={true} placeholder="自动识别"/>
</Form.Item>
</Col>
</Row>
</Form>
</Scrollbars>
</>
);
};
export default connect(({Studio, Jar}: { Studio: StateType, Jar: JarStateType }) => ({
current: Studio.current,
toolHeight: Studio.toolHeight,
}))(StudioUDFInfo);
...@@ -8,6 +8,7 @@ import StudioSetting from "./StudioSetting"; ...@@ -8,6 +8,7 @@ import StudioSetting from "./StudioSetting";
import StudioSavePoint from "./StudioSavePoint"; import StudioSavePoint from "./StudioSavePoint";
import StudioEnvSetting from "./StudioEnvSetting"; import StudioEnvSetting from "./StudioEnvSetting";
import StudioSqlConfig from "./StudioSqlConfig"; import StudioSqlConfig from "./StudioSqlConfig";
import StudioUDFInfo from "./StudioUDFInfo";
import {DIALECT, isSql} from "@/components/Studio/conf"; import {DIALECT, isSql} from "@/components/Studio/conf";
const { TabPane } = Tabs; const { TabPane } = Tabs;
...@@ -26,7 +27,7 @@ const StudioRightTool = (props:any) => { ...@@ -26,7 +27,7 @@ const StudioRightTool = (props:any) => {
return renderEnvContent(); return renderEnvContent();
} }
if(DIALECT.JAVA === current.task.dialect){ if(DIALECT.JAVA === current.task.dialect){
return undefined; return renderUDFContent();
} }
return renderFlinkSqlContent(); return renderFlinkSqlContent();
}; };
...@@ -47,6 +48,14 @@ const StudioRightTool = (props:any) => { ...@@ -47,6 +48,14 @@ const StudioRightTool = (props:any) => {
</>) </>)
}; };
const renderUDFContent = () => {
return (<>
<TabPane tab={<span><SettingOutlined /> UDF信息</span>} key="StudioUDFInfo" >
<StudioUDFInfo form={form}/>
</TabPane>
</>)
};
const renderFlinkSqlContent = () => { const renderFlinkSqlContent = () => {
return (<><TabPane tab={<span><SettingOutlined /> 作业配置</span>} key="StudioSetting" > return (<><TabPane tab={<span><SettingOutlined /> 作业配置</span>} key="StudioSetting" >
<StudioSetting form={form} /> <StudioSetting form={form} />
......
...@@ -547,6 +547,9 @@ export default (): React.ReactNode => { ...@@ -547,6 +547,9 @@ export default (): React.ReactNode => {
<li> <li>
<Link>新增 快捷键保存、校验、美化</Link> <Link>新增 快捷键保存、校验、美化</Link>
</li> </li>
<li>
<Link>新增 UDF Java方言的Local模式的在线编写、调试、动态加载</Link>
</li>
</ul> </ul>
</Paragraph> </Paragraph>
</Timeline.Item> </Timeline.Item>
......
...@@ -95,12 +95,12 @@ Dinky 通过已注册的集群配置来获取对应的 YarnClient 实例。对 ...@@ -95,12 +95,12 @@ Dinky 通过已注册的集群配置来获取对应的 YarnClient 实例。对
| | | 支持 yarn application 模式下 FlinkSQL 提交 | 0.4.0 | | | | 支持 yarn application 模式下 FlinkSQL 提交 | 0.4.0 |
| | | 支持 kubernetes session 模式下 FlinkSQL 提交 | 0.5.0 | | | | 支持 kubernetes session 模式下 FlinkSQL 提交 | 0.5.0 |
| | | 支持 kubernetes application 模式下 FlinkSQL 提交 | 0.5.0 | | | | 支持 kubernetes application 模式下 FlinkSQL 提交 | 0.5.0 |
| | | 支持 UDF Java 方言Local模式在线编写、调试、动态加载 | 0.5.0 |
| | Flink 作业 | 支持 yarn application 模式下 Jar 提交 | 0.4.0 | | | Flink 作业 | 支持 yarn application 模式下 Jar 提交 | 0.4.0 |
| | | 支持 k8s application 模式下 Jar 提交 | 0.5.0 | | | | 支持 k8s application 模式下 Jar 提交 | 0.5.0 |
| | | 支持 作业 Cancel | 0.4.0 | | | | 支持 作业 Cancel | 0.4.0 |
| | | 支持 作业 SavePoint 的 Cancel、Stop、Trigger | 0.4.0 | | | | 支持 作业 SavePoint 的 Cancel、Stop、Trigger | 0.4.0 |
| | | 新增 作业自动从 SavePoint 恢复机制(包含最近、最早、指定一次) | 0.4.0 | | | | 新增 作业自动从 SavePoint 恢复机制(包含最近、最早、指定一次) | 0.4.0 |
| | | 新增 UDF java方言代码的开发 | 0.5.0 |
| | Flink 集群 | 支持 查看已注册集群的作业列表与运维 | 0.4.0 | | | Flink 集群 | 支持 查看已注册集群的作业列表与运维 | 0.4.0 |
| | | 新增 自动注册 Yarn 创建的集群 | 0.4.0 | | | | 新增 自动注册 Yarn 创建的集群 | 0.4.0 |
| | SQL | 新增 外部数据源的 SQL 校验 | 0.5.0 | | | SQL | 新增 外部数据源的 SQL 校验 | 0.5.0 |
......
...@@ -95,12 +95,12 @@ Dinky 通过已注册的集群配置来获取对应的 YarnClient 实例。对 ...@@ -95,12 +95,12 @@ Dinky 通过已注册的集群配置来获取对应的 YarnClient 实例。对
| | | 支持 yarn application 模式下 FlinkSQL 提交 | 0.4.0 | | | | 支持 yarn application 模式下 FlinkSQL 提交 | 0.4.0 |
| | | 支持 kubernetes session 模式下 FlinkSQL 提交 | 0.5.0 | | | | 支持 kubernetes session 模式下 FlinkSQL 提交 | 0.5.0 |
| | | 支持 kubernetes application 模式下 FlinkSQL 提交 | 0.5.0 | | | | 支持 kubernetes application 模式下 FlinkSQL 提交 | 0.5.0 |
| | | 支持 UDF Java 方言Local模式在线编写、调试、动态加载 | 0.5.0 |
| | Flink 作业 | 支持 yarn application 模式下 Jar 提交 | 0.4.0 | | | Flink 作业 | 支持 yarn application 模式下 Jar 提交 | 0.4.0 |
| | | 支持 k8s application 模式下 Jar 提交 | 0.5.0 | | | | 支持 k8s application 模式下 Jar 提交 | 0.5.0 |
| | | 支持 作业 Cancel | 0.4.0 | | | | 支持 作业 Cancel | 0.4.0 |
| | | 支持 作业 SavePoint 的 Cancel、Stop、Trigger | 0.4.0 | | | | 支持 作业 SavePoint 的 Cancel、Stop、Trigger | 0.4.0 |
| | | 新增 作业自动从 SavePoint 恢复机制(包含最近、最早、指定一次) | 0.4.0 | | | | 新增 作业自动从 SavePoint 恢复机制(包含最近、最早、指定一次) | 0.4.0 |
| | | 新增 UDF java方言代码的开发 | 0.5.0 |
| | Flink 集群 | 支持 查看已注册集群的作业列表与运维 | 0.4.0 | | | Flink 集群 | 支持 查看已注册集群的作业列表与运维 | 0.4.0 |
| | | 新增 自动注册 Yarn 创建的集群 | 0.4.0 | | | | 新增 自动注册 Yarn 创建的集群 | 0.4.0 |
| | SQL | 新增 外部数据源的 SQL 校验 | 0.5.0 | | | SQL | 新增 外部数据源的 SQL 校验 | 0.5.0 |
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment