Commit 4e11672c authored by tangxiuhong's avatar tangxiuhong

修复任务提交参数设置异常

parent 0781bc44
...@@ -8,6 +8,7 @@ import com.dlink.db.model.SuperEntity; ...@@ -8,6 +8,7 @@ import com.dlink.db.model.SuperEntity;
import com.dlink.job.JobConfig; import com.dlink.job.JobConfig;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
...@@ -15,6 +16,7 @@ import java.util.ArrayList; ...@@ -15,6 +16,7 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
/** /**
* 任务 * 任务
...@@ -102,10 +104,12 @@ public class Task extends SuperEntity { ...@@ -102,10 +104,12 @@ public class Task extends SuperEntity {
} }
Map<String, String> map = new HashMap<>(); Map<String, String> map = new HashMap<>();
for (Map<String, String> item : config) { for (Map<String, String> item : config) {
map.put(item.get("key"), item.get("value")); if (Asserts.isNotNull(item)) {
map.put(item.get("key"), item.get("value"));
}
} }
return new JobConfig(type, step, false, false, useRemote, clusterId, clusterConfigurationId, jarId, getId(), return new JobConfig(type, step, false, false, useRemote, clusterId, clusterConfigurationId, jarId, getId(),
alias, fragment, statementSet, batchModel, checkPoint, parallelism, savePointStrategy, savePointPath, map); alias, fragment, statementSet, batchModel, checkPoint, parallelism, savePointStrategy, savePointPath, map);
} }
} }
...@@ -5,6 +5,7 @@ import com.dlink.executor.ExecutorSetting; ...@@ -5,6 +5,7 @@ import com.dlink.executor.ExecutorSetting;
import com.dlink.gateway.GatewayType; import com.dlink.gateway.GatewayType;
import com.dlink.gateway.config.*; import com.dlink.gateway.config.*;
import com.dlink.session.SessionConfig; import com.dlink.session.SessionConfig;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
...@@ -164,12 +165,12 @@ public class JobConfig { ...@@ -164,12 +165,12 @@ public class JobConfig {
gatewayConfig = new GatewayConfig(); gatewayConfig = new GatewayConfig();
if (config.containsKey("hadoopConfigPath")) { if (config.containsKey("hadoopConfigPath")) {
gatewayConfig.setClusterConfig(ClusterConfig.build(config.get("flinkConfigPath").toString(), gatewayConfig.setClusterConfig(ClusterConfig.build(config.get("flinkConfigPath").toString(),
config.get("flinkLibPath").toString(), config.get("flinkLibPath").toString(),
config.get("hadoopConfigPath").toString())); config.get("hadoopConfigPath").toString()));
} else { } else {
gatewayConfig.setClusterConfig(ClusterConfig.build(config.get("flinkConfigPath").toString(), gatewayConfig.setClusterConfig(ClusterConfig.build(config.get("flinkConfigPath").toString(),
config.get("flinkLibPath").toString(), config.get("flinkLibPath").toString(),
"")); ""));
} }
AppConfig appConfig = new AppConfig(); AppConfig appConfig = new AppConfig();
if (config.containsKey("userJarPath") && Asserts.isNotNullString((String) config.get("userJarPath"))) { if (config.containsKey("userJarPath") && Asserts.isNotNullString((String) config.get("userJarPath"))) {
...@@ -204,7 +205,9 @@ public class JobConfig { ...@@ -204,7 +205,9 @@ public class JobConfig {
gatewayConfig = new GatewayConfig(); gatewayConfig = new GatewayConfig();
} }
for (Map<String, String> item : configList) { for (Map<String, String> item : configList) {
gatewayConfig.getFlinkConfig().getConfiguration().put(item.get("key"), item.get("value")); if (Asserts.isNotNull(item)) {
gatewayConfig.getFlinkConfig().getConfiguration().put(item.get("key"), item.get("value"));
}
} }
} }
......
...@@ -69,7 +69,7 @@ CREATE TABLE `dlink_cluster_configuration` ( ...@@ -69,7 +69,7 @@ CREATE TABLE `dlink_cluster_configuration` (
`alias` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '别名', `alias` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '别名',
`type` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '类型', `type` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '类型',
`config_json` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '配置JSON', `config_json` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '配置JSON',
`is_available` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '是否可用', `is_available` tinyint(1) NOT NULL DEFAULT 0 COMMENT '是否可用',
`note` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '注释', `note` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '注释',
`enabled` tinyint(1) NOT NULL DEFAULT 1 COMMENT '是否启用', `enabled` tinyint(1) NOT NULL DEFAULT 1 COMMENT '是否启用',
`create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间', `create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',
...@@ -454,8 +454,8 @@ CREATE TABLE `dlink_task` ( ...@@ -454,8 +454,8 @@ CREATE TABLE `dlink_task` (
`save_point_strategy` int(1) UNSIGNED ZEROFILL NULL DEFAULT NULL COMMENT 'SavePoint策略', `save_point_strategy` int(1) UNSIGNED ZEROFILL NULL DEFAULT NULL COMMENT 'SavePoint策略',
`save_point_path` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'SavePointPath', `save_point_path` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'SavePointPath',
`parallelism` int(4) NULL DEFAULT NULL COMMENT 'parallelism', `parallelism` int(4) NULL DEFAULT NULL COMMENT 'parallelism',
`fragment` tinyint(1) NULL DEFAULT NULL COMMENT 'fragment', `fragment` tinyint(1) NULL DEFAULT 0 COMMENT 'fragment',
`statement_set` tinyint(1) NULL DEFAULT NULL COMMENT '启用语句集', `statement_set` tinyint(1) NULL DEFAULT 0 COMMENT '启用语句集',
`batch_model` tinyint(1) NULL DEFAULT 0 COMMENT '使用批模式', `batch_model` tinyint(1) NULL DEFAULT 0 COMMENT '使用批模式',
`cluster_id` int(11) NULL DEFAULT NULL COMMENT 'Flink集群ID', `cluster_id` int(11) NULL DEFAULT NULL COMMENT 'Flink集群ID',
`cluster_configuration_id` int(11) NULL DEFAULT NULL COMMENT '集群配置ID', `cluster_configuration_id` int(11) NULL DEFAULT NULL COMMENT '集群配置ID',
......
...@@ -668,3 +668,8 @@ ALTER TABLE `dlink_job_instance` ...@@ -668,3 +668,8 @@ ALTER TABLE `dlink_job_instance`
CREATE INDEX dlink_job_instance_task_id_IDX USING BTREE ON dlink_job_instance (task_id); CREATE INDEX dlink_job_instance_task_id_IDX USING BTREE ON dlink_job_instance (task_id);
SET FOREIGN_KEY_CHECKS = 1; SET FOREIGN_KEY_CHECKS = 1;
-- 0.6.2-SNAPSHOT 2022-04-17
-- ----------------------------
alter table dlink_task alter column fragment set default 0;
alter table dlink_task alter column statement_set set default 0;
alter table dlink_cluster_configuration modify column is_available tinyint(1) NOT NULL DEFAULT 0 COMMENT '是否可用';
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment