Commit 3b1e1308 authored by wenmo's avatar wenmo

解决perjob和application模式的任务名无法自定义的问题

parent 7f3f3a74
......@@ -126,12 +126,38 @@
<groupId>com.dlink</groupId>
<artifactId>dlink-gateway</artifactId>
</dependency>
<dependency>
<!--<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-metadata-clickhouse</artifactId>
<version>0.5.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-yarn_2.11</artifactId>
<version>1.13.3</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-3-uber</artifactId>
<version>3.1.1.7.2.1.0-327-9.0</version>
<!-- <scope>test</scope> -->
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>-->
</dependencies>
<build>
<plugins>
......
......@@ -37,8 +37,8 @@ public class Submiter {
if (id == null) {
throw new SQLException("请指定任务ID");
}
return "select id, name, alias, type,check_point as checkPoint," +
"save_point_path as savePointPath, parallelism,fragment,statement_set as statementSet,config_json as config" +
return "select id, name, alias as jobName, type,check_point as checkpoint," +
"save_point_path as savePointPath, parallelism,fragment as useSqlFragment,statement_set as useStatementSet,config_json as config" +
" from dlink_task where id = " + id;
}
......
......@@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator;
......@@ -134,7 +135,11 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
}
List<Transformation<?>> trans = getPlanner().translate(modifyOperations);
if(execEnv instanceof ExecutorBase){
return ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
if(tableConfig.getConfiguration().containsKey(PipelineOptions.NAME.key())) {
streamGraph.setJobName(tableConfig.getConfiguration().getString(PipelineOptions.NAME));
}
return streamGraph;
}else{
throw new TableException("Unsupported SQL query! ExecEnv need a ExecutorBase.");
}
......
......@@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator;
......@@ -134,7 +135,11 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
}
List<Transformation<?>> trans = getPlanner().translate(modifyOperations);
if(execEnv instanceof ExecutorBase){
return ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
if(tableConfig.getConfiguration().containsKey(PipelineOptions.NAME.key())) {
streamGraph.setJobName(tableConfig.getConfiguration().getString(PipelineOptions.NAME));
}
return streamGraph;
}else{
throw new TableException("Unsupported SQL query! ExecEnv need a ExecutorBase.");
}
......
......@@ -6,6 +6,7 @@ import org.apache.flink.api.dag.Transformation;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
......@@ -137,7 +138,11 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
}
List<Transformation<?>> trans = getPlanner().translate(modifyOperations);
if(execEnv instanceof ExecutorBase){
return ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
if(tableConfig.getConfiguration().containsKey(PipelineOptions.NAME.key())) {
streamGraph.setJobName(tableConfig.getConfiguration().getString(PipelineOptions.NAME));
}
return streamGraph;
}else{
throw new TableException("Unsupported SQL query! ExecEnv need a ExecutorBase.");
}
......
......@@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator;
......@@ -192,7 +193,11 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
}
List<Transformation<?>> trans = getPlanner().translate(modifyOperations);
if(execEnv instanceof DefaultExecutor){
return ((DefaultExecutor) execEnv).getExecutionEnvironment().generateStreamGraph(trans);
StreamGraph streamGraph = ((DefaultExecutor) execEnv).getExecutionEnvironment().generateStreamGraph(trans);
if(tableConfig.getConfiguration().containsKey(PipelineOptions.NAME.key())) {
streamGraph.setJobName(tableConfig.getConfiguration().getString(PipelineOptions.NAME));
}
return streamGraph;
}else{
throw new TableException("Unsupported SQL query! ExecEnv need a ExecutorBase.");
}
......
......@@ -9,6 +9,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
......@@ -141,8 +142,9 @@ public abstract class Executor {
useSqlFragment = executorSetting.isUseSqlFragment();
stEnvironment = CustomTableEnvironmentImpl.create(environment);
if(executorSetting.getJobName()!=null&&!"".equals(executorSetting.getJobName())){
stEnvironment.getConfig().getConfiguration().setString("pipeline.name", executorSetting.getJobName());
stEnvironment.getConfig().getConfiguration().setString(PipelineOptions.NAME.key(), executorSetting.getJobName());
}
setConfig.put(PipelineOptions.NAME.key(),executorSetting.getJobName());
if(executorSetting.getConfig()!=null){
for (Map.Entry<String, String> entry : executorSetting.getConfig().entrySet()) {
stEnvironment.getConfig().getConfiguration().setString(entry.getKey(), entry.getValue());
......@@ -154,8 +156,9 @@ public abstract class Executor {
useSqlFragment = executorSetting.isUseSqlFragment();
copyCatalog();
if(executorSetting.getJobName()!=null&&!"".equals(executorSetting.getJobName())){
stEnvironment.getConfig().getConfiguration().setString("pipeline.name", executorSetting.getJobName());
stEnvironment.getConfig().getConfiguration().setString(PipelineOptions.NAME.key(), executorSetting.getJobName());
}
setConfig.put(PipelineOptions.NAME.key(),executorSetting.getJobName());
if(executorSetting.getConfig()!=null){
for (Map.Entry<String, String> entry : executorSetting.getConfig().entrySet()) {
stEnvironment.getConfig().getConfiguration().setString(entry.getKey(), entry.getValue());
......
......@@ -502,6 +502,9 @@ export default (): React.ReactNode => {
<li>
<Link>解决set在perjob和application模式不生效的问题</Link>
</li>
<li>
<Link>解决perjob和application模式的任务名无法自定义的问题</Link>
</li>
</ul>
</Paragraph>
</Timeline.Item>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment