Commit 6d32afa0 authored by godkaikai

Extend support to Flink 1.13.3 and 1.14.0

parent baa157da
......@@ -116,35 +116,6 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
}
}
public String getStreamGraphString(String statement) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
return "This is a sql fragment.";
}
}
if (checkShowFragments(statement)) {
return "'SHOW FRAGMENTS' can't be explained.";
}
List<Operation> operations = super.parser.parse(statement);
if (operations.size() != 1) {
throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query.");
} else {
List<ModifyOperation> modifyOperations = new ArrayList<>();
for (int i = 0; i < operations.size(); i++) {
if(operations.get(i) instanceof ModifyOperation){
modifyOperations.add((ModifyOperation)operations.get(i));
}
}
List<Transformation<?>> trans = super.planner.translate(modifyOperations);
if(execEnv instanceof ExecutorBase){
return ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans).getStreamingPlanAsJSON();
}else{
return "Unsupported SQL query! explainSql() need a single SQL to query.";
}
}
}
public ObjectNode getStreamGraph(String statement) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
......
......@@ -117,35 +117,6 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
}
}
public String getStreamGraphString(String statement) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
return "This is a sql fragment.";
}
}
if (checkShowFragments(statement)) {
return "'SHOW FRAGMENTS' can't be explained.";
}
List<Operation> operations = super.parser.parse(statement);
if (operations.size() != 1) {
throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query.");
} else {
List<ModifyOperation> modifyOperations = new ArrayList<>();
for (int i = 0; i < operations.size(); i++) {
if(operations.get(i) instanceof ModifyOperation){
modifyOperations.add((ModifyOperation)operations.get(i));
}
}
List<Transformation<?>> trans = super.planner.translate(modifyOperations);
if(execEnv instanceof ExecutorBase){
return ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans).getStreamingPlanAsJSON();
}else{
return "Unsupported SQL query! explainSql() need a single SQL to query.";
}
}
}
public ObjectNode getStreamGraph(String statement) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
......
......@@ -14,7 +14,7 @@
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.13.2</flink.version>
<flink.version>1.13.3</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
......
......@@ -111,35 +111,6 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
}
}
public String getStreamGraphString(String statement) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
return "This is a sql fragment.";
}
}
if (checkShowFragments(statement)) {
return "'SHOW FRAGMENTS' can't be explained.";
}
List<Operation> operations = super.getParser().parse(statement);
if (operations.size() != 1) {
throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query.");
} else {
List<ModifyOperation> modifyOperations = new ArrayList<>();
for (int i = 0; i < operations.size(); i++) {
if(operations.get(i) instanceof ModifyOperation){
modifyOperations.add((ModifyOperation)operations.get(i));
}
}
List<Transformation<?>> trans = super.planner.translate(modifyOperations);
if(execEnv instanceof ExecutorBase){
return ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans).getStreamingPlanAsJSON();
}else{
return "Unsupported SQL query! explainSql() need a single SQL to query.";
}
}
}
public ObjectNode getStreamGraph(String statement) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dlink-client</artifactId>
<groupId>com.dlink</groupId>
<version>0.3.2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dlink-client-1.14</artifactId>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.14.0</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<junit.version>4.12</junit.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-common</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!-- package the jar with its dependencies -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package com.dlink.executor.custom;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.internal.DlinkTableEnvironmentImpl;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.PlannerFactoryUtil;
import org.apache.flink.table.module.ModuleManager;
/**
* Customized TableEnvironmentImpl
*
* @author wenmo
* @since 2021/10/22 10:02
**/
public class CustomTableEnvironmentImpl extends DlinkTableEnvironmentImpl {
protected CustomTableEnvironmentImpl(CatalogManager catalogManager, SqlManager sqlManager, ModuleManager moduleManager, TableConfig tableConfig, Executor executor, FunctionCatalog functionCatalog, Planner planner, boolean isStreamingMode, ClassLoader userClassLoader) {
super(catalogManager, sqlManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode, userClassLoader);
}
public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment) {
return create(executionEnvironment, EnvironmentSettings.newInstance().build());
}
public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) {
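// note: the executionEnvironment argument is not used here; the custom
// environment is built purely from the settings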
return create(settings, settings.toConfiguration());
}
public static CustomTableEnvironmentImpl create(Configuration configuration) {
return create(EnvironmentSettings.fromConfiguration(configuration), configuration);
}
public static CustomTableEnvironmentImpl create(EnvironmentSettings settings) {
return create(settings, settings.toConfiguration());
}
private static CustomTableEnvironmentImpl create(
EnvironmentSettings settings, Configuration configuration) {
// temporary solution until FLINK-15635 is fixed
final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
// use configuration to init table config
final TableConfig tableConfig = new TableConfig();
tableConfig.addConfiguration(configuration);
final ModuleManager moduleManager = new ModuleManager();
final SqlManager sqlManager = new SqlManager();
final CatalogManager catalogManager =
CatalogManager.newBuilder()
.classLoader(classLoader)
.config(tableConfig.getConfiguration())
.defaultCatalog(
settings.getBuiltInCatalogName(),
new GenericInMemoryCatalog(
settings.getBuiltInCatalogName(),
settings.getBuiltInDatabaseName()))
.build();
final FunctionCatalog functionCatalog =
new FunctionCatalog(tableConfig, catalogManager, moduleManager);
final ExecutorFactory executorFactory =
FactoryUtil.discoverFactory(
classLoader, ExecutorFactory.class, settings.getExecutor());
final Executor executor = executorFactory.create(configuration);
final Planner planner =
PlannerFactoryUtil.createPlanner(
settings.getPlanner(),
executor,
tableConfig,
catalogManager,
functionCatalog);
return new CustomTableEnvironmentImpl(
catalogManager,
sqlManager,
moduleManager,
tableConfig,
executor,
functionCatalog,
planner,
settings.isStreamingMode(),
classLoader);
}
}
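For orientation, here is a minimal usage sketch (not part of this commit) of the Flink 1.14 environment above. The class name CustomEnvExample, the datagen table, and streaming mode are illustrative; only the create overloads shown in this diff are assumed, and executeSql is assumed to be inherited from Flink's TableEnvironmentImpl.

import org.apache.flink.table.api.EnvironmentSettings;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;

// Hypothetical driver; assumes dlink-client-1.14 and Flink 1.14 are on the classpath.
public class CustomEnvExample {
    public static void main(String[] args) {
        // create() wires up SqlManager, CatalogManager, Executor and Planner as shown above
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .build();
        CustomTableEnvironmentImpl tableEnv = CustomTableEnvironmentImpl.create(settings);
        tableEnv.executeSql("CREATE TABLE src (id INT) WITH ('connector' = 'datagen')");
    }
}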
package com.dlink.executor.custom;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ExpressionParserException;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.internal.DlinkTableEnvironmentImpl;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.types.Row;
import org.apache.flink.util.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static java.lang.String.format;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Flink Sql Fragment Manager
* @author wenmo
* @since 2021/10/22 10:02
**/
@Internal
public final class SqlManager {
private Map<String, String> sqlFragments;
static final String SHOW_FRAGMENTS = "SHOW FRAGMENTS";
public SqlManager() {
sqlFragments = new HashMap<>();
}
/**
* Get names of sql fragments loaded.
*
* @return a list of names of sql fragments loaded
*/
public List<String> listSqlFragments() {
return new ArrayList<>(sqlFragments.keySet());
}
/**
* Registers a fragment of sql under the given name. The sql fragment name must be unique.
*
* @param sqlFragmentName name under which to register the given sql fragment
* @param sqlFragment a fragment of sql to register
* @throws CatalogException if a sql fragment is already registered under the given name.
* (A CatalogException is thrown for now, rather than a dedicated SqlException.)
*/
public void registerSqlFragment(String sqlFragmentName, String sqlFragment) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
"sql fragment name cannot be null or empty.");
checkNotNull(sqlFragment, "sql fragment cannot be null");
if (sqlFragments.containsKey(sqlFragmentName)) {
throw new CatalogException(
format("The fragment of sql %s already exists.", sqlFragmentName));
}
sqlFragments.put(sqlFragmentName, sqlFragment);
}
/**
* Unregisters the sql fragment registered under the given name. The sql fragment must
* exist unless ignoreIfNotExists is true.
*
* @param sqlFragmentName name under which the sql fragment is registered.
* @param ignoreIfNotExists if false, an exception is thrown when the sql fragment to be
* removed does not exist.
* @throws CatalogException if the sql fragment does not exist and ignoreIfNotExists is
* false. (A CatalogException is thrown for now, rather than a dedicated SqlException.)
*/
public void unregisterSqlFragment(String sqlFragmentName, boolean ignoreIfNotExists) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
"sql fragmentName name cannot be null or empty.");
if (sqlFragments.containsKey(sqlFragmentName)) {
sqlFragments.remove(sqlFragmentName);
} else if (!ignoreIfNotExists) {
throw new CatalogException(
format("The fragment of sql %s does not exist.", sqlFragmentName));
}
}
/**
* Gets the sql fragment registered under the given name. The sql fragment must exist.
*
* @param sqlFragmentName name under which the sql fragment is registered.
* @throws CatalogException if no sql fragment is registered under the given name.
* (A CatalogException is thrown for now, rather than a dedicated SqlException.)
*/
public String getSqlFragment(String sqlFragmentName) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
"sql fragmentName name cannot be null or empty.");
if (sqlFragments.containsKey(sqlFragmentName)) {
return sqlFragments.get(sqlFragmentName);
} else {
throw new CatalogException(
format("The fragment of sql %s does not exist.", sqlFragmentName));
}
}
/**
* Gets all registered sql fragments as a map from fragment name to sql text.
*/
public Map<String, String> getSqlFragment() {
return sqlFragments;
}
public TableResult getSqlFragments() {
List<Row> rows = new ArrayList<>();
for (String key : sqlFragments.keySet()) {
rows.add(Row.of(key));
}
return CustomTableResultImpl.buildTableResult(new ArrayList<>(Arrays.asList(new TableSchemaField("sql fragment name", DataTypes.STRING()))), rows);
}
public Iterator<Map.Entry<String, String>> getSqlFragmentsIterator() {
return sqlFragments.entrySet().iterator();
}
public Table getSqlFragmentsTable(DlinkTableEnvironmentImpl environment) {
List<String> keys = new ArrayList<>();
for (String key : sqlFragments.keySet()) {
keys.add(key);
}
return environment.fromValues(keys);
}
public boolean checkShowFragments(String sql){
return SHOW_FRAGMENTS.equals(sql.trim().toUpperCase());
}
/**
* Parses variable definitions ("name:=value") and references ("${name}") in the given sql.
*
* @param statement the sql statement to be parsed.
* @throws ExpressionParserException if a variable name or definition in the given sql is illegal.
*/
public String parseVariable(String statement) {
if (statement == null || "".equals(statement)) {
return statement;
}
String[] strs = statement.split(";");
StringBuilder sb = new StringBuilder();
for (int i = 0; i < strs.length; i++) {
String str = strs[i].trim();
if (str.length() == 0) {
continue;
}
if (str.contains(":=")) {
String[] strs2 = str.split(":=");
if (strs2.length >= 2) {
String varName = strs2[0].trim();
if (varName.length() == 0) {
throw new ExpressionParserException("Illegal variable name.");
}
String valueString = str.substring(str.indexOf(":=") + 2);
// trim the name so "name := value" and "name:=value" register the same key,
// which "${name}" can then resolve
this.registerSqlFragment(varName, replaceVariable(valueString));
} else {
throw new ExpressionParserException("Illegal variable definition.");
}
} else {
sb.append(replaceVariable(str));
}
}
return sb.toString();
}
/**
* Replaces "${name}" variable references in the given sql with their registered fragments.
*
* @param statement the sql statement to be processed.
*/
private String replaceVariable(String statement) {
String pattern = "\\$\\{(.+?)\\}";
Pattern p = Pattern.compile(pattern);
Matcher m = p.matcher(statement);
StringBuffer sb = new StringBuffer();
while (m.find()) {
String key = m.group(1);
String value = this.getSqlFragment(key);
m.appendReplacement(sb, value == null ? "" : value);
}
m.appendTail(sb);
return sb.toString();
}
}
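As a hedged illustration (not part of this commit) of the fragment syntax parseVariable handles — "name:=value" to define a fragment, "${name}" to reference it — a small demo might look like this; only the SqlManager above is assumed.

import com.dlink.executor.custom.SqlManager;

// Hypothetical demo class; the student table name is illustrative.
public class SqlFragmentExample {
    public static void main(String[] args) {
        SqlManager sqlManager = new SqlManager();
        // registers fragment "var1", then expands it in the trailing statement
        String parsed = sqlManager.parseVariable(
                "var1:=select * from student;select name from (${var1})");
        System.out.println(parsed);  // select name from (select * from student)
        System.out.println(sqlManager.checkShowFragments("show fragments"));  // true
    }
}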
package com.dlink.executor.custom;
import org.apache.flink.table.types.DataType;
/**
* @author wenmo
* @since 2021/10/22 10:02
**/
public class TableSchemaField {
private String name;
private DataType type;
public TableSchemaField(String name, DataType type) {
this.name = name;
this.type = type;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public DataType getType() {
return type;
}
public void setType(DataType type) {
this.type = type;
}
}
package com.dlink.utils;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.ObjectIdentifier;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
/**
* FlinkUtil
*
* @author wenmo
* @since 2021/10/22 10:02
*/
public class FlinkUtil {
public static List<String> getFieldNamesFromCatalogManager(CatalogManager catalogManager, String catalog, String database, String table){
Optional<CatalogManager.TableLookupResult> tableOpt = catalogManager.getTable(
ObjectIdentifier.of(catalog, database, table)
);
if (tableOpt.isPresent()) {
return tableOpt.get().getResolvedSchema().getColumnNames();
}else{
return new ArrayList<String>();
}
}
}
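Finally, a small sketch (not part of this commit) of how FlinkUtil might be called. The student table, the datagen connector, and the catalog/database names are illustrative, and getCatalogManager() is assumed to be exposed by Flink's TableEnvironmentImpl.

import java.util.List;
import org.apache.flink.table.api.EnvironmentSettings;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.utils.FlinkUtil;

// Hypothetical usage; the table below exists only for this demo.
public class FlinkUtilExample {
    public static void main(String[] args) {
        CustomTableEnvironmentImpl tableEnv =
                CustomTableEnvironmentImpl.create(EnvironmentSettings.newInstance().build());
        tableEnv.executeSql(
                "CREATE TABLE student (id INT, name STRING) WITH ('connector' = 'datagen')");
        // returns the column names, or an empty list when the table cannot be resolved
        List<String> fields = FlinkUtil.getFieldNamesFromCatalogManager(
                tableEnv.getCatalogManager(), "default_catalog", "default_database", "student");
        System.out.println(fields);  // [id, name]
    }
}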
......@@ -16,5 +16,6 @@
<module>dlink-client-1.12</module>
<module>dlink-client-1.13</module>
<module>dlink-client-1.11</module>
<module>dlink-client-1.14</module>
</modules>
</project>
\ No newline at end of file
......@@ -158,10 +158,6 @@ public abstract class Executor {
return stEnvironment.explainSqlRecord(statement,extraDetails);
}
public String getStreamGraphString(String statement){
return stEnvironment.getStreamGraphString(statement);
}
public ObjectNode getStreamGraph(String statement){
return stEnvironment.getStreamGraph(statement);
}
......
......@@ -76,10 +76,6 @@ public class FlinkSqlPlus {
return explainer.explainSqlColumnCA(statement);
}
public String getStreamGraphString(String statement) {
return executor.getStreamGraphString(statement);
}
public ObjectNode getStreamGraph(String statement) {
return executor.getStreamGraph(statement);
}
......
......@@ -332,6 +332,12 @@ export default (): React.ReactNode => {
<li>
<Link>Support the set syntax for configuring execution environment parameters</Link>
</li>
<li>
<Link>Upgraded Flink 1.13 support to 1.13.3</Link>
</li>
<li>
<Link>Added support for Flink 1.14</Link>
</li>
</ul>
</Paragraph>
</Timeline.Item>
......
......@@ -96,7 +96,7 @@
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<!--<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
......@@ -105,7 +105,7 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
</dependency>
</dependency>-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
......@@ -163,6 +163,11 @@
<artifactId>dlink-client-1.13</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-client-1.14</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-client-1.11</artifactId>
......