Unverified Commit 6985b446 authored by byd-android-2017's avatar byd-android-2017 Committed by GitHub

修订TypeInformation原因,自定义函数不能在Flink 1.14.x 版本以上运行。 (#973)

1.引入泛型,以自适应多种数据类型;
2.直接返回Map<K,V>,避免数据转换
parent 4714b27e
......@@ -21,27 +21,44 @@ package com.dlink.ud.udf;
import org.apache.flink.table.functions.ScalarFunction;
/**
* GetKey
*
* @author wenmo
* @since 2021/5/25 15:50
**/
import java.util.Objects;
public class GetKey extends ScalarFunction {
public int eval(String map, String key, int defaultValue) {
if (map == null || !map.contains(key)) {
return defaultValue;
}
String[] maps = extractProperties(map);
for (String s : maps) {
String[] items = s.split("=");
if (items.length == 2 && Objects.equals(key, items[0])) {
return Integer.parseInt(items[1]);
}
}
return defaultValue;
}
public String eval(String map, String key, String defaultValue) {
if (map == null || !map.contains(key)) {
return defaultValue;
}
String[] maps = map.replaceAll("\\{", "").replaceAll("\\}", "").split(",");
for (int i = 0; i < maps.length; i++) {
String[] items = maps[i].split("=");
if (items.length >= 2) {
if (key.equals(items[0].trim())) {
return items[1];
}
String[] maps = extractProperties(map);
for (String s : maps) {
String[] items = s.split("=");
if (items.length == 2 && Objects.equals(key, items[0])) {
return items[1];
}
}
return defaultValue;
}
private String[] extractProperties(String map) {
map = map.replace("{", "").replace("}", "");
return map.split(", ");
}
}
......@@ -19,49 +19,123 @@
package com.dlink.ud.udtaf;
import com.dlink.ud.udtaf.RowsToMap.MyAccum;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.InputTypeStrategies;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* RowsToMap
*
* @author wenmo
* @param <K> Map key type
* @param <V> Map value type
* @author wenmo, lixiaoPing
* @since 2021/5/25 15:50
**/
public class RowsToMap extends TableAggregateFunction<String, Map> {
public class RowsToMap<K, V> extends TableAggregateFunction<Map<K, V>, MyAccum<K, V>> {
private static final long serialVersionUID = 42L;
@Override
public Map createAccumulator() {
return new HashMap();
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
return TypeInference.newBuilder()
.inputTypeStrategy(InputTypeStrategies.sequence(
InputTypeStrategies.ANY,
InputTypeStrategies.ANY
))
.accumulatorTypeStrategy(callContext -> {
List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
final DataType arg0DataType = argumentDataTypes.get(0);
final DataType arg1DataType = argumentDataTypes.get(1);
final DataType accDataType = DataTypes.STRUCTURED(
MyAccum.class,
DataTypes.FIELD("mapView",
DataTypes.MAP(arg0DataType, arg1DataType)));
return Optional.of(accDataType);
})
.outputTypeStrategy(callContext -> {
List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
final DataType arg0DataType = argumentDataTypes.get(0);
final DataType arg1DataType = argumentDataTypes.get(1);
return Optional.of(DataTypes.MAP(arg0DataType, arg1DataType));
})
.build();
}
public void accumulate(Map acc, String cls, Object v, String key) {
String[] keys = key.split(",");
for (int i = 0; i < keys.length; i++) {
if (keys[i].equals(cls)) {
acc.put(cls, v);
}
@Override
public MyAccum<K, V> createAccumulator() {
return new MyAccum<>();
}
public void accumulate(
MyAccum<K, V> acc, K cls, V v) {
if (v == null) {
return;
}
acc.mapView.put(cls, v);
}
public void accumulate(Map acc, String cls, Object v) {
acc.put(cls, v);
/**
* Retracts the input values from the accumulator instance. The current design assumes the
* inputs are the values that have been previously accumulated. The method retract can be
* overloaded with different custom types and arguments. This function must be implemented for
* datastream bounded over aggregate.
*
* @param acc the accumulator which contains the current aggregated results
*/
public void retract(MyAccum<K, V> acc, K cls, V v) {
if (v == null) {
return;
}
acc.mapView.remove(cls);
}
public void merge(Map acc, Iterable<Map> iterable) {
for (Map otherAcc : iterable) {
Iterator iter = otherAcc.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry entry = (Map.Entry) iter.next();
accumulate(acc, entry.getKey().toString(), entry.getValue());
/**
* Merges a group of accumulator instances into one accumulator instance. This function must be
* implemented for datastream session window grouping aggregate and bounded grouping aggregate.
*
* @param acc the accumulator which will keep the merged aggregate results. It should be
* noted that the accumulator may contain the previous aggregated results.
* Therefore user should not replace or clean this instance in the custom merge
* method.
* @param iterable an {@link Iterable} pointed to a group of accumulators that will be merged.
*/
public void merge(MyAccum<K, V> acc, Iterable<MyAccum<K, V>> iterable) {
for (MyAccum<K, V> otherAcc : iterable) {
for (Map.Entry<K, V> entry : otherAcc.mapView.entrySet()) {
accumulate(acc, entry.getKey(), entry.getValue());
}
}
}
public void emitValue(Map acc, Collector<String> out) {
out.collect(acc.toString());
public void emitValue(MyAccum<K, V> acc, Collector<Map<K, V>> out) {
out.collect(acc.mapView);
}
public static class MyAccum<K, V> {
/**
* 不能 final
*/
public Map<K, V> mapView;
/**
* 不能删除,否则不能生成查询计划
*/
public MyAccum() {
this.mapView = new HashMap<>();
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment