Unverified Commit 40211153 authored by cong's avatar cong Committed by GitHub

[Fix][dlink-gateway]修复k8s Application、Yarn Application、Yarn...

[Fix][dlink-gateway]修复k8s Application、Yarn Application、Yarn PerJob提交任务时未指定集群配置MasterMemoryMB、TaskManagerMemoryMB、SlotsPerTaskManager (#970)

* [Fix][dlink-gateway]修复k8s Application、Yarn Application、Yarn PerJob提交任务时未指定集群配置MasterMemoryMB、TaskManagerMemoryMB、SlotsPerTaskManager
parent 12887ef5
...@@ -31,7 +31,9 @@ import org.apache.flink.client.deployment.ClusterSpecification; ...@@ -31,7 +31,9 @@ import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration; import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider; import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.KubernetesClusterDescriptor; import org.apache.flink.kubernetes.KubernetesClusterDescriptor;
import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
...@@ -66,13 +68,16 @@ public class KubernetesApplicationGateway extends KubernetesGateway { ...@@ -66,13 +68,16 @@ public class KubernetesApplicationGateway extends KubernetesGateway {
KubernetesResult result = KubernetesResult.build(getType()); KubernetesResult result = KubernetesResult.build(getType());
AppConfig appConfig = config.getAppConfig(); AppConfig appConfig = config.getAppConfig();
configuration.set(PipelineOptions.JARS, Collections.singletonList(appConfig.getUserJarPath())); configuration.set(PipelineOptions.JARS, Collections.singletonList(appConfig.getUserJarPath()));
ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
String[] userJarParas = appConfig.getUserJarParas(); String[] userJarParas = appConfig.getUserJarParas();
if (Asserts.isNull(userJarParas)) { if (Asserts.isNull(userJarParas)) {
userJarParas = new String[0]; userJarParas = new String[0];
} }
ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(userJarParas, appConfig.getUserJarMainAppClass()); ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(userJarParas, appConfig.getUserJarMainAppClass());
KubernetesClusterDescriptor kubernetesClusterDescriptor = new KubernetesClusterDescriptor(configuration, client); KubernetesClusterDescriptor kubernetesClusterDescriptor = new KubernetesClusterDescriptor(configuration, client);
ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
.setMasterMemoryMB(configuration.get(JobManagerOptions.TOTAL_PROCESS_MEMORY).getMebiBytes())
.setTaskManagerMemoryMB(configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY).getMebiBytes())
.setSlotsPerTaskManager(configuration.get(TaskManagerOptions.NUM_TASK_SLOTS)).createClusterSpecification();
try { try {
ClusterClientProvider<String> clusterClientProvider = kubernetesClusterDescriptor.deployApplicationCluster(clusterSpecification, applicationConfiguration); ClusterClientProvider<String> clusterClientProvider = kubernetesClusterDescriptor.deployApplicationCluster(clusterSpecification, applicationConfiguration);
ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient(); ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient();
......
...@@ -32,7 +32,9 @@ import org.apache.flink.client.deployment.ClusterSpecification; ...@@ -32,7 +32,9 @@ import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration; import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider; import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever; import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
...@@ -77,7 +79,6 @@ public class YarnApplicationGateway extends YarnGateway { ...@@ -77,7 +79,6 @@ public class YarnApplicationGateway extends YarnGateway {
YarnResult result = YarnResult.build(getType()); YarnResult result = YarnResult.build(getType());
AppConfig appConfig = config.getAppConfig(); AppConfig appConfig = config.getAppConfig();
configuration.set(PipelineOptions.JARS, Collections.singletonList(appConfig.getUserJarPath())); configuration.set(PipelineOptions.JARS, Collections.singletonList(appConfig.getUserJarPath()));
ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
String[] userJarParas = appConfig.getUserJarParas(); String[] userJarParas = appConfig.getUserJarParas();
if (Asserts.isNull(userJarParas)) { if (Asserts.isNull(userJarParas)) {
userJarParas = new String[0]; userJarParas = new String[0];
...@@ -85,6 +86,10 @@ public class YarnApplicationGateway extends YarnGateway { ...@@ -85,6 +86,10 @@ public class YarnApplicationGateway extends YarnGateway {
ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(userJarParas, appConfig.getUserJarMainAppClass()); ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(userJarParas, appConfig.getUserJarMainAppClass());
YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor( YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
configuration, yarnConfiguration, yarnClient, YarnClientYarnClusterInformationRetriever.create(yarnClient), true); configuration, yarnConfiguration, yarnClient, YarnClientYarnClusterInformationRetriever.create(yarnClient), true);
ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
.setMasterMemoryMB(configuration.get(JobManagerOptions.TOTAL_PROCESS_MEMORY).getMebiBytes())
.setTaskManagerMemoryMB(configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY).getMebiBytes())
.setSlotsPerTaskManager(configuration.get(TaskManagerOptions.NUM_TASK_SLOTS)).createClusterSpecification();
try { try {
ClusterClientProvider<ApplicationId> clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster( ClusterClientProvider<ApplicationId> clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(
clusterSpecification, clusterSpecification,
......
...@@ -30,6 +30,8 @@ import com.dlink.utils.LogUtil; ...@@ -30,6 +30,8 @@ import com.dlink.utils.LogUtil;
import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider; import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever; import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
...@@ -66,9 +68,12 @@ public class YarnPerJobGateway extends YarnGateway { ...@@ -66,9 +68,12 @@ public class YarnPerJobGateway extends YarnGateway {
init(); init();
} }
YarnResult result = YarnResult.build(getType()); YarnResult result = YarnResult.build(getType());
ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor( YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
configuration, yarnConfiguration, yarnClient, YarnClientYarnClusterInformationRetriever.create(yarnClient), true); configuration, yarnConfiguration, yarnClient, YarnClientYarnClusterInformationRetriever.create(yarnClient), true);
ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
.setMasterMemoryMB(configuration.get(JobManagerOptions.TOTAL_PROCESS_MEMORY).getMebiBytes())
.setTaskManagerMemoryMB(configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY).getMebiBytes())
.setSlotsPerTaskManager(configuration.get(TaskManagerOptions.NUM_TASK_SLOTS)).createClusterSpecification();
try { try {
ClusterClientProvider<ApplicationId> clusterClientProvider = yarnClusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, true); ClusterClientProvider<ApplicationId> clusterClientProvider = yarnClusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, true);
ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient(); ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment