Commit fa487535 authored by wenmo

studio supports per-job submission

parent 3cb783b0
......@@ -39,6 +39,8 @@ public class Task extends SuperEntity{
private boolean fragment;
private boolean statementSet;
private Integer clusterId;
private Integer clusterConfigurationId;
......@@ -66,7 +68,7 @@ public class Task extends SuperEntity{
if(clusterId==null||clusterId==0){
useRemote = false;
}
return new JobConfig(false,false,useRemote,clusterId,getId(),alias,fragment,checkPoint,parallelism,savePointPath);
return new JobConfig(type,false,false,useRemote,clusterId,clusterConfigurationId,getId(),alias,fragment,statementSet,checkPoint,parallelism,savePointPath);
}
}
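Note: Task.buildSubmitConfig now forwards the run-mode type, the cluster configuration id and the statement-set flag into the widened JobConfig constructor. A minimal sketch of an equivalent construction for a per-job task, not part of the commit; the ids, job name and savepoint path below are hypothetical:

    import com.dlink.job.JobConfig;

    class PerJobConfigSketch {
        static JobConfig example() {
            // Argument order of the new 13-parameter constructor:
            // type, useResult, useSession, useRemote, clusterId, clusterConfigurationId,
            // taskId, jobName, useSqlFragment, useStatementSet, checkpoint, parallelism, savePointPath
            return new JobConfig(
                    "yarn-per-job",                   // type: routed through the gateway by JobManager.useGateway
                    false, false, false,              // useResult, useSession, useRemote
                    null,                             // clusterId: unused for gateway submission
                    3,                                // clusterConfigurationId (hypothetical)
                    1, "demo-job",                    // taskId, jobName (hypothetical)
                    true,                             // useSqlFragment
                    true,                             // useStatementSet
                    0, 1,                             // checkpoint, parallelism
                    "hdfs:///flink/savepoints/demo"); // savePointPath (hypothetical)
        }
    }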
package com.dlink.service;
import com.dlink.db.service.ISuperService;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.model.ClusterConfiguration;
import java.util.List;
......@@ -17,4 +18,6 @@ public interface ClusterConfigurationService extends ISuperService<ClusterConfig
List<ClusterConfiguration> listEnabledAll();
GatewayConfig buildGatewayConfig(Integer id);
}
......@@ -2,6 +2,8 @@ package com.dlink.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.gateway.config.ClusterConfig;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.mapper.ClusterConfigurationMapper;
import com.dlink.model.ClusterConfiguration;
import com.dlink.service.ClusterConfigurationService;
......@@ -28,4 +30,14 @@ public class ClusterConfigurationServiceImpl extends SuperServiceImpl<ClusterCon
public List<ClusterConfiguration> listEnabledAll() {
return this.list(new QueryWrapper<ClusterConfiguration>().eq("enabled",1));
}
@Override
public GatewayConfig buildGatewayConfig(Integer id) {
ClusterConfiguration clusterConfiguration = this.getClusterConfigById(id);
GatewayConfig gatewayConfig = new GatewayConfig();
gatewayConfig.setClusterConfig(ClusterConfig.build(clusterConfiguration.getConfig().get("flinkConfigPath"),
clusterConfiguration.getConfig().get("flinkLibPath"),
clusterConfiguration.getConfig().get("hadoopConfigPath")));
return gatewayConfig;
}
}
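Note: buildGatewayConfig assumes the stored cluster configuration exposes a config map with the three keys read above (flinkConfigPath, flinkLibPath, hadoopConfigPath). A hedged sketch of the equivalent direct construction; the paths are placeholders, not values shipped with the commit:

    import com.dlink.gateway.config.ClusterConfig;
    import com.dlink.gateway.config.GatewayConfig;

    class GatewayConfigSketch {
        static GatewayConfig example() {
            GatewayConfig gatewayConfig = new GatewayConfig();
            // The same three values buildGatewayConfig pulls from clusterConfiguration.getConfig()
            gatewayConfig.setClusterConfig(ClusterConfig.build(
                    "/opt/flink/conf",     // flinkConfigPath (hypothetical)
                    "/opt/flink/lib",      // flinkLibPath (hypothetical)
                    "/etc/hadoop/conf"));  // hadoopConfigPath (hypothetical)
            return gatewayConfig;
        }
    }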
......@@ -14,6 +14,7 @@ import com.dlink.model.Cluster;
import com.dlink.model.Statement;
import com.dlink.model.Task;
import com.dlink.result.SubmitResult;
import com.dlink.service.ClusterConfigurationService;
import com.dlink.service.ClusterService;
import com.dlink.service.StatementService;
import com.dlink.service.TaskService;
......@@ -33,6 +34,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
private StatementService statementService;
@Autowired
private ClusterService clusterService;
@Autowired
private ClusterConfigurationService clusterConfigurationService;
@Override
public JobResult submitByTaskId(Integer id) {
......@@ -41,7 +44,11 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
Statement statement = statementService.getById(id);
Assert.check(statement);
JobConfig config = task.buildSubmitConfig();
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(),task.getClusterId()));
if(!JobManager.useGateway(config.getType())) {
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), task.getClusterId()));
}else{
config.setGatewayConfig(clusterConfigurationService.buildGatewayConfig(task.getClusterConfigurationId()));
}
JobManager jobManager = JobManager.build(config);
return jobManager.executeSql(statement.getStatement());
}
......
......@@ -12,6 +12,7 @@
<result column="save_point_path" property="savePointPath" />
<result column="parallelism" property="parallelism" />
<result column="fragment" property="fragment" />
<result column="statement_set" property="statementSet" />
<result column="cluster_id" property="clusterId" />
<result column="cluster_configuration_id" property="clusterConfigurationId" />
<result column="config" property="config" />
......@@ -23,7 +24,7 @@
<!-- 通用查询结果列 -->
<sql id="Base_Column_List">
id, name, alias, type,check_point,save_point_path, parallelism,fragment,cluster_id,cluster_configuration_id,config,note, enabled, create_time, update_time
id, name, alias, type,check_point,save_point_path, parallelism,fragment,statement_set,cluster_id,cluster_configuration_id,config,note, enabled, create_time, update_time
</sql>
......
......@@ -71,8 +71,8 @@ public class Asserts {
return !isNullMap(map);
}
public static void checkNull(String key,String msg) {
if (key == null||"".equals(key)) {
public static void checkNull(Object key,String msg) {
if (key == null) {
throw new RunTimeException(msg);
}
}
......
......@@ -3,6 +3,7 @@ package com.dlink.job;
import com.dlink.executor.EnvironmentSetting;
import com.dlink.executor.Executor;
import com.dlink.executor.ExecutorSetting;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.session.SessionConfig;
import lombok.Getter;
import lombok.Setter;
......@@ -19,19 +20,24 @@ import java.util.Map;
@Setter
public class JobConfig {
private String type;
private boolean useResult;
private boolean useSession;
private String session;
private boolean useRemote;
private Integer clusterId;
private Integer clusterConfigurationId;
private String address;
private Integer taskId;
private String jobName;
private boolean useSqlFragment;
private boolean useStatementSet;
private Integer maxRowNum;
private Integer checkpoint;
private Integer parallelism;
private String savePointPath;
private GatewayConfig gatewayConfig;
//private Map<String,String> config;
public JobConfig(boolean useResult, boolean useSession, String session, boolean useRemote, Integer clusterId,
......@@ -60,16 +66,19 @@ public class JobConfig {
this.clusterId = clusterId;
}
public JobConfig(boolean useResult, boolean useSession, boolean useRemote, Integer clusterId,
Integer taskId, String jobName, boolean useSqlFragment, Integer checkpoint,
Integer parallelism, String savePointPath) {
public JobConfig(String type,boolean useResult, boolean useSession, boolean useRemote, Integer clusterId,
Integer clusterConfigurationId, Integer taskId, String jobName, boolean useSqlFragment,
boolean useStatementSet,Integer checkpoint, Integer parallelism, String savePointPath) {
this.type = type;
this.useResult = useResult;
this.useSession = useSession;
this.useRemote = useRemote;
this.clusterId = clusterId;
this.clusterConfigurationId = clusterConfigurationId;
this.taskId = taskId;
this.jobName = jobName;
this.useSqlFragment = useSqlFragment;
this.useStatementSet = useStatementSet;
this.checkpoint = checkpoint;
this.parallelism = parallelism;
this.savePointPath = savePointPath;
......@@ -83,11 +92,6 @@ public class JobConfig {
return EnvironmentSetting.build(address);
}
public JobConfig buildSubmitConfig(Integer clusterId, Integer taskId, String jobName,boolean useSqlFragment, Integer checkpoint,
Integer parallelism, String savePointPath){
return new JobConfig(false,false,false,clusterId,taskId,jobName,useSqlFragment,checkpoint,parallelism,savePointPath);
}
public void setSessionConfig(SessionConfig sessionConfig){
if(sessionConfig!=null) {
address = sessionConfig.getAddress();
......
......@@ -8,6 +8,8 @@ import com.dlink.executor.ExecutorSetting;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.explainer.Explainer;
import com.dlink.gateway.Gateway;
import com.dlink.gateway.GatewayType;
import com.dlink.gateway.config.FlinkConfig;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.gateway.result.GatewayResult;
import com.dlink.interceptor.FlinkInterceptor;
......@@ -43,6 +45,7 @@ public class JobManager extends RunTime {
private ExecutorSetting executorSetting;
private JobConfig config;
private Executor executor;
private boolean useGateway = false;
public JobManager() {
}
......@@ -74,14 +77,30 @@ public class JobManager extends RunTime {
}
public static JobManager build(JobConfig config) {
initGatewayConfig(config);
JobManager manager = new JobManager(config);
manager.init();
return manager;
}
private static void initGatewayConfig(JobConfig config){
if(useGateway(config.getType())){
Asserts.checkNull(config.getGatewayConfig(),"GatewayConfig 不能为空");
config.getGatewayConfig().setType(GatewayType.get(config.getType()));
config.getGatewayConfig().setTaskId(config.getTaskId());
config.getGatewayConfig().setFlinkConfig(FlinkConfig.build(config.getJobName(),
null,null,null,config.getSavePointPath(),null));
}
}
public static boolean useGateway(String type){
return !(GatewayType.STANDALONE.equalsValue(type)||
GatewayType.YARN_SESSION.equalsValue(type));
}
private Executor createExecutor() {
initEnvironmentSetting();
if (config.isUseRemote()&&config.getClusterId()!=0) {
if (!useGateway&& config.isUseRemote()&&config.getClusterId()!=0) {
executor = Executor.buildRemoteExecutor(environmentSetting, config.getExecutorSetting());
return executor;
} else {
......@@ -120,6 +139,7 @@ public class JobManager extends RunTime {
@Override
public boolean init() {
useGateway = useGateway(config.getType());
handler = JobHandler.build();
initExecutorSetting();
createExecutorWithSession();
......@@ -261,9 +281,7 @@ public class JobManager extends RunTime {
resMsg.append(" \n " + s + " ");
}
result.setSuccess(false);
// result.setError(LocalDateTime.now().toString() + ":" + "运行第" + currentIndex + "行sql时出现异常:" + e.getMessage());
result.setError(LocalDateTime.now().toString() + ":" + "运行第" + currentIndex + "行sql时出现异常:" + e.getMessage() + "\n >>>堆栈信息<<<" + resMsg.toString());
// result.setError(LocalDateTime.now().toString() + ":" + "运行第" + currentIndex + "行sql时出现异常:" + e.getMessage() + "\n >>>异常原因<<< \n" + e.toString());
return result;
}
......@@ -273,34 +291,59 @@ public class JobManager extends RunTime {
}
public JobResult executeSql(String statement) {
Job job = new Job(config,environmentSetting.getAddress(),
String address = null;
if(!useGateway){
address = environmentSetting.getAddress();
}
Job job = new Job(config,address,
Job.JobStatus.INITIALIZE,statement,executorSetting, LocalDateTime.now(),executor);
JobContextHolder.setJob(job);
job.setType(Operations.getSqlTypeFromStatements(statement));
ready();
String[] statements = statement.split(";");
int currentIndex = 0;
String currentSql = "";
CustomTableEnvironmentImpl stEnvironment = executor.getCustomTableEnvironmentImpl();
List<String> inserts = new ArrayList<>();
try {
for (String item : statements) {
if (item.trim().isEmpty()) {
continue;
}
currentIndex++;
currentSql = item;
SqlType operationType = Operations.getOperationType(item);
if (!FlinkInterceptor.build(executor.getCustomTableEnvironmentImpl(), item)) {
if(config.isUseStatementSet()){
if (!FlinkInterceptor.build(stEnvironment, item)) {
if (operationType.equals(SqlType.INSERT)) {
inserts.add(item);
}else if (operationType.equals(SqlType.SELECT)) {
}else{
executor.executeSql(item);
}
}
}else {
if (!FlinkInterceptor.build(stEnvironment, item)) {
TableResult tableResult = executor.executeSql(item);
if (tableResult.getJobClient().isPresent()) {
job.setJobId(tableResult.getJobClient().get().getJobID().toHexString());
}
if(config.isUseResult()) {
if (config.isUseResult()) {
IResult result = ResultBuilder.build(operationType, maxRowNum, "", true).getResult(tableResult);
job.setResult(result);
}
}
if(operationType==SqlType.INSERT||operationType==SqlType.SELECT){
if (operationType == SqlType.INSERT || operationType == SqlType.SELECT) {
break;
}
}
}
if(config.isUseStatementSet()){
JobGraph jobGraph = executor.getJobGraphFromInserts(inserts);
GatewayResult gatewayResult = Gateway.build(config.getGatewayConfig()).submitJobGraph(jobGraph);
InsertResult insertResult = new InsertResult(gatewayResult.getAppId(), true);
job.setResult(insertResult);
job.setJobId(gatewayResult.getAppId());
}
job.setEndTime(LocalDateTime.now());
job.setStatus(Job.JobStatus.SUCCESS);
success();
......@@ -314,7 +357,7 @@ public class JobManager extends RunTime {
LocalDateTime now = LocalDateTime.now();
job.setEndTime(now);
job.setStatus(Job.JobStatus.FAILED);
String error = now.toString() + ":" + "运行第" + currentIndex + "个sql时出现异常:" + e.getMessage() + " \n >>>堆栈信息<<<" + resMsg.toString();
String error = now.toString() + ":" + "运行语句:\n" + currentSql + " \n时出现异常:" + e.getMessage() + " \n >>>堆栈信息<<<" + resMsg.toString();
job.setError(error);
failed();
close();
......
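Note: the new useGateway(String) check treats standalone and yarn-session as address-based submissions and everything else, including yarn-per-job and yarn-application, as a gateway submission; with the statement-set switch on, executeSql buffers the INSERT statements, builds one JobGraph via executor.getJobGraphFromInserts(inserts) and submits it through Gateway.build(config.getGatewayConfig()).submitJobGraph(jobGraph). A small sketch of the routing check, using the type values defined in GatewayType:

    import com.dlink.job.JobManager;

    class GatewayRoutingSketch {
        static void demo() {
            // Address-based submission (existing session/remote path)
            boolean standalone  = JobManager.useGateway("standalone");       // false
            boolean yarnSession = JobManager.useGateway("yarn-session");     // false
            // Gateway submission (new per-job / application path)
            boolean perJob      = JobManager.useGateway("yarn-per-job");     // true
            boolean application = JobManager.useGateway("yarn-application"); // true
        }
    }

Because the check is a negation, any other type value would also fall through to the gateway path.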
......@@ -399,4 +399,8 @@ CREATE TABLE `dlink_job` (
ALTER TABLE `dlink`.`dlink_task`
ADD COLUMN `cluster_configuration_id` int(11) NULL COMMENT '集群配置ID' AFTER `cluster_id`;
ALTER TABLE `dlink`.`dlink_task`
ADD COLUMN `statement_set` tinyint(1) NULL COMMENT '启用语句集' AFTER `fragment`;
SET FOREIGN_KEY_CHECKS = 1;
......@@ -10,7 +10,7 @@ import com.dlink.assertion.Asserts;
**/
public enum GatewayType {
YARN_APPLICATION("ya","yarn-application"),YARN_PER_JOB("ypj","yarn-per-job");
STANDALONE("s","standalone"),YARN_SESSION("ys","yarn-session"),YARN_APPLICATION("ya","yarn-application"),YARN_PER_JOB("ypj","yarn-per-job");
private String value;
private String longValue;
......@@ -36,4 +36,11 @@ public enum GatewayType {
}
return GatewayType.YARN_APPLICATION;
}
public boolean equalsValue(String type){
if(Asserts.isEquals(value,type)||Asserts.isEquals(longValue,type)){
return true;
}
return false;
}
}
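Note: equalsValue matches either the short or the long form of a gateway type (assuming Asserts.isEquals performs a null-safe string comparison). A quick sketch:

    import com.dlink.gateway.GatewayType;

    class GatewayTypeSketch {
        static void demo() {
            boolean shortForm = GatewayType.YARN_PER_JOB.equalsValue("ypj");           // true
            boolean longForm  = GatewayType.YARN_PER_JOB.equalsValue("yarn-per-job");  // true
            boolean mismatch  = GatewayType.YARN_PER_JOB.equalsValue("yarn-session");  // false
        }
    }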
package com.dlink.gateway.config;
import com.dlink.gateway.ConfigPara;
import com.dlink.assertion.Asserts;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.Setter;
import java.util.ArrayList;
import java.util.List;
/**
......@@ -22,7 +26,35 @@ public class FlinkConfig {
private String savePoint;
private List<ConfigPara> configParas;
private static final ObjectMapper mapper = new ObjectMapper();
public static final String DEFAULT_SAVEPOINT_PREFIX = "hdfs:///flink/savepoints/";
public FlinkConfig() {
}
public FlinkConfig(String jobName, String jobId, ActionType action, SavePointType savePointType, String savePoint, List<ConfigPara> configParas) {
this.jobName = jobName;
this.jobId = jobId;
this.action = action;
this.savePointType = savePointType;
this.savePoint = savePoint;
this.configParas = configParas;
}
public static FlinkConfig build(String jobName, String jobId, String actionStr, String savePointTypeStr, String savePoint, String configParasStr){
List<ConfigPara> configParasList = new ArrayList<>();
JsonNode paras = null;
if(Asserts.isNotNullString(configParasStr)) {
try {
paras = mapper.readTree(configParasStr);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
paras.forEach((JsonNode node) -> {
configParasList.add(new ConfigPara(node.get("key").asText(), node.get("value").asText()));
}
);
}
return new FlinkConfig(jobName,jobId,ActionType.get(actionStr),SavePointType.get(savePointTypeStr),savePoint,configParasList);
}
}
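Note: FlinkConfig.build expects configParasStr to be a JSON array of {"key","value"} objects; the commit itself passes null for it (and for the action and savepoint-type strings) from JobManager.initGatewayConfig, while a non-null string that fails to parse would leave paras null and break on the following forEach. A hedged usage sketch; the Flink option and savepoint path are made up:

    import com.dlink.gateway.config.FlinkConfig;

    class FlinkConfigSketch {
        static FlinkConfig example() {
            // One configuration pair, encoded the way the parser above reads it
            String configParas = "[{\"key\":\"taskmanager.memory.process.size\",\"value\":\"2g\"}]";
            return FlinkConfig.build("demo-job", null, null, null,
                    "hdfs:///flink/savepoints/demo", configParas);
        }
    }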
package com.dlink.gateway.config;
import com.dlink.gateway.ConfigPara;
import com.dlink.gateway.GatewayType;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
......
......@@ -2,7 +2,7 @@ package com.dlink.gateway.yarn;
import com.dlink.assertion.Asserts;
import com.dlink.gateway.AbstractGateway;
import com.dlink.gateway.ConfigPara;
import com.dlink.gateway.config.ConfigPara;
import com.dlink.gateway.config.FlinkConfig;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.gateway.config.ActionType;
......@@ -10,7 +10,6 @@ import com.dlink.gateway.exception.GatewayException;
import com.dlink.gateway.model.JobInfo;
import com.dlink.gateway.result.GatewayResult;
import com.dlink.gateway.result.SavePointResult;
import com.dlink.gateway.result.YarnResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.DeploymentOptions;
......
......@@ -159,6 +159,15 @@ const StudioSetting = (props: any) => {
>
<Switch checkedChildren="启用" unCheckedChildren="禁用"
/>
</Form.Item>
</Col>
<Col span={12}>
<Form.Item
label="启用语句集" className={styles.form_item} name="statementSet" valuePropName="checked"
tooltip={{ title: '【增强特性】 开启语句集机制,将把多个 Insert 语句合成一个 JobGraph 再进行提交,Select 语句无效', icon: <InfoCircleOutlined /> }}
>
<Switch checkedChildren="启用" unCheckedChildren="禁用"
/>
</Form.Item>
</Col>
</Row>
......
......@@ -60,6 +60,7 @@ export type TaskType = {
savePointPath?: string,
parallelism?: number,
fragment?: boolean,
statementSet?: boolean,
config?: [],
clusterId?: any,
clusterName?: string,
......@@ -181,6 +182,7 @@ const Model: ModelType = {
savePointPath: '',
parallelism: 1,
fragment: true,
statementSet: false,
clusterId: 0,
clusterName: "本地环境",
clusterConfigurationId:undefined,
......@@ -216,6 +218,7 @@ const Model: ModelType = {
savePointPath: '',
parallelism: 1,
fragment: true,
statementSet: false,
clusterId: 0,
clusterName: "本地环境",
clusterConfigurationId:undefined,
......
......@@ -12,9 +12,9 @@
<modules>
<module>dlink-common</module>
<module>dlink-client</module>
<module>dlink-gateway</module>
<module>dlink-function</module>
<module>dlink-metadata</module>
<module>dlink-gateway</module>
<module>dlink-connectors</module>
<module>dlink-executor</module>
<module>dlink-extends</module>
......