Unverified Commit f7bf2cbc authored by aiwenmo's avatar aiwenmo Committed by GitHub

[Optimization-983][client] Optimize Doris datastream sink (#984)

Co-authored-by: wenmo <32723967+wenmo@users.noreply.github.com>
parent fd2004c1
...@@ -103,8 +103,8 @@ public abstract class AbstractSinkBuilder { ...@@ -103,8 +103,8 @@ public abstract class AbstractSinkBuilder {
Properties properties = new Properties(); Properties properties = new Properties();
Map<String, String> sink = config.getSink(); Map<String, String> sink = config.getSink();
for (Map.Entry<String, String> entry : sink.entrySet()) { for (Map.Entry<String, String> entry : sink.entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) { if (Asserts.isNotNullString(entry.getKey()) && entry.getKey().startsWith("sink.properties") && Asserts.isNotNullString(entry.getValue())) {
properties.setProperty(entry.getKey(), entry.getValue()); properties.setProperty(entry.getKey().replace("sink.properties.",""), entry.getValue());
} }
} }
return properties; return properties;
......
...@@ -103,8 +103,8 @@ public abstract class AbstractSinkBuilder { ...@@ -103,8 +103,8 @@ public abstract class AbstractSinkBuilder {
Properties properties = new Properties(); Properties properties = new Properties();
Map<String, String> sink = config.getSink(); Map<String, String> sink = config.getSink();
for (Map.Entry<String, String> entry : sink.entrySet()) { for (Map.Entry<String, String> entry : sink.entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) { if (Asserts.isNotNullString(entry.getKey()) && entry.getKey().startsWith("sink.properties") && Asserts.isNotNullString(entry.getValue())) {
properties.setProperty(entry.getKey(), entry.getValue()); properties.setProperty(entry.getKey().replace("sink.properties.",""), entry.getValue());
} }
} }
return properties; return properties;
......
...@@ -103,8 +103,8 @@ public abstract class AbstractSinkBuilder { ...@@ -103,8 +103,8 @@ public abstract class AbstractSinkBuilder {
Properties properties = new Properties(); Properties properties = new Properties();
Map<String, String> sink = config.getSink(); Map<String, String> sink = config.getSink();
for (Map.Entry<String, String> entry : sink.entrySet()) { for (Map.Entry<String, String> entry : sink.entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) { if (Asserts.isNotNullString(entry.getKey()) && entry.getKey().startsWith("sink.properties") && Asserts.isNotNullString(entry.getValue())) {
properties.setProperty(entry.getKey(), entry.getValue()); properties.setProperty(entry.getKey().replace("sink.properties.",""), entry.getValue());
} }
} }
return properties; return properties;
......
...@@ -103,8 +103,8 @@ public abstract class AbstractSinkBuilder { ...@@ -103,8 +103,8 @@ public abstract class AbstractSinkBuilder {
Properties properties = new Properties(); Properties properties = new Properties();
Map<String, String> sink = config.getSink(); Map<String, String> sink = config.getSink();
for (Map.Entry<String, String> entry : sink.entrySet()) { for (Map.Entry<String, String> entry : sink.entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) { if (Asserts.isNotNullString(entry.getKey()) && entry.getKey().startsWith("sink.properties") && Asserts.isNotNullString(entry.getValue())) {
properties.setProperty(entry.getKey(), entry.getValue()); properties.setProperty(entry.getKey().replace("sink.properties.",""), entry.getValue());
} }
} }
return properties; return properties;
......
...@@ -27,21 +27,22 @@ import com.dlink.model.Table; ...@@ -27,21 +27,22 @@ import com.dlink.model.Table;
import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.cfg.DorisSink; import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.RowDataSerializer;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.utils.TypeConversions;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
/** /**
* DorisSinkBuilder * DorisSinkBuilder
*
* @author wenmo
* @since 2022/4/20 19:20
**/ **/
public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable { public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
...@@ -73,36 +74,88 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder ...@@ -73,36 +74,88 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
List<String> columnNameList, List<String> columnNameList,
List<LogicalType> columnTypeList) { List<LogicalType> columnTypeList) {
DorisExecutionOptions.Builder dorisExecutionOptionsBuilder = DorisExecutionOptions.builder();
Map<String, String> sink = config.getSink(); Map<String, String> sink = config.getSink();
if (sink.containsKey("sink.batch.size")) {
dorisExecutionOptionsBuilder.setBatchSize(Integer.valueOf(sink.get("sink.batch.size"))); // Create FieldNames and FieldType for RowDataSerializer.
final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]);
final List<DataType> dataTypeList = new ArrayList<>();
for (LogicalType logicalType : columnTypeList) {
dataTypeList.add(TypeConversions.fromLogicalToDataType(logicalType));
}
final DataType[] columnTypes = dataTypeList.toArray(new DataType[dataTypeList.size()]);
// Create DorisReadOptions for DorisSink.
final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
if (sink.containsKey(DorisSinkOptions.DORIS_DESERIALIZE_ARROW_ASYNC.key())) {
readOptionBuilder.setDeserializeArrowAsync(Boolean.valueOf(sink.get(DorisSinkOptions.DORIS_DESERIALIZE_ARROW_ASYNC.key())));
}
if (sink.containsKey(DorisSinkOptions.DORIS_DESERIALIZE_QUEUE_SIZE.key())) {
readOptionBuilder.setDeserializeQueueSize(Integer.valueOf(sink.get(DorisSinkOptions.DORIS_DESERIALIZE_QUEUE_SIZE.key())));
}
if (sink.containsKey(DorisSinkOptions.DORIS_EXEC_MEM_LIMIT.key())) {
readOptionBuilder.setExecMemLimit(Long.valueOf(sink.get(DorisSinkOptions.DORIS_EXEC_MEM_LIMIT.key())));
}
if (sink.containsKey(DorisSinkOptions.DORIS_FILTER_QUERY.key())) {
readOptionBuilder.setFilterQuery(String.valueOf(sink.get(DorisSinkOptions.DORIS_FILTER_QUERY.key())));
}
if (sink.containsKey(DorisSinkOptions.DORIS_READ_FIELD.key())) {
readOptionBuilder.setReadFields(sink.get(DorisSinkOptions.DORIS_READ_FIELD.key()));
}
if (sink.containsKey(DorisSinkOptions.DORIS_BATCH_SIZE.key())) {
readOptionBuilder.setRequestBatchSize(Integer.valueOf(sink.get(DorisSinkOptions.DORIS_BATCH_SIZE.key())));
}
if (sink.containsKey(DorisSinkOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS.key())) {
readOptionBuilder.setRequestConnectTimeoutMs(Integer.valueOf(sink.get(DorisSinkOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS.key())));
} }
if (sink.containsKey("sink.batch.interval")) { if (sink.containsKey(DorisSinkOptions.DORIS_REQUEST_QUERY_TIMEOUT_S.key())) {
dorisExecutionOptionsBuilder.setBatchIntervalMs(Long.valueOf(sink.get("sink.batch.interval"))); readOptionBuilder.setRequestQueryTimeoutS(Integer.valueOf(sink.get(DorisSinkOptions.DORIS_REQUEST_QUERY_TIMEOUT_S.key())));
} }
if (sink.containsKey("sink.max-retries")) { if (sink.containsKey(DorisSinkOptions.DORIS_REQUEST_READ_TIMEOUT_MS.key())) {
dorisExecutionOptionsBuilder.setMaxRetries(Integer.valueOf(sink.get("sink.max-retries"))); readOptionBuilder.setRequestReadTimeoutMs(Integer.valueOf(sink.get(DorisSinkOptions.DORIS_REQUEST_READ_TIMEOUT_MS.key())));
} }
if (sink.containsKey("sink.enable-delete")) { if (sink.containsKey(DorisSinkOptions.DORIS_REQUEST_RETRIES.key())) {
dorisExecutionOptionsBuilder.setEnableDelete(Boolean.valueOf(sink.get("sink.enable-delete"))); readOptionBuilder.setRequestRetries(Integer.valueOf(sink.get(DorisSinkOptions.DORIS_REQUEST_RETRIES.key())));
}
if (sink.containsKey(DorisSinkOptions.DORIS_REQUEST_TABLET_SIZE.key())) {
readOptionBuilder.setRequestTabletSize(Integer.valueOf(sink.get(DorisSinkOptions.DORIS_REQUEST_TABLET_SIZE.key())));
} }
dorisExecutionOptionsBuilder.setStreamLoadProp(getProperties());
final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]); // Create DorisOptions for DorisSink.
final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]); DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes(config.getSink().get(DorisSinkOptions.FENODES.key()))
rowDataDataStream.addSink(
DorisSink.sink(
columnNames,
columnTypes,
DorisReadOptions.builder().build(),
dorisExecutionOptionsBuilder.build(),
DorisOptions.builder()
.setFenodes(config.getSink().get("fenodes"))
.setTableIdentifier(getSinkSchemaName(table) + "." + getSinkTableName(table)) .setTableIdentifier(getSinkSchemaName(table) + "." + getSinkTableName(table))
.setUsername(config.getSink().get("username")) .setUsername(config.getSink().get(DorisSinkOptions.USERNAME.key()))
.setPassword(config.getSink().get("password")).build() .setPassword(config.getSink().get(DorisSinkOptions.PASSWORD.key())).build();
));
// Create DorisExecutionOptions for DorisSink.
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
if (sink.containsKey(DorisSinkOptions.SINK_BUFFER_COUNT.key())) {
executionBuilder.setBufferCount(Integer.valueOf(sink.get(DorisSinkOptions.SINK_BUFFER_COUNT.key())));
}
if (sink.containsKey(DorisSinkOptions.SINK_BUFFER_SIZE.key())) {
executionBuilder.setBufferSize(Integer.valueOf(sink.get(DorisSinkOptions.SINK_BUFFER_SIZE.key())));
}
if (sink.containsKey(DorisSinkOptions.SINK_ENABLE_DELETE.key())) {
executionBuilder.setDeletable(Boolean.valueOf(sink.get(DorisSinkOptions.SINK_ENABLE_DELETE.key())));
}
if (sink.containsKey(DorisSinkOptions.SINK_LABEL_PREFIX.key())) {
executionBuilder.setLabelPrefix(getSinkSchemaName(table) + "_" + getSinkTableName(table) + sink.get(DorisSinkOptions.SINK_LABEL_PREFIX.key()));
}
if (sink.containsKey(DorisSinkOptions.SINK_MAX_RETRIES.key())) {
executionBuilder.setMaxRetries(Integer.valueOf(sink.get(DorisSinkOptions.SINK_MAX_RETRIES.key())));
}
executionBuilder.setStreamLoadProp(getProperties());
// Create DorisSink.
DorisSink.Builder<RowData> builder = DorisSink.builder();
builder.setDorisReadOptions(readOptionBuilder.build())
.setDorisExecutionOptions(executionBuilder.build())
.setSerializer(RowDataSerializer.builder()
.setFieldNames(columnNames)
.setType("json")
.setFieldType(columnTypes).build())
.setDorisOptions(dorisBuilder.build());
rowDataDataStream.sinkTo(builder.build()).name("Doris Sink(table=[" + getSinkSchemaName(table) + "." + getSinkTableName(table) + "])");
} }
} }
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.cdc.doris;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
/**
* DorisSinkOptions
**/
public class DorisSinkOptions {
public static final ConfigOption<String> FENODES = ConfigOptions.key("fenodes").stringType().noDefaultValue()
.withDescription("Doris FE http address, support multiple addresses, separated by commas.");
public static final ConfigOption<String> TABLE_IDENTIFIER = ConfigOptions.key("table.identifier").stringType().noDefaultValue()
.withDescription("Doris table identifier, eg, db1.tbl1.");
public static final ConfigOption<String> USERNAME = ConfigOptions.key("username").stringType().noDefaultValue()
.withDescription("Doris username.");
public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password").stringType().noDefaultValue()
.withDescription("Doris password.");
public static final ConfigOption<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC = ConfigOptions.key("doris.deserialize.arrow.async").booleanType().defaultValue(false)
.withDescription("Whether to support asynchronous conversion of Arrow format to RowBatch required for flink-doris-connector iteration.");
public static final ConfigOption<Integer> DORIS_DESERIALIZE_QUEUE_SIZE = ConfigOptions.key("doris.deserialize.queue.size").intType().defaultValue(64)
.withDescription("Asynchronous conversion of the internal processing queue in Arrow format takes effect when doris.deserialize.arrow.async is true.");
public static final ConfigOption<Long> DORIS_EXEC_MEM_LIMIT = ConfigOptions.key("doris.exec.mem.limit").longType().defaultValue(2147483648L)
.withDescription("Memory limit for a single query. The default is 2GB, in bytes.");
public static final ConfigOption<String> DORIS_FILTER_QUERY = ConfigOptions.key("doris.filter.query").stringType().noDefaultValue()
.withDescription("Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering.");
public static final ConfigOption<String> DORIS_READ_FIELD = ConfigOptions.key("doris.read.field").stringType().noDefaultValue()
.withDescription("List of column names in the Doris table, separated by commas.");
public static final ConfigOption<Integer> DORIS_BATCH_SIZE = ConfigOptions.key("doris.batch.size").intType().defaultValue(1024)
.withDescription("The maximum number of rows to read data from BE at one time. Increasing this value can reduce the number of connections between Flink and Doris."
+ " Thereby reducing the extra time overhead caused by network delay.");
public static final ConfigOption<Integer> DORIS_REQUEST_CONNECT_TIMEOUT_MS = ConfigOptions.key("doris.request.connect.timeout.ms").intType().defaultValue(30000)
.withDescription("Connection timeout for sending requests to Doris.");
public static final ConfigOption<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S = ConfigOptions.key("doris.request.query.timeout.s").intType().defaultValue(3600)
.withDescription("Query the timeout time of doris, the default is 1 hour, -1 means no timeout limit.");
public static final ConfigOption<Integer> DORIS_REQUEST_READ_TIMEOUT_MS = ConfigOptions.key("doris.request.read.timeout.ms").intType().defaultValue(30000)
.withDescription("Read timeout for sending request to Doris.");
public static final ConfigOption<Integer> DORIS_REQUEST_RETRIES = ConfigOptions.key("doris.request.retries").intType().defaultValue(3)
.withDescription("Number of retries to send requests to Doris.");
public static final ConfigOption<Integer> DORIS_REQUEST_TABLET_SIZE = ConfigOptions.key("doris.request.tablet.size").intType().defaultValue(Integer.MAX_VALUE)
.withDescription("The number of Doris Tablets corresponding to an Partition. The smaller this value is set, the more partitions will be generated. "
+ "This will increase the parallelism on the flink side, but at the same time will cause greater pressure on Doris.");
public static final ConfigOption<Integer> SINK_BUFFER_COUNT = ConfigOptions.key("sink.buffer-count").intType().defaultValue(3)
.withDescription("The number of write data cache buffers, it is not recommended to modify, the default configuration is sufficient.");
public static final ConfigOption<Integer> SINK_BUFFER_SIZE = ConfigOptions.key("sink.buffer-size").intType().defaultValue(1048576)
.withDescription("Write data cache buffer size, in bytes. It is not recommended to modify, the default configuration is sufficient.");
public static final ConfigOption<Boolean> SINK_ENABLE_DELETE = ConfigOptions.key("sink.enable-delete").booleanType().defaultValue(true)
.withDescription("Whether to enable deletion. This option requires Doris table to enable batch delete function (0.15+ version is enabled by default), and only supports Uniq model.");
public static final ConfigOption<String> SINK_LABEL_PREFIX = ConfigOptions.key("sink.label-prefix").stringType().noDefaultValue()
.withDescription("The label prefix used by stream load imports. In the 2pc scenario, global uniqueness is required to ensure the EOS semantics of Flink.");
public static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions.key("sink.max-retries").intType().defaultValue(1)
.withDescription("In the 2pc scenario, the number of retries after the commit phase fails.");
}
...@@ -103,8 +103,8 @@ public abstract class AbstractSinkBuilder { ...@@ -103,8 +103,8 @@ public abstract class AbstractSinkBuilder {
Properties properties = new Properties(); Properties properties = new Properties();
Map<String, String> sink = config.getSink(); Map<String, String> sink = config.getSink();
for (Map.Entry<String, String> entry : sink.entrySet()) { for (Map.Entry<String, String> entry : sink.entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) { if (Asserts.isNotNullString(entry.getKey()) && entry.getKey().startsWith("sink.properties") && Asserts.isNotNullString(entry.getValue())) {
properties.setProperty(entry.getKey(), entry.getValue()); properties.setProperty(entry.getKey().replace("sink.properties.",""), entry.getValue());
} }
} }
return properties; return properties;
......
...@@ -27,21 +27,22 @@ import com.dlink.model.Table; ...@@ -27,21 +27,22 @@ import com.dlink.model.Table;
import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.cfg.DorisSink; import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.RowDataSerializer;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.utils.TypeConversions;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
/** /**
* DorisSinkBuilder * DorisSinkBuilder
*
* @author wenmo
* @since 2022/4/20 19:20
**/ **/
public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable { public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
...@@ -73,36 +74,88 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder ...@@ -73,36 +74,88 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
List<String> columnNameList, List<String> columnNameList,
List<LogicalType> columnTypeList) { List<LogicalType> columnTypeList) {
DorisExecutionOptions.Builder dorisExecutionOptionsBuilder = DorisExecutionOptions.builder();
Map<String, String> sink = config.getSink(); Map<String, String> sink = config.getSink();
if (sink.containsKey("sink.batch.size")) {
dorisExecutionOptionsBuilder.setBatchSize(Integer.valueOf(sink.get("sink.batch.size"))); // Create FieldNames and FieldType for RowDataSerializer.
final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]);
final List<DataType> dataTypeList = new ArrayList<>();
for (LogicalType logicalType : columnTypeList) {
dataTypeList.add(TypeConversions.fromLogicalToDataType(logicalType));
}
final DataType[] columnTypes = dataTypeList.toArray(new DataType[dataTypeList.size()]);
// Create DorisReadOptions for DorisSink.
final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
if (sink.containsKey(DorisSinkOptions.DORIS_DESERIALIZE_ARROW_ASYNC.key())) {
readOptionBuilder.setDeserializeArrowAsync(Boolean.valueOf(sink.get(DorisSinkOptions.DORIS_DESERIALIZE_ARROW_ASYNC.key())));
}
if (sink.containsKey(DorisSinkOptions.DORIS_DESERIALIZE_QUEUE_SIZE.key())) {
readOptionBuilder.setDeserializeQueueSize(Integer.valueOf(sink.get(DorisSinkOptions.DORIS_DESERIALIZE_QUEUE_SIZE.key())));
}
if (sink.containsKey(DorisSinkOptions.DORIS_EXEC_MEM_LIMIT.key())) {
readOptionBuilder.setExecMemLimit(Long.valueOf(sink.get(DorisSinkOptions.DORIS_EXEC_MEM_LIMIT.key())));
}
if (sink.containsKey(DorisSinkOptions.DORIS_FILTER_QUERY.key())) {
readOptionBuilder.setFilterQuery(String.valueOf(sink.get(DorisSinkOptions.DORIS_FILTER_QUERY.key())));
}
if (sink.containsKey(DorisSinkOptions.DORIS_READ_FIELD.key())) {
readOptionBuilder.setReadFields(sink.get(DorisSinkOptions.DORIS_READ_FIELD.key()));
}
if (sink.containsKey(DorisSinkOptions.DORIS_BATCH_SIZE.key())) {
readOptionBuilder.setRequestBatchSize(Integer.valueOf(sink.get(DorisSinkOptions.DORIS_BATCH_SIZE.key())));
}
if (sink.containsKey(DorisSinkOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS.key())) {
readOptionBuilder.setRequestConnectTimeoutMs(Integer.valueOf(sink.get(DorisSinkOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS.key())));
} }
if (sink.containsKey("sink.batch.interval")) { if (sink.containsKey(DorisSinkOptions.DORIS_REQUEST_QUERY_TIMEOUT_S.key())) {
dorisExecutionOptionsBuilder.setBatchIntervalMs(Long.valueOf(sink.get("sink.batch.interval"))); readOptionBuilder.setRequestQueryTimeoutS(Integer.valueOf(sink.get(DorisSinkOptions.DORIS_REQUEST_QUERY_TIMEOUT_S.key())));
} }
if (sink.containsKey("sink.max-retries")) { if (sink.containsKey(DorisSinkOptions.DORIS_REQUEST_READ_TIMEOUT_MS.key())) {
dorisExecutionOptionsBuilder.setMaxRetries(Integer.valueOf(sink.get("sink.max-retries"))); readOptionBuilder.setRequestReadTimeoutMs(Integer.valueOf(sink.get(DorisSinkOptions.DORIS_REQUEST_READ_TIMEOUT_MS.key())));
} }
if (sink.containsKey("sink.enable-delete")) { if (sink.containsKey(DorisSinkOptions.DORIS_REQUEST_RETRIES.key())) {
dorisExecutionOptionsBuilder.setEnableDelete(Boolean.valueOf(sink.get("sink.enable-delete"))); readOptionBuilder.setRequestRetries(Integer.valueOf(sink.get(DorisSinkOptions.DORIS_REQUEST_RETRIES.key())));
}
if (sink.containsKey(DorisSinkOptions.DORIS_REQUEST_TABLET_SIZE.key())) {
readOptionBuilder.setRequestTabletSize(Integer.valueOf(sink.get(DorisSinkOptions.DORIS_REQUEST_TABLET_SIZE.key())));
} }
dorisExecutionOptionsBuilder.setStreamLoadProp(getProperties());
final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]); // Create DorisOptions for DorisSink.
final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]); DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes(config.getSink().get(DorisSinkOptions.FENODES.key()))
rowDataDataStream.addSink(
DorisSink.sink(
columnNames,
columnTypes,
DorisReadOptions.builder().build(),
dorisExecutionOptionsBuilder.build(),
DorisOptions.builder()
.setFenodes(config.getSink().get("fenodes"))
.setTableIdentifier(getSinkSchemaName(table) + "." + getSinkTableName(table)) .setTableIdentifier(getSinkSchemaName(table) + "." + getSinkTableName(table))
.setUsername(config.getSink().get("username")) .setUsername(config.getSink().get(DorisSinkOptions.USERNAME.key()))
.setPassword(config.getSink().get("password")).build() .setPassword(config.getSink().get(DorisSinkOptions.PASSWORD.key())).build();
));
// Create DorisExecutionOptions for DorisSink.
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
if (sink.containsKey(DorisSinkOptions.SINK_BUFFER_COUNT.key())) {
executionBuilder.setBufferCount(Integer.valueOf(sink.get(DorisSinkOptions.SINK_BUFFER_COUNT.key())));
}
if (sink.containsKey(DorisSinkOptions.SINK_BUFFER_SIZE.key())) {
executionBuilder.setBufferSize(Integer.valueOf(sink.get(DorisSinkOptions.SINK_BUFFER_SIZE.key())));
}
if (sink.containsKey(DorisSinkOptions.SINK_ENABLE_DELETE.key())) {
executionBuilder.setDeletable(Boolean.valueOf(sink.get(DorisSinkOptions.SINK_ENABLE_DELETE.key())));
}
if (sink.containsKey(DorisSinkOptions.SINK_LABEL_PREFIX.key())) {
executionBuilder.setLabelPrefix(getSinkSchemaName(table) + "_" + getSinkTableName(table) + sink.get(DorisSinkOptions.SINK_LABEL_PREFIX.key()));
}
if (sink.containsKey(DorisSinkOptions.SINK_MAX_RETRIES.key())) {
executionBuilder.setMaxRetries(Integer.valueOf(sink.get(DorisSinkOptions.SINK_MAX_RETRIES.key())));
}
executionBuilder.setStreamLoadProp(getProperties());
// Create DorisSink.
DorisSink.Builder<RowData> builder = DorisSink.builder();
builder.setDorisReadOptions(readOptionBuilder.build())
.setDorisExecutionOptions(executionBuilder.build())
.setSerializer(RowDataSerializer.builder()
.setFieldNames(columnNames)
.setType("json")
.setFieldType(columnTypes).build())
.setDorisOptions(dorisBuilder.build());
rowDataDataStream.sinkTo(builder.build()).name("Doris Sink(table=[" + getSinkSchemaName(table) + "." + getSinkTableName(table) + "])");
} }
} }
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.cdc.doris;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
/**
 * DorisSinkOptions holds the Flink {@link ConfigOption} definitions used to configure the
 * Doris datastream sink: connection settings (FE nodes, credentials, table identifier),
 * source-read tuning options ({@code doris.*}) and stream-load sink options ({@code sink.*}).
 *
 * <p>This class is a non-instantiable constants holder.
 **/
