Unverified Commit 53ed9180 authored by aiwenmo's avatar aiwenmo Committed by GitHub

[Fix-484] [core] Fix to job plan info was executed twice

[Fix-484] [core] Fix to job plan info was executed twice
parents 596167e2 85b04877
......@@ -65,7 +65,8 @@ public class StudioController {
try {
return Result.succeed(studioService.getJobPlan(studioExecuteDTO), "获取作业计划成功");
} catch (Exception e) {
return Result.failed("目前只支持获取 INSERT 语句的作业计划");
e.printStackTrace();
return Result.failed(e.getMessage());
}
}
......
package com.dlink.explainer;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.table.catalog.CatalogManager;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkSQLConstant;
import com.dlink.executor.Executor;
import com.dlink.explainer.ca.*;
import com.dlink.explainer.ca.ColumnCA;
import com.dlink.explainer.ca.ColumnCAResult;
import com.dlink.explainer.ca.NodeRel;
import com.dlink.explainer.ca.TableCA;
import com.dlink.explainer.ca.TableCAGenerator;
import com.dlink.explainer.ca.TableCAResult;
import com.dlink.explainer.lineage.LineageColumnGenerator;
import com.dlink.explainer.lineage.LineageTableGenerator;
import com.dlink.explainer.trans.Trans;
......@@ -21,13 +35,6 @@ import com.dlink.utils.LogUtil;
import com.dlink.utils.SqlUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.table.catalog.CatalogManager;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* Explainer
......@@ -94,7 +101,7 @@ public class Explainer {
ddl.add(new StatementParam(statement, operationType));
}
}
return new JobParam(ddl, trans, execute);
return new JobParam(Arrays.asList(statements), ddl, trans, execute);
}
public List<SqlExplainResult> explainSqlResult(String statement) {
......@@ -226,7 +233,6 @@ public class Explainer {
record = executor.explainSqlRecord(item.getValue());
if (Asserts.isNull(record)) {
record = new SqlExplainResult();
executor.getStreamGraph();
} else {
executor.executeSql(item.getValue());
}
......@@ -252,59 +258,31 @@ public class Explainer {
}
public ObjectNode getStreamGraph(String statement) {
List<SqlExplainResult> sqlExplainRecords = explainSql(statement).getSqlExplainResults();
List<String> sqlPlans = new ArrayList<>();
JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement, sqlSeparator));
if (jobParam.getTrans().size() > 0) {
return executor.getStreamGraph(jobParam.getStatements());
} else if (jobParam.getExecute().size() > 0) {
List<String> datastreamPlans = new ArrayList<>();
for (SqlExplainResult item : sqlExplainRecords) {
if (Asserts.isNotNull(item.getType())
&& item.getType().contains(FlinkSQLConstant.DML)) {
String[] statements = SqlUtil.getStatements(item.getSql(), sqlSeparator);
for (String str : statements) {
sqlPlans.add(str);
}
continue;
}
if (Asserts.isNotNull(item.getType())
&& item.getType().equals(FlinkSQLConstant.DATASTREAM)) {
String[] statements = SqlUtil.getStatements(item.getSql(), sqlSeparator);
for (String str : statements) {
datastreamPlans.add(str);
}
}
for (StatementParam item : jobParam.getExecute()) {
datastreamPlans.add(item.getValue());
}
if (sqlPlans.size() > 0) {
return executor.getStreamGraph(sqlPlans);
} else if (datastreamPlans.size() > 0) {
return executor.getStreamGraphFromDataStream(sqlPlans);
return executor.getStreamGraphFromDataStream(datastreamPlans);
} else {
return mapper.createObjectNode();
}
}
public JobPlanInfo getJobPlanInfo(String statement) {
List<SqlExplainResult> sqlExplainRecords = explainSql(statement).getSqlExplainResults();
List<String> sqlPlans = new ArrayList<>();
JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement, sqlSeparator));
if (jobParam.getTrans().size() > 0) {
return executor.getJobPlanInfo(jobParam.getStatements());
} else if (jobParam.getExecute().size() > 0) {
List<String> datastreamPlans = new ArrayList<>();
for (SqlExplainResult item : sqlExplainRecords) {
if (Asserts.isNotNull(item.getType())
&& item.getType().contains(FlinkSQLConstant.DML)) {
String[] statements = SqlUtil.getStatements(item.getSql(), sqlSeparator);
for (String str : statements) {
sqlPlans.add(str);
}
continue;
}
if (Asserts.isNotNull(item.getType())
&& item.getType().equals(FlinkSQLConstant.DATASTREAM)) {
String[] statements = SqlUtil.getStatements(item.getSql(), sqlSeparator);
for (String str : statements) {
datastreamPlans.add(str);
}
}
for (StatementParam item : jobParam.getExecute()) {
datastreamPlans.add(item.getValue());
}
if (sqlPlans.size() > 0) {
return executor.getJobPlanInfo(sqlPlans);
} else if (datastreamPlans.size() > 0) {
return executor.getJobPlanInfoFromDataStream(datastreamPlans);
} else {
return new JobPlanInfo("");
......@@ -451,20 +429,21 @@ public class Explainer {
for (NodeRel nodeRel : columnCAResult.getColumnCASRelChain()) {
if (nodeRel.getPreId().equals(item.getValue().getId())) {
for (NodeRel nodeRel2 : columnCAResult.getColumnCASRelChain()) {
if (columnCAResult.getColumnCASMaps().containsKey(nodeRel2.getSufId()) && columnCAResult.getColumnCASMaps().containsKey(nodeRel2.getPreId()) && columnCAResult.getColumnCASMaps().containsKey(nodeRel.getSufId()) &&
if (columnCAResult.getColumnCASMaps().containsKey(nodeRel2.getSufId()) && columnCAResult.getColumnCASMaps().containsKey(nodeRel2.getPreId()) &&
columnCAResult.getColumnCASMaps().containsKey(nodeRel.getSufId()) &&
columnCAResult.getColumnCASMaps().get(nodeRel2.getSufId()).getTableId().equals(columnCAResult.getColumnCASMaps().get(nodeRel.getSufId()).getTableId()) &&
columnCAResult.getColumnCASMaps().get(nodeRel2.getSufId()).getName().equals(columnCAResult.getColumnCASMaps().get(nodeRel.getSufId()).getName()) &&
!columnCAResult.getColumnCASMaps().get(nodeRel2.getPreId()).getType().equals("Data Sink")) {
addNodeRels.add(new NodeRel(nodeRel2.getPreId(),nodeRel.getPreId()));
addNodeRels.add(new NodeRel(nodeRel2.getPreId(), nodeRel.getPreId()));
}
}
delNodeRels.add(nodeRel);
}
}
for (NodeRel nodeRel : addNodeRels){
for (NodeRel nodeRel : addNodeRels) {
columnCAResult.getColumnCASRelChain().add(nodeRel);
}
for (NodeRel nodeRel : delNodeRels){
for (NodeRel nodeRel : delNodeRels) {
columnCAResult.getColumnCASRelChain().remove(nodeRel);
}
}
......
......@@ -9,6 +9,7 @@ import java.util.List;
* @since 2021/11/16
*/
public class JobParam {
private List<String> statements;
private List<StatementParam> ddl;
private List<StatementParam> trans;
private List<StatementParam> execute;
......@@ -18,12 +19,21 @@ public class JobParam {
this.trans = trans;
}
public JobParam(List<StatementParam> ddl, List<StatementParam> trans, List<StatementParam> execute) {
public JobParam(List<String> statements, List<StatementParam> ddl, List<StatementParam> trans, List<StatementParam> execute) {
this.statements = statements;
this.ddl = ddl;
this.trans = trans;
this.execute = execute;
}
public List<String> getStatements() {
return statements;
}
public void setStatements(List<String> statements) {
this.statements = statements;
}
public List<StatementParam> getDdl() {
return ddl;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment