Unverified Commit c1b4b556 authored by aiwenmo, committed by GitHub

[Feature-923][*] Build column lineage based on flink logical plan (#924)

* [Feature-923][*] Build column lineage based on flink logical plan

* [Feature-923][*] Dynamically add getColumnOrigins method to class RelMdColumnOrigins by javassist
Co-authored-by: wenmo <32723967+wenmo@users.noreply.github.com>
parent 72412ddc
...@@ -56,6 +56,7 @@ import com.dlink.model.DataBase;
import com.dlink.model.FlinkColumn;
import com.dlink.model.Savepoints;
import com.dlink.model.Schema;
+import com.dlink.model.SystemConfiguration;
import com.dlink.model.Table;
import com.dlink.model.Task;
import com.dlink.result.DDLResult;
...@@ -322,7 +323,11 @@ public class StudioServiceImpl implements StudioService {
}
} else {
addFlinkSQLEnv(studioCADTO);
-return LineageBuilder.getLineage(studioCADTO.getStatement(), studioCADTO.getStatementSet());
+if (SystemConfiguration.getInstances().isUseLogicalPlan()) {
+return LineageBuilder.getColumnLineageByLogicalPlan(studioCADTO.getStatement());
+} else {
+return LineageBuilder.getLineage(studioCADTO.getStatement());
+}
}
}
...
...@@ -19,6 +19,7 @@
package com.dlink.executor;
+import com.dlink.model.LineageRel;
import com.dlink.result.SqlExplainResult;
import org.apache.flink.api.common.typeinfo.TypeInformation;
...@@ -308,6 +309,11 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements
createTemporaryView(path, fromDataStream(dataStream, fields));
}
+@Override
+public List<LineageRel> getLineage(String statement) {
+return null;
+}
@Override
public <T> void createTemporaryView(
String path, DataStream<T> dataStream, Expression... fields) {
...
...@@ -19,6 +19,7 @@
package com.dlink.executor;
+import com.dlink.model.LineageRel;
import com.dlink.result.SqlExplainResult;
import org.apache.flink.api.common.RuntimeExecutionMode;
...@@ -310,6 +311,11 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements
createTemporaryView(path, fromDataStream(dataStream, fields));
}
+@Override
+public List<LineageRel> getLineage(String statement) {
+return null;
+}
@Override
public <T> void createTemporaryView(
String path, DataStream<T> dataStream, Expression... fields) {
...
...@@ -20,6 +20,7 @@
package com.dlink.executor;
import com.dlink.assertion.Asserts;
+import com.dlink.model.LineageRel;
import com.dlink.result.SqlExplainResult;
import org.apache.flink.api.common.RuntimeExecutionMode;
...@@ -351,6 +352,11 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements
this.createTemporaryView(path, this.fromDataStream(dataStream, fields));
}
+@Override
+public List<LineageRel> getLineage(String statement) {
+return null;
+}
@Override
public <T> void createTemporaryView(String path, DataStream<T> dataStream, Expression... fields) {
this.createTemporaryView(path, this.fromDataStream(dataStream, fields));
...
...@@ -54,11 +54,5 @@
<artifactId>jackson-datatype-jsr310</artifactId>
<scope>provided</scope>
</dependency>
-<dependency>
-<groupId>com.starrocks</groupId>
-<artifactId>flink-connector-starrocks</artifactId>
-<version>1.2.3_flink-1.14_2.11</version>
-<scope>provided</scope>
-</dependency>
</dependencies>
</project>
\ No newline at end of file
...@@ -19,9 +19,6 @@
package com.dlink.executor;
-import com.dlink.assertion.Asserts;
-import com.dlink.result.SqlExplainResult;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
...@@ -62,6 +59,7 @@ import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;
import org.apache.flink.table.planner.delegation.DefaultExecutor;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram;
import org.apache.flink.table.typeutils.FieldInfoUtils;
import java.lang.reflect.Method;
...@@ -72,6 +70,11 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
+import com.dlink.assertion.Asserts;
+import com.dlink.model.LineageRel;
+import com.dlink.result.SqlExplainResult;
+import com.dlink.utils.FlinkStreamProgramWithoutPhysical;
+import com.dlink.utils.LineageContext;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
...@@ -85,6 +88,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements CustomTableEnvironment {
private final StreamExecutionEnvironment executionEnvironment;
+private final FlinkChainedProgram flinkChainedProgram;
public CustomTableEnvironmentImpl(
CatalogManager catalogManager,
...@@ -106,6 +110,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements
isStreamingMode,
userClassLoader);
this.executionEnvironment = executionEnvironment;
+this.flinkChainedProgram = FlinkStreamProgramWithoutPhysical.buildProgram((Configuration) executionEnvironment.getConfiguration());
}
public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment) {
...@@ -349,6 +354,12 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements
createTemporaryView(path, fromDataStream(dataStream, fields));
}
+@Override
+public List<LineageRel> getLineage(String statement) {
+LineageContext lineageContext = new LineageContext(flinkChainedProgram, this);
+return lineageContext.getLineage(statement);
+}
@Override
public <T> void createTemporaryView(
String path, DataStream<T> dataStream, Expression... fields) {
...
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.utils;
import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.hep.HepMatchOrder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
import org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram;
import org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram;
import org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgramBuilder;
import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder;
import org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgramBuilder;
import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
import org.apache.flink.table.planner.plan.rules.FlinkStreamRuleSets;
/**
* FlinkStreamProgramWithoutPhysical
*
* @author wenmo
* @since 2022/8/20 23:33
*/
public class FlinkStreamProgramWithoutPhysical {
private final static String SUBQUERY_REWRITE = "subquery_rewrite";
private final static String TEMPORAL_JOIN_REWRITE = "temporal_join_rewrite";
private final static String DECORRELATE = "decorrelate";
private final static String DEFAULT_REWRITE = "default_rewrite";
private final static String PREDICATE_PUSHDOWN = "predicate_pushdown";
private final static String JOIN_REORDER = "join_reorder";
private final static String PROJECT_REWRITE = "project_rewrite";
private final static String LOGICAL = "logical";
private final static String LOGICAL_REWRITE = "logical_rewrite";
private final static String TIME_INDICATOR = "time_indicator";
public static FlinkChainedProgram buildProgram(Configuration config) {
FlinkChainedProgram chainedProgram = new FlinkChainedProgram();
// rewrite sub-queries to joins
chainedProgram.addLast(
SUBQUERY_REWRITE,
FlinkGroupProgramBuilder.newBuilder()
// rewrite QueryOperationCatalogViewTable before rewriting sub-queries
.addProgram(FlinkHepRuleSetProgramBuilder.newBuilder()
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
.add(FlinkStreamRuleSets.TABLE_REF_RULES())
.build(), "convert table references before rewriting sub-queries to semi-join")
.addProgram(FlinkHepRuleSetProgramBuilder.newBuilder()
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
.add(FlinkStreamRuleSets.SEMI_JOIN_RULES())
.build(), "rewrite sub-queries to semi-join")
.addProgram(FlinkHepRuleSetProgramBuilder.newBuilder()
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION())
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
.add(FlinkStreamRuleSets.TABLE_SUBQUERY_RULES())
.build(), "sub-queries remove")
// convert RelOptTableImpl (which exists in SubQuery before) to FlinkRelOptTable
.addProgram(FlinkHepRuleSetProgramBuilder.newBuilder()
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
.add(FlinkStreamRuleSets.TABLE_REF_RULES())
.build(), "convert table references after sub-queries removed")
.build());
// rewrite special temporal join plan
chainedProgram.addLast(
TEMPORAL_JOIN_REWRITE,
FlinkGroupProgramBuilder.newBuilder()
.addProgram(
FlinkHepRuleSetProgramBuilder.newBuilder()
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
.add(FlinkStreamRuleSets.EXPAND_PLAN_RULES())
.build(), "convert correlate to temporal table join")
.addProgram(
FlinkHepRuleSetProgramBuilder.newBuilder()
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
.add(FlinkStreamRuleSets.POST_EXPAND_CLEAN_UP_RULES())
.build(), "convert enumerable table scan")
.build());
// query decorrelation
chainedProgram.addLast(DECORRELATE,
FlinkGroupProgramBuilder.newBuilder()
// rewrite before decorrelation
.addProgram(
FlinkHepRuleSetProgramBuilder.newBuilder()
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
.add(FlinkStreamRuleSets.PRE_DECORRELATION_RULES())
.build(), "pre-rewrite before decorrelation")
.addProgram(new FlinkDecorrelateProgram(), "")
.build());
// default rewrite, includes: predicate simplification, expression reduction, window
// properties rewrite, etc.
chainedProgram.addLast(
DEFAULT_REWRITE,
FlinkHepRuleSetProgramBuilder.newBuilder()
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
.add(FlinkStreamRuleSets.DEFAULT_REWRITE_RULES())
.build());
// rule based optimization: push down predicate(s) in where clause, so it only needs to read
// the required data
chainedProgram.addLast(
PREDICATE_PUSHDOWN,
FlinkGroupProgramBuilder.newBuilder()
.addProgram(
FlinkHepRuleSetProgramBuilder.newBuilder()
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION())
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
.add(FlinkStreamRuleSets.FILTER_PREPARE_RULES())
.build(), "filter rules")
.addProgram(
FlinkHepRuleSetProgramBuilder.newBuilder()
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
.add(FlinkStreamRuleSets.FILTER_TABLESCAN_PUSHDOWN_RULES())
.build(), "push predicate into table scan")
.addProgram(
FlinkHepRuleSetProgramBuilder.newBuilder()
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
.add(FlinkStreamRuleSets.PRUNE_EMPTY_RULES())
.build(), "prune empty after predicate push down")
.build());
// join reorder
if (config.getBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED)) {
chainedProgram.addLast(
JOIN_REORDER,
FlinkGroupProgramBuilder.newBuilder()
.addProgram(FlinkHepRuleSetProgramBuilder.newBuilder()
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION())
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
.add(FlinkStreamRuleSets.JOIN_REORDER_PREPARE_RULES())
.build(), "merge join into MultiJoin")
.addProgram(FlinkHepRuleSetProgramBuilder.newBuilder()
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
.add(FlinkStreamRuleSets.JOIN_REORDER_RULES())
.build(), "do join reorder")
.build());
}
// project rewrite
chainedProgram.addLast(
PROJECT_REWRITE,
FlinkHepRuleSetProgramBuilder.newBuilder()
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION())
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
.add(FlinkStreamRuleSets.PROJECT_RULES())
.build());
// optimize the logical plan
chainedProgram.addLast(
LOGICAL,
FlinkVolcanoProgramBuilder.newBuilder()
.add(FlinkStreamRuleSets.LOGICAL_OPT_RULES())
.setRequiredOutputTraits(new Convention.Impl[] {
FlinkConventions.LOGICAL()
})
.build());
// logical rewrite
chainedProgram.addLast(
LOGICAL_REWRITE,
FlinkHepRuleSetProgramBuilder.newBuilder()
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
.add(FlinkStreamRuleSets.LOGICAL_REWRITE())
.build());
return chainedProgram;
}
}
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.utils;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Snapshot;
import org.apache.calcite.rel.metadata.RelColumnOrigin;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.planner.calcite.FlinkContext;
import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.operations.PlannerQueryOperation;
import org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram;
import org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext;
import org.apache.flink.table.planner.plan.trait.MiniBatchInterval;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import com.dlink.model.LineageRel;
import javassist.ClassPool;
import javassist.CtClass;
import javassist.CtMethod;
import javassist.Modifier;
/**
* LineageContext
*
* @author baisong
* @since 2022/8/6 11:06
*/
public class LineageContext {
private final FlinkChainedProgram flinkChainedProgram;
private final TableEnvironmentImpl tableEnv;
public LineageContext(FlinkChainedProgram flinkChainedProgram, TableEnvironmentImpl tableEnv) {
this.flinkChainedProgram = flinkChainedProgram;
this.tableEnv = tableEnv;
}
/**
* Dynamically add a getColumnOrigins method to class RelMdColumnOrigins via javassist:
*
* public Set<RelColumnOrigin> getColumnOrigins(Snapshot rel, RelMetadataQuery mq, int iOutputColumn) {
* return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
* }
*/
static {
try {
ClassPool classPool = ClassPool.getDefault();
CtClass ctClass = classPool.getCtClass("org.apache.calcite.rel.metadata.RelMdColumnOrigins");
CtClass[] parameters = new CtClass[] {classPool.get(Snapshot.class.getName())
, classPool.get(RelMetadataQuery.class.getName())
, CtClass.intType
};
// add method
CtMethod ctMethod = new CtMethod(classPool.get("java.util.Set"), "getColumnOrigins", parameters, ctClass);
ctMethod.setModifiers(Modifier.PUBLIC);
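// Javassist parameter references: $1 = the Snapshot rel, $2 = the RelMetadataQuery,
// $3 = the output column index. Delegating to the Snapshot's input keeps lineage
// flowing through Snapshot nodes, which have no built-in handler in RelMdColumnOrigins.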
ctMethod.setBody("{return $2.getColumnOrigins($1.getInput(), $3);}");
ctClass.addMethod(ctMethod);
// load the class
ctClass.toClass();
} catch (Exception e) {
throw new TableException("Dynamic add getColumnOrigins() method exception.", e);
}
}
public List<LineageRel> getLineage(String statement) {
// 1. Generate original relNode tree
Tuple2<String, RelNode> parsed = parseStatement(statement);
String sinkTable = parsed.getField(0);
RelNode oriRelNode = parsed.getField(1);
// 2. Optimize original relNode to generate Optimized Logical Plan
RelNode optRelNode = optimize(oriRelNode);
// 3. Build lineage based on RelMetadataQuery
return buildFiledLineageResult(sinkTable, optRelNode);
}
private Tuple2<String, RelNode> parseStatement(String sql) {
List<Operation> operations = tableEnv.getParser().parse(sql);
if (operations.size() != 1) {
throw new TableException(
"Unsupported SQL query! only accepts a single SQL statement.");
}
Operation operation = operations.get(0);
if (operation instanceof CatalogSinkModifyOperation) {
CatalogSinkModifyOperation sinkOperation = (CatalogSinkModifyOperation) operation;
PlannerQueryOperation queryOperation = (PlannerQueryOperation) sinkOperation.getChild();
RelNode relNode = queryOperation.getCalciteTree();
return new Tuple2<>(
sinkOperation.getTableIdentifier().asSummaryString(),
relNode);
} else {
throw new TableException("Only insert is supported now.");
}
}
/**
* Calls each program's optimize method in sequence.
*/
public RelNode optimize(RelNode relNode) {
return flinkChainedProgram.optimize(relNode, new StreamOptimizeContext() {
@Override
public boolean isBatchMode() {
return false;
}
@Override
public TableConfig getTableConfig() {
return tableEnv.getConfig();
}
@Override
public FunctionCatalog getFunctionCatalog() {
return ((PlannerBase) tableEnv.getPlanner()).getFlinkContext().getFunctionCatalog();
}
@Override
public CatalogManager getCatalogManager() {
return tableEnv.getCatalogManager();
}
@Override
public SqlExprToRexConverterFactory getSqlExprToRexConverterFactory() {
return relNode.getCluster().getPlanner().getContext().unwrap(FlinkContext.class).getSqlExprToRexConverterFactory();
}
@Override
public <C> C unwrap(Class<C> clazz) {
return StreamOptimizeContext.super.unwrap(clazz);
}
@Override
public FlinkRelBuilder getFlinkRelBuilder() {
return ((PlannerBase) tableEnv.getPlanner()).getRelBuilder();
}
@Override
public boolean needFinalTimeIndicatorConversion() {
return true;
}
@Override
public boolean isUpdateBeforeRequired() {
return false;
}
@Override
public MiniBatchInterval getMiniBatchInterval() {
return MiniBatchInterval.NONE;
}
});
}
/**
* Check that the field counts of the query and the sink match
*/
private void validateSchema(String sinkTable, RelNode relNode, List<String> sinkFieldList) {
List<String> queryFieldList = relNode.getRowType().getFieldNames();
if (queryFieldList.size() != sinkFieldList.size()) {
throw new ValidationException(
String.format(
"Column types of query result and sink for %s do not match.\n"
+ "Query schema: %s\n"
+ "Sink schema: %s",
sinkTable, queryFieldList, sinkFieldList));
}
}
private List<LineageRel> buildFiledLineageResult(String sinkTable, RelNode optRelNode) {
// target columns
List<String> targetColumnList = tableEnv.from(sinkTable)
.getResolvedSchema()
.getColumnNames();
// check the size of query and sink fields match
validateSchema(sinkTable, optRelNode, targetColumnList);
RelMetadataQuery metadataQuery = optRelNode.getCluster().getMetadataQuery();
List<LineageRel> resultList = new ArrayList<>();
for (int index = 0; index < targetColumnList.size(); index++) {
String targetColumn = targetColumnList.get(index);
Set<RelColumnOrigin> relColumnOriginSet = metadataQuery.getColumnOrigins(optRelNode, index);
if (CollectionUtils.isNotEmpty(relColumnOriginSet)) {
for (RelColumnOrigin relColumnOrigin : relColumnOriginSet) {
// table
RelOptTable table = relColumnOrigin.getOriginTable();
String sourceTable = String.join(".", table.getQualifiedName());
// field
int ordinal = relColumnOrigin.getOriginColumnOrdinal();
List<String> fieldNames = table.getRowType().getFieldNames();
String sourceColumn = fieldNames.get(ordinal);
// add record
resultList.add(LineageRel.build(sourceTable, sourceColumn, sinkTable, targetColumn));
}
}
}
return resultList;
}
}
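For illustration, a minimal end-to-end sketch of driving the new lineage API through the Flink 1.14 CustomTableEnvironmentImpl shown above. The table definitions are hypothetical, and the datagen/print connectors and the Flink 1.14 planner are assumed to be on the classpath:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.executor.CustomTableEnvironmentImpl;
import com.dlink.model.LineageRel;

public class LineageDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        CustomTableEnvironmentImpl tableEnv = CustomTableEnvironmentImpl.create(env);
        // Register hypothetical source and sink tables so the parser can resolve them.
        tableEnv.executeSql("CREATE TABLE src (id INT, name STRING) WITH ('connector' = 'datagen')");
        tableEnv.executeSql("CREATE TABLE dst (id INT, name STRING) WITH ('connector' = 'print')");
        // Each LineageRel is one source-column -> target-column edge.
        for (LineageRel rel : tableEnv.getLineage("INSERT INTO dst SELECT id, name FROM src")) {
            System.out.println(rel.getSourceTablePath() + "." + rel.getSourceColumn()
                    + " -> " + rel.getTargetTablePath() + "." + rel.getTargetColumn());
        }
    }
}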
...@@ -20,6 +20,7 @@
package com.dlink.executor;
import com.dlink.assertion.Asserts;
+import com.dlink.model.LineageRel;
import com.dlink.result.SqlExplainResult;
import org.apache.flink.api.common.RuntimeExecutionMode;
...@@ -315,6 +316,11 @@ public class CustomTableEnvironmentImpl extends AbstractStreamTableEnvironmentIm
createTemporaryView(path, fromDataStream(dataStream, fields));
}
+@Override
+public List<LineageRel> getLineage(String statement) {
+return null;
+}
@Override
public <T> void createTemporaryView(
String path, DataStream<T> dataStream, Expression... fields) {
...
...@@ -32,7 +32,7 @@
<source.level>1.8</source.level>
<target.level>1.8</target.level>
<dubbo.version>3.0.2.1</dubbo.version>
-<flink.version>1.13.6</flink.version>
+<flink.version>1.14.5</flink.version>
<junit.version>4.12</junit.version>
<spring.version>4.3.16.RELEASE</spring.version>
<maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
...@@ -57,7 +57,7 @@
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
-<dependency>
+<!--<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<exclusions>
...@@ -68,7 +68,7 @@
</exclusions>
<version>${flink.version}</version>
<scope>provided</scope>
-</dependency>
+</dependency>-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
...
...@@ -19,6 +19,7 @@
package com.dlink.executor;
+import com.dlink.model.LineageRel;
import com.dlink.result.SqlExplainResult;
import org.apache.flink.runtime.jobgraph.JobGraph;
...@@ -92,4 +93,6 @@ public interface CustomTableEnvironment {
Parser getParser();
Planner getPlanner();
+List<LineageRel> getLineage(String statement);
}
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.model;
/**
* LineageRel
*
* @author wenmo
* @since 2022/8/20 21:09
*/
public class LineageRel {
private String sourceCatalog;
private String sourceDatabase;
private String sourceTable;
private String sourceColumn;
private String targetCatalog;
private String targetDatabase;
private String targetTable;
private String targetColumn;
private final static String DELIMITER = ".";
public LineageRel(String sourceCatalog, String sourceDatabase, String sourceTable, String sourceColumn, String targetCatalog, String targetDatabase, String targetTable,
String targetColumn) {
this.sourceCatalog = sourceCatalog;
this.sourceDatabase = sourceDatabase;
this.sourceTable = sourceTable;
this.sourceColumn = sourceColumn;
this.targetCatalog = targetCatalog;
this.targetDatabase = targetDatabase;
this.targetTable = targetTable;
this.targetColumn = targetColumn;
}
public static LineageRel build(String sourceTablePath, String sourceColumn, String targetTablePath, String targetColumn) {
String[] sourceItems = sourceTablePath.split("\\.");
String[] targetItems = targetTablePath.split("\\.");
return new LineageRel(sourceItems[0], sourceItems[1], sourceItems[2], sourceColumn, targetItems[0], targetItems[1], targetItems[2], targetColumn);
}
public static LineageRel build(String sourceCatalog, String sourceDatabase, String sourceTable, String sourceColumn, String targetCatalog, String targetDatabase, String targetTable,
String targetColumn) {
return new LineageRel(sourceCatalog, sourceDatabase, sourceTable, sourceColumn, targetCatalog, targetDatabase, targetTable, targetColumn);
}
public String getSourceCatalog() {
return sourceCatalog;
}
public String getSourceDatabase() {
return sourceDatabase;
}
public String getSourceTable() {
return sourceTable;
}
public String getSourceColumn() {
return sourceColumn;
}
public String getTargetCatalog() {
return targetCatalog;
}
public String getTargetDatabase() {
return targetDatabase;
}
public String getTargetTable() {
return targetTable;
}
public String getTargetColumn() {
return targetColumn;
}
public String getSourceTablePath() {
return sourceCatalog + DELIMITER + sourceDatabase + DELIMITER + sourceTable;
}
public String getTargetTablePath() {
return targetCatalog + DELIMITER + targetDatabase + DELIMITER + targetTable;
}
}
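A quick sketch of the path-based factory above, with hypothetical values. Note that the path variant splits on "." and expects exactly catalog.database.table; a shorter path would fail with an ArrayIndexOutOfBoundsException:

// Path-based variant: "catalog.database.table" is split on dots.
LineageRel rel = LineageRel.build(
        "default_catalog.default_database.src", "id",
        "default_catalog.default_database.dst", "user_id");
rel.getSourceTablePath(); // "default_catalog.default_database.src"
rel.getTargetColumn();    // "user_id"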
...@@ -19,12 +19,11 @@
package com.dlink.model;
-import com.dlink.assertion.Asserts;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import com.dlink.assertion.Asserts;
import com.fasterxml.jackson.databind.JsonNode;
/**
...@@ -46,6 +45,7 @@ public class SystemConfiguration {
add(systemConfiguration.sqlSubmitJarParas);
add(systemConfiguration.sqlSubmitJarMainAppClass);
add(systemConfiguration.useRestAPI);
+add(systemConfiguration.useLogicalPlan);
add(systemConfiguration.sqlSeparator);
}};
...@@ -84,6 +84,13 @@
";\n",
"Flink SQL 的语句分割符"
);
+private Configuration useLogicalPlan = new Configuration(
+"useLogicalPlan",
+"使用逻辑计划计算血缘",
+ValueType.BOOLEAN,
+false,
+"在计算 Flink 任务的字段血缘分析时是否基于逻辑计划进行,只支持 1.14 版本"
+);
public void setConfiguration(JsonNode jsonNode) {
for (Configuration item : CONFIGURATION_LIST) {
...@@ -154,6 +161,14 @@
this.sqlSeparator.setValue(sqlSeparator);
}
+public boolean isUseLogicalPlan() {
+return (boolean) useLogicalPlan.getValue();
+}
+public void setUseLogicalPlan(boolean useLogicalPlan) {
+this.useLogicalPlan.setValue(useLogicalPlan);
+}
enum ValueType {
STRING, INT, DOUBLE, FLOAT, BOOLEAN, DATE
}
...
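The switch can also be flipped programmatically, as in this sketch (in practice it is driven by the new web UI toggle further below):

// Enable logical-plan-based column lineage (Flink 1.14 only, per the description above).
SystemConfiguration.getInstances().setUseLogicalPlan(true);
boolean enabled = SystemConfiguration.getInstances().isUseLogicalPlan(); // true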
...@@ -20,6 +20,7 @@
package com.dlink.utils;
import com.dlink.assertion.Asserts;
+import com.dlink.model.SystemConfiguration;
/**
* SqlUtil
...@@ -31,12 +32,16 @@ public class SqlUtil {
private static final String SEMICOLON = ";";
+public static String[] getStatements(String sql) {
+return getStatements(sql, SystemConfiguration.getInstances().getSqlSeparator());
+}
public static String[] getStatements(String sql, String sqlSeparator) {
if (Asserts.isNullString(sql)) {
return new String[0];
}
-String[] splits = sql.replaceAll(";\r\n",";\n").split(sqlSeparator);
+String[] splits = sql.replaceAll(";\r\n", ";\n").split(sqlSeparator);
String lastStatement = splits[splits.length - 1].trim();
if (lastStatement.endsWith(SEMICOLON)) {
splits[splits.length - 1] = lastStatement.substring(0, lastStatement.length() - 1);
...
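A sketch of the new single-argument overload, assuming the default separator ";\n" and that the method returns the processed splits (its tail is elided in the diff):

String[] stmts = SqlUtil.getStatements("CREATE TABLE t (id INT);\nINSERT INTO t SELECT 1;");
// stmts[0] == "CREATE TABLE t (id INT)"
// stmts[1] == "INSERT INTO t SELECT 1"   (trailing semicolon stripped)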
...@@ -19,6 +19,14 @@
package com.dlink.explainer;
+import org.apache.flink.runtime.rest.messages.JobPlanInfo;
+import org.apache.flink.table.catalog.CatalogManager;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkSQLConstant;
import com.dlink.executor.Executor;
...@@ -35,6 +43,7 @@ import com.dlink.explainer.trans.TransGenerator;
import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.job.JobParam;
import com.dlink.job.StatementParam;
+import com.dlink.model.LineageRel;
import com.dlink.model.SystemConfiguration;
import com.dlink.parser.SqlType;
import com.dlink.result.ExplainResult;
...@@ -43,15 +52,6 @@ import com.dlink.trans.Operations;
import com.dlink.utils.FlinkUtil;
import com.dlink.utils.LogUtil;
import com.dlink.utils.SqlUtil;
-import org.apache.flink.runtime.rest.messages.JobPlanInfo;
-import org.apache.flink.table.catalog.CatalogManager;
-import java.time.LocalDateTime;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
...@@ -138,7 +138,7 @@ public class Explainer {
if (Asserts.isNullString(sql)) {
continue;
}
-SqlType operationType = Operations.getOperationType(item);
+SqlType operationType = Operations.getOperationType(sql);
if (operationType.equals(SqlType.INSERT) || operationType.equals(SqlType.SELECT)) {
record = executor.explainSqlRecord(sql);
if (Asserts.isNull(record)) {
...@@ -282,7 +282,7 @@ public class Explainer {
public ObjectNode getStreamGraph(String statement) {
JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement, sqlSeparator));
if (jobParam.getDdl().size() > 0) {
-for (StatementParam statementParam: jobParam.getDdl()) {
+for (StatementParam statementParam : jobParam.getDdl()) {
executor.executeSql(statementParam.getValue());
}
}
...@@ -302,7 +302,7 @@ public class Explainer {
public JobPlanInfo getJobPlanInfo(String statement) {
JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement, sqlSeparator));
if (jobParam.getDdl().size() > 0) {
-for (StatementParam statementParam: jobParam.getDdl()) {
+for (StatementParam statementParam : jobParam.getDdl()) {
executor.executeSql(statementParam.getValue());
}
}
...@@ -495,4 +495,27 @@ public class Explainer {
return new TransGenerator(plan).translateTrans();
}
public List<LineageRel> getLineage(String statement) {
String[] sqls = SqlUtil.getStatements(statement, sqlSeparator);
List<LineageRel> lineageRelList = new ArrayList<>();
for (String item : sqls) {
String sql = "";
try {
sql = FlinkInterceptor.pretreatStatement(executor, item);
if (Asserts.isNullString(sql)) {
continue;
}
SqlType operationType = Operations.getOperationType(sql);
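// Non-INSERT statements (typically the preceding DDL) are executed so the catalog
// can resolve the tables that the INSERT statements reference; only INSERT
// statements contribute lineage edges.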
if (operationType.equals(SqlType.INSERT)) {
lineageRelList.addAll(executor.getLineage(sql));
} else {
executor.executeSql(sql);
}
} catch (Exception e) {
e.printStackTrace();
break;
}
}
return lineageRelList;
}
}
...@@ -19,11 +19,6 @@
package com.dlink.explainer.lineage;
-import com.dlink.explainer.ca.ColumnCAResult;
-import com.dlink.explainer.ca.NodeRel;
-import com.dlink.explainer.ca.TableCA;
-import com.dlink.plus.FlinkSqlPlus;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
...@@ -31,6 +26,12 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import com.dlink.explainer.ca.ColumnCAResult;
+import com.dlink.explainer.ca.NodeRel;
+import com.dlink.explainer.ca.TableCA;
+import com.dlink.model.LineageRel;
+import com.dlink.plus.FlinkSqlPlus;
/**
* LineageBuilder
*
...@@ -91,11 +92,11 @@ public class LineageBuilder {
}
}
// merge duplicate tables
-Map<String,String> correctTableIdMap = new HashMap<>();
+Map<String, String> correctTableIdMap = new HashMap<>();
for (List<LineageTable> tableList : repeatTablesList) {
LineageTable newTable = new LineageTable();
Set<String> columnKeySet = new HashSet<>();
-for (LineageTable table: tableList) {
+for (LineageTable table : tableList) {
if (newTable.getId() == null || newTable.getName() == null) {
newTable.setId(table.getId());
newTable.setName(table.getName());
...@@ -108,7 +109,7 @@ public class LineageBuilder {
columnKeySet.add(key);
}
}
-correctTableIdMap.put(table.getId(),newTable.getId());
+correctTableIdMap.put(table.getId(), newTable.getId());
tables.remove(table);
}
tables.add(newTable);
...@@ -124,4 +125,56 @@
}
return LineageResult.build(tables, relations);
}
public static LineageResult getColumnLineageByLogicalPlan(String statement) {
FlinkSqlPlus plus = FlinkSqlPlus.build(false);
List<LineageRel> lineageRelList = plus.getLineage(statement);
List<LineageRelation> relations = new ArrayList<>();
Map<String, LineageTable> tableMap = new HashMap<>();
int tableIndex = 1;
int relIndex = 1;
for (LineageRel lineageRel : lineageRelList) {
String sourceTablePath = lineageRel.getSourceTablePath();
String sourceTableId = null;
String targetTableId = null;
if (tableMap.containsKey(sourceTablePath)) {
LineageTable lineageTable = tableMap.get(sourceTablePath);
LineageColumn lineageColumn = LineageColumn.build(lineageRel.getSourceColumn(), lineageRel.getSourceColumn());
if (!lineageTable.getColumns().contains(lineageColumn)) {
lineageTable.getColumns().add(lineageColumn);
}
sourceTableId = lineageTable.getId();
} else {
tableIndex++;
LineageTable lineageTable = LineageTable.build(tableIndex + "", sourceTablePath);
lineageTable.getColumns().add(LineageColumn.build(lineageRel.getSourceColumn(), lineageRel.getSourceColumn()));
tableMap.put(sourceTablePath, lineageTable);
sourceTableId = lineageTable.getId();
}
String targetTablePath = lineageRel.getTargetTablePath();
if (tableMap.containsKey(targetTablePath)) {
LineageTable lineageTable = tableMap.get(targetTablePath);
LineageColumn lineageColumn = LineageColumn.build(lineageRel.getTargetColumn(), lineageRel.getTargetColumn());
if (!lineageTable.getColumns().contains(lineageColumn)) {
lineageTable.getColumns().add(lineageColumn);
}
targetTableId = lineageTable.getId();
} else {
tableIndex++;
LineageTable lineageTable = LineageTable.build(tableIndex + "", targetTablePath);
lineageTable.getColumns().add(LineageColumn.build(lineageRel.getTargetColumn(), lineageRel.getTargetColumn()));
tableMap.put(targetTablePath, lineageTable);
targetTableId = lineageTable.getId();
}
LineageRelation lineageRelation = LineageRelation.build(sourceTableId, targetTableId, lineageRel.getSourceColumn(), lineageRel.getTargetColumn());
if (!relations.contains(lineageRelation)) {
relIndex++;
lineageRelation.setId(relIndex + "");
relations.add(lineageRelation);
}
}
List<LineageTable> tables = new ArrayList<>(tableMap.values());
return LineageResult.build(tables, relations);
}
}
...@@ -19,6 +19,8 @@
package com.dlink.explainer.lineage;
+import java.util.Objects;
/**
* LineageColumn
*
...@@ -56,4 +58,21 @@ public class LineageColumn {
public void setTitle(String title) {
this.title = title;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
LineageColumn that = (LineageColumn) o;
return Objects.equals(name, that.name);
}
@Override
public int hashCode() {
return Objects.hash(name);
}
}
...@@ -19,6 +19,8 @@
package com.dlink.explainer.lineage;
+import java.util.Objects;
/**
* LineageRelation
*
...@@ -35,6 +37,13 @@ public class LineageRelation {
public LineageRelation() {
}
public LineageRelation(String srcTableId, String tgtTableId, String srcTableColName, String tgtTableColName) {
this.srcTableId = srcTableId;
this.tgtTableId = tgtTableId;
this.srcTableColName = srcTableColName;
this.tgtTableColName = tgtTableColName;
}
public LineageRelation(String id, String srcTableId, String tgtTableId, String srcTableColName, String tgtTableColName) {
this.id = id;
this.srcTableId = srcTableId;
...@@ -43,6 +52,10 @@ public class LineageRelation {
this.tgtTableColName = tgtTableColName;
}
public static LineageRelation build(String srcTableId, String tgtTableId, String srcTableColName, String tgtTableColName) {
return new LineageRelation(srcTableId, tgtTableId, srcTableColName, tgtTableColName);
}
public static LineageRelation build(String id, String srcTableId, String tgtTableId, String srcTableColName, String tgtTableColName) {
return new LineageRelation(id, srcTableId, tgtTableId, srcTableColName, tgtTableColName);
}
...@@ -86,4 +99,22 @@ public class LineageRelation {
public void setTgtTableColName(String tgtTableColName) {
this.tgtTableColName = tgtTableColName;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
LineageRelation that = (LineageRelation) o;
return Objects.equals(srcTableId, that.srcTableId) && Objects.equals(tgtTableId, that.tgtTableId) && Objects.equals(srcTableColName, that.srcTableColName) &&
Objects.equals(tgtTableColName, that.tgtTableColName);
}
@Override
public int hashCode() {
return Objects.hash(srcTableId, tgtTableId, srcTableColName, tgtTableColName);
}
}
...@@ -19,11 +19,11 @@
package com.dlink.explainer.lineage;
-import com.dlink.explainer.ca.TableCA;
import java.util.ArrayList;
import java.util.List;
+import com.dlink.explainer.ca.TableCA;
/**
* LineageTable
*
...@@ -38,6 +38,16 @@ public class LineageTable {
public LineageTable() {
}
public LineageTable(String id, String name) {
this.id = id;
this.name = name;
this.columns = new ArrayList<>();
}
public static LineageTable build(String id, String name) {
return new LineageTable(id, name);
}
public static LineageTable build(TableCA tableCA) {
LineageTable lineageTable = new LineageTable();
lineageTable.setId(tableCA.getId().toString());
...
...@@ -23,6 +23,7 @@ import com.dlink.executor.Executor;
import com.dlink.explainer.Explainer;
import com.dlink.explainer.ca.ColumnCAResult;
import com.dlink.explainer.ca.TableCAResult;
+import com.dlink.model.LineageRel;
import com.dlink.result.SqlExplainResult;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
...@@ -115,4 +116,7 @@ public class FlinkSqlPlus {
return explainer.getJobPlanInfo(statement);
}
+public List<LineageRel> getLineage(String statement) {
+return explainer.getLineage(statement);
+}
}
...@@ -22,6 +22,7 @@ package com.dlink.executor;
import com.dlink.assertion.Asserts;
import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.interceptor.FlinkInterceptorResult;
+import com.dlink.model.LineageRel;
import com.dlink.result.SqlExplainResult;
import org.apache.flink.api.common.ExecutionConfig;
...@@ -45,6 +46,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import com.dlink.utils.LineageContext;
+import com.dlink.utils.SqlUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
...@@ -377,4 +380,8 @@ public abstract class Executor {
public boolean parseAndLoadConfiguration(String statement) {
return stEnvironment.parseAndLoadConfiguration(statement, environment, setConfig);
}
+public List<LineageRel> getLineage(String statement) {
+return stEnvironment.getLineage(statement);
+}
}
...@@ -34,6 +34,7 @@
<flink.version>1.14.5</flink.version>
<flink.guava.version>14.0</flink.guava.version>
<flinkcdc.version>2.2.1</flinkcdc.version>
+<scala.binary.version>2.12</scala.binary.version>
<commons.version>1.3.1</commons.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
...@@ -64,7 +65,6 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
</exclusion>
</exclusions>
<version>${flink.version}</version>
<scope>provided</scope>
...@@ -87,7 +87,12 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
-<artifactId>flink-yarn_2.11</artifactId>
+<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+<version>${flink.version}</version>
+</dependency>
+<dependency>
+<groupId>org.apache.flink</groupId>
+<artifactId>flink-yarn_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
...@@ -160,8 +165,13 @@
</dependency>
<dependency>
<groupId>org.apache.doris</groupId>
-<artifactId>flink-doris-connector-1.14_2.12</artifactId>
+<artifactId>flink-doris-connector-1.14_${scala.binary.version}</artifactId>
<version>1.0.3</version>
</dependency>
+<dependency>
+<groupId>com.starrocks</groupId>
+<artifactId>flink-connector-starrocks</artifactId>
+<version>1.2.3_flink-1.14_${scala.binary.version}</version>
+</dependency>
</dependencies>
</project>
\ No newline at end of file
...@@ -17,7 +17,6 @@
*
*/
import React, {useEffect, useState} from 'react';
import {Form, Input, List, Switch} from 'antd';
import {connect} from "umi";
...@@ -29,62 +28,72 @@ type FlinkConfigProps = {
sqlSubmitJarParas: SettingsStateType['sqlSubmitJarParas'];
sqlSubmitJarMainAppClass: SettingsStateType['sqlSubmitJarMainAppClass'];
useRestAPI: SettingsStateType['useRestAPI'];
+useLogicalPlan: SettingsStateType['useLogicalPlan'];
sqlSeparator: SettingsStateType['sqlSeparator'];
dispatch: any;
};
const FlinkConfigView: React.FC<FlinkConfigProps> = (props) => {
-const {sqlSubmitJarPath, sqlSubmitJarParas, sqlSubmitJarMainAppClass,useRestAPI,sqlSeparator, dispatch} = props;
+const {
+sqlSubmitJarPath,
+sqlSubmitJarParas,
+sqlSubmitJarMainAppClass,
+useRestAPI,
+useLogicalPlan,
+sqlSeparator,
+dispatch
+} = props;
const [editName, setEditName] = useState<string>('');
const [formValues, setFormValues] = useState(props);
const [form] = Form.useForm();
-useEffect(()=>{
+useEffect(() => {
form.setFieldsValue(props);
-},[props]);
+}, [props]);
const getData = () => [
{
title: '提交FlinkSQL的Jar文件路径',
description: (
-editName!='sqlSubmitJarPath'?
+editName != 'sqlSubmitJarPath' ?
-(sqlSubmitJarPath?sqlSubmitJarPath:'未设置'):(
+(sqlSubmitJarPath ? sqlSubmitJarPath : '未设置') : (
<Input
id='sqlSubmitJarPath'
defaultValue={sqlSubmitJarPath}
onChange={onChange}
-placeholder="hdfs:///dlink/jar/dlink-app.jar" />)),
+placeholder="hdfs:///dlink/jar/dlink-app.jar"/>)),
-actions: editName!='sqlSubmitJarPath'?[<a onClick={({}) => handleEditClick('sqlSubmitJarPath')}>修改</a>]:
+actions: editName != 'sqlSubmitJarPath' ? [<a onClick={({}) => handleEditClick('sqlSubmitJarPath')}>修改</a>] :
[<a onClick={({}) => handleSaveClick('sqlSubmitJarPath')}>保存</a>,
<a onClick={({}) => handleCancelClick()}>取消</a>],
},
{
title: '提交FlinkSQL的Jar的主类入参',
description: (
-editName!='sqlSubmitJarParas'?
+editName != 'sqlSubmitJarParas' ?
-(sqlSubmitJarParas?sqlSubmitJarParas:'未设置'):(<Input
+(sqlSubmitJarParas ? sqlSubmitJarParas : '未设置') : (<Input
id='sqlSubmitJarParas'
defaultValue={sqlSubmitJarParas}
onChange={onChange}
-placeholder="" />)),
+placeholder=""/>)),
-actions: editName!='sqlSubmitJarParas'?[<a onClick={({}) => handleEditClick('sqlSubmitJarParas')}>修改</a>]:
+actions: editName != 'sqlSubmitJarParas' ? [<a onClick={({}) => handleEditClick('sqlSubmitJarParas')}>修改</a>] :
[<a onClick={({}) => handleSaveClick('sqlSubmitJarParas')}>保存</a>,
<a onClick={({}) => handleCancelClick()}>取消</a>],
},
{
title: '提交FlinkSQL的Jar的主类',
description: (
-editName!='sqlSubmitJarMainAppClass'?
+editName != 'sqlSubmitJarMainAppClass' ?
-(sqlSubmitJarMainAppClass?sqlSubmitJarMainAppClass:'未设置'):(<Input
+(sqlSubmitJarMainAppClass ? sqlSubmitJarMainAppClass : '未设置') : (<Input
id='sqlSubmitJarMainAppClass'
defaultValue={sqlSubmitJarMainAppClass}
onChange={onChange}
-placeholder="com.dlink.app.MainApp" />)),
+placeholder="com.dlink.app.MainApp"/>)),
-actions: editName!='sqlSubmitJarMainAppClass'?[<a onClick={({}) => handleEditClick('sqlSubmitJarMainAppClass')}>修改</a>]:
+actions: editName != 'sqlSubmitJarMainAppClass' ? [<a
+onClick={({}) => handleEditClick('sqlSubmitJarMainAppClass')}>修改</a>] :
[<a onClick={({}) => handleSaveClick('sqlSubmitJarMainAppClass')}>保存</a>,
<a onClick={({}) => handleCancelClick()}>取消</a>],
-},{
+}, {
title: '使用 RestAPI',
description: '启用后,Flink 任务的 savepoint、停止等操作将通过 JobManager 的 RestAPI 进行',
actions: [
...@@ -94,41 +103,52 @@ const FlinkConfigView: React.FC<FlinkConfigProps> = (props) => {
<Switch checkedChildren="启用" unCheckedChildren="禁用"
checked={useRestAPI}
/></Form.Item>],
-},{
+}, {
title: 'FlinkSQL语句分割符',
description: (
-editName!='sqlSeparator'?
+editName != 'sqlSeparator' ?
-(sqlSeparator?sqlSeparator:'未设置'):(<Input
+(sqlSeparator ? sqlSeparator : '未设置') : (<Input
id='sqlSeparator'
defaultValue={sqlSeparator}
onChange={onChange}
-placeholder=";" />)),
+placeholder=";"/>)),
-actions: editName!='sqlSeparator'?[<a onClick={({}) => handleEditClick('sqlSeparator')}>修改</a>]:
+actions: editName != 'sqlSeparator' ? [<a onClick={({}) => handleEditClick('sqlSeparator')}>修改</a>] :
[<a onClick={({}) => handleSaveClick('sqlSeparator')}>保存</a>,
<a onClick={({}) => handleCancelClick()}>取消</a>],
},
+{
+title: '使用逻辑计划计算血缘',
+description: '在计算 Flink 任务的字段血缘分析时是否基于逻辑计划进行,只支持 1.14 版本',
+actions: [
+<Form.Item
+name="useLogicalPlan" valuePropName="checked"
+>
+<Switch checkedChildren="启用" unCheckedChildren="禁用"
+checked={useLogicalPlan}
+/></Form.Item>],
+},
];
const onChange = e => {
let values = {};
-values[e.target.id]=e.target.value;
+values[e.target.id] = e.target.value;
-setFormValues({...formValues,...values});
+setFormValues({...formValues, ...values});
};
-const onValuesChange = (change:any,all:any) => {
+const onValuesChange = (change: any, all: any) => {
let values = {};
-for(let key in change){
+for (let key in change) {
-values[key]=all[key];
+values[key] = all[key];
}
saveSettings(values, dispatch);
};
-const handleEditClick = (name:string)=>{
+const handleEditClick = (name: string) => {
setEditName(name);
};
-const handleSaveClick = (name:string)=>{
+const handleSaveClick = (name: string) => {
-if(formValues[name]!=props[name]) {
+if (formValues[name] != props[name]) {
let values = {};
values[name] = formValues[name];
saveSettings(values, dispatch);
...@@ -136,7 +156,7 @@ const FlinkConfigView: React.FC<FlinkConfigProps> = (props) => {
setEditName('');
};
-const handleCancelClick = ()=>{
+const handleCancelClick = () => {
setFormValues(props);
setEditName('');
};
...@@ -167,5 +187,6 @@ export default connect(({Settings}: { Settings: SettingsStateType }) => ({
sqlSubmitJarParas: Settings.sqlSubmitJarParas,
sqlSubmitJarMainAppClass: Settings.sqlSubmitJarMainAppClass,
useRestAPI: Settings.useRestAPI,
+useLogicalPlan: Settings.useLogicalPlan,
sqlSeparator: Settings.sqlSeparator,
}))(FlinkConfigView);
...@@ -17,22 +17,21 @@
*
*/
-import {Reducer} from "umi";
+import {Effect, Reducer} from "umi";
export type SettingsStateType = {
sqlSubmitJarPath: string,
sqlSubmitJarParas: string,
sqlSubmitJarMainAppClass: string,
useRestAPI: boolean,
+useLogicalPlan: boolean,
sqlSeparator: string,
};
export type ModelType = {
namespace: string;
state: SettingsStateType;
-effects: {
-};
+effects: {};
reducers: {
saveSettings: Reducer<SettingsStateType>;
};
...@@ -41,15 +40,13 @@ export type ModelType = {
const SettingsModel: ModelType = {
namespace: 'Settings',
state: {
-sqlSubmitJarPath:'',
+sqlSubmitJarPath: '',
-sqlSubmitJarParas:'',
+sqlSubmitJarParas: '',
-sqlSubmitJarMainAppClass:'',
+sqlSubmitJarMainAppClass: '',
-useRestAPI:true,
+useRestAPI: true,
},
-effects: {
-},
+effects: {},
reducers: {
saveSettings(state, {payload}) {
...