public class DorisSinkOptions {

    /** Private constructor: this is a constants holder and must not be instantiated. */
    private DorisSinkOptions() {
    }

    // --- Connection options ---

    public static final ConfigOption<String> FENODES = ConfigOptions.key("fenodes").stringType().noDefaultValue()
            .withDescription("Doris FE http address, support multiple addresses, separated by commas.");
    public static final ConfigOption<String> TABLE_IDENTIFIER = ConfigOptions.key("table.identifier").stringType().noDefaultValue()
            .withDescription("Doris table identifier, eg, db1.tbl1.");
    public static final ConfigOption<String> USERNAME = ConfigOptions.key("username").stringType().noDefaultValue()
            .withDescription("Doris username.");
    public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password").stringType().noDefaultValue()
            .withDescription("Doris password.");

    // --- Read options (doris.*), applied to DorisReadOptions ---

    public static final ConfigOption<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC = ConfigOptions.key("doris.deserialize.arrow.async").booleanType().defaultValue(false)
            .withDescription("Whether to support asynchronous conversion of Arrow format to RowBatch required for flink-doris-connector iteration.");
    public static final ConfigOption<Integer> DORIS_DESERIALIZE_QUEUE_SIZE = ConfigOptions.key("doris.deserialize.queue.size").intType().defaultValue(64)
            .withDescription("Asynchronous conversion of the internal processing queue in Arrow format takes effect when doris.deserialize.arrow.async is true.");
    public static final ConfigOption<Long> DORIS_EXEC_MEM_LIMIT = ConfigOptions.key("doris.exec.mem.limit").longType().defaultValue(2147483648L)
            .withDescription("Memory limit for a single query. The default is 2GB, in bytes.");
    public static final ConfigOption<String> DORIS_FILTER_QUERY = ConfigOptions.key("doris.filter.query").stringType().noDefaultValue()
            .withDescription("Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering.");
    public static final ConfigOption<String> DORIS_READ_FIELD = ConfigOptions.key("doris.read.field").stringType().noDefaultValue()
            .withDescription("List of column names in the Doris table, separated by commas.");
    public static final ConfigOption<Integer> DORIS_BATCH_SIZE = ConfigOptions.key("doris.batch.size").intType().defaultValue(1024)
            .withDescription("The maximum number of rows to read data from BE at one time. Increasing this value can reduce the number of connections between Flink and Doris."
                    + " Thereby reducing the extra time overhead caused by network delay.");
    public static final ConfigOption<Integer> DORIS_REQUEST_CONNECT_TIMEOUT_MS = ConfigOptions.key("doris.request.connect.timeout.ms").intType().defaultValue(30000)
            .withDescription("Connection timeout for sending requests to Doris.");
    public static final ConfigOption<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S = ConfigOptions.key("doris.request.query.timeout.s").intType().defaultValue(3600)
            .withDescription("Query the timeout time of doris, the default is 1 hour, -1 means no timeout limit.");
    public static final ConfigOption<Integer> DORIS_REQUEST_READ_TIMEOUT_MS = ConfigOptions.key("doris.request.read.timeout.ms").intType().defaultValue(30000)
            .withDescription("Read timeout for sending request to Doris.");
    public static final ConfigOption<Integer> DORIS_REQUEST_RETRIES = ConfigOptions.key("doris.request.retries").intType().defaultValue(3)
            .withDescription("Number of retries to send requests to Doris.");
    public static final ConfigOption<Integer> DORIS_REQUEST_TABLET_SIZE = ConfigOptions.key("doris.request.tablet.size").intType().defaultValue(Integer.MAX_VALUE)
            .withDescription("The number of Doris Tablets corresponding to an Partition. The smaller this value is set, the more partitions will be generated. "
                    + "This will increase the parallelism on the flink side, but at the same time will cause greater pressure on Doris.");

    // --- Sink options (sink.*), applied to DorisExecutionOptions ---

    public static final ConfigOption<Integer> SINK_BUFFER_COUNT = ConfigOptions.key("sink.buffer-count").intType().defaultValue(3)
            .withDescription("The number of write data cache buffers, it is not recommended to modify, the default configuration is sufficient.");
    public static final ConfigOption<Integer> SINK_BUFFER_SIZE = ConfigOptions.key("sink.buffer-size").intType().defaultValue(1048576)
            .withDescription("Write data cache buffer size, in bytes. It is not recommended to modify, the default configuration is sufficient.");
    public static final ConfigOption<Boolean> SINK_ENABLE_DELETE = ConfigOptions.key("sink.enable-delete").booleanType().defaultValue(true)
            .withDescription("Whether to enable deletion. This option requires Doris table to enable batch delete function (0.15+ version is enabled by default), and only supports Uniq model.");
    public static final ConfigOption<String> SINK_LABEL_PREFIX = ConfigOptions.key("sink.label-prefix").stringType().noDefaultValue()
            .withDescription("The label prefix used by stream load imports. In the 2pc scenario, global uniqueness is required to ensure the EOS semantics of Flink.");
    public static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions.key("sink.max-retries").intType().defaultValue(1)
            .withDescription("In the 2pc scenario, the number of retries after the commit phase fails.");
}
...@@ -176,7 +176,7 @@ ...@@ -176,7 +176,7 @@
<dependency> <dependency>
<groupId>org.apache.doris</groupId> <groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.14_${scala.binary.version}</artifactId> <artifactId>flink-doris-connector-1.14_${scala.binary.version}</artifactId>
<version>1.0.3</version> <version>1.1.0</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.starrocks</groupId> <groupId>com.starrocks</groupId>
......
...@@ -160,7 +160,7 @@ ...@@ -160,7 +160,7 @@
<dependency> <dependency>
<groupId>org.apache.doris</groupId> <groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.14_2.12</artifactId> <artifactId>flink-doris-connector-1.14_2.12</artifactId>
<version>1.0.3</version> <version>1.1.0</version>
</dependency> </dependency>
</dependencies> </dependencies>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment