Commit 47355f4b authored by wenmo

Optimize multi-version support

parent fbe9812d
...@@ -115,6 +115,12 @@
<artifactId>dlink-metadata-mysql</artifactId>
<scope>provided</scope>
</dependency>-->
+<!--<dependency>
+<groupId>com.alibaba.ververica</groupId>
+&lt;!&ndash; add the dependency matching your database &ndash;&gt;
+<artifactId>flink-connector-mysql-cdc</artifactId>
+<version>1.3.0</version>
+</dependency>-->
</dependencies>
<build>
<plugins>
...
...@@ -52,13 +52,34 @@
<include>dlink-client-1.12-${project.version}.jar</include>
</includes>
</fileSet>
+<fileSet>
+<directory>${project.parent.basedir}/dlink-client/dlink-client-1.11/target</directory>
+<outputDirectory>plugins</outputDirectory>
+<includes>
+<include>dlink-client-1.11-${project.version}.jar</include>
+</includes>
+</fileSet>
+<fileSet>
+<directory>${project.parent.basedir}/dlink-client/dlink-client-1.13/target</directory>
+<outputDirectory>plugins</outputDirectory>
+<includes>
+<include>dlink-client-1.13-${project.version}.jar</include>
+</includes>
+</fileSet>
<!-- Place the dlink-connectors module jars into the lib directory of the package -->
<fileSet>
-<directory>${project.parent.basedir}/dlink-connectors/dlink-connector-jdbc/target</directory>
+<directory>${project.parent.basedir}/dlink-connectors/dlink-connector-jdbc-1.12/target</directory>
<outputDirectory>lib</outputDirectory>
<includes>
-<include>dlink-connector-jdbc-${project.version}.jar</include>
+<include>dlink-connector-jdbc-1.12-${project.version}.jar</include>
+</includes>
+</fileSet>
+<fileSet>
+<directory>${project.parent.basedir}/dlink-connectors/dlink-connector-jdbc-1.13/target</directory>
+<outputDirectory>plugins</outputDirectory>
+<includes>
+<include>dlink-connector-jdbc-1.13-${project.version}.jar</include>
</includes>
</fileSet>
...
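Taken together, the assembly above now bundles client and connector jars for Flink 1.11, 1.12, and 1.13. A sketch of where the jars from this hunk land (the dlink-client-1.12 fileSet's output directory sits above the hunk and is assumed unchanged):

    lib/      dlink-connector-jdbc-1.12-${project.version}.jar
    plugins/  dlink-client-1.11-${project.version}.jar
              dlink-client-1.13-${project.version}.jar
              dlink-connector-jdbc-1.13-${project.version}.jar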
...@@ -215,17 +215,16 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
record.setType("Query DML");
} else {
record.setExplain(operation.asSummaryString());
-record.setExplainTrue(true);
-record.setType("DDL");
operationlist.remove(i);
+record.setType("DDL");
i=i-1;
}
}
+record.setExplainTrue(true);
if(operationlist.size()==0){
return record;
}
record.setExplain(planner.explain(operationlist, extraDetails));
-record.setExplainTrue(true);
return record;
}
...
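Net effect of this hunk: record.setExplainTrue(true) is now set exactly once per statement, after DDL operations have been pruned, instead of in both the DDL branch and after the planner call. A sketch of the resulting tail of explainSqlRecord, reconstructed from the diff:

    // after the operation loop has pruned DDL operations:
    record.setExplainTrue(true);
    if (operationlist.size() == 0) {
        return record;   // DDL-only statements are not sent to the planner
    }
    record.setExplain(planner.explain(operationlist, extraDetails));
    return record;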
package com.dlink.utils;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.ObjectIdentifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
/**
* FlinkUtil
*
* @author wenmo
* @since 2021/9/15 22:46
*/
public class FlinkUtil {
public static List<String> getFieldNamesFromCatalogManager(CatalogManager catalogManager, String catalog, String database, String table){
Optional<CatalogManager.TableLookupResult> tableOpt = catalogManager.getTable(
ObjectIdentifier.of(catalog, database, table)
);
if (tableOpt.isPresent()) {
return Arrays.asList(tableOpt.get().getResolvedSchema().getFieldNames());
}else{
return new ArrayList<String>();
}
}
}
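A minimal usage sketch (catalog, database, and table names are hypothetical; the CatalogManager would come from the session's table environment, e.g. via TableEnvironmentImpl#getCatalogManager):

    CatalogManager catalogManager = customTableEnvironment.getCatalogManager();
    List<String> fields = FlinkUtil.getFieldNamesFromCatalogManager(
            catalogManager, "default_catalog", "default_database", "orders");
    // returns the resolved schema's field names, or an empty list if the
    // table cannot be resolved, so callers need no null check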
...@@ -216,17 +216,16 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
record.setType("Query DML");
} else {
record.setExplain(operation.asSummaryString());
-record.setExplainTrue(true);
-record.setType("DDL");
operationlist.remove(i);
+record.setType("DDL");
i=i-1;
}
}
+record.setExplainTrue(true);
if(operationlist.size()==0){
return record;
}
record.setExplain(planner.explain(operationlist, extraDetails));
-record.setExplainTrue(true);
return record;
}
...
package com.dlink.utils;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.ObjectIdentifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
/**
* FlinkUtil
*
* @author wenmo
* @since 2021/9/15 22:46
*/
public class FlinkUtil {
public static List<String> getFieldNamesFromCatalogManager(CatalogManager catalogManager, String catalog, String database, String table){
Optional<CatalogManager.TableLookupResult> tableOpt = catalogManager.getTable(
ObjectIdentifier.of(catalog, database, table)
);
if (tableOpt.isPresent()) {
return Arrays.asList(tableOpt.get().getResolvedSchema().getFieldNames());
}else{
return new ArrayList<String>();
}
}
}
...@@ -209,17 +209,18 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
} else if (operation instanceof QueryOperation) {
record.setType("Query DML");
} else {
+record.setExplain(operation.asSummaryString());
operationlist.remove(i);
record.setType("DDL");
i=i-1;
}
}
+record.setExplainTrue(true);
if(operationlist.size()==0){
//record.setExplain("DDL statements are not explained.");
return record;
}
record.setExplain(planner.explain(operationlist, extraDetails));
-record.setExplainTrue(true);
return record;
}
...
package com.dlink.utils;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.ObjectIdentifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
/**
* FlinkUtil
*
* @author wenmo
* @since 2021/9/15 22:46
*/
public class FlinkUtil {
public static List<String> getFieldNamesFromCatalogManager(CatalogManager catalogManager, String catalog, String database, String table){
Optional<CatalogManager.TableLookupResult> tableOpt = catalogManager.getTable(
ObjectIdentifier.of(catalog, database, table)
);
if (tableOpt.isPresent()) {
return tableOpt.get().getResolvedSchema().getColumnNames();
}else{
return new ArrayList<String>();
}
}
}
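The only functional difference from the copies above is the schema accessor, which tracks the catalog API change between Flink versions; this appears to be why each client module carries its own FlinkUtil:

    // Flink 1.12-era clients: the lookup result's schema exposes String[] getFieldNames()
    Arrays.asList(tableOpt.get().getResolvedSchema().getFieldNames());
    // Flink 1.13: the resolved schema exposes List<String> getColumnNames() directly
    tableOpt.get().getResolvedSchema().getColumnNames();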
...@@ -9,12 +9,12 @@
</parent>
<modelVersion>4.0.0</modelVersion>
-<artifactId>dlink-connector-jdbc</artifactId>
+<artifactId>dlink-connector-jdbc-1.12</artifactId>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-<flink.version>1.12.4</flink.version>
+<flink.version>1.12.5</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
...
...@@ -67,11 +67,6 @@ public class ClickHouseDialect extends AbstractDialect {
@Override
public Optional<String> getUpsertStatement(
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
-String columns = Arrays.stream(fieldNames).collect(Collectors.joining(", "));
-String placeholders =
-Arrays.stream(fieldNames)
-.map(f -> quoteIdentifier(f))
-.collect(Collectors.joining(", "));
return Optional.of(getInsertIntoStatement(tableName, fieldNames));
}
...
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dlink-connectors</artifactId>
<groupId>com.dlink</groupId>
<version>0.3.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dlink-connector-jdbc-1.13</artifactId>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.13.2</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<junit.version>4.12</junit.version>
<postgres.version>42.2.10</postgres.version>
<otj-pg-embedded.version>0.13.3</otj-pg-embedded.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Postgres dependencies -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${postgres.version}</version>
<scope>provided</scope>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- A planner dependency won't be necessary once FLIP-32 has been completed. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<!-- Postgres test dependencies -->
<dependency>
<groupId>com.opentable.components</groupId>
<artifactId>otj-pg-embedded</artifactId>
<version>${otj-pg-embedded.version}</version>
<scope>test</scope>
</dependency>
<!-- MySQL test dependencies -->
<dependency>
<groupId>ch.vorburger.mariaDB4j</groupId>
<artifactId>mariaDB4j</artifactId>
<version>2.4.0</version>
<scope>test</scope>
</dependency>
<!-- ch.vorburger.mariaDB4j:mariaDB4j has a dependency of mariadb-java-client:2.3.0,
but we want to bump mariadb-java-client to 2.5.4 which fixes a few notable bugs,
see: https://mariadb.com/kb/en/mariadb-connector-j-release-notes/
and the lower version may cause the test stability issue FLINK-18082.-->
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
<version>2.5.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>test</scope>
</dependency>
<!-- Derby test dependencies -->
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>10.14.2.0</version>
<scope>test</scope>
</dependency>
<!-- Oracle test dependencies -->
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ojdbc8</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<!--<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
<finalName>${project.artifactId}</finalName>
</build>-->
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.jdbc.dialect;
import org.apache.flink.connector.jdbc.internal.converter.ClickHouseRowConverter;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* JDBC dialect for ClickHouse.
*
* @author wenmo
* @since 2021/9/19 20:32
*/
public class ClickHouseDialect extends AbstractDialect {
private static final long serialVersionUID = 1L;
private static final int MAX_TIMESTAMP_PRECISION = 6;
private static final int MIN_TIMESTAMP_PRECISION = 1;
private static final int MAX_DECIMAL_PRECISION = 65;
private static final int MIN_DECIMAL_PRECISION = 1;
@Override
public boolean canHandle(String url) {
return url.startsWith("jdbc:clickhouse:");
}
@Override
public JdbcRowConverter getRowConverter(RowType rowType) {
return new ClickHouseRowConverter(rowType);
}
@Override
public String getLimitClause(long limit) {
return "LIMIT " + limit;
}
@Override
public Optional<String> defaultDriverName() {
return Optional.of("ru.yandex.clickhouse.ClickHouseDriver");
}
@Override
public String quoteIdentifier(String identifier) {
return "`" + identifier + "`";
}
@Override
public Optional<String> getUpsertStatement(
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
return Optional.of(getInsertIntoStatement(tableName, fieldNames));
}
@Override
public String dialectName() {
return "ClickHouse";
}
@Override
public int maxDecimalPrecision() {
return MAX_DECIMAL_PRECISION;
}
@Override
public int minDecimalPrecision() {
return MIN_DECIMAL_PRECISION;
}
@Override
public int maxTimestampPrecision() {
return MAX_TIMESTAMP_PRECISION;
}
@Override
public int minTimestampPrecision() {
return MIN_TIMESTAMP_PRECISION;
}
@Override
public List<LogicalTypeRoot> unsupportedTypes() {
// TODO: We can't convert BINARY data type to
// PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in
// LegacyTypeInfoDataTypeConverter.
return Arrays.asList(
LogicalTypeRoot.BINARY,
LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
LogicalTypeRoot.INTERVAL_YEAR_MONTH,
LogicalTypeRoot.INTERVAL_DAY_TIME,
LogicalTypeRoot.ARRAY,
LogicalTypeRoot.MULTISET,
LogicalTypeRoot.MAP,
LogicalTypeRoot.ROW,
LogicalTypeRoot.DISTINCT_TYPE,
LogicalTypeRoot.STRUCTURED_TYPE,
LogicalTypeRoot.NULL,
LogicalTypeRoot.RAW,
LogicalTypeRoot.SYMBOL,
LogicalTypeRoot.UNRESOLVED);
}
}
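ClickHouse has no dedicated upsert statement, so getUpsertStatement above simply falls back to the INSERT built by the inherited getInsertIntoStatement. A sketch for a hypothetical table t(id, name):

    JdbcDialect dialect = new ClickHouseDialect();
    dialect.canHandle("jdbc:clickhouse://127.0.0.1:8123/default");   // true
    dialect.getUpsertStatement("t", new String[]{"id", "name"}, new String[]{"id"});
    // => Optional of "INSERT INTO `t`(`id`, `name`) VALUES (:id, :name)"
    //    (exact text assumed from the base dialect's INSERT builder)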
package org.apache.flink.connector.jdbc.dialect;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
/**
* JdbcDialects
*
* @author wenmo
* @since 2021/9/19 20:27
*/
public final class JdbcDialects {
private static final List<JdbcDialect> DIALECTS =
Arrays.asList(new DerbyDialect(), new MySQLDialect(), new PostgresDialect()
, new OracleDialect(), new ClickHouseDialect());
/** Fetch the JdbcDialect class corresponding to a given database url. */
public static Optional<JdbcDialect> get(String url) {
for (JdbcDialect dialect : DIALECTS) {
if (dialect.canHandle(url)) {
return Optional.of(dialect);
}
}
return Optional.empty();
}
}
\ No newline at end of file
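Dialect selection stays URL-driven, so the two new dialects are picked up anywhere the connector resolves a JDBC URL:

    Optional<JdbcDialect> dialect = JdbcDialects.get("jdbc:oracle:thin:@127.0.0.1:1521:ORCL");
    dialect.ifPresent(d -> System.out.println(d.dialectName()));   // prints "Oracle"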
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.jdbc.dialect;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.connector.jdbc.internal.converter.OracleRowConverter;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import java.util.*;
import java.util.stream.Collectors;
/**
* JDBC dialect for Oracle.
*
* @author wenmo
* @since 2021/9/19 20:30
*/
public class OracleDialect extends AbstractDialect {
private static final long serialVersionUID = 1L;
// Define MAX/MIN precision of TIMESTAMP type according to Oracle docs:
private static final int MAX_TIMESTAMP_PRECISION = 6;
private static final int MIN_TIMESTAMP_PRECISION = 1;
// Define MAX/MIN precision of DECIMAL type; these bounds are carried over from the MySQL
// dialect (https://dev.mysql.com/doc/refman/8.0/en/fixed-point-types.html), not Oracle's NUMBER limits
private static final int MAX_DECIMAL_PRECISION = 65;
private static final int MIN_DECIMAL_PRECISION = 1;
// jdbc:oracle:thin:@127.0.0.1:1521:ORCL
@Override
public boolean canHandle(String url) {
return url.startsWith("jdbc:oracle:thin:");
}
@Override
public JdbcRowConverter getRowConverter(RowType rowType) {
return new OracleRowConverter(rowType);
}
@Override
public String getLimitClause(long limit) {
return "ROWNUM < " + limit;
}
@Override
public Optional<String> defaultDriverName() {
return Optional.of("oracle.jdbc.driver.OracleDriver");
}
@Override
public String quoteIdentifier(String identifier) {
return identifier;
}
@Override
public Optional<String> getUpsertStatement(
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
/*get update field*/
ArrayList<String> updateFieldNamesList = new ArrayList<String>(fieldNames.length);
Collections.addAll(updateFieldNamesList, fieldNames);
ArrayList<String> uniqueKeyFieldsList = new ArrayList<String>(uniqueKeyFields.length);
Collections.addAll(uniqueKeyFieldsList, uniqueKeyFields);
updateFieldNamesList.removeAll(uniqueKeyFieldsList);
String updateClause =
Arrays.stream(updateFieldNamesList.toArray(new String[0]))
.map(f -> "a." + quoteIdentifier(f) + " = :" + quoteIdentifier(f))
.collect(Collectors.joining(", "));
String onClause =
Arrays.stream(uniqueKeyFields)
.map(f -> "a." + quoteIdentifier(f) + " = :" + quoteIdentifier(f))
.collect(Collectors.joining(" AND "));
String sql =
"MERGE INTO "
+ tableName
+ " a USING ( SELECT 1 FROM dual ) b ON ( "
+ onClause
+ " )"
+ " WHEN MATCHED THEN"
+ " UPDATE SET "
+ updateClause
+ " WHEN NOT MATCHED THEN "
+ getInsertStatement(fieldNames);
return Optional.of(sql);
}
private String getInsertStatement(String[] fieldNames) {
String columns =
Arrays.stream(fieldNames)
.map(this::quoteIdentifier)
.collect(Collectors.joining(", "));
String placeholders =
Arrays.stream(fieldNames).map(f -> ":" + f).collect(Collectors.joining(", "));
return "INSERT " + "(" + columns + ")" + " VALUES (" + placeholders + ")";
}
@Override
public String dialectName() {
return "Oracle";
}
@Override
public int maxDecimalPrecision() {
return MAX_DECIMAL_PRECISION;
}
@Override
public int minDecimalPrecision() {
return MIN_DECIMAL_PRECISION;
}
@Override
public int maxTimestampPrecision() {
return MAX_TIMESTAMP_PRECISION;
}
@Override
public int minTimestampPrecision() {
return MIN_TIMESTAMP_PRECISION;
}
@Override
public List<LogicalTypeRoot> unsupportedTypes() {
// TODO: We can't convert BINARY data type to
// PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in
// LegacyTypeInfoDataTypeConverter.
return Arrays.asList(
LogicalTypeRoot.BINARY,
LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
LogicalTypeRoot.INTERVAL_YEAR_MONTH,
LogicalTypeRoot.INTERVAL_DAY_TIME,
LogicalTypeRoot.ARRAY,
LogicalTypeRoot.MULTISET,
LogicalTypeRoot.MAP,
LogicalTypeRoot.ROW,
LogicalTypeRoot.DISTINCT_TYPE,
LogicalTypeRoot.STRUCTURED_TYPE,
LogicalTypeRoot.NULL,
LogicalTypeRoot.RAW,
LogicalTypeRoot.SYMBOL,
LogicalTypeRoot.UNRESOLVED);
}
}
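For reference, the upsert this dialect builds for a hypothetical table t(id, name, age) keyed on id (line breaks in the comment added for readability; :field placeholders are named JDBC parameters):

    OracleDialect dialect = new OracleDialect();
    String sql = dialect.getUpsertStatement(
            "t", new String[]{"id", "name", "age"}, new String[]{"id"}).get();
    // MERGE INTO t a USING ( SELECT 1 FROM dual ) b ON ( a.id = :id )
    //  WHEN MATCHED THEN UPDATE SET a.name = :name, a.age = :age
    //  WHEN NOT MATCHED THEN INSERT (id, name, age) VALUES (:id, :name, :age)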
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.jdbc.internal.converter;
import org.apache.flink.table.types.logical.RowType;
/**
* Runtime converter responsible for converting between JDBC objects and Flink internal objects
* for ClickHouse.
*
* @author wenmo
* @since 2021/9/19 20:28
*/
public class ClickHouseRowConverter extends AbstractJdbcRowConverter {
private static final long serialVersionUID = 1L;
@Override
public String converterName() {
return "ClickHouse";
}
public ClickHouseRowConverter(RowType rowType) {
super(rowType);
}
}
package org.apache.flink.connector.jdbc.internal.converter;
import org.apache.flink.table.types.logical.RowType;
/**
* Runtime converter responsible for converting between JDBC objects and Flink internal objects
* for Oracle.
*
* @author wenmo
* @since 2021/9/19 20:28
*/
public class OracleRowConverter extends AbstractJdbcRowConverter {
private static final long serialVersionUID = 1L;
@Override
public String converterName() {
return "Oracle";
}
public OracleRowConverter(RowType rowType) {
super(rowType);
}
}
...@@ -10,7 +10,8 @@
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<modules>
-<module>dlink-connector-jdbc</module>
+<module>dlink-connector-jdbc-1.12</module>
+<module>dlink-connector-jdbc-1.13</module>
</modules>
<artifactId>dlink-connectors</artifactId>
</project>
\ No newline at end of file
...@@ -42,12 +42,12 @@
</dependency>
<dependency>
<groupId>com.dlink</groupId>
-<artifactId>dlink-client-1.12</artifactId>
+<artifactId>dlink-client-1.13</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
-<artifactId>dlink-connector-jdbc</artifactId>
+<artifactId>dlink-connector-jdbc-1.13</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
...
...@@ -3,27 +3,20 @@ package com.dlink.explainer;
import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkSQLConstant;
import com.dlink.executor.Executor;
-import com.dlink.explainer.ca.ColumnCAGenerator;
-import com.dlink.explainer.ca.ColumnCAResult;
-import com.dlink.explainer.ca.TableCA;
-import com.dlink.explainer.ca.TableCAGenerator;
-import com.dlink.explainer.ca.TableCAResult;
+import com.dlink.explainer.ca.*;
import com.dlink.explainer.trans.Trans;
import com.dlink.explainer.trans.TransGenerator;
import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.result.SqlExplainResult;
+import com.dlink.utils.FlinkUtil;
import com.dlink.utils.SqlUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.catalog.ObjectIdentifier;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Date;
import java.util.List;
-import java.util.Optional;
/**
 * Explainer
...@@ -44,14 +37,14 @@ public class Explainer {
return new Explainer(executor);
}
-public List<SqlExplainResult> explainSqlResult(String statement, ExplainDetail... extraDetails) {
+public List<SqlExplainResult> explainSqlResult(String statement) {
String[] sqls = SqlUtil.getStatements(statement);
List<SqlExplainResult> sqlExplainRecords = new ArrayList<>();
for (int i = 0; i < sqls.length; i++) {
SqlExplainResult record = new SqlExplainResult();
try {
if (!FlinkInterceptor.build(executor.getCustomTableEnvironmentImpl(), sqls[i])) {
-record = executor.explainSqlRecord(sqls[i], extraDetails);
+record = executor.explainSqlRecord(sqls[i]);
if (Asserts.isEquals(FlinkSQLConstant.DDL,record.getType())) {
executor.executeSql(sqls[i]);
}
...@@ -110,13 +103,7 @@ public class Explainer {
for (int i = 0; i < results.size(); i++) {
TableCA sinkTableCA = (TableCA) results.get(i).getSinkTableCA();
if (Asserts.isNotNull(sinkTableCA)) {
-Optional<CatalogManager.TableLookupResult> tableOpt = catalogManager.getTable(
-ObjectIdentifier.of(sinkTableCA.getCatalog(), sinkTableCA.getDatabase(), sinkTableCA.getTable())
-);
-if (tableOpt.isPresent()) {
-String[] fieldNames = tableOpt.get().getResolvedSchema().getFieldNames();
-sinkTableCA.setFields(Arrays.asList(fieldNames));
-}
+sinkTableCA.setFields(FlinkUtil.getFieldNamesFromCatalogManager(catalogManager, sinkTableCA.getCatalog(), sinkTableCA.getDatabase(), sinkTableCA.getTable()));
}
}
}
...
...@@ -60,8 +60,8 @@ public class FlinkSqlPlus {
}
}
-public List<SqlExplainResult> explainSqlRecord(String statement, ExplainDetail... extraDetails) {
-return explainer.explainSqlResult(statement,extraDetails);
+public List<SqlExplainResult> explainSqlRecord(String statement) {
+return explainer.explainSqlResult(statement);
}
public List<TableCAResult> explainSqlTableColumnCA(String statement) {
...
...@@ -311,6 +311,9 @@ export default (): React.ReactNode => {
<li>
<Link>Added documentation-backed auto-completion to the SQL editor</Link>
</li>
+<li>
+<Link>Optimized switching between Flink versions and pushed the table-field lookup logic down into the version-specific clients</Link>
+</li>
</ul>
</Paragraph>
</Timeline.Item>
...
...@@ -159,7 +159,22 @@
</dependency>
<dependency>
<groupId>com.dlink</groupId>
-<artifactId>dlink-connector-jdbc</artifactId>
+<artifactId>dlink-client-1.13</artifactId>
+<version>${project.version}</version>
+</dependency>
+<dependency>
+<groupId>com.dlink</groupId>
+<artifactId>dlink-client-1.11</artifactId>
+<version>${project.version}</version>
+</dependency>
+<dependency>
+<groupId>com.dlink</groupId>
+<artifactId>dlink-connector-jdbc-1.12</artifactId>
+<version>${project.version}</version>
+</dependency>
+<dependency>
+<groupId>com.dlink</groupId>
+<artifactId>dlink-connector-jdbc-1.13</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
...