Unverified Commit 022044ab authored by aiwenmo's avatar aiwenmo Committed by GitHub

[Feature-1053][admin] Configured integration DolphinScheduler (#1056)

Co-authored-by: 's avatarwenmo <32723967+wenmo@users.noreply.github.com>
parent e3a0e249
......@@ -24,6 +24,7 @@ import com.dlink.init.SystemInit;
import com.dlink.model.Catalogue;
import com.dlink.scheduler.client.ProcessClient;
import com.dlink.scheduler.client.TaskClient;
import com.dlink.scheduler.config.DolphinSchedulerProperties;
import com.dlink.scheduler.enums.ReleaseState;
import com.dlink.scheduler.exception.SchedulerException;
import com.dlink.scheduler.model.DagData;
......@@ -45,7 +46,6 @@ import javax.validation.Valid;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
......@@ -74,11 +74,10 @@ public class SchedulerController {
private static final Logger logger = LoggerFactory.getLogger(SchedulerController.class);
@Value("${dinky.url}")
private String dinkyUrl;
@Autowired
private DolphinSchedulerProperties dolphinSchedulerProperties;
@Autowired
private ProcessClient processClient;
@Autowired
private TaskClient taskClient;
@Autowired
......@@ -160,7 +159,7 @@ public class SchedulerController {
@Valid @RequestBody TaskRequest taskRequest) {
DlinkTaskParams dlinkTaskParams = new DlinkTaskParams();
dlinkTaskParams.setTaskId(dinkyTaskId.toString());
dlinkTaskParams.setAddress(dinkyUrl);
dlinkTaskParams.setAddress(dolphinSchedulerProperties.getAddress());
taskRequest.setTaskParams(JSONUtil.parseObj(dlinkTaskParams).toString());
taskRequest.setTaskType("DINKY");
......
......@@ -19,11 +19,13 @@
package com.dlink.init;
import com.dlink.assertion.Asserts;
import com.dlink.daemon.task.DaemonFactory;
import com.dlink.daemon.task.DaemonTaskConfig;
import com.dlink.job.FlinkJobTask;
import com.dlink.model.JobInstance;
import com.dlink.scheduler.client.ProjectClient;
import com.dlink.scheduler.config.DolphinSchedulerProperties;
import com.dlink.scheduler.exception.SchedulerException;
import com.dlink.scheduler.model.Project;
import com.dlink.service.JobInstanceService;
......@@ -60,33 +62,55 @@ public class SystemInit implements ApplicationRunner {
private JobInstanceService jobInstanceService;
@Autowired
private TaskService taskService;
@Autowired
private DolphinSchedulerProperties dolphinSchedulerProperties;
private static Project project;
@Override
public void run(ApplicationArguments args) throws Exception {
sysConfigService.initSysConfig();
taskService.initDefaultFlinkSQLEnv();
initTaskMonitor();
initDolphinScheduler();
}
/**
* init task monitor
*/
private void initTaskMonitor() {
List<JobInstance> jobInstances = jobInstanceService.listJobInstanceActive();
List<DaemonTaskConfig> configList = new ArrayList<>();
for (JobInstance jobInstance : jobInstances) {
configList.add(new DaemonTaskConfig(FlinkJobTask.TYPE, jobInstance.getId()));
}
log.info("启动的任务数量:" + configList.size());
log.info("Number of tasks started: " + configList.size());
DaemonFactory.start(configList);
try {
project = projectClient.getDinkyProject();
if (project == null) {
project = projectClient.createDinkyProject();
}
/**
* init DolphinScheduler
*/
private void initDolphinScheduler() {
if (dolphinSchedulerProperties.isEnabled()) {
try {
project = projectClient.getDinkyProject();
if (Asserts.isNull(project)) {
project = projectClient.createDinkyProject();
}
} catch (Exception e) {
log.error("Error in DolphinScheduler: {}", e);
}
} catch (Exception e) {
log.error("海豚调度异常: {}", e);
}
}
/**
* get dolphinscheduler's project
*
* @return: com.dlink.scheduler.model.Project
*/
public static Project getProject() {
if (project == null) {
throw new SchedulerException("请完善海豚调度配置,重启dlink");
if (Asserts.isNull(project)) {
throw new SchedulerException("Please complete the dolphinscheduler configuration.");
}
return project;
}
......
......@@ -90,12 +90,13 @@ knife4j:
enable: true
dinky:
# dinky 地址
url: http://127.0.0.1:8888
dolphinscheduler:
enabled: false
# dolphinscheduler 地址
url: http://127.0.0.1:12345/dolphinscheduler
# dolphinscheduler 生成的token
token: 466cb6f6455b1b8689fff770746e2e79
# dolphinscheduler 中指定的项目名不区分大小写
project-name: dinky
# Dolphinscheduler DinkyTask Address
address: http://127.0.0.1:8888
......@@ -19,6 +19,7 @@
package com.dlink.scheduler.client;
import com.dlink.scheduler.config.DolphinSchedulerProperties;
import com.dlink.scheduler.constant.Constants;
import com.dlink.scheduler.model.DagData;
import com.dlink.scheduler.model.ProcessDefinition;
......@@ -35,7 +36,7 @@ import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import cn.hutool.core.lang.TypeReference;
......@@ -53,10 +54,8 @@ public class ProcessClient {
private static final Logger logger = LoggerFactory.getLogger(TaskClient.class);
@Value("${dinky.dolphinscheduler.url}")
private String url;
@Value("${dinky.dolphinscheduler.token}")
private String tokenKey;
@Autowired
private DolphinSchedulerProperties dolphinSchedulerProperties;
/**
* 查询工作流定义
......@@ -70,10 +69,10 @@ public class ProcessClient {
public List<ProcessDefinition> getProcessDefinition(Long projectCode, String processName) {
Map<String, Object> map = new HashMap<>();
map.put("projectCode", projectCode);
String format = StrUtil.format(url + "/projects/{projectCode}/process-definition", map);
String format = StrUtil.format(dolphinSchedulerProperties.getUrl() + "/projects/{projectCode}/process-definition", map);
String content = HttpRequest.get(format)
.header(Constants.TOKEN, tokenKey)
.header(Constants.TOKEN, dolphinSchedulerProperties.getToken())
.form(ParamUtil.getPageParams(processName))
.timeout(5000)
.execute().body();
......@@ -122,10 +121,10 @@ public class ProcessClient {
Map<String, Object> map = new HashMap<>();
map.put("projectCode", projectCode);
map.put("code", processCode);
String format = StrUtil.format(url + "/projects/{projectCode}/process-definition/{code}", map);
String format = StrUtil.format(dolphinSchedulerProperties.getUrl() + "/projects/{projectCode}/process-definition/{code}", map);
String content = HttpRequest.get(format)
.header(Constants.TOKEN, tokenKey)
.header(Constants.TOKEN, dolphinSchedulerProperties.getToken())
.timeout(5000)
.execute().body();
......@@ -145,7 +144,7 @@ public class ProcessClient {
public ProcessDefinition createProcessDefinition(Long projectCode, String processName, Long taskCode, String taskDefinitionJson) {
Map<String, Object> map = new HashMap<>();
map.put("projectCode", projectCode);
String format = StrUtil.format(url + "/projects/{projectCode}/process-definition", map);
String format = StrUtil.format(dolphinSchedulerProperties.getUrl() + "/projects/{projectCode}/process-definition", map);
Map<String, Object> taskMap = new HashMap<>();
taskMap.put("code", taskCode);
......@@ -161,7 +160,7 @@ public class ProcessClient {
params.put("executionType", "PARALLEL");
String content = HttpRequest.post(format)
.header(Constants.TOKEN, tokenKey)
.header(Constants.TOKEN, dolphinSchedulerProperties.getToken())
.form(params)
.timeout(5000)
.execute().body();
......
......@@ -19,6 +19,7 @@
package com.dlink.scheduler.client;
import com.dlink.scheduler.config.DolphinSchedulerProperties;
import com.dlink.scheduler.constant.Constants;
import com.dlink.scheduler.model.Project;
import com.dlink.scheduler.result.Result;
......@@ -30,7 +31,7 @@ import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import cn.hutool.core.lang.TypeReference;
......@@ -46,12 +47,8 @@ public class ProjectClient {
private static final Logger logger = LoggerFactory.getLogger(TaskClient.class);
@Value("${dinky.dolphinscheduler.url}")
private String url;
@Value("${dinky.dolphinscheduler.token}")
private String tokenKey;
@Value("${dinky.dolphinscheduler.project-name}")
private String dinkyProjectName;
@Autowired
private DolphinSchedulerProperties dolphinSchedulerProperties;
/**
* 创建项目
......@@ -62,11 +59,11 @@ public class ProjectClient {
*/
public Project createDinkyProject() {
Map<String, Object> map = new HashMap<>();
map.put("projectName", dinkyProjectName);
map.put("projectName", dolphinSchedulerProperties.getProjectName());
map.put("description", "自动创建");
String content = HttpRequest.post(url + "/projects")
.header(Constants.TOKEN, tokenKey)
String content = HttpRequest.post(dolphinSchedulerProperties.getUrl() + "/projects")
.header(Constants.TOKEN, dolphinSchedulerProperties.getToken())
.form(map)
.timeout(5000)
.execute().body();
......@@ -83,12 +80,12 @@ public class ProjectClient {
*/
public Project getDinkyProject() {
String content = HttpRequest.get(url + "/projects")
.header(Constants.TOKEN, tokenKey)
.form(ParamUtil.getPageParams(dinkyProjectName))
String content = HttpRequest.get(dolphinSchedulerProperties.getUrl() + "/projects")
.header(Constants.TOKEN, dolphinSchedulerProperties.getToken())
.form(ParamUtil.getPageParams(dolphinSchedulerProperties.getProjectName()))
.timeout(5000)
.execute().body();
return MyJSONUtil.toPageBeanAndFindByName(content, dinkyProjectName, Project.class);
return MyJSONUtil.toPageBeanAndFindByName(content, dolphinSchedulerProperties.getProjectName(), Project.class);
}
}
......@@ -19,6 +19,7 @@
package com.dlink.scheduler.client;
import com.dlink.scheduler.config.DolphinSchedulerProperties;
import com.dlink.scheduler.constant.Constants;
import com.dlink.scheduler.exception.SchedulerException;
import com.dlink.scheduler.model.TaskDefinition;
......@@ -38,7 +39,7 @@ import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import cn.hutool.core.lang.TypeReference;
......@@ -56,10 +57,8 @@ public class TaskClient {
private static final Logger logger = LoggerFactory.getLogger(TaskClient.class);
@Value("${dinky.dolphinscheduler.url}")
private String url;
@Value("${dinky.dolphinscheduler.token}")
private String tokenKey;
@Autowired
private DolphinSchedulerProperties dolphinSchedulerProperties;
/**
* 查询任务定义
......@@ -94,7 +93,7 @@ public class TaskClient {
public List<TaskMainInfo> getTaskMainInfos(Long projectCode, String processName, String taskName) {
Map<String, Object> map = new HashMap<>();
map.put("projectCode", projectCode);
String format = StrUtil.format(url + "/projects/{projectCode}/task-definition", map);
String format = StrUtil.format(dolphinSchedulerProperties.getUrl() + "/projects/{projectCode}/task-definition", map);
Map<String, Object> pageParams = ParamUtil.getPageParams();
pageParams.put("searchTaskName", taskName);
......@@ -102,7 +101,7 @@ public class TaskClient {
pageParams.put("taskType", "DINKY");
String content = HttpRequest.get(format)
.header(Constants.TOKEN, tokenKey)
.header(Constants.TOKEN, dolphinSchedulerProperties.getToken())
.form(pageParams)
.timeout(5000)
.execute().body();
......@@ -134,10 +133,10 @@ public class TaskClient {
Map<String, Object> map = new HashMap<>();
map.put("projectCode", projectCode);
map.put("code", taskCode);
String format = StrUtil.format(url + "/projects/{projectCode}/task-definition/{code}", map);
String format = StrUtil.format(dolphinSchedulerProperties.getUrl() + "/projects/{projectCode}/task-definition/{code}", map);
String content = HttpRequest.get(format)
.header(Constants.TOKEN, tokenKey)
.header(Constants.TOKEN, dolphinSchedulerProperties.getToken())
.timeout(5000)
.execute().body();
......@@ -157,7 +156,7 @@ public class TaskClient {
public TaskDefinitionLog createTaskDefinition(Long projectCode, Long processCode, String upstreamCodes, String taskDefinitionJsonObj) {
Map<String, Object> map = new HashMap<>();
map.put("projectCode", projectCode);
String format = StrUtil.format(url + "/projects/{projectCode}/task-definition/save-single", map);
String format = StrUtil.format(dolphinSchedulerProperties.getUrl() + "/projects/{projectCode}/task-definition/save-single", map);
Map<String, Object> pageParams = new HashMap<>();
pageParams.put("processDefinitionCode", processCode);
......@@ -168,7 +167,7 @@ public class TaskClient {
pageParams.put("taskDefinitionJsonObj", taskDefinitionJsonObj);
String content = HttpRequest.post(format)
.header(Constants.TOKEN, tokenKey)
.header(Constants.TOKEN, dolphinSchedulerProperties.getToken())
.form(pageParams)
.timeout(5000)
.execute().body();
......@@ -191,14 +190,14 @@ public class TaskClient {
Map<String, Object> map = new HashMap<>();
map.put("projectCode", projectCode);
map.put("code", taskCode);
String format = StrUtil.format(url + "/projects/{projectCode}/task-definition/{code}/with-upstream", map);
String format = StrUtil.format(dolphinSchedulerProperties.getUrl() + "/projects/{projectCode}/task-definition/{code}/with-upstream", map);
Map<String, Object> params = new HashMap<>();
params.put("upstreamCodes", upstreamCodes);
params.put("taskDefinitionJsonObj", taskDefinitionJsonObj);
String content = HttpRequest.put(format)
.header(Constants.TOKEN, tokenKey)
.header(Constants.TOKEN, dolphinSchedulerProperties.getToken())
.form(params)
.timeout(5000)
.execute().body();
......@@ -218,11 +217,11 @@ public class TaskClient {
public List<Long> genTaskCodes(Long projectCode, int genNum) {
Map<String, Object> map = new HashMap<>();
map.put("projectCode", projectCode);
String format = StrUtil.format(url + "/projects/{projectCode}/task-definition/gen-task-codes", map);
String format = StrUtil.format(dolphinSchedulerProperties.getUrl() + "/projects/{projectCode}/task-definition/gen-task-codes", map);
Map<String, Object> params = new HashMap<>();
params.put("genNum", genNum);
String content = HttpRequest.get(format)
.header(Constants.TOKEN, tokenKey)
.header(Constants.TOKEN, dolphinSchedulerProperties.getToken())
.form(params)
.timeout(5000)
.execute().body();
......
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.scheduler.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import lombok.Data;
/**
* DolphinSchedulerProperties
*
* @author wenmo
* @since 2022/9/25 12:57
*/
@Data
@Component
@ConfigurationProperties(prefix = "dinky.dolphinscheduler")
public class DolphinSchedulerProperties {
private boolean enabled;
private String url;
private String token;
private String projectName;
private String address;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment