zhaowei / dlink / Commits / 59c7b06d

Commit 59c7b06d, authored Oct 29, 2021 by godkaikai

    yarn application test ("yarn application 测试")

Parent: 8ee08a32

Showing 38 changed files with 3343 additions and 88 deletions (+3343 / -88)
Changed files:

    dlink-admin/pom.xml                                              +6    -0
    .../src/main/java/com/dlink/controller/StudioController.java    +8    -0
    ...-admin/src/main/java/com/dlink/service/StudioService.java    +4    -0
    ...c/main/java/com/dlink/service/impl/StudioServiceImpl.java    +8    -0
    dlink-app/pom.xml                                                +129  -0
    dlink-app/src/main/java/com/dlink/app/MainApp.java               +30   -0
    dlink-app/src/main/java/com/dlink/app/assertion/Asserts.java     +101  -0
    ...app/src/main/java/com/dlink/app/constant/AppConstant.java    +12   -0
    dlink-app/src/main/java/com/dlink/app/db/DBConfig.java           +62   -0
    dlink-app/src/main/java/com/dlink/app/db/DBUtil.java             +62   -0
    dlink-app/src/main/java/com/dlink/app/executor/Executor.java     +66   -0
    ...src/main/java/com/dlink/app/executor/ExecutorSetting.java    +89   -0
    ...-app/src/main/java/com/dlink/app/executor/SqlManager.java    +178  -0
    ...src/main/java/com/dlink/app/flinksql/FlinkSQLFactory.java    +44   -0
    dlink-assembly/src/main/assembly/package.xml                     +8    -0
    dlink-client/dlink-client-1.12/pom.xml                           +5    -0
    dlink-executor/pom.xml                                           +5    -5
    ...or/src/main/java/com/dlink/executor/AbstractExecutor.java    +0    -36
    .../src/main/java/com/dlink/executor/EnvironmentSetting.java    +1    -9
    ...k-executor/src/main/java/com/dlink/executor/Executor.java    +152  -30
    ...src/main/java/com/dlink/executor/LocalStreamExecutor.java    +19   -0
    ...rc/main/java/com/dlink/executor/RemoteStreamExecutor.java    +20   -0
    dlink-gateway/pom.xml                                            +50   -0
    ...eway/src/main/java/com/dlink/gateway/AbstractGateway.java    +36   -0
    dlink-gateway/src/main/java/com/dlink/gateway/Gateway.java       +51   -0
    ...ateway/src/main/java/com/dlink/gateway/GatewayConfig.java    +76   -0
    ...-gateway/src/main/java/com/dlink/gateway/GatewayType.java    +39   -0
    ...in/java/com/dlink/gateway/exception/GatewayException.java    +18   -0
    .../java/com/dlink/gateway/result/AbstractGatewayResult.java    +51   -0
    ...src/main/java/com/dlink/gateway/result/GatewayResult.java    +12   -0
    ...ay/src/main/java/com/dlink/gateway/result/YarnResult.java    +36   -0
    ...n/java/com/dlink/gateway/yarn/YarnApplicationGateway.java    +112  -0
    ...way/src/main/java/com/dlink/gateway/yarn/YarnGateway.java    +26   -0
    ...ain/java/org/apache/flink/yarn/YarnClusterDescriptor.java    +1782 -0
    ...ain/resources/META-INF/services/com.dlink.gateway.Gateway    +1    -0
    ...-gateway/src/test/java/com/dlink/gateway/GatewayTest.java    +25   -0
    ...c/components/Studio/StudioConsole/StudioProcess/index.tsx    +7    -4
    pom.xml                                                          +12   -4
dlink-admin/pom.xml

@@ -114,6 +114,12 @@
             <groupId>com.dlink</groupId>
             <artifactId>dlink-metadata-base</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.dlink</groupId>
+            <artifactId>dlink-gateway</artifactId>
+            <scope>provided</scope>
+        </dependency>
         <!--<dependency>
             <groupId>com.dlink</groupId>
             <artifactId>dlink-metadata-mysql</artifactId>
 ...
dlink-admin/src/main/java/com/dlink/controller/StudioController.java

@@ -5,6 +5,7 @@
 import com.dlink.dto.StudioCADTO;
 import com.dlink.dto.StudioDDLDTO;
 import com.dlink.dto.StudioExecuteDTO;
+import com.dlink.gateway.GatewayConfig;
 import com.dlink.job.JobResult;
 import com.dlink.result.IResult;
 import com.dlink.service.StudioService;
@@ -139,4 +140,11 @@ public class StudioController {
     public Result cancel(@RequestParam Integer clusterId, @RequestParam String jobId) {
         return Result.succeed(studioService.cancel(clusterId, jobId), "停止成功");
     }
+
+    /**
+     * Submit a jar (提交jar)
+     */
+    @PostMapping("/submitJar")
+    public Result submitJar(@RequestBody JsonNode para) {
+        return Result.succeed(studioService.submitJar(GatewayConfig.build(para)), "执行成功");
+    }
 }
dlink-admin/src/main/java/com/dlink/service/StudioService.java

@@ -5,6 +5,8 @@
 import com.dlink.dto.StudioExecuteDTO;
 import com.dlink.explainer.ca.ColumnCANode;
 import com.dlink.explainer.ca.TableCANode;
+import com.dlink.gateway.GatewayConfig;
+import com.dlink.gateway.result.GatewayResult;
 import com.dlink.job.JobResult;
 import com.dlink.result.IResult;
 import com.dlink.result.SelectResult;
@@ -48,4 +50,6 @@ public interface StudioService {
     List<JsonNode> listJobs(Integer clusterId);

     boolean cancel(Integer clusterId, String jobId);
+
+    GatewayResult submitJar(GatewayConfig config);
 }
dlink-admin/src/main/java/com/dlink/service/impl/StudioServiceImpl.java

@@ -8,6 +8,9 @@
 import com.dlink.explainer.ca.CABuilder;
 import com.dlink.explainer.ca.ColumnCANode;
 import com.dlink.explainer.ca.TableCANode;
+import com.dlink.gateway.Gateway;
+import com.dlink.gateway.GatewayConfig;
+import com.dlink.gateway.result.GatewayResult;
 import com.dlink.job.JobConfig;
 import com.dlink.job.JobManager;
 import com.dlink.job.JobResult;
@@ -159,4 +162,9 @@ public class StudioServiceImpl implements StudioService {
         Asserts.checkNotNull(cluster, "该集群不存在");
         return FlinkAPI.build(cluster.getJobManagerHost()).stop(jobId);
     }
+
+    @Override
+    public GatewayResult submitJar(GatewayConfig config) {
+        return Gateway.build(config).submitJar();
+    }
 }
dlink-app/pom.xml (new file)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>dlink</artifactId>
        <groupId>com.dlink</groupId>
        <version>0.3.2</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>dlink-app</artifactId>

    <properties>
        <mainClass>com.dlink.app.MainApp</mainClass>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.12.5</flink.version>
        <slf4j.version>1.7.7</slf4j.version>
        <log4j.version>1.2.17</log4j.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.21</version>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>*.properties</include>
                </includes>
            </resource>
        </resources>
        <plugins>
            <!-- compiler plugin (编译插件) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <!-- jar packaging plugin, bundles all dependencies (打jar包插件,会包含所有依赖) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- entry class of the jar, optional (可以设置jar包的入口类,可选) -->
                            <mainClass>com.dlink.app.MainApp</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
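The assembly plugin's jar-with-dependencies descriptor makes this module emit a single fat jar (presumably dlink-app-0.3.2-jar-with-dependencies.jar, following the artifact name and version above) that carries the MainApp entry point together with all the Flink, logging and MySQL dependencies, so a yarn-application submission only has to ship one user jar.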
dlink-app/src/main/java/com/dlink/app/MainApp.java (new file)

package com.dlink.app;

import com.dlink.app.assertion.Asserts;
import com.dlink.app.db.DBConfig;
import com.dlink.app.executor.Executor;
import com.dlink.app.flinksql.FlinkSQLFactory;
import org.apache.flink.api.java.utils.ParameterTool;

import java.io.IOException;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

/**
 * MainApp
 *
 * @author qiwenkai
 * @since 2021/10/27 11:10
 **/
public class MainApp {

    public static void main(String[] args) throws IOException {
        System.out.println(LocalDateTime.now() + "任务开始");
        ParameterTool parameters = ParameterTool.fromArgs(args);
        String id = parameters.get("id", null);
        if (Asserts.isNotNullString(id)) {
            Executor.build().submit(FlinkSQLFactory.getStatements(Integer.valueOf(id), DBConfig.build(parameters)));
        }
    }
}
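Since MainApp reads its whole contract from ParameterTool.fromArgs, it only cares about a flat list of --key value pairs. A minimal local smoke test, sketched with an assumed task id of 1 and placeholder MySQL credentials, could look like this:

    // Hypothetical smoke test: the same flat argument list YARN hands to MainApp.main.
    public class MainAppSmokeTest {
        public static void main(String[] args) throws Exception {
            com.dlink.app.MainApp.main(new String[]{
                    "--id", "1",                                  // dlink_task_statement primary key (assumed)
                    "--driver", "com.mysql.cj.jdbc.Driver",       // placeholder connection settings
                    "--url", "jdbc:mysql://127.0.0.1:3306/dlink",
                    "--username", "dlink",
                    "--password", "dlink"
            });
        }
    }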
dlink-app/src/main/java/com/dlink/app/assertion/Asserts.java (new file)

package com.dlink.app.assertion;

import java.util.Collection;
import java.util.Map;

/**
 * Asserts
 *
 * @author wenmo
 * @since 2021/7/5 21:57
 */
public class Asserts {

    public static boolean isNotNull(Object object) {
        return object != null;
    }

    public static boolean isNull(Object object) {
        return object == null;
    }

    public static boolean isNullString(String str) {
        return isNull(str) || "".equals(str);
    }

    public static boolean isNotNullString(String str) {
        return !isNullString(str);
    }

    public static boolean isEquals(String str1, String str2) {
        if (isNull(str1) && isNull(str2)) {
            return true;
        } else if (isNull(str1) || isNull(str2)) {
            return false;
        } else {
            return str1.equals(str2);
        }
    }

    public static boolean isEqualsIgnoreCase(String str1, String str2) {
        if (isNull(str1) && isNull(str2)) {
            return true;
        } else if (isNull(str1) || isNull(str2)) {
            return false;
        } else {
            return str1.equalsIgnoreCase(str2);
        }
    }

    public static boolean isNullCollection(Collection collection) {
        if (isNull(collection) || collection.size() == 0) {
            return true;
        }
        return false;
    }

    public static boolean isNotNullCollection(Collection collection) {
        return !isNullCollection(collection);
    }

    public static boolean isNullMap(Map map) {
        if (isNull(map) || map.size() == 0) {
            return true;
        }
        return false;
    }

    public static boolean isNotNullMap(Map map) {
        return !isNullMap(map);
    }

    public static void checkNull(String key, String msg) {
        if (key == null || "".equals(key)) {
            throw new RuntimeException(msg);
        }
    }

    public static void checkNotNull(Object object, String msg) {
        if (isNull(object)) {
            throw new RuntimeException(msg);
        }
    }

    public static void checkNullString(String key, String msg) {
        if (isNull(key) || isEquals("", key)) {
            throw new RuntimeException(msg);
        }
    }

    public static void checkNullCollection(Collection collection, String msg) {
        if (isNullCollection(collection)) {
            throw new RuntimeException(msg);
        }
    }

    public static void checkNullMap(Map map, String msg) {
        if (isNullMap(map)) {
            throw new RuntimeException(msg);
        }
    }
}
dlink-app/src/main/java/com/dlink/app/constant/AppConstant.java (new file)

package com.dlink.app.constant;

/**
 * AppConstant
 *
 * @author qiwenkai
 * @since 2021/10/27 15:24
 **/
public class AppConstant {
    public static final String FLINKSQL_SEPARATOR = ";";
}
dlink-app/src/main/java/com/dlink/app/db/DBConfig.java (new file)

package com.dlink.app.db;

import org.apache.flink.api.java.utils.ParameterTool;

/**
 * DBConfig
 *
 * @author qiwenkai
 * @since 2021/10/27 14:46
 **/
public class DBConfig {

    private String driver;
    private String url;
    private String username;
    private String password;

    public DBConfig(String driver, String url, String username, String password) {
        this.driver = driver;
        this.url = url;
        this.username = username;
        this.password = password;
    }

    public static DBConfig build(String driver, String url, String username, String password) {
        return new DBConfig(driver, url, username, password);
    }

    public static DBConfig build(ParameterTool parameters) {
        return new DBConfig(parameters.get("driver", null),
                parameters.get("url", null),
                parameters.get("username", null),
                parameters.get("password", null));
    }

    public String getDriver() {
        return driver;
    }

    public String getUrl() {
        return url;
    }

    public String getUsername() {
        return username;
    }

    public String getPassword() {
        return password;
    }

    @Override
    public String toString() {
        return "DBConfig{" +
                "driver='" + driver + '\'' +
                ", url='" + url + '\'' +
                ", username='" + username + '\'' +
                ", password='" + password + '\'' +
                '}';
    }
}
dlink-app/src/main/java/com/dlink/app/db/DBUtil.java (new file)

package com.dlink.app.db;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * DBUtil
 *
 * @author qiwenkai
 * @since 2021/10/27 11:25
 **/
public class DBUtil {

    private static Connection getConnection(DBConfig config) throws IOException {
        Connection conn = null;
        try {
            Class.forName(config.getDriver());
            conn = DriverManager.getConnection(config.getUrl(), config.getUsername(), config.getPassword());
        } catch (SQLException | ClassNotFoundException e) {
            e.printStackTrace();
            close(conn);
        }
        return conn;
    }

    private static void close(Connection conn) {
        try {
            if (conn != null) {
                conn.close();
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    public static String getOneByID(String sql, DBConfig config) throws SQLException, IOException {
        Connection conn = getConnection(config);
        String result = null;
        try (Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(sql)) {
            if (rs.next()) {
                result = rs.getString(1);
            }
        }
        close(conn);
        /*catch (SQLException e1) {
            e1.printStackTrace();
            String message = e1.getMessage();
            System.err.println(LocalDateTime.now().toString() + " --> 获取 FlinkSQL 异常,ID 为");
        }*/
        return result;
    }
}
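DBUtil is only called from FlinkSQLFactory below, but its shape is easy to show in isolation. A hedged sketch, from a method that declares throws SQLException, IOException, with placeholder connection values:

    DBConfig config = DBConfig.build(
            "com.mysql.cj.jdbc.Driver",
            "jdbc:mysql://127.0.0.1:3306/dlink",   // placeholder URL and credentials
            "dlink",
            "dlink");
    // Fetches the first column of the first row, here the FlinkSQL text of task 1.
    String statement = DBUtil.getOneByID(
            "select statement from dlink_task_statement where id = 1", config);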
dlink-app/src/main/java/com/dlink/app/executor/Executor.java (new file)

package com.dlink.app.executor;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;

import java.util.List;

/**
 * Executor
 *
 * @author qiwenkai
 * @since 2021/10/27 15:52
 **/
public class Executor {

    private StreamExecutionEnvironment environment;
    private StreamTableEnvironment stEnvironment;
    private ExecutorSetting executorSetting;
    private SqlManager sqlManager;

    public static Executor build() {
        return new Executor(ExecutorSetting.DEFAULT);
    }

    public static Executor build(ExecutorSetting setting) {
        return new Executor(setting);
    }

    private Executor(ExecutorSetting executorSetting) {
        this.executorSetting = executorSetting;
        this.sqlManager = new SqlManager();
        init(executorSetting);
    }

    private void init(ExecutorSetting setting) {
        this.environment = StreamExecutionEnvironment.getExecutionEnvironment();
        this.stEnvironment = StreamTableEnvironment.create(this.environment);
    }

    private void executeSql(String statement) {
        if (executorSetting.isUseSqlFragment()) {
            statement = sqlManager.parseVariable(statement);
            if (statement.length() > 0 && checkShowFragments(statement)) {
                stEnvironment.executeSql(statement);
            }
        } else {
            stEnvironment.executeSql(statement);
        }
    }

    public void submit(List<String> statements) {
        for (String statement : statements) {
            if (statement == null || "".equals(statement.trim())) {
                continue;
            }
            executeSql(statement);
        }
    }

    private boolean checkShowFragments(String sql) {
        return sqlManager.checkShowFragments(sql);
    }
}
dlink-app/src/main/java/com/dlink/app/executor/ExecutorSetting.java (new file)

package com.dlink.app.executor;

import org.apache.flink.api.java.utils.ParameterTool;

import java.util.Map;

/**
 * ExecutorSetting
 *
 * @author wenmo
 * @since 2021/5/25 13:43
 **/
public class ExecutorSetting {

    private Integer checkpoint;
    private Integer parallelism;
    private boolean useSqlFragment;
    private String savePointPath;
    private String jobName;
    private Map<String, String> config;

    public static final ExecutorSetting DEFAULT = new ExecutorSetting(0, 1, true);

    public ExecutorSetting(boolean useSqlFragment) {
        this.useSqlFragment = useSqlFragment;
    }

    public ExecutorSetting(Integer checkpoint) {
        this.checkpoint = checkpoint;
    }

    public ExecutorSetting(Integer checkpoint, boolean useSqlFragment) {
        this.checkpoint = checkpoint;
        this.useSqlFragment = useSqlFragment;
    }

    public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment) {
        this.checkpoint = checkpoint;
        this.parallelism = parallelism;
        this.useSqlFragment = useSqlFragment;
    }

    public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath, String jobName) {
        this.checkpoint = checkpoint;
        this.parallelism = parallelism;
        this.useSqlFragment = useSqlFragment;
        this.savePointPath = savePointPath;
        this.jobName = jobName;
    }

    public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath) {
        this.checkpoint = checkpoint;
        this.parallelism = parallelism;
        this.useSqlFragment = useSqlFragment;
        this.savePointPath = savePointPath;
    }

    public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath, String jobName, Map<String, String> config) {
        this.checkpoint = checkpoint;
        this.parallelism = parallelism;
        this.useSqlFragment = useSqlFragment;
        this.savePointPath = savePointPath;
        this.jobName = jobName;
        this.config = config;
    }

    public Integer getCheckpoint() {
        return checkpoint;
    }

    public Integer getParallelism() {
        return parallelism;
    }

    public boolean isUseSqlFragment() {
        return useSqlFragment;
    }

    public String getSavePointPath() {
        return savePointPath;
    }

    public String getJobName() {
        return jobName;
    }

    public Map<String, String> getConfig() {
        return config;
    }
}
dlink-app/src/main/java/com/dlink/app/executor/SqlManager.java (new file)

package com.dlink.app.executor;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.ExpressionParserException;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.util.StringUtils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static java.lang.String.format;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * Flink Sql Fragment Manager
 *
 * @author wenmo
 * @since 2021/6/7 22:06
 **/
@Internal
public final class SqlManager {

    private Map<String, String> sqlFragments;

    static final String SHOW_FRAGMENTS = "SHOW FRAGMENTS";

    public SqlManager() {
        sqlFragments = new HashMap<>();
    }

    /**
     * Get names of sql fragments loaded.
     *
     * @return a list of names of sql fragments loaded
     */
    public List<String> listSqlFragments() {
        return new ArrayList<>(sqlFragments.keySet());
    }

    /**
     * Registers a fragment of sql under the given name. The sql fragment name must be unique.
     *
     * @param sqlFragmentName name under which to register the given sql fragment
     * @param sqlFragment     a fragment of sql to register
     * @throws CatalogException if the registration of the sql fragment under the given name failed.
     *                          But at the moment, with CatalogException, not SqlException
     */
    public void registerSqlFragment(String sqlFragmentName, String sqlFragment) {
        checkArgument(
                !StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
                "sql fragment name cannot be null or empty.");
        checkNotNull(sqlFragment, "sql fragment cannot be null");
        /*if (sqlFragments.containsKey(sqlFragmentName)) {
            throw new CatalogException(
                    format("The fragment of sql %s already exists.", sqlFragmentName));
        }*/
        sqlFragments.put(sqlFragmentName, sqlFragment);
    }

    /**
     * Unregisters a fragment of sql under the given name. The sql fragment name must exist.
     *
     * @param sqlFragmentName   name under which to unregister the given sql fragment.
     * @param ignoreIfNotExists If false an exception will be thrown if the fragment of sql to be
     *                          altered does not exist.
     * @throws CatalogException if the unregistration of the sql fragment under the given name
     *                          failed. But at the moment, with CatalogException, not SqlException
     */
    public void unregisterSqlFragment(String sqlFragmentName, boolean ignoreIfNotExists) {
        checkArgument(
                !StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
                "sql fragmentName name cannot be null or empty.");
        if (sqlFragments.containsKey(sqlFragmentName)) {
            sqlFragments.remove(sqlFragmentName);
        } else if (!ignoreIfNotExists) {
            throw new CatalogException(
                    format("The fragment of sql %s does not exist.", sqlFragmentName));
        }
    }

    /**
     * Get a fragment of sql under the given name. The sql fragment name must exist.
     *
     * @param sqlFragmentName name under which to look up the given sql fragment.
     * @throws CatalogException if the lookup of the sql fragment under the given name failed.
     *                          But at the moment, with CatalogException, not SqlException
     */
    public String getSqlFragment(String sqlFragmentName) {
        checkArgument(
                !StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
                "sql fragmentName name cannot be null or empty.");
        if (sqlFragments.containsKey(sqlFragmentName)) {
            return sqlFragments.get(sqlFragmentName);
        } else {
            throw new CatalogException(
                    format("The fragment of sql %s does not exist.", sqlFragmentName));
        }
    }

    /**
     * Get all registered sql fragments.
     */
    public Map<String, String> getSqlFragment() {
        return sqlFragments;
    }

    public Iterator getSqlFragmentsIterator() {
        return sqlFragments.entrySet().iterator();
    }

    public boolean checkShowFragments(String sql) {
        return SHOW_FRAGMENTS.equals(sql.trim().toUpperCase());
    }

    /**
     * Parse some variables under the given sql.
     *
     * @param statement A sql that will be parsed.
     * @throws ExpressionParserException if the name of the variable under the given sql failed.
     */
    public String parseVariable(String statement) {
        if (statement == null || "".equals(statement)) {
            return statement;
        }
        String[] strs = statement.split(";");
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < strs.length; i++) {
            String str = strs[i].trim();
            if (str.length() == 0) {
                continue;
            }
            if (str.contains(":=")) {
                String[] strs2 = str.split(":=");
                if (strs2.length >= 2) {
                    if (strs2[0].length() == 0) {
                        throw new ExpressionParserException("Illegal variable name.");
                    }
                    String valueString = str.substring(str.indexOf(":=") + 2);
                    this.registerSqlFragment(strs2[0], replaceVariable(valueString));
                } else {
                    throw new ExpressionParserException("Illegal variable definition.");
                }
            } else {
                sb.append(replaceVariable(str));
            }
        }
        return sb.toString();
    }

    /**
     * Replace some variables under the given sql.
     *
     * @param statement A sql whose variables will be replaced.
     */
    private String replaceVariable(String statement) {
        String pattern = "\\$\\{(.+?)\\}";
        Pattern p = Pattern.compile(pattern);
        Matcher m = p.matcher(statement);
        StringBuffer sb = new StringBuffer();
        while (m.find()) {
            String key = m.group(1);
            String value = this.getSqlFragment(key);
            m.appendReplacement(sb, value == null ? "" : value);
        }
        m.appendTail(sb);
        return sb.toString();
    }
}
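To make the fragment syntax concrete: parseVariable splits the script on ';', registers every name:=value segment as a fragment, and rewrites ${name} references in the remaining statements. A minimal round trip (the table names are made up):

    SqlManager manager = new SqlManager();
    // The first segment registers fragment "tb"; the second is rewritten using it.
    String sql = manager.parseVariable(
            "tb:=my_catalog.my_db.my_table;select * from ${tb}");
    // sql is now "select * from my_catalog.my_db.my_table"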
dlink-app/src/main/java/com/dlink/app/flinksql/FlinkSQLFactory.java (new file)

package com.dlink.app.flinksql;

import com.dlink.app.constant.AppConstant;
import com.dlink.app.db.DBConfig;
import com.dlink.app.db.DBUtil;

import java.io.IOException;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;

/**
 * FlinkSQLFactory
 *
 * @author qiwenkai
 * @since 2021/10/27 11:15
 **/
public class FlinkSQLFactory {

    private static String getQuerySQL(Integer id) throws SQLException {
        if (id == null) {
            throw new SQLException("请指定任务ID");
        }
        return "select statement from dlink_task_statement where id = " + id;
    }

    private static String getFlinkSQLStatement(Integer id, DBConfig config) {
        String statement = "";
        try {
            statement = DBUtil.getOneByID(getQuerySQL(id), config);
        } catch (IOException | SQLException e) {
            e.printStackTrace();
            System.err.println(LocalDateTime.now().toString() + " --> 获取 FlinkSQL 异常,ID 为" + id);
            System.err.println(LocalDateTime.now().toString() + "连接信息为:" + config.toString());
            System.err.println(LocalDateTime.now().toString() + "异常信息为:" + e.getMessage());
        }
        return statement;
    }

    public static List<String> getStatements(Integer id, DBConfig config) {
        return Arrays.asList(getFlinkSQLStatement(id, config).split(AppConstant.FLINKSQL_SEPARATOR));
    }
}
dlink-assembly/src/main/assembly/package.xml

@@ -150,5 +150,13 @@
             <include>clickhouse-jdbc-*.jar</include>
         </includes>
     </fileSet>
+    <fileSet>
+        <directory>${project.parent.basedir}/dlink-gateway/target</directory>
+        <outputDirectory>lib</outputDirectory>
+        <includes>
+            <include>dlink-gateway-${project.version}.jar</include>
+        </includes>
+    </fileSet>
 </fileSets>
</assembly>
dlink-client/dlink-client-1.12/pom.xml

@@ -44,6 +44,11 @@
         </exclusions>
         <version>${flink.version}</version>
     </dependency>
+    <dependency>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-yarn_2.11</artifactId>
+        <version>${flink.version}</version>
+    </dependency>
     <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-api</artifactId>
 ...
dlink-executor/pom.xml

(the dlink-client-1.12, dlink-connector-jdbc-1.12 and dlink-function dependencies are commented out)

@@ -39,21 +39,21 @@
         <artifactId>junit</artifactId>
         <scope>provided</scope>
     </dependency>
-    <dependency>
+    <!--<dependency>
         <groupId>com.dlink</groupId>
         <artifactId>dlink-client-1.12</artifactId>
         <!--<scope>provided</scope>-->
     </dependency>
     <dependency>
         <groupId>com.dlink</groupId>
         <artifactId>dlink-connector-jdbc-1.12</artifactId>
         <!--<scope>provided</scope>-->
     </dependency>
     <dependency>
         <groupId>com.dlink</groupId>
         <artifactId>dlink-function</artifactId>
         <!--<scope>provided</scope>-->
-    </dependency>
+    </dependency>-->
 </dependencies>
</project>
dlink-executor/src/main/java/com/dlink/executor/AbstractExecutor.java (deleted; previous content below)

package com.dlink.executor;

import com.dlink.assertion.Asserts;

/**
 * AbstractExecutor
 *
 * @author wenmo
 * @since 2021/10/22 11:19
 **/
public abstract class AbstractExecutor implements Executor {

    protected EnvironmentSetting environmentSetting;
    protected ExecutorSetting executorSetting;

    public Executor setEnvironmentSetting(EnvironmentSetting setting) {
        this.environmentSetting = setting;
        return this;
    }

    public EnvironmentSetting getEnvironmentSetting() {
        return environmentSetting;
    }

    public ExecutorSetting getExecutorSetting() {
        return executorSetting;
    }

    public void setExecutorSetting(ExecutorSetting executorSetting) {
        this.executorSetting = executorSetting;
    }

    public boolean canHandle(String version) {
        return Asserts.isEqualsIgnoreCase(getVersion(), version);
    }
}
dlink-executor/src/main/java/com/dlink/executor/EnvironmentSetting.java

@@ -18,7 +18,6 @@ public class EnvironmentSetting {
     private String host;
     private int port;
     private boolean useRemote;
-    private String version;

     public static final EnvironmentSetting LOCAL = new EnvironmentSetting(false);

     public EnvironmentSetting(boolean useRemote) {
@@ -31,20 +30,13 @@ public class EnvironmentSetting {
         this.useRemote = true;
     }

-    public EnvironmentSetting(String host, int port, boolean useRemote, String version) {
-        this.host = host;
-        this.port = port;
-        this.useRemote = useRemote;
-        this.version = version;
-    }
-
     public static EnvironmentSetting build(String address) {
         Asserts.checkNull(address, "Flink 地址不能为空");
         String[] strs = address.split(NetConstant.COLON);
         if (strs.length >= 2) {
             return new EnvironmentSetting(strs[0], Integer.parseInt(strs[1]));
         } else {
             return new EnvironmentSetting(strs[0], FlinkConstant.FLINK_REST_DEFAULT_PORT);
         }
     }
 ...
dlink-executor/src/main/java/com/dlink/executor/Executor.java

This file was rewritten: the SPI-based Executor interface was removed and Executor is now an abstract class.

Removed (old interface):

package com.dlink.executor;

import com.dlink.assertion.Asserts;
import com.dlink.exception.FlinkException;
import sun.misc.Service;

import java.util.Iterator;
import java.util.Optional;

/**
 * Executor
 *
 * @author wenmo
 * @since 2021/10/22 11:01
 **/
public interface Executor {

    static Optional<Executor> get(EnvironmentSetting setting) {
        Asserts.checkNotNull(setting, "Flink 执行配置不能为空");
        Iterator<Executor> providers = Service.providers(Executor.class);
        while (providers.hasNext()) {
            Executor executor = providers.next();
            if (executor.canHandle(setting.getVersion())) {
                return Optional.of(executor.setEnvironmentSetting(setting));
            }
        }
        return Optional.empty();
    }

    static Executor build(EnvironmentSetting config) {
        Optional<Executor> optionalExecutor = Executor.get(config);
        if (!optionalExecutor.isPresent()) {
            throw new FlinkException("不支持 Flink 版本【" + config.getVersion() + "】");
        }
        return optionalExecutor.get();
    }

    Executor setEnvironmentSetting(EnvironmentSetting setting);

    EnvironmentSetting getEnvironmentSetting();

    boolean canHandle(String type);

    String getVersion();

    Executor build();

    Executor build(EnvironmentSetting environmentSetting, ExecutorSetting executorSetting);

    Executor buildLocalExecutor();

    Executor buildRemoteExecutor(EnvironmentSetting environmentSetting, ExecutorSetting executorSetting);
}

Added (new abstract class):

package com.dlink.executor;

import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.UserDefinedFunction;

import java.util.Map;

/**
 * Executor
 *
 * @author wenmo
 * @since 2021/5/25 13:39
 **/
public abstract class Executor {

    protected StreamExecutionEnvironment environment;
    protected CustomTableEnvironmentImpl stEnvironment;
    protected EnvironmentSetting environmentSetting;
    protected ExecutorSetting executorSetting;

    public static Executor build() {
        return new LocalStreamExecutor(ExecutorSetting.DEFAULT);
    }

    public static Executor build(EnvironmentSetting environmentSetting, ExecutorSetting executorSetting) {
        if (environmentSetting.isUseRemote()) {
            return buildRemoteExecutor(environmentSetting, executorSetting);
        } else {
            return buildLocalExecutor(executorSetting);
        }
    }

    public static Executor buildLocalExecutor(ExecutorSetting executorSetting) {
        return new LocalStreamExecutor(executorSetting);
    }

    public static Executor buildRemoteExecutor(EnvironmentSetting environmentSetting, ExecutorSetting executorSetting) {
        environmentSetting.setUseRemote(true);
        return new RemoteStreamExecutor(environmentSetting, executorSetting);
    }

    public StreamExecutionEnvironment getEnvironment() {
        return environment;
    }

    public CustomTableEnvironmentImpl getCustomTableEnvironmentImpl() {
        return stEnvironment;
    }

    public ExecutorSetting getExecutorSetting() {
        return executorSetting;
    }

    public EnvironmentSetting getEnvironmentSetting() {
        return environmentSetting;
    }

    protected void init() {
        initEnvironment();
        initStreamExecutionEnvironment();
    }

    public void update(ExecutorSetting executorSetting) {
        updateEnvironment(executorSetting);
        updateStreamExecutionEnvironment(executorSetting);
    }

    private void initEnvironment() {
        if (executorSetting.getCheckpoint() != null && executorSetting.getCheckpoint() > 0) {
            environment.enableCheckpointing(executorSetting.getCheckpoint());
        }
        if (executorSetting.getParallelism() != null && executorSetting.getParallelism() > 0) {
            environment.setParallelism(executorSetting.getParallelism());
        }
    }

    private void updateEnvironment(ExecutorSetting executorSetting) {
        if (executorSetting.getCheckpoint() != null && executorSetting.getCheckpoint() > 0) {
            environment.enableCheckpointing(executorSetting.getCheckpoint());
        }
        if (executorSetting.getParallelism() != null && executorSetting.getParallelism() > 0) {
            environment.setParallelism(executorSetting.getParallelism());
        }
    }

    private void initStreamExecutionEnvironment() {
        stEnvironment = CustomTableEnvironmentImpl.create(environment);
        if (executorSetting.isUseSqlFragment()) {
            stEnvironment.useSqlFragment();
        } else {
            stEnvironment.unUseSqlFragment();
        }
        if (executorSetting.getJobName() != null && !"".equals(executorSetting.getJobName())) {
            stEnvironment.getConfig().getConfiguration().setString("pipeline.name", executorSetting.getJobName());
        }
        if (executorSetting.getConfig() != null) {
            for (Map.Entry<String, String> entry : executorSetting.getConfig().entrySet()) {
                stEnvironment.getConfig().getConfiguration().setString(entry.getKey(), entry.getValue());
            }
        }
    }

    private void updateStreamExecutionEnvironment(ExecutorSetting executorSetting) {
        copyCatalog();
        if (executorSetting.isUseSqlFragment()) {
            stEnvironment.useSqlFragment();
        } else {
            stEnvironment.unUseSqlFragment();
        }
        if (executorSetting.getJobName() != null && !"".equals(executorSetting.getJobName())) {
            stEnvironment.getConfig().getConfiguration().setString("pipeline.name", executorSetting.getJobName());
        }
        if (executorSetting.getConfig() != null) {
            for (Map.Entry<String, String> entry : executorSetting.getConfig().entrySet()) {
                stEnvironment.getConfig().getConfiguration().setString(entry.getKey(), entry.getValue());
            }
        }
    }

    private void copyCatalog() {
        String[] catalogs = stEnvironment.listCatalogs();
        CustomTableEnvironmentImpl newstEnvironment = CustomTableEnvironmentImpl.create(environment);
        for (int i = 0; i < catalogs.length; i++) {
            if (stEnvironment.getCatalog(catalogs[i]).isPresent()) {
                newstEnvironment.getCatalogManager().unregisterCatalog(catalogs[i], true);
                newstEnvironment.registerCatalog(catalogs[i], stEnvironment.getCatalog(catalogs[i]).get());
            }
        }
        stEnvironment = newstEnvironment;
    }

    public JobExecutionResult execute(String jobName) throws Exception {
        return stEnvironment.execute(jobName);
    }

    public TableResult executeSql(String statement) {
        return stEnvironment.executeSql(statement);
    }

    public Table sqlQuery(String statement) {
        return stEnvironment.sqlQuery(statement);
    }

    public String explainSql(String statement, ExplainDetail... extraDetails) {
        return stEnvironment.explainSql(statement, extraDetails);
    }

    public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
        return stEnvironment.explainSqlRecord(statement, extraDetails);
    }

    public ObjectNode getStreamGraph(String statement) {
        return stEnvironment.getStreamGraph(statement);
    }

    public void registerFunction(String name, ScalarFunction function) {
        stEnvironment.registerFunction(name, function);
    }

    public void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> var2) {
        stEnvironment.createTemporarySystemFunction(name, var2);
    }

    public CatalogManager getCatalogManager() {
        return stEnvironment.getCatalogManager();
    }
}
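The net effect of this rewrite is that executor selection no longer goes through sun.misc.Service version dispatch: callers now construct a LocalStreamExecutor or RemoteStreamExecutor directly through the static build methods, while the checkpoint, parallelism, SQL-fragment and catalog-copying plumbing lives once in the abstract base class.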
dlink-executor/src/main/java/com/dlink/executor/LocalStreamExecutor.java (new file)

package com.dlink.executor;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * LocalStreamExecutor
 *
 * @author wenmo
 * @since 2021/5/25 13:48
 **/
public class LocalStreamExecutor extends Executor {

    public LocalStreamExecutor(ExecutorSetting executorSetting) {
        this.executorSetting = executorSetting;
        this.environment = StreamExecutionEnvironment.createLocalEnvironment();
        init();
    }
}
dlink-executor/src/main/java/com/dlink/executor/RemoteStreamExecutor.java (new file)

package com.dlink.executor;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * RemoteStreamExecutor
 *
 * @author wenmo
 * @since 2021/5/25 14:05
 **/
public class RemoteStreamExecutor extends Executor {

    public RemoteStreamExecutor(EnvironmentSetting environmentSetting, ExecutorSetting executorSetting) {
        this.environmentSetting = environmentSetting;
        this.executorSetting = executorSetting;
        this.environment = StreamExecutionEnvironment.createRemoteEnvironment(
                environmentSetting.getHost(), environmentSetting.getPort());
        init();
    }
}
dlink-gateway/pom.xml (new file)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>dlink</artifactId>
        <groupId>com.dlink</groupId>
        <version>0.3.2</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>dlink-gateway</artifactId>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.dlink</groupId>
            <artifactId>dlink-common</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.dlink</groupId>
            <artifactId>dlink-client-1.12</artifactId>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>
dlink-gateway/src/main/java/com/dlink/gateway/AbstractGateway.java (new file)

package com.dlink.gateway;

import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * AbstractGateway
 *
 * @author wenmo
 * @since 2021/10/29
 **/
public abstract class AbstractGateway implements Gateway {

    protected static final Logger logger = LoggerFactory.getLogger(AbstractGateway.class);
    protected GatewayConfig config;
    protected Configuration configuration;

    public AbstractGateway() {
    }

    public AbstractGateway(GatewayConfig config) {
        this.config = config;
    }

    @Override
    public boolean canHandle(GatewayType type) {
        return type == getType();
    }

    @Override
    public void setGatewayConfig(GatewayConfig config) {
        this.config = config;
    }

    protected abstract void init();
}
dlink-gateway/src/main/java/com/dlink/gateway/Gateway.java (new file)

package com.dlink.gateway;

import com.dlink.assertion.Asserts;
import com.dlink.gateway.exception.GatewayException;
import com.dlink.gateway.result.GatewayResult;
import org.apache.flink.runtime.jobgraph.JobGraph;
import sun.misc.Service;

import java.util.Iterator;
import java.util.Optional;

/**
 * Gateway (job submitter)
 *
 * @author wenmo
 * @since 2021/10/29
 **/
public interface Gateway {

    static Optional<Gateway> get(GatewayConfig config) {
        Asserts.checkNotNull(config, "配置不能为空");
        Iterator<Gateway> providers = Service.providers(Gateway.class);
        while (providers.hasNext()) {
            Gateway gateway = providers.next();
            if (gateway.canHandle(config.getType())) {
                gateway.setGatewayConfig(config);
                return Optional.of(gateway);
            }
        }
        return Optional.empty();
    }

    static Gateway build(GatewayConfig config) {
        Optional<Gateway> optionalGateway = Gateway.get(config);
        if (!optionalGateway.isPresent()) {
            throw new GatewayException(
                    "不支持 Flink Gateway 类型【" + config.getType().getLongValue() + "】,请添加扩展包");
        }
        return optionalGateway.get();
    }

    boolean canHandle(GatewayType type);

    GatewayType getType();

    void setGatewayConfig(GatewayConfig config);

    GatewayResult submitJobGraph(JobGraph jobGraph);

    GatewayResult submitJar();
}
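Gateway implementations are resolved with the same sun.misc.Service provider lookup the removed Executor interface used, driven by the one-line resource file this commit adds at .../main/resources/META-INF/services/com.dlink.gateway.Gateway. Its content is not shown in this capture, but since YarnApplicationGateway is the only concrete implementation added here, that line is presumably:

    com.dlink.gateway.yarn.YarnApplicationGateway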
dlink-gateway/src/main/java/com/dlink/gateway/GatewayConfig.java (new file)

package com.dlink.gateway;

import com.fasterxml.jackson.databind.JsonNode;
import lombok.Getter;
import lombok.Setter;

import java.util.Arrays;

/**
 * GatewayConfig
 *
 * @author wenmo
 * @since 2021/10/29
 **/
@Getter
@Setter
public class GatewayConfig {

    private GatewayType type;
    private String jobName;
    private String configDir;
    private String userJarPath;
    private String[] userJarParas;
    private String userJarMainAppClass;
    private String savePoint;

    public GatewayConfig() {
    }

    public GatewayConfig(GatewayType type, String jobName, String configDir, String userJarPath,
                         String[] userJarParas, String userJarMainAppClass, String savePoint) {
        this.type = type;
        this.jobName = jobName;
        this.configDir = configDir;
        this.userJarPath = userJarPath;
        this.userJarParas = userJarParas;
        this.userJarMainAppClass = userJarMainAppClass;
        this.savePoint = savePoint;
    }

    public static GatewayConfig build(JsonNode para) {
        GatewayConfig config = new GatewayConfig();
        config.setType(GatewayType.get(para.get("type").asText()));
        if (para.has("jobName")) {
            config.setJobName(para.get("jobName").asText());
        }
        if (para.has("configDir")) {
            config.setConfigDir(para.get("configDir").asText());
        }
        if (para.has("userJarPath")) {
            config.setUserJarPath(para.get("userJarPath").asText());
        }
        if (para.has("userJarParas")) {
            config.setUserJarParas(para.get("userJarParas").asText().split("\\s+"));
        }
        if (para.has("userJarMainAppClass")) {
            config.setUserJarMainAppClass(para.get("userJarMainAppClass").asText());
        }
        if (para.has("savePoint")) {
            config.setSavePoint(para.get("savePoint").asText());
        }
        return config;
    }

    @Override
    public String toString() {
        return "GatewayConfig{" +
                "type=" + type +
                ", jobName='" + jobName + '\'' +
                ", configDir='" + configDir + '\'' +
                ", userJarPath='" + userJarPath + '\'' +
                ", userJarParas=" + Arrays.toString(userJarParas) +
                ", userJarMainAppClass='" + userJarMainAppClass + '\'' +
                ", savePoint='" + savePoint + '\'' +
                '}';
    }
}
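GatewayConfig.build consumes the same JsonNode that the new submitJar endpoint receives, so the expected request body can be sketched directly against it. All values below are placeholders, and note that userJarParas is whitespace-split into the program arguments MainApp later parses; readTree throws JsonProcessingException, so this belongs in a method that declares it:

    // Hypothetical request payload, parsed the way Spring would hand it to the controller.
    ObjectMapper mapper = new ObjectMapper();
    JsonNode para = mapper.readTree(
            "{\"type\": \"yarn-application\"," +
            " \"jobName\": \"dlink-test\"," +
            " \"configDir\": \"/opt/flink/conf\"," +
            " \"userJarPath\": \"hdfs:///dlink/dlink-app-0.3.2-jar-with-dependencies.jar\"," +
            " \"userJarParas\": \"--id 1 --driver com.mysql.cj.jdbc.Driver\"," +
            " \"userJarMainAppClass\": \"com.dlink.app.MainApp\"}");
    GatewayConfig config = GatewayConfig.build(para);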
dlink-gateway/src/main/java/com/dlink/gateway/GatewayType.java (new file)

package com.dlink.gateway;

import com.dlink.assertion.Asserts;

/**
 * GatewayType
 *
 * @author wenmo
 * @since 2021/10/29
 **/
public enum GatewayType {

    YARN_APPLICATION("ya", "yarn-application");

    private String value;
    private String longValue;

    GatewayType(String value, String longValue) {
        this.value = value;
        this.longValue = longValue;
    }

    public String getValue() {
        return value;
    }

    public String getLongValue() {
        return longValue;
    }

    public static GatewayType get(String value) {
        for (GatewayType type : GatewayType.values()) {
            if (Asserts.isEquals(type.getValue(), value) || Asserts.isEquals(type.getLongValue(), value)) {
                return type;
            }
        }
        // falls back to yarn-application when the value matches no known type
        return GatewayType.YARN_APPLICATION;
    }
}
dlink-gateway/src/main/java/com/dlink/gateway/exception/GatewayException.java (new file)

package com.dlink.gateway.exception;

/**
 * GatewayException
 *
 * @author wenmo
 * @since 2021/10/29
 **/
public class GatewayException extends RuntimeException {

    public GatewayException(String message, Throwable cause) {
        super(message, cause);
    }

    public GatewayException(String message) {
        super(message);
    }
}
dlink-gateway/src/main/java/com/dlink/gateway/result/AbstractGatewayResult.java (new file)

package com.dlink.gateway.result;

import com.dlink.gateway.GatewayType;
import lombok.Getter;
import lombok.Setter;

import java.time.LocalDateTime;

/**
 * AbstractGatewayResult
 *
 * @author qiwenkai
 * @since 2021/10/29 15:44
 **/
@Setter
@Getter
public abstract class AbstractGatewayResult implements GatewayResult {

    protected String jobId;
    protected GatewayType type;
    protected String savePointPath;
    protected LocalDateTime startTime;
    protected LocalDateTime endTime;
    protected boolean isSuccess;
    protected String exceptionMsg;

    public AbstractGatewayResult(GatewayType type, LocalDateTime startTime) {
        this.type = type;
        this.startTime = startTime;
    }

    public AbstractGatewayResult(String jobId, String savePointPath, LocalDateTime startTime,
                                 LocalDateTime endTime, boolean isSuccess, String exceptionMsg) {
        this.jobId = jobId;
        this.savePointPath = savePointPath;
        this.startTime = startTime;
        this.endTime = endTime;
        this.isSuccess = isSuccess;
        this.exceptionMsg = exceptionMsg;
    }

    public void success() {
        this.isSuccess = true;
        this.endTime = LocalDateTime.now();
    }

    public void fail(String error) {
        this.isSuccess = false;
        this.endTime = LocalDateTime.now();
        this.exceptionMsg = error;
    }
}
dlink-gateway/src/main/java/com/dlink/gateway/result/GatewayResult.java (new file)

package com.dlink.gateway.result;

/**
 * GatewayResult
 *
 * @author qiwenkai
 * @since 2021/10/29 15:39
 **/
public interface GatewayResult {
}
dlink-gateway/src/main/java/com/dlink/gateway/result/YarnResult.java (new file)

package com.dlink.gateway.result;

import com.dlink.gateway.GatewayType;
import lombok.Getter;
import lombok.Setter;

import java.time.LocalDateTime;

/**
 * YarnResult
 *
 * @author qiwenkai
 * @since 2021/10/29 15:49
 **/
@Getter
@Setter
public class YarnResult extends AbstractGatewayResult {

    private String appId;
    private String webURL;

    public YarnResult(GatewayType type, LocalDateTime startTime) {
        super(type, startTime);
    }

    public YarnResult(String appId, String jobId, String savePointPath, LocalDateTime startTime,
                      LocalDateTime endTime, boolean isSuccess, String exceptionMsg) {
        super(jobId, savePointPath, startTime, endTime, isSuccess, exceptionMsg);
        this.appId = appId;
    }

    public static YarnResult build(GatewayType type) {
        return new YarnResult(type, LocalDateTime.now());
    }
}
dlink-gateway/src/main/java/com/dlink/gateway/yarn/YarnApplicationGateway.java
0 → 100644
View file @
59c7b06d
package
com
.
dlink
.
gateway
.
yarn
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.gateway.GatewayConfig;
import com.dlink.gateway.GatewayType;
import com.dlink.gateway.result.GatewayResult;
import com.dlink.gateway.result.YarnResult;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint;
import org.apache.hadoop.yarn.api.records.ApplicationId;

import java.util.Collections;

/**
 * YarnApplicationGateway
 *
 * @author wenmo
 * @since 2021/10/29
 **/
public class YarnApplicationGateway extends YarnGateway {

    public YarnApplicationGateway(GatewayConfig config) {
        super(config);
    }

    public YarnApplicationGateway() {
    }

    @Override
    public GatewayType getType() {
        return GatewayType.YARN_APPLICATION;
    }

    @Override
    public void init() {
        configuration = GlobalConfiguration.loadConfiguration(config.getConfigDir());
        configuration.set(DeploymentOptions.TARGET, getType().getLongValue());
        if (Asserts.isNotNullString(config.getSavePoint())) {
            configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, config.getSavePoint());
        }
        clientServiceLoader = new DefaultClusterClientServiceLoader();
    }

    @Override
    public GatewayResult submitJobGraph(JobGraph jobGraph) {
        init();
        YarnResult result = YarnResult.build(getType());
        final ClusterClientFactory clientFactory =
                clientServiceLoader.getClusterClientFactory(configuration);
        try (final YarnClusterDescriptor clusterDescriptor =
                (YarnClusterDescriptor) clientFactory.createClusterDescriptor(configuration)) {
            final ClusterSpecification clusterSpecification =
                    clientFactory.getClusterSpecification(configuration);
            ClusterClientProvider<ApplicationId> clusterClientProvider =
                    clusterDescriptor.deployInternal(
                            clusterSpecification,
                            config.getJobName(),
                            YarnApplicationClusterEntryPoint.class.getName(),
                            jobGraph,
                            false);
            ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
            ApplicationId applicationId = clusterClient.getClusterId();
            result.setAppId(applicationId.toString());
            result.setWebURL(clusterClient.getWebInterfaceURL());
            result.success();
        } catch (Exception e) {
            e.printStackTrace();
            logger.error(e.getMessage());
            result.fail(e.getMessage());
        }
        return result;
    }

    @Override
    public GatewayResult submitJar() {
        init();
        YarnResult result = YarnResult.build(getType());
        logger.warn(config.toString());
        configuration.set(PipelineOptions.JARS, Collections.singletonList(config.getUserJarPath()));
        configuration.set(YarnConfigOptions.APPLICATION_NAME, config.getJobName());
        ApplicationConfiguration appConfig =
                new ApplicationConfiguration(config.getUserJarParas(), config.getUserJarMainAppClass());
        final ClusterClientFactory clientFactory =
                clientServiceLoader.getClusterClientFactory(configuration);
        try (final YarnClusterDescriptor clusterDescriptor =
                (YarnClusterDescriptor) clientFactory.createClusterDescriptor(configuration)) {
            final ClusterSpecification clusterSpecification =
                    clientFactory.getClusterSpecification(configuration);
            ClusterClientProvider<ApplicationId> clusterClientProvider =
                    clusterDescriptor.deployApplicationCluster(clusterSpecification, appConfig);
            ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
            ApplicationId applicationId = clusterClient.getClusterId();
            result.setAppId(applicationId.toString());
            result.setWebURL(clusterClient.getWebInterfaceURL());
            result.success();
        } catch (Exception e) {
            e.printStackTrace();
            logger.error(e.getMessage());
            result.fail(e.getMessage());
        }
        return result;
    }
}
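For orientation, a minimal sketch of driving this gateway end to end. The GatewayConfig setter names are assumptions mirrored from the getters used in the class above (getConfigDir, getJobName, getUserJarPath, getUserJarParas, getUserJarMainAppClass) and are not confirmed API:

// Hypothetical usage sketch; all setter names below are assumed, not confirmed.
GatewayConfig config = new GatewayConfig();
config.setConfigDir("/opt/flink/conf");                   // directory holding flink-conf.yaml
config.setJobName("dlink-yarn-application-test");
config.setUserJarPath("hdfs:///flink/app/dlink-app.jar"); // ends up in PipelineOptions.JARS
config.setUserJarParas(new String[] {"--id", "1"});
config.setUserJarMainAppClass("com.dlink.app.MainApp");

GatewayResult result = new YarnApplicationGateway(config).submitJar();
// On success, the returned YarnResult carries the YARN application id and
// web interface URL that submitJar() populates above.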
dlink-gateway/src/main/java/com/dlink/gateway/yarn/YarnGateway.java
package com.dlink.gateway.yarn;

import com.dlink.gateway.AbstractGateway;
import com.dlink.gateway.GatewayConfig;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;

/**
 * YarnSubmitter
 *
 * @author wenmo
 * @since 2021/10/29
 **/
public abstract class YarnGateway extends AbstractGateway {

    protected DefaultClusterClientServiceLoader clientServiceLoader;

    public YarnGateway() {
    }

    public YarnGateway(GatewayConfig config) {
        super(config);
    }

    public void init() {
    }
}
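The commit also ships a service-provider registration for Gateway, so the concrete gateway is presumably resolved at runtime by type. A minimal sketch of such a lookup, assuming only the getType() contract shown above; the setConfig helper and the GatewayException constructor used here are hypothetical:

// Minimal sketch, assuming Gateway implementations are registered for
// java.util.ServiceLoader discovery.
import java.util.ServiceLoader;

public class GatewayResolver {
    public static Gateway of(GatewayType type, GatewayConfig config) {
        for (Gateway gateway : ServiceLoader.load(Gateway.class)) {
            if (gateway.getType() == type) {
                gateway.setConfig(config); // hypothetical setter
                return gateway;
            }
        }
        throw new GatewayException("Unsupported gateway type: " + type);
    }
}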
dlink-gateway/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.yarn;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterRetrieveException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.core.plugin.PluginConfig;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
import org.apache.flink.runtime.util.HadoopUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
import org.apache.flink.util.StringUtils;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint;
import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint;
import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;

import org.apache.commons.collections.ListUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URI;
import java.net.URLDecoder;
import java.nio.charset.Charset;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.flink.configuration.ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR;
import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR;
import static org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.yarn.YarnConfigKeys.LOCAL_RESOURCE_DESCRIPTOR_SEPARATOR;
/** The descriptor with deployment information for deploying a Flink cluster on Yarn. */
public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
    private static final Logger LOG = LoggerFactory.getLogger(YarnClusterDescriptor.class);

    private final YarnConfiguration yarnConfiguration;

    private final YarnClient yarnClient;

    private final YarnClusterInformationRetriever yarnClusterInformationRetriever;

    /** True if the descriptor must not shut down the YarnClient. */
    private final boolean sharedYarnClient;

    /** Lazily initialized list of files to ship. */
    private final List<File> shipFiles = new LinkedList<>();

    private final List<File> shipArchives = new LinkedList<>();

    private final String yarnQueue;

    private Path flinkJarPath;

    private final Configuration flinkConfiguration;

    private final String customName;

    private final String nodeLabel;

    private final String applicationType;

    private YarnConfigOptions.UserJarInclusion userJarInclusion;

    public YarnClusterDescriptor(
            Configuration flinkConfiguration,
            YarnConfiguration yarnConfiguration,
            YarnClient yarnClient,
            YarnClusterInformationRetriever yarnClusterInformationRetriever,
            boolean sharedYarnClient) {

        this.yarnConfiguration = Preconditions.checkNotNull(yarnConfiguration);
        this.yarnClient = Preconditions.checkNotNull(yarnClient);
        this.yarnClusterInformationRetriever =
                Preconditions.checkNotNull(yarnClusterInformationRetriever);
        this.sharedYarnClient = sharedYarnClient;

        this.flinkConfiguration = Preconditions.checkNotNull(flinkConfiguration);
        this.userJarInclusion = getUserJarInclusionMode(flinkConfiguration);

        getLocalFlinkDistPath(flinkConfiguration).ifPresent(this::setLocalJarPath);
        decodeFilesToShipToCluster(flinkConfiguration, YarnConfigOptions.SHIP_FILES)
                .ifPresent(this::addShipFiles);
        decodeFilesToShipToCluster(flinkConfiguration, YarnConfigOptions.SHIP_ARCHIVES)
                .ifPresent(this::addShipArchives);

        this.yarnQueue = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_QUEUE);
        this.customName = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_NAME);
        this.applicationType = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_TYPE);
        this.nodeLabel = flinkConfiguration.getString(YarnConfigOptions.NODE_LABEL);
    }
    private Optional<List<File>> decodeFilesToShipToCluster(
            final Configuration configuration, final ConfigOption<List<String>> configOption) {
        checkNotNull(configuration);
        checkNotNull(configOption);

        final List<File> files =
                ConfigUtils.decodeListFromConfig(configuration, configOption, File::new);
        return files.isEmpty() ? Optional.empty() : Optional.of(files);
    }

    private Optional<Path> getLocalFlinkDistPath(final Configuration configuration) {
        final String localJarPath = configuration.getString(YarnConfigOptions.FLINK_DIST_JAR);
        if (localJarPath != null) {
            return Optional.of(new Path(localJarPath));
        }

        LOG.info(
                "No path for the flink jar passed. Using the location of "
                        + getClass()
                        + " to locate the jar");

        // check whether it's actually a jar file --> when testing we execute this class without a
        // flink-dist jar
        final String decodedPath = getDecodedJarPath();
        return decodedPath.endsWith(".jar")
                ? Optional.of(new Path(new File(decodedPath).toURI()))
                : Optional.empty();
    }

    private String getDecodedJarPath() {
        final String encodedJarPath =
                getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
        try {
            return URLDecoder.decode(encodedJarPath, Charset.defaultCharset().name());
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(
                    "Couldn't decode the encoded Flink dist jar path: "
                            + encodedJarPath
                            + " You can supply a path manually via the command line.");
        }
    }

    @VisibleForTesting
    List<File> getShipFiles() {
        return shipFiles;
    }

    public YarnClient getYarnClient() {
        return yarnClient;
    }

    /**
     * The class to start the application master with. This class runs the main method in case of
     * session cluster.
     */
    protected String getYarnSessionClusterEntrypoint() {
        return YarnSessionClusterEntrypoint.class.getName();
    }

    /**
     * The class to start the application master with. This class runs the main method in case of
     * the job cluster.
     */
    protected String getYarnJobClusterEntrypoint() {
        return YarnJobClusterEntrypoint.class.getName();
    }

    public Configuration getFlinkConfiguration() {
        return flinkConfiguration;
    }

    public void setLocalJarPath(Path localJarPath) {
        if (!localJarPath.toString().endsWith("jar")) {
            throw new IllegalArgumentException(
                    "The passed jar path ('"
                            + localJarPath
                            + "') does not end with the 'jar' extension");
        }
        this.flinkJarPath = localJarPath;
    }
    /**
     * Adds the given files to the list of files to ship.
     *
     * <p>Note that any file matching "<tt>flink-dist*.jar</tt>" will be excluded from the upload by
     * {@link YarnApplicationFileUploader#registerMultipleLocalResources(Collection, String,
     * LocalResourceType)} since we upload the Flink uber jar ourselves and do not need to deploy it
     * multiple times.
     *
     * @param shipFiles files to ship
     */
    public void addShipFiles(List<File> shipFiles) {
        checkArgument(
                userJarInclusion != YarnConfigOptions.UserJarInclusion.DISABLED
                        || isUsrLibDirIncludedInShipFiles(shipFiles),
                "This is an illegal ship directory : %s. When setting the %s to %s the name of ship directory can not be %s.",
                ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR,
                YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.key(),
                YarnConfigOptions.UserJarInclusion.DISABLED,
                ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR);
        this.shipFiles.addAll(shipFiles);
    }

    private void addShipArchives(List<File> shipArchives) {
        checkArgument(
                isArchiveOnlyIncludedInShipArchiveFiles(shipArchives),
                "Non-archive files are included.");
        this.shipArchives.addAll(shipArchives);
    }

    private static boolean isArchiveOnlyIncludedInShipArchiveFiles(List<File> shipFiles) {
        return shipFiles.stream()
                .filter(File::isFile)
                .map(File::getName)
                .map(String::toLowerCase)
                .allMatch(
                        name ->
                                name.endsWith(".tar.gz")
                                        || name.endsWith(".tar")
                                        || name.endsWith(".tgz")
                                        || name.endsWith(".dst")
                                        || name.endsWith(".jar")
                                        || name.endsWith(".zip"));
    }
    private void isReadyForDeployment(ClusterSpecification clusterSpecification) throws Exception {

        if (this.flinkJarPath == null) {
            throw new YarnDeploymentException("The Flink jar path is null");
        }
        if (this.flinkConfiguration == null) {
            throw new YarnDeploymentException("Flink configuration object has not been set");
        }

        // Check if we don't exceed YARN's maximum virtual cores.
        final int numYarnMaxVcores = yarnClusterInformationRetriever.getMaxVcores();

        int configuredAmVcores = flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES);
        if (configuredAmVcores > numYarnMaxVcores) {
            throw new IllegalConfigurationException(
                    String.format(
                            "The number of requested virtual cores for application master %d"
                                    + " exceeds the maximum number of virtual cores %d available in the Yarn Cluster.",
                            configuredAmVcores, numYarnMaxVcores));
        }

        int configuredVcores =
                flinkConfiguration.getInteger(
                        YarnConfigOptions.VCORES, clusterSpecification.getSlotsPerTaskManager());
        // don't configure more than the maximum configured number of vcores
        if (configuredVcores > numYarnMaxVcores) {
            throw new IllegalConfigurationException(
                    String.format(
                            "The number of requested virtual cores per node %d"
                                    + " exceeds the maximum number of virtual cores %d available in the Yarn Cluster."
                                    + " Please note that the number of virtual cores is set to the number of task slots by default"
                                    + " unless configured in the Flink config with '%s.'",
                            configuredVcores, numYarnMaxVcores, YarnConfigOptions.VCORES.key()));
        }

        // check if required Hadoop environment variables are set. If not, warn user
        if (System.getenv("HADOOP_CONF_DIR") == null && System.getenv("YARN_CONF_DIR") == null) {
            LOG.warn(
                    "Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. "
                            + "The Flink YARN Client needs one of these to be set to properly load the Hadoop "
                            + "configuration for accessing YARN.");
        }
    }

    public String getNodeLabel() {
        return nodeLabel;
    }
    // -------------------------------------------------------------
    // Lifecycle management
    // -------------------------------------------------------------

    @Override
    public void close() {
        if (!sharedYarnClient) {
            yarnClient.stop();
        }
    }

    // -------------------------------------------------------------
    // ClusterClient overrides
    // -------------------------------------------------------------

    @Override
    public ClusterClientProvider<ApplicationId> retrieve(ApplicationId applicationId)
            throws ClusterRetrieveException {

        try {
            // check if required Hadoop environment variables are set. If not, warn user
            if (System.getenv("HADOOP_CONF_DIR") == null
                    && System.getenv("YARN_CONF_DIR") == null) {
                LOG.warn(
                        "Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set."
                                + "The Flink YARN Client needs one of these to be set to properly load the Hadoop "
                                + "configuration for accessing YARN.");
            }

            final ApplicationReport report = yarnClient.getApplicationReport(applicationId);

            if (report.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED) {
                // Flink cluster is not running anymore
                LOG.error(
                        "The application {} doesn't run anymore. It has previously completed with final status: {}",
                        applicationId,
                        report.getFinalApplicationStatus());
                throw new RuntimeException(
                        "The Yarn application " + applicationId + " doesn't run anymore.");
            }

            setClusterEntrypointInfoToConfig(report);

            return () -> {
                try {
                    return new RestClusterClient<>(flinkConfiguration, report.getApplicationId());
                } catch (Exception e) {
                    throw new RuntimeException("Couldn't retrieve Yarn cluster", e);
                }
            };
        } catch (Exception e) {
            throw new ClusterRetrieveException("Couldn't retrieve Yarn cluster", e);
        }
    }
    @Override
    public ClusterClientProvider<ApplicationId> deploySessionCluster(
            ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
        try {
            return deployInternal(
                    clusterSpecification,
                    "Flink session cluster",
                    getYarnSessionClusterEntrypoint(),
                    null,
                    false);
        } catch (Exception e) {
            throw new ClusterDeploymentException("Couldn't deploy Yarn session cluster", e);
        }
    }

    @Override
    public ClusterClientProvider<ApplicationId> deployApplicationCluster(
            final ClusterSpecification clusterSpecification,
            final ApplicationConfiguration applicationConfiguration)
            throws ClusterDeploymentException {
        checkNotNull(clusterSpecification);
        checkNotNull(applicationConfiguration);

        final YarnDeploymentTarget deploymentTarget =
                YarnDeploymentTarget.fromConfig(flinkConfiguration);
        if (YarnDeploymentTarget.APPLICATION != deploymentTarget) {
            throw new ClusterDeploymentException(
                    "Couldn't deploy Yarn Application Cluster."
                            + " Expected deployment.target="
                            + YarnDeploymentTarget.APPLICATION.getName()
                            + " but actual one was \""
                            + deploymentTarget.getName()
                            + "\"");
        }

        applicationConfiguration.applyToConfiguration(flinkConfiguration);

        final List<String> pipelineJars =
                flinkConfiguration
                        .getOptional(PipelineOptions.JARS)
                        .orElse(Collections.emptyList());
        Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar");

        try {
            return deployInternal(
                    clusterSpecification,
                    "Flink Application Cluster",
                    YarnApplicationClusterEntryPoint.class.getName(),
                    null,
                    false);
        } catch (Exception e) {
            throw new ClusterDeploymentException("Couldn't deploy Yarn Application Cluster", e);
        }
    }

    @Override
    public ClusterClientProvider<ApplicationId> deployJobCluster(
            ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached)
            throws ClusterDeploymentException {
        try {
            return deployInternal(
                    clusterSpecification,
                    "Flink per-job cluster",
                    getYarnJobClusterEntrypoint(),
                    jobGraph,
                    detached);
        } catch (Exception e) {
            throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
        }
    }
    @Override
    public void killCluster(ApplicationId applicationId) throws FlinkException {
        try {
            yarnClient.killApplication(applicationId);

            try (final FileSystem fs = FileSystem.get(yarnConfiguration)) {
                final Path applicationDir =
                        YarnApplicationFileUploader.getApplicationDirPath(
                                getStagingDir(fs), applicationId);

                Utils.deleteApplicationFiles(applicationDir.toUri().toString());
            }

        } catch (YarnException | IOException e) {
            throw new FlinkException(
                    "Could not kill the Yarn Flink cluster with id " + applicationId + '.', e);
        }
    }
    /**
     * This method will block until the ApplicationMaster/JobManager have been deployed on YARN.
     *
     * @param clusterSpecification Initial cluster specification for the Flink cluster to be
     *     deployed
     * @param applicationName name of the Yarn application to start
     * @param yarnClusterEntrypoint Class name of the Yarn cluster entry point.
     * @param jobGraph A job graph which is deployed with the Flink cluster, {@code null} if none
     * @param detached True if the cluster should be started in detached mode
     */
    public ClusterClientProvider<ApplicationId> deployInternal(
            ClusterSpecification clusterSpecification,
            String applicationName,
            String yarnClusterEntrypoint,
            @Nullable JobGraph jobGraph,
            boolean detached)
            throws Exception {

        final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
        if (HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
            boolean useTicketCache =
                    flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);

            if (!HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache)) {
                throw new RuntimeException(
                        "Hadoop security with Kerberos is enabled but the login user "
                                + "does not have Kerberos credentials or delegation tokens!");
            }
        }

        isReadyForDeployment(clusterSpecification);

        // ------------------ Check if the specified queue exists --------------------

        checkYarnQueues(yarnClient);

        // ------------------ Check if the YARN ClusterClient has the requested resources
        // --------------

        // Create application via yarnClient
        final YarnClientApplication yarnApplication = yarnClient.createApplication();
        final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();

        Resource maxRes = appResponse.getMaximumResourceCapability();

        final ClusterResourceDescription freeClusterMem;
        try {
            freeClusterMem = getCurrentFreeClusterResources(yarnClient);
        } catch (YarnException | IOException e) {
            failSessionDuringDeployment(yarnClient, yarnApplication);
            throw new YarnDeploymentException(
                    "Could not retrieve information about free cluster resources.", e);
        }

        final int yarnMinAllocationMB =
                yarnConfiguration.getInt(
                        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
                        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
        if (yarnMinAllocationMB <= 0) {
            throw new YarnDeploymentException(
                    "The minimum allocation memory "
                            + "("
                            + yarnMinAllocationMB
                            + " MB) configured via '"
                            + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
                            + "' should be greater than 0.");
        }

        final ClusterSpecification validClusterSpecification;
        try {
            validClusterSpecification =
                    validateClusterResources(
                            clusterSpecification, yarnMinAllocationMB, maxRes, freeClusterMem);
        } catch (YarnDeploymentException yde) {
            failSessionDuringDeployment(yarnClient, yarnApplication);
            throw yde;
        }

        LOG.info("Cluster specification: {}", validClusterSpecification);

        final ClusterEntrypoint.ExecutionMode executionMode =
                detached
                        ? ClusterEntrypoint.ExecutionMode.DETACHED
                        : ClusterEntrypoint.ExecutionMode.NORMAL;

        flinkConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString());

        ApplicationReport report =
                startAppMaster(
                        flinkConfiguration,
                        applicationName,
                        yarnClusterEntrypoint,
                        jobGraph,
                        yarnClient,
                        yarnApplication,
                        validClusterSpecification);

        // print the application id for user to cancel themselves.
        if (detached) {
            final ApplicationId yarnApplicationId = report.getApplicationId();
            logDetachedClusterInformation(yarnApplicationId, LOG);
        }

        setClusterEntrypointInfoToConfig(report);

        return () -> {
            try {
                return new RestClusterClient<>(flinkConfiguration, report.getApplicationId());
            } catch (Exception e) {
                throw new RuntimeException("Error while creating RestClusterClient.", e);
            }
        };
    }
    private ClusterSpecification validateClusterResources(
            ClusterSpecification clusterSpecification,
            int yarnMinAllocationMB,
            Resource maximumResourceCapability,
            ClusterResourceDescription freeClusterResources)
            throws YarnDeploymentException {

        int jobManagerMemoryMb = clusterSpecification.getMasterMemoryMB();
        final int taskManagerMemoryMb = clusterSpecification.getTaskManagerMemoryMB();

        logIfComponentMemNotIntegerMultipleOfYarnMinAllocation(
                "JobManager", jobManagerMemoryMb, yarnMinAllocationMB);
        logIfComponentMemNotIntegerMultipleOfYarnMinAllocation(
                "TaskManager", taskManagerMemoryMb, yarnMinAllocationMB);

        // set the memory to minAllocationMB to do the next checks correctly
        if (jobManagerMemoryMb < yarnMinAllocationMB) {
            jobManagerMemoryMb = yarnMinAllocationMB;
        }

        final String note =
                "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
        if (jobManagerMemoryMb > maximumResourceCapability.getMemory()) {
            throw new YarnDeploymentException(
                    "The cluster does not have the requested resources for the JobManager available!\n"
                            + "Maximum Memory: "
                            + maximumResourceCapability.getMemory()
                            + "MB Requested: "
                            + jobManagerMemoryMb
                            + "MB. "
                            + note);
        }

        if (taskManagerMemoryMb > maximumResourceCapability.getMemory()) {
            throw new YarnDeploymentException(
                    "The cluster does not have the requested resources for the TaskManagers available!\n"
                            + "Maximum Memory: "
                            + maximumResourceCapability.getMemory()
                            + " Requested: "
                            + taskManagerMemoryMb
                            + "MB. "
                            + note);
        }

        final String noteRsc =
                "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are "
                        + "connecting from the beginning because the resources are currently not available in the cluster. "
                        + "The allocation might take more time than usual because the Flink YARN client needs to wait until "
                        + "the resources become available.";

        if (taskManagerMemoryMb > freeClusterResources.containerLimit) {
            LOG.warn(
                    "The requested amount of memory for the TaskManagers ("
                            + taskManagerMemoryMb
                            + "MB) is more than "
                            + "the largest possible YARN container: "
                            + freeClusterResources.containerLimit
                            + noteRsc);
        }
        if (jobManagerMemoryMb > freeClusterResources.containerLimit) {
            LOG.warn(
                    "The requested amount of memory for the JobManager ("
                            + jobManagerMemoryMb
                            + "MB) is more than "
                            + "the largest possible YARN container: "
                            + freeClusterResources.containerLimit
                            + noteRsc);
        }

        return new ClusterSpecification.ClusterSpecificationBuilder()
                .setMasterMemoryMB(jobManagerMemoryMb)
                .setTaskManagerMemoryMB(taskManagerMemoryMb)
                .setSlotsPerTaskManager(clusterSpecification.getSlotsPerTaskManager())
                .createClusterSpecification();
    }
    private void logIfComponentMemNotIntegerMultipleOfYarnMinAllocation(
            String componentName, int componentMemoryMB, int yarnMinAllocationMB) {
        int normalizedMemMB =
                (componentMemoryMB + (yarnMinAllocationMB - 1))
                        / yarnMinAllocationMB
                        * yarnMinAllocationMB;
        if (normalizedMemMB <= 0) {
            normalizedMemMB = yarnMinAllocationMB;
        }
        if (componentMemoryMB != normalizedMemMB) {
            LOG.info(
                    "The configured {} memory is {} MB. YARN will allocate {} MB to make up an integer multiple of its "
                            + "minimum allocation memory ({} MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra {} MB "
                            + "may not be used by Flink.",
                    componentName,
                    componentMemoryMB,
                    normalizedMemMB,
                    yarnMinAllocationMB,
                    normalizedMemMB - componentMemoryMB);
        }
    }
    private void checkYarnQueues(YarnClient yarnClient) {
        try {
            List<QueueInfo> queues = yarnClient.getAllQueues();
            if (queues.size() > 0 && this.yarnQueue != null) {
                // check only if there are queues configured in yarn and for
                // this session.
                boolean queueFound = false;
                for (QueueInfo queue : queues) {
                    if (queue.getQueueName().equals(this.yarnQueue)
                            || queue.getQueueName().equals("root." + this.yarnQueue)) {
                        queueFound = true;
                        break;
                    }
                }
                if (!queueFound) {
                    String queueNames = "";
                    for (QueueInfo queue : queues) {
                        queueNames += queue.getQueueName() + ", ";
                    }
                    LOG.warn(
                            "The specified queue '"
                                    + this.yarnQueue
                                    + "' does not exist. "
                                    + "Available queues: "
                                    + queueNames);
                }
            } else {
                LOG.debug("The YARN cluster does not have any queues configured");
            }
        } catch (Throwable e) {
            LOG.warn("Error while getting queue information from YARN: " + e.getMessage());
            if (LOG.isDebugEnabled()) {
                LOG.debug("Error details", e);
            }
        }
    }
    private ApplicationReport startAppMaster(
            Configuration configuration,
            String applicationName,
            String yarnClusterEntrypoint,
            JobGraph jobGraph,
            YarnClient yarnClient,
            YarnClientApplication yarnApplication,
            ClusterSpecification clusterSpecification)
            throws Exception {

        // ------------------ Initialize the file systems -------------------------

        org.apache.flink.core.fs.FileSystem.initialize(
                configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));

        final FileSystem fs = FileSystem.get(yarnConfiguration);

        // hard coded check for the GoogleHDFS client because its not overriding the getScheme()
        // method.
        if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem")
                && fs.getScheme().startsWith("file")) {
            LOG.warn(
                    "The file system scheme is '"
                            + fs.getScheme()
                            + "'. This indicates that the "
                            + "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values."
                            + "The Flink YARN client needs to store its files in a distributed file system");
        }

        ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();

        final List<Path> providedLibDirs =
                Utils.getQualifiedRemoteSharedPaths(configuration, yarnConfiguration);

        final YarnApplicationFileUploader fileUploader =
                YarnApplicationFileUploader.from(
                        fs,
                        getStagingDir(fs),
                        providedLibDirs,
                        appContext.getApplicationId(),
                        getFileReplication());

        // The files need to be shipped and added to classpath.
        Set<File> systemShipFiles = new HashSet<>(shipFiles.size());
        for (File file : shipFiles) {
            systemShipFiles.add(file.getAbsoluteFile());
        }

        final String logConfigFilePath =
                configuration.getString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE);
        if (logConfigFilePath != null) {
            systemShipFiles.add(new File(logConfigFilePath));
        }

        // Set-up ApplicationSubmissionContext for the application

        final ApplicationId appId = appContext.getApplicationId();

        // ------------------ Add Zookeeper namespace to local flinkConfiguraton ------
        setHAClusterIdIfNotSet(configuration, appId);

        if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
            // activate re-execution of failed applications
            appContext.setMaxAppAttempts(
                    configuration.getInteger(
                            YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
                            YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));

            activateHighAvailabilitySupport(appContext);
        } else {
            // set number of application retries to 1 in the default case
            appContext.setMaxAppAttempts(
                    configuration.getInteger(YarnConfigOptions.APPLICATION_ATTEMPTS.key(), 1));
        }

        final Set<Path> userJarFiles = new HashSet<>();
        if (jobGraph != null) {
            userJarFiles.addAll(
                    jobGraph.getUserJars().stream()
                            .map(f -> f.toUri())
                            .map(Path::new)
                            .collect(Collectors.toSet()));
        }

        final List<URI> jarUrls =
                ConfigUtils.decodeListFromConfig(configuration, PipelineOptions.JARS, URI::create);
        if (jarUrls != null
                && YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)) {
            userJarFiles.addAll(jarUrls.stream().map(Path::new).collect(Collectors.toSet()));
        }

        // only for per job mode
        if (jobGraph != null) {
            for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
                    jobGraph.getUserArtifacts().entrySet()) {
                // only upload local files
                if (!Utils.isRemotePath(entry.getValue().filePath)) {
                    Path localPath = new Path(entry.getValue().filePath);
                    Tuple2<Path, Long> remoteFileInfo =
                            fileUploader.uploadLocalFileToRemote(localPath, entry.getKey());
                    jobGraph.setUserArtifactRemotePath(
                            entry.getKey(), remoteFileInfo.f0.toString());
                }
            }

            jobGraph.writeUserArtifactEntriesToConfiguration();
        }

        if (providedLibDirs == null || providedLibDirs.isEmpty()) {
            addLibFoldersToShipFiles(systemShipFiles);
        }

        // Register all files in provided lib dirs as local resources with public visibility
        // and upload the remaining dependencies as local resources with APPLICATION visibility.
        final List<String> systemClassPaths = fileUploader.registerProvidedLocalResources();
        final List<String> uploadedDependencies =
                fileUploader.registerMultipleLocalResources(
                        systemShipFiles.stream()
                                .map(e -> new Path(e.toURI()))
                                .collect(Collectors.toSet()),
                        Path.CUR_DIR,
                        LocalResourceType.FILE);
        systemClassPaths.addAll(uploadedDependencies);

        // upload and register ship-only files
        // Plugin files only need to be shipped and should not be added to classpath.
        if (providedLibDirs == null || providedLibDirs.isEmpty()) {
            Set<File> shipOnlyFiles = new HashSet<>();
            addPluginsFoldersToShipFiles(shipOnlyFiles);
            fileUploader.registerMultipleLocalResources(
                    shipOnlyFiles.stream()
                            .map(e -> new Path(e.toURI()))
                            .collect(Collectors.toSet()),
                    Path.CUR_DIR,
                    LocalResourceType.FILE);
        }

        if (!shipArchives.isEmpty()) {
            fileUploader.registerMultipleLocalResources(
                    shipArchives.stream().map(e -> new Path(e.toURI())).collect(Collectors.toSet()),
                    Path.CUR_DIR,
                    LocalResourceType.ARCHIVE);
        }

        // Upload and register user jars
        final List<String> userClassPaths =
                fileUploader.registerMultipleLocalResources(
                        userJarFiles,
                        userJarInclusion == YarnConfigOptions.UserJarInclusion.DISABLED
                                ? ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR
                                : Path.CUR_DIR,
                        LocalResourceType.FILE);

        if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
            systemClassPaths.addAll(userClassPaths);
        }

        // normalize classpath by sorting
        Collections.sort(systemClassPaths);
        Collections.sort(userClassPaths);

        // classpath assembler
        StringBuilder classPathBuilder = new StringBuilder();
        if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {
            for (String userClassPath : userClassPaths) {
                classPathBuilder.append(userClassPath).append(File.pathSeparator);
            }
        }
        for (String classPath : systemClassPaths) {
            classPathBuilder.append(classPath).append(File.pathSeparator);
        }

        // Setup jar for ApplicationMaster
        final YarnLocalResourceDescriptor localResourceDescFlinkJar =
                fileUploader.uploadFlinkDist(flinkJarPath);
        classPathBuilder
                .append(localResourceDescFlinkJar.getResourceKey())
                .append(File.pathSeparator);

        // write job graph to tmp file and add it to local resource
        // TODO: server use user main method to generate job graph
        if (jobGraph != null) {
            File tmpJobGraphFile = null;
            try {
                tmpJobGraphFile = File.createTempFile(appId.toString(), null);
                try (FileOutputStream output = new FileOutputStream(tmpJobGraphFile);
                        ObjectOutputStream obOutput = new ObjectOutputStream(output)) {
                    obOutput.writeObject(jobGraph);
                }

                final String jobGraphFilename = "job.graph";
                configuration.setString(JOB_GRAPH_FILE_PATH, jobGraphFilename);

                fileUploader.registerSingleLocalResource(
                        jobGraphFilename,
                        new Path(tmpJobGraphFile.toURI()),
                        "",
                        LocalResourceType.FILE,
                        true,
                        false);
                classPathBuilder.append(jobGraphFilename).append(File.pathSeparator);
            } catch (Exception e) {
                LOG.warn("Add job graph to local resource fail.");
                throw e;
            } finally {
                if (tmpJobGraphFile != null && !tmpJobGraphFile.delete()) {
                    LOG.warn("Fail to delete temporary file {}.", tmpJobGraphFile.toPath());
                }
            }
        }

        // Upload the flink configuration
        // write out configuration file
        File tmpConfigurationFile = null;
        try {
            tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
            BootstrapTools.writeConfiguration(configuration, tmpConfigurationFile);

            String flinkConfigKey = "flink-conf.yaml";
            fileUploader.registerSingleLocalResource(
                    flinkConfigKey,
                    new Path(tmpConfigurationFile.getAbsolutePath()),
                    "",
                    LocalResourceType.FILE,
                    true,
                    true);
            classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
        } finally {
            if (tmpConfigurationFile != null && !tmpConfigurationFile.delete()) {
                LOG.warn("Fail to delete temporary file {}.", tmpConfigurationFile.toPath());
            }
        }

        if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
            for (String userClassPath : userClassPaths) {
                classPathBuilder.append(userClassPath).append(File.pathSeparator);
            }
        }

        // To support Yarn Secure Integration Test Scenario
        // In Integration test setup, the Yarn containers created by YarnMiniCluster does not have
        // the Yarn site XML
        // and KRB5 configuration files. We are adding these files as container local resources for
        // the container
        // applications (JM/TMs) to have proper secure cluster setup
        Path remoteYarnSiteXmlPath = null;
        if (System.getenv("IN_TESTS") != null) {
            File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
            LOG.info(
                    "Adding Yarn configuration {} to the AM container local resource bucket",
                    f.getAbsolutePath());
            Path yarnSitePath = new Path(f.getAbsolutePath());
            remoteYarnSiteXmlPath =
                    fileUploader
                            .registerSingleLocalResource(
                                    Utils.YARN_SITE_FILE_NAME,
                                    yarnSitePath,
                                    "",
                                    LocalResourceType.FILE,
                                    false,
                                    false)
                            .getPath();
            if (System.getProperty("java.security.krb5.conf") != null) {
                configuration.set(
                        SecurityOptions.KERBEROS_KRB5_PATH,
                        System.getProperty("java.security.krb5.conf"));
            }
        }

        Path remoteKrb5Path = null;
        boolean hasKrb5 = false;
        String krb5Config = configuration.get(SecurityOptions.KERBEROS_KRB5_PATH);
        if (!StringUtils.isNullOrWhitespaceOnly(krb5Config)) {
            final File krb5 = new File(krb5Config);
            LOG.info(
                    "Adding KRB5 configuration {} to the AM container local resource bucket",
                    krb5.getAbsolutePath());
            final Path krb5ConfPath = new Path(krb5.getAbsolutePath());
            remoteKrb5Path =
                    fileUploader
                            .registerSingleLocalResource(
                                    Utils.KRB5_FILE_NAME,
                                    krb5ConfPath,
                                    "",
                                    LocalResourceType.FILE,
                                    false,
                                    false)
                            .getPath();
            hasKrb5 = true;
        }

        Path remotePathKeytab = null;
        String localizedKeytabPath = null;
        String keytab = configuration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
        if (keytab != null) {
            boolean localizeKeytab =
                    flinkConfiguration.getBoolean(YarnConfigOptions.SHIP_LOCAL_KEYTAB);
            localizedKeytabPath =
                    flinkConfiguration.getString(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
            if (localizeKeytab) {
                // Localize the keytab to YARN containers via local resource.
                LOG.info("Adding keytab {} to the AM container local resource bucket", keytab);
                remotePathKeytab =
                        fileUploader
                                .registerSingleLocalResource(
                                        localizedKeytabPath,
                                        new Path(keytab),
                                        "",
                                        LocalResourceType.FILE,
                                        false,
                                        false)
                                .getPath();
            } else {
                // // Assume Keytab is pre-installed in the container.
                localizedKeytabPath =
                        flinkConfiguration.getString(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
            }
        }

        final JobManagerProcessSpec processSpec =
                JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
                        flinkConfiguration, JobManagerOptions.TOTAL_PROCESS_MEMORY);
        final ContainerLaunchContext amContainer =
                setupApplicationMasterContainer(yarnClusterEntrypoint, hasKrb5, processSpec);

        // setup security tokens
        if (UserGroupInformation.isSecurityEnabled()) {
            // set HDFS delegation tokens when security is enabled
            LOG.info("Adding delegation token to the AM container.");
            List<Path> yarnAccessList =
                    ConfigUtils.decodeListFromConfig(
                            configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
            Utils.setTokensFor(
                    amContainer,
                    ListUtils.union(yarnAccessList, fileUploader.getRemotePaths()),
                    yarnConfiguration);
        }

        amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());
        fileUploader.close();

        // Setup CLASSPATH and environment variables for ApplicationMaster
        final Map<String, String> appMasterEnv = new HashMap<>();
        // set user specified app master environment variables
        appMasterEnv.putAll(
                ConfigurationUtils.getPrefixedKeyValuePairs(
                        ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX, configuration));
        // set Flink app class path
        appMasterEnv.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, classPathBuilder.toString());

        // set Flink on YARN internal configuration values
        appMasterEnv.put(YarnConfigKeys.FLINK_DIST_JAR, localResourceDescFlinkJar.toString());
        appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
        appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fileUploader.getHomeDir().toString());
        appMasterEnv.put(
                YarnConfigKeys.ENV_CLIENT_SHIP_FILES,
                encodeYarnLocalResourceDescriptorListToString(
                        fileUploader.getEnvShipResourceList()));
        appMasterEnv.put(
                YarnConfigKeys.FLINK_YARN_FILES,
                fileUploader.getApplicationDir().toUri().toString());

        // https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name
        appMasterEnv.put(
                YarnConfigKeys.ENV_HADOOP_USER_NAME,
                UserGroupInformation.getCurrentUser().getUserName());

        if (localizedKeytabPath != null) {
            appMasterEnv.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, localizedKeytabPath);
            String principal = configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
            appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);
            if (remotePathKeytab != null) {
                appMasterEnv.put(YarnConfigKeys.REMOTE_KEYTAB_PATH, remotePathKeytab.toString());
            }
        }

        // To support Yarn Secure Integration Test Scenario
        if (remoteYarnSiteXmlPath != null) {
            appMasterEnv.put(
                    YarnConfigKeys.ENV_YARN_SITE_XML_PATH, remoteYarnSiteXmlPath.toString());
        }
        if (remoteKrb5Path != null) {
            appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString());
        }

        // set classpath from YARN configuration
        Utils.setupYarnClassPath(yarnConfiguration, appMasterEnv);

        amContainer.setEnvironment(appMasterEnv);

        // Set up resource type requirements for ApplicationMaster
        Resource capability = Records.newRecord(Resource.class);
        capability.setMemory(clusterSpecification.getMasterMemoryMB());
        capability.setVirtualCores(
                flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES));

        final String customApplicationName = customName != null ? customName : applicationName;

        appContext.setApplicationName(customApplicationName);
        appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
        appContext.setAMContainerSpec(amContainer);
        appContext.setResource(capability);

        // Set priority for application
        int priorityNum = flinkConfiguration.getInteger(YarnConfigOptions.APPLICATION_PRIORITY);
        if (priorityNum >= 0) {
            Priority priority = Priority.newInstance(priorityNum);
            appContext.setPriority(priority);
        }

        if (yarnQueue != null) {
            appContext.setQueue(yarnQueue);
        }

        setApplicationNodeLabel(appContext);

        setApplicationTags(appContext);

        // add a hook to clean up in case deployment fails
        Thread deploymentFailureHook =
                new DeploymentFailureHook(yarnApplication, fileUploader.getApplicationDir());
        Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
        LOG.info("Submitting application master " + appId);
        yarnClient.submitApplication(appContext);

        LOG.info("Waiting for the cluster to be allocated");
        final long startTime = System.currentTimeMillis();
        ApplicationReport report;
        YarnApplicationState lastAppState = YarnApplicationState.NEW;
        loop:
        while (true) {
            try {
                report = yarnClient.getApplicationReport(appId);
            } catch (IOException e) {
                throw new YarnDeploymentException("Failed to deploy the cluster.", e);
            }
            YarnApplicationState appState = report.getYarnApplicationState();
            LOG.debug("Application State: {}", appState);
            switch (appState) {
                case FAILED:
                case KILLED:
                    throw new YarnDeploymentException(
                            "The YARN application unexpectedly switched to state "
                                    + appState
                                    + " during deployment. \n"
                                    + "Diagnostics from YARN: "
                                    + report.getDiagnostics()
                                    + "\n"
                                    + "If log aggregation is enabled on your cluster, use this command to further investigate the issue:\n"
                                    + "yarn logs -applicationId "
                                    + appId);
                    // break ..
                case RUNNING:
                    LOG.info("YARN application has been deployed successfully.");
                    break loop;
                case FINISHED:
                    LOG.info("YARN application has been finished successfully.");
                    break loop;
                default:
                    if (appState != lastAppState) {
                        LOG.info("Deploying cluster, current state " + appState);
                    }
                    if (System.currentTimeMillis() - startTime > 60000) {
                        LOG.info(
                                "Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
                    }
            }
            lastAppState = appState;
            Thread.sleep(250);
        }

        // since deployment was successful, remove the hook
        ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);
        return report;
    }
    /**
     * Returns the configured remote target home directory if set, otherwise returns the default
     * home directory.
     *
     * @param fileSystem file system used
     * @return the remote target home directory
     */
    private Path getStagingDir(FileSystem fileSystem) {
        final String configuredStagingDir =
                flinkConfiguration.getString(YarnConfigOptions.STAGING_DIRECTORY);
        return configuredStagingDir != null
                ? fileSystem.makeQualified(new Path(configuredStagingDir))
                : fileSystem.getHomeDirectory();
    }

    private int getFileReplication() {
        final int yarnFileReplication =
                yarnConfiguration.getInt(
                        DFSConfigKeys.DFS_REPLICATION_KEY, DFSConfigKeys.DFS_REPLICATION_DEFAULT);
        final int fileReplication =
                flinkConfiguration.getInteger(YarnConfigOptions.FILE_REPLICATION);
        return fileReplication > 0 ? fileReplication : yarnFileReplication;
    }
    private static String encodeYarnLocalResourceDescriptorListToString(
            List<YarnLocalResourceDescriptor> resources) {
        return String.join(
                LOCAL_RESOURCE_DESCRIPTOR_SEPARATOR,
                resources.stream()
                        .map(YarnLocalResourceDescriptor::toString)
                        .collect(Collectors.toList()));
    }

    /**
     * Kills YARN application and stops YARN client.
     *
     * <p>Use this method to kill the App before it has been properly deployed
     */
    private void failSessionDuringDeployment(
            YarnClient yarnClient, YarnClientApplication yarnApplication) {
        LOG.info("Killing YARN application");

        try {
            yarnClient.killApplication(
                    yarnApplication.getNewApplicationResponse().getApplicationId());
        } catch (Exception e) {
            // we only log a debug message here because the "killApplication" call is a best-effort
            // call (we don't know if the application has been deployed when the error occurred).
            LOG.debug("Error while killing YARN application", e);
        }
    }
    private static class ClusterResourceDescription {
        public final int totalFreeMemory;
        public final int containerLimit;
        public final int[] nodeManagersFree;

        public ClusterResourceDescription(
                int totalFreeMemory, int containerLimit, int[] nodeManagersFree) {
            this.totalFreeMemory = totalFreeMemory;
            this.containerLimit = containerLimit;
            this.nodeManagersFree = nodeManagersFree;
        }
    }

    private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient)
            throws YarnException, IOException {
        List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);

        int totalFreeMemory = 0;
        int containerLimit = 0;
        int[] nodeManagersFree = new int[nodes.size()];

        for (int i = 0; i < nodes.size(); i++) {
            NodeReport rep = nodes.get(i);
            int free =
                    rep.getCapability().getMemory()
                            - (rep.getUsed() != null ? rep.getUsed().getMemory() : 0);
            nodeManagersFree[i] = free;
            totalFreeMemory += free;
            if (free > containerLimit) {
                containerLimit = free;
            }
        }
        return new ClusterResourceDescription(totalFreeMemory, containerLimit, nodeManagersFree);
    }
    @Override
    public String getClusterDescription() {

        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            PrintStream ps = new PrintStream(baos);

            YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics();

            ps.append("NodeManagers in the ClusterClient " + metrics.getNumNodeManagers());
            List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
            final String format = "|%-16s |%-16s %n";
            ps.printf("|Property         |Value          %n");
            ps.println("+---------------------------------------+");
            int totalMemory = 0;
            int totalCores = 0;
            for (NodeReport rep : nodes) {
                final Resource res = rep.getCapability();
                totalMemory += res.getMemory();
                totalCores += res.getVirtualCores();
                ps.format(format, "NodeID", rep.getNodeId());
                ps.format(format, "Memory", res.getMemory() + " MB");
                ps.format(format, "vCores", res.getVirtualCores());
                ps.format(format, "HealthReport", rep.getHealthReport());
                ps.format(format, "Containers", rep.getNumContainers());
                ps.println("+---------------------------------------+");
            }
            ps.println("Summary: totalMemory " + totalMemory + " totalCores " + totalCores);
            List<QueueInfo> qInfo = yarnClient.getAllQueues();
            for (QueueInfo q : qInfo) {
                ps.println(
                        "Queue: "
                                + q.getQueueName()
                                + ", Current Capacity: "
                                + q.getCurrentCapacity()
                                + " Max Capacity: "
                                + q.getMaximumCapacity()
                                + " Applications: "
                                + q.getApplications().size());
            }
            return baos.toString();
        } catch (Exception e) {
            throw new RuntimeException("Couldn't get cluster description", e);
        }
    }
    private void activateHighAvailabilitySupport(ApplicationSubmissionContext appContext)
            throws InvocationTargetException, IllegalAccessException {

        ApplicationSubmissionContextReflector reflector =
                ApplicationSubmissionContextReflector.getInstance();

        reflector.setKeepContainersAcrossApplicationAttempts(appContext, true);

        reflector.setAttemptFailuresValidityInterval(
                appContext,
                flinkConfiguration.getLong(
                        YarnConfigOptions.APPLICATION_ATTEMPT_FAILURE_VALIDITY_INTERVAL));
    }

    private void setApplicationTags(final ApplicationSubmissionContext appContext)
            throws InvocationTargetException, IllegalAccessException {

        final ApplicationSubmissionContextReflector reflector =
                ApplicationSubmissionContextReflector.getInstance();

        final String tagsString = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_TAGS);

        final Set<String> applicationTags = new HashSet<>();

        // Trim whitespace and cull empty tags
        for (final String tag : tagsString.split(",")) {
            final String trimmedTag = tag.trim();
            if (!trimmedTag.isEmpty()) {
                applicationTags.add(trimmedTag);
            }
        }

        reflector.setApplicationTags(appContext, applicationTags);
    }

    private void setApplicationNodeLabel(final ApplicationSubmissionContext appContext)
            throws InvocationTargetException, IllegalAccessException {

        if (nodeLabel != null) {
            final ApplicationSubmissionContextReflector reflector =
                    ApplicationSubmissionContextReflector.getInstance();

            reflector.setApplicationNodeLabel(appContext, nodeLabel);
        }
    }
/**
* Singleton object which uses reflection to determine whether the {@link
* ApplicationSubmissionContext} supports various methods which, depending on the Hadoop
* version, may or may not be supported.
*
* <p>If an unsupported method is invoked, nothing happens.
*
* <p>Currently three methods are proxied: - setApplicationTags (>= 2.4.0) -
* setAttemptFailuresValidityInterval (>= 2.6.0) - setKeepContainersAcrossApplicationAttempts
* (>= 2.4.0) - setNodeLabelExpression (>= 2.6.0)
*/
private
static
class
ApplicationSubmissionContextReflector
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
ApplicationSubmissionContextReflector
.
class
);
private
static
final
ApplicationSubmissionContextReflector
instance
=
new
ApplicationSubmissionContextReflector
(
ApplicationSubmissionContext
.
class
);
public
static
ApplicationSubmissionContextReflector
getInstance
()
{
return
instance
;
}
private
static
final
String
APPLICATION_TAGS_METHOD_NAME
=
"setApplicationTags"
;
private
static
final
String
ATTEMPT_FAILURES_METHOD_NAME
=
"setAttemptFailuresValidityInterval"
;
private
static
final
String
KEEP_CONTAINERS_METHOD_NAME
=
"setKeepContainersAcrossApplicationAttempts"
;
private
static
final
String
NODE_LABEL_EXPRESSION_NAME
=
"setNodeLabelExpression"
;
private
final
Method
applicationTagsMethod
;
private
final
Method
attemptFailuresValidityIntervalMethod
;
private
final
Method
keepContainersMethod
;
@Nullable
private
final
Method
nodeLabelExpressionMethod
;
private
ApplicationSubmissionContextReflector
(
Class
<
ApplicationSubmissionContext
>
clazz
)
{
Method
applicationTagsMethod
;
Method
attemptFailuresValidityIntervalMethod
;
Method
keepContainersMethod
;
Method
nodeLabelExpressionMethod
;
try
{
// this method is only supported by Hadoop 2.4.0 onwards
applicationTagsMethod
=
clazz
.
getMethod
(
APPLICATION_TAGS_METHOD_NAME
,
Set
.
class
);
LOG
.
debug
(
"{} supports method {}."
,
clazz
.
getCanonicalName
(),
APPLICATION_TAGS_METHOD_NAME
);
}
catch
(
NoSuchMethodException
e
)
{
LOG
.
debug
(
"{} does not support method {}."
,
clazz
.
getCanonicalName
(),
APPLICATION_TAGS_METHOD_NAME
);
// assign null because the Hadoop version apparently does not support this call.
applicationTagsMethod
=
null
;
}
this
.
applicationTagsMethod
=
applicationTagsMethod
;
try
{
// this method is only supported by Hadoop 2.6.0 onwards
attemptFailuresValidityIntervalMethod
=
clazz
.
getMethod
(
ATTEMPT_FAILURES_METHOD_NAME
,
long
.
class
);
LOG
.
debug
(
"{} supports method {}."
,
clazz
.
getCanonicalName
(),
ATTEMPT_FAILURES_METHOD_NAME
);
}
catch
(
NoSuchMethodException
e
)
{
LOG
.
debug
(
"{} does not support method {}."
,
clazz
.
getCanonicalName
(),
ATTEMPT_FAILURES_METHOD_NAME
);
// assign null because the Hadoop version apparently does not support this call.
attemptFailuresValidityIntervalMethod
=
null
;
}
this
.
attemptFailuresValidityIntervalMethod
=
attemptFailuresValidityIntervalMethod
;
try
{
// this method is only supported by Hadoop 2.4.0 onwards
keepContainersMethod
=
clazz
.
getMethod
(
KEEP_CONTAINERS_METHOD_NAME
,
boolean
.
class
);
LOG
.
debug
(
"{} supports method {}."
,
clazz
.
getCanonicalName
(),
KEEP_CONTAINERS_METHOD_NAME
);
}
catch
(
NoSuchMethodException
e
)
{
LOG
.
debug
(
"{} does not support method {}."
,
clazz
.
getCanonicalName
(),
KEEP_CONTAINERS_METHOD_NAME
);
// assign null because the Hadoop version apparently does not support this call.
keepContainersMethod
=
null
;
}
this
.
keepContainersMethod
=
keepContainersMethod
;
try
{
nodeLabelExpressionMethod
=
clazz
.
getMethod
(
NODE_LABEL_EXPRESSION_NAME
,
String
.
class
);
LOG
.
debug
(
"{} supports method {}."
,
clazz
.
getCanonicalName
(),
NODE_LABEL_EXPRESSION_NAME
);
}
catch
(
NoSuchMethodException
e
)
{
LOG
.
debug
(
"{} does not support method {}."
,
clazz
.
getCanonicalName
(),
NODE_LABEL_EXPRESSION_NAME
);
nodeLabelExpressionMethod
=
null
;
}
this
.
nodeLabelExpressionMethod
=
nodeLabelExpressionMethod
;
}
public
void
setApplicationTags
(
ApplicationSubmissionContext
appContext
,
Set
<
String
>
applicationTags
)
throws
InvocationTargetException
,
IllegalAccessException
{
if
(
applicationTagsMethod
!=
null
)
{
LOG
.
debug
(
"Calling method {} of {}."
,
applicationTagsMethod
.
getName
(),
appContext
.
getClass
().
getCanonicalName
());
applicationTagsMethod
.
invoke
(
appContext
,
applicationTags
);
}
else
{
LOG
.
debug
(
"{} does not support method {}. Doing nothing."
,
appContext
.
getClass
().
getCanonicalName
(),
APPLICATION_TAGS_METHOD_NAME
);
}
}
public
void
setApplicationNodeLabel
(
ApplicationSubmissionContext
appContext
,
String
nodeLabel
)
throws
InvocationTargetException
,
IllegalAccessException
{
if
(
nodeLabelExpressionMethod
!=
null
)
{
LOG
.
debug
(
"Calling method {} of {}."
,
nodeLabelExpressionMethod
.
getName
(),
appContext
.
getClass
().
getCanonicalName
());
nodeLabelExpressionMethod
.
invoke
(
appContext
,
nodeLabel
);
}
else
{
LOG
.
debug
(
"{} does not support method {}. Doing nothing."
,
appContext
.
getClass
().
getCanonicalName
(),
NODE_LABEL_EXPRESSION_NAME
);
}
}
public
void
setAttemptFailuresValidityInterval
(
ApplicationSubmissionContext
appContext
,
long
validityInterval
)
throws
InvocationTargetException
,
IllegalAccessException
{
if
(
attemptFailuresValidityIntervalMethod
!=
null
)
{
LOG
.
debug
(
"Calling method {} of {}."
,
attemptFailuresValidityIntervalMethod
.
getName
(),
appContext
.
getClass
().
getCanonicalName
());
attemptFailuresValidityIntervalMethod
.
invoke
(
appContext
,
validityInterval
);
}
else
{
LOG
.
debug
(
"{} does not support method {}. Doing nothing."
,
appContext
.
getClass
().
getCanonicalName
(),
ATTEMPT_FAILURES_METHOD_NAME
);
}
}
public
void
setKeepContainersAcrossApplicationAttempts
(
ApplicationSubmissionContext
appContext
,
boolean
keepContainers
)
throws
InvocationTargetException
,
IllegalAccessException
{
if
(
keepContainersMethod
!=
null
)
{
LOG
.
debug
(
"Calling method {} of {}."
,
keepContainersMethod
.
getName
(),
appContext
.
getClass
().
getCanonicalName
());
keepContainersMethod
.
invoke
(
appContext
,
keepContainers
);
}
else
{
LOG
.
debug
(
"{} does not support method {}. Doing nothing."
,
appContext
.
getClass
().
getCanonicalName
(),
KEEP_CONTAINERS_METHOD_NAME
);
}
}
}
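The reflector above exists because these four ApplicationSubmissionContext setters were introduced in different Hadoop releases; probing once with Class.getMethod at construction and caching the result (or null) lets the client run against older YARN APIs without hitting NoSuchMethodError at call time. A minimal standalone sketch of the same probe-once pattern, using hypothetical class and method names (OptionalMethodCaller and setPriority are illustrations, not Hadoop API):

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

class OptionalMethodCaller {

    private final Method setPriorityMethod;

    OptionalMethodCaller(Class<?> targetClass) {
        Method m;
        try {
            // Probe exactly once; the result is cached for the object's lifetime.
            m = targetClass.getMethod("setPriority", int.class);
        } catch (NoSuchMethodException e) {
            m = null; // older class version: remember that the feature is missing
        }
        this.setPriorityMethod = m;
    }

    void setPriorityIfSupported(Object target, int priority)
            throws InvocationTargetException, IllegalAccessException {
        if (setPriorityMethod != null) {
            setPriorityMethod.invoke(target, priority);
        }
        // else: silently skip, mirroring the "Doing nothing" branches above
    }
}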
    private static class YarnDeploymentException extends RuntimeException {
        private static final long serialVersionUID = -812040641215388943L;

        public YarnDeploymentException(String message) {
            super(message);
        }

        public YarnDeploymentException(String message, Throwable cause) {
            super(message, cause);
        }
    }
    private class DeploymentFailureHook extends Thread {

        private final YarnClient yarnClient;
        private final YarnClientApplication yarnApplication;
        private final Path yarnFilesDir;

        DeploymentFailureHook(YarnClientApplication yarnApplication, Path yarnFilesDir) {
            this.yarnApplication = Preconditions.checkNotNull(yarnApplication);
            this.yarnFilesDir = Preconditions.checkNotNull(yarnFilesDir);

            // A new yarn client needs to be created in the shutdown hook to avoid
            // using a yarn client that has already been closed by YarnClusterDescriptor.
            this.yarnClient = YarnClient.createYarnClient();
            this.yarnClient.init(yarnConfiguration);
        }

        @Override
        public void run() {
            LOG.info("Cancelling deployment from Deployment Failure Hook");
            yarnClient.start();
            failSessionDuringDeployment(yarnClient, yarnApplication);
            yarnClient.stop();
            LOG.info("Deleting files in {}.", yarnFilesDir);
            try {
                FileSystem fs = FileSystem.get(yarnConfiguration);
                if (!fs.delete(yarnFilesDir, true)) {
                    throw new IOException("Deleting files in " + yarnFilesDir + " was unsuccessful");
                }
                fs.close();
            } catch (IOException e) {
                LOG.error("Failed to delete Flink Jar and configuration files in HDFS", e);
            }
        }
    }
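DeploymentFailureHook extends Thread because JVM shutdown hooks are threads: if the client JVM dies mid-deployment, the hook kills the half-started YARN application and deletes the staged files. A minimal sketch of the usual register-then-deregister lifecycle (the registration call sites sit outside this excerpt, and the deployment body here is a placeholder):

public class ShutdownHookDemo {
    public static void main(String[] args) {
        Thread failureHook = new Thread(() -> System.err.println("rolling back half-finished deployment"));
        Runtime.getRuntime().addShutdownHook(failureHook);
        try {
            // ... attempt the deployment here ...
        } finally {
            try {
                // Deployment finished (successfully or with a handled error):
                // deregister the hook so it does not fire on normal JVM exit.
                Runtime.getRuntime().removeShutdownHook(failureHook);
            } catch (IllegalStateException e) {
                // The JVM is already shutting down; the hook will run instead.
            }
        }
    }
}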
    @VisibleForTesting
    void addLibFoldersToShipFiles(Collection<File> effectiveShipFiles) {
        // Add lib folder to the ship files if the environment variable is set.
        // This is for convenience when running from the command-line.
        // (for other files users explicitly set the ship files)
        String libDir = System.getenv().get(ENV_FLINK_LIB_DIR);
        if (libDir != null) {
            File directoryFile = new File(libDir);
            if (directoryFile.isDirectory()) {
                effectiveShipFiles.add(directoryFile);
            } else {
                throw new YarnDeploymentException(
                        "The environment variable '"
                                + ENV_FLINK_LIB_DIR
                                + "' is set to '"
                                + libDir
                                + "' but the directory doesn't exist.");
            }
        } else if (shipFiles.isEmpty()) {
            LOG.warn(
                    "Environment variable '{}' not set and ship files have not been provided manually. "
                            + "Not shipping any library files.",
                    ENV_FLINK_LIB_DIR);
        }
    }
    @VisibleForTesting
    void addPluginsFoldersToShipFiles(Collection<File> effectiveShipFiles) {
        final Optional<File> pluginsDir = PluginConfig.getPluginsDir();
        pluginsDir.ifPresent(effectiveShipFiles::add);
    }
    ContainerLaunchContext setupApplicationMasterContainer(
            String yarnClusterEntrypoint, boolean hasKrb5, JobManagerProcessSpec processSpec) {
        // ------------------ Prepare Application Master Container ------------------------------

        // respect custom JVM options in the YAML file
        String javaOpts = flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS);
        if (flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS).length() > 0) {
            javaOpts += " " + flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS);
        }

        // krb5.conf file will be available as local resource in JM/TM container
        if (hasKrb5) {
            javaOpts += " -Djava.security.krb5.conf=krb5.conf";
        }

        // Set up the container launch context for the application master
        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);

        final Map<String, String> startCommandValues = new HashMap<>();
        startCommandValues.put("java", "$JAVA_HOME/bin/java");

        String jvmHeapMem = JobManagerProcessUtils.generateJvmParametersStr(processSpec, flinkConfiguration);
        startCommandValues.put("jvmmem", jvmHeapMem);

        startCommandValues.put("jvmopts", javaOpts);
        startCommandValues.put("logging", YarnLogConfigUtil.getLoggingYarnCommand(flinkConfiguration));

        startCommandValues.put("class", yarnClusterEntrypoint);
        startCommandValues.put(
                "redirects",
                "1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out "
                        + "2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.err");
        String dynamicParameterListStr = JobManagerProcessUtils.generateDynamicConfigsStr(processSpec);
        startCommandValues.put("args", dynamicParameterListStr);

        final String commandTemplate =
                flinkConfiguration.getString(
                        ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
                        ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE);
        final String amCommand = BootstrapTools.getStartCommand(commandTemplate, startCommandValues);

        amContainer.setCommands(Collections.singletonList(amCommand));

        LOG.debug("Application Master start command: " + amCommand);

        return amContainer;
    }
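The ApplicationMaster start command is produced by substituting the startCommandValues entries into a configurable template; BootstrapTools.getStartCommand fills %key% placeholders. A simplified stand-in for that substitution (the template string follows Flink's default placeholder order, and the replace loop here is a sketch of the behavior, not the actual implementation):

import java.util.HashMap;
import java.util.Map;

public class StartCommandDemo {
    // Simplified stand-in for BootstrapTools.getStartCommand: every %key%
    // placeholder in the template is replaced by its value from the map.
    static String getStartCommand(String template, Map<String, String> values) {
        String command = template;
        for (Map.Entry<String, String> entry : values.entrySet()) {
            command = command.replace("%" + entry.getKey() + "%", entry.getValue());
        }
        return command;
    }

    public static void main(String[] args) {
        String template = "%java% %jvmmem% %jvmopts% %logging% %class% %args% %redirects%";
        Map<String, String> values = new HashMap<>();
        values.put("java", "$JAVA_HOME/bin/java");
        values.put("jvmmem", "-Xmx1073741824");
        values.put("jvmopts", "");
        values.put("logging", "");
        values.put("class", "org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint");
        values.put("args", "");
        values.put("redirects", "1> <LOG_DIR>/jobmanager.out 2> <LOG_DIR>/jobmanager.err");
        System.out.println(getStartCommand(template, values));
    }
}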
    private static YarnConfigOptions.UserJarInclusion getUserJarInclusionMode(
            org.apache.flink.configuration.Configuration config) {
        return config.getEnum(
                YarnConfigOptions.UserJarInclusion.class, YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
    }

    private static boolean isUsrLibDirIncludedInShipFiles(List<File> shipFiles) {
        return shipFiles.stream()
                .filter(File::isDirectory)
                .map(File::getName)
                .noneMatch(name -> name.equals(DEFAULT_FLINK_USR_LIB_DIR));
    }
    private void setClusterEntrypointInfoToConfig(final ApplicationReport report) {
        checkNotNull(report);

        final ApplicationId appId = report.getApplicationId();
        final String host = report.getHost();
        final int port = report.getRpcPort();

        LOG.info("Found Web Interface {}:{} of application '{}'.", host, port, appId);

        flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
        flinkConfiguration.setInteger(JobManagerOptions.PORT, port);

        flinkConfiguration.setString(RestOptions.ADDRESS, host);
        flinkConfiguration.setInteger(RestOptions.PORT, port);

        flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(appId));

        setHAClusterIdIfNotSet(flinkConfiguration, appId);
    }

    private void setHAClusterIdIfNotSet(Configuration configuration, ApplicationId appId) {
        // set cluster-id to app id if not specified
        if (!configuration.contains(HighAvailabilityOptions.HA_CLUSTER_ID)) {
            configuration.set(HighAvailabilityOptions.HA_CLUSTER_ID, ConverterUtils.toString(appId));
        }
    }
    public static void logDetachedClusterInformation(ApplicationId yarnApplicationId, Logger logger) {
        logger.info(
                "The Flink YARN session cluster has been started in detached mode. In order to "
                        + "stop Flink gracefully, use the following command:\n"
                        + "$ echo \"stop\" | ./bin/yarn-session.sh -id {}\n"
                        + "If this should not be possible, then you can also kill Flink via YARN's web interface or via:\n"
                        + "$ yarn application -kill {}\n"
                        + "Note that killing Flink might not clean up all job artifacts and temporary files.",
                yarnApplicationId,
                yarnApplicationId);
    }
}
dlink-gateway/src/main/resources/META-INF/services/com.dlink.gateway.Gateway
0 → 100644
com.dlink.gateway.yarn.YarnApplicationGateway
\ No newline at end of file
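This one-line file registers YarnApplicationGateway with Java's ServiceLoader SPI, which is presumably how Gateway.build locates an implementation for a requested GatewayType. A minimal sketch of discovering the registered implementation, assuming com.dlink.gateway.Gateway is the service type and the implementation has an accessible no-arg constructor:

import java.util.ServiceLoader;

public class GatewayDiscoveryDemo {
    public static void main(String[] args) {
        ServiceLoader<com.dlink.gateway.Gateway> loader =
                ServiceLoader.load(com.dlink.gateway.Gateway.class);
        for (com.dlink.gateway.Gateway gateway : loader) {
            // With the META-INF/services entry above on the classpath,
            // this prints com.dlink.gateway.yarn.YarnApplicationGateway
            System.out.println(gateway.getClass().getName());
        }
    }
}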
dlink-gateway/src/test/java/com/dlink/gateway/GatewayTest.java
0 → 100644
package com.dlink.gateway;

import org.junit.Test;

/**
 * GatewayTest
 *
 * @author qiwenkai
 * @since 2021/10/29 17:06
 **/
public class GatewayTest {

    @Test
    public void getTest() {
        GatewayConfig config = new GatewayConfig();
        config.setJobName("apptest");
        config.setType(GatewayType.get("yarn-application"));
        config.setConfigDir("/opt/src/flink-1.12.2_pj/conf");
        config.setUserJarPath("hdfs:///flink12/jar/currencyAppJar.jar");
        config.setUserJarParas("--id 2410,2412,2411".split("\\s+"));
        config.setUserJarMainAppClass("com.app.MainApp");
        String longValue = Gateway.build(config).getType().getLongValue();
        System.out.println(longValue);
    }
}
dlink-web/src/components/Studio/StudioConsole/StudioProcess/index.tsx
...
@@ -50,10 +50,13 @@ const StudioProcess = (props: any) => {
         (row.state == 'INITIALIZE') ?
           (<Tag icon={<ClockCircleOutlined />} color="default">
             INITIALIZE
           </Tag>) :
+          (row.state == 'RESTARTING') ?
+            (<Tag icon={<ClockCircleOutlined />} color="default">
+              RESTARTING
+            </Tag>) :
           (<Tag color="default">
             UNKNOWEN
           </Tag>)
       }
     </>)
   ;
 }
...
pom.xml
...
@@ -10,17 +10,20 @@
 	<version>0.3.2</version>
 	<modules>
-		<module>dlink-core</module>
+		<module>dlink-common</module>
+		<module>dlink-connectors</module>
+		<module>dlink-executor</module>
 		<module>dlink-client</module>
 		<module>dlink-function</module>
-		<module>dlink-common</module>
 		<module>dlink-metadata</module>
-		<module>dlink-connectors</module>
-		<module>dlink-executor</module>
+		<module>dlink-gateway</module>
 		<module>dlink-extends</module>
+		<module>dlink-core</module>
+		<module>dlink-app</module>
 		<module>dlink-web</module>
 		<module>dlink-admin</module>
 		<module>dlink-assembly</module>
 	</modules>
 	<properties>
...
@@ -204,6 +207,11 @@
 			<artifactId>dlink-metadata-mysql</artifactId>
 			<version>${project.version}</version>
 		</dependency>
+		<dependency>
+			<groupId>com.dlink</groupId>
+			<artifactId>dlink-gateway</artifactId>
+			<version>${project.version}</version>
+		</dependency>
 	</dependencies>
 </dependencyManagement>
 <build>
...