Commit d2d8a92b authored by wenmo's avatar wenmo

[Feature-275][client] Add Flink client 1.15

parent 1d46cbb4
......@@ -26,6 +26,7 @@ Dinky 基于 Apache Flink 实现 Dlink ,增强 Flink 的应用与体验,探
- 支持 Apache Flink 所有的 Connector、UDF、CDC等
- 支持 FlinkSQL 语法增强:兼容 Apache Flink SQL、表值聚合函数、全局变量、CDC多源合并、执行环境、语句合并、共享会话等
- 支持易扩展的 SQL 作业提交方式:ClickHouse、Doris、Hive、Mysql、Oracle、Phoenix、PostgreSql、SqlServer 等
- 支持 FlinkCDC (Source 合并)整库实时入仓入湖
- 支持实时调试预览 Table 和 ChangeLog 数据及图形展示
- 支持语法逻辑检查、作业执行计划、字段级血缘分析等
- 支持 Flink 元数据、数据源元数据查询及管理
......
......@@ -26,6 +26,7 @@ Dinky 基于 Apache Flink 实现 Dlink ,增强 Flink 的应用与体验,探
- 支持 Apache Flink 所有的 Connector、UDF、CDC等
- 支持 FlinkSQL 语法增强:兼容 Apache Flink SQL、表值聚合函数、全局变量、CDC多源合并、执行环境、语句合并、共享会话等
- 支持易扩展的 SQL 作业提交方式:ClickHouse、Doris、Hive、Mysql、Oracle、Phoenix、PostgreSql、SqlServer 等
- 支持 FlinkCDC (Source 合并)整库实时入仓入湖
- 支持实时调试预览 Table 和 ChangeLog 数据及图形展示
- 支持语法逻辑检查、作业执行计划、字段级血缘分析等
- 支持 Flink 元数据、数据源元数据查询及管理
......
......@@ -26,6 +26,7 @@ Dinky 基于 Apache Flink 实现 Dlink ,增强 Flink 的应用与体验,探
- 支持 Apache Flink 所有的 Connector、UDF、CDC等
- 支持 FlinkSQL 语法增强:兼容 Apache Flink SQL、表值聚合函数、全局变量、CDC多源合并、执行环境、语句合并、共享会话等
- 支持易扩展的 SQL 作业提交方式:ClickHouse、Doris、Hive、Mysql、Oracle、Phoenix、PostgreSql、SqlServer 等
- 支持 FlinkCDC (Source 合并)整库实时入仓入湖
- 支持实时调试预览 Table 和 ChangeLog 数据及图形展示
- 支持语法逻辑检查、作业执行计划、字段级血缘分析等
- 支持 Flink 元数据、数据源元数据查询及管理
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dlink-app</artifactId>
<groupId>com.dlink</groupId>
<version>0.6.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dlink-app-1.15</artifactId>
<properties>
<mainClass>com.dlink.app.MainApp</mainClass>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-app-base</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<!-- <scope>provided</scope>-->
<version>8.0.21</version>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-client-1.15</artifactId>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-flink-1.15</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-client-base</artifactId>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-executor</artifactId>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>*.properties</include>
</includes>
</resource>
</resources>
<plugins>
<!-- 编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- 打jar包插件(会包含所有依赖) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.6</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- 可以设置jar包的入口类(可选) -->
<mainClass>com.dlink.app.MainApp</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package com.dlink.app;
import java.io.IOException;
import java.util.Map;
import com.dlink.app.db.DBConfig;
import com.dlink.app.flinksql.Submiter;
import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.utils.FlinkBaseUtil;
/**
* MainApp
*
* @author wenmo
* @since 2021/10/27
**/
public class MainApp {
public static void main(String[] args) throws IOException {
Map<String, String> params = FlinkBaseUtil.getParamsFromArgs(args);
String id = params.get(FlinkParamConstant.ID);
Asserts.checkNullString(id, "请配置入参 id ");
DBConfig dbConfig = DBConfig.build(params);
Submiter.submit(Integer.valueOf(id), dbConfig);
}
}
......@@ -19,6 +19,7 @@
<module>dlink-app-1.14</module>
<module>dlink-app-1.12</module>
<module>dlink-app-1.11</module>
<module>dlink-app-1.15</module>
</modules>
<properties>
......
......@@ -73,7 +73,13 @@
<include>dlink-client-1.14-${project.version}.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.parent.basedir}/dlink-client/dlink-client-1.15/target</directory>
<outputDirectory>extends</outputDirectory>
<includes>
<include>dlink-client-1.15-${project.version}.jar</include>
</includes>
</fileSet>
<!-- 将模块dlink-connectors的jar文件放到打包目录/lib下 -->
<fileSet>
<directory>${project.parent.basedir}/dlink-connectors/dlink-connector-jdbc-1.13/target</directory>
......@@ -246,6 +252,13 @@
<include>dlink-app-1.14-${project.version}-jar-with-dependencies.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.parent.basedir}/dlink-app/dlink-app-1.15/target</directory>
<outputDirectory>jar</outputDirectory>
<includes>
<include>dlink-app-1.15-${project.version}-jar-with-dependencies.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.parent.basedir}/dlink-doc/extends</directory>
<outputDirectory>jar</outputDirectory>
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dlink-client</artifactId>
<groupId>com.dlink</groupId>
<version>0.6.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dlink-client-1.15</artifactId>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-client-base</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-common</artifactId>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-flink-1.15</artifactId>
<!-- <scope>provided</scope>-->
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.dlink.cdc;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;
/**
* AbstractCDCBuilder
*
* @author wenmo
* @since 2022/4/12 21:28
**/
public abstract class AbstractCDCBuilder {
protected FlinkCDCConfig config;
public AbstractCDCBuilder() {
}
public AbstractCDCBuilder(FlinkCDCConfig config) {
this.config = config;
}
public FlinkCDCConfig getConfig() {
return config;
}
public void setConfig(FlinkCDCConfig config) {
this.config = config;
}
public List<String> getSchemaList() {
List<String> schemaList = new ArrayList<>();
String schema = config.getSchema();
if (Asserts.isNotNullString(schema)) {
String[] schemas = schema.split(FlinkParamConstant.SPLIT);
Collections.addAll(schemaList, schemas);
}
List<String> tableList = getTableList();
for (String tableName : tableList) {
if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
String[] names = tableName.split("\\\\.");
if (!schemaList.contains(names[0])) {
schemaList.add(names[0]);
}
}
}
return schemaList;
}
public List<String> getTableList() {
List<String> tableList = new ArrayList<>();
String table = config.getTable();
if (Asserts.isNullString(table)) {
return tableList;
}
String[] tables = table.split(FlinkParamConstant.SPLIT);
Collections.addAll(tableList, tables);
return tableList;
}
public String getSchemaFieldName() {
return "schema";
}
}
package com.dlink.cdc;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.dlink.assertion.Asserts;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.Column;
import com.dlink.model.ColumnType;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
/**
* AbstractCDCBuilder
*
* @author wenmo
* @since 2022/4/12 21:28
**/
public abstract class AbstractSinkBuilder {
protected FlinkCDCConfig config;
protected List<ModifyOperation> modifyOperations = new ArrayList();
public AbstractSinkBuilder() {
}
public AbstractSinkBuilder(FlinkCDCConfig config) {
this.config = config;
}
public FlinkCDCConfig getConfig() {
return config;
}
public void setConfig(FlinkCDCConfig config) {
this.config = config;
}
protected Properties getProperties() {
Properties properties = new Properties();
Map<String, String> sink = config.getSink();
for (Map.Entry<String, String> entry : sink.entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
properties.setProperty(entry.getKey(), entry.getValue());
}
}
return properties;
}
protected SingleOutputStreamOperator<Map> deserialize(DataStreamSource<String> dataStreamSource) {
return dataStreamSource.map(new MapFunction<String, Map>() {
@Override
public Map map(String value) throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(value, Map.class);
}
});
}
protected SingleOutputStreamOperator<Map> shunt(
SingleOutputStreamOperator<Map> mapOperator,
Table table,
String schemaFieldName) {
final String tableName = table.getName();
final String schemaName = table.getSchema();
return mapOperator.filter(new FilterFunction<Map>() {
@Override
public boolean filter(Map value) throws Exception {
LinkedHashMap source = (LinkedHashMap) value.get("source");
return tableName.equals(source.get("table").toString())
&& schemaName.equals(source.get(schemaFieldName).toString());
}
});
}
protected DataStream<RowData> buildRowData(
SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
return filterOperator
.flatMap(new FlatMapFunction<Map, RowData>() {
@Override
public void flatMap(Map value, Collector<RowData> out) throws Exception {
switch (value.get("op").toString()) {
case "r":
case "c":
GenericRowData igenericRowData = new GenericRowData(columnNameList.size());
igenericRowData.setRowKind(RowKind.INSERT);
Map idata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
igenericRowData.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(igenericRowData);
break;
case "d":
GenericRowData dgenericRowData = new GenericRowData(columnNameList.size());
dgenericRowData.setRowKind(RowKind.DELETE);
Map ddata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
dgenericRowData.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(dgenericRowData);
break;
case "u":
GenericRowData ubgenericRowData = new GenericRowData(columnNameList.size());
ubgenericRowData.setRowKind(RowKind.UPDATE_BEFORE);
Map ubdata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
ubgenericRowData.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(ubgenericRowData);
GenericRowData uagenericRowData = new GenericRowData(columnNameList.size());
uagenericRowData.setRowKind(RowKind.UPDATE_AFTER);
Map uadata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
uagenericRowData.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(uagenericRowData);
break;
}
}
});
}
public abstract void addSink(
StreamExecutionEnvironment env,
DataStream<RowData> rowDataDataStream,
Table table,
List<String> columnNameList,
List<LogicalType> columnTypeList);
public DataStreamSource build(
CDCBuilder cdcBuilder,
StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) {
final List<Schema> schemaList = config.getSchemaList();
final String schemaFieldName = config.getSchemaFieldName();
if (Asserts.isNotNullCollection(schemaList)) {
SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
for (Schema schema : schemaList) {
for (Table table : schema.getTables()) {
SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
List<String> columnNameList = new ArrayList<>();
List<LogicalType> columnTypeList = new ArrayList<>();
buildColumn(columnNameList, columnTypeList, table.getColumns());
DataStream<RowData> rowDataDataStream = buildRowData(filterOperator, columnNameList, columnTypeList);
addSink(env, rowDataDataStream, table, columnNameList, columnTypeList);
}
}
}
return dataStreamSource;
}
protected void buildColumn(List<String> columnNameList, List<LogicalType> columnTypeList, List<Column> columns) {
for (Column column : columns) {
columnNameList.add(column.getName());
columnTypeList.add(getLogicalType(column.getJavaType()));
}
}
public LogicalType getLogicalType(ColumnType columnType) {
switch (columnType) {
case STRING:
return new VarCharType();
case BOOLEAN:
case JAVA_LANG_BOOLEAN:
return new BooleanType();
case BYTE:
case JAVA_LANG_BYTE:
return new TinyIntType();
case SHORT:
case JAVA_LANG_SHORT:
return new SmallIntType();
case LONG:
case JAVA_LANG_LONG:
return new BigIntType();
case FLOAT:
case JAVA_LANG_FLOAT:
return new FloatType();
case DOUBLE:
case JAVA_LANG_DOUBLE:
return new DoubleType();
case DECIMAL:
return new DecimalType(columnType.getPrecision(), columnType.getScale());
case INT:
case INTEGER:
return new IntType();
case DATE:
case LOCALDATE:
return new DateType();
case LOCALDATETIME:
case TIMESTAMP:
return new TimestampType();
default:
return new VarCharType();
}
}
protected Object convertValue(Object value, LogicalType logicalType) {
if (value == null) {
return null;
}
if (logicalType instanceof VarCharType) {
return StringData.fromString((String) value);
} else if (logicalType instanceof DateType) {
return StringData.fromString(Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate().toString());
} else if (logicalType instanceof TimestampType) {
return TimestampData.fromTimestamp(Timestamp.from(Instant.ofEpochMilli((long) value)));
} else if (logicalType instanceof DecimalType) {
final DecimalType decimalType = ((DecimalType) logicalType);
final int precision = decimalType.getPrecision();
final int scale = decimalType.getScale();
return DecimalData.fromBigDecimal(new BigDecimal((String) value), precision, scale);
} else {
return value;
}
}
protected String getSinkSchemaName(Table table) {
String schemaName = table.getSchema();
if (config.getSink().containsKey("sink.db")) {
schemaName = config.getSink().get("sink.db");
}
return schemaName;
}
protected String getSinkTableName(Table table) {
String tableName = table.getName();
if (config.getSink().containsKey("table.prefix.schema")) {
if (Boolean.valueOf(config.getSink().get("table.prefix.schema"))) {
tableName = table.getSchema() + "_" + tableName;
}
}
if (config.getSink().containsKey("table.prefix")) {
tableName = config.getSink().get("table.prefix") + tableName;
}
if (config.getSink().containsKey("table.suffix")) {
tableName = tableName + config.getSink().get("table.suffix");
}
if (config.getSink().containsKey("table.lower")) {
if (Boolean.valueOf(config.getSink().get("table.lower"))) {
tableName = tableName.toLowerCase();
}
}
if (config.getSink().containsKey("table.upper")) {
if (Boolean.valueOf(config.getSink().get("table.upper"))) {
tableName = tableName.toUpperCase();
}
}
return tableName;
}
protected List<String> getPKList(Table table){
List<String> pks = new ArrayList<>();
if(Asserts.isNullCollection(table.getColumns())){
return pks;
}
for(Column column: table.getColumns()){
if(column.isKeyFlag()){
pks.add(column.getName());
}
}
return pks;
}
}
package com.dlink.cdc;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.List;
import java.util.Map;
import com.dlink.model.FlinkCDCConfig;
/**
* CDCBuilder
*
* @author wenmo
* @since 2022/4/12 21:09
**/
public interface CDCBuilder {
String getHandle();
CDCBuilder create(FlinkCDCConfig config);
DataStreamSource<String> build(StreamExecutionEnvironment env);
List<String> getSchemaList();
List<String> getTableList();
Map<String, Map<String, String>> parseMetaDataConfigs();
String getSchemaFieldName();
}
package com.dlink.cdc;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.mysql.MysqlCDCBuilder;
import com.dlink.cdc.oracle.OracleCDCBuilder;
import com.dlink.exception.FlinkClientException;
import com.dlink.model.FlinkCDCConfig;
/**
* CDCBuilderFactory
*
* @author wenmo
* @since 2022/4/12 21:12
**/
public class CDCBuilderFactory {
private static CDCBuilder[] cdcBuilders = {
new MysqlCDCBuilder(),
new OracleCDCBuilder()
};
public static CDCBuilder buildCDCBuilder(FlinkCDCConfig config) {
if (Asserts.isNull(config) || Asserts.isNullString(config.getType())) {
throw new FlinkClientException("请指定 CDC Source 类型。");
}
for (int i = 0; i < cdcBuilders.length; i++) {
if (config.getType().equals(cdcBuilders[i].getHandle())) {
return cdcBuilders[i].create(config);
}
}
throw new FlinkClientException("未匹配到对应 CDC Source 类型的【" + config.getType() + "】。");
}
}
package com.dlink.cdc;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
/**
* SinkBuilder
*
* @author wenmo
* @since 2022/4/12 21:09
**/
public interface SinkBuilder {
String getHandle();
SinkBuilder create(FlinkCDCConfig config);
DataStreamSource build(CDCBuilder cdcBuilder, StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource<String> dataStreamSource);
}
package com.dlink.cdc;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.doris.DorisSinkBuilder;
import com.dlink.cdc.kafka.KafkaSinkBuilder;
import com.dlink.cdc.sql.SQLSinkBuilder;
import com.dlink.exception.FlinkClientException;
import com.dlink.model.FlinkCDCConfig;
/**
* SinkBuilderFactory
*
* @author wenmo
* @since 2022/4/12 21:12
**/
public class SinkBuilderFactory {
private static SinkBuilder[] sinkBuilders = {
new KafkaSinkBuilder(),
new DorisSinkBuilder(),
new SQLSinkBuilder()
};
public static SinkBuilder buildSinkBuilder(FlinkCDCConfig config) {
if (Asserts.isNull(config) || Asserts.isNullString(config.getSink().get("connector"))) {
throw new FlinkClientException("请指定 Sink connector。");
}
for (int i = 0; i < sinkBuilders.length; i++) {
if (config.getSink().get("connector").equals(sinkBuilders[i].getHandle())) {
return sinkBuilders[i].create(config);
}
}
return new SQLSinkBuilder().create(config);
}
}
package com.dlink.cdc.doris;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.cfg.DorisSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Table;
/**
* DorisSinkBuilder
*
* @author wenmo
* @since 2022/4/20 19:20
**/
public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
private final static String KEY_WORD = "datastream-doris";
private static final long serialVersionUID = 8330362249137471854L;
public DorisSinkBuilder() {
}
public DorisSinkBuilder(FlinkCDCConfig config) {
super(config);
}
@Override
public String getHandle() {
return KEY_WORD;
}
@Override
public SinkBuilder create(FlinkCDCConfig config) {
return new DorisSinkBuilder(config);
}
@Override
public void addSink(
StreamExecutionEnvironment env,
DataStream<RowData> rowDataDataStream,
Table table,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
DorisExecutionOptions.Builder dorisExecutionOptionsBuilder = DorisExecutionOptions.builder();
Map<String, String> sink = config.getSink();
if (sink.containsKey("sink.batch.size")) {
dorisExecutionOptionsBuilder.setBatchSize(Integer.valueOf(sink.get("sink.batch.size")));
}
if (sink.containsKey("sink.batch.interval")) {
dorisExecutionOptionsBuilder.setBatchIntervalMs(Long.valueOf(sink.get("sink.batch.interval")));
}
if (sink.containsKey("sink.max-retries")) {
dorisExecutionOptionsBuilder.setMaxRetries(Integer.valueOf(sink.get("sink.max-retries")));
}
if (sink.containsKey("sink.enable-delete")) {
dorisExecutionOptionsBuilder.setEnableDelete(Boolean.valueOf(sink.get("sink.enable-delete")));
}
dorisExecutionOptionsBuilder.setStreamLoadProp(getProperties());
final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]);
final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]);
rowDataDataStream.addSink(
DorisSink.sink(
columnNames,
columnTypes,
DorisReadOptions.builder().build(),
dorisExecutionOptionsBuilder.build(),
DorisOptions.builder()
.setFenodes(config.getSink().get("fenodes"))
.setTableIdentifier(getSinkSchemaName(table) + "." + getSinkTableName(table))
.setUsername(config.getSink().get("username"))
.setPassword(config.getSink().get("password")).build()
));
}
}
package com.dlink.cdc.kafka;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
/**
* MysqlCDCBuilder
*
* @author wenmo
* @since 2022/4/12 21:29
**/
public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder {
private final static String KEY_WORD = "datastream-kafka";
public KafkaSinkBuilder() {
}
public KafkaSinkBuilder(FlinkCDCConfig config) {
super(config);
}
@Override
public void addSink(
StreamExecutionEnvironment env,
DataStream<RowData> rowDataDataStream,
Table table,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
}
@Override
public String getHandle() {
return KEY_WORD;
}
@Override
public SinkBuilder create(FlinkCDCConfig config) {
return new KafkaSinkBuilder(config);
}
@Override
public DataStreamSource build(
CDCBuilder cdcBuilder,
StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) {
if (Asserts.isNotNullString(config.getSink().get("topic"))) {
dataStreamSource.sinkTo(KafkaSink.<String>builder()
.setBootstrapServers(config.getSink().get("brokers"))
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(config.getSink().get("topic"))
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.build());
} else {
final List<Schema> schemaList = config.getSchemaList();
final String schemaFieldName = config.getSchemaFieldName();
if (Asserts.isNotNullCollection(schemaList)) {
SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(new MapFunction<String, Map>() {
@Override
public Map map(String value) throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(value, Map.class);
}
});
for (Schema schema : schemaList) {
for (Table table : schema.getTables()) {
final String tableName = table.getName();
final String schemaName = table.getSchema();
SingleOutputStreamOperator<Map> filterOperator = mapOperator.filter(new FilterFunction<Map>() {
@Override
public boolean filter(Map value) throws Exception {
LinkedHashMap source = (LinkedHashMap) value.get("source");
return tableName.equals(source.get("table").toString())
&& schemaName.equals(source.get(schemaFieldName).toString());
}
});
SingleOutputStreamOperator<String> stringOperator = filterOperator.map(new MapFunction<Map, String>() {
@Override
public String map(Map value) throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.writeValueAsString(value);
}
});
stringOperator.sinkTo(KafkaSink.<String>builder()
.setBootstrapServers(config.getSink().get("brokers"))
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(getSinkTableName(table))
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.build());
}
}
}
}
return dataStreamSource;
}
}
package com.dlink.cdc.mysql;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.ClientConstant;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
/**
* MysqlCDCBuilder
*
* @author wenmo
* @since 2022/4/12 21:29
**/
public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
private final static String KEY_WORD = "mysql-cdc";
private final static String METADATA_TYPE = "MySql";
public MysqlCDCBuilder() {
}
public MysqlCDCBuilder(FlinkCDCConfig config) {
super(config);
}
@Override
public String getHandle() {
return KEY_WORD;
}
@Override
public CDCBuilder create(FlinkCDCConfig config) {
return new MysqlCDCBuilder(config);
}
@Override
public DataStreamSource<String> build(StreamExecutionEnvironment env) {
String database = config.getDatabase();
String serverId = config.getSource().get("server-id");
String serverTimeZone = config.getSource().get("server-time-zone");
String fetchSize = config.getSource().get("scan.snapshot.fetch.size");
String connectTimeout = config.getSource().get("connect.timeout");
String connectMaxRetries = config.getSource().get("connect.max-retries");
String connectionPoolSize = config.getSource().get("connection.pool.size");
String heartbeatInterval = config.getSource().get("heartbeat.interval");
Properties properties = new Properties();
for (Map.Entry<String, String> entry : config.getDebezium().entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
properties.setProperty(entry.getKey(), entry.getValue());
}
}
MySqlSourceBuilder<String> sourceBuilder = MySqlSource.<String>builder()
.hostname(config.getHostname())
.port(config.getPort())
.username(config.getUsername())
.password(config.getPassword());
if (Asserts.isNotNullString(database)) {
String[] databases = database.split(FlinkParamConstant.SPLIT);
sourceBuilder.databaseList(databases);
} else {
sourceBuilder.databaseList(new String[0]);
}
List<String> schemaTableNameList = config.getSchemaTableNameList();
if (Asserts.isNotNullCollection(schemaTableNameList)) {
sourceBuilder.tableList(schemaTableNameList.toArray(new String[schemaTableNameList.size()]));
} else {
sourceBuilder.tableList(new String[0]);
}
sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
sourceBuilder.debeziumProperties(properties);
if (Asserts.isNotNullString(config.getStartupMode())) {
switch (config.getStartupMode().toLowerCase()) {
case "initial":
sourceBuilder.startupOptions(StartupOptions.initial());
break;
case "latest-offset":
sourceBuilder.startupOptions(StartupOptions.latest());
break;
}
} else {
sourceBuilder.startupOptions(StartupOptions.latest());
}
if (Asserts.isNotNullString(serverId)) {
sourceBuilder.serverId(serverId);
}
if (Asserts.isNotNullString(serverTimeZone)) {
sourceBuilder.serverTimeZone(serverTimeZone);
}
if (Asserts.isNotNullString(fetchSize)) {
sourceBuilder.fetchSize(Integer.valueOf(fetchSize));
}
if (Asserts.isNotNullString(connectTimeout)) {
sourceBuilder.connectTimeout(Duration.ofMillis(Long.valueOf(connectTimeout)));
}
if (Asserts.isNotNullString(connectMaxRetries)) {
sourceBuilder.connectMaxRetries(Integer.valueOf(connectMaxRetries));
}
if (Asserts.isNotNullString(connectionPoolSize)) {
sourceBuilder.connectionPoolSize(Integer.valueOf(connectionPoolSize));
}
if (Asserts.isNotNullString(heartbeatInterval)) {
sourceBuilder.heartbeatInterval(Duration.ofMillis(Long.valueOf(heartbeatInterval)));
}
return env.fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
}
public List<String> getSchemaList() {
List<String> schemaList = new ArrayList<>();
String schema = config.getDatabase();
if (Asserts.isNotNullString(schema)) {
String[] schemas = schema.split(FlinkParamConstant.SPLIT);
Collections.addAll(schemaList, schemas);
}
List<String> tableList = getTableList();
for (String tableName : tableList) {
if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
String[] names = tableName.split("\\\\.");
if (!schemaList.contains(names[0])) {
schemaList.add(names[0]);
}
}
}
return schemaList;
}
public Map<String, Map<String, String>> parseMetaDataConfigs() {
Map<String, Map<String, String>> allConfigMap = new HashMap<>();
List<String> schemaList = getSchemaList();
for (String schema : schemaList) {
Map<String, String> configMap = new HashMap<>();
configMap.put(ClientConstant.METADATA_TYPE, METADATA_TYPE);
StringBuilder sb = new StringBuilder("jdbc:mysql://");
sb.append(config.getHostname());
sb.append(":");
sb.append(config.getPort());
sb.append("/");
sb.append(schema);
configMap.put(ClientConstant.METADATA_NAME, sb.toString());
configMap.put(ClientConstant.METADATA_URL, sb.toString());
configMap.put(ClientConstant.METADATA_USERNAME, config.getUsername());
configMap.put(ClientConstant.METADATA_PASSWORD, config.getPassword());
allConfigMap.put(schema, configMap);
}
return allConfigMap;
}
@Override
public String getSchemaFieldName() {
return "db";
}
}
package com.dlink.cdc.oracle;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.ClientConstant;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;
import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.connectors.oracle.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
/**
* MysqlCDCBuilder
*
* @author wenmo
* @since 2022/4/12 21:29
**/
public class OracleCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
private final static String KEY_WORD = "oracle-cdc";
private final static String METADATA_TYPE = "Oracle";
public OracleCDCBuilder() {
}
public OracleCDCBuilder(FlinkCDCConfig config) {
super(config);
}
@Override
public String getHandle() {
return KEY_WORD;
}
@Override
public CDCBuilder create(FlinkCDCConfig config) {
return new OracleCDCBuilder(config);
}
@Override
public DataStreamSource<String> build(StreamExecutionEnvironment env) {
Properties properties = new Properties();
for (Map.Entry<String, String> entry : config.getDebezium().entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
properties.setProperty(entry.getKey(), entry.getValue());
}
}
OracleSource.Builder<String> sourceBuilder = OracleSource.<String>builder()
.hostname(config.getHostname())
.port(config.getPort())
.username(config.getUsername())
.password(config.getPassword())
.database(config.getDatabase());
String schema = config.getSchema();
if (Asserts.isNotNullString(schema)) {
String[] schemas = schema.split(FlinkParamConstant.SPLIT);
sourceBuilder.schemaList(schemas);
} else {
sourceBuilder.schemaList(new String[0]);
}
List<String> schemaTableNameList = config.getSchemaTableNameList();
if (Asserts.isNotNullCollection(schemaTableNameList)) {
sourceBuilder.tableList(schemaTableNameList.toArray(new String[schemaTableNameList.size()]));
} else {
sourceBuilder.tableList(new String[0]);
}
sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
sourceBuilder.debeziumProperties(properties);
if (Asserts.isNotNullString(config.getStartupMode())) {
switch (config.getStartupMode().toLowerCase()) {
case "initial":
sourceBuilder.startupOptions(StartupOptions.initial());
break;
case "latest-offset":
sourceBuilder.startupOptions(StartupOptions.latest());
break;
}
} else {
sourceBuilder.startupOptions(StartupOptions.latest());
}
return env.addSource(sourceBuilder.build(), "Oracle CDC Source");
}
public Map<String, Map<String, String>> parseMetaDataConfigs() {
Map<String, Map<String, String>> allConfigList = new HashMap<>();
List<String> schemaList = getSchemaList();
for (String schema : schemaList) {
Map<String, String> configMap = new HashMap<>();
configMap.put(ClientConstant.METADATA_TYPE, METADATA_TYPE);
StringBuilder sb = new StringBuilder("jdbc:oracle:thin:@");
sb.append(config.getHostname());
sb.append(":");
sb.append(config.getPort());
sb.append(":");
sb.append(config.getDatabase());
configMap.put(ClientConstant.METADATA_NAME, sb.toString());
configMap.put(ClientConstant.METADATA_URL, sb.toString());
configMap.put(ClientConstant.METADATA_USERNAME, config.getUsername());
configMap.put(ClientConstant.METADATA_PASSWORD, config.getPassword());
allConfigList.put(schema, configMap);
}
return allConfigList;
}
}
package com.dlink.cdc.sql;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.dlink.utils.SqlUtil;
/**
* SQLSinkBuilder
*
* @author wenmo
* @since 2022/4/25 23:02
*/
public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
private final static String KEY_WORD = "sql";
private static final long serialVersionUID = -3699685106324048226L;
public SQLSinkBuilder() {
}
public SQLSinkBuilder(FlinkCDCConfig config) {
super(config);
}
@Override
public void addSink(StreamExecutionEnvironment env, DataStream<RowData> rowDataDataStream, Table table, List<String> columnNameList, List<LogicalType> columnTypeList) {
}
protected DataStream<Row> buildRow(
SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]);
final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]);
TypeInformation<?>[] typeInformations = TypeConversions.fromDataTypeToLegacyInfo(TypeConversions.fromLogicalToDataType(columnTypes));
RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, columnNames);
return filterOperator
.flatMap(new FlatMapFunction<Map, Row>() {
@Override
public void flatMap(Map value, Collector<Row> out) throws Exception {
switch (value.get("op").toString()) {
case "r":
case "c":
Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size());
Map idata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(irow);
break;
case "d":
Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size());
Map ddata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(drow);
break;
case "u":
Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size());
Map ubdata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(ubrow);
Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size());
Map uadata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(uarow);
break;
}
}
}, rowTypeInfo);
}
public void addTableSink(
CustomTableEnvironment customTableEnvironment,
DataStream<Row> rowDataDataStream,
Table table,
List<String> columnNameList) {
String sinkTableName = getSinkTableName(table);
customTableEnvironment.createTemporaryView(table.getSchemaTableNameWithUnderline(), rowDataDataStream, StringUtils.join(columnNameList, ","));
customTableEnvironment.executeSql(getFlinkDDL(table, sinkTableName));
List<Operation> operations = customTableEnvironment.getParser().parse(table.getCDCSqlInsert(sinkTableName, table.getSchemaTableNameWithUnderline()));
if (operations.size() > 0) {
Operation operation = operations.get(0);
if (operation instanceof ModifyOperation) {
modifyOperations.add((ModifyOperation) operation);
}
}
}
@Override
public String getHandle() {
return KEY_WORD;
}
@Override
public SinkBuilder create(FlinkCDCConfig config) {
return new SQLSinkBuilder(config);
}
@Override
public DataStreamSource build(
CDCBuilder cdcBuilder,
StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) {
final List<Schema> schemaList = config.getSchemaList();
final String schemaFieldName = config.getSchemaFieldName();
if (Asserts.isNotNullCollection(schemaList)) {
SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
for (Schema schema : schemaList) {
for (Table table : schema.getTables()) {
SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
List<String> columnNameList = new ArrayList<>();
List<LogicalType> columnTypeList = new ArrayList<>();
buildColumn(columnNameList, columnTypeList, table.getColumns());
DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList);
addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
}
}
List<Transformation<?>> trans = customTableEnvironment.getPlanner().translate(modifyOperations);
for (Transformation<?> item : trans) {
env.addOperator(item);
}
}
return dataStreamSource;
}
public String getFlinkDDL(Table table, String tableName) {
StringBuilder sb = new StringBuilder();
sb.append("CREATE TABLE IF NOT EXISTS ");
sb.append(tableName);
sb.append(" (\n");
List<String> pks = new ArrayList<>();
for (int i = 0; i < table.getColumns().size(); i++) {
String type = table.getColumns().get(i).getJavaType().getFlinkType();
sb.append(" ");
if (i > 0) {
sb.append(",");
}
sb.append("`");
sb.append(table.getColumns().get(i).getName());
sb.append("` ");
sb.append(convertSinkColumnType(type));
sb.append("\n");
if (table.getColumns().get(i).isKeyFlag()) {
pks.add(table.getColumns().get(i).getName());
}
}
StringBuilder pksb = new StringBuilder("PRIMARY KEY ( ");
for (int i = 0; i < pks.size(); i++) {
if (i > 0) {
pksb.append(",");
}
pksb.append("`");
pksb.append(pks.get(i));
pksb.append("`");
}
pksb.append(" ) NOT ENFORCED\n");
if (pks.size() > 0) {
sb.append(" ,");
sb.append(pksb);
}
sb.append(") WITH (\n");
sb.append(getSinkConfigurationString(table));
sb.append(")\n");
return sb.toString();
}
protected String convertSinkColumnType(String type) {
if (config.getSink().get("connector").equals("hudi")) {
if (type.equals("TIMESTAMP")) {
return "TIMESTAMP(3)";
}
}
return type;
}
protected Object convertValue(Object value, LogicalType logicalType) {
if (value == null) {
return null;
}
if (logicalType instanceof DateType) {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate();
} else if (logicalType instanceof TimestampType) {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
} else if (logicalType instanceof DecimalType) {
return new BigDecimal((String) value);
} else {
return value;
}
}
protected String getSinkConfigurationString(Table table) {
String configurationString = SqlUtil.replaceAllParam(config.getSinkConfigurationString(), "schemaName", getSinkSchemaName(table));
configurationString = SqlUtil.replaceAllParam(configurationString, "tableName", getSinkTableName(table));
if (configurationString.contains("${pkList}")) {
configurationString = SqlUtil.replaceAllParam(configurationString, "pkList", StringUtils.join(getPKList(table), "."));
}
return configurationString;
}
}
package com.dlink.executor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExpressionParser;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.PlannerFactoryUtil;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.DataStreamQueryOperation;
import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;
import org.apache.flink.table.typeutils.FieldInfoUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import com.dlink.assertion.Asserts;
import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* CustomTableEnvironmentImpl
*
* @author wenmo
* @since 2022/05/08
**/
public class CustomTableEnvironmentImpl extends AbstractStreamTableEnvironmentImpl
implements CustomTableEnvironment {
public CustomTableEnvironmentImpl(
CatalogManager catalogManager,
ModuleManager moduleManager,
FunctionCatalog functionCatalog,
TableConfig tableConfig,
StreamExecutionEnvironment executionEnvironment,
Planner planner,
Executor executor,
boolean isStreamingMode,
ClassLoader userClassLoader) {
super(
catalogManager,
moduleManager,
tableConfig,
executor,
functionCatalog,
planner,
isStreamingMode,
userClassLoader,
executionEnvironment);
}
public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment) {
return create(executionEnvironment, EnvironmentSettings.newInstance().build());
}
public static CustomTableEnvironmentImpl createBatch(StreamExecutionEnvironment executionEnvironment) {
Configuration configuration = new Configuration();
configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
TableConfig tableConfig = new TableConfig();
tableConfig.addConfiguration(configuration);
return create(executionEnvironment, EnvironmentSettings.newInstance().inBatchMode().build());
}
public static CustomTableEnvironmentImpl create(
StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) {
// temporary solution until FLINK-15635 is fixed
final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
final Executor executor = lookupExecutor(classLoader, executionEnvironment);
final TableConfig tableConfig = TableConfig.getDefault();
tableConfig.setRootConfiguration(executor.getConfiguration());
tableConfig.addConfiguration(settings.getConfiguration());
final ModuleManager moduleManager = new ModuleManager();
final CatalogManager catalogManager =
CatalogManager.newBuilder()
.classLoader(classLoader)
.config(tableConfig)
.defaultCatalog(
settings.getBuiltInCatalogName(),
new GenericInMemoryCatalog(
settings.getBuiltInCatalogName(),
settings.getBuiltInDatabaseName()))
.executionConfig(executionEnvironment.getConfig())
.build();
final FunctionCatalog functionCatalog =
new FunctionCatalog(tableConfig, catalogManager, moduleManager);
final Planner planner =
PlannerFactoryUtil.createPlanner(
executor, tableConfig, moduleManager, catalogManager, functionCatalog);
return new CustomTableEnvironmentImpl(
catalogManager,
moduleManager,
functionCatalog,
tableConfig,
executionEnvironment,
planner,
executor,
settings.isStreamingMode(),
classLoader);
}
public ObjectNode getStreamGraph(String statement) {
List<Operation> operations = super.getParser().parse(statement);
if (operations.size() != 1) {
throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query.");
} else {
List<ModifyOperation> modifyOperations = new ArrayList<>();
for (int i = 0; i < operations.size(); i++) {
if (operations.get(i) instanceof ModifyOperation) {
modifyOperations.add((ModifyOperation) operations.get(i));
}
}
List<Transformation<?>> trans = super.planner.translate(modifyOperations);
for (Transformation<?> transformation : trans) {
executionEnvironment.addOperator(transformation);
}
StreamGraph streamGraph = executionEnvironment.getStreamGraph();
if (tableConfig.getConfiguration().containsKey(PipelineOptions.NAME.key())) {
streamGraph.setJobName(tableConfig.getConfiguration().getString(PipelineOptions.NAME));
}
JSONGenerator jsonGenerator = new JSONGenerator(streamGraph);
String json = jsonGenerator.getJSON();
ObjectMapper mapper = new ObjectMapper();
ObjectNode objectNode = mapper.createObjectNode();
try {
objectNode = (ObjectNode) mapper.readTree(json);
} catch (JsonProcessingException e) {
e.printStackTrace();
} finally {
return objectNode;
}
}
}
@Override
public JobPlanInfo getJobPlanInfo(List<String> statements) {
return new JobPlanInfo(JsonPlanGenerator.generatePlan(getJobGraphFromInserts(statements)));
}
public StreamGraph getStreamGraphFromInserts(List<String> statements) {
List<ModifyOperation> modifyOperations = new ArrayList();
for (String statement : statements) {
List<Operation> operations = getParser().parse(statement);
if (operations.size() != 1) {
throw new TableException("Only single statement is supported.");
} else {
Operation operation = operations.get(0);
if (operation instanceof ModifyOperation) {
modifyOperations.add((ModifyOperation) operation);
} else {
throw new TableException("Only insert statement is supported now.");
}
}
}
List<Transformation<?>> trans = getPlanner().translate(modifyOperations);
for (Transformation<?> transformation : trans) {
executionEnvironment.addOperator(transformation);
}
StreamGraph streamGraph = executionEnvironment.getStreamGraph();
if (tableConfig.getConfiguration().containsKey(PipelineOptions.NAME.key())) {
streamGraph.setJobName(tableConfig.getConfiguration().getString(PipelineOptions.NAME));
}
return streamGraph;
}
public JobGraph getJobGraphFromInserts(List<String> statements) {
return getStreamGraphFromInserts(statements).getJobGraph();
}
public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
SqlExplainResult record = new SqlExplainResult();
List<Operation> operations = getParser().parse(statement);
record.setParseTrue(true);
if (operations.size() != 1) {
throw new TableException(
"Unsupported SQL query! explainSql() only accepts a single SQL query.");
}
List<Operation> operationlist = new ArrayList<>(operations);
for (int i = 0; i < operationlist.size(); i++) {
Operation operation = operationlist.get(i);
if (operation instanceof ModifyOperation) {
record.setType("Modify DML");
} else if (operation instanceof ExplainOperation) {
record.setType("Explain DML");
} else if (operation instanceof QueryOperation) {
record.setType("Query DML");
} else {
record.setExplain(operation.asSummaryString());
operationlist.remove(i);
record.setType("DDL");
i = i - 1;
}
}
record.setExplainTrue(true);
if (operationlist.size() == 0) {
//record.setExplain("DDL语句不进行解释。");
return record;
}
record.setExplain(planner.explain(operationlist, extraDetails));
return record;
}
public boolean parseAndLoadConfiguration(String statement, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
List<Operation> operations = getParser().parse(statement);
for (Operation operation : operations) {
if (operation instanceof SetOperation) {
callSet((SetOperation) operation, environment, setMap);
return true;
} else if (operation instanceof ResetOperation) {
callReset((ResetOperation) operation, environment, setMap);
return true;
}
}
return false;
}
private void callSet(SetOperation setOperation, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
if (setOperation.getKey().isPresent() && setOperation.getValue().isPresent()) {
String key = setOperation.getKey().get().trim();
String value = setOperation.getValue().get().trim();
if (Asserts.isNullString(key) || Asserts.isNullString(value)) {
return;
}
Map<String, String> confMap = new HashMap<>();
confMap.put(key, value);
setMap.put(key, value);
Configuration configuration = Configuration.fromMap(confMap);
environment.getConfig().configure(configuration, null);
getConfig().addConfiguration(configuration);
}
}
private void callReset(ResetOperation resetOperation, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
if (resetOperation.getKey().isPresent()) {
String key = resetOperation.getKey().get().trim();
if (Asserts.isNullString(key)) {
return;
}
Map<String, String> confMap = new HashMap<>();
confMap.put(key, null);
setMap.remove(key);
Configuration configuration = Configuration.fromMap(confMap);
environment.getConfig().configure(configuration, null);
getConfig().addConfiguration(configuration);
} else {
setMap.clear();
}
}
public <T> Table fromDataStream(DataStream<T> dataStream, Expression... fields) {
return createTable(asQueryOperation(dataStream, Optional.of(Arrays.asList(fields))));
}
public <T> Table fromDataStream(DataStream<T> dataStream, String fields) {
List<Expression> expressions = ExpressionParser.INSTANCE.parseExpressionList(fields);
return fromDataStream(dataStream, expressions.toArray(new Expression[0]));
}
@Override
public <T> void createTemporaryView(String path, DataStream<T> dataStream, String fields) {
createTemporaryView(path, fromDataStream(dataStream, fields));
}
@Override
public <T> void createTemporaryView(
String path, DataStream<T> dataStream, Expression... fields) {
createTemporaryView(path, fromDataStream(dataStream, fields));
}
protected <T> DataStreamQueryOperation<T> asQueryOperation(DataStream<T> dataStream, Optional<List<Expression>> fields) {
TypeInformation<T> streamType = dataStream.getType();
FieldInfoUtils.TypeInfoSchema typeInfoSchema = (FieldInfoUtils.TypeInfoSchema) fields.map((f) -> {
FieldInfoUtils.TypeInfoSchema fieldsInfo = FieldInfoUtils.getFieldsInfo(streamType, (Expression[]) f.toArray(new Expression[0]));
this.validateTimeCharacteristic(fieldsInfo.isRowtimeDefined());
return fieldsInfo;
}).orElseGet(() -> {
return FieldInfoUtils.getFieldsInfo(streamType);
});
return new DataStreamQueryOperation(dataStream, typeInfoSchema.getIndices(), typeInfoSchema.toResolvedSchema());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dlink.executor;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.internal.ResultProvider;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.print.PrintStyle;
import org.apache.flink.table.utils.print.RowDataToStringConverter;
import org.apache.flink.table.utils.print.TableauStyle;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/** Implementation for {@link TableResult}. */
@Internal
public class CustomTableResultImpl implements TableResultInternal {
public static final TableResult TABLE_RESULT_OK =
CustomTableResultImpl.builder()
.resultKind(ResultKind.SUCCESS)
.schema(ResolvedSchema.of(Column.physical("result", DataTypes.STRING())))
.data(Collections.singletonList(Row.of("OK")))
.build();
private final JobClient jobClient;
private final ResolvedSchema resolvedSchema;
private final ResultKind resultKind;
private final ResultProvider resultProvider;
private final PrintStyle printStyle;
private CustomTableResultImpl(
@Nullable JobClient jobClient,
ResolvedSchema resolvedSchema,
ResultKind resultKind,
ResultProvider resultProvider,
PrintStyle printStyle) {
this.jobClient = jobClient;
this.resolvedSchema =
Preconditions.checkNotNull(resolvedSchema, "resolvedSchema should not be null");
this.resultKind = Preconditions.checkNotNull(resultKind, "resultKind should not be null");
Preconditions.checkNotNull(resultProvider, "result provider should not be null");
this.resultProvider = resultProvider;
this.printStyle = Preconditions.checkNotNull(printStyle, "printStyle should not be null");
}
public static TableResult buildTableResult(List<TableSchemaField> fields, List<Row> rows) {
Builder builder = builder().resultKind(ResultKind.SUCCESS);
if (fields.size() > 0) {
List<String> columnNames = new ArrayList<>();
List<DataType> columnTypes = new ArrayList<>();
for (int i = 0; i < fields.size(); i++) {
columnNames.add(fields.get(i).getName());
columnTypes.add(fields.get(i).getType());
}
builder.schema(ResolvedSchema.physical(columnNames, columnTypes)).data(rows);
}
return builder.build();
}
@Override
public Optional<JobClient> getJobClient() {
return Optional.ofNullable(jobClient);
}
@Override
public void await() throws InterruptedException, ExecutionException {
try {
awaitInternal(-1, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
// do nothing
}
}
@Override
public void await(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
awaitInternal(timeout, unit);
}
private void awaitInternal(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (jobClient == null) {
return;
}
ExecutorService executor =
Executors.newFixedThreadPool(1, r -> new Thread(r, "TableResult-await-thread"));
try {
CompletableFuture<Void> future =
CompletableFuture.runAsync(
() -> {
while (!resultProvider.isFirstRowReady()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new TableException("Thread is interrupted");
}
}
},
executor);
if (timeout >= 0) {
future.get(timeout, unit);
} else {
future.get();
}
} finally {
executor.shutdown();
}
}
@Override
public ResolvedSchema getResolvedSchema() {
return resolvedSchema;
}
@Override
public ResultKind getResultKind() {
return resultKind;
}
@Override
public CloseableIterator<Row> collect() {
return resultProvider.toExternalIterator();
}
@Override
public CloseableIterator<RowData> collectInternal() {
return resultProvider.toInternalIterator();
}
@Override
public RowDataToStringConverter getRowDataToStringConverter() {
return resultProvider.getRowDataStringConverter();
}
@Override
public void print() {
Iterator<RowData> it = resultProvider.toInternalIterator();
printStyle.print(it, new PrintWriter(System.out));
}
public static Builder builder() {
return new Builder();
}
/** Builder for creating a {@link CustomTableResultImpl}. */
public static class Builder {
private JobClient jobClient = null;
private ResolvedSchema resolvedSchema = null;
private ResultKind resultKind = null;
private ResultProvider resultProvider = null;
private PrintStyle printStyle = null;
private Builder() {}
/**
* Specifies job client which associates the submitted Flink job.
*
* @param jobClient a {@link JobClient} for the submitted Flink job.
*/
public Builder jobClient(JobClient jobClient) {
this.jobClient = jobClient;
return this;
}
/**
* Specifies schema of the execution result.
*
* @param resolvedSchema a {@link ResolvedSchema} for the execution result.
*/
public Builder schema(ResolvedSchema resolvedSchema) {
Preconditions.checkNotNull(resolvedSchema, "resolvedSchema should not be null");
this.resolvedSchema = resolvedSchema;
return this;
}
/**
* Specifies result kind of the execution result.
*
* @param resultKind a {@link ResultKind} for the execution result.
*/
public Builder resultKind(ResultKind resultKind) {
Preconditions.checkNotNull(resultKind, "resultKind should not be null");
this.resultKind = resultKind;
return this;
}
public Builder resultProvider(ResultProvider resultProvider) {
Preconditions.checkNotNull(resultProvider, "resultProvider should not be null");
this.resultProvider = resultProvider;
return this;
}
/**
* Specifies an row list as the execution result.
*
* @param rowList a row list as the execution result.
*/
public Builder data(List<Row> rowList) {
Preconditions.checkNotNull(rowList, "listRows should not be null");
this.resultProvider = new StaticResultProvider(rowList);
return this;
}
/** Specifies print style. Default is {@link TableauStyle} with max integer column width. */
public Builder setPrintStyle(PrintStyle printStyle) {
Preconditions.checkNotNull(printStyle, "printStyle should not be null");
this.printStyle = printStyle;
return this;
}
/** Returns a {@link TableResult} instance. */
public TableResultInternal build() {
if (printStyle == null) {
printStyle = PrintStyle.rawContent(resultProvider.getRowDataStringConverter());
}
return new CustomTableResultImpl(
jobClient, resolvedSchema, resultKind, resultProvider, printStyle);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dlink.executor;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.internal.ResultProvider;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.utils.print.PrintStyle;
import org.apache.flink.table.utils.print.RowDataToStringConverter;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import java.util.List;
import java.util.function.Function;
/** Create result provider from a static set of data using external types. */
@Internal
class StaticResultProvider implements ResultProvider {
/**
* This converter supports only String, long, int and boolean fields. Moreover, this converter
* works only with {@link GenericRowData}.
*/
static final RowDataToStringConverter SIMPLE_ROW_DATA_TO_STRING_CONVERTER =
rowData -> {
GenericRowData genericRowData = (GenericRowData) rowData;
String[] results = new String[rowData.getArity()];
for (int i = 0; i < results.length; i++) {
Object value = genericRowData.getField(i);
if (Boolean.TRUE.equals(value)) {
results[i] = "TRUE";
} else if (Boolean.FALSE.equals(value)) {
results[i] = "FALSE";
} else {
results[i] = value == null ? PrintStyle.NULL_VALUE : "" + value;
}
}
return results;
};
private final List<Row> rows;
private final Function<Row, RowData> externalToInternalConverter;
public StaticResultProvider(List<Row> rows) {
this(rows, StaticResultProvider::rowToInternalRow);
}
public StaticResultProvider(
List<Row> rows, Function<Row, RowData> externalToInternalConverter) {
this.rows = rows;
this.externalToInternalConverter = externalToInternalConverter;
}
@Override
public StaticResultProvider setJobClient(JobClient jobClient) {
return this;
}
@Override
public CloseableIterator<RowData> toInternalIterator() {
return CloseableIterator.adapterForIterator(
this.rows.stream().map(this.externalToInternalConverter).iterator());
}
@Override
public CloseableIterator<Row> toExternalIterator() {
return CloseableIterator.adapterForIterator(this.rows.iterator());
}
@Override
public RowDataToStringConverter getRowDataStringConverter() {
return SIMPLE_ROW_DATA_TO_STRING_CONVERTER;
}
@Override
public boolean isFirstRowReady() {
return true;
}
/** This function supports only String, long, int and boolean fields. */
@VisibleForTesting
static RowData rowToInternalRow(Row row) {
Object[] values = new Object[row.getArity()];
for (int i = 0; i < row.getArity(); i++) {
Object value = row.getField(i);
if (value == null) {
values[i] = null;
} else if (value instanceof String) {
values[i] = StringData.fromString((String) value);
} else if (value instanceof Boolean
|| value instanceof Long
|| value instanceof Integer) {
values[i] = value;
} else {
throw new TableException("Cannot convert row type");
}
}
return GenericRowData.of(values);
}
}
package com.dlink.executor;
import org.apache.flink.table.types.DataType;
/**
* @author wenmo
* @since 2022/05/08
**/
public class TableSchemaField {
private String name;
private DataType type;
public TableSchemaField(String name, DataType type) {
this.name = name;
this.type = type;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public DataType getType() {
return type;
}
public void setType(DataType type) {
this.type = type;
}
}
package com.dlink.utils;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.ContextResolvedTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
/**
* FlinkUtil
*
* @author wenmo
* @since 2022/05/08
*/
public class FlinkUtil {
public static List<String> getFieldNamesFromCatalogManager(CatalogManager catalogManager, String catalog, String database, String table) {
Optional<ContextResolvedTable> tableOpt = catalogManager.getTable(
ObjectIdentifier.of(catalog, database, table)
);
if (tableOpt.isPresent()) {
return tableOpt.get().getResolvedSchema().getColumnNames();
} else {
return new ArrayList<String>();
}
}
public static List<String> catchColumn(TableResult tableResult) {
return tableResult.getResolvedSchema().getColumnNames();
}
}
......@@ -19,5 +19,6 @@
<module>dlink-client-1.14</module>
<module>dlink-client-hadoop</module>
<module>dlink-client-base</module>
<module>dlink-client-1.15</module>
</modules>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dlink</artifactId>
<groupId>com.dlink</groupId>
<version>0.6.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dlink-flink-1.15</artifactId>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.15.0</flink.version>
<flinkcdc.version>2.2.1</flinkcdc.version>
<commons.version>1.3.1</commons.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<junit.version>4.12</junit.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-yarn</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-kubernetes</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flinkcdc.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-oracle-cdc</artifactId>
<version>${flinkcdc.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>${commons.version}</version>
</dependency>
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.14_2.12</artifactId>
<version>1.0.3</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -16,6 +16,7 @@
<module>dlink-flink-1.12</module>
<module>dlink-flink-1.13</module>
<module>dlink-flink-1.14</module>
<module>dlink-flink-1.15</module>
</modules>
<properties>
......
......@@ -11,7 +11,7 @@
| | | 支持 INSERT 语句集 | 0.4.0 |
| | | 新增 SQL 片段语法(全局变量) | 0.4.0 |
| | | 新增 AGGTABLE 表值聚合语法及 UDATF 支持 | 0.4.0 |
| | | 新增 CDCSOURCE 多源合并语法支持 | 0.6.0 |
| | | 新增 CDCSOURCE 整库实时入仓入湖语法支持 | 0.6.3 |
| | | 新增 FlinkSQLEnv 执行环境复用 | 0.5.0 |
| | | 新增 Flink Catalog 交互查询 | 0.4.0 |
| | | 新增 执行环境的共享与私有会话机制 | 0.4.0 |
......@@ -47,6 +47,7 @@
| | | 支持 1.12.0+ | 0.4.0 |
| | | 支持 1.13.0+ | 0.4.0 |
| | | 支持 1.14.0+ | 0.4.0 |
| | | 支持 1.15.0+ | 0.6.2 |
| | SQL | 新增 外部数据源的 SQL 校验 | 0.5.0 |
| | | 新增 外部数据源的 SQL 执行与预览 | 0.5.0 |
| | BI | 新增 折线图的渲染 | 0.5.0 |
......
......@@ -210,6 +210,11 @@
<artifactId>dlink-client-1.11</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-client-1.15</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-connector-jdbc-1.11</artifactId>
......@@ -360,6 +365,11 @@
<artifactId>dlink-flink-1.14</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-flink-1.15</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment