Commit d531a855 authored by wenmo

[Feature-447][client] CDCSource sync sql

parent bb920d25
@@ -12,6 +12,7 @@ import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.BooleanType;
 import org.apache.flink.table.types.logical.DecimalType;
@@ -49,6 +50,7 @@ import com.dlink.model.Table;
 public abstract class AbstractSinkBuilder {
     protected FlinkCDCConfig config;
+    protected List<ModifyOperation> modifyOperations = new ArrayList();
     public AbstractSinkBuilder() {
     }
@@ -220,6 +222,9 @@
     }
     protected Object convertValue(Object value, LogicalType logicalType) {
+        if (value == null) {
+            return null;
+        }
         if (logicalType instanceof VarCharType) {
             return StringData.fromString((String) value);
         } else if (logicalType instanceof DecimalType) {
...
@@ -7,7 +7,6 @@ import java.util.List;
 import java.util.Map;
 import com.dlink.model.FlinkCDCConfig;
-import com.dlink.model.Table;
 /**
  * CDCBuilder
@@ -29,7 +28,5 @@ public interface CDCBuilder {
     Map<String, Map<String, String>> parseMetaDataConfigs();
-    String getInsertSQL(Table table, String sourceName);
     String getSchemaFieldName();
 }
@@ -26,7 +26,7 @@ import com.dlink.model.Table;
  **/
 public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
-    private final static String KEY_WORD = "doris";
+    private final static String KEY_WORD = "datastream-doris";
     private static final long serialVersionUID = 8330362249137471854L;
     public DorisSinkBuilder() {
...
@@ -33,7 +33,7 @@ import com.dlink.model.Table;
  **/
 public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder {
-    private final static String KEY_WORD = "kafka";
+    private final static String KEY_WORD = "datastream-kafka";
     public KafkaSinkBuilder() {
     }
...
@@ -103,26 +103,4 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
         }
         return allConfigMap;
     }
-    @Override
-    public String getInsertSQL(Table table, String sourceName) {
-        StringBuilder sb = new StringBuilder("INSERT INTO ");
-        sb.append(table.getName());
-        sb.append(" SELECT\n");
-        for (int i = 0; i < table.getColumns().size(); i++) {
-            sb.append(" ");
-            if (i > 0) {
-                sb.append(",");
-            }
-            sb.append("`" + table.getColumns().get(i).getName() + "` \n");
-        }
-        sb.append(" FROM ");
-        sb.append(sourceName);
-        /* sb.append(" WHERE database_name = '");
-        sb.append(table.getSchema());
-        sb.append("' and table_name = '");
-        sb.append(table.getName());
-        sb.append("'");*/
-        return sb.toString();
-    }
 }
package com.dlink.cdc.sql;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import java.io.Serializable;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.dlink.utils.SqlUtil;
/**
* SQLSinkBuilder
*
* @author wenmo
* @since 2022/4/25 23:02
*/
public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
private final static String KEY_WORD = "sql";
private static final long serialVersionUID = -3699685106324048226L;
public SQLSinkBuilder() {
}
public SQLSinkBuilder(FlinkCDCConfig config) {
super(config);
}
@Override
public void addSink(StreamExecutionEnvironment env, DataStream<RowData> rowDataDataStream, Table table, List<String> columnNameList, List<LogicalType> columnTypeList) {
}
protected DataStream<Row> buildRow(
SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
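// Turn each filtered Debezium record map into a changelog Row typed by the table's column names and logical types.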
final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]);
final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]);
TypeInformation<?>[] typeInformations = TypeConversions.fromDataTypeToLegacyInfo(TypeConversions.fromLogicalToDataType(columnTypes));
RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, columnNames);
return filterOperator
.flatMap(new FlatMapFunction<Map, Row>() {
@Override
public void flatMap(Map value, Collector<Row> out) throws Exception {
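// Debezium op codes: "c" = insert (use the "after" image), "d" = delete (use the "before" image), "u" = update (emit UPDATE_BEFORE, then UPDATE_AFTER).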
switch (value.get("op").toString()) {
case "c":
Row irow = Row.ofKind(RowKind.INSERT);
Map idata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(irow);
break;
case "d":
Row drow = Row.ofKind(RowKind.DELETE);
Map ddata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(drow);
break;
case "u":
Row ubrow = Row.ofKind(RowKind.UPDATE_BEFORE);
Map ubdata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(ubrow);
Row uarow = Row.ofKind(RowKind.UPDATE_AFTER);
Map uadata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(uarow);
break;
}
}
}, rowTypeInfo);
}
public void addTableSink(
CustomTableEnvironment customTableEnvironment,
DataStream<Row> rowDataDataStream,
Table table,
List<String> columnNameList) {
String sinkTableName = getSinkTableName(table);
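// Register the Row stream as a temporary view, create the sink table via its generated Flink DDL,
// and collect the parsed INSERT statement as a ModifyOperation so all tables end up in one job graph.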
customTableEnvironment.createTemporaryView(table.getSchemaTableNameWithUnderline(), rowDataDataStream, StringUtils.join(columnNameList, ","));
customTableEnvironment.executeSql(table.getFlinkDDL(getSinkConfigurationString(table), sinkTableName));
List<Operation> operations = customTableEnvironment.getParser().parse(table.getCDCSqlInsert(sinkTableName, table.getSchemaTableNameWithUnderline()));
if (operations.size() > 0) {
Operation operation = operations.get(0);
if (operation instanceof ModifyOperation) {
modifyOperations.add((ModifyOperation) operation);
}
}
}
@Override
public String getHandle() {
return KEY_WORD;
}
@Override
public SinkBuilder create(FlinkCDCConfig config) {
return new SQLSinkBuilder(config);
}
@Override
public DataStreamSource build(
CDCBuilder cdcBuilder,
StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) {
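// Deserialize the CDC stream once, split it per table, convert each split into a Row stream,
// and wire it to its sink table through addTableSink.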
final List<Schema> schemaList = config.getSchemaList();
final String schemaFieldName = config.getSchemaFieldName();
if (Asserts.isNotNullCollection(schemaList)) {
SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
for (Schema schema : schemaList) {
for (Table table : schema.getTables()) {
SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
List<String> columnNameList = new ArrayList<>();
List<LogicalType> columnTypeList = new ArrayList<>();
buildColumn(columnNameList, columnTypeList, table.getColumns());
DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList);
addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
}
}
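// Translate all collected ModifyOperations together and attach the resulting transformations to the shared StreamExecutionEnvironment.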
List<Transformation<?>> trans = customTableEnvironment.getPlanner().translate(modifyOperations);
for (Transformation<?> item : trans) {
env.addOperator(item);
}
}
return dataStreamSource;
}
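// Values stay as plain Java objects here (e.g. String for VARCHAR) because this builder works with Row rather than RowData.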
protected Object convertValue(Object value, LogicalType logicalType) {
if (value == null) {
return null;
}
if (logicalType instanceof VarCharType) {
return value;
} else if (logicalType instanceof DecimalType) {
final DecimalType decimalType = ((DecimalType) logicalType);
final int precision = decimalType.getPrecision();
final int scala = decimalType.getScale();
return DecimalData.fromBigDecimal(new BigDecimal((String) value), precision, scala);
} else {
return value;
}
}
protected String getSinkConfigurationString(Table table) {
String configurationString = SqlUtil.replaceAllParam(config.getSinkConfigurationString(), "schemaName", getSinkSchemaName(table));
return SqlUtil.replaceAllParam(configurationString, "tableName", getSinkTableName(table));
}
}
@@ -7,6 +7,7 @@ import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
 import org.apache.flink.runtime.rest.messages.JobPlanInfo;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.JSONGenerator;
@@ -16,6 +17,7 @@ import org.apache.flink.table.api.ExplainDetail;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.FunctionCatalog;
@@ -24,26 +26,26 @@ import org.apache.flink.table.delegation.Executor;
 import org.apache.flink.table.delegation.ExecutorFactory;
 import org.apache.flink.table.delegation.Planner;
 import org.apache.flink.table.delegation.PlannerFactory;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionParser;
 import org.apache.flink.table.factories.ComponentFactoryService;
-import org.apache.flink.table.functions.AggregateFunction;
-import org.apache.flink.table.functions.TableAggregateFunction;
-import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.table.functions.UserDefinedFunctionHelper;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.operations.ExplainOperation;
+import org.apache.flink.table.operations.JavaDataStreamQueryOperation;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.planner.delegation.ExecutorBase;
 import org.apache.flink.table.planner.utils.ExecutorUtils;
-import org.apache.flink.types.Row;
+import org.apache.flink.table.typeutils.FieldInfoUtils;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
-import com.dlink.exception.FlinkClientException;
 import com.dlink.result.SqlExplainResult;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -272,40 +274,63 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements
         return record;
     }
-    public <T> void registerFunction(String name, TableFunction<T> tableFunction) {
-        TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfTableFunction(tableFunction);
-        this.functionCatalog.registerTempSystemTableFunction(name, tableFunction, typeInfo);
-    }
-    public <T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction) {
-        TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(aggregateFunction);
-        TypeInformation<ACC> accTypeInfo = UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(aggregateFunction);
-        this.functionCatalog.registerTempSystemAggregateFunction(name, aggregateFunction, typeInfo, accTypeInfo);
-    }
-    public <T, ACC> void registerFunction(String name, TableAggregateFunction<T, ACC> tableAggregateFunction) {
-        TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(tableAggregateFunction);
-        TypeInformation<ACC> accTypeInfo = UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(tableAggregateFunction);
-        this.functionCatalog.registerTempSystemAggregateFunction(name, tableAggregateFunction, typeInfo, accTypeInfo);
-    }
-    public boolean parseAndLoadConfiguration(String statement, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
-        return false;
-    }
-    @Override
-    public Table fromChangelogStream(DataStream<Row> dataStream) {
-        throw new FlinkClientException("Flink 1.12 not support");
-    }
-    @Override
-    public <T> void registerDataStream(String name, DataStream<T> dataStream) {
-        throw new FlinkClientException("Flink 1.12 not support");
-    }
-    @Override
-    public <T> void createTemporaryView(String path, DataStream<T> dataStream) {
-        throw new FlinkClientException("Flink 1.12 not support");
-    }
+    public boolean parseAndLoadConfiguration(String statement, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
+        return false;
+    }
+    public <T> Table fromDataStream(DataStream<T> dataStream, Expression... fields) {
+        JavaDataStreamQueryOperation<T> queryOperation =
+            asQueryOperation(dataStream, Optional.of(Arrays.asList(fields)));
+        return createTable(queryOperation);
+    }
+    public <T> Table fromDataStream(DataStream<T> dataStream, String fields) {
+        List<Expression> expressions = ExpressionParser.parseExpressionList(fields);
+        return fromDataStream(dataStream, expressions.toArray(new Expression[0]));
+    }
+    @Override
+    public <T> void createTemporaryView(String path, DataStream<T> dataStream, String fields) {
+        createTemporaryView(path, fromDataStream(dataStream, fields));
+    }
+    @Override
+    public <T> void createTemporaryView(
+        String path, DataStream<T> dataStream, Expression... fields) {
+        createTemporaryView(path, fromDataStream(dataStream, fields));
+    }
+    private <T> JavaDataStreamQueryOperation<T> asQueryOperation(
+        DataStream<T> dataStream, Optional<List<Expression>> fields) {
+        TypeInformation<T> streamType = dataStream.getType();
+        // get field names and types for all non-replaced fields
+        FieldInfoUtils.TypeInfoSchema typeInfoSchema =
+            fields.map(
+                f -> {
+                    FieldInfoUtils.TypeInfoSchema fieldsInfo =
+                        FieldInfoUtils.getFieldsInfo(
+                            streamType, f.toArray(new Expression[0]));
+                    // check if event-time is enabled
+                    validateTimeCharacteristic(fieldsInfo.isRowtimeDefined());
+                    return fieldsInfo;
+                })
+                .orElseGet(() -> FieldInfoUtils.getFieldsInfo(streamType));
+        return new JavaDataStreamQueryOperation<>(
+            dataStream, typeInfoSchema.getIndices(), typeInfoSchema.toTableSchema());
+    }
+    private void validateTimeCharacteristic(boolean isRowtimeDefined) {
+        if (isRowtimeDefined
+            && executionEnvironment.getStreamTimeCharacteristic()
+                != TimeCharacteristic.EventTime) {
+            throw new ValidationException(
+                String.format(
+                    "A rowtime attribute requires an EventTime time characteristic in stream environment. But is: %s",
+                    executionEnvironment.getStreamTimeCharacteristic()));
+        }
+    }
 }
@@ -12,6 +12,7 @@ import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.BooleanType;
 import org.apache.flink.table.types.logical.DecimalType;
@@ -49,6 +50,7 @@ import com.dlink.model.Table;
 public abstract class AbstractSinkBuilder {
     protected FlinkCDCConfig config;
+    protected List<ModifyOperation> modifyOperations = new ArrayList();
     public AbstractSinkBuilder() {
     }
@@ -220,6 +222,9 @@
     }
     protected Object convertValue(Object value, LogicalType logicalType) {
+        if (value == null) {
+            return null;
+        }
         if (logicalType instanceof VarCharType) {
             return StringData.fromString((String) value);
         } else if (logicalType instanceof DecimalType) {
...
@@ -7,7 +7,6 @@ import java.util.List;
 import java.util.Map;
 import com.dlink.model.FlinkCDCConfig;
-import com.dlink.model.Table;
 /**
  * CDCBuilder
@@ -29,7 +28,5 @@ public interface CDCBuilder {
     Map<String, Map<String, String>> parseMetaDataConfigs();
-    String getInsertSQL(Table table, String sourceName);
     String getSchemaFieldName();
 }
@@ -3,6 +3,7 @@ package com.dlink.cdc;
 import com.dlink.assertion.Asserts;
 import com.dlink.cdc.doris.DorisSinkBuilder;
 import com.dlink.cdc.kafka.KafkaSinkBuilder;
+import com.dlink.cdc.sql.SQLSinkBuilder;
 import com.dlink.exception.FlinkClientException;
 import com.dlink.model.FlinkCDCConfig;
@@ -16,7 +17,8 @@ public class SinkBuilderFactory {
     private static SinkBuilder[] sinkBuilders = {
         new KafkaSinkBuilder(),
-        new DorisSinkBuilder()
+        new DorisSinkBuilder(),
+        new SQLSinkBuilder()
     };
     public static SinkBuilder buildSinkBuilder(FlinkCDCConfig config) {
@@ -28,6 +30,6 @@
                 return sinkBuilders[i].create(config);
             }
         }
-        throw new FlinkClientException("未匹配到对应 Sink 类型的【" + config.getSink().get("connector") + "】。");
+        return new SQLSinkBuilder().create(config);
     }
 }
@@ -26,7 +26,7 @@ import com.dlink.model.Table;
  **/
 public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
-    private final static String KEY_WORD = "doris";
+    private final static String KEY_WORD = "datastream-doris";
     private static final long serialVersionUID = 8330362249137471854L;
     public DorisSinkBuilder() {
...
@@ -33,7 +33,7 @@ import com.dlink.model.Table;
  **/
 public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder {
-    private final static String KEY_WORD = "kafka";
+    private final static String KEY_WORD = "datastream-kafka";
     public KafkaSinkBuilder() {
     }
...
@@ -19,7 +19,6 @@ import com.dlink.cdc.CDCBuilder;
 import com.dlink.constant.ClientConstant;
 import com.dlink.constant.FlinkParamConstant;
 import com.dlink.model.FlinkCDCConfig;
-import com.dlink.model.Table;
 /**
  * MysqlCDCBuilder
@@ -121,26 +120,4 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
         }
         return allConfigMap;
     }
-    @Override
-    public String getInsertSQL(Table table, String sourceName) {
-        StringBuilder sb = new StringBuilder("INSERT INTO ");
-        sb.append(table.getName());
-        sb.append(" SELECT\n");
-        for (int i = 0; i < table.getColumns().size(); i++) {
-            sb.append(" ");
-            if (i > 0) {
-                sb.append(",");
-            }
-            sb.append("`" + table.getColumns().get(i).getName() + "` \n");
-        }
-        sb.append(" FROM ");
-        sb.append(sourceName);
-        /* sb.append(" WHERE database_name = '");
-        sb.append(table.getSchema());
-        sb.append("' and table_name = '");
-        sb.append(table.getName());
-        sb.append("'");*/
-        return sb.toString();
-    }
 }
package com.dlink.cdc.sql;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import java.io.Serializable;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.dlink.utils.SqlUtil;
/**
* SQLSinkBuilder
*
* @author wenmo
* @since 2022/4/25 23:02
*/
public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
private final static String KEY_WORD = "sql";
private static final long serialVersionUID = -3699685106324048226L;
public SQLSinkBuilder() {
}
public SQLSinkBuilder(FlinkCDCConfig config) {
super(config);
}
@Override
public void addSink(StreamExecutionEnvironment env, DataStream<RowData> rowDataDataStream, Table table, List<String> columnNameList, List<LogicalType> columnTypeList) {
}
protected DataStream<Row> buildRow(
SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
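// Turn each filtered Debezium record map into a changelog Row typed by the table's column names and logical types.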
final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]);
final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]);
TypeInformation<?>[] typeInformations = TypeConversions.fromDataTypeToLegacyInfo(TypeConversions.fromLogicalToDataType(columnTypes));
RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, columnNames);
return filterOperator
.flatMap(new FlatMapFunction<Map, Row>() {
@Override
public void flatMap(Map value, Collector<Row> out) throws Exception {
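// Debezium op codes: "c" = insert (use the "after" image), "d" = delete (use the "before" image), "u" = update (emit UPDATE_BEFORE, then UPDATE_AFTER).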
switch (value.get("op").toString()) {
case "c":
Row irow = Row.ofKind(RowKind.INSERT);
Map idata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(irow);
break;
case "d":
Row drow = Row.ofKind(RowKind.DELETE);
Map ddata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(drow);
break;
case "u":
Row ubrow = Row.ofKind(RowKind.UPDATE_BEFORE);
Map ubdata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(ubrow);
Row uarow = Row.ofKind(RowKind.UPDATE_AFTER);
Map uadata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(uarow);
break;
}
}
}, rowTypeInfo);
}
public void addTableSink(
CustomTableEnvironment customTableEnvironment,
DataStream<Row> rowDataDataStream,
Table table,
List<String> columnNameList) {
String sinkTableName = getSinkTableName(table);
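// Register the Row stream as a temporary view, create the sink table via its generated Flink DDL,
// and collect the parsed INSERT statement as a ModifyOperation so all tables end up in one job graph.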
customTableEnvironment.createTemporaryView(table.getSchemaTableNameWithUnderline(), rowDataDataStream, StringUtils.join(columnNameList, ","));
customTableEnvironment.executeSql(table.getFlinkDDL(getSinkConfigurationString(table), sinkTableName));
List<Operation> operations = customTableEnvironment.getParser().parse(table.getCDCSqlInsert(sinkTableName, table.getSchemaTableNameWithUnderline()));
if (operations.size() > 0) {
Operation operation = operations.get(0);
if (operation instanceof ModifyOperation) {
modifyOperations.add((ModifyOperation) operation);
}
}
}
@Override
public String getHandle() {
return KEY_WORD;
}
@Override
public SinkBuilder create(FlinkCDCConfig config) {
return new SQLSinkBuilder(config);
}
@Override
public DataStreamSource build(
CDCBuilder cdcBuilder,
StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) {
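// Deserialize the CDC stream once, split it per table, convert each split into a Row stream,
// and wire it to its sink table through addTableSink.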
final List<Schema> schemaList = config.getSchemaList();
final String schemaFieldName = config.getSchemaFieldName();
if (Asserts.isNotNullCollection(schemaList)) {
SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
for (Schema schema : schemaList) {
for (Table table : schema.getTables()) {
SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
List<String> columnNameList = new ArrayList<>();
List<LogicalType> columnTypeList = new ArrayList<>();
buildColumn(columnNameList, columnTypeList, table.getColumns());
DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList);
addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
}
}
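// Translate all collected ModifyOperations together and attach the resulting transformations to the shared StreamExecutionEnvironment.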
List<Transformation<?>> trans = customTableEnvironment.getPlanner().translate(modifyOperations);
for (Transformation<?> item : trans) {
env.addOperator(item);
}
}
return dataStreamSource;
}
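// Values stay as plain Java objects here (e.g. String for VARCHAR) because this builder works with Row rather than RowData.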
protected Object convertValue(Object value, LogicalType logicalType) {
if (value == null) {
return null;
}
if (logicalType instanceof VarCharType) {
return value;
} else if (logicalType instanceof DecimalType) {
final DecimalType decimalType = ((DecimalType) logicalType);
final int precision = decimalType.getPrecision();
final int scala = decimalType.getScale();
return DecimalData.fromBigDecimal(new BigDecimal((String) value), precision, scala);
} else {
return value;
}
}
protected String getSinkConfigurationString(Table table) {
String configurationString = SqlUtil.replaceAllParam(config.getSinkConfigurationString(), "schemaName", getSinkSchemaName(table));
return SqlUtil.replaceAllParam(configurationString, "tableName", getSinkTableName(table));
}
}
@@ -9,6 +9,7 @@ import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
 import org.apache.flink.runtime.rest.messages.JobPlanInfo;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.JSONGenerator;
@@ -18,6 +19,7 @@ import org.apache.flink.table.api.ExplainDetail;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.FunctionCatalog;
@@ -26,26 +28,26 @@ import org.apache.flink.table.delegation.Executor;
 import org.apache.flink.table.delegation.ExecutorFactory;
 import org.apache.flink.table.delegation.Planner;
 import org.apache.flink.table.delegation.PlannerFactory;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionParser;
 import org.apache.flink.table.factories.ComponentFactoryService;
-import org.apache.flink.table.functions.AggregateFunction;
-import org.apache.flink.table.functions.TableAggregateFunction;
-import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.table.functions.UserDefinedFunctionHelper;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.operations.ExplainOperation;
+import org.apache.flink.table.operations.JavaDataStreamQueryOperation;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.planner.delegation.ExecutorBase;
 import org.apache.flink.table.planner.utils.ExecutorUtils;
-import org.apache.flink.types.Row;
+import org.apache.flink.table.typeutils.FieldInfoUtils;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
-import com.dlink.exception.FlinkClientException;
 import com.dlink.result.SqlExplainResult;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -274,39 +276,63 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements
         return record;
     }
-    public <T> void registerFunction(String name, TableFunction<T> tableFunction) {
-        TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfTableFunction(tableFunction);
-        this.functionCatalog.registerTempSystemTableFunction(name, tableFunction, typeInfo);
-    }
-    public <T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction) {
-        TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(aggregateFunction);
-        TypeInformation<ACC> accTypeInfo = UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(aggregateFunction);
-        this.functionCatalog.registerTempSystemAggregateFunction(name, aggregateFunction, typeInfo, accTypeInfo);
-    }
-    public <T, ACC> void registerFunction(String name, TableAggregateFunction<T, ACC> tableAggregateFunction) {
-        TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(tableAggregateFunction);
-        TypeInformation<ACC> accTypeInfo = UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(tableAggregateFunction);
-        this.functionCatalog.registerTempSystemAggregateFunction(name, tableAggregateFunction, typeInfo, accTypeInfo);
-    }
-    public boolean parseAndLoadConfiguration(String statement, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
-        return false;
-    }
-    @Override
-    public Table fromChangelogStream(DataStream<Row> dataStream) {
-        throw new FlinkClientException("Flink 1.12 not support");
-    }
-    @Override
-    public <T> void registerDataStream(String name, DataStream<T> dataStream) {
-        throw new FlinkClientException("Flink 1.12 not support");
-    }
-    @Override
-    public <T> void createTemporaryView(String path, DataStream<T> dataStream) {
-        throw new FlinkClientException("Flink 1.12 not support");
-    }
+    public boolean parseAndLoadConfiguration(String statement, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
+        return false;
+    }
+    public <T> Table fromDataStream(DataStream<T> dataStream, Expression... fields) {
+        JavaDataStreamQueryOperation<T> queryOperation =
+            asQueryOperation(dataStream, Optional.of(Arrays.asList(fields)));
+        return createTable(queryOperation);
+    }
+    public <T> Table fromDataStream(DataStream<T> dataStream, String fields) {
+        List<Expression> expressions = ExpressionParser.parseExpressionList(fields);
+        return fromDataStream(dataStream, expressions.toArray(new Expression[0]));
+    }
+    @Override
+    public <T> void createTemporaryView(String path, DataStream<T> dataStream, String fields) {
+        createTemporaryView(path, fromDataStream(dataStream, fields));
+    }
+    @Override
+    public <T> void createTemporaryView(
+        String path, DataStream<T> dataStream, Expression... fields) {
+        createTemporaryView(path, fromDataStream(dataStream, fields));
+    }
+    private <T> JavaDataStreamQueryOperation<T> asQueryOperation(
+        DataStream<T> dataStream, Optional<List<Expression>> fields) {
+        TypeInformation<T> streamType = dataStream.getType();
+        // get field names and types for all non-replaced fields
+        FieldInfoUtils.TypeInfoSchema typeInfoSchema =
+            fields.map(
+                f -> {
+                    FieldInfoUtils.TypeInfoSchema fieldsInfo =
+                        FieldInfoUtils.getFieldsInfo(
+                            streamType, f.toArray(new Expression[0]));
+                    // check if event-time is enabled
+                    validateTimeCharacteristic(fieldsInfo.isRowtimeDefined());
+                    return fieldsInfo;
+                })
+                .orElseGet(() -> FieldInfoUtils.getFieldsInfo(streamType));
+        return new JavaDataStreamQueryOperation<>(
+            dataStream, typeInfoSchema.getIndices(), typeInfoSchema.toTableSchema());
+    }
+    private void validateTimeCharacteristic(boolean isRowtimeDefined) {
+        if (isRowtimeDefined
+            && executionEnvironment.getStreamTimeCharacteristic()
+                != TimeCharacteristic.EventTime) {
+            throw new ValidationException(
+                String.format(
+                    "A rowtime attribute requires an EventTime time characteristic in stream environment. But is: %s",
+                    executionEnvironment.getStreamTimeCharacteristic()));
+        }
+    }
 }
\ No newline at end of file
@@ -12,6 +12,7 @@ import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.BooleanType;
 import org.apache.flink.table.types.logical.DecimalType;
@@ -49,6 +50,7 @@ import com.dlink.model.Table;
 public abstract class AbstractSinkBuilder {
     protected FlinkCDCConfig config;
+    protected List<ModifyOperation> modifyOperations = new ArrayList();
     public AbstractSinkBuilder() {
     }
@@ -162,17 +164,23 @@
             StreamExecutionEnvironment env,
             CustomTableEnvironment customTableEnvironment,
             DataStreamSource<String> dataStreamSource) {
         final List<Schema> schemaList = config.getSchemaList();
         final String schemaFieldName = config.getSchemaFieldName();
         if (Asserts.isNotNullCollection(schemaList)) {
             SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
             for (Schema schema : schemaList) {
                 for (Table table : schema.getTables()) {
                     SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
                     List<String> columnNameList = new ArrayList<>();
                     List<LogicalType> columnTypeList = new ArrayList<>();
                     buildColumn(columnNameList, columnTypeList, table.getColumns());
                     DataStream<RowData> rowDataDataStream = buildRowData(filterOperator, columnNameList, columnTypeList);
                     addSink(env, rowDataDataStream, table, columnNameList, columnTypeList);
                 }
             }
@@ -220,6 +228,9 @@
     }
     protected Object convertValue(Object value, LogicalType logicalType) {
+        if (value == null) {
+            return null;
+        }
         if (logicalType instanceof VarCharType) {
             return StringData.fromString((String) value);
         } else if (logicalType instanceof DecimalType) {
@@ -232,7 +243,7 @@
         }
     }
-    protected String getSinkSchemaName(Table table){
+    protected String getSinkSchemaName(Table table) {
         String schemaName = table.getSchema();
         if (config.getSink().containsKey("sink.db")) {
             schemaName = config.getSink().get("sink.db");
@@ -240,7 +251,7 @@
         return schemaName;
     }
-    protected String getSinkTableName(Table table){
+    protected String getSinkTableName(Table table) {
         String tableName = table.getName();
         if (config.getSink().containsKey("table.prefix.schema")) {
             if (Boolean.valueOf(config.getSink().get("table.prefix.schema"))) {
@@ -265,4 +276,5 @@
         }
         return tableName;
     }
 }
@@ -7,7 +7,6 @@ import java.util.List;
 import java.util.Map;
 import com.dlink.model.FlinkCDCConfig;
-import com.dlink.model.Table;
 /**
  * CDCBuilder
@@ -29,7 +28,5 @@ public interface CDCBuilder {
     Map<String, Map<String, String>> parseMetaDataConfigs();
-    String getInsertSQL(Table table, String sourceName);
     String getSchemaFieldName();
 }
@@ -3,8 +3,8 @@ package com.dlink.cdc;
 import com.dlink.assertion.Asserts;
 import com.dlink.cdc.doris.DorisSinkBuilder;
 import com.dlink.cdc.hudi.HudiSinkBuilder;
-import com.dlink.cdc.jdbc.JdbcSinkBuilder;
 import com.dlink.cdc.kafka.KafkaSinkBuilder;
+import com.dlink.cdc.sql.SQLSinkBuilder;
 import com.dlink.exception.FlinkClientException;
 import com.dlink.model.FlinkCDCConfig;
@@ -18,9 +18,9 @@ public class SinkBuilderFactory {
     private static SinkBuilder[] sinkBuilders = {
         new KafkaSinkBuilder(),
-        new JdbcSinkBuilder(),
         new DorisSinkBuilder(),
         new HudiSinkBuilder(),
+        new SQLSinkBuilder(),
     };
     public static SinkBuilder buildSinkBuilder(FlinkCDCConfig config) {
@@ -32,6 +32,6 @@
                 return sinkBuilders[i].create(config);
             }
         }
-        throw new FlinkClientException("未匹配到对应 Sink 类型的【" + config.getSink().get("connector") + "】。");
+        return new SQLSinkBuilder().create(config);
     }
 }
@@ -4,7 +4,6 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.cfg.DorisSink;
-import org.apache.doris.flink.cfg.DorisStreamOptions;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.data.RowData;
@@ -27,7 +26,7 @@ import com.dlink.model.Table;
  **/
 public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
-    private final static String KEY_WORD = "doris";
+    private final static String KEY_WORD = "datastream-doris";
     private static final long serialVersionUID = 8330362249137471854L;
     public DorisSinkBuilder() {
...
@@ -29,7 +29,7 @@ import com.dlink.model.Table;
  */
 public class HudiSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
-    private final static String KEY_WORD = "hudi";
+    private final static String KEY_WORD = "datastream-hudi";
     private static final long serialVersionUID = 5324199407472847422L;
     public HudiSinkBuilder() {
...
package com.dlink.cdc.jdbc;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.connector.jdbc.dialect.ClickHouseDialect;
import org.apache.flink.connector.jdbc.dialect.MySQLDialect;
import org.apache.flink.connector.jdbc.dialect.OracleDialect;
import org.apache.flink.connector.jdbc.dialect.PostgresDialect;
import org.apache.flink.connector.jdbc.dialect.SQLServerDialect;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.connector.jdbc.table.JdbcUpsertTableSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.LogicalType;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
/**
* MysqlCDCBuilder
*
* @author wenmo
* @since 2022/4/12 21:29
**/
public class JdbcSinkBuilder extends AbstractSinkBuilder implements SinkBuilder {
private final static String KEY_WORD = "jdbc";
private final static String TABLE_NAME = "cdc_table";
public JdbcSinkBuilder() {
}
public JdbcSinkBuilder(FlinkCDCConfig config) {
super(config);
}
@Override
public String getHandle() {
return KEY_WORD;
}
@Override
public SinkBuilder create(FlinkCDCConfig config) {
return new JdbcSinkBuilder(config);
}
@Override
public void addSink(
StreamExecutionEnvironment env,
DataStream<RowData> rowDataDataStream,
Table table,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
JdbcUpsertTableSink.Builder builder = JdbcUpsertTableSink.builder();
Map<String, String> sink = config.getSink();
if (sink.containsKey("sink.buffer-flush.interval")) {
builder.setFlushIntervalMills(Integer.valueOf(sink.get("sink.buffer-flush.interval")));
}
if (sink.containsKey("sink.buffer-flush.max-rows")) {
builder.setFlushMaxSize(Integer.valueOf(sink.get("sink.buffer-flush.max-rows")));
}
if (sink.containsKey("sink.max-retries")) {
builder.setMaxRetryTimes(Integer.valueOf(sink.get("sink.max-retries")));
}
JdbcOptions.Builder jdbcOptionsBuilder = JdbcOptions.builder();
if (sink.containsKey("connection.max-retry-timeout")) {
jdbcOptionsBuilder.setConnectionCheckTimeoutSeconds(Integer.valueOf(sink.get("connection.max-retry-timeout")));
}
if (sink.containsKey("url")) {
jdbcOptionsBuilder.setDBUrl(sink.get("url"));
}
if (sink.containsKey("dialect")) {
switch (sink.get("dialect")) {
case "MySql":
jdbcOptionsBuilder.setDialect(new MySQLDialect());
break;
case "Oracle":
jdbcOptionsBuilder.setDialect(new OracleDialect());
break;
case "ClickHouse":
jdbcOptionsBuilder.setDialect(new ClickHouseDialect());
break;
case "SQLServer":
jdbcOptionsBuilder.setDialect(new SQLServerDialect());
break;
case "Postgres":
jdbcOptionsBuilder.setDialect(new PostgresDialect());
break;
}
}
if (sink.containsKey("driver")) {
jdbcOptionsBuilder.setDriverName(sink.get("driver"));
}
if (sink.containsKey("sink.parallelism")) {
jdbcOptionsBuilder.setParallelism(Integer.valueOf(sink.get("sink.parallelism")));
}
if (sink.containsKey("password")) {
jdbcOptionsBuilder.setPassword(sink.get("password"));
}
if (sink.containsKey("username")) {
jdbcOptionsBuilder.setUsername(sink.get("username"));
}
jdbcOptionsBuilder.setTableName(table.getSchemaTableName());
builder.setOptions(jdbcOptionsBuilder.build());
builder.setTableSchema(TableSchema.fromTypeInfo(rowDataDataStream.getType()));
/*JdbcUpsertTableSink build = builder.build();
build.consumeDataStream(rowDataDataStream);
rowDataDataStream.addSink(build.);*/
}
/*@Override
public DataStreamSource build(StreamExecutionEnvironment env, DataStreamSource<String> dataStreamSource) {
final List<Schema> schemaList = config.getSchemaList();
if (Asserts.isNotNullCollection(schemaList)) {
for (Schema schema : schemaList) {
for (Table table : schema.getTables()) {
*//*dataStreamSource.filter(new FilterFunction<Map>() {
@Override
public boolean filter(Map value) throws Exception {
return value.containsKey("table_name") && table.getName().equals(value.get("table_name"));
}
});
dataStreamSource.addSink(
JdbcSink.sink(
"insert into books (id, title, authors, year) values (?, ?, ?, ?)",
(statement, book) -> {
statement.setLong(1, book.id);
statement.setString(2, book.title);
statement.setString(3, book.authors);
statement.setInt(4, book.year);
},
JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:postgresql://dbhost:5432/postgresdb")
.withDriverName("org.postgresql.Driver")
.withUsername("someUser")
.withPassword("somePassword")
.build()
));*//*
}
}
}
return dataStreamSource;
}*/
@Override
public DataStreamSource build(CDCBuilder cdcBuilder, StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource<String> dataStreamSource) {
final List<Schema> schemaList = config.getSchemaList();
if (Asserts.isNotNullCollection(schemaList)) {
/*org.apache.flink.table.api.Table table = env.fromChangelogStream(dataStreamSource);
env.registerTable("cdc_table",table);*/
customTableEnvironment.registerDataStream(TABLE_NAME, dataStreamSource);
List<ModifyOperation> modifyOperations = new ArrayList();
StringBuilder sb = new StringBuilder();
for (Map.Entry<String, String> entry : config.getSink().entrySet()) {
sb.append("'");
sb.append(entry.getKey());
sb.append("' = '");
sb.append(entry.getValue());
sb.append("',\n");
}
for (Schema schema : schemaList) {
for (Table item : schema.getTables()) {
customTableEnvironment.executeSql(item.getFlinkTableSql(sb.toString() + "'table-name' = '" + item.getSchemaTableName() + "'\n"));
List<Operation> operations = customTableEnvironment.getParser().parse(cdcBuilder.getInsertSQL(item, TABLE_NAME));
if (operations.size() > 0) {
Operation operation = operations.get(0);
if (operation instanceof ModifyOperation) {
modifyOperations.add((ModifyOperation) operation);
}
}
}
}
List<Transformation<?>> trans = customTableEnvironment.getPlanner().translate(modifyOperations);
for (Transformation<?> item : trans) {
env.addOperator(item);
}
}
return dataStreamSource;
}
}
@@ -33,7 +33,7 @@ import com.dlink.model.Table;
  **/
 public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder {
-    private final static String KEY_WORD = "kafka";
+    private final static String KEY_WORD = "datastream-kafka";
     public KafkaSinkBuilder() {
     }
...
...@@ -17,7 +17,6 @@ import com.dlink.cdc.CDCBuilder; ...@@ -17,7 +17,6 @@ import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.ClientConstant; import com.dlink.constant.ClientConstant;
import com.dlink.constant.FlinkParamConstant; import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig; import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Table;
import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder; import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.connectors.mysql.table.StartupOptions;
...@@ -127,28 +126,6 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder { ...@@ -127,28 +126,6 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
return allConfigMap; return allConfigMap;
} }
@Override
public String getInsertSQL(Table table, String sourceName) {
StringBuilder sb = new StringBuilder("INSERT INTO ");
sb.append(table.getName());
sb.append(" SELECT\n");
for (int i = 0; i < table.getColumns().size(); i++) {
sb.append(" ");
if (i > 0) {
sb.append(",");
}
sb.append("`" + table.getColumns().get(i).getName() + "` \n");
}
sb.append(" FROM ");
sb.append(sourceName);
sb.append(" WHERE database_name = '");
sb.append(table.getSchema());
sb.append("' and table_name = '");
sb.append(table.getName());
sb.append("'");
return sb.toString();
}
@Override @Override
public String getSchemaFieldName() { public String getSchemaFieldName() {
return "db"; return "db";
......
...@@ -107,26 +107,4 @@ public class OracleCDCBuilder extends AbstractCDCBuilder implements CDCBuilder { ...@@ -107,26 +107,4 @@ public class OracleCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
} }
return allConfigList; return allConfigList;
} }
@Override
public String getInsertSQL(Table table, String sourceName) {
StringBuilder sb = new StringBuilder("INSERT INTO ");
sb.append(table.getName());
sb.append(" SELECT\n");
for (int i = 0; i < table.getColumns().size(); i++) {
sb.append(" ");
if (i > 0) {
sb.append(",");
}
sb.append("`" + table.getColumns().get(i).getName() + "` \n");
}
sb.append(" FROM ");
sb.append(sourceName);
sb.append(" WHERE schema_name = '");
sb.append(table.getSchema());
sb.append("' and table_name = '");
sb.append(table.getName());
sb.append("'");
return sb.toString();
}
} }
package com.dlink.cdc.sql;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import java.io.Serializable;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.dlink.utils.SqlUtil;
/**
* SQLSinkBuilder
*
* @author wenmo
* @since 2022/4/25 23:02
*/
public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
private final static String KEY_WORD = "sql";
private static final long serialVersionUID = -3699685106324048226L;
public SQLSinkBuilder() {
}
public SQLSinkBuilder(FlinkCDCConfig config) {
super(config);
}
@Override
public void addSink(StreamExecutionEnvironment env, DataStream<RowData> rowDataDataStream, Table table, List<String> columnNameList, List<LogicalType> columnTypeList) {
}
protected DataStream<Row> buildRow(
SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]);
final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]);
TypeInformation<?>[] typeInformations = TypeConversions.fromDataTypeToLegacyInfo(TypeConversions.fromLogicalToDataType(columnTypes));
RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, columnNames);
return filterOperator
.flatMap(new FlatMapFunction<Map, Row>() {
@Override
public void flatMap(Map value, Collector<Row> out) throws Exception {
switch (value.get("op").toString()) {
case "c":
Row irow = Row.withNames(RowKind.INSERT);
Map idata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
irow.setField(columnNameList.get(i), convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(irow);
break;
case "d":
Row drow = Row.withNames(RowKind.DELETE);
Map ddata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
drow.setField(columnNameList.get(i), convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(drow);
break;
case "u":
Row ubrow = Row.withNames(RowKind.UPDATE_BEFORE);
Map ubdata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
ubrow.setField(columnNameList.get(i), convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(ubrow);
Row uarow = Row.withNames(RowKind.UPDATE_AFTER);
Map uadata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
uarow.setField(columnNameList.get(i), convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(uarow);
break;
}
}
}, rowTypeInfo);
}
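// Annotation (illustration only, not part of the original source): buildRow maps each
// deserialized Debezium-style event onto named changelog rows. With an invented column
// "name", an update event {"op":"u","before":{"name":"old"},"after":{"name":"new"}}
// produces two rows:
//   Row.withNames(RowKind.UPDATE_BEFORE) with name = "old"
//   Row.withNames(RowKind.UPDATE_AFTER)  with name = "new"
// A "c" event emits a single RowKind.INSERT row built from "after"; a "d" event emits a
// single RowKind.DELETE row built from "before".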
public void addTableSink(
CustomTableEnvironment customTableEnvironment,
DataStream<Row> rowDataDataStream,
Table table,
List<String> columnNameList) {
String sinkTableName = getSinkTableName(table);
customTableEnvironment.createTemporaryView(table.getSchemaTableNameWithUnderline(), rowDataDataStream, StringUtils.join(columnNameList, ","));
customTableEnvironment.executeSql(table.getFlinkDDL(getSinkConfigurationString(table), sinkTableName));
List<Operation> operations = customTableEnvironment.getParser().parse(table.getCDCSqlInsert(sinkTableName, table.getSchemaTableNameWithUnderline()));
if (operations.size() > 0) {
Operation operation = operations.get(0);
if (operation instanceof ModifyOperation) {
modifyOperations.add((ModifyOperation) operation);
}
}
}
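// Annotation (illustration only): addTableSink wires one table end to end in three steps:
//   1. createTemporaryView registers the per-table changelog stream under the underscored
//      schema_table name, using the joined column list as the field string;
//   2. executeSql creates the sink table from table.getFlinkDDL(...), whose WITH clause is
//      assembled from the CDCSource sink configuration (see getSinkConfigurationString);
//   3. the generated INSERT INTO <sink> SELECT ... FROM <view> statement is parsed and its
//      ModifyOperation collected into modifyOperations, so that build() can translate all
//      per-table inserts into one set of transformations for a single job.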
@Override
public String getHandle() {
return KEY_WORD;
}
@Override
public SinkBuilder create(FlinkCDCConfig config) {
return new SQLSinkBuilder(config);
}
@Override
public DataStreamSource build(
CDCBuilder cdcBuilder,
StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) {
final List<Schema> schemaList = config.getSchemaList();
final String schemaFieldName = config.getSchemaFieldName();
if (Asserts.isNotNullCollection(schemaList)) {
SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
for (Schema schema : schemaList) {
for (Table table : schema.getTables()) {
SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
List<String> columnNameList = new ArrayList<>();
List<LogicalType> columnTypeList = new ArrayList<>();
buildColumn(columnNameList, columnTypeList, table.getColumns());
DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList);
addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
}
}
List<Transformation<?>> trans = customTableEnvironment.getPlanner().translate(modifyOperations);
for (Transformation<?> item : trans) {
env.addOperator(item);
}
}
return dataStreamSource;
}
protected Object convertValue(Object value, LogicalType logicalType) {
if (value == null) {
return null;
}
if (logicalType instanceof VarCharType) {
return value;
} else if (logicalType instanceof DecimalType) {
final DecimalType decimalType = ((DecimalType) logicalType);
final int precision = decimalType.getPrecision();
final int scale = decimalType.getScale();
return DecimalData.fromBigDecimal(new BigDecimal((String) value), precision, scale);
} else {
return value;
}
}
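// Annotation (illustration only): strings are passed through unchanged here, because the
// rows are handed to the Table API as external Row objects rather than as internal RowData
// (compare StringData.fromString in AbstractSinkBuilder). Decimals are still normalized;
// assuming a DECIMAL(10, 2) column, a source value "3.14" becomes
// DecimalData.fromBigDecimal(new BigDecimal("3.14"), 10, 2).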
protected String getSinkConfigurationString(Table table) {
String configurationString = SqlUtil.replaceAllParam(config.getSinkConfigurationString(), "schemaName", getSinkSchemaName(table));
return SqlUtil.replaceAllParam(configurationString, "tableName", getSinkTableName(table));
}
}
...@@ -9,31 +9,27 @@ import org.apache.flink.configuration.PipelineOptions; ...@@ -9,31 +9,27 @@ import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.rest.messages.JobPlanInfo; import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator; import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail; import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table; import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.ExternalSchemaTranslator;
import org.apache.flink.table.catalog.FunctionCatalog; import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog; import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.SchemaResolver;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.delegation.Executor; import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory; import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Planner; import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.delegation.PlannerFactory; import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.expressions.ApiExpressionUtils; import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionParser;
import org.apache.flink.table.factories.ComponentFactoryService; import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.TableAggregateFunction; import org.apache.flink.table.functions.TableAggregateFunction;
...@@ -41,26 +37,23 @@ import org.apache.flink.table.functions.TableFunction; ...@@ -41,26 +37,23 @@ import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.UserDefinedFunctionHelper; import org.apache.flink.table.functions.UserDefinedFunctionHelper;
import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.ExplainOperation; import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.JavaExternalQueryOperation; import org.apache.flink.table.operations.JavaDataStreamQueryOperation;
import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation; import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.command.ResetOperation; import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation; import org.apache.flink.table.operations.command.SetOperation;
import org.apache.flink.table.operations.utils.OperationTreeBuilder;
import org.apache.flink.table.planner.delegation.ExecutorBase; import org.apache.flink.table.planner.delegation.ExecutorBase;
import org.apache.flink.table.planner.utils.ExecutorUtils; import org.apache.flink.table.planner.utils.ExecutorUtils;
import org.apache.flink.types.Row; import org.apache.flink.table.typeutils.FieldInfoUtils;
import org.apache.flink.util.Preconditions;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors; import java.util.Optional;
import javax.annotation.Nullable;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.result.SqlExplainResult; import com.dlink.result.SqlExplainResult;
...@@ -349,71 +342,42 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements ...@@ -349,71 +342,42 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements
} }
} }
@Override public <T> Table fromDataStream(DataStream<T> dataStream, String fields) {
public Table fromChangelogStream(DataStream<Row> dataStream) { List<Expression> expressions = ExpressionParser.parseExpressionList(fields);
return fromStreamInternal(dataStream, null, null, ChangelogMode.all()); return this.fromDataStream(dataStream, (Expression[]) expressions.toArray(new Expression[0]));
} }
@Override public <T> Table fromDataStream(DataStream<T> dataStream, Expression... fields) {
public <T> void registerDataStream(String name, DataStream<T> dataStream) { JavaDataStreamQueryOperation<T> queryOperation = this.asQueryOperation(dataStream, Optional.of(Arrays.asList(fields)));
createTemporaryView(name, dataStream); return this.createTable(queryOperation);
} }
@Override @Override
public <T> void createTemporaryView(String path, DataStream<T> dataStream) { public <T> void createTemporaryView(String path, DataStream<T> dataStream, String fields) {
createTemporaryView( this.createTemporaryView(path, this.fromDataStream(dataStream, fields));
path, fromStreamInternal(dataStream, null, path, ChangelogMode.insertOnly()));
} }
private <T> Table fromStreamInternal( @Override
DataStream<T> dataStream, public <T> void createTemporaryView(String path, DataStream<T> dataStream, Expression... fields) {
@Nullable Schema schema, this.createTemporaryView(path, this.fromDataStream(dataStream, fields));
@Nullable String viewPath, }
ChangelogMode changelogMode) {
Preconditions.checkNotNull(dataStream, "Data stream must not be null.");
Preconditions.checkNotNull(changelogMode, "Changelog mode must not be null.");
final CatalogManager catalogManager = getCatalogManager();
final SchemaResolver schemaResolver = catalogManager.getSchemaResolver();
final OperationTreeBuilder operationTreeBuilder = getOperationTreeBuilder();
final UnresolvedIdentifier unresolvedIdentifier;
if (viewPath != null) {
unresolvedIdentifier = getParser().parseIdentifier(viewPath);
} else {
unresolvedIdentifier =
UnresolvedIdentifier.of("Unregistered_DataStream_Source_" + dataStream.getId());
}
final ObjectIdentifier objectIdentifier =
catalogManager.qualifyIdentifier(unresolvedIdentifier);
final ExternalSchemaTranslator.InputResult schemaTranslationResult =
ExternalSchemaTranslator.fromExternal(
catalogManager.getDataTypeFactory(), dataStream.getType(), schema);
final ResolvedSchema resolvedSchema =
schemaTranslationResult.getSchema().resolve(schemaResolver);
final QueryOperation scanOperation =
new JavaExternalQueryOperation<>(
objectIdentifier,
dataStream,
schemaTranslationResult.getPhysicalDataType(),
schemaTranslationResult.isTopLevelRecord(),
changelogMode,
resolvedSchema);
final List<String> projections = schemaTranslationResult.getProjections();
if (projections == null) {
return createTable(scanOperation);
}
final QueryOperation projectOperation = private <T> JavaDataStreamQueryOperation<T> asQueryOperation(DataStream<T> dataStream, Optional<List<Expression>> fields) {
operationTreeBuilder.project( TypeInformation<T> streamType = dataStream.getType();
projections.stream() FieldInfoUtils.TypeInfoSchema typeInfoSchema = (FieldInfoUtils.TypeInfoSchema) fields.map((f) -> {
.map(ApiExpressionUtils::unresolvedRef) FieldInfoUtils.TypeInfoSchema fieldsInfo = FieldInfoUtils.getFieldsInfo(streamType, (Expression[]) f.toArray(new Expression[0]));
.collect(Collectors.toList()), this.validateTimeCharacteristic(fieldsInfo.isRowtimeDefined());
scanOperation); return fieldsInfo;
}).orElseGet(() -> {
return FieldInfoUtils.getFieldsInfo(streamType);
});
return new JavaDataStreamQueryOperation<>(dataStream, typeInfoSchema.getIndices(), typeInfoSchema.toResolvedSchema());
}
return createTable(projectOperation); private void validateTimeCharacteristic(boolean isRowtimeDefined) {
if (isRowtimeDefined && this.executionEnvironment.getStreamTimeCharacteristic() != TimeCharacteristic.EventTime) {
throw new ValidationException(
String.format("A rowtime attribute requires an EventTime time characteristic in stream environment. But is: %s", this.executionEnvironment.getStreamTimeCharacteristic()));
}
} }
} }
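A minimal usage sketch of the view-registration path added above (names such as rowStream and mydb_person are invented; this is not part of the commit): SQLSinkBuilder calls the String-field variant, which goes through fromDataStream and FieldInfoUtils to build a JavaDataStreamQueryOperation.
// assuming a DataStream<Row> rowStream whose columns are id and name
customTableEnvironment.createTemporaryView("mydb_person", rowStream, "id,name");
customTableEnvironment.executeSql("INSERT INTO sink_person SELECT id, name FROM mydb_person");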
...@@ -12,6 +12,7 @@ import org.apache.flink.table.data.DecimalData; ...@@ -12,6 +12,7 @@ import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.StringData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType; import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.DecimalType;
...@@ -49,6 +50,7 @@ import com.dlink.model.Table; ...@@ -49,6 +50,7 @@ import com.dlink.model.Table;
public abstract class AbstractSinkBuilder { public abstract class AbstractSinkBuilder {
protected FlinkCDCConfig config; protected FlinkCDCConfig config;
protected List<ModifyOperation> modifyOperations = new ArrayList();
public AbstractSinkBuilder() { public AbstractSinkBuilder() {
} }
...@@ -162,17 +164,23 @@ public abstract class AbstractSinkBuilder { ...@@ -162,17 +164,23 @@ public abstract class AbstractSinkBuilder {
StreamExecutionEnvironment env, StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment, CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) { DataStreamSource<String> dataStreamSource) {
final List<Schema> schemaList = config.getSchemaList(); final List<Schema> schemaList = config.getSchemaList();
final String schemaFieldName = config.getSchemaFieldName(); final String schemaFieldName = config.getSchemaFieldName();
if (Asserts.isNotNullCollection(schemaList)) { if (Asserts.isNotNullCollection(schemaList)) {
SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource); SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
for (Schema schema : schemaList) { for (Schema schema : schemaList) {
for (Table table : schema.getTables()) { for (Table table : schema.getTables()) {
SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName); SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
List<String> columnNameList = new ArrayList<>(); List<String> columnNameList = new ArrayList<>();
List<LogicalType> columnTypeList = new ArrayList<>(); List<LogicalType> columnTypeList = new ArrayList<>();
buildColumn(columnNameList, columnTypeList, table.getColumns()); buildColumn(columnNameList, columnTypeList, table.getColumns());
DataStream<RowData> rowDataDataStream = buildRowData(filterOperator, columnNameList, columnTypeList); DataStream<RowData> rowDataDataStream = buildRowData(filterOperator, columnNameList, columnTypeList);
addSink(env, rowDataDataStream, table, columnNameList, columnTypeList); addSink(env, rowDataDataStream, table, columnNameList, columnTypeList);
} }
} }
...@@ -220,6 +228,9 @@ public abstract class AbstractSinkBuilder { ...@@ -220,6 +228,9 @@ public abstract class AbstractSinkBuilder {
} }
protected Object convertValue(Object value, LogicalType logicalType) { protected Object convertValue(Object value, LogicalType logicalType) {
if (value == null) {
return null;
}
if (logicalType instanceof VarCharType) { if (logicalType instanceof VarCharType) {
return StringData.fromString((String) value); return StringData.fromString((String) value);
} else if (logicalType instanceof DecimalType) { } else if (logicalType instanceof DecimalType) {
......
...@@ -29,7 +29,5 @@ public interface CDCBuilder { ...@@ -29,7 +29,5 @@ public interface CDCBuilder {
Map<String, Map<String, String>> parseMetaDataConfigs(); Map<String, Map<String, String>> parseMetaDataConfigs();
String getInsertSQL(Table table, String sourceName);
String getSchemaFieldName(); String getSchemaFieldName();
} }
...@@ -2,8 +2,8 @@ package com.dlink.cdc; ...@@ -2,8 +2,8 @@ package com.dlink.cdc;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.cdc.doris.DorisSinkBuilder; import com.dlink.cdc.doris.DorisSinkBuilder;
import com.dlink.cdc.jdbc.JdbcSinkBuilder;
import com.dlink.cdc.kafka.KafkaSinkBuilder; import com.dlink.cdc.kafka.KafkaSinkBuilder;
import com.dlink.cdc.sql.SQLSinkBuilder;
import com.dlink.exception.FlinkClientException; import com.dlink.exception.FlinkClientException;
import com.dlink.model.FlinkCDCConfig; import com.dlink.model.FlinkCDCConfig;
...@@ -17,8 +17,8 @@ public class SinkBuilderFactory { ...@@ -17,8 +17,8 @@ public class SinkBuilderFactory {
private static SinkBuilder[] sinkBuilders = { private static SinkBuilder[] sinkBuilders = {
new KafkaSinkBuilder(), new KafkaSinkBuilder(),
new JdbcSinkBuilder(), new DorisSinkBuilder(),
new DorisSinkBuilder() new SQLSinkBuilder()
}; };
public static SinkBuilder buildSinkBuilder(FlinkCDCConfig config) { public static SinkBuilder buildSinkBuilder(FlinkCDCConfig config) {
...@@ -30,6 +30,6 @@ public class SinkBuilderFactory { ...@@ -30,6 +30,6 @@ public class SinkBuilderFactory {
return sinkBuilders[i].create(config); return sinkBuilders[i].create(config);
} }
} }
throw new FlinkClientException("未匹配到对应 Sink 类型的【" + config.getSink().get("connector") + "】。"); return new SQLSinkBuilder().create(config);
} }
} }
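With this change SinkBuilderFactory no longer throws when the configured connector has no dedicated DataStream builder; unmatched connectors fall back to the SQL-based builder. A rough sketch of the resulting selection (connector values are examples only):
// "datastream-kafka" -> KafkaSinkBuilder, "datastream-doris" -> DorisSinkBuilder,
// "sql" or any other value (e.g. "jdbc", "doris", "kafka") -> SQLSinkBuilder
SinkBuilder sinkBuilder = SinkBuilderFactory.buildSinkBuilder(config);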
...@@ -26,7 +26,7 @@ import com.dlink.model.Table; ...@@ -26,7 +26,7 @@ import com.dlink.model.Table;
**/ **/
public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable { public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
private final static String KEY_WORD = "doris"; private final static String KEY_WORD = "datastream-doris";
private static final long serialVersionUID = 8330362249137471854L; private static final long serialVersionUID = 8330362249137471854L;
public DorisSinkBuilder() { public DorisSinkBuilder() {
......
package com.dlink.cdc.jdbc;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import java.util.List;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Table;
/**
 * JdbcSinkBuilder
*
* @author wenmo
* @since 2022/4/12 21:29
**/
public class JdbcSinkBuilder extends AbstractSinkBuilder implements SinkBuilder {
private final static String KEY_WORD = "jdbc";
private final static String TABLE_NAME = "cdc_table";
public JdbcSinkBuilder() {
}
public JdbcSinkBuilder(FlinkCDCConfig config) {
super(config);
}
@Override
public void addSink(
StreamExecutionEnvironment env,
DataStream<RowData> rowDataDataStream,
Table table,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
}
@Override
public String getHandle() {
return KEY_WORD;
}
@Override
public SinkBuilder create(FlinkCDCConfig config) {
return new JdbcSinkBuilder(config);
}
}
...@@ -34,7 +34,7 @@ import com.dlink.model.Table; ...@@ -34,7 +34,7 @@ import com.dlink.model.Table;
**/ **/
public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder { public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder {
private final static String KEY_WORD = "kafka"; private final static String KEY_WORD = "datastream-kafka";
public KafkaSinkBuilder() { public KafkaSinkBuilder() {
} }
......
...@@ -17,7 +17,6 @@ import com.dlink.cdc.CDCBuilder; ...@@ -17,7 +17,6 @@ import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.ClientConstant; import com.dlink.constant.ClientConstant;
import com.dlink.constant.FlinkParamConstant; import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig; import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Table;
import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder; import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.connectors.mysql.table.StartupOptions;
...@@ -127,29 +126,6 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder { ...@@ -127,29 +126,6 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
return allConfigMap; return allConfigMap;
} }
@Override
public String getInsertSQL(Table table, String sourceName) {
StringBuilder sb = new StringBuilder("INSERT INTO ");
sb.append(table.getName());
sb.append(" SELECT\n");
for (int i = 0; i < table.getColumns().size(); i++) {
sb.append(" ");
if (i > 0) {
sb.append(",");
}
sb.append("`" + table.getColumns().get(i).getName() + "` \n");
}
sb.append(" FROM ");
sb.append(sourceName);
sb.append(" WHERE database_name = '");
sb.append(table.getSchema());
sb.append("' and table_name = '");
sb.append(table.getName());
sb.append("'");
return sb.toString();
}
@Override @Override
public String getSchemaFieldName() { public String getSchemaFieldName() {
return "db"; return "db";
......
...@@ -13,7 +13,6 @@ import com.dlink.cdc.AbstractCDCBuilder; ...@@ -13,7 +13,6 @@ import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder; import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.ClientConstant; import com.dlink.constant.ClientConstant;
import com.dlink.model.FlinkCDCConfig; import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Table;
import com.ververica.cdc.connectors.oracle.OracleSource; import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.connectors.oracle.table.StartupOptions; import com.ververica.cdc.connectors.oracle.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
...@@ -108,26 +107,4 @@ public class OracleCDCBuilder extends AbstractCDCBuilder implements CDCBuilder { ...@@ -108,26 +107,4 @@ public class OracleCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
} }
return allConfigList; return allConfigList;
} }
@Override
public String getInsertSQL(Table table, String sourceName) {
StringBuilder sb = new StringBuilder("INSERT INTO ");
sb.append(table.getName());
sb.append(" SELECT\n");
for (int i = 0; i < table.getColumns().size(); i++) {
sb.append(" ");
if (i > 0) {
sb.append(",");
}
sb.append("`" + table.getColumns().get(i).getName() + "` \n");
}
sb.append(" FROM ");
sb.append(sourceName);
sb.append(" WHERE schema_name = '");
sb.append(table.getSchema());
sb.append("' and table_name = '");
sb.append(table.getName());
sb.append("'");
return sb.toString();
}
} }
package com.dlink.cdc.sql;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import java.io.Serializable;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.dlink.utils.SqlUtil;
/**
* SQLSinkBuilder
*
* @author wenmo
* @since 2022/4/25 23:02
*/
public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
private final static String KEY_WORD = "sql";
private static final long serialVersionUID = -3699685106324048226L;
public SQLSinkBuilder() {
}
public SQLSinkBuilder(FlinkCDCConfig config) {
super(config);
}
@Override
public void addSink(StreamExecutionEnvironment env, DataStream<RowData> rowDataDataStream, Table table, List<String> columnNameList, List<LogicalType> columnTypeList) {
}
protected DataStream<Row> buildRow(
SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]);
final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]);
TypeInformation<?>[] typeInformations = TypeConversions.fromDataTypeToLegacyInfo(TypeConversions.fromLogicalToDataType(columnTypes));
RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, columnNames);
return filterOperator
.flatMap(new FlatMapFunction<Map, Row>() {
@Override
public void flatMap(Map value, Collector<Row> out) throws Exception {
switch (value.get("op").toString()) {
case "c":
Row irow = Row.withNames(RowKind.INSERT);
Map idata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
irow.setField(columnNameList.get(i), convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(irow);
break;
case "d":
Row drow = Row.withNames(RowKind.DELETE);
Map ddata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
drow.setField(columnNameList.get(i), convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(drow);
break;
case "u":
Row ubrow = Row.withNames(RowKind.UPDATE_BEFORE);
Map ubdata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
ubrow.setField(columnNameList.get(i), convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(ubrow);
Row uarow = Row.withNames(RowKind.UPDATE_AFTER);
Map uadata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
uarow.setField(columnNameList.get(i), convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(uarow);
break;
}
}
}, rowTypeInfo);
}
public void addTableSink(
CustomTableEnvironment customTableEnvironment,
DataStream<Row> rowDataDataStream,
Table table,
List<String> columnNameList) {
String sinkTableName = getSinkTableName(table);
customTableEnvironment.createTemporaryView(table.getSchemaTableNameWithUnderline(), rowDataDataStream, StringUtils.join(columnNameList, ","));
customTableEnvironment.executeSql(table.getFlinkDDL(getSinkConfigurationString(table), sinkTableName));
List<Operation> operations = customTableEnvironment.getParser().parse(table.getCDCSqlInsert(sinkTableName, table.getSchemaTableNameWithUnderline()));
if (operations.size() > 0) {
Operation operation = operations.get(0);
if (operation instanceof ModifyOperation) {
modifyOperations.add((ModifyOperation) operation);
}
}
}
@Override
public String getHandle() {
return KEY_WORD;
}
@Override
public SinkBuilder create(FlinkCDCConfig config) {
return new SQLSinkBuilder(config);
}
@Override
public DataStreamSource build(
CDCBuilder cdcBuilder,
StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) {
final List<Schema> schemaList = config.getSchemaList();
final String schemaFieldName = config.getSchemaFieldName();
if (Asserts.isNotNullCollection(schemaList)) {
SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
for (Schema schema : schemaList) {
for (Table table : schema.getTables()) {
SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
List<String> columnNameList = new ArrayList<>();
List<LogicalType> columnTypeList = new ArrayList<>();
buildColumn(columnNameList, columnTypeList, table.getColumns());
DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList);
addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
}
}
List<Transformation<?>> trans = customTableEnvironment.getPlanner().translate(modifyOperations);
for (Transformation<?> item : trans) {
env.addOperator(item);
}
}
return dataStreamSource;
}
protected Object convertValue(Object value, LogicalType logicalType) {
if (value == null) {
return null;
}
if (logicalType instanceof VarCharType) {
return value;
} else if (logicalType instanceof DecimalType) {
final DecimalType decimalType = ((DecimalType) logicalType);
final int precision = decimalType.getPrecision();
final int scale = decimalType.getScale();
return DecimalData.fromBigDecimal(new BigDecimal((String) value), precision, scale);
} else {
return value;
}
}
protected String getSinkConfigurationString(Table table) {
String configurationString = SqlUtil.replaceAllParam(config.getSinkConfigurationString(), "schemaName", getSinkSchemaName(table));
return SqlUtil.replaceAllParam(configurationString, "tableName", getSinkTableName(table));
}
}
...@@ -14,6 +14,7 @@ import org.apache.flink.configuration.PipelineOptions; ...@@ -14,6 +14,7 @@ import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.rest.messages.JobPlanInfo; import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator; import org.apache.flink.streaming.api.graph.JSONGenerator;
...@@ -39,6 +40,8 @@ import org.apache.flink.table.delegation.Executor; ...@@ -39,6 +40,8 @@ import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory; import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Planner; import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.expressions.ApiExpressionUtils; import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionParser;
import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.PlannerFactoryUtil; import org.apache.flink.table.factories.PlannerFactoryUtil;
import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.table.functions.AggregateFunction;
...@@ -47,6 +50,7 @@ import org.apache.flink.table.functions.TableFunction; ...@@ -47,6 +50,7 @@ import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.UserDefinedFunctionHelper; import org.apache.flink.table.functions.UserDefinedFunctionHelper;
import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.ExplainOperation; import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.JavaDataStreamQueryOperation;
import org.apache.flink.table.operations.JavaExternalQueryOperation; import org.apache.flink.table.operations.JavaExternalQueryOperation;
import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.Operation;
...@@ -55,15 +59,18 @@ import org.apache.flink.table.operations.command.ResetOperation; ...@@ -55,15 +59,18 @@ import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation; import org.apache.flink.table.operations.command.SetOperation;
import org.apache.flink.table.operations.utils.OperationTreeBuilder; import org.apache.flink.table.operations.utils.OperationTreeBuilder;
import org.apache.flink.table.planner.delegation.DefaultExecutor; import org.apache.flink.table.planner.delegation.DefaultExecutor;
import org.apache.flink.table.typeutils.FieldInfoUtils;
import org.apache.flink.types.Row; import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions; import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
...@@ -343,79 +350,59 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements ...@@ -343,79 +350,59 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements
} }
} }
@Override public <T> Table fromDataStream(DataStream<T> dataStream, Expression... fields) {
public Table fromChangelogStream(DataStream<Row> dataStream) { JavaDataStreamQueryOperation<T> queryOperation =
return fromStreamInternal(dataStream, null, null, ChangelogMode.all()); asQueryOperation(dataStream, Optional.of(Arrays.asList(fields)));
return createTable(queryOperation);
}
public <T> Table fromDataStream(DataStream<T> dataStream, String fields) {
List<Expression> expressions = ExpressionParser.parseExpressionList(fields);
return fromDataStream(dataStream, expressions.toArray(new Expression[0]));
} }
@Override @Override
public <T> void registerDataStream(String name, DataStream<T> dataStream) { public <T> void createTemporaryView(String path, DataStream<T> dataStream, String fields) {
createTemporaryView(name, dataStream); createTemporaryView(path, fromDataStream(dataStream, fields));
} }
@Override @Override
public <T> void createTemporaryView(String path, DataStream<T> dataStream) { public <T> void createTemporaryView(
createTemporaryView( String path, DataStream<T> dataStream, Expression... fields) {
path, fromStreamInternal(dataStream, null, path, ChangelogMode.insertOnly())); createTemporaryView(path, fromDataStream(dataStream, fields));
} }
private <T> Table fromStreamInternal( private <T> JavaDataStreamQueryOperation<T> asQueryOperation(
DataStream<T> dataStream, DataStream<T> dataStream, Optional<List<Expression>> fields) {
@Nullable Schema schema, TypeInformation<T> streamType = dataStream.getType();
@Nullable String viewPath,
ChangelogMode changelogMode) { // get field names and types for all non-replaced fields
Preconditions.checkNotNull(dataStream, "Data stream must not be null."); FieldInfoUtils.TypeInfoSchema typeInfoSchema =
Preconditions.checkNotNull(changelogMode, "Changelog mode must not be null."); fields.map(
f -> {
FieldInfoUtils.TypeInfoSchema fieldsInfo =
FieldInfoUtils.getFieldsInfo(
streamType, f.toArray(new Expression[0]));
// check if event-time is enabled
validateTimeCharacteristic(fieldsInfo.isRowtimeDefined());
return fieldsInfo;
})
.orElseGet(() -> FieldInfoUtils.getFieldsInfo(streamType));
return new JavaDataStreamQueryOperation<>(
dataStream, typeInfoSchema.getIndices(), typeInfoSchema.toResolvedSchema());
}
if (dataStream.getExecutionEnvironment() != executionEnvironment) { private void validateTimeCharacteristic(boolean isRowtimeDefined) {
if (isRowtimeDefined
&& executionEnvironment.getStreamTimeCharacteristic()
!= TimeCharacteristic.EventTime) {
throw new ValidationException( throw new ValidationException(
"The DataStream's StreamExecutionEnvironment must be identical to the one that " String.format(
+ "has been passed to the StreamTableEnvironment during instantiation."); "A rowtime attribute requires an EventTime time characteristic in stream environment. But is: %s",
executionEnvironment.getStreamTimeCharacteristic()));
} }
final CatalogManager catalogManager = getCatalogManager();
final SchemaResolver schemaResolver = catalogManager.getSchemaResolver();
final OperationTreeBuilder operationTreeBuilder = getOperationTreeBuilder();
final UnresolvedIdentifier unresolvedIdentifier;
if (viewPath != null) {
unresolvedIdentifier = getParser().parseIdentifier(viewPath);
} else {
unresolvedIdentifier =
UnresolvedIdentifier.of("Unregistered_DataStream_Source_" + dataStream.getId());
}
final ObjectIdentifier objectIdentifier =
catalogManager.qualifyIdentifier(unresolvedIdentifier);
final SchemaTranslator.ConsumingResult schemaTranslationResult =
SchemaTranslator.createConsumingResult(
catalogManager.getDataTypeFactory(), dataStream.getType(), schema);
final ResolvedSchema resolvedSchema =
schemaTranslationResult.getSchema().resolve(schemaResolver);
final QueryOperation scanOperation =
new JavaExternalQueryOperation<>(
objectIdentifier,
dataStream,
schemaTranslationResult.getPhysicalDataType(),
schemaTranslationResult.isTopLevelRecord(),
changelogMode,
resolvedSchema);
final List<String> projections = schemaTranslationResult.getProjections();
if (projections == null) {
return createTable(scanOperation);
}
final QueryOperation projectOperation =
operationTreeBuilder.project(
projections.stream()
.map(ApiExpressionUtils::unresolvedRef)
.collect(Collectors.toList()),
scanOperation);
return createTable(projectOperation);
} }
} }
...@@ -6,6 +6,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; ...@@ -6,6 +6,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.ExplainDetail; import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.StatementSet; import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table; import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableConfig;
...@@ -14,6 +15,7 @@ import org.apache.flink.table.catalog.Catalog; ...@@ -14,6 +15,7 @@ import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.delegation.Parser; import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.delegation.Planner; import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.types.Row; import org.apache.flink.types.Row;
import java.util.List; import java.util.List;
...@@ -63,11 +65,11 @@ public interface CustomTableEnvironment { ...@@ -63,11 +65,11 @@ public interface CustomTableEnvironment {
StatementSet createStatementSet(); StatementSet createStatementSet();
Table fromChangelogStream(DataStream<Row> dataStream); <T> void createTemporaryView(String path, DataStream<T> dataStream, Expression... fields);
<T> void registerDataStream(String name, DataStream<T> dataStream); <T> void createTemporaryView(String path, DataStream<T> dataStream, String fields);
<T> void createTemporaryView(String path, DataStream<T> dataStream); // <T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema);
Parser getParser(); Parser getParser();
......
...@@ -131,6 +131,39 @@ public class FlinkCDCConfig { ...@@ -131,6 +131,39 @@ public class FlinkCDCConfig {
return sink; return sink;
} }
private boolean skip(String key) {
switch (key) {
case "db":
case "table.prefix":
case "table.suffix":
case "table.upper":
case "table.lower":
return true;
default:
return false;
}
}
public String getSinkConfigurationString() {
StringBuilder sb = new StringBuilder();
int index = 0;
for (Map.Entry<String, String> entry : sink.entrySet()) {
if (skip(entry.getKey())) {
continue;
}
if (index > 0) {
sb.append(",");
}
sb.append("'");
sb.append(entry.getKey());
sb.append("' = '");
sb.append(entry.getValue());
sb.append("'\n");
index++;
}
return sb.toString();
}
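// Annotation (illustration only, values invented): for a sink map such as
//   {"connector":"jdbc", "url":"jdbc:mysql://host:3306/ods", "table.prefix":"ods_"}
// getSinkConfigurationString() skips the meta keys handled elsewhere (db, table.prefix,
// table.suffix, table.upper, table.lower) and returns roughly:
//   'connector' = 'jdbc'
//   ,'url' = 'jdbc:mysql://host:3306/ods'
// SQLSinkBuilder then substitutes any schemaName/tableName parameters in this string via
// SqlUtil.replaceAllParam before embedding it in the generated CREATE TABLE ... WITH (...).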
public void setSink(Map<String, String> sink) { public void setSink(Map<String, String> sink) {
this.sink = sink; this.sink = sink;
} }
......
...@@ -79,8 +79,12 @@ public class Table implements Serializable, Comparable<Table> { ...@@ -79,8 +79,12 @@ public class Table implements Serializable, Comparable<Table> {
} }
public String getFlinkTableSql(String flinkConfig) { public String getFlinkTableSql(String flinkConfig) {
return getFlinkDDL(flinkConfig, name);
}
public String getFlinkDDL(String flinkConfig, String tableName) {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append("CREATE TABLE IF NOT EXISTS " + name + " (\n"); sb.append("CREATE TABLE IF NOT EXISTS " + tableName + " (\n");
List<String> pks = new ArrayList<>(); List<String> pks = new ArrayList<>();
for (int i = 0; i < columns.size(); i++) { for (int i = 0; i < columns.size(); i++) {
String type = columns.get(i).getJavaType().getFlinkType(); String type = columns.get(i).getJavaType().getFlinkType();
...@@ -122,7 +126,7 @@ public class Table implements Serializable, Comparable<Table> { ...@@ -122,7 +126,7 @@ public class Table implements Serializable, Comparable<Table> {
} }
} }
sb.append(" WITH (\n"); sb.append(" WITH (\n");
sb.append(getFlinkTableWith(flinkConfig)); sb.append(flinkConfig);
sb.append(")\n"); sb.append(")\n");
return sb.toString(); return sb.toString();
} }
...@@ -204,9 +208,9 @@ public class Table implements Serializable, Comparable<Table> { ...@@ -204,9 +208,9 @@ public class Table implements Serializable, Comparable<Table> {
return sb.toString(); return sb.toString();
} }
public String getCDCSqlInsertIntoBySourceName(String sourceName, String schemaName, String tableName) { public String getCDCSqlInsert(String targetName, String sourceName) {
StringBuilder sb = new StringBuilder("INSERT INTO "); StringBuilder sb = new StringBuilder("INSERT INTO ");
sb.append(name); sb.append(targetName);
sb.append(" SELECT\n"); sb.append(" SELECT\n");
for (int i = 0; i < columns.size(); i++) { for (int i = 0; i < columns.size(); i++) {
sb.append(" "); sb.append(" ");
...@@ -217,11 +221,6 @@ public class Table implements Serializable, Comparable<Table> { ...@@ -217,11 +221,6 @@ public class Table implements Serializable, Comparable<Table> {
} }
sb.append(" FROM "); sb.append(" FROM ");
sb.append(sourceName); sb.append(sourceName);
sb.append(" WHERE database_name = '");
sb.append(schemaName);
sb.append("' and table_name = '");
sb.append(tableName);
sb.append("'");
return sb.toString(); return sb.toString();
} }
} }
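Taken together, the Table helpers above produce the per-table statements that SQLSinkBuilder executes. For an assumed source table mydb.person(id, name) with sink table name person (names invented, not taken from the commit), the generated insert is roughly:
// table.getCDCSqlInsert("person", "mydb_person") yields approximately:
//   INSERT INTO person SELECT
//     `id`
//     ,`name`
//    FROM mydb_person
// Unlike the removed getInsertSQL in the MySQL/Oracle CDC builders, no WHERE
// database_name/table_name filter is appended, because the temporary view registered by
// SQLSinkBuilder is already filtered down to a single table via shunt(...).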