Unverified Commit f44cfd2d authored by Kerwin's avatar Kerwin Committed by GitHub

Added dlink-client module code style. (#898)

parent f8c751af
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.cdc;
import com.dlink.assertion.Asserts;
......
......@@ -17,13 +17,16 @@
*
*/
package com.dlink.cdc;
import com.dlink.assertion.Asserts;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.*;
import com.dlink.model.Column;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.dlink.utils.JSONUtil;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
......@@ -32,20 +35,41 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.*;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.ZoneId;
import java.util.*;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* AbstractCDCBuilder
......@@ -111,6 +135,7 @@ public abstract class AbstractSinkBuilder {
}
});
}
protected DataStream<Map> shunt(
SingleOutputStreamOperator<Map> processOperator,
Table table,
......@@ -118,6 +143,7 @@ public abstract class AbstractSinkBuilder {
return processOperator.getSideOutput(tag);
}
protected DataStream<RowData> buildRowData(
SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList,
......
......@@ -17,17 +17,16 @@
*
*/
package com.dlink.cdc;
import com.dlink.model.FlinkCDCConfig;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.List;
import java.util.Map;
import com.dlink.model.FlinkCDCConfig;
/**
* CDCBuilder
*
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.cdc;
import com.dlink.assertion.Asserts;
......
......@@ -17,15 +17,14 @@
*
*/
package com.dlink.cdc;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* SinkBuilder
*
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.cdc;
import com.dlink.assertion.Asserts;
......
......@@ -17,13 +17,13 @@
*
*/
package com.dlink.cdc.doris;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Table;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
......@@ -45,7 +45,7 @@ import java.util.Map;
**/
public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
private final static String KEY_WORD = "datastream-doris";
private static final String KEY_WORD = "datastream-doris";
private static final long serialVersionUID = 8330362249137471854L;
public DorisSinkBuilder() {
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.cdc.kafka;
import com.dlink.assertion.Asserts;
......@@ -28,6 +27,7 @@ import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
......@@ -54,7 +54,7 @@ import java.util.Map;
**/
public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder {
private final static String KEY_WORD = "datastream-kafka";
private static final String KEY_WORD = "datastream-kafka";
public KafkaSinkBuilder() {
}
......
......@@ -17,9 +17,15 @@
*
*/
package com.dlink.cdc.mysql;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.ClientConstant;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
......@@ -32,12 +38,6 @@ import java.util.Properties;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.ClientConstant;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;
/**
* MysqlCDCBuilder
......@@ -47,8 +47,8 @@ import com.dlink.model.FlinkCDCConfig;
**/
public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
private String KEY_WORD = "mysql-cdc";
private final static String METADATA_TYPE = "MySql";
private static final String KEY_WORD = "mysql-cdc";
private static final String METADATA_TYPE = "MySql";
public MysqlCDCBuilder() {
}
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.cdc.sql;
import com.dlink.assertion.Asserts;
......@@ -31,6 +30,7 @@ import com.dlink.model.Table;
import com.dlink.utils.FlinkBaseUtil;
import com.dlink.utils.JSONUtil;
import com.dlink.utils.LogUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
......@@ -45,19 +45,29 @@ import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import javax.xml.bind.DatatypeConverter;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.ZoneId;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.xml.bind.DatatypeConverter;
/**
* SQLSinkBuilder
......@@ -67,7 +77,7 @@ import java.util.*;
*/
public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
private final static String KEY_WORD = "sql";
private static final String KEY_WORD = "sql";
private static final long serialVersionUID = -3699685106324048226L;
private ZoneId sinkTimeZone = ZoneId.of("UTC");
......@@ -167,7 +177,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
modifyOperations.add((ModifyOperation) operation);
}
}
}catch (Exception e) {
} catch (Exception e) {
logger.error("Translate to plan occur exception: {}", e);
throw e;
}
......@@ -191,7 +201,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
DataStreamSource<String> dataStreamSource) {
final String timeZone = config.getSink().get("timezone");
config.getSink().remove("timezone");
if (Asserts.isNotNullString(timeZone)){
if (Asserts.isNotNullString(timeZone)) {
sinkTimeZone = ZoneId.of(timeZone);
}
final List<Schema> schemaList = config.getSchemaList();
......
......@@ -17,9 +17,10 @@
*
*/
package com.dlink.executor;
import com.dlink.result.SqlExplainResult;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
......@@ -66,7 +67,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
......@@ -103,7 +103,6 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements
this.executionEnvironment = executionEnvironment;
}
public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment) {
return create(executionEnvironment, EnvironmentSettings.newInstance().build());
}
......@@ -175,7 +174,6 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements
classLoader);
}
private static Executor lookupExecutor(
Map<String, String> executorProperties,
StreamExecutionEnvironment executionEnvironment) {
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.executor;
import org.apache.flink.annotation.Internal;
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.executor;
import org.apache.flink.table.types.DataType;
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.utils;
import org.apache.flink.api.common.JobID;
......
......@@ -17,30 +17,51 @@
*
*/
package org.apache.flink.table.types.extraction;
import static org.apache.flink.shaded.asm7.org.objectweb.asm.Type.getConstructorDescriptor;
import static org.apache.flink.shaded.asm7.org.objectweb.asm.Type.getMethodDescriptor;
import com.dlink.pool.ClassPool;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.shaded.asm7.org.objectweb.asm.*;
import org.apache.flink.shaded.asm7.org.objectweb.asm.ClassReader;
import org.apache.flink.shaded.asm7.org.objectweb.asm.ClassVisitor;
import org.apache.flink.shaded.asm7.org.objectweb.asm.Label;
import org.apache.flink.shaded.asm7.org.objectweb.asm.MethodVisitor;
import org.apache.flink.shaded.asm7.org.objectweb.asm.Opcodes;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.StructuredType;
import javax.annotation.Nullable;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Constructor;
import java.lang.reflect.Executable;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Parameter;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.*;
import java.util.*;
import java.lang.reflect.TypeVariable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.shaded.asm7.org.objectweb.asm.Type.getConstructorDescriptor;
import static org.apache.flink.shaded.asm7.org.objectweb.asm.Type.getMethodDescriptor;
import javax.annotation.Nullable;
/**
* Utilities for performing reflection tasks.
......@@ -355,8 +376,7 @@ public final class ExtractionUtils {
/**
* Converts a {@link Type} to {@link Class} if possible, {@code null} otherwise.
*/
static @Nullable
Class<?> toClass(Type type) {
static @Nullable Class<?> toClass(Type type) {
if (type instanceof Class) {
return (Class<?>) type;
} else if (type instanceof ParameterizedType) {
......@@ -428,9 +448,7 @@ public final class ExtractionUtils {
return variable;
}
private static @Nullable
Type resolveVariableInParameterizedType(
TypeVariable<?> variable, ParameterizedType currentType) {
private static @Nullable Type resolveVariableInParameterizedType(TypeVariable<?> variable, ParameterizedType currentType) {
final Class<?> currentRaw = (Class<?>) currentType.getRawType();
final TypeVariable<?>[] currentVariables = currentRaw.getTypeParameters();
// search for matching type variable
......@@ -442,8 +460,7 @@ public final class ExtractionUtils {
return null;
}
private static boolean typeVariableEquals(
TypeVariable<?> variable, TypeVariable<?> currentVariable) {
private static boolean typeVariableEquals(TypeVariable<?> variable, TypeVariable<?> currentVariable) {
return currentVariable.getGenericDeclaration().equals(variable.getGenericDeclaration())
&& currentVariable.getName().equals(variable.getName());
}
......@@ -611,9 +628,7 @@ public final class ExtractionUtils {
* Checks whether the given constructor takes all of the given fields with matching (possibly
* primitive) type and name. An assigning constructor can define the order of fields.
*/
static @Nullable
AssigningConstructor extractAssigningConstructor(
Class<?> clazz, List<Field> fields) {
static @Nullable AssigningConstructor extractAssigningConstructor(Class<?> clazz, List<Field> fields) {
AssigningConstructor foundConstructor = null;
for (Constructor<?> constructor : clazz.getDeclaredConstructors()) {
final boolean qualifyingConstructor =
......@@ -639,8 +654,7 @@ public final class ExtractionUtils {
/**
* Extracts the parameter names of a method if possible.
*/
static @Nullable
List<String> extractMethodParameterNames(Method method) {
static @Nullable List<String> extractMethodParameterNames(Method method) {
return extractExecutableNames(method);
}
......@@ -648,8 +662,7 @@ public final class ExtractionUtils {
* Extracts ordered parameter names from a constructor that takes all of the given fields with
* matching (possibly primitive) type and name.
*/
private static @Nullable
List<String> extractConstructorParameterNames(
private static @Nullable List<String> extractConstructorParameterNames(
Constructor<?> constructor, List<Field> fields) {
final Type[] parameterTypes = constructor.getGenericParameterTypes();
......@@ -676,8 +689,7 @@ public final class ExtractionUtils {
return parameterNames;
}
private static @Nullable
List<String> extractExecutableNames(Executable executable) {
private static @Nullable List<String> extractExecutableNames(Executable executable) {
final int offset;
if (!Modifier.isStatic(executable.getModifiers())) {
// remove "this" as first parameter
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.cdc;
import com.dlink.assertion.Asserts;
......
......@@ -17,13 +17,16 @@
*
*/
package com.dlink.cdc;
import com.dlink.assertion.Asserts;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.*;
import com.dlink.model.Column;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.dlink.utils.JSONUtil;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
......@@ -32,20 +35,41 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.*;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.ZoneId;
import java.util.*;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* AbstractCDCBuilder
......@@ -111,6 +135,7 @@ public abstract class AbstractSinkBuilder {
}
});
}
protected DataStream<Map> shunt(
SingleOutputStreamOperator<Map> processOperator,
Table table,
......@@ -118,6 +143,7 @@ public abstract class AbstractSinkBuilder {
return processOperator.getSideOutput(tag);
}
protected DataStream<RowData> buildRowData(
SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList,
......
......@@ -17,17 +17,16 @@
*
*/
package com.dlink.cdc;
import com.dlink.model.FlinkCDCConfig;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.List;
import java.util.Map;
import com.dlink.model.FlinkCDCConfig;
/**
* CDCBuilder
*
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.cdc;
import com.dlink.assertion.Asserts;
......
......@@ -17,15 +17,14 @@
*
*/
package com.dlink.cdc;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* SinkBuilder
*
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.cdc;
import com.dlink.assertion.Asserts;
......
......@@ -17,13 +17,13 @@
*
*/
package com.dlink.cdc.doris;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Table;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
......@@ -45,7 +45,7 @@ import java.util.Map;
**/
public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
private final static String KEY_WORD = "datastream-doris";
private static final String KEY_WORD = "datastream-doris";
private static final long serialVersionUID = 8330362249137471854L;
public DorisSinkBuilder() {
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.cdc.kafka;
import com.dlink.assertion.Asserts;
......@@ -28,6 +27,7 @@ import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
......@@ -54,7 +54,7 @@ import java.util.Map;
**/
public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder {
private final static String KEY_WORD = "datastream-kafka";
private static final String KEY_WORD = "datastream-kafka";
public KafkaSinkBuilder() {
}
......
......@@ -17,9 +17,15 @@
*
*/
package com.dlink.cdc.mysql;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.ClientConstant;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
......@@ -33,12 +39,6 @@ import java.util.Properties;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.ClientConstant;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;
/**
* MysqlCDCBuilder
......@@ -48,8 +48,8 @@ import com.dlink.model.FlinkCDCConfig;
**/
public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
private String KEY_WORD = "mysql-cdc";
private final static String METADATA_TYPE = "MySql";
private static final String KEY_WORD = "mysql-cdc";
private static final String METADATA_TYPE = "MySql";
public MysqlCDCBuilder() {
}
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.cdc.sql;
import com.dlink.assertion.Asserts;
......@@ -31,6 +30,7 @@ import com.dlink.model.Table;
import com.dlink.utils.FlinkBaseUtil;
import com.dlink.utils.JSONUtil;
import com.dlink.utils.LogUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
......@@ -45,19 +45,29 @@ import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import javax.xml.bind.DatatypeConverter;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.ZoneId;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.xml.bind.DatatypeConverter;
/**
* SQLSinkBuilder
......@@ -67,7 +77,7 @@ import java.util.*;
*/
public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
private final static String KEY_WORD = "sql";
private static final String KEY_WORD = "sql";
private static final long serialVersionUID = -3699685106324048226L;
private ZoneId sinkTimeZone = ZoneId.of("UTC");
......@@ -167,7 +177,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
modifyOperations.add((ModifyOperation) operation);
}
}
}catch (Exception e) {
} catch (Exception e) {
logger.error("Translate to plan occur exception: {}", e);
throw e;
}
......@@ -191,7 +201,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
DataStreamSource<String> dataStreamSource) {
final String timeZone = config.getSink().get("timezone");
config.getSink().remove("timezone");
if (Asserts.isNotNullString(timeZone)){
if (Asserts.isNotNullString(timeZone)) {
sinkTimeZone = ZoneId.of(timeZone);
}
final List<Schema> schemaList = config.getSchemaList();
......
......@@ -17,9 +17,10 @@
*
*/
package com.dlink.executor;
import com.dlink.result.SqlExplainResult;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
......@@ -68,7 +69,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
......@@ -176,7 +176,6 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements
classLoader);
}
private static Executor lookupExecutor(
Map<String, String> executorProperties,
StreamExecutionEnvironment executionEnvironment) {
......@@ -197,7 +196,6 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements
}
}
public ObjectNode getStreamGraph(String statement) {
List<Operation> operations = super.parser.parse(statement);
if (operations.size() != 1) {
......
......@@ -17,24 +17,33 @@
*
*/
package com.dlink.executor;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.utils.PrintUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.io.PrintWriter;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
/**
* 定制CustomTableResultImpl
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.executor;
import org.apache.flink.table.types.DataType;
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.utils;
import org.apache.flink.api.common.JobID;
......
......@@ -19,27 +19,49 @@
package org.apache.flink.table.types.extraction;
import static org.apache.flink.shaded.asm7.org.objectweb.asm.Type.getConstructorDescriptor;
import static org.apache.flink.shaded.asm7.org.objectweb.asm.Type.getMethodDescriptor;
import com.dlink.pool.ClassPool;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.shaded.asm7.org.objectweb.asm.*;
import org.apache.flink.shaded.asm7.org.objectweb.asm.ClassReader;
import org.apache.flink.shaded.asm7.org.objectweb.asm.ClassVisitor;
import org.apache.flink.shaded.asm7.org.objectweb.asm.Label;
import org.apache.flink.shaded.asm7.org.objectweb.asm.MethodVisitor;
import org.apache.flink.shaded.asm7.org.objectweb.asm.Opcodes;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.StructuredType;
import javax.annotation.Nullable;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Constructor;
import java.lang.reflect.Executable;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Parameter;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.*;
import java.util.*;
import java.lang.reflect.TypeVariable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.shaded.asm7.org.objectweb.asm.Type.getConstructorDescriptor;
import static org.apache.flink.shaded.asm7.org.objectweb.asm.Type.getMethodDescriptor;
import javax.annotation.Nullable;
/**
* Utilities for performing reflection tasks.
......@@ -354,8 +376,7 @@ public final class ExtractionUtils {
/**
* Converts a {@link Type} to {@link Class} if possible, {@code null} otherwise.
*/
static @Nullable
Class<?> toClass(Type type) {
static @Nullable Class<?> toClass(Type type) {
if (type instanceof Class) {
return (Class<?>) type;
} else if (type instanceof ParameterizedType) {
......@@ -427,8 +448,7 @@ public final class ExtractionUtils {
return variable;
}
private static @Nullable
Type resolveVariableInParameterizedType(
private static @Nullable Type resolveVariableInParameterizedType(
TypeVariable<?> variable, ParameterizedType currentType) {
final Class<?> currentRaw = (Class<?>) currentType.getRawType();
final TypeVariable<?>[] currentVariables = currentRaw.getTypeParameters();
......@@ -610,8 +630,7 @@ public final class ExtractionUtils {
* Checks whether the given constructor takes all of the given fields with matching (possibly
* primitive) type and name. An assigning constructor can define the order of fields.
*/
static @Nullable
AssigningConstructor extractAssigningConstructor(
static @Nullable AssigningConstructor extractAssigningConstructor(
Class<?> clazz, List<Field> fields) {
AssigningConstructor foundConstructor = null;
for (Constructor<?> constructor : clazz.getDeclaredConstructors()) {
......@@ -638,8 +657,7 @@ public final class ExtractionUtils {
/**
* Extracts the parameter names of a method if possible.
*/
static @Nullable
List<String> extractMethodParameterNames(Method method) {
static @Nullable List<String> extractMethodParameterNames(Method method) {
return extractExecutableNames(method);
}
......@@ -647,8 +665,7 @@ public final class ExtractionUtils {
* Extracts ordered parameter names from a constructor that takes all of the given fields with
* matching (possibly primitive) type and name.
*/
private static @Nullable
List<String> extractConstructorParameterNames(
private static @Nullable List<String> extractConstructorParameterNames(
Constructor<?> constructor, List<Field> fields) {
final Type[] parameterTypes = constructor.getGenericParameterTypes();
......@@ -675,8 +692,7 @@ public final class ExtractionUtils {
return parameterNames;
}
private static @Nullable
List<String> extractExecutableNames(Executable executable) {
private static @Nullable List<String> extractExecutableNames(Executable executable) {
final int offset;
if (!Modifier.isStatic(executable.getModifiers())) {
// remove "this" as first parameter
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.cdc;
import com.dlink.assertion.Asserts;
......
......@@ -17,13 +17,16 @@
*
*/
package com.dlink.cdc;
import com.dlink.assertion.Asserts;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.*;
import com.dlink.model.Column;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.dlink.utils.JSONUtil;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
......@@ -32,20 +35,41 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.*;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.ZoneId;
import java.util.*;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* AbstractCDCBuilder
......@@ -111,6 +135,7 @@ public abstract class AbstractSinkBuilder {
}
});
}
protected DataStream<Map> shunt(
SingleOutputStreamOperator<Map> processOperator,
Table table,
......@@ -118,6 +143,7 @@ public abstract class AbstractSinkBuilder {
return processOperator.getSideOutput(tag);
}
protected DataStream<RowData> buildRowData(
SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList,
......
......@@ -17,17 +17,16 @@
*
*/
package com.dlink.cdc;
import com.dlink.model.FlinkCDCConfig;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.List;
import java.util.Map;
import com.dlink.model.FlinkCDCConfig;
/**
* CDCBuilder
*
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.cdc;
import com.dlink.assertion.Asserts;
......
......@@ -17,15 +17,14 @@
*
*/
package com.dlink.cdc;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* SinkBuilder
*
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.cdc;
import com.dlink.assertion.Asserts;
......
......@@ -17,13 +17,13 @@
*
*/
package com.dlink.cdc.doris;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Table;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
......@@ -45,7 +45,7 @@ import java.util.Map;
**/
public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
private final static String KEY_WORD = "datastream-doris";
private static final String KEY_WORD = "datastream-doris";
private static final long serialVersionUID = 8330362249137471854L;
public DorisSinkBuilder() {
......
......@@ -17,13 +17,13 @@
*
*/
package com.dlink.cdc.hudi;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Table;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
......@@ -51,7 +51,7 @@ import java.util.Map;
*/
public class HudiSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
private final static String KEY_WORD = "datastream-hudi";
private static final String KEY_WORD = "datastream-hudi";
private static final long serialVersionUID = 5324199407472847422L;
public HudiSinkBuilder() {
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.cdc.kafka;
import com.dlink.assertion.Asserts;
......@@ -28,6 +27,7 @@ import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
......@@ -54,7 +54,7 @@ import java.util.Map;
**/
public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder {
private final static String KEY_WORD = "datastream-kafka";
private static final String KEY_WORD = "datastream-kafka";
public KafkaSinkBuilder() {
}
......
......@@ -9,10 +9,7 @@ import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.dlink.utils.ObjectConvertUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
......@@ -23,27 +20,28 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.Collector;
import javax.xml.bind.DatatypeConverter;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
/**
* @className: com.dlink.cdc.kafka.KafkaSinkSimpleBuilder
*/
public class KafkaSinkJsonBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
private final static String KEY_WORD = "datastream-kafka-json";
private static final String KEY_WORD = "datastream-kafka-json";
private transient ObjectMapper objectMapper;
public KafkaSinkJsonBuilder() {
......@@ -69,11 +67,11 @@ public class KafkaSinkJsonBuilder extends AbstractSinkBuilder implements SinkBui
StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) {
try{
try {
SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(new MapFunction<String, Map>() {
@Override
public Map map(String value) throws Exception {
if(objectMapper == null){
if (objectMapper == null) {
initializeObjectMapper();
}
return objectMapper.readValue(value, Map.class);
......@@ -101,42 +99,42 @@ public class KafkaSinkJsonBuilder extends AbstractSinkBuilder implements SinkBui
List<String> columnNameList = new LinkedList<>();
List<LogicalType> columnTypeList = new LinkedList<>();
buildColumn(columnNameList, columnTypeList, table.getColumns());
SingleOutputStreamOperator<String> stringOperator =filterOperator.process(new ProcessFunction<Map, String>() {
SingleOutputStreamOperator<String> stringOperator = filterOperator.process(new ProcessFunction<Map, String>() {
@Override
public void processElement(Map value, Context context, Collector<String> collector) throws Exception {
Map after = null;
Map before = null;
String ts_ms = value.get("ts_ms").toString();
String tsMs = value.get("ts_ms").toString();
try {
switch (value.get("op").toString()) {
case "r":
case "c":
after = (Map) value.get("after");
convertAttr(columnNameList,columnTypeList,after,value.get("op").toString(),0,schemaName,tableName,ts_ms);
convertAttr(columnNameList, columnTypeList, after, value.get("op").toString(), 0, schemaName, tableName, tsMs);
break;
case "u":
before = (Map) value.get("before");
convertAttr(columnNameList,columnTypeList,before,value.get("op").toString(),1,schemaName,tableName,ts_ms);
convertAttr(columnNameList, columnTypeList, before, value.get("op").toString(), 1, schemaName, tableName, tsMs);
after = (Map) value.get("after");
convertAttr(columnNameList,columnTypeList,after,value.get("op").toString(),0,schemaName,tableName,ts_ms);
convertAttr(columnNameList, columnTypeList, after, value.get("op").toString(), 0, schemaName, tableName, tsMs);
break;
case "d":
before = (Map) value.get("before");
convertAttr(columnNameList,columnTypeList,before,value.get("op").toString(),1,schemaName,tableName,ts_ms);
convertAttr(columnNameList, columnTypeList, before, value.get("op").toString(), 1, schemaName, tableName, tsMs);
break;
}
} catch (Exception e) {
logger.error("SchameTable: {} - Exception:", e);
throw e;
}
if(objectMapper == null){
if (objectMapper == null) {
initializeObjectMapper();
}
if(before != null){
if (before != null) {
collector.collect(objectMapper.writeValueAsString(before));
}
if(after != null){
if (after != null) {
collector.collect(objectMapper.writeValueAsString(after));
}
}
......@@ -147,7 +145,7 @@ public class KafkaSinkJsonBuilder extends AbstractSinkBuilder implements SinkBui
}
}
}
}catch (Exception ex){
} catch (Exception ex) {
logger.error("kafka sink error:",ex);
}
return dataStreamSource;
......@@ -176,8 +174,8 @@ public class KafkaSinkJsonBuilder extends AbstractSinkBuilder implements SinkBui
return ObjectConvertUtil.convertValue(value,logicalType);
}
private void convertAttr(List<String> columnNameList,List<LogicalType> columnTypeList,Map value,String op,int is_deleted,
String schemaName,String tableName,String ts_ms){
private void convertAttr(List<String> columnNameList, List<LogicalType> columnTypeList, Map value, String op, int isDeleted,
String schemaName, String tableName, String tsMs) {
for (int i = 0; i < columnNameList.size(); i++) {
String columnName = columnNameList.get(i);
Object columnNameValue = value.remove(columnName);
......@@ -185,9 +183,9 @@ public class KafkaSinkJsonBuilder extends AbstractSinkBuilder implements SinkBui
value.put(columnName, columnNameNewVal);
}
value.put("__op",op);
value.put("is_deleted",Integer.valueOf(is_deleted));
value.put("is_deleted",Integer.valueOf(isDeleted));
value.put("db",schemaName);
value.put("table",tableName);
value.put("ts_ms",ts_ms);
value.put("ts_ms",tsMs);
}
}
......@@ -17,9 +17,15 @@
*
*/
package com.dlink.cdc.mysql;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.ClientConstant;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
......@@ -32,16 +38,9 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.ClientConstant;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
/**
* MysqlCDCBuilder
......@@ -51,8 +50,8 @@ import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
**/
public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
private final static String KEY_WORD = "mysql-cdc";
private final static String METADATA_TYPE = "MySql";
private static final String KEY_WORD = "mysql-cdc";
private static final String METADATA_TYPE = "MySql";
public MysqlCDCBuilder() {
}
......
package com.dlink.cdc.mysql;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
......@@ -12,12 +11,13 @@ import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
/**
* @version 1.0
* @className: com.dlink.cdc.mysql.MysqlJsonDebeziumDeserializationSchema
* @Description:
* @author: jack zhong
* @date 8/2/221:43 PM
*/
public class MysqlJsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
private static final long serialVersionUID = 1L;
......
......@@ -17,9 +17,15 @@
*
*/
package com.dlink.cdc.oracle;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.ClientConstant;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
......@@ -28,12 +34,6 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.ClientConstant;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;
import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.connectors.oracle.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
......@@ -46,8 +46,8 @@ import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
**/
public class OracleCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
private final static String KEY_WORD = "oracle-cdc";
private final static String METADATA_TYPE = "Oracle";
private static final String KEY_WORD = "oracle-cdc";
private static final String METADATA_TYPE = "Oracle";
public OracleCDCBuilder() {
}
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.cdc.sql;
import com.dlink.assertion.Asserts;
......@@ -31,6 +30,7 @@ import com.dlink.model.Table;
import com.dlink.utils.FlinkBaseUtil;
import com.dlink.utils.JSONUtil;
import com.dlink.utils.LogUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
......@@ -45,19 +45,29 @@ import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import javax.xml.bind.DatatypeConverter;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.ZoneId;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.xml.bind.DatatypeConverter;
/**
* SQLSinkBuilder
......@@ -67,7 +77,7 @@ import java.util.*;
*/
public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
private final static String KEY_WORD = "sql";
private static final String KEY_WORD = "sql";
private static final long serialVersionUID = -3699685106324048226L;
private ZoneId sinkTimeZone = ZoneId.of("UTC");
......@@ -139,6 +149,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
}
}, rowTypeInfo);
}
private void addTableSink(
CustomTableEnvironment customTableEnvironment,
DataStream<Row> rowDataDataStream,
......@@ -166,7 +177,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
modifyOperations.add((ModifyOperation) operation);
}
}
}catch (Exception e) {
} catch (Exception e) {
logger.error("Translate to plan occur exception: {}", e);
throw e;
}
......@@ -190,7 +201,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
DataStreamSource<String> dataStreamSource) {
final String timeZone = config.getSink().get("timezone");
config.getSink().remove("timezone");
if (Asserts.isNotNullString(timeZone)){
if (Asserts.isNotNullString(timeZone)) {
sinkTimeZone = ZoneId.of(timeZone);
}
final List<Schema> schemaList = config.getSchemaList();
......
......@@ -6,38 +6,38 @@ import com.dlink.model.Column;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Table;
import com.dlink.utils.ObjectConvertUtil;
import com.starrocks.connector.flink.row.sink.StarRocksTableRowTransformer;
import com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunction;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.utils.TypeConversions;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import com.starrocks.connector.flink.row.sink.StarRocksTableRowTransformer;
import com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunction;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
/**
* StarrocksSinkBuilder
*
**/
public class StarrocksSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
private final static String KEY_WORD = "datastream-starrocks";
private static final String KEY_WORD = "datastream-starrocks";
private static final long serialVersionUID = 8330362249137431824L;
private final ZoneId sinkZoneIdUTC = ZoneId.of("UTC");
......@@ -65,23 +65,23 @@ public class StarrocksSinkBuilder extends AbstractSinkBuilder implements SinkBui
Table table,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
try{
try {
List<Column> columns = table.getColumns();
List<String> primaryKeys = new LinkedList<>();
String[] columnNames = new String[columns.size()];
for(int i=0;i<columns.size();i++){
for (int i = 0; i < columns.size(); i++) {
Column column = columns.get(i);
if(column.isKeyFlag()){
if (column.isKeyFlag()) {
primaryKeys.add(column.getName());
}
columnNames[i] = column.getName();
}
String[] primaryKeyArrays = primaryKeys.stream().toArray(String[]::new);
DataType[] dataTypes = new DataType[columnTypeList.size()];
for(int i = 0 ; i < columnTypeList.size() ; i++){
for (int i = 0; i < columnTypeList.size(); i++) {
LogicalType logicalType = columnTypeList.get(i);
String columnName = columnNameList.get(i);
if(primaryKeys.contains(columnName)){
if (primaryKeys.contains(columnName)) {
logicalType = logicalType.copy(false);
}
dataTypes[i] = TypeConversions.fromLogicalToDataType(logicalType);
......@@ -99,8 +99,8 @@ public class StarrocksSinkBuilder extends AbstractSinkBuilder implements SinkBui
.withProperty("sink.properties.strip_outer_array", "true")
// 设置并行度,多并行度情况下需要考虑如何保证数据有序性
.withProperty("sink.parallelism", "1");
sink.forEach((key,value)->{
if(key.startsWith("sink.")){
sink.forEach((key,value) -> {
if (key.startsWith("sink.")) {
builder.withProperty(key,value);
}
});
......@@ -111,7 +111,7 @@ public class StarrocksSinkBuilder extends AbstractSinkBuilder implements SinkBui
);
rowDataDataStream.addSink(starrocksSinkFunction);
logger.info("handler connector name:{} sink successful.....",getHandle());
}catch (Exception ex){
} catch (Exception ex) {
logger.error("handler connector name:{} sink ex:",getHandle(),ex);
}
}
......@@ -122,9 +122,9 @@ public class StarrocksSinkBuilder extends AbstractSinkBuilder implements SinkBui
if (object == null) {
return null;
}
if(logicalType instanceof TimestampType && object instanceof LocalDateTime){
if (logicalType instanceof TimestampType && object instanceof LocalDateTime) {
return TimestampData.fromLocalDateTime((LocalDateTime) object);
}else if(logicalType instanceof DateType){
} else if (logicalType instanceof DateType) {
if (value instanceof Integer) {
return Instant.ofEpochSecond((int) value).atZone(sinkZoneIdUTC).toEpochSecond();
}
......
......@@ -17,9 +17,11 @@
*
*/
package com.dlink.executor;
import com.dlink.assertion.Asserts;
import com.dlink.result.SqlExplainResult;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
......@@ -51,10 +53,6 @@ import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionParser;
import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.UserDefinedFunctionHelper;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.JavaDataStreamQueryOperation;
......@@ -75,8 +73,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import com.dlink.assertion.Asserts;
import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.executor;
import org.apache.flink.annotation.Internal;
......@@ -34,11 +33,21 @@ import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.io.PrintWriter;
import java.time.ZoneId;
import java.util.*;
import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
/**
* 定制CustomTableResultImpl
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.executor;
import org.apache.flink.table.types.DataType;
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.utils;
import org.apache.flink.api.common.JobID;
......
package com.dlink.utils;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarBinaryType;
import javax.xml.bind.DatatypeConverter;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.ZoneId;
import javax.xml.bind.DatatypeConverter;
/**
* @className: com.dlink.utils.ObjectConvertUtil
* @Description:
* @author: jack zhong
* @date 8/10/222:49 PM
*/
public class ObjectConvertUtil {
public static Object convertValue(Object value, LogicalType logicalType){
public static Object convertValue(Object value, LogicalType logicalType) {
return ObjectConvertUtil.convertValue(value,logicalType,null);
}
......@@ -23,7 +28,7 @@ public class ObjectConvertUtil {
if (value == null) {
return null;
}
if(sinkTimeZone == null){
if (sinkTimeZone == null) {
sinkTimeZone = ZoneId.of("UTC");
}
if (logicalType instanceof DateType) {
......
......@@ -17,33 +17,53 @@
*
*/
package org.apache.flink.table.types.extraction;
import static org.apache.flink.shaded.asm7.org.objectweb.asm.Type.getConstructorDescriptor;
import static org.apache.flink.shaded.asm7.org.objectweb.asm.Type.getMethodDescriptor;
import com.dlink.pool.ClassPool;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.shaded.asm7.org.objectweb.asm.*;
import org.apache.flink.shaded.asm7.org.objectweb.asm.ClassReader;
import org.apache.flink.shaded.asm7.org.objectweb.asm.ClassVisitor;
import org.apache.flink.shaded.asm7.org.objectweb.asm.Label;
import org.apache.flink.shaded.asm7.org.objectweb.asm.MethodVisitor;
import org.apache.flink.shaded.asm7.org.objectweb.asm.Opcodes;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.StructuredType;
import javax.annotation.Nullable;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Constructor;
import java.lang.reflect.Executable;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Parameter;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.*;
import java.util.*;
import java.lang.reflect.TypeVariable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.shaded.asm7.org.objectweb.asm.Type.getConstructorDescriptor;
import static org.apache.flink.shaded.asm7.org.objectweb.asm.Type.getMethodDescriptor;
import javax.annotation.Nullable;
/**
* Utilities for performing reflection tasks.
......@@ -384,8 +404,7 @@ public final class ExtractionUtils {
/**
* Converts a {@link Type} to {@link Class} if possible, {@code null} otherwise.
*/
static @Nullable
Class<?> toClass(Type type) {
static @Nullable Class<?> toClass(Type type) {
if (type instanceof Class) {
return (Class<?>) type;
} else if (type instanceof ParameterizedType) {
......@@ -457,8 +476,7 @@ public final class ExtractionUtils {
return variable;
}
private static @Nullable
Type resolveVariableInParameterizedType(
private static @Nullable Type resolveVariableInParameterizedType(
TypeVariable<?> variable, ParameterizedType currentType) {
final Class<?> currentRaw = (Class<?>) currentType.getRawType();
final TypeVariable<?>[] currentVariables = currentRaw.getTypeParameters();
......@@ -640,8 +658,7 @@ public final class ExtractionUtils {
* Checks whether the given constructor takes all of the given fields with matching (possibly
* primitive) type and name. An assigning constructor can define the order of fields.
*/
public static @Nullable
AssigningConstructor extractAssigningConstructor(
public static @Nullable AssigningConstructor extractAssigningConstructor(
Class<?> clazz, List<Field> fields) {
AssigningConstructor foundConstructor = null;
for (Constructor<?> constructor : clazz.getDeclaredConstructors()) {
......@@ -668,8 +685,7 @@ public final class ExtractionUtils {
/**
* Extracts the parameter names of a method if possible.
*/
static @Nullable
List<String> extractMethodParameterNames(Method method) {
static @Nullable List<String> extractMethodParameterNames(Method method) {
return extractExecutableNames(method);
}
......@@ -677,8 +693,7 @@ public final class ExtractionUtils {
* Extracts ordered parameter names from a constructor that takes all of the given fields with
* matching (possibly primitive and lenient) type and name.
*/
private static @Nullable
List<String> extractConstructorParameterNames(
private static @Nullable List<String> extractConstructorParameterNames(
Constructor<?> constructor, List<Field> fields) {
final Type[] parameterTypes = constructor.getGenericParameterTypes();
......@@ -715,8 +730,7 @@ public final class ExtractionUtils {
return fieldNames;
}
private static @Nullable
List<String> extractExecutableNames(Executable executable) {
private static @Nullable List<String> extractExecutableNames(Executable executable) {
final int offset;
if (!Modifier.isStatic(executable.getModifiers())) {
// remove "this" as first parameter
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.cdc;
import com.dlink.assertion.Asserts;
......
......@@ -17,13 +17,16 @@
*
*/
package com.dlink.cdc;
import com.dlink.assertion.Asserts;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.*;
import com.dlink.model.Column;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.dlink.utils.JSONUtil;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
......@@ -32,20 +35,41 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.*;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.ZoneId;
import java.util.*;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* AbstractCDCBuilder
......@@ -111,6 +135,7 @@ public abstract class AbstractSinkBuilder {
}
});
}
protected DataStream<Map> shunt(
SingleOutputStreamOperator<Map> processOperator,
Table table,
......@@ -118,6 +143,7 @@ public abstract class AbstractSinkBuilder {
return processOperator.getSideOutput(tag);
}
protected DataStream<RowData> buildRowData(
SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList,
......
......@@ -17,18 +17,16 @@
*
*/
package com.dlink.cdc;
import com.dlink.model.FlinkCDCConfig;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.List;
import java.util.Map;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Table;
/**
* CDCBuilder
*
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.cdc;
import com.dlink.assertion.Asserts;
......
......@@ -17,15 +17,14 @@
*
*/
package com.dlink.cdc;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* SinkBuilder
*
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.cdc;
import com.dlink.assertion.Asserts;
......
......@@ -17,13 +17,13 @@
*
*/
package com.dlink.cdc.doris;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Table;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
......@@ -45,7 +45,7 @@ import java.util.Map;
**/
public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
private final static String KEY_WORD = "datastream-doris";
private static final String KEY_WORD = "datastream-doris";
private static final long serialVersionUID = 8330362249137471854L;
public DorisSinkBuilder() {
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.cdc.kafka;
import com.dlink.assertion.Asserts;
......@@ -28,6 +27,7 @@ import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
......@@ -56,7 +56,7 @@ import java.util.Map;
**/
public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder {
private final static String KEY_WORD = "datastream-kafka";
private static final String KEY_WORD = "datastream-kafka";
public KafkaSinkBuilder() {
}
......
......@@ -9,11 +9,7 @@ import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.dlink.utils.ObjectConvertUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateDeserializer;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
......@@ -24,28 +20,28 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.Collector;
import javax.xml.bind.DatatypeConverter;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
/**
* @className: com.dlink.cdc.kafka.KafkaSinkSimpleBuilder
*/
public class KafkaSinkJsonBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
private final static String KEY_WORD = "datastream-kafka-json";
private static final String KEY_WORD = "datastream-kafka-json";
private transient ObjectMapper objectMapper;
public KafkaSinkJsonBuilder() {
......@@ -71,7 +67,7 @@ public class KafkaSinkJsonBuilder extends AbstractSinkBuilder implements SinkBui
StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) {
try{
try {
SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(new MapFunction<String, Map>() {
@Override
public Map map(String value) throws Exception {
......@@ -101,42 +97,42 @@ public class KafkaSinkJsonBuilder extends AbstractSinkBuilder implements SinkBui
List<String> columnNameList = new LinkedList<>();
List<LogicalType> columnTypeList = new LinkedList<>();
buildColumn(columnNameList, columnTypeList, table.getColumns());
SingleOutputStreamOperator<String> stringOperator =filterOperator.process(new ProcessFunction<Map, String>() {
SingleOutputStreamOperator<String> stringOperator = filterOperator.process(new ProcessFunction<Map, String>() {
@Override
public void processElement(Map value, Context context, Collector<String> collector) throws Exception {
Map after = null;
Map before = null;
String ts_ms = value.get("ts_ms").toString();
String tsMs = value.get("ts_ms").toString();
try {
switch (value.get("op").toString()) {
case "r":
case "c":
after = (Map) value.get("after");
convertAttr(columnNameList,columnTypeList,after,value.get("op").toString(),0,schemaName,tableName,ts_ms);
convertAttr(columnNameList, columnTypeList, after,value.get("op").toString(), 0, schemaName, tableName, tsMs);
break;
case "u":
before = (Map) value.get("before");
convertAttr(columnNameList,columnTypeList,before,value.get("op").toString(),1,schemaName,tableName,ts_ms);
convertAttr(columnNameList, columnTypeList, before,value.get("op").toString(), 1, schemaName, tableName, tsMs);
after = (Map) value.get("after");
convertAttr(columnNameList,columnTypeList,after,value.get("op").toString(),0,schemaName,tableName,ts_ms);
convertAttr(columnNameList, columnTypeList, after,value.get("op").toString(), 0, schemaName, tableName, tsMs);
break;
case "d":
before = (Map) value.get("before");
convertAttr(columnNameList,columnTypeList,before,value.get("op").toString(),1,schemaName,tableName,ts_ms);
convertAttr(columnNameList, columnTypeList, before,value.get("op").toString(), 1,schemaName, tableName, tsMs);
break;
}
} catch (Exception e) {
logger.error("SchameTable: {} - Exception:", e);
throw e;
}
if(objectMapper == null){
if (objectMapper == null) {
initializeObjectMapper();
}
if(before != null){
if (before != null) {
collector.collect(objectMapper.writeValueAsString(before));
}
if(after != null){
if (after != null) {
collector.collect(objectMapper.writeValueAsString(after));
}
}
......@@ -147,7 +143,7 @@ public class KafkaSinkJsonBuilder extends AbstractSinkBuilder implements SinkBui
}
}
}
}catch (Exception ex){
} catch (Exception ex) {
logger.error("kafka sink error:",ex);
}
return dataStreamSource;
......@@ -176,18 +172,18 @@ public class KafkaSinkJsonBuilder extends AbstractSinkBuilder implements SinkBui
return ObjectConvertUtil.convertValue(value,logicalType);
}
private void convertAttr(List<String> columnNameList,List<LogicalType> columnTypeList,Map value,String op,int is_deleted,
String schemaName,String tableName,String ts_ms){
private void convertAttr(List<String> columnNameList, List<LogicalType> columnTypeList, Map value, String op, int isDeleted,
String schemaName, String tableName, String tsMs) {
for (int i = 0; i < columnNameList.size(); i++) {
String columnName = columnNameList.get(i);
Object columnNameValue = value.remove(columnName);
Object columnNameNewVal = convertValue(columnNameValue, columnTypeList.get(i));
value.put(columnName, columnNameNewVal);
}
value.put("__op",op);
value.put("is_deleted",Integer.valueOf(is_deleted));
value.put("db",schemaName);
value.put("table",tableName);
value.put("ts_ms",ts_ms);
value.put("__op", op);
value.put("is_deleted", Integer.valueOf(isDeleted));
value.put("db", schemaName);
value.put("table", tableName);
value.put("ts_ms", tsMs);
}
}
......@@ -17,9 +17,15 @@
*
*/
package com.dlink.cdc.mysql;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.ClientConstant;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
......@@ -32,16 +38,9 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.ClientConstant;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
/**
* MysqlCDCBuilder
......@@ -51,8 +50,8 @@ import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
**/
public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
private final static String KEY_WORD = "mysql-cdc";
private final static String METADATA_TYPE = "MySql";
private static final String KEY_WORD = "mysql-cdc";
private static final String METADATA_TYPE = "MySql";
public MysqlCDCBuilder() {
}
......
package com.dlink.cdc.mysql;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverter;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.ConverterType;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
......@@ -12,12 +8,16 @@ import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverter;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.ConverterType;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
/**
* @version 1.0
* @className: com.dlink.cdc.mysql.MysqlJsonDebeziumDeserializationSchema
* @Description:
* @author: jack zhong
* @date 8/2/221:43 PM
*/
public class MysqlJsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
private static final long serialVersionUID = 1L;
......
......@@ -17,9 +17,15 @@
*
*/
package com.dlink.cdc.oracle;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.ClientConstant;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
......@@ -28,12 +34,6 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.ClientConstant;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;
import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.connectors.oracle.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
......@@ -46,8 +46,8 @@ import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
**/
public class OracleCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
private final static String KEY_WORD = "oracle-cdc";
private final static String METADATA_TYPE = "Oracle";
private static final String KEY_WORD = "oracle-cdc";
private static final String METADATA_TYPE = "Oracle";
public OracleCDCBuilder() {
}
......
......@@ -17,9 +17,20 @@
*
*/
package com.dlink.cdc.sql;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.dlink.utils.FlinkBaseUtil;
import com.dlink.utils.JSONUtil;
import com.dlink.utils.LogUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
......@@ -34,31 +45,29 @@ import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import javax.xml.bind.DatatypeConverter;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.ZoneId;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.dlink.utils.FlinkBaseUtil;
import com.dlink.utils.JSONUtil;
import com.dlink.utils.LogUtil;
import org.apache.flink.util.OutputTag;
import javax.xml.bind.DatatypeConverter;
/**
* SQLSinkBuilder
......@@ -68,7 +77,7 @@ import org.apache.flink.util.OutputTag;
*/
public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
private final static String KEY_WORD = "sql";
private static final String KEY_WORD = "sql";
private static final long serialVersionUID = -3699685106324048226L;
private ZoneId sinkTimeZone = ZoneId.of("UTC");
......@@ -168,7 +177,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
modifyOperations.add((ModifyOperation) operation);
}
}
}catch (Exception e) {
} catch (Exception e) {
logger.error("Translate to plan occur exception: {}", e);
throw e;
}
......@@ -192,7 +201,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
DataStreamSource<String> dataStreamSource) {
final String timeZone = config.getSink().get("timezone");
config.getSink().remove("timezone");
if (Asserts.isNotNullString(timeZone)){
if (Asserts.isNotNullString(timeZone)) {
sinkTimeZone = ZoneId.of(timeZone);
}
final List<Schema> schemaList = config.getSchemaList();
......
......@@ -6,23 +6,20 @@ import com.dlink.model.Column;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Table;
import com.dlink.utils.ObjectConvertUtil;
import com.starrocks.connector.flink.row.sink.StarRocksTableRowTransformer;
import com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunction;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.utils.TypeConversions;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
......@@ -30,13 +27,17 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import com.starrocks.connector.flink.row.sink.StarRocksTableRowTransformer;
import com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunction;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
/**
* StarrocksSinkBuilder
*
**/
public class StarrocksSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
private final static String KEY_WORD = "datastream-starrocks";
private static final String KEY_WORD = "datastream-starrocks";
private static final long serialVersionUID = 8330362249137431824L;
private final ZoneId sinkZoneIdUTC = ZoneId.of("UTC");
......@@ -64,23 +65,23 @@ public class StarrocksSinkBuilder extends AbstractSinkBuilder implements SinkBui
Table table,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
try{
try {
List<Column> columns = table.getColumns();
List<String> primaryKeys = new LinkedList<>();
String[] columnNames = new String[columns.size()];
for(int i=0;i<columns.size();i++){
for (int i = 0; i < columns.size(); i++) {
Column column = columns.get(i);
if(column.isKeyFlag()){
if (column.isKeyFlag()) {
primaryKeys.add(column.getName());
}
columnNames[i] = column.getName();
}
String[] primaryKeyArrays = primaryKeys.stream().toArray(String[]::new);
DataType[] dataTypes = new DataType[columnTypeList.size()];
for(int i = 0 ; i < columnTypeList.size() ; i++){
for (int i = 0; i < columnTypeList.size(); i++) {
LogicalType logicalType = columnTypeList.get(i);
String columnName = columnNameList.get(i);
if(primaryKeys.contains(columnName)){
if (primaryKeys.contains(columnName)) {
logicalType = logicalType.copy(false);
}
dataTypes[i] = TypeConversions.fromLogicalToDataType(logicalType);
......@@ -98,8 +99,8 @@ public class StarrocksSinkBuilder extends AbstractSinkBuilder implements SinkBui
.withProperty("sink.properties.strip_outer_array", "true")
// 设置并行度,多并行度情况下需要考虑如何保证数据有序性
.withProperty("sink.parallelism", "1");
sink.forEach((key,value)->{
if(key.startsWith("sink.")){
sink.forEach((key,value) -> {
if (key.startsWith("sink.")) {
builder.withProperty(key,value);
}
});
......@@ -110,7 +111,7 @@ public class StarrocksSinkBuilder extends AbstractSinkBuilder implements SinkBui
);
rowDataDataStream.addSink(starrocksSinkFunction);
logger.info("handler connector name:{} sink successful.....",getHandle());
}catch (Exception ex){
} catch (Exception ex) {
logger.error("handler connector name:{} sink ex:",getHandle(),ex);
}
}
......@@ -121,9 +122,9 @@ public class StarrocksSinkBuilder extends AbstractSinkBuilder implements SinkBui
if (object == null) {
return null;
}
if(logicalType instanceof TimestampType && object instanceof LocalDateTime){
if (logicalType instanceof TimestampType && object instanceof LocalDateTime) {
return TimestampData.fromLocalDateTime((LocalDateTime) object);
}else if(logicalType instanceof DateType){
} else if (logicalType instanceof DateType) {
if (value instanceof Integer) {
return Instant.ofEpochSecond((int) value).atZone(sinkZoneIdUTC).toEpochSecond();
}
......
......@@ -17,14 +17,11 @@
*
*/
package com.dlink.executor;
import com.dlink.assertion.Asserts;
import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
......@@ -41,7 +38,6 @@ import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
......@@ -50,40 +46,24 @@ import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.SchemaResolver;
import org.apache.flink.table.catalog.SchemaTranslator;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionParser;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.PlannerFactoryUtil;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.UserDefinedFunctionHelper;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.JavaDataStreamQueryOperation;
import org.apache.flink.table.operations.JavaExternalQueryOperation;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;
import org.apache.flink.table.operations.utils.OperationTreeBuilder;
import org.apache.flink.table.planner.delegation.DefaultExecutor;
import org.apache.flink.table.typeutils.FieldInfoUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
......@@ -91,7 +71,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* 定制TableEnvironmentImpl
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.executor;
import org.apache.flink.annotation.Internal;
......@@ -34,11 +33,21 @@ import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.io.PrintWriter;
import java.time.ZoneId;
import java.util.*;
import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
/**
* 定制TableResultImpl
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.executor;
import org.apache.flink.table.types.DataType;
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.utils;
import org.apache.flink.api.common.JobID;
......
package com.dlink.utils;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarBinaryType;
import javax.xml.bind.DatatypeConverter;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.ZoneId;
import javax.xml.bind.DatatypeConverter;
/**
* @className: com.dlink.utils.ObjectConvertUtil
* @Description:
* @author: jack zhong
* @date 8/10/222:49 PM
*/
public class ObjectConvertUtil {
public static Object convertValue(Object value, LogicalType logicalType){
public static Object convertValue(Object value, LogicalType logicalType) {
return ObjectConvertUtil.convertValue(value,logicalType,null);
}
......@@ -23,7 +28,7 @@ public class ObjectConvertUtil {
if (value == null) {
return null;
}
if(sinkTimeZone == null){
if (sinkTimeZone == null) {
sinkTimeZone = ZoneId.of("UTC");
}
if (logicalType instanceof DateType) {
......
......@@ -17,33 +17,54 @@
*
*/
package org.apache.flink.table.types.extraction;
import static org.apache.flink.shaded.asm7.org.objectweb.asm.Type.getConstructorDescriptor;
import static org.apache.flink.shaded.asm7.org.objectweb.asm.Type.getMethodDescriptor;
import com.dlink.pool.ClassPool;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.shaded.asm7.org.objectweb.asm.*;
import org.apache.flink.shaded.asm7.org.objectweb.asm.ClassReader;
import org.apache.flink.shaded.asm7.org.objectweb.asm.ClassVisitor;
import org.apache.flink.shaded.asm7.org.objectweb.asm.Label;
import org.apache.flink.shaded.asm7.org.objectweb.asm.MethodVisitor;
import org.apache.flink.shaded.asm7.org.objectweb.asm.Opcodes;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.StructuredType;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.lang.annotation.Annotation;
import java.lang.reflect.Constructor;
import java.lang.reflect.Executable;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Parameter;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.*;
import java.util.*;
import java.lang.reflect.TypeVariable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.shaded.asm7.org.objectweb.asm.Type.getConstructorDescriptor;
import static org.apache.flink.shaded.asm7.org.objectweb.asm.Type.getMethodDescriptor;
import javax.annotation.Nullable;
/**
* Utilities for performing reflection tasks.
......@@ -384,8 +405,7 @@ public final class ExtractionUtils {
/**
* Converts a {@link Type} to {@link Class} if possible, {@code null} otherwise.
*/
static @Nullable
Class<?> toClass(Type type) {
static @Nullable Class<?> toClass(Type type) {
if (type instanceof Class) {
return (Class<?>) type;
} else if (type instanceof ParameterizedType) {
......@@ -457,8 +477,7 @@ public final class ExtractionUtils {
return variable;
}
private static @Nullable
Type resolveVariableInParameterizedType(
private static @Nullable Type resolveVariableInParameterizedType(
TypeVariable<?> variable, ParameterizedType currentType) {
final Class<?> currentRaw = (Class<?>) currentType.getRawType();
final TypeVariable<?>[] currentVariables = currentRaw.getTypeParameters();
......@@ -640,8 +659,7 @@ public final class ExtractionUtils {
* Checks whether the given constructor takes all of the given fields with matching (possibly
* primitive) type and name. An assigning constructor can define the order of fields.
*/
public static @Nullable
AssigningConstructor extractAssigningConstructor(
public static @Nullable AssigningConstructor extractAssigningConstructor(
Class<?> clazz, List<Field> fields) {
AssigningConstructor foundConstructor = null;
for (Constructor<?> constructor : clazz.getDeclaredConstructors()) {
......@@ -668,8 +686,7 @@ public final class ExtractionUtils {
/**
* Extracts the parameter names of a method if possible.
*/
static @Nullable
List<String> extractMethodParameterNames(Method method) {
static @Nullable List<String> extractMethodParameterNames(Method method) {
return extractExecutableNames(method);
}
......@@ -677,8 +694,7 @@ public final class ExtractionUtils {
* Extracts ordered parameter names from a constructor that takes all of the given fields with
* matching (possibly primitive and lenient) type and name.
*/
private static @Nullable
List<String> extractConstructorParameterNames(
private static @Nullable List<String> extractConstructorParameterNames(
Constructor<?> constructor, List<Field> fields) {
final Type[] parameterTypes = constructor.getGenericParameterTypes();
......@@ -715,8 +731,7 @@ public final class ExtractionUtils {
return fieldNames;
}
private static @Nullable
List<String> extractExecutableNames(Executable executable) {
private static @Nullable List<String> extractExecutableNames(Executable executable) {
final int offset;
if (!Modifier.isStatic(executable.getModifiers())) {
// remove "this" as first parameter
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.cdc;
import com.dlink.assertion.Asserts;
......
......@@ -17,13 +17,16 @@
*
*/
package com.dlink.cdc;
import com.dlink.assertion.Asserts;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.*;
import com.dlink.model.Column;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.dlink.utils.JSONUtil;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
......@@ -32,20 +35,41 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.*;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.ZoneId;
import java.util.*;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* AbstractCDCBuilder
......@@ -111,6 +135,7 @@ public abstract class AbstractSinkBuilder {
}
});
}
protected DataStream<Map> shunt(
SingleOutputStreamOperator<Map> processOperator,
Table table,
......@@ -118,6 +143,7 @@ public abstract class AbstractSinkBuilder {
return processOperator.getSideOutput(tag);
}
protected DataStream<RowData> buildRowData(
SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList,
......
......@@ -17,17 +17,16 @@
*
*/
package com.dlink.cdc;
import com.dlink.model.FlinkCDCConfig;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.List;
import java.util.Map;
import com.dlink.model.FlinkCDCConfig;
/**
* CDCBuilder
*
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.cdc;
import com.dlink.assertion.Asserts;
......
......@@ -17,15 +17,14 @@
*
*/
package com.dlink.cdc;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* SinkBuilder
*
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.cdc;
import com.dlink.assertion.Asserts;
......
......@@ -17,13 +17,13 @@
*
*/
package com.dlink.cdc.doris;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Table;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
......@@ -45,7 +45,7 @@ import java.util.Map;
**/
public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
private final static String KEY_WORD = "datastream-doris";
private static final String KEY_WORD = "datastream-doris";
private static final long serialVersionUID = 8330362249137471854L;
public DorisSinkBuilder() {
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.cdc.kafka;
import com.dlink.assertion.Asserts;
......@@ -28,6 +27,7 @@ import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
......@@ -56,7 +56,7 @@ import java.util.Map;
**/
public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder {
private final static String KEY_WORD = "datastream-kafka";
private static final String KEY_WORD = "datastream-kafka";
public KafkaSinkBuilder() {
}
......
......@@ -9,10 +9,7 @@ import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.dlink.utils.ObjectConvertUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
......@@ -23,27 +20,28 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.Collector;
import javax.xml.bind.DatatypeConverter;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
/**
* @className: com.dlink.cdc.kafka.KafkaSinkSimpleBuilder
*/
public class KafkaSinkJsonBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
private final static String KEY_WORD = "datastream-kafka-json";
private static final String KEY_WORD = "datastream-kafka-json";
private transient ObjectMapper objectMapper;
public KafkaSinkJsonBuilder() {
......@@ -69,7 +67,7 @@ public class KafkaSinkJsonBuilder extends AbstractSinkBuilder implements SinkBui
StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) {
try{
try {
SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(new MapFunction<String, Map>() {
@Override
public Map map(String value) throws Exception {
......@@ -99,42 +97,42 @@ public class KafkaSinkJsonBuilder extends AbstractSinkBuilder implements SinkBui
List<String> columnNameList = new LinkedList<>();
List<LogicalType> columnTypeList = new LinkedList<>();
buildColumn(columnNameList, columnTypeList, table.getColumns());
SingleOutputStreamOperator<String> stringOperator =filterOperator.process(new ProcessFunction<Map, String>() {
SingleOutputStreamOperator<String> stringOperator = filterOperator.process(new ProcessFunction<Map, String>() {
@Override
public void processElement(Map value, Context context, Collector<String> collector) throws Exception {
Map after = null;
Map before = null;
String ts_ms = value.get("ts_ms").toString();
String tsMs = value.get("ts_ms").toString();
try {
switch (value.get("op").toString()) {
case "r":
case "c":
after = (Map) value.get("after");
convertAttr(columnNameList,columnTypeList,after,value.get("op").toString(),0,schemaName,tableName,ts_ms);
convertAttr(columnNameList, columnTypeList, after, value.get("op").toString(), 0, schemaName, tableName, tsMs);
break;
case "u":
before = (Map) value.get("before");
convertAttr(columnNameList,columnTypeList,before,value.get("op").toString(),1,schemaName,tableName,ts_ms);
convertAttr(columnNameList, columnTypeList, before, value.get("op").toString(), 1, schemaName, tableName, tsMs);
after = (Map) value.get("after");
convertAttr(columnNameList,columnTypeList,after,value.get("op").toString(),0,schemaName,tableName,ts_ms);
convertAttr(columnNameList, columnTypeList, after,value.get("op").toString(), 0, schemaName, tableName, tsMs);
break;
case "d":
before = (Map) value.get("before");
convertAttr(columnNameList,columnTypeList,before,value.get("op").toString(),1,schemaName,tableName,ts_ms);
convertAttr(columnNameList, columnTypeList, before, value.get("op").toString(), 1, schemaName, tableName, tsMs);
break;
}
} catch (Exception e) {
logger.error("SchameTable: {} - Exception:", e);
throw e;
}
if(objectMapper == null){
if (objectMapper == null) {
initializeObjectMapper();
}
if(before != null){
if (before != null) {
collector.collect(objectMapper.writeValueAsString(before));
}
if(after != null){
if (after != null) {
collector.collect(objectMapper.writeValueAsString(after));
}
}
......@@ -145,7 +143,7 @@ public class KafkaSinkJsonBuilder extends AbstractSinkBuilder implements SinkBui
}
}
}
}catch (Exception ex){
} catch (Exception ex) {
logger.error("kafka sink error:",ex);
}
return dataStreamSource;
......@@ -174,18 +172,18 @@ public class KafkaSinkJsonBuilder extends AbstractSinkBuilder implements SinkBui
return ObjectConvertUtil.convertValue(value,logicalType);
}
private void convertAttr(List<String> columnNameList,List<LogicalType> columnTypeList,Map value,String op,int is_deleted,
String schemaName,String tableName,String ts_ms){
private void convertAttr(List<String> columnNameList, List<LogicalType> columnTypeList, Map value, String op, int isDeleted,
String schemaName, String tableName, String tsMs) {
for (int i = 0; i < columnNameList.size(); i++) {
String columnName = columnNameList.get(i);
Object columnNameValue = value.remove(columnName);
Object columnNameNewVal = convertValue(columnNameValue, columnTypeList.get(i));
value.put(columnName, columnNameNewVal);
}
value.put("__op",op);
value.put("is_deleted",Integer.valueOf(is_deleted));
value.put("db",schemaName);
value.put("table",tableName);
value.put("ts_ms",ts_ms);
value.put("__op", op);
value.put("is_deleted", Integer.valueOf(isDeleted));
value.put("db", schemaName);
value.put("table", tableName);
value.put("ts_ms", tsMs);
}
}
......@@ -17,9 +17,15 @@
*
*/
package com.dlink.cdc.mysql;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.ClientConstant;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
......@@ -32,16 +38,9 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.ClientConstant;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
/**
* MysqlCDCBuilder
......@@ -51,8 +50,8 @@ import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
**/
public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
private final static String KEY_WORD = "mysql-cdc";
private final static String METADATA_TYPE = "MySql";
private static final String KEY_WORD = "mysql-cdc";
private static final String METADATA_TYPE = "MySql";
public MysqlCDCBuilder() {
}
......
package com.dlink.cdc.mysql;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverter;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.ConverterType;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
......@@ -12,12 +8,16 @@ import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverter;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.ConverterType;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
/**
* @version 1.0
* @className: com.dlink.cdc.mysql.MysqlJsonDebeziumDeserializationSchema
* @Description:
* @author: jack zhong
* @date 8/2/221:43 PM
*/
public class MysqlJsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
private static final long serialVersionUID = 1L;
......
......@@ -17,9 +17,15 @@
*
*/
package com.dlink.cdc.oracle;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.ClientConstant;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
......@@ -28,12 +34,6 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.ClientConstant;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;
import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.connectors.oracle.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
......@@ -46,8 +46,8 @@ import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
**/
public class OracleCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
private final static String KEY_WORD = "oracle-cdc";
private final static String METADATA_TYPE = "Oracle";
private static final String KEY_WORD = "oracle-cdc";
private static final String METADATA_TYPE = "Oracle";
public OracleCDCBuilder() {
}
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.cdc.sql;
import com.dlink.assertion.Asserts;
......@@ -31,6 +30,7 @@ import com.dlink.model.Table;
import com.dlink.utils.FlinkBaseUtil;
import com.dlink.utils.JSONUtil;
import com.dlink.utils.LogUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
......@@ -45,19 +45,29 @@ import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import javax.xml.bind.DatatypeConverter;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.ZoneId;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.xml.bind.DatatypeConverter;
/**
* SQLSinkBuilder
......@@ -67,7 +77,7 @@ import java.util.*;
*/
public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
private final static String KEY_WORD = "sql";
private static final String KEY_WORD = "sql";
private static final long serialVersionUID = -3699685106324048226L;
private ZoneId sinkTimeZone = ZoneId.of("UTC");
......@@ -167,7 +177,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
modifyOperations.add((ModifyOperation) operation);
}
}
}catch (Exception e) {
} catch (Exception e) {
logger.error("Translate to plan occur exception: {}", e);
throw e;
}
......@@ -191,7 +201,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
DataStreamSource<String> dataStreamSource) {
final String timeZone = config.getSink().get("timezone");
config.getSink().remove("timezone");
if (Asserts.isNotNullString(timeZone)){
if (Asserts.isNotNullString(timeZone)) {
sinkTimeZone = ZoneId.of(timeZone);
}
final List<Schema> schemaList = config.getSchemaList();
......
......@@ -17,9 +17,11 @@
*
*/
package com.dlink.executor;
import com.dlink.assertion.Asserts;
import com.dlink.result.SqlExplainResult;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
......@@ -64,8 +66,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import com.dlink.assertion.Asserts;
import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
......
......@@ -17,9 +17,6 @@
*
*/
package com.dlink.executor;
import org.apache.flink.annotation.Internal;
......@@ -41,8 +38,6 @@ import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collections;
......@@ -56,6 +51,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
/** Implementation for {@link TableResult}. */
@Internal
public class CustomTableResultImpl implements TableResultInternal {
......
......@@ -17,8 +17,6 @@
*
*/
package com.dlink.executor;
import org.apache.flink.annotation.Internal;
......
......@@ -17,10 +17,8 @@
*
*/
package com.dlink.executor;
import org.apache.flink.table.types.DataType;
/**
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.utils;
import org.apache.flink.api.common.JobID;
......@@ -52,7 +51,6 @@ public class FlinkUtil {
}
}
public static List<String> catchColumn(TableResult tableResult) {
return tableResult.getResolvedSchema().getColumnNames();
}
......
package com.dlink.utils;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarBinaryType;
import javax.xml.bind.DatatypeConverter;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.ZoneId;
import javax.xml.bind.DatatypeConverter;
/**
* @className: com.dlink.utils.ObjectConvertUtil
* @Description:
* @author: jack zhong
* @date 8/10/222:49 PM
*/
public class ObjectConvertUtil {
public static Object convertValue(Object value, LogicalType logicalType){
public static Object convertValue(Object value, LogicalType logicalType) {
return ObjectConvertUtil.convertValue(value,logicalType,null);
}
......@@ -23,7 +28,7 @@ public class ObjectConvertUtil {
if (value == null) {
return null;
}
if(sinkTimeZone == null){
if (sinkTimeZone == null) {
sinkTimeZone = ZoneId.of("UTC");
}
if (logicalType instanceof DateType) {
......
......@@ -17,16 +17,16 @@
*
*/
package com.dlink.executor;
import com.dlink.result.SqlExplainResult;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
......@@ -36,13 +36,11 @@ import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.types.Row;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
......@@ -89,7 +87,7 @@ public interface CustomTableEnvironment {
<T> void createTemporaryView(String path, DataStream<T> dataStream, String fields);
// <T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema);
// <T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema);
Parser getParser();
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.model;
import java.util.List;
......
......@@ -17,7 +17,6 @@
*
*/
package com.dlink.utils;
import com.dlink.constant.FlinkParamConstant;
......@@ -25,6 +24,7 @@ import com.dlink.model.Column;
import com.dlink.model.ColumnType;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Table;
import org.apache.flink.api.java.utils.ParameterTool;
import java.util.ArrayList;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment