Unverified Commit a619d50b authored by aiwenmo's avatar aiwenmo Committed by GitHub

[Optimization-1014][client] Optimizate Doris sink and type convert and upgrade...

[Optimization-1014][client] Optimizate Doris sink and type convert and upgrade flink to 1.15.2 (#1015)
Co-authored-by: 's avatarwenmo <32723967+wenmo@users.noreply.github.com>
parent eef636b1
...@@ -59,7 +59,6 @@ import org.apache.flink.util.Collector; ...@@ -59,7 +59,6 @@ import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag; import org.apache.flink.util.OutputTag;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.Instant; import java.time.Instant;
import java.time.ZoneId; import java.time.ZoneId;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -68,6 +67,8 @@ import java.util.List; ...@@ -68,6 +67,8 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import javax.xml.bind.DatatypeConverter;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -83,6 +84,7 @@ public abstract class AbstractSinkBuilder { ...@@ -83,6 +84,7 @@ public abstract class AbstractSinkBuilder {
protected FlinkCDCConfig config; protected FlinkCDCConfig config;
protected List<ModifyOperation> modifyOperations = new ArrayList(); protected List<ModifyOperation> modifyOperations = new ArrayList();
private ZoneId sinkTimeZone = ZoneId.of("UTC");
public AbstractSinkBuilder() { public AbstractSinkBuilder() {
} }
...@@ -104,7 +106,7 @@ public abstract class AbstractSinkBuilder { ...@@ -104,7 +106,7 @@ public abstract class AbstractSinkBuilder {
Map<String, String> sink = config.getSink(); Map<String, String> sink = config.getSink();
for (Map.Entry<String, String> entry : sink.entrySet()) { for (Map.Entry<String, String> entry : sink.entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && entry.getKey().startsWith("sink.properties") && Asserts.isNotNullString(entry.getValue())) { if (Asserts.isNotNullString(entry.getKey()) && entry.getKey().startsWith("sink.properties") && Asserts.isNotNullString(entry.getValue())) {
properties.setProperty(entry.getKey().replace("sink.properties.",""), entry.getValue()); properties.setProperty(entry.getKey().replace("sink.properties.", ""), entry.getValue());
} }
} }
return properties; return properties;
...@@ -137,9 +139,9 @@ public abstract class AbstractSinkBuilder { ...@@ -137,9 +139,9 @@ public abstract class AbstractSinkBuilder {
} }
protected DataStream<Map> shunt( protected DataStream<Map> shunt(
SingleOutputStreamOperator<Map> processOperator, SingleOutputStreamOperator<Map> processOperator,
Table table, Table table,
OutputTag<Map> tag) { OutputTag<Map> tag) {
return processOperator.getSideOutput(tag); return processOperator.getSideOutput(tag);
} }
...@@ -193,7 +195,7 @@ public abstract class AbstractSinkBuilder { ...@@ -193,7 +195,7 @@ public abstract class AbstractSinkBuilder {
default: default:
} }
} catch (Exception e) { } catch (Exception e) {
logger.error("SchameTable: {} - Row: {} - Exception: {}", schemaTableName, JSONUtil.toJsonString(value), e.getCause().getMessage()); logger.error("SchameTable: {} - Row: {} - Exception: {}", schemaTableName, JSONUtil.toJsonString(value), e);
throw e; throw e;
} }
} }
...@@ -213,6 +215,12 @@ public abstract class AbstractSinkBuilder { ...@@ -213,6 +215,12 @@ public abstract class AbstractSinkBuilder {
CustomTableEnvironment customTableEnvironment, CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) { DataStreamSource<String> dataStreamSource) {
final String timeZone = config.getSink().get("timezone");
config.getSink().remove("timezone");
if (Asserts.isNotNullString(timeZone)) {
sinkTimeZone = ZoneId.of(timeZone);
}
final List<Schema> schemaList = config.getSchemaList(); final List<Schema> schemaList = config.getSchemaList();
final String schemaFieldName = config.getSchemaFieldName(); final String schemaFieldName = config.getSchemaFieldName();
...@@ -294,14 +302,41 @@ public abstract class AbstractSinkBuilder { ...@@ -294,14 +302,41 @@ public abstract class AbstractSinkBuilder {
if (logicalType instanceof VarCharType) { if (logicalType instanceof VarCharType) {
return StringData.fromString((String) value); return StringData.fromString((String) value);
} else if (logicalType instanceof DateType) { } else if (logicalType instanceof DateType) {
return StringData.fromString(Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate().toString()); return value;
} else if (logicalType instanceof TimestampType) { } else if (logicalType instanceof TimestampType) {
return TimestampData.fromTimestamp(Timestamp.from(Instant.ofEpochMilli((long) value))); if (value instanceof Integer) {
return TimestampData.fromLocalDateTime(Instant.ofEpochMilli(((Integer) value).longValue()).atZone(sinkTimeZone).toLocalDateTime());
} else if (value instanceof Long) {
return TimestampData.fromLocalDateTime(Instant.ofEpochMilli((long) value).atZone(sinkTimeZone).toLocalDateTime());
} else {
return TimestampData.fromLocalDateTime(Instant.parse(value.toString()).atZone(sinkTimeZone).toLocalDateTime());
}
} else if (logicalType instanceof DecimalType) { } else if (logicalType instanceof DecimalType) {
final DecimalType decimalType = ((DecimalType) logicalType); final DecimalType decimalType = ((DecimalType) logicalType);
final int precision = decimalType.getPrecision(); final int precision = decimalType.getPrecision();
final int scale = decimalType.getScale(); final int scale = decimalType.getScale();
return DecimalData.fromBigDecimal(new BigDecimal((String) value), precision, scale); return DecimalData.fromBigDecimal(new BigDecimal((String) value), precision, scale);
} else if (logicalType instanceof FloatType) {
if (value instanceof Float) {
return value;
} else if (value instanceof Double) {
return ((Double) value).floatValue();
} else {
return Float.parseFloat(value.toString());
}
} else if (logicalType instanceof BigIntType) {
if (value instanceof Integer) {
return ((Integer) value).longValue();
} else {
return value;
}
} else if (logicalType instanceof VarBinaryType) {
// VARBINARY AND BINARY is converted to String with encoding base64 in FlinkCDC.
if (value instanceof String) {
return DatatypeConverter.parseBase64Binary(value.toString());
} else {
return value;
}
} else { } else {
return value; return value;
} }
......
...@@ -40,6 +40,7 @@ import java.io.Serializable; ...@@ -40,6 +40,7 @@ import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties;
/** /**
* DorisSinkBuilder * DorisSinkBuilder
...@@ -137,6 +138,8 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder ...@@ -137,6 +138,8 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
} }
if (sink.containsKey(DorisSinkOptions.SINK_ENABLE_DELETE.key())) { if (sink.containsKey(DorisSinkOptions.SINK_ENABLE_DELETE.key())) {
executionBuilder.setDeletable(Boolean.valueOf(sink.get(DorisSinkOptions.SINK_ENABLE_DELETE.key()))); executionBuilder.setDeletable(Boolean.valueOf(sink.get(DorisSinkOptions.SINK_ENABLE_DELETE.key())));
} else {
executionBuilder.setDeletable(true);
} }
if (sink.containsKey(DorisSinkOptions.SINK_LABEL_PREFIX.key())) { if (sink.containsKey(DorisSinkOptions.SINK_LABEL_PREFIX.key())) {
executionBuilder.setLabelPrefix(getSinkSchemaName(table) + "_" + getSinkTableName(table) + sink.get(DorisSinkOptions.SINK_LABEL_PREFIX.key())); executionBuilder.setLabelPrefix(getSinkSchemaName(table) + "_" + getSinkTableName(table) + sink.get(DorisSinkOptions.SINK_LABEL_PREFIX.key()));
...@@ -144,7 +147,12 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder ...@@ -144,7 +147,12 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
if (sink.containsKey(DorisSinkOptions.SINK_MAX_RETRIES.key())) { if (sink.containsKey(DorisSinkOptions.SINK_MAX_RETRIES.key())) {
executionBuilder.setMaxRetries(Integer.valueOf(sink.get(DorisSinkOptions.SINK_MAX_RETRIES.key()))); executionBuilder.setMaxRetries(Integer.valueOf(sink.get(DorisSinkOptions.SINK_MAX_RETRIES.key())));
} }
executionBuilder.setStreamLoadProp(getProperties());
Properties properties = getProperties();
// Doris 1.1 need to this para to support delete
properties.setProperty("columns", String.join(",", columnNameList) + ",__DORIS_DELETE_SIGN__");
executionBuilder.setStreamLoadProp(properties);
// Create DorisSink. // Create DorisSink.
DorisSink.Builder<RowData> builder = DorisSink.builder(); DorisSink.Builder<RowData> builder = DorisSink.builder();
...@@ -153,6 +161,7 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder ...@@ -153,6 +161,7 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
.setSerializer(RowDataSerializer.builder() .setSerializer(RowDataSerializer.builder()
.setFieldNames(columnNames) .setFieldNames(columnNames)
.setType("json") .setType("json")
.enableDelete(true)
.setFieldType(columnTypes).build()) .setFieldType(columnTypes).build())
.setDorisOptions(dorisBuilder.build()); .setDorisOptions(dorisBuilder.build());
......
...@@ -31,7 +31,7 @@ ...@@ -31,7 +31,7 @@
<properties> <properties>
<java.version>1.8</java.version> <java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.15.1</flink.version> <flink.version>1.15.2</flink.version>
<flink.guava.version>15.0</flink.guava.version> <flink.guava.version>15.0</flink.guava.version>
<flinkcdc.version>2.2.1</flinkcdc.version> <flinkcdc.version>2.2.1</flinkcdc.version>
<commons.version>1.3.1</commons.version> <commons.version>1.3.1</commons.version>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment