Commit 26fb0d36 authored by godkaikai

JobManager optimization

parent c35d0f33
@@ -47,7 +47,6 @@
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-yarn_2.11</artifactId>
            <version>${flink.version}</version>
-            <!--<scope>provided</scope>-->
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
@@ -81,7 +80,7 @@
        </dependency>
    </dependencies>
-    <build>
+    <!--<build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
@@ -136,6 +135,6 @@
                </executions>
            </plugin>
        </plugins>
-    </build>
+    </build>-->
</project>
\ No newline at end of file
@@ -48,7 +48,6 @@
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-yarn_2.11</artifactId>
            <version>${flink.version}</version>
-            <!--<scope>provided</scope>-->
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
@@ -72,60 +71,16 @@
                </exclusion>
            </exclusions>
        </dependency>
-        <!--<dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-yarn_2.11</artifactId>
-            <version>${flink.version}</version>
-            &lt;!&ndash;<exclusions>
-                <exclusion>
-                    <groupId>javax.servlet</groupId>
-                    <artifactId>servlet-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>javax.servlet.jsp</groupId>
-                    <artifactId>jsp-api</artifactId>
-                </exclusion>
-            </exclusions>&ndash;&gt;
-            &lt;!&ndash;<exclusions>
-                <exclusion>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-common</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-yarn-client</artifactId>
-                </exclusion>
-            </exclusions>&ndash;&gt;
-        </dependency>-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>
-        <!--<dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
-            <version>1.7.30</version>
-        </dependency>-->
        <dependency>
            <groupId>com.dlink</groupId>
            <artifactId>dlink-common</artifactId>
        </dependency>
    </dependencies>
    <!--<build>
-        <plugins>
-            <plugin>
-                <artifactId>maven-assembly-plugin</artifactId>
-                <configuration>
-                    <descriptorRefs>
-                        <descriptorRef>jar-with-dependencies</descriptorRef>
-                    </descriptorRefs>
-                </configuration>
-            </plugin>
-        </plugins>
-        <finalName>${project.artifactId}</finalName>
-    </build>-->
-    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
@@ -136,7 +91,7 @@
                    <target>1.8</target>
                </configuration>
            </plugin>
-            <!--package the jar-->
+            &lt;!&ndash;package the jar&ndash;&gt;
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
@@ -182,6 +137,6 @@
                </executions>
            </plugin>
        </plugins>
-    </build>
+    </build>-->
</project>
\ No newline at end of file
@@ -48,7 +48,6 @@
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-yarn_2.11</artifactId>
            <version>${flink.version}</version>
-            <!--<scope>provided</scope>-->
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
@@ -81,7 +80,7 @@
            <artifactId>dlink-common</artifactId>
        </dependency>
    </dependencies>
-    <build>
+    <!--<build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
@@ -92,7 +91,7 @@
                    <target>1.8</target>
                </configuration>
            </plugin>
-            <!--package the jar-->
+            &lt;!&ndash;package the jar&ndash;&gt;
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
@@ -138,6 +137,5 @@
                </executions>
            </plugin>
        </plugins>
-    </build>
+    </build>-->
</project>
\ No newline at end of file
@@ -47,7 +47,6 @@
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-yarn_2.11</artifactId>
            <version>${flink.version}</version>
-            <!--<scope>provided</scope>-->
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
@@ -80,7 +79,7 @@
            <artifactId>dlink-common</artifactId>
        </dependency>
    </dependencies>
-    <build>
+    <!--<build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
@@ -91,7 +90,7 @@
                    <target>1.8</target>
                </configuration>
            </plugin>
-            <!--package the jar-->
+            &lt;!&ndash;package the jar&ndash;&gt;
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
@@ -137,7 +136,5 @@
                </executions>
            </plugin>
        </plugins>
-    </build>
+    </build>-->
</project>
\ No newline at end of file
package com.dlink.utils;

/**
 * RunTimeUtil
 *
 * @author wenmo
 * @since 2021/12/11
 **/
public class RunTimeUtil {

    public static void recovery(Object obj) {
        // Clears only the local parameter reference; the caller's own reference to the
        // object is unaffected. System.gc() is merely a hint to the JVM to run collection.
        obj = null;
        System.gc();
    }
}
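A minimal usage sketch of the new utility (the call site below is an illustrative assumption, not part of this commit): since recovery() nulls only its own parameter copy, the caller still has to drop its own reference before the object can become collectible, and System.gc() is only a hint.

    // Hypothetical call site: release a large, no-longer-needed intermediate object.
    public class RunTimeUtilExample {
        public static void main(String[] args) {
            byte[] buffer = new byte[16 * 1024 * 1024]; // stand-in for a large result
            RunTimeUtil.recovery(buffer); // clears only recovery()'s local reference and hints GC
            buffer = null;                // the caller must also release its own reference
        }
    }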
@@ -42,7 +42,7 @@ import java.util.List;
 * @author wenmo
 * @since 2021/5/25 15:27
 **/
-public class JobManager extends RunTime {
+public class JobManager {

    private static final Logger logger = LoggerFactory.getLogger(JobManager.class);
@@ -54,6 +54,8 @@ public class JobManager extends RunTime {
    private Executor executor;
    private boolean useGateway = false;
    private boolean isPlanMode = false;
+    private boolean useStatementSet = false;
+    private boolean useRestAPI = false;
    private GatewayType runMode = GatewayType.LOCAL;

    public JobManager() {
@@ -63,10 +65,34 @@ public class JobManager extends RunTime {
        this.useGateway = useGateway;
    }

+    public boolean isUseGateway() {
+        return useGateway;
+    }
+
    public void setPlanMode(boolean planMode) {
        isPlanMode = planMode;
    }

+    public boolean isPlanMode() {
+        return isPlanMode;
+    }
+
+    public boolean isUseStatementSet() {
+        return useStatementSet;
+    }
+
+    public void setUseStatementSet(boolean useStatementSet) {
+        this.useStatementSet = useStatementSet;
+    }
+
+    public boolean isUseRestAPI() {
+        return useRestAPI;
+    }
+
+    public void setUseRestAPI(boolean useRestAPI) {
+        this.useRestAPI = useRestAPI;
+    }
+
    public JobManager(JobConfig config) {
        this.config = config;
    }
@@ -145,56 +171,52 @@ public class JobManager extends RunTime {
        executorSetting = config.getExecutorSetting();
    }

-    @Override
    public boolean init() {
        if(!isPlanMode) {
            runMode = GatewayType.get(config.getType());
            useGateway = useGateway(config.getType());
            handler = JobHandler.build();
        }
+        useStatementSet = config.isUseStatementSet();
+        useRestAPI = config.isUseRestAPI();
        initExecutorSetting();
        createExecutorWithSession();
        return false;
    }

-    @Override
-    public boolean ready() {
+    private boolean ready() {
        return handler.init();
    }

-    @Override
-    public boolean success() {
+    private boolean success() {
        return handler.success();
    }

-    @Override
-    public boolean failed() {
+    private boolean failed() {
        return handler.failed();
    }

-    @Override
    public boolean close() {
        JobContextHolder.clear();
        return false;
    }

    public JobResult executeSql(String statement) {
-        Job job = Job.init(GatewayType.get(config.getType()), config, executorSetting, executor, statement, useGateway);
+        Job job = Job.init(runMode, config, executorSetting, executor, statement, useGateway);
-        JobContextHolder.setJob(job);
        if (!useGateway) {
            job.setJobManagerAddress(environmentSetting.getAddress());
        }
+        JobContextHolder.setJob(job);
        ready();
        String currentSql = "";
        JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement));
-        CustomTableEnvironmentImpl stEnvironment = executor.getCustomTableEnvironmentImpl();
        try {
            for (StatementParam item : jobParam.getDdl()) {
                currentSql = item.getValue();
                executor.executeSql(item.getValue());
            }
            if (jobParam.getTrans().size() > 0) {
-                if (config.isUseStatementSet() && useGateway) {
+                if (useStatementSet && useGateway) {
                    List<String> inserts = new ArrayList<>();
                    for (StatementParam item : jobParam.getTrans()) {
                        inserts.add(item.getValue());
@@ -202,7 +224,7 @@ public class JobManager extends RunTime {
                    currentSql = String.join(FlinkSQLConstant.SEPARATOR, inserts);
                    JobGraph jobGraph = executor.getJobGraphFromInserts(inserts);
                    GatewayResult gatewayResult = null;
-                    if (GatewayType.YARN_APPLICATION.equalsValue(config.getType())) {
+                    if (GatewayType.YARN_APPLICATION.equals(runMode)) {
                        gatewayResult = Gateway.build(config.getGatewayConfig()).submitJar();
                    } else {
                        gatewayResult = Gateway.build(config.getGatewayConfig()).submitJobGraph(jobGraph);
@@ -210,18 +232,16 @@ public class JobManager extends RunTime {
                    job.setResult(InsertResult.success(gatewayResult.getAppId()));
                    job.setJobId(gatewayResult.getAppId());
                    job.setJobManagerAddress(formatAddress(gatewayResult.getWebURL()));
-                } else if (config.isUseStatementSet() && !useGateway) {
+                } else if (useStatementSet && !useGateway) {
                    List<String> inserts = new ArrayList<>();
-                    StatementSet statementSet = stEnvironment.createStatementSet();
                    for (StatementParam item : jobParam.getTrans()) {
                        if (item.getType().equals(SqlType.INSERT)) {
-                            statementSet.addInsertSql(item.getValue());
                            inserts.add(item.getValue());
                        }
                    }
                    if (inserts.size() > 0) {
                        currentSql = String.join(FlinkSQLConstant.SEPARATOR, inserts);
-                        TableResult tableResult = statementSet.execute();
+                        TableResult tableResult = executor.executeStatementSet(inserts);
                        if (tableResult.getJobClient().isPresent()) {
                            job.setJobId(tableResult.getJobClient().get().getJobID().toHexString());
                        }
@@ -230,7 +250,7 @@
                            job.setResult(result);
                        }
                    }
-            } else if (!config.isUseStatementSet() && useGateway) {
+            } else if (!useStatementSet && useGateway) {
                List<String> inserts = new ArrayList<>();
                for (StatementParam item : jobParam.getTrans()) {
                    inserts.add(item.getValue());
@@ -297,14 +317,14 @@
        List<StatementParam> ddl = new ArrayList<>();
        List<StatementParam> trans = new ArrayList<>();
        for (String item : statements) {
-            String statement = FlinkInterceptor.pretreatStatement(executor, item);
+            String statement = executor.pretreatStatement(item);
            if (statement.isEmpty()) {
                continue;
            }
            SqlType operationType = Operations.getOperationType(statement);
            if (operationType.equals(SqlType.INSERT) || operationType.equals(SqlType.SELECT)) {
                trans.add(new StatementParam(statement, operationType));
-                if (!config.isUseStatementSet()) {
+                if (!useStatementSet) {
                    break;
                }
            } else {
@@ -315,18 +335,19 @@
    }

    public IResult executeDDL(String statement) {
-        String[] statements = statement.split(";");
+        String[] statements = SqlUtil.getStatements(statement);
        try {
            for (String item : statements) {
-                if (item.trim().isEmpty()) {
+                String newStatement = executor.pretreatStatement(item);
+                if (newStatement.trim().isEmpty()) {
                    continue;
                }
-                SqlType operationType = Operations.getOperationType(item);
+                SqlType operationType = Operations.getOperationType(newStatement);
                if (SqlType.INSERT == operationType || SqlType.SELECT == operationType) {
                    continue;
                }
                LocalDateTime startTime = LocalDateTime.now();
-                TableResult tableResult = executor.executeSql(item);
+                TableResult tableResult = executor.executeSql(newStatement);
                IResult result = ResultBuilder.build(operationType, maxRowNum, "", false).getResult(tableResult);
                result.setStartTime(startTime);
                return result;
@@ -373,7 +394,7 @@
    }

    public boolean cancel(String jobId) {
-        if (useGateway && !config.isUseRestAPI()) {
+        if (useGateway && !useRestAPI) {
            config.getGatewayConfig().setFlinkConfig(FlinkConfig.build(jobId, ActionType.CANCEL.getValue(),
                    null, null));
            Gateway.build(config.getGatewayConfig()).savepointJob();
@@ -389,7 +410,7 @@
    }

    public SavePointResult savepoint(String jobId,String savePointType) {
-        if (useGateway && !config.isUseRestAPI()) {
+        if (useGateway && !useRestAPI) {
            config.getGatewayConfig().setFlinkConfig(FlinkConfig.build(jobId, ActionType.SAVEPOINT.getValue(),
                    savePointType, null));
            return Gateway.build(config.getGatewayConfig()).savepointJob();
@@ -399,7 +420,7 @@
    }

    public JobResult executeJar() {
-        Job job = Job.init(GatewayType.get(config.getType()), config, executorSetting, executor, null, useGateway);
+        Job job = Job.init(runMode, config, executorSetting, executor, null, useGateway);
        JobContextHolder.setJob(job);
        ready();
        try {
......
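The statement-set branch above now delegates to executor.executeStatementSet(inserts), whose implementation is not shown in this excerpt. A minimal sketch of what it presumably does, reconstructed from the inline StatementSet code it replaces and assuming the Executor can reach its table environment (stEnvironment):

    // Sketch only: builds one combined Flink job from all collected INSERT statements.
    public TableResult executeStatementSet(List<String> statements) {
        StatementSet statementSet = stEnvironment.createStatementSet();
        for (String statement : statements) {
            statementSet.addInsertSql(statement);
        }
        return statementSet.execute(); // submits the combined job; callers read its JobClient
    }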
@@ -162,7 +162,7 @@ public abstract class Executor {
        stEnvironment = newstEnvironment;
    }

-    private String pretreatStatement(String statement){
+    public String pretreatStatement(String statement){
        return FlinkInterceptor.pretreatStatement(this,statement);
    }
......
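Raising pretreatStatement from private to public is what lets JobManager apply interceptor pretreatment itself before classifying each statement, paraphrased from the hunks above:

    // Paraphrased from the JobManager changes in this commit.
    String newStatement = executor.pretreatStatement(item);   // FlinkInterceptor pretreatment
    if (!newStatement.trim().isEmpty()) {
        SqlType operationType = Operations.getOperationType(newStatement);
        // DDL runs immediately; INSERT/SELECT statements are collected for job submission
    }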