Commit 8e5b2ca2 authored by wenmo

[Feature-445][client] CDCSource sync add sink table-name rule

parent 0474e811
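
Summary: this commit reworks the CDC sink path. The abstract addSink now receives the StreamExecutionEnvironment and the full Table instead of a pre-rendered schemaTableName string, and AbstractSinkBuilder gains two helpers, getSinkSchemaName and getSinkTableName, through which the sink options sink.db, table.prefix.schema, table.prefix, table.suffix, table.lower and table.upper rewrite the target schema and table names. The Doris, Kafka, Hudi and JDBC builders are updated accordingly; the near-identical hunks below repeat because each Flink-version module under dlink-client carries its own copy of these classes.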
......@@ -151,8 +151,9 @@ public abstract class AbstractSinkBuilder {
}
public abstract void addSink(
StreamExecutionEnvironment env,
DataStream<RowData> rowDataDataStream,
String schemaTableName,
Table table,
List<String> columnNameList,
List<LogicalType> columnTypeList);
......@@ -172,7 +173,7 @@ public abstract class AbstractSinkBuilder {
List<LogicalType> columnTypeList = new ArrayList<>();
buildColumn(columnNameList, columnTypeList, table.getColumns());
DataStream<RowData> rowDataDataStream = buildRowData(filterOperator, columnNameList, columnTypeList);
addSink(rowDataDataStream, table.getSchemaTableName(), columnNameList, columnTypeList);
addSink(env, rowDataDataStream, table, columnNameList, columnTypeList);
}
}
}
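
Passing the environment and the Table object through to each concrete builder means every sink resolves its own target name via the new rules. A minimal sketch of a builder against the new signature; the class name and the print sink are illustrative only, and it assumes addSink is the only abstract member a subclass must supply:

package com.dlink.cdc.demo;

import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;

import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.model.Table;

// Hypothetical builder, used only to illustrate the new contract.
public class PrintSinkBuilder extends AbstractSinkBuilder {

    @Override
    public void addSink(
            StreamExecutionEnvironment env,
            DataStream<RowData> rowDataDataStream,
            Table table,
            List<String> columnNameList,
            List<LogicalType> columnTypeList) {
        // Receiving the Table rather than a fixed string lets the sink apply
        // the sink.db / table.* renaming rules added in this commit.
        rowDataDataStream.print(getSinkSchemaName(table) + "." + getSinkTableName(table));
    }
}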
......@@ -230,4 +231,38 @@ public abstract class AbstractSinkBuilder {
return value;
}
}
protected String getSinkSchemaName(Table table) {
String schemaName = table.getSchema();
if (config.getSink().containsKey("sink.db")) {
schemaName = config.getSink().get("sink.db");
}
return schemaName;
}
protected String getSinkTableName(Table table) {
String tableName = table.getName();
if (config.getSink().containsKey("table.prefix.schema")) {
if (Boolean.valueOf(config.getSink().get("table.prefix.schema"))) {
tableName = table.getSchema() + "_" + tableName;
}
}
if (config.getSink().containsKey("table.prefix")) {
tableName = config.getSink().get("table.prefix") + tableName;
}
if (config.getSink().containsKey("table.suffix")) {
tableName = tableName + config.getSink().get("table.suffix");
}
if (config.getSink().containsKey("table.lower")) {
if (Boolean.valueOf(config.getSink().get("table.lower"))) {
tableName = tableName.toLowerCase();
}
}
if (config.getSink().containsKey("table.upper")) {
if (Boolean.valueOf(config.getSink().get("table.upper"))) {
tableName = tableName.toUpperCase();
}
}
return tableName;
}
}
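
These two helpers are the core of the feature: getSinkSchemaName lets sink.db override the source schema, and getSinkTableName rewrites the table name by applying, in order, the schema prefix (table.prefix.schema), a literal prefix (table.prefix), a literal suffix (table.suffix), and finally lower or upper casing (table.lower / table.upper, with upper winning if both are set). A self-contained sketch that mirrors the rule chain on hypothetical config values; it duplicates the logic for demonstration and is not the production code path:

import java.util.HashMap;
import java.util.Map;

public class SinkNameRuleDemo {

    // Mirrors getSinkTableName above, using defaults instead of containsKey checks.
    static String applyRules(Map<String, String> sink, String schema, String name) {
        String tableName = name;
        if (Boolean.parseBoolean(sink.getOrDefault("table.prefix.schema", "false"))) {
            tableName = schema + "_" + tableName;
        }
        tableName = sink.getOrDefault("table.prefix", "") + tableName
                + sink.getOrDefault("table.suffix", "");
        if (Boolean.parseBoolean(sink.getOrDefault("table.lower", "false"))) {
            tableName = tableName.toLowerCase();
        }
        if (Boolean.parseBoolean(sink.getOrDefault("table.upper", "false"))) {
            tableName = tableName.toUpperCase();
        }
        return tableName;
    }

    public static void main(String[] args) {
        Map<String, String> sink = new HashMap<>();
        sink.put("table.prefix.schema", "true");
        sink.put("table.prefix", "ods_");
        sink.put("table.suffix", "_v1");
        sink.put("table.lower", "true");
        // DB1.Student -> DB1_Student -> ods_DB1_Student -> ods_DB1_Student_v1
        //             -> ods_db1_student_v1
        System.out.println(applyRules(sink, "DB1", "Student"));
    }
}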
......@@ -5,6 +5,7 @@ import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.cfg.DorisSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
......@@ -15,6 +16,7 @@ import java.util.Map;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Table;
/**
* DorisSinkBuilder
......@@ -46,8 +48,9 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
@Override
public void addSink(
StreamExecutionEnvironment env,
DataStream<RowData> rowDataDataStream,
String schemaTableName,
Table table,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
......@@ -78,7 +81,7 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
dorisExecutionOptionsBuilder.build(),
DorisOptions.builder()
.setFenodes(config.getSink().get("fenodes"))
.setTableIdentifier(schemaTableName)
.setTableIdentifier(getSinkSchemaName(table) + "." + getSinkTableName(table))
.setUsername(config.getSink().get("username"))
.setPassword(config.getSink().get("password")).build()
));
......
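
For Doris the target identifier is now composed from the two helpers instead of being taken verbatim from the source. A fragment-level sketch of the effect, assuming sink.db=doris_ods and table.prefix=ods_ so that the captured MySQL table shop.orders lands in doris_ods.ods_orders; the FE address and credentials are placeholders:

DorisOptions options = DorisOptions.builder()
        .setFenodes("127.0.0.1:8030")               // config.getSink().get("fenodes"), placeholder
        .setTableIdentifier("doris_ods.ods_orders") // getSinkSchemaName(table) + "." + getSinkTableName(table)
        .setUsername("root")                        // placeholder
        .setPassword("")                            // placeholder
        .build();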
......@@ -43,7 +43,12 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
}
@Override
public void addSink(DataStream<RowData> rowDataDataStream, String schemaTableName, List<String> columnNameList, List<LogicalType> columnTypeList) {
public void addSink(
StreamExecutionEnvironment env,
DataStream<RowData> rowDataDataStream,
Table table,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
}
......@@ -98,7 +103,7 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
}
});
stringOperator.addSink(new FlinkKafkaProducer<String>(config.getSink().get("brokers"),
table.getSchemaTableName(),
getSinkTableName(table),
new SimpleStringSchema()));
}
}
......
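
In this legacy FlinkKafkaProducer path the topic is now the renamed table rather than schema.table, so topics can be rewritten per environment. A fragment sketch assuming table.prefix=cdc_ and a placeholder broker address:

stringOperator.addSink(new FlinkKafkaProducer<String>(
        "localhost:9092",   // config.getSink().get("brokers"), placeholder
        "cdc_orders",       // getSinkTableName(table) for source table shop.orders
        new SimpleStringSchema()));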
......@@ -151,8 +151,9 @@ public abstract class AbstractSinkBuilder {
}
public abstract void addSink(
StreamExecutionEnvironment env,
DataStream<RowData> rowDataDataStream,
String schemaTableName,
Table table,
List<String> columnNameList,
List<LogicalType> columnTypeList);
......@@ -172,7 +173,7 @@ public abstract class AbstractSinkBuilder {
List<LogicalType> columnTypeList = new ArrayList<>();
buildColumn(columnNameList, columnTypeList, table.getColumns());
DataStream<RowData> rowDataDataStream = buildRowData(filterOperator, columnNameList, columnTypeList);
addSink(rowDataDataStream, table.getSchemaTableName(), columnNameList, columnTypeList);
addSink(env, rowDataDataStream, table, columnNameList, columnTypeList);
}
}
}
......@@ -230,4 +231,38 @@ public abstract class AbstractSinkBuilder {
return value;
}
}
protected String getSinkSchemaName(Table table) {
String schemaName = table.getSchema();
if (config.getSink().containsKey("sink.db")) {
schemaName = config.getSink().get("sink.db");
}
return schemaName;
}
protected String getSinkTableName(Table table) {
String tableName = table.getName();
if (config.getSink().containsKey("table.prefix.schema")) {
if (Boolean.valueOf(config.getSink().get("table.prefix.schema"))) {
tableName = table.getSchema() + "_" + tableName;
}
}
if (config.getSink().containsKey("table.prefix")) {
tableName = config.getSink().get("table.prefix") + tableName;
}
if (config.getSink().containsKey("table.suffix")) {
tableName = tableName + config.getSink().get("table.suffix");
}
if (config.getSink().containsKey("table.lower")) {
if (Boolean.valueOf(config.getSink().get("table.lower"))) {
tableName = tableName.toLowerCase();
}
}
if (config.getSink().containsKey("table.upper")) {
if (Boolean.valueOf(config.getSink().get("table.upper"))) {
tableName = tableName.toUpperCase();
}
}
return tableName;
}
}
......@@ -5,6 +5,7 @@ import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.cfg.DorisSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
......@@ -15,6 +16,7 @@ import java.util.Map;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Table;
/**
* DorisSinkBuilder
......@@ -46,8 +48,9 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
@Override
public void addSink(
StreamExecutionEnvironment env,
DataStream<RowData> rowDataDataStream,
String schemaTableName,
Table table,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
......@@ -78,7 +81,7 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
dorisExecutionOptionsBuilder.build(),
DorisOptions.builder()
.setFenodes(config.getSink().get("fenodes"))
.setTableIdentifier(schemaTableName)
.setTableIdentifier(getSinkSchemaName(table) + "." + getSinkTableName(table))
.setUsername(config.getSink().get("username"))
.setPassword(config.getSink().get("password")).build()
));
......
......@@ -43,7 +43,12 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
}
@Override
public void addSink(DataStream<RowData> rowDataDataStream, String schemaTableName, List<String> columnNameList, List<LogicalType> columnTypeList) {
public void addSink(
StreamExecutionEnvironment env,
DataStream<RowData> rowDataDataStream,
Table table,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
}
......@@ -98,7 +103,7 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
}
});
stringOperator.addSink(new FlinkKafkaProducer<String>(config.getSink().get("brokers"),
table.getSchemaTableName(),
getSinkTableName(table),
new SimpleStringSchema()));
}
}
......
......@@ -231,4 +231,38 @@ public abstract class AbstractSinkBuilder {
return value;
}
}
protected String getSinkSchemaName(Table table){
String schemaName = table.getSchema();
if (config.getSink().containsKey("sink.db")) {
schemaName = config.getSink().get("sink.db");
}
return schemaName;
}
protected String getSinkTableName(Table table){
String tableName = table.getName();
if (config.getSink().containsKey("table.prefix.schema")) {
if (Boolean.valueOf(config.getSink().get("table.prefix.schema"))) {
tableName = table.getSchema() + "_" + tableName;
}
}
if (config.getSink().containsKey("table.prefix")) {
tableName = config.getSink().get("table.prefix") + tableName;
}
if (config.getSink().containsKey("table.suffix")) {
tableName = tableName + config.getSink().get("table.suffix");
}
if (config.getSink().containsKey("table.lower")) {
if (Boolean.valueOf(config.getSink().get("table.lower"))) {
tableName = tableName.toLowerCase();
}
}
if (config.getSink().containsKey("table.upper")) {
if (Boolean.valueOf(config.getSink().get("table.upper"))) {
tableName = tableName.toUpperCase();
}
}
return tableName;
}
}
......@@ -4,6 +4,7 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.cfg.DorisSink;
import org.apache.doris.flink.cfg.DorisStreamOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
......@@ -81,7 +82,7 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
dorisExecutionOptionsBuilder.build(),
DorisOptions.builder()
.setFenodes(config.getSink().get("fenodes"))
.setTableIdentifier(table.getSchemaTableName())
.setTableIdentifier(getSinkSchemaName(table) + "." + getSinkTableName(table))
.setUsername(config.getSink().get("username"))
.setPassword(config.getSink().get("password")).build()
));
......
......@@ -60,6 +60,8 @@ public class HudiSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]);
final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]);
final String tableName = getSinkTableName(table);
Integer parallelism = 1;
boolean isMor = true;
Map<String, String> sink = config.getSink();
......@@ -68,17 +70,17 @@ public class HudiSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
parallelism = Integer.valueOf(sink.get("parallelism"));
}
if (configuration.contains(FlinkOptions.PATH)) {
configuration.set(FlinkOptions.PATH, configuration.getValue(FlinkOptions.PATH) + table.getSchemaTableNameWithUnderline());
configuration.set(FlinkOptions.PATH, configuration.getValue(FlinkOptions.PATH) + tableName);
}
if (sink.containsKey(FlinkOptions.TABLE_TYPE.key())) {
isMor = HoodieTableType.MERGE_ON_READ.name().equals(sink.get(FlinkOptions.TABLE_TYPE.key()));
}
configuration.set(FlinkOptions.TABLE_NAME, table.getSchemaTableNameWithUnderline());
configuration.set(FlinkOptions.HIVE_SYNC_DB, table.getSchema());
configuration.set(FlinkOptions.HIVE_SYNC_TABLE, table.getName());
configuration.set(FlinkOptions.TABLE_NAME, tableName);
configuration.set(FlinkOptions.HIVE_SYNC_DB, getSinkSchemaName(table));
configuration.set(FlinkOptions.HIVE_SYNC_TABLE, tableName);
RowType rowType = RowType.of(false, columnTypes, columnNames);
configuration.setString(FlinkOptions.SOURCE_AVRO_SCHEMA,
AvroSchemaConverter.convertToSchema(rowType, table.getSchemaTableNameWithUnderline()).toString());
AvroSchemaConverter.convertToSchema(rowType, tableName).toString());
DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(configuration, rowType, parallelism, rowDataDataStream);
DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(configuration, parallelism, hoodieRecordDataStream);
......
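
For Hudi the renamed table now feeds the storage path, the Hudi table name, the Hive sync table, and the Avro record name, while getSinkSchemaName drives the Hive sync database. Note that table.prefix.schema=true reproduces the schema_table shape that getSchemaTableNameWithUnderline produced before. A sketch of the effective configuration for source table shop.orders with that flag set; the base path is a placeholder:

Configuration configuration = new Configuration();
configuration.set(FlinkOptions.PATH, "hdfs:///warehouse/hudi/shop_orders"); // base path + tableName
configuration.set(FlinkOptions.TABLE_NAME, "shop_orders");       // getSinkTableName(table)
configuration.set(FlinkOptions.HIVE_SYNC_DB, "shop");            // getSinkSchemaName(table)
configuration.set(FlinkOptions.HIVE_SYNC_TABLE, "shop_orders");  // getSinkTableName(table)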
......@@ -93,7 +93,7 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
}
});
stringOperator.addSink(new FlinkKafkaProducer<String>(config.getSink().get("brokers"),
table.getSchemaTableNameWithUnderline(),
getSinkTableName(table),
new SimpleStringSchema()));
}
}
......
......@@ -151,8 +151,9 @@ public abstract class AbstractSinkBuilder {
}
public abstract void addSink(
StreamExecutionEnvironment env,
DataStream<RowData> rowDataDataStream,
String schemaTableName,
Table table,
List<String> columnNameList,
List<LogicalType> columnTypeList);
......@@ -172,7 +173,7 @@ public abstract class AbstractSinkBuilder {
List<LogicalType> columnTypeList = new ArrayList<>();
buildColumn(columnNameList, columnTypeList, table.getColumns());
DataStream<RowData> rowDataDataStream = buildRowData(filterOperator, columnNameList, columnTypeList);
addSink(rowDataDataStream, table.getSchemaTableName(), columnNameList, columnTypeList);
addSink(env, rowDataDataStream, table, columnNameList, columnTypeList);
}
}
}
......@@ -230,4 +231,38 @@ public abstract class AbstractSinkBuilder {
return value;
}
}
protected String getSinkSchemaName(Table table) {
String schemaName = table.getSchema();
if (config.getSink().containsKey("sink.db")) {
schemaName = config.getSink().get("sink.db");
}
return schemaName;
}
protected String getSinkTableName(Table table) {
String tableName = table.getName();
if (config.getSink().containsKey("table.prefix.schema")) {
if (Boolean.valueOf(config.getSink().get("table.prefix.schema"))) {
tableName = table.getSchema() + "_" + tableName;
}
}
if (config.getSink().containsKey("table.prefix")) {
tableName = config.getSink().get("table.prefix") + tableName;
}
if (config.getSink().containsKey("table.suffix")) {
tableName = tableName + config.getSink().get("table.suffix");
}
if (config.getSink().containsKey("table.lower")) {
if (Boolean.valueOf(config.getSink().get("table.lower"))) {
tableName = tableName.toLowerCase();
}
}
if (config.getSink().containsKey("table.upper")) {
if (Boolean.valueOf(config.getSink().get("table.upper"))) {
tableName = tableName.toUpperCase();
}
}
return tableName;
}
}
......@@ -5,6 +5,7 @@ import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.cfg.DorisSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
......@@ -15,6 +16,7 @@ import java.util.Map;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Table;
/**
* DorisSinkBuilder
......@@ -46,8 +48,9 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
@Override
public void addSink(
StreamExecutionEnvironment env,
DataStream<RowData> rowDataDataStream,
String schemaTableName,
Table table,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
......@@ -78,7 +81,7 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
dorisExecutionOptionsBuilder.build(),
DorisOptions.builder()
.setFenodes(config.getSink().get("fenodes"))
.setTableIdentifier(schemaTableName)
.setTableIdentifier(getSinkSchemaName(table) + "." + getSinkTableName(table))
.setUsername(config.getSink().get("username"))
.setPassword(config.getSink().get("password")).build()
));
......
package com.dlink.cdc.jdbc;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
......@@ -9,6 +10,7 @@ import java.util.List;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Table;
/**
* MysqlCDCBuilder
......@@ -29,7 +31,12 @@ public class JdbcSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
}
@Override
public void addSink(DataStream<RowData> rowDataDataStream, String schemaTableName, List<String> columnNameList, List<LogicalType> columnTypeList) {
public void addSink(
StreamExecutionEnvironment env,
DataStream<RowData> rowDataDataStream,
Table table,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
}
......
......@@ -44,7 +44,12 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
}
@Override
public void addSink(DataStream<RowData> rowDataDataStream, String schemaTableName, List<String> columnNameList, List<LogicalType> columnTypeList) {
public void addSink(
StreamExecutionEnvironment env,
DataStream<RowData> rowDataDataStream,
Table table,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
}
......@@ -106,7 +111,7 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
stringOperator.sinkTo(KafkaSink.<String>builder()
.setBootstrapServers(config.getSink().get("brokers"))
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(table.getSchemaTableName())
.setTopic(getSinkTableName(table))
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
......
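
The Flink 1.14 KafkaSink path resolves its topic the same way. One consequence worth noting: with no table.* keys configured, getSinkTableName falls back to the bare table name, so a stream that previously published to shop.orders now publishes to orders; table.prefix.schema=true restores a schema-qualified topic, though joined with an underscore (shop_orders) rather than the previous dot. A fragment sketch assuming table.upper=true and a placeholder broker address:

stringOperator.sinkTo(KafkaSink.<String>builder()
        .setBootstrapServers("localhost:9092")        // config.getSink().get("brokers"), placeholder
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("ORDERS")                   // getSinkTableName(table) for table "orders"
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .build());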