Commit cab8d6a4 authored by wenmo

connector format

parent 0d5e7e57
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dlink-connectors</artifactId>
<groupId>com.dlink</groupId>
<version>0.6.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dlink-connector-jdbc-1.11</artifactId>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.11.6</flink.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<junit.version>4.12</junit.version>
<postgres.version>42.2.10</postgres.version>
<otj-pg-embedded.version>0.13.3</otj-pg-embedded.version>
</properties>
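<!-- Note: scala.binary.version is referenced below but not defined in this pom;
it is presumably inherited from the dlink-connectors parent. -->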
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Postgres dependencies -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${postgres.version}</version>
<scope>provided</scope>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- A planner dependency won't be necessary once FLIP-32 has been completed. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<!-- Postgres test dependencies -->
<dependency>
<groupId>com.opentable.components</groupId>
<artifactId>otj-pg-embedded</artifactId>
<version>${otj-pg-embedded.version}</version>
<scope>test</scope>
</dependency>
<!-- MySQL test dependencies -->
<dependency>
<groupId>ch.vorburger.mariaDB4j</groupId>
<artifactId>mariaDB4j</artifactId>
<version>2.4.0</version>
<scope>test</scope>
</dependency>
<!-- ch.vorburger.mariaDB4j:mariaDB4j depends on mariadb-java-client:2.3.0, but we
bump mariadb-java-client to 2.5.4, which fixes a few notable bugs
(see: https://mariadb.com/kb/en/mariadb-connector-j-release-notes/);
the lower version may also cause the test stability issue FLINK-18082. -->
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
<version>2.5.4</version>
<scope>test</scope>
</dependency>
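<!-- The mysql-connector-java and ojdbc8 dependencies below omit <version>;
their versions are presumably managed by dependencyManagement in a parent pom. -->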
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>test</scope>
</dependency>
<!-- Derby test dependencies -->
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>10.14.2.0</version>
<scope>test</scope>
</dependency>
<!-- Oracle test dependencies -->
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ojdbc8</artifactId>
<scope>test</scope>
</dependency>
<!-- SQLServer test dependencies -->
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
<version>8.2.2.jre8</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.jdbc.dialect;
import org.apache.flink.connector.jdbc.internal.converter.ClickHouseRowConverter;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
/**
* JDBC dialect for ClickHouse.
*
* @author wenmo
* @since 2021/6/7 21:48
*/
public class ClickHouseDialect extends AbstractDialect {
private static final long serialVersionUID = 1L;
private static final int MAX_TIMESTAMP_PRECISION = 6;
private static final int MIN_TIMESTAMP_PRECISION = 1;
private static final int MAX_DECIMAL_PRECISION = 65;
private static final int MIN_DECIMAL_PRECISION = 1;
@Override
public boolean canHandle(String url) {
return url.startsWith("jdbc:clickhouse:");
}
@Override
public JdbcRowConverter getRowConverter(RowType rowType) {
return new ClickHouseRowConverter(rowType);
}
@Override
public Optional<String> defaultDriverName() {
return Optional.of("ru.yandex.clickhouse.ClickHouseDriver");
}
@Override
public String quoteIdentifier(String identifier) {
return "`" + identifier + "`";
}
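// ClickHouse has no native UPSERT statement, so an upsert request degrades to a
// plain INSERT; deduplication, if required, is left to the table engine
// (e.g. ReplacingMergeTree).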
@Override
public Optional<String> getUpsertStatement(
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
return Optional.of(getInsertIntoStatement(tableName, fieldNames));
}
@Override
public String dialectName() {
return "ClickHouse";
}
@Override
public int maxDecimalPrecision() {
return MAX_DECIMAL_PRECISION;
}
@Override
public int minDecimalPrecision() {
return MIN_DECIMAL_PRECISION;
}
@Override
public int maxTimestampPrecision() {
return MAX_TIMESTAMP_PRECISION;
}
@Override
public int minTimestampPrecision() {
return MIN_TIMESTAMP_PRECISION;
}
@Override
public List<LogicalTypeRoot> unsupportedTypes() {
// TODO: We can't convert BINARY data type to
// PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in
// LegacyTypeInfoDataTypeConverter.
return Arrays.asList(
LogicalTypeRoot.BINARY,
LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
LogicalTypeRoot.INTERVAL_YEAR_MONTH,
LogicalTypeRoot.INTERVAL_DAY_TIME,
LogicalTypeRoot.ARRAY,
LogicalTypeRoot.MULTISET,
LogicalTypeRoot.MAP,
LogicalTypeRoot.ROW,
LogicalTypeRoot.DISTINCT_TYPE,
LogicalTypeRoot.STRUCTURED_TYPE,
LogicalTypeRoot.NULL,
LogicalTypeRoot.RAW,
LogicalTypeRoot.SYMBOL,
LogicalTypeRoot.UNRESOLVED);
}
}
package org.apache.flink.connector.jdbc.dialect;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
/**
* JdbcDialects
*
* @author wenmo
* @since 2021/6/7 19:29
*/
public final class JdbcDialects {
private static final List<JdbcDialect> DIALECTS =
Arrays.asList(new DerbyDialect(), new MySQLDialect(), new PostgresDialect()
, new OracleDialect(), new ClickHouseDialect(), new SQLServerDialect());
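// Resolution is first-match on the JDBC url prefix; e.g. (illustrative url)
// JdbcDialects.get("jdbc:clickhouse://127.0.0.1:8123/test") yields the
// ClickHouseDialect, while an unrecognized url yields Optional.empty().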
/**
* Fetch the JdbcDialect class corresponding to a given database url.
*/
public static Optional<JdbcDialect> get(String url) {
for (JdbcDialect dialect : DIALECTS) {
if (dialect.canHandle(url)) {
return Optional.of(dialect);
}
}
return Optional.empty();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.jdbc.dialect;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.connector.jdbc.internal.converter.OracleRowConverter;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import java.util.*;
import java.util.stream.Collectors;
/**
* JDBC dialect for Oracle.
*
* @author wenmo
* @since 2021/6/7 21:42
*/
public class OracleDialect extends AbstractDialect {
private static final long serialVersionUID = 1L;
// Define MAX/MIN precision of TIMESTAMP type according to Oracle docs.
private static final int MAX_TIMESTAMP_PRECISION = 6;
private static final int MIN_TIMESTAMP_PRECISION = 1;
// Define MAX/MIN precision of DECIMAL type. Note: these bounds were copied from the
// MySQL dialect (https://dev.mysql.com/doc/refman/8.0/en/fixed-point-types.html);
// Oracle's NUMBER type itself supports precision 1-38.
private static final int MAX_DECIMAL_PRECISION = 65;
private static final int MIN_DECIMAL_PRECISION = 1;
// jdbc:oracle:thin:@127.0.0.1:1521:ORCL
@Override
public boolean canHandle(String url) {
return url.startsWith("jdbc:oracle:thin:");
}
@Override
public JdbcRowConverter getRowConverter(RowType rowType) {
return new OracleRowConverter(rowType);
}
@Override
public Optional<String> defaultDriverName() {
return Optional.of("oracle.jdbc.driver.OracleDriver");
}
@Override
public String quoteIdentifier(String identifier) {
return identifier;
}
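// Illustrative example (hypothetical table and columns): for tableName "items",
// fieldNames {"id", "name"} and uniqueKeyFields {"id"}, this builds (line breaks
// added for readability):
//   MERGE INTO items a USING ( SELECT 1 FROM dual ) b ON ( a.id = :id )
//   WHEN MATCHED THEN UPDATE SET a.name = :name
//   WHEN NOT MATCHED THEN INSERT (id, name) VALUES (:id, :name)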
@Override
public Optional<String> getUpsertStatement(
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
/* Compute the update fields: all columns except the unique-key columns. */
ArrayList<String> updateFieldNamesList = new ArrayList<String>(fieldNames.length);
Collections.addAll(updateFieldNamesList, fieldNames);
ArrayList<String> uniqueKeyFieldsList = new ArrayList<String>(uniqueKeyFields.length);
Collections.addAll(uniqueKeyFieldsList, uniqueKeyFields);
updateFieldNamesList.removeAll(uniqueKeyFieldsList);
String updateClause =
Arrays.stream(updateFieldNamesList.toArray(new String[0]))
.map(f -> "a." + quoteIdentifier(f) + " = :" + quoteIdentifier(f))
.collect(Collectors.joining(", "));
String onClause =
Arrays.stream(uniqueKeyFields)
.map(f -> "a." + quoteIdentifier(f) + " = :" + quoteIdentifier(f))
.collect(Collectors.joining(" AND "));
String sql =
"MERGE INTO "
+ tableName
+ " a USING ( SELECT 1 FROM dual ) b ON ( "
+ onClause
+ " )"
+ " WHEN MATCHED THEN"
+ " UPDATE SET "
+ updateClause
+ " WHEN NOT MATCHED THEN "
+ getInsertStatement(fieldNames);
return Optional.of(sql);
}
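// Builds only the INSERT clause of the MERGE statement above, which is why there is
// no "INTO <table>" part; the ":field" tokens are named placeholders.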
private String getInsertStatement(String[] fieldNames) {
String columns =
Arrays.stream(fieldNames)
.map(this::quoteIdentifier)
.collect(Collectors.joining(", "));
String placeholders =
Arrays.stream(fieldNames).map(f -> ":" + f).collect(Collectors.joining(", "));
return "INSERT " + "(" + columns + ")" + " VALUES (" + placeholders + ")";
}
@Override
public String dialectName() {
return "Oracle";
}
@Override
public int maxDecimalPrecision() {
return MAX_DECIMAL_PRECISION;
}
@Override
public int minDecimalPrecision() {
return MIN_DECIMAL_PRECISION;
}
@Override
public int maxTimestampPrecision() {
return MAX_TIMESTAMP_PRECISION;
}
@Override
public int minTimestampPrecision() {
return MIN_TIMESTAMP_PRECISION;
}
@Override
public List<LogicalTypeRoot> unsupportedTypes() {
// TODO: We can't convert BINARY data type to
// PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in
// LegacyTypeInfoDataTypeConverter.
return Arrays.asList(
LogicalTypeRoot.BINARY,
LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
LogicalTypeRoot.INTERVAL_YEAR_MONTH,
LogicalTypeRoot.INTERVAL_DAY_TIME,
LogicalTypeRoot.ARRAY,
LogicalTypeRoot.MULTISET,
LogicalTypeRoot.MAP,
LogicalTypeRoot.ROW,
LogicalTypeRoot.DISTINCT_TYPE,
LogicalTypeRoot.STRUCTURED_TYPE,
LogicalTypeRoot.NULL,
LogicalTypeRoot.RAW,
LogicalTypeRoot.SYMBOL,
LogicalTypeRoot.UNRESOLVED);
}
}
package org.apache.flink.connector.jdbc.dialect;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.connector.jdbc.internal.converter.SQLServerRowConverter;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import java.util.*;
import java.util.stream.Collectors;
/**
* SQLServerDialect
*
* @author wenmo
* @since 2021/12/9
**/
public class SQLServerDialect extends AbstractDialect {
private static final long serialVersionUID = 1L;
private static final int MAX_TIMESTAMP_PRECISION = 6;
private static final int MIN_TIMESTAMP_PRECISION = 1;
private static final int MAX_DECIMAL_PRECISION = 65;
private static final int MIN_DECIMAL_PRECISION = 1;
// jdbc:sqlserver://127.0.0.1:1433;DatabaseName=test
@Override
public boolean canHandle(String url) {
return url.startsWith("jdbc:sqlserver:");
}
@Override
public JdbcRowConverter getRowConverter(RowType rowType) {
return new SQLServerRowConverter(rowType);
}
@Override
public Optional<String> defaultDriverName() {
return Optional.of("com.microsoft.sqlserver.jdbc.SQLServerDriver");
}
@Override
public String quoteIdentifier(String identifier) {
return identifier;
}
/*IF EXISTS(SELECT * FROM source WHERE tid = 3)
BEGIN
UPDATE source SET tname = 'd' WHERE tid = 3
END
ELSE
BEGIN
INSERT INTO source (tid, tname) VALUES(3, 'd')
END*/
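// Note: unlike a single MERGE statement, this IF EXISTS / UPDATE / INSERT sequence
// is not atomic; concurrent upserts for the same key can race between the
// existence check and the insert.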
@Override
public Optional<String> getUpsertStatement(
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
/* Compute the update fields: all columns except the unique-key columns. */
ArrayList<String> updateFieldNamesList = new ArrayList<String>(fieldNames.length);
Collections.addAll(updateFieldNamesList, fieldNames);
ArrayList<String> uniqueKeyFieldsList = new ArrayList<String>(uniqueKeyFields.length);
Collections.addAll(uniqueKeyFieldsList, uniqueKeyFields);
updateFieldNamesList.removeAll(uniqueKeyFieldsList);
String updateClause =
Arrays.stream(updateFieldNamesList.toArray(new String[0]))
.map(f -> quoteIdentifier(f) + " = :" + quoteIdentifier(f))
.collect(Collectors.joining(", "));
String onClause =
Arrays.stream(uniqueKeyFields)
.map(f -> quoteIdentifier(f) + " = :" + quoteIdentifier(f))
.collect(Collectors.joining(" AND "));
String sql =
"IF EXISTS ( SELECT * FROM " + tableName + " WHERE " + onClause + " ) "
+ " BEGIN "
+ " UPDATE " + tableName + " SET " + updateClause + " WHERE " + onClause
+ " END "
+ " ELSE "
+ " BEGIN "
+ getInsertStatement(tableName, fieldNames)
+ " END";
return Optional.of(sql);
}
private String getInsertStatement(String tableName, String[] fieldNames) {
String columns =
Arrays.stream(fieldNames)
.map(this::quoteIdentifier)
.collect(Collectors.joining(", "));
String placeholders =
Arrays.stream(fieldNames).map(f -> ":" + f).collect(Collectors.joining(", "));
return "INSERT INTO " + tableName + "(" + columns + ") VALUES (" + placeholders + ")";
}
@Override
public String dialectName() {
return "SQLServer";
}
@Override
public int maxDecimalPrecision() {
return MAX_DECIMAL_PRECISION;
}
@Override
public int minDecimalPrecision() {
return MIN_DECIMAL_PRECISION;
}
@Override
public int maxTimestampPrecision() {
return MAX_TIMESTAMP_PRECISION;
}
@Override
public int minTimestampPrecision() {
return MIN_TIMESTAMP_PRECISION;
}
@Override
public List<LogicalTypeRoot> unsupportedTypes() {
return Arrays.asList(
LogicalTypeRoot.BINARY,
LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
LogicalTypeRoot.INTERVAL_YEAR_MONTH,
LogicalTypeRoot.INTERVAL_DAY_TIME,
LogicalTypeRoot.ARRAY,
LogicalTypeRoot.MULTISET,
LogicalTypeRoot.MAP,
LogicalTypeRoot.ROW,
LogicalTypeRoot.DISTINCT_TYPE,
LogicalTypeRoot.STRUCTURED_TYPE,
LogicalTypeRoot.NULL,
LogicalTypeRoot.RAW,
LogicalTypeRoot.SYMBOL,
LogicalTypeRoot.UNRESOLVED);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.jdbc.internal.converter;
import org.apache.flink.table.types.logical.RowType;
/**
* Runtime converter responsible for converting between JDBC objects and Flink internal
* objects for ClickHouse.
*
* @author wenmo
* @since 2021/6/7 21:49
*/
public class ClickHouseRowConverter extends AbstractJdbcRowConverter {
private static final long serialVersionUID = 1L;
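// All per-type conversion logic is inherited from AbstractJdbcRowConverter; this
// subclass only supplies the converter name.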
@Override
public String converterName() {
return "ClickHouse";
}
public ClickHouseRowConverter(RowType rowType) {
super(rowType);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.jdbc.internal.converter;
import org.apache.flink.table.types.logical.RowType;
/**
* Runtime converter responsible for converting between JDBC objects and Flink internal
* objects for Oracle.
*
* @author wenmo
* @since 2021/6/7 21:45
*/
public class OracleRowConverter extends AbstractJdbcRowConverter {
private static final long serialVersionUID = 1L;
@Override
public String converterName() {
return "Oracle";
}
public OracleRowConverter(RowType rowType) {
super(rowType);
}
}
package org.apache.flink.connector.jdbc.internal.converter;
import org.apache.flink.table.types.logical.RowType;
/**
* SQLServerRowConverter
*
* @author wenmo
* @since 2021/12/9
**/
public class SQLServerRowConverter extends AbstractJdbcRowConverter {
private static final long serialVersionUID = 1L;
@Override
public String converterName() {
return "SQLServer";
}
public SQLServerRowConverter(RowType rowType) {
super(rowType);
}
}
@@ -146,17 +146,4 @@
 <scope>test</scope>
 </dependency>
 </dependencies>
-<!--<build>
-<plugins>
-<plugin>
-<artifactId>maven-assembly-plugin</artifactId>
-<configuration>
-<descriptorRefs>
-<descriptorRef>jar-with-dependencies</descriptorRef>
-</descriptorRefs>
-</configuration>
-</plugin>
-</plugins>
-<finalName>${project.artifactId}</finalName>
-</build>-->
 </project>
\ No newline at end of file
@@ -26,7 +26,6 @@ import org.apache.flink.table.types.logical.RowType;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
-import java.util.stream.Collectors;
 /**
 * JDBC dialect for ClickHouse.
...
@@ -14,9 +14,11 @@ public final class JdbcDialects {
 private static final List<JdbcDialect> DIALECTS =
 Arrays.asList(new DerbyDialect(), new MySQLDialect(), new PostgresDialect()
 , new OracleDialect(), new ClickHouseDialect(), new SQLServerDialect());
-/** Fetch the JdbcDialect class corresponding to a given database url. */
+/**
+* Fetch the JdbcDialect class corresponding to a given database url.
+*/
 public static Optional<JdbcDialect> get(String url) {
 for (JdbcDialect dialect : DIALECTS) {
 if (dialect.canHandle(url)) {
...
@@ -23,11 +23,7 @@ import org.apache.flink.connector.jdbc.internal.converter.OracleRowConverter;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RowType;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
+import java.util.*;
 import java.util.stream.Collectors;
 /**
...
@@ -5,11 +5,7 @@ import org.apache.flink.connector.jdbc.internal.converter.SQLServerRowConverter;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RowType;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
+import java.util.*;
 import java.util.stream.Collectors;
 /**
...
@@ -76,13 +72,13 @@ public class SQLServerDialect extends AbstractDialect {
 .collect(Collectors.joining(" AND "));
 String sql =
 "IF EXISTS ( SELECT * FROM " + tableName + " WHERE " + onClause + " ) "
 + " BEGIN "
 + " UPDATE " + tableName + " SET " + updateClause + " WHERE " + onClause
 + " END "
 + " ELSE "
 + " BEGIN "
-+ getInsertStatement(tableName,fieldNames)
++ getInsertStatement(tableName, fieldNames)
 + " END";
 return Optional.of(sql);
 }
...
@@ -139,18 +139,5 @@
 <scope>test</scope>
 </dependency>
 </dependencies>
-<!--<build>
-<plugins>
-<plugin>
-<artifactId>maven-assembly-plugin</artifactId>
-<configuration>
-<descriptorRefs>
-<descriptorRef>jar-with-dependencies</descriptorRef>
-</descriptorRefs>
-</configuration>
-</plugin>
-</plugins>
-<finalName>${project.artifactId}</finalName>
-</build>-->
 </project>
\ No newline at end of file
@@ -26,7 +26,6 @@ import org.apache.flink.table.types.logical.RowType;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
-import java.util.stream.Collectors;
 /**
 * JDBC dialect for ClickHouse.
...
@@ -16,7 +16,9 @@ public final class JdbcDialects {
 Arrays.asList(new DerbyDialect(), new MySQLDialect(), new PostgresDialect()
 , new OracleDialect(), new ClickHouseDialect(), new SQLServerDialect());
-/** Fetch the JdbcDialect class corresponding to a given database url. */
+/**
+* Fetch the JdbcDialect class corresponding to a given database url.
+*/
 public static Optional<JdbcDialect> get(String url) {
 for (JdbcDialect dialect : DIALECTS) {
 if (dialect.canHandle(url)) {
...
@@ -5,11 +5,7 @@ import org.apache.flink.connector.jdbc.internal.converter.SQLServerRowConverter;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RowType;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
+import java.util.*;
 import java.util.stream.Collectors;
 /**
...
@@ -81,13 +77,13 @@ public class SQLServerDialect extends AbstractDialect {
 .collect(Collectors.joining(" AND "));
 String sql =
 "IF EXISTS ( SELECT * FROM " + tableName + " WHERE " + onClause + " ) "
 + " BEGIN "
 + " UPDATE " + tableName + " SET " + updateClause + " WHERE " + onClause
 + " END "
 + " ELSE "
 + " BEGIN "
-+ getInsertStatement(tableName,fieldNames)
++ getInsertStatement(tableName, fieldNames)
 + " END";
 return Optional.of(sql);
 }
...
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dlink-connectors</artifactId>
<groupId>com.dlink</groupId>
<version>0.6.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dlink-connector-jdbc-1.14</artifactId>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.14.3</flink.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<junit.version>4.12</junit.version>
<postgres.version>42.2.10</postgres.version>
<otj-pg-embedded.version>0.13.3</otj-pg-embedded.version>
</properties>
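<!-- Note: scala.binary.version is referenced below but not defined in this pom;
it is presumably inherited from the dlink-connectors parent. -->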
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Postgres dependencies -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${postgres.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- A planner dependency won't be necessary once FLIP-32 has been completed. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<!-- Postgres test dependencies -->
<dependency>
<groupId>com.opentable.components</groupId>
<artifactId>otj-pg-embedded</artifactId>
<version>${otj-pg-embedded.version}</version>
<scope>test</scope>
</dependency>
<!-- MySQL test dependencies -->
<dependency>
<groupId>ch.vorburger.mariaDB4j</groupId>
<artifactId>mariaDB4j</artifactId>
<version>2.4.0</version>
<scope>test</scope>
</dependency>
<!-- ch.vorburger.mariaDB4j:mariaDB4j depends on mariadb-java-client:2.3.0, but we
bump mariadb-java-client to 2.5.4, which fixes a few notable bugs
(see: https://mariadb.com/kb/en/mariadb-connector-j-release-notes/);
the lower version may also cause the test stability issue FLINK-18082. -->
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
<version>2.5.4</version>
<scope>test</scope>
</dependency>
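<!-- The mysql-connector-java and ojdbc8 dependencies below omit <version>;
their versions are presumably managed by dependencyManagement in a parent pom. -->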
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>test</scope>
</dependency>
<!-- Derby test dependencies -->
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>10.14.2.0</version>
<scope>test</scope>
</dependency>
<!-- Oracle test dependencies -->
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ojdbc8</artifactId>
<scope>test</scope>
</dependency>
<!-- SQLServer test dependencies -->
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
<version>8.2.2.jre8</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.jdbc.dialect;
import org.apache.flink.connector.jdbc.internal.converter.ClickHouseRowConverter;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
/**
* JDBC dialect for ClickHouse.
*
* @author wenmo
* @since 2021/9/19 20:32
*/
public class ClickHouseDialect extends AbstractDialect {
private static final long serialVersionUID = 1L;
private static final int MAX_TIMESTAMP_PRECISION = 6;
private static final int MIN_TIMESTAMP_PRECISION = 1;
private static final int MAX_DECIMAL_PRECISION = 65;
private static final int MIN_DECIMAL_PRECISION = 1;
@Override
public boolean canHandle(String url) {
return url.startsWith("jdbc:clickhouse:");
}
@Override
public JdbcRowConverter getRowConverter(RowType rowType) {
return new ClickHouseRowConverter(rowType);
}
@Override
public String getLimitClause(long limit) {
return "LIMIT " + limit;
}
@Override
public Optional<String> defaultDriverName() {
return Optional.of("ru.yandex.clickhouse.ClickHouseDriver");
}
@Override
public String quoteIdentifier(String identifier) {
return "`" + identifier + "`";
}
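// ClickHouse has no native UPSERT statement, so an upsert request degrades to a
// plain INSERT; deduplication, if required, is left to the table engine
// (e.g. ReplacingMergeTree).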
@Override
public Optional<String> getUpsertStatement(
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
return Optional.of(getInsertIntoStatement(tableName, fieldNames));
}
@Override
public String dialectName() {
return "ClickHouse";
}
@Override
public int maxDecimalPrecision() {
return MAX_DECIMAL_PRECISION;
}
@Override
public int minDecimalPrecision() {
return MIN_DECIMAL_PRECISION;
}
@Override
public int maxTimestampPrecision() {
return MAX_TIMESTAMP_PRECISION;
}
@Override
public int minTimestampPrecision() {
return MIN_TIMESTAMP_PRECISION;
}
@Override
public List<LogicalTypeRoot> unsupportedTypes() {
// TODO: We can't convert BINARY data type to
// PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in
// LegacyTypeInfoDataTypeConverter.
return Arrays.asList(
LogicalTypeRoot.BINARY,
LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
LogicalTypeRoot.INTERVAL_YEAR_MONTH,
LogicalTypeRoot.INTERVAL_DAY_TIME,
LogicalTypeRoot.ARRAY,
LogicalTypeRoot.MULTISET,
LogicalTypeRoot.MAP,
LogicalTypeRoot.ROW,
LogicalTypeRoot.DISTINCT_TYPE,
LogicalTypeRoot.STRUCTURED_TYPE,
LogicalTypeRoot.NULL,
LogicalTypeRoot.RAW,
LogicalTypeRoot.SYMBOL,
LogicalTypeRoot.UNRESOLVED);
}
}
package org.apache.flink.connector.jdbc.dialect;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
/**
* JdbcDialects
*
* @author wenmo
* @since 2021/9/19 20:27
*/
public final class JdbcDialects {
private static final List<JdbcDialect> DIALECTS =
Arrays.asList(new DerbyDialect(), new MySQLDialect(), new PostgresDialect()
, new OracleDialect(), new ClickHouseDialect(), new SQLServerDialect());
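// Resolution is first-match on the JDBC url prefix; e.g. (illustrative url)
// JdbcDialects.get("jdbc:clickhouse://127.0.0.1:8123/test") yields the
// ClickHouseDialect, while an unrecognized url yields Optional.empty().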
/**
* Fetch the JdbcDialect class corresponding to a given database url.
*/
public static Optional<JdbcDialect> get(String url) {
for (JdbcDialect dialect : DIALECTS) {
if (dialect.canHandle(url)) {
return Optional.of(dialect);
}
}
return Optional.empty();
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.jdbc.dialect;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.connector.jdbc.internal.converter.OracleRowConverter;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import java.util.*;
import java.util.stream.Collectors;
/**
* JDBC dialect for Oracle.
*
* @author wenmo
* @since 2021/9/19 20:30
*/
public class OracleDialect extends AbstractDialect {
private static final long serialVersionUID = 1L;
// Define MAX/MIN precision of TIMESTAMP type according to Oracle docs.
private static final int MAX_TIMESTAMP_PRECISION = 6;
private static final int MIN_TIMESTAMP_PRECISION = 1;
// Define MAX/MIN precision of DECIMAL type. Note: these bounds were copied from the
// MySQL dialect (https://dev.mysql.com/doc/refman/8.0/en/fixed-point-types.html);
// Oracle's NUMBER type itself supports precision 1-38.
private static final int MAX_DECIMAL_PRECISION = 65;
private static final int MIN_DECIMAL_PRECISION = 1;
// jdbc:oracle:thin:@127.0.0.1:1521:ORCL
@Override
public boolean canHandle(String url) {
return url.startsWith("jdbc:oracle:thin:");
}
@Override
public JdbcRowConverter getRowConverter(RowType rowType) {
return new OracleRowConverter(rowType);
}
@Override
public String getLimitClause(long limit) {
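// Note: "ROWNUM < " + limit returns at most limit - 1 rows; "ROWNUM <= " + limit
// (or FETCH FIRST ... ROWS ONLY on Oracle 12c+) would match LIMIT semantics exactly.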
return "ROWNUM < " + limit;
}
@Override
public Optional<String> defaultDriverName() {
return Optional.of("oracle.jdbc.driver.OracleDriver");
}
@Override
public String quoteIdentifier(String identifier) {
return identifier;
}
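// Illustrative example (hypothetical table and columns): for tableName "items",
// fieldNames {"id", "name"} and uniqueKeyFields {"id"}, this builds (line breaks
// added for readability):
//   MERGE INTO items a USING ( SELECT 1 FROM dual ) b ON ( a.id = :id )
//   WHEN MATCHED THEN UPDATE SET a.name = :name
//   WHEN NOT MATCHED THEN INSERT (id, name) VALUES (:id, :name)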
@Override
public Optional<String> getUpsertStatement(
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
/* Compute the update fields: all columns except the unique-key columns. */
ArrayList<String> updateFieldNamesList = new ArrayList<String>(fieldNames.length);
Collections.addAll(updateFieldNamesList, fieldNames);
ArrayList<String> uniqueKeyFieldsList = new ArrayList<String>(uniqueKeyFields.length);
Collections.addAll(uniqueKeyFieldsList, uniqueKeyFields);
updateFieldNamesList.removeAll(uniqueKeyFieldsList);
String updateClause =
Arrays.stream(updateFieldNamesList.toArray(new String[0]))
.map(f -> "a." + quoteIdentifier(f) + " = :" + quoteIdentifier(f))
.collect(Collectors.joining(", "));
String onClause =
Arrays.stream(uniqueKeyFields)
.map(f -> "a." + quoteIdentifier(f) + " = :" + quoteIdentifier(f))
.collect(Collectors.joining(" AND "));
String sql =
"MERGE INTO "
+ tableName
+ " a USING ( SELECT 1 FROM dual ) b ON ( "
+ onClause
+ " )"
+ " WHEN MATCHED THEN"
+ " UPDATE SET "
+ updateClause
+ " WHEN NOT MATCHED THEN "
+ getInsertStatement(fieldNames);
return Optional.of(sql);
}
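// Builds only the INSERT clause of the MERGE statement above, which is why there is
// no "INTO <table>" part; the ":field" tokens are named placeholders.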
private String getInsertStatement(String[] fieldNames) {
String columns =
Arrays.stream(fieldNames)
.map(this::quoteIdentifier)
.collect(Collectors.joining(", "));
String placeholders =
Arrays.stream(fieldNames).map(f -> ":" + f).collect(Collectors.joining(", "));
return "INSERT " + "(" + columns + ")" + " VALUES (" + placeholders + ")";
}
@Override
public String dialectName() {
return "Oracle";
}
@Override
public int maxDecimalPrecision() {
return MAX_DECIMAL_PRECISION;
}
@Override
public int minDecimalPrecision() {
return MIN_DECIMAL_PRECISION;
}
@Override
public int maxTimestampPrecision() {
return MAX_TIMESTAMP_PRECISION;
}
@Override
public int minTimestampPrecision() {
return MIN_TIMESTAMP_PRECISION;
}
@Override
public List<LogicalTypeRoot> unsupportedTypes() {
// TODO: We can't convert BINARY data type to
// PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in
// LegacyTypeInfoDataTypeConverter.
return Arrays.asList(
LogicalTypeRoot.BINARY,
LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
LogicalTypeRoot.INTERVAL_YEAR_MONTH,
LogicalTypeRoot.INTERVAL_DAY_TIME,
LogicalTypeRoot.ARRAY,
LogicalTypeRoot.MULTISET,
LogicalTypeRoot.MAP,
LogicalTypeRoot.ROW,
LogicalTypeRoot.DISTINCT_TYPE,
LogicalTypeRoot.STRUCTURED_TYPE,
LogicalTypeRoot.NULL,
LogicalTypeRoot.RAW,
LogicalTypeRoot.SYMBOL,
LogicalTypeRoot.UNRESOLVED);
}
}
package org.apache.flink.connector.jdbc.dialect;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.connector.jdbc.internal.converter.SQLServerRowConverter;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import java.util.*;
import java.util.stream.Collectors;
/**
* SQLServerDialect
*
* @author wenmo
* @since 2021/12/9
**/
public class SQLServerDialect extends AbstractDialect {
private static final long serialVersionUID = 1L;
private static final int MAX_TIMESTAMP_PRECISION = 6;
private static final int MIN_TIMESTAMP_PRECISION = 1;
private static final int MAX_DECIMAL_PRECISION = 65;
private static final int MIN_DECIMAL_PRECISION = 1;
// jdbc:sqlserver://127.0.0.1:1433;DatabaseName=test
@Override
public boolean canHandle(String url) {
return url.startsWith("jdbc:sqlserver:");
}
@Override
public JdbcRowConverter getRowConverter(RowType rowType) {
return new SQLServerRowConverter(rowType);
}
@Override
public String getLimitClause(long limit) {
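// SQL Server has no LIMIT keyword (it uses TOP or OFFSET ... FETCH); returning an
// empty clause effectively disables limit push-down for this dialect.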
return "";
}
@Override
public Optional<String> defaultDriverName() {
return Optional.of("com.microsoft.sqlserver.jdbc.SQLServerDriver");
}
@Override
public String quoteIdentifier(String identifier) {
return identifier;
}
/*IF EXISTS(SELECT * FROM source WHERE tid = 3)
BEGIN
UPDATE source SET tname = 'd' WHERE tid = 3
END
ELSE
BEGIN
INSERT INTO source (tid, tname) VALUES(3, 'd')
END*/
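// Note: unlike a single MERGE statement, this IF EXISTS / UPDATE / INSERT sequence
// is not atomic; concurrent upserts for the same key can race between the
// existence check and the insert.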
@Override
public Optional<String> getUpsertStatement(
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
/* Compute the update fields: all columns except the unique-key columns. */
ArrayList<String> updateFieldNamesList = new ArrayList<String>(fieldNames.length);
Collections.addAll(updateFieldNamesList, fieldNames);
ArrayList<String> uniqueKeyFieldsList = new ArrayList<String>(uniqueKeyFields.length);
Collections.addAll(uniqueKeyFieldsList, uniqueKeyFields);
updateFieldNamesList.removeAll(uniqueKeyFieldsList);
String updateClause =
Arrays.stream(updateFieldNamesList.toArray(new String[0]))
.map(f -> quoteIdentifier(f) + " = :" + quoteIdentifier(f))
.collect(Collectors.joining(", "));
String onClause =
Arrays.stream(uniqueKeyFields)
.map(f -> quoteIdentifier(f) + " = :" + quoteIdentifier(f))
.collect(Collectors.joining(" AND "));
String sql =
"IF EXISTS ( SELECT * FROM " + tableName + " WHERE " + onClause + " ) "
+ " BEGIN "
+ " UPDATE " + tableName + " SET " + updateClause + " WHERE " + onClause
+ " END "
+ " ELSE "
+ " BEGIN "
+ getInsertStatement(tableName, fieldNames)
+ " END";
return Optional.of(sql);
}
private String getInsertStatement(String tableName, String[] fieldNames) {
String columns =
Arrays.stream(fieldNames)
.map(this::quoteIdentifier)
.collect(Collectors.joining(", "));
String placeholders =
Arrays.stream(fieldNames).map(f -> ":" + f).collect(Collectors.joining(", "));
return "INSERT INTO " + tableName + "(" + columns + ") VALUES (" + placeholders + ")";
}
@Override
public String dialectName() {
return "SQLServer";
}
@Override
public int maxDecimalPrecision() {
return MAX_DECIMAL_PRECISION;
}
@Override
public int minDecimalPrecision() {
return MIN_DECIMAL_PRECISION;
}
@Override
public int maxTimestampPrecision() {
return MAX_TIMESTAMP_PRECISION;
}
@Override
public int minTimestampPrecision() {
return MIN_TIMESTAMP_PRECISION;
}
@Override
public List<LogicalTypeRoot> unsupportedTypes() {
return Arrays.asList(
LogicalTypeRoot.BINARY,
LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
LogicalTypeRoot.INTERVAL_YEAR_MONTH,
LogicalTypeRoot.INTERVAL_DAY_TIME,
LogicalTypeRoot.ARRAY,
LogicalTypeRoot.MULTISET,
LogicalTypeRoot.MAP,
LogicalTypeRoot.ROW,
LogicalTypeRoot.DISTINCT_TYPE,
LogicalTypeRoot.STRUCTURED_TYPE,
LogicalTypeRoot.NULL,
LogicalTypeRoot.RAW,
LogicalTypeRoot.SYMBOL,
LogicalTypeRoot.UNRESOLVED);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.jdbc.internal.converter;
import org.apache.flink.table.types.logical.RowType;
/**
* Runtime converter responsible for converting between JDBC objects and Flink internal
* objects for ClickHouse.
*
* @author wenmo
* @since 2021/9/19 20:28
*/
public class ClickHouseRowConverter extends AbstractJdbcRowConverter {
private static final long serialVersionUID = 1L;
@Override
public String converterName() {
return "ClickHouse";
}
public ClickHouseRowConverter(RowType rowType) {
super(rowType);
}
}
package org.apache.flink.connector.jdbc.internal.converter;
import org.apache.flink.table.types.logical.RowType;
/**
* Runtime converter responsible for converting between JDBC objects and Flink internal
* objects for Oracle.
*
* @author wenmo
* @since 2021/9/19 20:28
*/
public class OracleRowConverter extends AbstractJdbcRowConverter {
private static final long serialVersionUID = 1L;
@Override
public String converterName() {
return "Oracle";
}
public OracleRowConverter(RowType rowType) {
super(rowType);
}
}
package org.apache.flink.connector.jdbc.internal.converter;
import org.apache.flink.table.types.logical.RowType;
/**
* SQLServerRowConverter
*
* @author wenmo
* @since 2021/12/9
**/
public class SQLServerRowConverter extends AbstractJdbcRowConverter {
private static final long serialVersionUID = 1L;
@Override
public String converterName() {
return "SQLServer";
}
public SQLServerRowConverter(RowType rowType) {
super(rowType);
}
}
@@ -12,6 +12,8 @@
 <modules>
 <module>dlink-connector-jdbc-1.12</module>
 <module>dlink-connector-jdbc-1.13</module>
+<module>dlink-connector-jdbc-1.11</module>
+<module>dlink-connector-jdbc-1.14</module>
 </modules>
 <artifactId>dlink-connectors</artifactId>
 </project>
\ No newline at end of file