Commit 67b3294e authored by JinPeng's avatar JinPeng

[Fix-263][dlink-admin] Fix the bug of abnormal blood relationship analysis

parent 1e5b06c1
......@@ -13,5 +13,6 @@ import lombok.Setter;
@Setter
public class StudioCADTO extends AbstractStatementDTO {
// It's useless for the time being
private Boolean statementSet;
private Integer type;
}
......@@ -114,7 +114,8 @@ public class JobInstanceServiceImpl extends SuperServiceImpl<JobInstanceMapper,
@Override
public LineageResult getLineage(Integer id) {
return LineageBuilder.getLineage(getJobInfoDetail(id).getHistory().getStatement());
History history = getJobInfoDetail(id).getHistory();
return LineageBuilder.getLineage(history.getStatement(), history.getConfig().get("useStatementSet").asBoolean());
}
@Override
......
......@@ -257,7 +257,7 @@ public class StudioServiceImpl implements StudioService {
@Override
public LineageResult getLineage(StudioCADTO studioCADTO) {
addFlinkSQLEnv(studioCADTO);
return LineageBuilder.getLineage(studioCADTO.getStatement());
return LineageBuilder.getLineage(studioCADTO.getStatement(), studioCADTO.getStatementSet());
}
@Override
......
......@@ -43,6 +43,13 @@ public class Explainer {
public Explainer(Executor executor) {
this.executor = executor;
this.useStatementSet = true;
init();
}
public Explainer(Executor executor, boolean useStatementSet) {
this.executor = executor;
this.useStatementSet = useStatementSet;
init();
}
......@@ -356,8 +363,9 @@ public class Explainer {
}
}
List<ColumnCAResult> results = new ArrayList<>();
for (int i = 0; i < strPlans.size(); i++) {
List<Trans> trans = translateTrans(translateObjectNode(strPlans.get(i)));
// statementsets
if (useStatementSet) {
List<Trans> trans = translateTrans(translateObjectNode(strPlans));
LineageColumnGenerator generator = LineageColumnGenerator.build(trans);
LineageTableGenerator tableGenerator = LineageTableGenerator.build(trans);
tableGenerator.translate();
......@@ -365,7 +373,20 @@ public class Explainer {
generator.translate();
ColumnCAResult columnCAResult = new ColumnCAResult(generator);
correctColumn(columnCAResult);
correctSinkSets(columnCAResult);
results.add(columnCAResult);
} else {
for (int i = 0; i < strPlans.size(); i++) {
List<Trans> trans = translateTrans(translateObjectNode(strPlans.get(i)));
LineageColumnGenerator generator = LineageColumnGenerator.build(trans);
LineageTableGenerator tableGenerator = LineageTableGenerator.build(trans);
tableGenerator.translate();
generator.setTableCAS(tableGenerator.getTables());
generator.translate();
ColumnCAResult columnCAResult = new ColumnCAResult(generator);
correctColumn(columnCAResult);
results.add(columnCAResult);
}
}
return results;
}
......@@ -383,7 +404,7 @@ public class Explainer {
if (!sinkColumnName.equals(oldFields.get(i))) {
for (Map.Entry<Integer, ColumnCA> item : columnCAResult.getColumnCASMaps().entrySet()) {
ColumnCA columnCA = item.getValue();
if (columnCA.getTableId() == tableCA.getId() && columnCA.getName().equals(oldFields.get(i))) {
if (columnCA.getTableId().equals(tableCA.getId()) && columnCA.getName().equals(oldFields.get(i))) {
columnCA.setName(sinkColumnName);
fields.set(i, sinkColumnName);
}
......@@ -397,27 +418,59 @@ public class Explainer {
List<String> columnList = FlinkUtil.getFieldNamesFromCatalogManager(catalogManager, tableCA.getCatalog(), tableCA.getDatabase(), tableCA.getTable());
List<String> fields = tableCA.getFields();
int i = 0;
List<Integer> idList = new ArrayList<>();
while (i < fields.size()) {
if (!columnList.contains(fields.get(i))) {
List<Integer> idList = new ArrayList<>();
for (Map.Entry<Integer, ColumnCA> item : columnCAResult.getColumnCASMaps().entrySet()) {
if (item.getValue().getName().equals(fields.get(i)) && item.getValue().getTableId() == tableCA.getId()) {
if (item.getValue().getName().equals(fields.get(i)) && item.getValue().getTableId().equals(tableCA.getId())) {
idList.add(item.getValue().getId());
break;
}
}
for (Integer id : idList) {
fields.remove(i);
} else {
i++;
}
}
for (Integer id : idList) {
for (NodeRel nodeRel : columnCAResult.getColumnCASRelChain()) {
if (nodeRel.getPreId().equals(id)) {
columnCAResult.getColumnCASMaps().remove(id);
columnCAResult.getColumnCASRelChain().remove(nodeRel);
break;
}
}
}
}
}
private void correctSinkSets(ColumnCAResult columnCAResult) {
for (TableCA tableCA : columnCAResult.getTableCAS()) {
if (tableCA.getType().equals("Data Sink")) {
for (Map.Entry<Integer, ColumnCA> item : columnCAResult.getColumnCASMaps().entrySet()) {
if (item.getValue().getTableId().equals(tableCA.getId())) {
List<NodeRel> addNodeRels = new ArrayList<>();
List<NodeRel> delNodeRels = new ArrayList<>();
for (NodeRel nodeRel : columnCAResult.getColumnCASRelChain()) {
if (nodeRel.getPreId() == id) {
columnCAResult.getColumnCASMaps().remove(id);
columnCAResult.getColumnCASRelChain().remove(nodeRel);
break;
if (nodeRel.getPreId().equals(item.getValue().getId())) {
for (NodeRel nodeRel2 : columnCAResult.getColumnCASRelChain()) {
if (columnCAResult.getColumnCASMaps().containsKey(nodeRel2.getSufId()) && columnCAResult.getColumnCASMaps().containsKey(nodeRel2.getPreId()) && columnCAResult.getColumnCASMaps().containsKey(nodeRel.getSufId()) &&
columnCAResult.getColumnCASMaps().get(nodeRel2.getSufId()).getTableId().equals(columnCAResult.getColumnCASMaps().get(nodeRel.getSufId()).getTableId()) &&
columnCAResult.getColumnCASMaps().get(nodeRel2.getSufId()).getName().equals(columnCAResult.getColumnCASMaps().get(nodeRel.getSufId()).getName()) &&
!columnCAResult.getColumnCASMaps().get(nodeRel2.getPreId()).getType().equals("Data Sink")) {
addNodeRels.add(new NodeRel(nodeRel2.getPreId(),nodeRel.getPreId()));
}
}
delNodeRels.add(nodeRel);
}
}
for (NodeRel nodeRel : addNodeRels){
columnCAResult.getColumnCASRelChain().add(nodeRel);
}
for (NodeRel nodeRel : delNodeRels){
columnCAResult.getColumnCASRelChain().remove(nodeRel);
}
}
fields.remove(i);
} else {
i++;
}
}
}
......@@ -427,6 +480,10 @@ public class Explainer {
return executor.getStreamGraph(statement);
}
private ObjectNode translateObjectNode(List<String> statement) {
return executor.getStreamGraph(statement);
}
private List<Trans> translateTrans(ObjectNode plan) {
return new TransGenerator(plan).translateTrans();
}
......
......@@ -67,7 +67,6 @@ public class TableCA implements ICA {
List<String> tableList = trans.getTable();
this.id = trans.getId();
this.parentId = trans.getParentId();
this.name = trans.getName();
List<Field> select = trans.getSelect();
List<String> fieldList = new ArrayList<>();
for (Field field : select) {
......@@ -94,6 +93,7 @@ public class TableCA implements ICA {
this.table = strings[0];
}
}
this.name = this.catalog + "." + this.database + "." + this.table;
}
public static TableCA build(Trans trans) {
......
......@@ -5,8 +5,7 @@ import com.dlink.explainer.ca.NodeRel;
import com.dlink.explainer.ca.TableCA;
import com.dlink.plus.FlinkSqlPlus;
import java.util.ArrayList;
import java.util.List;
import java.util.*;
/**
* LineageBuilder
......@@ -17,7 +16,11 @@ import java.util.List;
public class LineageBuilder {
public static LineageResult getLineage(String statement) {
FlinkSqlPlus plus = FlinkSqlPlus.build();
return getLineage(statement, true);
}
public static LineageResult getLineage(String statement, boolean statementSet) {
FlinkSqlPlus plus = FlinkSqlPlus.build(statementSet);
List<ColumnCAResult> columnCAResults = plus.explainSqlColumnCA(statement);
List<LineageTable> tables = new ArrayList<>();
List<LineageRelation> relations = new ArrayList<>();
......@@ -26,14 +29,72 @@ public class LineageBuilder {
for (TableCA tableCA : item.getTableCAS()) {
tables.add(LineageTable.build(tableCA));
}
Set<String> keySet = new HashSet<>();
for (NodeRel nodeRel : item.getColumnCASRelChain()) {
index++;
relations.add(LineageRelation.build(index + "",
item.getColumnCASMaps().get(nodeRel.getPreId()).getTableId().toString(),
item.getColumnCASMaps().get(nodeRel.getSufId()).getTableId().toString(),
item.getColumnCASMaps().get(nodeRel.getPreId()).getName(),
item.getColumnCASMaps().get(nodeRel.getSufId()).getName()
));
if( item.getColumnCASMaps().containsKey(nodeRel.getPreId())&&item.getColumnCASMaps().containsKey(nodeRel.getSufId())
&& !item.getColumnCASMaps().get(nodeRel.getPreId()).getTableId().equals(item.getColumnCASMaps().get(nodeRel.getSufId()).getTableId())) {
String key = item.getColumnCASMaps().get(nodeRel.getPreId()).getTableId().toString() + "@" +
item.getColumnCASMaps().get(nodeRel.getSufId()).getTableId().toString() + "@" +
item.getColumnCASMaps().get(nodeRel.getPreId()).getName() + "@" +
item.getColumnCASMaps().get(nodeRel.getSufId()).getName();
//去重
if(!keySet.contains(key)){
index++;
relations.add(LineageRelation.build(index + "",
item.getColumnCASMaps().get(nodeRel.getPreId()).getTableId().toString(),
item.getColumnCASMaps().get(nodeRel.getSufId()).getTableId().toString(),
item.getColumnCASMaps().get(nodeRel.getPreId()).getName(),
item.getColumnCASMaps().get(nodeRel.getSufId()).getName()
));
keySet.add(key);
}
}
}
}
//获取重复表集合
List<List<LineageTable>> repeatTablesList = new ArrayList<>();
for (int i = 0; i < tables.size() - 1; i++) {
List<LineageTable> repeatTables = new ArrayList<>();
for (int j = i + 1; j < tables.size(); j++) {
if (tables.get(i).getName().equals(tables.get(j).getName())) {
repeatTables.add(tables.get(j));
}
}
if (repeatTables.size() > 0) {
repeatTables.add(tables.get(i));
repeatTablesList.add(repeatTables);
}
}
//重复表合并
Map<String,String> correctTableIdMap = new HashMap<>();
for(List<LineageTable> tableList : repeatTablesList){
LineageTable newTable = new LineageTable();
Set<String> columnKeySet = new HashSet<>();
for(LineageTable table: tableList){
if(newTable.getId() == null || newTable.getName() == null){
newTable.setId(table.getId());
newTable.setName(table.getName());
newTable.setColumns(new ArrayList<>());
}
for(LineageColumn column : table.getColumns()){
String key = column.getName() + "@&" + column.getTitle();
if(!columnKeySet.contains(key)){
newTable.getColumns().add(column);
columnKeySet.add(key);
}
}
correctTableIdMap.put(table.getId(),newTable.getId());
tables.remove(table);
}
tables.add(newTable);
}
//关系中id重新指向
for (LineageRelation relation : relations){
if(correctTableIdMap.containsKey(relation.getSrcTableId())){
relation.setSrcTableId(correctTableIdMap.get(relation.getSrcTableId()));
}
if(correctTableIdMap.containsKey(relation.getTgtTableId())){
relation.setTgtTableId(correctTableIdMap.get(relation.getTgtTableId()));
}
}
return LineageResult.build(tables, relations);
......
......@@ -21,14 +21,24 @@ public class FlinkSqlPlus {
private Executor executor;
private Explainer explainer;
private boolean statementSet;
public FlinkSqlPlus(Executor executor) {
this.executor = executor;
this.explainer = new Explainer(executor);
}
public FlinkSqlPlus(Executor executor, boolean statementSet) {
this.executor = executor;
this.explainer = new Explainer(executor, statementSet);
}
public static FlinkSqlPlus build() {
return new FlinkSqlPlus(Executor.build());
return build(true);
}
public static FlinkSqlPlus build(boolean statementSet) {
return new FlinkSqlPlus(Executor.build(), statementSet);
}
public List<SqlResult> executeSql(String sql) {
......
......@@ -16,6 +16,7 @@ const StudioCA = (props: any) => {
const handleLineage=()=>{
const res = getLineage({
statement:current.value,
statementSet:current.task.statementSet,
type: 1,
});
res.then((result)=>{
......
......@@ -45,5 +45,6 @@ export type StudioParam = {
}
export type CAParam = {
statement: string,
statementSet: boolean,
type: number,
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment