Unverified Commit 1ae6a845 authored by booleandev, committed by GitHub

[Fix-1083][client] kafka sink properties not enabled (#1083) (#1139)

parent 3f5b8da1
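
What this change does, in short: `AbstractSinkBuilder.getProperties()` previously filtered the sink config map for keys starting with `sink.properties`, but by the time that map reaches the builder the leading `sink.` prefix has presumably already been stripped, so the filter never matched and user-supplied producer options were silently dropped. The base class now matches the `properties.` prefix instead; `DorisSinkBuilder` overrides `getProperties()` to keep the old `sink.properties.` prefix for Doris stream-load options; and `KafkaSinkBuilder` now actually hands the collected `Properties` to the producer. The same fix is repeated below once per Flink client module, with small per-version differences: older clients construct a `FlinkKafkaProducer` and validate the broker list, newer ones call `setKafkaProducerConfig(...)` on the `KafkaSink` builder.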
@@ -103,8 +103,8 @@ public abstract class AbstractSinkBuilder {
         Properties properties = new Properties();
         Map<String, String> sink = config.getSink();
         for (Map.Entry<String, String> entry : sink.entrySet()) {
-            if (Asserts.isNotNullString(entry.getKey()) && entry.getKey().startsWith("sink.properties") && Asserts.isNotNullString(entry.getValue())) {
-                properties.setProperty(entry.getKey().replace("sink.properties.",""), entry.getValue());
+            if (Asserts.isNotNullString(entry.getKey()) && entry.getKey().startsWith("properties") && Asserts.isNotNullString(entry.getValue())) {
+                properties.setProperty(entry.getKey().replace("properties.",""), entry.getValue());
             }
         }
         return properties;
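
A minimal sketch of the fixed filter's behavior, assuming a sink map whose keys have already lost the `sink.` prefix. The map entries are hypothetical, not taken from this commit, and `Asserts.isNotNullString` is replaced with plain null checks so the sketch compiles standalone:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    public class PropertiesFilterSketch {
        public static void main(String[] args) {
            Map<String, String> sink = new HashMap<>();
            sink.put("connector", "datastream-kafka");               // not a producer option: skipped
            sink.put("properties.transaction.timeout.ms", "900000"); // hypothetical producer option

            Properties properties = new Properties();
            for (Map.Entry<String, String> entry : sink.entrySet()) {
                // The old filter required the "sink.properties" prefix and so never
                // matched keys like these; the producer config always stayed empty.
                if (entry.getKey() != null && entry.getKey().startsWith("properties") && entry.getValue() != null) {
                    properties.setProperty(entry.getKey().replace("properties.", ""), entry.getValue());
                }
            }
            System.out.println(properties); // {transaction.timeout.ms=900000}
        }
    }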
@@ -19,6 +19,7 @@
 package com.dlink.cdc.doris;
 
+import com.dlink.assertion.Asserts;
 import com.dlink.cdc.AbstractSinkBuilder;
 import com.dlink.cdc.SinkBuilder;
 import com.dlink.model.FlinkCDCConfig;
@@ -36,6 +37,7 @@ import org.apache.flink.table.types.logical.LogicalType;
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 /**
  * DorisSinkBuilder
@@ -105,4 +107,16 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
                 .setPassword(config.getSink().get("password")).build()
         ));
     }
+
+    @Override
+    protected Properties getProperties() {
+        Properties properties = new Properties();
+        Map<String, String> sink = config.getSink();
+        for (Map.Entry<String, String> entry : sink.entrySet()) {
+            if (Asserts.isNotNullString(entry.getKey()) && entry.getKey().startsWith("sink.properties") && Asserts.isNotNullString(entry.getValue())) {
+                properties.setProperty(entry.getKey().replace("sink.properties.", ""), entry.getValue());
+            }
+        }
+        return properties;
+    }
 }
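
Note that `DorisSinkBuilder` deliberately keeps the old `sink.properties.` prefix in its override: Doris stream-load options are conventionally configured under `sink.properties.*` (for example a `sink.properties.format` = `json` entry; the value here is illustrative), so narrowing the base-class filter to `properties.` would presumably have dropped them without this override.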
@@ -39,13 +39,16 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.OutputTag;
+import org.apache.kafka.clients.producer.ProducerConfig;
 
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 /**
  * MysqlCDCBuilder
@@ -90,10 +93,12 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
             StreamExecutionEnvironment env,
             CustomTableEnvironment customTableEnvironment,
             DataStreamSource<String> dataStreamSource) {
+        Properties kafkaProducerConfig = getProperties();
+        getPropertiesFromBrokerList(kafkaProducerConfig, config.getSink().get("brokers"));
         if (Asserts.isNotNullString(config.getSink().get("topic"))) {
-            dataStreamSource.addSink(new FlinkKafkaProducer<>(config.getSink().get("brokers"),
-                config.getSink().get("topic"),
-                new SimpleStringSchema()));
+            dataStreamSource.addSink(new FlinkKafkaProducer<>(config.getSink().get("topic"),
+                new SimpleStringSchema(),
+                kafkaProducerConfig));
         } else {
             Map<Table, OutputTag<String>> tagMap = new HashMap<>();
@@ -129,11 +134,20 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
                 });
             tagMap.forEach((k, v) -> {
                 String topic = getSinkTableName(k);
-                process.getSideOutput(v).rebalance().addSink(new FlinkKafkaProducer<>(config.getSink().get("brokers"),
-                    topic, new SimpleStringSchema())).name(topic);
+                process.getSideOutput(v).rebalance().addSink(new FlinkKafkaProducer<>(
+                    topic, new SimpleStringSchema(), kafkaProducerConfig)).name(topic);
             });
             }
         }
         return dataStreamSource;
     }
+
+    private void getPropertiesFromBrokerList(Properties props, String brokerList) {
+        String[] elements = brokerList.split(",");
+        // validate the broker addresses
+        for (String broker : elements) {
+            NetUtils.getCorrectHostnamePort(broker);
+        }
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+    }
 }
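
The new `getPropertiesFromBrokerList` helper fails fast on a malformed broker list before the job is built. `NetUtils.getCorrectHostnamePort` is Flink's `org.apache.flink.util.NetUtils` helper that parses a `host:port` string and, as I read its contract, throws `IllegalArgumentException` when the port is missing or the address is malformed. A small sketch with hypothetical addresses:

    import org.apache.flink.util.NetUtils;

    public class BrokerValidationSketch {
        public static void main(String[] args) {
            NetUtils.getCorrectHostnamePort("kafka-1:9092");  // parses cleanly
            NetUtils.getCorrectHostnamePort("kafka-1");       // throws IllegalArgumentException: no port
        }
    }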
@@ -103,8 +103,8 @@ public abstract class AbstractSinkBuilder {
         Properties properties = new Properties();
         Map<String, String> sink = config.getSink();
         for (Map.Entry<String, String> entry : sink.entrySet()) {
-            if (Asserts.isNotNullString(entry.getKey()) && entry.getKey().startsWith("sink.properties") && Asserts.isNotNullString(entry.getValue())) {
-                properties.setProperty(entry.getKey().replace("sink.properties.",""), entry.getValue());
+            if (Asserts.isNotNullString(entry.getKey()) && entry.getKey().startsWith("properties") && Asserts.isNotNullString(entry.getValue())) {
+                properties.setProperty(entry.getKey().replace("properties.",""), entry.getValue());
             }
         }
         return properties;
@@ -19,6 +19,7 @@
 package com.dlink.cdc.doris;
 
+import com.dlink.assertion.Asserts;
 import com.dlink.cdc.AbstractSinkBuilder;
 import com.dlink.cdc.SinkBuilder;
 import com.dlink.model.FlinkCDCConfig;
@@ -36,6 +37,7 @@ import org.apache.flink.table.types.logical.LogicalType;
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 /**
  * DorisSinkBuilder
@@ -105,4 +107,16 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
                 .setPassword(config.getSink().get("password")).build()
         ));
     }
+
+    @Override
+    protected Properties getProperties() {
+        Properties properties = new Properties();
+        Map<String, String> sink = config.getSink();
+        for (Map.Entry<String, String> entry : sink.entrySet()) {
+            if (Asserts.isNotNullString(entry.getKey()) && entry.getKey().startsWith("sink.properties") && Asserts.isNotNullString(entry.getValue())) {
+                properties.setProperty(entry.getKey().replace("sink.properties.", ""), entry.getValue());
+            }
+        }
+        return properties;
+    }
 }
@@ -39,13 +39,16 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.OutputTag;
+import org.apache.kafka.clients.producer.ProducerConfig;
 
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 /**
  * MysqlCDCBuilder
@@ -90,10 +93,13 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
             StreamExecutionEnvironment env,
             CustomTableEnvironment customTableEnvironment,
             DataStreamSource<String> dataStreamSource) {
+        Properties kafkaProducerConfig = getProperties();
+        getPropertiesFromBrokerList(kafkaProducerConfig, config.getSink().get("brokers"));
         if (Asserts.isNotNullString(config.getSink().get("topic"))) {
-            dataStreamSource.addSink(new FlinkKafkaProducer<>(config.getSink().get("brokers"),
+            dataStreamSource.addSink(new FlinkKafkaProducer<>(
                 config.getSink().get("topic"),
-                new SimpleStringSchema()));
+                new SimpleStringSchema(),
+                kafkaProducerConfig));
         } else {
             Map<Table, OutputTag<String>> tagMap = new HashMap<>();
@@ -130,11 +136,20 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
                 });
             tagMap.forEach((k, v) -> {
                 String topic = getSinkTableName(k);
-                process.getSideOutput(v).rebalance().addSink(new FlinkKafkaProducer<>(config.getSink().get("brokers"),
-                    topic, new SimpleStringSchema())).name(topic);
+                process.getSideOutput(v).rebalance().addSink(new FlinkKafkaProducer<>(
+                    topic, new SimpleStringSchema(), kafkaProducerConfig)).name(topic);
            });
            }
        }
        return dataStreamSource;
    }
+
+    private void getPropertiesFromBrokerList(Properties props, String brokerList) {
+        String[] elements = brokerList.split(",");
+        // validate the broker addresses
+        for (String broker : elements) {
+            NetUtils.getCorrectHostnamePort(broker);
+        }
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+    }
 }
@@ -103,8 +103,8 @@ public abstract class AbstractSinkBuilder implements SinkBuilder {
         Properties properties = new Properties();
         Map<String, String> sink = config.getSink();
         for (Map.Entry<String, String> entry : sink.entrySet()) {
-            if (Asserts.isNotNullString(entry.getKey()) && entry.getKey().startsWith("sink.properties") && Asserts.isNotNullString(entry.getValue())) {
-                properties.setProperty(entry.getKey().replace("sink.properties.",""), entry.getValue());
+            if (Asserts.isNotNullString(entry.getKey()) && entry.getKey().startsWith("properties") && Asserts.isNotNullString(entry.getValue())) {
+                properties.setProperty(entry.getKey().replace("properties.",""), entry.getValue());
             }
         }
         return properties;
@@ -19,6 +19,7 @@
 package com.dlink.cdc.doris;
 
+import com.dlink.assertion.Asserts;
 import com.dlink.cdc.AbstractSinkBuilder;
 import com.dlink.cdc.SinkBuilder;
 import com.dlink.model.FlinkCDCConfig;
@@ -36,6 +37,7 @@ import org.apache.flink.table.types.logical.LogicalType;
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 /**
  * DorisSinkBuilder
@@ -105,4 +107,16 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements Serializable
                 .setPassword(config.getSink().get("password")).build()
         ));
     }
+
+    @Override
+    protected Properties getProperties() {
+        Properties properties = new Properties();
+        Map<String, String> sink = config.getSink();
+        for (Map.Entry<String, String> entry : sink.entrySet()) {
+            if (Asserts.isNotNullString(entry.getKey()) && entry.getKey().startsWith("sink.properties") && Asserts.isNotNullString(entry.getValue())) {
+                properties.setProperty(entry.getKey().replace("sink.properties.", ""), entry.getValue());
+            }
+        }
+        return properties;
+    }
 }
@@ -39,13 +39,16 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.OutputTag;
+import org.apache.kafka.clients.producer.ProducerConfig;
 
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 /**
  * MysqlCDCBuilder
@@ -80,10 +83,13 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements Serializable
             StreamExecutionEnvironment env,
             CustomTableEnvironment customTableEnvironment,
             DataStreamSource<String> dataStreamSource) {
+        Properties kafkaProducerConfig = getProperties();
+        getPropertiesFromBrokerList(kafkaProducerConfig, config.getSink().get("brokers"));
         if (Asserts.isNotNullString(config.getSink().get("topic"))) {
-            dataStreamSource.addSink(new FlinkKafkaProducer<>(config.getSink().get("brokers"),
+            dataStreamSource.addSink(new FlinkKafkaProducer<>(
                 config.getSink().get("topic"),
-                new SimpleStringSchema()));
+                new SimpleStringSchema(),
+                kafkaProducerConfig));
         } else {
             Map<Table, OutputTag<String>> tagMap = new HashMap<>();
@@ -120,8 +126,8 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements Serializable
                 });
             tagMap.forEach((k, v) -> {
                 String topic = getSinkTableName(k);
-                process.getSideOutput(v).rebalance().addSink(new FlinkKafkaProducer<>(config.getSink().get("brokers"),
-                    topic, new SimpleStringSchema())).name(topic);
+                process.getSideOutput(v).rebalance().addSink(new FlinkKafkaProducer<>(
+                    topic, new SimpleStringSchema(), kafkaProducerConfig)).name(topic);
             });
             }
         }
@@ -136,4 +142,13 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements Serializable
             List<String> columnNameList,
             List<LogicalType> columnTypeList) {
     }
+
+    private void getPropertiesFromBrokerList(Properties props, String brokerList) {
+        String[] elements = brokerList.split(",");
+        // validate the broker addresses
+        for (String broker : elements) {
+            NetUtils.getCorrectHostnamePort(broker);
+        }
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+    }
 }
@@ -105,8 +105,8 @@ public abstract class AbstractSinkBuilder implements SinkBuilder {
         Properties properties = new Properties();
         Map<String, String> sink = config.getSink();
         for (Map.Entry<String, String> entry : sink.entrySet()) {
-            if (Asserts.isNotNullString(entry.getKey()) && entry.getKey().startsWith("sink.properties") && Asserts.isNotNullString(entry.getValue())) {
-                properties.setProperty(entry.getKey().replace("sink.properties.", ""), entry.getValue());
+            if (Asserts.isNotNullString(entry.getKey()) && entry.getKey().startsWith("properties") && Asserts.isNotNullString(entry.getValue())) {
+                properties.setProperty(entry.getKey().replace("properties.", ""), entry.getValue());
             }
         }
         return properties;
@@ -19,6 +19,7 @@
 package com.dlink.cdc.doris;
 
+import com.dlink.assertion.Asserts;
 import com.dlink.cdc.AbstractSinkBuilder;
 import com.dlink.cdc.SinkBuilder;
 import com.dlink.model.FlinkCDCConfig;
@@ -170,4 +171,16 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements Serializable
         rowDataDataStream.sinkTo(builder.build()).name("Doris Sink(table=[" + getSinkSchemaName(table) + "." + getSinkTableName(table) + "])");
     }
+
+    @Override
+    protected Properties getProperties() {
+        Properties properties = new Properties();
+        Map<String, String> sink = config.getSink();
+        for (Map.Entry<String, String> entry : sink.entrySet()) {
+            if (Asserts.isNotNullString(entry.getKey()) && entry.getKey().startsWith("sink.properties") && Asserts.isNotNullString(entry.getValue())) {
+                properties.setProperty(entry.getKey().replace("sink.properties.", ""), entry.getValue());
+            }
+        }
+        return properties;
+    }
 }
@@ -48,6 +48,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 /**
  * MysqlCDCBuilder
@@ -92,6 +93,8 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements Serializable
             StreamExecutionEnvironment env,
             CustomTableEnvironment customTableEnvironment,
             DataStreamSource<String> dataStreamSource) {
+        // fix: the Kafka "properties" configuration was not being loaded
+        Properties kafkaProducerConfig = getProperties();
         if (Asserts.isNotNullString(config.getSink().get("topic"))) {
             KafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers(config.getSink().get("brokers"))
                 .setRecordSerializer(KafkaRecordSerializationSchema.builder()
@@ -100,6 +103,7 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements Serializable
                     .build()
                 )
                 .setDeliverGuarantee(DeliveryGuarantee.valueOf(env.getCheckpointingMode().name()))
+                .setKafkaProducerConfig(kafkaProducerConfig)
                 .build();
             dataStreamSource.sinkTo(kafkaSink);
         } else {
@@ -144,6 +148,7 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements Serializable
                     .build()
                 )
                 .setDeliverGuarantee(DeliveryGuarantee.valueOf(env.getCheckpointingMode().name()))
+                .setKafkaProducerConfig(kafkaProducerConfig)
                 .build();
                 process.getSideOutput(v).rebalance().sinkTo(kafkaSink).name(topic);
             });
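
In the newer client modules the properties flow into the sink through `KafkaSink`'s builder via `setKafkaProducerConfig(...)`. A minimal sketch of the resulting wiring, with hypothetical broker, topic, and property values; a raised transaction timeout is the classic reason to pass producer properties when the delivery guarantee is `EXACTLY_ONCE`:

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.base.DeliveryGuarantee;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;

    public class KafkaSinkConfigSketch {
        public static KafkaSink<String> buildSink() {
            // Hypothetical pass-through producer option, e.g. collected by getProperties()
            // from a "sink.properties.transaction.timeout.ms" entry in the job config.
            Properties kafkaProducerConfig = new Properties();
            kafkaProducerConfig.setProperty("transaction.timeout.ms", "900000");

            return KafkaSink.<String>builder()
                    .setBootstrapServers("kafka-1:9092")            // hypothetical broker
                    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                            .setTopic("example_topic")              // hypothetical topic
                            .setValueSerializationSchema(new SimpleStringSchema())
                            .build())
                    .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                    .setKafkaProducerConfig(kafkaProducerConfig)    // the fix: options now reach the producer
                    .build();
        }
    }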
@@ -103,8 +103,8 @@ public abstract class AbstractSinkBuilder {
         Properties properties = new Properties();
         Map<String, String> sink = config.getSink();
         for (Map.Entry<String, String> entry : sink.entrySet()) {
-            if (Asserts.isNotNullString(entry.getKey()) && entry.getKey().startsWith("sink.properties") && Asserts.isNotNullString(entry.getValue())) {
-                properties.setProperty(entry.getKey().replace("sink.properties.",""), entry.getValue());
+            if (Asserts.isNotNullString(entry.getKey()) && entry.getKey().startsWith("properties") && Asserts.isNotNullString(entry.getValue())) {
+                properties.setProperty(entry.getKey().replace("properties.",""), entry.getValue());
             }
         }
         return properties;
@@ -19,6 +19,7 @@
 package com.dlink.cdc.doris;
 
+import com.dlink.assertion.Asserts;
 import com.dlink.cdc.AbstractSinkBuilder;
 import com.dlink.cdc.SinkBuilder;
 import com.dlink.model.FlinkCDCConfig;
@@ -40,6 +41,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.UUID;
 
 /**
@@ -161,4 +163,16 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
         rowDataDataStream.sinkTo(builder.build()).name("Doris Sink(table=[" + getSinkSchemaName(table) + "." + getSinkTableName(table) + "])");
     }
+
+    @Override
+    protected Properties getProperties() {
+        Properties properties = new Properties();
+        Map<String, String> sink = config.getSink();
+        for (Map.Entry<String, String> entry : sink.entrySet()) {
+            if (Asserts.isNotNullString(entry.getKey()) && entry.getKey().startsWith("sink.properties") && Asserts.isNotNullString(entry.getValue())) {
+                properties.setProperty(entry.getKey().replace("sink.properties.", ""), entry.getValue());
+            }
+        }
+        return properties;
+    }
 }
@@ -48,6 +48,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 /**
  * MysqlCDCBuilder
@@ -92,6 +93,8 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
             StreamExecutionEnvironment env,
             CustomTableEnvironment customTableEnvironment,
             DataStreamSource<String> dataStreamSource) {
+        // fix: the Kafka "properties" configuration was not being loaded
+        Properties kafkaProducerConfig = getProperties();
         if (Asserts.isNotNullString(config.getSink().get("topic"))) {
             KafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers(config.getSink().get("brokers"))
                 .setRecordSerializer(KafkaRecordSerializationSchema.builder()
@@ -100,6 +103,7 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
                     .build()
                 )
                 .setDeliverGuarantee(DeliveryGuarantee.valueOf(env.getCheckpointingMode().name()))
+                .setKafkaProducerConfig(kafkaProducerConfig)
                 .build();
             dataStreamSource.sinkTo(kafkaSink);
         } else {
@@ -144,6 +148,7 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
                     .build()
                 )
                 .setDeliverGuarantee(DeliveryGuarantee.valueOf(env.getCheckpointingMode().name()))
+                .setKafkaProducerConfig(kafkaProducerConfig)
                 .build();
                 process.getSideOutput(v).rebalance().sinkTo(kafkaSink).name(topic);
             });