Commit 3e62e609 authored by wenmo

[Feature-442][client,executor] CDCSource sync hudi

parent 2d536dde
AbstractSinkBuilder.java
@@ -151,8 +151,9 @@ public abstract class AbstractSinkBuilder {
     }

     public abstract void addSink(
+        StreamExecutionEnvironment env,
         DataStream<RowData> rowDataDataStream,
-        String schemaTableName,
+        Table table,
         List<String> columnNameList,
         List<LogicalType> columnTypeList);
@@ -172,7 +173,7 @@ public abstract class AbstractSinkBuilder {
                 List<LogicalType> columnTypeList = new ArrayList<>();
                 buildColumn(columnNameList, columnTypeList, table.getColumns());
                 DataStream<RowData> rowDataDataStream = buildRowData(filterOperator, columnNameList, columnTypeList);
-                addSink(rowDataDataStream, table.getSchemaTableName(), columnNameList, columnTypeList);
+                addSink(env, rowDataDataStream, table, columnNameList, columnTypeList);
             }
         }
     }
...
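The signature change above is the core of this refactor: every sink now receives the StreamExecutionEnvironment and the full Table model instead of a pre-rendered "schema.table" string. A minimal sketch of a builder written against the new contract; the class below is illustrative, not part of this commit, and imports are as in the builders further down:

// Illustrative only: shows the new addSink contract, not code from this commit.
public class PrintSinkBuilder extends AbstractSinkBuilder implements SinkBuilder {

    public PrintSinkBuilder() {
    }

    public PrintSinkBuilder(FlinkCDCConfig config) {
        super(config);
    }

    @Override
    public String getHandle() {
        return "print"; // hypothetical connector handle
    }

    @Override
    public SinkBuilder create(FlinkCDCConfig config) {
        return new PrintSinkBuilder(config);
    }

    @Override
    public void addSink(
        StreamExecutionEnvironment env,        // lets a sink wire extra operators onto the job
        DataStream<RowData> rowDataDataStream,
        Table table,                           // carries schema, name and both naming helpers
        List<String> columnNameList,
        List<LogicalType> columnTypeList) {
        rowDataDataStream.print(table.getSchemaTableName());
    }
}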
SinkBuilderFactory.java
@@ -2,6 +2,7 @@ package com.dlink.cdc;

 import com.dlink.assertion.Asserts;
 import com.dlink.cdc.doris.DorisSinkBuilder;
+import com.dlink.cdc.hudi.HudiSinkBuilder;
 import com.dlink.cdc.jdbc.JdbcSinkBuilder;
 import com.dlink.cdc.kafka.KafkaSinkBuilder;
 import com.dlink.exception.FlinkClientException;
@@ -19,6 +20,7 @@ public class SinkBuilderFactory {
         new KafkaSinkBuilder(),
         new JdbcSinkBuilder(),
         new DorisSinkBuilder(),
+        new HudiSinkBuilder(),
     };

     public static SinkBuilder buildSinkBuilder(FlinkCDCConfig config) {
...
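Registering the builder here is all the wiring Hudi needs: buildSinkBuilder walks this array and returns the builder whose handle matches the configured sink connector. A hedged sketch of that dispatch; the method body is collapsed above, so the "connector" key, the error message, and the Asserts calls are assumptions:

public static SinkBuilder buildSinkBuilder(FlinkCDCConfig config) {
    // Sketch of the dispatch; exact null-handling lives in the collapsed body above.
    if (Asserts.isNull(config) || Asserts.isNullString(config.getSink().get("connector"))) {
        throw new FlinkClientException("please set the sink connector of the CDCSOURCE");
    }
    String connector = config.getSink().get("connector");
    for (SinkBuilder sinkBuilder : sinkBuilders) {
        if (connector.equals(sinkBuilder.getHandle())) {
            return sinkBuilder.create(config);
        }
    }
    throw new FlinkClientException("unsupported sink connector: " + connector);
}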
DorisSinkBuilder.java
@@ -5,6 +5,7 @@ import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.cfg.DorisSink;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -15,6 +16,7 @@ import java.util.Map;
 import com.dlink.cdc.AbstractSinkBuilder;
 import com.dlink.cdc.SinkBuilder;
 import com.dlink.model.FlinkCDCConfig;
+import com.dlink.model.Table;

 /**
  * DorisSinkBuilder
@@ -46,8 +48,9 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
     @Override
     public void addSink(
+        StreamExecutionEnvironment env,
         DataStream<RowData> rowDataDataStream,
-        String schemaTableName,
+        Table table,
         List<String> columnNameList,
         List<LogicalType> columnTypeList) {
@@ -78,7 +81,7 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
             dorisExecutionOptionsBuilder.build(),
             DorisOptions.builder()
                 .setFenodes(config.getSink().get("fenodes"))
-                .setTableIdentifier(schemaTableName)
+                .setTableIdentifier(table.getSchemaTableName())
                 .setUsername(config.getSink().get("username"))
                 .setPassword(config.getSink().get("password")).build()
         ));
...
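For comparison with the Hudi builder below, the Doris sink keeps reading flat keys from the sink map; only the table identifier now comes from the Table model. A hedged example of the map this builder consumes; the values and the connector handle are illustrative, only the "fenodes", "username" and "password" keys appear in the diff above:

Map<String, String> sink = new HashMap<>();
sink.put("connector", "doris");         // handle value assumed, not shown in this diff
sink.put("fenodes", "127.0.0.1:8030");  // illustrative Doris FE address
sink.put("username", "root");
sink.put("password", "");
// Per table, the identifier is now derived from the Table model:
// DorisOptions.setTableIdentifier(table.getSchemaTableName())  -> e.g. "db.tbl"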
HudiSinkBuilder.java (new)
package com.dlink.cdc.hudi;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.utils.Pipelines;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Table;

/**
 * HudiSinkBuilder
 *
 * @author wenmo
 * @since 2022/4/22 23:50
 */
public class HudiSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {

    private static final String KEY_WORD = "hudi";
    private static final long serialVersionUID = 5324199407472847422L;

    public HudiSinkBuilder() {
    }

    public HudiSinkBuilder(FlinkCDCConfig config) {
        super(config);
    }

    @Override
    public String getHandle() {
        return KEY_WORD;
    }

    @Override
    public SinkBuilder create(FlinkCDCConfig config) {
        return new HudiSinkBuilder(config);
    }

    @Override
    public void addSink(
        StreamExecutionEnvironment env,
        DataStream<RowData> rowDataDataStream,
        Table table,
        List<String> columnNameList,
        List<LogicalType> columnTypeList) {
        final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]);
        final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]);

        Integer parallelism = 1;
        boolean isMor = true;
        Map<String, String> sink = config.getSink();
        // Every sink option is copied into a Flink Configuration, so arbitrary
        // Hudi FlinkOptions keys can be passed straight through.
        Configuration configuration = Configuration.fromMap(sink);
        if (sink.containsKey("parallelism")) {
            parallelism = Integer.valueOf(sink.get("parallelism"));
        }
        if (configuration.contains(FlinkOptions.PATH)) {
            // Write each table under its own path: <base path><schema>_<table>.
            configuration.set(FlinkOptions.PATH, configuration.getValue(FlinkOptions.PATH) + table.getSchemaTableNameWithUnderline());
        }
        if (sink.containsKey(FlinkOptions.TABLE_TYPE.key())) {
            isMor = HoodieTableType.MERGE_ON_READ.name().equals(sink.get(FlinkOptions.TABLE_TYPE.key()));
        }
        configuration.set(FlinkOptions.TABLE_NAME, table.getSchemaTableNameWithUnderline());
        configuration.set(FlinkOptions.HIVE_SYNC_DB, table.getSchema());
        configuration.set(FlinkOptions.HIVE_SYNC_TABLE, table.getName());

        DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(configuration, RowType.of(columnTypes, columnNames), parallelism, rowDataDataStream);
        DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(configuration, parallelism, hoodieRecordDataStream);
        if (isMor) {
            // MERGE_ON_READ tables additionally need the clean and compact services.
            Pipelines.clean(configuration, pipeline);
            Pipelines.compact(configuration, pipeline);
        }
    }
}
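Because PATH and TABLE_NAME are suffixed with getSchemaTableNameWithUnderline(), each captured table lands in its own Hudi table, and for MERGE_ON_READ tables the builder also wires Pipelines.clean and Pipelines.compact onto the write pipeline. A hedged example of a sink map that exercises the branches above; the keys mirror the FlinkOptions reads in addSink, the values are illustrative:

Map<String, String> sink = new HashMap<>();
sink.put("connector", "hudi");                       // matches KEY_WORD returned by getHandle()
sink.put(FlinkOptions.PATH.key(), "hdfs:///hudi/");  // base path; "<schema>_<table>" is appended
sink.put(FlinkOptions.TABLE_TYPE.key(),
        HoodieTableType.MERGE_ON_READ.name());       // keeps isMor true -> clean + compact
sink.put("parallelism", "4");                        // parsed via Integer.valueOf, defaults to 1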
JdbcSinkBuilder.java
@@ -60,8 +60,9 @@ public class JdbcSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
     @Override
     public void addSink(
+        StreamExecutionEnvironment env,
         DataStream<RowData> rowDataDataStream,
-        String schemaTableName,
+        Table table,
         List<String> columnNameList,
         List<LogicalType> columnTypeList) {
@@ -116,7 +117,7 @@ public class JdbcSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
         if (sink.containsKey("username")) {
             jdbcOptionsBuilder.setUsername(sink.get("username"));
         }
-        jdbcOptionsBuilder.setTableName(schemaTableName);
+        jdbcOptionsBuilder.setTableName(table.getSchemaTableName());
         builder.setOptions(jdbcOptionsBuilder.build());
         builder.setTableSchema(TableSchema.fromTypeInfo(rowDataDataStream.getType()));
         /*JdbcUpsertTableSink build = builder.build();
...
KafkaSinkBuilder.java
@@ -93,7 +93,7 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
                 }
             });
             stringOperator.addSink(new FlinkKafkaProducer<String>(config.getSink().get("brokers"),
-                table.getSchemaTableName(),
+                table.getSchemaTableNameWithUnderline(),
                 new SimpleStringSchema()));
         }
     }
@@ -104,8 +104,9 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
     @Override
     public void addSink(
+        StreamExecutionEnvironment env,
         DataStream<RowData> rowDataDataStream,
-        String schemaTableName,
+        Table table,
         List<String> columnNameList,
         List<LogicalType> columnTypeList) {
     }
...
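Two things change for Kafka: the per-table topic now uses the underscore form, and the new addSink override is deliberately empty, since this builder writes plain strings through stringOperator and SimpleStringSchema rather than the RowData path. The effect of the topic rename, with illustrative values:

// schema = "db", name = "tbl"
table.getSchemaTableName();              // "db.tbl"  -- old topic name
table.getSchemaTableNameWithUnderline(); // "db_tbl"  -- new topic name, consistent with Hudi paths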
Table.java
@@ -48,6 +48,10 @@ public class Table implements Serializable, Comparable<Table> {
         return Asserts.isNullString(schema) ? name : schema + "." + name;
     }

+    public String getSchemaTableNameWithUnderline() {
+        return Asserts.isNullString(schema) ? name : schema + "_" + name;
+    }
+
     @Override
     public int compareTo(Table o) {
         return this.name.compareTo(o.getName());
...
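The new helper mirrors getSchemaTableName() directly above it, including the null-schema fallback. Expected behavior, using a hypothetical instance; the no-arg constructor and setters are assumed for illustration:

Table t = new Table();                   // constructor and setters assumed
t.setSchema("db");
t.setName("tbl");
t.getSchemaTableName();                  // "db.tbl"
t.getSchemaTableNameWithUnderline();     // "db_tbl"
t.setSchema(null);
t.getSchemaTableNameWithUnderline();     // "tbl" -- falls back to the bare name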
pom.xml
@@ -120,5 +120,10 @@
             <artifactId>flink-doris-connector-1.13_2.12</artifactId>
             <version>1.0.3</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-flink_2.11</artifactId>
+            <version>0.10.1</version>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file