zhaowei / dlink

Commit 85b04877 authored May 08, 2022 by wenmo
[Fix-484] [core] Fix to job plan info was executed twice
parent 596167e2
Showing 3 changed files with 56 additions and 66 deletions
StudioController.java (dlink-admin/src/main/java/com/dlink/controller/StudioController.java): +2 -1
Explainer.java (dlink-core/src/main/java/com/dlink/explainer/Explainer.java): +43 -64
JobParam.java (dlink-core/src/main/java/com/dlink/job/JobParam.java): +11 -1
dlink-admin/src/main/java/com/dlink/controller/StudioController.java

@@ -65,7 +65,8 @@ public class StudioController {
         try {
             return Result.succeed(studioService.getJobPlan(studioExecuteDTO), "获取作业计划成功");
         } catch (Exception e) {
-            return Result.failed("目前只支持获取 INSERT 语句的作业计划");
+            e.printStackTrace();
+            return Result.failed(e.getMessage());
         }
     }
 ...
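In the hunk above, the catch block no longer answers with the fixed message 目前只支持获取 INSERT 语句的作业计划 (roughly: "currently only job plans for INSERT statements are supported"); it now prints the stack trace and returns the exception's own message. The success message 获取作业计划成功 means "job plan retrieved successfully". Below is a minimal, self-contained sketch of that error-handling pattern; the Result class here is a hypothetical stand-in for the project's Result type, not the real implementation.

import java.util.Objects;

// Hypothetical stand-in for the project's Result type; only what this sketch needs.
class Result {
    final boolean success;
    final String msg;

    private Result(boolean success, String msg) {
        this.success = success;
        this.msg = msg;
    }

    static Result succeed(Object data, String msg) {
        return new Result(true, msg);
    }

    static Result failed(String msg) {
        return new Result(false, msg);
    }
}

public class JobPlanErrorHandlingSketch {

    // Mirrors the new catch block: surface the real exception message instead of a fixed hint.
    static Result getJobPlan(String statement) {
        try {
            Objects.requireNonNull(statement, "statement must not be null");
            return Result.succeed(statement, "job plan retrieved");
        } catch (Exception e) {
            e.printStackTrace();
            return Result.failed(e.getMessage());
        }
    }

    public static void main(String[] args) {
        System.out.println(getJobPlan(null).msg); // prints: statement must not be null
    }
}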
dlink-core/src/main/java/com/dlink/explainer/Explainer.java

 package com.dlink.explainer;
+import org.apache.flink.runtime.rest.messages.JobPlanInfo;
+import org.apache.flink.table.catalog.CatalogManager;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 import com.dlink.assertion.Asserts;
 import com.dlink.constant.FlinkSQLConstant;
 import com.dlink.executor.Executor;
-import com.dlink.explainer.ca.*;
+import com.dlink.explainer.ca.ColumnCA;
+import com.dlink.explainer.ca.ColumnCAResult;
+import com.dlink.explainer.ca.NodeRel;
+import com.dlink.explainer.ca.TableCA;
+import com.dlink.explainer.ca.TableCAGenerator;
+import com.dlink.explainer.ca.TableCAResult;
 import com.dlink.explainer.lineage.LineageColumnGenerator;
 import com.dlink.explainer.lineage.LineageTableGenerator;
 import com.dlink.explainer.trans.Trans;
 ...

@@ -21,13 +35,6 @@ import com.dlink.utils.LogUtil;
 import com.dlink.utils.SqlUtil;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.runtime.rest.messages.JobPlanInfo;
-import org.apache.flink.table.catalog.CatalogManager;
-import java.time.LocalDateTime;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;

 /**
  * Explainer
 ...
@@ -94,7 +101,7 @@ public class Explainer {
                 ddl.add(new StatementParam(statement, operationType));
             }
         }
-        return new JobParam(ddl, trans, execute);
+        return new JobParam(Arrays.asList(statements), ddl, trans, execute);
     }

     public List<SqlExplainResult> explainSqlResult(String statement) {
 ...
@@ -226,7 +233,6 @@ public class Explainer {
                 record = executor.explainSqlRecord(item.getValue());
                 if (Asserts.isNull(record)) {
                     record = new SqlExplainResult();
-                    executor.getStreamGraph();
                 } else {
                     executor.executeSql(item.getValue());
                 }
 ...
@@ -252,59 +258,31 @@ public class Explainer {
     }

     public ObjectNode getStreamGraph(String statement) {
-        List<SqlExplainResult> sqlExplainRecords = explainSql(statement).getSqlExplainResults();
-        List<String> sqlPlans = new ArrayList<>();
-        List<String> datastreamPlans = new ArrayList<>();
-        for (SqlExplainResult item : sqlExplainRecords) {
-            if (Asserts.isNotNull(item.getType()) && item.getType().contains(FlinkSQLConstant.DML)) {
-                String[] statements = SqlUtil.getStatements(item.getSql(), sqlSeparator);
-                for (String str : statements) {
-                    sqlPlans.add(str);
-                }
-                continue;
-            }
-            if (Asserts.isNotNull(item.getType()) && item.getType().equals(FlinkSQLConstant.DATASTREAM)) {
-                String[] statements = SqlUtil.getStatements(item.getSql(), sqlSeparator);
-                for (String str : statements) {
-                    datastreamPlans.add(str);
-                }
-            }
-        }
-        if (sqlPlans.size() > 0) {
-            return executor.getStreamGraph(sqlPlans);
-        } else if (datastreamPlans.size() > 0) {
-            return executor.getStreamGraphFromDataStream(sqlPlans);
+        JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement, sqlSeparator));
+        if (jobParam.getTrans().size() > 0) {
+            return executor.getStreamGraph(jobParam.getStatements());
+        } else if (jobParam.getExecute().size() > 0) {
+            List<String> datastreamPlans = new ArrayList<>();
+            for (StatementParam item : jobParam.getExecute()) {
+                datastreamPlans.add(item.getValue());
+            }
+            return executor.getStreamGraphFromDataStream(datastreamPlans);
         } else {
             return mapper.createObjectNode();
         }
     }

     public JobPlanInfo getJobPlanInfo(String statement) {
-        List<SqlExplainResult> sqlExplainRecords = explainSql(statement).getSqlExplainResults();
-        List<String> sqlPlans = new ArrayList<>();
-        List<String> datastreamPlans = new ArrayList<>();
-        for (SqlExplainResult item : sqlExplainRecords) {
-            if (Asserts.isNotNull(item.getType()) && item.getType().contains(FlinkSQLConstant.DML)) {
-                String[] statements = SqlUtil.getStatements(item.getSql(), sqlSeparator);
-                for (String str : statements) {
-                    sqlPlans.add(str);
-                }
-                continue;
-            }
-            if (Asserts.isNotNull(item.getType()) && item.getType().equals(FlinkSQLConstant.DATASTREAM)) {
-                String[] statements = SqlUtil.getStatements(item.getSql(), sqlSeparator);
-                for (String str : statements) {
-                    datastreamPlans.add(str);
-                }
-            }
-        }
-        if (sqlPlans.size() > 0) {
-            return executor.getJobPlanInfo(sqlPlans);
-        } else if (datastreamPlans.size() > 0) {
-            return executor.getJobPlanInfoFromDataStream(datastreamPlans);
+        JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement, sqlSeparator));
+        if (jobParam.getTrans().size() > 0) {
+            return executor.getJobPlanInfo(jobParam.getStatements());
+        } else if (jobParam.getExecute().size() > 0) {
+            List<String> datastreamPlans = new ArrayList<>();
+            for (StatementParam item : jobParam.getExecute()) {
+                datastreamPlans.add(item.getValue());
+            }
+            return executor.getJobPlanInfoFromDataStream(datastreamPlans);
         } else {
             return new JobPlanInfo("");
 ...
@@ -451,20 +429,21 @@ public class Explainer {
             for (NodeRel nodeRel : columnCAResult.getColumnCASRelChain()) {
                 if (nodeRel.getPreId().equals(item.getValue().getId())) {
                     for (NodeRel nodeRel2 : columnCAResult.getColumnCASRelChain()) {
                         if (columnCAResult.getColumnCASMaps().containsKey(nodeRel2.getSufId()) && columnCAResult.getColumnCASMaps().containsKey(nodeRel2.getPreId()) && columnCAResult.getColumnCASMaps().containsKey(nodeRel.getSufId()) &&
                                 columnCAResult.getColumnCASMaps().get(nodeRel2.getSufId()).getTableId().equals(columnCAResult.getColumnCASMaps().get(nodeRel.getSufId()).getTableId()) &&
                                 columnCAResult.getColumnCASMaps().get(nodeRel2.getSufId()).getName().equals(columnCAResult.getColumnCASMaps().get(nodeRel.getSufId()).getName()) &&
                                 !columnCAResult.getColumnCASMaps().get(nodeRel2.getPreId()).getType().equals("Data Sink")) {
                             addNodeRels.add(new NodeRel(nodeRel2.getPreId(), nodeRel.getPreId()));
                         }
                     }
                     delNodeRels.add(nodeRel);
                 }
             }
-            for (NodeRel nodeRel : addNodeRels){
+            for (NodeRel nodeRel : addNodeRels) {
                 columnCAResult.getColumnCASRelChain().add(nodeRel);
             }
-            for (NodeRel nodeRel : delNodeRels){
+            for (NodeRel nodeRel : delNodeRels) {
                 columnCAResult.getColumnCASRelChain().remove(nodeRel);
             }
         }
 ...
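In the getStreamGraph and getJobPlanInfo hunks above, the old code first ran explainSql(statement), whose explain pass already drives the executor (see the executeSql and removed getStreamGraph calls in the @@ -226 hunk), and then asked the executor for a plan a second time; per the commit title, this is what made the job plan info be produced twice. The new code calls pretreatStatements(...) once and hands the resulting JobParam straight to the executor. The sketch below is a simplified, hypothetical model of that change: CountingExecutor is not the project's real Executor, it only counts plan builds.

import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the executor; it only counts how many times a plan is built.
class CountingExecutor {
    int planBuilds = 0;

    String buildPlan(List<String> statements) {
        planBuilds++;
        return "plan(" + String.join("; ", statements) + ")";
    }
}

public class PlanOnceSketch {

    // Old flow (simplified): an explain pass builds the plan, then the caller builds it again.
    static String oldGetJobPlanInfo(CountingExecutor executor, List<String> statements) {
        executor.buildPlan(statements);        // happens inside the explain pass
        return executor.buildPlan(statements); // happens again for the returned plan info
    }

    // New flow (simplified): pretreat the statements once and build the plan once.
    static String newGetJobPlanInfo(CountingExecutor executor, List<String> statements) {
        return executor.buildPlan(statements);
    }

    public static void main(String[] args) {
        List<String> statements = Arrays.asList("INSERT INTO sink SELECT * FROM source");

        CountingExecutor before = new CountingExecutor();
        oldGetJobPlanInfo(before, statements);
        System.out.println("old flow plan builds: " + before.planBuilds); // 2

        CountingExecutor after = new CountingExecutor();
        newGetJobPlanInfo(after, statements);
        System.out.println("new flow plan builds: " + after.planBuilds);  // 1
    }
}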
dlink-core/src/main/java/com/dlink/job/JobParam.java

@@ -9,6 +9,7 @@ import java.util.List;
  * @since 2021/11/16
  */
 public class JobParam {
+    private List<String> statements;
     private List<StatementParam> ddl;
     private List<StatementParam> trans;
     private List<StatementParam> execute;
 ...

@@ -18,12 +19,21 @@ public class JobParam {
         this.trans = trans;
     }

-    public JobParam(List<StatementParam> ddl, List<StatementParam> trans, List<StatementParam> execute) {
+    public JobParam(List<String> statements, List<StatementParam> ddl, List<StatementParam> trans, List<StatementParam> execute) {
+        this.statements = statements;
         this.ddl = ddl;
         this.trans = trans;
         this.execute = execute;
     }

+    public List<String> getStatements() {
+        return statements;
+    }
+
+    public void setStatements(List<String> statements) {
+        this.statements = statements;
+    }
+
     public List<StatementParam> getDdl() {
         return ddl;
     }
 ...
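JobParam now also carries the raw statement list, which is what the rewritten Explainer hands to the executor (the new JobParam(Arrays.asList(statements), ddl, trans, execute) call in the @@ -94 hunk). Below is a trimmed-down, self-contained sketch of that constructor shape; StatementParam is reduced to a value/type pair and both classes are simplified stand-ins, not the project's real definitions.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for the project's StatementParam: just a statement and its kind.
class StatementParam {
    final String value;
    final String type;

    StatementParam(String value, String type) {
        this.value = value;
        this.type = type;
    }
}

// Simplified stand-in for the updated JobParam: the raw statements ride along with the buckets.
class JobParam {
    private final List<String> statements;
    private final List<StatementParam> ddl;
    private final List<StatementParam> trans;
    private final List<StatementParam> execute;

    JobParam(List<String> statements, List<StatementParam> ddl,
             List<StatementParam> trans, List<StatementParam> execute) {
        this.statements = statements;
        this.ddl = ddl;
        this.trans = trans;
        this.execute = execute;
    }

    List<String> getStatements() {
        return statements;
    }
}

public class JobParamUsageSketch {
    public static void main(String[] args) {
        String[] statements = {
            "CREATE TABLE t (id INT) WITH ('connector' = 'datagen')",
            "INSERT INTO sink SELECT id FROM t"
        };

        List<StatementParam> ddl = new ArrayList<>();
        ddl.add(new StatementParam(statements[0], "CREATE"));
        List<StatementParam> trans = new ArrayList<>();
        trans.add(new StatementParam(statements[1], "INSERT"));

        // Mirrors the new construction in the Explainer diff: the raw statements are kept
        // so a later getJobPlanInfo/getStreamGraph call can use them directly.
        JobParam jobParam = new JobParam(Arrays.asList(statements), ddl, trans, new ArrayList<>());
        System.out.println(jobParam.getStatements());
    }
}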