Unverified Commit 7b389ffc authored by aiwenmo's avatar aiwenmo Committed by GitHub

[Fix-127][core,executor] Add executeAsync datastream job

[Fix-127][core,executor] Add executeAsync datastream job
parents 50316a0c 62ab5f87
...@@ -35,6 +35,7 @@ import org.apache.flink.api.common.JobExecutionResult; ...@@ -35,6 +35,7 @@ import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
...@@ -355,9 +356,9 @@ public class JobManager { ...@@ -355,9 +356,9 @@ public class JobManager {
break; break;
} }
} }
JobExecutionResult jobExecutionResult = executor.execute(config.getJobName()); JobClient jobClient = executor.executeAsync(config.getJobName());
if (jobExecutionResult.isJobExecutionResult()) { if (Asserts.isNotNull(jobClient)) {
job.setJobId(jobExecutionResult.getJobID().toHexString()); job.setJobId(jobClient.getJobID().toHexString());
job.setJids(new ArrayList<String>() {{ job.setJids(new ArrayList<String>() {{
add(job.getJobId()); add(job.getJobId());
}}); }});
......
...@@ -11,6 +11,7 @@ import org.apache.flink.api.common.ExecutionConfig; ...@@ -11,6 +11,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.rest.messages.JobPlanInfo; import org.apache.flink.runtime.rest.messages.JobPlanInfo;
...@@ -206,6 +207,10 @@ public abstract class Executor { ...@@ -206,6 +207,10 @@ public abstract class Executor {
return environment.execute(jobName); return environment.execute(jobName);
} }
public JobClient executeAsync(String jobName) throws Exception {
return environment.executeAsync(jobName);
}
public TableResult executeSql(String statement) { public TableResult executeSql(String statement) {
statement = pretreatStatement(statement); statement = pretreatStatement(statement);
FlinkInterceptorResult flinkInterceptorResult = pretreatExecute(statement); FlinkInterceptorResult flinkInterceptorResult = pretreatExecute(statement);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment