Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in / Register
Toggle navigation
D
dlink
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
zhaowei
dlink
Commits
13c6222f
Commit
13c6222f
authored
Mar 19, 2022
by
wenmo
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
fixbug 字段血缘
parent
df3c6187
Changes
8
Hide whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
190 additions
and
80 deletions
+190
-80
Explainer.java
dlink-core/src/main/java/com/dlink/explainer/Explainer.java
+39
-8
ColumnCA.java
...k-core/src/main/java/com/dlink/explainer/ca/ColumnCA.java
+5
-30
ColumnCAGenerator.java
...c/main/java/com/dlink/explainer/ca/ColumnCAGenerator.java
+10
-10
TableCA.java
dlink-core/src/main/java/com/dlink/explainer/ca/TableCA.java
+52
-8
LineageColumnGenerator.java
...a/com/dlink/explainer/lineage/LineageColumnGenerator.java
+22
-19
LineageTableGenerator.java
...va/com/dlink/explainer/lineage/LineageTableGenerator.java
+9
-2
OperatorTrans.java
...rc/main/java/com/dlink/explainer/trans/OperatorTrans.java
+12
-0
MapParseUtils.java
dlink-core/src/main/java/com/dlink/utils/MapParseUtils.java
+41
-3
No files found.
dlink-core/src/main/java/com/dlink/explainer/Explainer.java
View file @
13c6222f
...
@@ -357,29 +357,60 @@ public class Explainer {
...
@@ -357,29 +357,60 @@ public class Explainer {
generator
.
setTableCAS
(
tableGenerator
.
getTables
());
generator
.
setTableCAS
(
tableGenerator
.
getTables
());
generator
.
translate
();
generator
.
translate
();
ColumnCAResult
columnCAResult
=
new
ColumnCAResult
(
generator
);
ColumnCAResult
columnCAResult
=
new
ColumnCAResult
(
generator
);
modifySink
Column
(
columnCAResult
);
correct
Column
(
columnCAResult
);
results
.
add
(
columnCAResult
);
results
.
add
(
columnCAResult
);
}
}
return
results
;
return
results
;
}
}
private
void
modifySink
Column
(
ColumnCAResult
columnCAResult
)
{
private
void
correct
Column
(
ColumnCAResult
columnCAResult
)
{
for
(
TableCA
tableCA
:
columnCAResult
.
getTableCAS
())
{
for
(
TableCA
tableCA
:
columnCAResult
.
getTableCAS
())
{
CatalogManager
catalogManager
=
executor
.
getCatalogManager
();
List
<
String
>
columnList
=
FlinkUtil
.
getFieldNamesFromCatalogManager
(
catalogManager
,
tableCA
.
getCatalog
(),
tableCA
.
getDatabase
(),
tableCA
.
getTable
());
List
<
String
>
fields
=
tableCA
.
getFields
();
List
<
String
>
oldFields
=
new
ArrayList
<>();
oldFields
.
addAll
(
fields
);
if
(
tableCA
.
getType
().
equals
(
"Data Sink"
))
{
if
(
tableCA
.
getType
().
equals
(
"Data Sink"
))
{
CatalogManager
catalogManager
=
executor
.
getCatalogManager
();
List
<
String
>
columnList
=
FlinkUtil
.
getFieldNamesFromCatalogManager
(
catalogManager
,
tableCA
.
getCatalog
(),
tableCA
.
getDatabase
(),
tableCA
.
getTable
());
List
<
String
>
fields
=
tableCA
.
getFields
();
for
(
int
i
=
0
;
i
<
columnList
.
size
();
i
++)
{
for
(
int
i
=
0
;
i
<
columnList
.
size
();
i
++)
{
String
sinkColumnName
=
columnList
.
get
(
i
);
String
sinkColumnName
=
columnList
.
get
(
i
);
if
(!
sinkColumnName
.
equals
(
fields
.
get
(
i
)))
{
if
(!
sinkColumnName
.
equals
(
oldFields
.
get
(
i
)))
{
for
(
Map
.
Entry
<
Integer
,
ColumnCA
>
item
:
columnCAResult
.
getColumnCASMaps
().
entrySet
())
{
for
(
Map
.
Entry
<
Integer
,
ColumnCA
>
item
:
columnCAResult
.
getColumnCASMaps
().
entrySet
())
{
ColumnCA
columnCA
=
item
.
getValue
();
ColumnCA
columnCA
=
item
.
getValue
();
if
(
columnCA
.
getTableId
()==
tableCA
.
getId
()&&
columnCA
.
getName
().
equals
(
fields
.
get
(
i
)))
{
if
(
columnCA
.
getTableId
()
==
tableCA
.
getId
()
&&
columnCA
.
getName
().
equals
(
oldFields
.
get
(
i
)))
{
columnCA
.
setName
(
sinkColumnName
);
columnCA
.
setName
(
sinkColumnName
);
fields
.
set
(
i
,
sinkColumnName
);
fields
.
set
(
i
,
sinkColumnName
);
}
}
}
}
}
}
for
(
TableCA
tableCA
:
columnCAResult
.
getTableCAS
())
{
CatalogManager
catalogManager
=
executor
.
getCatalogManager
();
List
<
String
>
columnList
=
FlinkUtil
.
getFieldNamesFromCatalogManager
(
catalogManager
,
tableCA
.
getCatalog
(),
tableCA
.
getDatabase
(),
tableCA
.
getTable
());
List
<
String
>
fields
=
tableCA
.
getFields
();
int
i
=
0
;
while
(
i
<
fields
.
size
())
{
if
(!
columnList
.
contains
(
fields
.
get
(
i
)))
{
List
<
Integer
>
idList
=
new
ArrayList
<>();
for
(
Map
.
Entry
<
Integer
,
ColumnCA
>
item
:
columnCAResult
.
getColumnCASMaps
().
entrySet
())
{
if
(
item
.
getValue
().
getName
().
equals
(
fields
.
get
(
i
))
&&
item
.
getValue
().
getTableId
()
==
tableCA
.
getId
())
{
idList
.
add
(
item
.
getValue
().
getId
());
break
;
}
}
for
(
Integer
id
:
idList
)
{
for
(
NodeRel
nodeRel
:
columnCAResult
.
getColumnCASRelChain
())
{
if
(
nodeRel
.
getPreId
()
==
id
)
{
columnCAResult
.
getColumnCASMaps
().
remove
(
id
);
columnCAResult
.
getColumnCASRelChain
().
remove
(
nodeRel
);
break
;
}
}
}
}
}
}
fields
.
remove
(
i
);
}
else
{
i
++;
}
}
}
}
}
}
...
...
dlink-core/src/main/java/com/dlink/explainer/ca/ColumnCA.java
View file @
13c6222f
...
@@ -12,9 +12,7 @@ import java.util.List;
...
@@ -12,9 +12,7 @@ import java.util.List;
* @author wenmo
* @author wenmo
* @since 2021/6/22
* @since 2021/6/22
**/
**/
@Getter
public
class
ColumnCA
implements
ICA
{
@Setter
public
class
ColumnCA
implements
ICA
{
private
Integer
id
;
private
Integer
id
;
private
Integer
tableId
;
private
Integer
tableId
;
private
List
<
Integer
>
parentId
;
private
List
<
Integer
>
parentId
;
...
@@ -25,11 +23,10 @@ public class ColumnCA implements ICA{
...
@@ -25,11 +23,10 @@ public class ColumnCA implements ICA{
private
String
familyName
;
private
String
familyName
;
private
String
type
;
private
String
type
;
private
String
columnType
;
private
String
columnType
;
private
Trans
trans
;
private
TableCA
tableCA
;
private
TableCA
tableCA
;
private
String
tableName
;
private
String
tableName
;
public
ColumnCA
(
Integer
id
,
String
name
,
String
alias
,
String
columnName
,
String
familyName
,
String
operation
,
TableCA
tableCA
,
Trans
trans
)
{
public
ColumnCA
(
Integer
id
,
String
name
,
String
alias
,
String
columnName
,
String
familyName
,
String
operation
,
TableCA
tableCA
)
{
this
.
id
=
id
;
this
.
id
=
id
;
this
.
name
=
name
;
this
.
name
=
name
;
this
.
alias
=
alias
;
this
.
alias
=
alias
;
...
@@ -39,11 +36,10 @@ public class ColumnCA implements ICA{
...
@@ -39,11 +36,10 @@ public class ColumnCA implements ICA{
this
.
tableCA
=
tableCA
;
this
.
tableCA
=
tableCA
;
this
.
tableId
=
tableCA
.
getId
();
this
.
tableId
=
tableCA
.
getId
();
this
.
tableName
=
tableCA
.
getName
();
this
.
tableName
=
tableCA
.
getName
();
this
.
trans
=
trans
;
this
.
type
=
tableCA
.
getType
();
this
.
type
=
trans
.
getPact
();
}
}
public
ColumnCA
(
Integer
id
,
List
<
Integer
>
parentId
,
String
name
,
String
alias
,
String
columnName
,
String
familyName
,
String
type
,
TableCA
tableCA
)
{
/*
public ColumnCA(Integer id, List<Integer> parentId, String name, String alias, String columnName, String familyName, String type, TableCA tableCA) {
this.id = id;
this.id = id;
this.parentId = parentId;
this.parentId = parentId;
this.name = name;
this.name = name;
...
@@ -52,7 +48,7 @@ public class ColumnCA implements ICA{
...
@@ -52,7 +48,7 @@ public class ColumnCA implements ICA{
this.familyName = familyName;
this.familyName = familyName;
this.type = type;
this.type = type;
this.tableCA = tableCA;
this.tableCA = tableCA;
}
}
*/
public
Integer
getId
()
{
public
Integer
getId
()
{
return
id
;
return
id
;
...
@@ -134,14 +130,6 @@ public class ColumnCA implements ICA{
...
@@ -134,14 +130,6 @@ public class ColumnCA implements ICA{
this
.
columnType
=
columnType
;
this
.
columnType
=
columnType
;
}
}
public
Trans
getTrans
()
{
return
trans
;
}
public
void
setTrans
(
Trans
trans
)
{
this
.
trans
=
trans
;
}
public
String
getTableName
()
{
public
String
getTableName
()
{
return
tableName
;
return
tableName
;
}
}
...
@@ -157,17 +145,4 @@ public class ColumnCA implements ICA{
...
@@ -157,17 +145,4 @@ public class ColumnCA implements ICA{
public
void
setOperation
(
String
operation
)
{
public
void
setOperation
(
String
operation
)
{
this
.
operation
=
operation
;
this
.
operation
=
operation
;
}
}
@Override
public
String
toString
()
{
return
"ColumnCA{"
+
"id="
+
id
+
", parentId="
+
parentId
+
", name='"
+
name
+
'\''
+
", columnName='"
+
columnName
+
'\''
+
", familyName='"
+
familyName
+
'\''
+
", type='"
+
type
+
'\''
+
", tableCA="
+
tableCA
+
'}'
;
}
}
}
dlink-core/src/main/java/com/dlink/explainer/ca/ColumnCAGenerator.java
View file @
13c6222f
...
@@ -65,7 +65,7 @@ public class ColumnCAGenerator implements CAGenerator {
...
@@ -65,7 +65,7 @@ public class ColumnCAGenerator implements CAGenerator {
for
(
int
j
=
0
;
j
<
sourceFields
.
size
();
j
++)
{
for
(
int
j
=
0
;
j
<
sourceFields
.
size
();
j
++)
{
String
fieldName
=
sourceFields
.
get
(
j
);
String
fieldName
=
sourceFields
.
get
(
j
);
Integer
id
=
index
++;
Integer
id
=
index
++;
ColumnCA
columnCA
=
new
ColumnCA
(
id
,
fieldName
,
fieldName
,
fieldName
,
fieldName
,
fieldName
,
tableCA
,
transList
.
get
(
i
)
);
ColumnCA
columnCA
=
new
ColumnCA
(
id
,
fieldName
,
fieldName
,
fieldName
,
fieldName
,
fieldName
,
tableCA
);
this
.
columnCASMaps
.
put
(
id
,
columnCA
);
this
.
columnCASMaps
.
put
(
id
,
columnCA
);
this
.
columnCAS
.
add
(
columnCA
);
this
.
columnCAS
.
add
(
columnCA
);
}
}
...
@@ -87,34 +87,34 @@ public class ColumnCAGenerator implements CAGenerator {
...
@@ -87,34 +87,34 @@ public class ColumnCAGenerator implements CAGenerator {
}
}
}
}
private
void
searchColumnCAId
(
TableCA
tableCA
){
private
void
searchColumnCAId
(
TableCA
tableCA
)
{
List
<
Integer
>
sufOnly
=
new
ArrayList
<>();
List
<
Integer
>
sufOnly
=
new
ArrayList
<>();
for
(
NodeRel
nodeRel
:
this
.
columnCASRel
)
{
for
(
NodeRel
nodeRel
:
this
.
columnCASRel
)
{
if
(!
sufOnly
.
contains
(
nodeRel
.
getSufId
()))
{
if
(!
sufOnly
.
contains
(
nodeRel
.
getSufId
()))
{
sufOnly
.
add
(
nodeRel
.
getSufId
());
sufOnly
.
add
(
nodeRel
.
getSufId
());
}
}
}
}
for
(
NodeRel
nodeRel
:
this
.
columnCASRel
)
{
for
(
NodeRel
nodeRel
:
this
.
columnCASRel
)
{
if
(
sufOnly
.
contains
(
nodeRel
.
getPreId
()))
{
if
(
sufOnly
.
contains
(
nodeRel
.
getPreId
()))
{
sufOnly
.
remove
(
nodeRel
.
getPreId
());
sufOnly
.
remove
(
nodeRel
.
getPreId
());
}
}
}
}
List
<
Integer
>
preOnly
=
new
ArrayList
<>();
List
<
Integer
>
preOnly
=
new
ArrayList
<>();
for
(
NodeRel
nodeRel
:
this
.
columnCASRel
)
{
for
(
NodeRel
nodeRel
:
this
.
columnCASRel
)
{
if
(!
preOnly
.
contains
(
nodeRel
.
getPreId
()))
{
if
(!
preOnly
.
contains
(
nodeRel
.
getPreId
()))
{
preOnly
.
add
(
nodeRel
.
getPreId
());
preOnly
.
add
(
nodeRel
.
getPreId
());
}
}
}
}
for
(
NodeRel
nodeRel
:
this
.
columnCASRel
)
{
for
(
NodeRel
nodeRel
:
this
.
columnCASRel
)
{
if
(
preOnly
.
contains
(
nodeRel
.
getSufId
()))
{
if
(
preOnly
.
contains
(
nodeRel
.
getSufId
()))
{
preOnly
.
remove
(
nodeRel
.
getSufId
());
preOnly
.
remove
(
nodeRel
.
getSufId
());
}
}
}
}
for
(
int
i
=
0
;
i
<
sufOnly
.
size
();
i
++)
{
for
(
int
i
=
0
;
i
<
sufOnly
.
size
();
i
++)
{
ColumnCA
columnCA
=
(
ColumnCA
)
this
.
columnCASMaps
.
get
(
sufOnly
.
get
(
i
));
ColumnCA
columnCA
=
(
ColumnCA
)
this
.
columnCASMaps
.
get
(
sufOnly
.
get
(
i
));
List
<
String
>
fields
=
tableCA
.
getFields
();
List
<
String
>
fields
=
tableCA
.
getFields
();
for
(
int
j
=
0
;
j
<
fields
.
size
();
j
++)
{
for
(
int
j
=
0
;
j
<
fields
.
size
();
j
++)
{
if
(
columnCA
.
getAlias
().
equals
(
fields
.
get
(
j
)))
{
if
(
columnCA
.
getAlias
().
equals
(
fields
.
get
(
j
)))
{
tableCA
.
getColumnCAIds
().
add
(
sufOnly
.
get
(
i
));
tableCA
.
getColumnCAIds
().
add
(
sufOnly
.
get
(
i
));
break
;
break
;
}
}
...
@@ -153,7 +153,7 @@ public class ColumnCAGenerator implements CAGenerator {
...
@@ -153,7 +153,7 @@ public class ColumnCAGenerator implements CAGenerator {
}
}
private
void
searchSelect
(
TableCA
tableCA
,
ColumnCA
columnCA
,
OperatorTrans
trans
,
String
operation
,
String
alias
)
{
private
void
searchSelect
(
TableCA
tableCA
,
ColumnCA
columnCA
,
OperatorTrans
trans
,
String
operation
,
String
alias
)
{
if
(
MapParseUtils
.
hasField
(
operation
,
columnCA
.
getAlias
()))
{
if
(
MapParseUtils
.
hasField
(
operation
,
columnCA
.
getAlias
()))
{
boolean
isHad
=
false
;
boolean
isHad
=
false
;
Integer
cid
=
null
;
Integer
cid
=
null
;
for
(
int
j
=
0
;
j
<
this
.
columnCAS
.
size
();
j
++)
{
for
(
int
j
=
0
;
j
<
this
.
columnCAS
.
size
();
j
++)
{
...
@@ -167,7 +167,7 @@ public class ColumnCAGenerator implements CAGenerator {
...
@@ -167,7 +167,7 @@ public class ColumnCAGenerator implements CAGenerator {
if
(!
isHad
)
{
if
(!
isHad
)
{
cid
=
index
++;
cid
=
index
++;
// String columnOperation = MapParseUtils.replaceField(operation,columnCA.getAlias(),columnCA.getOperation());
// String columnOperation = MapParseUtils.replaceField(operation,columnCA.getAlias(),columnCA.getOperation());
ColumnCA
columnCA2
=
new
ColumnCA
(
cid
,
alias
,
alias
,
alias
,
alias
,
operation
,
tableCA
,
trans
);
ColumnCA
columnCA2
=
new
ColumnCA
(
cid
,
alias
,
alias
,
alias
,
alias
,
operation
,
tableCA
);
this
.
columnCASMaps
.
put
(
cid
,
columnCA2
);
this
.
columnCASMaps
.
put
(
cid
,
columnCA2
);
this
.
columnCAS
.
add
(
columnCA2
);
this
.
columnCAS
.
add
(
columnCA2
);
buildColumnCAFields
(
tableCA
,
trans
.
getParentId
(),
columnCA2
);
buildColumnCAFields
(
tableCA
,
trans
.
getParentId
(),
columnCA2
);
...
...
dlink-core/src/main/java/com/dlink/explainer/ca/TableCA.java
View file @
13c6222f
package
com
.
dlink
.
explainer
.
ca
;
package
com
.
dlink
.
explainer
.
ca
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.explainer.trans.Field
;
import
com.dlink.explainer.trans.OperatorTrans
;
import
com.dlink.explainer.trans.SinkTrans
;
import
com.dlink.explainer.trans.SinkTrans
;
import
com.dlink.explainer.trans.SourceTrans
;
import
com.dlink.explainer.trans.SourceTrans
;
import
com.dlink.explainer.trans.Trans
;
import
com.dlink.explainer.trans.Trans
;
import
lombok.Getter
;
import
lombok.Getter
;
import
lombok.Setter
;
import
lombok.Setter
;
import
java.util.ArrayList
;
import
java.util.HashSet
;
import
java.util.HashSet
;
import
java.util.List
;
import
java.util.List
;
import
java.util.Set
;
import
java.util.Set
;
...
@@ -18,7 +22,7 @@ import java.util.Set;
...
@@ -18,7 +22,7 @@ import java.util.Set;
**/
**/
@Getter
@Getter
@Setter
@Setter
public
class
TableCA
implements
ICA
{
public
class
TableCA
implements
ICA
{
private
Integer
id
;
private
Integer
id
;
private
Integer
parentId
;
private
Integer
parentId
;
private
String
name
;
private
String
name
;
...
@@ -63,13 +67,53 @@ public class TableCA implements ICA{
...
@@ -63,13 +67,53 @@ public class TableCA implements ICA{
this
.
type
=
trans
.
getPact
();
this
.
type
=
trans
.
getPact
();
}
}
public
static
TableCA
build
(
Trans
trans
){
public
TableCA
(
OperatorTrans
trans
)
{
if
(
trans
instanceof
SourceTrans
){
List
<
String
>
tableList
=
trans
.
getTable
();
return
new
TableCA
((
SourceTrans
)
trans
);
this
.
id
=
trans
.
getId
();
}
else
if
(
trans
instanceof
SinkTrans
){
this
.
parentId
=
trans
.
getParentId
();
return
new
TableCA
((
SinkTrans
)
trans
);
this
.
name
=
trans
.
getName
();
}
else
{
List
<
Field
>
select
=
trans
.
getSelect
();
return
TableCA
.
EMPTY
;
List
<
String
>
fieldList
=
new
ArrayList
<>();
for
(
Field
field
:
select
)
{
fieldList
.
add
(
field
.
getAlias
());
}
this
.
fields
=
fieldList
;
this
.
useFields
=
fieldList
;
this
.
parallelism
=
trans
.
getParallelism
();
this
.
type
=
trans
.
getPact
();
if
(
tableList
.
size
()
>
0
)
{
String
tableStr
=
tableList
.
get
(
0
);
String
[]
strings
=
tableStr
.
split
(
"\\."
);
if
(
strings
.
length
>
2
)
{
this
.
catalog
=
strings
[
0
];
this
.
database
=
strings
[
1
];
this
.
table
=
strings
[
2
];
}
else
if
(
strings
.
length
==
2
)
{
this
.
catalog
=
"default_catalog"
;
this
.
database
=
strings
[
0
];
this
.
table
=
strings
[
1
];
}
else
if
(
strings
.
length
==
1
)
{
this
.
catalog
=
"default_catalog"
;
this
.
database
=
"default_database"
;
this
.
table
=
strings
[
0
];
}
}
}
public
static
TableCA
build
(
Trans
trans
)
{
if
(
trans
instanceof
SourceTrans
)
{
return
new
TableCA
((
SourceTrans
)
trans
);
}
else
if
(
trans
instanceof
SinkTrans
)
{
return
new
TableCA
((
SinkTrans
)
trans
);
}
else
if
(
trans
instanceof
OperatorTrans
)
{
OperatorTrans
operatorTrans
=
(
OperatorTrans
)
trans
;
if
(
Asserts
.
isNotNullCollection
(
operatorTrans
.
getTable
()))
{
return
new
TableCA
(
operatorTrans
);
}
else
{
return
null
;
}
}
else
{
return
null
;
}
}
}
}
...
...
dlink-core/src/main/java/com/dlink/explainer/lineage/LineageColumnGenerator.java
View file @
13c6222f
...
@@ -47,40 +47,40 @@ public class LineageColumnGenerator {
...
@@ -47,40 +47,40 @@ public class LineageColumnGenerator {
}
}
public
void
translate
()
{
public
void
translate
()
{
for
(
Map
.
Entry
<
Integer
,
Trans
>
entry
:
transMaps
.
entrySet
())
{
for
(
TableCA
tableCA
:
tableCAS
)
{
Trans
trans
=
entry
.
getValue
();
for
(
String
fieldName
:
tableCA
.
getFields
())
{
if
(
trans
instanceof
SourceTrans
)
{
int
id
=
index
++;
TableCA
tableCA
=
new
TableCA
((
SourceTrans
)
trans
);
ColumnCA
columnCA
=
new
ColumnCA
(
id
,
fieldName
,
fieldName
,
fieldName
,
fieldName
,
fieldName
,
tableCA
);
for
(
String
fieldName
:
tableCA
.
getFields
())
{
columnCASMaps
.
put
(
id
,
columnCA
);
int
id
=
index
++;
columnCAS
.
add
(
columnCA
);
ColumnCA
columnCA
=
new
ColumnCA
(
id
,
fieldName
,
fieldName
,
fieldName
,
fieldName
,
fieldName
,
tableCA
,
trans
);
buildColumnCAFields
(
tableCA
,
tableCA
.
getParentId
(),
columnCA
);
columnCASMaps
.
put
(
id
,
columnCA
);
columnCAS
.
add
(
columnCA
);
}
for
(
ColumnCA
columnCA
:
columnCAS
)
{
if
(
columnCA
.
getTableCA
().
getId
()
==
tableCA
.
getId
())
{
buildColumnCAFields
(
tableCA
,
tableCA
.
getParentId
(),
columnCA
);
}
}
}
}
/*for (ColumnCA columnCA : columnCAS) {
if (columnCA.getTableCA().getId() == tableCA.getId()) {
buildColumnCAFields(tableCA, tableCA.getParentId(), columnCA);
}
}*/
}
}
for
(
Map
.
Entry
<
Integer
,
Trans
>
entry
:
transMaps
.
entrySet
())
{
for
(
Map
.
Entry
<
Integer
,
Trans
>
entry
:
transMaps
.
entrySet
())
{
Trans
trans
=
entry
.
getValue
();
Trans
trans
=
entry
.
getValue
();
if
(
trans
instanceof
SinkTrans
)
{
if
(
trans
instanceof
SinkTrans
)
{
TableCA
tableCA
=
new
TableCA
((
SinkTrans
)
trans
);
TableCA
tableCA
=
new
TableCA
((
SinkTrans
)
trans
);
matchSinkField
(
tableCA
,
trans
);
matchSinkField
(
tableCA
);
searchColumnCAId
(
tableCA
);
searchColumnCAId
(
tableCA
);
}
}
}
}
chainRelation
();
chainRelation
();
}
}
private
void
matchSinkField
(
TableCA
tableCA
,
Trans
trans
)
{
private
void
matchSinkField
(
TableCA
tableCA
)
{
for
(
ColumnCA
columnCA
:
columnCAS
)
{
for
(
ColumnCA
columnCA
:
columnCAS
)
{
if
(
columnCA
.
getTableId
()
==
tableCA
.
getId
())
{
continue
;
}
for
(
String
fieldName
:
tableCA
.
getFields
())
{
for
(
String
fieldName
:
tableCA
.
getFields
())
{
if
(
columnCA
.
getName
().
equals
(
fieldName
))
{
if
(
columnCA
.
getName
().
equals
(
fieldName
))
{
int
cid
=
index
++;
int
cid
=
index
++;
ColumnCA
sinkColumnCA
=
new
ColumnCA
(
cid
,
fieldName
,
fieldName
,
fieldName
,
fieldName
,
fieldName
,
tableCA
,
trans
);
ColumnCA
sinkColumnCA
=
new
ColumnCA
(
cid
,
fieldName
,
fieldName
,
fieldName
,
fieldName
,
fieldName
,
tableCA
);
columnCASMaps
.
put
(
cid
,
sinkColumnCA
);
columnCASMaps
.
put
(
cid
,
sinkColumnCA
);
columnCASRel
.
add
(
new
NodeRel
(
columnCA
.
getId
(),
cid
));
columnCASRel
.
add
(
new
NodeRel
(
columnCA
.
getId
(),
cid
));
}
}
...
@@ -90,6 +90,9 @@ public class LineageColumnGenerator {
...
@@ -90,6 +90,9 @@ public class LineageColumnGenerator {
private
void
buildColumnCAFields
(
TableCA
tableCA
,
Integer
id
,
ColumnCA
columnCA
)
{
private
void
buildColumnCAFields
(
TableCA
tableCA
,
Integer
id
,
ColumnCA
columnCA
)
{
if
(
transMaps
.
get
(
id
)
instanceof
OperatorTrans
)
{
if
(
transMaps
.
get
(
id
)
instanceof
OperatorTrans
)
{
if
(
tableCA
.
getId
()
==
id
)
{
return
;
}
OperatorTrans
trans
=
(
OperatorTrans
)
transMaps
.
get
(
id
);
OperatorTrans
trans
=
(
OperatorTrans
)
transMaps
.
get
(
id
);
List
<
Field
>
selects
=
trans
.
getSelect
();
List
<
Field
>
selects
=
trans
.
getSelect
();
if
(
Asserts
.
isNotNull
(
selects
))
{
if
(
Asserts
.
isNotNull
(
selects
))
{
...
@@ -129,7 +132,7 @@ public class LineageColumnGenerator {
...
@@ -129,7 +132,7 @@ public class LineageColumnGenerator {
}
}
if
(!
isHad
)
{
if
(!
isHad
)
{
cid
=
index
++;
cid
=
index
++;
ColumnCA
columnCA2
=
new
ColumnCA
(
cid
,
alias
,
alias
,
alias
,
alias
,
operation
,
tableCA
,
trans
);
ColumnCA
columnCA2
=
new
ColumnCA
(
cid
,
alias
,
alias
,
alias
,
alias
,
operation
,
tableCA
);
columnCASMaps
.
put
(
cid
,
columnCA2
);
columnCASMaps
.
put
(
cid
,
columnCA2
);
buildColumnCAFields
(
tableCA
,
trans
.
getParentId
(),
columnCA2
);
buildColumnCAFields
(
tableCA
,
trans
.
getParentId
(),
columnCA2
);
}
}
...
...
dlink-core/src/main/java/com/dlink/explainer/lineage/LineageTableGenerator.java
View file @
13c6222f
package
com
.
dlink
.
explainer
.
lineage
;
package
com
.
dlink
.
explainer
.
lineage
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.explainer.ca.TableCA
;
import
com.dlink.explainer.ca.TableCA
;
import
com.dlink.explainer.trans.OperatorTrans
;
import
com.dlink.explainer.trans.SinkTrans
;
import
com.dlink.explainer.trans.SinkTrans
;
import
com.dlink.explainer.trans.SourceTrans
;
import
com.dlink.explainer.trans.SourceTrans
;
import
com.dlink.explainer.trans.Trans
;
import
com.dlink.explainer.trans.Trans
;
...
@@ -24,10 +26,10 @@ public class LineageTableGenerator {
...
@@ -24,10 +26,10 @@ public class LineageTableGenerator {
public
LineageTableGenerator
()
{
public
LineageTableGenerator
()
{
}
}
public
static
LineageTableGenerator
build
(
List
<
Trans
>
transList
){
public
static
LineageTableGenerator
build
(
List
<
Trans
>
transList
)
{
LineageTableGenerator
generator
=
new
LineageTableGenerator
();
LineageTableGenerator
generator
=
new
LineageTableGenerator
();
Map
<
Integer
,
Trans
>
map
=
new
HashMap
<>();
Map
<
Integer
,
Trans
>
map
=
new
HashMap
<>();
for
(
Trans
trans:
transList
)
{
for
(
Trans
trans
:
transList
)
{
map
.
put
(
trans
.
getId
(),
trans
);
map
.
put
(
trans
.
getId
(),
trans
);
}
}
generator
.
setTransMaps
(
map
);
generator
.
setTransMaps
(
map
);
...
@@ -40,6 +42,11 @@ public class LineageTableGenerator {
...
@@ -40,6 +42,11 @@ public class LineageTableGenerator {
tables
.
add
(
TableCA
.
build
(
entry
.
getValue
()));
tables
.
add
(
TableCA
.
build
(
entry
.
getValue
()));
}
else
if
(
entry
.
getValue
()
instanceof
SinkTrans
)
{
}
else
if
(
entry
.
getValue
()
instanceof
SinkTrans
)
{
tables
.
add
(
TableCA
.
build
(
entry
.
getValue
()));
tables
.
add
(
TableCA
.
build
(
entry
.
getValue
()));
}
else
if
(
entry
.
getValue
()
instanceof
OperatorTrans
)
{
TableCA
tableCA
=
TableCA
.
build
(
entry
.
getValue
());
if
(
Asserts
.
isNotNull
(
tableCA
))
{
tables
.
add
(
tableCA
);
}
}
}
}
}
}
}
...
...
dlink-core/src/main/java/com/dlink/explainer/trans/OperatorTrans.java
View file @
13c6222f
...
@@ -16,8 +16,10 @@ import java.util.Map;
...
@@ -16,8 +16,10 @@ import java.util.Map;
public
class
OperatorTrans
extends
AbstractTrans
implements
Trans
{
public
class
OperatorTrans
extends
AbstractTrans
implements
Trans
{
private
List
<
Field
>
select
;
private
List
<
Field
>
select
;
private
List
<
String
>
table
;
private
List
<
String
>
fields
;
private
List
<
String
>
fields
;
private
List
<
String
>
joinType
;
private
List
<
String
>
joinType
;
private
List
<
String
>
lookup
;
private
String
where
;
private
String
where
;
private
List
<
String
>
leftInputSpec
;
private
List
<
String
>
leftInputSpec
;
private
List
<
String
>
rightInputSpec
;
private
List
<
String
>
rightInputSpec
;
...
@@ -49,6 +51,14 @@ public class OperatorTrans extends AbstractTrans implements Trans {
...
@@ -49,6 +51,14 @@ public class OperatorTrans extends AbstractTrans implements Trans {
return
rightInputSpec
;
return
rightInputSpec
;
}
}
public
List
<
String
>
getTable
()
{
return
table
;
}
public
List
<
String
>
getLookup
()
{
return
lookup
;
}
@Override
@Override
public
String
getHandle
()
{
public
String
getHandle
()
{
return
TRANS_TYPE
;
return
TRANS_TYPE
;
...
@@ -66,7 +76,9 @@ public class OperatorTrans extends AbstractTrans implements Trans {
...
@@ -66,7 +76,9 @@ public class OperatorTrans extends AbstractTrans implements Trans {
Map
map
=
MapParseUtils
.
parseForSelect
(
contents
);
Map
map
=
MapParseUtils
.
parseForSelect
(
contents
);
translateSelect
((
ArrayList
<
String
>)
map
.
get
(
"select"
));
translateSelect
((
ArrayList
<
String
>)
map
.
get
(
"select"
));
fields
=
(
ArrayList
<
String
>)
map
.
get
(
"fields"
);
fields
=
(
ArrayList
<
String
>)
map
.
get
(
"fields"
);
table
=
(
ArrayList
<
String
>)
map
.
get
(
"table"
);
joinType
=
(
ArrayList
<
String
>)
map
.
get
(
"joinType"
);
joinType
=
(
ArrayList
<
String
>)
map
.
get
(
"joinType"
);
lookup
=
(
ArrayList
<
String
>)
map
.
get
(
"lookup"
);
where
=
map
.
containsKey
(
"where"
)
?
map
.
get
(
"where"
).
toString
()
:
null
;
where
=
map
.
containsKey
(
"where"
)
?
map
.
get
(
"where"
).
toString
()
:
null
;
leftInputSpec
=
(
ArrayList
<
String
>)
map
.
get
(
"leftInputSpec"
);
leftInputSpec
=
(
ArrayList
<
String
>)
map
.
get
(
"leftInputSpec"
);
rightInputSpec
=
(
ArrayList
<
String
>)
map
.
get
(
"rightInputSpec"
);
rightInputSpec
=
(
ArrayList
<
String
>)
map
.
get
(
"rightInputSpec"
);
...
...
dlink-core/src/main/java/com/dlink/utils/MapParseUtils.java
View file @
13c6222f
...
@@ -145,6 +145,44 @@ public class MapParseUtils {
...
@@ -145,6 +145,44 @@ public class MapParseUtils {
return
selects
;
return
selects
;
}
}
private
static
Map
<
String
,
List
<
String
>>
getKeyAndValues
(
String
inStr
)
{
Map
<
String
,
List
<
String
>>
map
=
new
HashMap
<>();
if
(
inStr
==
null
||
inStr
.
isEmpty
())
{
return
map
;
}
Deque
<
Integer
>
stack
=
new
LinkedList
<>();
int
startIndex
=
0
;
String
key
=
null
;
for
(
int
i
=
0
;
i
<
inStr
.
length
();
i
++)
{
char
currentChar
=
inStr
.
charAt
(
i
);
if
(
stack
.
size
()
==
0
&&
currentChar
==
'['
)
{
key
=
inStr
.
substring
(
startIndex
,
i
-
1
).
trim
();
map
.
put
(
key
,
new
ArrayList
<>());
startIndex
=
i
+
1
;
continue
;
}
if
(
stack
.
size
()
==
0
&&
currentChar
==
']'
)
{
map
.
get
(
key
).
add
(
inStr
.
substring
(
startIndex
,
i
).
trim
());
startIndex
=
i
+
2
;
key
=
null
;
continue
;
}
if
(
key
!=
null
&&
stack
.
size
()
==
0
&&
currentChar
==
','
)
{
map
.
get
(
key
).
add
(
inStr
.
substring
(
startIndex
,
i
).
trim
());
startIndex
=
i
+
1
;
continue
;
}
if
(
currentChar
==
'('
)
{
stack
.
push
(
i
);
continue
;
}
if
(
currentChar
==
')'
)
{
stack
.
pop
();
}
}
return
map
;
}
public
static
boolean
hasField
(
String
fragement
,
String
field
)
{
public
static
boolean
hasField
(
String
fragement
,
String
field
)
{
if
(
field
.
startsWith
(
"$"
))
{
if
(
field
.
startsWith
(
"$"
))
{
field
=
field
.
substring
(
1
,
field
.
length
());
field
=
field
.
substring
(
1
,
field
.
length
());
...
@@ -242,9 +280,9 @@ public class MapParseUtils {
...
@@ -242,9 +280,9 @@ public class MapParseUtils {
* @date 2021/8/20 15:03
* @date 2021/8/20 15:03
*/
*/
public
static
Map
parseForSelect
(
String
inStr
)
{
public
static
Map
parseForSelect
(
String
inStr
)
{
Map
map
=
new
HashMap
();
//
Map map = new HashMap();
map
.
put
(
getMapKeyOnlySelectOrField
(
inStr
),
getSelectList
(
inStr
));
//
map.put(getMapKeyOnlySelectOrField(inStr), getSelectList(inStr));
return
map
;
return
getKeyAndValues
(
inStr
)
;
}
}
/**
/**
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment