Commit 18aa6bbc authored by godkaikai

Add SQLServer Connector

parent 8d691890
......@@ -6,8 +6,7 @@ Dlink is built for Apache Flink to make Flink SQL smoother. It is an interactive
Note that Dlink focuses on the application of FlinkSQL rather than DataStream; during development you will not see a single line of Java, Scala, or Python. Its goal is a unified batch-and-streaming real-time computing platform built on 100% FlinkSQL.
Developing and innovating on the shoulders of giants, Dlink has unlimited potential in the coming trend toward unified batch and stream processing.
Pleasingly, Dlink is a secondary development based on the latest Flink source code, so its interaction stays close to Flink's own features and experience and keeps pace with the official community. In other words, developing and innovating on the shoulders of giants, Dlink has unlimited potential in the coming trend toward unified batch and stream processing.
## Principles
......@@ -19,10 +18,9 @@ Dlink is built for Apache Flink to make Flink SQL smoother. It is an interactive
| Area | Summary | Status |
| :-----------------: | :----------------------------------: | :----: |
| Basic management | Job and Savepoint management | Implemented |
| | FlinkSQL management | Implemented |
| | Flink cluster management | Implemented |
| | Flink cluster configuration management | Implemented |
| Basic management | Job and Savepoint management | Implemented |
| | FlinkSQL and execution configuration management | Implemented |
| | Flink cluster and configuration management | Implemented |
| | Data source management | Implemented |
| | Documentation management | Implemented |
| | System configuration | Implemented |
......@@ -31,6 +29,7 @@ Dlink is built for Apache Flink to make Flink SQL smoother. It is an interactive
| | AGGTABLE syntax | Implemented |
| | Statement sets | Implemented |
| | Support for all sql-client syntax | Implemented |
| | Support for all Flink Configuration options | Implemented |
| FlinkSQL interactive development | Session connector queries | Implemented |
| | Syntax checking | Implemented |
| | Execution graph validation | Implemented |
......@@ -41,7 +40,7 @@ Dlink is built for Apache Flink to make Flink SQL smoother. It is an interactive
| | Submit selected statements | Implemented |
| | Drag-and-drop layout | Implemented |
| | Data preview for SELECT, SHOW, etc. | Implemented |
| | JobGraph preview | Implemented |
| | JobPlanGraph preview | Implemented |
| Flink job operations | Standalone SQL submission | Implemented |
| | YARN session SQL submission | Implemented |
| | YARN per-job SQL submission | Implemented |
......
......@@ -139,7 +139,13 @@
<artifactId>ojdbc8</artifactId>
<scope>test</scope>
</dependency>
<!-- SQLServer test dependencies -->
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
<version>8.2.2.jre8</version>
<scope>test</scope>
</dependency>
</dependencies>
<!--<build>
<plugins>
......
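The mssql-jdbc driver is added with test scope only, mirroring the Oracle test dependency above. As a hedged illustration (not part of this commit; the class name, host, database, and credentials are placeholders), a minimal smoke test could exercise the driver like this:

import java.sql.Connection;
import java.sql.DriverManager;

public class SQLServerConnectionSmokeTest {
    public static void main(String[] args) throws Exception {
        // Placeholder connection details; adjust for a real test instance.
        String url = "jdbc:sqlserver://127.0.0.1:1433;DatabaseName=test";
        Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
        try (Connection conn = DriverManager.getConnection(url, "sa", "password")) {
            // A successful getConnection call is enough to show the driver is on the classpath.
            System.out.println("Connected to SQL Server: " + !conn.isClosed());
        }
    }
}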
......@@ -14,7 +14,7 @@ public final class JdbcDialects {
private static final List<JdbcDialect> DIALECTS =
Arrays.asList(new DerbyDialect(), new MySQLDialect(), new PostgresDialect()
, new OracleDialect(), new ClickHouseDialect());
, new OracleDialect(), new ClickHouseDialect(), new SQLServerDialect());
/** Fetch the JdbcDialect class corresponding to a given database url. */
public static Optional<JdbcDialect> get(String url) {
......
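For context, a minimal sketch of how the new dialect is resolved by URL prefix, assuming the JdbcDialects and JdbcDialect classes shown above live in org.apache.flink.connector.jdbc.dialect:

import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.dialect.JdbcDialects;

import java.util.Optional;

public class DialectResolutionExample {
    public static void main(String[] args) {
        // JdbcDialects.get walks the DIALECTS list and returns the first dialect whose
        // canHandle(url) returns true; a SQL Server URL now resolves to SQLServerDialect.
        Optional<JdbcDialect> dialect =
                JdbcDialects.get("jdbc:sqlserver://127.0.0.1:1433;DatabaseName=test");
        dialect.ifPresent(d -> System.out.println(d.dialectName())); // prints "SQLServer"
    }
}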
package org.apache.flink.connector.jdbc.dialect;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.connector.jdbc.internal.converter.SQLServerRowConverter;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* SQLServerDialect
*
* @author wenmo
* @since 2021/12/9
**/
public class SQLServerDialect extends AbstractDialect {
private static final long serialVersionUID = 1L;
    private static final int MAX_TIMESTAMP_PRECISION = 6;
    private static final int MIN_TIMESTAMP_PRECISION = 1;
    // SQL Server DECIMAL/NUMERIC supports a maximum precision of 38.
    private static final int MAX_DECIMAL_PRECISION = 38;
    private static final int MIN_DECIMAL_PRECISION = 1;
// jdbc:sqlserver://127.0.0.1:1433;DatabaseName=test
@Override
public boolean canHandle(String url) {
return url.startsWith("jdbc:sqlserver:");
}
@Override
public JdbcRowConverter getRowConverter(RowType rowType) {
return new SQLServerRowConverter(rowType);
}
@Override
public Optional<String> defaultDriverName() {
return Optional.of("com.microsoft.sqlserver.jdbc.SQLServerDriver");
}
@Override
public String quoteIdentifier(String identifier) {
return identifier;
}
/*IF EXISTS(SELECT * FROM source WHERE tid = 3)
BEGIN
UPDATE source SET tname = 'd' WHERE tid = 3
END
ELSE
BEGIN
INSERT INTO source (tid, tname) VALUES(3, 'd')
END*/
@Override
public Optional<String> getUpsertStatement(
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
/*get update field*/
ArrayList<String> updateFieldNamesList = new ArrayList<String>(fieldNames.length);
Collections.addAll(updateFieldNamesList, fieldNames);
ArrayList<String> uniqueKeyFieldsList = new ArrayList<String>(uniqueKeyFields.length);
Collections.addAll(uniqueKeyFieldsList, uniqueKeyFields);
updateFieldNamesList.removeAll(uniqueKeyFieldsList);
String updateClause =
Arrays.stream(updateFieldNamesList.toArray(new String[0]))
.map(f -> quoteIdentifier(f) + " = :" + quoteIdentifier(f))
.collect(Collectors.joining(", "));
String onClause =
Arrays.stream(uniqueKeyFields)
.map(f -> quoteIdentifier(f) + " = :" + quoteIdentifier(f))
.collect(Collectors.joining(" AND "));
String sql =
"IF EXISTS ( SELECT * FROM " + tableName + " WHERE " + onClause + " ) "
+ " BEGIN "
+ " UPDATE " + tableName + " SET " + updateClause + " WHERE " + onClause
+ " END "
+ " ELSE "
+ " BEGIN "
+ getInsertStatement(tableName, fieldNames)
+ " END";
return Optional.of(sql);
}
private String getInsertStatement(String tableName, String[] fieldNames) {
String columns =
Arrays.stream(fieldNames)
.map(this::quoteIdentifier)
.collect(Collectors.joining(", "));
String placeholders =
Arrays.stream(fieldNames).map(f -> ":" + f).collect(Collectors.joining(", "));
return "INSERT INTO " + tableName + "(" + columns + ") VALUES (" + placeholders + ")";
}
@Override
public String dialectName() {
return "SQLServer";
}
@Override
public int maxDecimalPrecision() {
return MAX_DECIMAL_PRECISION;
}
@Override
public int minDecimalPrecision() {
return MIN_DECIMAL_PRECISION;
}
@Override
public int maxTimestampPrecision() {
return MAX_TIMESTAMP_PRECISION;
}
@Override
public int minTimestampPrecision() {
return MIN_TIMESTAMP_PRECISION;
}
@Override
public List<LogicalTypeRoot> unsupportedTypes() {
return Arrays.asList(
LogicalTypeRoot.BINARY,
LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
LogicalTypeRoot.INTERVAL_YEAR_MONTH,
LogicalTypeRoot.INTERVAL_DAY_TIME,
LogicalTypeRoot.ARRAY,
LogicalTypeRoot.MULTISET,
LogicalTypeRoot.MAP,
LogicalTypeRoot.ROW,
LogicalTypeRoot.DISTINCT_TYPE,
LogicalTypeRoot.STRUCTURED_TYPE,
LogicalTypeRoot.NULL,
LogicalTypeRoot.RAW,
LogicalTypeRoot.SYMBOL,
LogicalTypeRoot.UNRESOLVED);
}
}
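To make the generated statement concrete, a small usage sketch (the table and column names are made up, and SQLServerDialect is assumed to be importable from the package above):

import java.util.Optional;

public class UpsertStatementExample {
    public static void main(String[] args) {
        SQLServerDialect dialect = new SQLServerDialect();
        Optional<String> upsert = dialect.getUpsertStatement(
                "source",
                new String[] {"tid", "tname"},
                new String[] {"tid"});
        // Prints an IF EXISTS ... UPDATE ... ELSE ... INSERT statement with named
        // placeholders (:tid, :tname), matching the T-SQL pattern in the comment above.
        upsert.ifPresent(System.out::println);
    }
}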
package org.apache.flink.connector.jdbc.internal.converter;
import org.apache.flink.table.types.logical.RowType;
/**
* SQLServerRowConverter
*
* @author wenmo
* @since 2021/12/9
**/
public class SQLServerRowConverter extends AbstractJdbcRowConverter {
private static final long serialVersionUID = 1L;
@Override
public String converterName() {
return "SQLServer";
}
public SQLServerRowConverter(RowType rowType) {
super(rowType);
}
}
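The converter adds no type-specific logic of its own; it inherits everything from AbstractJdbcRowConverter. A brief sketch of how the dialect hands it out (the row schema here is chosen purely for illustration):

import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

public class RowConverterExample {
    public static void main(String[] args) {
        // Build a simple row schema and ask the dialect for its converter.
        RowType rowType = RowType.of(new IntType(), new VarCharType(VarCharType.MAX_LENGTH));
        JdbcRowConverter converter = new SQLServerDialect().getRowConverter(rowType);
        System.out.println(converter.getClass().getSimpleName()); // SQLServerRowConverter
    }
}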
......@@ -140,6 +140,13 @@
<scope>test</scope>
</dependency>
<!-- SQLServer test dependencies -->
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
<version>8.2.2.jre8</version>
<scope>test</scope>
</dependency>
</dependencies>
<!--<build>
<plugins>
......
......@@ -14,7 +14,7 @@ public final class JdbcDialects {
private static final List<JdbcDialect> DIALECTS =
Arrays.asList(new DerbyDialect(), new MySQLDialect(), new PostgresDialect()
, new OracleDialect(), new ClickHouseDialect());
, new OracleDialect(), new ClickHouseDialect(), new SQLServerDialect());
/** Fetch the JdbcDialect class corresponding to a given database url. */
public static Optional<JdbcDialect> get(String url) {
......
package org.apache.flink.connector.jdbc.dialect;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.connector.jdbc.internal.converter.SQLServerRowConverter;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* SQLServerDialect
*
* @author wenmo
* @since 2021/12/9
**/
public class SQLServerDialect extends AbstractDialect {
private static final long serialVersionUID = 1L;
    private static final int MAX_TIMESTAMP_PRECISION = 6;
    private static final int MIN_TIMESTAMP_PRECISION = 1;
    // SQL Server DECIMAL/NUMERIC supports a maximum precision of 38.
    private static final int MAX_DECIMAL_PRECISION = 38;
    private static final int MIN_DECIMAL_PRECISION = 1;
// jdbc:sqlserver://127.0.0.1:1433;DatabaseName=test
@Override
public boolean canHandle(String url) {
return url.startsWith("jdbc:sqlserver:");
}
@Override
public JdbcRowConverter getRowConverter(RowType rowType) {
return new SQLServerRowConverter(rowType);
}
    @Override
    public String getLimitClause(long limit) {
        // SQL Server has no LIMIT clause (it uses TOP / OFFSET-FETCH instead), so no
        // limit clause is appended here.
        return "";
    }
@Override
public Optional<String> defaultDriverName() {
return Optional.of("com.microsoft.sqlserver.jdbc.SQLServerDriver");
}
@Override
public String quoteIdentifier(String identifier) {
return identifier;
}
/*IF EXISTS(SELECT * FROM source WHERE tid = 3)
BEGIN
UPDATE source SET tname = 'd' WHERE tid = 3
END
ELSE
BEGIN
INSERT INTO source (tid, tname) VALUES(3, 'd')
END*/
@Override
public Optional<String> getUpsertStatement(
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
/*get update field*/
ArrayList<String> updateFieldNamesList = new ArrayList<String>(fieldNames.length);
Collections.addAll(updateFieldNamesList, fieldNames);
ArrayList<String> uniqueKeyFieldsList = new ArrayList<String>(uniqueKeyFields.length);
Collections.addAll(uniqueKeyFieldsList, uniqueKeyFields);
updateFieldNamesList.removeAll(uniqueKeyFieldsList);
String updateClause =
Arrays.stream(updateFieldNamesList.toArray(new String[0]))
.map(f -> quoteIdentifier(f) + " = :" + quoteIdentifier(f))
.collect(Collectors.joining(", "));
String onClause =
Arrays.stream(uniqueKeyFields)
.map(f -> quoteIdentifier(f) + " = :" + quoteIdentifier(f))
.collect(Collectors.joining(" AND "));
String sql =
"IF EXISTS ( SELECT * FROM " + tableName + " WHERE " + onClause + " ) "
+ " BEGIN "
+ " UPDATE " + tableName + " SET " + updateClause + " WHERE " + onClause
+ " END "
+ " ELSE "
+ " BEGIN "
+ getInsertStatement(tableName, fieldNames)
+ " END";
return Optional.of(sql);
}
private String getInsertStatement(String tableName, String[] fieldNames) {
String columns =
Arrays.stream(fieldNames)
.map(this::quoteIdentifier)
.collect(Collectors.joining(", "));
String placeholders =
Arrays.stream(fieldNames).map(f -> ":" + f).collect(Collectors.joining(", "));
return "INSERT INTO " + tableName + "(" + columns + ") VALUES (" + placeholders + ")";
}
@Override
public String dialectName() {
return "SQLServer";
}
@Override
public int maxDecimalPrecision() {
return MAX_DECIMAL_PRECISION;
}
@Override
public int minDecimalPrecision() {
return MIN_DECIMAL_PRECISION;
}
@Override
public int maxTimestampPrecision() {
return MAX_TIMESTAMP_PRECISION;
}
@Override
public int minTimestampPrecision() {
return MIN_TIMESTAMP_PRECISION;
}
@Override
public List<LogicalTypeRoot> unsupportedTypes() {
return Arrays.asList(
LogicalTypeRoot.BINARY,
LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
LogicalTypeRoot.INTERVAL_YEAR_MONTH,
LogicalTypeRoot.INTERVAL_DAY_TIME,
LogicalTypeRoot.ARRAY,
LogicalTypeRoot.MULTISET,
LogicalTypeRoot.MAP,
LogicalTypeRoot.ROW,
LogicalTypeRoot.DISTINCT_TYPE,
LogicalTypeRoot.STRUCTURED_TYPE,
LogicalTypeRoot.NULL,
LogicalTypeRoot.RAW,
LogicalTypeRoot.SYMBOL,
LogicalTypeRoot.UNRESOLVED);
}
}
package org.apache.flink.connector.jdbc.internal.converter;
import org.apache.flink.table.types.logical.RowType;
/**
* SQLServerRowConverter
*
* @author wenmo
* @since 2021/12/9
**/
public class SQLServerRowConverter extends AbstractJdbcRowConverter {
private static final long serialVersionUID = 1L;
@Override
public String converterName() {
return "SQLServer";
}
public SQLServerRowConverter(RowType rowType) {
super(rowType);
}
}
......@@ -10,7 +10,7 @@ public interface FlinkSQLConstant {
/**
* Separator
*/
String SEPARATOR = ";";
String SEPARATOR = ";\r\n";
/**
* DDL type
*/
......
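The separator change means a script is now only split at a semicolon followed by a Windows line break. A hedged illustration of the effect (this is not Dlink's actual splitting code, just a demonstration of the new constant value):

public class SeparatorExample {
    public static void main(String[] args) {
        String separator = ";\r\n";
        // Only a semicolon at the end of a line acts as a statement boundary; a semicolon
        // appearing mid-line (e.g. inside a string literal) no longer splits the script.
        String script = "CREATE TABLE t (id INT);\r\nINSERT INTO t SELECT 1 WHERE ';' = ';';\r\n";
        for (String stmt : script.split(separator)) {
            if (!stmt.trim().isEmpty()) {
                System.out.println(stmt.trim());
            }
        }
    }
}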
......@@ -447,6 +447,19 @@ export default (): React.ReactNode => {
</ul>
</Paragraph>
</Timeline.Item>
<Timeline.Item><Text code>0.5.0</Text> <Text type="secondary">2022-01-??</Text>
<p> </p>
<Paragraph>
<ul>
<li>
<Link>Add JobPlanGraph to replace StreamGraph</Link>
</li>
<li>
<Link>Add SQLServer Jdbc Connector implementation</Link>
</li>
</ul>
</Paragraph>
</Timeline.Item>
</Timeline>
</Card>
</PageContainer>
......