Commit 51b383b4 authored by wenmo's avatar wenmo

解决set在perjob和application模式不生效的问题

parent a43bde30
...@@ -15,6 +15,7 @@ import org.springframework.beans.factory.annotation.Autowired; ...@@ -15,6 +15,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map; import java.util.Map;
/** /**
...@@ -75,6 +76,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -75,6 +76,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
} }
} }
config.buildGatewayConfig(gatewayConfig); config.buildGatewayConfig(gatewayConfig);
config.addGatewayConfig(task.parseConfig());
} }
switch (config.getSavePointStrategy()) { switch (config.getSavePointStrategy()) {
case LATEST: case LATEST:
......
...@@ -11,6 +11,7 @@ import com.dlink.session.SessionConfig; ...@@ -11,6 +11,7 @@ import com.dlink.session.SessionConfig;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import java.util.List;
import java.util.Map; import java.util.Map;
/** /**
...@@ -168,4 +169,22 @@ public class JobConfig { ...@@ -168,4 +169,22 @@ public class JobConfig {
gatewayConfig.setFlinkConfig(FlinkConfig.build((Map<String, String>)config.get("flinkConfig"))); gatewayConfig.setFlinkConfig(FlinkConfig.build((Map<String, String>)config.get("flinkConfig")));
} }
} }
public void addGatewayConfig(List<Map<String, String>> configList){
if(Asserts.isNull(gatewayConfig)){
gatewayConfig = new GatewayConfig();
}
for(Map<String, String> item : configList){
gatewayConfig.getFlinkConfig().getConfiguration().put(item.get("key"),item.get("value"));
}
}
public void addGatewayConfig(Map<String, Object> config){
if(Asserts.isNull(gatewayConfig)){
gatewayConfig = new GatewayConfig();
}
for (Map.Entry<String, Object> entry : config.entrySet()) {
gatewayConfig.getFlinkConfig().getConfiguration().put(entry.getKey(), (String) entry.getValue());
}
}
} }
...@@ -237,6 +237,7 @@ public class JobManager { ...@@ -237,6 +237,7 @@ public class JobManager {
if (GatewayType.YARN_APPLICATION.equals(runMode)) { if (GatewayType.YARN_APPLICATION.equals(runMode)) {
gatewayResult = Gateway.build(config.getGatewayConfig()).submitJar(); gatewayResult = Gateway.build(config.getGatewayConfig()).submitJar();
} else { } else {
config.addGatewayConfig(executor.getSetConfig());
gatewayResult = Gateway.build(config.getGatewayConfig()).submitJobGraph(jobGraph); gatewayResult = Gateway.build(config.getGatewayConfig()).submitJobGraph(jobGraph);
} }
job.setResult(InsertResult.success(gatewayResult.getAppId())); job.setResult(InsertResult.success(gatewayResult.getAppId()));
...@@ -272,6 +273,7 @@ public class JobManager { ...@@ -272,6 +273,7 @@ public class JobManager {
if (GatewayType.YARN_APPLICATION.equalsValue(config.getType())) { if (GatewayType.YARN_APPLICATION.equalsValue(config.getType())) {
gatewayResult = Gateway.build(config.getGatewayConfig()).submitJar(); gatewayResult = Gateway.build(config.getGatewayConfig()).submitJar();
} else { } else {
config.addGatewayConfig(executor.getSetConfig());
gatewayResult = Gateway.build(config.getGatewayConfig()).submitJobGraph(jobGraph); gatewayResult = Gateway.build(config.getGatewayConfig()).submitJobGraph(jobGraph);
} }
job.setResult(InsertResult.success(gatewayResult.getAppId())); job.setResult(InsertResult.success(gatewayResult.getAppId()));
......
...@@ -39,6 +39,7 @@ public abstract class Executor { ...@@ -39,6 +39,7 @@ public abstract class Executor {
protected CustomTableEnvironmentImpl stEnvironment; protected CustomTableEnvironmentImpl stEnvironment;
protected EnvironmentSetting environmentSetting; protected EnvironmentSetting environmentSetting;
protected ExecutorSetting executorSetting; protected ExecutorSetting executorSetting;
protected Map<String,Object> setConfig = new HashMap<>();
protected SqlManager sqlManager = new SqlManager(); protected SqlManager sqlManager = new SqlManager();
protected boolean useSqlFragment = true; protected boolean useSqlFragment = true;
...@@ -92,6 +93,14 @@ public abstract class Executor { ...@@ -92,6 +93,14 @@ public abstract class Executor {
return environmentSetting; return environmentSetting;
} }
public Map<String, Object> getSetConfig() {
return setConfig;
}
public void setSetConfig(Map<String, Object> setConfig) {
this.setConfig = setConfig;
}
protected void init(){ protected void init(){
initEnvironment(); initEnvironment();
initStreamExecutionEnvironment(); initStreamExecutionEnvironment();
...@@ -293,6 +302,7 @@ public abstract class Executor { ...@@ -293,6 +302,7 @@ public abstract class Executor {
String value = setOperation.getValue().get().trim(); String value = setOperation.getValue().get().trim();
Map<String,String> confMap = new HashMap<>(); Map<String,String> confMap = new HashMap<>();
confMap.put(key,value); confMap.put(key,value);
setConfig.put(key,value);
Configuration configuration = Configuration.fromMap(confMap); Configuration configuration = Configuration.fromMap(confMap);
environment.getConfig().configure(configuration,null); environment.getConfig().configure(configuration,null);
stEnvironment.getConfig().addConfiguration(configuration); stEnvironment.getConfig().addConfiguration(configuration);
...@@ -300,6 +310,16 @@ public abstract class Executor { ...@@ -300,6 +310,16 @@ public abstract class Executor {
} }
private void callReset(ResetOperation resetOperation) { private void callReset(ResetOperation resetOperation) {
// to do nothing if (resetOperation.getKey().isPresent()) {
String key = resetOperation.getKey().get().trim();
Map<String,String> confMap = new HashMap<>();
confMap.put(key,null);
setConfig.remove(key);
Configuration configuration = Configuration.fromMap(confMap);
environment.getConfig().configure(configuration,null);
stEnvironment.getConfig().addConfiguration(configuration);
}else {
setConfig.clear();
}
} }
} }
...@@ -27,7 +27,7 @@ public class FlinkConfig { ...@@ -27,7 +27,7 @@ public class FlinkConfig {
private SavePointType savePointType; private SavePointType savePointType;
private String savePoint; private String savePoint;
// private List<ConfigPara> configParas; // private List<ConfigPara> configParas;
private Map<String, String> configuration; private Map<String, String> configuration = new HashMap<>();
private static final ObjectMapper mapper = new ObjectMapper(); private static final ObjectMapper mapper = new ObjectMapper();
......
...@@ -2,18 +2,14 @@ package com.dlink.gateway.yarn; ...@@ -2,18 +2,14 @@ package com.dlink.gateway.yarn;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.gateway.AbstractGateway; import com.dlink.gateway.AbstractGateway;
import com.dlink.gateway.config.ConfigPara;
import com.dlink.gateway.config.FlinkConfig;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.gateway.config.ActionType; import com.dlink.gateway.config.ActionType;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.gateway.exception.GatewayException; import com.dlink.gateway.exception.GatewayException;
import com.dlink.gateway.model.JobInfo; import com.dlink.gateway.model.JobInfo;
import com.dlink.gateway.result.GatewayResult;
import com.dlink.gateway.result.SavePointResult; import com.dlink.gateway.result.SavePointResult;
import com.dlink.gateway.result.TestResult; import com.dlink.gateway.result.TestResult;
import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.client.JobStatusMessage;
...@@ -28,9 +24,7 @@ import org.apache.hadoop.service.Service; ...@@ -28,9 +24,7 @@ import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
......
...@@ -493,6 +493,15 @@ export default (): React.ReactNode => { ...@@ -493,6 +493,15 @@ export default (): React.ReactNode => {
<li> <li>
<Link>解决 Yarn Application 解析数组异常问题</Link> <Link>解决 Yarn Application 解析数组异常问题</Link>
</li> </li>
<li>
<Link>解决自定义Jar配置为空会导致异常的bug</Link>
</li>
<li>
<Link>解决任务提交失败时注册集群报错的bug</Link>
</li>
<li>
<Link>解决set在perjob和application模式不生效的问题</Link>
</li>
</ul> </ul>
</Paragraph> </Paragraph>
</Timeline.Item> </Timeline.Item>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment