Commit d7ff5934 authored by wenmo's avatar wenmo

执行重构

parent 03ce78a5
package com.dlink.model;
import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import com.dlink.db.model.SuperEntity;
......@@ -8,6 +9,8 @@ import com.dlink.executor.ExecutorSetting;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.util.HashMap;
/**
* 任务
*
......@@ -35,6 +38,8 @@ public class Task extends SuperEntity{
private Integer clusterId;
private String config;
private String note;
@TableField(exist = false)
......@@ -43,12 +48,12 @@ public class Task extends SuperEntity{
@TableField(exist = false)
private String clusterName;
public ExecutorSetting getLocalExecutorSetting(){
return new ExecutorSetting(Executor.LOCAL,checkPoint,parallelism,fragment,savePointPath,alias);
public ExecutorSetting getExecutorSetting(){
HashMap configMap = new HashMap();
if(config!=null&&!"".equals(clusterName)) {
configMap = JSONUtil.toBean(config, HashMap.class);
}
public ExecutorSetting getRemoteExecutorSetting(){
return new ExecutorSetting(Executor.REMOTE,checkPoint,parallelism,fragment,savePointPath,alias);
return new ExecutorSetting(checkPoint,parallelism,fragment,savePointPath,alias,configMap);
}
......
......@@ -24,6 +24,8 @@ import com.dlink.session.SessionPool;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
/**
......@@ -38,61 +40,37 @@ public class StudioServiceImpl implements StudioService {
@Autowired
private ClusterService clusterService;
/*@Override
public RunResult executeSql(StudioExecuteDTO studioExecuteDTO) {
studioExecuteDTO.setSession(studioExecuteDTO.getClusterId()+"_"+studioExecuteDTO.getSession());
String ExecuteType = Executor.REMOTE;
String host =null;
Cluster cluster = clusterService.getById(studioExecuteDTO.getClusterId());
if(studioExecuteDTO.getClusterId()==0&&cluster==null){
ExecuteType = Executor.LOCAL;
}else if(cluster==null){
throw new BusException("未获取到集群信息");
}else {
Assert.check(cluster);
host = FlinkCluster.testFlinkJobManagerIP(cluster.getHosts(), cluster.getJobManagerHost());
Assert.checkHost(host);
if(!host.equals(cluster.getJobManagerHost())){
cluster.setJobManagerHost(host);
clusterService.updateById(cluster);
}
}
JobManager jobManager = new JobManager(
host,
studioExecuteDTO.getSession(),
studioExecuteDTO.getMaxRowNum(),
new ExecutorSetting(
ExecuteType,
studioExecuteDTO.getCheckPoint(),
studioExecuteDTO.getParallelism(),
studioExecuteDTO.isFragment(),
studioExecuteDTO.getSavePointPath(),
studioExecuteDTO.getJobName()));
return jobManager.execute(studioExecuteDTO.getStatement());
}*/
// Executes a FlinkSQL statement: resolves the target JobManager address into
// the job config, builds a JobManager from it, and delegates execution.
@Override
public JobResult executeSql(StudioExecuteDTO studioExecuteDTO) {
JobConfig config = studioExecuteDTO.getJobConfig();
buildEnvironmentAddress(config,studioExecuteDTO.getClusterId());
JobManager jobManager = JobManager.build(config);
return jobManager.executeSql(studioExecuteDTO.getStatement());
}
private void buildEnvironmentAddress(JobConfig config,Integer clusterId){
if(config.isUseRemote()) {
config.setHost(clusterService.getJobManagerAddress(
clusterService.getById(studioExecuteDTO.getClusterId())
config.setAddress(clusterService.getJobManagerAddress(
clusterService.getById(clusterId)
));
}else{
config.setHost(FlinkConstant.LOCAL_HOST);
try {
InetAddress address = InetAddress.getLocalHost();
if(address!=null) {
config.setAddress(address.getHostAddress());
}else{
config.setAddress(FlinkConstant.LOCAL_HOST);
}
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
JobManager jobManager = JobManager.build(config);
return jobManager.executeSql(studioExecuteDTO.getStatement());
}
/**
 * Executes a DDL statement, reusing buildEnvironmentAddress so both remote and
 * local modes resolve an address (the old inline code handled remote only).
 */
@Override
public IResult executeDDL(StudioDDLDTO studioDDLDTO) {
    JobConfig config = studioDDLDTO.getJobConfig();
    buildEnvironmentAddress(config,studioDDLDTO.getClusterId());
    JobManager jobManager = JobManager.build(config);
    return jobManager.executeDDL(studioDDLDTO.getStatement());
}
......
......@@ -47,7 +47,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
cluster.setJobManagerHost(host);
clusterService.updateById(cluster);
}
JobManager jobManager = new JobManager(host,task.getRemoteExecutorSetting());
JobManager jobManager = new JobManager(host,task.getExecutorSetting());
return jobManager.submit(statement.getStatement());
}else if(task.getClusterId()==0){
JobManager jobManager = new JobManager();
......
......@@ -13,6 +13,7 @@
<result column="parallelism" property="parallelism" />
<result column="fragment" property="fragment" />
<result column="cluster_id" property="clusterId" />
<result column="config" property="config" />
<result column="note" property="note" />
<result column="enabled" property="enabled" />
<result column="create_time" property="createTime" />
......@@ -21,7 +22,7 @@
<!-- 通用查询结果列 -->
<sql id="Base_Column_List">
id, name, alias, type,check_point,save_point_path, parallelism,fragment,cluster_id,note, enabled, create_time, update_time
id, name, alias, type,check_point,save_point_path, parallelism,fragment,cluster_id,config,note, enabled, create_time, update_time
</sql>
......
package com.dlink.assertion;

import com.dlink.exception.JobException;

/**
 * Asserts
 *
 * Fail-fast validation helpers that raise a {@link JobException} when a
 * required value is missing.
 *
 * @author wenmo
 * @since 2021/7/5 21:57
 */
public class Asserts {

    /**
     * Throws a {@link JobException} carrying {@code msg} when {@code key} is
     * null or the empty string; otherwise returns normally.
     */
    public static void checkNull(String key, String msg) {
        boolean missing = (key == null) || "".equals(key);
        if (missing) {
            throw new JobException(msg);
        }
    }
}
package com.dlink.executor;
import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkConstant;
import com.dlink.constant.NetConstant;
import lombok.Getter;
import lombok.Setter;
/**
* EnvironmentSetting
*
* @author wenmo
* @since 2021/5/25 13:45
**/
@Getter
@Setter
public class EnvironmentSetting {
private String host;
private int port;
private boolean useRemote;
public static final EnvironmentSetting LOCAL = new EnvironmentSetting(false);
public EnvironmentSetting(boolean useRemote) {
this.useRemote = useRemote;
}
public EnvironmentSetting(String host, int port) {
this.host = host;
this.port = port;
this.useRemote = true;
}
public String getHost() {
return host;
public static EnvironmentSetting build(String address){
Asserts.checkNull(address,"Flink 地址不能为空");
String[] strs = address.split(NetConstant.COLON);
if (strs.length >= 2) {
return new EnvironmentSetting(strs[0],Integer.parseInt(strs[1]));
} else {
return new EnvironmentSetting(strs[0],FlinkConstant.PORT);
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
public String getAddress(){
return host + NetConstant.COLON + port;
}
public void setPort(int port) {
this.port = port;
}
}
......@@ -12,6 +12,8 @@ import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
import java.util.Map;
/**
* Executor
* @author wenmo
......@@ -19,9 +21,6 @@ import org.apache.flink.table.functions.UserDefinedFunction;
**/
public abstract class Executor {
public static final String LOCAL = "LOCAL";
public static final String REMOTE = "REMOTE";
protected StreamExecutionEnvironment environment;
protected CustomTableEnvironmentImpl stEnvironment;
protected EnvironmentSetting environmentSetting;
......@@ -29,17 +28,24 @@ public abstract class Executor {
/**
 * Builds a local Executor with the default setting.
 */
public static Executor build(){
    return new LocalStreamExecutor(ExecutorSetting.DEFAULT);
}
public static Executor build(EnvironmentSetting environmentSetting,ExecutorSetting executorSetting){
if(LOCAL.equals(executorSetting.getType())){
return new LocalStreamExecutor(executorSetting);
}else if(REMOTE.equals(executorSetting.getType())){
return new RemoteStreamExecutor(environmentSetting,executorSetting);
if(environmentSetting.isUseRemote()){
return buildRemoteExecutor(environmentSetting,executorSetting);
}else{
return buildLocalExecutor(executorSetting);
}
}
// Creates an Executor backed by a local Flink stream environment.
public static Executor buildLocalExecutor(ExecutorSetting executorSetting){
return new LocalStreamExecutor(executorSetting);
}
// Creates an Executor attached to a remote cluster; forces the remote flag on
// the environment setting so downstream checks see a consistent state.
public static Executor buildRemoteExecutor(EnvironmentSetting environmentSetting,ExecutorSetting executorSetting){
environmentSetting.setUseRemote(true);
return new RemoteStreamExecutor(environmentSetting,executorSetting);
}
public StreamExecutionEnvironment getEnvironment(){
......@@ -58,6 +64,37 @@ public abstract class Executor {
return environmentSetting;
}
// Shared initialization for subclasses: configure the stream environment,
// then create and configure the table environment.
protected void init(){
initEnvironment();
initStreamExecutionEnvironment();
}
// Applies checkpoint interval and parallelism to the stream environment,
// but only when the setting supplies a positive value for each.
private void initEnvironment(){
if(executorSetting.getCheckpoint()!=null&&executorSetting.getCheckpoint()>0){
environment.enableCheckpointing(executorSetting.getCheckpoint());
}
if(executorSetting.getParallelism()!=null&&executorSetting.getParallelism()>0){
environment.setParallelism(executorSetting.getParallelism());
}
}
/**
 * Creates the table environment and applies the executor setting to it:
 * SQL-fragment mode, the pipeline (job) name, and any extra configuration
 * entries. Order is preserved so explicit config entries may override the
 * pipeline name.
 */
private void initStreamExecutionEnvironment(){
    stEnvironment = CustomTableEnvironmentImpl.create(environment);
    if (executorSetting.isUseSqlFragment()) {
        stEnvironment.useSqlFragment();
    } else {
        stEnvironment.unUseSqlFragment();
    }
    String jobName = executorSetting.getJobName();
    if (jobName != null && !"".equals(jobName)) {
        stEnvironment.getConfig().getConfiguration().setString("pipeline.name", jobName);
    }
    Map<String, String> settings = executorSetting.getConfig();
    if (settings != null) {
        for (Map.Entry<String, String> entry : settings.entrySet()) {
            stEnvironment.getConfig().getConfiguration().setString(entry.getKey(), entry.getValue());
        }
    }
}
// Triggers job execution on the table environment under the given job name.
public JobExecutionResult execute(String jobName) throws Exception{
return stEnvironment.execute(jobName);
}
......
......@@ -3,6 +3,8 @@ package com.dlink.executor;
import lombok.Getter;
import lombok.Setter;
import java.util.Map;
/**
* ExecutorSetting
*
......@@ -12,36 +14,28 @@ import lombok.Setter;
@Setter
@Getter
public class ExecutorSetting {
private String host;
private String type;
private Integer checkpoint;
private Integer parallelism;
private boolean useSqlFragment;
private String savePointPath;
private String jobName;
private Map<String,String> config;
public static final ExecutorSetting DEFAULT = new ExecutorSetting(false);
public ExecutorSetting(String type) {
this.type = type;
}
public ExecutorSetting(String type, boolean useSqlFragment) {
this.type = type;
public ExecutorSetting(boolean useSqlFragment) {
this.useSqlFragment = useSqlFragment;
}
public ExecutorSetting(String type, Integer checkpoint) {
this.type = type;
public ExecutorSetting(Integer checkpoint) {
this.checkpoint = checkpoint;
}
public ExecutorSetting(String type, Integer checkpoint, boolean useSqlFragment) {
this.type = type;
public ExecutorSetting(Integer checkpoint, boolean useSqlFragment) {
this.checkpoint = checkpoint;
this.useSqlFragment = useSqlFragment;
}
public ExecutorSetting(String type, Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath,String jobName) {
this.type = type;
public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath,String jobName) {
this.checkpoint = checkpoint;
this.parallelism = parallelism;
this.useSqlFragment = useSqlFragment;
......@@ -49,25 +43,19 @@ public class ExecutorSetting {
this.jobName = jobName;
}
public ExecutorSetting(String type, Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath) {
this.type = type;
public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath) {
this.checkpoint = checkpoint;
this.parallelism = parallelism;
this.useSqlFragment = useSqlFragment;
this.savePointPath = savePointPath;
}
public ExecutorSetting(String host, String type, Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath, String jobName) {
this.host = host;
this.type = type;
public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath, String jobName, Map<String, String> config) {
this.checkpoint = checkpoint;
this.parallelism = parallelism;
this.useSqlFragment = useSqlFragment;
this.savePointPath = savePointPath;
this.jobName = jobName;
}
public boolean isRemote(){
return type.equals(Executor.REMOTE);
this.config = config;
}
}
......@@ -14,20 +14,6 @@ public class LocalStreamExecutor extends Executor {
/**
 * Creates a local stream executor; the shared environment/table setup that was
 * previously inlined here (diff residue) now lives in {@code init()}.
 */
public LocalStreamExecutor(ExecutorSetting executorSetting) {
    this.executorSetting = executorSetting;
    this.environment = StreamExecutionEnvironment.createLocalEnvironment();
    init();
}
}
......@@ -14,8 +14,9 @@ public class RemoteStreamExecutor extends Executor {
public RemoteStreamExecutor(EnvironmentSetting environmentSetting,ExecutorSetting executorSetting) {
this.environmentSetting = environmentSetting;
this.executorSetting = executorSetting;
synchronized (RemoteStreamExecutor.class){
this.environment = StreamExecutionEnvironment.createRemoteEnvironment(environmentSetting.getHost(), environmentSetting.getPort());
init();
/*synchronized (RemoteStreamExecutor.class){
if(executorSetting.getCheckpoint()!=null&&executorSetting.getCheckpoint()>0){
environment.enableCheckpointing(executorSetting.getCheckpoint());
}
......@@ -33,7 +34,7 @@ public class RemoteStreamExecutor extends Executor {
}else{
stEnvironment.unUseSqlFragment();
}
}
}*/
}
}
......@@ -20,7 +20,7 @@ public class JobConfig {
private String session;
private boolean useRemote;
private Integer clusterId;
private String host;
private String address;
private Integer taskId;
private String jobName;
private boolean useSqlFragment;
......@@ -55,10 +55,6 @@ public class JobConfig {
}
/**
 * Derives the ExecutorSetting from this job config. Remote-vs-local is now
 * carried by the {@code useRemote} flag on the config, so the old LOCAL/REMOTE
 * type string (diff residue above) is no longer passed.
 */
public ExecutorSetting getExecutorSetting(){
    return new ExecutorSetting(checkpoint,parallelism,useSqlFragment,savePointPath,jobName);
}
}
package com.dlink.job;
import com.dlink.constant.FlinkConstant;
import com.dlink.constant.FlinkSQLConstant;
import com.dlink.constant.NetConstant;
import com.dlink.executor.EnvironmentSetting;
import com.dlink.executor.Executor;
import com.dlink.executor.ExecutorSetting;
......@@ -15,11 +13,10 @@ import com.dlink.session.SessionPool;
import com.dlink.trans.Operations;
import org.apache.flink.api.common.JobID;
import org.apache.flink.table.api.TableResult;
import org.junit.Assert;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.*;
import java.util.Arrays;
import java.util.List;
/**
* JobManager
......@@ -30,12 +27,9 @@ import java.util.*;
public class JobManager extends RunTime {
private JobHandler handler;
private String flinkHost;
private String jobManagerHost;
private Integer jobManagerPort;
private Integer port;
private String sessionId;
private Integer maxRowNum = 100;
private EnvironmentSetting environmentSetting;
private ExecutorSetting executorSetting;
private JobConfig config;
private Executor executor;
......@@ -43,32 +37,16 @@ public class JobManager extends RunTime {
public JobManager() {
}
/**
 * Creates a JobManager bound to the given "host[:port]" address; parsing is
 * delegated to {@link EnvironmentSetting#build}. A null address leaves the
 * manager uninitialized, matching the original guard.
 */
public JobManager(String address, ExecutorSetting executorSetting) {
    if (address != null) {
        this.environmentSetting = EnvironmentSetting.build(address);
        this.executorSetting = executorSetting;
        this.executor = createExecutor();
    }
}
public JobManager(String host, String sessionId, Integer maxRowNum, ExecutorSetting executorSetting) {
if (host != null) {
String[] strs = host.split(NetConstant.COLON);
if (strs.length >= 2) {
this.flinkHost = strs[0];
this.port = Integer.parseInt(strs[1]);
} else {
this.flinkHost = strs[0];
this.port = FlinkConstant.PORT;
}
}
public JobManager(String address, String sessionId, Integer maxRowNum, ExecutorSetting executorSetting) {
this.environmentSetting = EnvironmentSetting.build(address);
this.sessionId = sessionId;
this.maxRowNum = maxRowNum;
this.executorSetting = executorSetting;
......@@ -93,10 +71,10 @@ public class JobManager extends RunTime {
/**
 * Builds the executor from the current config: remote when {@code useRemote}
 * is set, otherwise local. Single exit point replaces the duplicated returns.
 */
private Executor createExecutor() {
    if (config.isUseRemote()) {
        executor = Executor.buildRemoteExecutor(environmentSetting, config.getExecutorSetting());
    } else {
        executor = Executor.buildLocalExecutor(config.getExecutorSetting());
    }
    return executor;
}
......@@ -119,17 +97,8 @@ public class JobManager extends RunTime {
/**
 * Initializes the job handler, resolves the target environment from the
 * config's address (parsing delegated to {@link EnvironmentSetting#build},
 * replacing the old inline host:port splitting left as diff residue), and
 * prepares the session-bound executor.
 */
@Override
public boolean init() {
    handler = JobHandler.build();
    String address = config.getAddress();
    environmentSetting = EnvironmentSetting.build(address);
    createExecutorWithSession();
    return false;
}
......@@ -156,7 +125,7 @@ public class JobManager extends RunTime {
}
public RunResult execute(String statement) {
RunResult runResult = new RunResult(sessionId, statement, flinkHost, port, executorSetting, executorSetting.getJobName());
RunResult runResult = new RunResult(sessionId, statement, environmentSetting.getHost(), environmentSetting.getPort(), executorSetting, executorSetting.getJobName());
Executor executor = createExecutorWithSession();
String[] Statements = statement.split(";");
int currentIndex = 0;
......@@ -209,7 +178,7 @@ public class JobManager extends RunTime {
}
public SubmitResult submit(List<String> sqlList) {
SubmitResult result = new SubmitResult(sessionId, sqlList, flinkHost, executorSetting.getJobName());
SubmitResult result = new SubmitResult(sessionId, sqlList, environmentSetting.getHost(), executorSetting.getJobName());
int currentIndex = 0;
try {
if (sqlList != null && sqlList.size() > 0) {
......@@ -263,7 +232,7 @@ public class JobManager extends RunTime {
}
public JobResult executeSql(String statement) {
Job job = new Job(config,jobManagerHost+NetConstant.COLON+jobManagerPort,
Job job = new Job(config,environmentSetting.getAddress(),
Job.JobStatus.INITIALIZE,statement,executorSetting, LocalDateTime.now(),executor);
JobContextHolder.setJob(job);
job.setType(Operations.getSqlTypeFromStatements(statement));
......
......@@ -24,7 +24,7 @@ public class JobManagerTest {
@Test
public void submitJobTest2(){
ExecutorSetting setting = new ExecutorSetting(Executor.REMOTE);
ExecutorSetting setting = new ExecutorSetting(true);
JobManager jobManager = new JobManager("192.168.123.157:8081","test2",100, setting);
String sql1 ="CREATE TABLE student (\n" +
" sid INT,\n" +
......@@ -59,7 +59,7 @@ public class JobManagerTest {
@Test
public void executeJobTest(){
ExecutorSetting setting = new ExecutorSetting(Executor.REMOTE,0,1,false,null);
ExecutorSetting setting = new ExecutorSetting(0,1,false,null);
JobManager jobManager = new JobManager("192.168.123.157:8081","test2",100, setting);
String sql1 ="CREATE TABLE student (\n" +
......@@ -101,7 +101,7 @@ public class JobManagerTest {
null, "测试", false, 100, 0,
1, null);
if(config.isUseRemote()) {
config.setHost("192.168.123.157:8081");
config.setAddress("192.168.123.157:8081");
}
JobManager jobManager = JobManager.build(config);
String sql1 ="CREATE TABLE Orders (\n" +
......
......@@ -327,4 +327,7 @@ CREATE TABLE `dlink_history` (
INDEX `cluster_index`(`cluster_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '执行历史' ROW_FORMAT = Dynamic;
ALTER TABLE `dlink`.`dlink_task`
ADD COLUMN `config` text NULL COMMENT '配置' AFTER `cluster_id`;
SET FOREIGN_KEY_CHECKS = 1;
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment