Commit 3df7d3bb authored by godkaikai

Lineage analysis improvements

parent 0e91d5e4
@@ -161,6 +161,7 @@
 <plugin>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-maven-plugin</artifactId>
+    <version>${spring-boot-dependencies.version}</version>
     <configuration>
         <layout>ZIP</layout>
         <mainClass>com.dlink.Dlink</mainClass>
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <parent>
        <artifactId>dlink-client</artifactId>
        <groupId>com.dlink</groupId>
        <version>0.3.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>dlink-client-1.11</artifactId>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <!-- Append the reference.conf files of all shaded dependencies
                                     instead of keeping a single copy; maven-shade-plugin requires
                                     an explicit transformer implementation class -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
                <configuration>
                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <properties>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <flink.version>1.11.4</flink.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.target>1.8</maven.compiler.target>
        <junit.version>4.12</junit.version>
    </properties>
</project>
@@ -3,11 +3,18 @@ package com.dlink.executor.custom;
 import com.dlink.result.SqlExplainResult;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.JSONGenerator;
 import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.table.api.*;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.FunctionCatalog;
@@ -162,8 +169,16 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
         if(execEnv instanceof ExecutorBase){
             StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
             JSONGenerator jsonGenerator = new JSONGenerator(streamGraph);
-            ObjectNode jsonNode = jsonGenerator.getJSONNode();
-            return jsonNode;
+            String json = jsonGenerator.getJSON();
+            ObjectMapper mapper = new ObjectMapper();
+            ObjectNode objectNode = mapper.createObjectNode();
+            try {
+                objectNode = (ObjectNode) mapper.readTree(json);
+            } catch (JsonProcessingException e) {
+                e.printStackTrace();
+            } finally {
+                return objectNode;
+            }
         }else{
             throw new TableException("Unsupported SQL query! ExecEnv need a ExecutorBase.");
         }
......
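One caveat on the JSON round-trip introduced above: a `return` inside `finally` discards whatever the `try`/`catch` did, so a `JsonProcessingException` only surfaces as a stack trace while the caller silently receives an empty node. A minimal alternative sketch, assuming the same surrounding method and shaded-Jackson imports as the diff (wrapping the failure in `TableException` is an editorial assumption, not what this commit does):

```java
String json = jsonGenerator.getJSON();
ObjectMapper mapper = new ObjectMapper();
try {
    // getJSON() pretty-prints the StreamGraph plan, so readTree() rebuilds the same tree
    return (ObjectNode) mapper.readTree(json);
} catch (JsonProcessingException e) {
    // Surface the failure instead of returning a half-built node from finally
    throw new TableException("Failed to parse the StreamGraph plan JSON.", e);
}
```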
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.graph;
import org.apache.flink.annotation.Internal;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.*;
/** Helper class for generating a JSON representation from a {@link StreamGraph}. */
@Internal
public class JSONGenerator {
public static final String STEPS = "step_function";
public static final String ID = "id";
public static final String SIDE = "side";
public static final String SHIP_STRATEGY = "ship_strategy";
public static final String PREDECESSORS = "predecessors";
public static final String TYPE = "type";
public static final String PACT = "pact";
public static final String CONTENTS = "contents";
public static final String PARALLELISM = "parallelism";
private StreamGraph streamGraph;
private final ObjectMapper mapper = new ObjectMapper();
public JSONGenerator(StreamGraph streamGraph) {
this.streamGraph = streamGraph;
}
public String getJSON() {
return getJSONNode().toPrettyString();
}
public ObjectNode getJSONNode() {
ObjectNode json = mapper.createObjectNode();
ArrayNode nodes = mapper.createArrayNode();
json.put("nodes", nodes);
List<Integer> operatorIDs = new ArrayList<>(streamGraph.getVertexIDs());
Comparator<Integer> operatorIDComparator =
Comparator.comparingInt(
(Integer id) -> streamGraph.getSinkIDs().contains(id) ? 1 : 0)
.thenComparingInt(id -> id);
operatorIDs.sort(operatorIDComparator);
visit(nodes, operatorIDs, new HashMap<>());
return json;
}
private void visit(
ArrayNode jsonArray, List<Integer> toVisit, Map<Integer, Integer> edgeRemapings) {
Integer vertexID = toVisit.get(0);
StreamNode vertex = streamGraph.getStreamNode(vertexID);
if (streamGraph.getSourceIDs().contains(vertexID)
|| Collections.disjoint(vertex.getInEdges(), toVisit)) {
ObjectNode node = mapper.createObjectNode();
decorateNode(vertexID, node);
if (!streamGraph.getSourceIDs().contains(vertexID)) {
ArrayNode inputs = mapper.createArrayNode();
node.put(PREDECESSORS, inputs);
for (StreamEdge inEdge : vertex.getInEdges()) {
int inputID = inEdge.getSourceId();
Integer mappedID =
(edgeRemapings.keySet().contains(inputID))
? edgeRemapings.get(inputID)
: inputID;
decorateEdge(inputs, inEdge, mappedID);
}
}
jsonArray.add(node);
toVisit.remove(vertexID);
} else {
Integer iterationHead = -1;
for (StreamEdge inEdge : vertex.getInEdges()) {
int operator = inEdge.getSourceId();
if (streamGraph.vertexIDtoLoopTimeout.containsKey(operator)) {
iterationHead = operator;
}
}
ObjectNode obj = mapper.createObjectNode();
ArrayNode iterationSteps = mapper.createArrayNode();
obj.put(STEPS, iterationSteps);
obj.put(ID, iterationHead);
obj.put(PACT, "IterativeDataStream");
obj.put(PARALLELISM, streamGraph.getStreamNode(iterationHead).getParallelism());
obj.put(CONTENTS, "Stream Iteration");
ArrayNode iterationInputs = mapper.createArrayNode();
obj.put(PREDECESSORS, iterationInputs);
toVisit.remove(iterationHead);
visitIteration(iterationSteps, toVisit, iterationHead, edgeRemapings, iterationInputs);
jsonArray.add(obj);
}
if (!toVisit.isEmpty()) {
visit(jsonArray, toVisit, edgeRemapings);
}
}
private void visitIteration(
ArrayNode jsonArray,
List<Integer> toVisit,
int headId,
Map<Integer, Integer> edgeRemapings,
ArrayNode iterationInEdges) {
Integer vertexID = toVisit.get(0);
StreamNode vertex = streamGraph.getStreamNode(vertexID);
toVisit.remove(vertexID);
// Ignoring head and tail to avoid redundancy
if (!streamGraph.vertexIDtoLoopTimeout.containsKey(vertexID)) {
ObjectNode obj = mapper.createObjectNode();
jsonArray.add(obj);
decorateNode(vertexID, obj);
ArrayNode inEdges = mapper.createArrayNode();
obj.put(PREDECESSORS, inEdges);
for (StreamEdge inEdge : vertex.getInEdges()) {
int inputID = inEdge.getSourceId();
if (edgeRemapings.keySet().contains(inputID)) {
decorateEdge(inEdges, inEdge, inputID);
} else if (!streamGraph.vertexIDtoLoopTimeout.containsKey(inputID)) {
decorateEdge(iterationInEdges, inEdge, inputID);
}
}
edgeRemapings.put(vertexID, headId);
visitIteration(jsonArray, toVisit, headId, edgeRemapings, iterationInEdges);
}
}
private void decorateEdge(ArrayNode inputArray, StreamEdge inEdge, int mappedInputID) {
ObjectNode input = mapper.createObjectNode();
inputArray.add(input);
input.put(ID, mappedInputID);
input.put(SHIP_STRATEGY, inEdge.getPartitioner().toString());
input.put(SIDE, (inputArray.size() == 0) ? "first" : "second");
}
private void decorateNode(Integer vertexID, ObjectNode node) {
StreamNode vertex = streamGraph.getStreamNode(vertexID);
node.put(ID, vertexID);
node.put(TYPE, vertex.getOperatorName());
if (streamGraph.getSourceIDs().contains(vertexID)) {
node.put(PACT, "Data Source");
} else if (streamGraph.getSinkIDs().contains(vertexID)) {
node.put(PACT, "Data Sink");
} else {
node.put(PACT, "Operator");
}
node.put(CONTENTS, vertex.getOperatorName());
node.put(PARALLELISM, streamGraph.getStreamNode(vertexID).getParallelism());
}
}
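For orientation, a minimal usage sketch of the vendored `JSONGenerator` above (assumes a Flink 1.11.x streaming job on the classpath; the `PlanJsonDemo` class and its pipeline are hypothetical):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph;

public class PlanJsonDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b", "c")
           .map(String::toUpperCase)
           .print();
        // Build the StreamGraph without executing the job, then render it as JSON
        StreamGraph streamGraph = env.getStreamGraph();
        System.out.println(new JSONGenerator(streamGraph).getJSON());
    }
}
```

The output is the `{"nodes": [...]}` structure assembled in `getJSONNode()`, one entry per operator with its `pact`, `contents`, `parallelism`, and `predecessors` fields.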
@@ -61,7 +61,7 @@
 <properties>
     <java.version>1.8</java.version>
     <scala.binary.version>2.11</scala.binary.version>
-    <flink.version>1.12.4</flink.version>
+    <flink.version>1.12.5</flink.version>
     <maven.compiler.source>1.8</maven.compiler.source>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <maven.compiler.target>1.8</maven.compiler.target>
......
@@ -3,14 +3,18 @@ package com.dlink.executor.custom;
 import com.dlink.result.SqlExplainResult;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.calcite.shaded.com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.flink.calcite.shaded.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.JSONGenerator;
 import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.table.api.*;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.FunctionCatalog;
@@ -165,8 +169,16 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
         if(execEnv instanceof ExecutorBase){
             StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
             JSONGenerator jsonGenerator = new JSONGenerator(streamGraph);
-            ObjectNode jsonNode = jsonGenerator.getJSONNode();
-            return jsonNode;
+            String json = jsonGenerator.getJSON();
+            ObjectMapper mapper = new ObjectMapper();
+            ObjectNode objectNode = mapper.createObjectNode();
+            try {
+                objectNode = (ObjectNode) mapper.readTree(json);
+            } catch (JsonProcessingException e) {
+                e.printStackTrace();
+            } finally {
+                return objectNode;
+            }
         }else{
             throw new TableException("Unsupported SQL query! ExecEnv need a ExecutorBase.");
         }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.graph;
import org.apache.flink.annotation.Internal;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/** Helper class for generating a JSON representation from a {@link StreamGraph}. */
@Internal
public class JSONGenerator {
public static final String STEPS = "step_function";
public static final String ID = "id";
public static final String SIDE = "side";
public static final String SHIP_STRATEGY = "ship_strategy";
public static final String PREDECESSORS = "predecessors";
public static final String TYPE = "type";
public static final String PACT = "pact";
public static final String CONTENTS = "contents";
public static final String PARALLELISM = "parallelism";
private StreamGraph streamGraph;
private final ObjectMapper mapper = new ObjectMapper();
public JSONGenerator(StreamGraph streamGraph) {
this.streamGraph = streamGraph;
}
public String getJSON() {
return getJSONNode().toPrettyString();
}
public ObjectNode getJSONNode() {
ObjectNode json = mapper.createObjectNode();
ArrayNode nodes = mapper.createArrayNode();
json.put("nodes", nodes);
List<Integer> operatorIDs = new ArrayList<>(streamGraph.getVertexIDs());
Comparator<Integer> operatorIDComparator =
Comparator.comparingInt(
(Integer id) -> streamGraph.getSinkIDs().contains(id) ? 1 : 0)
.thenComparingInt(id -> id);
operatorIDs.sort(operatorIDComparator);
visit(nodes, operatorIDs, new HashMap<>());
return json;
}
private void visit(
ArrayNode jsonArray, List<Integer> toVisit, Map<Integer, Integer> edgeRemapings) {
Integer vertexID = toVisit.get(0);
StreamNode vertex = streamGraph.getStreamNode(vertexID);
if (streamGraph.getSourceIDs().contains(vertexID)
|| Collections.disjoint(vertex.getInEdges(), toVisit)) {
ObjectNode node = mapper.createObjectNode();
decorateNode(vertexID, node);
if (!streamGraph.getSourceIDs().contains(vertexID)) {
ArrayNode inputs = mapper.createArrayNode();
node.put(PREDECESSORS, inputs);
for (StreamEdge inEdge : vertex.getInEdges()) {
int inputID = inEdge.getSourceId();
Integer mappedID =
(edgeRemapings.keySet().contains(inputID))
? edgeRemapings.get(inputID)
: inputID;
decorateEdge(inputs, inEdge, mappedID);
}
}
jsonArray.add(node);
toVisit.remove(vertexID);
} else {
Integer iterationHead = -1;
for (StreamEdge inEdge : vertex.getInEdges()) {
int operator = inEdge.getSourceId();
if (streamGraph.vertexIDtoLoopTimeout.containsKey(operator)) {
iterationHead = operator;
}
}
ObjectNode obj = mapper.createObjectNode();
ArrayNode iterationSteps = mapper.createArrayNode();
obj.put(STEPS, iterationSteps);
obj.put(ID, iterationHead);
obj.put(PACT, "IterativeDataStream");
obj.put(PARALLELISM, streamGraph.getStreamNode(iterationHead).getParallelism());
obj.put(CONTENTS, "Stream Iteration");
ArrayNode iterationInputs = mapper.createArrayNode();
obj.put(PREDECESSORS, iterationInputs);
toVisit.remove(iterationHead);
visitIteration(iterationSteps, toVisit, iterationHead, edgeRemapings, iterationInputs);
jsonArray.add(obj);
}
if (!toVisit.isEmpty()) {
visit(jsonArray, toVisit, edgeRemapings);
}
}
private void visitIteration(
ArrayNode jsonArray,
List<Integer> toVisit,
int headId,
Map<Integer, Integer> edgeRemapings,
ArrayNode iterationInEdges) {
Integer vertexID = toVisit.get(0);
StreamNode vertex = streamGraph.getStreamNode(vertexID);
toVisit.remove(vertexID);
// Ignoring head and tail to avoid redundancy
if (!streamGraph.vertexIDtoLoopTimeout.containsKey(vertexID)) {
ObjectNode obj = mapper.createObjectNode();
jsonArray.add(obj);
decorateNode(vertexID, obj);
ArrayNode inEdges = mapper.createArrayNode();
obj.put(PREDECESSORS, inEdges);
for (StreamEdge inEdge : vertex.getInEdges()) {
int inputID = inEdge.getSourceId();
if (edgeRemapings.keySet().contains(inputID)) {
decorateEdge(inEdges, inEdge, inputID);
} else if (!streamGraph.vertexIDtoLoopTimeout.containsKey(inputID)) {
decorateEdge(iterationInEdges, inEdge, inputID);
}
}
edgeRemapings.put(vertexID, headId);
visitIteration(jsonArray, toVisit, headId, edgeRemapings, iterationInEdges);
}
}
private void decorateEdge(ArrayNode inputArray, StreamEdge inEdge, int mappedInputID) {
ObjectNode input = mapper.createObjectNode();
inputArray.add(input);
input.put(ID, mappedInputID);
input.put(SHIP_STRATEGY, inEdge.getPartitioner().toString());
input.put(SIDE, (inputArray.size() == 0) ? "first" : "second");
}
private void decorateNode(Integer vertexID, ObjectNode node) {
StreamNode vertex = streamGraph.getStreamNode(vertexID);
node.put(ID, vertexID);
node.put(TYPE, vertex.getOperatorName());
if (streamGraph.getSourceIDs().contains(vertexID)) {
node.put(PACT, "Data Source");
} else if (streamGraph.getSinkIDs().contains(vertexID)) {
node.put(PACT, "Data Sink");
} else {
node.put(PACT, "Operator");
}
node.put(CONTENTS, vertex.getOperatorName());
node.put(PARALLELISM, streamGraph.getStreamNode(vertexID).getParallelism());
}
}
@@ -61,7 +61,7 @@
 <properties>
     <java.version>1.8</java.version>
     <scala.binary.version>2.11</scala.binary.version>
-    <flink.version>1.13.1</flink.version>
+    <flink.version>1.13.2</flink.version>
     <maven.compiler.source>1.8</maven.compiler.source>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <maven.compiler.target>1.8</maven.compiler.target>
......
@@ -3,6 +3,8 @@ package com.dlink.executor.custom;
 import com.dlink.result.SqlExplainResult;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.JSONGenerator;
@@ -162,8 +164,16 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
         if(execEnv instanceof ExecutorBase){
             StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
             JSONGenerator jsonGenerator = new JSONGenerator(streamGraph);
-            ObjectNode jsonNode = jsonGenerator.getJSONNode();
-            return jsonNode;
+            String json = jsonGenerator.getJSON();
+            ObjectMapper mapper = new ObjectMapper();
+            ObjectNode objectNode = mapper.createObjectNode();
+            try {
+                objectNode = (ObjectNode) mapper.readTree(json);
+            } catch (JsonProcessingException e) {
+                e.printStackTrace();
+            } finally {
+                return objectNode;
+            }
         }else{
             throw new TableException("Unsupported SQL query! explainSql() need a single SQL to query.");
         }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.graph;
import org.apache.flink.annotation.Internal;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.*;
/** Helper class for generating a JSON representation from a {@link StreamGraph}. */
@Internal
public class JSONGenerator {
public static final String STEPS = "step_function";
public static final String ID = "id";
public static final String SIDE = "side";
public static final String SHIP_STRATEGY = "ship_strategy";
public static final String PREDECESSORS = "predecessors";
public static final String TYPE = "type";
public static final String PACT = "pact";
public static final String CONTENTS = "contents";
public static final String PARALLELISM = "parallelism";
private StreamGraph streamGraph;
private final ObjectMapper mapper = new ObjectMapper();
public JSONGenerator(StreamGraph streamGraph) {
this.streamGraph = streamGraph;
}
public String getJSON() {
return getJSONNode().toPrettyString();
}
public ObjectNode getJSONNode() {
ObjectNode json = mapper.createObjectNode();
ArrayNode nodes = mapper.createArrayNode();
json.put("nodes", nodes);
List<Integer> operatorIDs = new ArrayList<>(streamGraph.getVertexIDs());
Comparator<Integer> operatorIDComparator =
Comparator.comparingInt(
(Integer id) -> streamGraph.getSinkIDs().contains(id) ? 1 : 0)
.thenComparingInt(id -> id);
operatorIDs.sort(operatorIDComparator);
visit(nodes, operatorIDs, new HashMap<>());
return json;
}
private void visit(
ArrayNode jsonArray, List<Integer> toVisit, Map<Integer, Integer> edgeRemapings) {
Integer vertexID = toVisit.get(0);
StreamNode vertex = streamGraph.getStreamNode(vertexID);
if (streamGraph.getSourceIDs().contains(vertexID)
|| Collections.disjoint(vertex.getInEdges(), toVisit)) {
ObjectNode node = mapper.createObjectNode();
decorateNode(vertexID, node);
if (!streamGraph.getSourceIDs().contains(vertexID)) {
ArrayNode inputs = mapper.createArrayNode();
node.put(PREDECESSORS, inputs);
for (StreamEdge inEdge : vertex.getInEdges()) {
int inputID = inEdge.getSourceId();
Integer mappedID =
(edgeRemapings.keySet().contains(inputID))
? edgeRemapings.get(inputID)
: inputID;
decorateEdge(inputs, inEdge, mappedID);
}
}
jsonArray.add(node);
toVisit.remove(vertexID);
} else {
Integer iterationHead = -1;
for (StreamEdge inEdge : vertex.getInEdges()) {
int operator = inEdge.getSourceId();
if (streamGraph.vertexIDtoLoopTimeout.containsKey(operator)) {
iterationHead = operator;
}
}
ObjectNode obj = mapper.createObjectNode();
ArrayNode iterationSteps = mapper.createArrayNode();
obj.put(STEPS, iterationSteps);
obj.put(ID, iterationHead);
obj.put(PACT, "IterativeDataStream");
obj.put(PARALLELISM, streamGraph.getStreamNode(iterationHead).getParallelism());
obj.put(CONTENTS, "Stream Iteration");
ArrayNode iterationInputs = mapper.createArrayNode();
obj.put(PREDECESSORS, iterationInputs);
toVisit.remove(iterationHead);
visitIteration(iterationSteps, toVisit, iterationHead, edgeRemapings, iterationInputs);
jsonArray.add(obj);
}
if (!toVisit.isEmpty()) {
visit(jsonArray, toVisit, edgeRemapings);
}
}
private void visitIteration(
ArrayNode jsonArray,
List<Integer> toVisit,
int headId,
Map<Integer, Integer> edgeRemapings,
ArrayNode iterationInEdges) {
Integer vertexID = toVisit.get(0);
StreamNode vertex = streamGraph.getStreamNode(vertexID);
toVisit.remove(vertexID);
// Ignoring head and tail to avoid redundancy
if (!streamGraph.vertexIDtoLoopTimeout.containsKey(vertexID)) {
ObjectNode obj = mapper.createObjectNode();
jsonArray.add(obj);
decorateNode(vertexID, obj);
ArrayNode inEdges = mapper.createArrayNode();
obj.put(PREDECESSORS, inEdges);
for (StreamEdge inEdge : vertex.getInEdges()) {
int inputID = inEdge.getSourceId();
if (edgeRemapings.keySet().contains(inputID)) {
decorateEdge(inEdges, inEdge, inputID);
} else if (!streamGraph.vertexIDtoLoopTimeout.containsKey(inputID)) {
decorateEdge(iterationInEdges, inEdge, inputID);
}
}
edgeRemapings.put(vertexID, headId);
visitIteration(jsonArray, toVisit, headId, edgeRemapings, iterationInEdges);
}
}
private void decorateEdge(ArrayNode inputArray, StreamEdge inEdge, int mappedInputID) {
ObjectNode input = mapper.createObjectNode();
inputArray.add(input);
input.put(ID, mappedInputID);
input.put(SHIP_STRATEGY, inEdge.getPartitioner().toString());
input.put(SIDE, (inputArray.size() == 0) ? "first" : "second");
}
private void decorateNode(Integer vertexID, ObjectNode node) {
StreamNode vertex = streamGraph.getStreamNode(vertexID);
node.put(ID, vertexID);
node.put(TYPE, vertex.getOperatorName());
if (streamGraph.getSourceIDs().contains(vertexID)) {
node.put(PACT, "Data Source");
} else if (streamGraph.getSinkIDs().contains(vertexID)) {
node.put(PACT, "Data Sink");
} else {
node.put(PACT, "Operator");
}
node.put(CONTENTS, vertex.getOperatorName());
node.put(PARALLELISM, streamGraph.getStreamNode(vertexID).getParallelism());
}
}
@@ -58,7 +58,7 @@ public class OperatorTrans extends AbstractTrans implements Trans {
     @Override
     public void translate() {
         name = pact;
-        Map map = MapParseUtils.parse(contents,"where");
+        Map map = MapParseUtils.parseForSelect(contents);
         translateSelect((ArrayList<String>) map.get("select"));
         joinType = (ArrayList<String>) map.get("joinType");
         where = map.containsKey("where")?map.get("where").toString():null;
......
@@ -74,6 +74,37 @@ public class MapParseUtils {
         return nestIndexList;
     }
    /**
     * Get the indexes of the outermost brackets, e.g. table=[((f.SERIAL_NO || f.PRESC_NO) || f.ITEM_NO) AS EXPR$0, ((f.DATE || f.TIME) || f.ITEM_NO) AS EXPR$2]
     *                                                        ^(index x)                           ^(index y)       ^(index z)                  ^(index n)
     * yields List<Integer> [x, y, z, n]
     *
     * @param inStr
     * @return
     */
public static List<Integer> getBracketsList(String inStr) {
        List<Integer> nestIndexList = new ArrayList<>();
if (inStr == null || inStr.isEmpty()) {
return nestIndexList;
}
Deque<Integer> stack = new LinkedList<>();
for (int i = 0; i < inStr.length(); i++) {
if (inStr.charAt(i) == '(') {
if (stack.isEmpty()) {
nestIndexList.add(i);
}
stack.push(i);
}
if (inStr.charAt(i) == ')') {
stack.pop();
if (stack.size() == 0) {
nestIndexList.add(i);
}
}
}
return nestIndexList;
}
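A quick sanity check of what `getBracketsList` records, on a hypothetical operator string (indexes worked out by hand; only the outermost `(` and its matching `)` of each group are collected, and `MapParseUtils` is assumed to be imported):

```java
String s = "select=[((f.A || f.B)) AS EXPR$0, ((f.C)) AS EXPR$1]";
// Group 1 opens at index 8 and closes at 21; group 2 opens at 34 and closes at 40.
List<Integer> idx = MapParseUtils.getBracketsList(s);
System.out.println(idx); // [8, 21, 34, 40]
```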
    /**
     * Convert to a map
@@ -81,9 +112,9 @@ public class MapParseUtils {
     * @param inStr
     * @return
     */
-    public static Map parse(String inStr,String... blackKeys) {
+    public static Map parse(String inStr, String... blackKeys) {
         if (getStrIsNest(inStr)) {
-            return parseForNest(inStr,blackKeys);
+            return parseForNest(inStr, blackKeys);
         } else {
             return parseForNotNest(inStr);
         }
@@ -95,7 +126,7 @@ public class MapParseUtils {
      * @param inStr
      * @return
      */
-    public static Map parseForNest(String inStr,String... blackKeys) {
+    public static Map parseForNest(String inStr, String... blackKeys) {
         Map map = new HashMap();
         List<Integer> nestList = getNestList(inStr);
         int num = nestList.size() / 2;
@@ -105,17 +136,17 @@ public class MapParseUtils {
             String key = getMapKey(substring);
             boolean isNext = true;
             for (int j = 0; j < blackKeys.length; j++) {
-                if(key.equals(blackKeys[j])){
+                if (key.equals(blackKeys[j])) {
                     isNext = false;
                 }
             }
-            if(isNext) {
+            if (isNext) {
                 if (getStrIsNest(substring)) {
                     map.put(key, getMapListNest(substring));
                 } else {
                     map.put(key, getMapList(substring));
                 }
-            }else{
+            } else {
                 map.put(key, getTextValue(substring));
             }
         } else {
@@ -123,17 +154,17 @@ public class MapParseUtils {
             String key = getMapKey(substring);
             boolean isNext = true;
             for (int j = 0; j < blackKeys.length; j++) {
-                if(key.equals(blackKeys[j])){
+                if (key.equals(blackKeys[j])) {
                     isNext = false;
                 }
             }
-            if(isNext) {
+            if (isNext) {
                 if (getStrIsNest(substring)) {
                     map.put(key, getMapListNest(substring));
                 } else {
                     map.put(key, getMapList(substring));
                 }
-            }else{
+            } else {
                 map.put(key, getTextValue(substring));
             }
         }
@@ -141,6 +172,38 @@ public class MapParseUtils {
         return map;
     }
/**
* @return java.util.Map
* @author lewnn
* @operate
* @date 2021/8/20 15:03
*/
public static Map parseForSelect(String inStr) {
Map map = new HashMap();
List<Integer> bracketsList = getBracketsList(inStr);
String mapKey = getMapKey(inStr);
List<String> list = new ArrayList<>();
int size = bracketsList.size();
if (size % 2 != 0) {
            // If size is not an even number, return an empty map; this may be problematic
return map;
} else {
            int numSize = size / 2; // number of bracket pairs
for (int i = 0; i < numSize; i++) {
String msgStr = "";
if (2 * i + 2 >= size) {
msgStr = inStr.substring(bracketsList.get(2 * i), inStr.lastIndexOf("]"));
} else {
msgStr = inStr.substring(bracketsList.get(2 * i), bracketsList.get(2 * i + 2));
msgStr = msgStr.substring(0, msgStr.lastIndexOf(",") > 0 ? msgStr.lastIndexOf(",") : msgStr.length());
}
list.add(msgStr);
}
}
map.put(mapKey, list);
return map;
}
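A worked example of `parseForSelect` on the same hypothetical string, assuming `getMapKey` returns the token before `=[` as the other parse methods use it:

```java
String s = "select=[((f.A || f.B)) AS EXPR$0, ((f.C)) AS EXPR$1]";
Map map = MapParseUtils.parseForSelect(s);
// The bracket positions split the body into one entry per top-level expression:
List<String> fields = (List<String>) map.get("select");
System.out.println(fields); // [((f.A || f.B)) AS EXPR$0, ((f.C)) AS EXPR$1]
```

This is what `OperatorTrans.translate()` now relies on: unlike `parse(contents, "where")`, the bracket-based split keeps concatenation expressions such as `(a || b) AS X` intact instead of breaking them at commas inside parentheses.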
    /**
     * Non-nested parsing
     *
@@ -151,10 +214,10 @@ public class MapParseUtils {
         String[] split = inStr.split("], ");
         Map map = new HashMap();
         for (int i = 0; i < split.length; i++) {
-            if(i == split.length -1){
-                map.put(getMapKey( split[i]), getMapList(split[i]));
-            }else {
-                map.put(getMapKey( split[i]+ "]"), getMapList(split[i] + "]"));
+            if (i == split.length - 1) {
+                map.put(getMapKey(split[i]), getMapList(split[i]));
+            } else {
+                map.put(getMapKey(split[i] + "]"), getMapList(split[i] + "]"));
             }
         }
         return map;
@@ -235,7 +298,7 @@ public class MapParseUtils {
         return list;
     }
-    private static String getTextValue(String splitStr){
+    private static String getTextValue(String splitStr) {
         return splitStr.substring(splitStr.indexOf("[") + 1, splitStr.lastIndexOf("]"));
     }
 }
@@ -358,4 +358,7 @@ CREATE TABLE `dlink_database` (
   UNIQUE INDEX `db_index`(`name`) USING BTREE
 ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
+ALTER TABLE `dlink`.`dlink_cluster`
+    ADD COLUMN `version` varchar(20) NULL COMMENT '版本' AFTER `job_manager_host`;
 SET FOREIGN_KEY_CHECKS = 1;