Commit 564b1547 authored by wenmo

[Optimization-439][client] Optimize CDCSource sync doris

parent 904af902
@@ -44,7 +44,7 @@ public abstract class AbstractCDCBuilder {
List<String> tableList = getTableList();
for (String tableName : tableList) {
if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
- String[] names = tableName.split(".");
+ String[] names = tableName.split("\\.");
if (!schemaList.contains(names[0])) {
schemaList.add(names[0]);
}
...
package com.dlink.cdc;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.dlink.assertion.Asserts;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.Column;
import com.dlink.model.ColumnType;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
/**
* AbstractCDCBuilder
@@ -26,4 +64,170 @@ public abstract class AbstractSinkBuilder {
public void setConfig(FlinkCDCConfig config) {
this.config = config;
}
protected Properties getProperties() {
Properties properties = new Properties();
Map<String, String> sink = config.getSink();
for (Map.Entry<String, String> entry : sink.entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
properties.setProperty(entry.getKey(), entry.getValue());
}
}
return properties;
}
protected SingleOutputStreamOperator<Map> deserialize(DataStreamSource<String> dataStreamSource) {
return dataStreamSource.map(new MapFunction<String, Map>() {
@Override
public Map map(String value) throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(value, Map.class);
}
});
}
protected SingleOutputStreamOperator<Map> shunt(
SingleOutputStreamOperator<Map> mapOperator,
Table table,
String schemaFieldName) {
final String tableName = table.getName();
final String schemaName = table.getSchema();
return mapOperator.filter(new FilterFunction<Map>() {
@Override
public boolean filter(Map value) throws Exception {
LinkedHashMap source = (LinkedHashMap) value.get("source");
return tableName.equals(source.get("table").toString())
&& schemaName.equals(source.get(schemaFieldName).toString());
}
});
}
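// Illustrative example (assumed layout, not spelled out in this commit): the Map records handled by
// shunt() above and buildRowData() below are expected to be Debezium-style change events such as
// {"op":"u","before":{...},"after":{...},"source":{"db":"test","table":"orders",...}},
// which is why the code reads the "source", "op", "before" and "after" keys.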
protected DataStream<RowData> buildRowData(
SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
return filterOperator
.flatMap(new FlatMapFunction<Map, RowData>() {
@Override
public void flatMap(Map value, Collector<RowData> out) throws Exception {
switch (value.get("op").toString()) {
case "c":
GenericRowData igenericRowData = new GenericRowData(columnNameList.size());
igenericRowData.setRowKind(RowKind.INSERT);
Map idata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
igenericRowData.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(igenericRowData);
break;
case "d":
GenericRowData dgenericRowData = new GenericRowData(columnNameList.size());
dgenericRowData.setRowKind(RowKind.DELETE);
Map ddata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
dgenericRowData.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(dgenericRowData);
break;
case "u":
GenericRowData ubgenericRowData = new GenericRowData(columnNameList.size());
ubgenericRowData.setRowKind(RowKind.UPDATE_BEFORE);
Map ubdata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
ubgenericRowData.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(ubgenericRowData);
GenericRowData uagenericRowData = new GenericRowData(columnNameList.size());
uagenericRowData.setRowKind(RowKind.UPDATE_AFTER);
Map uadata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
uagenericRowData.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(uagenericRowData);
break;
}
}
});
}
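// Note: a single "u" (update) event above is expanded into two rows, UPDATE_BEFORE built from
// "before" and UPDATE_AFTER built from "after", so downstream sinks receive the full changelog pair.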
public abstract void addSink(
DataStream<RowData> rowDataDataStream,
String schemaTableName,
List<String> columnNameList,
List<LogicalType> columnTypeList);
public DataStreamSource build(
CDCBuilder cdcBuilder,
StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) {
final List<Schema> schemaList = config.getSchemaList();
final String schemaFieldName = config.getSchemaFieldName();
if (Asserts.isNotNullCollection(schemaList)) {
SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
for (Schema schema : schemaList) {
for (Table table : schema.getTables()) {
SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
List<String> columnNameList = new ArrayList<>();
List<LogicalType> columnTypeList = new ArrayList<>();
buildColumn(columnNameList, columnTypeList, table.getColumns());
DataStream<RowData> rowDataDataStream = buildRowData(filterOperator, columnNameList, columnTypeList);
addSink(rowDataDataStream, table.getSchemaTableName(), columnNameList, columnTypeList);
}
}
}
return dataStreamSource;
}
protected void buildColumn(List<String> columnNameList, List<LogicalType> columnTypeList, List<Column> columns) {
for (Column column : columns) {
columnNameList.add(column.getName());
columnTypeList.add(getLogicalType(column.getJavaType()));
}
}
protected LogicalType getLogicalType(ColumnType columnType) {
switch (columnType) {
case STRING:
return new VarCharType();
case BOOLEAN:
case JAVA_LANG_BOOLEAN:
return new BooleanType();
case BYTE:
case JAVA_LANG_BYTE:
return new TinyIntType();
case SHORT:
case JAVA_LANG_SHORT:
return new SmallIntType();
case LONG:
case JAVA_LANG_LONG:
return new BigIntType();
case FLOAT:
case JAVA_LANG_FLOAT:
return new FloatType();
case DOUBLE:
case JAVA_LANG_DOUBLE:
return new DoubleType();
case DECIMAL:
return new DecimalType();
case INT:
case INTEGER:
return new IntType();
default:
return new VarCharType();
}
}
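// Note: new DecimalType() falls back to Flink's default precision and scale (10, 0); convertValue()
// below assumes VARCHAR and DECIMAL values arrive as JSON strings and parses them accordingly.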
protected Object convertValue(Object value, LogicalType logicalType) {
if (logicalType instanceof VarCharType) {
return StringData.fromString((String) value);
} else if (logicalType instanceof DecimalType) {
final DecimalType decimalType = ((DecimalType) logicalType);
final int precision = decimalType.getPrecision();
final int scala = decimalType.getScale();
return DecimalData.fromBigDecimal(new BigDecimal((String) value), precision, scala);
} else {
return value;
}
}
}
@@ -4,10 +4,13 @@ import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import java.util.LinkedHashMap;
import java.util.List;
@@ -39,6 +42,11 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
super(config);
}
@Override
public void addSink(DataStream<RowData> rowDataDataStream, String schemaTableName, List<String> columnNameList, List<LogicalType> columnTypeList) {
}
@Override
public String getHandle() {
return KEY_WORD;
@@ -50,7 +58,11 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
}
@Override
- public DataStreamSource build(CDCBuilder cdcBuilder, StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource<String> dataStreamSource) {
+ public DataStreamSource build(
+ CDCBuilder cdcBuilder,
+ StreamExecutionEnvironment env,
+ CustomTableEnvironment customTableEnvironment,
+ DataStreamSource<String> dataStreamSource) {
if (Asserts.isNotNullString(config.getSink().get("topic"))) {
dataStreamSource.addSink(new FlinkKafkaProducer<String>(config.getSink().get("brokers"),
config.getSink().get("topic"),
...
@@ -44,7 +44,7 @@ public abstract class AbstractCDCBuilder {
List<String> tableList = getTableList();
for (String tableName : tableList) {
if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
- String[] names = tableName.split(".");
+ String[] names = tableName.split("\\.");
if (!schemaList.contains(names[0])) {
schemaList.add(names[0]);
}
...
package com.dlink.cdc;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.dlink.assertion.Asserts;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.Column;
import com.dlink.model.ColumnType;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
/**
* AbstractCDCBuilder
@@ -26,4 +64,170 @@ public abstract class AbstractSinkBuilder {
public void setConfig(FlinkCDCConfig config) {
this.config = config;
}
protected Properties getProperties() {
Properties properties = new Properties();
Map<String, String> sink = config.getSink();
for (Map.Entry<String, String> entry : sink.entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
properties.setProperty(entry.getKey(), entry.getValue());
}
}
return properties;
}
protected SingleOutputStreamOperator<Map> deserialize(DataStreamSource<String> dataStreamSource) {
return dataStreamSource.map(new MapFunction<String, Map>() {
@Override
public Map map(String value) throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(value, Map.class);
}
});
}
protected SingleOutputStreamOperator<Map> shunt(
SingleOutputStreamOperator<Map> mapOperator,
Table table,
String schemaFieldName) {
final String tableName = table.getName();
final String schemaName = table.getSchema();
return mapOperator.filter(new FilterFunction<Map>() {
@Override
public boolean filter(Map value) throws Exception {
LinkedHashMap source = (LinkedHashMap) value.get("source");
return tableName.equals(source.get("table").toString())
&& schemaName.equals(source.get(schemaFieldName).toString());
}
});
}
protected DataStream<RowData> buildRowData(
SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
return filterOperator
.flatMap(new FlatMapFunction<Map, RowData>() {
@Override
public void flatMap(Map value, Collector<RowData> out) throws Exception {
switch (value.get("op").toString()) {
case "c":
GenericRowData igenericRowData = new GenericRowData(columnNameList.size());
igenericRowData.setRowKind(RowKind.INSERT);
Map idata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
igenericRowData.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(igenericRowData);
break;
case "d":
GenericRowData dgenericRowData = new GenericRowData(columnNameList.size());
dgenericRowData.setRowKind(RowKind.DELETE);
Map ddata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
dgenericRowData.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(dgenericRowData);
break;
case "u":
GenericRowData ubgenericRowData = new GenericRowData(columnNameList.size());
ubgenericRowData.setRowKind(RowKind.UPDATE_BEFORE);
Map ubdata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
ubgenericRowData.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(ubgenericRowData);
GenericRowData uagenericRowData = new GenericRowData(columnNameList.size());
uagenericRowData.setRowKind(RowKind.UPDATE_AFTER);
Map uadata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
uagenericRowData.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(uagenericRowData);
break;
}
}
});
}
public abstract void addSink(
DataStream<RowData> rowDataDataStream,
String schemaTableName,
List<String> columnNameList,
List<LogicalType> columnTypeList);
public DataStreamSource build(
CDCBuilder cdcBuilder,
StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) {
final List<Schema> schemaList = config.getSchemaList();
final String schemaFieldName = config.getSchemaFieldName();
if (Asserts.isNotNullCollection(schemaList)) {
SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
for (Schema schema : schemaList) {
for (Table table : schema.getTables()) {
SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
List<String> columnNameList = new ArrayList<>();
List<LogicalType> columnTypeList = new ArrayList<>();
buildColumn(columnNameList, columnTypeList, table.getColumns());
DataStream<RowData> rowDataDataStream = buildRowData(filterOperator, columnNameList, columnTypeList);
addSink(rowDataDataStream, table.getSchemaTableName(), columnNameList, columnTypeList);
}
}
}
return dataStreamSource;
}
protected void buildColumn(List<String> columnNameList, List<LogicalType> columnTypeList, List<Column> columns) {
for (Column column : columns) {
columnNameList.add(column.getName());
columnTypeList.add(getLogicalType(column.getJavaType()));
}
}
protected LogicalType getLogicalType(ColumnType columnType) {
switch (columnType) {
case STRING:
return new VarCharType();
case BOOLEAN:
case JAVA_LANG_BOOLEAN:
return new BooleanType();
case BYTE:
case JAVA_LANG_BYTE:
return new TinyIntType();
case SHORT:
case JAVA_LANG_SHORT:
return new SmallIntType();
case LONG:
case JAVA_LANG_LONG:
return new BigIntType();
case FLOAT:
case JAVA_LANG_FLOAT:
return new FloatType();
case DOUBLE:
case JAVA_LANG_DOUBLE:
return new DoubleType();
case DECIMAL:
return new DecimalType();
case INT:
case INTEGER:
return new IntType();
default:
return new VarCharType();
}
}
protected Object convertValue(Object value, LogicalType logicalType) {
if (logicalType instanceof VarCharType) {
return StringData.fromString((String) value);
} else if (logicalType instanceof DecimalType) {
final DecimalType decimalType = ((DecimalType) logicalType);
final int precision = decimalType.getPrecision();
final int scala = decimalType.getScale();
return DecimalData.fromBigDecimal(new BigDecimal((String) value), precision, scala);
} else {
return value;
}
}
}
@@ -4,10 +4,13 @@ import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import java.util.LinkedHashMap;
import java.util.List;
@@ -39,6 +42,11 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
super(config);
}
@Override
public void addSink(DataStream<RowData> rowDataDataStream, String schemaTableName, List<String> columnNameList, List<LogicalType> columnTypeList) {
}
@Override
public String getHandle() {
return KEY_WORD;
@@ -50,7 +58,11 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
}
@Override
- public DataStreamSource build(CDCBuilder cdcBuilder, StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource<String> dataStreamSource) {
+ public DataStreamSource build(
+ CDCBuilder cdcBuilder,
+ StreamExecutionEnvironment env,
+ CustomTableEnvironment customTableEnvironment,
+ DataStreamSource<String> dataStreamSource) {
if (Asserts.isNotNullString(config.getSink().get("topic"))) {
dataStreamSource.addSink(new FlinkKafkaProducer<String>(config.getSink().get("brokers"),
config.getSink().get("topic"),
...
@@ -32,63 +32,10 @@
<artifactId>dlink-flink-1.13</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-connector-jdbc-1.13</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<!--<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
&lt;!&ndash;打jar包&ndash;&gt;
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>uber</shadedClassifierName>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>-->
</project>
\ No newline at end of file
@@ -44,7 +44,7 @@ public abstract class AbstractCDCBuilder {
List<String> tableList = getTableList();
for (String tableName : tableList) {
if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
- String[] names = tableName.split(".");
+ String[] names = tableName.split("\\.");
if (!schemaList.contains(names[0])) {
schemaList.add(names[0]);
}
...
package com.dlink.cdc;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.dlink.assertion.Asserts;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.Column;
import com.dlink.model.ColumnType;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
/**
* AbstractCDCBuilder
@@ -26,4 +64,170 @@ public abstract class AbstractSinkBuilder {
public void setConfig(FlinkCDCConfig config) {
this.config = config;
}
protected Properties getProperties() {
Properties properties = new Properties();
Map<String, String> sink = config.getSink();
for (Map.Entry<String, String> entry : sink.entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
properties.setProperty(entry.getKey(), entry.getValue());
}
}
return properties;
}
protected SingleOutputStreamOperator<Map> deserialize(DataStreamSource<String> dataStreamSource) {
return dataStreamSource.map(new MapFunction<String, Map>() {
@Override
public Map map(String value) throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(value, Map.class);
}
});
}
protected SingleOutputStreamOperator<Map> shunt(
SingleOutputStreamOperator<Map> mapOperator,
Table table,
String schemaFieldName) {
final String tableName = table.getName();
final String schemaName = table.getSchema();
return mapOperator.filter(new FilterFunction<Map>() {
@Override
public boolean filter(Map value) throws Exception {
LinkedHashMap source = (LinkedHashMap) value.get("source");
return tableName.equals(source.get("table").toString())
&& schemaName.equals(source.get(schemaFieldName).toString());
}
});
}
protected DataStream<RowData> buildRowData(
SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
return filterOperator
.flatMap(new FlatMapFunction<Map, RowData>() {
@Override
public void flatMap(Map value, Collector<RowData> out) throws Exception {
switch (value.get("op").toString()) {
case "c":
GenericRowData igenericRowData = new GenericRowData(columnNameList.size());
igenericRowData.setRowKind(RowKind.INSERT);
Map idata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
igenericRowData.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(igenericRowData);
break;
case "d":
GenericRowData dgenericRowData = new GenericRowData(columnNameList.size());
dgenericRowData.setRowKind(RowKind.DELETE);
Map ddata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
dgenericRowData.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(dgenericRowData);
break;
case "u":
GenericRowData ubgenericRowData = new GenericRowData(columnNameList.size());
ubgenericRowData.setRowKind(RowKind.UPDATE_BEFORE);
Map ubdata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
ubgenericRowData.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(ubgenericRowData);
GenericRowData uagenericRowData = new GenericRowData(columnNameList.size());
uagenericRowData.setRowKind(RowKind.UPDATE_AFTER);
Map uadata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
uagenericRowData.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(uagenericRowData);
break;
}
}
});
}
public abstract void addSink(
DataStream<RowData> rowDataDataStream,
String schemaTableName,
List<String> columnNameList,
List<LogicalType> columnTypeList);
public DataStreamSource build(
CDCBuilder cdcBuilder,
StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) {
final List<Schema> schemaList = config.getSchemaList();
final String schemaFieldName = config.getSchemaFieldName();
if (Asserts.isNotNullCollection(schemaList)) {
SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
for (Schema schema : schemaList) {
for (Table table : schema.getTables()) {
SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
List<String> columnNameList = new ArrayList<>();
List<LogicalType> columnTypeList = new ArrayList<>();
buildColumn(columnNameList, columnTypeList, table.getColumns());
DataStream<RowData> rowDataDataStream = buildRowData(filterOperator, columnNameList, columnTypeList);
addSink(rowDataDataStream, table.getSchemaTableName(), columnNameList, columnTypeList);
}
}
}
return dataStreamSource;
}
protected void buildColumn(List<String> columnNameList, List<LogicalType> columnTypeList, List<Column> columns) {
for (Column column : columns) {
columnNameList.add(column.getName());
columnTypeList.add(getLogicalType(column.getJavaType()));
}
}
protected LogicalType getLogicalType(ColumnType columnType) {
switch (columnType) {
case STRING:
return new VarCharType();
case BOOLEAN:
case JAVA_LANG_BOOLEAN:
return new BooleanType();
case BYTE:
case JAVA_LANG_BYTE:
return new TinyIntType();
case SHORT:
case JAVA_LANG_SHORT:
return new SmallIntType();
case LONG:
case JAVA_LANG_LONG:
return new BigIntType();
case FLOAT:
case JAVA_LANG_FLOAT:
return new FloatType();
case DOUBLE:
case JAVA_LANG_DOUBLE:
return new DoubleType();
case DECIMAL:
return new DecimalType();
case INT:
case INTEGER:
return new IntType();
default:
return new VarCharType();
}
}
protected Object convertValue(Object value, LogicalType logicalType) {
if (logicalType instanceof VarCharType) {
return StringData.fromString((String) value);
} else if (logicalType instanceof DecimalType) {
final DecimalType decimalType = ((DecimalType) logicalType);
final int precision = decimalType.getPrecision();
final int scala = decimalType.getScale();
return DecimalData.fromBigDecimal(new BigDecimal((String) value), precision, scala);
} else {
return value;
}
}
}
package com.dlink.cdc.jdbc;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.dag.Transformation;
- import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+ import org.apache.flink.connector.jdbc.dialect.ClickHouseDialect;
- import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+ import org.apache.flink.connector.jdbc.dialect.MySQLDialect;
- import org.apache.flink.connector.jdbc.JdbcSink;
+ import org.apache.flink.connector.jdbc.dialect.OracleDialect;
import org.apache.flink.connector.jdbc.dialect.PostgresDialect;
import org.apache.flink.connector.jdbc.dialect.SQLServerDialect;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.connector.jdbc.table.JdbcUpsertTableSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+ import org.apache.flink.table.api.TableSchema;
- import org.apache.flink.table.api.StatementSet;
+ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.LogicalType;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
/**
* MysqlCDCBuilder
*
@@ -54,6 +58,71 @@ public class JdbcSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
return new JdbcSinkBuilder(config);
}
@Override
public void addSink(
DataStream<RowData> rowDataDataStream,
String schemaTableName,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
JdbcUpsertTableSink.Builder builder = JdbcUpsertTableSink.builder();
Map<String, String> sink = config.getSink();
if (sink.containsKey("sink.buffer-flush.interval")) {
builder.setFlushIntervalMills(Integer.valueOf(sink.get("sink.buffer-flush.interval")));
}
if (sink.containsKey("sink.buffer-flush.max-rows")) {
builder.setFlushMaxSize(Integer.valueOf(sink.get("sink.buffer-flush.max-rows")));
}
if (sink.containsKey("sink.max-retries")) {
builder.setMaxRetryTimes(Integer.valueOf(sink.get("sink.max-retries")));
}
JdbcOptions.Builder jdbcOptionsBuilder = JdbcOptions.builder();
if (sink.containsKey("connection.max-retry-timeout")) {
jdbcOptionsBuilder.setConnectionCheckTimeoutSeconds(Integer.valueOf(sink.get("connection.max-retry-timeout")));
}
if (sink.containsKey("url")) {
jdbcOptionsBuilder.setDBUrl(sink.get("url"));
}
if (sink.containsKey("dialect")) {
switch (sink.get("dialect")) {
case "MySql":
jdbcOptionsBuilder.setDialect(new MySQLDialect());
break;
case "Oracle":
jdbcOptionsBuilder.setDialect(new OracleDialect());
break;
case "ClickHouse":
jdbcOptionsBuilder.setDialect(new ClickHouseDialect());
break;
case "SQLServer":
jdbcOptionsBuilder.setDialect(new SQLServerDialect());
break;
case "Postgres":
jdbcOptionsBuilder.setDialect(new PostgresDialect());
break;
}
}
if (sink.containsKey("driver")) {
jdbcOptionsBuilder.setDriverName(sink.get("driver"));
}
if (sink.containsKey("sink.parallelism")) {
jdbcOptionsBuilder.setParallelism(Integer.valueOf(sink.get("sink.parallelism")));
}
if (sink.containsKey("password")) {
jdbcOptionsBuilder.setPassword(sink.get("password"));
}
if (sink.containsKey("username")) {
jdbcOptionsBuilder.setUsername(sink.get("username"));
}
jdbcOptionsBuilder.setTableName(schemaTableName);
builder.setOptions(jdbcOptionsBuilder.build());
builder.setTableSchema(TableSchema.fromTypeInfo(rowDataDataStream.getType()));
/*JdbcUpsertTableSink build = builder.build();
build.consumeDataStream(rowDataDataStream);
rowDataDataStream.addSink(build.);*/
}
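// Descriptive note: the "sink.*" and connection keys from the CDCSource config are mapped onto
// JdbcUpsertTableSink.Builder / JdbcOptions.Builder here, but the built sink is not yet attached
// to rowDataDataStream; the commented-out lines above mark that wiring as unfinished.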
/*@Override
public DataStreamSource build(StreamExecutionEnvironment env, DataStreamSource<String> dataStreamSource) {
final List<Schema> schemaList = config.getSchemaList();
@@ -111,7 +180,7 @@ public class JdbcSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
}
for (Schema schema : schemaList) {
for (Table item : schema.getTables()) {
- customTableEnvironment.executeSql(item.getFlinkTableSql(sb.toString()+"'table-name' = '"+item.getSchemaTableName()+"'\n"));
+ customTableEnvironment.executeSql(item.getFlinkTableSql(sb.toString() + "'table-name' = '" + item.getSchemaTableName() + "'\n"));
List<Operation> operations = customTableEnvironment.getParser().parse(cdcBuilder.getInsertSQL(item, TABLE_NAME));
if (operations.size() > 0) {
Operation operation = operations.get(0);
...
@@ -4,10 +4,13 @@ import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import java.util.LinkedHashMap;
import java.util.List;
@@ -50,7 +53,11 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
}
@Override
- public DataStreamSource build(CDCBuilder cdcBuilder, StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource<String> dataStreamSource) {
+ public DataStreamSource build(
+ CDCBuilder cdcBuilder,
+ StreamExecutionEnvironment env,
+ CustomTableEnvironment customTableEnvironment,
+ DataStreamSource<String> dataStreamSource) {
if (Asserts.isNotNullString(config.getSink().get("topic"))) {
dataStreamSource.addSink(new FlinkKafkaProducer<String>(config.getSink().get("brokers"),
config.getSink().get("topic"),
@@ -94,4 +101,12 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
}
return dataStreamSource;
}
@Override
public void addSink(
DataStream<RowData> rowDataDataStream,
String schemaTableName,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
}
}
@@ -44,7 +44,7 @@ public abstract class AbstractCDCBuilder {
List<String> tableList = getTableList();
for (String tableName : tableList) {
if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
- String[] names = tableName.split(".");
+ String[] names = tableName.split("\\.");
if (!schemaList.contains(names[0])) {
schemaList.add(names[0]);
}
...
package com.dlink.cdc;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.dlink.assertion.Asserts;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.Column;
import com.dlink.model.ColumnType;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
/**
* AbstractCDCBuilder
@@ -26,4 +64,170 @@ public abstract class AbstractSinkBuilder {
public void setConfig(FlinkCDCConfig config) {
this.config = config;
}
protected Properties getProperties() {
Properties properties = new Properties();
Map<String, String> sink = config.getSink();
for (Map.Entry<String, String> entry : sink.entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
properties.setProperty(entry.getKey(), entry.getValue());
}
}
return properties;
}
protected SingleOutputStreamOperator<Map> deserialize(DataStreamSource<String> dataStreamSource) {
return dataStreamSource.map(new MapFunction<String, Map>() {
@Override
public Map map(String value) throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(value, Map.class);
}
});
}
protected SingleOutputStreamOperator<Map> shunt(
SingleOutputStreamOperator<Map> mapOperator,
Table table,
String schemaFieldName) {
final String tableName = table.getName();
final String schemaName = table.getSchema();
return mapOperator.filter(new FilterFunction<Map>() {
@Override
public boolean filter(Map value) throws Exception {
LinkedHashMap source = (LinkedHashMap) value.get("source");
return tableName.equals(source.get("table").toString())
&& schemaName.equals(source.get(schemaFieldName).toString());
}
});
}
protected DataStream<RowData> buildRowData(
SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
return filterOperator
.flatMap(new FlatMapFunction<Map, RowData>() {
@Override
public void flatMap(Map value, Collector<RowData> out) throws Exception {
switch (value.get("op").toString()) {
case "c":
GenericRowData igenericRowData = new GenericRowData(columnNameList.size());
igenericRowData.setRowKind(RowKind.INSERT);
Map idata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
igenericRowData.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(igenericRowData);
break;
case "d":
GenericRowData dgenericRowData = new GenericRowData(columnNameList.size());
dgenericRowData.setRowKind(RowKind.DELETE);
Map ddata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
dgenericRowData.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(dgenericRowData);
break;
case "u":
GenericRowData ubgenericRowData = new GenericRowData(columnNameList.size());
ubgenericRowData.setRowKind(RowKind.UPDATE_BEFORE);
Map ubdata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
ubgenericRowData.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(ubgenericRowData);
GenericRowData uagenericRowData = new GenericRowData(columnNameList.size());
uagenericRowData.setRowKind(RowKind.UPDATE_AFTER);
Map uadata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
uagenericRowData.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(uagenericRowData);
break;
}
}
});
}
public abstract void addSink(
DataStream<RowData> rowDataDataStream,
String schemaTableName,
List<String> columnNameList,
List<LogicalType> columnTypeList);
public DataStreamSource build(
CDCBuilder cdcBuilder,
StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) {
final List<Schema> schemaList = config.getSchemaList();
final String schemaFieldName = config.getSchemaFieldName();
if (Asserts.isNotNullCollection(schemaList)) {
SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
for (Schema schema : schemaList) {
for (Table table : schema.getTables()) {
SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
List<String> columnNameList = new ArrayList<>();
List<LogicalType> columnTypeList = new ArrayList<>();
buildColumn(columnNameList, columnTypeList, table.getColumns());
DataStream<RowData> rowDataDataStream = buildRowData(filterOperator, columnNameList, columnTypeList);
addSink(rowDataDataStream, table.getSchemaTableName(), columnNameList, columnTypeList);
}
}
}
return dataStreamSource;
}
protected void buildColumn(List<String> columnNameList, List<LogicalType> columnTypeList, List<Column> columns) {
for (Column column : columns) {
columnNameList.add(column.getName());
columnTypeList.add(getLogicalType(column.getJavaType()));
}
}
protected LogicalType getLogicalType(ColumnType columnType) {
switch (columnType) {
case STRING:
return new VarCharType();
case BOOLEAN:
case JAVA_LANG_BOOLEAN:
return new BooleanType();
case BYTE:
case JAVA_LANG_BYTE:
return new TinyIntType();
case SHORT:
case JAVA_LANG_SHORT:
return new SmallIntType();
case LONG:
case JAVA_LANG_LONG:
return new BigIntType();
case FLOAT:
case JAVA_LANG_FLOAT:
return new FloatType();
case DOUBLE:
case JAVA_LANG_DOUBLE:
return new DoubleType();
case DECIMAL:
return new DecimalType();
case INT:
case INTEGER:
return new IntType();
default:
return new VarCharType();
}
}
protected Object convertValue(Object value, LogicalType logicalType) {
if (logicalType instanceof VarCharType) {
return StringData.fromString((String) value);
} else if (logicalType instanceof DecimalType) {
final DecimalType decimalType = ((DecimalType) logicalType);
final int precision = decimalType.getPrecision();
final int scala = decimalType.getScale();
return DecimalData.fromBigDecimal(new BigDecimal((String) value), precision, scala);
} else {
return value;
}
}
}
package com.dlink.cdc.jdbc;
- import org.apache.flink.api.dag.Transformation;
+ import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
+ import org.apache.flink.table.data.RowData;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+ import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
/**
* MysqlCDCBuilder
@@ -37,6 +28,11 @@ public class JdbcSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
super(config);
}
@Override
public void addSink(DataStream<RowData> rowDataDataStream, String schemaTableName, List<String> columnNameList, List<LogicalType> columnTypeList) {
}
@Override
public String getHandle() {
return KEY_WORD;
@@ -47,78 +43,4 @@ public class JdbcSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
return new JdbcSinkBuilder(config);
}
/*@Override
public DataStreamSource build(StreamExecutionEnvironment env, DataStreamSource<String> dataStreamSource) {
final List<Schema> schemaList = config.getSchemaList();
if (Asserts.isNotNullCollection(schemaList)) {
for (Schema schema : schemaList) {
for (Table table : schema.getTables()) {
*//*dataStreamSource.filter(new FilterFunction<Map>() {
@Override
public boolean filter(Map value) throws Exception {
return value.containsKey("table_name") && table.getName().equals(value.get("table_name"));
}
});
dataStreamSource.addSink(
JdbcSink.sink(
"insert into books (id, title, authors, year) values (?, ?, ?, ?)",
(statement, book) -> {
statement.setLong(1, book.id);
statement.setString(2, book.title);
statement.setString(3, book.authors);
statement.setInt(4, book.year);
},
JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:postgresql://dbhost:5432/postgresdb")
.withDriverName("org.postgresql.Driver")
.withUsername("someUser")
.withPassword("somePassword")
.build()
));*//*
}
}
}
return dataStreamSource;
}*/
@Override
public DataStreamSource build(CDCBuilder cdcBuilder, StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource<String> dataStreamSource) {
final List<Schema> schemaList = config.getSchemaList();
if (Asserts.isNotNullCollection(schemaList)) {
/*org.apache.flink.table.api.Table table = env.fromChangelogStream(dataStreamSource);
env.registerTable("cdc_table",table);*/
customTableEnvironment.registerDataStream(TABLE_NAME, dataStreamSource);
List<ModifyOperation> modifyOperations = new ArrayList();
StringBuilder sb = new StringBuilder();
for (Map.Entry<String, String> entry : config.getSink().entrySet()) {
sb.append("'");
sb.append(entry.getKey());
sb.append("' = '");
sb.append(entry.getValue());
sb.append("',\n");
}
for (Schema schema : schemaList) {
for (Table item : schema.getTables()) {
customTableEnvironment.executeSql(item.getFlinkTableSql(sb.toString() + "'table-name' = '" + item.getSchemaTableName() + "'\n"));
List<Operation> operations = customTableEnvironment.getParser().parse(cdcBuilder.getInsertSQL(item, TABLE_NAME));
if (operations.size() > 0) {
Operation operation = operations.get(0);
if (operation instanceof ModifyOperation) {
modifyOperations.add((ModifyOperation) operation);
}
}
}
}
List<Transformation<?>> trans = customTableEnvironment.getPlanner().translate(modifyOperations);
for (Transformation<?> item : trans) {
env.addOperator(item);
}
}
return dataStreamSource;
}
}
@@ -6,9 +6,12 @@ import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import java.util.LinkedHashMap;
import java.util.List;
@@ -40,6 +43,11 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
super(config);
}
@Override
public void addSink(DataStream<RowData> rowDataDataStream, String schemaTableName, List<String> columnNameList, List<LogicalType> columnTypeList) {
}
@Override
public String getHandle() {
return KEY_WORD;
@@ -51,7 +59,11 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
}
@Override
- public DataStreamSource build(CDCBuilder cdcBuilder, StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource<String> dataStreamSource) {
+ public DataStreamSource build(
+ CDCBuilder cdcBuilder,
+ StreamExecutionEnvironment env,
+ CustomTableEnvironment customTableEnvironment,
+ DataStreamSource<String> dataStreamSource) {
if (Asserts.isNotNullString(config.getSink().get("topic"))) {
dataStreamSource.sinkTo(KafkaSink.<String>builder()
.setBootstrapServers(config.getSink().get("brokers"))
...