Commit cdc26038 authored by wenmo

Fix Flink 1.14

parent 59c7b06d
......@@ -317,7 +317,11 @@ AGG BY TOP2(value) as (value,rank);
2.[Dlink 概念原理与源码扩展介绍](https://github.com/DataLinkDC/dlink/blob/main/dlink-doc/doc/Dlink%E6%A0%B8%E5%BF%83%E6%A6%82%E5%BF%B5%E4%B8%8E%E5%AE%9E%E7%8E%B0%E5%8E%9F%E7%90%86%E8%AF%A6%E8%A7%A3.md)
3.[Dlink 实时计算平台——部署篇](https://github.com/DataLinkDC/dlink/blob/dev/dlink-doc/doc/Dlink%E5%AE%9E%E6%97%B6%E8%AE%A1%E7%AE%97%E5%B9%B3%E5%8F%B0%E2%80%94%E2%80%94%E9%83%A8%E7%BD%B2%E7%AF%87.md)
4.[Dlink-0.3.0重磅来袭](https://github.com/DataLinkDC/dlink/blob/main/dlink-doc/doc/Dlink0.3.0%E9%87%8D%E7%A3%85%E6%9D%A5%E8%A2%AD.md)
5.[Dlink 实时计算平台——部署篇](https://github.com/DataLinkDC/dlink/blob/dev/dlink-doc/doc/Dlink%E5%AE%9E%E6%97%B6%E8%AE%A1%E7%AE%97%E5%B9%B3%E5%8F%B0%E2%80%94%E2%80%94%E9%83%A8%E7%BD%B2%E7%AF%87.md)
6.[Dlink-0.3.2更新说明](https://github.com/DataLinkDC/dlink/blob/main/dlink-doc/doc/Dlink-0.3.2%E6%9B%B4%E6%96%B0%E8%AF%B4%E6%98%8E.md)
#### 常见问题及解决
......
......@@ -118,7 +118,7 @@
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-gateway</artifactId>
<scope>provided</scope>
<!--<scope>provided</scope>-->
</dependency>
<!--<dependency>
<groupId>com.dlink</groupId>
......
......@@ -66,6 +66,13 @@
<include>dlink-client-1.13-${project.version}.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.parent.basedir}/dlink-client/dlink-client-1.14/target</directory>
<outputDirectory>extends</outputDirectory>
<includes>
<include>dlink-client-1.14-${project.version}.jar</include>
</includes>
</fileSet>
<!-- Copy the jar files of the dlink-connectors module into the /lib directory of the distribution -->
<fileSet>
......
package com.dlink.executor.custom;
import org.apache.flink.configuration.Configuration;
import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.internal.DlinkTableEnvironmentImpl;
import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
......@@ -14,7 +19,20 @@ import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.PlannerFactoryUtil;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.UserDefinedFunctionHelper;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.planner.delegation.DefaultExecutor;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
/**
* Customized TableEnvironmentImpl
......@@ -22,37 +40,51 @@ import org.apache.flink.table.module.ModuleManager;
* @author wenmo
* @since 2021/10/22 10:02
**/
public class CustomTableEnvironmentImpl extends DlinkTableEnvironmentImpl {
public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
protected CustomTableEnvironmentImpl(CatalogManager catalogManager, SqlManager sqlManager, ModuleManager moduleManager, TableConfig tableConfig, Executor executor, FunctionCatalog functionCatalog, Planner planner, boolean isStreamingMode, ClassLoader userClassLoader) {
super(catalogManager,sqlManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode, userClassLoader);
}
private SqlManager sqlManager;
private boolean useSqlFragment = true;
public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment) {
return create(executionEnvironment, EnvironmentSettings.newInstance().build());
protected CustomTableEnvironmentImpl(CatalogManager catalogManager, SqlManager sqlManager, ModuleManager moduleManager, TableConfig tableConfig, Executor executor, FunctionCatalog functionCatalog, Planner planner, boolean isStreamingMode, ClassLoader userClassLoader) {
super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode, userClassLoader);
this.sqlManager = sqlManager;
}
public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) {
return create(settings, settings.toConfiguration());
public CustomTableEnvironmentImpl(
CatalogManager catalogManager,
ModuleManager moduleManager,
SqlManager sqlManager,
FunctionCatalog functionCatalog,
TableConfig tableConfig,
StreamExecutionEnvironment executionEnvironment,
Planner planner,
Executor executor,
boolean isStreamingMode,
ClassLoader userClassLoader) {
super(
catalogManager,
moduleManager,
tableConfig,
executor,
functionCatalog,
planner,
isStreamingMode,
userClassLoader);
this.sqlManager = sqlManager;
}
public static CustomTableEnvironmentImpl create(Configuration configuration) {
return create(EnvironmentSettings.fromConfiguration(configuration), configuration);
public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment){
return create(executionEnvironment,EnvironmentSettings.newInstance().build(),TableConfig.getDefault());
}
public static CustomTableEnvironmentImpl create(EnvironmentSettings settings) {
return create(settings, settings.toConfiguration());
}
public static CustomTableEnvironmentImpl create(
StreamExecutionEnvironment executionEnvironment,
EnvironmentSettings settings,
TableConfig tableConfig) {
private static CustomTableEnvironmentImpl create(
EnvironmentSettings settings, Configuration configuration) {
// temporary solution until FLINK-15635 is fixed
final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
// use configuration to init table config
final TableConfig tableConfig = new TableConfig();
tableConfig.addConfiguration(configuration);
final ModuleManager moduleManager = new ModuleManager();
final SqlManager sqlManager = new SqlManager();
......@@ -66,15 +98,14 @@ public class CustomTableEnvironmentImpl extends DlinkTableEnvironmentImpl {
new GenericInMemoryCatalog(
settings.getBuiltInCatalogName(),
settings.getBuiltInDatabaseName()))
.executionConfig(executionEnvironment.getConfig())
.build();
final FunctionCatalog functionCatalog =
new FunctionCatalog(tableConfig, catalogManager, moduleManager);
final ExecutorFactory executorFactory =
FactoryUtil.discoverFactory(
classLoader, ExecutorFactory.class, settings.getExecutor());
final Executor executor = executorFactory.create(configuration);
final Executor executor =
lookupExecutor(classLoader, settings.getExecutor(), executionEnvironment);
final Planner planner =
PlannerFactoryUtil.createPlanner(
......@@ -86,13 +117,219 @@ public class CustomTableEnvironmentImpl extends DlinkTableEnvironmentImpl {
return new CustomTableEnvironmentImpl(
catalogManager,
sqlManager,
moduleManager,
tableConfig,
executor,
sqlManager,
functionCatalog,
tableConfig,
executionEnvironment,
planner,
executor,
settings.isStreamingMode(),
classLoader);
}
private static Executor lookupExecutor(
ClassLoader classLoader,
String executorIdentifier,
StreamExecutionEnvironment executionEnvironment) {
try {
final ExecutorFactory executorFactory =
FactoryUtil.discoverFactory(
classLoader, ExecutorFactory.class, executorIdentifier);
final Method createMethod =
executorFactory
.getClass()
.getMethod("create", StreamExecutionEnvironment.class);
return (Executor) createMethod.invoke(executorFactory, executionEnvironment);
} catch (Exception e) {
throw new TableException(
"Could not instantiate the executor. Make sure a planner module is on the classpath",
e);
}
}
public void useSqlFragment() {
this.useSqlFragment = true;
}
public void unUseSqlFragment() {
this.useSqlFragment = false;
}
@Override
public String explainSql(String statement, ExplainDetail... extraDetails) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
return "This is a sql fragment.";
}
}
if (checkShowFragments(statement)) {
return "'SHOW FRAGMENTS' can't be explained.";
} else {
return super.explainSql(statement, extraDetails);
}
}
public ObjectNode getStreamGraph(String statement) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
throw new TableException("This is a sql fragment.");
}
}
if (checkShowFragments(statement)) {
throw new TableException("'SHOW FRAGMENTS' can't be explained.");
}
List<Operation> operations = super.getParser().parse(statement);
if (operations.size() != 1) {
throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query.");
} else {
List<ModifyOperation> modifyOperations = new ArrayList<>();
for (int i = 0; i < operations.size(); i++) {
if(operations.get(i) instanceof ModifyOperation){
modifyOperations.add((ModifyOperation)operations.get(i));
}
}
List<Transformation<?>> trans = super.planner.translate(modifyOperations);
if(execEnv instanceof DefaultExecutor){
StreamGraph streamGraph = ((DefaultExecutor) execEnv).getExecutionEnvironment().generateStreamGraph(trans);
JSONGenerator jsonGenerator = new JSONGenerator(streamGraph);
String json = jsonGenerator.getJSON();
ObjectMapper mapper = new ObjectMapper();
ObjectNode objectNode = mapper.createObjectNode();
try {
objectNode = (ObjectNode) mapper.readTree(json);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
// Return after the try/catch; a return inside finally would mask any error thrown above.
return objectNode;
}else{
throw new TableException("Unsupported SQL query! explainSql() need a single SQL to query.");
}
}
}
public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
SqlExplainResult record = new SqlExplainResult();
if(useSqlFragment) {
String originSql = statement;
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
record.setParseTrue(true);
record.setType("Sql Fragment");
record.setExplain(originSql);
record.setExplainTrue(true);
return record;
}
}
List<Operation> operations = getParser().parse(statement);
record.setParseTrue(true);
if (operations.size() != 1) {
throw new TableException(
"Unsupported SQL query! explainSql() only accepts a single SQL query.");
}
List<Operation> operationlist = new ArrayList<>(operations);
for (int i = 0; i < operationlist.size(); i++) {
Operation operation = operationlist.get(i);
if (operation instanceof ModifyOperation) {
record.setType("Modify DML");
} else if (operation instanceof ExplainOperation) {
record.setType("Explain DML");
} else if (operation instanceof QueryOperation) {
record.setType("Query DML");
} else {
record.setExplain(operation.asSummaryString());
operationlist.remove(i);
record.setType("DDL");
i=i-1;
}
}
record.setExplainTrue(true);
if(operationlist.size()==0){
//record.setExplain("DDL语句不进行解释。");
return record;
}
record.setExplain(planner.explain(operationlist, extraDetails));
return record;
}
@Override
public String[] getCompletionHints(String statement, int position) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
return new String[0];
}
}
return super.getCompletionHints(statement, position);
}
@Override
public Table sqlQuery(String query) {
if(useSqlFragment) {
query = sqlManager.parseVariable(query);
if (query.length() == 0) {
throw new TableException("Unsupported SQL query! The SQL query parsed is null.If it's a sql fragment, and please use executeSql().");
}
if (checkShowFragments(query)) {
return sqlManager.getSqlFragmentsTable(this);
} else {
return super.sqlQuery(query);
}
}else {
return super.sqlQuery(query);
}
}
@Override
public TableResult executeSql(String statement) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
return CustomTableResultImpl.TABLE_RESULT_OK;
}
if (checkShowFragments(statement)) {
return sqlManager.getSqlFragments();
} else {
return super.executeSql(statement);
}
}else{
return super.executeSql(statement);
}
}
@Override
public void sqlUpdate(String stmt) {
if(useSqlFragment) {
stmt = sqlManager.parseVariable(stmt);
if (stmt.length() == 0) {
throw new TableException("Unsupported SQL update! The SQL update parsed is null.If it's a sql fragment, and please use executeSql().");
}
}
super.sqlUpdate(stmt);
}
public boolean checkShowFragments(String sql){
return sqlManager.checkShowFragments(sql);
}
public <T> void registerFunction(String name, TableFunction<T> tableFunction) {
TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfTableFunction(tableFunction);
this.functionCatalog.registerTempSystemTableFunction(name, tableFunction, typeInfo);
}
public <T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction) {
TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(aggregateFunction);
TypeInformation<ACC> accTypeInfo = UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(aggregateFunction);
this.functionCatalog.registerTempSystemAggregateFunction(name, aggregateFunction, typeInfo, accTypeInfo);
}
public <T, ACC> void registerFunction(String name, TableAggregateFunction<T, ACC> tableAggregateFunction) {
TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(tableAggregateFunction);
TypeInformation<ACC> accTypeInfo = UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(tableAggregateFunction);
this.functionCatalog.registerTempSystemAggregateFunction(name, tableAggregateFunction, typeInfo, accTypeInfo);
}
}
......@@ -5,7 +5,6 @@ import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ExpressionParserException;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.internal.DlinkTableEnvironmentImpl;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.types.Row;
import org.apache.flink.util.StringUtils;
......@@ -133,7 +132,7 @@ public final class SqlManager {
return sqlFragments.entrySet().iterator();
}
public Table getSqlFragmentsTable(DlinkTableEnvironmentImpl environment) {
public Table getSqlFragmentsTable(CustomTableEnvironmentImpl environment) {
List<String> keys = new ArrayList<>();
for (String key : sqlFragments.keySet()) {
keys.add(key);
......
package org.apache.flink.table.api.internal;
import com.dlink.executor.custom.CustomTableResultImpl;
import com.dlink.executor.custom.SqlManager;
import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.PlannerFactoryUtil;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.UserDefinedFunctionHelper;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.planner.delegation.DefaultExecutor;
import java.util.ArrayList;
import java.util.List;
/**
* Customized TableEnvironmentImpl
*
* @author wenmo
* @since 2021/10/22 10:02
**/
public class DlinkTableEnvironmentImpl extends TableEnvironmentImpl {
private SqlManager sqlManager;
private boolean useSqlFragment = true;
protected DlinkTableEnvironmentImpl(CatalogManager catalogManager, SqlManager sqlManager, ModuleManager moduleManager, TableConfig tableConfig, Executor executor, FunctionCatalog functionCatalog, Planner planner, boolean isStreamingMode, ClassLoader userClassLoader) {
super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode, userClassLoader);
this.sqlManager = sqlManager;
}
public static TableEnvironmentImpl create(Configuration configuration) {
return create(EnvironmentSettings.fromConfiguration(configuration), configuration);
}
public static TableEnvironmentImpl create(EnvironmentSettings settings) {
return create(settings, settings.toConfiguration());
}
private static TableEnvironmentImpl create(
EnvironmentSettings settings, Configuration configuration) {
// temporary solution until FLINK-15635 is fixed
final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
// use configuration to init table config
final TableConfig tableConfig = new TableConfig();
tableConfig.addConfiguration(configuration);
final ModuleManager moduleManager = new ModuleManager();
final CatalogManager catalogManager =
CatalogManager.newBuilder()
.classLoader(classLoader)
.config(tableConfig.getConfiguration())
.defaultCatalog(
settings.getBuiltInCatalogName(),
new GenericInMemoryCatalog(
settings.getBuiltInCatalogName(),
settings.getBuiltInDatabaseName()))
.build();
final FunctionCatalog functionCatalog =
new FunctionCatalog(tableConfig, catalogManager, moduleManager);
final ExecutorFactory executorFactory =
FactoryUtil.discoverFactory(
classLoader, ExecutorFactory.class, settings.getExecutor());
final Executor executor = executorFactory.create(configuration);
final Planner planner =
PlannerFactoryUtil.createPlanner(
settings.getPlanner(),
executor,
tableConfig,
catalogManager,
functionCatalog);
return new TableEnvironmentImpl(
catalogManager,
moduleManager,
tableConfig,
executor,
functionCatalog,
planner,
settings.isStreamingMode(),
classLoader);
}
public void useSqlFragment() {
this.useSqlFragment = true;
}
public void unUseSqlFragment() {
this.useSqlFragment = false;
}
@Override
public String explainSql(String statement, ExplainDetail... extraDetails) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
return "This is a sql fragment.";
}
}
if (checkShowFragments(statement)) {
return "'SHOW FRAGMENTS' can't be explained.";
} else {
return super.explainSql(statement, extraDetails);
}
}
public ObjectNode getStreamGraph(String statement) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
throw new TableException("This is a sql fragment.");
}
}
if (checkShowFragments(statement)) {
throw new TableException("'SHOW FRAGMENTS' can't be explained.");
}
List<Operation> operations = super.getParser().parse(statement);
if (operations.size() != 1) {
throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query.");
} else {
List<ModifyOperation> modifyOperations = new ArrayList<>();
for (int i = 0; i < operations.size(); i++) {
if(operations.get(i) instanceof ModifyOperation){
modifyOperations.add((ModifyOperation)operations.get(i));
}
}
List<Transformation<?>> trans = super.planner.translate(modifyOperations);
if(execEnv instanceof DefaultExecutor){
StreamGraph streamGraph = ((DefaultExecutor) execEnv).getExecutionEnvironment().generateStreamGraph(trans);
JSONGenerator jsonGenerator = new JSONGenerator(streamGraph);
String json = jsonGenerator.getJSON();
ObjectMapper mapper = new ObjectMapper();
ObjectNode objectNode = mapper.createObjectNode();
try {
objectNode = (ObjectNode) mapper.readTree(json);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
// Return after the try/catch; a return inside finally would mask any error thrown above.
return objectNode;
}else{
throw new TableException("Unsupported SQL query! explainSql() need a single SQL to query.");
}
}
}
public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
SqlExplainResult record = new SqlExplainResult();
if(useSqlFragment) {
String originSql = statement;
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
record.setParseTrue(true);
record.setType("Sql Fragment");
record.setExplain(originSql);
record.setExplainTrue(true);
return record;
}
}
List<Operation> operations = getParser().parse(statement);
record.setParseTrue(true);
if (operations.size() != 1) {
throw new TableException(
"Unsupported SQL query! explainSql() only accepts a single SQL query.");
}
List<Operation> operationlist = new ArrayList<>(operations);
for (int i = 0; i < operationlist.size(); i++) {
Operation operation = operationlist.get(i);
if (operation instanceof ModifyOperation) {
record.setType("Modify DML");
} else if (operation instanceof ExplainOperation) {
record.setType("Explain DML");
} else if (operation instanceof QueryOperation) {
record.setType("Query DML");
} else {
record.setExplain(operation.asSummaryString());
operationlist.remove(i);
record.setType("DDL");
i=i-1;
}
}
record.setExplainTrue(true);
if(operationlist.size()==0){
//record.setExplain("DDL语句不进行解释。");
return record;
}
record.setExplain(planner.explain(operationlist, extraDetails));
return record;
}
@Override
public String[] getCompletionHints(String statement, int position) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
return new String[0];
}
}
return super.getCompletionHints(statement, position);
}
@Override
public Table sqlQuery(String query) {
if(useSqlFragment) {
query = sqlManager.parseVariable(query);
if (query.length() == 0) {
throw new TableException("Unsupported SQL query! The SQL query parsed is null.If it's a sql fragment, and please use executeSql().");
}
if (checkShowFragments(query)) {
return sqlManager.getSqlFragmentsTable(this);
} else {
return super.sqlQuery(query);
}
}else {
return super.sqlQuery(query);
}
}
@Override
public TableResult executeSql(String statement) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
return CustomTableResultImpl.TABLE_RESULT_OK;
}
if (checkShowFragments(statement)) {
return sqlManager.getSqlFragments();
} else {
return super.executeSql(statement);
}
}else{
return super.executeSql(statement);
}
}
@Override
public void sqlUpdate(String stmt) {
if(useSqlFragment) {
stmt = sqlManager.parseVariable(stmt);
if (stmt.length() == 0) {
throw new TableException("Unsupported SQL update! The SQL update parsed is null.If it's a sql fragment, and please use executeSql().");
}
}
super.sqlUpdate(stmt);
}
public boolean checkShowFragments(String sql){
return sqlManager.checkShowFragments(sql);
}
public <T> void registerFunction(String name, TableFunction<T> tableFunction) {
TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfTableFunction(tableFunction);
this.functionCatalog.registerTempSystemTableFunction(name, tableFunction, typeInfo);
}
public <T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction) {
TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(aggregateFunction);
TypeInformation<ACC> accTypeInfo = UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(aggregateFunction);
this.functionCatalog.registerTempSystemAggregateFunction(name, aggregateFunction, typeInfo, accTypeInfo);
}
public <T, ACC> void registerFunction(String name, TableAggregateFunction<T, ACC> tableAggregateFunction) {
TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(tableAggregateFunction);
TypeInformation<ACC> accTypeInfo = UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(tableAggregateFunction);
this.functionCatalog.registerTempSystemAggregateFunction(name, tableAggregateFunction, typeInfo, accTypeInfo);
}
}
# Reading and Writing Hive with Dlink
## Preface
Recently, many users have asked: how does dlink connect to Hive for data development?
Connecting dlink to Hive follows the same steps as Flink's `sql-client`, except that no configuration file is loaded by default. The following walks through the whole process of working with Hive.
## Preparation
Setting up a Hive development environment involves many components and plugins, so getting the versions to match is critical; it saves a lot of unnecessary trouble, although minor versions are generally compatible with one another.
First, the component versions used in this tutorial:
| Component | Version |
| :----: | :----: |
| Dlink | 0.3.2 |
| Flink | 1.12.4 |
| Hadoop | 2.7.7 |
| Hive | 2.3.6 |
| Mysql | 8.0.15 |
Next, the plugin versions used in this tutorial:
| Component | Plugin | Version |
| :-----------: | :------------------------: | :-------------------: |
| Dlink | dlink-client | 1.12 |
| Dlink & Flink | flink-sql-connector-hive | 2.3.6_2.11-1.12.3 |
| Dlink & Flink | flink-shaded-hadoop-3-uber | 3.1.1.7.2.8.0-224-9.0 |
## Deploying the Extensions
Deploying the extensions is very simple (provided Dlink has been deployed and successfully connected to a Flink cluster; see 《Dlink实时计算平台——部署篇》 for the deployment steps): just add the two plugins `flink-sql-connector-hive-2.3.6_2.11-1.12.3.jar` and `flink-shaded-hadoop-3-uber-3.1.1.7.2.8.0-224-9.0.jar` to Dlink's plugins directory and Flink's lib directory respectively, then restart both. You also need to place `hive-site.xml` in a location of your choice, as long as Dlink can access it.
## Creating a Hive Catalog
Assume Hive already contains a database `hdb` with a table `htest` whose columns are `name` and `age`, stored at the default location `hdfs:///usr/local/hadoop/hive-2.3.9/warehouse/hdb.db`. (Why 2.3.9 here? Because `flink-sql-connector-hive-2.3.6_2.11-1.12.3.jar` only supports Hive up to 2.3.6; the author first installed 2.3.9 and then installed 2.3.6, awkward > _ < ~)
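For reference, the Hive-side DDL is not shown here; a minimal sketch of what it might look like (column types are assumed, and these statements would be run in Hive itself, for example via beeline, not in Dlink):

```sql
-- Hypothetical Hive DDL for the database and table described above; column types are assumed.
CREATE DATABASE IF NOT EXISTS hdb;
USE hdb;
CREATE TABLE IF NOT EXISTS htest (
  name STRING,
  age  INT
);
```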
```sql
CREATE CATALOG myhive WITH (
'type' = 'hive',
'default-database' = 'hdb',
'hive-conf-dir' = '/usr/local/dlink/hive-conf'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG myhive;
select * from htest
```
Enter the Flink SQL above in the Dlink editor to create the Hive catalog and query the table.
Here, `hive-conf-dir` must point to the directory containing `hive-site.xml`; the remaining options follow the official Flink documentation.
![image-20211030150744871](D:\DataCenter\worknote\公众号\插图\hive篇\hive创建catalog.png)
After running the query (remember to enable the result preview in the execution configuration), the result shows that the htest table contains only a single row. (That is expected, because the author was lazy and only mocked up one row of data.)
At this point you can happily work with Hive data using FlinkSQL.
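For example, a minimal sketch of a write-and-read round trip against the table above (the sample row is made up; it assumes the `myhive` catalog created earlier is active in the session):

```sql
-- Assumes the myhive catalog created above; the inserted row is made up for illustration.
USE CATALOG myhive;
-- Write one row into the Hive table using the default dialect.
INSERT INTO htest VALUES ('dlink', 1);
-- Read it back to confirm the round trip.
SELECT * FROM htest;
```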
## Using the Hive Dialect
What if you know Hive's syntax well and need to run statements that are specific to Hive?
As the official Flink documentation explains, simply use `SET table.sql-dialect=hive` to enable the dialect. Note that there are two dialects, `default` and `hive`, and you can switch between them freely.
```sql
CREATE CATALOG myhive WITH (
'type' = 'hive',
'default-database' = 'hdb',
'hive-conf-dir' = '/usr/local/dlink/hive-conf'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG myhive;
-- set hive dialect
SET table.sql-dialect=hive;
-- alter table location
alter table htest set location 'hdfs:///usr/htest';
-- set default dialect
SET table.sql-dialect=default;
select * from htest;
```
The SQL above adds the use of the Hive dialect. FlinkSQL itself does not support the `alter table .. set location ..` syntax, but switching to the Hive dialect makes it available. This script changes the storage location of the htest table to a new path and then runs the query again.
![image-20211030151433858](D:\DataCenter\worknote\公众号\插图\hive篇\hivedialect.png)
As the figure above shows, querying htest after its location was changed returns no data, which is expected.
Then change the location back to the previous path and run the query again, and the original row reappears, as shown below (a sketch of the statements for this step follows the figure).
![image-20211030151532520](D:\DataCenter\worknote\公众号\插图\hive篇\hivesetlocation.png)
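The exact statements for that last step are not shown above; under the Hive dialect they would look roughly like this (the warehouse path is inferred from the earlier description and may differ in your environment):

```sql
-- Switch to the Hive dialect to use Hive-specific DDL.
SET table.sql-dialect=hive;
-- Point htest back at its original warehouse location (path assumed from the description above).
alter table htest set location 'hdfs:///usr/local/hadoop/hive-2.3.9/warehouse/hdb.db/htest';
-- Switch back to the default dialect and query again; the original row is visible.
SET table.sql-dialect=default;
select * from htest;
```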
## Summary
As shown above, Dlink presents part of Flink's Hive integration through a friendlier, more interactive experience; there is of course more Hive functionality for you to explore and discover as you use it.
Currently, Dlink supports the vast majority of Flink's features and functionality. Integration and extension work exactly as described in the official Flink documentation: just add the dependencies to Dlink's plugins directory.
Note that because Dlink is a Spring Boot based browser/server application, dependency conflicts are hard to avoid entirely; deploying the frontend and backend separately behind Nginx sidesteps some of them. Packages such as mysql-cdc have internal dependency conflicts that can prevent Dlink itself from starting (experienced users can track down the conflicts themselves and repackage). A solution for this issue exists and will ship in a later release.
......@@ -354,6 +354,9 @@ export default (): React.ReactNode => {
<li>
<Link>Added FlinkSQL auto-completion that suggests registered metadata based on the SQL context</Link>
</li>
<li>
<Link>Fixed the issue where Flink 1.14.0 remote submission could not correctly submit jobs to the cluster</Link>
</li>
</ul>
</Paragraph>
</Timeline.Item>
......