Unverified commit d7b81fcb authored by zhongjingq, committed by GitHub

[Fix][core,client] Resolve the exception when starting from the specified savepoint (#1041)

* fix: when recovering a job from a specified savepoint, the savepoint file path was not being set; also allow a job started from a savepoint to skip operator state that cannot be matched instead of failing to start

* Add configuration parameters for the MySQL CDC source

* Improve the datastream kafka-json and datastream starrocks builders so that they can be merged

* Fix the SinkBuilder return object and rename methods
parent 20f7991c
@@ -757,6 +757,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
                }
                break;
            case CUSTOM:
+               config.setSavePointPath(config.getSavePointPath());
                config.getConfig().put("execution.savepoint.path", config.getSavePointPath());
                break;
            default:
...
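Note on the CUSTOM branch above: the restore path ultimately reaches Flink through the standard `execution.savepoint.path` option. Below is a minimal, hypothetical sketch of that plumbing against Flink's plain `Configuration` API; the path is a placeholder, and the companion `execution.savepoint.ignore-unclaimed-state` switch mirrors the `allowNonRestoredState=true` change made in JobManager further down.

```java
import org.apache.flink.configuration.Configuration;

public class SavepointConfigSketch {

    // Illustrative only: same key the CUSTOM case writes into config.getConfig().
    public static Configuration withSavepoint(String savepointPath) {
        Configuration conf = new Configuration();
        // Restore path picked up when the job is submitted.
        conf.setString("execution.savepoint.path", savepointPath);
        // Tolerate savepoint state that no longer maps to any operator.
        conf.setBoolean("execution.savepoint.ignore-unclaimed-state", true);
        return conf;
    }

    public static void main(String[] args) {
        // Placeholder path, not taken from the commit.
        System.out.println(withSavepoint("hdfs:///flink/savepoints/savepoint-0000-abc"));
    }
}
```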
@@ -5,10 +5,10 @@ import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
+import com.dlink.model.Column;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
-import com.dlink.utils.ObjectConvertUtil;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
@@ -20,17 +20,34 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.util.Collector;
import java.io.Serializable;
+import java.math.BigDecimal;
+import java.time.Instant;
import java.time.LocalDateTime;
+import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import javax.xml.bind.DatatypeConverter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
@@ -42,7 +59,6 @@ import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
public class KafkaSinkJsonBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
    private static final String KEY_WORD = "datastream-kafka-json";
-   private transient ObjectMapper objectMapper;
    public KafkaSinkJsonBuilder() {
    }
@@ -71,14 +87,18 @@ public class KafkaSinkJsonBuilder extends AbstractSinkBuilder implements SinkBui
        SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(new MapFunction<String, Map>() {
            @Override
            public Map map(String value) throws Exception {
-               if (objectMapper == null) {
-                   initializeObjectMapper();
-               }
+               ObjectMapper objectMapper = new ObjectMapper();
                return objectMapper.readValue(value, Map.class);
            }
        });
        final List<Schema> schemaList = config.getSchemaList();
        final String schemaFieldName = config.getSchemaFieldName();
+       if (Asserts.isNotNullString(config.getSink().get("topic"))) {
+           SingleOutputStreamOperator<String> process = mapOperator.process(new KafkaProcessFunction(schemaList));
+           process.addSink(new FlinkKafkaProducer<String>(config.getSink().get("brokers"),
+                   config.getSink().get("topic"),
+                   new SimpleStringSchema()));
+       } else {
            if (Asserts.isNotNullCollection(schemaList)) {
                for (Schema schema : schemaList) {
                    for (Table table : schema.getTables()) {
@@ -92,39 +112,123 @@ public class KafkaSinkJsonBuilder extends AbstractSinkBuilder implements SinkBui
                                    && schemaName.equals(source.get(schemaFieldName).toString());
                            }
                        });
-                       String topic = getSinkTableName(table);
-                       if (Asserts.isNotNullString(config.getSink().get("topic"))) {
-                           topic = config.getSink().get("topic");
+                       SingleOutputStreamOperator<String> stringOperator = filterOperator.process(new KafkaProcessFunction(schemaList));
+                       stringOperator.addSink(new FlinkKafkaProducer<String>(config.getSink().get("brokers"),
+                               getSinkTableName(table),
+                               new SimpleStringSchema()));
                        }
-                       List<String> columnNameList = new LinkedList<>();
-                       List<LogicalType> columnTypeList = new LinkedList<>();
-                       buildColumn(columnNameList, columnTypeList, table.getColumns());
-                       SingleOutputStreamOperator<String> stringOperator = filterOperator.process(new ProcessFunction<Map, String>() {
+                   }
+               }
+           }
+       } catch (Exception ex) {
+           logger.error("kafka sink error:", ex);
+       }
+       return dataStreamSource;
+   }
+   private static class KafkaProcessFunction extends ProcessFunction<Map, String> {
+       private ObjectMapper objectMapper;
+       private List<Schema> schemaList;
+       public KafkaProcessFunction(List<Schema> schemaList) {
+           this.schemaList = schemaList;
+       }
+       private void initializeObjectMapper() {
+           this.objectMapper = new ObjectMapper();
+           JavaTimeModule javaTimeModule = new JavaTimeModule();
+           // Hack time module to allow 'Z' at the end of string (i.e. javascript json's)
+           javaTimeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(DateTimeFormatter.ISO_DATE_TIME));
+           objectMapper.registerModule(javaTimeModule);
+           objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
+       }
        @Override
-       public void processElement(Map value, Context context, Collector<String> collector) throws Exception {
+       public void processElement(Map value, Context ctx, Collector<String> out) throws Exception {
            Map after = null;
            Map before = null;
            String tsMs = value.get("ts_ms").toString();
            try {
+               LinkedHashMap source = (LinkedHashMap) value.get("source");
+               String tableName = null;
+               String schemaName = null;
+               Table tableObject = null;
+               for (Schema schema : schemaList) {
+                   for (Table table : schema.getTables()) {
+                       final String tableName1 = table.getName();
+                       final String schemaName1 = table.getSchema();
+                       if (tableName1.equals(source.get("table").toString()) && schemaName1.equals(source.get("db").toString())) {
+                           tableObject = table;
+                           tableName = tableName1;
+                           schemaName = schemaName1;
+                           break;
+                       }
+                   }
+               }
+               if (tableObject != null) {
+                   List<String> columnNameList = new LinkedList<>();
+                   List<LogicalType> columnTypeList = new LinkedList<>();
+                   buildColumn(columnNameList, columnTypeList, tableObject.getColumns());
                    switch (value.get("op").toString()) {
                        case "r":
                        case "c":
                            after = (Map) value.get("after");
-                           convertAttr(columnNameList, columnTypeList, after, value.get("op").toString(), 0, schemaName, tableName, tsMs);
+                           for (int i = 0; i < columnNameList.size(); i++) {
+                               String columnName = columnNameList.get(i);
+                               Object columnNameValue = after.remove(columnName);
+                               Object columnNameNewVal = convertValue(columnNameValue, columnTypeList.get(i));
+                               after.put(columnName, columnNameNewVal);
+                           }
+                           after.put("__op", Integer.valueOf(0));
+                           after.put("is_deleted", Integer.valueOf(0));
+                           after.put("db", schemaName);
+                           after.put("table", tableName);
+                           after.put("ts_ms", tsMs);
                            break;
                        case "u":
                            before = (Map) value.get("before");
-                           convertAttr(columnNameList, columnTypeList, before, value.get("op").toString(), 1, schemaName, tableName, tsMs);
+                           for (int i = 0; i < columnNameList.size(); i++) {
+                               String columnName = columnNameList.get(i);
+                               Object columnNameValue = before.remove(columnName);
+                               Object columnNameNewVal = convertValue(columnNameValue, columnTypeList.get(i));
+                               before.put(columnName, columnNameNewVal);
+                           }
+                           before.put("__op", Integer.valueOf(1));
+                           before.put("is_deleted", Integer.valueOf(1));
+                           before.put("db", schemaName);
+                           before.put("table", tableName);
+                           before.put("ts_ms", tsMs);
                            after = (Map) value.get("after");
-                           convertAttr(columnNameList, columnTypeList, after, value.get("op").toString(), 0, schemaName, tableName, tsMs);
+                           for (int i = 0; i < columnNameList.size(); i++) {
+                               String columnName = columnNameList.get(i);
+                               Object columnNameValue = after.remove(columnName);
+                               Object columnNameNewVal = convertValue(columnNameValue, columnTypeList.get(i));
+                               after.put(columnName, columnNameNewVal);
+                           }
+                           after.put("__op", Integer.valueOf(0));
+                           after.put("is_deleted", Integer.valueOf(0));
+                           after.put("db", schemaName);
+                           after.put("table", tableName);
+                           after.put("ts_ms", tsMs);
                            break;
                        case "d":
                            before = (Map) value.get("before");
-                           convertAttr(columnNameList, columnTypeList, before, value.get("op").toString(), 1, schemaName, tableName, tsMs);
+                           for (int i = 0; i < columnNameList.size(); i++) {
+                               String columnName = columnNameList.get(i);
+                               Object columnNameValue = before.remove(columnName);
+                               Object columnNameNewVal = convertValue(columnNameValue, columnTypeList.get(i));
+                               before.put(columnName, columnNameNewVal);
+                           }
+                           before.put("__op", Integer.valueOf(1));
+                           before.put("is_deleted", Integer.valueOf(1));
+                           before.put("db", schemaName);
+                           before.put("table", tableName);
+                           before.put("ts_ms", tsMs);
                            break;
                        default:
                    }
+               }
            } catch (Exception e) {
                logger.error("SchameTable: {} - Exception:", e);
                throw e;
@@ -133,32 +237,102 @@ public class KafkaSinkJsonBuilder extends AbstractSinkBuilder implements SinkBui
                initializeObjectMapper();
            }
            if (before != null) {
-               collector.collect(objectMapper.writeValueAsString(before));
+               out.collect(objectMapper.writeValueAsString(before));
            }
            if (after != null) {
-               collector.collect(objectMapper.writeValueAsString(after));
+               out.collect(objectMapper.writeValueAsString(after));
            }
        }
-       });
-       stringOperator.addSink(new FlinkKafkaProducer<String>(config.getSink().get("brokers"),
-               topic,
-               new SimpleStringSchema()));
+       protected void buildColumn(List<String> columnNameList, List<LogicalType> columnTypeList, List<Column> columns) {
+           for (Column column : columns) {
+               columnNameList.add(column.getName());
+               columnTypeList.add(getLogicalType(column));
            }
        }
+       public LogicalType getLogicalType(Column column) {
+           switch (column.getJavaType()) {
+               case STRING:
+                   return new VarCharType();
+               case BOOLEAN:
+               case JAVA_LANG_BOOLEAN:
+                   return new BooleanType();
+               case BYTE:
+               case JAVA_LANG_BYTE:
+                   return new TinyIntType();
+               case SHORT:
+               case JAVA_LANG_SHORT:
+                   return new SmallIntType();
+               case LONG:
+               case JAVA_LANG_LONG:
+                   return new BigIntType();
+               case FLOAT:
+               case JAVA_LANG_FLOAT:
+                   return new FloatType();
+               case DOUBLE:
+               case JAVA_LANG_DOUBLE:
+                   return new DoubleType();
+               case DECIMAL:
+                   if (column.getPrecision() == null || column.getPrecision() == 0) {
+                       return new DecimalType(38, column.getScale());
+                   } else {
+                       return new DecimalType(column.getPrecision(), column.getScale());
                    }
-       } catch (Exception ex) {
-           logger.error("kafka sink error:", ex);
+               case INT:
+               case INTEGER:
+                   return new IntType();
+               case DATE:
+               case LOCALDATE:
+                   return new DateType();
+               case LOCALDATETIME:
+               case TIMESTAMP:
+                   return new TimestampType();
+               case BYTES:
+                   return new VarBinaryType(Integer.MAX_VALUE);
+               default:
+                   return new VarCharType();
            }
-       return dataStreamSource;
        }
-       private void initializeObjectMapper() {
-           this.objectMapper = new ObjectMapper();
-           JavaTimeModule javaTimeModule = new JavaTimeModule();
-           // Hack time module to allow 'Z' at the end of string (i.e. javascript json's)
-           javaTimeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(DateTimeFormatter.ISO_DATE_TIME));
-           objectMapper.registerModule(javaTimeModule);
-           objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
+       protected Object convertValue(Object value, LogicalType logicalType) {
+           if (value == null) {
+               return null;
+           }
+           if (logicalType instanceof DateType) {
+               ZoneId utc = ZoneId.of("UTC");
+               if (value instanceof Integer) {
+                   return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(utc).toLocalDate();
+               } else {
+                   return Instant.ofEpochMilli((long) value).atZone(utc).toLocalDate();
+               }
+           } else if (logicalType instanceof TimestampType) {
+               if (value instanceof Integer) {
+                   return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.of("UTC")).toLocalDateTime();
+               } else if (value instanceof String) {
+                   return Instant.parse((String) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
+               } else {
+                   return Instant.ofEpochMilli((long) value).atZone(ZoneId.of("UTC")).toLocalDateTime();
+               }
+           } else if (logicalType instanceof DecimalType) {
+               return new BigDecimal((String) value);
+           } else if (logicalType instanceof BigIntType) {
+               if (value instanceof Integer) {
+                   return ((Integer) value).longValue();
+               } else {
+                   return value;
+               }
+           } else if (logicalType instanceof VarBinaryType) {
+               // VARBINARY AND BINARY is converted to String with encoding base64 in FlinkCDC.
+               if (value instanceof String) {
+                   return DatatypeConverter.parseBase64Binary((String) value);
+               } else {
+                   return value;
+               }
+           } else {
+               return value;
+           }
+       }
    }
    @Override
@@ -169,24 +343,4 @@ public class KafkaSinkJsonBuilder extends AbstractSinkBuilder implements SinkBui
            List<String> columnNameList,
            List<LogicalType> columnTypeList) {
    }
-   @Override
-   protected Object convertValue(Object value, LogicalType logicalType) {
-       return ObjectConvertUtil.convertValue(value, logicalType);
-   }
-   private void convertAttr(List<String> columnNameList, List<LogicalType> columnTypeList, Map value, String op, int isDeleted,
-                            String schemaName, String tableName, String tsMs) {
-       for (int i = 0; i < columnNameList.size(); i++) {
-           String columnName = columnNameList.get(i);
-           Object columnNameValue = value.remove(columnName);
-           Object columnNameNewVal = convertValue(columnNameValue, columnTypeList.get(i));
-           value.put(columnName, columnNameNewVal);
-       }
-       value.put("__op", op);
-       value.put("is_deleted", Integer.valueOf(isDeleted));
-       value.put("db", schemaName);
-       value.put("table", tableName);
-       value.put("ts_ms", tsMs);
-   }
}
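The `KafkaProcessFunction` added above flattens a Debezium-style change envelope into plain JSON rows tagged with `__op`, `is_deleted`, `db`, `table` and `ts_ms`. Below is a standalone, hypothetical sketch of just that flattening rule; the per-column `convertValue` type handling is omitted, and the class and sample values are illustrative, not taken from the commit.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DebeziumFlattenSketch {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // r/c emit the "after" image, d emits the "before" image, u emits both.
    static List<String> flatten(Map<String, Object> envelope) throws Exception {
        List<String> rows = new ArrayList<>();
        String op = String.valueOf(envelope.get("op"));
        Map<String, Object> source = (Map<String, Object>) envelope.get("source");
        String tsMs = String.valueOf(envelope.get("ts_ms"));
        if ("u".equals(op) || "d".equals(op)) {
            rows.add(tag((Map<String, Object>) envelope.get("before"), 1, source, tsMs));
        }
        if ("r".equals(op) || "c".equals(op) || "u".equals(op)) {
            rows.add(tag((Map<String, Object>) envelope.get("after"), 0, source, tsMs));
        }
        return rows;
    }

    // Mirrors the put() calls in the diff: the before image carries __op=1/is_deleted=1,
    // the after image carries __op=0/is_deleted=0.
    static String tag(Map<String, Object> row, int flag, Map<String, Object> source, String tsMs) throws Exception {
        row.put("__op", flag);
        row.put("is_deleted", flag);
        row.put("db", source.get("db"));
        row.put("table", source.get("table"));
        row.put("ts_ms", tsMs);
        return MAPPER.writeValueAsString(row);
    }

    public static void main(String[] args) throws Exception {
        Map<String, Object> envelope = new HashMap<>();
        envelope.put("op", "u");
        envelope.put("ts_ms", 1700000000000L);
        Map<String, Object> source = new HashMap<>();
        source.put("db", "demo");
        source.put("table", "orders");
        envelope.put("source", source);
        Map<String, Object> before = new HashMap<>();
        before.put("id", 1);
        before.put("amount", "9.99");
        envelope.put("before", before);
        Map<String, Object> after = new HashMap<>();
        after.put("id", 1);
        after.put("amount", "19.99");
        envelope.put("after", after);
        flatten(envelope).forEach(System.out::println);
    }
}
```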
@@ -80,6 +80,10 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
        String connectMaxRetries = config.getSource().get("connect.max-retries");
        String connectionPoolSize = config.getSource().get("connection.pool.size");
        String heartbeatInterval = config.getSource().get("heartbeat.interval");
+       String chunkSize = config.getSource().get("scan.incremental.snapshot.chunk.size");
+       String distributionFactorLower = config.getSource().get("chunk-key.even-distribution.factor.lower-bound");
+       String distributionFactorUpper = config.getSource().get("chunk-key.even-distribution.factor.upper-bound");
+       String scanNewlyAddedTableEnabled = config.getSource().get("scan.newly-added-table.enabled");
        Properties debeziumProperties = new Properties();
        // Add default values for some conversions
@@ -123,7 +127,9 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
        sourceBuilder.deserializer(new MysqlJsonDebeziumDeserializationSchema());
        sourceBuilder.debeziumProperties(debeziumProperties);
        sourceBuilder.jdbcProperties(jdbcProperties);
+       if ("true".equalsIgnoreCase(scanNewlyAddedTableEnabled)) {
+           sourceBuilder.scanNewlyAddedTableEnabled(true);
+       }
        if (Asserts.isNotNullString(config.getStartupMode())) {
            switch (config.getStartupMode().toLowerCase()) {
                case "initial":
@@ -165,7 +171,15 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
        if (Asserts.isNotNullString(heartbeatInterval)) {
            sourceBuilder.heartbeatInterval(Duration.ofMillis(Long.valueOf(heartbeatInterval)));
        }
+       if (Asserts.isAllNotNullString(chunkSize)) {
+           sourceBuilder.splitSize(Integer.parseInt(chunkSize));
+       }
+       if (Asserts.isNotNullString(distributionFactorLower)) {
+           sourceBuilder.distributionFactorLower(Double.valueOf(distributionFactorLower));
+       }
+       if (Asserts.isNotNullString(distributionFactorUpper)) {
+           sourceBuilder.distributionFactorUpper(Double.valueOf(distributionFactorUpper));
+       }
        return env.fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
    }
...
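The new MySQL CDC options above are read as plain strings from the source map and forwarded to the source builder only when present. A hypothetical, dependency-free sketch of that plumbing follows; the builder calls are stubbed with prints so the snippet runs without the flink-cdc dependency, and the sample values are illustrative rather than values from the commit.

```java
import java.util.HashMap;
import java.util.Map;

public class MysqlCdcOptionSketch {
    public static void main(String[] args) {
        // Keys match the ones read from config.getSource() in the diff.
        Map<String, String> source = new HashMap<>();
        source.put("scan.incremental.snapshot.chunk.size", "8096");
        source.put("chunk-key.even-distribution.factor.lower-bound", "0.05");
        source.put("chunk-key.even-distribution.factor.upper-bound", "1000.0");
        source.put("scan.newly-added-table.enabled", "true");

        String chunkSize = source.get("scan.incremental.snapshot.chunk.size");
        if (chunkSize != null && !chunkSize.isEmpty()) {
            System.out.println("splitSize(" + Integer.parseInt(chunkSize) + ")");
        }
        String lower = source.get("chunk-key.even-distribution.factor.lower-bound");
        if (lower != null && !lower.isEmpty()) {
            System.out.println("distributionFactorLower(" + Double.valueOf(lower) + ")");
        }
        String upper = source.get("chunk-key.even-distribution.factor.upper-bound");
        if (upper != null && !upper.isEmpty()) {
            System.out.println("distributionFactorUpper(" + Double.valueOf(upper) + ")");
        }
        if ("true".equalsIgnoreCase(source.get("scan.newly-added-table.enabled"))) {
            System.out.println("scanNewlyAddedTableEnabled(true)");
        }
    }
}
```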
package com.dlink.cdc.starrocks;
+import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractSinkBuilder;
+import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
+import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.Column;
import com.dlink.model.FlinkCDCConfig;
+import com.dlink.model.Schema;
import com.dlink.model.Table;
-import com.dlink.utils.ObjectConvertUtil;
+import com.dlink.utils.JSONUtil;
+import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
import java.io.Serializable;
+import java.math.BigDecimal;
import java.time.Instant;
-import java.time.LocalDateTime;
import java.time.ZoneId;
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import javax.xml.bind.DatatypeConverter;
import com.starrocks.connector.flink.row.sink.StarRocksTableRowTransformer;
import com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunction;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
@@ -58,6 +73,44 @@ public class StarrocksSinkBuilder extends AbstractSinkBuilder implements SinkBui
        return new StarrocksSinkBuilder(config);
    }
+   public DataStreamSource build(
+           CDCBuilder cdcBuilder,
+           StreamExecutionEnvironment env,
+           CustomTableEnvironment customTableEnvironment,
+           DataStreamSource<String> dataStreamSource) {
+       final List<Schema> schemaList = config.getSchemaList();
+       if (Asserts.isNotNullCollection(schemaList)) {
+           SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
+           String mergeFlag = config.getSink().get("sink.merge.table");
+           if ("true".equalsIgnoreCase(mergeFlag)) {
+               // use the first table
+               Table table = schemaList.get(0).getTables().get(0);
+               List<String> columnNameList = new ArrayList<>();
+               List<LogicalType> columnTypeList = new ArrayList<>();
+               buildColumn(columnNameList, columnTypeList, table.getColumns());
+               DataStream<RowData> rowDataDataStream = buildRowData(mapOperator, columnNameList, columnTypeList, table.getSchemaTableName());
+               addSink(env, rowDataDataStream, table, columnNameList, columnTypeList);
+           } else {
+               final String schemaFieldName = config.getSchemaFieldName();
+               for (Schema schema : schemaList) {
+                   for (Table table : schema.getTables()) {
+                       SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
+                       List<String> columnNameList = new ArrayList<>();
+                       List<LogicalType> columnTypeList = new ArrayList<>();
+                       buildColumn(columnNameList, columnTypeList, table.getColumns());
+                       DataStream<RowData> rowDataDataStream = buildRowData(filterOperator, columnNameList, columnTypeList, table.getSchemaTableName());
+                       addSink(env, rowDataDataStream, table, columnNameList, columnTypeList);
+                   }
+               }
+           }
+       }
+       return dataStreamSource;
+   }
    @Override
    public void addSink(
            StreamExecutionEnvironment env,
@@ -118,18 +171,96 @@ public class StarrocksSinkBuilder extends AbstractSinkBuilder implements SinkBui
    @Override
    protected Object convertValue(Object value, LogicalType logicalType) {
-       Object object = ObjectConvertUtil.convertValue(value, logicalType, sinkZoneIdUTC);
-       if (object == null) {
+       if (value == null) {
            return null;
        }
-       if (logicalType instanceof TimestampType && object instanceof LocalDateTime) {
-           return TimestampData.fromLocalDateTime((LocalDateTime) object);
-       } else if (logicalType instanceof DateType) {
+       if (logicalType instanceof DateType) {
+           if (value instanceof Integer) {
+               return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(sinkZoneIdUTC).toLocalDate();
+           } else {
+               return Instant.ofEpochMilli((long) value).atZone(sinkZoneIdUTC).toLocalDate();
+           }
+       } else if (logicalType instanceof TimestampType) {
+           if (value instanceof Integer) {
+               return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(sinkZoneIdUTC).toLocalDateTime();
+           } else if (value instanceof String) {
+               return Instant.parse((String) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
+           } else {
+               return Instant.ofEpochMilli((long) value).atZone(sinkZoneIdUTC).toLocalDateTime();
+           }
+       } else if (logicalType instanceof DecimalType) {
+           return new BigDecimal((String) value);
+       } else if (logicalType instanceof BigIntType) {
            if (value instanceof Integer) {
-               return Instant.ofEpochSecond((int) value).atZone(sinkZoneIdUTC).toEpochSecond();
+               return ((Integer) value).longValue();
+           } else {
+               return value;
+           }
+       } else if (logicalType instanceof VarBinaryType) {
+           // VARBINARY AND BINARY is converted to String with encoding base64 in FlinkCDC.
+           if (value instanceof String) {
+               return DatatypeConverter.parseBase64Binary((String) value);
+           } else {
+               return value;
+           }
+       } else {
+           return value;
        }
-       return Instant.ofEpochMilli((long) value).atZone(sinkZoneIdUTC).toEpochSecond();
+   protected DataStream<RowData> buildRowData(
+           SingleOutputStreamOperator<Map> filterOperator,
+           List<String> columnNameList,
+           List<LogicalType> columnTypeList,
+           String schemaTableName) {
+       return filterOperator
+           .flatMap(new FlatMapFunction<Map, RowData>() {
+               @Override
+               public void flatMap(Map value, Collector<RowData> out) throws Exception {
+                   try {
+                       switch (value.get("op").toString()) {
+                           case "r":
+                           case "c":
+                               GenericRowData igenericRowData = new GenericRowData(columnNameList.size());
+                               igenericRowData.setRowKind(RowKind.INSERT);
+                               Map idata = (Map) value.get("after");
+                               for (int i = 0; i < columnNameList.size(); i++) {
+                                   igenericRowData.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
                                }
-       return object;
+                               out.collect(igenericRowData);
+                               break;
+                           case "d":
+                               GenericRowData dgenericRowData = new GenericRowData(columnNameList.size());
+                               dgenericRowData.setRowKind(RowKind.DELETE);
+                               Map ddata = (Map) value.get("before");
+                               for (int i = 0; i < columnNameList.size(); i++) {
+                                   dgenericRowData.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
+                               }
+                               out.collect(dgenericRowData);
+                               break;
+                           case "u":
+                               GenericRowData ubgenericRowData = new GenericRowData(columnNameList.size());
+                               ubgenericRowData.setRowKind(RowKind.UPDATE_BEFORE);
+                               Map ubdata = (Map) value.get("before");
+                               for (int i = 0; i < columnNameList.size(); i++) {
+                                   ubgenericRowData.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
+                               }
+                               out.collect(ubgenericRowData);
+                               GenericRowData uagenericRowData = new GenericRowData(columnNameList.size());
+                               uagenericRowData.setRowKind(RowKind.UPDATE_AFTER);
+                               Map uadata = (Map) value.get("after");
+                               for (int i = 0; i < columnNameList.size(); i++) {
+                                   uagenericRowData.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
+                               }
+                               out.collect(uagenericRowData);
+                               break;
+                           default:
+                       }
+                   } catch (Exception e) {
+                       logger.error("SchameTable: {} - Row: {} - Exception: {}", schemaTableName, JSONUtil.toJsonString(value), e);
+                       throw e;
+                   }
+               }
+           });
    }
}
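`buildRowData` above turns each change event into a `RowData` with an explicit `RowKind`, and the new `build` override routes either one merged stream (when `sink.merge.table` is `true`, reusing the first table's schema) or one shunted stream per table into `addSink`. A small, hypothetical sketch of the op-to-RowKind mapping used there:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.types.RowKind;

public class OpToRowKindSketch {

    // Reads ("r"), inserts ("c") and deletes ("d") emit a single row; updates ("u")
    // emit an UPDATE_BEFORE/UPDATE_AFTER pair so the StarRocks sink sees retractions.
    static List<RowKind> rowKindsFor(String op) {
        switch (op) {
            case "r":
            case "c":
                return Collections.singletonList(RowKind.INSERT);
            case "d":
                return Collections.singletonList(RowKind.DELETE);
            case "u":
                return Arrays.asList(RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER);
            default:
                return Collections.emptyList();
        }
    }

    public static void main(String[] args) {
        for (String op : new String[] {"r", "c", "u", "d"}) {
            System.out.println(op + " -> " + rowKindsFor(op));
        }
    }
}
```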
@@ -376,7 +376,7 @@ public class JobManager {
            streamGraph.setJobName(config.getJobName());
            JobGraph jobGraph = streamGraph.getJobGraph();
            if (Asserts.isNotNullString(config.getSavePointPath())) {
-               jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(config.getSavePointPath()));
+               jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(config.getSavePointPath(), true));
            }
            gatewayResult = Gateway.build(config.getGatewayConfig()).submitJobGraph(jobGraph);
        }
@@ -434,7 +434,7 @@ public class JobManager {
            JobGraph jobGraph = executor.getJobGraphFromInserts(inserts);
            // Per-job mode needs to set the savepoint restore path when recovering from a savepoint.
            if (Asserts.isNotNullString(config.getSavePointPath())) {
-               jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(config.getSavePointPath()));
+               jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(config.getSavePointPath(), true));
            }
            // Per-job mode needs to submit the job graph.
            gatewayResult = Gateway.build(config.getGatewayConfig()).submitJobGraph(jobGraph);
...
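The one-argument `SavepointRestoreSettings.forPath(path)` keeps Flink's default `allowNonRestoredState=false`, so a restore fails if the savepoint contains state for an operator that no longer exists; passing `true`, as in the change above, lets the job start and drops the unmatched state. A minimal sketch assuming `flink-runtime` on the classpath (the path is a placeholder):

```java
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

public class SavepointRestoreSketch {
    public static void main(String[] args) {
        // Placeholder path, not taken from the commit.
        String savePointPath = "hdfs:///flink/savepoints/savepoint-0000-abc";

        // Strict restore: fails if the savepoint holds state that matches no operator.
        SavepointRestoreSettings strict = SavepointRestoreSettings.forPath(savePointPath);
        // Lenient restore: unmatched state is skipped, mirroring the change above.
        SavepointRestoreSettings lenient = SavepointRestoreSettings.forPath(savePointPath, true);

        System.out.println("strict  -> allowNonRestoredState=" + strict.allowNonRestoredState());
        System.out.println("lenient -> allowNonRestoredState=" + lenient.allowNonRestoredState());
    }
}
```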