Unverified Commit 66eee408 authored by aiwenmo's avatar aiwenmo Committed by GitHub

[Fix-503][client] Remove initFunctions

[Fix-503][client] Remove initFunctions
parents 514fd459 d5ea3676
......@@ -278,23 +278,6 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements
return record;
}
public <T> void registerFunction(String name, TableFunction<T> tableFunction) {
TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfTableFunction(tableFunction);
this.functionCatalog.registerTempSystemTableFunction(name, tableFunction, typeInfo);
}
public <T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction) {
TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(aggregateFunction);
TypeInformation<ACC> accTypeInfo = UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(aggregateFunction);
this.functionCatalog.registerTempSystemAggregateFunction(name, aggregateFunction, typeInfo, accTypeInfo);
}
public <T, ACC> void registerFunction(String name, TableAggregateFunction<T, ACC> tableAggregateFunction) {
TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(tableAggregateFunction);
TypeInformation<ACC> accTypeInfo = UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(tableAggregateFunction);
this.functionCatalog.registerTempSystemAggregateFunction(name, tableAggregateFunction, typeInfo, accTypeInfo);
}
public boolean parseAndLoadConfiguration(String statement, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
List<Operation> operations = getParser().parse(statement);
for (Operation operation : operations) {
......
......@@ -286,23 +286,6 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements
return record;
}
public <T> void registerFunction(String name, TableFunction<T> tableFunction) {
TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfTableFunction(tableFunction);
this.functionCatalog.registerTempSystemTableFunction(name, tableFunction, typeInfo);
}
public <T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction) {
TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(aggregateFunction);
TypeInformation<ACC> accTypeInfo = UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(aggregateFunction);
this.functionCatalog.registerTempSystemAggregateFunction(name, aggregateFunction, typeInfo, accTypeInfo);
}
public <T, ACC> void registerFunction(String name, TableAggregateFunction<T, ACC> tableAggregateFunction) {
TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(tableAggregateFunction);
TypeInformation<ACC> accTypeInfo = UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(tableAggregateFunction);
this.functionCatalog.registerTempSystemAggregateFunction(name, tableAggregateFunction, typeInfo, accTypeInfo);
}
public boolean parseAndLoadConfiguration(String statement, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
List<Operation> operations = getParser().parse(statement);
for (Operation operation : operations) {
......
package com.dlink.interceptor;
import org.apache.flink.table.api.TableResult;
import com.dlink.assertion.Asserts;
import com.dlink.catalog.function.FunctionManager;
import com.dlink.catalog.function.UDFunction;
import com.dlink.executor.CustomTableEnvironmentImpl;
import com.dlink.executor.Executor;
import com.dlink.trans.Operation;
import com.dlink.trans.Operations;
import com.dlink.utils.SqlUtil;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
* FlinkInterceptor
......@@ -31,7 +21,6 @@ public class FlinkInterceptor {
if (executor.isUseSqlFragment()) {
statement = executor.getSqlManager().parseVariable(statement);
}
// initFunctions(executor.getCustomTableEnvironmentImpl(), statement);
return statement.trim();
}
......@@ -46,29 +35,4 @@ public class FlinkInterceptor {
}
return FlinkInterceptorResult.build(noExecute, tableResult);
}
@Deprecated
private static void initFunctions(CustomTableEnvironmentImpl stEnvironment, String statement) {
Map<String, UDFunction> usedFunctions = FunctionManager.getUsedFunctions(statement);
String[] udfs = stEnvironment.listUserDefinedFunctions();
List<String> udflist = Arrays.asList(udfs);
for (Map.Entry<String, UDFunction> entry : usedFunctions.entrySet()) {
if (!udflist.contains(entry.getKey())) {
if (entry.getValue().getType() == UDFunction.UDFunctionType.Scalar) {
stEnvironment.registerFunction(entry.getKey(),
(ScalarFunction) entry.getValue().getFunction());
} else if (entry.getValue().getType() == UDFunction.UDFunctionType.Table) {
stEnvironment.registerFunction(entry.getKey(),
(TableFunction) entry.getValue().getFunction());
} else if (entry.getValue().getType() == UDFunction.UDFunctionType.Aggregate) {
stEnvironment.registerFunction(entry.getKey(),
(AggregateFunction) entry.getValue().getFunction());
} else if (entry.getValue().getType() == UDFunction.UDFunctionType.TableAggregate) {
stEnvironment.registerFunction(entry.getKey(),
(TableAggregateFunction) entry.getValue().getFunction());
}
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment