Unverified Commit 521b4d6f authored by cong's avatar cong Committed by GitHub

[Fix][dlink-gateway]修复k8s application模式提交失败,优化增加获取JobId等待时间 (#972)

* [Fix][dlink-gateway]修复k8s Application、Yarn Application、Yarn PerJob提交任务时未指定集群配置MasterMemoryMB、TaskManagerMemoryMB、SlotsPerTaskManager

* [Fix][dlink-gateway]修复k8s Application、Yarn Application、Yarn PerJob提交任务时未指定集群配置MasterMemoryMB、TaskManagerMemoryMB、SlotsPerTaskManager

* [Fix][dlink-gateway]修复k8s application模式提交失败,优化增加获取JobId等待时间,

* [Fix][dlink-common]扩展系统配置参数添加jobid的最大等待时间
parent 0cbf09f2
......@@ -49,6 +49,7 @@ public class SystemConfiguration {
add(systemConfiguration.useRestAPI);
add(systemConfiguration.useLogicalPlan);
add(systemConfiguration.sqlSeparator);
add(systemConfiguration.jobIdWait);
}
};
......@@ -94,6 +95,13 @@ public class SystemConfiguration {
false,
"在计算 Flink 任务的字段血缘分析时是否基于逻辑计划进行,只支持 1.14 版本"
);
private Configuration jobIdWait = new Configuration(
"jobIdWait",
"获取 Job ID 的最大等待时间(秒)",
ValueType.INT,
30,
"提交 Application 或 PerJob 任务时获取 Job ID 的最大等待时间(秒)"
);
public void setConfiguration(JsonNode jsonNode) {
for (Configuration item : CONFIGURATION_LIST) {
......@@ -172,6 +180,14 @@ public class SystemConfiguration {
this.useLogicalPlan.setValue(useLogicalPlan);
}
public int getJobIdWait() {
return (int )jobIdWait.getValue();
}
public void setJobIdWait(Configuration jobIdWait) {
this.jobIdWait.setValue(jobIdWait);
}
enum ValueType {
STRING, INT, DOUBLE, FLOAT, BOOLEAN, DATE
}
......
......@@ -25,6 +25,7 @@ import com.dlink.gateway.config.AppConfig;
import com.dlink.gateway.exception.GatewayException;
import com.dlink.gateway.result.GatewayResult;
import com.dlink.gateway.result.KubernetesResult;
import com.dlink.model.SystemConfiguration;
import com.dlink.utils.LogUtil;
import org.apache.flink.client.deployment.ClusterSpecification;
......@@ -82,7 +83,7 @@ public class KubernetesApplicationGateway extends KubernetesGateway {
ClusterClientProvider<String> clusterClientProvider = kubernetesClusterDescriptor.deployApplicationCluster(clusterSpecification, applicationConfiguration);
ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient();
Collection<JobStatusMessage> jobStatusMessages = clusterClient.listJobs().get();
int counts = 10;
int counts = SystemConfiguration.getInstances().getJobIdWait();
while (jobStatusMessages.size() == 0 && counts > 0) {
Thread.sleep(1000);
counts--;
......
......@@ -26,6 +26,7 @@ import com.dlink.gateway.config.GatewayConfig;
import com.dlink.gateway.exception.GatewayException;
import com.dlink.gateway.result.GatewayResult;
import com.dlink.gateway.result.YarnResult;
import com.dlink.model.SystemConfiguration;
import com.dlink.utils.LogUtil;
import org.apache.flink.client.deployment.ClusterSpecification;
......@@ -96,7 +97,7 @@ public class YarnApplicationGateway extends YarnGateway {
applicationConfiguration);
ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
Collection<JobStatusMessage> jobStatusMessages = clusterClient.listJobs().get();
int counts = 30;
int counts = SystemConfiguration.getInstances().getJobIdWait();
while (jobStatusMessages.size() == 0 && counts > 0) {
Thread.sleep(1000);
counts--;
......
......@@ -25,6 +25,7 @@ import com.dlink.gateway.config.GatewayConfig;
import com.dlink.gateway.exception.GatewayException;
import com.dlink.gateway.result.GatewayResult;
import com.dlink.gateway.result.YarnResult;
import com.dlink.model.SystemConfiguration;
import com.dlink.utils.LogUtil;
import org.apache.flink.client.deployment.ClusterSpecification;
......@@ -81,7 +82,7 @@ public class YarnPerJobGateway extends YarnGateway {
result.setAppId(applicationId.toString());
result.setWebURL(clusterClient.getWebInterfaceURL());
Collection<JobStatusMessage> jobStatusMessages = clusterClient.listJobs().get();
int counts = 10;
int counts = SystemConfiguration.getInstances().getJobIdWait();
while (jobStatusMessages.size() == 0 && counts > 0) {
Thread.sleep(1000);
counts--;
......
......@@ -30,6 +30,7 @@ type FlinkConfigProps = {
useRestAPI: SettingsStateType['useRestAPI'];
useLogicalPlan: SettingsStateType['useLogicalPlan'];
sqlSeparator: SettingsStateType['sqlSeparator'];
jobIdWait: SettingsStateType['jobIdWait'];
dispatch: any;
};
......@@ -42,6 +43,7 @@ const FlinkConfigView: React.FC<FlinkConfigProps> = (props) => {
useRestAPI,
useLogicalPlan,
sqlSeparator,
jobIdWait,
dispatch
} = props;
const [editName, setEditName] = useState<string>('');
......@@ -127,6 +129,20 @@ const FlinkConfigView: React.FC<FlinkConfigProps> = (props) => {
checked={useLogicalPlan}
/></Form.Item>],
},
{
title: '获取 Job ID 的最大等待时间(秒)',
description: (
editName != 'jobIdWait' ?
(jobIdWait ? jobIdWait : '30') : (
<Input
id='jobIdWait'
defaultValue={jobIdWait}
onChange={onChange}
placeholder="30"/>)),
actions: editName != 'jobIdWait' ? [<a onClick={({}) => handleEditClick('jobIdWait')}>修改</a>] :
[<a onClick={({}) => handleSaveClick('jobIdWait')}>保存</a>,
<a onClick={({}) => handleCancelClick()}>取消</a>],
},
];
const onChange = e => {
......@@ -189,4 +205,5 @@ export default connect(({Settings}: { Settings: SettingsStateType }) => ({
useRestAPI: Settings.useRestAPI,
useLogicalPlan: Settings.useLogicalPlan,
sqlSeparator: Settings.sqlSeparator,
jobIdWait: Settings.jobIdWait,
}))(FlinkConfigView);
......@@ -26,6 +26,7 @@ export type SettingsStateType = {
useRestAPI: boolean,
useLogicalPlan: boolean,
sqlSeparator: string,
jobIdWait: number,
};
export type ModelType = {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment