Unverified Commit 8d6a9486 authored by aiwenmo's avatar aiwenmo Committed by GitHub

[Feature-318][core] Add column lineage from db sql

[Feature-318][core] Add column lineage from db sql
parents 6a862447 4d87bbd8
......@@ -16,4 +16,5 @@ public class StudioCADTO extends AbstractStatementDTO {
private Boolean statementSet;
private Integer type;
private String dialect;
private Integer databaseId;
}
......@@ -257,10 +257,17 @@ public class StudioServiceImpl implements StudioService {
@Override
public LineageResult getLineage(StudioCADTO studioCADTO) {
if (Asserts.isNotNullString(studioCADTO.getDialect()) && !studioCADTO.getDialect().equalsIgnoreCase("flinksql")) {
if(Asserts.isNull(studioCADTO.getDatabaseId())){
return null;
}
DataBase dataBase = dataBaseService.getById(studioCADTO.getDatabaseId());
if (Asserts.isNull(dataBase)) {
return null;
}
if(studioCADTO.getDialect().equalsIgnoreCase("doris")){
return com.dlink.explainer.sqlLineage.LineageBuilder.getSqlLineage(studioCADTO.getStatement(),"mysql");
return com.dlink.explainer.sqlLineage.LineageBuilder.getSqlLineage(studioCADTO.getStatement(), "mysql", dataBase.getDriverConfig());
} else {
return com.dlink.explainer.sqlLineage.LineageBuilder.getSqlLineage(studioCADTO.getStatement(),studioCADTO.getDialect().toLowerCase());
return com.dlink.explainer.sqlLineage.LineageBuilder.getSqlLineage(studioCADTO.getStatement(), studioCADTO.getDialect().toLowerCase(), dataBase.getDriverConfig());
}
} else {
addFlinkSQLEnv(studioCADTO);
......
......@@ -11,6 +11,9 @@ import com.dlink.assertion.Asserts;
import com.dlink.explainer.lineage.LineageRelation;
import com.dlink.explainer.lineage.LineageResult;
import com.dlink.explainer.lineage.LineageTable;
import com.dlink.metadata.driver.Driver;
import com.dlink.metadata.driver.DriverConfig;
import com.dlink.model.Column;
import java.util.*;
......@@ -122,30 +125,41 @@ public class LineageBuilder {
return LineageResult.build(tables, relations);
}
public static LineageResult getSqlLineage(String statement,String type) {
public static LineageResult getSqlLineage(String statement, String type, DriverConfig driverConfig) {
List<LineageTable> tables = new ArrayList<>();
List<LineageRelation> relations = new ArrayList<>();
Map<Integer, List<List<TableStat.Column>>> srcMap = new HashMap<>();
Map<Integer, List<TableStat.Column>> tgtMap = new HashMap<>();
Map<String, String> tableMap = new HashMap<>();
List<TableStat.Column> allColumnList = new ArrayList<>();
String[] sqls = statement.split(";");
try {
List<SQLStatement> sqlStatements = SQLUtils.parseStatements(statement.toLowerCase(), type);
List<SQLStatement> sqlStatements = SQLUtils.parseStatements(statement, type);
for (int n = 0; n < sqlStatements.size(); n++) {
SQLStatement sqlStatement = sqlStatements.get(n);
List<List<TableStat.Column>> srcLists = new ArrayList<>();
List<TableStat.Column> tgtList = new ArrayList<>();
//只考虑insert语句
if (sqlStatement instanceof SQLInsertStatement) {
String targetTable = ((SQLInsertStatement) sqlStatement).getTableName().toString();
String targetTable = ((SQLInsertStatement) sqlStatement).getTableName().toString().replace("`", "").replace("\"", "");
List<SQLExpr> columns = ((SQLInsertStatement) sqlStatement).getColumns();
//处理target表中字段
for (SQLExpr column : columns) {
if (column instanceof SQLPropertyExpr) {
tgtList.add(new TableStat.Column(targetTable, ((SQLPropertyExpr) column).getName().replace("`", "").replace("\"", "")));
} else if (column instanceof SQLIdentifierExpr) {
tgtList.add(new TableStat.Column(targetTable, ((SQLIdentifierExpr) column).getName().replace("`", "").replace("\"", "")));
if (columns.size() <= 0 || sqls[n].contains("*")) {
Driver driver = Driver.build(driverConfig);
if(!targetTable.contains(".")){
return null;
}
List<Column> columns1 = driver.listColumns(targetTable.split("\\.")[0], targetTable.split("\\.")[1]);
for (Column column : columns1) {
tgtList.add(new TableStat.Column(targetTable, column.getName()));
}
} else {
for (SQLExpr column : columns) {
if (column instanceof SQLPropertyExpr) {
tgtList.add(new TableStat.Column(targetTable, ((SQLPropertyExpr) column).getName().replace("`", "").replace("\"", "")));
} else if (column instanceof SQLIdentifierExpr) {
tgtList.add(new TableStat.Column(targetTable, ((SQLIdentifierExpr) column).getName().replace("`", "").replace("\"", "")));
}
}
}
//处理select 生成srcLists
......
......@@ -19,6 +19,7 @@ const StudioCA = (props: any) => {
statement:current.value,
statementSet:current.task.statementSet,
dialect:current.task.dialect,
databaseId:current.task.databaseId,
type: 1,
});
res.then((result)=>{
......
......@@ -48,4 +48,5 @@ export type CAParam = {
statementSet: boolean,
type: number,
dialect?: string,
databaseId?: number,
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment