Commit 0f217e3f authored by wenmo's avatar wenmo

会话优化

parent 2b9841c5
...@@ -13,6 +13,10 @@ import com.dlink.model.Task; ...@@ -13,6 +13,10 @@ import com.dlink.model.Task;
*/ */
public interface Assert { public interface Assert {
/**
 * Checks whether the given reference is non-null.
 *
 * @param object the reference to test; may be {@code null}
 * @return {@code true} if {@code object} is not {@code null}, {@code false} otherwise
 */
static boolean checkNotNull(Object object) {
    if (object == null) {
        return false;
    }
    return true;
}
static void check(Cluster cluster) { static void check(Cluster cluster) {
if (cluster.getId() == null) { if (cluster.getId() == null) {
throw new BusException("Flink 集群【" + cluster.getId() + "】不存在"); throw new BusException("Flink 集群【" + cluster.getId() + "】不存在");
......
...@@ -4,6 +4,8 @@ import com.dlink.job.JobConfig; ...@@ -4,6 +4,8 @@ import com.dlink.job.JobConfig;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import java.util.Map;
/** /**
* StudioExecuteDTO * StudioExecuteDTO
* *
...@@ -26,6 +28,7 @@ public class StudioExecuteDTO { ...@@ -26,6 +28,7 @@ public class StudioExecuteDTO {
private Integer checkPoint; private Integer checkPoint;
private Integer parallelism; private Integer parallelism;
private String savePointPath; private String savePointPath;
// private Map<String,String> config;
public JobConfig getJobConfig() { public JobConfig getJobConfig() {
return new JobConfig(useResult, useSession, session, useRemote, clusterId, taskId, jobName, fragment, maxRowNum, checkPoint, parallelism, savePointPath); return new JobConfig(useResult, useSession, session, useRemote, clusterId, taskId, jobName, fragment, maxRowNum, checkPoint, parallelism, savePointPath);
......
...@@ -49,7 +49,7 @@ public class Task extends SuperEntity{ ...@@ -49,7 +49,7 @@ public class Task extends SuperEntity{
@TableField(exist = false) @TableField(exist = false)
private String clusterName; private String clusterName;
public ExecutorSetting getExecutorSetting(){ public ExecutorSetting buildExecutorSetting(){
HashMap configMap = new HashMap(); HashMap configMap = new HashMap();
if(config!=null&&!"".equals(clusterName)) { if(config!=null&&!"".equals(clusterName)) {
configMap = JSONUtil.toBean(config, HashMap.class); configMap = JSONUtil.toBean(config, HashMap.class);
...@@ -57,7 +57,7 @@ public class Task extends SuperEntity{ ...@@ -57,7 +57,7 @@ public class Task extends SuperEntity{
return new ExecutorSetting(checkPoint,parallelism,fragment,savePointPath,alias,configMap); return new ExecutorSetting(checkPoint,parallelism,fragment,savePointPath,alias,configMap);
} }
public JobConfig getSubmitConfig(){ public JobConfig buildSubmitConfig(){
boolean useRemote = true; boolean useRemote = true;
if(clusterId==null||clusterId==0){ if(clusterId==null||clusterId==0){
useRemote = false; useRemote = false;
......
...@@ -41,31 +41,11 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -41,31 +41,11 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Override @Override
public JobResult submitByTaskId(Integer id) { public JobResult submitByTaskId(Integer id) {
/*Task task = this.getById(id);
Assert.check(task);
Cluster cluster = clusterService.getById(task.getClusterId());
Statement statement = statementService.getById(id);
Assert.check(statement);
if(cluster!=null) {
String host = FlinkCluster.testFlinkJobManagerIP(cluster.getHosts(), cluster.getJobManagerHost());
Assert.checkHost(host);
if (!host.equals(cluster.getJobManagerHost())) {
cluster.setJobManagerHost(host);
clusterService.updateById(cluster);
}
JobManager jobManager = new JobManager(host,task.getExecutorSetting());
return jobManager.submit(statement.getStatement());
}else if(task.getClusterId()==0){
JobManager jobManager = new JobManager();
return jobManager.submit(statement.getStatement());
}else{
throw new BusException("该任务的集群不存在");
}*/
Task task = this.getById(id); Task task = this.getById(id);
Assert.check(task); Assert.check(task);
Statement statement = statementService.getById(id); Statement statement = statementService.getById(id);
Assert.check(statement); Assert.check(statement);
JobConfig config = task.getSubmitConfig(); JobConfig config = task.buildSubmitConfig();
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(),task.getClusterId())); config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(),task.getClusterId()));
JobManager jobManager = JobManager.build(config); JobManager jobManager = JobManager.build(config);
return jobManager.executeSql(statement.getStatement()); return jobManager.executeSql(statement.getStatement());
......
...@@ -55,10 +55,10 @@ public final class SqlManager { ...@@ -55,10 +55,10 @@ public final class SqlManager {
"sql fragment name cannot be null or empty."); "sql fragment name cannot be null or empty.");
checkNotNull(sqlFragment, "sql fragment cannot be null"); checkNotNull(sqlFragment, "sql fragment cannot be null");
if (sqlFragments.containsKey(sqlFragmentName)) { /*if (sqlFragments.containsKey(sqlFragmentName)) {
throw new CatalogException( throw new CatalogException(
format("The fragment of sql %s already exists.", sqlFragmentName)); format("The fragment of sql %s already exists.", sqlFragmentName));
} }*/
sqlFragments.put(sqlFragmentName, sqlFragment); sqlFragments.put(sqlFragmentName, sqlFragment);
} }
......
...@@ -10,6 +10,10 @@ import com.dlink.exception.JobException; ...@@ -10,6 +10,10 @@ import com.dlink.exception.JobException;
*/ */
public class Asserts { public class Asserts {
/**
 * Reports whether the supplied reference is non-null.
 *
 * <p>NOTE(review): an identically-named helper also exists on the
 * {@code Assert} interface in another module — consider consolidating.
 *
 * @param object the reference to test; may be {@code null}
 * @return {@code true} when {@code object} is not {@code null}
 */
public static boolean checkNotNull(Object object) {
    return null != object;
}
public static void checkNull(String key,String msg) { public static void checkNull(String key,String msg) {
if (key == null||"".equals(key)) { if (key == null||"".equals(key)) {
throw new JobException(msg); throw new JobException(msg);
......
...@@ -26,7 +26,6 @@ public abstract class Executor { ...@@ -26,7 +26,6 @@ public abstract class Executor {
protected EnvironmentSetting environmentSetting; protected EnvironmentSetting environmentSetting;
protected ExecutorSetting executorSetting; protected ExecutorSetting executorSetting;
public static Executor build(){ public static Executor build(){
return new LocalStreamExecutor(ExecutorSetting.DEFAULT); return new LocalStreamExecutor(ExecutorSetting.DEFAULT);
} }
...@@ -69,6 +68,11 @@ public abstract class Executor { ...@@ -69,6 +68,11 @@ public abstract class Executor {
initStreamExecutionEnvironment(); initStreamExecutionEnvironment();
} }
/**
 * Re-applies the given setting onto this already-initialized executor:
 * first the StreamExecutionEnvironment options, then the table-environment
 * (SQL) options. Used when a pooled session executor is reused with a new
 * job configuration instead of being rebuilt from scratch.
 *
 * @param executorSetting the new setting to apply; fields that are null or
 *                        non-positive are skipped by the delegates
 */
public void update(ExecutorSetting executorSetting){
updateEnvironment(executorSetting);
updateStreamExecutionEnvironment(executorSetting);
}
private void initEnvironment(){ private void initEnvironment(){
if(executorSetting.getCheckpoint()!=null&&executorSetting.getCheckpoint()>0){ if(executorSetting.getCheckpoint()!=null&&executorSetting.getCheckpoint()>0){
environment.enableCheckpointing(executorSetting.getCheckpoint()); environment.enableCheckpointing(executorSetting.getCheckpoint());
...@@ -78,6 +82,15 @@ public abstract class Executor { ...@@ -78,6 +82,15 @@ public abstract class Executor {
} }
} }
/**
 * Re-applies checkpointing and parallelism from the given setting onto the
 * existing execution environment. Null or non-positive values are ignored,
 * leaving the environment's current configuration untouched.
 *
 * @param executorSetting source of the checkpoint interval and parallelism
 */
private void updateEnvironment(ExecutorSetting executorSetting) {
    Integer checkpoint = executorSetting.getCheckpoint();
    if (checkpoint != null && checkpoint > 0) {
        environment.enableCheckpointing(checkpoint);
    }
    Integer parallelism = executorSetting.getParallelism();
    if (parallelism != null && parallelism > 0) {
        environment.setParallelism(parallelism);
    }
}
private void initStreamExecutionEnvironment(){ private void initStreamExecutionEnvironment(){
stEnvironment = CustomTableEnvironmentImpl.create(environment); stEnvironment = CustomTableEnvironmentImpl.create(environment);
if(executorSetting.isUseSqlFragment()){ if(executorSetting.isUseSqlFragment()){
...@@ -95,6 +108,22 @@ public abstract class Executor { ...@@ -95,6 +108,22 @@ public abstract class Executor {
} }
} }
/**
 * Re-applies table-environment options from the given setting: toggles SQL
 * fragment support, sets the pipeline name when a job name is present, and
 * copies every entry of the setting's config map into the table
 * configuration.
 *
 * @param executorSetting source of the SQL-fragment flag, job name and
 *                        extra configuration entries (config may be null)
 */
private void updateStreamExecutionEnvironment(ExecutorSetting executorSetting) {
    if (executorSetting.isUseSqlFragment()) {
        stEnvironment.useSqlFragment();
    } else {
        stEnvironment.unUseSqlFragment();
    }
    String jobName = executorSetting.getJobName();
    if (jobName != null && !"".equals(jobName)) {
        stEnvironment.getConfig().getConfiguration().setString("pipeline.name", jobName);
    }
    Map<String, String> config = executorSetting.getConfig();
    if (config != null) {
        config.forEach((key, value) ->
                stEnvironment.getConfig().getConfiguration().setString(key, value));
    }
}
public JobExecutionResult execute(String jobName) throws Exception{ public JobExecutionResult execute(String jobName) throws Exception{
return stEnvironment.execute(jobName); return stEnvironment.execute(jobName);
} }
......
package com.dlink.executor; package com.dlink.executor;
import com.dlink.job.JobConfig;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
......
...@@ -16,4 +16,5 @@ public class LocalStreamExecutor extends Executor { ...@@ -16,4 +16,5 @@ public class LocalStreamExecutor extends Executor {
this.environment = StreamExecutionEnvironment.createLocalEnvironment(); this.environment = StreamExecutionEnvironment.createLocalEnvironment();
init(); init();
} }
} }
...@@ -16,25 +16,6 @@ public class RemoteStreamExecutor extends Executor { ...@@ -16,25 +16,6 @@ public class RemoteStreamExecutor extends Executor {
this.executorSetting = executorSetting; this.executorSetting = executorSetting;
this.environment = StreamExecutionEnvironment.createRemoteEnvironment(environmentSetting.getHost(), environmentSetting.getPort()); this.environment = StreamExecutionEnvironment.createRemoteEnvironment(environmentSetting.getHost(), environmentSetting.getPort());
init(); init();
/*synchronized (RemoteStreamExecutor.class){
if(executorSetting.getCheckpoint()!=null&&executorSetting.getCheckpoint()>0){
environment.enableCheckpointing(executorSetting.getCheckpoint());
}
if(executorSetting.getParallelism()!=null&&executorSetting.getParallelism()>0){
environment.setParallelism(executorSetting.getParallelism());
}
if(stEnvironment == null){
stEnvironment = CustomTableEnvironmentImpl.create(environment);
}
if(executorSetting.getJobName()!=null&&!"".equals(executorSetting.getJobName())){
stEnvironment.getConfig().getConfiguration().setString("pipeline.name", executorSetting.getJobName());
}
if(executorSetting.isUseSqlFragment()){
stEnvironment.useSqlFragment();
}else{
stEnvironment.unUseSqlFragment();
}
}*/
} }
} }
package com.dlink.job; package com.dlink.job;
import com.dlink.executor.EnvironmentSetting;
import com.dlink.executor.Executor; import com.dlink.executor.Executor;
import com.dlink.executor.ExecutorSetting; import com.dlink.executor.ExecutorSetting;
import com.dlink.session.SessionConfig; import com.dlink.session.SessionConfig;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import java.util.Map;
/** /**
* JobConfig * JobConfig
* *
...@@ -29,6 +32,7 @@ public class JobConfig { ...@@ -29,6 +32,7 @@ public class JobConfig {
private Integer checkpoint; private Integer checkpoint;
private Integer parallelism; private Integer parallelism;
private String savePointPath; private String savePointPath;
//private Map<String,String> config;
public JobConfig(boolean useResult, boolean useSession, String session, boolean useRemote, Integer clusterId, public JobConfig(boolean useResult, boolean useSession, String session, boolean useRemote, Integer clusterId,
Integer taskId, String jobName, boolean useSqlFragment, Integer maxRowNum, Integer checkpoint, Integer taskId, String jobName, boolean useSqlFragment, Integer maxRowNum, Integer checkpoint,
...@@ -45,6 +49,7 @@ public class JobConfig { ...@@ -45,6 +49,7 @@ public class JobConfig {
this.checkpoint = checkpoint; this.checkpoint = checkpoint;
this.parallelism = parallelism; this.parallelism = parallelism;
this.savePointPath = savePointPath; this.savePointPath = savePointPath;
// this.config = config;
} }
public JobConfig(boolean useResult, boolean useSession, String session, boolean useRemote, Integer clusterId) { public JobConfig(boolean useResult, boolean useSession, String session, boolean useRemote, Integer clusterId) {
...@@ -74,6 +79,10 @@ public class JobConfig { ...@@ -74,6 +79,10 @@ public class JobConfig {
return new ExecutorSetting(checkpoint,parallelism,useSqlFragment,savePointPath,jobName); return new ExecutorSetting(checkpoint,parallelism,useSqlFragment,savePointPath,jobName);
} }
/**
 * Builds an {@link EnvironmentSetting} from this config's cluster address.
 *
 * @return a new EnvironmentSetting parsed from {@code address}
 *         (presumably a "host:port" string — TODO confirm against
 *         EnvironmentSetting.build)
 */
public EnvironmentSetting getEnvironmentSetting(){
return EnvironmentSetting.build(address);
}
public JobConfig buildSubmitConfig(Integer clusterId, Integer taskId, String jobName,boolean useSqlFragment, Integer checkpoint, public JobConfig buildSubmitConfig(Integer clusterId, Integer taskId, String jobName,boolean useSqlFragment, Integer checkpoint,
Integer parallelism, String savePointPath){ Integer parallelism, String savePointPath){
return new JobConfig(false,false,false,clusterId,taskId,jobName,useSqlFragment,checkpoint,parallelism,savePointPath); return new JobConfig(false,false,false,clusterId,taskId,jobName,useSqlFragment,checkpoint,parallelism,savePointPath);
......
package com.dlink.job; package com.dlink.job;
import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkSQLConstant; import com.dlink.constant.FlinkSQLConstant;
import com.dlink.executor.EnvironmentSetting; import com.dlink.executor.EnvironmentSetting;
import com.dlink.executor.Executor; import com.dlink.executor.Executor;
...@@ -85,10 +86,11 @@ public class JobManager extends RunTime { ...@@ -85,10 +86,11 @@ public class JobManager extends RunTime {
private Executor createExecutorWithSession() { private Executor createExecutorWithSession() {
if(config.isUseSession()) { if(config.isUseSession()) {
ExecutorEntity executorEntity = SessionPool.get(config.getSession()); ExecutorEntity executorEntity = SessionPool.get(config.getSession());
if (executorEntity != null) { if (Asserts.checkNotNull(executorEntity)) {
executor = executorEntity.getExecutor(); executor = executorEntity.getExecutor();
config.setSessionConfig(executorEntity.getSessionConfig()); config.setSessionConfig(executorEntity.getSessionConfig());
initEnvironmentSetting(); initEnvironmentSetting();
executor.update(executorSetting);
} else { } else {
createExecutor(); createExecutor();
SessionPool.push(new ExecutorEntity(config.getSession(), executor)); SessionPool.push(new ExecutorEntity(config.getSession(), executor));
...@@ -103,9 +105,14 @@ public class JobManager extends RunTime { ...@@ -103,9 +105,14 @@ public class JobManager extends RunTime {
environmentSetting = EnvironmentSetting.build(config.getAddress()); environmentSetting = EnvironmentSetting.build(config.getAddress());
} }
// Caches the executor setting derived from the job config so that
// createExecutorWithSession() can push it into a reused session executor
// via executor.update(executorSetting). Must run before the executor is
// created/fetched (see init()).
private void initExecutorSetting(){
executorSetting = config.getExecutorSetting();
}
@Override @Override
public boolean init() { public boolean init() {
handler = JobHandler.build(); handler = JobHandler.build();
initExecutorSetting();
createExecutorWithSession(); createExecutorWithSession();
return false; return false;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment