Unverified Commit 0474e811 authored by aiwenmo's avatar aiwenmo Committed by GitHub

[Fix-442][client,executor] Add SOURCE_AVRO_SCHEMA

[Fix-442][client,executor] Add SOURCE_AVRO_SCHEMA
parents 129e1347 89cfb186
......@@ -10,6 +10,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.utils.Pipelines;
import org.apache.hudi.util.AvroSchemaConverter;
import java.io.Serializable;
import java.util.List;
......@@ -17,7 +18,6 @@ import java.util.Map;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.cdc.doris.DorisSinkBuilder;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Table;
......@@ -76,8 +76,11 @@ public class HudiSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
configuration.set(FlinkOptions.TABLE_NAME, table.getSchemaTableNameWithUnderline());
configuration.set(FlinkOptions.HIVE_SYNC_DB, table.getSchema());
configuration.set(FlinkOptions.HIVE_SYNC_TABLE, table.getName());
RowType rowType = RowType.of(false, columnTypes, columnNames);
configuration.setString(FlinkOptions.SOURCE_AVRO_SCHEMA,
AvroSchemaConverter.convertToSchema(rowType, table.getSchemaTableNameWithUnderline()).toString());
DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(configuration, RowType.of(columnTypes, columnNames), parallelism, rowDataDataStream);
DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(configuration, rowType, parallelism, rowDataDataStream);
DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(configuration, parallelism, hoodieRecordDataStream);
if (isMor) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment