Commit 9df82827 authored by wenmo's avatar wenmo

[fix-214][gateway,common] Failed to submit fink jar when configuration has empty string

parent 5277697c
...@@ -25,10 +25,32 @@ public class Asserts { ...@@ -25,10 +25,32 @@ public class Asserts {
return isNull(str) || "".equals(str); return isNull(str) || "".equals(str);
} }
public static boolean isAllNullString(String... str) {
boolean isNull = true;
for (String item : str) {
if (isNotNullString(item)) {
isNull = false;
}
}
return isNull;
}
public static boolean isNotNullString(String str) { public static boolean isNotNullString(String str) {
return !isNullString(str); return !isNullString(str);
} }
public static boolean isAllNotNullString(String... str) {
boolean isNotNull = true;
for (String item : str) {
if (isNullString(item)) {
isNotNull = false;
}
}
return isNotNull;
}
public static boolean isEquals(String str1, String str2) { public static boolean isEquals(String str1, String str2) {
if (isNull(str1) && isNull(str2)) { if (isNull(str1) && isNull(str2)) {
return true; return true;
......
...@@ -7,6 +7,7 @@ import com.dlink.gateway.exception.GatewayException; ...@@ -7,6 +7,7 @@ import com.dlink.gateway.exception.GatewayException;
import com.dlink.gateway.result.GatewayResult; import com.dlink.gateway.result.GatewayResult;
import com.dlink.gateway.result.KubernetesResult; import com.dlink.gateway.result.KubernetesResult;
import com.dlink.utils.LogUtil; import com.dlink.utils.LogUtil;
import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration; import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.ClusterClient;
...@@ -47,7 +48,11 @@ public class KubernetesApplicationGateway extends KubernetesGateway { ...@@ -47,7 +48,11 @@ public class KubernetesApplicationGateway extends KubernetesGateway {
AppConfig appConfig = config.getAppConfig(); AppConfig appConfig = config.getAppConfig();
configuration.set(PipelineOptions.JARS, Collections.singletonList(appConfig.getUserJarPath())); configuration.set(PipelineOptions.JARS, Collections.singletonList(appConfig.getUserJarPath()));
ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification(); ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(appConfig.getUserJarParas(), appConfig.getUserJarMainAppClass()); String[] userJarParas = appConfig.getUserJarParas();
if (Asserts.isNull(userJarParas)) {
userJarParas = new String[0];
}
ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(userJarParas, appConfig.getUserJarMainAppClass());
KubernetesClusterDescriptor kubernetesClusterDescriptor = new KubernetesClusterDescriptor(configuration, client); KubernetesClusterDescriptor kubernetesClusterDescriptor = new KubernetesClusterDescriptor(configuration, client);
try { try {
ClusterClientProvider<String> clusterClientProvider = kubernetesClusterDescriptor.deployApplicationCluster(clusterSpecification, applicationConfiguration); ClusterClientProvider<String> clusterClientProvider = kubernetesClusterDescriptor.deployApplicationCluster(clusterSpecification, applicationConfiguration);
......
...@@ -9,6 +9,7 @@ import com.dlink.gateway.model.JobInfo; ...@@ -9,6 +9,7 @@ import com.dlink.gateway.model.JobInfo;
import com.dlink.gateway.result.SavePointResult; import com.dlink.gateway.result.SavePointResult;
import com.dlink.gateway.result.TestResult; import com.dlink.gateway.result.TestResult;
import com.dlink.utils.LogUtil; import com.dlink.utils.LogUtil;
import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.CheckpointingOptions;
...@@ -78,7 +79,9 @@ public abstract class KubernetesGateway extends AbstractGateway { ...@@ -78,7 +79,9 @@ public abstract class KubernetesGateway extends AbstractGateway {
private void addConfigParas(Map<String, String> configMap) { private void addConfigParas(Map<String, String> configMap) {
if (Asserts.isNotNull(configMap)) { if (Asserts.isNotNull(configMap)) {
for (Map.Entry<String, String> entry : configMap.entrySet()) { for (Map.Entry<String, String> entry : configMap.entrySet()) {
this.configuration.setString(entry.getKey(), entry.getValue()); if (Asserts.isAllNotNullString(entry.getKey(), entry.getValue())) {
this.configuration.setString(entry.getKey(), entry.getValue());
}
} }
} }
} }
...@@ -100,7 +103,7 @@ public abstract class KubernetesGateway extends AbstractGateway { ...@@ -100,7 +103,7 @@ public abstract class KubernetesGateway extends AbstractGateway {
} }
KubernetesClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration); KubernetesClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration);
try (ClusterClient<String> clusterClient = clusterDescriptor.retrieve( try (ClusterClient<String> clusterClient = clusterDescriptor.retrieve(
clusterId).getClusterClient()) { clusterId).getClusterClient()) {
List<JobInfo> jobInfos = new ArrayList<>(); List<JobInfo> jobInfos = new ArrayList<>();
CompletableFuture<Collection<JobStatusMessage>> listJobsFuture = clusterClient.listJobs(); CompletableFuture<Collection<JobStatusMessage>> listJobsFuture = clusterClient.listJobs();
for (JobStatusMessage jobStatusMessage : listJobsFuture.get()) { for (JobStatusMessage jobStatusMessage : listJobsFuture.get()) {
......
...@@ -8,6 +8,7 @@ import com.dlink.gateway.exception.GatewayException; ...@@ -8,6 +8,7 @@ import com.dlink.gateway.exception.GatewayException;
import com.dlink.gateway.result.GatewayResult; import com.dlink.gateway.result.GatewayResult;
import com.dlink.gateway.result.YarnResult; import com.dlink.gateway.result.YarnResult;
import com.dlink.utils.LogUtil; import com.dlink.utils.LogUtil;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration; import org.apache.flink.client.deployment.application.ApplicationConfiguration;
...@@ -59,13 +60,17 @@ public class YarnApplicationGateway extends YarnGateway { ...@@ -59,13 +60,17 @@ public class YarnApplicationGateway extends YarnGateway {
AppConfig appConfig = config.getAppConfig(); AppConfig appConfig = config.getAppConfig();
configuration.set(PipelineOptions.JARS, Collections.singletonList(appConfig.getUserJarPath())); configuration.set(PipelineOptions.JARS, Collections.singletonList(appConfig.getUserJarPath()));
ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification(); ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(appConfig.getUserJarParas(), appConfig.getUserJarMainAppClass()); String[] userJarParas = appConfig.getUserJarParas();
if (Asserts.isNull(userJarParas)) {
userJarParas = new String[0];
}
ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(userJarParas, appConfig.getUserJarMainAppClass());
YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor( YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
configuration, yarnConfiguration, yarnClient, YarnClientYarnClusterInformationRetriever.create(yarnClient), true); configuration, yarnConfiguration, yarnClient, YarnClientYarnClusterInformationRetriever.create(yarnClient), true);
try { try {
ClusterClientProvider<ApplicationId> clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster( ClusterClientProvider<ApplicationId> clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(
clusterSpecification, clusterSpecification,
applicationConfiguration); applicationConfiguration);
ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient(); ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
Collection<JobStatusMessage> jobStatusMessages = clusterClient.listJobs().get(); Collection<JobStatusMessage> jobStatusMessages = clusterClient.listJobs().get();
int counts = 10; int counts = 10;
......
...@@ -9,6 +9,7 @@ import com.dlink.gateway.model.JobInfo; ...@@ -9,6 +9,7 @@ import com.dlink.gateway.model.JobInfo;
import com.dlink.gateway.result.SavePointResult; import com.dlink.gateway.result.SavePointResult;
import com.dlink.gateway.result.TestResult; import com.dlink.gateway.result.TestResult;
import com.dlink.utils.LogUtil; import com.dlink.utils.LogUtil;
import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.CheckpointingOptions;
...@@ -110,7 +111,9 @@ public abstract class YarnGateway extends AbstractGateway { ...@@ -110,7 +111,9 @@ public abstract class YarnGateway extends AbstractGateway {
private void addConfigParas(Map<String, String> configMap) { private void addConfigParas(Map<String, String> configMap) {
if (Asserts.isNotNull(configMap)) { if (Asserts.isNotNull(configMap)) {
for (Map.Entry<String, String> entry : configMap.entrySet()) { for (Map.Entry<String, String> entry : configMap.entrySet()) {
this.configuration.setString(entry.getKey(), entry.getValue()); if (Asserts.isAllNotNullString(entry.getKey(), entry.getValue())) {
this.configuration.setString(entry.getKey(), entry.getValue());
}
} }
} }
} }
...@@ -134,15 +137,15 @@ public abstract class YarnGateway extends AbstractGateway { ...@@ -134,15 +137,15 @@ public abstract class YarnGateway extends AbstractGateway {
ApplicationId applicationId = clusterClientFactory.getClusterId(configuration); ApplicationId applicationId = clusterClientFactory.getClusterId(configuration);
if (applicationId == null) { if (applicationId == null) {
throw new GatewayException( throw new GatewayException(
"No cluster id was specified. Please specify a cluster to which you would like to connect."); "No cluster id was specified. Please specify a cluster to which you would like to connect.");
} }
/*YarnClusterDescriptor clusterDescriptor = clusterClientFactory /*YarnClusterDescriptor clusterDescriptor = clusterClientFactory
.createClusterDescriptor( .createClusterDescriptor(
configuration);*/ configuration);*/
YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor( YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
configuration, yarnConfiguration, yarnClient, YarnClientYarnClusterInformationRetriever.create(yarnClient), true); configuration, yarnConfiguration, yarnClient, YarnClientYarnClusterInformationRetriever.create(yarnClient), true);
try (ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve( try (ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(
applicationId).getClusterClient()) { applicationId).getClusterClient()) {
List<JobInfo> jobInfos = new ArrayList<>(); List<JobInfo> jobInfos = new ArrayList<>();
CompletableFuture<Collection<JobStatusMessage>> listJobsFuture = clusterClient.listJobs(); CompletableFuture<Collection<JobStatusMessage>> listJobsFuture = clusterClient.listJobs();
for (JobStatusMessage jobStatusMessage : listJobsFuture.get()) { for (JobStatusMessage jobStatusMessage : listJobsFuture.get()) {
...@@ -173,7 +176,7 @@ public abstract class YarnGateway extends AbstractGateway { ...@@ -173,7 +176,7 @@ public abstract class YarnGateway extends AbstractGateway {
} }
if (Asserts.isNull(config.getFlinkConfig().getJobId())) { if (Asserts.isNull(config.getFlinkConfig().getJobId())) {
throw new GatewayException( throw new GatewayException(
"No job id was specified. Please specify a job to which you would like to savepont."); "No job id was specified. Please specify a job to which you would like to savepont.");
} }
/*if(Asserts.isNotNullString(config.getClusterConfig().getYarnConfigPath())) { /*if(Asserts.isNotNullString(config.getClusterConfig().getYarnConfigPath())) {
configuration = GlobalConfiguration.loadConfiguration(config.getClusterConfig().getYarnConfigPath()); configuration = GlobalConfiguration.loadConfiguration(config.getClusterConfig().getYarnConfigPath());
...@@ -186,15 +189,15 @@ public abstract class YarnGateway extends AbstractGateway { ...@@ -186,15 +189,15 @@ public abstract class YarnGateway extends AbstractGateway {
ApplicationId applicationId = clusterClientFactory.getClusterId(configuration); ApplicationId applicationId = clusterClientFactory.getClusterId(configuration);
if (Asserts.isNull(applicationId)) { if (Asserts.isNull(applicationId)) {
throw new GatewayException( throw new GatewayException(
"No cluster id was specified. Please specify a cluster to which you would like to connect."); "No cluster id was specified. Please specify a cluster to which you would like to connect.");
} }
/*YarnClusterDescriptor clusterDescriptor = clusterClientFactory /*YarnClusterDescriptor clusterDescriptor = clusterClientFactory
.createClusterDescriptor( .createClusterDescriptor(
configuration);*/ configuration);*/
YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor( YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
configuration, yarnConfiguration, yarnClient, YarnClientYarnClusterInformationRetriever.create(yarnClient), true); configuration, yarnConfiguration, yarnClient, YarnClientYarnClusterInformationRetriever.create(yarnClient), true);
try (ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve( try (ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(
applicationId).getClusterClient()) { applicationId).getClusterClient()) {
List<JobInfo> jobInfos = new ArrayList<>(); List<JobInfo> jobInfos = new ArrayList<>();
jobInfos.add(new JobInfo(config.getFlinkConfig().getJobId(), JobInfo.JobStatus.FAIL)); jobInfos.add(new JobInfo(config.getFlinkConfig().getJobId(), JobInfo.JobStatus.FAIL));
runSavePointJob(jobInfos, clusterClient, savePoint); runSavePointJob(jobInfos, clusterClient, savePoint);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment