Unverified Commit 8ca8a953 authored by Licho's avatar Licho Committed by GitHub

refactor: simple Job2MysqlHandler (#1171)

parent 0b3f8051
......@@ -25,9 +25,7 @@ import com.dlink.daemon.task.DaemonFactory;
import com.dlink.daemon.task.DaemonTaskConfig;
import com.dlink.gateway.GatewayType;
import com.dlink.model.Cluster;
import com.dlink.model.ClusterConfiguration;
import com.dlink.model.History;
import com.dlink.model.Jar;
import com.dlink.model.JobHistory;
import com.dlink.model.JobInstance;
import com.dlink.model.JobStatus;
......@@ -54,13 +52,13 @@ import org.springframework.context.annotation.DependsOn;
@DependsOn("springContextUtils")
public class Job2MysqlHandler implements JobHandler {
private static HistoryService historyService;
private static ClusterService clusterService;
private static ClusterConfigurationService clusterConfigurationService;
private static JarService jarService;
private static JobInstanceService jobInstanceService;
private static JobHistoryService jobHistoryService;
private static TaskService taskService;
private static final HistoryService historyService;
private static final ClusterService clusterService;
private static final ClusterConfigurationService clusterConfigurationService;
private static final JarService jarService;
private static final JobInstanceService jobInstanceService;
private static final JobHistoryService jobHistoryService;
private static final TaskService taskService;
static {
historyService = SpringContextUtils.getBean("historyServiceImpl", HistoryService.class);
......@@ -91,7 +89,9 @@ public class Job2MysqlHandler implements JobHandler {
history.setTaskId(job.getJobConfig().getTaskId());
history.setConfigJson(JSONUtil.toJsonString(job.getJobConfig()));
historyService.save(history);
job.setId(history.getId());
return true;
}
......@@ -109,6 +109,7 @@ public class Job2MysqlHandler implements JobHandler {
public boolean success() {
Job job = JobContextHolder.getJob();
Integer taskId = job.getJobConfig().getTaskId();
History history = new History();
history.setId(job.getId());
if (job.isUseGateway() && Asserts.isNullString(job.getJobId())) {
......@@ -120,36 +121,38 @@ public class Job2MysqlHandler implements JobHandler {
historyService.updateById(history);
return false;
}
history.setStatus(job.getStatus().ordinal());
history.setJobId(job.getJobId());
history.setEndTime(job.getEndTime());
if (job.isUseGateway()) {
history.setJobManagerAddress(job.getJobManagerAddress());
}
history.setJobManagerAddress(job.isUseGateway() ? job.getJobManagerAddress() : null);
Integer clusterId = job.getJobConfig().getClusterId();
Cluster cluster = null;
Cluster cluster;
final Integer clusterConfigurationId = job.getJobConfig().getClusterConfigurationId();
if (job.isUseGateway()) {
cluster = clusterService.registersCluster(Cluster.autoRegistersCluster(job.getJobManagerAddress(),
job.getJobId(), job.getJobConfig().getJobName() + LocalDateTime.now(), job.getType().getLongValue(),
job.getJobConfig().getClusterConfigurationId(), taskId));
cluster = clusterService.registersCluster(Cluster.autoRegistersCluster(
job.getJobManagerAddress(),
job.getJobId(),
job.getJobConfig().getJobName() + LocalDateTime.now(),
job.getType().getLongValue(),
clusterConfigurationId,
taskId));
if (Asserts.isNotNull(cluster)) {
clusterId = cluster.getId();
}
} else {
cluster = clusterService.getById(clusterId);
}
history.setClusterId(clusterId);
historyService.updateById(history);
ClusterConfiguration clusterConfiguration = null;
if (Asserts.isNotNull(job.getJobConfig().getClusterConfigurationId())) {
clusterConfiguration = clusterConfigurationService.getClusterConfigById(job.getJobConfig().getClusterConfigurationId());
}
Jar jar = null;
if (Asserts.isNotNull(job.getJobConfig().getJarId())) {
jar = jarService.getById(job.getJobConfig().getJarId());
if (Asserts.isNullCollection(job.getJids()) || GatewayType.LOCAL.equalsValue(job.getJobConfig().getType())) {
return true;
}
if (Asserts.isNotNullCollection(job.getJids()) && !GatewayType.LOCAL.equalsValue(job.getJobConfig().getType())) {
for (String jid : job.getJids()) {
String jid = job.getJids().get(0);
JobInstance jobInstance = history.buildJobInstance();
jobInstance.setHistoryId(job.getId());
jobInstance.setClusterId(clusterId);
......@@ -159,21 +162,26 @@ public class Job2MysqlHandler implements JobHandler {
jobInstance.setStep(job.getJobConfig().getStep());
jobInstance.setStatus(JobStatus.INITIALIZING.getValue());
jobInstanceService.save(jobInstance);
job.setJobInstanceId(jobInstance.getId());
Task task = new Task();
task.setId(taskId);
task.setJobInstanceId(jobInstance.getId());
taskService.updateById(task);
JobHistory jobHistory = new JobHistory();
jobHistory.setId(jobInstance.getId());
jobHistory.setJarJson(JSONUtil.toJsonString(jar));
jobHistory.setClusterJson(JSONUtil.toJsonString(cluster));
jobHistory.setClusterConfigurationJson(JSONUtil.toJsonString(clusterConfiguration));
jobHistory.setJarJson(Asserts.isNotNull(job.getJobConfig().getJarId())
? JSONUtil.toJsonString(jarService.getById(job.getJobConfig().getJarId())) : null);
jobHistory.setClusterConfigurationJson(Asserts.isNotNull(clusterConfigurationId)
? JSONUtil.toJsonString(clusterConfigurationService.getClusterConfigById(clusterConfigurationId))
: null);
jobHistoryService.save(jobHistory);
DaemonFactory.addTask(DaemonTaskConfig.build(FlinkJobTask.TYPE, jobInstance.getId()));
break;
}
}
return true;
}
......
......@@ -28,10 +28,23 @@ import com.dlink.assertion.Asserts;
* @since 2021/12/13
**/
public enum Dialect {
//
FLINKSQL("FlinkSql"), FLINKJAR("FlinkJar"), FLINKSQLENV("FlinkSqlEnv"), SQL("Sql"), JAVA("Java"), PYTHON("Python"), SCALA("Scala"),
MYSQL("Mysql"), ORACLE("Oracle"), SQLSERVER("SqlServer"), POSTGRESQL("PostgreSql"), CLICKHOUSE("ClickHouse"),
DORIS("Doris"), PHOENIX("Phoenix"), HIVE("Hive"), STARROCKS("StarRocks"), KUBERNETES_APPLICATION("KubernetesApplaction");
FLINKSQL("FlinkSql"),
FLINKJAR("FlinkJar"),
FLINKSQLENV("FlinkSqlEnv"),
SQL("Sql"),
JAVA("Java"),
PYTHON("Python"),
SCALA("Scala"),
MYSQL("Mysql"),
ORACLE("Oracle"),
SQLSERVER("SqlServer"),
POSTGRESQL("PostgreSql"),
CLICKHOUSE("ClickHouse"),
DORIS("Doris"),
PHOENIX("Phoenix"),
HIVE("Hive"),
STARROCKS("StarRocks"),
KUBERNETES_APPLICATION("KubernetesApplaction");
private String value;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment