Commit f9ae43cc authored by wenmo's avatar wenmo

新增字段级血缘分析

parent 51cab779
...@@ -49,7 +49,7 @@ Dinky(原 Dlink): ...@@ -49,7 +49,7 @@ Dinky(原 Dlink):
| | | 新增 作业生命周期管理 | 0.6.0 | | | | 新增 作业生命周期管理 | 0.6.0 |
| | | 新增 基于 Explain 的语法校验与逻辑解析 | 0.4.0 | | | | 新增 基于 Explain 的语法校验与逻辑解析 | 0.4.0 |
| | | 新增 JobPlan 图预览 | 0.5.0 | | | | 新增 JobPlan 图预览 | 0.5.0 |
| | | 新增 基于 StreamGraph 的表级血缘分析 | 0.4.0 | | | | 新增 基于 StreamGraph 的字段级血缘分析 | 0.6.0 |
| | | 新增 基于上下文元数据自动提示与补全 | 0.4.0 | | | | 新增 基于上下文元数据自动提示与补全 | 0.4.0 |
| | | 新增 自定义规则的自动提示与补全 | 0.4.0 | | | | 新增 自定义规则的自动提示与补全 | 0.4.0 |
| | | 新增 关键字高亮与代码缩略图 | 0.4.0 | | | | 新增 关键字高亮与代码缩略图 | 0.4.0 |
...@@ -102,7 +102,7 @@ Dinky(原 Dlink): ...@@ -102,7 +102,7 @@ Dinky(原 Dlink):
| | | 新增 作业日志 | dev | | | | 新增 作业日志 | dev |
| | | 新增 自动调优 | dev | | | | 新增 自动调优 | dev |
| | | 新增 FlinkSQL | 0.6.0 | | | | 新增 FlinkSQL | 0.6.0 |
| | | 新增 数据地图 | dev | | | | 新增 数据地图 | 0.6.0 |
| | | 新增 即席查询 | dev | | | | 新增 即席查询 | dev |
| | | 新增 历史版本 | dev | | | | 新增 历史版本 | dev |
| | | 新增 告警记录 | 0.6.0 | | | | 新增 告警记录 | 0.6.0 |
......
...@@ -98,10 +98,10 @@ public class JobInstanceController { ...@@ -98,10 +98,10 @@ public class JobInstanceController {
} }
/** /**
* 获取单的血缘分析 * 获取单任务实例的血缘分析
*/ */
@GetMapping("/getOneTableColumnCA") @GetMapping("/getLineage")
public Result getOneTableColumnCA(@RequestParam Integer id) { public Result getLineage(@RequestParam Integer id) {
return Result.succeed(jobInstanceService.getOneTableColumnCA(id), "刷新成功"); return Result.succeed(jobInstanceService.getLineage(id), "刷新成功");
} }
} }
...@@ -81,15 +81,11 @@ public class StudioController { ...@@ -81,15 +81,11 @@ public class StudioController {
} }
/** /**
* 获取单的血缘分析 * 获取单任务实例的血缘分析
*/ */
@PostMapping("/getCAByStatement") @PostMapping("/getLineage")
public Result getCAByStatement(@RequestBody StudioCADTO studioCADTO) { public Result getLineage(@RequestBody StudioCADTO studioCADTO) {
switch (studioCADTO.getType()){ return Result.succeed(studioService.getLineage(studioCADTO), "刷新成功");
case 1:return Result.succeed(studioService.getOneTableColumnCAByStatement(studioCADTO.getStatement()),"执行成功");
case 2:return Result.succeed(studioService.getColumnCAByStatement(studioCADTO.getStatement()),"执行成功");
default:return Result.failed("敬请期待");
}
} }
/** /**
......
...@@ -2,6 +2,7 @@ package com.dlink.service; ...@@ -2,6 +2,7 @@ package com.dlink.service;
import com.dlink.db.service.ISuperService; import com.dlink.db.service.ISuperService;
import com.dlink.explainer.ca.TableCANode; import com.dlink.explainer.ca.TableCANode;
import com.dlink.explainer.lineage.LineageResult;
import com.dlink.model.JobInfoDetail; import com.dlink.model.JobInfoDetail;
import com.dlink.model.JobInstance; import com.dlink.model.JobInstance;
import com.dlink.model.JobInstanceStatus; import com.dlink.model.JobInstanceStatus;
...@@ -24,5 +25,5 @@ public interface JobInstanceService extends ISuperService<JobInstance> { ...@@ -24,5 +25,5 @@ public interface JobInstanceService extends ISuperService<JobInstance> {
JobInfoDetail getJobInfoDetailInfo(JobInstance jobInstance); JobInfoDetail getJobInfoDetailInfo(JobInstance jobInstance);
List<TableCANode> getOneTableColumnCA(Integer id); LineageResult getLineage(Integer id);
} }
package com.dlink.service; package com.dlink.service;
import com.dlink.dto.SessionDTO; import com.dlink.dto.*;
import com.dlink.dto.SqlDTO;
import com.dlink.dto.StudioDDLDTO;
import com.dlink.dto.StudioExecuteDTO;
import com.dlink.explainer.ca.ColumnCANode; import com.dlink.explainer.ca.ColumnCANode;
import com.dlink.explainer.ca.TableCANode; import com.dlink.explainer.ca.TableCANode;
import com.dlink.explainer.lineage.LineageResult;
import com.dlink.job.JobResult; import com.dlink.job.JobResult;
import com.dlink.result.IResult; import com.dlink.result.IResult;
import com.dlink.result.SelectResult; import com.dlink.result.SelectResult;
...@@ -44,11 +42,7 @@ public interface StudioService { ...@@ -44,11 +42,7 @@ public interface StudioService {
List<SessionInfo> listSession(String createUser); List<SessionInfo> listSession(String createUser);
List<TableCANode> getOneTableCAByStatement(String statement); LineageResult getLineage(StudioCADTO studioCADTO);
List<TableCANode> getOneTableColumnCAByStatement(String statement);
List<ColumnCANode> getColumnCAByStatement(String statement);
List<JsonNode> listJobs(Integer clusterId); List<JsonNode> listJobs(Integer clusterId);
......
...@@ -6,6 +6,8 @@ import com.dlink.constant.FlinkRestResultConstant; ...@@ -6,6 +6,8 @@ import com.dlink.constant.FlinkRestResultConstant;
import com.dlink.db.service.impl.SuperServiceImpl; import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.explainer.ca.CABuilder; import com.dlink.explainer.ca.CABuilder;
import com.dlink.explainer.ca.TableCANode; import com.dlink.explainer.ca.TableCANode;
import com.dlink.explainer.lineage.LineageBuilder;
import com.dlink.explainer.lineage.LineageResult;
import com.dlink.mapper.JobInstanceMapper; import com.dlink.mapper.JobInstanceMapper;
import com.dlink.model.Cluster; import com.dlink.model.Cluster;
import com.dlink.model.History; import com.dlink.model.History;
...@@ -127,8 +129,8 @@ public class JobInstanceServiceImpl extends SuperServiceImpl<JobInstanceMapper, ...@@ -127,8 +129,8 @@ public class JobInstanceServiceImpl extends SuperServiceImpl<JobInstanceMapper,
} }
@Override @Override
public List<TableCANode> getOneTableColumnCA(Integer id) { public LineageResult getLineage(Integer id) {
return CABuilder.getOneTableColumnCAByStatement(getJobInfoDetail(id).getHistory().getStatement()); return LineageBuilder.getLineage(getJobInfoDetail(id).getHistory().getStatement());
} }
} }
...@@ -8,6 +8,8 @@ import com.dlink.dto.*; ...@@ -8,6 +8,8 @@ import com.dlink.dto.*;
import com.dlink.explainer.ca.CABuilder; import com.dlink.explainer.ca.CABuilder;
import com.dlink.explainer.ca.ColumnCANode; import com.dlink.explainer.ca.ColumnCANode;
import com.dlink.explainer.ca.TableCANode; import com.dlink.explainer.ca.TableCANode;
import com.dlink.explainer.lineage.LineageBuilder;
import com.dlink.explainer.lineage.LineageResult;
import com.dlink.gateway.GatewayType; import com.dlink.gateway.GatewayType;
import com.dlink.gateway.model.JobInfo; import com.dlink.gateway.model.JobInfo;
import com.dlink.gateway.result.SavePointResult; import com.dlink.gateway.result.SavePointResult;
...@@ -254,18 +256,9 @@ public class StudioServiceImpl implements StudioService { ...@@ -254,18 +256,9 @@ public class StudioServiceImpl implements StudioService {
} }
@Override @Override
public List<TableCANode> getOneTableCAByStatement(String statement) { public LineageResult getLineage(StudioCADTO studioCADTO) {
return CABuilder.getOneTableCAByStatement(statement); addFlinkSQLEnv(studioCADTO);
} return LineageBuilder.getLineage(studioCADTO.getStatement());
@Override
public List<TableCANode> getOneTableColumnCAByStatement(String statement) {
return CABuilder.getOneTableColumnCAByStatement(statement);
}
@Override
public List<ColumnCANode> getColumnCAByStatement(String statement) {
return CABuilder.getColumnCAByStatement(statement);
} }
@Override @Override
......
...@@ -4,6 +4,8 @@ import com.dlink.assertion.Asserts; ...@@ -4,6 +4,8 @@ import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkSQLConstant; import com.dlink.constant.FlinkSQLConstant;
import com.dlink.executor.Executor; import com.dlink.executor.Executor;
import com.dlink.explainer.ca.*; import com.dlink.explainer.ca.*;
import com.dlink.explainer.lineage.LineageColumnGenerator;
import com.dlink.explainer.lineage.LineageTableGenerator;
import com.dlink.explainer.trans.Trans; import com.dlink.explainer.trans.Trans;
import com.dlink.explainer.trans.TransGenerator; import com.dlink.explainer.trans.TransGenerator;
import com.dlink.interceptor.FlinkInterceptor; import com.dlink.interceptor.FlinkInterceptor;
...@@ -22,9 +24,7 @@ import org.apache.flink.runtime.rest.messages.JobPlanInfo; ...@@ -22,9 +24,7 @@ import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.CatalogManager;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.ArrayList; import java.util.*;
import java.util.Date;
import java.util.List;
/** /**
* Explainer * Explainer
...@@ -43,18 +43,18 @@ public class Explainer { ...@@ -43,18 +43,18 @@ public class Explainer {
this.executor = executor; this.executor = executor;
} }
public Explainer(Executor executor, boolean useStatementSet,String sqlSeparator) { public Explainer(Executor executor, boolean useStatementSet, String sqlSeparator) {
this.executor = executor; this.executor = executor;
this.useStatementSet = useStatementSet; this.useStatementSet = useStatementSet;
this.sqlSeparator = sqlSeparator; this.sqlSeparator = sqlSeparator;
} }
public static Explainer build(Executor executor){ public static Explainer build(Executor executor) {
return new Explainer(executor,false,";"); return new Explainer(executor, false, ";");
} }
public static Explainer build(Executor executor, boolean useStatementSet, String sqlSeparator){ public static Explainer build(Executor executor, boolean useStatementSet, String sqlSeparator) {
return new Explainer(executor,useStatementSet,sqlSeparator); return new Explainer(executor, useStatementSet, sqlSeparator);
} }
public JobParam pretreatStatements(String[] statements) { public JobParam pretreatStatements(String[] statements) {
...@@ -67,13 +67,13 @@ public class Explainer { ...@@ -67,13 +67,13 @@ public class Explainer {
continue; continue;
} }
SqlType operationType = Operations.getOperationType(statement); SqlType operationType = Operations.getOperationType(statement);
if (operationType.equals(SqlType.INSERT) || operationType.equals(SqlType.SELECT)|| operationType.equals(SqlType.SHOW) if (operationType.equals(SqlType.INSERT) || operationType.equals(SqlType.SELECT) || operationType.equals(SqlType.SHOW)
|| operationType.equals(SqlType.DESCRIBE)|| operationType.equals(SqlType.DESC)) { || operationType.equals(SqlType.DESCRIBE) || operationType.equals(SqlType.DESC)) {
trans.add(new StatementParam(statement, operationType)); trans.add(new StatementParam(statement, operationType));
if (!useStatementSet) { if (!useStatementSet) {
break; break;
} }
} else if(operationType.equals(SqlType.EXECUTE)){ } else if (operationType.equals(SqlType.EXECUTE)) {
execute.add(new StatementParam(statement, operationType)); execute.add(new StatementParam(statement, operationType));
} else { } else {
ddl.add(new StatementParam(statement, operationType)); ddl.add(new StatementParam(statement, operationType));
...@@ -82,28 +82,27 @@ public class Explainer { ...@@ -82,28 +82,27 @@ public class Explainer {
return new JobParam(ddl, trans, execute); return new JobParam(ddl, trans, execute);
} }
@Deprecated
public List<SqlExplainResult> explainSqlResult(String statement) { public List<SqlExplainResult> explainSqlResult(String statement) {
String[] sqls = SqlUtil.getStatements(statement,sqlSeparator); String[] sqls = SqlUtil.getStatements(statement, sqlSeparator);
List<SqlExplainResult> sqlExplainRecords = new ArrayList<>(); List<SqlExplainResult> sqlExplainRecords = new ArrayList<>();
int index = 1; int index = 1;
for (String item : sqls) { for (String item : sqls) {
SqlExplainResult record = new SqlExplainResult(); SqlExplainResult record = new SqlExplainResult();
String sql = ""; String sql = "";
try { try {
sql = FlinkInterceptor.pretreatStatement(executor,item); sql = FlinkInterceptor.pretreatStatement(executor, item);
if(Asserts.isNullString(sql)){ if (Asserts.isNullString(sql)) {
continue; continue;
} }
SqlType operationType = Operations.getOperationType(item); SqlType operationType = Operations.getOperationType(item);
if (operationType.equals(SqlType.INSERT)||operationType.equals(SqlType.SELECT)) { if (operationType.equals(SqlType.INSERT) || operationType.equals(SqlType.SELECT)) {
record = executor.explainSqlRecord(sql); record = executor.explainSqlRecord(sql);
if(Asserts.isNull(record)){ if (Asserts.isNull(record)) {
continue; continue;
} }
}else{ } else {
record = executor.explainSqlRecord(sql); record = executor.explainSqlRecord(sql);
if(Asserts.isNull(record)){ if (Asserts.isNull(record)) {
continue; continue;
} }
executor.executeSql(sql); executor.executeSql(sql);
...@@ -128,7 +127,7 @@ public class Explainer { ...@@ -128,7 +127,7 @@ public class Explainer {
} }
public ExplainResult explainSql(String statement) { public ExplainResult explainSql(String statement) {
JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement,sqlSeparator)); JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement, sqlSeparator));
List<SqlExplainResult> sqlExplainRecords = new ArrayList<>(); List<SqlExplainResult> sqlExplainRecords = new ArrayList<>();
int index = 1; int index = 1;
boolean correct = true; boolean correct = true;
...@@ -140,7 +139,7 @@ public class Explainer { ...@@ -140,7 +139,7 @@ public class Explainer {
continue; continue;
} }
executor.executeSql(item.getValue()); executor.executeSql(item.getValue());
}catch (Exception e){ } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
record.setError(e.getMessage()); record.setError(e.getMessage());
record.setExplainTrue(false); record.setExplainTrue(false);
...@@ -172,13 +171,13 @@ public class Explainer { ...@@ -172,13 +171,13 @@ public class Explainer {
record.setExplain(executor.explainStatementSet(inserts)); record.setExplain(executor.explainStatementSet(inserts));
record.setParseTrue(true); record.setParseTrue(true);
record.setExplainTrue(true); record.setExplainTrue(true);
}catch (Exception e){ } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
record.setError(e.getMessage()); record.setError(e.getMessage());
record.setParseTrue(false); record.setParseTrue(false);
record.setExplainTrue(false); record.setExplainTrue(false);
correct = false; correct = false;
}finally { } finally {
record.setType("Modify DML"); record.setType("Modify DML");
record.setExplainTime(LocalDateTime.now()); record.setExplainTime(LocalDateTime.now());
record.setSql(sqlSet); record.setSql(sqlSet);
...@@ -186,20 +185,20 @@ public class Explainer { ...@@ -186,20 +185,20 @@ public class Explainer {
sqlExplainRecords.add(record); sqlExplainRecords.add(record);
} }
} }
}else{ } else {
for (StatementParam item : jobParam.getTrans()) { for (StatementParam item : jobParam.getTrans()) {
SqlExplainResult record = new SqlExplainResult(); SqlExplainResult record = new SqlExplainResult();
try { try {
record.setExplain(executor.explainSql(item.getValue())); record.setExplain(executor.explainSql(item.getValue()));
record.setParseTrue(true); record.setParseTrue(true);
record.setExplainTrue(true); record.setExplainTrue(true);
}catch (Exception e){ } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
record.setError(e.getMessage()); record.setError(e.getMessage());
record.setParseTrue(false); record.setParseTrue(false);
record.setExplainTrue(false); record.setExplainTrue(false);
correct = false; correct = false;
}finally { } finally {
record.setType("Modify DML"); record.setType("Modify DML");
record.setExplainTime(LocalDateTime.now()); record.setExplainTime(LocalDateTime.now());
record.setSql(item.getValue()); record.setSql(item.getValue());
...@@ -216,12 +215,12 @@ public class Explainer { ...@@ -216,12 +215,12 @@ public class Explainer {
if (Asserts.isNull(record)) { if (Asserts.isNull(record)) {
record = new SqlExplainResult(); record = new SqlExplainResult();
executor.getStreamGraph(); executor.getStreamGraph();
}else { } else {
executor.executeSql(item.getValue()); executor.executeSql(item.getValue());
} }
record.setType("DATASTREAM"); record.setType("DATASTREAM");
record.setParseTrue(true); record.setParseTrue(true);
}catch (Exception e){ } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
record.setError(e.getMessage()); record.setError(e.getMessage());
record.setExplainTrue(false); record.setExplainTrue(false);
...@@ -238,65 +237,65 @@ public class Explainer { ...@@ -238,65 +237,65 @@ public class Explainer {
record.setIndex(index++); record.setIndex(index++);
sqlExplainRecords.add(record); sqlExplainRecords.add(record);
} }
return new ExplainResult(correct,sqlExplainRecords.size(),sqlExplainRecords); return new ExplainResult(correct, sqlExplainRecords.size(), sqlExplainRecords);
} }
public ObjectNode getStreamGraph(String statement){ public ObjectNode getStreamGraph(String statement) {
List<SqlExplainResult> sqlExplainRecords = explainSql(statement).getSqlExplainResults(); List<SqlExplainResult> sqlExplainRecords = explainSql(statement).getSqlExplainResults();
List<String> sqlPlans = new ArrayList<>(); List<String> sqlPlans = new ArrayList<>();
List<String> datastreamPlans = new ArrayList<>(); List<String> datastreamPlans = new ArrayList<>();
for (SqlExplainResult item : sqlExplainRecords) { for (SqlExplainResult item : sqlExplainRecords) {
if (Asserts.isNotNull(item.getType()) if (Asserts.isNotNull(item.getType())
&& item.getType().contains(FlinkSQLConstant.DML)) { && item.getType().contains(FlinkSQLConstant.DML)) {
String[] statements = SqlUtil.getStatements(item.getSql(),sqlSeparator); String[] statements = SqlUtil.getStatements(item.getSql(), sqlSeparator);
for(String str : statements){ for (String str : statements) {
sqlPlans.add(str); sqlPlans.add(str);
} }
continue; continue;
} }
if(Asserts.isNotNull(item.getType()) if (Asserts.isNotNull(item.getType())
&& item.getType().equals(FlinkSQLConstant.DATASTREAM)){ && item.getType().equals(FlinkSQLConstant.DATASTREAM)) {
String[] statements = SqlUtil.getStatements(item.getSql(),sqlSeparator); String[] statements = SqlUtil.getStatements(item.getSql(), sqlSeparator);
for(String str : statements){ for (String str : statements) {
datastreamPlans.add(str); datastreamPlans.add(str);
} }
} }
} }
if(sqlPlans.size()>0){ if (sqlPlans.size() > 0) {
return executor.getStreamGraph(sqlPlans); return executor.getStreamGraph(sqlPlans);
}else if(datastreamPlans.size()>0){ } else if (datastreamPlans.size() > 0) {
return executor.getStreamGraphFromDataStream(sqlPlans); return executor.getStreamGraphFromDataStream(sqlPlans);
}else{ } else {
return mapper.createObjectNode(); return mapper.createObjectNode();
} }
} }
public JobPlanInfo getJobPlanInfo(String statement){ public JobPlanInfo getJobPlanInfo(String statement) {
List<SqlExplainResult> sqlExplainRecords = explainSql(statement).getSqlExplainResults(); List<SqlExplainResult> sqlExplainRecords = explainSql(statement).getSqlExplainResults();
List<String> sqlPlans = new ArrayList<>(); List<String> sqlPlans = new ArrayList<>();
List<String> datastreamPlans = new ArrayList<>(); List<String> datastreamPlans = new ArrayList<>();
for (SqlExplainResult item : sqlExplainRecords) { for (SqlExplainResult item : sqlExplainRecords) {
if (Asserts.isNotNull(item.getType()) if (Asserts.isNotNull(item.getType())
&& item.getType().contains(FlinkSQLConstant.DML)) { && item.getType().contains(FlinkSQLConstant.DML)) {
String[] statements = SqlUtil.getStatements(item.getSql(),sqlSeparator); String[] statements = SqlUtil.getStatements(item.getSql(), sqlSeparator);
for(String str : statements){ for (String str : statements) {
sqlPlans.add(str); sqlPlans.add(str);
} }
continue; continue;
} }
if(Asserts.isNotNull(item.getType()) if (Asserts.isNotNull(item.getType())
&& item.getType().equals(FlinkSQLConstant.DATASTREAM)){ && item.getType().equals(FlinkSQLConstant.DATASTREAM)) {
String[] statements = SqlUtil.getStatements(item.getSql(),sqlSeparator); String[] statements = SqlUtil.getStatements(item.getSql(), sqlSeparator);
for(String str : statements){ for (String str : statements) {
datastreamPlans.add(str); datastreamPlans.add(str);
} }
} }
} }
if(sqlPlans.size()>0){ if (sqlPlans.size() > 0) {
return executor.getJobPlanInfo(sqlPlans); return executor.getJobPlanInfo(sqlPlans);
}else if(datastreamPlans.size()>0){ } else if (datastreamPlans.size() > 0) {
return executor.getJobPlanInfoFromDataStream(datastreamPlans); return executor.getJobPlanInfoFromDataStream(datastreamPlans);
}else{ } else {
return new JobPlanInfo(""); return new JobPlanInfo("");
} }
} }
...@@ -326,7 +325,7 @@ public class Explainer { ...@@ -326,7 +325,7 @@ public class Explainer {
for (int i = 0; i < results.size(); i++) { for (int i = 0; i < results.size(); i++) {
TableCA sinkTableCA = (TableCA) results.get(i).getSinkTableCA(); TableCA sinkTableCA = (TableCA) results.get(i).getSinkTableCA();
if (Asserts.isNotNull(sinkTableCA)) { if (Asserts.isNotNull(sinkTableCA)) {
sinkTableCA.setFields(FlinkUtil.getFieldNamesFromCatalogManager(catalogManager,sinkTableCA.getCatalog(), sinkTableCA.getDatabase(), sinkTableCA.getTable())); sinkTableCA.setFields(FlinkUtil.getFieldNamesFromCatalogManager(catalogManager, sinkTableCA.getCatalog(), sinkTableCA.getDatabase(), sinkTableCA.getTable()));
} }
} }
} }
...@@ -352,16 +351,40 @@ public class Explainer { ...@@ -352,16 +351,40 @@ public class Explainer {
List<ColumnCAResult> results = new ArrayList<>(); List<ColumnCAResult> results = new ArrayList<>();
for (int i = 0; i < strPlans.size(); i++) { for (int i = 0; i < strPlans.size(); i++) {
List<Trans> trans = translateTrans(translateObjectNode(strPlans.get(i))); List<Trans> trans = translateTrans(translateObjectNode(strPlans.get(i)));
ColumnCAGenerator generator = new ColumnCAGenerator(trans); LineageColumnGenerator generator = LineageColumnGenerator.build(trans);
TableCAGenerator tableGenerator = new TableCAGenerator(trans); LineageTableGenerator tableGenerator = LineageTableGenerator.build(trans);
tableGenerator.translate(); tableGenerator.translate();
generator.setSourceTableCAS(tableGenerator.getSourceTableCAS()); generator.setTableCAS(tableGenerator.getTables());
generator.translate(); generator.translate();
results.add(new ColumnCAResult(generator)); ColumnCAResult columnCAResult = new ColumnCAResult(generator);
modifySinkColumn(columnCAResult);
results.add(columnCAResult);
} }
return results; return results;
} }
private void modifySinkColumn(ColumnCAResult columnCAResult) {
for (TableCA tableCA : columnCAResult.getTableCAS()) {
if (tableCA.getType().equals("Data Sink")) {
CatalogManager catalogManager = executor.getCatalogManager();
List<String> columnList = FlinkUtil.getFieldNamesFromCatalogManager(catalogManager, tableCA.getCatalog(), tableCA.getDatabase(), tableCA.getTable());
List<String> fields = tableCA.getFields();
for (int i = 0; i < columnList.size(); i++) {
String sinkColumnName = columnList.get(i);
if(!sinkColumnName.equals(fields.get(i))){
for (Map.Entry<Integer, ColumnCA> item : columnCAResult.getColumnCASMaps().entrySet()) {
ColumnCA columnCA = item.getValue();
if(columnCA.getTableId()==tableCA.getId()&&columnCA.getName().equals(fields.get(i))){
columnCA.setName(sinkColumnName);
fields.set(i,sinkColumnName);
}
}
}
}
}
}
}
private ObjectNode translateObjectNode(String statement) { private ObjectNode translateObjectNode(String statement) {
return executor.getStreamGraph(statement); return executor.getStreamGraph(statement);
} }
......
...@@ -12,6 +12,7 @@ import java.util.Set; ...@@ -12,6 +12,7 @@ import java.util.Set;
* @author wenmo * @author wenmo
* @since 2021/6/23 11:03 * @since 2021/6/23 11:03
**/ **/
@Deprecated
public class CABuilder { public class CABuilder {
public static List<TableCANode> getOneTableCAByStatement(String statement){ public static List<TableCANode> getOneTableCAByStatement(String statement){
...@@ -53,6 +54,7 @@ public class CABuilder { ...@@ -53,6 +54,7 @@ public class CABuilder {
return tableCANodes; return tableCANodes;
} }
@Deprecated
public static List<ColumnCANode> getColumnCAByStatement(String statement){ public static List<ColumnCANode> getColumnCAByStatement(String statement){
List<ColumnCANode> columnCANodes = new ArrayList<>(); List<ColumnCANode> columnCANodes = new ArrayList<>();
FlinkSqlPlus plus = FlinkSqlPlus.build(); FlinkSqlPlus plus = FlinkSqlPlus.build();
......
...@@ -7,6 +7,7 @@ import com.dlink.explainer.trans.OperatorTrans; ...@@ -7,6 +7,7 @@ import com.dlink.explainer.trans.OperatorTrans;
import com.dlink.explainer.trans.SinkTrans; import com.dlink.explainer.trans.SinkTrans;
import com.dlink.explainer.trans.SourceTrans; import com.dlink.explainer.trans.SourceTrans;
import com.dlink.explainer.trans.Trans; import com.dlink.explainer.trans.Trans;
import com.dlink.utils.MapParseUtils;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -23,6 +24,7 @@ import java.util.Set; ...@@ -23,6 +24,7 @@ import java.util.Set;
* @author wenmo * @author wenmo
* @since 2021/6/22 * @since 2021/6/22
**/ **/
@Deprecated
public class ColumnCAGenerator implements CAGenerator { public class ColumnCAGenerator implements CAGenerator {
private List<Trans> transList; private List<Trans> transList;
private Map<Integer, Trans> transMaps; private Map<Integer, Trans> transMaps;
...@@ -33,7 +35,7 @@ public class ColumnCAGenerator implements CAGenerator { ...@@ -33,7 +35,7 @@ public class ColumnCAGenerator implements CAGenerator {
private Set<NodeRel> columnCASRel; private Set<NodeRel> columnCASRel;
private ICA sinkTableCA = null; private ICA sinkTableCA = null;
private String sinkTableName; private String sinkTableName;
private Integer index = 0; private Integer index = 1;
private List<Integer> sinkColumns; private List<Integer> sinkColumns;
private List<Integer> sourceColumns; private List<Integer> sourceColumns;
...@@ -151,15 +153,12 @@ public class ColumnCAGenerator implements CAGenerator { ...@@ -151,15 +153,12 @@ public class ColumnCAGenerator implements CAGenerator {
} }
private void searchSelect(TableCA tableCA, ColumnCA columnCA, OperatorTrans trans, String operation, String alias) { private void searchSelect(TableCA tableCA, ColumnCA columnCA, OperatorTrans trans, String operation, String alias) {
if(Asserts.isEquals(operation,columnCA.getAlias())||operation.contains(" " + columnCA.getAlias() + " ") || if(MapParseUtils.hasField(operation,columnCA.getAlias())) {
operation.contains("(" + columnCA.getAlias() + " ") ||
operation.contains(" " + columnCA.getAlias() + ")")) {
boolean isHad = false; boolean isHad = false;
Integer cid = null; Integer cid = null;
for (int j = 0; j < this.columnCAS.size(); j++) { for (int j = 0; j < this.columnCAS.size(); j++) {
ColumnCA columnCA1 = (ColumnCA) this.columnCAS.get(j); ColumnCA columnCA1 = (ColumnCA) this.columnCAS.get(j);
if (columnCA1.getTableCA().getId() == tableCA.getId() && if (columnCA1.getTableCA().getId() == tableCA.getId() && columnCA1.getName().equals(alias)) {
columnCA1.getName().equals(operation)) {
isHad = true; isHad = true;
cid = columnCA1.getId(); cid = columnCA1.getId();
break; break;
...@@ -167,10 +166,8 @@ public class ColumnCAGenerator implements CAGenerator { ...@@ -167,10 +166,8 @@ public class ColumnCAGenerator implements CAGenerator {
} }
if (!isHad) { if (!isHad) {
cid = index++; cid = index++;
String columnOperation = operation.replaceAll(" " + columnCA.getAlias() + " "," " + columnCA.getOperation() + " ") // String columnOperation = MapParseUtils.replaceField(operation,columnCA.getAlias(),columnCA.getOperation());
.replaceAll("\\(" + columnCA.getAlias() + " "," " + columnCA.getOperation() + " ") ColumnCA columnCA2 = new ColumnCA(cid, alias, alias, alias, alias,operation, tableCA,trans);
.replaceAll(" " + columnCA.getAlias() + "\\)"," " + columnCA.getOperation() + " ");
ColumnCA columnCA2 = new ColumnCA(cid, operation, alias, operation, operation,columnOperation, tableCA,trans);
this.columnCASMaps.put(cid, columnCA2); this.columnCASMaps.put(cid, columnCA2);
this.columnCAS.add(columnCA2); this.columnCAS.add(columnCA2);
buildColumnCAFields(tableCA, trans.getParentId(), columnCA2); buildColumnCAFields(tableCA, trans.getParentId(), columnCA2);
......
package com.dlink.explainer.ca; package com.dlink.explainer.ca;
import com.dlink.explainer.lineage.LineageColumnGenerator;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
...@@ -13,24 +14,68 @@ import java.util.Set; ...@@ -13,24 +14,68 @@ import java.util.Set;
* @author wenmo * @author wenmo
* @since 2021/6/22 * @since 2021/6/22
**/ **/
@Getter
@Setter
public class ColumnCAResult { public class ColumnCAResult {
private String sinkName; private List<TableCA> tableCAS;
private ICA sinkTableCA; private Map<Integer, ColumnCA> columnCASMaps;
private List<ICA> columnCAS;
private Map<Integer, ICA> columnCASMaps;
private Set<NodeRel> columnCASRel; private Set<NodeRel> columnCASRel;
private Set<NodeRel> columnCASRelChain;
private List<Integer> sinkColumns; private List<Integer> sinkColumns;
private List<Integer> sourColumns; private List<Integer> sourColumns;
public ColumnCAResult(ColumnCAGenerator generator) { public ColumnCAResult(LineageColumnGenerator generator) {
this.columnCAS = generator.getColumnCAS(); this.tableCAS = generator.getTableCAS();
this.sinkTableCA = generator.getSinkTableCA();
this.sinkName = generator.getSinkTableName();
this.columnCASMaps = generator.getColumnCASMaps(); this.columnCASMaps = generator.getColumnCASMaps();
this.columnCASRel = generator.getColumnCASRel(); this.columnCASRel = generator.getColumnCASRel();
this.columnCASRelChain = generator.getColumnCASRelChain();
this.sinkColumns = generator.getSinkColumns(); this.sinkColumns = generator.getSinkColumns();
this.sourColumns = generator.getSourceColumns(); this.sourColumns = generator.getSourceColumns();
} }
public List<TableCA> getTableCAS() {
return tableCAS;
}
public void setTableCAS(List<TableCA> tableCAS) {
this.tableCAS = tableCAS;
}
public Map<Integer, ColumnCA> getColumnCASMaps() {
return columnCASMaps;
}
public void setColumnCASMaps(Map<Integer, ColumnCA> columnCASMaps) {
this.columnCASMaps = columnCASMaps;
}
public Set<NodeRel> getColumnCASRel() {
return columnCASRel;
}
public void setColumnCASRel(Set<NodeRel> columnCASRel) {
this.columnCASRel = columnCASRel;
}
public Set<NodeRel> getColumnCASRelChain() {
return columnCASRelChain;
}
public void setColumnCASRelChain(Set<NodeRel> columnCASRelChain) {
this.columnCASRelChain = columnCASRelChain;
}
public List<Integer> getSinkColumns() {
return sinkColumns;
}
public void setSinkColumns(List<Integer> sinkColumns) {
this.sinkColumns = sinkColumns;
}
public List<Integer> getSourColumns() {
return sourColumns;
}
public void setSourColumns(List<Integer> sourColumns) {
this.sourColumns = sourColumns;
}
} }
...@@ -30,6 +30,7 @@ public class TableCAGenerator implements CAGenerator { ...@@ -30,6 +30,7 @@ public class TableCAGenerator implements CAGenerator {
private Map<Integer, Trans> transMaps; private Map<Integer, Trans> transMaps;
private Set<Integer> parentIdSet; private Set<Integer> parentIdSet;
private List<ICA> sourceTableCAS = new ArrayList<>(); private List<ICA> sourceTableCAS = new ArrayList<>();
private List<ICA> sinkTableCAS = new ArrayList<>();
private ICA sinkTableCA = null; private ICA sinkTableCA = null;
private String sinkTableName; private String sinkTableName;
...@@ -38,8 +39,8 @@ public class TableCAGenerator implements CAGenerator { ...@@ -38,8 +39,8 @@ public class TableCAGenerator implements CAGenerator {
this.transMaps = new HashMap<>(); this.transMaps = new HashMap<>();
this.parentIdSet = new HashSet<>(); this.parentIdSet = new HashSet<>();
for (int i = 0; i < transList.size(); i++) { for (int i = 0; i < transList.size(); i++) {
this.transMaps.put(transList.get(i).getId(),transList.get(i)); this.transMaps.put(transList.get(i).getId(), transList.get(i));
if(transList.get(i).getParentId()!=null) { if (transList.get(i).getParentId() != null) {
parentIdSet.add(transList.get(i).getParentId()); parentIdSet.add(transList.get(i).getParentId());
} }
} }
...@@ -49,35 +50,36 @@ public class TableCAGenerator implements CAGenerator { ...@@ -49,35 +50,36 @@ public class TableCAGenerator implements CAGenerator {
return new TableCAGenerator(transList); return new TableCAGenerator(transList);
} }
public TableCAResult getResult(){ public TableCAResult getResult() {
return new TableCAResult(this); return new TableCAResult(this);
} }
@Override @Override
public void translate() { public void translate() {
for(Trans trans : transList){ for (Trans trans : transList) {
if(trans instanceof SourceTrans) { if (trans instanceof SourceTrans) {
TableCA tableCA = TableCA.build(trans); TableCA tableCA = TableCA.build(trans);
List<String> sourceFields = new ArrayList<>(); List<String> sourceFields = new ArrayList<>();
CollectionUtils.addAll(sourceFields, new Object[tableCA.getFields().size()]); CollectionUtils.addAll(sourceFields, new Object[tableCA.getFields().size()]);
Collections.copy(sourceFields, tableCA.getFields()); Collections.copy(sourceFields, tableCA.getFields());
for (int j = 0; j < sourceFields.size(); j++) { for (int j = 0; j < sourceFields.size(); j++) {
buildTableCAFields(tableCA,tableCA.getParentId(),sourceFields.get(j)); buildTableCAFields(tableCA, tableCA.getParentId(), sourceFields.get(j));
} }
this.sourceTableCAS.add(tableCA); this.sourceTableCAS.add(tableCA);
}else if(trans instanceof SinkTrans) { } else if (trans instanceof SinkTrans) {
TableCA tableCA = TableCA.build(trans); TableCA tableCA = TableCA.build(trans);
this.sinkTableCA = tableCA; this.sinkTableCA = tableCA;
this.sinkTableName = tableCA.getName(); this.sinkTableName = tableCA.getName();
this.sinkTableCAS.add(TableCA.build(trans));
} }
} }
} }
public void translateOnlyTable() { public void translateOnlyTable() {
for(Trans trans : transList){ for (Trans trans : transList) {
if(trans instanceof SourceTrans) { if (trans instanceof SourceTrans) {
this.sourceTableCAS.add(new TableCA((SourceTrans) trans)); this.sourceTableCAS.add(new TableCA((SourceTrans) trans));
}else if(trans instanceof SinkTrans) { } else if (trans instanceof SinkTrans) {
TableCA tableCA = new TableCA((SinkTrans) trans); TableCA tableCA = new TableCA((SinkTrans) trans);
this.sinkTableCA = tableCA; this.sinkTableCA = tableCA;
this.sinkTableName = tableCA.getName(); this.sinkTableName = tableCA.getName();
...@@ -85,26 +87,26 @@ public class TableCAGenerator implements CAGenerator { ...@@ -85,26 +87,26 @@ public class TableCAGenerator implements CAGenerator {
} }
} }
private void buildTableCAFields(TableCA tableCA,Integer id,String field){ private void buildTableCAFields(TableCA tableCA, Integer id, String field) {
if(transMaps.get(id) instanceof OperatorTrans){ if (transMaps.get(id) instanceof OperatorTrans) {
OperatorTrans trans = (OperatorTrans) transMaps.get(id); OperatorTrans trans = (OperatorTrans) transMaps.get(id);
searchSelectFields(tableCA, trans.getSelect(),field); searchSelectFields(tableCA, trans.getSelect(), field);
searchWhereFields(tableCA, trans.getWhere(),field); searchWhereFields(tableCA, trans.getWhere(), field);
if(Asserts.isNotNull(trans.getParentId())){ if (Asserts.isNotNull(trans.getParentId())) {
buildTableCAFields(tableCA,trans.getParentId(),field); buildTableCAFields(tableCA, trans.getParentId(), field);
} }
} }
} }
private void searchSelectFields(TableCA tableCA, List<Field> selects, String field){ private void searchSelectFields(TableCA tableCA, List<Field> selects, String field) {
if(Asserts.isNotNull(selects)){ if (Asserts.isNotNull(selects)) {
for (int i = 0; i < selects.size(); i++){ for (int i = 0; i < selects.size(); i++) {
List<String> fields = matchFields( selects.get(i).getFragment(),field); List<String> fields = matchFields(selects.get(i).getFragment(), field);
/*if(tableCA.getFields().contains(field)){ /*if(tableCA.getFields().contains(field)){
tableCA.getFields().remove(field); tableCA.getFields().remove(field);
}*/ }*/
for (int j = 0; j < fields.size(); j++) { for (int j = 0; j < fields.size(); j++) {
if(!tableCA.getUseFields().contains(fields.get(j))){ if (!tableCA.getUseFields().contains(fields.get(j))) {
tableCA.getUseFields().add(fields.get(j)); tableCA.getUseFields().add(fields.get(j));
} }
} }
...@@ -112,28 +114,30 @@ public class TableCAGenerator implements CAGenerator { ...@@ -112,28 +114,30 @@ public class TableCAGenerator implements CAGenerator {
} }
} }
private void searchWhereFields(TableCA tableCA,String wheres,String field){ private void searchWhereFields(TableCA tableCA, String wheres, String field) {
if(Asserts.isNotNull(wheres)&&!"[]".equals(wheres)){ if (Asserts.isNotNull(wheres) && !"[]".equals(wheres)) {
List<String> fields = matchFields( wheres,field); List<String> fields = matchFields(wheres, field);
/*if(tableCA.getFields().contains(field)){ /*if(tableCA.getFields().contains(field)){
tableCA.getFields().remove(field); tableCA.getFields().remove(field);
}*/ }*/
for (int j = 0; j < fields.size(); j++) { for (int j = 0; j < fields.size(); j++) {
if(!tableCA.getUseFields().contains(fields.get(j))){ if (!tableCA.getUseFields().contains(fields.get(j))) {
tableCA.getUseFields().add(fields.get(j)); tableCA.getUseFields().add(fields.get(j));
} }
} }
} }
} }
private List<String> matchFields(String fragement,String field){ private List<String> matchFields(String fragement, String field) {
List<String> fields = new ArrayList<>(); List<String> fields = new ArrayList<>();
Pattern p = Pattern.compile(field+"\\.(.*?) "); String sign = "([^a-zA-Z0-9_]?)";
Matcher m = p.matcher(fragement+" "); Pattern p = Pattern.compile(sign + field + sign);
while(m.find()){ Matcher m = p.matcher(fragement);
fields.add(m.group(0).replaceFirst("\\)","").trim()); while (m.find()) {
fields.add(m.group(1).trim());
fields.add(fragement.trim());
} }
if(fragement.equals(field)){ if (fragement.equals(field)) {
fields.add(fragement.trim()); fields.add(fragement.trim());
} }
return fields; return fields;
......
package com.dlink.explainer.lineage; package com.dlink.explainer.lineage;
import com.dlink.explainer.ca.ColumnCAResult; import com.dlink.explainer.ca.ColumnCAResult;
import com.dlink.explainer.ca.NodeRel;
import com.dlink.explainer.ca.TableCA;
import com.dlink.plus.FlinkSqlPlus; import com.dlink.plus.FlinkSqlPlus;
import java.util.ArrayList;
import java.util.List; import java.util.List;
/** /**
...@@ -16,9 +19,23 @@ public class LineageBuilder { ...@@ -16,9 +19,23 @@ public class LineageBuilder {
public static LineageResult getLineage(String statement){ public static LineageResult getLineage(String statement){
FlinkSqlPlus plus = FlinkSqlPlus.build(); FlinkSqlPlus plus = FlinkSqlPlus.build();
List<ColumnCAResult> columnCAResults = plus.explainSqlColumnCA(statement); List<ColumnCAResult> columnCAResults = plus.explainSqlColumnCA(statement);
for (int j = 0; j < columnCAResults.size(); j++) { List<LineageTable> tables = new ArrayList<>();
ColumnCAResult result = columnCAResults.get(j); List<LineageRelation> relations = new ArrayList<>();
int index = 0;
for (ColumnCAResult item: columnCAResults) {
for(TableCA tableCA: item.getTableCAS()){
tables.add(LineageTable.build(tableCA));
} }
return null; for(NodeRel nodeRel: item.getColumnCASRelChain()){
index ++;
relations.add(LineageRelation.build(index+"",
item.getColumnCASMaps().get(nodeRel.getPreId()).getTableId().toString(),
item.getColumnCASMaps().get(nodeRel.getSufId()).getTableId().toString(),
item.getColumnCASMaps().get(nodeRel.getPreId()).getName(),
item.getColumnCASMaps().get(nodeRel.getSufId()).getName()
));
}
}
return LineageResult.build(tables,relations);
} }
} }
...@@ -13,6 +13,15 @@ public class LineageColumn { ...@@ -13,6 +13,15 @@ public class LineageColumn {
public LineageColumn() { public LineageColumn() {
} }
public LineageColumn(String name, String title) {
this.name = name;
this.title = title;
}
public static LineageColumn build(String name, String title){
return new LineageColumn(name,title);
}
public String getName() { public String getName() {
return name; return name;
} }
......
package com.dlink.explainer.lineage;
import com.dlink.assertion.Asserts;
import com.dlink.explainer.ca.ColumnCA;
import com.dlink.explainer.ca.NodeRel;
import com.dlink.explainer.ca.TableCA;
import com.dlink.explainer.trans.Field;
import com.dlink.explainer.trans.OperatorTrans;
import com.dlink.explainer.trans.SinkTrans;
import com.dlink.explainer.trans.SourceTrans;
import com.dlink.explainer.trans.Trans;
import com.dlink.utils.MapParseUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* LineageColumnGenerator
*
* @author wenmo
* @since 2022/3/16 20:20
**/
public class LineageColumnGenerator {
private Map<Integer, Trans> transMaps;
private List<TableCA> tableCAS = new ArrayList<>();
private List<ColumnCA> columnCAS = new ArrayList<>();
private Map<Integer, ColumnCA> columnCASMaps = new HashMap<>();
private Set<NodeRel> columnCASRel = new HashSet<>();
private Set<NodeRel> columnCASRelChain = new HashSet<>();
private int index = 1;
private List<Integer> sinkColumns = new ArrayList<>();
private List<Integer> sourceColumns = new ArrayList<>();
public static LineageColumnGenerator build(List<Trans> transList) {
LineageColumnGenerator generator = new LineageColumnGenerator();
Map<Integer, Trans> map = new HashMap<>();
for (Trans trans : transList) {
map.put(trans.getId(), trans);
}
generator.setTransMaps(map);
return generator;
}
public void translate() {
for (Map.Entry<Integer, Trans> entry : transMaps.entrySet()) {
Trans trans = entry.getValue();
if (trans instanceof SourceTrans) {
TableCA tableCA = new TableCA((SourceTrans) trans);
for (String fieldName : tableCA.getFields()) {
int id = index++;
ColumnCA columnCA = new ColumnCA(id, fieldName, fieldName, fieldName, fieldName, fieldName, tableCA, trans);
columnCASMaps.put(id, columnCA);
columnCAS.add(columnCA);
}
for (ColumnCA columnCA : columnCAS) {
if (columnCA.getTableCA().getId() == tableCA.getId()) {
buildColumnCAFields(tableCA, tableCA.getParentId(), columnCA);
}
}
}
}
for (Map.Entry<Integer, Trans> entry : transMaps.entrySet()) {
Trans trans = entry.getValue();
if (trans instanceof SinkTrans) {
TableCA tableCA = new TableCA((SinkTrans) trans);
matchSinkField(tableCA,trans);
searchColumnCAId(tableCA);
}
}
chainRelation();
}
private void matchSinkField(TableCA tableCA,Trans trans){
for(ColumnCA columnCA: columnCAS){
for(String fieldName: tableCA.getFields()){
if(columnCA.getName().equals(fieldName)){
int cid = index++;
ColumnCA sinkColumnCA = new ColumnCA(cid, fieldName, fieldName, fieldName, fieldName, fieldName, tableCA, trans);
columnCASMaps.put(cid, sinkColumnCA);
columnCASRel.add(new NodeRel(columnCA.getId(), cid));
}
}
}
}
private void buildColumnCAFields(TableCA tableCA, Integer id, ColumnCA columnCA) {
if (transMaps.get(id) instanceof OperatorTrans) {
OperatorTrans trans = (OperatorTrans) transMaps.get(id);
List<Field> selects = trans.getSelect();
if (Asserts.isNotNull(selects)) {
for (int i = 0; i < selects.size(); i++) {
String operation = selects.get(i).getFragment();
String alias = selects.get(i).getAlias();
searchSelect(tableCA, columnCA, trans, operation, alias);
}
}
List<String> fields = trans.getFields();
if (Asserts.isNotNull(fields)) {
for (int i = 0; i < fields.size(); i++) {
String field = fields.get(i);
searchSelect(tableCA, columnCA, trans, field, field);
}
}
if (trans.getParentId() != null) {
buildColumnCAFields(tableCA, trans.getParentId(), columnCA);
}
}
}
private void searchSelect(TableCA tableCA, ColumnCA columnCA, OperatorTrans trans, String operation, String alias) {
if (MapParseUtils.hasField(operation, columnCA.getAlias())) {
boolean isHad = false;
Integer cid = null;
for (Map.Entry<Integer, ColumnCA> item : columnCASMaps.entrySet()) {
ColumnCA columnCA1 = item.getValue();
if (columnCA1.getTableCA().getId() == tableCA.getId() && columnCA1.getName().equals(alias)) {
isHad = true;
cid = columnCA1.getId();
break;
}
}
if(columnCA.getId()==cid){
return;
}
if (!isHad) {
cid = index++;
ColumnCA columnCA2 = new ColumnCA(cid, alias, alias, alias, alias, operation, tableCA, trans);
columnCASMaps.put(cid, columnCA2);
buildColumnCAFields(tableCA, trans.getParentId(), columnCA2);
}
columnCASRel.add(new NodeRel(columnCA.getId(), cid));
}
}
private void searchColumnCAId(TableCA tableCA) {
List<Integer> sufOnly = new ArrayList<>();
for (NodeRel nodeRel : columnCASRel) {
if (!sufOnly.contains(nodeRel.getSufId())) {
sufOnly.add(nodeRel.getSufId());
}
}
for (NodeRel nodeRel : columnCASRel) {
if (sufOnly.contains(nodeRel.getPreId())) {
sufOnly.remove(nodeRel.getPreId());
}
}
List<Integer> preOnly = new ArrayList<>();
for (NodeRel nodeRel : columnCASRel) {
if (!preOnly.contains(nodeRel.getPreId())) {
preOnly.add(nodeRel.getPreId());
}
}
for (NodeRel nodeRel : columnCASRel) {
if (preOnly.contains(nodeRel.getSufId())) {
preOnly.remove(nodeRel.getSufId());
}
}
for (int i = 0; i < sufOnly.size(); i++) {
ColumnCA columnCA = columnCASMaps.get(sufOnly.get(i));
List<String> fields = tableCA.getFields();
for (int j = 0; j < fields.size(); j++) {
if (columnCA.getAlias().equals(fields.get(j))) {
tableCA.getColumnCAIds().add(sufOnly.get(i));
columnCA.setTableId(tableCA.getId());
break;
}
}
}
sinkColumns = sufOnly;
sourceColumns = preOnly;
}
private void chainRelation() {
Set<NodeRel> nodeRelsChain = new HashSet<>();
for (Integer item : sourceColumns) {
nodeRelsChain.add(new NodeRel(item, getNextSuf(item)));
}
columnCASRelChain = nodeRelsChain;
}
private Integer getNextSuf(Integer sufId) {
for (NodeRel nodeRel : columnCASRel) {
if (nodeRel.getPreId() == sufId) {
return getNextSuf(nodeRel.getSufId());
}
}
return sufId;
}
public Map<Integer, Trans> getTransMaps() {
return transMaps;
}
public void setTransMaps(Map<Integer, Trans> transMaps) {
this.transMaps = transMaps;
}
public List<TableCA> getTableCAS() {
return tableCAS;
}
public void setTableCAS(List<TableCA> tableCAS) {
this.tableCAS = tableCAS;
}
public List<ColumnCA> getColumnCAS() {
return columnCAS;
}
public void setColumnCAS(List<ColumnCA> columnCAS) {
this.columnCAS = columnCAS;
}
public Map<Integer, ColumnCA> getColumnCASMaps() {
return columnCASMaps;
}
public void setColumnCASMaps(Map<Integer, ColumnCA> columnCASMaps) {
this.columnCASMaps = columnCASMaps;
}
public Set<NodeRel> getColumnCASRel() {
return columnCASRel;
}
public void setColumnCASRel(Set<NodeRel> columnCASRel) {
this.columnCASRel = columnCASRel;
}
public int getIndex() {
return index;
}
public void setIndex(int index) {
this.index = index;
}
public List<Integer> getSinkColumns() {
return sinkColumns;
}
public void setSinkColumns(List<Integer> sinkColumns) {
this.sinkColumns = sinkColumns;
}
public List<Integer> getSourceColumns() {
return sourceColumns;
}
public void setSourceColumns(List<Integer> sourceColumns) {
this.sourceColumns = sourceColumns;
}
public Set<NodeRel> getColumnCASRelChain() {
return columnCASRelChain;
}
public void setColumnCASRelChain(Set<NodeRel> columnCASRelChain) {
this.columnCASRelChain = columnCASRelChain;
}
}
...@@ -16,6 +16,17 @@ public class LineageRelation { ...@@ -16,6 +16,17 @@ public class LineageRelation {
public LineageRelation() { public LineageRelation() {
} }
public LineageRelation(String id, String srcTableId, String tgtTableId, String srcTableColName, String tgtTableColName) {
this.id = id;
this.srcTableId = srcTableId;
this.tgtTableId = tgtTableId;
this.srcTableColName = srcTableColName;
this.tgtTableColName = tgtTableColName;
}
public static LineageRelation build(String id, String srcTableId, String tgtTableId, String srcTableColName, String tgtTableColName){
return new LineageRelation(id,srcTableId,tgtTableId,srcTableColName,tgtTableColName);
}
public String getId() { public String getId() {
return id; return id;
} }
......
...@@ -15,6 +15,15 @@ public class LineageResult { ...@@ -15,6 +15,15 @@ public class LineageResult {
public LineageResult() { public LineageResult() {
} }
public LineageResult(List<LineageTable> tables, List<LineageRelation> relations) {
this.tables = tables;
this.relations = relations;
}
public static LineageResult build(List<LineageTable> tables, List<LineageRelation> relations){
return new LineageResult(tables,relations);
}
public List<LineageTable> getTables() { public List<LineageTable> getTables() {
return tables; return tables;
} }
......
package com.dlink.explainer.lineage; package com.dlink.explainer.lineage;
import com.dlink.explainer.ca.TableCA;
import java.util.ArrayList;
import java.util.List; import java.util.List;
/** /**
...@@ -16,6 +19,18 @@ public class LineageTable { ...@@ -16,6 +19,18 @@ public class LineageTable {
public LineageTable() { public LineageTable() {
} }
public static LineageTable build(TableCA tableCA){
LineageTable lineageTable = new LineageTable();
lineageTable.setId(tableCA.getId().toString());
lineageTable.setName(tableCA.getName());
List<LineageColumn> columnList = new ArrayList<>();
for(String columnName: tableCA.getFields()){
columnList.add(LineageColumn.build(columnName,columnName));
}
lineageTable.setColumns(columnList);
return lineageTable;
}
public String getId() { public String getId() {
return id; return id;
} }
......
package com.dlink.explainer.lineage;
import com.dlink.explainer.ca.TableCA;
import com.dlink.explainer.trans.SinkTrans;
import com.dlink.explainer.trans.SourceTrans;
import com.dlink.explainer.trans.Trans;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* LineageTableGenerator
*
* @author wenmo
* @since 2022/3/16 19:56
**/
public class LineageTableGenerator {
private Map<Integer, Trans> transMaps;
private List<TableCA> tables = new ArrayList<>();
public LineageTableGenerator() {
}
public static LineageTableGenerator build(List<Trans> transList){
LineageTableGenerator generator = new LineageTableGenerator();
Map<Integer, Trans> map = new HashMap<>();
for (Trans trans: transList) {
map.put(trans.getId(), trans);
}
generator.setTransMaps(map);
return generator;
}
public void translate() {
for (Map.Entry<Integer, Trans> entry : transMaps.entrySet()) {
if (entry.getValue() instanceof SourceTrans) {
tables.add(TableCA.build(entry.getValue()));
} else if (entry.getValue() instanceof SinkTrans) {
tables.add(TableCA.build(entry.getValue()));
}
}
}
public Map<Integer, Trans> getTransMaps() {
return transMaps;
}
public void setTransMaps(Map<Integer, Trans> transMaps) {
this.transMaps = transMaps;
}
public List<TableCA> getTables() {
return tables;
}
public void setTables(List<TableCA> tables) {
this.tables = tables;
}
}
...@@ -16,18 +16,23 @@ import java.util.Map; ...@@ -16,18 +16,23 @@ import java.util.Map;
public class OperatorTrans extends AbstractTrans implements Trans { public class OperatorTrans extends AbstractTrans implements Trans {
private List<Field> select; private List<Field> select;
private List<String> fields;
private List<String> joinType; private List<String> joinType;
private String where; private String where;
private List<String> leftInputSpec; private List<String> leftInputSpec;
private List<String> rightInputSpec; private List<String> rightInputSpec;
public final static String TRANS_TYPE = "Operator"; public final static String TRANS_TYPE = "Operator";
private final static String FIELD_SEPARATOR = " AS "; private final static String FIELD_AS = " AS ";
public List<Field> getSelect() { public List<Field> getSelect() {
return select; return select;
} }
public List<String> getFields() {
return fields;
}
public List<String> getJoinType() { public List<String> getJoinType() {
return joinType; return joinType;
} }
...@@ -60,31 +65,32 @@ public class OperatorTrans extends AbstractTrans implements Trans { ...@@ -60,31 +65,32 @@ public class OperatorTrans extends AbstractTrans implements Trans {
name = pact; name = pact;
Map map = MapParseUtils.parseForSelect(contents); Map map = MapParseUtils.parseForSelect(contents);
translateSelect((ArrayList<String>) map.get("select")); translateSelect((ArrayList<String>) map.get("select"));
fields = (ArrayList<String>) map.get("fields");
joinType = (ArrayList<String>) map.get("joinType"); joinType = (ArrayList<String>) map.get("joinType");
where = map.containsKey("where")?map.get("where").toString():null; where = map.containsKey("where") ? map.get("where").toString() : null;
leftInputSpec = (ArrayList<String>) map.get("leftInputSpec"); leftInputSpec = (ArrayList<String>) map.get("leftInputSpec");
rightInputSpec = (ArrayList<String>) map.get("rightInputSpec"); rightInputSpec = (ArrayList<String>) map.get("rightInputSpec");
} }
private void translateSelect(ArrayList<String> fieldStrs){ private void translateSelect(ArrayList<String> fieldStrs) {
if(fieldStrs!=null&&fieldStrs.size()>0) { if (fieldStrs != null && fieldStrs.size() > 0) {
select = new ArrayList<>(); select = new ArrayList<>();
for (int i = 0; i < fieldStrs.size(); i++) { for (int i = 0; i < fieldStrs.size(); i++) {
String fieldStr = fieldStrs.get(i); String fieldStr = fieldStrs.get(i).trim();
if(fieldStr.toUpperCase().contains(FIELD_SEPARATOR)){ if (fieldStr.toUpperCase().contains(FIELD_AS)) {
String [] fieldNames = fieldStr.split(FIELD_SEPARATOR); String[] fieldNames = fieldStr.split(FIELD_AS);
if(fieldNames.length==2) { if (fieldNames.length == 2) {
select.add(new Field(fieldNames[0], fieldNames[1])); select.add(new Field(fieldNames[0].trim(), fieldNames[1].trim()));
}else if(fieldNames.length==1) { } else if (fieldNames.length == 1) {
select.add(new Field(fieldNames[0])); select.add(new Field(fieldNames[0].trim()));
}else{ } else {
List<String> fieldNameList = new ArrayList<>(); List<String> fieldNameList = new ArrayList<>();
for (int j = 0; j < fieldNames.length-1; j++) { for (int j = 0; j < fieldNames.length - 1; j++) {
fieldNameList.add(fieldNames[j]); fieldNameList.add(fieldNames[j]);
} }
select.add(new Field(StringUtils.join(fieldNameList,FIELD_SEPARATOR),fieldNames[fieldNames.length-1])); select.add(new Field(StringUtils.join(fieldNameList, FIELD_AS).trim(), fieldNames[fieldNames.length - 1].trim()));
} }
}else{ } else {
select.add(new Field(fieldStr)); select.add(new Field(fieldStr));
} }
} }
......
...@@ -8,6 +8,8 @@ import java.util.LinkedList; ...@@ -8,6 +8,8 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Stack; import java.util.Stack;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
...@@ -105,6 +107,54 @@ public class MapParseUtils { ...@@ -105,6 +107,54 @@ public class MapParseUtils {
return nestIndexList; return nestIndexList;
} }
public static List<String> getSelectList(String inStr) {
List<String> selects = new ArrayList<>();
int startIndex = inStr.indexOf("=[") + 2;
if (inStr == null || inStr.isEmpty()) {
return selects;
}
Deque<Integer> stack = new LinkedList<>();
for (int i = 0; i < inStr.length(); i++) {
if (inStr.charAt(i) == ',' && stack.size() == 0) {
selects.add(inStr.substring(startIndex, i));
startIndex = i + 1;
}
if (inStr.charAt(i) == '(') {
stack.push(i);
}
if (inStr.charAt(i) == ')') {
stack.pop();
}
}
if (startIndex < inStr.length()) {
selects.add(inStr.substring(startIndex, inStr.length() - 1));
}
return selects;
}
public static boolean hasField(String fragement, String field) {
if(field.startsWith("$")){
field = field.substring(1,field.length());
}
String sign = "([^a-zA-Z0-9_]?)";
Pattern p = Pattern.compile(sign + field + sign);
Matcher m = p.matcher(fragement);
while (m.find()) {
return true;
}
return false;
}
public static String replaceField(String operation, String field, String fragement) {
String newOperation = operation;
String sign = "([^a-zA-Z0-9_]?)";
Pattern p = Pattern.compile(sign + field + sign);
Matcher m = p.matcher(operation);
while (m.find()) {
newOperation = newOperation.substring(0,m.start(1)+1)+ fragement + newOperation.substring(m.end(1)+1,newOperation.length());
}
return newOperation;
}
/** /**
* 转换map * 转换map
...@@ -180,27 +230,7 @@ public class MapParseUtils { ...@@ -180,27 +230,7 @@ public class MapParseUtils {
*/ */
public static Map parseForSelect(String inStr) { public static Map parseForSelect(String inStr) {
Map map = new HashMap(); Map map = new HashMap();
List<Integer> bracketsList = getBracketsList(inStr); map.put(getMapKey(inStr), getSelectList(inStr));
String mapKey = getMapKey(inStr);
List<String> list = new ArrayList<>();
int size = bracketsList.size();
if (size % 2 != 0) {
// 此处若size部位偶数 则返回空 可能会存在问题
return map;
} else {
int numSize = size / 2;//括号对数
for (int i = 0; i < numSize; i++) {
String msgStr = "";
if (2 * i + 2 >= size) {
msgStr = inStr.substring(bracketsList.get(2 * i), inStr.lastIndexOf("]"));
} else {
msgStr = inStr.substring(bracketsList.get(2 * i), bracketsList.get(2 * i + 2));
msgStr = msgStr.substring(0, msgStr.lastIndexOf(",") > 0 ? msgStr.lastIndexOf(",") : msgStr.length());
}
list.add(msgStr);
}
}
map.put(mapKey, list);
return map; return map;
} }
......
package com.dlink.core; package com.dlink.core;
import com.dlink.explainer.ca.CABuilder;
import com.dlink.explainer.ca.ColumnCANode;
import com.dlink.explainer.ca.TableCANode;
import com.dlink.explainer.lineage.LineageBuilder; import com.dlink.explainer.lineage.LineageBuilder;
import com.dlink.explainer.lineage.LineageResult; import com.dlink.explainer.lineage.LineageResult;
import org.junit.Test; import org.junit.Test;
import java.util.List;
/** /**
* LineageTest * LineageTest
* *
......
import React from 'react';
import {Tooltip} from 'antd';
import {
FullscreenOutlined,
FullscreenExitOutlined,
VerticalAlignBottomOutlined,
VerticalAlignTopOutlined
} from '@ant-design/icons';
const LineageOps = ({
isExpand,
isFold,
onAction,
tableId,
}) => [
isExpand ?
{
tooltip: '收起血缘',
action: 'shrink',
component: <FullscreenExitOutlined/>
}
:
{
tooltip: '展开血缘',
action: 'expand',
component: <FullscreenOutlined/>
},
isFold ?
{
tooltip: '展开字段',
action: 'fold',
component: <VerticalAlignBottomOutlined/>
}
:
{
tooltip: '收起字段',
action: 'unfold',
component: <VerticalAlignTopOutlined/>
}
].map(op => {
return {
component: (
<Tooltip
title={op.tooltip}
>
<span onClick={() => onAction(op.action, tableId)}>
{
op.component
}
</span>
</Tooltip>
)
}
});
export default LineageOps;
import {LineageTable} from 'react-lineage-dag';
import {useEffect, useState, useRef} from "react";
import LineageOps from "@/components/Lineage/LineageOps";
export const getInit = () => {
return {
tables: [],
relations: []
}
};
const Lineage = (props: any) => {
const {datas} = props;
const cvsRef = useRef(null);
const [data, setData] = useState(getInit());
const [allData, setAllData] = useState(getInit());
const [relayout, setRelayout] = useState(false);
const [focus, setFocus] = useState(false);
const getChildren = (tableId) => {
const children = {
tables: [],
relations: []
};
debugger;
allData.relations.forEach(relation => {
if (relation.srcTableId !== tableId) {
return;
}
children.relations.push(relation)
const tgtTableId = relation.tgtTableId;
if (children.tables.some(table => table.id === tgtTableId)) {
return;
}
const table = allData.tables.find(table => table.id === tgtTableId);
children.tables.push(table);
});
return children;
};
const onAction = (action, tableId) => {
switch (action) {
case 'expand': {
const table = data.tables.find(t => t.id === tableId);
table.isExpand = true;
const children = getChildren(tableId);
children.tables.forEach(table => {
if(data.tables.some(t => t.id === table.id)) {
return;
}
data.tables.push(table);
});
children.relations.forEach(relation => {
if(data.relations.some(r => r.id === relation.id)) {
return;
}
data.relations.push(relation);
});
setData({...data});
break;
}
case 'shrink': {
const table = data.tables.find(t => t.id === tableId);
table.isExpand = false;
const children = getChildren(tableId);
children.tables.forEach(table => {
const index = data.tables.findIndex(t => t.id === table.id);
data.tables.splice(index, 1);
});
children.relations.forEach(relation => {
const index = data.relations.findIndex(r => r.id === relation.id);
data.relations.splice(index, 1);
});
setData({...data});
break;
}
case 'fold': {
data.tables.forEach(table => {
if (table.id !== tableId) {
return;
}
table.isFold = false;
});
data.tables = [...data.tables];
setData({...data});
break;
}
case 'unfold': {
data.tables.forEach(table => {
if (table.id !== tableId) {
return;
}
table.isFold = true;
});
data.tables = [...data.tables];
setData({...data});
break;
}
}
};
const getData = () => {
setData(datas);
let newDatas = {
tables: [...datas.tables],
relations: [...datas.relations]
};
setAllData(newDatas);
};
useEffect(() => {
getData();
}, [datas]);
data.tables.forEach(table => {
table.operators = LineageOps({
isExpand: !!table.isExpand,
isFold: !!table.isFold,
onAction,
tableId: table.id
})
});
return (<LineageTable {...data}
onLoaded={(canvas) => {
cvsRef.current = canvas;
}}
onEachFrame={() => {
if (!cvsRef.current) {
return;
}
if (relayout) {
cvsRef.current.relayout();
setRelayout(false);
}
if (focus) {
cvsRef.current.focusNode(focus);
setFocus(false);
}
}}/>)
};
export default Lineage;
import { Tabs,Empty,Tooltip,Button } from "antd"; import { Tabs,Tooltip,Button } from "antd";
import { FlowAnalysisGraph } from '@ant-design/charts';
import {SearchOutlined} from "@ant-design/icons"; import {SearchOutlined} from "@ant-design/icons";
import {StateType} from "@/pages/FlinkSqlStudio/model"; import {StateType} from "@/pages/FlinkSqlStudio/model";
import {connect} from "umi"; import {connect} from "umi";
import styles from "./index.less"; import styles from "./index.less";
import { getCAByStatement} from "@/pages/FlinkSqlStudio/service"; import {getLineage} from "@/pages/FlinkSqlStudio/service";
import {useState} from "react"; import {useState} from "react";
import Lineage, {getInit} from "@/components/Lineage";
const { TabPane } = Tabs; const { TabPane } = Tabs;
const StudioCA = (props:any) => { const StudioCA = (props: any) => {
const {current} = props; const {current} = props;
const [oneTableCAData,setOneTableCAData] = useState<any>(null); const [data, setData] = useState(getInit());
const [oneColumnCAData,setOneColumnCAData] = useState<any>(null);
const nodeStateStyles = { const handleLineage=()=>{
hover: { const res = getLineage({
stroke: '#1890ff',
lineWidth: 2,
},
selected: {
stroke: '#f00',
lineWidth: 3,
},
};
const buildGraphData=(data,graphData)=>{
if(!graphData.nodes){
graphData.nodes = [];
}
if(!graphData.edges){
graphData.edges = [];
}
for(let i in data){
let nodesItem = {
id:data[i].id,
value:{
title:data[i].name,
items: [
{
text: data[i].columns,
},
],
}
}
graphData.nodes.push(nodesItem);
if(data[i].children){
for(let j in data[i].children){
graphData.edges.push({source: data[i].children[j].id,
target: data[i].id,
value: ''});
buildGraphData(data[i].children,graphData);
}
}
}
};
const config = {
data: oneTableCAData,
height:350,
nodeCfg: {
size: [160, 65],
items: {
autoEllipsis: false,
padding: [10],
containerStyle: {
// fill: '#fff',
width:'100px',
},
style: (cfg, group, type) => {
const styles = {
value: {
// fill: '#000',
},
text: {
// fill: '#222',
width:'100px',
},
};
return styles[type];
},
},
nodeStateStyles: {
hover: {
// stroke: '#1890ff',
lineWidth: 2,
},
},
style: {
// fill: '#40a9ff',
// stroke: '#1890ff',
},
},
edgeCfg: {
type: 'polyline',
label: {
style: {
// fill: '#666',
fontSize: 12,
fillOpacity: 1,
},
},
endArrow: {
// fill: '#333',
},
edgeStateStyles: {
hover: {
// stroke: '#1890ff',
lineWidth: 2,
},
},
},
markerCfg: (cfg) => {
const { edges } = oneTableCAData;
return {
position: 'right',
show: edges.find((item) => item.source === cfg.id),
collapsed: !edges.find((item) => item.source === cfg.id),
};
},
behaviors: ['drag-canvas', 'zoom-canvas', 'drag-node'],
/*layout: {
rankdir: 'TB',
ranksepFunc: () => 20,
},*/
};
const columnConfig = {
data:oneColumnCAData,
behaviors: ['drag-canvas', 'zoom-canvas', 'drag-node'],
bodyStyle: {
fill: '#aaa',
},
nodeStateStyles,
onReady: (graph) => {
graph.on('node:mouseenter', (evt) => {
const item = evt.item;
graph.setItemState(item, 'hover', true);
});
graph.on('node:mouseleave', (evt) => {
const item = evt.item;
graph.setItemState(item, 'hover', false);
});
},
edgeStyle: (item, graph) => {
/**
* graph.findById(item.target).getModel()
* item.source: 获取 source 数据
* item.target: 获取 target 数据
*/
// console.log(graph.findById(item.target).getModel());
return {
stroke: '#40a9ff',
// lineWidth: graph.findById(item.target).getModel().columnSize,
lineWidth: 1,
strokeOpacity: 0.5,
};
},
nodeStyle: () => {
return {
stroke: '#40a9ff',
};
},
};
const getOneTableCA=()=>{
const res = getCAByStatement({
statement:current.value, statement:current.value,
type: 1, type: 1,
}); });
res.then((result)=>{ res.then((result)=>{
if(result.code==0){ setData(result.datas);
let graphData = {};
buildGraphData(result.datas,graphData);
setOneTableCAData(graphData);
}else{
setOneTableCAData(null);
}
}) })
}; };
const getOneColumnCA=()=>{ return (<>
const res = getCAByStatement({ <Tabs defaultActiveKey="Lineage" size="small" tabPosition="top" style={{border: "1px solid #f0f0f0"}}
statement:current.value, tabBarExtraContent={<Tooltip title="重新计算血缘">
type: 2,
});
res.then((result)=>{
if(result.code==0){
setOneColumnCAData(buildGraphData(result.datas[0]));
}else{
setOneColumnCAData(null);
}
})
};
const fullTreeData=(node)=>{
if(node){
node.body=node.columns.toString();
for(let i in node.children){
node.children[i] = fullTreeData(node.children[i])
}
return node;
}
return null;
};
return (
<Tabs defaultActiveKey="OneTableCA" size="small" tabPosition="left" >
<TabPane
tab={
<span>
单任务表级血缘
</span>
}
key="OneTableCA"
>
<div>
<div>
<Tooltip title="重新计算血缘">
<Button <Button
type="text" type="text"
icon={<SearchOutlined />} icon={<SearchOutlined />}
onClick={getOneTableCA} onClick={handleLineage}
/>
</Tooltip>
</div>
{oneTableCAData!=null?<FlowAnalysisGraph {...config} />:<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />}
</div>
</TabPane>
{/*<TabPane
tab={
<span>
单任务字段级血缘
</span>
}
key="OneColumnCA"
> >
<div> 计算血缘
<div style={{float: "left"}}> </Button>
<Tooltip title="重新计算血缘"> </Tooltip>}
<Button
type="text"
icon={<SearchOutlined />}
onClick={getOneColumnCA}
/>
</Tooltip>
</div>
{oneColumnCAData!=null?<IndentedTreeGraph {...columnConfig} />:<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />}
</div>
</TabPane>*/}
{/*<TabPane
tab={
<span>
全局表级血缘
</span>
}
key="AllTableCA"
> >
<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} /> <TabPane tab={<span>血缘分析</span>} key="Lineage">
<Lineage datas={data}/>
</TabPane> </TabPane>
<TabPane
tab={
<span>
全局字段级血缘
</span>
}
key="AllColumnCA"
>
<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />
</TabPane>*/}
</Tabs> </Tabs>
); </>)
}; };
export default connect(({ Studio }: { Studio: StateType }) => ({ export default connect(({ Studio }: { Studio: StateType }) => ({
......
import {Tabs, Empty} from 'antd'; import {Tabs, Empty} from 'antd';
import CodeShow from "@/components/Common/CodeShow"; import {getLineage} from "@/pages/DevOps/service";
import {LineageTable} from 'react-lineage-dag'; import {useEffect, useState} from "react";
import Lineage, {getInit} from "@/components/Lineage";
const {TabPane} = Tabs; const {TabPane} = Tabs;
const DataMap = (props: any) => { const DataMap = (props: any) => {
const {job} = props; const {job} = props;
const [data, setData] = useState(getInit());
const data = { const getData = () => {
tables: [ const res = getLineage(job.instance?.id);
{ res.then((result)=>{
id: '1', result.datas.tables.forEach(table => {
name: 'table-1', table.isExpand = true;
columns: [ table.isFold = false;
{ });
name: 'id', setData(result.datas);
title: 'id' });
},
{
name: 'age',
title: 'age'
}
]
},
{
id: '2',
name: 'table-2',
columns: [
{
name: 'id',
title: 'id'
},
{
name: 'age',
title: 'age'
}
]
},
{
id: '3',
name: 'table-3',
columns: [
{
name: 'id',
title: 'id'
},
{
name: 'age',
title: 'age'
}
]
}
],
relations: [
{
srcTableId: '1',
tgtTableId: '2',
// srcTableColName: 'id',
// tgtTableColName: 'age'
},
{
srcTableId: '1',
tgtTableId: '3',
// srcTableColName: 'id',
// tgtTableColName: 'age'
}
]
}; };
useEffect(() => {
getData();
}, []);
return (<> return (<>
<Tabs defaultActiveKey="OneCA" size="small" tabPosition="top" style={{ <Tabs defaultActiveKey="Lineage" size="small" tabPosition="top" style={{
border: "1px solid #f0f0f0" border: "1px solid #f0f0f0"
}}> }}>
<TabPane tab={<span>血缘分析</span>} key="OneCA"> <TabPane tab={<span>血缘分析</span>} key="Lineage">
<LineageTable {...data} onEachFrame={() => { }}/> <Lineage datas={data}/>
</TabPane> </TabPane>
</Tabs> </Tabs>
</>) </>)
......
...@@ -11,3 +11,7 @@ export function getJobInfoDetail(id: number) { ...@@ -11,3 +11,7 @@ export function getJobInfoDetail(id: number) {
export function refreshJobInfoDetail(id: number) { export function refreshJobInfoDetail(id: number) {
return getData("api/jobInstance/refreshJobInfoDetail",{id}); return getData("api/jobInstance/refreshJobInfoDetail",{id});
} }
export function getLineage(id: number) {
return getData("api/jobInstance/getLineage",{id});
}
...@@ -64,8 +64,8 @@ export async function getCatalogueTreeData(params?: StudioParam) { ...@@ -64,8 +64,8 @@ export async function getCatalogueTreeData(params?: StudioParam) {
}); });
} }
export async function getCAByStatement(params: CAParam) { export async function getLineage(params: CAParam) {
return request<API.Result>('/api/studio/getCAByStatement', { return request<API.Result>('/api/studio/getLineage', {
method: 'POST', method: 'POST',
data: { data: {
...params, ...params,
......
...@@ -764,6 +764,12 @@ export default (): React.ReactNode => { ...@@ -764,6 +764,12 @@ export default (): React.ReactNode => {
<li> <li>
<Link>修复 kubernetes集群配置相关显示bug</Link> <Link>修复 kubernetes集群配置相关显示bug</Link>
</li> </li>
<li>
<Link>新增 运维中心血缘分析——字段级</Link>
</li>
<li>
<Link>优化 Studio血缘分析为字段级</Link>
</li>
</ul> </ul>
</Paragraph> </Paragraph>
</Timeline.Item> </Timeline.Item>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment