Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in / Register
Toggle navigation
D
dlink
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
zhaowei
dlink
Commits
841296da
Unverified
Commit
841296da
authored
May 27, 2022
by
aiwenmo
Committed by
GitHub
May 27, 2022
Browse files
Options
Browse Files
Download
Plain Diff
[Feature-318][core] Add column lineage from db sql
[Feature-318][core] Add column lineage from db sql
parents
600c9ad4
a9f34dd0
Changes
12
Hide whitespace changes
Inline
Side-by-side
Showing
12 changed files
with
1060 additions
and
4 deletions
+1060
-4
StudioController.java
.../src/main/java/com/dlink/controller/StudioController.java
+2
-0
StudioCADTO.java
dlink-admin/src/main/java/com/dlink/dto/StudioCADTO.java
+1
-0
StudioServiceImpl.java
...c/main/java/com/dlink/service/impl/StudioServiceImpl.java
+10
-2
pom.xml
dlink-core/pom.xml
+6
-1
LineageBuilder.java
...n/java/com/dlink/explainer/sqlLineage/LineageBuilder.java
+246
-0
LineageColumn.java
...in/java/com/dlink/explainer/sqlLineage/LineageColumn.java
+144
-0
LineageUtils.java
...ain/java/com/dlink/explainer/sqlLineage/LineageUtils.java
+390
-0
TreeNode.java
...rc/main/java/com/dlink/explainer/sqlLineage/TreeNode.java
+194
-0
TreeNodeIterator.java
...java/com/dlink/explainer/sqlLineage/TreeNodeIterator.java
+65
-0
index.tsx
dlink-web/src/components/Lineage/index.tsx
+0
-1
index.tsx
...eb/src/components/Studio/StudioConsole/StudioCA/index.tsx
+1
-0
data.d.ts
dlink-web/src/components/Studio/StudioEdit/data.d.ts
+1
-0
No files found.
dlink-admin/src/main/java/com/dlink/controller/StudioController.java
View file @
841296da
package
com
.
dlink
.
controller
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.common.result.Result
;
import
com.dlink.dto.SessionDTO
;
import
com.dlink.dto.StudioCADTO
;
import
com.dlink.dto.StudioDDLDTO
;
import
com.dlink.dto.StudioExecuteDTO
;
import
com.dlink.explainer.lineage.LineageResult
;
import
com.dlink.job.JobResult
;
import
com.dlink.result.IResult
;
import
com.dlink.service.StudioService
;
...
...
dlink-admin/src/main/java/com/dlink/dto/StudioCADTO.java
View file @
841296da
...
...
@@ -15,4 +15,5 @@ public class StudioCADTO extends AbstractStatementDTO {
// It's useless for the time being
private
Boolean
statementSet
;
private
Integer
type
;
private
String
dialect
;
}
dlink-admin/src/main/java/com/dlink/service/impl/StudioServiceImpl.java
View file @
841296da
...
...
@@ -256,8 +256,16 @@ public class StudioServiceImpl implements StudioService {
@Override
public
LineageResult
getLineage
(
StudioCADTO
studioCADTO
)
{
addFlinkSQLEnv
(
studioCADTO
);
return
LineageBuilder
.
getLineage
(
studioCADTO
.
getStatement
(),
studioCADTO
.
getStatementSet
());
if
(
Asserts
.
isNotNullString
(
studioCADTO
.
getDialect
())
&&
!
studioCADTO
.
getDialect
().
equalsIgnoreCase
(
"flinksql"
))
{
if
(
studioCADTO
.
getDialect
().
equalsIgnoreCase
(
"doris"
)){
return
com
.
dlink
.
explainer
.
sqlLineage
.
LineageBuilder
.
getSqlLineage
(
studioCADTO
.
getStatement
(),
"mysql"
);
}
else
{
return
com
.
dlink
.
explainer
.
sqlLineage
.
LineageBuilder
.
getSqlLineage
(
studioCADTO
.
getStatement
(),
studioCADTO
.
getDialect
().
toLowerCase
());
}
}
else
{
addFlinkSQLEnv
(
studioCADTO
);
return
LineageBuilder
.
getLineage
(
studioCADTO
.
getStatement
(),
studioCADTO
.
getStatementSet
());
}
}
@Override
...
...
dlink-core/pom.xml
View file @
841296da
...
...
@@ -147,5 +147,10 @@
<scope>
${scope.runtime}
</scope>
</dependency>
<dependency>
<groupId>
com.alibaba
</groupId>
<artifactId>
druid-spring-boot-starter
</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
</project>
dlink-core/src/main/java/com/dlink/explainer/sqlLineage/LineageBuilder.java
0 → 100644
View file @
841296da
package
com
.
dlink
.
explainer
.
sqlLineage
;
import
com.alibaba.druid.sql.SQLUtils
;
import
com.alibaba.druid.sql.ast.SQLExpr
;
import
com.alibaba.druid.sql.ast.SQLStatement
;
import
com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr
;
import
com.alibaba.druid.sql.ast.expr.SQLPropertyExpr
;
import
com.alibaba.druid.sql.ast.statement.SQLInsertStatement
;
import
com.alibaba.druid.stat.TableStat
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.explainer.lineage.LineageRelation
;
import
com.dlink.explainer.lineage.LineageResult
;
import
com.dlink.explainer.lineage.LineageTable
;
import
java.util.*
;
public
class
LineageBuilder
{
public
static
LineageResult
getSqlLineageByOne
(
String
statement
,
String
type
)
{
List
<
LineageTable
>
tables
=
new
ArrayList
<>();
List
<
LineageRelation
>
relations
=
new
ArrayList
<>();
try
{
List
<
SQLStatement
>
sqlStatements
=
SQLUtils
.
parseStatements
(
statement
.
toLowerCase
(),
type
);
// 只考虑一条语句
SQLStatement
sqlStatement
=
sqlStatements
.
get
(
0
);
List
<
List
<
TableStat
.
Column
>>
srcLists
=
new
ArrayList
<>();
List
<
TableStat
.
Column
>
tgtList
=
new
ArrayList
<>();
//只考虑insert语句
if
(
sqlStatement
instanceof
SQLInsertStatement
)
{
String
targetTable
=
((
SQLInsertStatement
)
sqlStatement
).
getTableName
().
toString
();
List
<
SQLExpr
>
columns
=
((
SQLInsertStatement
)
sqlStatement
).
getColumns
();
//处理target表中字段
for
(
SQLExpr
column
:
columns
)
{
if
(
column
instanceof
SQLPropertyExpr
)
{
tgtList
.
add
(
new
TableStat
.
Column
(
targetTable
,
((
SQLPropertyExpr
)
column
).
getName
().
replace
(
"`"
,
""
).
replace
(
"\""
,
""
)));
}
else
if
(
column
instanceof
SQLIdentifierExpr
)
{
tgtList
.
add
(
new
TableStat
.
Column
(
targetTable
,
((
SQLIdentifierExpr
)
column
).
getName
().
replace
(
"`"
,
""
).
replace
(
"\""
,
""
)));
}
}
//处理select 生成srcLists
LineageColumn
root
=
new
LineageColumn
();
TreeNode
<
LineageColumn
>
rootNode
=
new
TreeNode
<>(
root
);
LineageUtils
.
columnLineageAnalyzer
(((
SQLInsertStatement
)
sqlStatement
).
getQuery
().
toString
(),
type
,
rootNode
);
for
(
TreeNode
<
LineageColumn
>
e
:
rootNode
.
getChildren
())
{
Set
<
LineageColumn
>
leafNodes
=
e
.
getAllLeafData
();
List
<
TableStat
.
Column
>
srcList
=
new
ArrayList
<>();
for
(
LineageColumn
column
:
leafNodes
)
{
String
tableName
=
Asserts
.
isNotNullString
(
column
.
getSourceTableName
())
?
(
Asserts
.
isNotNullString
(
column
.
getSourceDbName
())
?
column
.
getSourceDbName
()
+
"."
+
column
.
getSourceTableName
()
:
column
.
getSourceTableName
())
:
""
;
srcList
.
add
(
new
TableStat
.
Column
(
tableName
,
column
.
getTargetColumnName
()));
}
srcLists
.
add
(
srcList
);
}
// 构建 List<LineageTable>
Map
<
String
,
String
>
tableMap
=
new
HashMap
<>();
List
<
TableStat
.
Column
>
allColumnList
=
new
ArrayList
<>();
int
tid
=
100
;
for
(
TableStat
.
Column
column
:
tgtList
)
{
if
(
Asserts
.
isNotNullString
(
column
.
getTable
())
&&
!
tableMap
.
containsKey
(
column
.
getTable
()))
{
tableMap
.
put
(
column
.
getTable
(),
String
.
valueOf
(
tid
++));
}
}
for
(
List
<
TableStat
.
Column
>
columnList
:
srcLists
)
{
allColumnList
.
addAll
(
columnList
);
for
(
TableStat
.
Column
column
:
columnList
)
{
if
(
Asserts
.
isNotNullString
(
column
.
getTable
())
&&
!
tableMap
.
containsKey
(
column
.
getTable
()))
{
tableMap
.
put
(
column
.
getTable
(),
String
.
valueOf
(
tid
++));
}
}
}
allColumnList
.
addAll
(
tgtList
);
for
(
String
tableName
:
tableMap
.
keySet
())
{
LineageTable
table
=
new
LineageTable
();
table
.
setId
(
tableMap
.
get
(
tableName
));
table
.
setName
(
tableName
);
List
<
com
.
dlink
.
explainer
.
lineage
.
LineageColumn
>
tableColumns
=
new
ArrayList
<>();
Set
<
String
>
tableSet
=
new
HashSet
<>();
for
(
TableStat
.
Column
column
:
allColumnList
)
{
if
(
tableName
.
equals
(
column
.
getTable
())
&&
!
tableSet
.
contains
(
column
.
getName
()))
{
tableColumns
.
add
(
new
com
.
dlink
.
explainer
.
lineage
.
LineageColumn
(
column
.
getName
(),
column
.
getName
()));
tableSet
.
add
(
column
.
getName
());
}
}
table
.
setColumns
(
tableColumns
);
tables
.
add
(
table
);
}
// 构建 LineageRelation
int
tSize
=
tgtList
.
size
();
int
sSize
=
srcLists
.
size
();
if
(
tSize
!=
sSize
&&
tSize
*
2
!=
sSize
)
{
System
.
out
.
println
(
"出现字段位数不相等错误"
);
return
null
;
}
for
(
int
i
=
0
;
i
<
tSize
;
i
++)
{
for
(
TableStat
.
Column
column
:
srcLists
.
get
(
i
))
{
if
(
Asserts
.
isNotNullString
(
column
.
getTable
()))
{
relations
.
add
(
LineageRelation
.
build
(
i
+
""
,
tableMap
.
get
(
column
.
getTable
()),
tableMap
.
get
(
tgtList
.
get
(
i
).
getTable
()),
column
.
getName
(),
tgtList
.
get
(
i
).
getName
()));
}
}
if
(
tSize
*
2
==
sSize
)
{
for
(
TableStat
.
Column
column
:
srcLists
.
get
(
i
+
tSize
))
{
if
(
Asserts
.
isNotNullString
(
column
.
getTable
()))
{
relations
.
add
(
LineageRelation
.
build
((
i
+
tSize
)
+
""
,
tableMap
.
get
(
column
.
getTable
()),
tableMap
.
get
(
tgtList
.
get
(
i
).
getTable
()),
column
.
getName
(),
tgtList
.
get
(
i
).
getName
()));
}
}
}
}
}
else
{
return
null
;
}
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
return
null
;
}
return
LineageResult
.
build
(
tables
,
relations
);
}
public
static
LineageResult
getSqlLineage
(
String
statement
,
String
type
)
{
List
<
LineageTable
>
tables
=
new
ArrayList
<>();
List
<
LineageRelation
>
relations
=
new
ArrayList
<>();
Map
<
Integer
,
List
<
List
<
TableStat
.
Column
>>>
srcMap
=
new
HashMap
<>();
Map
<
Integer
,
List
<
TableStat
.
Column
>>
tgtMap
=
new
HashMap
<>();
Map
<
String
,
String
>
tableMap
=
new
HashMap
<>();
List
<
TableStat
.
Column
>
allColumnList
=
new
ArrayList
<>();
try
{
List
<
SQLStatement
>
sqlStatements
=
SQLUtils
.
parseStatements
(
statement
.
toLowerCase
(),
type
);
for
(
int
n
=
0
;
n
<
sqlStatements
.
size
();
n
++)
{
SQLStatement
sqlStatement
=
sqlStatements
.
get
(
n
);
List
<
List
<
TableStat
.
Column
>>
srcLists
=
new
ArrayList
<>();
List
<
TableStat
.
Column
>
tgtList
=
new
ArrayList
<>();
//只考虑insert语句
if
(
sqlStatement
instanceof
SQLInsertStatement
)
{
String
targetTable
=
((
SQLInsertStatement
)
sqlStatement
).
getTableName
().
toString
();
List
<
SQLExpr
>
columns
=
((
SQLInsertStatement
)
sqlStatement
).
getColumns
();
//处理target表中字段
for
(
SQLExpr
column
:
columns
)
{
if
(
column
instanceof
SQLPropertyExpr
)
{
tgtList
.
add
(
new
TableStat
.
Column
(
targetTable
,
((
SQLPropertyExpr
)
column
).
getName
().
replace
(
"`"
,
""
).
replace
(
"\""
,
""
)));
}
else
if
(
column
instanceof
SQLIdentifierExpr
)
{
tgtList
.
add
(
new
TableStat
.
Column
(
targetTable
,
((
SQLIdentifierExpr
)
column
).
getName
().
replace
(
"`"
,
""
).
replace
(
"\""
,
""
)));
}
}
//处理select 生成srcLists
LineageColumn
root
=
new
LineageColumn
();
TreeNode
<
LineageColumn
>
rootNode
=
new
TreeNode
<>(
root
);
LineageUtils
.
columnLineageAnalyzer
(((
SQLInsertStatement
)
sqlStatement
).
getQuery
().
toString
(),
type
,
rootNode
);
for
(
TreeNode
<
LineageColumn
>
e
:
rootNode
.
getChildren
())
{
Set
<
LineageColumn
>
leafNodes
=
e
.
getAllLeafData
();
List
<
TableStat
.
Column
>
srcList
=
new
ArrayList
<>();
for
(
LineageColumn
column
:
leafNodes
)
{
String
tableName
=
Asserts
.
isNotNullString
(
column
.
getSourceTableName
())
?
(
Asserts
.
isNotNullString
(
column
.
getSourceDbName
())
?
column
.
getSourceDbName
()
+
"."
+
column
.
getSourceTableName
()
:
column
.
getSourceTableName
())
:
""
;
srcList
.
add
(
new
TableStat
.
Column
(
tableName
,
column
.
getTargetColumnName
()));
}
srcLists
.
add
(
srcList
);
}
srcMap
.
put
(
n
,
srcLists
);
tgtMap
.
put
(
n
,
tgtList
);
}
else
{
return
null
;
}
}
// 构建 List<LineageTable>
int
tid
=
100
;
for
(
Integer
i
:
tgtMap
.
keySet
())
{
allColumnList
.
addAll
(
tgtMap
.
get
(
i
));
for
(
TableStat
.
Column
column
:
tgtMap
.
get
(
i
))
{
if
(
Asserts
.
isNotNullString
(
column
.
getTable
())
&&
!
tableMap
.
containsKey
(
column
.
getTable
()))
{
tableMap
.
put
(
column
.
getTable
(),
String
.
valueOf
(
tid
++));
}
}
}
for
(
Integer
i
:
srcMap
.
keySet
())
{
for
(
List
<
TableStat
.
Column
>
columnList
:
srcMap
.
get
(
i
))
{
allColumnList
.
addAll
(
columnList
);
for
(
TableStat
.
Column
column
:
columnList
)
{
if
(
Asserts
.
isNotNullString
(
column
.
getTable
())
&&
!
tableMap
.
containsKey
(
column
.
getTable
()))
{
tableMap
.
put
(
column
.
getTable
(),
String
.
valueOf
(
tid
++));
}
}
}
}
for
(
String
tableName
:
tableMap
.
keySet
())
{
LineageTable
table
=
new
LineageTable
();
table
.
setId
(
tableMap
.
get
(
tableName
));
table
.
setName
(
tableName
);
List
<
com
.
dlink
.
explainer
.
lineage
.
LineageColumn
>
tableColumns
=
new
ArrayList
<>();
Set
<
String
>
tableSet
=
new
HashSet
<>();
for
(
TableStat
.
Column
column
:
allColumnList
)
{
if
(
tableName
.
equals
(
column
.
getTable
())
&&
!
tableSet
.
contains
(
column
.
getName
()))
{
tableColumns
.
add
(
new
com
.
dlink
.
explainer
.
lineage
.
LineageColumn
(
column
.
getName
(),
column
.
getName
()));
tableSet
.
add
(
column
.
getName
());
}
}
table
.
setColumns
(
tableColumns
);
tables
.
add
(
table
);
}
// 构建 LineageRelation
for
(
Integer
n
:
srcMap
.
keySet
())
{
List
<
List
<
TableStat
.
Column
>>
srcLists
=
srcMap
.
get
(
n
);
List
<
TableStat
.
Column
>
tgtList
=
tgtMap
.
get
(
n
);
int
tSize
=
tgtList
.
size
();
int
sSize
=
srcLists
.
size
();
if
(
tSize
!=
sSize
&&
tSize
*
2
!=
sSize
)
{
System
.
out
.
println
(
"出现字段位数不相等错误"
);
return
null
;
}
for
(
int
i
=
0
;
i
<
tSize
;
i
++)
{
for
(
TableStat
.
Column
column
:
srcLists
.
get
(
i
))
{
if
(
Asserts
.
isNotNullString
(
column
.
getTable
()))
{
relations
.
add
(
LineageRelation
.
build
(
n
+
"_"
+
i
,
tableMap
.
get
(
column
.
getTable
()),
tableMap
.
get
(
tgtList
.
get
(
i
).
getTable
()),
column
.
getName
(),
tgtList
.
get
(
i
).
getName
()));
}
}
if
(
tSize
*
2
==
sSize
)
{
for
(
TableStat
.
Column
column
:
srcLists
.
get
(
i
+
tSize
))
{
if
(
Asserts
.
isNotNullString
(
column
.
getTable
()))
{
relations
.
add
(
LineageRelation
.
build
(
n
+
"_"
+
(
i
+
tSize
),
tableMap
.
get
(
column
.
getTable
()),
tableMap
.
get
(
tgtList
.
get
(
i
).
getTable
()),
column
.
getName
(),
tgtList
.
get
(
i
).
getName
()));
}
}
}
}
}
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
return
null
;
}
return
LineageResult
.
build
(
tables
,
relations
);
}
}
dlink-core/src/main/java/com/dlink/explainer/sqlLineage/LineageColumn.java
0 → 100644
View file @
841296da
package
com
.
dlink
.
explainer
.
sqlLineage
;
import
com.dlink.assertion.Asserts
;
import
lombok.Data
;
@Data
public
class
LineageColumn
implements
Comparable
<
LineageColumn
>
{
public
String
getTargetColumnName
()
{
return
targetColumnName
;
}
public
void
setTargetColumnName
(
String
targetColumnName
)
{
this
.
targetColumnName
=
targetColumnName
;
}
private
String
targetColumnName
;
private
String
sourceDbName
;
public
String
getSourceDbName
()
{
return
sourceDbName
;
}
public
void
setSourceDbName
(
String
sourceDbName
)
{
this
.
sourceDbName
=
sourceDbName
;
}
public
String
getSourceTableName
()
{
return
sourceTableName
;
}
public
String
getSourceColumnName
()
{
return
sourceColumnName
;
}
public
void
setSourceColumnName
(
String
sourceColumnName
)
{
this
.
sourceColumnName
=
sourceColumnName
;
}
private
String
sourceTableName
;
private
String
sourceColumnName
;
public
String
getExpression
()
{
return
expression
;
}
public
void
setExpression
(
String
expression
)
{
this
.
expression
=
expression
;
}
private
String
expression
;
public
Boolean
getIsEnd
()
{
return
isEnd
;
}
public
void
setIsEnd
(
Boolean
end
)
{
isEnd
=
end
;
}
private
Boolean
isEnd
=
false
;
public
void
setSourceTableName
(
String
sourceTableName
)
{
sourceTableName
=
Asserts
.
isNotNullString
(
sourceTableName
)
?
sourceTableName
.
replace
(
"`"
,
""
)
:
sourceTableName
;
if
(
sourceTableName
.
contains
(
" "
)){
sourceTableName
=
sourceTableName
.
substring
(
0
,
sourceTableName
.
indexOf
(
" "
));
}
if
(
sourceTableName
.
contains
(
"."
))
{
if
(
Asserts
.
isNullString
(
this
.
sourceDbName
)){
this
.
sourceDbName
=
sourceTableName
.
substring
(
0
,
sourceTableName
.
indexOf
(
"."
));
}
// this.sourceDbName = sourceTableName.substring(0, sourceTableName.indexOf("."));
this
.
sourceTableName
=
sourceTableName
.
substring
(
sourceTableName
.
indexOf
(
"."
)
+
1
);
}
else
{
this
.
sourceTableName
=
sourceTableName
;
}
}
public
int
compareTo
(
LineageColumn
o
)
{
if
(
Asserts
.
isNotNullString
(
this
.
getSourceDbName
())&&
Asserts
.
isNotNullString
(
this
.
getSourceTableName
())){
if
(
this
.
getSourceDbName
().
equals
(
o
.
getSourceDbName
())&&
this
.
getSourceTableName
().
equals
(
o
.
getSourceTableName
())&&
this
.
getTargetColumnName
().
equals
(
o
.
getTargetColumnName
())){
return
0
;
}
}
else
if
(
Asserts
.
isNotNullString
(
this
.
getSourceTableName
())){
if
(
this
.
getSourceTableName
().
equals
(
o
.
getSourceTableName
())&&
this
.
getTargetColumnName
().
equals
(
o
.
getTargetColumnName
())){
return
0
;
}
}
else
{
if
(
this
.
getTargetColumnName
().
equals
(
o
.
getTargetColumnName
()))
{
return
0
;
}
}
return
-
1
;
}
@Override
public
boolean
equals
(
Object
o
)
{
if
(
this
==
o
)
{
return
true
;
}
if
(
o
==
null
||
getClass
()
!=
o
.
getClass
())
{
return
false
;
}
LineageColumn
myColumn
=
(
LineageColumn
)
o
;
if
(!
this
.
getTargetColumnName
().
equals
(
myColumn
.
getTargetColumnName
()))
{
return
false
;
}
if
(
Asserts
.
isNotNullString
(
sourceTableName
)
&&
!
sourceTableName
.
equals
(
myColumn
.
sourceTableName
))
{
return
false
;
}
if
(
Asserts
.
isNotNullString
(
sourceColumnName
))
{
return
sourceColumnName
.
equals
(
myColumn
.
sourceColumnName
);
}
return
true
;
}
@Override
public
int
hashCode
()
{
int
result
=
getTargetColumnName
().
hashCode
();
if
(
Asserts
.
isNotNullString
(
sourceTableName
))
{
result
=
31
*
result
+
sourceTableName
.
hashCode
();
}
if
(
Asserts
.
isNotNullString
(
sourceColumnName
))
{
result
=
31
*
result
+
sourceColumnName
.
hashCode
();
}
if
(
Asserts
.
isNotNullString
(
sourceDbName
))
{
result
=
31
*
result
+
sourceDbName
.
hashCode
();
}
return
result
;
}
}
dlink-core/src/main/java/com/dlink/explainer/sqlLineage/LineageUtils.java
0 → 100644
View file @
841296da
package
com
.
dlink
.
explainer
.
sqlLineage
;
import
com.alibaba.druid.sql.SQLUtils
;
import
com.alibaba.druid.sql.ast.SQLExpr
;
import
com.alibaba.druid.sql.ast.SQLStatement
;
import
com.alibaba.druid.sql.ast.expr.*
;
import
com.alibaba.druid.sql.ast.statement.*
;
import
com.dlink.assertion.Asserts
;
import
java.util.ArrayList
;
import
java.util.List
;
import
java.util.concurrent.atomic.AtomicReference
;
public
class
LineageUtils
{
public
static
void
columnLineageAnalyzer
(
String
sql
,
String
type
,
TreeNode
<
LineageColumn
>
node
)
{
if
(
Asserts
.
isNullString
(
sql
))
{
return
;
}
AtomicReference
<
Boolean
>
isContinue
=
new
AtomicReference
<>(
false
);
List
<
SQLStatement
>
statements
=
new
ArrayList
<>();
// 解析
try
{
statements
=
SQLUtils
.
parseStatements
(
sql
,
type
);
}
catch
(
Exception
e
)
{
System
.
out
.
println
(
"can't parser by druid "
+
type
+
e
);
}
// 只考虑一条语句
SQLStatement
statement
=
statements
.
get
(
0
);
// 只考虑查询语句
SQLSelectStatement
sqlSelectStatement
=
(
SQLSelectStatement
)
statement
;
SQLSelectQuery
sqlSelectQuery
=
sqlSelectStatement
.
getSelect
().
getQuery
();
// 非union的查询语句
if
(
sqlSelectQuery
instanceof
SQLSelectQueryBlock
)
{
SQLSelectQueryBlock
sqlSelectQueryBlock
=
(
SQLSelectQueryBlock
)
sqlSelectQuery
;
// 获取字段列表
List
<
SQLSelectItem
>
selectItems
=
sqlSelectQueryBlock
.
getSelectList
();
selectItems
.
forEach
(
x
->
{
// 处理---------------------
String
column
=
Asserts
.
isNullString
(
x
.
getAlias
())
?
x
.
toString
()
:
x
.
getAlias
();
if
(
column
.
contains
(
"."
))
{
column
=
column
.
substring
(
column
.
indexOf
(
"."
)
+
1
);
}
column
=
column
.
replace
(
"`"
,
""
).
replace
(
"\""
,
""
);
String
expr
=
x
.
getExpr
().
toString
();
LineageColumn
myColumn
=
new
LineageColumn
();
myColumn
.
setTargetColumnName
(
column
);
myColumn
.
setExpression
(
expr
);
TreeNode
<
LineageColumn
>
itemNode
=
new
TreeNode
<>(
myColumn
);
SQLExpr
expr1
=
x
.
getExpr
();
// 解析表达式,添加解析结果子节点
handlerExpr
(
expr1
,
itemNode
);
if
(
node
.
getLevel
()
==
0
||
node
.
getData
().
getTargetColumnName
().
equals
(
column
))
{
node
.
addChild
(
itemNode
);
isContinue
.
set
(
true
);
}
});
if
(
isContinue
.
get
())
{
// 获取表
SQLTableSource
table
=
sqlSelectQueryBlock
.
getFrom
();
// 普通单表
if
(
table
instanceof
SQLExprTableSource
)
{
// 处理最终表---------------------
handlerSQLExprTableSource
(
node
,
(
SQLExprTableSource
)
table
);
}
else
if
(
table
instanceof
SQLJoinTableSource
)
{
// 处理join
handlerSQLJoinTableSource
(
node
,
(
SQLJoinTableSource
)
table
,
type
);
}
else
if
(
table
instanceof
SQLSubqueryTableSource
)
{
// 处理 subquery ---------------------
handlerSQLSubqueryTableSource
(
node
,
table
,
type
);
}
else
if
(
table
instanceof
SQLUnionQueryTableSource
)
{
// 处理 union ---------------------
handlerSQLUnionQueryTableSource
(
node
,
(
SQLUnionQueryTableSource
)
table
,
type
);
}
}
// 处理---------------------
// union的查询语句
}
else
if
(
sqlSelectQuery
instanceof
SQLUnionQuery
)
{
// 处理---------------------
columnLineageAnalyzer
(((
SQLUnionQuery
)
sqlSelectQuery
).
getLeft
().
toString
(),
type
,
node
);
columnLineageAnalyzer
(((
SQLUnionQuery
)
sqlSelectQuery
).
getRight
().
toString
(),
type
,
node
);
}
}
/**
* 处理UNION子句
*
* @param node
* @param table
*/
private
static
void
handlerSQLUnionQueryTableSource
(
TreeNode
<
LineageColumn
>
node
,
SQLUnionQueryTableSource
table
,
String
type
)
{
node
.
getAllLeafs
().
stream
().
filter
(
e
->
!
e
.
getData
().
getIsEnd
()).
forEach
(
e
->
{
columnLineageAnalyzer
(
table
.
getUnion
().
toString
(),
type
,
e
);
});
}
/**
* 处理sub子句
*
* @param node
* @param table
*/
private
static
void
handlerSQLSubqueryTableSource
(
TreeNode
<
LineageColumn
>
node
,
SQLTableSource
table
,
String
type
)
{
node
.
getAllLeafs
().
stream
().
filter
(
e
->
!
e
.
getData
().
getIsEnd
()).
forEach
(
e
->
{
if
(
Asserts
.
isNotNullString
(
e
.
getData
().
getSourceTableName
()))
{
if
(
e
.
getData
().
getSourceTableName
().
equals
(
table
.
getAlias
()))
{
columnLineageAnalyzer
(((
SQLSubqueryTableSource
)
table
).
getSelect
().
toString
(),
type
,
e
);
}
}
else
{
columnLineageAnalyzer
(((
SQLSubqueryTableSource
)
table
).
getSelect
().
toString
(),
type
,
e
);
}
});
}
/**
* 处理JOIN
*
* @param node
* @param table
*/
private
static
void
handlerSQLJoinTableSource
(
TreeNode
<
LineageColumn
>
node
,
SQLJoinTableSource
table
,
String
type
)
{
// 处理---------------------
// 子查询作为表
node
.
getAllLeafs
().
stream
().
filter
(
e
->
!
e
.
getData
().
getIsEnd
()).
forEach
(
e
->
{
if
(
table
.
getLeft
()
instanceof
SQLJoinTableSource
)
{
handlerSQLJoinTableSource
(
node
,
(
SQLJoinTableSource
)
table
.
getLeft
(),
type
);
}
else
if
(
table
.
getLeft
()
instanceof
SQLExprTableSource
)
{
handlerSQLExprTableSource
(
node
,
(
SQLExprTableSource
)
table
.
getLeft
());
}
else
if
(
table
.
getLeft
()
instanceof
SQLSubqueryTableSource
)
{
// 处理---------------------
handlerSQLSubqueryTableSource
(
node
,
table
.
getLeft
(),
type
);
}
else
if
(
table
.
getLeft
()
instanceof
SQLUnionQueryTableSource
)
{
// 处理---------------------
handlerSQLUnionQueryTableSource
(
node
,
(
SQLUnionQueryTableSource
)
table
.
getLeft
(),
type
);
}
});
node
.
getAllLeafs
().
stream
().
filter
(
e
->
!
e
.
getData
().
getIsEnd
()).
forEach
(
e
->
{
if
(
table
.
getRight
()
instanceof
SQLJoinTableSource
)
{
handlerSQLJoinTableSource
(
node
,
(
SQLJoinTableSource
)
table
.
getRight
(),
type
);
}
else
if
(
table
.
getRight
()
instanceof
SQLExprTableSource
)
{
handlerSQLExprTableSource
(
node
,
(
SQLExprTableSource
)
table
.
getRight
());
}
else
if
(
table
.
getRight
()
instanceof
SQLSubqueryTableSource
)
{
// 处理---------------------
handlerSQLSubqueryTableSource
(
node
,
table
.
getRight
(),
type
);
}
else
if
(
table
.
getRight
()
instanceof
SQLUnionQueryTableSource
)
{
// 处理---------------------
handlerSQLUnionQueryTableSource
(
node
,
(
SQLUnionQueryTableSource
)
table
.
getRight
(),
type
);
}
});
}
/**
* 处理最终表
*
* @param node
* @param table
*/
private
static
void
handlerSQLExprTableSource
(
TreeNode
<
LineageColumn
>
node
,
SQLExprTableSource
table
)
{
SQLExprTableSource
tableSource
=
table
;
String
tableName
=
tableSource
.
getExpr
()
instanceof
SQLPropertyExpr
?
((
SQLPropertyExpr
)
tableSource
.
getExpr
()).
getName
().
replace
(
"`"
,
""
).
replace
(
"\""
,
""
)
:
""
;
String
alias
=
Asserts
.
isNotNullString
(
tableSource
.
getAlias
())
?
tableSource
.
getAlias
().
replace
(
"`"
,
""
).
replace
(
"\""
,
""
)
:
""
;
node
.
getChildren
().
forEach
(
e
->
{
e
.
getChildren
().
forEach
(
f
->
{
if
(!
f
.
getData
().
getIsEnd
()
&&
(
f
.
getData
().
getSourceTableName
()
==
null
||
f
.
getData
().
getSourceTableName
().
equals
(
tableName
)
||
f
.
getData
().
getSourceTableName
().
equals
(
alias
)))
{
f
.
getData
().
setSourceTableName
(
tableSource
.
toString
());
f
.
getData
().
setIsEnd
(
true
);
f
.
getData
().
setExpression
(
e
.
getData
().
getExpression
());
}
});
});
}
/**
* 处理表达式
*
* @param sqlExpr
* @param itemNode
*/
private
static
void
handlerExpr
(
SQLExpr
sqlExpr
,
TreeNode
<
LineageColumn
>
itemNode
)
{
// 聚合
if
(
sqlExpr
instanceof
SQLAggregateExpr
)
{
visitSQLAggregateExpr
((
SQLAggregateExpr
)
sqlExpr
,
itemNode
);
}
// 方法
else
if
(
sqlExpr
instanceof
SQLMethodInvokeExpr
)
{
visitSQLMethodInvoke
((
SQLMethodInvokeExpr
)
sqlExpr
,
itemNode
);
}
// case
else
if
(
sqlExpr
instanceof
SQLCaseExpr
)
{
visitSQLCaseExpr
((
SQLCaseExpr
)
sqlExpr
,
itemNode
);
}
// 比较
else
if
(
sqlExpr
instanceof
SQLBinaryOpExpr
)
{
visitSQLBinaryOpExpr
((
SQLBinaryOpExpr
)
sqlExpr
,
itemNode
);
}
// 表达式
else
if
(
sqlExpr
instanceof
SQLPropertyExpr
)
{
visitSQLPropertyExpr
((
SQLPropertyExpr
)
sqlExpr
,
itemNode
);
}
// 列
else
if
(
sqlExpr
instanceof
SQLIdentifierExpr
)
{
visitSQLIdentifierExpr
((
SQLIdentifierExpr
)
sqlExpr
,
itemNode
);
}
// 赋值表达式
else
if
(
sqlExpr
instanceof
SQLIntegerExpr
)
{
visitSQLIntegerExpr
((
SQLIntegerExpr
)
sqlExpr
,
itemNode
);
}
// 数字
else
if
(
sqlExpr
instanceof
SQLNumberExpr
)
{
visitSQLNumberExpr
((
SQLNumberExpr
)
sqlExpr
,
itemNode
);
}
// 字符
else
if
(
sqlExpr
instanceof
SQLCharExpr
)
{
visitSQLCharExpr
((
SQLCharExpr
)
sqlExpr
,
itemNode
);
}
}
/**
* 方法
*
* @param expr
* @param node
*/
public
static
void
visitSQLMethodInvoke
(
SQLMethodInvokeExpr
expr
,
TreeNode
<
LineageColumn
>
node
)
{
if
(
expr
.
getArguments
().
size
()
==
0
)
{
// 计算表达式,没有更多列,结束循环
if
(
node
.
getData
().
getExpression
().
equals
(
expr
.
toString
()))
{
node
.
getData
().
setIsEnd
(
true
);
}
}
else
{
expr
.
getArguments
().
forEach
(
expr1
->
{
handlerExpr
(
expr1
,
node
);
});
}
}
/**
* 聚合
*
* @param expr
* @param node
*/
public
static
void
visitSQLAggregateExpr
(
SQLAggregateExpr
expr
,
TreeNode
<
LineageColumn
>
node
)
{
expr
.
getArguments
().
forEach
(
expr1
->
{
handlerExpr
(
expr1
,
node
);
});
}
/**
* 选择
*
* @param expr
* @param node
*/
public
static
void
visitSQLCaseExpr
(
SQLCaseExpr
expr
,
TreeNode
<
LineageColumn
>
node
)
{
handlerExpr
(
expr
.
getValueExpr
(),
node
);
expr
.
getItems
().
forEach
(
expr1
->
{
handlerExpr
(
expr1
.
getValueExpr
(),
node
);
});
handlerExpr
(
expr
.
getElseExpr
(),
node
);
}
/**
* 判断
*
* @param expr
* @param node
*/
public
static
void
visitSQLBinaryOpExpr
(
SQLBinaryOpExpr
expr
,
TreeNode
<
LineageColumn
>
node
)
{
handlerExpr
(
expr
.
getLeft
(),
node
);
handlerExpr
(
expr
.
getRight
(),
node
);
}
/**
* 表达式列
*
* @param expr
* @param node
*/
public
static
void
visitSQLPropertyExpr
(
SQLPropertyExpr
expr
,
TreeNode
<
LineageColumn
>
node
)
{
LineageColumn
project
=
new
LineageColumn
();
String
columnName
=
expr
.
getName
().
replace
(
"`"
,
""
).
replace
(
"\""
,
""
);
project
.
setTargetColumnName
(
columnName
);
project
.
setSourceTableName
(
expr
.
getOwner
().
toString
());
TreeNode
<
LineageColumn
>
search
=
node
.
findChildNode
(
project
);
if
(
Asserts
.
isNull
(
search
))
{
node
.
addChild
(
project
);
}
}
/**
* 列
*
* @param expr
* @param node
*/
public
static
void
visitSQLIdentifierExpr
(
SQLIdentifierExpr
expr
,
TreeNode
<
LineageColumn
>
node
)
{
LineageColumn
project
=
new
LineageColumn
();
project
.
setTargetColumnName
(
expr
.
getName
().
replace
(
"`"
,
""
).
replace
(
"\""
,
""
));
TreeNode
<
LineageColumn
>
search
=
node
.
findChildNode
(
project
);
if
(
Asserts
.
isNull
(
search
))
{
node
.
addChild
(
project
);
}
}
/**
* 整型赋值
*
* @param expr
* @param node
*/
public
static
void
visitSQLIntegerExpr
(
SQLIntegerExpr
expr
,
TreeNode
<
LineageColumn
>
node
)
{
LineageColumn
project
=
new
LineageColumn
();
project
.
setTargetColumnName
(
expr
.
getNumber
().
toString
());
// 常量不设置表信息
project
.
setSourceTableName
(
""
);
project
.
setIsEnd
(
true
);
TreeNode
<
LineageColumn
>
search
=
node
.
findChildNode
(
project
);
if
(
Asserts
.
isNull
(
search
))
{
node
.
addChild
(
project
);
}
}
/**
* 数字
*
* @param expr
* @param node
*/
public
static
void
visitSQLNumberExpr
(
SQLNumberExpr
expr
,
TreeNode
<
LineageColumn
>
node
)
{
LineageColumn
project
=
new
LineageColumn
();
project
.
setTargetColumnName
(
expr
.
getNumber
().
toString
());
// 常量不设置表信息
project
.
setSourceTableName
(
""
);
project
.
setIsEnd
(
true
);
TreeNode
<
LineageColumn
>
search
=
node
.
findChildNode
(
project
);
if
(
Asserts
.
isNull
(
search
))
{
node
.
addChild
(
project
);
}
}
/**
* 字符
*
* @param expr
* @param node
*/
public
static
void
visitSQLCharExpr
(
SQLCharExpr
expr
,
TreeNode
<
LineageColumn
>
node
)
{
LineageColumn
project
=
new
LineageColumn
();
project
.
setTargetColumnName
(
expr
.
toString
());
// 常量不设置表信息
project
.
setSourceTableName
(
""
);
project
.
setIsEnd
(
true
);
TreeNode
<
LineageColumn
>
search
=
node
.
findChildNode
(
project
);
if
(
Asserts
.
isNull
(
search
))
{
node
.
addChild
(
project
);
}
}
}
dlink-core/src/main/java/com/dlink/explainer/sqlLineage/TreeNode.java
0 → 100644
View file @
841296da
package
com
.
dlink
.
explainer
.
sqlLineage
;
import
java.util.*
;
public
class
TreeNode
<
T
>
implements
Iterable
<
TreeNode
<
T
>>
{
/**
* 树节点
*/
public
T
data
;
/**
* 父节点,根没有父节点
*/
public
TreeNode
<
T
>
parent
;
/**
* 子节点,叶子节点没有子节点
*/
public
List
<
TreeNode
<
T
>>
children
;
/**
* 保存了当前节点及其所有子节点,方便查询
*/
private
List
<
TreeNode
<
T
>>
elementsIndex
;
/**
* 构造函数
*
* @param data
*/
public
TreeNode
(
T
data
)
{
this
.
data
=
data
;
this
.
children
=
new
LinkedList
<
TreeNode
<
T
>>();
this
.
elementsIndex
=
new
LinkedList
<
TreeNode
<
T
>>();
this
.
elementsIndex
.
add
(
this
);
}
public
T
getData
()
{
return
data
;
}
public
List
<
TreeNode
<
T
>>
getChildren
()
{
return
children
;
}
/**
* 判断是否为根:根没有父节点
*
* @return
*/
public
boolean
isRoot
()
{
return
parent
==
null
;
}
/**
* 判断是否为叶子节点:子节点没有子节点
*
* @return
*/
public
boolean
isLeaf
()
{
return
children
.
size
()
==
0
;
}
/**
* 添加一个子节点
*
* @param child
* @return
*/
public
TreeNode
<
T
>
addChild
(
T
child
)
{
TreeNode
<
T
>
childNode
=
new
TreeNode
<
T
>(
child
);
childNode
.
parent
=
this
;
this
.
children
.
add
(
childNode
);
this
.
registerChildForSearch
(
childNode
);
return
childNode
;
}
public
TreeNode
<
T
>
addChild
(
TreeNode
childNode
)
{
childNode
.
parent
=
this
;
this
.
children
.
add
(
childNode
);
this
.
registerChildForSearch
(
childNode
);
return
childNode
;
}
/**
* 获取当前节点的层
*
* @return
*/
public
int
getLevel
()
{
if
(
this
.
isRoot
())
{
return
0
;
}
else
{
return
parent
.
getLevel
()
+
1
;
}
}
/**
* 递归为当前节点以及当前节点的所有父节点增加新的节点
*
* @param node
*/
private
void
registerChildForSearch
(
TreeNode
<
T
>
node
)
{
elementsIndex
.
add
(
node
);
if
(
parent
!=
null
)
{
parent
.
registerChildForSearch
(
node
);
}
}
/**
* 从当前节点及其所有子节点中搜索某节点
*
* @param cmp
* @return
*/
public
TreeNode
<
T
>
findTreeNode
(
Comparable
<
T
>
cmp
)
{
for
(
TreeNode
<
T
>
element
:
this
.
elementsIndex
)
{
T
elData
=
element
.
data
;
if
(
cmp
.
compareTo
(
elData
)
==
0
)
{
return
element
;
}
}
return
null
;
}
public
TreeNode
<
T
>
findChildNode
(
Comparable
<
T
>
cmp
)
{
for
(
TreeNode
<
T
>
element
:
this
.
getChildren
())
{
T
elData
=
element
.
data
;
if
(
cmp
.
compareTo
(
elData
)
==
0
)
{
return
element
;
}
}
return
null
;
}
/**
* 获取当前节点的迭代器
*
* @return
*/
public
Iterator
<
TreeNode
<
T
>>
iterator
()
{
TreeNodeIterator
<
T
>
iterator
=
new
TreeNodeIterator
<
T
>(
this
);
return
iterator
;
}
@Override
public
String
toString
()
{
return
data
!=
null
?
data
.
toString
()
:
"[tree data null]"
;
}
/**
* 获取所有叶子节点的数据
*
* @return
*/
public
Set
<
TreeNode
<
T
>>
getAllLeafs
()
{
Set
<
TreeNode
<
T
>>
leafNodes
=
new
HashSet
<
TreeNode
<
T
>>();
if
(
this
.
children
.
isEmpty
())
{
leafNodes
.
add
(
this
);
}
else
{
for
(
TreeNode
<
T
>
child
:
this
.
children
)
{
leafNodes
.
addAll
(
child
.
getAllLeafs
());
}
}
return
leafNodes
;
}
/**
* 获取所有叶子节点的数据
*
* @return
*/
public
Set
<
T
>
getAllLeafData
()
{
Set
<
T
>
leafNodes
=
new
HashSet
<
T
>();
if
(
this
.
children
.
isEmpty
())
{
leafNodes
.
add
(
this
.
data
);
}
else
{
for
(
TreeNode
<
T
>
child
:
this
.
children
)
{
leafNodes
.
addAll
(
child
.
getAllLeafData
());
}
}
return
leafNodes
;
}
}
dlink-core/src/main/java/com/dlink/explainer/sqlLineage/TreeNodeIterator.java
0 → 100644
View file @
841296da
package
com
.
dlink
.
explainer
.
sqlLineage
;
import
java.util.Iterator
;
public
class
TreeNodeIterator
<
T
>
implements
Iterator
<
TreeNode
<
T
>>
{
private
ProcessStages
doNext
;
private
TreeNode
<
T
>
next
;
private
Iterator
<
TreeNode
<
T
>>
childrenCurNodeIter
;
private
Iterator
<
TreeNode
<
T
>>
childrenSubNodeIter
;
private
TreeNode
<
T
>
treeNode
;
public
TreeNodeIterator
(
TreeNode
<
T
>
treeNode
)
{
this
.
treeNode
=
treeNode
;
this
.
doNext
=
ProcessStages
.
ProcessParent
;
this
.
childrenCurNodeIter
=
treeNode
.
children
.
iterator
();
}
public
boolean
hasNext
()
{
if
(
this
.
doNext
==
ProcessStages
.
ProcessParent
)
{
this
.
next
=
this
.
treeNode
;
this
.
doNext
=
ProcessStages
.
ProcessChildCurNode
;
return
true
;
}
if
(
this
.
doNext
==
ProcessStages
.
ProcessChildCurNode
)
{
if
(
childrenCurNodeIter
.
hasNext
())
{
TreeNode
<
T
>
childDirect
=
childrenCurNodeIter
.
next
();
childrenSubNodeIter
=
childDirect
.
iterator
();
this
.
doNext
=
ProcessStages
.
ProcessChildSubNode
;
return
hasNext
();
}
else
{
this
.
doNext
=
null
;
return
false
;
}
}
if
(
this
.
doNext
==
ProcessStages
.
ProcessChildSubNode
)
{
if
(
childrenSubNodeIter
.
hasNext
())
{
this
.
next
=
childrenSubNodeIter
.
next
();
return
true
;
}
else
{
this
.
next
=
null
;
this
.
doNext
=
ProcessStages
.
ProcessChildCurNode
;
return
hasNext
();
}
}
return
false
;
}
public
TreeNode
<
T
>
next
()
{
return
this
.
next
;
}
/**
* 目前不支持删除节点
*/
public
void
remove
()
{
throw
new
UnsupportedOperationException
();
}
enum
ProcessStages
{
ProcessParent
,
ProcessChildCurNode
,
ProcessChildSubNode
}
}
dlink-web/src/components/Lineage/index.tsx
View file @
841296da
...
...
@@ -25,7 +25,6 @@ const Lineage = (props: any) => {
tables
:
[],
relations
:
[]
};
debugger
;
allData
.
relations
.
forEach
(
relation
=>
{
if
(
relation
.
srcTableId
!==
tableId
)
{
return
;
...
...
dlink-web/src/components/Studio/StudioConsole/StudioCA/index.tsx
View file @
841296da
...
...
@@ -18,6 +18,7 @@ const StudioCA = (props: any) => {
const
res
=
getLineage
({
statement
:
current
.
value
,
statementSet
:
current
.
task
.
statementSet
,
dialect
:
current
.
task
.
dialect
,
type
:
1
,
});
res
.
then
((
result
)
=>
{
...
...
dlink-web/src/components/Studio/StudioEdit/data.d.ts
View file @
841296da
...
...
@@ -47,4 +47,5 @@ export type CAParam = {
statement
:
string
,
statementSet
:
boolean
,
type
:
number
,
dialect
?:
string
,
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment