Commit ef0bce03 authored by godkaikai's avatar godkaikai

Backend commit

parent 3da48e7d
...@@ -46,5 +46,33 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>com.nepxion</groupId>
<artifactId>banner</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</dependency>
<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-core</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.dlink;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.transaction.annotation.EnableTransactionManagement;
/**
* Dlink application launcher
* @author wenmo
* @since 2021/5/28
*/
@EnableTransactionManagement
@SpringBootApplication
public class Dlink {
public static void main(String[] args) {
SpringApplication.run(Dlink.class, args);
}
}
package com.dlink.common.banner;
import com.dlink.constant.CommonConstant;
import com.dlink.utils.CustomBanner;
import com.nepxion.banner.BannerConstant;
import com.nepxion.banner.Description;
import com.nepxion.banner.LogoBanner;
import com.taobao.text.Color;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
/**
* BannerInitializer
*
* @author wenmo
* @since 2021/5/10 21:40
*/
public class BannerInitializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
@Override
public void initialize(ConfigurableApplicationContext applicationContext) {
if (!(applicationContext instanceof AnnotationConfigApplicationContext)) {
LogoBanner logoBanner = new LogoBanner(BannerInitializer.class, "/dlink/logo.txt", "Welcome to Dlink", 5, 6, new Color[5], true);
CustomBanner.show(logoBanner, new Description(BannerConstant.VERSION + ":", CommonConstant.PROJECT_VERSION, 0, 1)
, new Description("Github:", "https://github.com/aiwenmo/dlink", 0, 1)
, new Description("WeChat Official Account:", "DataLink数据中台", 0, 1)
);
}
}
}
\ No newline at end of file
package com.dlink.common.result;
import com.dlink.model.CodeEnum;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
......
package com.dlink.constant;
/**
* CommonConstant
*
* @author wenmo
* @since 2021/5/28 9:35
**/
public interface CommonConstant {
/**
* Project version number (used by the banner)
*/
String PROJECT_VERSION = "0.1.0";
}
package com.dlink.controller;
import com.dlink.common.result.Result;
import com.dlink.model.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* AdminController
*
* @author wenmo
* @since 2021/5/28 15:52
**/
@Slf4j
@RestController
@RequestMapping("/user")
public class AdminController {
@Value("${dlink.login.username}")
private String username;
@Value("${dlink.login.password}")
private String password;
/**
* User login: checks the submitted credentials against the configured ones
*/
@PostMapping("/login")
public Result getOneById(@RequestBody User user) throws Exception {
if (username.equals(user.getUsername()) && password.equals(user.getPassword())) {
return Result.succeed(username, "Login succeeded");
} else {
return Result.failed("Authentication failed");
}
}
}
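
For orientation, a minimal client-side sketch of the login call exposed above. The base URL and credentials mirror the sample configuration elsewhere in this commit (server.port=8888, dlink.login.username=admin, dlink.login.password=dlink) and are illustrative assumptions, not part of the change:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class LoginCallExample {
    public static void main(String[] args) throws Exception {
        // JSON body matching the User model (username/password).
        String body = "{\"username\":\"admin\",\"password\":\"dlink\"}";
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8888/user/login"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // Expected on success: a Result payload whose msg is "Login succeeded".
        System.out.println(response.body());
    }
}
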
package com.dlink.controller;
import com.dlink.common.result.ProTableResult;
import com.dlink.common.result.Result;
import com.dlink.model.Catalogue;
import com.dlink.service.CatalogueService;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.List;
/**
* CatalogueController
*
* @author wenmo
* @since 2021/5/28 14:03
**/
@Slf4j
@RestController
@RequestMapping("/catalogue")
public class CatalogueController {
@Autowired
private CatalogueService catalogueService;
/**
* Create or update
*/
@PutMapping
public Result saveOrUpdate(@RequestBody Catalogue catalogue) throws Exception {
if (catalogueService.saveOrUpdate(catalogue)) {
return Result.succeed("Saved successfully");
} else {
return Result.failed("Save failed");
}
}
/**
* Dynamic list query
*/
@PostMapping
public ProTableResult<Catalogue> listCatalogues(@RequestBody JsonNode para) {
return catalogueService.selectForProTable(para);
}
/**
* Batch delete
*/
@DeleteMapping
public Result deleteMul(@RequestBody JsonNode para) {
if (para.size() > 0) {
List<Integer> error = new ArrayList<>();
for (final JsonNode item : para) {
Integer id = item.asInt();
if (!catalogueService.removeById(id)) {
error.add(id);
}
}
if (error.isEmpty()) {
return Result.succeed("Deleted successfully");
} else {
return Result.succeed("Partially deleted, but " + error + " failed to delete; " + error.size() + " failures in total.");
}
} else {
return Result.failed("Please select the records to delete");
}
}
/**
* Get the record with the given ID
*/
@PostMapping("/getOneById")
public Result getOneById(@RequestBody Catalogue catalogue) throws Exception {
catalogue = catalogueService.getById(catalogue.getId());
return Result.succeed(catalogue, "Fetched successfully");
}
}
package com.dlink.controller;
import com.dlink.common.result.ProTableResult;
import com.dlink.common.result.Result;
import com.dlink.model.Cluster;
import com.dlink.service.ClusterService;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.List;
/**
* ClusterController
*
* @author wenmo
* @since 2021/5/28 14:03
**/
@Slf4j
@RestController
@RequestMapping("/cluster")
public class ClusterController {
@Autowired
private ClusterService clusterService;
/**
* Create or update
*/
@PutMapping
public Result saveOrUpdate(@RequestBody Cluster cluster) throws Exception {
if (clusterService.saveOrUpdate(cluster)) {
return Result.succeed("Saved successfully");
} else {
return Result.failed("Save failed");
}
}
/**
* Dynamic list query
*/
@PostMapping
public ProTableResult<Cluster> listClusters(@RequestBody JsonNode para) {
return clusterService.selectForProTable(para);
}
/**
* Batch delete
*/
@DeleteMapping
public Result deleteMul(@RequestBody JsonNode para) {
if (para.size() > 0) {
List<Integer> error = new ArrayList<>();
for (final JsonNode item : para) {
Integer id = item.asInt();
if (!clusterService.removeById(id)) {
error.add(id);
}
}
if (error.isEmpty()) {
return Result.succeed("Deleted successfully");
} else {
return Result.succeed("Partially deleted, but " + error + " failed to delete; " + error.size() + " failures in total.");
}
} else {
return Result.failed("Please select the records to delete");
}
}
/**
* Get the record with the given ID
*/
@PostMapping("/getOneById")
public Result getOneById(@RequestBody Cluster cluster) throws Exception {
cluster = clusterService.getById(cluster.getId());
return Result.succeed(cluster, "Fetched successfully");
}
/**
* Heartbeat check for all enabled clusters
*/
@PostMapping("/heartbeats")
public Result heartbeat(@RequestBody JsonNode para) {
for (Cluster cluster : clusterService.listEnabledAll()) {
String jobManagerHost = clusterService.checkHeartBeat(cluster.getHosts(), cluster.getJobManagerHost());
if (jobManagerHost == null) {
cluster.setJobManagerHost("");
cluster.setStatus(0);
} else {
cluster.setJobManagerHost(jobManagerHost);
cluster.setStatus(1);
}
clusterService.updateById(cluster);
}
return Result.succeed("Status refresh completed");
}
}
package com.dlink.controller;
import com.dlink.common.result.ProTableResult;
import com.dlink.common.result.Result;
import com.dlink.model.Statement;
import com.dlink.service.StatementService;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.List;
/**
* StatementController
*
* @author wenmo
* @since 2021/5/28 13:48
**/
@Slf4j
@RestController
@RequestMapping("/statement")
public class StatementController {
@Autowired
private StatementService statementService;
/**
* Create or update
*/
@PutMapping
public Result saveOrUpdate(@RequestBody Statement statement) throws Exception {
if (statementService.saveOrUpdate(statement)) {
return Result.succeed("Saved successfully");
} else {
return Result.failed("Save failed");
}
}
/**
* Dynamic list query
*/
@PostMapping
public ProTableResult<Statement> listStatements(@RequestBody JsonNode para) {
return statementService.selectForProTable(para);
}
/**
* Batch delete
*/
@DeleteMapping
public Result deleteMul(@RequestBody JsonNode para) {
if (para.size() > 0) {
List<Integer> error = new ArrayList<>();
for (final JsonNode item : para) {
Integer id = item.asInt();
if (!statementService.removeById(id)) {
error.add(id);
}
}
if (error.isEmpty()) {
return Result.succeed("Deleted successfully");
} else {
return Result.succeed("Partially deleted, but " + error + " failed to delete; " + error.size() + " failures in total.");
}
} else {
return Result.failed("Please select the records to delete");
}
}
/**
* Get the record with the given ID
*/
@PostMapping("/getOneById")
public Result getOneById(@RequestBody Statement statement) throws Exception {
statement = statementService.getById(statement.getId());
return Result.succeed(statement, "Fetched successfully");
}
}
package com.dlink.controller;
import com.dlink.common.result.ProTableResult;
import com.dlink.common.result.Result;
import com.dlink.model.Task;
import com.dlink.result.SubmitResult;
import com.dlink.service.TaskService;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.List;
/**
* Task Controller
*
* @author wenmo
* @since 2021-05-24
*/
@Slf4j
@RestController
@RequestMapping("/task")
public class TaskController {
@Autowired
private TaskService taskService;
/**
* Create or update
*/
@PutMapping
public Result saveOrUpdate(@RequestBody Task task) throws Exception {
if (taskService.saveOrUpdate(task)) {
return Result.succeed("Saved successfully");
} else {
return Result.failed("Save failed");
}
}
/**
* Dynamic list query
*/
@PostMapping
public ProTableResult<Task> listTasks(@RequestBody JsonNode para) {
return taskService.selectForProTable(para);
}
/**
* Batch delete
*/
@DeleteMapping
public Result deleteMul(@RequestBody JsonNode para) {
if (para.size() > 0) {
List<Integer> error = new ArrayList<>();
for (final JsonNode item : para) {
Integer id = item.asInt();
if (!taskService.removeById(id)) {
error.add(id);
}
}
if (error.isEmpty()) {
return Result.succeed("Deleted successfully");
} else {
return Result.succeed("Partially deleted, but " + error + " failed to delete; " + error.size() + " failures in total.");
}
} else {
return Result.failed("Please select the records to delete");
}
}
/**
* Batch submit for execution
*/
@PostMapping(value = "/submit")
public Result submit(@RequestBody JsonNode para) throws Exception {
if (para.size() > 0) {
List<SubmitResult> results = new ArrayList<>();
List<Integer> error = new ArrayList<>();
for (final JsonNode item : para) {
Integer id = item.asInt();
SubmitResult result = taskService.submitByTaskId(id);
if (!result.isSuccess()) {
error.add(id);
}
results.add(result);
}
if (error.isEmpty()) {
return Result.succeed(results, "Executed successfully");
} else {
return Result.succeed(results, "Partially executed, but " + error + " failed; " + error.size() + " failures in total.");
}
} else {
return Result.failed("Please select the records to execute");
}
}
/**
* Get the record with the given ID
*/
@PostMapping("/getOneById")
public Result getOneById(@RequestBody Task task) throws Exception {
task = taskService.getById(task.getId());
return Result.succeed(task, "Fetched successfully");
}
}
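
Likewise, a hedged sketch of invoking the batch submit endpoint above; the task IDs and base URL are illustrative assumptions:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SubmitTasksExample {
    public static void main(String[] args) throws Exception {
        // The endpoint expects a JSON array of task IDs; [1, 2] is illustrative.
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8888/task/submit"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString("[1, 2]"))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // On success the Result payload carries one SubmitResult per submitted ID.
        System.out.println(response.body());
    }
}
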
package com.dlink.db.annotation;
/**
* Validation group marker for save (create) operations
*
* @author wenmo
* @since 2021/5/3 20:57
*/
public interface Save {
}
package com.dlink.db.model;
import com.baomidou.mybatisplus.annotation.FieldFill;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.extension.activerecord.Model;
import com.dlink.db.annotation.Save;
import lombok.Getter;
import lombok.Setter;
import javax.validation.constraints.NotNull;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* SuperEntity
*
* @author wenmo
* @since 2021/5/28
*/
@Setter
@Getter
public class SuperEntity<T extends Model<?>> extends Model<T> implements Serializable {
/**
* Primary key ID
*/
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
@NotNull(message = "Name must not be null", groups = {Save.class})
private String name;
@NotNull(message = "Enabled flag must not be null", groups = {Save.class})
private Boolean enabled;
@TableField(fill = FieldFill.INSERT)
private LocalDateTime createTime;
@TableField(fill = FieldFill.INSERT_UPDATE)
private LocalDateTime updateTime;
@Override
protected Serializable pkVal() {
return this.id;
}
}
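
The @NotNull constraints above are scoped to the Save group defined earlier; nothing in this commit wires that group into a controller yet, so the following is a hypothetical sketch of how it would be activated:

import com.dlink.common.result.Result;
import com.dlink.db.annotation.Save;
import com.dlink.model.Catalogue;
import com.dlink.service.CatalogueService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ValidatedSaveExample {
    @Autowired
    private CatalogueService catalogueService;

    // @Validated(Save.class) makes Spring enforce the @NotNull(groups = Save.class)
    // constraints declared on SuperEntity before the handler body runs.
    @PutMapping("/catalogue/validated")
    public Result saveOrUpdate(@Validated(Save.class) @RequestBody Catalogue catalogue) {
        return catalogueService.saveOrUpdate(catalogue)
                ? Result.succeed("Saved successfully")
                : Result.failed("Save failed");
    }
}
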
package com.dlink.exception;
/**
* BusException
*
* @author wenmo
* @since 2021/5/28 14:21
**/
public class BusException extends RuntimeException {
private static final long serialVersionUID = -2955156471454043812L;
public BusException(String message) {
super(message);
}
}
package com.dlink.mapper;
import com.dlink.db.mapper.SuperMapper;
import com.dlink.model.Catalogue;
import org.apache.ibatis.annotations.Mapper;
/**
* CatalogueMapper
*
* @author wenmo
* @since 2021/5/28 13:53
**/
@Mapper
public interface CatalogueMapper extends SuperMapper<Catalogue> {
}
package com.dlink.mapper;
import com.dlink.db.mapper.SuperMapper;
import com.dlink.model.Cluster;
import org.apache.ibatis.annotations.Mapper;
/**
* ClusterMapper
*
* @author wenmo
* @since 2021/5/28 13:56
**/
@Mapper
public interface ClusterMapper extends SuperMapper<Cluster> {
}
package com.dlink.mapper;
import com.dlink.db.mapper.SuperMapper;
import com.dlink.model.Statement;
import org.apache.ibatis.annotations.Mapper;
/**
* StatementMapper
*
* @author wenmo
* @since 2021/5/28 13:41
**/
@Mapper
public interface StatementMapper extends SuperMapper<Statement> {
}
package com.dlink.mapper;
import com.dlink.db.mapper.SuperMapper;
import com.dlink.model.Task;
import org.apache.ibatis.annotations.Mapper;
/**
* Task Mapper interface
* @author wenmo
* @since 2021-05-28
*/
@Mapper
public interface TaskMapper extends SuperMapper<Task> {
}
package com.dlink.model;
import com.baomidou.mybatisplus.annotation.TableName;
import com.dlink.db.model.SuperEntity;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* Catalogue
*
* @author wenmo
* @since 2021/5/28 13:51
**/
@Data
@EqualsAndHashCode(callSuper = false)
@TableName("dlink_catalogue")
public class Catalogue extends SuperEntity<Catalogue> {
private static final long serialVersionUID = 4659379420249868394L;
private Integer taskId;
private String type;
private Integer parentId;
private Boolean isDir;
}
package com.dlink.model;
import com.baomidou.mybatisplus.annotation.TableName;
import com.dlink.db.model.SuperEntity;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* Cluster
*
* @author wenmo
* @since 2021/5/28 13:53
**/
@Data
@EqualsAndHashCode(callSuper = false)
@TableName("dlink_cluster")
public class Cluster extends SuperEntity<Cluster> {
private static final long serialVersionUID = 3104721227014487321L;
private String alias;
private String type;
private String hosts;
private String jobManagerHost;
private Integer status;
private String note;
}
package com.dlink.model;
/**
* Status code enum
*
* @author wenmo
* @since 2021/5/28 19:58
*/
public enum CodeEnum {
SUCCESS(0),
ERROR(1);
private final Integer code;
CodeEnum(Integer code) {
this.code = code;
}
public Integer getCode() {
return code;
}
}
package com.dlink.model;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.io.Serializable;
/**
* Statement
*
* @author wenmo
* @since 2021/5/28 13:39
**/
@Data
@EqualsAndHashCode(callSuper = false)
@TableName("dlink_task_statement")
public class Statement implements Serializable {
private static final long serialVersionUID = 1646348574144815792L;
private Integer id;
private String statement;
}
package com.dlink.model;
import com.baomidou.mybatisplus.annotation.TableName;
import com.dlink.db.model.SuperEntity;
import com.dlink.executor.Executor;
import com.dlink.executor.ExecutorSetting;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* Task
*
* @author wenmo
* @since 2021-05-28
*/
@Data
@EqualsAndHashCode(callSuper = false)
@TableName("dlink_task")
public class Task extends SuperEntity<Task> {
private static final long serialVersionUID = 5988972129893667154L;
private String alias;
private String type;
private Integer checkPoint;
private String savePointPath;
private Integer parallelism;
private boolean fragment;
private Integer clusterId;
private String note;
public ExecutorSetting getLocalExecutorSetting() {
return new ExecutorSetting(Executor.LOCAL, checkPoint, parallelism, fragment, savePointPath);
}
public ExecutorSetting getRemoteExecutorSetting() {
return new ExecutorSetting(Executor.REMOTE, checkPoint, parallelism, fragment, savePointPath);
}
}
package com.dlink.model;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.io.Serializable;
/**
* User
*
* @author wenmo
* @since 2021/5/28 15:57
**/
@Data
@EqualsAndHashCode(callSuper = false)
public class User implements Serializable {
private static final long serialVersionUID = -1077801296270024204L;
private String username;
private String password;
}
package com.dlink.service;
import com.dlink.db.service.ISuperService;
import com.dlink.model.Catalogue;
/**
* CatalogueService
*
* @author wenmo
* @since 2021/5/28 14:01
**/
public interface CatalogueService extends ISuperService<Catalogue> {
}
package com.dlink.service;
import com.dlink.db.service.ISuperService;
import com.dlink.model.Cluster;
import java.util.List;
/**
* ClusterService
*
* @author wenmo
* @since 2021/5/28 14:01
**/
public interface ClusterService extends ISuperService<Cluster> {
String checkHeartBeat(String hosts, String host);
List<Cluster> listEnabledAll();
}
package com.dlink.service;
import com.dlink.db.service.ISuperService;
import com.dlink.model.Statement;
/**
* StatementService
*
* @author wenmo
* @since 2021/5/28 13:45
**/
public interface StatementService extends ISuperService<Statement> {
}
package com.dlink.service;
import com.dlink.db.service.ISuperService;
import com.dlink.model.Task;
import com.dlink.result.SubmitResult;
/**
* Task service interface
*
* @author wenmo
* @since 2021-05-28
*/
public interface TaskService extends ISuperService<Task> {
SubmitResult submitByTaskId(Integer id);
}
package com.dlink.service.impl;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.mapper.CatalogueMapper;
import com.dlink.model.Catalogue;
import com.dlink.service.CatalogueService;
import org.springframework.stereotype.Service;
/**
* CatalogueServiceImpl
*
* @author wenmo
* @since 2021/5/28 14:02
**/
@Service
public class CatalogueServiceImpl extends SuperServiceImpl<CatalogueMapper, Catalogue> implements CatalogueService {
}
package com.dlink.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.dlink.cluster.FlinkCluster;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.mapper.ClusterMapper;
import com.dlink.model.Cluster;
import com.dlink.service.ClusterService;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* ClusterServiceImpl
*
* @author wenmo
* @since 2021/5/28 14:02
**/
@Service
public class ClusterServiceImpl extends SuperServiceImpl<ClusterMapper, Cluster> implements ClusterService {
@Override
public String checkHeartBeat(String hosts, String host) {
return FlinkCluster.testFlinkJobManagerIP(hosts, host);
}
@Override
public List<Cluster> listEnabledAll() {
return this.list(new QueryWrapper<Cluster>().eq("enabled", 1));
}
}
package com.dlink.service.impl;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.mapper.StatementMapper;
import com.dlink.model.Statement;
import com.dlink.service.StatementService;
import org.springframework.stereotype.Service;
/**
* StatementServiceImpl
*
* @author wenmo
* @since 2021/5/28 13:45
**/
@Service
public class StatementServiceImpl extends SuperServiceImpl<StatementMapper, Statement> implements StatementService {
}
package com.dlink.service.impl;
import com.dlink.cluster.FlinkCluster;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.exception.BusException;
import com.dlink.job.JobManager;
import com.dlink.mapper.TaskMapper;
import com.dlink.model.Cluster;
import com.dlink.model.Statement;
import com.dlink.model.Task;
import com.dlink.result.SubmitResult;
import com.dlink.service.ClusterService;
import com.dlink.service.StatementService;
import com.dlink.service.TaskService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* Task service implementation
*
* @author wenmo
* @since 2021-05-24
*/
@Service
public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implements TaskService {
@Autowired
private StatementService statementService;
@Autowired
private ClusterService clusterService;
@Override
public SubmitResult submitByTaskId(Integer id) {
Task task = this.getById(id);
if (task == null) {
throw new BusException("Task does not exist");
}
Cluster cluster = clusterService.getById(task.getClusterId());
if (cluster == null) {
throw new BusException("Flink cluster does not exist");
}
// The statement row shares the task's primary key.
Statement statement = statementService.getById(id);
if (statement == null) {
throw new BusException("FlinkSql statement does not exist");
}
String host = FlinkCluster.testFlinkJobManagerIP(cluster.getHosts(), cluster.getJobManagerHost());
if (host == null || "".equals(host)) {
throw new BusException("Cluster address is currently unavailable");
}
JobManager jobManager = new JobManager(host);
return jobManager.submit(statement.getStatement(), task.getRemoteExecutorSetting());
}
}
package com.dlink.utils;
import com.nepxion.banner.BannerConstant;
import com.nepxion.banner.Description;
import com.nepxion.banner.DescriptionBanner;
import com.nepxion.banner.LogoBanner;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* CustomBanner
*
* @author wenmo
* @since 2021/5/10 21:45
*/
public class CustomBanner {
public static void show(LogoBanner logoBanner, Description... descriptionList) {
String bannerShown = System.getProperty(BannerConstant.BANNER_SHOWN, "true");
if (!Boolean.parseBoolean(bannerShown)) {
return;
}
System.out.println();
String bannerShownAnsiMode = System.getProperty(BannerConstant.BANNER_SHOWN_ANSI_MODE, "false");
if (Boolean.parseBoolean(bannerShownAnsiMode)) {
System.out.println(logoBanner.getBanner());
} else {
System.out.println(logoBanner.getPlainBanner());
}
List<Description> descriptions = new ArrayList<>(Arrays.asList(descriptionList));
DescriptionBanner descriptionBanner = new DescriptionBanner();
System.out.println(descriptionBanner.getBanner(descriptions));
}
}
org.springframework.context.ApplicationContextInitializer=\
com.dlink.common.banner.BannerInitializer
########################## Unified variable configuration ##########################
##### Database configuration
-datalink.datasource.ip=192.168.24.1
dlink.datasource.ip=10.1.51.25
-datalink.datasource.username=datalink
dlink.datasource.username=dlink
-datalink.datasource.password=datalink
dlink.datasource.password=dlink
-##### Redis configuration
-spring.redis.host=192.168.123.156
-spring.redis.port=6379
-spring.redis.password=123456
-spring.redis.timeout=5000
##### Flink cluster configuration
-datalink.flink.host=192.168.123.157
dlink.flink.host=10.1.51.24,10.1.51.25,10.1.51.26
-datalink.flink.port=8081
dlink.flink.port=8081
-##### Elasticsearch configuration
-#datalink.elasticsearch.uris=192.168.123.156:9200
-#datalink.elasticsearch.username=elastic
-#datalink.elasticsearch.password=qEnNfKNujqNrOPD9q5kb
-##### Sentinel configuration
-#datalink.sentinel.dashboard=192.168.123.156:6999
-##### Log trace
-#datalink.trace.enable=true
-##### Load-balancing isolation (version isolation, development only)
##### Login user configuration
-datalink.ribbon.isolation.enabled=false
dlink.login.username=admin
dlink.login.password=dlink
##### mybatis-plus: print full SQL (development only)
mybatis-plus.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl
\ No newline at end of file
########################## application-level common configuration ##########################
-##### Ribbon configuration
-## Interval for refreshing the server list from the registry; default 30 s, unit: ms
-ribbon.ServerListRefreshInterval=15000
-## Connection timeout; default 1 s, unit: ms
-ribbon.ConnectTimeout=30000
-## Request processing timeout; default 1 s, unit: ms
-ribbon.ReadTimeout=30000
-## Retry all operations; MaxAutoRetries has no effect without this; default false
-#ribbon.OkToRetryOnAllOperations=true
-## Retry count on the current instance; default 0
-#ribbon.MaxAutoRetries=1
-## Retry count when switching instances; default 1
-ribbon.MaxAutoRetriesNextServer=0
-##### Feign configuration
-feign.sentinel.enabled=true
-feign.hystrix.enabled=false
-feign.okhttp.enabled=true
-feign.httpclient.enabled=false
-feign.httpclient.max-connections=1000
-feign.httpclient.max-connections-per-route=100
-feign.client.config.feignName.connectTimeout=30000
-feign.client.config.feignName.readTimeout=30000
-## Enable Feign request/response compression
-feign.compression.request.enabled=true
-feign.compression.response.enabled=true
-## Compressed MIME types and minimum document size for compression
-feign.compression.request.mime-types=text/xml,application/xml,application/json
-feign.compression.request.min-request-size=2048
-##### Sentinel configuration
-spring.cloud.sentinel.transport.dashboard=${datalink.sentinel.dashboard}
-spring.cloud.sentinel.eager=true
##### Druid configuration
# Connection pool configuration (usually only initialSize, minIdle and maxActive need changing)
spring.datasource.druid.initial-size=10
...@@ -86,14 +49,3 @@ mybatis-plus.global-config.db-config.logic-not-delete-value=0
mybatis-plus.configuration.map-underscore-to-camel-case=true
mybatis-plus.configuration.cache-enabled=false
-##### Redis configuration
-# Maximum active connections in the pool (negative means no limit)
-spring.redis.lettuce.pool.max-active=8
-# Maximum blocking wait time for a connection (negative means no limit)
-spring.redis.lettuce.pool.max-wait=-1
-# Maximum idle connections in the pool
-spring.redis.lettuce.pool.max-idle=8
-# Minimum idle connections in the pool
-spring.redis.lettuce.pool.min-idle=0
spring:
  datasource:
    url: jdbc:mysql://10.1.51.25:3306/dlink?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
    username: dfly
    password: Dareway@2020
    driver-class-name: com.mysql.cj.jdbc.Driver
  application:
    name: dlink
server:
  port: 8888
mybatis-plus:
  mapper-locations: classpath:/mapper/*Mapper.xml
  # Entity scanning; separate multiple packages with commas or semicolons
  typeAliasesPackage: com.dlink.model
  global-config:
    db-config:
      id-type: auto
  configuration:
    # mybatis-plus: print full SQL (development only)
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
# Flink cluster configuration
dlink:
  flink:
    host: 10.1.51.24,10.1.51.25,10.1.51.26
    port: 8081
  # Login user configuration
  login:
    username: admin
    password: dlink
\ No newline at end of file
########################## bootstrap-level common configuration ##########################
# Development environment by default
spring.profiles.active=dev
server.port=8888
spring.application.name=Dlink
-##### nacos (registry and config center) address
-spring.cloud.nacos.server-addr=192.168.123.156:8848
-#spring.cloud.nacos.username=nacos
-#spring.cloud.nacos.password=nacos
-spring.cloud.nacos.config.file-extension=yml
-spring.cloud.nacos.config.shared-dataids=common.yml
-spring.cloud.nacos.config.refreshable-dataids=common.yml
-##### spring-boot-actuator configuration
-management.endpoints.web.exposure.include=*
-management.endpoint.health.show-details=always
-##### Allow bean definition overriding
-spring.main.allow-bean-definition-overriding=true
\ No newline at end of file
server:
  port: 8004
spring:
  application:
    name: datalink-task
\ No newline at end of file
____ __ _ __
/ __ \ / /(_)___ / /__
/ / / // // / __ \/ //_/
/ /_/ // // / / / / <
/_____//__/_/_/ /_/_/|_|
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<contextName>${APP_NAME}</contextName>
<springProperty name="APP_NAME" scope="context" source="spring.application.name"/>
<springProperty name="LOG_FILE" scope="context" source="logging.file" defaultValue="../logs/application/${APP_NAME}"/>
<springProperty name="LOG_POINT_FILE" scope="context" source="logging.file" defaultValue="../logs/point"/>
<springProperty name="LOG_AUDIT_FILE" scope="context" source="logging.file" defaultValue="../logs/audit"/>
<springProperty name="LOG_MAXFILESIZE" scope="context" source="logback.filesize" defaultValue="50MB"/>
<springProperty name="LOG_FILEMAXDAY" scope="context" source="logback.filemaxday" defaultValue="7"/>
<springProperty name="ServerIP" scope="context" source="spring.cloud.client.ip-address" defaultValue="0.0.0.0"/>
<springProperty name="ServerPort" scope="context" source="server.port" defaultValue="0000"/>
<!-- Colored logs -->
<!-- Converter classes that colored logs depend on -->
<conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter" />
<conversionRule conversionWord="wex" converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter" />
<conversionRule conversionWord="wEx" converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter" />
<!-- Colored log pattern -->
<property name="CONSOLE_LOG_PATTERN"
value="[${APP_NAME}:${ServerIP}:${ServerPort}] %clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(%level){blue} %clr(${PID}){magenta} %clr([%X{traceId}]){yellow} %clr([%thread]){orange} %clr(%-40.40logger{39}){cyan} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}" />
<property name="CONSOLE_LOG_PATTERN_NO_COLOR" value="[${APP_NAME}:${ServerIP}:${ServerPort}] %d{yyyy-MM-dd HH:mm:ss.SSS} %level ${PID} [%X{traceId}] [%thread] %-40.40logger{39} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}" />
<!-- Console log -->
<appender name="StdoutAppender" class="ch.qos.logback.core.ConsoleAppender">
<withJansi>true</withJansi>
<encoder>
<pattern>${CONSOLE_LOG_PATTERN}</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<!-- Roll the regular log file daily -->
<appender name="FileAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_FILE}/${APP_NAME}.log</file>
<encoder>
<pattern>${CONSOLE_LOG_PATTERN_NO_COLOR}</pattern>
<charset>UTF-8</charset>
</encoder>
<!-- Time-based rolling policy -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_FILE}/${APP_NAME}.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<!-- Retention time, in days -->
<maxHistory>${LOG_FILEMAXDAY}</maxHistory>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>${LOG_MAXFILESIZE}</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
</rollingPolicy>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>INFO</level>
</filter>
</appender>
<appender name="point_log" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_POINT_FILE}/point.log</file>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}|${APP_NAME}|%msg%n</pattern>
<charset>UTF-8</charset>
</encoder>
<!-- Time-based rolling policy -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_POINT_FILE}/point.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<!-- Retention time, in days -->
<maxHistory>${LOG_FILEMAXDAY}</maxHistory>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>${LOG_MAXFILESIZE}</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
</rollingPolicy>
</appender>
<appender name="audit_log" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_AUDIT_FILE}/audit.log</file>
<encoder>
<pattern>%msg%n</pattern>
<charset>UTF-8</charset>
</encoder>
<!-- Time-based rolling policy -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_AUDIT_FILE}/audit.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<!-- Retention time, in days -->
<maxHistory>${LOG_FILEMAXDAY}</maxHistory>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>${LOG_MAXFILESIZE}</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
</rollingPolicy>
</appender>
<appender name="point_log_async" class="ch.qos.logback.classic.AsyncAppender">
<discardingThreshold>0</discardingThreshold>
<appender-ref ref="point_log"/>
</appender>
<appender name="file_async" class="ch.qos.logback.classic.AsyncAppender">
<discardingThreshold>0</discardingThreshold>
<appender-ref ref="FileAppender"/>
</appender>
<appender name="audit_log_async" class="ch.qos.logback.classic.AsyncAppender">
<discardingThreshold>0</discardingThreshold>
<appender-ref ref="audit_log"/>
</appender>
<logger name="com.datalink.log.monitor" level="debug" additivity="false">
<appender-ref ref="point_log_async" />
</logger>
<logger name="com.datalink.log.service.impl.LoggerAuditServiceImpl" level="debug" additivity="false">
<appender-ref ref="audit_log_async" />
</logger>
<root level="INFO">
<appender-ref ref="StdoutAppender"/>
<appender-ref ref="file_async"/>
</root>
</configuration>
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.dlink.mapper.CatalogueMapper">
<!-- Common query result map -->
<resultMap id="BaseResultMap" type="com.dlink.model.Catalogue">
<id column="id" property="id" />
<result column="name" property="name" />
<result column="task_id" property="taskId" />
<result column="type" property="type" />
<result column="parent_id" property="parentId" />
<result column="is_dir" property="isDir" />
<result column="enabled" property="enabled" />
<result column="create_time" property="createTime" />
<result column="update_time" property="updateTime" />
</resultMap>
<!-- Common query result columns -->
<sql id="Base_Column_List">
id, name, task_id, type,parent_id,is_dir, enabled, create_time, update_time
</sql>
<select id="selectForProTable" resultType="com.dlink.model.Catalogue">
select
a.*
from
dlink_catalogue a
<where>
1=1
<if test='param.name!=null and param.name!=""'>
and a.name like "%${param.name}%"
</if>
<if test='param.task_id!=null and param.task_id!=""'>
and a.task_id = #{param.task_id}
</if>
<if test='param.createTime!=null and param.createTime!=""'>
and a.create_time <![CDATA[>=]]> str_to_date( #{param.createTime},'%Y-%m-%d %H:%i:%s')
</if>
<if test='param.updateTime!=null and param.updateTime!=""'>
and a.update_time <![CDATA[>=]]> str_to_date( #{param.updateTime},'%Y-%m-%d %H:%i:%s')
</if>
<if test='ew.sqlSegment!=null and ew.sqlSegment!="" and !ew.sqlSegment.startsWith(" ORDER BY")'>
and
</if>
<if test='ew.sqlSegment!=null and ew.sqlSegment!=""'>
${ew.sqlSegment}
</if>
</where>
</select>
</mapper>
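
The "param" and "ew" bindings used in these mapper XML files come from the shared SuperMapper, which is not part of this commit; a plausible declaration, offered only as a hedged sketch, would be:

package com.dlink.db.mapper;

import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.ibatis.annotations.Param;
import java.util.List;
import java.util.Map;

// Assumed shape of the shared mapper: "ew" exposes the MyBatis-Plus wrapper
// (ew.sqlSegment in the XML above) and "param" carries the ProTable filter
// fields (param.name, param.createTime, ...).
public interface SuperMapper<T> extends BaseMapper<T> {
    List<T> selectForProTable(Page<T> page, @Param("ew") Wrapper<T> wrapper, @Param("param") Map<String, Object> param);
}
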
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.dlink.mapper.ClusterMapper">
<!-- Common query result map -->
<resultMap id="BaseResultMap" type="com.dlink.model.Cluster">
<id column="id" property="id" />
<result column="name" property="name" />
<result column="alias" property="alias" />
<result column="type" property="type" />
<result column="hosts" property="hosts" />
<result column="job_manager_host" property="jobManagerHost" />
<result column="status" property="status" />
<result column="note" property="note" />
<result column="enabled" property="enabled" />
<result column="create_time" property="createTime" />
<result column="update_time" property="updateTime" />
</resultMap>
<!-- Common query result columns -->
<sql id="Base_Column_List">
id, name, alias, type,hosts,job_manager_host, status,note, enabled, create_time, update_time
</sql>
<select id="selectForProTable" resultType="com.dlink.model.Cluster">
select
a.*
from
dlink_cluster a
<where>
1=1
<if test='param.name!=null and param.name!=""'>
and a.name like "%${param.name}%"
</if>
<if test='param.alias!=null and param.alias!=""'>
and a.alias like "%${param.alias}%"
</if>
<if test='param.createTime!=null and param.createTime!=""'>
and a.create_time <![CDATA[>=]]> str_to_date( #{param.createTime},'%Y-%m-%d %H:%i:%s')
</if>
<if test='param.updateTime!=null and param.updateTime!=""'>
and a.update_time <![CDATA[>=]]> str_to_date( #{param.updateTime},'%Y-%m-%d %H:%i:%s')
</if>
<if test='ew.sqlSegment!=null and ew.sqlSegment!="" and !ew.sqlSegment.startsWith(" ORDER BY")'>
and
</if>
<if test='ew.sqlSegment!=null and ew.sqlSegment!=""'>
${ew.sqlSegment}
</if>
</where>
</select>
</mapper>
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.dlink.mapper.StatementMapper">
<resultMap id="BaseResultMap" type="com.dlink.model.Statement">
<id column="id" property="id" />
<result column="statement" property="statement" />
</resultMap>
<sql id="Base_Column_List">
id,statement
</sql>
<select id="selectForProTable" resultType="com.dlink.model.Statement">
select
a.*
from
dlink_task_statement a
<where>
1=1
<if test='param.id!=null and param.id!=""'>
and a.id = #{param.id}
</if>
<if test='param.statement!=null and param.statement!=""'>
and a.statement like "%${param.statement}%"
</if>
<if test='ew.sqlSegment!=null and ew.sqlSegment!="" and !ew.sqlSegment.startsWith(" ORDER BY")'>
and
</if>
<if test='ew.sqlSegment!=null and ew.sqlSegment!=""'>
${ew.sqlSegment}
</if>
</where>
</select>
</mapper>
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.dlink.mapper.TaskMapper">
<!-- Common query result map -->
<resultMap id="BaseResultMap" type="com.dlink.model.Task">
<id column="id" property="id" />
<result column="name" property="name" />
<result column="alias" property="alias" />
<result column="type" property="type" />
<result column="check_point" property="checkPoint" />
<result column="save_point_path" property="savePointPath" />
<result column="parallelism" property="parallelism" />
<result column="fragment" property="fragment" />
<result column="cluster_id" property="clusterId" />
<result column="note" property="note" />
<result column="enabled" property="enabled" />
<result column="create_time" property="createTime" />
<result column="update_time" property="updateTime" />
</resultMap>
<!-- Common query result columns -->
<sql id="Base_Column_List">
id, name, alias, type,check_point,save_point_path, parallelism,fragment,cluster_id,note, enabled, create_time, update_time
</sql>
<select id="selectForProTable" resultType="com.dlink.model.Task">
select
a.*
from
dlink_task a
<where>
1=1
<if test='param.name!=null and param.name!=""'>
and a.name like "%${param.name}%"
</if>
<if test='param.alias!=null and param.alias!=""'>
and a.alias like "%${param.alias}%"
</if>
<if test='param.createTime!=null and param.createTime!=""'>
and a.create_time <![CDATA[>=]]> str_to_date( #{param.createTime},'%Y-%m-%d %H:%i:%s')
</if>
<if test='param.updateTime!=null and param.updateTime!=""'>
and a.update_time <![CDATA[>=]]> str_to_date( #{param.updateTime},'%Y-%m-%d %H:%i:%s')
</if>
<if test='ew.sqlSegment!=null and ew.sqlSegment!="" and !ew.sqlSegment.startsWith(" ORDER BY")'>
and
</if>
<if test='ew.sqlSegment!=null and ew.sqlSegment!=""'>
${ew.sqlSegment}
</if>
</where>
</select>
</mapper>
...@@ -26,26 +26,21 @@ public class FlinkCluster {
FlinkCluster.flinkJobMangerHost = flinkJobMangerHost;
}
-public static String getFlinkJobManagerIP(String flinkServers) {
public static String getFlinkJobManagerIP(String hosts) {
-String res = "";
-String flinkAddress = "";
try {
-flinkAddress = getFlinkJobMangerHost();
-res = HttpUtil.get(NetConstant.HTTP + flinkAddress + NetConstant.COLON + NetConstant.PORT + NetConstant.SLASH + FlinkHistoryConstant.JOBS, NetConstant.SERVER_TIME_OUT_ACTIVE);
String res = HttpUtil.get(NetConstant.HTTP + getFlinkJobMangerHost() + NetConstant.SLASH + FlinkHistoryConstant.JOBS, NetConstant.SERVER_TIME_OUT_ACTIVE);
if (!res.isEmpty()) {
-return flinkAddress;
return getFlinkJobMangerHost();
}
} catch (Exception e) {
}
-String[] servers = flinkServers.split(",");
String[] servers = hosts.split(",");
for (String server : servers) {
try {
-String url = NetConstant.HTTP + server + NetConstant.COLON + NetConstant.PORT + NetConstant.SLASH + FlinkHistoryConstant.JOBS;
String url = NetConstant.HTTP + server + NetConstant.SLASH + FlinkHistoryConstant.JOBS;
-res = HttpUtil.get(url, NetConstant.SERVER_TIME_OUT_ACTIVE);
String res = HttpUtil.get(url, NetConstant.SERVER_TIME_OUT_ACTIVE);
if (!res.isEmpty()) {
-if (server.equalsIgnoreCase(flinkAddress)) {
-setFlinkJobMangerHost(server);
-}
setFlinkJobMangerHost(server);
return server;
}
} catch (Exception e) {
...@@ -53,4 +48,42 @@
}
return "";
}
public static String getFlinkJobManagerHost(String hosts) {
String[] servers = hosts.split(",");
for (String server : servers) {
try {
String res = HttpUtil.get(NetConstant.HTTP + server + NetConstant.SLASH + FlinkHistoryConstant.JOBS, NetConstant.SERVER_TIME_OUT_ACTIVE);
if (!res.isEmpty()) {
setFlinkJobMangerHost(server);
return server;
}
} catch (Exception e) {
}
}
return "";
}
public static String testFlinkJobManagerIP(String hosts, String host) {
try {
String res = HttpUtil.get(NetConstant.HTTP + host + NetConstant.SLASH + FlinkHistoryConstant.JOBS, NetConstant.SERVER_TIME_OUT_ACTIVE);
if (!res.isEmpty()) {
return host;
}
} catch (Exception e) {
}
String[] servers = hosts.split(",");
for (String server : servers) {
try {
String url = NetConstant.HTTP + server + NetConstant.SLASH + FlinkHistoryConstant.JOBS;
String res = HttpUtil.get(url, NetConstant.SERVER_TIME_OUT_ACTIVE);
if (!res.isEmpty()) {
setFlinkJobMangerHost(server);
return server;
}
} catch (Exception e) {
}
}
return null;
}
}
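
An illustrative call of the new testFlinkJobManagerIP; the addresses are the sample ones from this commit's configuration, not fixed constants:

import com.dlink.cluster.FlinkCluster;

public class FlinkClusterExample {
    public static void main(String[] args) {
        // "hosts" is the comma-separated candidate list, "lastKnown" the cached JobManager address.
        String hosts = "10.1.51.24:8081,10.1.51.25:8081,10.1.51.26:8081";
        String lastKnown = "10.1.51.25:8081";
        // Probes lastKnown first, then each candidate's /jobs endpoint;
        // returns the first address that answers, or null if none does.
        String active = FlinkCluster.testFlinkJobManagerIP(hosts, lastKnown);
        System.out.println(active == null ? "no JobManager reachable" : "active: " + active);
    }
}
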
...@@ -47,4 +47,8 @@ public interface FlinkSQLConstant {
* Head of a drop-table statement: DROP TABLE IF EXISTS
*/
String DROP_TABLE_HEAD = " DROP TABLE IF EXISTS ";
/**
* Statement separator
*/
String SEPARATOR = ";";
}
...@@ -8,24 +8,34 @@ package com.dlink.executor;
**/
public class ExecutorSetting {
private String type = Executor.LOCAL;
-private Long checkpoint;
private Integer checkpoint;
private Integer parallelism;
private boolean useSqlFragment = true;
private String savePointPath;
public ExecutorSetting(String type) {
this.type = type;
}
-public ExecutorSetting(String type, Long checkpoint) {
public ExecutorSetting(String type, Integer checkpoint) {
this.type = type;
this.checkpoint = checkpoint;
}
-public ExecutorSetting(String type, Long checkpoint, boolean useSqlFragment) {
public ExecutorSetting(String type, Integer checkpoint, boolean useSqlFragment) {
this.type = type;
this.checkpoint = checkpoint;
this.useSqlFragment = useSqlFragment;
}
public ExecutorSetting(String type, Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath) {
this.type = type;
this.checkpoint = checkpoint;
this.parallelism = parallelism;
this.useSqlFragment = useSqlFragment;
this.savePointPath = savePointPath;
}
public String getType() {
return type;
}
...@@ -34,11 +44,11 @@ public class ExecutorSetting {
this.type = type;
}
-public Long getCheckpoint() {
public Integer getCheckpoint() {
return checkpoint;
}
-public void setCheckpoint(Long checkpoint) {
public void setCheckpoint(Integer checkpoint) {
this.checkpoint = checkpoint;
}
...@@ -49,4 +59,20 @@ public class ExecutorSetting {
public void setUseSqlFragment(boolean useSqlFragment) {
this.useSqlFragment = useSqlFragment;
}
public Integer getParallelism() {
return parallelism;
}
public void setParallelism(Integer parallelism) {
this.parallelism = parallelism;
}
public String getSavePointPath() {
return savePointPath;
}
public void setSavePointPath(String savePointPath) {
this.savePointPath = savePointPath;
}
}
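
A short hedged sketch of the new five-argument constructor; the literal values are assumptions, since in practice they come from a dlink_task row via Task.getRemoteExecutorSetting():

import com.dlink.executor.Executor;
import com.dlink.executor.ExecutorSetting;

public class ExecutorSettingExample {
    public static void main(String[] args) {
        // checkpoint interval 5000, parallelism 4, SQL fragments enabled, no savepoint path
        ExecutorSetting setting = new ExecutorSetting(Executor.REMOTE, 5000, 4, true, null);
        System.out.println(setting.getType() + ", parallelism=" + setting.getParallelism());
    }
}
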
...@@ -13,6 +13,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.table.api.TableResult;
import java.time.LocalDate;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
...@@ -30,6 +31,19 @@ public class JobManager {
private String sessionId;
private Integer maxRowNum = 100;
public JobManager(String host) {
if (host != null) {
String[] strs = host.split(":");
if (strs.length >= 2) {
this.flinkHost = strs[0];
this.port = Integer.parseInt(strs[1]);
} else {
this.flinkHost = strs[0];
// No explicit port supplied: fall back to Flink's default REST port.
this.port = 8081;
}
}
}
public JobManager(String flinkHost, Integer port) {
this.flinkHost = flinkHost;
this.port = port;
...@@ -84,6 +98,14 @@ public class JobManager {
return runResult;
}
public SubmitResult submit(String statement, ExecutorSetting executerSetting) {
if (statement == null || "".equals(statement)) {
return SubmitResult.error("FlinkSql statement does not exist");
}
String[] statements = statement.split(FlinkSQLConstant.SEPARATOR);
return submit(Arrays.asList(statements), executerSetting);
}
public SubmitResult submit(List<String> sqlList, ExecutorSetting executerSetting) {
SubmitResult result = new SubmitResult(sessionId, sqlList, flinkHost);
Map<String, String> map = new HashMap<>();
......
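
Putting the two JobManager additions together, a minimal hedged sketch of the calling pattern used by TaskServiceImpl; the address and SQL are illustrative:

import com.dlink.executor.Executor;
import com.dlink.executor.ExecutorSetting;
import com.dlink.job.JobManager;
import com.dlink.result.SubmitResult;

public class JobManagerExample {
    public static void main(String[] args) {
        // "10.1.51.25:8081" is a sample address from this commit's config; without an
        // explicit port the new constructor falls back to 8081.
        JobManager jobManager = new JobManager("10.1.51.25:8081");
        // The new String overload splits the script on ";" (FlinkSQLConstant.SEPARATOR)
        // and delegates to the existing List-based submit.
        SubmitResult result = jobManager.submit(
                "CREATE TABLE t (id INT); INSERT INTO sink SELECT id FROM t",
                new ExecutorSetting(Executor.REMOTE, 5000, 4, true, null));
        System.out.println(result.isSuccess());
    }
}
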
...@@ -23,6 +23,15 @@ public class SubmitResult {
public SubmitResult() {
}
public static SubmitResult error(String error) {
return new SubmitResult(false, error);
}
public SubmitResult(boolean success, String error) {
this.success = success;
this.error = error;
}
public SubmitResult(String sessionId, List<String> statements, String flinkHost) {
this.sessionId = sessionId;
this.statements = statements;
......
...@@ -25,6 +25,10 @@
<lombok.version>1.18.16</lombok.version>
<jackjson.version>2.11.4</jackjson.version>
<guava.version>21.0</guava.version>
<slf4j.version>1.7.30</slf4j.version>
<mysql-connector-java.version>8.0.22</mysql-connector-java.version>
<banner.version>1.0.2</banner.version>
<hibernate-validator.version>6.2.0.Final</hibernate-validator.version>
</properties>
<dependencyManagement>
...@@ -90,6 +94,31 @@
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql-connector-java.version}</version>
</dependency>
<dependency>
<groupId>com.nepxion</groupId>
<artifactId>banner</artifactId>
<version>${banner.version}</version>
</dependency>
<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
<version>${hibernate-validator.version}</version>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-core</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>
\ No newline at end of file