Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in / Register
Toggle navigation
D
dlink
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
zhaowei
dlink
Commits
04ef64e0
Commit
04ef64e0
authored
Mar 19, 2022
by
wenmo
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
core format
parent
cab8d6a4
Changes
49
Show whitespace changes
Inline
Side-by-side
Showing
49 changed files
with
243 additions
and
305 deletions
+243
-305
FlinkCluster.java
dlink-core/src/main/java/com/dlink/cluster/FlinkCluster.java
+9
-11
FlinkClusterInfo.java
...ore/src/main/java/com/dlink/cluster/FlinkClusterInfo.java
+1
-1
Dialect.java
dlink-core/src/main/java/com/dlink/config/Dialect.java
+19
-11
Explainer.java
dlink-core/src/main/java/com/dlink/explainer/Explainer.java
+4
-2
CABuilder.java
...-core/src/main/java/com/dlink/explainer/ca/CABuilder.java
+18
-18
ColumnCA.java
...k-core/src/main/java/com/dlink/explainer/ca/ColumnCA.java
+0
-4
ColumnCAGenerator.java
...c/main/java/com/dlink/explainer/ca/ColumnCAGenerator.java
+2
-12
ColumnCANode.java
...re/src/main/java/com/dlink/explainer/ca/ColumnCANode.java
+1
-1
ColumnCAResult.java
.../src/main/java/com/dlink/explainer/ca/ColumnCAResult.java
+0
-2
ICA.java
dlink-core/src/main/java/com/dlink/explainer/ca/ICA.java
+2
-1
TableCA.java
dlink-core/src/main/java/com/dlink/explainer/ca/TableCA.java
+1
-5
TableCAGenerator.java
...rc/main/java/com/dlink/explainer/ca/TableCAGenerator.java
+2
-12
TableCANode.java
...ore/src/main/java/com/dlink/explainer/ca/TableCANode.java
+2
-2
TableCAResult.java
...e/src/main/java/com/dlink/explainer/ca/TableCAResult.java
+1
-1
LineageBuilder.java
...main/java/com/dlink/explainer/lineage/LineageBuilder.java
+7
-7
LineageColumn.java
.../main/java/com/dlink/explainer/lineage/LineageColumn.java
+2
-2
LineageColumnGenerator.java
...a/com/dlink/explainer/lineage/LineageColumnGenerator.java
+1
-7
LineageRelation.java
...ain/java/com/dlink/explainer/lineage/LineageRelation.java
+3
-2
LineageResult.java
.../main/java/com/dlink/explainer/lineage/LineageResult.java
+2
-2
LineageTable.java
...c/main/java/com/dlink/explainer/lineage/LineageTable.java
+3
-3
AbstractTrans.java
...rc/main/java/com/dlink/explainer/trans/AbstractTrans.java
+11
-11
SinkTrans.java
...re/src/main/java/com/dlink/explainer/trans/SinkTrans.java
+2
-2
SourceTrans.java
.../src/main/java/com/dlink/explainer/trans/SourceTrans.java
+2
-6
TransGenerator.java
...c/main/java/com/dlink/explainer/trans/TransGenerator.java
+1
-1
JobConfig.java
dlink-core/src/main/java/com/dlink/job/JobConfig.java
+33
-37
JobHandler.java
dlink-core/src/main/java/com/dlink/job/JobHandler.java
+10
-3
JobParam.java
dlink-core/src/main/java/com/dlink/job/JobParam.java
+1
-0
JobResult.java
dlink-core/src/main/java/com/dlink/job/JobResult.java
+1
-3
FlinkSqlPlus.java
dlink-core/src/main/java/com/dlink/plus/FlinkSqlPlus.java
+2
-3
SqlResult.java
dlink-core/src/main/java/com/dlink/plus/SqlResult.java
+1
-1
DDLResult.java
dlink-core/src/main/java/com/dlink/result/DDLResult.java
+1
-1
InsertResult.java
dlink-core/src/main/java/com/dlink/result/InsertResult.java
+3
-2
InsertResultBuilder.java
...e/src/main/java/com/dlink/result/InsertResultBuilder.java
+4
-5
ResultBuilder.java
dlink-core/src/main/java/com/dlink/result/ResultBuilder.java
+3
-3
ResultPool.java
dlink-core/src/main/java/com/dlink/result/ResultPool.java
+9
-9
ResultRunnable.java
...k-core/src/main/java/com/dlink/result/ResultRunnable.java
+0
-1
RunResult.java
dlink-core/src/main/java/com/dlink/result/RunResult.java
+1
-1
SelectResult.java
dlink-core/src/main/java/com/dlink/result/SelectResult.java
+10
-10
SelectResultBuilder.java
...e/src/main/java/com/dlink/result/SelectResultBuilder.java
+2
-9
SubmitResult.java
dlink-core/src/main/java/com/dlink/result/SubmitResult.java
+2
-2
SessionConfig.java
...k-core/src/main/java/com/dlink/session/SessionConfig.java
+4
-4
SessionInfo.java
dlink-core/src/main/java/com/dlink/session/SessionInfo.java
+2
-2
SessionPool.java
dlink-core/src/main/java/com/dlink/session/SessionPool.java
+10
-10
MapParseUtils.java
dlink-core/src/main/java/com/dlink/utils/MapParseUtils.java
+1
-8
UDFUtil.java
dlink-core/src/main/java/com/dlink/utils/UDFUtil.java
+4
-4
FlinkRestAPITest.java
...k-core/src/test/java/com/dlink/core/FlinkRestAPITest.java
+22
-22
FlinkSqlPlusTest.java
...k-core/src/test/java/com/dlink/core/FlinkSqlPlusTest.java
+1
-13
JobManagerTest.java
dlink-core/src/test/java/com/dlink/core/JobManagerTest.java
+7
-13
SqlParserTest.java
dlink-core/src/test/java/com/dlink/core/SqlParserTest.java
+13
-13
No files found.
dlink-core/src/main/java/com/dlink/cluster/FlinkCluster.java
View file @
04ef64e0
...
@@ -6,8 +6,6 @@ import com.dlink.assertion.Asserts;
...
@@ -6,8 +6,6 @@ import com.dlink.assertion.Asserts;
import
org.slf4j.Logger
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.slf4j.LoggerFactory
;
import
java.net.SocketTimeoutException
;
/**
/**
* FlinkCluster
* FlinkCluster
*
*
...
@@ -18,33 +16,33 @@ public class FlinkCluster {
...
@@ -18,33 +16,33 @@ public class FlinkCluster {
private
static
Logger
logger
=
LoggerFactory
.
getLogger
(
FlinkCluster
.
class
);
private
static
Logger
logger
=
LoggerFactory
.
getLogger
(
FlinkCluster
.
class
);
public
static
FlinkClusterInfo
testFlinkJobManagerIP
(
String
hosts
,
String
host
)
{
public
static
FlinkClusterInfo
testFlinkJobManagerIP
(
String
hosts
,
String
host
)
{
if
(
Asserts
.
isNotNullString
(
host
))
{
if
(
Asserts
.
isNotNullString
(
host
))
{
FlinkClusterInfo
info
=
executeSocketTest
(
host
);
FlinkClusterInfo
info
=
executeSocketTest
(
host
);
if
(
info
.
isEffective
())
{
if
(
info
.
isEffective
())
{
return
info
;
return
info
;
}
}
}
}
String
[]
servers
=
hosts
.
split
(
","
);
String
[]
servers
=
hosts
.
split
(
","
);
for
(
String
server
:
servers
)
{
for
(
String
server
:
servers
)
{
FlinkClusterInfo
info
=
executeSocketTest
(
server
);
FlinkClusterInfo
info
=
executeSocketTest
(
server
);
if
(
info
.
isEffective
())
{
if
(
info
.
isEffective
())
{
return
info
;
return
info
;
}
}
}
}
return
FlinkClusterInfo
.
INEFFECTIVE
;
return
FlinkClusterInfo
.
INEFFECTIVE
;
}
}
private
static
FlinkClusterInfo
executeSocketTest
(
String
host
){
private
static
FlinkClusterInfo
executeSocketTest
(
String
host
)
{
try
{
try
{
String
res
=
FlinkAPI
.
build
(
host
).
getVersion
();
String
res
=
FlinkAPI
.
build
(
host
).
getVersion
();
if
(
Asserts
.
isNotNullString
(
res
))
{
if
(
Asserts
.
isNotNullString
(
res
))
{
return
FlinkClusterInfo
.
build
(
host
,
res
);
return
FlinkClusterInfo
.
build
(
host
,
res
);
}
}
}
catch
(
IORuntimeException
e
)
{
}
catch
(
IORuntimeException
e
)
{
logger
.
info
(
"Flink jobManager 地址排除 -- "
+
host
);
logger
.
info
(
"Flink jobManager 地址排除 -- "
+
host
);
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
logger
.
error
(
e
.
getMessage
(),
e
);
logger
.
error
(
e
.
getMessage
(),
e
);
}
}
return
FlinkClusterInfo
.
INEFFECTIVE
;
return
FlinkClusterInfo
.
INEFFECTIVE
;
}
}
...
...
dlink-core/src/main/java/com/dlink/cluster/FlinkClusterInfo.java
View file @
04ef64e0
...
@@ -28,7 +28,7 @@ public class FlinkClusterInfo {
...
@@ -28,7 +28,7 @@ public class FlinkClusterInfo {
this
.
version
=
version
;
this
.
version
=
version
;
}
}
public
static
FlinkClusterInfo
build
(
String
jobManagerAddress
,
String
version
){
public
static
FlinkClusterInfo
build
(
String
jobManagerAddress
,
String
version
)
{
return
new
FlinkClusterInfo
(
true
,
jobManagerAddress
,
version
);
return
new
FlinkClusterInfo
(
true
,
jobManagerAddress
,
version
);
}
}
}
}
dlink-core/src/main/java/com/dlink/config/Dialect.java
View file @
04ef64e0
...
@@ -10,9 +10,9 @@ import com.dlink.assertion.Asserts;
...
@@ -10,9 +10,9 @@ import com.dlink.assertion.Asserts;
**/
**/
public
enum
Dialect
{
public
enum
Dialect
{
FLINKSQL
(
"FlinkSql"
),
FLINKJAR
(
"FlinkJar"
),
FLINKSQLENV
(
"FlinkSqlEnv"
),
SQL
(
"Sql"
),
JAVA
(
"Java"
),
FLINKSQL
(
"FlinkSql"
),
FLINKJAR
(
"FlinkJar"
),
FLINKSQLENV
(
"FlinkSqlEnv"
),
SQL
(
"Sql"
),
JAVA
(
"Java"
),
MYSQL
(
"Mysql"
),
ORACLE
(
"Oracle"
),
SQLSERVER
(
"SqlServer"
),
POSTGRESQL
(
"PostGreSql"
),
CLICKHOUSE
(
"ClickHouse"
),
MYSQL
(
"Mysql"
),
ORACLE
(
"Oracle"
),
SQLSERVER
(
"SqlServer"
),
POSTGRESQL
(
"PostGreSql"
),
CLICKHOUSE
(
"ClickHouse"
),
DORIS
(
"Doris"
),
PHOENIX
(
"Phoenix"
),
HIVE
(
"Hive"
);
DORIS
(
"Doris"
),
PHOENIX
(
"Phoenix"
),
HIVE
(
"Hive"
);
private
String
value
;
private
String
value
;
...
@@ -26,23 +26,31 @@ public enum Dialect {
...
@@ -26,23 +26,31 @@ public enum Dialect {
return
value
;
return
value
;
}
}
public
boolean
equalsVal
(
String
valueText
){
public
boolean
equalsVal
(
String
valueText
)
{
return
Asserts
.
isEqualsIgnoreCase
(
value
,
valueText
);
return
Asserts
.
isEqualsIgnoreCase
(
value
,
valueText
);
}
}
public
static
Dialect
get
(
String
value
){
public
static
Dialect
get
(
String
value
)
{
for
(
Dialect
type
:
Dialect
.
values
())
{
for
(
Dialect
type
:
Dialect
.
values
())
{
if
(
Asserts
.
isEqualsIgnoreCase
(
type
.
getValue
(),
value
))
{
if
(
Asserts
.
isEqualsIgnoreCase
(
type
.
getValue
(),
value
))
{
return
type
;
return
type
;
}
}
}
}
return
Dialect
.
FLINKSQL
;
return
Dialect
.
FLINKSQL
;
}
}
public
static
boolean
isSql
(
String
value
){
public
static
boolean
isSql
(
String
value
)
{
Dialect
dialect
=
Dialect
.
get
(
value
);
Dialect
dialect
=
Dialect
.
get
(
value
);
switch
(
dialect
){
switch
(
dialect
)
{
case
SQL:
case
MYSQL:
case
ORACLE:
case
SQLSERVER:
case
POSTGRESQL:
case
CLICKHOUSE:
case
DORIS:
case
PHOENIX:
case
HIVE:
case
SQL:
case
MYSQL:
case
ORACLE:
case
SQLSERVER:
case
POSTGRESQL:
case
CLICKHOUSE:
case
DORIS:
case
PHOENIX:
case
HIVE:
return
true
;
return
true
;
default
:
default
:
return
false
;
return
false
;
...
...
dlink-core/src/main/java/com/dlink/explainer/Explainer.java
View file @
04ef64e0
...
@@ -24,7 +24,9 @@ import org.apache.flink.runtime.rest.messages.JobPlanInfo;
...
@@ -24,7 +24,9 @@ import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import
org.apache.flink.table.catalog.CatalogManager
;
import
org.apache.flink.table.catalog.CatalogManager
;
import
java.time.LocalDateTime
;
import
java.time.LocalDateTime
;
import
java.util.*
;
import
java.util.ArrayList
;
import
java.util.List
;
import
java.util.Map
;
/**
/**
* Explainer
* Explainer
...
@@ -50,7 +52,7 @@ public class Explainer {
...
@@ -50,7 +52,7 @@ public class Explainer {
this
.
sqlSeparator
=
sqlSeparator
;
this
.
sqlSeparator
=
sqlSeparator
;
}
}
public
void
init
(){
public
void
init
()
{
sqlSeparator
=
SystemConfiguration
.
getInstances
().
getSqlSeparator
();
sqlSeparator
=
SystemConfiguration
.
getInstances
().
getSqlSeparator
();
}
}
...
...
dlink-core/src/main/java/com/dlink/explainer/ca/CABuilder.java
View file @
04ef64e0
...
@@ -15,14 +15,14 @@ import java.util.Set;
...
@@ -15,14 +15,14 @@ import java.util.Set;
@Deprecated
@Deprecated
public
class
CABuilder
{
public
class
CABuilder
{
public
static
List
<
TableCANode
>
getOneTableCAByStatement
(
String
statement
){
public
static
List
<
TableCANode
>
getOneTableCAByStatement
(
String
statement
)
{
List
<
TableCANode
>
tableCANodes
=
new
ArrayList
<>();
List
<
TableCANode
>
tableCANodes
=
new
ArrayList
<>();
FlinkSqlPlus
plus
=
FlinkSqlPlus
.
build
();
FlinkSqlPlus
plus
=
FlinkSqlPlus
.
build
();
List
<
TableCAResult
>
results
=
plus
.
generateTableCA
(
statement
);
List
<
TableCAResult
>
results
=
plus
.
generateTableCA
(
statement
);
for
(
int
j
=
0
;
j
<
results
.
size
();
j
++)
{
for
(
int
j
=
0
;
j
<
results
.
size
();
j
++)
{
TableCAResult
result
=
results
.
get
(
j
);
TableCAResult
result
=
results
.
get
(
j
);
TableCANode
node
=
new
TableCANode
();
TableCANode
node
=
new
TableCANode
();
TableCA
sinkTableCA
=
(
TableCA
)
result
.
getSinkTableCA
();
TableCA
sinkTableCA
=
(
TableCA
)
result
.
getSinkTableCA
();
node
.
setName
(
sinkTableCA
.
getTableName
());
node
.
setName
(
sinkTableCA
.
getTableName
());
List
<
TableCANode
>
children
=
new
ArrayList
<>();
List
<
TableCANode
>
children
=
new
ArrayList
<>();
for
(
int
k
=
0
;
k
<
result
.
getSourceTableCAS
().
size
();
k
++)
{
for
(
int
k
=
0
;
k
<
result
.
getSourceTableCAS
().
size
();
k
++)
{
...
@@ -34,19 +34,19 @@ public class CABuilder {
...
@@ -34,19 +34,19 @@ public class CABuilder {
return
tableCANodes
;
return
tableCANodes
;
}
}
public
static
List
<
TableCANode
>
getOneTableColumnCAByStatement
(
String
statement
){
public
static
List
<
TableCANode
>
getOneTableColumnCAByStatement
(
String
statement
)
{
List
<
TableCANode
>
tableCANodes
=
new
ArrayList
<>();
List
<
TableCANode
>
tableCANodes
=
new
ArrayList
<>();
FlinkSqlPlus
plus
=
FlinkSqlPlus
.
build
();
FlinkSqlPlus
plus
=
FlinkSqlPlus
.
build
();
int
id
=
1
;
int
id
=
1
;
List
<
TableCAResult
>
results
=
plus
.
explainSqlTableColumnCA
(
statement
);
List
<
TableCAResult
>
results
=
plus
.
explainSqlTableColumnCA
(
statement
);
for
(
int
j
=
0
;
j
<
results
.
size
();
j
++)
{
for
(
int
j
=
0
;
j
<
results
.
size
();
j
++)
{
TableCAResult
result
=
results
.
get
(
j
);
TableCAResult
result
=
results
.
get
(
j
);
TableCA
sinkTableCA
=
(
TableCA
)
result
.
getSinkTableCA
();
TableCA
sinkTableCA
=
(
TableCA
)
result
.
getSinkTableCA
();
TableCANode
node
=
new
TableCANode
(
id
++,
sinkTableCA
.
getTableName
(),
sinkTableCA
.
getFields
());
TableCANode
node
=
new
TableCANode
(
id
++,
sinkTableCA
.
getTableName
(),
sinkTableCA
.
getFields
());
List
<
TableCANode
>
children
=
new
ArrayList
<>();
List
<
TableCANode
>
children
=
new
ArrayList
<>();
for
(
int
k
=
0
;
k
<
result
.
getSourceTableCAS
().
size
();
k
++)
{
for
(
int
k
=
0
;
k
<
result
.
getSourceTableCAS
().
size
();
k
++)
{
TableCA
tableCA
=
(
TableCA
)
result
.
getSourceTableCAS
().
get
(
k
);
TableCA
tableCA
=
(
TableCA
)
result
.
getSourceTableCAS
().
get
(
k
);
children
.
add
(
new
TableCANode
(
id
++,
tableCA
.
getTableName
(),
tableCA
.
getFields
()));
children
.
add
(
new
TableCANode
(
id
++,
tableCA
.
getTableName
(),
tableCA
.
getFields
()));
}
}
node
.
setChildren
(
children
);
node
.
setChildren
(
children
);
tableCANodes
.
add
(
node
);
tableCANodes
.
add
(
node
);
...
@@ -55,7 +55,7 @@ public class CABuilder {
...
@@ -55,7 +55,7 @@ public class CABuilder {
}
}
@Deprecated
@Deprecated
public
static
List
<
ColumnCANode
>
getColumnCAByStatement
(
String
statement
){
public
static
List
<
ColumnCANode
>
getColumnCAByStatement
(
String
statement
)
{
List
<
ColumnCANode
>
columnCANodes
=
new
ArrayList
<>();
List
<
ColumnCANode
>
columnCANodes
=
new
ArrayList
<>();
FlinkSqlPlus
plus
=
FlinkSqlPlus
.
build
();
FlinkSqlPlus
plus
=
FlinkSqlPlus
.
build
();
List
<
ColumnCAResult
>
columnCAResults
=
plus
.
explainSqlColumnCA
(
statement
);
List
<
ColumnCAResult
>
columnCAResults
=
plus
.
explainSqlColumnCA
(
statement
);
...
@@ -63,14 +63,14 @@ public class CABuilder {
...
@@ -63,14 +63,14 @@ public class CABuilder {
ColumnCAResult
result
=
columnCAResults
.
get
(
j
);
ColumnCAResult
result
=
columnCAResults
.
get
(
j
);
List
<
Integer
>
sinkColumns
=
result
.
getSinkColumns
();
List
<
Integer
>
sinkColumns
=
result
.
getSinkColumns
();
for
(
int
k
=
0
;
k
<
sinkColumns
.
size
();
k
++)
{
for
(
int
k
=
0
;
k
<
sinkColumns
.
size
();
k
++)
{
ColumnCA
columnCA
=
(
ColumnCA
)
result
.
getColumnCASMaps
().
get
(
sinkColumns
.
get
(
k
));
ColumnCA
columnCA
=
(
ColumnCA
)
result
.
getColumnCASMaps
().
get
(
sinkColumns
.
get
(
k
));
ColumnCANode
node
=
new
ColumnCANode
();
ColumnCANode
node
=
new
ColumnCANode
();
node
.
setName
(
columnCA
.
getAlias
());
node
.
setName
(
columnCA
.
getAlias
());
node
.
setType
(
columnCA
.
getType
());
node
.
setType
(
columnCA
.
getType
());
node
.
setTitle
(
columnCA
.
getAlias
());
node
.
setTitle
(
columnCA
.
getAlias
());
node
.
setOperation
(
columnCA
.
getOperation
());
node
.
setOperation
(
columnCA
.
getOperation
());
List
<
ColumnCANode
>
children
=
new
ArrayList
<>();
List
<
ColumnCANode
>
children
=
new
ArrayList
<>();
buildColumnCANodeChildren
(
children
,
result
,
sinkColumns
.
get
(
k
),
columnCA
.
getOperation
());
buildColumnCANodeChildren
(
children
,
result
,
sinkColumns
.
get
(
k
),
columnCA
.
getOperation
());
node
.
setChildren
(
children
);
node
.
setChildren
(
children
);
columnCANodes
.
add
(
node
);
columnCANodes
.
add
(
node
);
}
}
...
@@ -78,22 +78,22 @@ public class CABuilder {
...
@@ -78,22 +78,22 @@ public class CABuilder {
return
columnCANodes
;
return
columnCANodes
;
}
}
private
static
void
buildColumnCANodeChildren
(
List
<
ColumnCANode
>
children
,
ColumnCAResult
result
,
Integer
columnId
,
String
operation
)
{
private
static
void
buildColumnCANodeChildren
(
List
<
ColumnCANode
>
children
,
ColumnCAResult
result
,
Integer
columnId
,
String
operation
)
{
Set
<
NodeRel
>
columnCASRel
=
result
.
getColumnCASRel
();
Set
<
NodeRel
>
columnCASRel
=
result
.
getColumnCASRel
();
boolean
hasChildren
=
false
;
boolean
hasChildren
=
false
;
for
(
NodeRel
nodeRel
:
columnCASRel
)
{
for
(
NodeRel
nodeRel
:
columnCASRel
)
{
if
(
columnId
==
nodeRel
.
getSufId
())
{
if
(
columnId
==
nodeRel
.
getSufId
())
{
ColumnCA
childca
=
(
ColumnCA
)
result
.
getColumnCASMaps
().
get
(
nodeRel
.
getPreId
());
ColumnCA
childca
=
(
ColumnCA
)
result
.
getColumnCASMaps
().
get
(
nodeRel
.
getPreId
());
// operation = operation.replaceAll(childca.getAlias().replaceAll("\\$","\\\\$"),childca.getOperation());
// operation = operation.replaceAll(childca.getAlias().replaceAll("\\$","\\\\$"),childca.getOperation());
operation
=
operation
.
replaceAll
(
childca
.
getAlias
()
operation
=
operation
.
replaceAll
(
childca
.
getAlias
()
.
replaceAll
(
"\\)"
,
""
),
childca
.
getOperation
());
.
replaceAll
(
"\\)"
,
""
),
childca
.
getOperation
());
buildColumnCANodeChildren
(
children
,
result
,
nodeRel
.
getPreId
(),
operation
);
buildColumnCANodeChildren
(
children
,
result
,
nodeRel
.
getPreId
(),
operation
);
hasChildren
=
true
;
hasChildren
=
true
;
}
}
}
}
if
(!
hasChildren
)
{
if
(!
hasChildren
)
{
ColumnCA
columnCA
=
(
ColumnCA
)
result
.
getColumnCASMaps
().
get
(
columnId
);
ColumnCA
columnCA
=
(
ColumnCA
)
result
.
getColumnCASMaps
().
get
(
columnId
);
if
(
result
.
getSourColumns
().
contains
(
columnCA
.
getId
()))
{
if
(
result
.
getSourColumns
().
contains
(
columnCA
.
getId
()))
{
ColumnCANode
columnCANode
=
new
ColumnCANode
();
ColumnCANode
columnCANode
=
new
ColumnCANode
();
columnCANode
.
setName
(
columnCA
.
getName
());
columnCANode
.
setName
(
columnCA
.
getName
());
columnCANode
.
setTitle
(
columnCA
.
getName
());
columnCANode
.
setTitle
(
columnCA
.
getName
());
...
...
dlink-core/src/main/java/com/dlink/explainer/ca/ColumnCA.java
View file @
04ef64e0
package
com
.
dlink
.
explainer
.
ca
;
package
com
.
dlink
.
explainer
.
ca
;
import
com.dlink.explainer.trans.Trans
;
import
lombok.Getter
;
import
lombok.Setter
;
import
java.util.List
;
import
java.util.List
;
/**
/**
...
...
dlink-core/src/main/java/com/dlink/explainer/ca/ColumnCAGenerator.java
View file @
04ef64e0
...
@@ -2,21 +2,11 @@ package com.dlink.explainer.ca;
...
@@ -2,21 +2,11 @@ package com.dlink.explainer.ca;
import
com.dlink.assertion.Asserts
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.explainer.trans.Field
;
import
com.dlink.explainer.trans.*
;
import
com.dlink.explainer.trans.OperatorTrans
;
import
com.dlink.explainer.trans.SinkTrans
;
import
com.dlink.explainer.trans.SourceTrans
;
import
com.dlink.explainer.trans.Trans
;
import
com.dlink.utils.MapParseUtils
;
import
com.dlink.utils.MapParseUtils
;
import
org.apache.commons.collections.CollectionUtils
;
import
org.apache.commons.collections.CollectionUtils
;
import
java.util.ArrayList
;
import
java.util.*
;
import
java.util.Collections
;
import
java.util.HashMap
;
import
java.util.HashSet
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Set
;
/**
/**
* ColumnCAGenerator
* ColumnCAGenerator
...
...
dlink-core/src/main/java/com/dlink/explainer/ca/ColumnCANode.java
View file @
04ef64e0
...
@@ -23,7 +23,7 @@ public class ColumnCANode implements Serializable {
...
@@ -23,7 +23,7 @@ public class ColumnCANode implements Serializable {
private
String
value
;
private
String
value
;
private
String
type
;
private
String
type
;
private
String
operation
;
private
String
operation
;
// private Tables tables;
// private Tables tables;
// private Columns columns;
// private Columns columns;
private
List
<
ColumnCANode
>
children
;
private
List
<
ColumnCANode
>
children
;
...
...
dlink-core/src/main/java/com/dlink/explainer/ca/ColumnCAResult.java
View file @
04ef64e0
package
com
.
dlink
.
explainer
.
ca
;
package
com
.
dlink
.
explainer
.
ca
;
import
com.dlink.explainer.lineage.LineageColumnGenerator
;
import
com.dlink.explainer.lineage.LineageColumnGenerator
;
import
lombok.Getter
;
import
lombok.Setter
;
import
java.util.List
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Map
;
...
...
dlink-core/src/main/java/com/dlink/explainer/ca/ICA.java
View file @
04ef64e0
package
com
.
dlink
.
explainer
.
ca
;
package
com
.
dlink
.
explainer
.
ca
;
public
interface
ICA
{
public
interface
ICA
{
Integer
getId
()
;
Integer
getId
();
String
getTableName
();
String
getTableName
();
}
}
dlink-core/src/main/java/com/dlink/explainer/ca/TableCA.java
View file @
04ef64e0
package
com
.
dlink
.
explainer
.
ca
;
package
com
.
dlink
.
explainer
.
ca
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.explainer.trans.Field
;
import
com.dlink.explainer.trans.*
;
import
com.dlink.explainer.trans.OperatorTrans
;
import
com.dlink.explainer.trans.SinkTrans
;
import
com.dlink.explainer.trans.SourceTrans
;
import
com.dlink.explainer.trans.Trans
;
import
lombok.Getter
;
import
lombok.Getter
;
import
lombok.Setter
;
import
lombok.Setter
;
...
...
dlink-core/src/main/java/com/dlink/explainer/ca/TableCAGenerator.java
View file @
04ef64e0
package
com
.
dlink
.
explainer
.
ca
;
package
com
.
dlink
.
explainer
.
ca
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.explainer.trans.Field
;
import
com.dlink.explainer.trans.*
;
import
com.dlink.explainer.trans.OperatorTrans
;
import
com.dlink.explainer.trans.SinkTrans
;
import
com.dlink.explainer.trans.SourceTrans
;
import
com.dlink.explainer.trans.Trans
;
import
org.apache.commons.collections.CollectionUtils
;
import
org.apache.commons.collections.CollectionUtils
;
import
java.util.ArrayList
;
import
java.util.*
;
import
java.util.Collections
;
import
java.util.HashMap
;
import
java.util.HashSet
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Set
;
import
java.util.regex.Matcher
;
import
java.util.regex.Matcher
;
import
java.util.regex.Pattern
;
import
java.util.regex.Pattern
;
...
...
dlink-core/src/main/java/com/dlink/explainer/ca/TableCANode.java
View file @
04ef64e0
...
@@ -23,7 +23,7 @@ public class TableCANode implements Serializable {
...
@@ -23,7 +23,7 @@ public class TableCANode implements Serializable {
private
String
value
;
private
String
value
;
private
String
type
;
private
String
type
;
private
Integer
columnSize
;
private
Integer
columnSize
;
// private Tables tables;
// private Tables tables;
private
List
<
String
>
columns
;
private
List
<
String
>
columns
;
private
List
<
TableCANode
>
children
;
private
List
<
TableCANode
>
children
;
...
@@ -39,7 +39,7 @@ public class TableCANode implements Serializable {
...
@@ -39,7 +39,7 @@ public class TableCANode implements Serializable {
this
.
value
=
value
;
this
.
value
=
value
;
}
}
public
TableCANode
(
Integer
id
,
String
name
,
List
<
String
>
columns
)
{
public
TableCANode
(
Integer
id
,
String
name
,
List
<
String
>
columns
)
{
this
.
id
=
id
.
toString
();
this
.
id
=
id
.
toString
();
this
.
name
=
name
;
this
.
name
=
name
;
this
.
title
=
name
;
this
.
title
=
name
;
...
...
dlink-core/src/main/java/com/dlink/explainer/ca/TableCAResult.java
View file @
04ef64e0
...
@@ -10,7 +10,7 @@ import java.util.List;
...
@@ -10,7 +10,7 @@ import java.util.List;
**/
**/
public
class
TableCAResult
{
public
class
TableCAResult
{
private
String
sinkName
;
private
String
sinkName
;
private
List
<
ICA
>
sourceTableCAS
;
private
List
<
ICA
>
sourceTableCAS
;
private
ICA
sinkTableCA
;
private
ICA
sinkTableCA
;
public
TableCAResult
(
TableCAGenerator
generator
)
{
public
TableCAResult
(
TableCAGenerator
generator
)
{
...
...
dlink-core/src/main/java/com/dlink/explainer/lineage/LineageBuilder.java
View file @
04ef64e0
...
@@ -16,19 +16,19 @@ import java.util.List;
...
@@ -16,19 +16,19 @@ import java.util.List;
*/
*/
public
class
LineageBuilder
{
public
class
LineageBuilder
{
public
static
LineageResult
getLineage
(
String
statement
){
public
static
LineageResult
getLineage
(
String
statement
)
{
FlinkSqlPlus
plus
=
FlinkSqlPlus
.
build
();
FlinkSqlPlus
plus
=
FlinkSqlPlus
.
build
();
List
<
ColumnCAResult
>
columnCAResults
=
plus
.
explainSqlColumnCA
(
statement
);
List
<
ColumnCAResult
>
columnCAResults
=
plus
.
explainSqlColumnCA
(
statement
);
List
<
LineageTable
>
tables
=
new
ArrayList
<>();
List
<
LineageTable
>
tables
=
new
ArrayList
<>();
List
<
LineageRelation
>
relations
=
new
ArrayList
<>();
List
<
LineageRelation
>
relations
=
new
ArrayList
<>();
int
index
=
0
;
int
index
=
0
;
for
(
ColumnCAResult
item:
columnCAResults
)
{
for
(
ColumnCAResult
item
:
columnCAResults
)
{
for
(
TableCA
tableCA:
item
.
getTableCAS
())
{
for
(
TableCA
tableCA
:
item
.
getTableCAS
())
{
tables
.
add
(
LineageTable
.
build
(
tableCA
));
tables
.
add
(
LineageTable
.
build
(
tableCA
));
}
}
for
(
NodeRel
nodeRel:
item
.
getColumnCASRelChain
())
{
for
(
NodeRel
nodeRel
:
item
.
getColumnCASRelChain
())
{
index
++;
index
++;
relations
.
add
(
LineageRelation
.
build
(
index
+
""
,
relations
.
add
(
LineageRelation
.
build
(
index
+
""
,
item
.
getColumnCASMaps
().
get
(
nodeRel
.
getPreId
()).
getTableId
().
toString
(),
item
.
getColumnCASMaps
().
get
(
nodeRel
.
getPreId
()).
getTableId
().
toString
(),
item
.
getColumnCASMaps
().
get
(
nodeRel
.
getSufId
()).
getTableId
().
toString
(),
item
.
getColumnCASMaps
().
get
(
nodeRel
.
getSufId
()).
getTableId
().
toString
(),
item
.
getColumnCASMaps
().
get
(
nodeRel
.
getPreId
()).
getName
(),
item
.
getColumnCASMaps
().
get
(
nodeRel
.
getPreId
()).
getName
(),
...
@@ -36,6 +36,6 @@ public class LineageBuilder {
...
@@ -36,6 +36,6 @@ public class LineageBuilder {
));
));
}
}
}
}
return
LineageResult
.
build
(
tables
,
relations
);
return
LineageResult
.
build
(
tables
,
relations
);
}
}
}
}
dlink-core/src/main/java/com/dlink/explainer/lineage/LineageColumn.java
View file @
04ef64e0
...
@@ -18,8 +18,8 @@ public class LineageColumn {
...
@@ -18,8 +18,8 @@ public class LineageColumn {
this
.
title
=
title
;
this
.
title
=
title
;
}
}
public
static
LineageColumn
build
(
String
name
,
String
title
){
public
static
LineageColumn
build
(
String
name
,
String
title
)
{
return
new
LineageColumn
(
name
,
title
);
return
new
LineageColumn
(
name
,
title
);
}
}
public
String
getName
()
{
public
String
getName
()
{
...
...
dlink-core/src/main/java/com/dlink/explainer/lineage/LineageColumnGenerator.java
View file @
04ef64e0
...
@@ -7,16 +7,10 @@ import com.dlink.explainer.ca.TableCA;
...
@@ -7,16 +7,10 @@ import com.dlink.explainer.ca.TableCA;
import
com.dlink.explainer.trans.Field
;
import
com.dlink.explainer.trans.Field
;
import
com.dlink.explainer.trans.OperatorTrans
;
import
com.dlink.explainer.trans.OperatorTrans
;
import
com.dlink.explainer.trans.SinkTrans
;
import
com.dlink.explainer.trans.SinkTrans
;
import
com.dlink.explainer.trans.SourceTrans
;
import
com.dlink.explainer.trans.Trans
;
import
com.dlink.explainer.trans.Trans
;
import
com.dlink.utils.MapParseUtils
;
import
com.dlink.utils.MapParseUtils
;
import
java.util.ArrayList
;
import
java.util.*
;
import
java.util.HashMap
;
import
java.util.HashSet
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Set
;
/**
/**
* LineageColumnGenerator
* LineageColumnGenerator
...
...
dlink-core/src/main/java/com/dlink/explainer/lineage/LineageRelation.java
View file @
04ef64e0
...
@@ -24,9 +24,10 @@ public class LineageRelation {
...
@@ -24,9 +24,10 @@ public class LineageRelation {
this
.
tgtTableColName
=
tgtTableColName
;
this
.
tgtTableColName
=
tgtTableColName
;
}
}
public
static
LineageRelation
build
(
String
id
,
String
srcTableId
,
String
tgtTableId
,
String
srcTableColName
,
String
tgtTableColName
){
public
static
LineageRelation
build
(
String
id
,
String
srcTableId
,
String
tgtTableId
,
String
srcTableColName
,
String
tgtTableColName
)
{
return
new
LineageRelation
(
id
,
srcTableId
,
tgtTableId
,
srcTableColName
,
tgtTableColName
);
return
new
LineageRelation
(
id
,
srcTableId
,
tgtTableId
,
srcTableColName
,
tgtTableColName
);
}
}
public
String
getId
()
{
public
String
getId
()
{
return
id
;
return
id
;
}
}
...
...
dlink-core/src/main/java/com/dlink/explainer/lineage/LineageResult.java
View file @
04ef64e0
...
@@ -20,8 +20,8 @@ public class LineageResult {
...
@@ -20,8 +20,8 @@ public class LineageResult {
this
.
relations
=
relations
;
this
.
relations
=
relations
;
}
}
public
static
LineageResult
build
(
List
<
LineageTable
>
tables
,
List
<
LineageRelation
>
relations
){
public
static
LineageResult
build
(
List
<
LineageTable
>
tables
,
List
<
LineageRelation
>
relations
)
{
return
new
LineageResult
(
tables
,
relations
);
return
new
LineageResult
(
tables
,
relations
);
}
}
public
List
<
LineageTable
>
getTables
()
{
public
List
<
LineageTable
>
getTables
()
{
...
...
dlink-core/src/main/java/com/dlink/explainer/lineage/LineageTable.java
View file @
04ef64e0
...
@@ -19,13 +19,13 @@ public class LineageTable {
...
@@ -19,13 +19,13 @@ public class LineageTable {
public
LineageTable
()
{
public
LineageTable
()
{
}
}
public
static
LineageTable
build
(
TableCA
tableCA
){
public
static
LineageTable
build
(
TableCA
tableCA
)
{
LineageTable
lineageTable
=
new
LineageTable
();
LineageTable
lineageTable
=
new
LineageTable
();
lineageTable
.
setId
(
tableCA
.
getId
().
toString
());
lineageTable
.
setId
(
tableCA
.
getId
().
toString
());
lineageTable
.
setName
(
tableCA
.
getName
());
lineageTable
.
setName
(
tableCA
.
getName
());
List
<
LineageColumn
>
columnList
=
new
ArrayList
<>();
List
<
LineageColumn
>
columnList
=
new
ArrayList
<>();
for
(
String
columnName:
tableCA
.
getFields
())
{
for
(
String
columnName
:
tableCA
.
getFields
())
{
columnList
.
add
(
LineageColumn
.
build
(
columnName
,
columnName
));
columnList
.
add
(
LineageColumn
.
build
(
columnName
,
columnName
));
}
}
lineageTable
.
setColumns
(
columnList
);
lineageTable
.
setColumns
(
columnList
);
return
lineageTable
;
return
lineageTable
;
...
...
dlink-core/src/main/java/com/dlink/explainer/trans/AbstractTrans.java
View file @
04ef64e0
...
@@ -47,48 +47,48 @@ public abstract class AbstractTrans {
...
@@ -47,48 +47,48 @@ public abstract class AbstractTrans {
abstract
void
translate
();
abstract
void
translate
();
public
static
String
matchType
(
String
str
){
public
static
String
matchType
(
String
str
)
{
Pattern
p
=
Pattern
.
compile
(
"(.*?)\\("
);
Pattern
p
=
Pattern
.
compile
(
"(.*?)\\("
);
Matcher
m
=
p
.
matcher
(
str
);
Matcher
m
=
p
.
matcher
(
str
);
String
type
=
null
;
String
type
=
null
;
if
(
m
.
find
())
{
if
(
m
.
find
())
{
type
=
m
.
group
(
0
).
replaceAll
(
"\\("
,
""
).
trim
();
type
=
m
.
group
(
0
).
replaceAll
(
"\\("
,
""
).
trim
();
}
else
{
}
else
{
type
=
str
;
type
=
str
;
}
}
return
type
;
return
type
;
}
}
public
static
String
matchPact
(
String
str
){
public
static
String
matchPact
(
String
str
)
{
Pattern
p
=
Pattern
.
compile
(
": (.*?)$"
);
Pattern
p
=
Pattern
.
compile
(
": (.*?)$"
);
Matcher
m
=
p
.
matcher
(
str
);
Matcher
m
=
p
.
matcher
(
str
);
String
pact
=
null
;
String
pact
=
null
;
if
(
m
.
find
())
{
if
(
m
.
find
())
{
pact
=
m
.
group
(
0
).
replaceAll
(
": "
,
""
).
trim
();
pact
=
m
.
group
(
0
).
replaceAll
(
": "
,
""
).
trim
();
}
else
{
}
else
{
pact
=
str
;
pact
=
str
;
}
}
return
pact
;
return
pact
;
}
}
public
static
String
matchContents
(
String
str
){
public
static
String
matchContents
(
String
str
)
{
Pattern
p
=
Pattern
.
compile
(
"\\((.*?)$"
);
Pattern
p
=
Pattern
.
compile
(
"\\((.*?)$"
);
Matcher
m
=
p
.
matcher
(
str
);
Matcher
m
=
p
.
matcher
(
str
);
String
contents
=
null
;
String
contents
=
null
;
if
(
m
.
find
())
{
if
(
m
.
find
())
{
contents
=
m
.
group
(
0
).
replaceFirst
(
"\\("
,
""
).
trim
();
contents
=
m
.
group
(
0
).
replaceFirst
(
"\\("
,
""
).
trim
();
contents
=
contents
.
substring
(
0
,
contents
.
lastIndexOf
(
")"
));
contents
=
contents
.
substring
(
0
,
contents
.
lastIndexOf
(
")"
));
}
else
{
}
else
{
contents
=
str
;
contents
=
str
;
}
}
return
contents
;
return
contents
;
}
}
public
static
String
matchStage
(
String
str
){
public
static
String
matchStage
(
String
str
)
{
Pattern
p
=
Pattern
.
compile
(
"Stage (.*?) :"
);
Pattern
p
=
Pattern
.
compile
(
"Stage (.*?) :"
);
Matcher
m
=
p
.
matcher
(
str
);
Matcher
m
=
p
.
matcher
(
str
);
String
type
=
null
;
String
type
=
null
;
if
(
m
.
find
())
{
if
(
m
.
find
())
{
type
=
m
.
group
(
0
).
replaceFirst
(
"Stage "
,
""
).
replaceFirst
(
" :"
,
""
).
trim
();
type
=
m
.
group
(
0
).
replaceFirst
(
"Stage "
,
""
).
replaceFirst
(
" :"
,
""
).
trim
();
}
}
return
type
;
return
type
;
...
...
dlink-core/src/main/java/com/dlink/explainer/trans/SinkTrans.java
View file @
04ef64e0
...
@@ -71,9 +71,9 @@ public class SinkTrans extends AbstractTrans implements Trans {
...
@@ -71,9 +71,9 @@ public class SinkTrans extends AbstractTrans implements Trans {
public
void
translate
()
{
public
void
translate
()
{
Map
map
=
MapParseUtils
.
parse
(
contents
);
Map
map
=
MapParseUtils
.
parse
(
contents
);
ArrayList
<
String
>
tables
=
(
ArrayList
<
String
>)
map
.
get
(
"table"
);
ArrayList
<
String
>
tables
=
(
ArrayList
<
String
>)
map
.
get
(
"table"
);
if
(
tables
!=
null
&&
tables
.
size
()>
0
)
{
if
(
tables
!=
null
&&
tables
.
size
()
>
0
)
{
name
=
tables
.
get
(
0
);
name
=
tables
.
get
(
0
);
String
[]
names
=
tables
.
get
(
0
).
split
(
"\\."
);
String
[]
names
=
tables
.
get
(
0
).
split
(
"\\."
);
if
(
names
.
length
>=
3
)
{
if
(
names
.
length
>=
3
)
{
catalog
=
names
[
0
];
catalog
=
names
[
0
];
database
=
names
[
1
];
database
=
names
[
1
];
...
...
dlink-core/src/main/java/com/dlink/explainer/trans/SourceTrans.java
View file @
04ef64e0
...
@@ -5,11 +5,7 @@ import org.apache.commons.lang3.StringUtils;
...
@@ -5,11 +5,7 @@ import org.apache.commons.lang3.StringUtils;
import
org.apache.flink.table.operations.Operation
;
import
org.apache.flink.table.operations.Operation
;
import
org.apache.flink.table.operations.OperationUtils
;
import
org.apache.flink.table.operations.OperationUtils
;
import
java.util.ArrayList
;
import
java.util.*
;
import
java.util.Collections
;
import
java.util.LinkedHashMap
;
import
java.util.List
;
import
java.util.Map
;
/**
/**
* SourceTrans
* SourceTrans
...
@@ -79,7 +75,7 @@ public class SourceTrans extends AbstractTrans implements Trans {
...
@@ -79,7 +75,7 @@ public class SourceTrans extends AbstractTrans implements Trans {
ArrayList
<
ArrayList
<
Object
>>
tables
=
(
ArrayList
<
ArrayList
<
Object
>>)
map
.
get
(
"table"
);
ArrayList
<
ArrayList
<
Object
>>
tables
=
(
ArrayList
<
ArrayList
<
Object
>>)
map
.
get
(
"table"
);
ArrayList
<
Object
>
names
=
tables
.
get
(
0
);
ArrayList
<
Object
>
names
=
tables
.
get
(
0
);
if
(
names
.
size
()
==
4
)
{
if
(
names
.
size
()
==
4
)
{
project
=
(
ArrayList
<
String
>)((
Map
)
names
.
get
(
3
)).
get
(
"project"
);
project
=
(
ArrayList
<
String
>)
((
Map
)
names
.
get
(
3
)).
get
(
"project"
);
names
.
remove
(
3
);
names
.
remove
(
3
);
}
}
name
=
StringUtils
.
join
(
names
,
"."
);
name
=
StringUtils
.
join
(
names
,
"."
);
...
...
dlink-core/src/main/java/com/dlink/explainer/trans/TransGenerator.java
View file @
04ef64e0
...
@@ -43,7 +43,7 @@ public class TransGenerator {
...
@@ -43,7 +43,7 @@ public class TransGenerator {
for
(
JsonNode
node
:
nodes
)
{
for
(
JsonNode
node
:
nodes
)
{
String
pact
=
node
.
get
(
"pact"
).
asText
();
String
pact
=
node
.
get
(
"pact"
).
asText
();
Trans
trans
=
getTrans
(
pact
);
Trans
trans
=
getTrans
(
pact
);
Asserts
.
checkNotNull
(
trans
,
"该转换无法被解析,原文如下:"
+
pact
);
Asserts
.
checkNotNull
(
trans
,
"该转换无法被解析,原文如下:"
+
pact
);
trans
.
build
(
node
);
trans
.
build
(
node
);
nodemap
.
put
(
trans
.
getId
(),
trans
);
nodemap
.
put
(
trans
.
getId
(),
trans
);
}
}
...
...
dlink-core/src/main/java/com/dlink/job/JobConfig.java
View file @
04ef64e0
...
@@ -3,11 +3,7 @@ package com.dlink.job;
...
@@ -3,11 +3,7 @@ package com.dlink.job;
import
com.dlink.assertion.Asserts
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.executor.ExecutorSetting
;
import
com.dlink.executor.ExecutorSetting
;
import
com.dlink.gateway.GatewayType
;
import
com.dlink.gateway.GatewayType
;
import
com.dlink.gateway.config.AppConfig
;
import
com.dlink.gateway.config.*
;
import
com.dlink.gateway.config.ClusterConfig
;
import
com.dlink.gateway.config.FlinkConfig
;
import
com.dlink.gateway.config.GatewayConfig
;
import
com.dlink.gateway.config.SavePointStrategy
;
import
com.dlink.session.SessionConfig
;
import
com.dlink.session.SessionConfig
;
import
lombok.Getter
;
import
lombok.Getter
;
import
lombok.Setter
;
import
lombok.Setter
;
...
@@ -38,7 +34,7 @@ public class JobConfig {
...
@@ -38,7 +34,7 @@ public class JobConfig {
private
Integer
clusterId
;
private
Integer
clusterId
;
private
Integer
clusterConfigurationId
;
private
Integer
clusterConfigurationId
;
private
Integer
jarId
;
private
Integer
jarId
;
private
boolean
isJarTask
=
false
;
private
boolean
isJarTask
=
false
;
private
String
address
;
private
String
address
;
private
Integer
taskId
;
private
Integer
taskId
;
private
String
jobName
;
private
String
jobName
;
...
@@ -52,7 +48,7 @@ public class JobConfig {
...
@@ -52,7 +48,7 @@ public class JobConfig {
private
String
savePointPath
;
private
String
savePointPath
;
private
GatewayConfig
gatewayConfig
;
private
GatewayConfig
gatewayConfig
;
private
Map
<
String
,
String
>
config
;
private
Map
<
String
,
String
>
config
;
public
JobConfig
()
{
public
JobConfig
()
{
}
}
...
@@ -70,7 +66,7 @@ public class JobConfig {
...
@@ -70,7 +66,7 @@ public class JobConfig {
public
JobConfig
(
String
type
,
boolean
useResult
,
boolean
useChangeLog
,
boolean
useAutoCancel
,
boolean
useSession
,
String
session
,
Integer
clusterId
,
public
JobConfig
(
String
type
,
boolean
useResult
,
boolean
useChangeLog
,
boolean
useAutoCancel
,
boolean
useSession
,
String
session
,
Integer
clusterId
,
Integer
clusterConfigurationId
,
Integer
jarId
,
Integer
taskId
,
String
jobName
,
boolean
useSqlFragment
,
Integer
clusterConfigurationId
,
Integer
jarId
,
Integer
taskId
,
String
jobName
,
boolean
useSqlFragment
,
boolean
useStatementSet
,
boolean
useBatchModel
,
Integer
maxRowNum
,
Integer
checkpoint
,
Integer
parallelism
,
boolean
useStatementSet
,
boolean
useBatchModel
,
Integer
maxRowNum
,
Integer
checkpoint
,
Integer
parallelism
,
Integer
savePointStrategyValue
,
String
savePointPath
,
Map
<
String
,
String
>
config
)
{
Integer
savePointStrategyValue
,
String
savePointPath
,
Map
<
String
,
String
>
config
)
{
this
.
type
=
type
;
this
.
type
=
type
;
this
.
useResult
=
useResult
;
this
.
useResult
=
useResult
;
this
.
useChangeLog
=
useChangeLog
;
this
.
useChangeLog
=
useChangeLog
;
...
@@ -94,10 +90,10 @@ public class JobConfig {
...
@@ -94,10 +90,10 @@ public class JobConfig {
this
.
config
=
config
;
this
.
config
=
config
;
}
}
public
JobConfig
(
String
type
,
boolean
useResult
,
boolean
useChangeLog
,
boolean
useAutoCancel
,
boolean
useSession
,
String
session
,
boolean
useRemote
,
String
address
,
public
JobConfig
(
String
type
,
boolean
useResult
,
boolean
useChangeLog
,
boolean
useAutoCancel
,
boolean
useSession
,
String
session
,
boolean
useRemote
,
String
address
,
String
jobName
,
boolean
useSqlFragment
,
String
jobName
,
boolean
useSqlFragment
,
boolean
useStatementSet
,
Integer
maxRowNum
,
Integer
checkpoint
,
Integer
parallelism
,
boolean
useStatementSet
,
Integer
maxRowNum
,
Integer
checkpoint
,
Integer
parallelism
,
Integer
savePointStrategyValue
,
String
savePointPath
,
Map
<
String
,
String
>
config
,
GatewayConfig
gatewayConfig
)
{
Integer
savePointStrategyValue
,
String
savePointPath
,
Map
<
String
,
String
>
config
,
GatewayConfig
gatewayConfig
)
{
this
.
type
=
type
;
this
.
type
=
type
;
this
.
useResult
=
useResult
;
this
.
useResult
=
useResult
;
this
.
useChangeLog
=
useChangeLog
;
this
.
useChangeLog
=
useChangeLog
;
...
@@ -118,7 +114,7 @@ public class JobConfig {
...
@@ -118,7 +114,7 @@ public class JobConfig {
this
.
gatewayConfig
=
gatewayConfig
;
this
.
gatewayConfig
=
gatewayConfig
;
}
}
public
JobConfig
(
String
type
,
boolean
useResult
,
boolean
useSession
,
String
session
,
boolean
useRemote
,
Integer
clusterId
,
Integer
maxRowNum
)
{
public
JobConfig
(
String
type
,
boolean
useResult
,
boolean
useSession
,
String
session
,
boolean
useRemote
,
Integer
clusterId
,
Integer
maxRowNum
)
{
this
.
type
=
type
;
this
.
type
=
type
;
this
.
useResult
=
useResult
;
this
.
useResult
=
useResult
;
this
.
useSession
=
useSession
;
this
.
useSession
=
useSession
;
...
@@ -128,10 +124,10 @@ public class JobConfig {
...
@@ -128,10 +124,10 @@ public class JobConfig {
this
.
maxRowNum
=
maxRowNum
;
this
.
maxRowNum
=
maxRowNum
;
}
}
public
JobConfig
(
String
type
,
Integer
step
,
boolean
useResult
,
boolean
useSession
,
boolean
useRemote
,
Integer
clusterId
,
public
JobConfig
(
String
type
,
Integer
step
,
boolean
useResult
,
boolean
useSession
,
boolean
useRemote
,
Integer
clusterId
,
Integer
clusterConfigurationId
,
Integer
jarId
,
Integer
taskId
,
String
jobName
,
boolean
useSqlFragment
,
Integer
clusterConfigurationId
,
Integer
jarId
,
Integer
taskId
,
String
jobName
,
boolean
useSqlFragment
,
boolean
useStatementSet
,
boolean
useBatchModel
,
Integer
checkpoint
,
Integer
parallelism
,
Integer
savePointStrategyValue
,
boolean
useStatementSet
,
boolean
useBatchModel
,
Integer
checkpoint
,
Integer
parallelism
,
Integer
savePointStrategyValue
,
String
savePointPath
,
Map
<
String
,
String
>
config
)
{
String
savePointPath
,
Map
<
String
,
String
>
config
)
{
this
.
type
=
type
;
this
.
type
=
type
;
this
.
step
=
step
;
this
.
step
=
step
;
this
.
useResult
=
useResult
;
this
.
useResult
=
useResult
;
...
@@ -152,68 +148,68 @@ public class JobConfig {
...
@@ -152,68 +148,68 @@ public class JobConfig {
this
.
config
=
config
;
this
.
config
=
config
;
}
}
public
ExecutorSetting
getExecutorSetting
(){
public
ExecutorSetting
getExecutorSetting
()
{
return
new
ExecutorSetting
(
checkpoint
,
parallelism
,
useSqlFragment
,
useStatementSet
,
useBatchModel
,
savePointPath
,
jobName
,
config
);
return
new
ExecutorSetting
(
checkpoint
,
parallelism
,
useSqlFragment
,
useStatementSet
,
useBatchModel
,
savePointPath
,
jobName
,
config
);
}
}
public
void
setSessionConfig
(
SessionConfig
sessionConfig
){
public
void
setSessionConfig
(
SessionConfig
sessionConfig
)
{
if
(
sessionConfig
!=
null
)
{
if
(
sessionConfig
!=
null
)
{
address
=
sessionConfig
.
getAddress
();
address
=
sessionConfig
.
getAddress
();
clusterId
=
sessionConfig
.
getClusterId
();
clusterId
=
sessionConfig
.
getClusterId
();
useRemote
=
sessionConfig
.
isUseRemote
();
useRemote
=
sessionConfig
.
isUseRemote
();
}
}
}
}
public
void
buildGatewayConfig
(
Map
<
String
,
Object
>
config
)
{
public
void
buildGatewayConfig
(
Map
<
String
,
Object
>
config
)
{
gatewayConfig
=
new
GatewayConfig
();
gatewayConfig
=
new
GatewayConfig
();
if
(
config
.
containsKey
(
"hadoopConfigPath"
))
{
if
(
config
.
containsKey
(
"hadoopConfigPath"
))
{
gatewayConfig
.
setClusterConfig
(
ClusterConfig
.
build
(
config
.
get
(
"flinkConfigPath"
).
toString
(),
gatewayConfig
.
setClusterConfig
(
ClusterConfig
.
build
(
config
.
get
(
"flinkConfigPath"
).
toString
(),
config
.
get
(
"flinkLibPath"
).
toString
(),
config
.
get
(
"flinkLibPath"
).
toString
(),
config
.
get
(
"hadoopConfigPath"
).
toString
()));
config
.
get
(
"hadoopConfigPath"
).
toString
()));
}
else
{
}
else
{
gatewayConfig
.
setClusterConfig
(
ClusterConfig
.
build
(
config
.
get
(
"flinkConfigPath"
).
toString
(),
gatewayConfig
.
setClusterConfig
(
ClusterConfig
.
build
(
config
.
get
(
"flinkConfigPath"
).
toString
(),
config
.
get
(
"flinkLibPath"
).
toString
(),
config
.
get
(
"flinkLibPath"
).
toString
(),
""
));
""
));
}
}
AppConfig
appConfig
=
new
AppConfig
();
AppConfig
appConfig
=
new
AppConfig
();
if
(
config
.
containsKey
(
"userJarPath"
)
&&
Asserts
.
isNotNullString
((
String
)
config
.
get
(
"userJarPath"
)))
{
if
(
config
.
containsKey
(
"userJarPath"
)
&&
Asserts
.
isNotNullString
((
String
)
config
.
get
(
"userJarPath"
)))
{
appConfig
.
setUserJarPath
(
config
.
get
(
"userJarPath"
).
toString
());
appConfig
.
setUserJarPath
(
config
.
get
(
"userJarPath"
).
toString
());
if
(
config
.
containsKey
(
"userJarMainAppClass"
)
&&
Asserts
.
isNotNullString
((
String
)
config
.
get
(
"userJarMainAppClass"
)))
{
if
(
config
.
containsKey
(
"userJarMainAppClass"
)
&&
Asserts
.
isNotNullString
((
String
)
config
.
get
(
"userJarMainAppClass"
)))
{
appConfig
.
setUserJarMainAppClass
(
config
.
get
(
"userJarMainAppClass"
).
toString
());
appConfig
.
setUserJarMainAppClass
(
config
.
get
(
"userJarMainAppClass"
).
toString
());
}
}
if
(
config
.
containsKey
(
"userJarParas"
)
&&
Asserts
.
isNotNullString
((
String
)
config
.
get
(
"userJarParas"
)))
{
if
(
config
.
containsKey
(
"userJarParas"
)
&&
Asserts
.
isNotNullString
((
String
)
config
.
get
(
"userJarParas"
)))
{
appConfig
.
setUserJarParas
(
config
.
get
(
"userJarParas"
).
toString
().
split
(
" "
));
appConfig
.
setUserJarParas
(
config
.
get
(
"userJarParas"
).
toString
().
split
(
" "
));
}
}
gatewayConfig
.
setAppConfig
(
appConfig
);
gatewayConfig
.
setAppConfig
(
appConfig
);
}
}
if
(
config
.
containsKey
(
"flinkConfig"
)
&&
Asserts
.
isNotNullMap
((
Map
<
String
,
String
>)
config
.
get
(
"flinkConfig"
)))
{
if
(
config
.
containsKey
(
"flinkConfig"
)
&&
Asserts
.
isNotNullMap
((
Map
<
String
,
String
>)
config
.
get
(
"flinkConfig"
)))
{
gatewayConfig
.
setFlinkConfig
(
FlinkConfig
.
build
((
Map
<
String
,
String
>)
config
.
get
(
"flinkConfig"
)));
gatewayConfig
.
setFlinkConfig
(
FlinkConfig
.
build
((
Map
<
String
,
String
>)
config
.
get
(
"flinkConfig"
)));
}
}
if
(
config
.
containsKey
(
"kubernetesConfig"
))
{
if
(
config
.
containsKey
(
"kubernetesConfig"
))
{
Map
kubernetesConfig
=
(
Map
)
config
.
get
(
"kubernetesConfig"
);
Map
kubernetesConfig
=
(
Map
)
config
.
get
(
"kubernetesConfig"
);
if
(
kubernetesConfig
.
containsKey
(
"kubernetes.namespace"
))
{
if
(
kubernetesConfig
.
containsKey
(
"kubernetes.namespace"
))
{
gatewayConfig
.
getFlinkConfig
().
getConfiguration
().
put
(
"kubernetes.namespace"
,
kubernetesConfig
.
get
(
"kubernetes.namespace"
).
toString
());
gatewayConfig
.
getFlinkConfig
().
getConfiguration
().
put
(
"kubernetes.namespace"
,
kubernetesConfig
.
get
(
"kubernetes.namespace"
).
toString
());
}
}
if
(
kubernetesConfig
.
containsKey
(
"kubernetes.cluster-id"
))
{
if
(
kubernetesConfig
.
containsKey
(
"kubernetes.cluster-id"
))
{
gatewayConfig
.
getFlinkConfig
().
getConfiguration
().
put
(
"kubernetes.cluster-id"
,
kubernetesConfig
.
get
(
"kubernetes.cluster-id"
).
toString
());
gatewayConfig
.
getFlinkConfig
().
getConfiguration
().
put
(
"kubernetes.cluster-id"
,
kubernetesConfig
.
get
(
"kubernetes.cluster-id"
).
toString
());
}
}
if
(
kubernetesConfig
.
containsKey
(
"kubernetes.container.image"
))
{
if
(
kubernetesConfig
.
containsKey
(
"kubernetes.container.image"
))
{
gatewayConfig
.
getFlinkConfig
().
getConfiguration
().
put
(
"kubernetes.container.image"
,
kubernetesConfig
.
get
(
"kubernetes.container.image"
).
toString
());
gatewayConfig
.
getFlinkConfig
().
getConfiguration
().
put
(
"kubernetes.container.image"
,
kubernetesConfig
.
get
(
"kubernetes.container.image"
).
toString
());
}
}
}
}
}
}
public
void
addGatewayConfig
(
List
<
Map
<
String
,
String
>>
configList
){
public
void
addGatewayConfig
(
List
<
Map
<
String
,
String
>>
configList
)
{
if
(
Asserts
.
isNull
(
gatewayConfig
))
{
if
(
Asserts
.
isNull
(
gatewayConfig
))
{
gatewayConfig
=
new
GatewayConfig
();
gatewayConfig
=
new
GatewayConfig
();
}
}
for
(
Map
<
String
,
String
>
item
:
configList
)
{
for
(
Map
<
String
,
String
>
item
:
configList
)
{
gatewayConfig
.
getFlinkConfig
().
getConfiguration
().
put
(
item
.
get
(
"key"
),
item
.
get
(
"value"
));
gatewayConfig
.
getFlinkConfig
().
getConfiguration
().
put
(
item
.
get
(
"key"
),
item
.
get
(
"value"
));
}
}
}
}
public
void
addGatewayConfig
(
Map
<
String
,
Object
>
config
){
public
void
addGatewayConfig
(
Map
<
String
,
Object
>
config
)
{
if
(
Asserts
.
isNull
(
gatewayConfig
))
{
if
(
Asserts
.
isNull
(
gatewayConfig
))
{
gatewayConfig
=
new
GatewayConfig
();
gatewayConfig
=
new
GatewayConfig
();
}
}
for
(
Map
.
Entry
<
String
,
Object
>
entry
:
config
.
entrySet
())
{
for
(
Map
.
Entry
<
String
,
Object
>
entry
:
config
.
entrySet
())
{
...
@@ -225,7 +221,7 @@ public class JobConfig {
...
@@ -225,7 +221,7 @@ public class JobConfig {
return
!
GatewayType
.
LOCAL
.
equalsValue
(
type
);
return
!
GatewayType
.
LOCAL
.
equalsValue
(
type
);
}
}
public
void
buildLocal
(){
public
void
buildLocal
()
{
type
=
GatewayType
.
LOCAL
.
getLongValue
();
type
=
GatewayType
.
LOCAL
.
getLongValue
();
}
}
}
}
dlink-core/src/main/java/com/dlink/job/JobHandler.java
View file @
04ef64e0
...
@@ -12,15 +12,22 @@ import java.util.ServiceLoader;
...
@@ -12,15 +12,22 @@ import java.util.ServiceLoader;
*/
*/
public
interface
JobHandler
{
public
interface
JobHandler
{
boolean
init
();
boolean
init
();
boolean
ready
();
boolean
ready
();
boolean
running
();
boolean
running
();
boolean
success
();
boolean
success
();
boolean
failed
();
boolean
failed
();
boolean
callback
();
boolean
callback
();
boolean
close
();
boolean
close
();
static
JobHandler
build
(){
static
JobHandler
build
()
{
ServiceLoader
<
JobHandler
>
jobHandlers
=
ServiceLoader
.
load
(
JobHandler
.
class
);
ServiceLoader
<
JobHandler
>
jobHandlers
=
ServiceLoader
.
load
(
JobHandler
.
class
);
for
(
JobHandler
jobHandler
:
jobHandlers
)
{
for
(
JobHandler
jobHandler
:
jobHandlers
)
{
return
jobHandler
;
return
jobHandler
;
}
}
throw
new
JobException
(
"There is no corresponding implementation class for this interface!"
);
throw
new
JobException
(
"There is no corresponding implementation class for this interface!"
);
...
...
dlink-core/src/main/java/com/dlink/job/JobParam.java
View file @
04ef64e0
...
@@ -17,6 +17,7 @@ public class JobParam {
...
@@ -17,6 +17,7 @@ public class JobParam {
this
.
ddl
=
ddl
;
this
.
ddl
=
ddl
;
this
.
trans
=
trans
;
this
.
trans
=
trans
;
}
}
public
JobParam
(
List
<
StatementParam
>
ddl
,
List
<
StatementParam
>
trans
,
List
<
StatementParam
>
execute
)
{
public
JobParam
(
List
<
StatementParam
>
ddl
,
List
<
StatementParam
>
trans
,
List
<
StatementParam
>
execute
)
{
this
.
ddl
=
ddl
;
this
.
ddl
=
ddl
;
this
.
trans
=
trans
;
this
.
trans
=
trans
;
...
...
dlink-core/src/main/java/com/dlink/job/JobResult.java
View file @
04ef64e0
package
com
.
dlink
.
job
;
package
com
.
dlink
.
job
;
import
com.dlink.executor.ExecutorSetting
;
import
com.dlink.result.IResult
;
import
com.dlink.result.IResult
;
import
lombok.Getter
;
import
lombok.Getter
;
import
lombok.Setter
;
import
lombok.Setter
;
import
java.time.LocalDate
;
import
java.time.LocalDateTime
;
import
java.time.LocalDateTime
;
/**
/**
...
@@ -39,7 +37,7 @@ public class JobResult {
...
@@ -39,7 +37,7 @@ public class JobResult {
this
.
jobConfig
=
jobConfig
;
this
.
jobConfig
=
jobConfig
;
this
.
jobManagerAddress
=
jobManagerAddress
;
this
.
jobManagerAddress
=
jobManagerAddress
;
this
.
status
=
status
;
this
.
status
=
status
;
this
.
success
=
(
status
==(
Job
.
JobStatus
.
SUCCESS
))?
true
:
false
;
this
.
success
=
(
status
==
(
Job
.
JobStatus
.
SUCCESS
))
?
true
:
false
;
this
.
statement
=
statement
;
this
.
statement
=
statement
;
this
.
jobId
=
jobId
;
this
.
jobId
=
jobId
;
this
.
error
=
error
;
this
.
error
=
error
;
...
...
dlink-core/src/main/java/com/dlink/plus/FlinkSqlPlus.java
View file @
04ef64e0
...
@@ -7,7 +7,6 @@ import com.dlink.explainer.ca.TableCAResult;
...
@@ -7,7 +7,6 @@ import com.dlink.explainer.ca.TableCAResult;
import
com.dlink.result.SqlExplainResult
;
import
com.dlink.result.SqlExplainResult
;
import
com.fasterxml.jackson.databind.node.ObjectNode
;
import
com.fasterxml.jackson.databind.node.ObjectNode
;
import
org.apache.flink.runtime.rest.messages.JobPlanInfo
;
import
org.apache.flink.runtime.rest.messages.JobPlanInfo
;
import
org.apache.flink.table.api.ExplainDetail
;
import
java.util.ArrayList
;
import
java.util.ArrayList
;
import
java.util.List
;
import
java.util.List
;
...
@@ -28,7 +27,7 @@ public class FlinkSqlPlus {
...
@@ -28,7 +27,7 @@ public class FlinkSqlPlus {
this
.
explainer
=
new
Explainer
(
executor
);
this
.
explainer
=
new
Explainer
(
executor
);
}
}
public
static
FlinkSqlPlus
build
(){
public
static
FlinkSqlPlus
build
()
{
return
new
FlinkSqlPlus
(
Executor
.
build
());
return
new
FlinkSqlPlus
(
Executor
.
build
());
}
}
...
@@ -57,7 +56,7 @@ public class FlinkSqlPlus {
...
@@ -57,7 +56,7 @@ public class FlinkSqlPlus {
try
{
try
{
return
new
SqlResult
(
executor
.
executeSql
(
sql
));
return
new
SqlResult
(
executor
.
executeSql
(
sql
));
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
return
new
SqlResult
(
false
,
e
.
getMessage
());
return
new
SqlResult
(
false
,
e
.
getMessage
());
}
}
}
}
...
...
dlink-core/src/main/java/com/dlink/plus/SqlResult.java
View file @
04ef64e0
...
@@ -13,7 +13,7 @@ public class SqlResult {
...
@@ -13,7 +13,7 @@ public class SqlResult {
private
boolean
isSuccess
=
true
;
private
boolean
isSuccess
=
true
;
private
String
errorMsg
;
private
String
errorMsg
;
public
static
SqlResult
NULL
=
new
SqlResult
(
false
,
"未检测到有效的Sql"
);
public
static
SqlResult
NULL
=
new
SqlResult
(
false
,
"未检测到有效的Sql"
);
public
SqlResult
(
TableResult
tableResult
)
{
public
SqlResult
(
TableResult
tableResult
)
{
this
.
tableResult
=
tableResult
;
this
.
tableResult
=
tableResult
;
...
...
dlink-core/src/main/java/com/dlink/result/DDLResult.java
View file @
04ef64e0
...
@@ -18,7 +18,7 @@ import java.util.Set;
...
@@ -18,7 +18,7 @@ import java.util.Set;
@Getter
@Getter
public
class
DDLResult
extends
AbstractResult
implements
IResult
{
public
class
DDLResult
extends
AbstractResult
implements
IResult
{
private
List
<
Map
<
String
,
Object
>>
rowData
;
private
List
<
Map
<
String
,
Object
>>
rowData
;
private
Integer
total
;
private
Integer
total
;
private
Set
<
String
>
columns
;
private
Set
<
String
>
columns
;
...
...
dlink-core/src/main/java/com/dlink/result/InsertResult.java
View file @
04ef64e0
...
@@ -23,9 +23,10 @@ public class InsertResult extends AbstractResult implements IResult {
...
@@ -23,9 +23,10 @@ public class InsertResult extends AbstractResult implements IResult {
this
.
endTime
=
LocalDateTime
.
now
();
this
.
endTime
=
LocalDateTime
.
now
();
}
}
public
static
InsertResult
success
(
String
jobID
){
public
static
InsertResult
success
(
String
jobID
)
{
return
new
InsertResult
(
jobID
,
true
);
return
new
InsertResult
(
jobID
,
true
);
}
}
@Override
@Override
public
String
getJobId
()
{
public
String
getJobId
()
{
return
jobID
;
return
jobID
;
...
...
dlink-core/src/main/java/com/dlink/result/InsertResultBuilder.java
View file @
04ef64e0
package
com
.
dlink
.
result
;
package
com
.
dlink
.
result
;
import
com.dlink.constant.FlinkSQLConstant
;
import
org.apache.flink.table.api.TableResult
;
import
org.apache.flink.table.api.TableResult
;
/**
/**
...
@@ -13,11 +12,11 @@ public class InsertResultBuilder implements ResultBuilder {
...
@@ -13,11 +12,11 @@ public class InsertResultBuilder implements ResultBuilder {
@Override
@Override
public
IResult
getResult
(
TableResult
tableResult
)
{
public
IResult
getResult
(
TableResult
tableResult
)
{
if
(
tableResult
.
getJobClient
().
isPresent
())
{
if
(
tableResult
.
getJobClient
().
isPresent
())
{
String
jobId
=
tableResult
.
getJobClient
().
get
().
getJobID
().
toHexString
();
String
jobId
=
tableResult
.
getJobClient
().
get
().
getJobID
().
toHexString
();
return
new
InsertResult
(
jobId
,
true
);
return
new
InsertResult
(
jobId
,
true
);
}
else
{
}
else
{
return
new
InsertResult
(
null
,
false
);
return
new
InsertResult
(
null
,
false
);
}
}
}
}
}
}
dlink-core/src/main/java/com/dlink/result/ResultBuilder.java
View file @
04ef64e0
...
@@ -11,10 +11,10 @@ import org.apache.flink.table.api.TableResult;
...
@@ -11,10 +11,10 @@ import org.apache.flink.table.api.TableResult;
**/
**/
public
interface
ResultBuilder
{
public
interface
ResultBuilder
{
static
ResultBuilder
build
(
SqlType
operationType
,
Integer
maxRowNum
,
boolean
isChangeLog
,
boolean
isAutoCancel
){
static
ResultBuilder
build
(
SqlType
operationType
,
Integer
maxRowNum
,
boolean
isChangeLog
,
boolean
isAutoCancel
)
{
switch
(
operationType
){
switch
(
operationType
)
{
case
SELECT:
case
SELECT:
return
new
SelectResultBuilder
(
maxRowNum
,
isChangeLog
,
isAutoCancel
);
return
new
SelectResultBuilder
(
maxRowNum
,
isChangeLog
,
isAutoCancel
);
case
SHOW:
case
SHOW:
case
DESC:
case
DESC:
case
DESCRIBE:
case
DESCRIBE:
...
...
dlink-core/src/main/java/com/dlink/result/ResultPool.java
View file @
04ef64e0
...
@@ -11,33 +11,33 @@ import java.util.Map;
...
@@ -11,33 +11,33 @@ import java.util.Map;
*/
*/
public
class
ResultPool
{
public
class
ResultPool
{
private
static
volatile
Map
<
String
,
SelectResult
>
results
=
new
HashMap
<
String
,
SelectResult
>();
private
static
volatile
Map
<
String
,
SelectResult
>
results
=
new
HashMap
<
String
,
SelectResult
>();
public
static
boolean
containsKey
(
String
key
){
public
static
boolean
containsKey
(
String
key
)
{
return
results
.
containsKey
(
key
);
return
results
.
containsKey
(
key
);
}
}
public
static
void
put
(
SelectResult
result
)
{
public
static
void
put
(
SelectResult
result
)
{
results
.
put
(
result
.
getJobId
(),
result
);
results
.
put
(
result
.
getJobId
(),
result
);
}
}
public
static
SelectResult
get
(
String
key
){
public
static
SelectResult
get
(
String
key
)
{
if
(
results
.
containsKey
(
key
))
{
if
(
results
.
containsKey
(
key
))
{
return
results
.
get
(
key
);
return
results
.
get
(
key
);
}
else
{
}
else
{
return
SelectResult
.
buildDestruction
(
key
);
return
SelectResult
.
buildDestruction
(
key
);
}
}
}
}
public
static
boolean
remove
(
String
key
){
public
static
boolean
remove
(
String
key
)
{
if
(
results
.
containsKey
(
key
))
{
if
(
results
.
containsKey
(
key
))
{
results
.
remove
(
key
);
results
.
remove
(
key
);
return
true
;
return
true
;
}
}
return
false
;
return
false
;
}
}
public
static
void
clear
(){
public
static
void
clear
()
{
results
.
clear
();
results
.
clear
();
}
}
...
...
dlink-core/src/main/java/com/dlink/result/ResultRunnable.java
View file @
04ef64e0
...
@@ -5,7 +5,6 @@ import com.dlink.utils.FlinkUtil;
...
@@ -5,7 +5,6 @@ import com.dlink.utils.FlinkUtil;
import
org.apache.flink.table.api.TableResult
;
import
org.apache.flink.table.api.TableResult
;
import
org.apache.flink.types.Row
;
import
org.apache.flink.types.Row
;
import
org.apache.flink.types.RowKind
;
import
org.apache.flink.types.RowKind
;
import
org.apache.flink.util.StringUtils
;
import
java.util.*
;
import
java.util.*
;
...
...
dlink-core/src/main/java/com/dlink/result/RunResult.java
View file @
04ef64e0
...
@@ -28,7 +28,7 @@ public class RunResult {
...
@@ -28,7 +28,7 @@ public class RunResult {
public
RunResult
()
{
public
RunResult
()
{
}
}
public
RunResult
(
String
sessionId
,
String
statement
,
String
flinkHost
,
Integer
flinkPort
,
ExecutorSetting
setting
,
String
jobName
)
{
public
RunResult
(
String
sessionId
,
String
statement
,
String
flinkHost
,
Integer
flinkPort
,
ExecutorSetting
setting
,
String
jobName
)
{
this
.
sessionId
=
sessionId
;
this
.
sessionId
=
sessionId
;
this
.
statement
=
statement
;
this
.
statement
=
statement
;
this
.
flinkHost
=
flinkHost
;
this
.
flinkHost
=
flinkHost
;
...
...
dlink-core/src/main/java/com/dlink/result/SelectResult.java
View file @
04ef64e0
...
@@ -16,17 +16,17 @@ import java.util.Set;
...
@@ -16,17 +16,17 @@ import java.util.Set;
**/
**/
@Setter
@Setter
@Getter
@Getter
public
class
SelectResult
extends
AbstractResult
implements
IResult
{
public
class
SelectResult
extends
AbstractResult
implements
IResult
{
private
String
jobID
;
private
String
jobID
;
private
List
<
Map
<
String
,
Object
>>
rowData
;
private
List
<
Map
<
String
,
Object
>>
rowData
;
private
Integer
total
;
private
Integer
total
;
private
Integer
currentCount
;
private
Integer
currentCount
;
private
Set
<
String
>
columns
;
private
Set
<
String
>
columns
;
private
boolean
isDestroyed
;
private
boolean
isDestroyed
;
public
SelectResult
(
List
<
Map
<
String
,
Object
>>
rowData
,
Integer
total
,
Integer
currentCount
,
Set
<
String
>
columns
,
public
SelectResult
(
List
<
Map
<
String
,
Object
>>
rowData
,
Integer
total
,
Integer
currentCount
,
Set
<
String
>
columns
,
String
jobID
,
boolean
success
)
{
String
jobID
,
boolean
success
)
{
this
.
rowData
=
rowData
;
this
.
rowData
=
rowData
;
this
.
total
=
total
;
this
.
total
=
total
;
this
.
currentCount
=
currentCount
;
this
.
currentCount
=
currentCount
;
...
@@ -37,7 +37,7 @@ public class SelectResult extends AbstractResult implements IResult{
...
@@ -37,7 +37,7 @@ public class SelectResult extends AbstractResult implements IResult{
this
.
isDestroyed
=
false
;
this
.
isDestroyed
=
false
;
}
}
public
SelectResult
(
String
jobID
,
List
<
Map
<
String
,
Object
>>
rowData
,
Set
<
String
>
columns
)
{
public
SelectResult
(
String
jobID
,
List
<
Map
<
String
,
Object
>>
rowData
,
Set
<
String
>
columns
)
{
this
.
jobID
=
jobID
;
this
.
jobID
=
jobID
;
this
.
rowData
=
rowData
;
this
.
rowData
=
rowData
;
this
.
total
=
rowData
.
size
();
this
.
total
=
rowData
.
size
();
...
@@ -58,15 +58,15 @@ public class SelectResult extends AbstractResult implements IResult{
...
@@ -58,15 +58,15 @@ public class SelectResult extends AbstractResult implements IResult{
return
jobID
;
return
jobID
;
}
}
public
static
SelectResult
buildDestruction
(
String
jobID
){
public
static
SelectResult
buildDestruction
(
String
jobID
)
{
return
new
SelectResult
(
jobID
,
true
,
false
);
return
new
SelectResult
(
jobID
,
true
,
false
);
}
}
public
static
SelectResult
buildSuccess
(
String
jobID
){
public
static
SelectResult
buildSuccess
(
String
jobID
)
{
return
new
SelectResult
(
jobID
,
false
,
true
);
return
new
SelectResult
(
jobID
,
false
,
true
);
}
}
public
static
SelectResult
buildFailed
(){
public
static
SelectResult
buildFailed
()
{
return
new
SelectResult
(
null
,
false
,
false
);
return
new
SelectResult
(
null
,
false
,
false
);
}
}
}
}
dlink-core/src/main/java/com/dlink/result/SelectResultBuilder.java
View file @
04ef64e0
package
com
.
dlink
.
result
;
package
com
.
dlink
.
result
;
import
com.dlink.constant.FlinkSQLConstant
;
import
org.apache.flink.table.api.TableColumn
;
import
org.apache.flink.table.api.TableResult
;
import
org.apache.flink.table.api.TableResult
;
import
org.apache.flink.types.Row
;
import
org.apache.flink.util.StringUtils
;
import
java.util.*
;
import
java.util.stream.Stream
;
/**
/**
* SelectBuilder
* SelectBuilder
...
@@ -31,11 +24,11 @@ public class SelectResultBuilder implements ResultBuilder {
...
@@ -31,11 +24,11 @@ public class SelectResultBuilder implements ResultBuilder {
public
IResult
getResult
(
TableResult
tableResult
)
{
public
IResult
getResult
(
TableResult
tableResult
)
{
if
(
tableResult
.
getJobClient
().
isPresent
())
{
if
(
tableResult
.
getJobClient
().
isPresent
())
{
String
jobId
=
tableResult
.
getJobClient
().
get
().
getJobID
().
toHexString
();
String
jobId
=
tableResult
.
getJobClient
().
get
().
getJobID
().
toHexString
();
ResultRunnable
runnable
=
new
ResultRunnable
(
tableResult
,
maxRowNum
,
isChangeLog
,
isAutoCancel
);
ResultRunnable
runnable
=
new
ResultRunnable
(
tableResult
,
maxRowNum
,
isChangeLog
,
isAutoCancel
);
Thread
thread
=
new
Thread
(
runnable
,
jobId
);
Thread
thread
=
new
Thread
(
runnable
,
jobId
);
thread
.
start
();
thread
.
start
();
return
SelectResult
.
buildSuccess
(
jobId
);
return
SelectResult
.
buildSuccess
(
jobId
);
}
else
{
}
else
{
return
SelectResult
.
buildFailed
();
return
SelectResult
.
buildFailed
();
}
}
}
}
...
...
dlink-core/src/main/java/com/dlink/result/SubmitResult.java
View file @
04ef64e0
...
@@ -25,7 +25,7 @@ public class SubmitResult {
...
@@ -25,7 +25,7 @@ public class SubmitResult {
public
SubmitResult
()
{
public
SubmitResult
()
{
}
}
public
static
SubmitResult
error
(
String
error
){
public
static
SubmitResult
error
(
String
error
)
{
return
new
SubmitResult
(
false
,
error
);
return
new
SubmitResult
(
false
,
error
);
}
}
...
@@ -34,7 +34,7 @@ public class SubmitResult {
...
@@ -34,7 +34,7 @@ public class SubmitResult {
this
.
error
=
error
;
this
.
error
=
error
;
}
}
public
SubmitResult
(
String
sessionId
,
List
<
String
>
statements
,
String
flinkHost
,
String
jobName
)
{
public
SubmitResult
(
String
sessionId
,
List
<
String
>
statements
,
String
flinkHost
,
String
jobName
)
{
this
.
sessionId
=
sessionId
;
this
.
sessionId
=
sessionId
;
this
.
statements
=
statements
;
this
.
statements
=
statements
;
this
.
flinkHost
=
flinkHost
;
this
.
flinkHost
=
flinkHost
;
...
...
dlink-core/src/main/java/com/dlink/session/SessionConfig.java
View file @
04ef64e0
...
@@ -19,7 +19,7 @@ public class SessionConfig {
...
@@ -19,7 +19,7 @@ public class SessionConfig {
private
String
clusterName
;
private
String
clusterName
;
private
String
address
;
private
String
address
;
public
enum
SessionType
{
public
enum
SessionType
{
PUBLIC
,
PUBLIC
,
PRIVATE
PRIVATE
}
}
...
@@ -32,11 +32,11 @@ public class SessionConfig {
...
@@ -32,11 +32,11 @@ public class SessionConfig {
this
.
address
=
address
;
this
.
address
=
address
;
}
}
public
static
SessionConfig
build
(
String
type
,
boolean
useRemote
,
Integer
clusterId
,
String
clusterName
,
String
address
){
public
static
SessionConfig
build
(
String
type
,
boolean
useRemote
,
Integer
clusterId
,
String
clusterName
,
String
address
)
{
return
new
SessionConfig
(
SessionType
.
valueOf
(
type
),
useRemote
,
clusterId
,
clusterName
,
address
);
return
new
SessionConfig
(
SessionType
.
valueOf
(
type
),
useRemote
,
clusterId
,
clusterName
,
address
);
}
}
public
ExecutorSetting
getExecutorSetting
(){
public
ExecutorSetting
getExecutorSetting
()
{
return
new
ExecutorSetting
(
true
);
return
new
ExecutorSetting
(
true
);
}
}
...
...
dlink-core/src/main/java/com/dlink/session/SessionInfo.java
View file @
04ef64e0
...
@@ -26,8 +26,8 @@ public class SessionInfo {
...
@@ -26,8 +26,8 @@ public class SessionInfo {
this
.
createTime
=
createTime
;
this
.
createTime
=
createTime
;
}
}
public
static
SessionInfo
build
(
ExecutorEntity
executorEntity
){
public
static
SessionInfo
build
(
ExecutorEntity
executorEntity
)
{
return
new
SessionInfo
(
executorEntity
.
getSessionId
(),
executorEntity
.
getSessionConfig
(),
executorEntity
.
getCreateUser
(),
executorEntity
.
getCreateTime
());
return
new
SessionInfo
(
executorEntity
.
getSessionId
(),
executorEntity
.
getSessionConfig
(),
executorEntity
.
getCreateUser
(),
executorEntity
.
getCreateTime
());
}
}
}
}
dlink-core/src/main/java/com/dlink/session/SessionPool.java
View file @
04ef64e0
...
@@ -25,10 +25,10 @@ public class SessionPool {
...
@@ -25,10 +25,10 @@ public class SessionPool {
return
false
;
return
false
;
}
}
public
static
Integer
push
(
ExecutorEntity
executorEntity
){
public
static
Integer
push
(
ExecutorEntity
executorEntity
)
{
if
(
executorList
.
size
()
>=
FlinkConstant
.
DEFAULT_SESSION_COUNT
*
FlinkConstant
.
DEFAULT_FACTOR
)
{
if
(
executorList
.
size
()
>=
FlinkConstant
.
DEFAULT_SESSION_COUNT
*
FlinkConstant
.
DEFAULT_FACTOR
)
{
executorList
.
remove
(
0
);
executorList
.
remove
(
0
);
}
else
if
(
executorList
.
size
()
>=
FlinkConstant
.
DEFAULT_SESSION_COUNT
)
{
}
else
if
(
executorList
.
size
()
>=
FlinkConstant
.
DEFAULT_SESSION_COUNT
)
{
executorList
.
clear
();
executorList
.
clear
();
}
}
executorList
.
add
(
executorEntity
);
executorList
.
add
(
executorEntity
);
...
@@ -55,17 +55,17 @@ public class SessionPool {
...
@@ -55,17 +55,17 @@ public class SessionPool {
return
null
;
return
null
;
}
}
public
static
List
<
ExecutorEntity
>
list
(){
public
static
List
<
ExecutorEntity
>
list
()
{
return
executorList
;
return
executorList
;
}
}
public
static
List
<
SessionInfo
>
filter
(
String
createUser
){
public
static
List
<
SessionInfo
>
filter
(
String
createUser
)
{
List
<
SessionInfo
>
sessionInfos
=
new
ArrayList
<>();
List
<
SessionInfo
>
sessionInfos
=
new
ArrayList
<>();
for
(
ExecutorEntity
item
:
executorList
)
{
for
(
ExecutorEntity
item
:
executorList
)
{
if
(
item
.
getSessionConfig
().
getType
()==
SessionConfig
.
SessionType
.
PUBLIC
)
{
if
(
item
.
getSessionConfig
().
getType
()
==
SessionConfig
.
SessionType
.
PUBLIC
)
{
sessionInfos
.
add
(
SessionInfo
.
build
(
item
));
sessionInfos
.
add
(
SessionInfo
.
build
(
item
));
}
else
{
}
else
{
if
(
createUser
!=
null
&&
createUser
.
equals
(
item
.
getCreateUser
()))
{
if
(
createUser
!=
null
&&
createUser
.
equals
(
item
.
getCreateUser
()))
{
sessionInfos
.
add
(
SessionInfo
.
build
(
item
));
sessionInfos
.
add
(
SessionInfo
.
build
(
item
));
}
}
}
}
...
@@ -73,11 +73,11 @@ public class SessionPool {
...
@@ -73,11 +73,11 @@ public class SessionPool {
return
sessionInfos
;
return
sessionInfos
;
}
}
public
static
SessionInfo
getInfo
(
String
sessionId
){
public
static
SessionInfo
getInfo
(
String
sessionId
)
{
ExecutorEntity
executorEntity
=
get
(
sessionId
);
ExecutorEntity
executorEntity
=
get
(
sessionId
);
if
(
executorEntity
!=
null
)
{
if
(
executorEntity
!=
null
)
{
return
SessionInfo
.
build
(
executorEntity
);
return
SessionInfo
.
build
(
executorEntity
);
}
else
{
}
else
{
return
null
;
return
null
;
}
}
}
}
...
...
dlink-core/src/main/java/com/dlink/utils/MapParseUtils.java
View file @
04ef64e0
package
com
.
dlink
.
utils
;
package
com
.
dlink
.
utils
;
import
java.util.ArrayList
;
import
java.util.*
;
import
java.util.Arrays
;
import
java.util.Deque
;
import
java.util.HashMap
;
import
java.util.LinkedList
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Stack
;
import
java.util.regex.Matcher
;
import
java.util.regex.Matcher
;
import
java.util.regex.Pattern
;
import
java.util.regex.Pattern
;
import
java.util.stream.Collectors
;
import
java.util.stream.Collectors
;
...
...
dlink-core/src/main/java/com/dlink/utils/UDFUtil.java
View file @
04ef64e0
...
@@ -13,13 +13,13 @@ import org.codehaus.groovy.control.CompilerConfiguration;
...
@@ -13,13 +13,13 @@ import org.codehaus.groovy.control.CompilerConfiguration;
*/
*/
public
class
UDFUtil
{
public
class
UDFUtil
{
public
static
void
buildClass
(
String
code
){
public
static
void
buildClass
(
String
code
)
{
CustomStringJavaCompiler
compiler
=
new
CustomStringJavaCompiler
(
code
);
CustomStringJavaCompiler
compiler
=
new
CustomStringJavaCompiler
(
code
);
boolean
res
=
compiler
.
compiler
();
boolean
res
=
compiler
.
compiler
();
if
(
res
)
{
if
(
res
)
{
String
className
=
compiler
.
getFullClassName
();
String
className
=
compiler
.
getFullClassName
();
byte
[]
compiledBytes
=
compiler
.
getJavaFileObjectMap
(
className
).
getCompiledBytes
();
byte
[]
compiledBytes
=
compiler
.
getJavaFileObjectMap
(
className
).
getCompiledBytes
();
ClassPool
.
push
(
new
ClassEntity
(
className
,
code
,
compiledBytes
));
ClassPool
.
push
(
new
ClassEntity
(
className
,
code
,
compiledBytes
));
System
.
out
.
println
(
"编译成功"
);
System
.
out
.
println
(
"编译成功"
);
System
.
out
.
println
(
"compilerTakeTime:"
+
compiler
.
getCompilerTakeTime
());
System
.
out
.
println
(
"compilerTakeTime:"
+
compiler
.
getCompilerTakeTime
());
initClassLoader
(
className
);
initClassLoader
(
className
);
...
@@ -29,14 +29,14 @@ public class UDFUtil {
...
@@ -29,14 +29,14 @@ public class UDFUtil {
}
}
}
}
public
static
void
initClassLoader
(
String
name
){
public
static
void
initClassLoader
(
String
name
)
{
ClassEntity
classEntity
=
ClassPool
.
get
(
name
);
ClassEntity
classEntity
=
ClassPool
.
get
(
name
);
ClassLoader
contextClassLoader
=
Thread
.
currentThread
().
getContextClassLoader
();
ClassLoader
contextClassLoader
=
Thread
.
currentThread
().
getContextClassLoader
();
CompilerConfiguration
config
=
new
CompilerConfiguration
();
CompilerConfiguration
config
=
new
CompilerConfiguration
();
config
.
setSourceEncoding
(
"UTF-8"
);
config
.
setSourceEncoding
(
"UTF-8"
);
GroovyClassLoader
groovyClassLoader
=
new
GroovyClassLoader
(
contextClassLoader
,
config
);
GroovyClassLoader
groovyClassLoader
=
new
GroovyClassLoader
(
contextClassLoader
,
config
);
groovyClassLoader
.
setShouldRecompile
(
true
);
groovyClassLoader
.
setShouldRecompile
(
true
);
groovyClassLoader
.
defineClass
(
classEntity
.
getName
(),
classEntity
.
getClassByte
());
groovyClassLoader
.
defineClass
(
classEntity
.
getName
(),
classEntity
.
getClassByte
());
Thread
.
currentThread
().
setContextClassLoader
(
groovyClassLoader
);
Thread
.
currentThread
().
setContextClassLoader
(
groovyClassLoader
);
// Class<?> clazz = groovyClassLoader.parseClass(codeSource,"com.dlink.ud.udf.SubstringFunction");
// Class<?> clazz = groovyClassLoader.parseClass(codeSource,"com.dlink.ud.udf.SubstringFunction");
}
}
...
...
dlink-core/src/test/java/com/dlink/core/FlinkRestAPITest.java
View file @
04ef64e0
...
@@ -19,109 +19,109 @@ public class FlinkRestAPITest {
...
@@ -19,109 +19,109 @@ public class FlinkRestAPITest {
private
String
address
=
"cdh5:8081"
;
private
String
address
=
"cdh5:8081"
;
@Test
@Test
public
void
savepointTest
(){
public
void
savepointTest
()
{
//JsonNode savepointInfo = FlinkAPI.build(address).getSavepointInfo("602ad9d03b872dba44267432d1a2a3b2","04044589477a973a32e7dd53e1eb20fd");
//JsonNode savepointInfo = FlinkAPI.build(address).getSavepointInfo("602ad9d03b872dba44267432d1a2a3b2","04044589477a973a32e7dd53e1eb20fd");
SavePointResult
savepoints
=
FlinkAPI
.
build
(
address
).
savepoints
(
"243b97597448edbd2e635fc3d25b1064"
,
"trigger"
);
SavePointResult
savepoints
=
FlinkAPI
.
build
(
address
).
savepoints
(
"243b97597448edbd2e635fc3d25b1064"
,
"trigger"
);
System
.
out
.
println
(
savepoints
.
toString
());
System
.
out
.
println
(
savepoints
.
toString
());
}
}
@Test
@Test
public
void
selectTest
(){
public
void
selectTest
()
{
List
<
JsonNode
>
jobs
=
FlinkAPI
.
build
(
address
).
listJobs
();
List
<
JsonNode
>
jobs
=
FlinkAPI
.
build
(
address
).
listJobs
();
System
.
out
.
println
(
jobs
.
toString
());
System
.
out
.
println
(
jobs
.
toString
());
}
}
@Test
@Test
public
void
stopTest
(){
public
void
stopTest
()
{
FlinkAPI
.
build
(
address
).
stop
(
"0727f796fcf9e07d89e724f7e15598cf"
);
FlinkAPI
.
build
(
address
).
stop
(
"0727f796fcf9e07d89e724f7e15598cf"
);
}
}
@Test
@Test
public
void
getCheckPointsDetailInfoTest
(){
public
void
getCheckPointsDetailInfoTest
()
{
JsonNode
checkPointsDetailInfo
=
FlinkAPI
.
build
(
address
).
getCheckPointsConfig
(
"178e954faaa4bf06cfbda971bb8b2957"
);
JsonNode
checkPointsDetailInfo
=
FlinkAPI
.
build
(
address
).
getCheckPointsConfig
(
"178e954faaa4bf06cfbda971bb8b2957"
);
System
.
out
.
println
(
checkPointsDetailInfo
.
toString
());
System
.
out
.
println
(
checkPointsDetailInfo
.
toString
());
}
}
@Test
@Test
public
void
getConfigurationsDetailsInfoTest
(){
public
void
getConfigurationsDetailsInfoTest
()
{
JsonNode
configurationsDetailsInfo
=
FlinkAPI
.
build
(
address
).
getJobsConfig
(
"178e954faaa4bf06cfbda971bb8b2957"
);
JsonNode
configurationsDetailsInfo
=
FlinkAPI
.
build
(
address
).
getJobsConfig
(
"178e954faaa4bf06cfbda971bb8b2957"
);
System
.
out
.
println
(
configurationsDetailsInfo
.
toString
());
System
.
out
.
println
(
configurationsDetailsInfo
.
toString
());
}
}
@Test
@Test
public
void
getExectionsInfoTest
(){
public
void
getExectionsInfoTest
()
{
JsonNode
exectionsDetailInfo
=
FlinkAPI
.
build
(
address
).
getException
(
"178e954faaa4bf06cfbda971bb8b2957"
);
JsonNode
exectionsDetailInfo
=
FlinkAPI
.
build
(
address
).
getException
(
"178e954faaa4bf06cfbda971bb8b2957"
);
System
.
out
.
println
(
exectionsDetailInfo
.
toString
());
System
.
out
.
println
(
exectionsDetailInfo
.
toString
());
}
}
@Test
@Test
public
void
getJobManagerMetricsTest
(){
public
void
getJobManagerMetricsTest
()
{
JsonNode
jobManagerMetrics
=
FlinkAPI
.
build
(
address
).
getJobManagerMetrics
();
JsonNode
jobManagerMetrics
=
FlinkAPI
.
build
(
address
).
getJobManagerMetrics
();
System
.
out
.
println
(
jobManagerMetrics
.
toString
());
System
.
out
.
println
(
jobManagerMetrics
.
toString
());
}
}
@Test
@Test
public
void
getJobManagerConfigTest
(){
public
void
getJobManagerConfigTest
()
{
JsonNode
jobManagerConfig
=
FlinkAPI
.
build
(
address
).
getJobManagerConfig
();
JsonNode
jobManagerConfig
=
FlinkAPI
.
build
(
address
).
getJobManagerConfig
();
System
.
out
.
println
(
jobManagerConfig
.
toString
());
System
.
out
.
println
(
jobManagerConfig
.
toString
());
}
}
@Test
@Test
public
void
getJobManagerLogTest
(){
public
void
getJobManagerLogTest
()
{
String
jobManagerLog
=
FlinkAPI
.
build
(
address
).
getJobManagerLog
();
String
jobManagerLog
=
FlinkAPI
.
build
(
address
).
getJobManagerLog
();
System
.
out
.
println
(
jobManagerLog
);
System
.
out
.
println
(
jobManagerLog
);
}
}
@Test
@Test
public
void
getJobManagerStdOutTest
(){
public
void
getJobManagerStdOutTest
()
{
String
jobManagerLogs
=
FlinkAPI
.
build
(
address
).
getJobManagerStdOut
();
String
jobManagerLogs
=
FlinkAPI
.
build
(
address
).
getJobManagerStdOut
();
System
.
out
.
println
(
jobManagerLogs
);
System
.
out
.
println
(
jobManagerLogs
);
}
}
@Test
@Test
public
void
getJobManagerLogListTest
(){
public
void
getJobManagerLogListTest
()
{
JsonNode
jobManagerLogList
=
FlinkAPI
.
build
(
address
).
getJobManagerLogList
();
JsonNode
jobManagerLogList
=
FlinkAPI
.
build
(
address
).
getJobManagerLogList
();
System
.
out
.
println
(
jobManagerLogList
.
toString
());
System
.
out
.
println
(
jobManagerLogList
.
toString
());
}
}
@Test
@Test
public
void
getTaskManagersTest
(){
public
void
getTaskManagersTest
()
{
JsonNode
taskManagers
=
FlinkAPI
.
build
(
address
).
getTaskManagers
();
JsonNode
taskManagers
=
FlinkAPI
.
build
(
address
).
getTaskManagers
();
System
.
out
.
println
(
taskManagers
.
toString
());
System
.
out
.
println
(
taskManagers
.
toString
());
}
}
@Test
@Test
public
void
getTaskManagerMetricsTest
(){
public
void
getTaskManagerMetricsTest
()
{
JsonNode
taskManagerMetrics
=
FlinkAPI
.
build
(
address
).
getTaskManagerMetrics
(
"container_e34_1646992539398_0004_01_000002"
);
JsonNode
taskManagerMetrics
=
FlinkAPI
.
build
(
address
).
getTaskManagerMetrics
(
"container_e34_1646992539398_0004_01_000002"
);
System
.
out
.
println
(
taskManagerMetrics
.
toString
());
System
.
out
.
println
(
taskManagerMetrics
.
toString
());
}
}
@Test
@Test
public
void
getTaskManagerLogTest
(){
public
void
getTaskManagerLogTest
()
{
String
taskManagerLog
=
FlinkAPI
.
build
(
address
).
getTaskManagerLog
(
"container_e34_1646992539398_0004_01_000002"
);
String
taskManagerLog
=
FlinkAPI
.
build
(
address
).
getTaskManagerLog
(
"container_e34_1646992539398_0004_01_000002"
);
System
.
out
.
println
(
taskManagerLog
);
System
.
out
.
println
(
taskManagerLog
);
}
}
@Test
@Test
public
void
getTaskManagerStdOutTest
(){
public
void
getTaskManagerStdOutTest
()
{
String
taskManagerStdOut
=
FlinkAPI
.
build
(
address
).
getTaskManagerStdOut
(
"container_e34_1646992539398_0004_01_000002"
);
String
taskManagerStdOut
=
FlinkAPI
.
build
(
address
).
getTaskManagerStdOut
(
"container_e34_1646992539398_0004_01_000002"
);
System
.
out
.
println
(
taskManagerStdOut
);
System
.
out
.
println
(
taskManagerStdOut
);
}
}
@Test
@Test
public
void
getTaskManagerLogListTest
(){
public
void
getTaskManagerLogListTest
()
{
JsonNode
taskManagerLogList
=
FlinkAPI
.
build
(
address
).
getTaskManagerLogList
(
"container_e34_1646992539398_0004_01_000002"
);
JsonNode
taskManagerLogList
=
FlinkAPI
.
build
(
address
).
getTaskManagerLogList
(
"container_e34_1646992539398_0004_01_000002"
);
System
.
out
.
println
(
taskManagerLogList
.
toString
());
System
.
out
.
println
(
taskManagerLogList
.
toString
());
}
}
@Test
@Test
public
void
getTaskManagerThreadDumpTest
(){
public
void
getTaskManagerThreadDumpTest
()
{
JsonNode
taskManagerThreadDump
=
FlinkAPI
.
build
(
address
).
getTaskManagerThreadDump
(
"container_e34_1646992539398_0004_01_000002"
);
JsonNode
taskManagerThreadDump
=
FlinkAPI
.
build
(
address
).
getTaskManagerThreadDump
(
"container_e34_1646992539398_0004_01_000002"
);
System
.
out
.
println
(
taskManagerThreadDump
.
toString
());
System
.
out
.
println
(
taskManagerThreadDump
.
toString
());
}
}
}
}
dlink-core/src/test/java/com/dlink/core/FlinkSqlPlusTest.java
View file @
04ef64e0
package
com
.
dlink
.
core
;
package
com
.
dlink
.
core
;
import
com.dlink.executor.Executor
;
import
com.dlink.executor.ExecutorSetting
;
import
com.dlink.explainer.ca.CABuilder
;
import
com.dlink.explainer.ca.TableCANode
;
import
com.dlink.explainer.ca.TableCAResult
;
import
com.dlink.job.JobManager
;
import
com.dlink.parser.SingleSqlParserFactory
;
import
com.dlink.plus.FlinkSqlPlus
;
import
com.dlink.plus.FlinkSqlPlus
;
import
com.dlink.result.SubmitResult
;
import
org.apache.flink.runtime.rest.messages.JobPlanInfo
;
import
org.apache.flink.runtime.rest.messages.JobPlanInfo
;
import
org.junit.Test
;
import
org.junit.Test
;
import
java.util.ArrayList
;
import
java.util.List
;
import
java.util.Map
;
/**
/**
* FlinkSqlPlusTest
* FlinkSqlPlusTest
*
*
...
@@ -25,7 +13,7 @@ import java.util.Map;
...
@@ -25,7 +13,7 @@ import java.util.Map;
public
class
FlinkSqlPlusTest
{
public
class
FlinkSqlPlusTest
{
@Test
@Test
public
void
getJobPlanInfo
(){
public
void
getJobPlanInfo
()
{
String
sql
=
"jdbcconfig:='connector' = 'jdbc',\n"
+
String
sql
=
"jdbcconfig:='connector' = 'jdbc',\n"
+
" 'url' = 'jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true',\n"
+
" 'url' = 'jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true',\n"
+
" 'username'='dlink',\n"
+
" 'username'='dlink',\n"
+
...
...
dlink-core/src/test/java/com/dlink/core/JobManagerTest.java
View file @
04ef64e0
package
com
.
dlink
.
core
;
package
com
.
dlink
.
core
;
import
com.dlink.executor.Executor
;
import
com.dlink.executor.ExecutorSetting
;
import
com.dlink.job.JobConfig
;
import
com.dlink.job.JobConfig
;
import
com.dlink.job.JobManager
;
import
com.dlink.job.JobManager
;
import
com.dlink.job.JobResult
;
import
com.dlink.job.JobResult
;
import
com.dlink.result.ResultPool
;
import
com.dlink.result.ResultPool
;
import
com.dlink.result.RunResult
;
import
com.dlink.result.SelectResult
;
import
com.dlink.result.SelectResult
;
import
com.dlink.result.SubmitResult
;
import
org.junit.Test
;
import
org.junit.Test
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.HashMap
;
import
java.util.List
;
/**
/**
* JobManagerTest
* JobManagerTest
...
@@ -24,16 +18,16 @@ import java.util.List;
...
@@ -24,16 +18,16 @@ import java.util.List;
public
class
JobManagerTest
{
public
class
JobManagerTest
{
@Test
@Test
public
void
cancelJobSelect
(){
public
void
cancelJobSelect
()
{
JobConfig
config
=
new
JobConfig
(
"session-yarn"
,
true
,
true
,
true
,
true
,
"s1"
,
2
,
JobConfig
config
=
new
JobConfig
(
"session-yarn"
,
true
,
true
,
true
,
true
,
"s1"
,
2
,
null
,
null
,
null
,
"测试"
,
false
,
false
,
false
,
100
,
0
,
null
,
null
,
null
,
"测试"
,
false
,
false
,
false
,
100
,
0
,
1
,
0
,
null
,
new
HashMap
<>());
1
,
0
,
null
,
new
HashMap
<>());
if
(
config
.
isUseRemote
())
{
if
(
config
.
isUseRemote
())
{
config
.
setAddress
(
"192.168.123.157:8081"
);
config
.
setAddress
(
"192.168.123.157:8081"
);
}
}
JobManager
jobManager
=
JobManager
.
build
(
config
);
JobManager
jobManager
=
JobManager
.
build
(
config
);
String
sql1
=
"CREATE TABLE Orders (\n"
+
String
sql1
=
"CREATE TABLE Orders (\n"
+
" order_number BIGINT,\n"
+
" order_number BIGINT,\n"
+
" price DECIMAL(32,2),\n"
+
" price DECIMAL(32,2),\n"
+
" order_time TIMESTAMP(3)\n"
+
" order_time TIMESTAMP(3)\n"
+
...
@@ -42,7 +36,7 @@ public class JobManagerTest {
...
@@ -42,7 +36,7 @@ public class JobManagerTest {
" 'rows-per-second' = '1'\n"
+
" 'rows-per-second' = '1'\n"
+
");"
;
");"
;
String
sql3
=
"select order_number,price,order_time from Orders"
;
String
sql3
=
"select order_number,price,order_time from Orders"
;
String
sql
=
sql1
+
sql3
;
String
sql
=
sql1
+
sql3
;
JobResult
result
=
jobManager
.
executeSql
(
sql
);
JobResult
result
=
jobManager
.
executeSql
(
sql
);
SelectResult
selectResult
=
ResultPool
.
get
(
result
.
getJobId
());
SelectResult
selectResult
=
ResultPool
.
get
(
result
.
getJobId
());
System
.
out
.
println
(
result
.
isSuccess
());
System
.
out
.
println
(
result
.
isSuccess
());
...
...
dlink-core/src/test/java/com/dlink/core/SqlParserTest.java
View file @
04ef64e0
...
@@ -16,14 +16,14 @@ import java.util.Map;
...
@@ -16,14 +16,14 @@ import java.util.Map;
public
class
SqlParserTest
{
public
class
SqlParserTest
{
@Test
@Test
public
void
selectTest
(){
public
void
selectTest
()
{
String
sql
=
"insert into T SElecT id,xm as name frOm people wheRe id=1 And enabled = 1"
;
String
sql
=
"insert into T SElecT id,xm as name frOm people wheRe id=1 And enabled = 1"
;
Map
<
String
,
List
<
String
>>
lists
=
SingleSqlParserFactory
.
generateParser
(
sql
);
Map
<
String
,
List
<
String
>>
lists
=
SingleSqlParserFactory
.
generateParser
(
sql
);
System
.
out
.
println
(
lists
.
toString
());
System
.
out
.
println
(
lists
.
toString
());
}
}
@Test
@Test
public
void
createAggTableTest
(){
public
void
createAggTableTest
()
{
String
sql
=
"CREATE AGGTABLE agg1 AS \n"
+
String
sql
=
"CREATE AGGTABLE agg1 AS \n"
+
"SELECT sid,data\n"
+
"SELECT sid,data\n"
+
"FROM score\n"
+
"FROM score\n"
+
...
@@ -37,32 +37,32 @@ public class SqlParserTest {
...
@@ -37,32 +37,32 @@ public class SqlParserTest {
"GROUP BY cls\r\n"
+
"GROUP BY cls\r\n"
+
"AGG BY TOP2(score) as (score,rank)"
;
"AGG BY TOP2(score) as (score,rank)"
;
//sql=sql.replace("\n"," ");
//sql=sql.replace("\n"," ");
Map
<
String
,
List
<
String
>>
lists
=
SingleSqlParserFactory
.
generateParser
(
sql2
);
Map
<
String
,
List
<
String
>>
lists
=
SingleSqlParserFactory
.
generateParser
(
sql2
);
System
.
out
.
println
(
lists
.
toString
());
System
.
out
.
println
(
lists
.
toString
());
System
.
out
.
println
(
StringUtils
.
join
(
lists
.
get
(
"SELECT"
),
","
));
System
.
out
.
println
(
StringUtils
.
join
(
lists
.
get
(
"SELECT"
),
","
));
}
}
@Test
@Test
public
void
setTest
(){
public
void
setTest
()
{
String
sql
=
"set table.exec.resource.default-parallelism = 2"
;
String
sql
=
"set table.exec.resource.default-parallelism = 2"
;
Map
<
String
,
List
<
String
>>
lists
=
SingleSqlParserFactory
.
generateParser
(
sql
);
Map
<
String
,
List
<
String
>>
lists
=
SingleSqlParserFactory
.
generateParser
(
sql
);
System
.
out
.
println
(
lists
.
toString
());
System
.
out
.
println
(
lists
.
toString
());
}
}
@Test
@Test
public
void
regTest
(){
public
void
regTest
()
{
String
sql
=
"--并行度\n"
+
String
sql
=
"--并行度\n"
+
"CREATE TABLE student (\n"
+
"CREATE TABLE student (\n"
+
" sid INT,\n"
+
" sid INT,\n"
+
" name STRING,\n"
+
" name STRING,\n"
+
" PRIMARY KEY (sid) NOT ENFORCED\n"
+
" PRIMARY KEY (sid) NOT ENFORCED\n"
+
") WITH ${tb}"
;
") WITH ${tb}"
;
sql
=
sql
.
replaceAll
(
"--([^'\r\n]{0,}('[^'\r\n]{0,}'){0,1}[^'\r\n]{0,}){0,}"
,
""
).
trim
();
sql
=
sql
.
replaceAll
(
"--([^'\r\n]{0,}('[^'\r\n]{0,}'){0,1}[^'\r\n]{0,}){0,}"
,
""
).
trim
();
System
.
out
.
println
(
sql
);
System
.
out
.
println
(
sql
);
}
}
@Test
@Test
public
void
createCDCSourceTest
(){
public
void
createCDCSourceTest
()
{
String
sql
=
"EXECUTE CDCSOURCE demo WITH (\n"
+
String
sql
=
"EXECUTE CDCSOURCE demo WITH (\n"
+
" 'hostname'='127.0.0.1',\n"
+
" 'hostname'='127.0.0.1',\n"
+
" 'port'='3306',\n"
+
" 'port'='3306',\n"
+
...
@@ -75,15 +75,15 @@ public class SqlParserTest {
...
@@ -75,15 +75,15 @@ public class SqlParserTest {
" 'topic'='dlinkcdc',\n"
+
" 'topic'='dlinkcdc',\n"
+
" 'brokers'='127.0.0.1:9092'\n"
+
" 'brokers'='127.0.0.1:9092'\n"
+
");"
;
");"
;
Map
<
String
,
List
<
String
>>
lists
=
SingleSqlParserFactory
.
generateParser
(
sql
);
Map
<
String
,
List
<
String
>>
lists
=
SingleSqlParserFactory
.
generateParser
(
sql
);
System
.
out
.
println
(
lists
.
toString
());
System
.
out
.
println
(
lists
.
toString
());
}
}
@Test
@Test
public
void
showFragmentTest
(){
public
void
showFragmentTest
()
{
String
sql
=
"show fragment test"
;
String
sql
=
"show fragment test"
;
Map
<
String
,
List
<
String
>>
lists
=
SingleSqlParserFactory
.
generateParser
(
sql
);
Map
<
String
,
List
<
String
>>
lists
=
SingleSqlParserFactory
.
generateParser
(
sql
);
System
.
out
.
println
(
lists
.
toString
());
System
.
out
.
println
(
lists
.
toString
());
}
}
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment