zhaowei / dlink · Commits · 9049f2d0

Unverified commit 9049f2d0, authored Feb 09, 2022 by aiwenmo, committed by GitHub on Feb 09, 2022.

    Merge pull request #121 from DataLinkDC/dev-batch

    batch 引擎 (batch engine)

Parents: caaa087c, 3d7992d1

Showing 36 changed files with 889 additions and 289 deletions (+889 -289).
Changed files (36):

    dlink-admin/pom.xml                                             (+5 -1)
    ...k-admin/src/main/java/com/dlink/dto/StudioExecuteDTO.java    (+2 -1)
    dlink-admin/src/main/java/com/dlink/model/Task.java             (+13 -10)
    dlink-admin/src/main/resources/mapper/TaskMapper.xml            (+0 -26)
    dlink-app/pom.xml                                               (+4 -0)
    dlink-app/src/main/java/com/dlink/app/flinksql/Submiter.java    (+1 -1)
    ...n/java/com/dlink/executor/CustomTableEnvironmentImpl.java    (+203 -15)
    ...n/java/com/dlink/executor/CustomTableEnvironmentImpl.java    (+38 -30)
    ...n/java/com/dlink/executor/CustomTableEnvironmentImpl.java    (+27 -17)
    dlink-client/dlink-client-1.13/pom.xml                          (+11 -0)
    ...n/java/com/dlink/executor/CustomTableEnvironmentImpl.java    (+57 -48)
    ...n/java/com/dlink/executor/CustomTableEnvironmentImpl.java    (+53 -34)
    dlink-client/dlink-client-base/pom.xml                          (+102 -1)
    .../main/java/com/dlink/executor/CustomTableEnvironment.java    (+56 -0)
    dlink-core/src/main/java/com/dlink/explainer/Explainer.java     (+1 -1)
    dlink-core/src/main/java/com/dlink/job/JobConfig.java           (+6 -3)
    dlink-core/src/main/java/com/dlink/job/JobManager.java          (+2 -2)
    dlink-core/src/test/java/com/dlink/core/BatchTest.java          (+72 -0)
    dlink-core/src/test/java/com/dlink/core/JobManagerTest.java     (+1 -1)
    dlink-doc/sql/dlink.sql                                         (+1 -0)
    dlink-doc/sql/dlink_history.sql                                 (+6 -0)
    ...or/src/main/java/com/dlink/executor/AppBatchExecutor.java    (+23 -0)
    ...r/src/main/java/com/dlink/executor/AppStreamExecutor.java    (+6 -1)
    .../src/main/java/com/dlink/executor/EnvironmentSetting.java    (+1 -0)
    ...k-executor/src/main/java/com/dlink/executor/Executor.java    (+112 -85)
    ...tor/src/main/java/com/dlink/executor/ExecutorSetting.java    (+7 -3)
    .../src/main/java/com/dlink/executor/LocalBatchExecutor.java    (+23 -0)
    ...src/main/java/com/dlink/executor/LocalStreamExecutor.java    (+4 -0)
    ...src/main/java/com/dlink/executor/RemoteBatchExecutor.java    (+24 -0)
    ...rc/main/java/com/dlink/executor/RemoteStreamExecutor.java    (+5 -1)
    ...ain/java/com/dlink/trans/ddl/CreateAggTableOperation.java    (+2 -2)
    ...in/java/com/dlink/trans/ddl/CreateCDCSourceOperation.java    (+1 -1)
    ...cutor/src/main/java/com/dlink/trans/ddl/SetOperation.java    (+3 -3)
    ...components/Studio/StudioRightTool/StudioSetting/index.tsx    (+12 -1)
    dlink-web/src/pages/FlinkSqlStudio/model.ts                     (+2 -1)
    dlink-web/src/pages/Welcome.tsx                                 (+3 -0)
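The thrust of the change: a new batchModel flag travels from the Studio UI (index.tsx, model.ts) through StudioExecuteDTO, Task, and JobConfig down to a family of new batch executors (LocalBatchExecutor, RemoteBatchExecutor, AppBatchExecutor), while each client's CustomTableEnvironmentImpl gains a createBatch() factory. A minimal sketch of the call shape this enables follows (table and connector names are illustrative, not taken from the diff; note that the three-argument create() shown in the hunks below still rejects non-streaming settings with a TableException, so end-to-end behavior is not evident from this view alone):

    // Hypothetical caller; mirrors the createBatch() factory added in this commit.
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import com.dlink.executor.CustomTableEnvironmentImpl;

    public class BatchModeSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // createBatch() pins execution.runtime-mode=BATCH and builds its
            // EnvironmentSettings with useBlinkPlanner().inBatchMode().
            CustomTableEnvironmentImpl tableEnv = CustomTableEnvironmentImpl.createBatch(env);
            tableEnv.executeSql("CREATE TABLE src (a INT) WITH ("
                    + "'connector' = 'datagen', 'number-of-rows' = '10')");
            tableEnv.executeSql("SELECT a FROM src").print();
        }
    }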
dlink-admin/pom.xml  View file @ 9049f2d0

@@ -109,6 +109,10 @@
         <dependency>
             <groupId>com.dlink</groupId>
             <artifactId>dlink-common</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.dlink</groupId>
+            <artifactId>dlink-client-base</artifactId>
+        </dependency>
         <dependency>
             <groupId>com.dlink</groupId>
             <artifactId>dlink-metadata-base</artifactId>

@@ -124,7 +128,7 @@
         <!--<dependency>
             <groupId>com.dlink</groupId>
             <artifactId>dlink-metadata-mysql</artifactId>
-            <version>0.5.0</version>
+            <version>0.6.0-SNAPSHOT</version>
         </dependency>-->
         <!--<dependency>
             <groupId>org.apache.flink</groupId>
dlink-admin/src/main/java/com/dlink/dto/StudioExecuteDTO.java  View file @ 9049f2d0

@@ -27,6 +27,7 @@ public class StudioExecuteDTO extends AbstractStatementDTO{
     private boolean useChangeLog;
     private boolean useAutoCancel;
     private boolean statementSet;
+    private boolean batchModel;
     private boolean useSession;
     private String session;
     private Integer clusterId;

@@ -60,7 +61,7 @@ public class StudioExecuteDTO extends AbstractStatementDTO{
         }
         return new JobConfig(
                 type, useResult, useChangeLog, useAutoCancel, useSession, session, clusterId,
-                clusterConfigurationId, jarId, taskId, jobName, fragment, statementSet,
+                clusterConfigurationId, jarId, taskId, jobName, fragment, statementSet, batchModel,
                 maxRowNum, checkPoint, parallelism, savePointStrategy, savePointPath, config);
     }
 }
dlink-admin/src/main/java/com/dlink/model/Task.java  View file @ 9049f2d0

@@ -25,7 +25,7 @@ import java.util.Map;
 @Data
 @EqualsAndHashCode(callSuper = false)
 @TableName("dlink_task")
-public class Task extends SuperEntity{
+public class Task extends SuperEntity {

     private static final long serialVersionUID = 5988972129893667154L;

@@ -48,6 +48,8 @@ public class Task extends SuperEntity{
     private boolean statementSet;

+    private boolean batchModel;
+
     private Integer clusterId;

     private Integer clusterConfigurationId;

@@ -74,13 +76,13 @@ public class Task extends SuperEntity{
     private List<Savepoints> savepoints;

     @TableField(exist = false)
-    private List<Map<String,String>> config = new ArrayList<>();
+    private List<Map<String, String>> config = new ArrayList<>();

-    public List<Map<String,String>> parseConfig(){
+    public List<Map<String, String>> parseConfig() {
         ObjectMapper objectMapper = new ObjectMapper();
         try {
-            if(Asserts.isNotNullString(configJson)){
+            if (Asserts.isNotNullString(configJson)) {
                 config = objectMapper.readValue(configJson, ArrayList.class);
             }
         } catch (JsonProcessingException e) {

@@ -89,16 +91,17 @@ public class Task extends SuperEntity{
         return config;
     }

-    public JobConfig buildSubmitConfig(){
+    public JobConfig buildSubmitConfig() {
         boolean useRemote = true;
-        if(clusterId==null||clusterId==0){
+        if (clusterId == null || clusterId == 0) {
             useRemote = false;
         }
-        Map<String,String> map = new HashMap<>();
-        for(Map<String,String> item : config){
-            map.put(item.get("key"),item.get("value"));
+        Map<String, String> map = new HashMap<>();
+        for (Map<String, String> item : config) {
+            map.put(item.get("key"), item.get("value"));
         }
-        return new JobConfig(type, false, false, useRemote, clusterId, clusterConfigurationId,
-                jarId, getId(), alias, fragment, statementSet, checkPoint, parallelism,
-                savePointStrategy, savePointPath, map);
+        return new JobConfig(type, false, false, useRemote, clusterId, clusterConfigurationId,
+                jarId, getId(), alias, fragment, statementSet, batchModel, checkPoint, parallelism,
+                savePointStrategy, savePointPath, map);
     }
 }
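For context, configJson is stored as a JSON array of {key, value} pairs; parseConfig() deserializes it and buildSubmitConfig() flattens it into the map handed to JobConfig, which now also carries the batchModel flag. A minimal standalone sketch of that round trip (the keys shown are illustrative, not taken from this diff):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ParseConfigSketch {
        public static void main(String[] args) throws Exception {
            // Example payload; the real keys are whatever the Studio settings panel stores.
            String configJson = "[{\"key\":\"pipeline.name\",\"value\":\"demo\"},"
                    + "{\"key\":\"parallelism.default\",\"value\":\"2\"}]";
            ObjectMapper objectMapper = new ObjectMapper();
            // Mirrors Task.parseConfig(): a raw ArrayList of String-to-String maps.
            List<Map<String, String>> config = objectMapper.readValue(configJson, ArrayList.class);
            // Mirrors Task.buildSubmitConfig(): flatten the pairs into one map.
            Map<String, String> map = new HashMap<>();
            for (Map<String, String> item : config) {
                map.put(item.get("key"), item.get("value"));
            }
            System.out.println(map); // e.g. {pipeline.name=demo, parallelism.default=2}
        }
    }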
dlink-admin/src/main/resources/mapper/TaskMapper.xml  View file @ 9049f2d0

@@ -2,32 +2,6 @@
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
 <mapper namespace="com.dlink.mapper.TaskMapper">

-    <!-- generic query result map (通用查询映射结果) -->
-    <resultMap id="BaseResultMap" type="com.dlink.model.Task">
-        <id column="id" property="id"/>
-        <result column="name" property="name"/>
-        <result column="alias" property="alias"/>
-        <result column="dialect" property="dialect"/>
-        <result column="type" property="type"/>
-        <result column="check_point" property="checkPoint"/>
-        <result column="save_point_strategy" property="savePointStrategy"/>
-        <result column="save_point_path" property="savePointPath"/>
-        <result column="parallelism" property="parallelism"/>
-        <result column="fragment" property="fragment"/>
-        <result column="statement_set" property="statementSet"/>
-        <result column="cluster_id" property="clusterId"/>
-        <result column="cluster_configuration_id" property="clusterConfigurationId"/>
-        <result column="database_id" property="databaseId"/>
-        <result column="jar_id" property="jarId"/>
-        <result column="env_id" property="envId"/>
-        <result column="config_json" property="configJson"/>
-        <result column="note" property="note"/>
-        <result column="step" property="step"/>
-        <result column="enabled" property="enabled"/>
-        <result column="create_time" property="createTime"/>
-        <result column="update_time" property="updateTime"/>
-    </resultMap>

     <select id="selectForProTable" resultType="com.dlink.model.Task">
         select a.*
dlink-app/pom.xml  View file @ 9049f2d0

@@ -28,6 +28,10 @@
             <artifactId>dlink-client-1.13</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>com.dlink</groupId>
+            <artifactId>dlink-client-base</artifactId>
+        </dependency>
         <dependency>
             <groupId>com.dlink</groupId>
             <artifactId>dlink-executor</artifactId>
dlink-app/src/main/java/com/dlink/app/flinksql/Submiter.java  View file @ 9049f2d0

@@ -145,7 +145,7 @@ public class Submiter {
         }
         logger.info("正在执行 FlinkSQL 语句集: " + String.join(FlinkSQLConstant.SEPARATOR, executes));
         try {
-            executor.getEnvironment().execute(executorSetting.getJobName());
+            executor.execute(executorSetting.getJobName());
         } catch (Exception e) {
             e.printStackTrace();
         }
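This one-line change is what lets the app module run batch jobs: instead of reaching into a StreamExecutionEnvironment directly, Submiter now asks the Executor to run the job, and the concrete executor (stream or batch) supplies its own environment. A hypothetical sketch of that dispatch follows; the actual implementation lives in dlink-executor's Executor.java (+112 -85), which this view does not expand:

    // Hypothetical shape only; see com.dlink.executor.Executor for the real code.
    public abstract class ExecutorDispatchSketch {
        protected org.apache.flink.streaming.api.environment.StreamExecutionEnvironment environment;

        public org.apache.flink.api.common.JobExecutionResult execute(String jobName) throws Exception {
            // LocalBatchExecutor, RemoteBatchExecutor, AppBatchExecutor, and their
            // stream counterparts each hold an environment already configured for
            // their runtime mode, so callers no longer call getEnvironment() themselves.
            return environment.execute(jobName);
        }
    }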
dlink-app/src/main/java/com/dlink/executor/CustomTableEnvironmentImpl.java  View file @ 9049f2d0

 package com.dlink.executor;

 import com.dlink.result.SqlExplainResult;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
 import org.apache.flink.runtime.rest.messages.JobPlanInfo;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.JSONGenerator;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.ExplainDetail;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;

@@ -19,8 +34,19 @@ import org.apache.flink.table.functions.TableAggregateFunction;
 import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.table.functions.UserDefinedFunctionHelper;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.operations.command.ResetOperation;
 import org.apache.flink.table.operations.command.SetOperation;
 import org.apache.flink.table.planner.delegation.ExecutorBase;
 import org.apache.flink.table.planner.utils.ExecutorUtils;

 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;

@@ -29,7 +55,7 @@ import java.util.Map;
  * @author wenmo
  * @since 2021/6/7 22:06
  **/
-public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
+public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements CustomTableEnvironment {

     protected CustomTableEnvironmentImpl(CatalogManager catalogManager, ModuleManager moduleManager,
                                          TableConfig tableConfig, Executor executor, FunctionCatalog functionCatalog,
                                          Planner planner, boolean isStreamingMode, ClassLoader userClassLoader) {
         super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner,
                 isStreamingMode, userClassLoader);

@@ -39,14 +65,19 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
         return create(executionEnvironment, EnvironmentSettings.newInstance().build());
     }

-    static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) {
+    public static CustomTableEnvironmentImpl createBatch(StreamExecutionEnvironment executionEnvironment) {
+        Configuration configuration = new Configuration();
+        configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
+        TableConfig tableConfig = new TableConfig();
+        tableConfig.addConfiguration(configuration);
+        return create(executionEnvironment, EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(), tableConfig);
+    }
+
+    public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) {
+        return create(executionEnvironment, settings, new TableConfig());
+    }
+
+    public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings, TableConfig tableConfig) {
         if (!settings.isStreamingMode()) {
             throw new TableException("StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
         } else {
             ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
             ModuleManager moduleManager = new ModuleManager();
             CatalogManager catalogManager = CatalogManager.newBuilder()
                     .classLoader(classLoader)
                     .config(tableConfig.getConfiguration())
                     .defaultCatalog(settings.getBuiltInCatalogName(),
                             new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()))
                     .executionConfig(executionEnvironment.getConfig())
                     .build();

@@ -56,7 +87,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
             Map<String, String> plannerProperties = settings.toPlannerProperties();
             Planner planner = (ComponentFactoryService.find(PlannerFactory.class, plannerProperties))
                     .create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager);
             return new CustomTableEnvironmentImpl(catalogManager, moduleManager, tableConfig, executor,
                     functionCatalog, planner, settings.isStreamingMode(), classLoader);
         }
     }

     private static Executor lookupExecutor(Map<String, String> executorProperties, StreamExecutionEnvironment executionEnvironment) {

@@ -68,4 +99,161 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
             throw new TableException("Could not instantiate the executor. Make sure a planner module is on the classpath", var4);
         }
     }

     public ObjectNode getStreamGraph(String statement) {
         List<Operation> operations = super.getParser().parse(statement);
         if (operations.size() != 1) {
             throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query.");
         } else {
             List<ModifyOperation> modifyOperations = new ArrayList<>();
             for (int i = 0; i < operations.size(); i++) {
                 if (operations.get(i) instanceof ModifyOperation) {
                     modifyOperations.add((ModifyOperation) operations.get(i));
                 }
             }
             List<Transformation<?>> trans = getPlanner().translate(modifyOperations);
             if (execEnv instanceof ExecutorBase) {
                 StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
                 JSONGenerator jsonGenerator = new JSONGenerator(streamGraph);
                 String json = jsonGenerator.getJSON();
                 ObjectMapper mapper = new ObjectMapper();
                 ObjectNode objectNode = mapper.createObjectNode();
                 try {
                     objectNode = (ObjectNode) mapper.readTree(json);
                 } catch (JsonProcessingException e) {
                     e.printStackTrace();
                 } finally {
                     return objectNode;
                 }
             } else {
                 throw new TableException("Unsupported SQL query! explainSql() need a single SQL to query.");
             }
         }
     }

     public JobPlanInfo getJobPlanInfo(List<String> statements) {
         return new JobPlanInfo(JsonPlanGenerator.generatePlan(getJobGraphFromInserts(statements)));
     }

     public StreamGraph getStreamGraphFromInserts(List<String> statements) {
         List<ModifyOperation> modifyOperations = new ArrayList();
         for (String statement : statements) {
             List<Operation> operations = getParser().parse(statement);
             if (operations.size() != 1) {
                 throw new TableException("Only single statement is supported.");
             } else {
                 Operation operation = operations.get(0);
                 if (operation instanceof ModifyOperation) {
                     modifyOperations.add((ModifyOperation) operation);
                 } else {
                     throw new TableException("Only insert statement is supported now.");
                 }
             }
         }
         List<Transformation<?>> trans = getPlanner().translate(modifyOperations);
         if (execEnv instanceof ExecutorBase) {
             StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
             if (tableConfig.getConfiguration().containsKey(PipelineOptions.NAME.key())) {
                 streamGraph.setJobName(tableConfig.getConfiguration().getString(PipelineOptions.NAME));
             }
             return streamGraph;
         } else {
             throw new TableException("Unsupported SQL query! ExecEnv need a ExecutorBase.");
         }
     }

     public JobGraph getJobGraphFromInserts(List<String> statements) {
         return getStreamGraphFromInserts(statements).getJobGraph();
     }

     public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
         SqlExplainResult record = new SqlExplainResult();
         List<Operation> operations = getParser().parse(statement);
         record.setParseTrue(true);
         if (operations.size() != 1) {
             throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query.");
         }
         List<Operation> operationlist = new ArrayList<>(operations);
         for (int i = 0; i < operationlist.size(); i++) {
             Operation operation = operationlist.get(i);
             if (operation instanceof ModifyOperation) {
                 record.setType("Modify DML");
             } else if (operation instanceof ExplainOperation) {
                 record.setType("Explain DML");
             } else if (operation instanceof QueryOperation) {
                 record.setType("Query DML");
             } else {
                 record.setExplain(operation.asSummaryString());
                 operationlist.remove(i);
                 record.setType("DDL");
                 i = i - 1;
             }
         }
         record.setExplainTrue(true);
         if (operationlist.size() == 0) {
             //record.setExplain("DDL语句不进行解释。");
             return record;
         }
         record.setExplain(planner.explain(operationlist, extraDetails));
         return record;
     }

     public <T> void registerFunction(String name, TableFunction<T> tableFunction) {
         TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfTableFunction(tableFunction);
         this.functionCatalog.registerTempSystemTableFunction(name, tableFunction, typeInfo);
     }

     public <T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction) {
         TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(aggregateFunction);
         TypeInformation<ACC> accTypeInfo = UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(aggregateFunction);
         this.functionCatalog.registerTempSystemAggregateFunction(name, aggregateFunction, typeInfo, accTypeInfo);
     }

     public <T, ACC> void registerFunction(String name, TableAggregateFunction<T, ACC> tableAggregateFunction) {
         TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(tableAggregateFunction);
         TypeInformation<ACC> accTypeInfo = UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(tableAggregateFunction);
         this.functionCatalog.registerTempSystemAggregateFunction(name, tableAggregateFunction, typeInfo, accTypeInfo);
     }

     public boolean parseAndLoadConfiguration(String statement, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
         List<Operation> operations = getParser().parse(statement);
         for (Operation operation : operations) {
             if (operation instanceof SetOperation) {
                 callSet((SetOperation) operation, environment, setMap);
                 return true;
             } else if (operation instanceof ResetOperation) {
                 callReset((ResetOperation) operation, environment, setMap);
                 return true;
             }
         }
         return false;
     }

     private void callSet(SetOperation setOperation, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
         if (setOperation.getKey().isPresent() && setOperation.getValue().isPresent()) {
             String key = setOperation.getKey().get().trim();
             String value = setOperation.getValue().get().trim();
             Map<String, String> confMap = new HashMap<>();
             confMap.put(key, value);
             setMap.put(key, value);
             Configuration configuration = Configuration.fromMap(confMap);
             environment.getConfig().configure(configuration, null);
             getConfig().addConfiguration(configuration);
         }
     }

     private void callReset(ResetOperation resetOperation, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
         if (resetOperation.getKey().isPresent()) {
             String key = resetOperation.getKey().get().trim();
             Map<String, String> confMap = new HashMap<>();
             confMap.put(key, null);
             setMap.remove(key);
             Configuration configuration = Configuration.fromMap(confMap);
             environment.getConfig().configure(configuration, null);
             getConfig().addConfiguration(configuration);
         } else {
             setMap.clear();
         }
     }
 }
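The new parseAndLoadConfiguration() is what gives Dinky's SQL-level SET/RESET statements effect: a SET is applied both to the StreamExecutionEnvironment config and to the TableConfig, and mirrored into the caller's setMap; RESET removes a single key or, with no key, clears the map. A small usage sketch (the statement text is illustrative and assumes the Flink 1.13 parser used by this module accepts it):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import com.dlink.executor.CustomTableEnvironmentImpl;

    public class SetStatementSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            CustomTableEnvironmentImpl tableEnv = CustomTableEnvironmentImpl.create(env);
            Map<String, Object> setMap = new HashMap<>();
            // Returns true when the statement is a SET/RESET it has applied,
            // false otherwise, so callers can skip executeSql for these.
            boolean handled = tableEnv.parseAndLoadConfiguration(
                    "SET 'pipeline.name' = 'batch-demo'", env, setMap);
            System.out.println(handled + " -> " + setMap);
        }
    }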
dlink-client/dlink-client-1.11/src/main/java/com/dlink/executor/CustomTableEnvironmentImpl.java  View file @ 9049f2d0

@@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;

@@ -13,7 +14,10 @@ import org.apache.flink.runtime.rest.messages.JobPlanInfo;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.JSONGenerator;
 import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.table.api.*;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.FunctionCatalog;

@@ -34,7 +38,6 @@ import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.planner.delegation.ExecutorBase;
 import org.apache.flink.table.planner.utils.ExecutorUtils;
-import org.apache.flink.types.Row;

 import java.lang.reflect.Method;
 import java.util.ArrayList;

@@ -47,7 +50,7 @@ import java.util.Map;
  * @author wenmo
  * @since 2021/6/7 22:06
  **/
-public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
+public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements CustomTableEnvironment {

     protected CustomTableEnvironmentImpl(CatalogManager catalogManager, ModuleManager moduleManager,
                                          TableConfig tableConfig, Executor executor, FunctionCatalog functionCatalog,
                                          Planner planner, boolean isStreamingMode, ClassLoader userClassLoader) {
         super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner,
                 isStreamingMode, userClassLoader);

@@ -57,14 +60,19 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
         return create(executionEnvironment, EnvironmentSettings.newInstance().build());
     }

-    static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) {
+    public static CustomTableEnvironmentImpl createBatch(StreamExecutionEnvironment executionEnvironment) {
+        Configuration configuration = new Configuration();
+        configuration.setString("execution.runtime-mode", "BATCH");
+        TableConfig tableConfig = new TableConfig();
+        tableConfig.addConfiguration(configuration);
+        return create(executionEnvironment, EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(), tableConfig);
+    }
+
+    public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) {
+        return create(executionEnvironment, settings, new TableConfig());
+    }
+
+    public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings, TableConfig tableConfig) {
         if (!settings.isStreamingMode()) {
             throw new TableException("StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
         } else {
             ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
             ModuleManager moduleManager = new ModuleManager();
             CatalogManager catalogManager = CatalogManager.newBuilder()
                     .classLoader(classLoader)
                     .config(tableConfig.getConfiguration())
                     .defaultCatalog(settings.getBuiltInCatalogName(),
                             new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()))
                     .executionConfig(executionEnvironment.getConfig())
                     .build();

@@ -72,9 +80,9 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
             Map<String, String> executorProperties = settings.toExecutorProperties();
             Executor executor = lookupExecutor(executorProperties, executionEnvironment);
             Map<String, String> plannerProperties = settings.toPlannerProperties();
             Planner planner = (ComponentFactoryService.find(PlannerFactory.class, plannerProperties))
                     .create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager);
             return new CustomTableEnvironmentImpl(catalogManager, moduleManager, tableConfig, executor,
                     functionCatalog, planner, settings.isStreamingMode(), classLoader);
         }
     }

     private static Executor lookupExecutor(Map<String, String> executorProperties, StreamExecutionEnvironment executionEnvironment) {

@@ -94,25 +102,25 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
         } else {
             List<ModifyOperation> modifyOperations = new ArrayList<>();
             for (int i = 0; i < operations.size(); i++) {
                 if (operations.get(i) instanceof ModifyOperation) {
                     modifyOperations.add((ModifyOperation) operations.get(i));
                 }
             }
             List<Transformation<?>> trans = super.planner.translate(modifyOperations);
             if (execEnv instanceof ExecutorBase) {
                 StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
                 JSONGenerator jsonGenerator = new JSONGenerator(streamGraph);
                 String json = jsonGenerator.getJSON();
                 ObjectMapper mapper = new ObjectMapper();
                 ObjectNode objectNode = mapper.createObjectNode();
                 try {
                     objectNode = (ObjectNode) mapper.readTree(json);
                 } catch (JsonProcessingException e) {
                     e.printStackTrace();
                 } finally {
                     return objectNode;
                 }
             } else {
                 throw new TableException("Unsupported SQL query! ExecEnv need a ExecutorBase.");
             }
         }

@@ -124,27 +132,27 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
     public StreamGraph getStreamGraphFromInserts(List<String> statements) {
         List<ModifyOperation> modifyOperations = new ArrayList();
         for (String statement : statements) {
             List<Operation> operations = getParser().parse(statement);
             if (operations.size() != 1) {
                 throw new TableException("Only single statement is supported.");
             } else {
                 Operation operation = operations.get(0);
                 if (operation instanceof ModifyOperation) {
                     modifyOperations.add((ModifyOperation) operation);
                 } else {
                     throw new TableException("Only insert statement is supported now.");
                 }
             }
         }
         List<Transformation<?>> trans = getPlanner().translate(modifyOperations);
         if (execEnv instanceof ExecutorBase) {
             StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
             if (tableConfig.getConfiguration().containsKey(PipelineOptions.NAME.key())) {
                 streamGraph.setJobName(tableConfig.getConfiguration().getString(PipelineOptions.NAME));
             }
             return streamGraph;
         } else {
             throw new TableException("Unsupported SQL query! ExecEnv need a ExecutorBase.");
         }
     }

@@ -174,11 +182,11 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
                 record.setExplain(operation.asSummaryString());
                 operationlist.remove(i);
                 record.setType("DDL");
                 i = i - 1;
             }
         }
         record.setExplainTrue(true);
-        if (operationlist.size()==0) {
+        if (operationlist.size() == 0) {
             return record;
         }
         record.setExplain(planner.explain(operationlist, extraDetails));
dlink-client/dlink-client-1.12/src/main/java/com/dlink/executor/CustomTableEnvironmentImpl.java  View file @ 9049f2d0

@@ -4,8 +4,11 @@ import com.dlink.result.SqlExplainResult;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;

@@ -13,7 +16,10 @@ import org.apache.flink.runtime.rest.messages.JobPlanInfo;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.JSONGenerator;
 import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.table.api.*;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.FunctionCatalog;

@@ -34,7 +40,6 @@ import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.planner.delegation.ExecutorBase;
 import org.apache.flink.table.planner.utils.ExecutorUtils;
-import org.apache.flink.types.Row;

 import java.lang.reflect.Method;
 import java.util.ArrayList;

@@ -47,7 +52,7 @@ import java.util.Map;
  * @author wenmo
  * @since 2021/6/7 22:06
  **/
-public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
+public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements CustomTableEnvironment {

     protected CustomTableEnvironmentImpl(CatalogManager catalogManager, ModuleManager moduleManager,
                                          TableConfig tableConfig, Executor executor, FunctionCatalog functionCatalog,
                                          Planner planner, boolean isStreamingMode, ClassLoader userClassLoader) {
         super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner,
                 isStreamingMode, userClassLoader);

@@ -57,14 +62,19 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
         return create(executionEnvironment, EnvironmentSettings.newInstance().build());
     }

-    static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) {
+    public static CustomTableEnvironmentImpl createBatch(StreamExecutionEnvironment executionEnvironment) {
+        Configuration configuration = new Configuration();
+        configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
+        TableConfig tableConfig = new TableConfig();
+        tableConfig.addConfiguration(configuration);
+        return create(executionEnvironment, EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(), tableConfig);
+    }
+
+    public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) {
+        return create(executionEnvironment, settings, new TableConfig());
+    }
+
+    public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings, TableConfig tableConfig) {
         if (!settings.isStreamingMode()) {
             throw new TableException("StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
         } else {
             ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
             ModuleManager moduleManager = new ModuleManager();
             CatalogManager catalogManager = CatalogManager.newBuilder()
                     .classLoader(classLoader)
                     .config(tableConfig.getConfiguration())
                     .defaultCatalog(settings.getBuiltInCatalogName(),
                             new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()))
                     .executionConfig(executionEnvironment.getConfig())
                     .build();

@@ -74,7 +84,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
             Map<String, String> plannerProperties = settings.toPlannerProperties();
             Planner planner = (ComponentFactoryService.find(PlannerFactory.class, plannerProperties))
                     .create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager);
             return new CustomTableEnvironmentImpl(catalogManager, moduleManager, tableConfig, executor,
                     functionCatalog, planner, settings.isStreamingMode(), classLoader);
         }
     }

     private static Executor lookupExecutor(Map<String, String> executorProperties, StreamExecutionEnvironment executionEnvironment) {
dlink-client/dlink-client-1.13/pom.xml  View file @ 9049f2d0

@@ -28,6 +28,17 @@
             <artifactId>dlink-client-base</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+            </exclusions>
+            <version>${flink.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
dlink-client/dlink-client-1.13/src/main/java/com/dlink/executor/CustomTableEnvironmentImpl.java  View file @ 9049f2d0

 package com.dlink.executor;

 import com.dlink.result.SqlExplainResult;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.dag.Transformation;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;

@@ -14,7 +16,10 @@ import org.apache.flink.runtime.rest.messages.JobPlanInfo;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.JSONGenerator;
 import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.table.api.*;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.FunctionCatalog;

@@ -37,7 +42,6 @@ import org.apache.flink.table.operations.command.ResetOperation;
 import org.apache.flink.table.operations.command.SetOperation;
 import org.apache.flink.table.planner.delegation.ExecutorBase;
 import org.apache.flink.table.planner.utils.ExecutorUtils;
-import org.apache.flink.types.Row;

 import java.lang.reflect.Method;
 import java.util.ArrayList;

@@ -51,7 +55,7 @@ import java.util.Map;
  * @author wenmo
  * @since 2021/6/7 22:06
  **/
-public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
+public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements CustomTableEnvironment {

     protected CustomTableEnvironmentImpl(CatalogManager catalogManager, ModuleManager moduleManager,
                                          TableConfig tableConfig, Executor executor, FunctionCatalog functionCatalog,
                                          Planner planner, boolean isStreamingMode, ClassLoader userClassLoader) {
         super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner,
                 isStreamingMode, userClassLoader);

@@ -61,14 +65,19 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
         return create(executionEnvironment, EnvironmentSettings.newInstance().build());
     }

-    static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) {
+    public static CustomTableEnvironmentImpl createBatch(StreamExecutionEnvironment executionEnvironment) {
+        Configuration configuration = new Configuration();
+        configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
+        TableConfig tableConfig = new TableConfig();
+        tableConfig.addConfiguration(configuration);
+        return create(executionEnvironment, EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(), tableConfig);
+    }
+
+    public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) {
+        return create(executionEnvironment, settings, new TableConfig());
+    }
+
+    public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings, TableConfig tableConfig) {
         if (!settings.isStreamingMode()) {
             throw new TableException("StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
         } else {
             ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
             ModuleManager moduleManager = new ModuleManager();
             CatalogManager catalogManager = CatalogManager.newBuilder()
                     .classLoader(classLoader)
                     .config(tableConfig.getConfiguration())
                     .defaultCatalog(settings.getBuiltInCatalogName(),
                             new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()))
                     .executionConfig(executionEnvironment.getConfig())
                     .build();

@@ -78,7 +87,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
             Map<String, String> plannerProperties = settings.toPlannerProperties();
             Planner planner = (ComponentFactoryService.find(PlannerFactory.class, plannerProperties))
                     .create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager);
             return new CustomTableEnvironmentImpl(catalogManager, moduleManager, tableConfig, executor,
                     functionCatalog, planner, settings.isStreamingMode(), classLoader);
         }
     }

     private static Executor lookupExecutor(Map<String, String> executorProperties, StreamExecutionEnvironment executionEnvironment) {

@@ -98,25 +107,25 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
         } else {
             List<ModifyOperation> modifyOperations = new ArrayList<>();
             for (int i = 0; i < operations.size(); i++) {
                 if (operations.get(i) instanceof ModifyOperation) {
                     modifyOperations.add((ModifyOperation) operations.get(i));
                 }
             }
             List<Transformation<?>> trans = getPlanner().translate(modifyOperations);
             if (execEnv instanceof ExecutorBase) {
                 StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
                 JSONGenerator jsonGenerator = new JSONGenerator(streamGraph);
                 String json = jsonGenerator.getJSON();
                 ObjectMapper mapper = new ObjectMapper();
                 ObjectNode objectNode = mapper.createObjectNode();
                 try {
                     objectNode = (ObjectNode) mapper.readTree(json);
                 } catch (JsonProcessingException e) {
                     e.printStackTrace();
                 } finally {
                     return objectNode;
                 }
             } else {
                 throw new TableException("Unsupported SQL query! explainSql() need a single SQL to query.");
             }
         }

@@ -128,27 +137,27 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
     public StreamGraph getStreamGraphFromInserts(List<String> statements) {
         List<ModifyOperation> modifyOperations = new ArrayList();
         for (String statement : statements) {
             List<Operation> operations = getParser().parse(statement);
             if (operations.size() != 1) {
                 throw new TableException("Only single statement is supported.");
             } else {
                 Operation operation = operations.get(0);
                 if (operation instanceof ModifyOperation) {
                     modifyOperations.add((ModifyOperation) operation);
                 } else {
                     throw new TableException("Only insert statement is supported now.");
                 }
             }
         }
         List<Transformation<?>> trans = getPlanner().translate(modifyOperations);
         if (execEnv instanceof ExecutorBase) {
             StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
             if (tableConfig.getConfiguration().containsKey(PipelineOptions.NAME.key())) {
                 streamGraph.setJobName(tableConfig.getConfiguration().getString(PipelineOptions.NAME));
             }
             return streamGraph;
         } else {
             throw new TableException("Unsupported SQL query! ExecEnv need a ExecutorBase.");
         }
     }

@@ -178,11 +187,11 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
                 record.setExplain(operation.asSummaryString());
                 operationlist.remove(i);
                 record.setType("DDL");
                 i = i - 1;
             }
         }
         record.setExplainTrue(true);
-        if (operationlist.size()==0) {
+        if (operationlist.size() == 0) {
             //record.setExplain("DDL语句不进行解释。");
             return record;
         }

@@ -207,43 +216,43 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
         this.functionCatalog.registerTempSystemAggregateFunction(name, tableAggregateFunction, typeInfo, accTypeInfo);
     }

     public boolean parseAndLoadConfiguration(String statement, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
         List<Operation> operations = getParser().parse(statement);
         for (Operation operation : operations) {
             if (operation instanceof SetOperation) {
                 callSet((SetOperation) operation, environment, setMap);
                 return true;
-            } else if (operation instanceof ResetOperation){
-                callReset((ResetOperation) operation, environment, setMap);
+            } else if (operation instanceof ResetOperation) {
+                callReset((ResetOperation) operation, environment, setMap);
+                return true;
             }
         }
         return false;
     }

     private void callSet(SetOperation setOperation, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
         if (setOperation.getKey().isPresent() && setOperation.getValue().isPresent()) {
             String key = setOperation.getKey().get().trim();
             String value = setOperation.getValue().get().trim();
             Map<String, String> confMap = new HashMap<>();
             confMap.put(key, value);
             setMap.put(key, value);
             Configuration configuration = Configuration.fromMap(confMap);
             environment.getConfig().configure(configuration, null);
             getConfig().addConfiguration(configuration);
         }
     }

     private void callReset(ResetOperation resetOperation, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
         if (resetOperation.getKey().isPresent()) {
             String key = resetOperation.getKey().get().trim();
             Map<String, String> confMap = new HashMap<>();
             confMap.put(key, null);
             setMap.remove(key);
             Configuration configuration = Configuration.fromMap(confMap);
             environment.getConfig().configure(configuration, null);
             getConfig().addConfiguration(configuration);
         } else {
             setMap.clear();
         }
     }
dlink-client/dlink-client-1.14/src/main/java/com/dlink/executor/CustomTableEnvironmentImpl.java
View file @
9049f2d0
...
...
@@ -4,15 +4,22 @@ import com.dlink.result.SqlExplainResult;
import
com.fasterxml.jackson.core.JsonProcessingException
;
import
com.fasterxml.jackson.databind.ObjectMapper
;
import
com.fasterxml.jackson.databind.node.ObjectNode
;
import
org.apache.flink.api.common.RuntimeExecutionMode
;
import
org.apache.flink.api.common.typeinfo.TypeInformation
;
import
org.apache.flink.api.dag.Transformation
;
import
org.apache.flink.configuration.Configuration
;
import
org.apache.flink.configuration.ExecutionOptions
;
import
org.apache.flink.configuration.PipelineOptions
;
import
org.apache.flink.runtime.jobgraph.JobGraph
;
import
org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator
;
import
org.apache.flink.runtime.rest.messages.JobPlanInfo
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.apache.flink.streaming.api.graph.JSONGenerator
;
import
org.apache.flink.streaming.api.graph.StreamGraph
;
import
org.apache.flink.table.api.*
;
import
org.apache.flink.table.api.EnvironmentSettings
;
import
org.apache.flink.table.api.ExplainDetail
;
import
org.apache.flink.table.api.TableConfig
;
import
org.apache.flink.table.api.TableException
;
import
org.apache.flink.table.api.internal.TableEnvironmentImpl
;
import
org.apache.flink.table.catalog.CatalogManager
;
import
org.apache.flink.table.catalog.FunctionCatalog
;
...
...
@@ -34,7 +41,6 @@ import org.apache.flink.table.operations.QueryOperation;
import
org.apache.flink.table.operations.command.ResetOperation
;
import
org.apache.flink.table.operations.command.SetOperation
;
import
org.apache.flink.table.planner.delegation.DefaultExecutor
;
import
org.apache.flink.types.Row
;
import
java.lang.reflect.Method
;
import
java.util.ArrayList
;
...
...
@@ -48,7 +54,7 @@ import java.util.Map;
* @author wenmo
* @since 2021/10/22 10:02
**/
public
class
CustomTableEnvironmentImpl
extends
TableEnvironmentImpl
{
public
class
CustomTableEnvironmentImpl
extends
TableEnvironmentImpl
implements
CustomTableEnvironment
{
protected
CustomTableEnvironmentImpl
(
CatalogManager
catalogManager
,
ModuleManager
moduleManager
,
TableConfig
tableConfig
,
Executor
executor
,
FunctionCatalog
functionCatalog
,
Planner
planner
,
boolean
isStreamingMode
,
ClassLoader
userClassLoader
)
{
super
(
catalogManager
,
moduleManager
,
tableConfig
,
executor
,
functionCatalog
,
planner
,
isStreamingMode
,
userClassLoader
);
...
...
@@ -75,8 +81,16 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
userClassLoader
);
}
public
static
CustomTableEnvironmentImpl
create
(
StreamExecutionEnvironment
executionEnvironment
){
return
create
(
executionEnvironment
,
EnvironmentSettings
.
newInstance
().
build
(),
TableConfig
.
getDefault
());
public
static
CustomTableEnvironmentImpl
create
(
StreamExecutionEnvironment
executionEnvironment
)
{
return
create
(
executionEnvironment
,
EnvironmentSettings
.
newInstance
().
build
(),
TableConfig
.
getDefault
());
}
public
static
CustomTableEnvironmentImpl
createBatch
(
StreamExecutionEnvironment
executionEnvironment
)
{
Configuration
configuration
=
new
Configuration
();
configuration
.
set
(
ExecutionOptions
.
RUNTIME_MODE
,
RuntimeExecutionMode
.
BATCH
);
TableConfig
tableConfig
=
new
TableConfig
();
tableConfig
.
addConfiguration
(
configuration
);
return
create
(
executionEnvironment
,
EnvironmentSettings
.
newInstance
().
useBlinkPlanner
().
inBatchMode
().
build
(),
tableConfig
);
}
public
static
CustomTableEnvironmentImpl
create
(
...
...
@@ -155,53 +169,58 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
}
else
{
List
<
ModifyOperation
>
modifyOperations
=
new
ArrayList
<>();
for
(
int
i
=
0
;
i
<
operations
.
size
();
i
++)
{
if
(
operations
.
get
(
i
)
instanceof
ModifyOperation
)
{
modifyOperations
.
add
((
ModifyOperation
)
operations
.
get
(
i
));
if
(
operations
.
get
(
i
)
instanceof
ModifyOperation
)
{
modifyOperations
.
add
((
ModifyOperation
)
operations
.
get
(
i
));
}
}
List
<
Transformation
<?>>
trans
=
super
.
planner
.
translate
(
modifyOperations
);
if
(
execEnv
instanceof
DefaultExecutor
)
{
if
(
execEnv
instanceof
DefaultExecutor
)
{
StreamGraph
streamGraph
=
((
DefaultExecutor
)
execEnv
).
getExecutionEnvironment
().
generateStreamGraph
(
trans
);
JSONGenerator
jsonGenerator
=
new
JSONGenerator
(
streamGraph
);
String
json
=
jsonGenerator
.
getJSON
();
ObjectMapper
mapper
=
new
ObjectMapper
();
ObjectNode
objectNode
=
mapper
.
createObjectNode
();
ObjectNode
objectNode
=
mapper
.
createObjectNode
();
try
{
objectNode
=
(
ObjectNode
)
mapper
.
readTree
(
json
);
}
catch
(
JsonProcessingException
e
)
{
e
.
printStackTrace
();
}
finally
{
}
finally
{
return
objectNode
;
}
}
else
{
}
else
{
throw
new
TableException
(
"Unsupported SQL query! explainSql() need a single SQL to query."
);
}
}
}
@Override
public
JobPlanInfo
getJobPlanInfo
(
List
<
String
>
statements
)
{
return
new
JobPlanInfo
(
JsonPlanGenerator
.
generatePlan
(
getJobGraphFromInserts
(
statements
)));
}
public
StreamGraph
getStreamGraphFromInserts
(
List
<
String
>
statements
)
{
List
<
ModifyOperation
>
modifyOperations
=
new
ArrayList
();
for
(
String
statement
:
statements
)
{
for
(
String
statement
:
statements
)
{
List
<
Operation
>
operations
=
getParser
().
parse
(
statement
);
if
(
operations
.
size
()
!=
1
)
{
throw
new
TableException
(
"Only single statement is supported."
);
}
else
{
Operation
operation
=
operations
.
get
(
0
);
if
(
operation
instanceof
ModifyOperation
)
{
modifyOperations
.
add
((
ModifyOperation
)
operation
);
modifyOperations
.
add
((
ModifyOperation
)
operation
);
}
else
{
throw
new
TableException
(
"Only insert statement is supported now."
);
}
}
}
List
<
Transformation
<?>>
trans
=
getPlanner
().
translate
(
modifyOperations
);
if
(
execEnv
instanceof
DefaultExecutor
)
{
if
(
execEnv
instanceof
DefaultExecutor
)
{
StreamGraph
streamGraph
=
((
DefaultExecutor
)
execEnv
).
getExecutionEnvironment
().
generateStreamGraph
(
trans
);
if
(
tableConfig
.
getConfiguration
().
containsKey
(
PipelineOptions
.
NAME
.
key
()))
{
if
(
tableConfig
.
getConfiguration
().
containsKey
(
PipelineOptions
.
NAME
.
key
()))
{
streamGraph
.
setJobName
(
tableConfig
.
getConfiguration
().
getString
(
PipelineOptions
.
NAME
));
}
return
streamGraph
;
}
else
{
}
else
{
throw
new
TableException
(
"Unsupported SQL query! ExecEnv need a ExecutorBase."
);
}
}
...
...
@@ -231,11 +250,11 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
record
.
setExplain
(
operation
.
asSummaryString
());
operationlist
.
remove
(
i
);
record
.
setType
(
"DDL"
);
i
=
i
-
1
;
i
=
i
-
1
;
}
}
record
.
setExplainTrue
(
true
);
if
(
operationlist
.
size
()==
0
)
{
if
(
operationlist
.
size
()
==
0
)
{
//record.setExplain("DDL语句不进行解释。");
return
record
;
}
...
...
@@ -260,43 +279,43 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
        this.functionCatalog.registerTempSystemAggregateFunction(name, tableAggregateFunction, typeInfo, accTypeInfo);
    }

    public boolean parseAndLoadConfiguration(String statement, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
        List<Operation> operations = getParser().parse(statement);
        for (Operation operation : operations) {
            if (operation instanceof SetOperation) {
                callSet((SetOperation) operation, environment, setMap);
                return true;
            } else if (operation instanceof ResetOperation) {
                callReset((ResetOperation) operation, environment, setMap);
                return true;
            }
        }
        return false;
    }

    private void callSet(SetOperation setOperation, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
        if (setOperation.getKey().isPresent() && setOperation.getValue().isPresent()) {
            String key = setOperation.getKey().get().trim();
            String value = setOperation.getValue().get().trim();
            Map<String, String> confMap = new HashMap<>();
            confMap.put(key, value);
            setMap.put(key, value);
            Configuration configuration = Configuration.fromMap(confMap);
            environment.getConfig().configure(configuration, null);
            getConfig().addConfiguration(configuration);
        }
    }

    private void callReset(ResetOperation resetOperation, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
        if (resetOperation.getKey().isPresent()) {
            String key = resetOperation.getKey().get().trim();
            Map<String, String> confMap = new HashMap<>();
            confMap.put(key, null);
            setMap.remove(key);
            Configuration configuration = Configuration.fromMap(confMap);
            environment.getConfig().configure(configuration, null);
            getConfig().addConfiguration(configuration);
        } else {
            setMap.clear();
        }
    }
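So a SET writes the key into both the StreamExecutionEnvironment config and the TableConfig (and records it in setMap), while RESET removes one key or, with no key, clears everything. A hedged round-trip sketch, assuming env is this CustomTableEnvironmentImpl, streamEnv its StreamExecutionEnvironment, and 'pipeline.name' an example option key:

// Sketch only; all names and the option key are illustrative.
Map<String, Object> setMap = new HashMap<>();
env.parseAndLoadConfiguration("SET 'pipeline.name' = 'demo'", streamEnv, setMap);  // -> callSet, returns true
env.parseAndLoadConfiguration("RESET 'pipeline.name'", streamEnv, setMap);        // -> callReset, key removed
env.parseAndLoadConfiguration("SELECT 1", streamEnv, setMap);                     // neither SET nor RESET: false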
...
...
dlink-client/dlink-client-base/pom.xml
...
...
@@ -14,14 +14,115 @@
  <properties>
    <source.level>1.8</source.level>
    <target.level>1.8</target.level>
    <dubbo.version>3.0.2.1</dubbo.version>
    <flink.version>1.13.5</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
    <junit.version>4.12</junit.version>
    <spring.version>4.3.16.RELEASE</spring.version>
    <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
  </properties>
  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-framework-bom</artifactId>
        <version>${spring.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.dubbo</groupId>
        <artifactId>dubbo-bom</artifactId>
        <version>${dubbo.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.dubbo</groupId>
        <artifactId>dubbo-dependencies-zookeeper</artifactId>
        <version>${dubbo.version}</version>
        <type>pom</type>
      </dependency>
      <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>${junit.version}</version>
      </dependency>
    </dependencies>
  </dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.dlink</groupId>
      <artifactId>dlink-common</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
        </exclusion>
      </exclusions>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
        </exclusion>
      </exclusions>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.binary.version}</artifactId>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
        </exclusion>
      </exclusions>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.dubbo</groupId>
      <artifactId>dubbo</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.dubbo</groupId>
      <artifactId>dubbo-dependencies-zookeeper</artifactId>
      <type>pom</type>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
<profiles>
...
...
dlink-client/dlink-client-base/src/main/java/com/dlink/executor/CustomTableEnvironment.java (new file)
package com.dlink.executor;

import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.*;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogManager;

import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
 * CustomTableEnvironment
 *
 * @author wenmo
 * @since 2022/2/5 10:35
 */
public interface CustomTableEnvironment {

    TableConfig getConfig();

    CatalogManager getCatalogManager();

    void registerCatalog(String catalogName, Catalog catalog);

    String[] listCatalogs();

    Optional<Catalog> getCatalog(String catalogName);

    TableResult executeSql(String statement);

    Table sqlQuery(String statement);

    void registerTable(String name, Table table);

    String explainSql(String statement, ExplainDetail... extraDetails);

    ObjectNode getStreamGraph(String statement);

    JobPlanInfo getJobPlanInfo(List<String> statements);

    StreamGraph getStreamGraphFromInserts(List<String> statements);

    JobGraph getJobGraphFromInserts(List<String> statements);

    SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails);

    boolean parseAndLoadConfiguration(String statement, StreamExecutionEnvironment config, Map<String, Object> setMap);

    StatementSet createStatementSet();
}
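Every version-specific CustomTableEnvironmentImpl now sits behind this interface, so core code can stay planner- and mode-agnostic. A brief sketch of a call site coded against the interface (the DDL is illustrative, not from the patch):

// Sketch: the caller neither knows nor cares whether this is a batch or
// streaming implementation.
CustomTableEnvironment env = CustomTableEnvironmentImpl.createBatch(
        StreamExecutionEnvironment.createLocalEnvironment());
env.executeSql("CREATE TABLE t (id INT) WITH ('connector' = 'datagen')");
String plan = env.explainSql("SELECT id FROM t");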
dlink-core/src/main/java/com/dlink/explainer/Explainer.java
...
...
@@ -215,7 +215,7 @@ public class Explainer {
            record = executor.explainSqlRecord(item.getValue());
            if (Asserts.isNull(record)) {
                record = new SqlExplainResult();
-               executor.getEnvironment().getStreamGraph();
+               executor.getStreamGraph();
            } else {
                executor.executeSql(item.getValue());
            }
...
...
dlink-core/src/main/java/com/dlink/job/JobConfig.java
...
...
@@ -42,6 +42,7 @@ public class JobConfig {
    private String jobName;
    private boolean useSqlFragment;
    private boolean useStatementSet;
+   private boolean useBatchModel;
    private Integer maxRowNum;
    private Integer checkpoint;
    private Integer parallelism;
...
...
@@ -66,7 +67,7 @@ public class JobConfig {
    public JobConfig(String type, boolean useResult, boolean useChangeLog, boolean useAutoCancel, boolean useSession, String session,
                     Integer clusterId, Integer clusterConfigurationId, Integer jarId, Integer taskId, String jobName, boolean useSqlFragment,
-                    boolean useStatementSet, Integer maxRowNum, Integer checkpoint, Integer parallelism,
+                    boolean useStatementSet, boolean useBatchModel, Integer maxRowNum, Integer checkpoint, Integer parallelism,
                     Integer savePointStrategyValue, String savePointPath, Map<String, String> config) {
        this.type = type;
        this.useResult = useResult;
...
...
@@ -82,6 +83,7 @@ public class JobConfig {
        this.jobName = jobName;
        this.useSqlFragment = useSqlFragment;
        this.useStatementSet = useStatementSet;
+       this.useBatchModel = useBatchModel;
        this.maxRowNum = maxRowNum;
        this.checkpoint = checkpoint;
        this.parallelism = parallelism;
...
...
@@ -126,7 +128,7 @@ public class JobConfig {
    public JobConfig(String type, boolean useResult, boolean useSession, boolean useRemote, Integer clusterId,
                     Integer clusterConfigurationId, Integer jarId, Integer taskId, String jobName, boolean useSqlFragment,
-                    boolean useStatementSet, Integer checkpoint, Integer parallelism, Integer savePointStrategyValue,
+                    boolean useStatementSet, boolean useBatchModel, Integer checkpoint, Integer parallelism, Integer savePointStrategyValue,
                     String savePointPath, Map<String, String> config) {
        this.type = type;
        this.useResult = useResult;
...
...
@@ -139,6 +141,7 @@ public class JobConfig {
        this.jobName = jobName;
        this.useSqlFragment = useSqlFragment;
        this.useStatementSet = useStatementSet;
+       this.useBatchModel = useBatchModel;
        this.checkpoint = checkpoint;
        this.parallelism = parallelism;
        this.savePointStrategy = SavePointStrategy.get(savePointStrategyValue);
...
...
@@ -147,7 +150,7 @@ public class JobConfig {
    }

    public ExecutorSetting getExecutorSetting() {
-       return new ExecutorSetting(checkpoint, parallelism, useSqlFragment, useStatementSet, savePointPath, jobName, config);
+       return new ExecutorSetting(checkpoint, parallelism, useSqlFragment, useStatementSet, useBatchModel, savePointPath, jobName, config);
    }

    public void setSessionConfig(SessionConfig sessionConfig) {
...
...
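For orientation, a sketch of how the new flag travels from a JobConfig into an ExecutorSetting; every argument value here is illustrative, not taken from the patch:

// Full-arg constructor: ..., useSqlFragment, useStatementSet, useBatchModel,
// maxRowNum, checkpoint, parallelism, savePointStrategyValue, savePointPath, config.
JobConfig config = new JobConfig("local", true, false, false, false, null, null, null,
        null, 1, "demo-batch", false, false, true, 100, 0, 1, 0, null, new HashMap<>());
ExecutorSetting setting = config.getExecutorSetting(); // carries useBatchModel along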
dlink-core/src/main/java/com/dlink/job/JobManager.java
...
...
@@ -328,7 +328,7 @@ public class JobManager {
                break;
            }
        }
-       StreamGraph streamGraph = executor.getEnvironment().getStreamGraph();
+       StreamGraph streamGraph = executor.getStreamGraph();
        streamGraph.setJobName(config.getJobName());
        JobGraph jobGraph = streamGraph.getJobGraph();
        GatewayResult gatewayResult = null;
...
...
@@ -352,7 +352,7 @@ public class JobManager {
                break;
            }
        }
-       JobExecutionResult jobExecutionResult = executor.getEnvironment().execute(config.getJobName());
+       JobExecutionResult jobExecutionResult = executor.execute(config.getJobName());
        if (jobExecutionResult.isJobExecutionResult()) {
            job.setJobId(jobExecutionResult.getJobID().toHexString());
        }
...
...
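Submission now goes through the executor facade instead of the raw StreamExecutionEnvironment. A hedged sketch of the new call path (the setup is illustrative; execute may throw):

// Sketch: read back the Flink job id after submitting via the executor.
Executor executor = Executor.buildLocalExecutor(ExecutorSetting.DEFAULT);
JobExecutionResult result = executor.execute("demo-job"); // throws Exception
if (result.isJobExecutionResult()) {
    String jobId = result.getJobID().toHexString();
}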
dlink-core/src/test/java/com/dlink/core/BatchTest.java (new file)
package com.dlink.core;

import com.dlink.executor.CustomTableEnvironmentImpl;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.junit.Test;

/**
 * BatchTest
 *
 * @author wenmo
 * @since 2022/2/7 23:15
 */
public class BatchTest {

    @Test
    public void batchTest() {
        String source = "CREATE TABLE Orders (\n" +
                " order_number BIGINT,\n" +
                " price DECIMAL(32,2),\n" +
                " buyer ROW<first_name STRING, last_name STRING>,\n" +
                " order_time TIMESTAMP(3)\n" +
                ") WITH (\n" +
                " 'connector' = 'datagen',\n" +
                " 'number-of-rows' = '100'\n" +
                ")";
        String select = "select order_number,price,order_time from Orders";
        // LocalEnvironment environment = ExecutionEnvironment.createLocalEnvironment();
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                // .inStreamingMode() // declare as a streaming job
                .inBatchMode() // declare as a batch job
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        tEnv.executeSql(source);
        TableResult tableResult = tEnv.executeSql(select);
        tableResult.print();
    }

    @Test
    public void batchTest2() {
        String source = "CREATE TABLE Orders (\n" +
                " order_number BIGINT,\n" +
                " price DECIMAL(32,2),\n" +
                " buyer ROW<first_name STRING, last_name STRING>,\n" +
                " order_time TIMESTAMP(3)\n" +
                ") WITH (\n" +
                " 'connector' = 'datagen',\n" +
                " 'number-of-rows' = '100'\n" +
                ")";
        String select = "select order_number,price,order_time from Orders";
        // LocalEnvironment environment = ExecutionEnvironment.createLocalEnvironment();
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironment();
        Configuration configuration = new Configuration();
        configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
        // configuration.setString("execution.runtime-mode", "STREAMING");
        TableConfig tableConfig = new TableConfig();
        tableConfig.addConfiguration(configuration);
        CustomTableEnvironmentImpl batchTableEnvironment = CustomTableEnvironmentImpl.create(environment,
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(), tableConfig);
        batchTableEnvironment.executeSql(source);
        batchTableEnvironment.executeSql(select);
        // TableResult tableResult = batchTableEnvironment.executeSql(select);
        // tableResult.print();
    }
}
dlink-core/src/test/java/com/dlink/core/JobManagerTest.java
...
...
@@ -27,7 +27,7 @@ public class JobManagerTest {
    public void cancelJobSelect() {
-       JobConfig config = new JobConfig("session-yarn", true, true, true, true, "s1", 2, null, null, null,
-               "测试", false, false, 100, 0, null, null, null, new HashMap<>());
+       JobConfig config = new JobConfig("session-yarn", true, true, true, true, "s1", 2, null, null, null,
+               "测试", false, false, false, 100, 0, 1, 0, null, new HashMap<>());
        if (config.isUseRemote()) {
            config.setAddress("192.168.123.157:8081");
...
...
dlink-doc/sql/dlink.sql
...
...
@@ -254,6 +254,7 @@ CREATE TABLE `dlink_task` (
  `parallelism` int(4) NULL DEFAULT NULL COMMENT 'parallelism',
  `fragment` tinyint(1) NULL DEFAULT NULL COMMENT 'fragment',
  `statement_set` tinyint(1) NULL DEFAULT NULL COMMENT 'enable statement set',
+ `batch_model` tinyint(1) NULL DEFAULT 0 COMMENT 'use batch mode',
  `cluster_id` int(11) NULL DEFAULT NULL COMMENT 'Flink cluster ID',
  `cluster_configuration_id` int(11) NULL DEFAULT NULL COMMENT 'cluster configuration ID',
  `database_id` int(11) NULL DEFAULT NULL COMMENT 'data source ID',
...
...
dlink-doc/sql/dlink_history.sql
...
...
@@ -524,4 +524,10 @@ ALTER TABLE `dlink_task`
update dlink_task set dialect = 'FlinkJar' where jar_id is not null;
update dlink_catalogue set type = 'FlinkJar' where task_id in (select id as task_id from dlink_task where jar_id is not null);

-- ----------------------------
-- 0.6.0-SNAPSHOT 2022-02-07
-- ----------------------------
ALTER TABLE `dlink_task`
    ADD COLUMN `batch_model` tinyint(1) NULL DEFAULT 0 COMMENT 'use batch mode' AFTER `statement_set`;

SET FOREIGN_KEY_CHECKS = 1;
dlink-executor/src/main/java/com/dlink/executor/AppBatchExecutor.java (new file)
package com.dlink.executor;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * AppBatchExecutor
 *
 * @author wenmo
 * @since 2022/2/7 22:14
 */
public class AppBatchExecutor extends Executor {

    public AppBatchExecutor(ExecutorSetting executorSetting) {
        this.executorSetting = executorSetting;
        this.environment = StreamExecutionEnvironment.createLocalEnvironment();
        init();
    }

    @Override
    CustomTableEnvironment createCustomTableEnvironment() {
        return CustomTableEnvironmentImpl.createBatch(environment);
    }
}
dlink-executor/src/main/java/com/dlink/executor/AppStreamExecutor.java
...
...
@@ -8,11 +8,16 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 * @author wenmo
 * @since 2021/11/18
 */
public class AppStreamExecutor extends Executor {

    public AppStreamExecutor(ExecutorSetting executorSetting) {
        this.executorSetting = executorSetting;
        this.environment = StreamExecutionEnvironment.getExecutionEnvironment();
        init();
    }

+   @Override
+   CustomTableEnvironment createCustomTableEnvironment() {
+       return CustomTableEnvironmentImpl.create(environment);
+   }
}
dlink-executor/src/main/java/com/dlink/executor/EnvironmentSetting.java
...
...
@@ -18,6 +18,7 @@ public class EnvironmentSetting {
    private String host;
    private int port;
    private boolean useRemote;

+   public static final EnvironmentSetting LOCAL = new EnvironmentSetting(false);

    public EnvironmentSetting(boolean useRemote) {
...
...
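The LOCAL constant gives callers a ready-made non-remote setting. A small sketch of both paths, assuming host and port are populated elsewhere (e.g. via setters) before a remote environment is created:

// Local: the predefined constant (useRemote = false).
EnvironmentSetting local = EnvironmentSetting.LOCAL;
// Remote: buildRemoteExecutor(...) also forces the flag to true itself.
EnvironmentSetting remote = new EnvironmentSetting(true);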
dlink-executor/src/main/java/com/dlink/executor/Executor.java
...
...
@@ -6,6 +6,8 @@ import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
...
...
@@ -18,8 +20,6 @@ import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.functions.UserDefinedFunction;

import java.util.HashMap;
import java.util.List;
...
...
@@ -27,16 +27,17 @@ import java.util.Map;
/**
 * Executor
 *
 * @author wenmo
 * @since 2021/11/17
 **/
public abstract class Executor {

    protected StreamExecutionEnvironment environment;
-   protected CustomTableEnvironmentImpl stEnvironment;
+   protected CustomTableEnvironment stEnvironment;
    protected EnvironmentSetting environmentSetting;
    protected ExecutorSetting executorSetting;
    protected Map<String, Object> setConfig = new HashMap<>();
    protected SqlManager sqlManager = new SqlManager();
    protected boolean useSqlFragment = true;
...
@@ -49,44 +50,60 @@ public abstract class Executor {
        return useSqlFragment;
    }

    public static Executor build() {
        return new LocalStreamExecutor(ExecutorSetting.DEFAULT);
    }

    public static Executor build(EnvironmentSetting environmentSetting, ExecutorSetting executorSetting) {
        if (environmentSetting.isUseRemote()) {
            return buildRemoteExecutor(environmentSetting, executorSetting);
        } else {
            return buildLocalExecutor(executorSetting);
        }
    }

    public static Executor buildLocalExecutor(ExecutorSetting executorSetting) {
+       if (executorSetting.isUseBatchModel()) {
+           return new LocalBatchExecutor(executorSetting);
+       } else {
+           return new LocalStreamExecutor(executorSetting);
+       }
    }

    public static Executor buildAppStreamExecutor(ExecutorSetting executorSetting) {
+       if (executorSetting.isUseBatchModel()) {
+           return new AppBatchExecutor(executorSetting);
+       } else {
+           return new AppStreamExecutor(executorSetting);
+       }
    }

    public static Executor buildRemoteExecutor(EnvironmentSetting environmentSetting, ExecutorSetting executorSetting) {
        environmentSetting.setUseRemote(true);
-       return new RemoteStreamExecutor(environmentSetting, executorSetting);
+       if (executorSetting.isUseBatchModel()) {
+           return new RemoteBatchExecutor(environmentSetting, executorSetting);
+       } else {
+           return new RemoteStreamExecutor(environmentSetting, executorSetting);
+       }
    }

-   public StreamExecutionEnvironment getEnvironment() {
+   public ExecutionConfig getExecutionConfig() {
+       return environment.getConfig();
+   }
+
+   public StreamExecutionEnvironment getStreamExecutionEnvironment() {
        return environment;
    }

-   public CustomTableEnvironmentImpl getCustomTableEnvironmentImpl() {
+   public CustomTableEnvironment getCustomTableEnvironment() {
        return stEnvironment;
    }

    public ExecutorSetting getExecutorSetting() {
        return executorSetting;
    }

    public EnvironmentSetting getEnvironmentSetting() {
        return environmentSetting;
    }
...
...
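The factories now branch on useBatchModel as well as on local-versus-remote. A sketch with illustrative setting values:

// build(checkpoint, parallelism, useSqlFragment, useStatementSet, useBatchModel,
//       savePointPath, jobName, configJson) -- all values illustrative.
ExecutorSetting setting = ExecutorSetting.build(0, 1, false, false, true, null, "demo", null);
Executor executor = Executor.buildLocalExecutor(setting); // -> LocalBatchExecutor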
@@ -98,192 +115,202 @@ public abstract class Executor {
        this.setConfig = setConfig;
    }

    protected void init() {
        initEnvironment();
        initStreamExecutionEnvironment();
    }

    public void update(ExecutorSetting executorSetting) {
        updateEnvironment(executorSetting);
        updateStreamExecutionEnvironment(executorSetting);
    }

-   private void initEnvironment() {
+   public void initEnvironment() {
        if (executorSetting.getCheckpoint() != null && executorSetting.getCheckpoint() > 0) {
            environment.enableCheckpointing(executorSetting.getCheckpoint());
        }
        if (executorSetting.getParallelism() != null && executorSetting.getParallelism() > 0) {
            environment.setParallelism(executorSetting.getParallelism());
        }
        if (executorSetting.getConfig() != null) {
            Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
            environment.getConfig().configure(configuration, null);
        }
    }

-   private void updateEnvironment(ExecutorSetting executorSetting) {
+   public void updateEnvironment(ExecutorSetting executorSetting) {
        if (executorSetting.getCheckpoint() != null && executorSetting.getCheckpoint() > 0) {
            environment.enableCheckpointing(executorSetting.getCheckpoint());
        }
        if (executorSetting.getParallelism() != null && executorSetting.getParallelism() > 0) {
            environment.setParallelism(executorSetting.getParallelism());
        }
        if (executorSetting.getConfig() != null) {
            Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
            environment.getConfig().configure(configuration, null);
        }
    }

+   abstract CustomTableEnvironment createCustomTableEnvironment();

    private void initStreamExecutionEnvironment() {
        useSqlFragment = executorSetting.isUseSqlFragment();
-       stEnvironment = CustomTableEnvironmentImpl.create(environment);
+       stEnvironment = createCustomTableEnvironment();
        if (executorSetting.getJobName() != null && !"".equals(executorSetting.getJobName())) {
            stEnvironment.getConfig().getConfiguration().setString(PipelineOptions.NAME.key(), executorSetting.getJobName());
        }
        setConfig.put(PipelineOptions.NAME.key(), executorSetting.getJobName());
        if (executorSetting.getConfig() != null) {
            for (Map.Entry<String, String> entry : executorSetting.getConfig().entrySet()) {
                stEnvironment.getConfig().getConfiguration().setString(entry.getKey(), entry.getValue());
            }
        }
    }

    private void updateStreamExecutionEnvironment(ExecutorSetting executorSetting) {
        useSqlFragment = executorSetting.isUseSqlFragment();
        copyCatalog();
        if (executorSetting.getJobName() != null && !"".equals(executorSetting.getJobName())) {
            stEnvironment.getConfig().getConfiguration().setString(PipelineOptions.NAME.key(), executorSetting.getJobName());
        }
        setConfig.put(PipelineOptions.NAME.key(), executorSetting.getJobName());
        if (executorSetting.getConfig() != null) {
            for (Map.Entry<String, String> entry : executorSetting.getConfig().entrySet()) {
                stEnvironment.getConfig().getConfiguration().setString(entry.getKey(), entry.getValue());
            }
        }
    }

    private void copyCatalog() {
        String[] catalogs = stEnvironment.listCatalogs();
-       CustomTableEnvironmentImpl newstEnvironment = CustomTableEnvironmentImpl.create(environment);
+       CustomTableEnvironment newstEnvironment = createCustomTableEnvironment();
        for (int i = 0; i < catalogs.length; i++) {
            if (stEnvironment.getCatalog(catalogs[i]).isPresent()) {
                newstEnvironment.getCatalogManager().unregisterCatalog(catalogs[i], true);
                newstEnvironment.registerCatalog(catalogs[i], stEnvironment.getCatalog(catalogs[i]).get());
            }
        }
        stEnvironment = newstEnvironment;
    }

    public String pretreatStatement(String statement) {
        return FlinkInterceptor.pretreatStatement(this, statement);
    }

    private boolean pretreatExecute(String statement) {
        return !FlinkInterceptor.build(this, statement);
    }

+   public JobExecutionResult execute(String jobName) throws Exception {
+       return environment.execute(jobName);
+   }

    public TableResult executeSql(String statement) {
        statement = pretreatStatement(statement);
        if (pretreatExecute(statement)) {
            return stEnvironment.executeSql(statement);
        } else {
            return CustomTableResultImpl.TABLE_RESULT_OK;
        }
    }

    public String explainSql(String statement, ExplainDetail... extraDetails) {
        statement = pretreatStatement(statement);
        if (pretreatExecute(statement)) {
            return stEnvironment.explainSql(statement, extraDetails);
        } else {
            return "";
        }
    }

    public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
        statement = pretreatStatement(statement);
        if (Asserts.isNotNullString(statement) && pretreatExecute(statement)) {
            return stEnvironment.explainSqlRecord(statement, extraDetails);
        } else {
            return null;
        }
    }

    public ObjectNode getStreamGraph(String statement) {
        statement = pretreatStatement(statement);
        if (pretreatExecute(statement)) {
            return stEnvironment.getStreamGraph(statement);
        } else {
            return null;
        }
    }

    public ObjectNode getStreamGraph(List<String> statements) {
        StreamGraph streamGraph = stEnvironment.getStreamGraphFromInserts(statements);
        JSONGenerator jsonGenerator = new JSONGenerator(streamGraph);
        String json = jsonGenerator.getJSON();
        ObjectMapper mapper = new ObjectMapper();
        ObjectNode objectNode = mapper.createObjectNode();
        try {
            objectNode = (ObjectNode) mapper.readTree(json);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        } finally {
            return objectNode;
        }
    }

+   public StreamGraph getStreamGraph() {
+       return environment.getStreamGraph();
+   }

    public ObjectNode getStreamGraphFromDataStream(List<String> statements) {
        for (String statement : statements) {
            executeSql(statement);
        }
-       StreamGraph streamGraph = environment.getStreamGraph();
+       StreamGraph streamGraph = getStreamGraph();
        JSONGenerator jsonGenerator = new JSONGenerator(streamGraph);
        String json = jsonGenerator.getJSON();
        ObjectMapper mapper = new ObjectMapper();
        ObjectNode objectNode = mapper.createObjectNode();
        try {
            objectNode = (ObjectNode) mapper.readTree(json);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        } finally {
            return objectNode;
        }
    }

    public JobPlanInfo getJobPlanInfo(List<String> statements) {
        return stEnvironment.getJobPlanInfo(statements);
    }

    public JobPlanInfo getJobPlanInfoFromDataStream(List<String> statements) {
        for (String statement : statements) {
            executeSql(statement);
        }
-       StreamGraph streamGraph = environment.getStreamGraph();
+       StreamGraph streamGraph = getStreamGraph();
        return new JobPlanInfo(JsonPlanGenerator.generatePlan(streamGraph.getJobGraph()));
    }

-   public void registerFunction(String name, ScalarFunction function) {
-       stEnvironment.registerFunction(name, function);
-   }
-
-   public void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> var2) {
-       stEnvironment.createTemporarySystemFunction(name, var2);
-   }
+   /*
+   public void registerFunction(String name, ScalarFunction function){
+       stEnvironment.registerFunction(name,function);
+   }
+   public void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> var2){
+       stEnvironment.createTemporarySystemFunction(name,var2);
+   }
+   */

    public CatalogManager getCatalogManager() {
        return stEnvironment.getCatalogManager();
    }

    public JobGraph getJobGraphFromInserts(List<String> statements) {
        return stEnvironment.getJobGraphFromInserts(statements);
    }

    public StatementSet createStatementSet() {
        return stEnvironment.createStatementSet();
    }

    public TableResult executeStatementSet(List<String> statements) {
        StatementSet statementSet = stEnvironment.createStatementSet();
        for (String item : statements) {
            statementSet.addInsertSql(item);
...
...
@@ -291,7 +318,7 @@ public abstract class Executor {
        return statementSet.execute();
    }

    public String explainStatementSet(List<String> statements) {
        StatementSet statementSet = stEnvironment.createStatementSet();
        for (String item : statements) {
            statementSet.addInsertSql(item);
...
...
@@ -299,15 +326,15 @@ public abstract class Executor {
        return statementSet.explain();
    }

    public void submitSql(String statements) {
        executeSql(statements);
    }

    public void submitStatementSet(List<String> statements) {
        executeStatementSet(statements);
    }

    public boolean parseAndLoadConfiguration(String statement) {
        return stEnvironment.parseAndLoadConfiguration(statement, environment, setConfig);
    }
}
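Putting the refactored facade together, a hedged end-to-end sketch (the DDL and option key are illustrative, not from the patch):

// Sketch of the executor surface after this commit.
Executor executor = Executor.buildLocalExecutor(ExecutorSetting.DEFAULT);
executor.parseAndLoadConfiguration("SET 'pipeline.name' = 'demo'"); // routed to callSet
executor.executeSql("CREATE TABLE Orders (id BIGINT) WITH ('connector' = 'datagen')");
String plan = executor.explainSql("SELECT id FROM Orders");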
dlink-executor/src/main/java/com/dlink/executor/ExecutorSetting.java
...
...
@@ -21,6 +21,7 @@ import java.util.Map;
@Getter
public class ExecutorSetting {

+   private boolean useBatchModel = false;
    private Integer checkpoint;
    private Integer parallelism;
    private boolean useSqlFragment;
...
...
@@ -74,17 +75,19 @@ public class ExecutorSetting {
        this.config = config;
    }

-   public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment, boolean useStatementSet,
-                          String savePointPath, String jobName, Map<String, String> config) {
+   public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment, boolean useStatementSet,
+                          boolean useBatchModel, String savePointPath, String jobName, Map<String, String> config) {
        this.checkpoint = checkpoint;
        this.parallelism = parallelism;
        this.useSqlFragment = useSqlFragment;
        this.useStatementSet = useStatementSet;
+       this.useBatchModel = useBatchModel;
        this.savePointPath = savePointPath;
        this.jobName = jobName;
        this.config = config;
    }

-   public static ExecutorSetting build(Integer checkpoint, Integer parallelism, boolean useSqlFragment, boolean useStatementSet,
-                                       String savePointPath, String jobName, String configJson) {
+   public static ExecutorSetting build(Integer checkpoint, Integer parallelism, boolean useSqlFragment, boolean useStatementSet,
+                                       boolean useBatchModel, String savePointPath, String jobName, String configJson) {
        List<Map<String, String>> configList = new ArrayList<>();
        if (Asserts.isNotNullString(configJson)) {
            try {
...
...
@@ -97,7 +100,7 @@ public class ExecutorSetting {
        for (Map<String, String> item : configList) {
            config.put(item.get("key"), item.get("value"));
        }
-       return new ExecutorSetting(checkpoint, parallelism, useSqlFragment, useStatementSet, savePointPath, jobName, config);
+       return new ExecutorSetting(checkpoint, parallelism, useSqlFragment, useStatementSet, useBatchModel, savePointPath, jobName, config);
    }

    public static ExecutorSetting build(Map<String, String> settingMap) {
...
...
@@ -113,6 +116,7 @@ public class ExecutorSetting {
                parallelism,
                "1".equals(settingMap.get("useSqlFragment")),
                "1".equals(settingMap.get("useStatementSet")),
+               "1".equals(settingMap.get("useBatchModel")),
                settingMap.get("savePointPath"),
                settingMap.get("jobName"),
                settingMap.get("config"));
...
...
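The map-based build(...) is how the web layer's "0"/"1" switches become booleans. A sketch with illustrative entries (keys follow the parsing above):

// "1" enables a flag, matching the "1".equals(...) checks above.
Map<String, String> settingMap = new HashMap<>();
settingMap.put("useSqlFragment", "0");
settingMap.put("useStatementSet", "0");
settingMap.put("useBatchModel", "1");
settingMap.put("jobName", "demo-batch");
ExecutorSetting setting = ExecutorSetting.build(settingMap);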
dlink-executor/src/main/java/com/dlink/executor/LocalBatchExecutor.java (new file)
package com.dlink.executor;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * LocalBatchExecutor
 *
 * @author wenmo
 * @since 2022/2/4 0:04
 */
public class LocalBatchExecutor extends Executor {

    public LocalBatchExecutor(ExecutorSetting executorSetting) {
        this.executorSetting = executorSetting;
        this.environment = StreamExecutionEnvironment.createLocalEnvironment();
        init();
    }

    @Override
    CustomTableEnvironment createCustomTableEnvironment() {
        return CustomTableEnvironmentImpl.createBatch(environment);
    }
}
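This class is normally reached through Executor.buildLocalExecutor rather than constructed directly. A minimal hedged sketch (the DDL is illustrative):

// Sketch: useBatchModel = true routes the factory here.
ExecutorSetting setting = ExecutorSetting.build(0, 1, false, false, true, null, "batch-demo", null);
Executor executor = Executor.buildLocalExecutor(setting); // a LocalBatchExecutor
executor.executeSql("CREATE TABLE t (id INT) WITH ('connector' = 'datagen', 'number-of-rows' = '10')");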
dlink-executor/src/main/java/com/dlink/executor/LocalStreamExecutor.java
...
...
@@ -16,4 +16,8 @@ public class LocalStreamExecutor extends Executor {
        init();
    }

+   @Override
+   CustomTableEnvironment createCustomTableEnvironment() {
+       return CustomTableEnvironmentImpl.create(environment);
+   }
}
dlink-executor/src/main/java/com/dlink/executor/RemoteBatchExecutor.java (new file)
package com.dlink.executor;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * RemoteBatchExecutor
 *
 * @author wenmo
 * @since 2022/2/7 22:10
 */
public class RemoteBatchExecutor extends Executor {

    public RemoteBatchExecutor(EnvironmentSetting environmentSetting, ExecutorSetting executorSetting) {
        this.environmentSetting = environmentSetting;
        this.executorSetting = executorSetting;
        this.environment = StreamExecutionEnvironment.createRemoteEnvironment(environmentSetting.getHost(), environmentSetting.getPort());
        init();
    }

    @Override
    CustomTableEnvironment createCustomTableEnvironment() {
        return CustomTableEnvironmentImpl.createBatch(environment);
    }
}
dlink-executor/src/main/java/com/dlink/executor/RemoteStreamExecutor.java
...
...
@@ -10,11 +10,15 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 **/
public class RemoteStreamExecutor extends Executor {

    public RemoteStreamExecutor(EnvironmentSetting environmentSetting, ExecutorSetting executorSetting) {
        this.environmentSetting = environmentSetting;
        this.executorSetting = executorSetting;
        this.environment = StreamExecutionEnvironment.createRemoteEnvironment(environmentSetting.getHost(), environmentSetting.getPort());
        init();
    }

+   @Override
+   CustomTableEnvironment createCustomTableEnvironment() {
+       return CustomTableEnvironmentImpl.create(environment);
+   }
}
dlink-executor/src/main/java/com/dlink/trans/ddl/CreateAggTableOperation.java
...
...
@@ -37,7 +37,7 @@ public class CreateAggTableOperation extends AbstractOperation implements Operat
    @Override
    public void build(Executor executor) {
        AggTable aggTable = AggTable.build(statement);
-       Table source = executor.getCustomTableEnvironmentImpl().sqlQuery("select * from " + aggTable.getTable());
+       Table source = executor.getCustomTableEnvironment().sqlQuery("select * from " + aggTable.getTable());
        List<String> wheres = aggTable.getWheres();
        if (wheres != null && wheres.size() > 0) {
            for (String s : wheres) {
...
...
@@ -47,6 +47,6 @@ public class CreateAggTableOperation extends AbstractOperation implements Operat
        Table sink = source.groupBy(aggTable.getGroupBy())
                .flatAggregate(aggTable.getAggBy())
                .select(aggTable.getColumns());
-       executor.getCustomTableEnvironmentImpl().registerTable(aggTable.getName(), sink);
+       executor.getCustomTableEnvironment().registerTable(aggTable.getName(), sink);
    }
}
dlink-executor/src/main/java/com/dlink/trans/ddl/CreateCDCSourceOperation.java
...
...
@@ -40,7 +40,7 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
                , cdcSource.getPassword(), cdcSource.getCheckpoint(), cdcSource.getParallelism(), cdcSource.getDatabase(), cdcSource.getTable()
                , cdcSource.getTopic(), cdcSource.getBrokers());
        try {
-           FlinkCDCMergeBuilder.buildMySqlCDC(executor.getEnvironment(), config);
+           FlinkCDCMergeBuilder.buildMySqlCDC(executor.getStreamExecutionEnvironment(), config);
        } catch (Exception e) {
            e.printStackTrace();
        }
...
...
dlink-executor/src/main/java/com/dlink/trans/ddl/SetOperation.java
...
...
@@ -52,10 +52,10 @@ public class SetOperation extends AbstractOperation implements Operation {
        if (Asserts.isNotNullMap(map) && map.size() == 2) {
            Map<String, String> confMap = new HashMap<>();
            confMap.put(StringUtils.join(map.get("SET"), "."), StringUtils.join(map.get("="), ","));
-           executor.getCustomTableEnvironmentImpl().getConfig().addConfiguration(Configuration.fromMap(confMap));
+           executor.getCustomTableEnvironment().getConfig().addConfiguration(Configuration.fromMap(confMap));
            Configuration configuration = Configuration.fromMap(confMap);
-           executor.getEnvironment().getConfig().configure(configuration, null);
-           executor.getCustomTableEnvironmentImpl().getConfig().addConfiguration(configuration);
+           executor.getExecutionConfig().configure(configuration, null);
+           executor.getCustomTableEnvironment().getConfig().addConfiguration(configuration);
        }
    }
}
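For context, the two map entries come from tokenizing a statement such as SET table.exec.resource.default-parallelism = 4 (an example key, not from the patch); after the joins the effective configuration is simply:

// Sketch of the configuration the joined tokens produce.
Map<String, String> confMap = new HashMap<>();
confMap.put("table.exec.resource.default-parallelism", "4");
Configuration configuration = Configuration.fromMap(confMap);
// applied to both the ExecutionConfig and the TableConfig, as above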
dlink-web/src/components/Studio/StudioRightTool/StudioSetting/index.tsx
...
...
@@ -14,7 +14,7 @@ const {Text} = Typography;
const StudioSetting = (props: any) => {

-  const {sessionCluster, clusterConfiguration, current, form, dispatch, tabs, currentSession, jars, env, toolHeight} = props;
+  const {sessionCluster, clusterConfiguration, current, form, dispatch, tabs, currentSession, env, toolHeight} = props;

  const getClusterOptions = () => {
    const itemList = [];
...
...
@@ -209,6 +209,17 @@ const StudioSetting = (props: any) => {
          </Form.Item>
        </Col>
      </Row>
+     <Row>
+       <Col span={12}>
+         <Form.Item label="Batch Mode" className={styles.form_item} name="batchModel" valuePropName="checked"
+                    tooltip={{title: 'Use batch mode', icon: <InfoCircleOutlined/>}}>
+           <Switch checkedChildren="Enabled" unCheckedChildren="Disabled"/>
+         </Form.Item>
+       </Col>
+     </Row>
      <Form.Item label="SavePoint Strategy" className={styles.form_item} name="savePointStrategy"
                 tooltip='Specify the SavePoint strategy; disabled by default'
...
...
dlink-web/src/pages/FlinkSqlStudio/model.ts
...
...
@@ -70,6 +70,7 @@ export type TaskType = {
  parallelism?: number,
  fragment?: boolean,
  statementSet?: boolean,
+ batchModel?: boolean,
  config?: [],
  clusterId?: any,
  clusterName?: string,
...
...
@@ -355,7 +356,7 @@ const Model: ModelType = {
      const {deleteType, current} = payload;
      const newTabs = state.tabs;
      const firstKey = newTabs.panes[0].key;
-     const newCurrent = newTabs.panes[0];
+     let newCurrent = newTabs.panes[0];
      if (deleteType == 'CLOSE_OTHER') {
        const keys = [firstKey, current.key];
        newCurrent = current;
...
...
dlink-web/src/pages/Welcome.tsx
...
...
@@ -647,6 +647,9 @@ export default (): React.ReactNode => {
              <li>
                <Link>Added management of the FlinkJar Dialect</Link>
              </li>
+             <li>
+               <Link>Added the Batch engine</Link>
+             </li>
            </ul>
          </Paragraph>
        </Timeline.Item>
...
...