Unverified Commit 59dd7adc authored by aiwenmo's avatar aiwenmo Committed by GitHub

Merge pull request #217 from aiwenmo/dev

[fix-214][gateway,common] Failed to submit fink jar when configuratio…
parents 40b0726e 9df82827
......@@ -25,10 +25,32 @@ public class Asserts {
return isNull(str) || "".equals(str);
}
public static boolean isAllNullString(String... str) {
boolean isNull = true;
for (String item : str) {
if (isNotNullString(item)) {
isNull = false;
}
}
return isNull;
}
public static boolean isNotNullString(String str) {
return !isNullString(str);
}
public static boolean isAllNotNullString(String... str) {
boolean isNotNull = true;
for (String item : str) {
if (isNullString(item)) {
isNotNull = false;
}
}
return isNotNull;
}
public static boolean isEquals(String str1, String str2) {
if (isNull(str1) && isNull(str2)) {
return true;
......
......@@ -7,6 +7,7 @@ import com.dlink.gateway.exception.GatewayException;
import com.dlink.gateway.result.GatewayResult;
import com.dlink.gateway.result.KubernetesResult;
import com.dlink.utils.LogUtil;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
......@@ -47,7 +48,11 @@ public class KubernetesApplicationGateway extends KubernetesGateway {
AppConfig appConfig = config.getAppConfig();
configuration.set(PipelineOptions.JARS, Collections.singletonList(appConfig.getUserJarPath()));
ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(appConfig.getUserJarParas(), appConfig.getUserJarMainAppClass());
String[] userJarParas = appConfig.getUserJarParas();
if (Asserts.isNull(userJarParas)) {
userJarParas = new String[0];
}
ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(userJarParas, appConfig.getUserJarMainAppClass());
KubernetesClusterDescriptor kubernetesClusterDescriptor = new KubernetesClusterDescriptor(configuration, client);
try {
ClusterClientProvider<String> clusterClientProvider = kubernetesClusterDescriptor.deployApplicationCluster(clusterSpecification, applicationConfiguration);
......
......@@ -9,6 +9,7 @@ import com.dlink.gateway.model.JobInfo;
import com.dlink.gateway.result.SavePointResult;
import com.dlink.gateway.result.TestResult;
import com.dlink.utils.LogUtil;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
......@@ -78,7 +79,9 @@ public abstract class KubernetesGateway extends AbstractGateway {
private void addConfigParas(Map<String, String> configMap) {
if (Asserts.isNotNull(configMap)) {
for (Map.Entry<String, String> entry : configMap.entrySet()) {
this.configuration.setString(entry.getKey(), entry.getValue());
if (Asserts.isAllNotNullString(entry.getKey(), entry.getValue())) {
this.configuration.setString(entry.getKey(), entry.getValue());
}
}
}
}
......@@ -100,7 +103,7 @@ public abstract class KubernetesGateway extends AbstractGateway {
}
KubernetesClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration);
try (ClusterClient<String> clusterClient = clusterDescriptor.retrieve(
clusterId).getClusterClient()) {
clusterId).getClusterClient()) {
List<JobInfo> jobInfos = new ArrayList<>();
CompletableFuture<Collection<JobStatusMessage>> listJobsFuture = clusterClient.listJobs();
for (JobStatusMessage jobStatusMessage : listJobsFuture.get()) {
......
......@@ -8,6 +8,7 @@ import com.dlink.gateway.exception.GatewayException;
import com.dlink.gateway.result.GatewayResult;
import com.dlink.gateway.result.YarnResult;
import com.dlink.utils.LogUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
......@@ -59,13 +60,17 @@ public class YarnApplicationGateway extends YarnGateway {
AppConfig appConfig = config.getAppConfig();
configuration.set(PipelineOptions.JARS, Collections.singletonList(appConfig.getUserJarPath()));
ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(appConfig.getUserJarParas(), appConfig.getUserJarMainAppClass());
String[] userJarParas = appConfig.getUserJarParas();
if (Asserts.isNull(userJarParas)) {
userJarParas = new String[0];
}
ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(userJarParas, appConfig.getUserJarMainAppClass());
YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
configuration, yarnConfiguration, yarnClient, YarnClientYarnClusterInformationRetriever.create(yarnClient), true);
configuration, yarnConfiguration, yarnClient, YarnClientYarnClusterInformationRetriever.create(yarnClient), true);
try {
ClusterClientProvider<ApplicationId> clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(
clusterSpecification,
applicationConfiguration);
clusterSpecification,
applicationConfiguration);
ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
Collection<JobStatusMessage> jobStatusMessages = clusterClient.listJobs().get();
int counts = 10;
......
......@@ -9,6 +9,7 @@ import com.dlink.gateway.model.JobInfo;
import com.dlink.gateway.result.SavePointResult;
import com.dlink.gateway.result.TestResult;
import com.dlink.utils.LogUtil;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
......@@ -110,7 +111,9 @@ public abstract class YarnGateway extends AbstractGateway {
private void addConfigParas(Map<String, String> configMap) {
if (Asserts.isNotNull(configMap)) {
for (Map.Entry<String, String> entry : configMap.entrySet()) {
this.configuration.setString(entry.getKey(), entry.getValue());
if (Asserts.isAllNotNullString(entry.getKey(), entry.getValue())) {
this.configuration.setString(entry.getKey(), entry.getValue());
}
}
}
}
......@@ -134,15 +137,15 @@ public abstract class YarnGateway extends AbstractGateway {
ApplicationId applicationId = clusterClientFactory.getClusterId(configuration);
if (applicationId == null) {
throw new GatewayException(
"No cluster id was specified. Please specify a cluster to which you would like to connect.");
"No cluster id was specified. Please specify a cluster to which you would like to connect.");
}
/*YarnClusterDescriptor clusterDescriptor = clusterClientFactory
.createClusterDescriptor(
configuration);*/
YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
configuration, yarnConfiguration, yarnClient, YarnClientYarnClusterInformationRetriever.create(yarnClient), true);
configuration, yarnConfiguration, yarnClient, YarnClientYarnClusterInformationRetriever.create(yarnClient), true);
try (ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(
applicationId).getClusterClient()) {
applicationId).getClusterClient()) {
List<JobInfo> jobInfos = new ArrayList<>();
CompletableFuture<Collection<JobStatusMessage>> listJobsFuture = clusterClient.listJobs();
for (JobStatusMessage jobStatusMessage : listJobsFuture.get()) {
......@@ -173,7 +176,7 @@ public abstract class YarnGateway extends AbstractGateway {
}
if (Asserts.isNull(config.getFlinkConfig().getJobId())) {
throw new GatewayException(
"No job id was specified. Please specify a job to which you would like to savepont.");
"No job id was specified. Please specify a job to which you would like to savepont.");
}
/*if(Asserts.isNotNullString(config.getClusterConfig().getYarnConfigPath())) {
configuration = GlobalConfiguration.loadConfiguration(config.getClusterConfig().getYarnConfigPath());
......@@ -186,15 +189,15 @@ public abstract class YarnGateway extends AbstractGateway {
ApplicationId applicationId = clusterClientFactory.getClusterId(configuration);
if (Asserts.isNull(applicationId)) {
throw new GatewayException(
"No cluster id was specified. Please specify a cluster to which you would like to connect.");
"No cluster id was specified. Please specify a cluster to which you would like to connect.");
}
/*YarnClusterDescriptor clusterDescriptor = clusterClientFactory
.createClusterDescriptor(
configuration);*/
YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
configuration, yarnConfiguration, yarnClient, YarnClientYarnClusterInformationRetriever.create(yarnClient), true);
configuration, yarnConfiguration, yarnClient, YarnClientYarnClusterInformationRetriever.create(yarnClient), true);
try (ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(
applicationId).getClusterClient()) {
applicationId).getClusterClient()) {
List<JobInfo> jobInfos = new ArrayList<>();
jobInfos.add(new JobInfo(config.getFlinkConfig().getJobId(), JobInfo.JobStatus.FAIL));
runSavePointJob(jobInfos, clusterClient, savePoint);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment