Commit f6d81b81 authored by wenmo's avatar wenmo

client format

parent 2a471036
......@@ -21,10 +21,10 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
public class FlinkCDCMergeBuilder {
public static void buildMySqlCDC(StreamExecutionEnvironment env, FlinkCDCConfig config) {
if(Asserts.isNotNull(config.getParallelism())){
if (Asserts.isNotNull(config.getParallelism())) {
env.setParallelism(config.getParallelism());
}
if(Asserts.isNotNull(config.getCheckpoint())){
if (Asserts.isNotNull(config.getCheckpoint())) {
env.enableCheckpointing(config.getCheckpoint());
}
MySqlSourceBuilder<String> sourceBuilder = MySqlSource.<String>builder()
......@@ -32,16 +32,16 @@ public class FlinkCDCMergeBuilder {
.port(config.getPort())
.username(config.getUsername())
.password(config.getPassword());
if(Asserts.isNotNull(config.getDatabase())&&config.getDatabase().size()>0){
if (Asserts.isNotNull(config.getDatabase()) && config.getDatabase().size() > 0) {
sourceBuilder.databaseList(config.getDatabase().toArray(new String[0]));
}
if(Asserts.isNotNull(config.getTable())&&config.getTable().size()>0){
if (Asserts.isNotNull(config.getTable()) && config.getTable().size() > 0) {
sourceBuilder.tableList(config.getTable().toArray(new String[0]));
}
MySqlSourceBuilder<String> builder = sourceBuilder
.deserializer(new JsonDebeziumDeserializationSchema());
if(Asserts.isNotNullString(config.getStartupMode())){
switch (config.getStartupMode().toUpperCase()){
if (Asserts.isNotNullString(config.getStartupMode())) {
switch (config.getStartupMode().toUpperCase()) {
case "INITIAL":
builder.startupOptions(StartupOptions.initial());
break;
......@@ -54,12 +54,12 @@ public class FlinkCDCMergeBuilder {
default:
builder.startupOptions(StartupOptions.latest());
}
}else {
} else {
builder.startupOptions(StartupOptions.latest());
}
MySqlSource<String> sourceFunction = builder.build();
DataStreamSource<String> streamSource = env.fromSource(sourceFunction, WatermarkStrategy.noWatermarks(), "MySQL Source");
streamSource.addSink(getKafkaProducer(config.getBrokers(),config.getTopic()));
streamSource.addSink(getKafkaProducer(config.getBrokers(), config.getTopic()));
}
private static FlinkKafkaProducer<String> getKafkaProducer(String brokers, String topic) {
......
......@@ -17,6 +17,7 @@ import java.util.Optional;
/**
* 定制TableResultImpl
*
* @author wenmo
* @since 2021/6/7 22:06
**/
......@@ -49,12 +50,12 @@ class CustomTableResultImpl implements TableResult {
this.printStyle = Preconditions.checkNotNull(printStyle, "printStyle should not be null");
}
public static TableResult buildTableResult(List<TableSchemaField> fields,List<Row> rows){
public static TableResult buildTableResult(List<TableSchemaField> fields, List<Row> rows) {
Builder builder = builder().resultKind(ResultKind.SUCCESS);
if(fields.size()>0) {
if (fields.size() > 0) {
TableSchema.Builder tableSchemaBuild = TableSchema.builder();
for (int i = 0; i < fields.size(); i++) {
tableSchemaBuild.field(fields.get(i).getName(),fields.get(i).getType());
tableSchemaBuild.field(fields.get(i).getName(), fields.get(i).getType());
}
builder.tableSchema(tableSchemaBuild.build()).data(rows);
}
......@@ -109,7 +110,9 @@ class CustomTableResultImpl implements TableResult {
return new Builder();
}
/** Builder for creating a {@link CustomTableResultImpl}. */
/**
* Builder for creating a {@link CustomTableResultImpl}.
*/
public static class Builder {
private JobClient jobClient = null;
private TableSchema tableSchema = null;
......@@ -118,7 +121,8 @@ class CustomTableResultImpl implements TableResult {
private PrintStyle printStyle =
PrintStyle.tableau(Integer.MAX_VALUE, PrintUtils.NULL_COLUMN, false);
private Builder() {}
private Builder() {
}
/**
* Specifies job client which associates the submitted Flink job.
......@@ -174,20 +178,26 @@ class CustomTableResultImpl implements TableResult {
return this;
}
/** Specifies print style. Default is {@link TableauStyle} with max integer column width. */
/**
* Specifies print style. Default is {@link TableauStyle} with max integer column width.
*/
public Builder setPrintStyle(PrintStyle printStyle) {
Preconditions.checkNotNull(printStyle, "printStyle should not be null");
this.printStyle = printStyle;
return this;
}
/** Returns a {@link TableResult} instance. */
/**
* Returns a {@link TableResult} instance.
*/
public TableResult build() {
return new CustomTableResultImpl(jobClient, tableSchema, resultKind, data, printStyle);
}
}
/** Root interface for all print styles. */
/**
* Root interface for all print styles.
*/
public interface PrintStyle {
/**
* Create a tableau print style with given max column width, null column, and a flag to
......@@ -211,7 +221,9 @@ class CustomTableResultImpl implements TableResult {
}
}
/** print the result schema and content as tableau form. */
/**
* print the result schema and content as tableau form.
*/
private static final class TableauStyle implements PrintStyle {
/**
* A flag to indicate whether the column width is derived from type (true) or content
......@@ -245,5 +257,6 @@ class CustomTableResultImpl implements TableResult {
/**
* only print the result content as raw form. column delimiter is ",", row delimiter is "\n".
*/
private static final class RawContentStyle implements PrintStyle {}
private static final class RawContentStyle implements PrintStyle {
}
}
......@@ -17,18 +17,18 @@ import java.util.Optional;
*/
public class FlinkUtil {
public static List<String> getFieldNamesFromCatalogManager(CatalogManager catalogManager, String catalog, String database, String table){
public static List<String> getFieldNamesFromCatalogManager(CatalogManager catalogManager, String catalog, String database, String table) {
Optional<CatalogManager.TableLookupResult> tableOpt = catalogManager.getTable(
ObjectIdentifier.of(catalog, database, table)
);
if (tableOpt.isPresent()) {
return Arrays.asList(tableOpt.get().getResolvedSchema().getFieldNames());
}else{
} else {
return new ArrayList<String>();
}
}
public static List<String> catchColumn(TableResult tableResult){
public static List<String> catchColumn(TableResult tableResult) {
return Arrays.asList(tableResult.getTableSchema().getFieldNames());
}
}
......@@ -21,48 +21,28 @@ package org.apache.flink.table.types.extraction;
import com.dlink.pool.ClassPool;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.shaded.asm7.org.objectweb.asm.*;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.StructuredType;
import org.apache.flink.shaded.asm7.org.objectweb.asm.ClassReader;
import org.apache.flink.shaded.asm7.org.objectweb.asm.ClassVisitor;
import org.apache.flink.shaded.asm7.org.objectweb.asm.Label;
import org.apache.flink.shaded.asm7.org.objectweb.asm.MethodVisitor;
import org.apache.flink.shaded.asm7.org.objectweb.asm.Opcodes;
import javax.annotation.Nullable;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Constructor;
import java.lang.reflect.Executable;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Parameter;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.lang.reflect.*;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.shaded.asm7.org.objectweb.asm.Type.getConstructorDescriptor;
import static org.apache.flink.shaded.asm7.org.objectweb.asm.Type.getMethodDescriptor;
/** Utilities for performing reflection tasks. */
/**
* Utilities for performing reflection tasks.
*/
@Internal
public final class ExtractionUtils {
......@@ -70,7 +50,9 @@ public final class ExtractionUtils {
// Methods shared across packages
// --------------------------------------------------------------------------------------------
/** Collects methods of the given name. */
/**
* Collects methods of the given name.
*/
public static List<Method> collectMethods(Class<?> function, String methodName) {
return Arrays.stream(function.getMethods())
.filter(method -> method.getName().equals(methodName))
......@@ -130,7 +112,9 @@ public final class ExtractionUtils {
return clz == null || ExtractionUtils.isAssignable(clz, param, true);
}
/** Creates a method signature string like {@code int eval(Integer, String)}. */
/**
* Creates a method signature string like {@code int eval(Integer, String)}.
*/
public static String createMethodSignatureString(
String methodName, Class<?>[] parameters, @Nullable Class<?> returnType) {
final StringBuilder builder = new StringBuilder();
......@@ -299,7 +283,9 @@ public final class ExtractionUtils {
return false;
}
/** Checks whether a field is directly readable without a getter. */
/**
* Checks whether a field is directly readable without a getter.
*/
public static boolean isStructuredFieldDirectlyReadable(Field field) {
final int m = field.getModifiers();
......@@ -307,7 +293,9 @@ public final class ExtractionUtils {
return Modifier.isPublic(m);
}
/** Checks whether a field is directly writable without a setter or constructor. */
/**
* Checks whether a field is directly writable without a setter or constructor.
*/
public static boolean isStructuredFieldDirectlyWritable(Field field) {
final int m = field.getModifiers();
......@@ -324,12 +312,16 @@ public final class ExtractionUtils {
// Methods intended for this package
// --------------------------------------------------------------------------------------------
/** Helper method for creating consistent exceptions during extraction. */
/**
* Helper method for creating consistent exceptions during extraction.
*/
static ValidationException extractionError(String message, Object... args) {
return extractionError(null, message, args);
}
/** Helper method for creating consistent exceptions during extraction. */
/**
* Helper method for creating consistent exceptions during extraction.
*/
static ValidationException extractionError(Throwable cause, String message, Object... args) {
return new ValidationException(String.format(message, args), cause);
}
......@@ -358,8 +350,11 @@ public final class ExtractionUtils {
return typeHierarchy;
}
/** Converts a {@link Type} to {@link Class} if possible, {@code null} otherwise. */
static @Nullable Class<?> toClass(Type type) {
/**
* Converts a {@link Type} to {@link Class} if possible, {@code null} otherwise.
*/
static @Nullable
Class<?> toClass(Type type) {
if (type instanceof Class) {
return (Class<?>) type;
} else if (type instanceof ParameterizedType) {
......@@ -370,7 +365,9 @@ public final class ExtractionUtils {
return null;
}
/** Creates a raw data type. */
/**
* Creates a raw data type.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
static DataType createRawType(
DataTypeFactory typeFactory,
......@@ -404,7 +401,9 @@ public final class ExtractionUtils {
}
}
/** Resolves a {@link TypeVariable} using the given type hierarchy if possible. */
/**
* Resolves a {@link TypeVariable} using the given type hierarchy if possible.
*/
static Type resolveVariable(List<Type> typeHierarchy, TypeVariable<?> variable) {
// iterate through hierarchy from top to bottom until type variable gets a non-variable
// assigned
......@@ -427,7 +426,8 @@ public final class ExtractionUtils {
return variable;
}
private static @Nullable Type resolveVariableInParameterizedType(
private static @Nullable
Type resolveVariableInParameterizedType(
TypeVariable<?> variable, ParameterizedType currentType) {
final Class<?> currentRaw = (Class<?>) currentType.getRawType();
final TypeVariable<?>[] currentVariables = currentRaw.getTypeParameters();
......@@ -465,7 +465,9 @@ public final class ExtractionUtils {
}
}
/** Returns the fields of a class for a {@link StructuredType}. */
/**
* Returns the fields of a class for a {@link StructuredType}.
*/
static List<Field> collectStructuredFields(Class<?> clazz) {
final List<Field> fields = new ArrayList<>();
while (clazz != Object.class) {
......@@ -482,7 +484,9 @@ public final class ExtractionUtils {
return fields;
}
/** Validates if a field is properly readable either directly or through a getter. */
/**
* Validates if a field is properly readable either directly or through a getter.
*/
static void validateStructuredFieldReadability(Class<?> clazz, Field field) {
// field is accessible
if (isStructuredFieldDirectlyReadable(field)) {
......@@ -525,7 +529,9 @@ public final class ExtractionUtils {
field.getName(), clazz.getName());
}
/** Returns the boxed type of a primitive type. */
/**
* Returns the boxed type of a primitive type.
*/
static Type primitiveToWrapper(Type type) {
if (type instanceof Class) {
return primitiveToWrapper((Class<?>) type);
......@@ -533,7 +539,9 @@ public final class ExtractionUtils {
return type;
}
/** Collects all methods that qualify as methods of a {@link StructuredType}. */
/**
* Collects all methods that qualify as methods of a {@link StructuredType}.
*/
static List<Method> collectStructuredMethods(Class<?> clazz) {
final List<Method> methods = new ArrayList<>();
while (clazz != Object.class) {
......@@ -584,7 +592,9 @@ public final class ExtractionUtils {
// Parameter Extraction Utilities
// --------------------------------------------------------------------------------------------
/** Result of the extraction in {@link #extractAssigningConstructor(Class, List)}. */
/**
* Result of the extraction in {@link #extractAssigningConstructor(Class, List)}.
*/
static class AssigningConstructor {
public final Constructor<?> constructor;
public final List<String> parameterNames;
......@@ -599,7 +609,8 @@ public final class ExtractionUtils {
* Checks whether the given constructor takes all of the given fields with matching (possibly
* primitive) type and name. An assigning constructor can define the order of fields.
*/
static @Nullable AssigningConstructor extractAssigningConstructor(
static @Nullable
AssigningConstructor extractAssigningConstructor(
Class<?> clazz, List<Field> fields) {
AssigningConstructor foundConstructor = null;
for (Constructor<?> constructor : clazz.getDeclaredConstructors()) {
......@@ -623,8 +634,11 @@ public final class ExtractionUtils {
return foundConstructor;
}
/** Extracts the parameter names of a method if possible. */
static @Nullable List<String> extractMethodParameterNames(Method method) {
/**
* Extracts the parameter names of a method if possible.
*/
static @Nullable
List<String> extractMethodParameterNames(Method method) {
return extractExecutableNames(method);
}
......@@ -632,7 +646,8 @@ public final class ExtractionUtils {
* Extracts ordered parameter names from a constructor that takes all of the given fields with
* matching (possibly primitive) type and name.
*/
private static @Nullable List<String> extractConstructorParameterNames(
private static @Nullable
List<String> extractConstructorParameterNames(
Constructor<?> constructor, List<Field> fields) {
final Type[] parameterTypes = constructor.getGenericParameterTypes();
......@@ -659,7 +674,8 @@ public final class ExtractionUtils {
return parameterNames;
}
private static @Nullable List<String> extractExecutableNames(Executable executable) {
private static @Nullable
List<String> extractExecutableNames(Executable executable) {
final int offset;
if (!Modifier.isStatic(executable.getModifiers())) {
// remove "this" as first parameter
......@@ -705,7 +721,7 @@ public final class ExtractionUtils {
private static ClassReader getClassReader(Class<?> cls) {
final String className = cls.getName().replaceFirst("^.*\\.", "") + ".class";
if(ClassPool.exist(cls.getName())){
if (ClassPool.exist(cls.getName())) {
return new ClassReader(ClassPool.get(cls.getName()).getClassByte());
}
try {
......@@ -876,7 +892,9 @@ public final class ExtractionUtils {
return toClass.isAssignableFrom(cls);
}
/** Maps primitive {@code Class}es to their corresponding wrapper {@code Class}. */
/**
* Maps primitive {@code Class}es to their corresponding wrapper {@code Class}.
*/
private static final Map<Class<?>, Class<?>> primitiveWrapperMap = new HashMap<>();
static {
......@@ -891,7 +909,9 @@ public final class ExtractionUtils {
primitiveWrapperMap.put(Void.TYPE, Void.TYPE);
}
/** Maps wrapper {@code Class}es to their corresponding primitive types. */
/**
* Maps wrapper {@code Class}es to their corresponding primitive types.
*/
private static final Map<Class<?>, Class<?>> wrapperPrimitiveMap = new HashMap<>();
static {
......
......@@ -21,10 +21,10 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
public class FlinkCDCMergeBuilder {
public static void buildMySqlCDC(StreamExecutionEnvironment env, FlinkCDCConfig config) {
if(Asserts.isNotNull(config.getParallelism())){
if (Asserts.isNotNull(config.getParallelism())) {
env.setParallelism(config.getParallelism());
}
if(Asserts.isNotNull(config.getCheckpoint())){
if (Asserts.isNotNull(config.getCheckpoint())) {
env.enableCheckpointing(config.getCheckpoint());
}
MySqlSourceBuilder<String> sourceBuilder = MySqlSource.<String>builder()
......@@ -32,16 +32,16 @@ public class FlinkCDCMergeBuilder {
.port(config.getPort())
.username(config.getUsername())
.password(config.getPassword());
if(Asserts.isNotNull(config.getDatabase())&&config.getDatabase().size()>0){
if (Asserts.isNotNull(config.getDatabase()) && config.getDatabase().size() > 0) {
sourceBuilder.databaseList(config.getDatabase().toArray(new String[0]));
}
if(Asserts.isNotNull(config.getTable())&&config.getTable().size()>0){
if (Asserts.isNotNull(config.getTable()) && config.getTable().size() > 0) {
sourceBuilder.tableList(config.getTable().toArray(new String[0]));
}
MySqlSourceBuilder<String> builder = sourceBuilder
.deserializer(new JsonDebeziumDeserializationSchema());
if(Asserts.isNotNullString(config.getStartupMode())){
switch (config.getStartupMode().toUpperCase()){
if (Asserts.isNotNullString(config.getStartupMode())) {
switch (config.getStartupMode().toUpperCase()) {
case "INITIAL":
builder.startupOptions(StartupOptions.initial());
break;
......@@ -54,12 +54,12 @@ public class FlinkCDCMergeBuilder {
default:
builder.startupOptions(StartupOptions.latest());
}
}else {
} else {
builder.startupOptions(StartupOptions.latest());
}
MySqlSource<String> sourceFunction = builder.build();
DataStreamSource<String> streamSource = env.fromSource(sourceFunction, WatermarkStrategy.noWatermarks(), "MySQL Source");
streamSource.addSink(getKafkaProducer(config.getBrokers(),config.getTopic()));
streamSource.addSink(getKafkaProducer(config.getBrokers(), config.getTopic()));
}
private static FlinkKafkaProducer<String> getKafkaProducer(String brokers, String topic) {
......
......@@ -2,11 +2,7 @@ package com.dlink.executor;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.*;
import org.apache.flink.table.utils.PrintUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
......@@ -18,15 +14,11 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.*;
/**
* 定制CustomTableResultImpl
*
* @author wenmo
* @since 2021/6/7 22:06
**/
......@@ -60,12 +52,12 @@ class CustomTableResultImpl implements TableResult {
this.printStyle = Preconditions.checkNotNull(printStyle, "printStyle should not be null");
}
public static TableResult buildTableResult(List<TableSchemaField> fields,List<Row> rows){
public static TableResult buildTableResult(List<TableSchemaField> fields, List<Row> rows) {
Builder builder = builder().resultKind(ResultKind.SUCCESS);
if(fields.size()>0) {
if (fields.size() > 0) {
TableSchema.Builder tableSchemaBuild = TableSchema.builder();
for (int i = 0; i < fields.size(); i++) {
tableSchemaBuild.field(fields.get(i).getName(),fields.get(i).getType());
tableSchemaBuild.field(fields.get(i).getName(), fields.get(i).getType());
}
builder.tableSchema(tableSchemaBuild.build()).data(rows);
}
......@@ -169,7 +161,9 @@ class CustomTableResultImpl implements TableResult {
return new Builder();
}
/** Builder for creating a {@link CustomTableResultImpl}. */
/**
* Builder for creating a {@link CustomTableResultImpl}.
*/
public static class Builder {
private JobClient jobClient = null;
private TableSchema tableSchema = null;
......@@ -178,7 +172,8 @@ class CustomTableResultImpl implements TableResult {
private PrintStyle printStyle =
PrintStyle.tableau(Integer.MAX_VALUE, PrintUtils.NULL_COLUMN, false, false);
private Builder() {}
private Builder() {
}
/**
* Specifies job client which associates the submitted Flink job.
......@@ -234,20 +229,26 @@ class CustomTableResultImpl implements TableResult {
return this;
}
/** Specifies print style. Default is {@link TableauStyle} with max integer column width. */
/**
* Specifies print style. Default is {@link TableauStyle} with max integer column width.
*/
public Builder setPrintStyle(PrintStyle printStyle) {
Preconditions.checkNotNull(printStyle, "printStyle should not be null");
this.printStyle = printStyle;
return this;
}
/** Returns a {@link TableResult} instance. */
/**
* Returns a {@link TableResult} instance.
*/
public TableResult build() {
return new CustomTableResultImpl(jobClient, tableSchema, resultKind, data, printStyle);
}
}
/** Root interface for all print styles. */
/**
* Root interface for all print styles.
*/
public interface PrintStyle {
/**
* Create a tableau print style with given max column width, null column, change mode
......@@ -275,7 +276,9 @@ class CustomTableResultImpl implements TableResult {
}
}
/** print the result schema and content as tableau form. */
/**
* print the result schema and content as tableau form.
*/
private static final class TableauStyle implements PrintStyle {
/**
* A flag to indicate whether the column width is derived from type (true) or content
......@@ -285,7 +288,9 @@ class CustomTableResultImpl implements TableResult {
private final int maxColumnWidth;
private final String nullColumn;
/** A flag to indicate whether print row kind info. */
/**
* A flag to indicate whether print row kind info.
*/
private final boolean printRowKind;
private TableauStyle(
......@@ -319,7 +324,8 @@ class CustomTableResultImpl implements TableResult {
/**
* only print the result content as raw form. column delimiter is ",", row delimiter is "\n".
*/
private static final class RawContentStyle implements PrintStyle {}
private static final class RawContentStyle implements PrintStyle {
}
/**
* A {@link CloseableIterator} wrapper class that can return whether the first row is ready.
......
......@@ -17,18 +17,18 @@ import java.util.Optional;
*/
public class FlinkUtil {
public static List<String> getFieldNamesFromCatalogManager(CatalogManager catalogManager, String catalog, String database, String table){
public static List<String> getFieldNamesFromCatalogManager(CatalogManager catalogManager, String catalog, String database, String table) {
Optional<CatalogManager.TableLookupResult> tableOpt = catalogManager.getTable(
ObjectIdentifier.of(catalog, database, table)
);
if (tableOpt.isPresent()) {
return Arrays.asList(tableOpt.get().getResolvedSchema().getFieldNames());
}else{
} else {
return new ArrayList<String>();
}
}
public static List<String> catchColumn(TableResult tableResult){
public static List<String> catchColumn(TableResult tableResult) {
return Arrays.asList(tableResult.getTableSchema().getFieldNames());
}
}
......@@ -21,48 +21,28 @@ package org.apache.flink.table.types.extraction;
import com.dlink.pool.ClassPool;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.shaded.asm7.org.objectweb.asm.*;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.StructuredType;
import org.apache.flink.shaded.asm7.org.objectweb.asm.ClassReader;
import org.apache.flink.shaded.asm7.org.objectweb.asm.ClassVisitor;
import org.apache.flink.shaded.asm7.org.objectweb.asm.Label;
import org.apache.flink.shaded.asm7.org.objectweb.asm.MethodVisitor;
import org.apache.flink.shaded.asm7.org.objectweb.asm.Opcodes;
import javax.annotation.Nullable;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Constructor;
import java.lang.reflect.Executable;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Parameter;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.lang.reflect.*;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.shaded.asm7.org.objectweb.asm.Type.getConstructorDescriptor;
import static org.apache.flink.shaded.asm7.org.objectweb.asm.Type.getMethodDescriptor;
/** Utilities for performing reflection tasks. */
/**
* Utilities for performing reflection tasks.
*/
@Internal
public final class ExtractionUtils {
......@@ -70,7 +50,9 @@ public final class ExtractionUtils {
// Methods shared across packages
// --------------------------------------------------------------------------------------------
/** Collects methods of the given name. */
/**
* Collects methods of the given name.
*/
public static List<Method> collectMethods(Class<?> function, String methodName) {
return Arrays.stream(function.getMethods())
.filter(method -> method.getName().equals(methodName))
......@@ -130,7 +112,9 @@ public final class ExtractionUtils {
return clz == null || ExtractionUtils.isAssignable(clz, param, true);
}
/** Creates a method signature string like {@code int eval(Integer, String)}. */
/**
* Creates a method signature string like {@code int eval(Integer, String)}.
*/
public static String createMethodSignatureString(
String methodName, Class<?>[] parameters, @Nullable Class<?> returnType) {
final StringBuilder builder = new StringBuilder();
......@@ -299,7 +283,9 @@ public final class ExtractionUtils {
return false;
}
/** Checks whether a field is directly readable without a getter. */
/**
* Checks whether a field is directly readable without a getter.
*/
public static boolean isStructuredFieldDirectlyReadable(Field field) {
final int m = field.getModifiers();
......@@ -307,7 +293,9 @@ public final class ExtractionUtils {
return Modifier.isPublic(m);
}
/** Checks whether a field is directly writable without a setter or constructor. */
/**
* Checks whether a field is directly writable without a setter or constructor.
*/
public static boolean isStructuredFieldDirectlyWritable(Field field) {
final int m = field.getModifiers();
......@@ -324,12 +312,16 @@ public final class ExtractionUtils {
// Methods intended for this package
// --------------------------------------------------------------------------------------------
/** Helper method for creating consistent exceptions during extraction. */
/**
* Helper method for creating consistent exceptions during extraction.
*/
static ValidationException extractionError(String message, Object... args) {
return extractionError(null, message, args);
}
/** Helper method for creating consistent exceptions during extraction. */
/**
* Helper method for creating consistent exceptions during extraction.
*/
static ValidationException extractionError(Throwable cause, String message, Object... args) {
return new ValidationException(String.format(message, args), cause);
}
......@@ -358,8 +350,11 @@ public final class ExtractionUtils {
return typeHierarchy;
}
/** Converts a {@link Type} to {@link Class} if possible, {@code null} otherwise. */
static @Nullable Class<?> toClass(Type type) {
/**
* Converts a {@link Type} to {@link Class} if possible, {@code null} otherwise.
*/
static @Nullable
Class<?> toClass(Type type) {
if (type instanceof Class) {
return (Class<?>) type;
} else if (type instanceof ParameterizedType) {
......@@ -370,7 +365,9 @@ public final class ExtractionUtils {
return null;
}
/** Creates a raw data type. */
/**
* Creates a raw data type.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
static DataType createRawType(
DataTypeFactory typeFactory,
......@@ -404,7 +401,9 @@ public final class ExtractionUtils {
}
}
/** Resolves a {@link TypeVariable} using the given type hierarchy if possible. */
/**
* Resolves a {@link TypeVariable} using the given type hierarchy if possible.
*/
static Type resolveVariable(List<Type> typeHierarchy, TypeVariable<?> variable) {
// iterate through hierarchy from top to bottom until type variable gets a non-variable
// assigned
......@@ -427,7 +426,8 @@ public final class ExtractionUtils {
return variable;
}
private static @Nullable Type resolveVariableInParameterizedType(
private static @Nullable
Type resolveVariableInParameterizedType(
TypeVariable<?> variable, ParameterizedType currentType) {
final Class<?> currentRaw = (Class<?>) currentType.getRawType();
final TypeVariable<?>[] currentVariables = currentRaw.getTypeParameters();
......@@ -465,7 +465,9 @@ public final class ExtractionUtils {
}
}
/** Returns the fields of a class for a {@link StructuredType}. */
/**
* Returns the fields of a class for a {@link StructuredType}.
*/
static List<Field> collectStructuredFields(Class<?> clazz) {
final List<Field> fields = new ArrayList<>();
while (clazz != Object.class) {
......@@ -482,7 +484,9 @@ public final class ExtractionUtils {
return fields;
}
/** Validates if a field is properly readable either directly or through a getter. */
/**
* Validates if a field is properly readable either directly or through a getter.
*/
static void validateStructuredFieldReadability(Class<?> clazz, Field field) {
// field is accessible
if (isStructuredFieldDirectlyReadable(field)) {
......@@ -525,7 +529,9 @@ public final class ExtractionUtils {
field.getName(), clazz.getName());
}
/** Returns the boxed type of a primitive type. */
/**
* Returns the boxed type of a primitive type.
*/
static Type primitiveToWrapper(Type type) {
if (type instanceof Class) {
return primitiveToWrapper((Class<?>) type);
......@@ -533,7 +539,9 @@ public final class ExtractionUtils {
return type;
}
/** Collects all methods that qualify as methods of a {@link StructuredType}. */
/**
* Collects all methods that qualify as methods of a {@link StructuredType}.
*/
static List<Method> collectStructuredMethods(Class<?> clazz) {
final List<Method> methods = new ArrayList<>();
while (clazz != Object.class) {
......@@ -584,7 +592,9 @@ public final class ExtractionUtils {
// Parameter Extraction Utilities
// --------------------------------------------------------------------------------------------
/** Result of the extraction in {@link #extractAssigningConstructor(Class, List)}. */
/**
* Result of the extraction in {@link #extractAssigningConstructor(Class, List)}.
*/
static class AssigningConstructor {
public final Constructor<?> constructor;
public final List<String> parameterNames;
......@@ -599,7 +609,8 @@ public final class ExtractionUtils {
* Checks whether the given constructor takes all of the given fields with matching (possibly
* primitive) type and name. An assigning constructor can define the order of fields.
*/
static @Nullable AssigningConstructor extractAssigningConstructor(
static @Nullable
AssigningConstructor extractAssigningConstructor(
Class<?> clazz, List<Field> fields) {
AssigningConstructor foundConstructor = null;
for (Constructor<?> constructor : clazz.getDeclaredConstructors()) {
......@@ -623,8 +634,11 @@ public final class ExtractionUtils {
return foundConstructor;
}
/** Extracts the parameter names of a method if possible. */
static @Nullable List<String> extractMethodParameterNames(Method method) {
/**
 * Extracts the parameter names of a method if possible.
 *
 * @param method the method whose parameter names should be extracted
 * @return the parameter names in declaration order, or {@code null} if they cannot be
 *     determined; delegates to {@link #extractExecutableNames(Executable)}
 */
static @Nullable List<String> extractMethodParameterNames(Method method) {
    return extractExecutableNames(method);
}
......@@ -632,7 +646,8 @@ public final class ExtractionUtils {
* Extracts ordered parameter names from a constructor that takes all of the given fields with
* matching (possibly primitive) type and name.
*/
private static @Nullable List<String> extractConstructorParameterNames(
private static @Nullable
List<String> extractConstructorParameterNames(
Constructor<?> constructor, List<Field> fields) {
final Type[] parameterTypes = constructor.getGenericParameterTypes();
......@@ -659,7 +674,8 @@ public final class ExtractionUtils {
return parameterNames;
}
private static @Nullable List<String> extractExecutableNames(Executable executable) {
private static @Nullable
List<String> extractExecutableNames(Executable executable) {
final int offset;
if (!Modifier.isStatic(executable.getModifiers())) {
// remove "this" as first parameter
......@@ -705,7 +721,7 @@ public final class ExtractionUtils {
private static ClassReader getClassReader(Class<?> cls) {
final String className = cls.getName().replaceFirst("^.*\\.", "") + ".class";
if(ClassPool.exist(cls.getName())){
if (ClassPool.exist(cls.getName())) {
return new ClassReader(ClassPool.get(cls.getName()).getClassByte());
}
try {
......@@ -876,7 +892,9 @@ public final class ExtractionUtils {
return toClass.isAssignableFrom(cls);
}
/** Maps primitive {@code Class}es to their corresponding wrapper {@code Class}. */
/**
* Maps primitive {@code Class}es to their corresponding wrapper {@code Class}.
*/
private static final Map<Class<?>, Class<?>> primitiveWrapperMap = new HashMap<>();
static {
......@@ -891,7 +909,9 @@ public final class ExtractionUtils {
primitiveWrapperMap.put(Void.TYPE, Void.TYPE);
}
/** Maps wrapper {@code Class}es to their corresponding primitive types. */
/**
* Maps wrapper {@code Class}es to their corresponding primitive types.
*/
private static final Map<Class<?>, Class<?>> wrapperPrimitiveMap = new HashMap<>();
static {
......
......@@ -21,10 +21,10 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
public class FlinkCDCMergeBuilder {
public static void buildMySqlCDC(StreamExecutionEnvironment env, FlinkCDCConfig config) {
if(Asserts.isNotNull(config.getParallelism())){
if (Asserts.isNotNull(config.getParallelism())) {
env.setParallelism(config.getParallelism());
}
if(Asserts.isNotNull(config.getCheckpoint())){
if (Asserts.isNotNull(config.getCheckpoint())) {
env.enableCheckpointing(config.getCheckpoint());
}
MySqlSourceBuilder<String> sourceBuilder = MySqlSource.<String>builder()
......@@ -32,16 +32,16 @@ public class FlinkCDCMergeBuilder {
.port(config.getPort())
.username(config.getUsername())
.password(config.getPassword());
if(Asserts.isNotNull(config.getDatabase())&&config.getDatabase().size()>0){
if (Asserts.isNotNull(config.getDatabase()) && config.getDatabase().size() > 0) {
sourceBuilder.databaseList(config.getDatabase().toArray(new String[0]));
}
if(Asserts.isNotNull(config.getTable())&&config.getTable().size()>0){
if (Asserts.isNotNull(config.getTable()) && config.getTable().size() > 0) {
sourceBuilder.tableList(config.getTable().toArray(new String[0]));
}
MySqlSourceBuilder<String> builder = sourceBuilder
.deserializer(new JsonDebeziumDeserializationSchema());
if(Asserts.isNotNullString(config.getStartupMode())){
switch (config.getStartupMode().toUpperCase()){
if (Asserts.isNotNullString(config.getStartupMode())) {
switch (config.getStartupMode().toUpperCase()) {
case "INITIAL":
builder.startupOptions(StartupOptions.initial());
break;
......@@ -54,12 +54,12 @@ public class FlinkCDCMergeBuilder {
default:
builder.startupOptions(StartupOptions.latest());
}
}else {
} else {
builder.startupOptions(StartupOptions.latest());
}
MySqlSource<String> sourceFunction = builder.build();
DataStreamSource<String> streamSource = env.fromSource(sourceFunction, WatermarkStrategy.noWatermarks(), "MySQL Source");
streamSource.addSink(getKafkaProducer(config.getBrokers(),config.getTopic()));
streamSource.addSink(getKafkaProducer(config.getBrokers(), config.getTopic()));
}
private static FlinkKafkaProducer<String> getKafkaProducer(String brokers, String topic) {
......
......@@ -235,7 +235,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements
if (setOperation.getKey().isPresent() && setOperation.getValue().isPresent()) {
String key = setOperation.getKey().get().trim();
String value = setOperation.getValue().get().trim();
if(Asserts.isNullString(key)||Asserts.isNullString(value)){
if (Asserts.isNullString(key) || Asserts.isNullString(value)) {
return;
}
Map<String, String> confMap = new HashMap<>();
......@@ -250,7 +250,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements
private void callReset(ResetOperation resetOperation, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
if (resetOperation.getKey().isPresent()) {
String key = resetOperation.getKey().get().trim();
if(Asserts.isNullString(key)){
if (Asserts.isNullString(key)) {
return;
}
Map<String, String> confMap = new HashMap<>();
......
......@@ -17,20 +17,12 @@ import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.io.PrintWriter;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.*;
import java.util.concurrent.*;
/**
* 定制CustomTableResultImpl
*
* @author wenmo
* @since 2021/6/7 22:06
**/
......@@ -68,16 +60,16 @@ public class CustomTableResultImpl implements TableResult {
Preconditions.checkNotNull(sessionTimeZone, "sessionTimeZone should not be null");
}
public static TableResult buildTableResult(List<TableSchemaField> fields,List<Row> rows){
public static TableResult buildTableResult(List<TableSchemaField> fields, List<Row> rows) {
Builder builder = builder().resultKind(ResultKind.SUCCESS);
if(fields.size()>0) {
if (fields.size() > 0) {
List<String> columnNames = new ArrayList<>();
List<DataType> columnTypes = new ArrayList<>();
for (int i = 0; i < fields.size(); i++) {
columnNames.add(fields.get(i).getName());
columnTypes.add(fields.get(i).getType());
}
builder.schema(ResolvedSchema.physical(columnNames,columnTypes)).data(rows);
builder.schema(ResolvedSchema.physical(columnNames, columnTypes)).data(rows);
}
return builder.build();
}
......@@ -184,7 +176,9 @@ public class CustomTableResultImpl implements TableResult {
return new Builder();
}
/** Builder for creating a {@link CustomTableResultImpl}. */
/**
* Builder for creating a {@link CustomTableResultImpl}.
*/
public static class Builder {
private JobClient jobClient = null;
private ResolvedSchema resolvedSchema = null;
......@@ -194,7 +188,8 @@ public class CustomTableResultImpl implements TableResult {
PrintStyle.tableau(Integer.MAX_VALUE, PrintUtils.NULL_COLUMN, false, false);
private ZoneId sessionTimeZone = ZoneId.of("UTC");
private Builder() {}
// Private to force construction through CustomTableResultImpl.builder().
private Builder() {
}
/**
* Specifies job client which associates the submitted Flink job.
......@@ -250,28 +245,36 @@ public class CustomTableResultImpl implements TableResult {
return this;
}
/** Specifies print style. Default is {@link TableauStyle} with max integer column width. */
/**
 * Specifies print style. Default is {@link TableauStyle} with max integer column width.
 *
 * @param printStyle the print style to use, must not be {@code null}
 * @return this builder for chaining
 */
public Builder setPrintStyle(PrintStyle printStyle) {
Preconditions.checkNotNull(printStyle, "printStyle should not be null");
this.printStyle = printStyle;
return this;
}
/** Specifies session time zone. */
/**
 * Specifies session time zone.
 *
 * @param sessionTimeZone the session time zone, must not be {@code null}
 * @return this builder for chaining
 */
public Builder setSessionTimeZone(ZoneId sessionTimeZone) {
Preconditions.checkNotNull(sessionTimeZone, "sessionTimeZone should not be null");
this.sessionTimeZone = sessionTimeZone;
return this;
}
/** Returns a {@link TableResult} instance. */
/**
 * Returns a {@link TableResult} instance.
 *
 * @return a new {@link CustomTableResultImpl} assembled from the builder's current state
 */
public TableResult build() {
return new CustomTableResultImpl(
jobClient, resolvedSchema, resultKind, data, printStyle, sessionTimeZone);
}
}
/** Root interface for all print styles. */
/**
* Root interface for all print styles.
*/
public interface PrintStyle {
/**
* Create a tableau print style with given max column width, null column, change mode
......@@ -299,7 +302,9 @@ public class CustomTableResultImpl implements TableResult {
}
}
/** print the result schema and content as tableau form. */
/**
* print the result schema and content as tableau form.
*/
private static final class TableauStyle implements PrintStyle {
/**
* A flag to indicate whether the column width is derived from type (true) or content
......@@ -309,7 +314,9 @@ public class CustomTableResultImpl implements TableResult {
private final int maxColumnWidth;
private final String nullColumn;
/** A flag to indicate whether print row kind info. */
/**
* A flag to indicate whether print row kind info.
*/
private final boolean printRowKind;
private TableauStyle(
......@@ -343,7 +350,8 @@ public class CustomTableResultImpl implements TableResult {
/**
* only print the result content as raw form. column delimiter is ",", row delimiter is "\n".
*/
private static final class RawContentStyle implements PrintStyle {}
private static final class RawContentStyle implements PrintStyle {
// Stateless marker: the raw print style carries no configuration.
}
/**
* A {@link CloseableIterator} wrapper class that can return whether the first row is ready.
......
......@@ -4,7 +4,9 @@ import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.ObjectIdentifier;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
/**
* FlinkUtil
......@@ -14,18 +16,18 @@ import java.util.*;
*/
public class FlinkUtil {
public static List<String> getFieldNamesFromCatalogManager(CatalogManager catalogManager, String catalog, String database, String table){
public static List<String> getFieldNamesFromCatalogManager(CatalogManager catalogManager, String catalog, String database, String table) {
Optional<CatalogManager.TableLookupResult> tableOpt = catalogManager.getTable(
ObjectIdentifier.of(catalog, database, table)
);
if (tableOpt.isPresent()) {
return tableOpt.get().getResolvedSchema().getColumnNames();
}else{
} else {
return new ArrayList<>();
}
}
public static List<String> catchColumn(TableResult tableResult){
/**
 * Returns the column names of the given {@link TableResult}'s resolved schema.
 *
 * @param tableResult the table result to inspect
 * @return the list of column names
 */
public static List<String> catchColumn(TableResult tableResult) {
return tableResult.getResolvedSchema().getColumnNames();
}
}
......@@ -21,41 +21,19 @@ package org.apache.flink.table.types.extraction;
import com.dlink.pool.ClassPool;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.shaded.asm7.org.objectweb.asm.*;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.StructuredType;
import org.apache.flink.shaded.asm7.org.objectweb.asm.ClassReader;
import org.apache.flink.shaded.asm7.org.objectweb.asm.ClassVisitor;
import org.apache.flink.shaded.asm7.org.objectweb.asm.Label;
import org.apache.flink.shaded.asm7.org.objectweb.asm.MethodVisitor;
import org.apache.flink.shaded.asm7.org.objectweb.asm.Opcodes;
import javax.annotation.Nullable;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Constructor;
import java.lang.reflect.Executable;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Parameter;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.lang.reflect.*;
import java.util.*;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
......@@ -64,7 +42,9 @@ import java.util.stream.Stream;
import static org.apache.flink.shaded.asm7.org.objectweb.asm.Type.getConstructorDescriptor;
import static org.apache.flink.shaded.asm7.org.objectweb.asm.Type.getMethodDescriptor;
/** Utilities for performing reflection tasks. */
/**
* Utilities for performing reflection tasks.
*/
@Internal
public final class ExtractionUtils {
......@@ -72,7 +52,9 @@ public final class ExtractionUtils {
// Methods shared across packages
// --------------------------------------------------------------------------------------------
/** Collects methods of the given name. */
/**
* Collects methods of the given name.
*/
public static List<Method> collectMethods(Class<?> function, String methodName) {
return Arrays.stream(function.getMethods())
.filter(method -> method.getName().equals(methodName))
......@@ -132,7 +114,9 @@ public final class ExtractionUtils {
return clz == null || ExtractionUtils.isAssignable(clz, param, true);
}
/** Creates a method signature string like {@code int eval(Integer, String)}. */
/**
* Creates a method signature string like {@code int eval(Integer, String)}.
*/
public static String createMethodSignatureString(
String methodName, Class<?>[] parameters, @Nullable Class<?> returnType) {
final StringBuilder builder = new StringBuilder();
......@@ -305,7 +289,9 @@ public final class ExtractionUtils {
return false;
}
/** Checks whether a field is directly readable without a getter. */
/**
* Checks whether a field is directly readable without a getter.
*/
public static boolean isStructuredFieldDirectlyReadable(Field field) {
final int m = field.getModifiers();
......@@ -313,7 +299,9 @@ public final class ExtractionUtils {
return Modifier.isPublic(m);
}
/** Checks whether a field is directly writable without a setter or constructor. */
/**
* Checks whether a field is directly writable without a setter or constructor.
*/
public static boolean isStructuredFieldDirectlyWritable(Field field) {
final int m = field.getModifiers();
......@@ -352,12 +340,16 @@ public final class ExtractionUtils {
// Methods intended for this package
// --------------------------------------------------------------------------------------------
/** Helper method for creating consistent exceptions during extraction. */
/**
 * Helper method for creating consistent exceptions during extraction.
 *
 * @param message error message, interpreted as a {@link String#format(String, Object...)} pattern
 * @param args arguments for the message pattern
 * @return a {@link ValidationException} without a cause
 */
static ValidationException extractionError(String message, Object... args) {
return extractionError(null, message, args);
}
/** Helper method for creating consistent exceptions during extraction. */
/**
 * Helper method for creating consistent exceptions during extraction.
 *
 * @param cause the underlying cause to preserve, may be {@code null}
 * @param message error message, interpreted as a {@link String#format(String, Object...)} pattern
 * @param args arguments for the message pattern
 * @return a {@link ValidationException} wrapping the formatted message and the given cause
 */
static ValidationException extractionError(Throwable cause, String message, Object... args) {
return new ValidationException(String.format(message, args), cause);
}
......@@ -386,8 +378,11 @@ public final class ExtractionUtils {
return typeHierarchy;
}
/** Converts a {@link Type} to {@link Class} if possible, {@code null} otherwise. */
static @Nullable Class<?> toClass(Type type) {
/**
* Converts a {@link Type} to {@link Class} if possible, {@code null} otherwise.
*/
static @Nullable
Class<?> toClass(Type type) {
if (type instanceof Class) {
return (Class<?>) type;
} else if (type instanceof ParameterizedType) {
......@@ -398,7 +393,9 @@ public final class ExtractionUtils {
return null;
}
/** Creates a raw data type. */
/**
* Creates a raw data type.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
static DataType createRawType(
DataTypeFactory typeFactory,
......@@ -432,7 +429,9 @@ public final class ExtractionUtils {
}
}
/** Resolves a {@link TypeVariable} using the given type hierarchy if possible. */
/**
* Resolves a {@link TypeVariable} using the given type hierarchy if possible.
*/
static Type resolveVariable(List<Type> typeHierarchy, TypeVariable<?> variable) {
// iterate through hierarchy from top to bottom until type variable gets a non-variable
// assigned
......@@ -455,7 +454,8 @@ public final class ExtractionUtils {
return variable;
}
private static @Nullable Type resolveVariableInParameterizedType(
private static @Nullable
Type resolveVariableInParameterizedType(
TypeVariable<?> variable, ParameterizedType currentType) {
final Class<?> currentRaw = (Class<?>) currentType.getRawType();
final TypeVariable<?>[] currentVariables = currentRaw.getTypeParameters();
......@@ -493,7 +493,9 @@ public final class ExtractionUtils {
}
}
/** Returns the fields of a class for a {@link StructuredType}. */
/**
* Returns the fields of a class for a {@link StructuredType}.
*/
static List<Field> collectStructuredFields(Class<?> clazz) {
final List<Field> fields = new ArrayList<>();
while (clazz != Object.class) {
......@@ -510,7 +512,9 @@ public final class ExtractionUtils {
return fields;
}
/** Validates if a field is properly readable either directly or through a getter. */
/**
* Validates if a field is properly readable either directly or through a getter.
*/
static void validateStructuredFieldReadability(Class<?> clazz, Field field) {
// field is accessible
if (isStructuredFieldDirectlyReadable(field)) {
......@@ -553,7 +557,9 @@ public final class ExtractionUtils {
field.getName(), clazz.getName());
}
/** Returns the boxed type of a primitive type. */
/**
* Returns the boxed type of a primitive type.
*/
static Type primitiveToWrapper(Type type) {
if (type instanceof Class) {
return primitiveToWrapper((Class<?>) type);
......@@ -561,7 +567,9 @@ public final class ExtractionUtils {
return type;
}
/** Collects all methods that qualify as methods of a {@link StructuredType}. */
/**
* Collects all methods that qualify as methods of a {@link StructuredType}.
*/
static List<Method> collectStructuredMethods(Class<?> clazz) {
final List<Method> methods = new ArrayList<>();
while (clazz != Object.class) {
......@@ -612,7 +620,9 @@ public final class ExtractionUtils {
// Parameter Extraction Utilities
// --------------------------------------------------------------------------------------------
/** Result of the extraction in {@link #extractAssigningConstructor(Class, List)}. */
/**
* Result of the extraction in {@link #extractAssigningConstructor(Class, List)}.
*/
public static class AssigningConstructor {
public final Constructor<?> constructor;
public final List<String> parameterNames;
......@@ -627,7 +637,8 @@ public final class ExtractionUtils {
* Checks whether the given constructor takes all of the given fields with matching (possibly
* primitive) type and name. An assigning constructor can define the order of fields.
*/
public static @Nullable AssigningConstructor extractAssigningConstructor(
public static @Nullable
AssigningConstructor extractAssigningConstructor(
Class<?> clazz, List<Field> fields) {
AssigningConstructor foundConstructor = null;
for (Constructor<?> constructor : clazz.getDeclaredConstructors()) {
......@@ -651,8 +662,11 @@ public final class ExtractionUtils {
return foundConstructor;
}
/** Extracts the parameter names of a method if possible. */
static @Nullable List<String> extractMethodParameterNames(Method method) {
/**
 * Extracts the parameter names of a method if possible.
 *
 * @param method the method whose parameter names should be extracted
 * @return the parameter names in declaration order, or {@code null} if they cannot be
 *     determined; delegates to {@link #extractExecutableNames(Executable)}
 */
static @Nullable List<String> extractMethodParameterNames(Method method) {
    return extractExecutableNames(method);
}
......@@ -660,7 +674,8 @@ public final class ExtractionUtils {
* Extracts ordered parameter names from a constructor that takes all of the given fields with
* matching (possibly primitive and lenient) type and name.
*/
private static @Nullable List<String> extractConstructorParameterNames(
private static @Nullable
List<String> extractConstructorParameterNames(
Constructor<?> constructor, List<Field> fields) {
final Type[] parameterTypes = constructor.getGenericParameterTypes();
......@@ -697,7 +712,8 @@ public final class ExtractionUtils {
return fieldNames;
}
private static @Nullable List<String> extractExecutableNames(Executable executable) {
private static @Nullable
List<String> extractExecutableNames(Executable executable) {
final int offset;
if (!Modifier.isStatic(executable.getModifiers())) {
// remove "this" as first parameter
......@@ -743,7 +759,7 @@ public final class ExtractionUtils {
private static ClassReader getClassReader(Class<?> cls) {
final String className = cls.getName().replaceFirst("^.*\\.", "") + ".class";
if(ClassPool.exist(cls.getName())){
if (ClassPool.exist(cls.getName())) {
return new ClassReader(ClassPool.get(cls.getName()).getClassByte());
}
try {
......@@ -914,7 +930,9 @@ public final class ExtractionUtils {
return toClass.isAssignableFrom(cls);
}
/** Maps primitive {@code Class}es to their corresponding wrapper {@code Class}. */
/**
* Maps primitive {@code Class}es to their corresponding wrapper {@code Class}.
*/
private static final Map<Class<?>, Class<?>> primitiveWrapperMap = new HashMap<>();
static {
......@@ -929,7 +947,9 @@ public final class ExtractionUtils {
primitiveWrapperMap.put(Void.TYPE, Void.TYPE);
}
/** Maps wrapper {@code Class}es to their corresponding primitive types. */
/**
* Maps wrapper {@code Class}es to their corresponding primitive types.
*/
private static final Map<Class<?>, Class<?>> wrapperPrimitiveMap = new HashMap<>();
static {
......
......@@ -21,10 +21,10 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
public class FlinkCDCMergeBuilder {
public static void buildMySqlCDC(StreamExecutionEnvironment env, FlinkCDCConfig config) {
if(Asserts.isNotNull(config.getParallelism())){
if (Asserts.isNotNull(config.getParallelism())) {
env.setParallelism(config.getParallelism());
}
if(Asserts.isNotNull(config.getCheckpoint())){
if (Asserts.isNotNull(config.getCheckpoint())) {
env.enableCheckpointing(config.getCheckpoint());
}
MySqlSourceBuilder<String> sourceBuilder = MySqlSource.<String>builder()
......@@ -32,16 +32,16 @@ public class FlinkCDCMergeBuilder {
.port(config.getPort())
.username(config.getUsername())
.password(config.getPassword());
if(Asserts.isNotNull(config.getDatabase())&&config.getDatabase().size()>0){
if (Asserts.isNotNull(config.getDatabase()) && config.getDatabase().size() > 0) {
sourceBuilder.databaseList(config.getDatabase().toArray(new String[0]));
}
if(Asserts.isNotNull(config.getTable())&&config.getTable().size()>0){
if (Asserts.isNotNull(config.getTable()) && config.getTable().size() > 0) {
sourceBuilder.tableList(config.getTable().toArray(new String[0]));
}
MySqlSourceBuilder<String> builder = sourceBuilder
.deserializer(new JsonDebeziumDeserializationSchema());
if(Asserts.isNotNullString(config.getStartupMode())){
switch (config.getStartupMode().toUpperCase()){
if (Asserts.isNotNullString(config.getStartupMode())) {
switch (config.getStartupMode().toUpperCase()) {
case "INITIAL":
builder.startupOptions(StartupOptions.initial());
break;
......@@ -54,12 +54,12 @@ public class FlinkCDCMergeBuilder {
default:
builder.startupOptions(StartupOptions.latest());
}
}else {
} else {
builder.startupOptions(StartupOptions.latest());
}
MySqlSource<String> sourceFunction = builder.build();
DataStreamSource<String> streamSource = env.fromSource(sourceFunction, WatermarkStrategy.noWatermarks(), "MySQL Source");
streamSource.addSink(getKafkaProducer(config.getBrokers(),config.getTopic()));
streamSource.addSink(getKafkaProducer(config.getBrokers(), config.getTopic()));
}
private static FlinkKafkaProducer<String> getKafkaProducer(String brokers, String topic) {
......
......@@ -298,7 +298,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements
if (setOperation.getKey().isPresent() && setOperation.getValue().isPresent()) {
String key = setOperation.getKey().get().trim();
String value = setOperation.getValue().get().trim();
if(Asserts.isNullString(key)||Asserts.isNullString(value)){
if (Asserts.isNullString(key) || Asserts.isNullString(value)) {
return;
}
Map<String, String> confMap = new HashMap<>();
......@@ -313,7 +313,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements
private void callReset(ResetOperation resetOperation, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
if (resetOperation.getKey().isPresent()) {
String key = resetOperation.getKey().get().trim();
if(Asserts.isNullString(key)){
if (Asserts.isNullString(key)) {
return;
}
Map<String, String> confMap = new HashMap<>();
......
......@@ -17,20 +17,12 @@ import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.io.PrintWriter;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.*;
import java.util.concurrent.*;
/**
* 定制TableResultImpl
*
* @author wenmo
* @since 2021/10/22 10:02
**/
......@@ -68,16 +60,16 @@ public class CustomTableResultImpl implements TableResult {
Preconditions.checkNotNull(sessionTimeZone, "sessionTimeZone should not be null");
}
public static TableResult buildTableResult(List<TableSchemaField> fields,List<Row> rows){
public static TableResult buildTableResult(List<TableSchemaField> fields, List<Row> rows) {
Builder builder = builder().resultKind(ResultKind.SUCCESS);
if(fields.size()>0) {
if (fields.size() > 0) {
List<String> columnNames = new ArrayList<>();
List<DataType> columnTypes = new ArrayList<>();
for (int i = 0; i < fields.size(); i++) {
columnNames.add(fields.get(i).getName());
columnTypes.add(fields.get(i).getType());
}
builder.schema(ResolvedSchema.physical(columnNames,columnTypes)).data(rows);
builder.schema(ResolvedSchema.physical(columnNames, columnTypes)).data(rows);
}
return builder.build();
}
......@@ -184,7 +176,9 @@ public class CustomTableResultImpl implements TableResult {
return new Builder();
}
/** Builder for creating a {@link CustomTableResultImpl}. */
/**
* Builder for creating a {@link CustomTableResultImpl}.
*/
public static class Builder {
private JobClient jobClient = null;
private ResolvedSchema resolvedSchema = null;
......@@ -194,7 +188,8 @@ public class CustomTableResultImpl implements TableResult {
PrintStyle.tableau(Integer.MAX_VALUE, PrintUtils.NULL_COLUMN, false, false);
private ZoneId sessionTimeZone = ZoneId.of("UTC");
private Builder() {}
// Private to force construction through CustomTableResultImpl.builder().
private Builder() {
}
/**
* Specifies job client which associates the submitted Flink job.
......@@ -250,28 +245,36 @@ public class CustomTableResultImpl implements TableResult {
return this;
}
/** Specifies print style. Default is {@link TableauStyle} with max integer column width. */
/**
 * Specifies print style. Default is {@link TableauStyle} with max integer column width.
 *
 * @param printStyle the print style to use, must not be {@code null}
 * @return this builder for chaining
 */
public Builder setPrintStyle(PrintStyle printStyle) {
Preconditions.checkNotNull(printStyle, "printStyle should not be null");
this.printStyle = printStyle;
return this;
}
/** Specifies session time zone. */
/**
 * Specifies session time zone.
 *
 * @param sessionTimeZone the session time zone, must not be {@code null}
 * @return this builder for chaining
 */
public Builder setSessionTimeZone(ZoneId sessionTimeZone) {
Preconditions.checkNotNull(sessionTimeZone, "sessionTimeZone should not be null");
this.sessionTimeZone = sessionTimeZone;
return this;
}
/** Returns a {@link TableResult} instance. */
/**
 * Returns a {@link TableResult} instance.
 *
 * @return a new {@link CustomTableResultImpl} assembled from the builder's current state
 */
public TableResult build() {
return new CustomTableResultImpl(
jobClient, resolvedSchema, resultKind, data, printStyle, sessionTimeZone);
}
}
/** Root interface for all print styles. */
/**
* Root interface for all print styles.
*/
public interface PrintStyle {
/**
* Create a tableau print style with given max column width, null column, change mode
......@@ -299,7 +302,9 @@ public class CustomTableResultImpl implements TableResult {
}
}
/** print the result schema and content as tableau form. */
/**
* print the result schema and content as tableau form.
*/
private static final class TableauStyle implements PrintStyle {
/**
* A flag to indicate whether the column width is derived from type (true) or content
......@@ -309,7 +314,9 @@ public class CustomTableResultImpl implements TableResult {
private final int maxColumnWidth;
private final String nullColumn;
/** A flag to indicate whether print row kind info. */
/**
* A flag to indicate whether print row kind info.
*/
private final boolean printRowKind;
private TableauStyle(
......@@ -343,7 +350,8 @@ public class CustomTableResultImpl implements TableResult {
/**
* only print the result content as raw form. column delimiter is ",", row delimiter is "\n".
*/
private static final class RawContentStyle implements PrintStyle {}
private static final class RawContentStyle implements PrintStyle {
// Stateless marker: the raw print style carries no configuration.
}
/**
* A {@link CloseableIterator} wrapper class that can return whether the first row is ready.
......
......@@ -16,19 +16,19 @@ import java.util.Optional;
*/
public class FlinkUtil {
public static List<String> getFieldNamesFromCatalogManager(CatalogManager catalogManager, String catalog, String database, String table){
public static List<String> getFieldNamesFromCatalogManager(CatalogManager catalogManager, String catalog, String database, String table) {
Optional<CatalogManager.TableLookupResult> tableOpt = catalogManager.getTable(
ObjectIdentifier.of(catalog, database, table)
);
if (tableOpt.isPresent()) {
return tableOpt.get().getResolvedSchema().getColumnNames();
}else{
} else {
return new ArrayList<String>();
}
}
public static List<String> catchColumn(TableResult tableResult){
/**
 * Returns the column names of the given {@link TableResult}'s resolved schema.
 *
 * @param tableResult the table result to inspect
 * @return the list of column names
 */
public static List<String> catchColumn(TableResult tableResult) {
return tableResult.getResolvedSchema().getColumnNames();
}
}
......@@ -21,42 +21,20 @@ package org.apache.flink.table.types.extraction;
import com.dlink.pool.ClassPool;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.shaded.asm7.org.objectweb.asm.*;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.StructuredType;
import org.apache.flink.shaded.asm7.org.objectweb.asm.ClassReader;
import org.apache.flink.shaded.asm7.org.objectweb.asm.ClassVisitor;
import org.apache.flink.shaded.asm7.org.objectweb.asm.Label;
import org.apache.flink.shaded.asm7.org.objectweb.asm.MethodVisitor;
import org.apache.flink.shaded.asm7.org.objectweb.asm.Opcodes;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.lang.annotation.Annotation;
import java.lang.reflect.Constructor;
import java.lang.reflect.Executable;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Parameter;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.lang.reflect.*;
import java.util.*;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
......@@ -65,7 +43,9 @@ import java.util.stream.Stream;
import static org.apache.flink.shaded.asm7.org.objectweb.asm.Type.getConstructorDescriptor;
import static org.apache.flink.shaded.asm7.org.objectweb.asm.Type.getMethodDescriptor;
/** Utilities for performing reflection tasks. */
/**
* Utilities for performing reflection tasks.
*/
@Internal
public final class ExtractionUtils {
......@@ -73,7 +53,9 @@ public final class ExtractionUtils {
// Methods shared across packages
// --------------------------------------------------------------------------------------------
/** Collects methods of the given name. */
/**
* Collects methods of the given name.
*/
public static List<Method> collectMethods(Class<?> function, String methodName) {
return Arrays.stream(function.getMethods())
.filter(method -> method.getName().equals(methodName))
......@@ -133,7 +115,9 @@ public final class ExtractionUtils {
return clz == null || ExtractionUtils.isAssignable(clz, param, true);
}
/** Creates a method signature string like {@code int eval(Integer, String)}. */
/**
* Creates a method signature string like {@code int eval(Integer, String)}.
*/
public static String createMethodSignatureString(
String methodName, Class<?>[] parameters, @Nullable Class<?> returnType) {
final StringBuilder builder = new StringBuilder();
......@@ -306,7 +290,9 @@ public final class ExtractionUtils {
return false;
}
/** Checks whether a field is directly readable without a getter. */
/**
* Checks whether a field is directly readable without a getter.
*/
public static boolean isStructuredFieldDirectlyReadable(Field field) {
final int m = field.getModifiers();
......@@ -314,7 +300,9 @@ public final class ExtractionUtils {
return Modifier.isPublic(m);
}
/** Checks whether a field is directly writable without a setter or constructor. */
/**
* Checks whether a field is directly writable without a setter or constructor.
*/
public static boolean isStructuredFieldDirectlyWritable(Field field) {
final int m = field.getModifiers();
......@@ -353,12 +341,16 @@ public final class ExtractionUtils {
// Methods intended for this package
// --------------------------------------------------------------------------------------------
/** Helper method for creating consistent exceptions during extraction. */
/**
* Helper method for creating consistent exceptions during extraction.
*/
static ValidationException extractionError(String message, Object... args) {
// Convenience overload: delegates to the cause-accepting variant with no cause.
return extractionError(null, message, args);
}
/** Helper method for creating consistent exceptions during extraction. */
/**
* Helper method for creating consistent exceptions during extraction.
*/
static ValidationException extractionError(Throwable cause, String message, Object... args) {
    // Format the message first, then wrap it together with the optional cause.
    final String formattedMessage = String.format(message, args);
    return new ValidationException(formattedMessage, cause);
}
......@@ -387,8 +379,11 @@ public final class ExtractionUtils {
return typeHierarchy;
}
/** Converts a {@link Type} to {@link Class} if possible, {@code null} otherwise. */
static @Nullable Class<?> toClass(Type type) {
/**
* Converts a {@link Type} to {@link Class} if possible, {@code null} otherwise.
*/
static @Nullable
Class<?> toClass(Type type) {
if (type instanceof Class) {
return (Class<?>) type;
} else if (type instanceof ParameterizedType) {
......@@ -399,7 +394,9 @@ public final class ExtractionUtils {
return null;
}
/** Creates a raw data type. */
/**
* Creates a raw data type.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
static DataType createRawType(
DataTypeFactory typeFactory,
......@@ -433,7 +430,9 @@ public final class ExtractionUtils {
}
}
/** Resolves a {@link TypeVariable} using the given type hierarchy if possible. */
/**
* Resolves a {@link TypeVariable} using the given type hierarchy if possible.
*/
static Type resolveVariable(List<Type> typeHierarchy, TypeVariable<?> variable) {
// iterate through hierarchy from top to bottom until type variable gets a non-variable
// assigned
......@@ -456,7 +455,8 @@ public final class ExtractionUtils {
return variable;
}
private static @Nullable Type resolveVariableInParameterizedType(
private static @Nullable
Type resolveVariableInParameterizedType(
TypeVariable<?> variable, ParameterizedType currentType) {
final Class<?> currentRaw = (Class<?>) currentType.getRawType();
final TypeVariable<?>[] currentVariables = currentRaw.getTypeParameters();
......@@ -494,7 +494,9 @@ public final class ExtractionUtils {
}
}
/** Returns the fields of a class for a {@link StructuredType}. */
/**
* Returns the fields of a class for a {@link StructuredType}.
*/
static List<Field> collectStructuredFields(Class<?> clazz) {
final List<Field> fields = new ArrayList<>();
while (clazz != Object.class) {
......@@ -511,7 +513,9 @@ public final class ExtractionUtils {
return fields;
}
/** Validates if a field is properly readable either directly or through a getter. */
/**
* Validates if a field is properly readable either directly or through a getter.
*/
static void validateStructuredFieldReadability(Class<?> clazz, Field field) {
// field is accessible
if (isStructuredFieldDirectlyReadable(field)) {
......@@ -554,7 +558,9 @@ public final class ExtractionUtils {
field.getName(), clazz.getName());
}
/** Returns the boxed type of a primitive type. */
/**
* Returns the boxed type of a primitive type.
*/
static Type primitiveToWrapper(Type type) {
if (type instanceof Class) {
return primitiveToWrapper((Class<?>) type);
......@@ -562,7 +568,9 @@ public final class ExtractionUtils {
return type;
}
/** Collects all methods that qualify as methods of a {@link StructuredType}. */
/**
* Collects all methods that qualify as methods of a {@link StructuredType}.
*/
static List<Method> collectStructuredMethods(Class<?> clazz) {
final List<Method> methods = new ArrayList<>();
while (clazz != Object.class) {
......@@ -613,7 +621,9 @@ public final class ExtractionUtils {
// Parameter Extraction Utilities
// --------------------------------------------------------------------------------------------
/** Result of the extraction in {@link #extractAssigningConstructor(Class, List)}. */
/**
* Result of the extraction in {@link #extractAssigningConstructor(Class, List)}.
*/
public static class AssigningConstructor {
public final Constructor<?> constructor;
public final List<String> parameterNames;
......@@ -628,7 +638,8 @@ public final class ExtractionUtils {
* Checks whether the given constructor takes all of the given fields with matching (possibly
* primitive) type and name. An assigning constructor can define the order of fields.
*/
public static @Nullable AssigningConstructor extractAssigningConstructor(
public static @Nullable
AssigningConstructor extractAssigningConstructor(
Class<?> clazz, List<Field> fields) {
AssigningConstructor foundConstructor = null;
for (Constructor<?> constructor : clazz.getDeclaredConstructors()) {
......@@ -652,8 +663,11 @@ public final class ExtractionUtils {
return foundConstructor;
}
/** Extracts the parameter names of a method if possible. */
static @Nullable List<String> extractMethodParameterNames(Method method) {
/**
* Extracts the parameter names of a method if possible.
*/
static @Nullable
List<String> extractMethodParameterNames(Method method) {
return extractExecutableNames(method);
}
......@@ -661,7 +675,8 @@ public final class ExtractionUtils {
* Extracts ordered parameter names from a constructor that takes all of the given fields with
* matching (possibly primitive and lenient) type and name.
*/
private static @Nullable List<String> extractConstructorParameterNames(
private static @Nullable
List<String> extractConstructorParameterNames(
Constructor<?> constructor, List<Field> fields) {
final Type[] parameterTypes = constructor.getGenericParameterTypes();
......@@ -698,7 +713,8 @@ public final class ExtractionUtils {
return fieldNames;
}
private static @Nullable List<String> extractExecutableNames(Executable executable) {
private static @Nullable
List<String> extractExecutableNames(Executable executable) {
final int offset;
if (!Modifier.isStatic(executable.getModifiers())) {
// remove "this" as first parameter
......@@ -744,7 +760,7 @@ public final class ExtractionUtils {
private static ClassReader getClassReader(Class<?> cls) {
final String className = cls.getName().replaceFirst("^.*\\.", "") + ".class";
if(ClassPool.exist(cls.getName())){
if (ClassPool.exist(cls.getName())) {
return new ClassReader(ClassPool.get(cls.getName()).getClassByte());
}
try (InputStream i = cls.getResourceAsStream(className)) {
......@@ -915,7 +931,9 @@ public final class ExtractionUtils {
return toClass.isAssignableFrom(cls);
}
/** Maps primitive {@code Class}es to their corresponding wrapper {@code Class}. */
/**
* Maps primitive {@code Class}es to their corresponding wrapper {@code Class}.
*/
private static final Map<Class<?>, Class<?>> primitiveWrapperMap = new HashMap<>();
static {
......@@ -930,7 +948,9 @@ public final class ExtractionUtils {
primitiveWrapperMap.put(Void.TYPE, Void.TYPE);
}
/** Maps wrapper {@code Class}es to their corresponding primitive types. */
/**
* Maps wrapper {@code Class}es to their corresponding primitive types.
*/
private static final Map<Class<?>, Class<?>> wrapperPrimitiveMap = new HashMap<>();
static {
......
......@@ -14,14 +14,14 @@ import java.util.Map;
*/
public class FlinkBaseUtil {
public static Map<String,String> getParamsFromArgs(String[] args){
Map<String,String> params = new HashMap<>();
public static Map<String, String> getParamsFromArgs(String[] args) {
Map<String, String> params = new HashMap<>();
ParameterTool parameters = ParameterTool.fromArgs(args);
params.put(FlinkParamConstant.ID,parameters.get(FlinkParamConstant.ID, null));
params.put(FlinkParamConstant.DRIVER,parameters.get(FlinkParamConstant.DRIVER, null));
params.put(FlinkParamConstant.URL,parameters.get(FlinkParamConstant.URL, null));
params.put(FlinkParamConstant.USERNAME,parameters.get(FlinkParamConstant.USERNAME, null));
params.put(FlinkParamConstant.PASSWORD,parameters.get(FlinkParamConstant.PASSWORD, null));
params.put(FlinkParamConstant.ID, parameters.get(FlinkParamConstant.ID, null));
params.put(FlinkParamConstant.DRIVER, parameters.get(FlinkParamConstant.DRIVER, null));
params.put(FlinkParamConstant.URL, parameters.get(FlinkParamConstant.URL, null));
params.put(FlinkParamConstant.USERNAME, parameters.get(FlinkParamConstant.USERNAME, null));
params.put(FlinkParamConstant.PASSWORD, parameters.get(FlinkParamConstant.PASSWORD, null));
return params;
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<context:property-placeholder/>
<dubbo:application name="demo-consumer"/>
<dubbo:registry address="zookeeper://${zookeeper.address:127.0.0.1}:2181"/>
<!-- <dubbo:reference id="demoService" check="true" interface="com.dlink.service.DemoService" version="1.0.0"/>-->
</beans>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment