Unverified Commit 1b28572c authored by aiwenmo's avatar aiwenmo Committed by GitHub

[Fix-108][gateway] Fix to cancel per-job bug

[Fix-108][gateway] Fix to cancel per-job bug
parents 77823144 611b6c0e
package com.dlink.gateway.yarn;
import com.dlink.assertion.Asserts;
import com.dlink.gateway.AbstractGateway;
import com.dlink.gateway.config.ActionType;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.gateway.exception.GatewayException;
import com.dlink.gateway.model.JobInfo;
import com.dlink.gateway.result.SavePointResult;
import com.dlink.gateway.result.TestResult;
import com.dlink.utils.LogUtil;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.deployment.ClusterRetrieveException;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.DeploymentOptions;
......@@ -35,8 +26,25 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.URI;
import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import com.dlink.assertion.Asserts;
import com.dlink.gateway.AbstractGateway;
import com.dlink.gateway.config.ActionType;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.gateway.config.SavePointType;
import com.dlink.gateway.exception.GatewayException;
import com.dlink.gateway.model.JobInfo;
import com.dlink.gateway.result.SavePointResult;
import com.dlink.gateway.result.TestResult;
import com.dlink.utils.LogUtil;
/**
* YarnSubmiter
......@@ -205,6 +213,13 @@ public abstract class YarnGateway extends AbstractGateway {
} catch (Exception e) {
result.fail(LogUtil.getError(e));
}
if (ActionType.CANCEL == config.getFlinkConfig().getAction() || SavePointType.CANCEL.equals(config.getFlinkConfig().getSavePointType())) {
try {
autoCancelCluster(clusterDescriptor.retrieve(applicationId).getClusterClient());
} catch (ClusterRetrieveException e) {
e.printStackTrace();
}
}
return result;
}
......@@ -235,6 +250,19 @@ public abstract class YarnGateway extends AbstractGateway {
}
}
private void autoCancelCluster(ClusterClient<ApplicationId> clusterClient) {
Executors.newCachedThreadPool().submit(() -> {
try {
Thread.sleep(3000);
clusterClient.shutDownCluster();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
clusterClient.close();
}
});
}
public TestResult test() {
try {
initConfig();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment