Commit 7c753b85 authored by wenmo

[Feature-422][*] CDCSource sync kafka topics

parent 89cab925
...@@ -33,6 +33,10 @@ ...@@ -33,6 +33,10 @@
<dependency> <dependency>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<artifactId>dlink-client-1.11</artifactId> <artifactId>dlink-client-1.11</artifactId>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-flink-1.11</artifactId>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
......
package com.dlink.executor;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.*;
import org.apache.flink.table.utils.PrintUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.io.PrintWriter;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
/**
* Customized TableResultImpl
*
* @author wenmo
* @since 2021/6/7 22:06
**/
@Internal
class CustomTableResultImpl implements TableResult {
public static final TableResult TABLE_RESULT_OK =
CustomTableResultImpl.builder()
.resultKind(ResultKind.SUCCESS)
.tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build())
.data(Collections.singletonList(Row.of("OK")))
.build();
private final JobClient jobClient;
private final TableSchema tableSchema;
private final ResultKind resultKind;
private final CloseableIterator<Row> data;
private final PrintStyle printStyle;
private CustomTableResultImpl(
@Nullable JobClient jobClient,
TableSchema tableSchema,
ResultKind resultKind,
CloseableIterator<Row> data,
PrintStyle printStyle) {
this.jobClient = jobClient;
this.tableSchema =
Preconditions.checkNotNull(tableSchema, "tableSchema should not be null");
this.resultKind = Preconditions.checkNotNull(resultKind, "resultKind should not be null");
this.data = Preconditions.checkNotNull(data, "data should not be null");
this.printStyle = Preconditions.checkNotNull(printStyle, "printStyle should not be null");
}
public static TableResult buildTableResult(List<TableSchemaField> fields, List<Row> rows) {
Builder builder = builder().resultKind(ResultKind.SUCCESS);
if (fields.size() > 0) {
TableSchema.Builder tableSchemaBuild = TableSchema.builder();
for (int i = 0; i < fields.size(); i++) {
tableSchemaBuild.field(fields.get(i).getName(), fields.get(i).getType());
}
builder.tableSchema(tableSchemaBuild.build()).data(rows);
}
return builder.build();
}
@Override
public Optional<JobClient> getJobClient() {
return Optional.ofNullable(jobClient);
}
@Override
public TableSchema getTableSchema() {
return tableSchema;
}
@Override
public ResultKind getResultKind() {
return resultKind;
}
@Override
public CloseableIterator<Row> collect() {
return data;
}
@Override
public void print() {
Iterator<Row> it = collect();
if (printStyle instanceof TableauStyle) {
int maxColumnWidth = ((TableauStyle) printStyle).getMaxColumnWidth();
String nullColumn = ((TableauStyle) printStyle).getNullColumn();
boolean deriveColumnWidthByType =
((TableauStyle) printStyle).isDeriveColumnWidthByType();
PrintUtils.printAsTableauForm(
getTableSchema(),
it,
new PrintWriter(System.out),
maxColumnWidth,
nullColumn,
deriveColumnWidthByType);
} else if (printStyle instanceof RawContentStyle) {
while (it.hasNext()) {
System.out.println(String.join(",", PrintUtils.rowToString(it.next())));
}
} else {
throw new TableException("Unsupported print style: " + printStyle);
}
}
public static Builder builder() {
return new Builder();
}
/**
* Builder for creating a {@link CustomTableResultImpl}.
*/
public static class Builder {
private JobClient jobClient = null;
private TableSchema tableSchema = null;
private ResultKind resultKind = null;
private CloseableIterator<Row> data = null;
private PrintStyle printStyle =
PrintStyle.tableau(Integer.MAX_VALUE, PrintUtils.NULL_COLUMN, false);
private Builder() {
}
/**
* Specifies job client which associates the submitted Flink job.
*
* @param jobClient a {@link JobClient} for the submitted Flink job.
*/
public Builder jobClient(JobClient jobClient) {
this.jobClient = jobClient;
return this;
}
/**
* Specifies table schema of the execution result.
*
* @param tableSchema a {@link TableSchema} for the execution result.
*/
public Builder tableSchema(TableSchema tableSchema) {
Preconditions.checkNotNull(tableSchema, "tableSchema should not be null");
this.tableSchema = tableSchema;
return this;
}
/**
* Specifies result kind of the execution result.
*
* @param resultKind a {@link ResultKind} for the execution result.
*/
public Builder resultKind(ResultKind resultKind) {
Preconditions.checkNotNull(resultKind, "resultKind should not be null");
this.resultKind = resultKind;
return this;
}
/**
* Specifies a row iterator as the execution result.
*
* @param rowIterator a row iterator as the execution result.
*/
public Builder data(CloseableIterator<Row> rowIterator) {
Preconditions.checkNotNull(rowIterator, "rowIterator should not be null");
this.data = rowIterator;
return this;
}
/**
* Specifies a row list as the execution result.
*
* @param rowList a row list as the execution result.
*/
public Builder data(List<Row> rowList) {
Preconditions.checkNotNull(rowList, "rowList should not be null");
this.data = CloseableIterator.adapterForIterator(rowList.iterator());
return this;
}
/**
* Specifies print style. Default is {@link TableauStyle} with max integer column width.
*/
public Builder setPrintStyle(PrintStyle printStyle) {
Preconditions.checkNotNull(printStyle, "printStyle should not be null");
this.printStyle = printStyle;
return this;
}
/**
* Returns a {@link TableResult} instance.
*/
public TableResult build() {
return new CustomTableResultImpl(jobClient, tableSchema, resultKind, data, printStyle);
}
}
/**
* Root interface for all print styles.
*/
public interface PrintStyle {
/**
* Create a tableau print style with given max column width, null column, and a flag to
* indicate whether the column width is derived from type (true) or content (false), which
* prints the result schema and content as tableau form.
*/
static PrintStyle tableau(
int maxColumnWidth, String nullColumn, boolean deriveColumnWidthByType) {
Preconditions.checkArgument(
maxColumnWidth > 0, "maxColumnWidth should be greater than 0");
Preconditions.checkNotNull(nullColumn, "nullColumn should not be null");
return new TableauStyle(maxColumnWidth, nullColumn, deriveColumnWidthByType);
}
/**
* Create a raw content print style, which only print the result content as raw form. column
* delimiter is ",", row delimiter is "\n".
*/
static PrintStyle rawContent() {
return new RawContentStyle();
}
}
/**
* print the result schema and content as tableau form.
*/
private static final class TableauStyle implements PrintStyle {
/**
* A flag to indicate whether the column width is derived from type (true) or content
* (false).
*/
private final boolean deriveColumnWidthByType;
private final int maxColumnWidth;
private final String nullColumn;
private TableauStyle(
int maxColumnWidth, String nullColumn, boolean deriveColumnWidthByType) {
this.deriveColumnWidthByType = deriveColumnWidthByType;
this.maxColumnWidth = maxColumnWidth;
this.nullColumn = nullColumn;
}
public boolean isDeriveColumnWidthByType() {
return deriveColumnWidthByType;
}
int getMaxColumnWidth() {
return maxColumnWidth;
}
String getNullColumn() {
return nullColumn;
}
}
/**
* only print the result content as raw form. column delimiter is ",", row delimiter is "\n".
*/
private static final class RawContentStyle implements PrintStyle {
}
}
package com.dlink.executor;
import org.apache.flink.table.types.DataType;
/**
* @author wenmo
* @since 2021/6/7 22:06
**/
public class TableSchemaField {
private String name;
private DataType type;
public TableSchemaField(String name, DataType type) {
this.name = name;
this.type = type;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public DataType getType() {
return type;
}
public void setType(DataType type) {
this.type = type;
}
}
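Reviewer note: as a quick illustration of the two new classes above, the snippet below is a minimal sketch (not part of the commit) showing how buildTableResult wraps a few ad-hoc rows into a TableResult. It has to live in com.dlink.executor because CustomTableResultImpl is package-private; the class name BuildTableResultExample is made up for the illustration.
package com.dlink.executor;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
public class BuildTableResultExample {
    public static void main(String[] args) {
        // Describe the result schema with the new TableSchemaField holder.
        List<TableSchemaField> fields = Arrays.asList(
                new TableSchemaField("id", DataTypes.INT()),
                new TableSchemaField("name", DataTypes.STRING()));
        // Wrap some ad-hoc rows into a TableResult and print them in tableau form.
        List<Row> rows = Arrays.asList(Row.of(1, "a"), Row.of(2, "b"));
        TableResult result = CustomTableResultImpl.buildTableResult(fields, rows);
        result.print();
    }
}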
...@@ -33,6 +33,10 @@ ...@@ -33,6 +33,10 @@
<dependency> <dependency>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<artifactId>dlink-client-1.12</artifactId> <artifactId>dlink-client-1.12</artifactId>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-flink-1.12</artifactId>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
......
package com.dlink.executor;
import org.apache.flink.table.types.DataType;
/**
* @author wenmo
* @since 2021/6/7 22:06
**/
public class TableSchemaField {
private String name;
private DataType type;
public TableSchemaField(String name, DataType type) {
this.name = name;
this.type = type;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public DataType getType() {
return type;
}
public void setType(DataType type) {
this.type = type;
}
}
...@@ -33,6 +33,10 @@ ...@@ -33,6 +33,10 @@
<dependency> <dependency>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<artifactId>dlink-client-1.13</artifactId> <artifactId>dlink-client-1.13</artifactId>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-flink-1.13</artifactId>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
......
package com.dlink.executor;
import org.apache.flink.table.types.DataType;
/**
* @author wenmo
* @since 2021/6/7 22:06
**/
public class TableSchemaField {
private String name;
private DataType type;
public TableSchemaField(String name, DataType type) {
this.name = name;
this.type = type;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public DataType getType() {
return type;
}
public void setType(DataType type) {
this.type = type;
}
}
...@@ -33,6 +33,10 @@ ...@@ -33,6 +33,10 @@
<dependency> <dependency>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<artifactId>dlink-client-1.14</artifactId> <artifactId>dlink-client-1.14</artifactId>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-flink-1.14</artifactId>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
......
package com.dlink.executor;
import org.apache.flink.table.types.DataType;
/**
* @author wenmo
* @since 2021/10/22 10:02
**/
public class TableSchemaField {
private String name;
private DataType type;
public TableSchemaField(String name, DataType type) {
this.name = name;
this.type = type;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public DataType getType() {
return type;
}
public void setType(DataType type) {
this.type = type;
}
}
...@@ -25,7 +25,12 @@ ...@@ -25,7 +25,12 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<artifactId>dlink-client-1.13</artifactId> <artifactId>dlink-client-${dlink.flink.version}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-flink-${dlink.flink.version}</artifactId>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
......
...@@ -13,11 +13,8 @@ ...@@ -13,11 +13,8 @@
<properties> <properties>
<java.version>1.8</java.version> <java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.11.6</flink.version>
<flinkcdc.version>1.1.0</flinkcdc.version>
<maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.target>1.8</maven.compiler.target>
<junit.version>4.12</junit.version>
</properties> </properties>
<dependencies> <dependencies>
...@@ -26,81 +23,14 @@ ...@@ -26,81 +23,14 @@
<artifactId>dlink-client-base</artifactId> <artifactId>dlink-client-base</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-yarn_2.11</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency> <dependency>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<artifactId>dlink-client-hadoop</artifactId> <artifactId>dlink-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-kubernetes_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flinkcdc.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<artifactId>dlink-common</artifactId> <artifactId>dlink-flink-1.11</artifactId>
<scope>provided</scope>
</dependency> </dependency>
</dependencies> </dependencies>
</project> </project>
\ No newline at end of file
package com.dlink.cdc; package com.dlink.cdc;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig; import com.dlink.model.FlinkCDCConfig;
/** /**
...@@ -26,4 +32,35 @@ public abstract class AbstractCDCBuilder { ...@@ -26,4 +32,35 @@ public abstract class AbstractCDCBuilder {
public void setConfig(FlinkCDCConfig config) { public void setConfig(FlinkCDCConfig config) {
this.config = config; this.config = config;
} }
public List<String> getSchemaList() {
List<String> schemaList = new ArrayList<>();
String schema = config.getSchema();
if (Asserts.isNullString(schema)) {
return schemaList;
}
String[] schemas = schema.split(FlinkParamConstant.SPLIT);
Collections.addAll(schemaList, schemas);
List<String> tableList = getTableList();
for (String tableName : tableList) {
if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
// String#split takes a regex, so the dot must be escaped to split on a literal ".".
String[] names = tableName.split("\\.");
if (!schemaList.contains(names[0])) {
schemaList.add(names[0]);
}
}
}
return schemaList;
}
public List<String> getTableList() {
List<String> tableList = new ArrayList<>();
String table = config.getTable();
if (Asserts.isNullString(table)) {
return tableList;
}
String[] tables = table.split(FlinkParamConstant.SPLIT);
Collections.addAll(tableList, tables);
return tableList;
}
} }
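Reviewer note: to see what the two helpers above are meant to return, the stand-alone sketch below (not part of the commit) mirrors their splitting logic for a config whose table setting is "db1.orders,db1.users". It assumes FlinkParamConstant.SPLIT is a comma, which is not shown in this diff; the class name SchemaListSketch is made up for the illustration.
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class SchemaListSketch {
    public static void main(String[] args) {
        String table = "db1.orders,db1.users"; // hypothetical CDCSource table setting
        List<String> tableList = new ArrayList<>();
        Collections.addAll(tableList, table.split(",")); // assumed separator value
        List<String> schemaList = new ArrayList<>();
        for (String tableName : tableList) {
            if (tableName.contains(".")) {
                // The dot must be escaped because String#split takes a regex.
                String schema = tableName.split("\\.")[0];
                if (!schemaList.contains(schema)) {
                    schemaList.add(schema);
                }
            }
        }
        System.out.println(schemaList); // prints [db1]
    }
}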
package com.dlink.cdc;
import com.dlink.model.FlinkCDCConfig;
/**
* AbstractSinkBuilder
*
* @author wenmo
* @since 2022/4/12 21:28
**/
public abstract class AbstractSinkBuilder {
protected FlinkCDCConfig config;
public AbstractSinkBuilder() {
}
public AbstractSinkBuilder(FlinkCDCConfig config) {
this.config = config;
}
public FlinkCDCConfig getConfig() {
return config;
}
public void setConfig(FlinkCDCConfig config) {
this.config = config;
}
}
...@@ -3,7 +3,11 @@ package com.dlink.cdc; ...@@ -3,7 +3,11 @@ package com.dlink.cdc;
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.List;
import java.util.Map;
import com.dlink.model.FlinkCDCConfig; import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Table;
/** /**
* CDCBuilder * CDCBuilder
...@@ -17,5 +21,13 @@ public interface CDCBuilder { ...@@ -17,5 +21,13 @@ public interface CDCBuilder {
CDCBuilder create(FlinkCDCConfig config); CDCBuilder create(FlinkCDCConfig config);
DataStreamSource build(StreamExecutionEnvironment env); DataStreamSource<String> build(StreamExecutionEnvironment env);
List<String> getSchemaList();
List<String> getTableList();
Map<String, Map<String, String>> parseMetaDataConfigs();
String getInsertSQL(Table table, String sourceName);
} }
package com.dlink.cdc;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import com.dlink.assertion.Asserts;
import com.dlink.model.FlinkCDCConfig;
/**
* FlinkCDCMergeBuilder
*
* @author wenmo
* @since 2022/1/29 22:37
*/
public class FlinkCDCMergeBuilder {
public static void buildMySqlCDC(StreamExecutionEnvironment env, FlinkCDCConfig config) {
if (Asserts.isNotNull(config.getParallelism())) {
env.setParallelism(config.getParallelism());
}
if (Asserts.isNotNull(config.getCheckpoint())) {
env.enableCheckpointing(config.getCheckpoint());
}
DataStreamSource<String> streamSource = CDCBuilderFactory.buildCDCBuilder(config).build(env);
streamSource.addSink(getKafkaProducer(config.getBrokers(), config.getTopic()));
}
private static FlinkKafkaProducer<String> getKafkaProducer(String brokers, String topic) {
return new FlinkKafkaProducer<String>(brokers,
topic,
new SimpleStringSchema());
}
}
package com.dlink.cdc;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
/**
* SinkBuilder
*
* @author wenmo
* @since 2022/4/12 21:09
**/
public interface SinkBuilder {
String getHandle();
SinkBuilder create(FlinkCDCConfig config);
DataStreamSource build(CDCBuilder cdcBuilder, StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource<String> dataStreamSource);
}
package com.dlink.cdc;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.kafka.KafkaSinkBuilder;
import com.dlink.exception.FlinkClientException;
import com.dlink.model.FlinkCDCConfig;
/**
* SinkBuilderFactory
*
* @author wenmo
* @since 2022/4/12 21:12
**/
public class SinkBuilderFactory {
private static SinkBuilder[] sinkBuilders = {
new KafkaSinkBuilder(),
};
public static SinkBuilder buildSinkBuilder(FlinkCDCConfig config) {
if (Asserts.isNull(config) || Asserts.isNullString(config.getSink().get("connector"))) {
throw new FlinkClientException("请指定 Sink connector。");
}
for (int i = 0; i < sinkBuilders.length; i++) {
if (config.getSink().get("connector").equals(sinkBuilders[i].getHandle())) {
return sinkBuilders[i].create(config);
}
}
throw new FlinkClientException("未匹配到对应 Sink 类型的【" + config.getSink().get("connector") + "】。");
}
}
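Reviewer note: for orientation, here is a minimal sketch (not part of the commit) of how the factory above is meant to be wired together with the CDC side. It assumes a FlinkCDCConfig that has already been populated elsewhere (only its getters appear in this diff) and is placed in com.dlink.cdc so the factories resolve without extra imports; the class and method names are made up for the illustration.
package com.dlink.cdc;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
public class CdcToSinkWiringSketch {
    public static void wire(StreamExecutionEnvironment env,
                            CustomTableEnvironment tableEnv,
                            FlinkCDCConfig config) {
        // Build the CDC source stream for the configured source connector.
        CDCBuilder cdcBuilder = CDCBuilderFactory.buildCDCBuilder(config);
        DataStreamSource<String> source = cdcBuilder.build(env);
        // Route the change stream to the sink chosen by config.getSink().get("connector").
        SinkBuilderFactory.buildSinkBuilder(config)
                .build(cdcBuilder, env, tableEnv, source);
    }
}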
package com.dlink.cdc.kafka;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
/**
* KafkaSinkBuilder
*
* @author wenmo
* @since 2022/4/12 21:29
**/
public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder {
private final static String KEY_WORD = "kafka";
public KafkaSinkBuilder() {
}
public KafkaSinkBuilder(FlinkCDCConfig config) {
super(config);
}
@Override
public String getHandle() {
return KEY_WORD;
}
@Override
public SinkBuilder create(FlinkCDCConfig config) {
return new KafkaSinkBuilder(config);
}
@Override
public DataStreamSource build(CDCBuilder cdcBuilder, StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource<String> dataStreamSource) {
if (Asserts.isNotNullString(config.getSink().get("topic"))) {
dataStreamSource.addSink(new FlinkKafkaProducer<String>(config.getSink().get("brokers"),
config.getSink().get("topic"),
new SimpleStringSchema()));
} else {
final List<Schema> schemaList = config.getSchemaList();
if (Asserts.isNotNullCollection(schemaList)) {
SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(new MapFunction<String, Map>() {
@Override
public Map map(String value) throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(value, Map.class);
}
});
for (Schema schema : schemaList) {
for (Table table : schema.getTables()) {
final String tableName = table.getName();
final String schemaName = table.getSchema();
SingleOutputStreamOperator<Map> filterOperator = mapOperator.filter(new FilterFunction<Map>() {
@Override
public boolean filter(Map value) throws Exception {
LinkedHashMap source = (LinkedHashMap) value.get("source");
return tableName.equals(source.get("table").toString())
&& schemaName.equals(source.get("db").toString());
}
});
SingleOutputStreamOperator<String> stringOperator = filterOperator.map(new MapFunction<Map, String>() {
@Override
public String map(Map value) throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.writeValueAsString(value);
}
});
stringOperator.addSink(new FlinkKafkaProducer<String>(config.getSink().get("brokers"),
table.getSchemaTableName(),
new SimpleStringSchema()));
}
}
}
}
return dataStreamSource;
}
}
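Reviewer note: the sink map that drives the branch above can be summarized as follows. This is a minimal sketch (not part of the commit); the broker addresses are placeholders and the class name is made up. With a "topic" entry every change record goes to that single topic; without it, records are filtered per table and written to a topic named by Table#getSchemaTableName().
import java.util.HashMap;
import java.util.Map;
public class KafkaSinkConfigExample {
    public static Map<String, String> kafkaSink() {
        Map<String, String> sink = new HashMap<>();
        sink.put("connector", "kafka");                   // must match KafkaSinkBuilder's handle
        sink.put("brokers", "kafka-1:9092,kafka-2:9092"); // placeholder broker list
        // sink.put("topic", "all_changes");              // optional: single-topic mode
        return sink;
    }
}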
...@@ -3,13 +3,21 @@ package com.dlink.cdc.mysql; ...@@ -3,13 +3,21 @@ package com.dlink.cdc.mysql;
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource; import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema; import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder; import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder; import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.ClientConstant;
import com.dlink.constant.FlinkParamConstant; import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig; import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Table;
/** /**
* MysqlCDCBuilder * MysqlCDCBuilder
...@@ -20,6 +28,7 @@ import com.dlink.model.FlinkCDCConfig; ...@@ -20,6 +28,7 @@ import com.dlink.model.FlinkCDCConfig;
public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder { public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
private String KEY_WORD = "mysql-cdc"; private String KEY_WORD = "mysql-cdc";
private final static String METADATA_TYPE = "MySql";
public MysqlCDCBuilder() { public MysqlCDCBuilder() {
} }
...@@ -55,4 +64,58 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder { ...@@ -55,4 +64,58 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
.deserializer(new StringDebeziumDeserializationSchema()); .deserializer(new StringDebeziumDeserializationSchema());
return env.addSource(sourceBuilder.build(), "MySQL CDC Source"); return env.addSource(sourceBuilder.build(), "MySQL CDC Source");
} }
public List<String> getSchemaList() {
List<String> schemaList = new ArrayList<>();
String schema = config.getDatabase();
if (Asserts.isNullString(schema)) {
return schemaList;
}
String[] schemas = schema.split(FlinkParamConstant.SPLIT);
Collections.addAll(schemaList, schemas);
return schemaList;
}
public Map<String, Map<String, String>> parseMetaDataConfigs() {
Map<String, Map<String, String>> allConfigMap = new HashMap<>();
List<String> schemaList = getSchemaList();
for (String schema : schemaList) {
Map<String, String> configMap = new HashMap<>();
configMap.put(ClientConstant.METADATA_TYPE, METADATA_TYPE);
StringBuilder sb = new StringBuilder("jdbc:mysql://");
sb.append(config.getHostname());
sb.append(":");
sb.append(config.getPort());
sb.append("/");
sb.append(schema);
configMap.put(ClientConstant.METADATA_NAME, sb.toString());
configMap.put(ClientConstant.METADATA_URL, sb.toString());
configMap.put(ClientConstant.METADATA_USERNAME, config.getUsername());
configMap.put(ClientConstant.METADATA_PASSWORD, config.getPassword());
allConfigMap.put(schema, configMap);
}
return allConfigMap;
}
@Override
public String getInsertSQL(Table table, String sourceName) {
StringBuilder sb = new StringBuilder("INSERT INTO ");
sb.append(table.getName());
sb.append(" SELECT\n");
for (int i = 0; i < table.getColumns().size(); i++) {
sb.append(" ");
if (i > 0) {
sb.append(",");
}
sb.append("`" + table.getColumns().get(i).getName() + "` \n");
}
sb.append(" FROM ");
sb.append(sourceName);
/* sb.append(" WHERE database_name = '");
sb.append(table.getSchema());
sb.append("' and table_name = '");
sb.append(table.getName());
sb.append("'");*/
return sb.toString();
}
} }
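Reviewer note: to make the generated statement concrete, the snippet below is a stand-alone illustration (not part of the commit) that reproduces the shape of the SQL emitted by getInsertSQL for a hypothetical table orders with columns id and name; the real method reads the column names from com.dlink.model.Table, whose construction is not shown in this diff.
public class InsertSqlShapeExample {
    public static void main(String[] args) {
        String[] columns = {"id", "name"};
        // Mirrors the StringBuilder logic in MysqlCDCBuilder#getInsertSQL.
        StringBuilder sb = new StringBuilder("INSERT INTO orders SELECT\n");
        for (int i = 0; i < columns.length; i++) {
            sb.append(" ");
            if (i > 0) {
                sb.append(",");
            }
            sb.append("`").append(columns[i]).append("` \n");
        }
        sb.append(" FROM orders_source");
        System.out.println(sb.toString());
    }
}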
package com.dlink.executor; package com.dlink.executor;
import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation; import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
...@@ -11,11 +7,13 @@ import org.apache.flink.configuration.PipelineOptions; ...@@ -11,11 +7,13 @@ import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.rest.messages.JobPlanInfo; import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator; import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail; import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.apache.flink.table.api.internal.TableEnvironmentImpl;
...@@ -38,12 +36,19 @@ import org.apache.flink.table.operations.Operation; ...@@ -38,12 +36,19 @@ import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation; import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.planner.delegation.ExecutorBase; import org.apache.flink.table.planner.delegation.ExecutorBase;
import org.apache.flink.table.planner.utils.ExecutorUtils; import org.apache.flink.table.planner.utils.ExecutorUtils;
import org.apache.flink.types.Row;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import com.dlink.exception.FlinkClientException;
import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
/** /**
* Customized TableEnvironmentImpl * Customized TableEnvironmentImpl
* *
...@@ -52,10 +57,31 @@ import java.util.Map; ...@@ -52,10 +57,31 @@ import java.util.Map;
**/ **/
public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements CustomTableEnvironment { public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements CustomTableEnvironment {
protected CustomTableEnvironmentImpl(CatalogManager catalogManager, ModuleManager moduleManager, TableConfig tableConfig, Executor executor, FunctionCatalog functionCatalog, Planner planner, boolean isStreamingMode, ClassLoader userClassLoader) { private final StreamExecutionEnvironment executionEnvironment;
super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode, userClassLoader);
public CustomTableEnvironmentImpl(
CatalogManager catalogManager,
ModuleManager moduleManager,
FunctionCatalog functionCatalog,
TableConfig tableConfig,
StreamExecutionEnvironment executionEnvironment,
Planner planner,
Executor executor,
boolean isStreamingMode,
ClassLoader userClassLoader) {
super(
catalogManager,
moduleManager,
tableConfig,
executor,
functionCatalog,
planner,
isStreamingMode,
userClassLoader);
this.executionEnvironment = executionEnvironment;
} }
public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment) { public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment) {
return create(executionEnvironment, EnvironmentSettings.newInstance().build()); return create(executionEnvironment, EnvironmentSettings.newInstance().build());
} }
...@@ -72,26 +98,79 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements ...@@ -72,26 +98,79 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements
return create(executionEnvironment, settings, new TableConfig()); return create(executionEnvironment, settings, new TableConfig());
} }
public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings, TableConfig tableConfig) { public static CustomTableEnvironmentImpl create(
StreamExecutionEnvironment executionEnvironment,
EnvironmentSettings settings,
TableConfig tableConfig) {
if (!settings.isStreamingMode()) {
throw new TableException(
"StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
}
// temporary solution until FLINK-15635 is fixed
ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
ModuleManager moduleManager = new ModuleManager(); ModuleManager moduleManager = new ModuleManager();
CatalogManager catalogManager = CatalogManager.newBuilder().classLoader(classLoader).config(tableConfig.getConfiguration()).defaultCatalog(settings.getBuiltInCatalogName(), new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName())).executionConfig(executionEnvironment.getConfig()).build();
FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager); CatalogManager catalogManager =
CatalogManager.newBuilder()
.classLoader(classLoader)
.config(tableConfig.getConfiguration())
.defaultCatalog(
settings.getBuiltInCatalogName(),
new GenericInMemoryCatalog(
settings.getBuiltInCatalogName(),
settings.getBuiltInDatabaseName()))
.executionConfig(executionEnvironment.getConfig())
.build();
FunctionCatalog functionCatalog =
new FunctionCatalog(tableConfig, catalogManager, moduleManager);
Map<String, String> executorProperties = settings.toExecutorProperties(); Map<String, String> executorProperties = settings.toExecutorProperties();
Executor executor = lookupExecutor(executorProperties, executionEnvironment); Executor executor = lookupExecutor(executorProperties, executionEnvironment);
Map<String, String> plannerProperties = settings.toPlannerProperties(); Map<String, String> plannerProperties = settings.toPlannerProperties();
Planner planner = (ComponentFactoryService.find(PlannerFactory.class, plannerProperties)).create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager); Planner planner =
return new CustomTableEnvironmentImpl(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, settings.isStreamingMode(), classLoader); ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
.create(
plannerProperties,
executor,
tableConfig,
functionCatalog,
catalogManager);
return new CustomTableEnvironmentImpl(
catalogManager,
moduleManager,
functionCatalog,
tableConfig,
executionEnvironment,
planner,
executor,
settings.isStreamingMode(),
classLoader);
} }
private static Executor lookupExecutor(Map<String, String> executorProperties, StreamExecutionEnvironment executionEnvironment) {
private static Executor lookupExecutor(
Map<String, String> executorProperties,
StreamExecutionEnvironment executionEnvironment) {
try { try {
ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties); ExecutorFactory executorFactory =
Method createMethod = executorFactory.getClass().getMethod("create", Map.class, StreamExecutionEnvironment.class); ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
return (Executor) createMethod.invoke(executorFactory, executorProperties, executionEnvironment); Method createMethod =
} catch (Exception var4) { executorFactory
throw new TableException("Could not instantiate the executor. Make sure a planner module is on the classpath", var4); .getClass()
.getMethod("create", Map.class, StreamExecutionEnvironment.class);
return (Executor)
createMethod.invoke(executorFactory, executorProperties, executionEnvironment);
} catch (Exception e) {
throw new TableException(
"Could not instantiate the executor. Make sure a planner module is on the classpath",
e);
} }
} }
...@@ -167,7 +246,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements ...@@ -167,7 +246,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements
record.setParseTrue(true); record.setParseTrue(true);
if (operations.size() != 1) { if (operations.size() != 1) {
throw new TableException( throw new TableException(
"Unsupported SQL query! explainSql() only accepts a single SQL query."); "Unsupported SQL query! explainSql() only accepts a single SQL query.");
} }
List<Operation> operationlist = new ArrayList<>(operations); List<Operation> operationlist = new ArrayList<>(operations);
for (int i = 0; i < operationlist.size(); i++) { for (int i = 0; i < operationlist.size(); i++) {
...@@ -213,4 +292,20 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements ...@@ -213,4 +292,20 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements
public boolean parseAndLoadConfiguration(String statement, StreamExecutionEnvironment environment, Map<String, Object> setMap) { public boolean parseAndLoadConfiguration(String statement, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
return false; return false;
} }
@Override
public Table fromChangelogStream(DataStream<Row> dataStream) {
throw new FlinkClientException("Flink 1.12 not support");
}
@Override
public <T> void registerDataStream(String name, DataStream<T> dataStream) {
throw new FlinkClientException("Flink 1.12 not support");
}
@Override
public <T> void createTemporaryView(String path, DataStream<T> dataStream) {
throw new FlinkClientException("Flink 1.12 not support");
}
} }
...@@ -2,19 +2,24 @@ package com.dlink.executor; ...@@ -2,19 +2,24 @@ package com.dlink.executor;
import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Internal;
import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.*; import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.utils.PrintUtils; import org.apache.flink.table.utils.PrintUtils;
import org.apache.flink.types.Row; import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions; import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import javax.annotation.Nullable;
/** /**
* Customized TableResultImpl * Customized TableResultImpl
* *
...@@ -24,11 +29,11 @@ import java.util.Optional; ...@@ -24,11 +29,11 @@ import java.util.Optional;
@Internal @Internal
class CustomTableResultImpl implements TableResult { class CustomTableResultImpl implements TableResult {
public static final TableResult TABLE_RESULT_OK = public static final TableResult TABLE_RESULT_OK =
CustomTableResultImpl.builder() CustomTableResultImpl.builder()
.resultKind(ResultKind.SUCCESS) .resultKind(ResultKind.SUCCESS)
.tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build()) .tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build())
.data(Collections.singletonList(Row.of("OK"))) .data(Collections.singletonList(Row.of("OK")))
.build(); .build();
private final JobClient jobClient; private final JobClient jobClient;
private final TableSchema tableSchema; private final TableSchema tableSchema;
...@@ -37,14 +42,14 @@ class CustomTableResultImpl implements TableResult { ...@@ -37,14 +42,14 @@ class CustomTableResultImpl implements TableResult {
private final PrintStyle printStyle; private final PrintStyle printStyle;
private CustomTableResultImpl( private CustomTableResultImpl(
@Nullable JobClient jobClient, @Nullable JobClient jobClient,
TableSchema tableSchema, TableSchema tableSchema,
ResultKind resultKind, ResultKind resultKind,
CloseableIterator<Row> data, CloseableIterator<Row> data,
PrintStyle printStyle) { PrintStyle printStyle) {
this.jobClient = jobClient; this.jobClient = jobClient;
this.tableSchema = this.tableSchema =
Preconditions.checkNotNull(tableSchema, "tableSchema should not be null"); Preconditions.checkNotNull(tableSchema, "tableSchema should not be null");
this.resultKind = Preconditions.checkNotNull(resultKind, "resultKind should not be null"); this.resultKind = Preconditions.checkNotNull(resultKind, "resultKind should not be null");
this.data = Preconditions.checkNotNull(data, "data should not be null"); this.data = Preconditions.checkNotNull(data, "data should not be null");
this.printStyle = Preconditions.checkNotNull(printStyle, "printStyle should not be null"); this.printStyle = Preconditions.checkNotNull(printStyle, "printStyle should not be null");
...@@ -89,14 +94,14 @@ class CustomTableResultImpl implements TableResult { ...@@ -89,14 +94,14 @@ class CustomTableResultImpl implements TableResult {
int maxColumnWidth = ((TableauStyle) printStyle).getMaxColumnWidth(); int maxColumnWidth = ((TableauStyle) printStyle).getMaxColumnWidth();
String nullColumn = ((TableauStyle) printStyle).getNullColumn(); String nullColumn = ((TableauStyle) printStyle).getNullColumn();
boolean deriveColumnWidthByType = boolean deriveColumnWidthByType =
((TableauStyle) printStyle).isDeriveColumnWidthByType(); ((TableauStyle) printStyle).isDeriveColumnWidthByType();
PrintUtils.printAsTableauForm( PrintUtils.printAsTableauForm(
getTableSchema(), getTableSchema(),
it, it,
new PrintWriter(System.out), new PrintWriter(System.out),
maxColumnWidth, maxColumnWidth,
nullColumn, nullColumn,
deriveColumnWidthByType); deriveColumnWidthByType);
} else if (printStyle instanceof RawContentStyle) { } else if (printStyle instanceof RawContentStyle) {
while (it.hasNext()) { while (it.hasNext()) {
System.out.println(String.join(",", PrintUtils.rowToString(it.next()))); System.out.println(String.join(",", PrintUtils.rowToString(it.next())));
...@@ -119,7 +124,7 @@ class CustomTableResultImpl implements TableResult { ...@@ -119,7 +124,7 @@ class CustomTableResultImpl implements TableResult {
private ResultKind resultKind = null; private ResultKind resultKind = null;
private CloseableIterator<Row> data = null; private CloseableIterator<Row> data = null;
private PrintStyle printStyle = private PrintStyle printStyle =
PrintStyle.tableau(Integer.MAX_VALUE, PrintUtils.NULL_COLUMN, false); PrintStyle.tableau(Integer.MAX_VALUE, PrintUtils.NULL_COLUMN, false);
private Builder() { private Builder() {
} }
...@@ -205,9 +210,9 @@ class CustomTableResultImpl implements TableResult { ...@@ -205,9 +210,9 @@ class CustomTableResultImpl implements TableResult {
* prints the result schema and content as tableau form. * prints the result schema and content as tableau form.
*/ */
static PrintStyle tableau( static PrintStyle tableau(
int maxColumnWidth, String nullColumn, boolean deriveColumnWidthByType) { int maxColumnWidth, String nullColumn, boolean deriveColumnWidthByType) {
Preconditions.checkArgument( Preconditions.checkArgument(
maxColumnWidth > 0, "maxColumnWidth should be greater than 0"); maxColumnWidth > 0, "maxColumnWidth should be greater than 0");
Preconditions.checkNotNull(nullColumn, "nullColumn should not be null"); Preconditions.checkNotNull(nullColumn, "nullColumn should not be null");
return new TableauStyle(maxColumnWidth, nullColumn, deriveColumnWidthByType); return new TableauStyle(maxColumnWidth, nullColumn, deriveColumnWidthByType);
} }
...@@ -235,7 +240,7 @@ class CustomTableResultImpl implements TableResult { ...@@ -235,7 +240,7 @@ class CustomTableResultImpl implements TableResult {
private final String nullColumn; private final String nullColumn;
private TableauStyle( private TableauStyle(
int maxColumnWidth, String nullColumn, boolean deriveColumnWidthByType) { int maxColumnWidth, String nullColumn, boolean deriveColumnWidthByType) {
this.deriveColumnWidthByType = deriveColumnWidthByType; this.deriveColumnWidthByType = deriveColumnWidthByType;
this.maxColumnWidth = maxColumnWidth; this.maxColumnWidth = maxColumnWidth;
this.nullColumn = nullColumn; this.nullColumn = nullColumn;
......
...@@ -14,11 +14,8 @@ ...@@ -14,11 +14,8 @@
<properties> <properties>
<java.version>1.8</java.version> <java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.12.7</flink.version>
<flinkcdc.version>1.3.0</flinkcdc.version>
<maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.target>1.8</maven.compiler.target>
<junit.version>4.12</junit.version>
</properties> </properties>
<dependencies> <dependencies>
...@@ -27,81 +24,14 @@ ...@@ -27,81 +24,14 @@
<artifactId>dlink-client-base</artifactId> <artifactId>dlink-client-base</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-yarn_2.11</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency> <dependency>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<artifactId>dlink-client-hadoop</artifactId> <artifactId>dlink-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-kubernetes_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flinkcdc.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<artifactId>dlink-common</artifactId> <artifactId>dlink-flink-1.12</artifactId>
<scope>provided</scope>
</dependency> </dependency>
</dependencies> </dependencies>
</project> </project>
\ No newline at end of file
package com.dlink.cdc; package com.dlink.cdc;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig; import com.dlink.model.FlinkCDCConfig;
/** /**
...@@ -26,4 +32,35 @@ public abstract class AbstractCDCBuilder { ...@@ -26,4 +32,35 @@ public abstract class AbstractCDCBuilder {
public void setConfig(FlinkCDCConfig config) { public void setConfig(FlinkCDCConfig config) {
this.config = config; this.config = config;
} }
public List<String> getSchemaList() {
List<String> schemaList = new ArrayList<>();
String schema = config.getSchema();
if (Asserts.isNullString(schema)) {
return schemaList;
}
String[] schemas = schema.split(FlinkParamConstant.SPLIT);
Collections.addAll(schemaList, schemas);
List<String> tableList = getTableList();
for (String tableName : tableList) {
if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
// String#split takes a regex, so the dot must be escaped to split on a literal ".".
String[] names = tableName.split("\\.");
if (!schemaList.contains(names[0])) {
schemaList.add(names[0]);
}
}
}
return schemaList;
}
public List<String> getTableList() {
List<String> tableList = new ArrayList<>();
String table = config.getTable();
if (Asserts.isNullString(table)) {
return tableList;
}
String[] tables = table.split(FlinkParamConstant.SPLIT);
Collections.addAll(tableList, tables);
return tableList;
}
} }
package com.dlink.cdc;
import com.dlink.model.FlinkCDCConfig;
/**
* AbstractSinkBuilder
*
* @author wenmo
* @since 2022/4/12 21:28
**/
public abstract class AbstractSinkBuilder {
protected FlinkCDCConfig config;
public AbstractSinkBuilder() {
}
public AbstractSinkBuilder(FlinkCDCConfig config) {
this.config = config;
}
public FlinkCDCConfig getConfig() {
return config;
}
public void setConfig(FlinkCDCConfig config) {
this.config = config;
}
}
...@@ -3,7 +3,11 @@ package com.dlink.cdc; ...@@ -3,7 +3,11 @@ package com.dlink.cdc;
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.List;
import java.util.Map;
import com.dlink.model.FlinkCDCConfig; import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Table;
/** /**
* CDCBuilder * CDCBuilder
...@@ -17,5 +21,13 @@ public interface CDCBuilder { ...@@ -17,5 +21,13 @@ public interface CDCBuilder {
CDCBuilder create(FlinkCDCConfig config); CDCBuilder create(FlinkCDCConfig config);
DataStreamSource build(StreamExecutionEnvironment env); DataStreamSource<String> build(StreamExecutionEnvironment env);
List<String> getSchemaList();
List<String> getTableList();
Map<String, Map<String, String>> parseMetaDataConfigs();
String getInsertSQL(Table table, String sourceName);
} }
package com.dlink.cdc;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;
/**
* FlinkCDCMergeBuilder
*
* @author wenmo
* @since 2022/1/29 22:37
*/
public class FlinkCDCMergeBuilder {
public static void buildMySqlCDC(StreamExecutionEnvironment env, FlinkCDCConfig config) {
if (Asserts.isNotNull(config.getParallelism())) {
env.setParallelism(config.getParallelism());
}
if (Asserts.isNotNull(config.getCheckpoint())) {
env.enableCheckpointing(config.getCheckpoint());
}
DataStreamSource<String> streamSource = CDCBuilderFactory.buildCDCBuilder(config).build(env);
streamSource.addSink(getKafkaProducer(config.getBrokers(), config.getTopic()));
}
private static FlinkKafkaProducer<String> getKafkaProducer(String brokers, String topic) {
return new FlinkKafkaProducer<String>(brokers,
topic,
new SimpleStringSchema());
}
}
package com.dlink.cdc;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
/**
* SinkBuilder
*
* @author wenmo
* @since 2022/4/12 21:09
**/
public interface SinkBuilder {
String getHandle();
SinkBuilder create(FlinkCDCConfig config);
DataStreamSource build(CDCBuilder cdcBuilder, StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource<String> dataStreamSource);
}
package com.dlink.cdc;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.kafka.KafkaSinkBuilder;
import com.dlink.exception.FlinkClientException;
import com.dlink.model.FlinkCDCConfig;
/**
* SinkBuilderFactory
*
* @author wenmo
* @since 2022/4/12 21:12
**/
public class SinkBuilderFactory {
private static SinkBuilder[] sinkBuilders = {
new KafkaSinkBuilder(),
};
public static SinkBuilder buildSinkBuilder(FlinkCDCConfig config) {
if (Asserts.isNull(config) || Asserts.isNullString(config.getSink().get("connector"))) {
throw new FlinkClientException("请指定 Sink connector。");
}
for (int i = 0; i < sinkBuilders.length; i++) {
if (config.getSink().get("connector").equals(sinkBuilders[i].getHandle())) {
return sinkBuilders[i].create(config);
}
}
throw new FlinkClientException("未匹配到对应 Sink 类型的【" + config.getSink().get("connector") + "】。");
}
}
package com.dlink.cdc.kafka;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
/**
* KafkaSinkBuilder
*
* @author wenmo
* @since 2022/4/12 21:29
**/
public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder {
private final static String KEY_WORD = "kafka";
public KafkaSinkBuilder() {
}
public KafkaSinkBuilder(FlinkCDCConfig config) {
super(config);
}
@Override
public String getHandle() {
return KEY_WORD;
}
@Override
public SinkBuilder create(FlinkCDCConfig config) {
return new KafkaSinkBuilder(config);
}
@Override
public DataStreamSource build(CDCBuilder cdcBuilder, StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource<String> dataStreamSource) {
if (Asserts.isNotNullString(config.getSink().get("topic"))) {
dataStreamSource.addSink(new FlinkKafkaProducer<String>(config.getSink().get("brokers"),
config.getSink().get("topic"),
new SimpleStringSchema()));
} else {
final List<Schema> schemaList = config.getSchemaList();
if (Asserts.isNotNullCollection(schemaList)) {
SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(new MapFunction<String, Map>() {
@Override
public Map map(String value) throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(value, Map.class);
}
});
for (Schema schema : schemaList) {
for (Table table : schema.getTables()) {
final String tableName = table.getName();
final String schemaName = table.getSchema();
SingleOutputStreamOperator<Map> filterOperator = mapOperator.filter(new FilterFunction<Map>() {
@Override
public boolean filter(Map value) throws Exception {
LinkedHashMap source = (LinkedHashMap) value.get("source");
return tableName.equals(source.get("table").toString())
&& schemaName.equals(source.get("db").toString());
}
});
SingleOutputStreamOperator<String> stringOperator = filterOperator.map(new MapFunction<Map, String>() {
@Override
public String map(Map value) throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.writeValueAsString(value);
}
});
stringOperator.addSink(new FlinkKafkaProducer<String>(config.getSink().get("brokers"),
table.getSchemaTableName(),
new SimpleStringSchema()));
}
}
}
}
return dataStreamSource;
}
}
...@@ -3,14 +3,22 @@ package com.dlink.cdc.mysql; ...@@ -3,14 +3,22 @@ package com.dlink.cdc.mysql;
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource; import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions; import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema; import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder; import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder; import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.ClientConstant;
import com.dlink.constant.FlinkParamConstant; import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig; import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Table;
/** /**
* MysqlCDCBuilder * MysqlCDCBuilder
...@@ -21,6 +29,7 @@ import com.dlink.model.FlinkCDCConfig; ...@@ -21,6 +29,7 @@ import com.dlink.model.FlinkCDCConfig;
public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder { public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
private String KEY_WORD = "mysql-cdc"; private String KEY_WORD = "mysql-cdc";
private final static String METADATA_TYPE = "MySql";
public MysqlCDCBuilder() { public MysqlCDCBuilder() {
} }
...@@ -73,4 +82,58 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder { ...@@ -73,4 +82,58 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
} }
return env.addSource(sourceBuilder.build(), "MySQL CDC Source"); return env.addSource(sourceBuilder.build(), "MySQL CDC Source");
} }
public List<String> getSchemaList() {
List<String> schemaList = new ArrayList<>();
String schema = config.getDatabase();
if (Asserts.isNullString(schema)) {
return schemaList;
}
String[] schemas = schema.split(FlinkParamConstant.SPLIT);
Collections.addAll(schemaList, schemas);
return schemaList;
}
public Map<String, Map<String, String>> parseMetaDataConfigs() {
Map<String, Map<String, String>> allConfigMap = new HashMap<>();
List<String> schemaList = getSchemaList();
for (String schema : schemaList) {
Map<String, String> configMap = new HashMap<>();
configMap.put(ClientConstant.METADATA_TYPE, METADATA_TYPE);
StringBuilder sb = new StringBuilder("jdbc:mysql://");
sb.append(config.getHostname());
sb.append(":");
sb.append(config.getPort());
sb.append("/");
sb.append(schema);
configMap.put(ClientConstant.METADATA_NAME, sb.toString());
configMap.put(ClientConstant.METADATA_URL, sb.toString());
configMap.put(ClientConstant.METADATA_USERNAME, config.getUsername());
configMap.put(ClientConstant.METADATA_PASSWORD, config.getPassword());
allConfigMap.put(schema, configMap);
}
return allConfigMap;
}
@Override
public String getInsertSQL(Table table, String sourceName) {
StringBuilder sb = new StringBuilder("INSERT INTO ");
sb.append(table.getName());
sb.append(" SELECT\n");
for (int i = 0; i < table.getColumns().size(); i++) {
sb.append(" ");
if (i > 0) {
sb.append(",");
}
sb.append("`" + table.getColumns().get(i).getName() + "` \n");
}
sb.append(" FROM ");
sb.append(sourceName);
/* sb.append(" WHERE database_name = '");
sb.append(table.getSchema());
sb.append("' and table_name = '");
sb.append(table.getName());
sb.append("'");*/
return sb.toString();
}
} }
package com.dlink.executor; package com.dlink.executor;
import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation; import org.apache.flink.api.dag.Transformation;
...@@ -13,11 +9,13 @@ import org.apache.flink.configuration.PipelineOptions; ...@@ -13,11 +9,13 @@ import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.rest.messages.JobPlanInfo; import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator; import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail; import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.apache.flink.table.api.internal.TableEnvironmentImpl;
...@@ -40,12 +38,19 @@ import org.apache.flink.table.operations.Operation; ...@@ -40,12 +38,19 @@ import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation; import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.planner.delegation.ExecutorBase; import org.apache.flink.table.planner.delegation.ExecutorBase;
import org.apache.flink.table.planner.utils.ExecutorUtils; import org.apache.flink.table.planner.utils.ExecutorUtils;
import org.apache.flink.types.Row;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import com.dlink.exception.FlinkClientException;
import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
/** /**
* Customized TableEnvironmentImpl * Customized TableEnvironmentImpl
* *
...@@ -54,8 +59,28 @@ import java.util.Map; ...@@ -54,8 +59,28 @@ import java.util.Map;
**/ **/
public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements CustomTableEnvironment { public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements CustomTableEnvironment {
protected CustomTableEnvironmentImpl(CatalogManager catalogManager, ModuleManager moduleManager, TableConfig tableConfig, Executor executor, FunctionCatalog functionCatalog, Planner planner, boolean isStreamingMode, ClassLoader userClassLoader) { private final StreamExecutionEnvironment executionEnvironment;
super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode, userClassLoader);
public CustomTableEnvironmentImpl(
CatalogManager catalogManager,
ModuleManager moduleManager,
FunctionCatalog functionCatalog,
TableConfig tableConfig,
StreamExecutionEnvironment executionEnvironment,
Planner planner,
Executor executor,
boolean isStreamingMode,
ClassLoader userClassLoader) {
super(
catalogManager,
moduleManager,
tableConfig,
executor,
functionCatalog,
planner,
isStreamingMode,
userClassLoader);
this.executionEnvironment = executionEnvironment;
} }
public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment) { public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment) {
...@@ -74,29 +99,83 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements ...@@ -74,29 +99,83 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements
return create(executionEnvironment, settings, new TableConfig()); return create(executionEnvironment, settings, new TableConfig());
} }
public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings, TableConfig tableConfig) { public static CustomTableEnvironmentImpl create(
StreamExecutionEnvironment executionEnvironment,
EnvironmentSettings settings,
TableConfig tableConfig) {
if (!settings.isStreamingMode()) {
throw new TableException(
"StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
}
// temporary solution until FLINK-15635 is fixed
ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
ModuleManager moduleManager = new ModuleManager(); ModuleManager moduleManager = new ModuleManager();
CatalogManager catalogManager = CatalogManager.newBuilder().classLoader(classLoader).config(tableConfig.getConfiguration()).defaultCatalog(settings.getBuiltInCatalogName(), new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName())).executionConfig(executionEnvironment.getConfig()).build();
FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager); CatalogManager catalogManager =
CatalogManager.newBuilder()
.classLoader(classLoader)
.config(tableConfig.getConfiguration())
.defaultCatalog(
settings.getBuiltInCatalogName(),
new GenericInMemoryCatalog(
settings.getBuiltInCatalogName(),
settings.getBuiltInDatabaseName()))
.executionConfig(executionEnvironment.getConfig())
.build();
FunctionCatalog functionCatalog =
new FunctionCatalog(tableConfig, catalogManager, moduleManager);
Map<String, String> executorProperties = settings.toExecutorProperties(); Map<String, String> executorProperties = settings.toExecutorProperties();
Executor executor = lookupExecutor(executorProperties, executionEnvironment); Executor executor = lookupExecutor(executorProperties, executionEnvironment);
Map<String, String> plannerProperties = settings.toPlannerProperties(); Map<String, String> plannerProperties = settings.toPlannerProperties();
Planner planner = (ComponentFactoryService.find(PlannerFactory.class, plannerProperties)).create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager); Planner planner =
return new CustomTableEnvironmentImpl(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, settings.isStreamingMode(), classLoader); ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
.create(
plannerProperties,
executor,
tableConfig,
functionCatalog,
catalogManager);
return new CustomTableEnvironmentImpl(
catalogManager,
moduleManager,
functionCatalog,
tableConfig,
executionEnvironment,
planner,
executor,
settings.isStreamingMode(),
classLoader);
} }
private static Executor lookupExecutor(Map<String, String> executorProperties, StreamExecutionEnvironment executionEnvironment) {
private static Executor lookupExecutor(
Map<String, String> executorProperties,
StreamExecutionEnvironment executionEnvironment) {
try { try {
ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties); ExecutorFactory executorFactory =
Method createMethod = executorFactory.getClass().getMethod("create", Map.class, StreamExecutionEnvironment.class); ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
return (Executor) createMethod.invoke(executorFactory, executorProperties, executionEnvironment); Method createMethod =
} catch (Exception var4) { executorFactory
throw new TableException("Could not instantiate the executor. Make sure a planner module is on the classpath", var4); .getClass()
.getMethod("create", Map.class, StreamExecutionEnvironment.class);
return (Executor)
createMethod.invoke(executorFactory, executorProperties, executionEnvironment);
} catch (Exception e) {
throw new TableException(
"Could not instantiate the executor. Make sure a planner module is on the classpath",
e);
} }
} }
public ObjectNode getStreamGraph(String statement) { public ObjectNode getStreamGraph(String statement) {
List<Operation> operations = super.parser.parse(statement); List<Operation> operations = super.parser.parse(statement);
if (operations.size() != 1) { if (operations.size() != 1) {
...@@ -169,7 +248,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements ...@@ -169,7 +248,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements
record.setParseTrue(true); record.setParseTrue(true);
if (operations.size() != 1) { if (operations.size() != 1) {
throw new TableException( throw new TableException(
"Unsupported SQL query! explainSql() only accepts a single SQL query."); "Unsupported SQL query! explainSql() only accepts a single SQL query.");
} }
List<Operation> operationlist = new ArrayList<>(operations); List<Operation> operationlist = new ArrayList<>(operations);
for (int i = 0; i < operationlist.size(); i++) { for (int i = 0; i < operationlist.size(); i++) {
...@@ -215,4 +294,19 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements ...@@ -215,4 +294,19 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements
public boolean parseAndLoadConfiguration(String statement, StreamExecutionEnvironment environment, Map<String, Object> setMap) { public boolean parseAndLoadConfiguration(String statement, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
return false; return false;
} }
@Override
public Table fromChangelogStream(DataStream<Row> dataStream) {
throw new FlinkClientException("Flink 1.12 does not support fromChangelogStream.");
}
@Override
public <T> void registerDataStream(String name, DataStream<T> dataStream) {
throw new FlinkClientException("Flink 1.12 does not support registerDataStream.");
}
@Override
public <T> void createTemporaryView(String path, DataStream<T> dataStream) {
throw new FlinkClientException("Flink 1.12 does not support createTemporaryView.");
}
} }
\ No newline at end of file
...@@ -14,114 +14,23 @@ ...@@ -14,114 +14,23 @@
<properties> <properties>
<java.version>1.8</java.version> <java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.13.6</flink.version>
<flinkcdc.version>2.2.0</flinkcdc.version>
<maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.target>1.8</maven.compiler.target>
<junit.version>4.12</junit.version>
</properties> </properties>
<dependencies> <dependencies>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency> <dependency>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<artifactId>dlink-client-base</artifactId> <artifactId>dlink-client-base</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-yarn_2.11</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</exclusion>
</exclusions>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<artifactId>dlink-client-hadoop</artifactId> <artifactId>dlink-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-kubernetes_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flinkcdc.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-oracle-cdc</artifactId>
<version>${flinkcdc.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<artifactId>dlink-common</artifactId> <artifactId>dlink-flink-1.13</artifactId>
<scope>provided</scope>
</dependency> </dependency>
</dependencies> </dependencies>
<!--<build> <!--<build>
......
package com.dlink.cdc; package com.dlink.cdc;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig; import com.dlink.model.FlinkCDCConfig;
/** /**
...@@ -26,4 +32,35 @@ public abstract class AbstractCDCBuilder { ...@@ -26,4 +32,35 @@ public abstract class AbstractCDCBuilder {
public void setConfig(FlinkCDCConfig config) { public void setConfig(FlinkCDCConfig config) {
this.config = config; this.config = config;
} }
public List<String> getSchemaList() {
List<String> schemaList = new ArrayList<>();
String schema = config.getSchema();
if (Asserts.isNullString(schema)) {
return schemaList;
}
String[] schemas = schema.split(FlinkParamConstant.SPLIT);
Collections.addAll(schemaList, schemas);
List<String> tableList = getTableList();
for (String tableName : tableList) {
if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
String[] names = tableName.split("\\."); // String.split takes a regex, so the dot must be escaped
if (!schemaList.contains(names[0])) {
schemaList.add(names[0]);
}
}
}
return schemaList;
}
public List<String> getTableList() {
List<String> tableList = new ArrayList<>();
String table = config.getTable();
if (Asserts.isNullString(table)) {
return tableList;
}
String[] tables = table.split(FlinkParamConstant.SPLIT);
Collections.addAll(tableList, tables);
return tableList;
}
} }
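The two helpers above turn the comma-separated database/table settings of a CDCSource statement into lists, and also derive schemas from schema-qualified table names. A self-contained sketch of that parsing is shown below; the sample values are made up, and the delimiter follows FlinkParamConstant.SPLIT, which this commit changes to a comma.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SchemaTableParsingDemo {

    private static final String SPLIT = ","; // mirrors FlinkParamConstant.SPLIT after this commit

    // Mirrors AbstractCDCBuilder.getTableList: split the configured table string.
    static List<String> parseTables(String table) {
        if (table == null || table.isEmpty()) {
            return new ArrayList<>();
        }
        return new ArrayList<>(Arrays.asList(table.split(SPLIT)));
    }

    // Mirrors AbstractCDCBuilder.getSchemaList: explicit schemas plus the schema
    // part of any "schema.table" entry. Note the escaped dot, matching the fix
    // applied in getSchemaList above.
    static List<String> parseSchemas(String schema, List<String> tables) {
        List<String> schemas = parseTables(schema);
        for (String tableName : tables) {
            if (tableName.contains(".")) {
                String candidate = tableName.split("\\.")[0];
                if (!schemas.contains(candidate)) {
                    schemas.add(candidate);
                }
            }
        }
        return schemas;
    }

    public static void main(String[] args) {
        List<String> tables = parseTables("shop.orders,shop.users");
        System.out.println(tables);                   // [shop.orders, shop.users]
        System.out.println(parseSchemas("", tables)); // [shop]
    }
}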
package com.dlink.cdc;
import com.dlink.model.FlinkCDCConfig;
/**
* AbstractSinkBuilder
*
* @author wenmo
* @since 2022/4/12 21:28
**/
public abstract class AbstractSinkBuilder {
protected FlinkCDCConfig config;
public AbstractSinkBuilder() {
}
public AbstractSinkBuilder(FlinkCDCConfig config) {
this.config = config;
}
public FlinkCDCConfig getConfig() {
return config;
}
public void setConfig(FlinkCDCConfig config) {
this.config = config;
}
}
...@@ -3,7 +3,11 @@ package com.dlink.cdc; ...@@ -3,7 +3,11 @@ package com.dlink.cdc;
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.List;
import java.util.Map;
import com.dlink.model.FlinkCDCConfig; import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Table;
/** /**
* CDCBuilder * CDCBuilder
...@@ -17,5 +21,13 @@ public interface CDCBuilder { ...@@ -17,5 +21,13 @@ public interface CDCBuilder {
CDCBuilder create(FlinkCDCConfig config); CDCBuilder create(FlinkCDCConfig config);
DataStreamSource build(StreamExecutionEnvironment env); DataStreamSource<String> build(StreamExecutionEnvironment env);
List<String> getSchemaList();
List<String> getTableList();
Map<String, Map<String, String>> parseMetaDataConfigs();
String getInsertSQL(Table table, String sourceName);
} }
package com.dlink.cdc;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import com.dlink.assertion.Asserts;
import com.dlink.model.FlinkCDCConfig;
/**
* FlinkCDCMergeBuilder
*
* @author wenmo
* @since 2022/1/29 22:37
*/
public class FlinkCDCMergeBuilder {
public static void buildMySqlCDC(StreamExecutionEnvironment env, FlinkCDCConfig config) {
if (Asserts.isNotNull(config.getParallelism())) {
env.setParallelism(config.getParallelism());
}
if (Asserts.isNotNull(config.getCheckpoint())) {
env.enableCheckpointing(config.getCheckpoint());
}
DataStreamSource<String> streamSource = CDCBuilderFactory.buildCDCBuilder(config).build(env);
streamSource.addSink(getKafkaProducer(config.getBrokers(), config.getTopic()));
}
private static FlinkKafkaProducer<String> getKafkaProducer(String brokers, String topic) {
return new FlinkKafkaProducer<String>(brokers,
topic,
new SimpleStringSchema());
}
}
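FlinkCDCMergeBuilder is the pre-existing single-topic path: the entire CDC stream is produced to one configured topic, in contrast to the per-table routing added in KafkaSinkBuilder. A compile-only sketch of the same wiring follows, with a stubbed source; the broker address, topic name, checkpoint interval, and sample record are assumptions.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class SingleTopicMergeDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Mirrors FlinkCDCMergeBuilder.buildMySqlCDC: apply optional parallelism
        // and checkpointing before attaching source and sink.
        env.setParallelism(1);
        env.enableCheckpointing(10_000);

        // Stand-in for CDCBuilderFactory.buildCDCBuilder(config).build(env).
        DataStream<String> cdcStream = env.fromElements(
                "{\"source\":{\"db\":\"shop\",\"table\":\"orders\"},\"op\":\"c\"}");

        // Every change record, regardless of table, goes to one topic.
        cdcStream.addSink(new FlinkKafkaProducer<String>(
                "localhost:9092",   // assumption: local broker
                "dlink_cdc_merge",  // assumption: illustrative topic name
                new SimpleStringSchema()));

        env.execute("single-topic CDC merge sketch");
    }
}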
package com.dlink.cdc;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
/**
* SinkBuilder
*
* @author wenmo
* @since 2022/4/12 21:09
**/
public interface SinkBuilder {
String getHandle();
SinkBuilder create(FlinkCDCConfig config);
DataStreamSource build(CDCBuilder cdcBuilder, StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource<String> dataStreamSource);
}
package com.dlink.cdc;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.jdbc.JdbcSinkBuilder;
import com.dlink.cdc.kafka.KafkaSinkBuilder;
import com.dlink.exception.FlinkClientException;
import com.dlink.model.FlinkCDCConfig;
/**
* SinkBuilderFactory
*
* @author wenmo
* @since 2022/4/12 21:12
**/
public class SinkBuilderFactory {
private static SinkBuilder[] sinkBuilders = {
new KafkaSinkBuilder(),
new JdbcSinkBuilder(),
};
public static SinkBuilder buildSinkBuilder(FlinkCDCConfig config) {
if (Asserts.isNull(config) || Asserts.isNullString(config.getSink().get("connector"))) {
throw new FlinkClientException("Please specify the Sink connector.");
}
for (int i = 0; i < sinkBuilders.length; i++) {
if (config.getSink().get("connector").equals(sinkBuilders[i].getHandle())) {
return sinkBuilders[i].create(config);
}
}
throw new FlinkClientException("No matching Sink type found for [" + config.getSink().get("connector") + "].");
}
}
package com.dlink.cdc.jdbc;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* JdbcSinkBuilder
*
* @author wenmo
* @since 2022/4/12 21:29
**/
public class JdbcSinkBuilder extends AbstractSinkBuilder implements SinkBuilder {
private final static String KEY_WORD = "jdbc";
private final static String TABLE_NAME = "cdc_table";
public JdbcSinkBuilder() {
}
public JdbcSinkBuilder(FlinkCDCConfig config) {
super(config);
}
@Override
public String getHandle() {
return KEY_WORD;
}
@Override
public SinkBuilder create(FlinkCDCConfig config) {
return new JdbcSinkBuilder(config);
}
/*@Override
public DataStreamSource build(StreamExecutionEnvironment env, DataStreamSource<String> dataStreamSource) {
final List<Schema> schemaList = config.getSchemaList();
if (Asserts.isNotNullCollection(schemaList)) {
for (Schema schema : schemaList) {
for (Table table : schema.getTables()) {
*//*dataStreamSource.filter(new FilterFunction<Map>() {
@Override
public boolean filter(Map value) throws Exception {
return value.containsKey("table_name") && table.getName().equals(value.get("table_name"));
}
});
dataStreamSource.addSink(
JdbcSink.sink(
"insert into books (id, title, authors, year) values (?, ?, ?, ?)",
(statement, book) -> {
statement.setLong(1, book.id);
statement.setString(2, book.title);
statement.setString(3, book.authors);
statement.setInt(4, book.year);
},
JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:postgresql://dbhost:5432/postgresdb")
.withDriverName("org.postgresql.Driver")
.withUsername("someUser")
.withPassword("somePassword")
.build()
));*//*
}
}
}
return dataStreamSource;
}*/
@Override
public DataStreamSource build(CDCBuilder cdcBuilder, StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource<String> dataStreamSource) {
final List<Schema> schemaList = config.getSchemaList();
if (Asserts.isNotNullCollection(schemaList)) {
/*org.apache.flink.table.api.Table table = env.fromChangelogStream(dataStreamSource);
env.registerTable("cdc_table",table);*/
customTableEnvironment.registerDataStream(TABLE_NAME, dataStreamSource);
List<ModifyOperation> modifyOperations = new ArrayList<>();
StringBuilder sb = new StringBuilder();
for (Map.Entry<String, String> entry : config.getSink().entrySet()) {
sb.append("'");
sb.append(entry.getKey());
sb.append("' = '");
sb.append(entry.getValue());
sb.append("',\n");
}
for (Schema schema : schemaList) {
for (Table item : schema.getTables()) {
customTableEnvironment.executeSql(item.getFlinkTableSql(sb.toString()+"'table-name' = '"+item.getSchemaTableName()+"'\n"));
List<Operation> operations = customTableEnvironment.getParser().parse(cdcBuilder.getInsertSQL(item, TABLE_NAME));
if (operations.size() > 0) {
Operation operation = operations.get(0);
if (operation instanceof ModifyOperation) {
modifyOperations.add((ModifyOperation) operation);
}
}
}
}
List<Transformation<?>> trans = customTableEnvironment.getPlanner().translate(modifyOperations);
for (Transformation<?> item : trans) {
env.addOperator(item);
}
}
return dataStreamSource;
}
}
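The JDBC variant takes a different route from the Kafka one: it registers the raw stream as a temporary table, generates one Flink table per target table from the sink map, and translates the resulting INSERT operations into transformations on the same StreamExecutionEnvironment. A small sketch of how the WITH-options string is assembled from the sink map is shown below; the keys and values used are typical JDBC connector options chosen for illustration, not taken from the commit.

import java.util.LinkedHashMap;
import java.util.Map;

public class SinkOptionsDemo {

    // Mirrors the StringBuilder loop in JdbcSinkBuilder.build: every sink entry
    // becomes a "'key' = 'value'," line, and the table name is appended last.
    static String buildWithOptions(Map<String, String> sink, String schemaTableName) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> entry : sink.entrySet()) {
            sb.append("'").append(entry.getKey()).append("' = '")
              .append(entry.getValue()).append("',\n");
        }
        sb.append("'table-name' = '").append(schemaTableName).append("'\n");
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> sink = new LinkedHashMap<>();
        sink.put("connector", "jdbc");
        sink.put("url", "jdbc:mysql://localhost:3306/sink_db"); // assumption
        sink.put("username", "root");                           // assumption
        System.out.println(buildWithOptions(sink, "shop.orders"));
        // 'connector' = 'jdbc',
        // 'url' = 'jdbc:mysql://localhost:3306/sink_db',
        // 'username' = 'root',
        // 'table-name' = 'shop.orders'
    }
}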
package com.dlink.cdc.kafka;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
/**
* KafkaSinkBuilder
*
* @author wenmo
* @since 2022/4/12 21:29
**/
public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder {
private final static String KEY_WORD = "kafka";
public KafkaSinkBuilder() {
}
public KafkaSinkBuilder(FlinkCDCConfig config) {
super(config);
}
@Override
public String getHandle() {
return KEY_WORD;
}
@Override
public SinkBuilder create(FlinkCDCConfig config) {
return new KafkaSinkBuilder(config);
}
@Override
public DataStreamSource build(CDCBuilder cdcBuilder, StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource<String> dataStreamSource) {
if (Asserts.isNotNullString(config.getSink().get("topic"))) {
dataStreamSource.addSink(new FlinkKafkaProducer<String>(config.getSink().get("brokers"),
config.getSink().get("topic"),
new SimpleStringSchema()));
} else {
final List<Schema> schemaList = config.getSchemaList();
if (Asserts.isNotNullCollection(schemaList)) {
SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(new MapFunction<String, Map>() {
@Override
public Map map(String value) throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(value, Map.class);
}
});
for (Schema schema : schemaList) {
for (Table table : schema.getTables()) {
final String tableName = table.getName();
final String schemaName = table.getSchema();
SingleOutputStreamOperator<Map> filterOperator = mapOperator.filter(new FilterFunction<Map>() {
@Override
public boolean filter(Map value) throws Exception {
LinkedHashMap source = (LinkedHashMap) value.get("source");
return tableName.equals(source.get("table").toString())
&& schemaName.equals(source.get("db").toString());
}
});
SingleOutputStreamOperator<String> stringOperator = filterOperator.map(new MapFunction<Map, String>() {
@Override
public String map(Map value) throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.writeValueAsString(value);
}
});
stringOperator.addSink(new FlinkKafkaProducer<String>(config.getSink().get("brokers"),
table.getSchemaTableName(),
new SimpleStringSchema()));
}
}
}
}
return dataStreamSource;
}
}
...@@ -4,11 +4,19 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy; ...@@ -4,11 +4,19 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder; import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder; import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.ClientConstant;
import com.dlink.constant.FlinkParamConstant; import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig; import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Table;
import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder; import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.connectors.mysql.table.StartupOptions;
...@@ -22,7 +30,8 @@ import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; ...@@ -22,7 +30,8 @@ import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
**/ **/
public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder { public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
private String KEY_WORD = "mysql-cdc"; private final static String KEY_WORD = "mysql-cdc";
private final static String METADATA_TYPE = "MySql";
public MysqlCDCBuilder() { public MysqlCDCBuilder() {
} }
...@@ -77,4 +86,58 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder { ...@@ -77,4 +86,58 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
} }
return env.fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), "MySQL CDC Source"); return env.fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
} }
public List<String> getSchemaList() {
List<String> schemaList = new ArrayList<>();
String schema = config.getDatabase();
if (Asserts.isNullString(schema)) {
return schemaList;
}
String[] schemas = schema.split(FlinkParamConstant.SPLIT);
Collections.addAll(schemaList, schemas);
return schemaList;
}
public Map<String, Map<String, String>> parseMetaDataConfigs() {
Map<String, Map<String, String>> allConfigMap = new HashMap<>();
List<String> schemaList = getSchemaList();
for (String schema : schemaList) {
Map<String, String> configMap = new HashMap<>();
configMap.put(ClientConstant.METADATA_TYPE, METADATA_TYPE);
StringBuilder sb = new StringBuilder("jdbc:mysql://");
sb.append(config.getHostname());
sb.append(":");
sb.append(config.getPort());
sb.append("/");
sb.append(schema);
configMap.put(ClientConstant.METADATA_NAME, sb.toString());
configMap.put(ClientConstant.METADATA_URL, sb.toString());
configMap.put(ClientConstant.METADATA_USERNAME, config.getUsername());
configMap.put(ClientConstant.METADATA_PASSWORD, config.getPassword());
allConfigMap.put(schema, configMap);
}
return allConfigMap;
}
@Override
public String getInsertSQL(Table table, String sourceName) {
StringBuilder sb = new StringBuilder("INSERT INTO ");
sb.append(table.getName());
sb.append(" SELECT\n");
for (int i = 0; i < table.getColumns().size(); i++) {
sb.append(" ");
if (i > 0) {
sb.append(",");
}
sb.append("`" + table.getColumns().get(i).getName() + "` \n");
}
sb.append(" FROM ");
sb.append(sourceName);
sb.append(" WHERE database_name = '");
sb.append(table.getSchema());
sb.append("' and table_name = '");
sb.append(table.getName());
sb.append("'");
return sb.toString();
}
} }
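getInsertSQL above synthesizes the per-table INSERT that feeds the registered changelog table into each generated sink table; this 1.14 builder also filters on the database_name and table_name metadata columns, which the 1.13 variant still leaves commented out. The sketch below shows the string it produces for a hypothetical orders(id, name) table; the schema, table, column, and source names are illustrative.

import java.util.Arrays;
import java.util.List;

public class InsertSqlDemo {

    // Mirrors MysqlCDCBuilder.getInsertSQL: SELECT every column from the shared
    // source table, filtered down to one database/table pair.
    static String insertSql(String schema, String table, List<String> columns, String sourceName) {
        StringBuilder sb = new StringBuilder("INSERT INTO ").append(table).append(" SELECT\n");
        for (int i = 0; i < columns.size(); i++) {
            sb.append("    ");
            if (i > 0) {
                sb.append(",");
            }
            sb.append("`").append(columns.get(i)).append("` \n");
        }
        sb.append(" FROM ").append(sourceName)
          .append(" WHERE database_name = '").append(schema)
          .append("' and table_name = '").append(table).append("'");
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(insertSql("shop", "orders", Arrays.asList("id", "name"), "cdc_table"));
        // INSERT INTO orders SELECT
        //     `id`
        //     ,`name`
        //  FROM cdc_table WHERE database_name = 'shop' and table_name = 'orders'
    }
}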
...@@ -3,10 +3,16 @@ package com.dlink.cdc.oracle; ...@@ -3,10 +3,16 @@ package com.dlink.cdc.oracle;
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder; import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder; import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.ClientConstant;
import com.dlink.model.FlinkCDCConfig; import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Table;
import com.ververica.cdc.connectors.oracle.OracleSource; import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.connectors.oracle.table.StartupOptions; import com.ververica.cdc.connectors.oracle.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
...@@ -19,7 +25,8 @@ import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; ...@@ -19,7 +25,8 @@ import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
**/ **/
public class OracleCDCBuilder extends AbstractCDCBuilder implements CDCBuilder { public class OracleCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
private String KEY_WORD = "oracle-cdc"; private final static String KEY_WORD = "oracle-cdc";
private final static String METADATA_TYPE = "Oracle";
public OracleCDCBuilder() { public OracleCDCBuilder() {
} }
...@@ -71,4 +78,47 @@ public class OracleCDCBuilder extends AbstractCDCBuilder implements CDCBuilder { ...@@ -71,4 +78,47 @@ public class OracleCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
} }
return env.addSource(sourceBuilder.build(), "Oracle CDC Source"); return env.addSource(sourceBuilder.build(), "Oracle CDC Source");
} }
public Map<String, Map<String, String>> parseMetaDataConfigs() {
Map<String, Map<String, String>> allConfigList = new HashMap<>();
List<String> schemaList = getSchemaList();
for (String schema : schemaList) {
Map<String, String> configMap = new HashMap<>();
configMap.put(ClientConstant.METADATA_TYPE, METADATA_TYPE);
StringBuilder sb = new StringBuilder("jdbc:oracle:thin:@");
sb.append(config.getHostname());
sb.append(":");
sb.append(config.getPort());
sb.append(":");
sb.append(config.getDatabase());
configMap.put(ClientConstant.METADATA_NAME, sb.toString());
configMap.put(ClientConstant.METADATA_URL, sb.toString());
configMap.put(ClientConstant.METADATA_USERNAME, config.getUsername());
configMap.put(ClientConstant.METADATA_PASSWORD, config.getPassword());
allConfigList.put(schema, configMap);
}
return allConfigList;
}
@Override
public String getInsertSQL(Table table, String sourceName) {
StringBuilder sb = new StringBuilder("INSERT INTO ");
sb.append(table.getName());
sb.append(" SELECT\n");
for (int i = 0; i < table.getColumns().size(); i++) {
sb.append(" ");
if (i > 0) {
sb.append(",");
}
sb.append("`" + table.getColumns().get(i).getName() + "` \n");
}
sb.append(" FROM ");
sb.append(sourceName);
sb.append(" WHERE schema_name = '");
sb.append(table.getSchema());
sb.append("' and table_name = '");
sb.append(table.getName());
sb.append("'");
return sb.toString();
}
} }
...@@ -13,12 +13,8 @@ ...@@ -13,12 +13,8 @@
<properties> <properties>
<java.version>1.8</java.version> <java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.14.3</flink.version>
<flinkcdc.version>2.2.0</flinkcdc.version>
<commons.version>1.3.1</commons.version>
<maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.target>1.8</maven.compiler.target>
<junit.version>4.12</junit.version>
</properties> </properties>
<dependencies> <dependencies>
...@@ -27,91 +23,14 @@ ...@@ -27,91 +23,14 @@
<artifactId>dlink-client-base</artifactId> <artifactId>dlink-client-base</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-yarn_2.11</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-client-hadoop</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-kubernetes_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flinkcdc.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-oracle-cdc</artifactId>
<version>${flinkcdc.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency> <dependency>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<artifactId>dlink-common</artifactId> <artifactId>dlink-common</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>commons-cli</groupId> <groupId>com.dlink</groupId>
<artifactId>commons-cli</artifactId> <artifactId>dlink-flink-1.14</artifactId>
<version>${commons.version}</version> <!-- <scope>provided</scope>-->
</dependency> </dependency>
</dependencies> </dependencies>
</project> </project>
\ No newline at end of file
package com.dlink.cdc; package com.dlink.cdc;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig; import com.dlink.model.FlinkCDCConfig;
/** /**
...@@ -26,4 +32,35 @@ public abstract class AbstractCDCBuilder { ...@@ -26,4 +32,35 @@ public abstract class AbstractCDCBuilder {
public void setConfig(FlinkCDCConfig config) { public void setConfig(FlinkCDCConfig config) {
this.config = config; this.config = config;
} }
public List<String> getSchemaList() {
List<String> schemaList = new ArrayList<>();
String schema = config.getSchema();
if (Asserts.isNullString(schema)) {
return schemaList;
}
String[] schemas = schema.split(FlinkParamConstant.SPLIT);
Collections.addAll(schemaList, schemas);
List<String> tableList = getTableList();
for (String tableName : tableList) {
if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
String[] names = tableName.split("\\."); // String.split takes a regex, so the dot must be escaped
if (!schemaList.contains(names[0])) {
schemaList.add(names[0]);
}
}
}
return schemaList;
}
public List<String> getTableList() {
List<String> tableList = new ArrayList<>();
String table = config.getTable();
if (Asserts.isNullString(table)) {
return tableList;
}
String[] tables = table.split(FlinkParamConstant.SPLIT);
Collections.addAll(tableList, tables);
return tableList;
}
} }
package com.dlink.cdc;
import com.dlink.model.FlinkCDCConfig;
/**
* AbstractSinkBuilder
*
* @author wenmo
* @since 2022/4/12 21:28
**/
public abstract class AbstractSinkBuilder {
protected FlinkCDCConfig config;
public AbstractSinkBuilder() {
}
public AbstractSinkBuilder(FlinkCDCConfig config) {
this.config = config;
}
public FlinkCDCConfig getConfig() {
return config;
}
public void setConfig(FlinkCDCConfig config) {
this.config = config;
}
}
...@@ -3,7 +3,11 @@ package com.dlink.cdc; ...@@ -3,7 +3,11 @@ package com.dlink.cdc;
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.List;
import java.util.Map;
import com.dlink.model.FlinkCDCConfig; import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Table;
/** /**
* CDCBuilder * CDCBuilder
...@@ -17,5 +21,13 @@ public interface CDCBuilder { ...@@ -17,5 +21,13 @@ public interface CDCBuilder {
CDCBuilder create(FlinkCDCConfig config); CDCBuilder create(FlinkCDCConfig config);
DataStreamSource build(StreamExecutionEnvironment env); DataStreamSource<String> build(StreamExecutionEnvironment env);
List<String> getSchemaList();
List<String> getTableList();
Map<String, Map<String, String>> parseMetaDataConfigs();
String getInsertSQL(Table table, String sourceName);
} }
package com.dlink.cdc;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
/**
* SinkBuilder
*
* @author wenmo
* @since 2022/4/12 21:09
**/
public interface SinkBuilder {
String getHandle();
SinkBuilder create(FlinkCDCConfig config);
DataStreamSource build(CDCBuilder cdcBuilder, StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource<String> dataStreamSource);
}
...@@ -13,5 +13,5 @@ public final class FlinkParamConstant { ...@@ -13,5 +13,5 @@ public final class FlinkParamConstant {
public static final String USERNAME = "username"; public static final String USERNAME = "username";
public static final String PASSWORD = "password"; public static final String PASSWORD = "password";
public static final String SPLIT = ";"; public static final String SPLIT = ",";
} }
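This last hunk is small but behavior-changing: FlinkParamConstant.SPLIT moves from ";" to ",", so multi-value CDCSource settings such as database and table are now comma-separated before being fed to getSchemaList and getTableList. A tiny sketch of the effect, with a made-up setting value:

import java.util.Arrays;

public class SplitChangeDemo {
    public static void main(String[] args) {
        String database = "shop,warehouse"; // illustrative CDCSource 'database' value
        System.out.println(Arrays.asList(database.split(","))); // [shop, warehouse] (was ";" before this commit)
    }
}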
This diff is collapsed. (10 collapsed file diffs in total.)