Commit 88912918 authored by wenmo's avatar wenmo

解决会话执行NPE

parent b1a4d242
...@@ -8,10 +8,12 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ...@@ -8,10 +8,12 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.ExplainDetail; import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Table; import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.UserDefinedFunction; import org.apache.flink.table.functions.UserDefinedFunction;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
/** /**
...@@ -109,6 +111,7 @@ public abstract class Executor { ...@@ -109,6 +111,7 @@ public abstract class Executor {
} }
private void updateStreamExecutionEnvironment(ExecutorSetting executorSetting){ private void updateStreamExecutionEnvironment(ExecutorSetting executorSetting){
copyCatalog();
if(executorSetting.isUseSqlFragment()){ if(executorSetting.isUseSqlFragment()){
stEnvironment.useSqlFragment(); stEnvironment.useSqlFragment();
}else{ }else{
...@@ -124,6 +127,17 @@ public abstract class Executor { ...@@ -124,6 +127,17 @@ public abstract class Executor {
} }
} }
private void copyCatalog(){
String[] catalogs = stEnvironment.listCatalogs();
CustomTableEnvironmentImpl newstEnvironment = CustomTableEnvironmentImpl.create(environment);
for (int i = 0; i < catalogs.length; i++) {
if(stEnvironment.getCatalog(catalogs[i]).isPresent()) {
newstEnvironment.getCatalogManager().unregisterCatalog(catalogs[i],true);
newstEnvironment.registerCatalog(catalogs[i], stEnvironment.getCatalog(catalogs[i]).get());
}
}
stEnvironment = newstEnvironment;
}
public JobExecutionResult execute(String jobName) throws Exception{ public JobExecutionResult execute(String jobName) throws Exception{
return stEnvironment.execute(jobName); return stEnvironment.execute(jobName);
} }
......
...@@ -21,7 +21,7 @@ public class ExecutorSetting { ...@@ -21,7 +21,7 @@ public class ExecutorSetting {
private String savePointPath; private String savePointPath;
private String jobName; private String jobName;
private Map<String,String> config; private Map<String,String> config;
public static final ExecutorSetting DEFAULT = new ExecutorSetting(true); public static final ExecutorSetting DEFAULT = new ExecutorSetting(0,1,true);
public ExecutorSetting(boolean useSqlFragment) { public ExecutorSetting(boolean useSqlFragment) {
this.useSqlFragment = useSqlFragment; this.useSqlFragment = useSqlFragment;
...@@ -36,7 +36,13 @@ public class ExecutorSetting { ...@@ -36,7 +36,13 @@ public class ExecutorSetting {
this.useSqlFragment = useSqlFragment; this.useSqlFragment = useSqlFragment;
} }
public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath,String jobName) { public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment) {
this.checkpoint = checkpoint;
this.parallelism = parallelism;
this.useSqlFragment = useSqlFragment;
}
public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath, String jobName) {
this.checkpoint = checkpoint; this.checkpoint = checkpoint;
this.parallelism = parallelism; this.parallelism = parallelism;
this.useSqlFragment = useSqlFragment; this.useSqlFragment = useSqlFragment;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment