Unverified Commit 3da48e7d authored by aiwenmo, committed by GitHub

Merge pull request #1 from aiwenmo/dev

core
parents 068afcf8 65c81075
@@ -21,3 +21,10 @@
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
.idea/*
target/*
*.iml
*.lst
*/target/*
*/*/target/*
dlink-web/node_modules/*
# dlink
\ No newline at end of file
# Dlink
## Introduction
Dlink is built for Apache Flink. It is an agile FlinkSQL development and operations platform for developing, previewing, and submitting jobs online.
At the same time, Dlink is a core component of the DataLink data platform ecosystem.
The DataLink open-source project and community are under construction; we hope this project helps you grow faster.
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dlink</artifactId>
<groupId>com.dlink</groupId>
<version>0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dlink-admin</artifactId>
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.dlink.common.result;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.List;
/**
* Paged result
*
* @author wenmo
* @since 2021/5/3 20:03
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class PageResult<T> implements Serializable {
private static final long serialVersionUID = -5143774412936881374L;
/**
* Total number of records
*/
private Long count;
/**
* Status code: 0 for success, 1 for failure
*/
private int code;
/**
* Result set of the current page
*/
private List<T> data;
}
package com.dlink.common.result;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.List;
/**
* Ant Design Pro ProTable Query Result
*
* @author wenmo
* @since 2021/5/18 21:54
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ProTableResult<T> implements Serializable {
private static final long serialVersionUID = -6377431009117000655L;
/**
* Total number of records
*/
private Long total;
/**
* Whether the request succeeded: true for success, false for failure
*/
private boolean success;
/**
* Current page number
*/
private Integer current;
/**
* Number of records per page
*/
private Integer pageSize;
/**
* Result set of the current page
*/
private List<T> data;
}
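/*
 * A minimal usage sketch, assuming a hypothetical User entity and a prepared row list named users:
 *
 *   ProTableResult<User> result = ProTableResult.<User>builder()
 *       .success(true)
 *       .total(100L)
 *       .current(1)
 *       .pageSize(10)
 *       .data(users)
 *       .build();
 */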
package com.dlink.common.result;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* Generic response wrapper
*
* @author wenmo
* @since 2021/5/3 19:56
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Result<T> implements Serializable {
private T datas;
private Integer code;
private String msg;
public static <T> Result<T> succeed(String msg) {
return of(null, CodeEnum.SUCCESS.getCode(), msg);
}
public static <T> Result<T> succeed(T model, String msg) {
return of(model, CodeEnum.SUCCESS.getCode(), msg);
}
public static <T> Result<T> succeed(T model) {
return of(model, CodeEnum.SUCCESS.getCode(), "");
}
public static <T> Result<T> of(T datas, Integer code, String msg) {
return new Result<>(datas, code, msg);
}
public static <T> Result<T> failed(String msg) {
return of(null, CodeEnum.ERROR.getCode(), msg);
}
public static <T> Result<T> failed(T model, String msg) {
return of(model, CodeEnum.ERROR.getCode(), msg);
}
}
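/*
 * A minimal usage sketch, assuming a hypothetical User entity; the messages are illustrative:
 *
 *   Result<User> ok = Result.succeed(user, "query succeeded");
 *   Result<Void> err = Result.failed("query failed");
 */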
package com.dlink.db.config;
import com.baomidou.mybatisplus.core.handlers.MetaObjectHandler;
import com.baomidou.mybatisplus.core.parser.ISqlParser;
import com.baomidou.mybatisplus.core.parser.ISqlParserFilter;
import com.baomidou.mybatisplus.extension.plugins.PaginationInterceptor;
import com.baomidou.mybatisplus.extension.plugins.tenant.TenantHandler;
import com.baomidou.mybatisplus.extension.plugins.tenant.TenantSqlParser;
import com.dlink.db.handler.DateMetaObjectHandler;
import com.dlink.db.properties.MybatisPlusFillProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
/**
* MybatisPlusConfigure
*
* @author wenmo
* @since 2021/5/25
**/
@EnableConfigurationProperties(MybatisPlusFillProperties.class)
public class MybatisPlusConfigure {
@Autowired
private MybatisPlusFillProperties autoFillProperties;
@Bean
public PaginationInterceptor paginationInterceptor() {
PaginationInterceptor paginationInterceptor = new PaginationInterceptor();
return paginationInterceptor;
}
@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(prefix = "dlink.mybatis-plus.fill", name = "enabled", havingValue = "true", matchIfMissing = true)
public MetaObjectHandler metaObjectHandler() {
return new DateMetaObjectHandler(autoFillProperties);
}
}
package com.dlink.db.handler;
import com.baomidou.mybatisplus.core.handlers.MetaObjectHandler;
import com.dlink.db.properties.MybatisPlusFillProperties;
import org.apache.ibatis.reflection.MetaObject;
import java.time.LocalDateTime;
/**
* DateMetaObjectHandler
*
* @author wenmo
* @since 2021/5/25
**/
public class DateMetaObjectHandler implements MetaObjectHandler {
private MybatisPlusFillProperties mybatisPlusFillProperties;
public DateMetaObjectHandler(MybatisPlusFillProperties mybatisPlusFillProperties) {
this.mybatisPlusFillProperties = mybatisPlusFillProperties;
}
@Override
public boolean openInsertFill() {
return mybatisPlusFillProperties.getEnableInsertFill();
}
@Override
public boolean openUpdateFill() {
return mybatisPlusFillProperties.getEnableUpdateFill();
}
@Override
public void insertFill(MetaObject metaObject) {
Object createTime = getFieldValByName(mybatisPlusFillProperties.getCreateTimeField(), metaObject);
Object updateTime = getFieldValByName(mybatisPlusFillProperties.getUpdateTimeField(), metaObject);
if (createTime == null) {
setFieldValByName(mybatisPlusFillProperties.getCreateTimeField(), LocalDateTime.now(), metaObject);
}
if (updateTime == null) {
setFieldValByName(mybatisPlusFillProperties.getUpdateTimeField(), LocalDateTime.now(), metaObject);
}
}
@Override
public void updateFill(MetaObject metaObject) {
setFieldValByName(mybatisPlusFillProperties.getUpdateTimeField(), LocalDateTime.now(), metaObject);
}
}
\ No newline at end of file
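/*
 * A minimal sketch of an entity the handler above would fill, assuming the default
 * createTime/updateTime field names and the standard MyBatis-Plus fill annotations:
 *
 *   @TableField(fill = FieldFill.INSERT)
 *   private LocalDateTime createTime;
 *   @TableField(fill = FieldFill.INSERT_UPDATE)
 *   private LocalDateTime updateTime;
 */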
package com.dlink.db.mapper;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.toolkit.Constants;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.ibatis.annotations.Param;
import java.util.List;
import java.util.Map;
/**
* SuperMapper
*
* @author wenmo
* @since 2021/5/25
**/
public interface SuperMapper<T> extends BaseMapper<T> {
List<T> selectForProTable(Page<T> page, @Param(Constants.WRAPPER) Wrapper<T> queryWrapper, @Param("param") Map<String, Object> param);
}
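/*
 * A sketch of what a concrete selectForProTable mapper statement could look like; the entity and
 * table names are illustrative. The wrapper is bound under Constants.WRAPPER ("ew"), so its
 * generated condition is appended via customSqlSegment, and the table is aliased as "a" to match
 * the column prefixes used by ProTableUtil:
 *
 *   <select id="selectForProTable" resultType="com.dlink.model.ExampleEntity">
 *     select a.* from example_table a ${ew.customSqlSegment}
 *   </select>
 */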
package com.dlink.db.properties;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* MybatisPlusFillProperties
*
* @author wenmo
* @since 2021/5/25
**/
@Setter
@Getter
@ConfigurationProperties(prefix = "dlink.mybatis-plus.fill")
public class MybatisPlusFillProperties {
private Boolean enabled = true;
private Boolean enableInsertFill = true;
private Boolean enableUpdateFill = true;
private String createTimeField = "createTime";
private String updateTimeField = "updateTime";
}
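/*
 * A minimal configuration sketch: the Spring Boot properties these fields bind to via relaxed
 * binding; the values shown are the defaults declared above.
 *
 *   dlink.mybatis-plus.fill.enabled=true
 *   dlink.mybatis-plus.fill.enable-insert-fill=true
 *   dlink.mybatis-plus.fill.enable-update-fill=true
 *   dlink.mybatis-plus.fill.create-time-field=createTime
 *   dlink.mybatis-plus.fill.update-time-field=updateTime
 */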
package com.dlink.db.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.dlink.common.result.ProTableResult;
import com.fasterxml.jackson.databind.JsonNode;
/**
* ISuperService
*
* @author wenmo
* @since 2021/5/25
**/
public interface ISuperService<T> extends IService<T> {
ProTableResult<T> selectForProTable(JsonNode para);
}
package com.dlink.db.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.dlink.common.result.ProTableResult;
import com.dlink.db.mapper.SuperMapper;
import com.dlink.db.service.ISuperService;
import com.dlink.db.util.ProTableUtil;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.Map;
/**
* SuperServiceImpl
*
* @author wenmo
* @since 2021/5/25
**/
public class SuperServiceImpl<M extends SuperMapper<T>, T> extends ServiceImpl<M, T> implements ISuperService<T> {
@Override
public ProTableResult<T> selectForProTable(JsonNode para) {
Integer current = para.has("current") ? para.get("current").asInt() : 1;
Integer pageSize = para.has("pageSize") ? para.get("pageSize").asInt() : 10;
QueryWrapper<T> queryWrapper = new QueryWrapper<>();
ProTableUtil.autoQueryDefalut(para, queryWrapper);
ObjectMapper mapper = new ObjectMapper();
Map<String, Object> param = mapper.convertValue(para, Map.class);
Page<T> page = new Page<>(current, pageSize);
List<T> list = baseMapper.selectForProTable(page, queryWrapper, param);
return ProTableResult.<T>builder().success(true).data(list).total(page.getTotal()).current(current).pageSize(pageSize).build();
}
}
package com.dlink.db.util;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeType;
import com.google.common.base.CaseFormat;
import java.util.*;
/**
* ProTableUtil
*
* @author wenmo
* @since 2021/5/25
**/
public class ProTableUtil {
/**
* @Author wenmo
* @Description Automatically apply the table's paging and sorting parameters
* @Date 2021/5/18
* @Param [para, wrapper, camelToUnderscore, isDelete]
**/
public static void autoQuery(JsonNode para, QueryWrapper<?> wrapper, boolean camelToUnderscore, boolean isDelete) {
buildDelete(wrapper,camelToUnderscore,isDelete);
JsonNode sortField = para.get("sorter");
if(sortField!=null) {
Iterator<Map.Entry<String, JsonNode>> fields = sortField.fields();
while (fields.hasNext()) {
Map.Entry<String, JsonNode> entry = fields.next();
buildSort(entry.getKey(), entry.getValue().asText(), wrapper, camelToUnderscore);
}
}
JsonNode filter = para.get("filter");
if(filter!=null) {
Iterator<Map.Entry<String, JsonNode>> fields2 = filter.fields();
while (fields2.hasNext()) {
Map.Entry<String, JsonNode> entry = fields2.next();
buildFilter(entry.getKey(), entry.getValue(), wrapper, camelToUnderscore);
}
}
}
private static void buildDelete( QueryWrapper<?> wrapper, boolean camelToUnderscore, boolean isDelete){
if (isDelete) {
if (camelToUnderscore) {
wrapper.eq(CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, "isDelete"), 0);
} else {
wrapper.eq("isDelete", 0);
}
}
}
private static void buildSort(String sortField,String sortValue,QueryWrapper<?> wrapper, boolean camelToUnderscore){
if (sortField != null && sortValue != null) {
if (camelToUnderscore) {
sortField = CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, sortField);
}
if (sortValue.equals("descend")) {
if(!sortField.contains(".")) {
wrapper.orderByDesc("a." + sortField);
}
} else {
if(!sortField.contains(".")) {
wrapper.orderByAsc("a." + sortField);
}
}
}
}
private static void buildFilter(String searchField,JsonNode searchValue,QueryWrapper<?> wrapper, boolean camelToUnderscore){
if (searchField != null && !searchField.equals("") && searchValue != null) {
if (camelToUnderscore) {
searchField = CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, searchField);
}
final String field = searchField;
List<String> searchValues = new ArrayList<>();
String type ="String";
if (searchValue.isArray()){
for (final JsonNode objNode : searchValue){
if(objNode.getNodeType()==JsonNodeType.NUMBER){
type ="Number";
}
searchValues.add(objNode.asText());
}
}
if(searchValues.size()>0) {
if ("Number".equals(type)) {
wrapper.and(qw -> {
for (int i = 0; i < searchValues.size(); i++) {
Double itemField = Double.parseDouble(searchValues.get(i));
if (i > 0) {
qw.or();
}
qw.eq("a." + field, itemField);
}
});
} else {
wrapper.and(qw -> {
for (int i = 0; i < searchValues.size(); i++) {
String itemField = searchValues.get(i);
if (i > 0) {
qw.or();
}
qw.eq("a." + field, itemField);
}
});
}
}
}
}
/**
* @return void
* @Author wenmo
* @Description Automatically apply the form's query parameters
* @Date 2021/5/18
* @Param [wrapper, para, blackarr, writearr, camelToUnderscore]
**/
public static void autoSetFromPara(QueryWrapper<?> wrapper, JsonNode para, String[] blackarr, String[] writearr, boolean camelToUnderscore) {
List<String> blacklist = Arrays.asList(blackarr);
List<String> writelist = Arrays.asList(writearr);
if (para.isObject())
{
Iterator<Map.Entry<String, JsonNode>> it = para.fields();
while (it.hasNext())
{
Map.Entry<String, JsonNode> entry = it.next();
String mapKey = entry.getKey();
if (blacklist.indexOf(mapKey) == -1 || writelist.indexOf(mapKey) > -1) {
if(entry.getValue().getNodeType()== JsonNodeType.NUMBER) {
Double mapValue = entry.getValue().asDouble();
if (mapValue != null) {
if (camelToUnderscore) {
wrapper.eq(CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, mapKey), mapValue);
} else {
wrapper.eq(mapKey, mapValue);
}
}
}else{
String mapValue = entry.getValue().asText();
if (mapValue != null&&!"".equals(mapValue)) {
if (camelToUnderscore) {
wrapper.eq(CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, mapKey), mapValue);
} else {
wrapper.eq(mapKey, mapValue);
}
}
}
}
}
}
}
/**
* @return void
* @Author wenmo
* @Description Default form blacklist/whitelist
* @Date 2021/5/18
* @Param [wrapper, para]
**/
public static void autoSetFromParaDefault(QueryWrapper<?> wrapper, JsonNode para) {
final String[] blackarr = {"current", "pageSize", "sorter", "filter"};
final String[] writearr = {};
autoSetFromPara(wrapper, para, blackarr, writearr, true);
}
/**
* @return void
* @Author wenmo
* @Description Default table parameters
* @Date 2021/5/18
* @Param [para, wrapper]
**/
public static void autoQueryDefalut(JsonNode para, QueryWrapper<?> wrapper) {
autoQuery(para, wrapper, true, false);
}
/**
* @return void
* @Author wenmo
* @Description Default entry point for ProTable requests
* @Date 2021/5/18
* @Param [para, wrapper]
**/
public static void autoQueryAndSetFormParaDefalut(JsonNode para, QueryWrapper<?> wrapper) {
autoSetFromParaDefault(wrapper, para);
autoQueryDefalut(para, wrapper);
}
public static void autoQueryAndSetFormParaCustom(JsonNode para, QueryWrapper<?> wrapper) {
autoSetFromParaDefault(wrapper, para);
autoQuery(para, wrapper, true, false);
}
}
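/*
 * A worked sketch of a typical ProTable request body and the rough query that
 * autoQueryAndSetFormParaDefalut builds from it; field names and values are illustrative:
 *
 *   {"current":1,"pageSize":10,"name":"dlink","sorter":{"createTime":"descend"},"filter":{"status":[1,2]}}
 *
 * yields roughly: WHERE name = 'dlink' AND (a.status = 1 OR a.status = 2) ORDER BY a.create_time DESC,
 * with current, pageSize, sorter and filter excluded from the equality conditions by the default blacklist.
 */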
########################## Common variable configuration ##########################
##### Database configuration
datalink.datasource.ip=192.168.24.1
datalink.datasource.username=datalink
datalink.datasource.password=datalink
##### Redis configuration
spring.redis.host=192.168.123.156
spring.redis.port=6379
spring.redis.password=123456
spring.redis.timeout=5000
##### Flink cluster configuration
datalink.flink.host=192.168.123.157
datalink.flink.port=8081
##### Elasticsearch configuration
#datalink.elasticsearch.uris=192.168.123.156:9200
#datalink.elasticsearch.username=elastic
#datalink.elasticsearch.password=qEnNfKNujqNrOPD9q5kb
##### Sentinel configuration
#datalink.sentinel.dashboard=192.168.123.156:6999
##### Log tracing
#datalink.trace.enable=true
##### Load-balancer isolation (version-based isolation, development only)
datalink.ribbon.isolation.enabled=false
##### mybatis-plus: print full SQL (development only)
mybatis-plus.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl
\ No newline at end of file
########################## Common application-level configuration ##########################
##### Ribbon configuration
## Interval for refreshing the server list from the registry; default 30 s, in ms
ribbon.ServerListRefreshInterval=15000
## Connection timeout; default 1 s, in ms
ribbon.ConnectTimeout=30000
## Read (request processing) timeout; default 1 s, in ms
ribbon.ReadTimeout=30000
## Retry all operations; without this, MaxAutoRetries has no effect; default false
#ribbon.OkToRetryOnAllOperations=true
## Number of retries on the current instance; default 0
#ribbon.MaxAutoRetries=1
## Number of retries when switching to another instance; default 1
ribbon.MaxAutoRetriesNextServer=0
##### Feign configuration
feign.sentinel.enabled=true
feign.hystrix.enabled=false
feign.okhttp.enabled=true
feign.httpclient.enabled=false
feign.httpclient.max-connections=1000
feign.httpclient.max-connections-per-route=100
feign.client.config.feignName.connectTimeout=30000
feign.client.config.feignName.readTimeout=30000
## Enable Feign request/response compression
feign.compression.request.enabled=true
feign.compression.response.enabled=true
## Compressed MIME types and minimum request size for compression
feign.compression.request.mime-types=text/xml,application/xml,application/json
feign.compression.request.min-request-size=2048
##### Sentinel configuration
spring.cloud.sentinel.transport.dashboard=${datalink.sentinel.dashboard}
spring.cloud.sentinel.eager=true
##### Druid configuration
# Connection pool settings (usually only initialSize, minIdle and maxActive need changing)
spring.datasource.druid.initial-size=10
spring.datasource.druid.max-active=500
spring.datasource.druid.min-idle=10
# Maximum wait time for acquiring a connection
spring.datasource.druid.max-wait=60000
# Enable PSCache and set its size per connection
spring.datasource.druid.pool-prepared-statements=true
spring.datasource.druid.max-pool-prepared-statement-per-connection-size=20
spring.datasource.druid.validation-query=SELECT 'x'
spring.datasource.druid.test-on-borrow=false
spring.datasource.druid.test-on-return=false
spring.datasource.druid.test-while-idle=true
# Interval between checks for idle connections that should be closed, in milliseconds
spring.datasource.druid.time-between-eviction-runs-millis=60000
# Minimum time a connection stays in the pool, in milliseconds
spring.datasource.druid.min-evictable-idle-time-millis=300000
spring.datasource.druid.filters=stat,wall
# WebStatFilter configuration; see the Druid wiki section on configuring WebStatFilter
# Whether to enable StatFilter; default true
spring.datasource.druid.web-stat-filter.enabled=true
spring.datasource.druid.web-stat-filter.url-pattern=/*
spring.datasource.druid.web-stat-filter.exclusions="*.js , *.gif ,*.jpg ,*.png ,*.css ,*.ico , /druid/*"
spring.datasource.druid.web-stat-filter.session-stat-max-count=1000
spring.datasource.druid.web-stat-filter.profile-enable=true
spring.datasource.druid.web-stat-filter.session-stat-enable=false
# StatViewServlet configuration
# Exposes Druid statistics; StatViewServlet provides 1) an HTML page for monitoring data and 2) a JSON API for monitoring data
# Whether to enable StatViewServlet; default true
spring.datasource.druid.stat-view-servlet.enabled=true
# The built-in monitoring page is served under the url-pattern configured below; its home page is /druid/index.html, e.g. http://127.0.0.1:9000/druid/index.html
spring.datasource.druid.stat-view-servlet.url-pattern=/druid/*
# Allow resetting the collected statistics
spring.datasource.druid.stat-view-servlet.reset-enable=true
spring.datasource.druid.stat-view-servlet.login-username=admin
spring.datasource.druid.stat-view-servlet.login-password=admin
##### mybatis-plus configuration
# Field strategy: IGNORED (no check), NOT_NULL (not-null check), NOT_EMPTY (not-empty check)
mybatis-plus.global-config.db-config.field-strategy=NOT_NULL
# Logical delete configuration
mybatis-plus.global-config.db-config.logic-delete-field=isDelete
mybatis-plus.global-config.db-config.logic-delete-value=1
mybatis-plus.global-config.db-config.logic-not-delete-value=0
# Native MyBatis configuration
mybatis-plus.configuration.map-underscore-to-camel-case=true
mybatis-plus.configuration.cache-enabled=false
##### Redis configuration
# Maximum number of connections in the pool (a negative value means no limit)
spring.redis.lettuce.pool.max-active=8
# Maximum blocking wait time of the pool (a negative value means no limit)
spring.redis.lettuce.pool.max-wait=-1
# Maximum number of idle connections in the pool
spring.redis.lettuce.pool.max-idle=8
# Minimum number of idle connections in the pool
spring.redis.lettuce.pool.min-idle=0
########################## Common bootstrap-level configuration ##########################
# Use the dev profile by default
spring.profiles.active=dev
##### Nacos (registry and configuration center) address
spring.cloud.nacos.server-addr=192.168.123.156:8848
#spring.cloud.nacos.username=nacos
#spring.cloud.nacos.password=nacos
spring.cloud.nacos.config.file-extension=yml
spring.cloud.nacos.config.shared-dataids=common.yml
spring.cloud.nacos.config.refreshable-dataids=common.yml
##### spring-boot-actuator configuration
management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always
##### Allow bean definition overriding
spring.main.allow-bean-definition-overriding=true
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dlink</artifactId>
<groupId>com.dlink</groupId>
<version>0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<artifactId>dlink-core</artifactId>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.12.2</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.sql.gateway.version>1.12</flink.sql.gateway.version>
<junit.version>4.12</junit.version>
<hadoop.version>2.7.7</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.dlink.cluster;
import cn.hutool.http.HttpUtil;
import com.dlink.constant.FlinkConstant;
import com.dlink.constant.FlinkHistoryConstant;
import com.dlink.constant.NetConstant;
import java.util.HashMap;
import java.util.Map;
/**
* FlinkCluster
*
* @author wenmo
* @since 2021/5/25 15:08
**/
public class FlinkCluster {
private static String flinkJobMangerHost;
public static String getFlinkJobMangerHost() {
return flinkJobMangerHost;
}
public static void setFlinkJobMangerHost(String flinkJobMangerHost) {
FlinkCluster.flinkJobMangerHost = flinkJobMangerHost;
}
public static String getFlinkJobManagerIP(String flinkServers) {
String res = "";
String flinkAddress = "";
try {
flinkAddress = getFlinkJobMangerHost();
res = HttpUtil.get(NetConstant.HTTP + flinkAddress + NetConstant.COLON + NetConstant.PORT + NetConstant.SLASH + FlinkHistoryConstant.JOBS, NetConstant.SERVER_TIME_OUT_ACTIVE);
if (!res.isEmpty()) {
return flinkAddress;
}
} catch (Exception e) {
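// The cached host is unreachable; fall through and probe the configured servers.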
}
String[] servers = flinkServers.split(",");
for (String server : servers) {
try {
String url = NetConstant.HTTP + server + NetConstant.COLON + NetConstant.PORT + NetConstant.SLASH + FlinkHistoryConstant.JOBS;
res = HttpUtil.get(url, NetConstant.SERVER_TIME_OUT_ACTIVE);
if (!res.isEmpty()) {
// Cache the newly discovered active host when it differs from the stale cached one.
if(!server.equalsIgnoreCase(flinkAddress)){
setFlinkJobMangerHost(server);
}
return server;
}
} catch (Exception e) {
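// This candidate is unreachable; try the next one.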
}
}
return "";
}
}
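/*
 * A minimal usage sketch; the candidate addresses are illustrative:
 *
 *   String activeHost = FlinkCluster.getFlinkJobManagerIP("192.168.123.157,192.168.123.158");
 */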
package com.dlink.constant;
/**
* FlinkConstant
*
* @author wenmo
* @since 2021/5/25 14:39
**/
public interface FlinkConstant {
/**
* Flink port
*/
Integer PORT = 8081;
/**
* Default number of Flink sessions
*/
Integer DEFAULT_SESSION_COUNT = 256;
/**
* Flink load factor
*/
Double DEFAULT_FACTOR = 0.75;
/**
* Flink JobManager host key
*/
String FLINK_JOB_MANAGER_HOST = "flinkJobManagerHOST";
}
package com.dlink.constant;
public interface FlinkFunctionConstant {
/**
* TO_MAP function
*/
String TO_MAP = "TO_MAP";
/**
* GET_KEY function
*/
String GET_KEY = "GET_KEY";
}
package com.dlink.constant;
public interface FlinkHistoryConstant {
/**
* History server port
*/
String PORT = "8082";
/**
* Comma separator
*/
String COMMA = ",";
/**
* Plural form of job: jobs
*/
String JOBS = "jobs";
/**
* Singular form of job: job
*/
String JOB = "job";
/**
* Overview: overview
*/
String OVERVIEW = "overview";
/**
* Error: error
*/
String ERROR = "error";
/**
* Start time: start-time
*/
String START_TIME = "start-time";
/**
* Job name: name
*/
String NAME = "name";
/**
* Job state: state
*/
String STATE = "state";
/**
* Exception message: failed to fetch job data
*/
String EXCEPTION_DATA_NOT_FOUND = "获取任务数据失败";
/**
* 30 days in milliseconds
*/
Long THIRTY_DAY = 30L * 24 * 60 * 60 * 1000;
/**
* One day in milliseconds
*/
Integer ONE_DAY = 24 * 60 * 60 * 1000;
/**
* Running: active
*/
String ACTIVE = "active";
/**
* Number of records found: count
*/
String COUNT = "count";
/**
* Current page number: page
*/
String PAGE = "page";
/**
* Page size: size
*/
String SIZE = "size";
/**
* Number of records on the current page: pageCount
*/
String PAGE_COUNT = "pageCount";
/**
* Returned result set: resList
*/
String RES_LIST = "resList";
}
package com.dlink.constant;
/**
* Constants for Flink jobs
*/
public interface FlinkJobConstant {
/**
* flink job id
*/
String FLINK_JOB_ID = "jobId";
/**
* flink job error
*/
String FLINK_JOB_ERROR = "error";
/**
* Default empty string
*/
String DEFAULT_EMPTY = "";
/**
* Default port
*/
int DEFAULT_PORT = 8081;
}
package com.dlink.constant;
/**
* FlinkSQLConstant
*
* @author wenmo
* @since 2021/5/25 15:51
**/
public interface FlinkSQLConstant {
/**
* CREATE statement
*/
String CREATE = "CREATE";
/**
* DROP statement
*/
String DROP = "DROP";
/**
* INSERT statement
*/
String INSERT = "INSERT";
/**
* ALTER statement
*/
String ALTER = "ALTER";
/**
* SELECT statement
*/
String SELECT = "SELECT";
/**
* SHOW statement
*/
String SHOW = "SHOW";
/**
* Unknown operation type
*/
String UNKNOWN_TYPE = "UNKNOWN TYPE";
/**
* Value used for null columns in query results
*/
String NULL_COLUMNS = "";
/**
* Create aggregate table: CREATEAGGTABLE
*/
String CREATE_AGG_TABLE = "CREATEAGGTABLE";
/**
* Head of a drop-table statement: DROP TABLE IF EXISTS
*/
String DROP_TABLE_HEAD = " DROP TABLE IF EXISTS ";
}
package com.dlink.constant;
public interface NetConstant {
/**
* http://
*/
String HTTP = "http://";
/**
* Colon :
*/
String COLON = ":";
/**
* Slash /
*/
String SLASH = "/";
/**
* Default Flink port
*/
String PORT = "8081";
/**
* Timeout for connecting to a running server: 1000 ms
*/
Integer SERVER_TIME_OUT_ACTIVE = 1000;
/**
* Timeout for connecting to the Flink history server: 3000 ms
*/
Integer SERVER_TIME_OUT_HISTORY = 3000;
}
package com.dlink.executor;
/**
* EnvironmentSetting
*
* @author wenmo
* @since 2021/5/25 13:45
**/
public class EnvironmentSetting {
private String host;
private int port;
public EnvironmentSetting(String host, int port) {
this.host = host;
this.port = port;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
}
package com.dlink.executor;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.result.SqlExplainResult;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
/**
* Executor
* @author wenmo
* @since 2021/5/25 13:39
**/
public abstract class Executor {
public static final String LOCAL = "LOCAL";
public static final String REMOTE = "REMOTE";
public static Executor build(){
return new LocalStreamExecutor(new ExecutorSetting(LOCAL));
}
public static Executor build(EnvironmentSetting environmentSetting,ExecutorSetting executorSetting){
if(LOCAL.equals(executorSetting.getType())){
return new LocalStreamExecutor(executorSetting);
}else if(REMOTE.equals(executorSetting.getType())){
return new RemoteStreamExecutor(environmentSetting,executorSetting);
}else{
return new LocalStreamExecutor(executorSetting);
}
}
public abstract StreamExecutionEnvironment getEnvironment();
public abstract CustomTableEnvironmentImpl getCustomTableEnvironmentImpl();
public abstract ExecutorSetting getExecutorSetting();
public abstract EnvironmentSetting getEnvironmentSetting();
public abstract JobExecutionResult execute(String statement) throws Exception;
public abstract TableResult executeSql(String statement);
public abstract Table sqlQuery(String statement);
public abstract String explainSql(String statement, ExplainDetail... extraDetails);
public abstract SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails);
public abstract String getStreamGraphString(String statement);
public abstract ObjectNode getStreamGraph(String statement);
public abstract void registerFunction(String name, ScalarFunction function);
public abstract void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> var2);
}
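/*
 * A minimal usage sketch: building a local or remote executor and running a statement;
 * host, port and the SQL are illustrative.
 *
 *   Executor local = Executor.build();
 *   Executor remote = Executor.build(new EnvironmentSetting("192.168.123.157", 8081),
 *           new ExecutorSetting(Executor.REMOTE));
 *   TableResult result = remote.executeSql("SHOW FRAGMENTS");
 */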
package com.dlink.executor;
/**
* ExecutorSetting
*
* @author wenmo
* @since 2021/5/25 13:43
**/
public class ExecutorSetting {
private String type = Executor.LOCAL;
private Long checkpoint;
private boolean useSqlFragment = true;
public ExecutorSetting(String type) {
this.type = type;
}
public ExecutorSetting(String type, Long checkpoint) {
this.type = type;
this.checkpoint = checkpoint;
}
public ExecutorSetting(String type, Long checkpoint, boolean useSqlFragment) {
this.type = type;
this.checkpoint = checkpoint;
this.useSqlFragment = useSqlFragment;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public Long getCheckpoint() {
return checkpoint;
}
public void setCheckpoint(Long checkpoint) {
this.checkpoint = checkpoint;
}
public boolean isUseSqlFragment() {
return useSqlFragment;
}
public void setUseSqlFragment(boolean useSqlFragment) {
this.useSqlFragment = useSqlFragment;
}
}
package com.dlink.executor;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.result.SqlExplainResult;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
/**
* LocalStreamExecutor
*
* @author wenmo
* @since 2021/5/25 13:48
**/
public class LocalStreamExecutor extends Executor {
private StreamExecutionEnvironment environment;
private CustomTableEnvironmentImpl stEnvironment;
private ExecutorSetting executorSetting;
public LocalStreamExecutor(ExecutorSetting executorSetting) {
this.executorSetting = executorSetting;
this.environment = StreamExecutionEnvironment.createLocalEnvironment();
stEnvironment = CustomTableEnvironmentImpl.create(environment);
if(executorSetting.isUseSqlFragment()){
stEnvironment.useSqlFragment();
}else{
stEnvironment.unUseSqlFragment();
}
}
@Override
public StreamExecutionEnvironment getEnvironment() {
return this.environment;
}
@Override
public CustomTableEnvironmentImpl getCustomTableEnvironmentImpl() {
return this.stEnvironment;
}
@Override
public ExecutorSetting getExecutorSetting() {
return this.executorSetting;
}
@Override
public EnvironmentSetting getEnvironmentSetting() {
return null;
}
@Override
public JobExecutionResult execute(String statement) throws Exception {
return stEnvironment.execute(statement);
}
@Override
public TableResult executeSql(String statement) {
return stEnvironment.executeSql(statement);
}
@Override
public Table sqlQuery(String statement) {
return stEnvironment.sqlQuery(statement);
}
@Override
public String explainSql(String statement, ExplainDetail... extraDetails) {
return stEnvironment.explainSql(statement,extraDetails);
}
@Override
public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
return stEnvironment.explainSqlRecord(statement,extraDetails);
}
@Override
public String getStreamGraphString(String statement) {
return stEnvironment.getStreamGraphString(statement);
}
@Override
public ObjectNode getStreamGraph(String statement) {
return stEnvironment.getStreamGraph(statement);
}
@Override
public void registerFunction(String name, ScalarFunction function) {
stEnvironment.registerFunction(name,function);
}
@Override
public void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> var2) {
stEnvironment.createTemporarySystemFunction(name,var2);
}
}
package com.dlink.executor;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.result.SqlExplainResult;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
/**
* RemoteStreamExecutor
*
* @author wenmo
* @since 2021/5/25 14:05
**/
public class RemoteStreamExecutor extends Executor {
private StreamExecutionEnvironment environment;
private CustomTableEnvironmentImpl stEnvironment;
private EnvironmentSetting environmentSetting;
private ExecutorSetting executorSetting;
public RemoteStreamExecutor(EnvironmentSetting environmentSetting,ExecutorSetting executorSetting) {
this.environmentSetting = environmentSetting;
this.executorSetting = executorSetting;
synchronized (RemoteStreamExecutor.class){
this.environment = StreamExecutionEnvironment.createRemoteEnvironment(environmentSetting.getHost(), environmentSetting.getPort());
if(stEnvironment == null){
stEnvironment = CustomTableEnvironmentImpl.create(environment);
}
if(executorSetting.isUseSqlFragment()){
stEnvironment.useSqlFragment();
}else{
stEnvironment.unUseSqlFragment();
}
}
}
@Override
public StreamExecutionEnvironment getEnvironment() {
return this.environment;
}
@Override
public CustomTableEnvironmentImpl getCustomTableEnvironmentImpl() {
return this.stEnvironment;
}
@Override
public ExecutorSetting getExecutorSetting() {
return this.executorSetting;
}
@Override
public EnvironmentSetting getEnvironmentSetting() {
return this.environmentSetting;
}
@Override
public JobExecutionResult execute(String statement) throws Exception {
return stEnvironment.execute(statement);
}
@Override
public TableResult executeSql(String statement){
return stEnvironment.executeSql(statement);
}
@Override
public Table sqlQuery(String statement){
return stEnvironment.sqlQuery(statement);
}
@Override
public String explainSql(String statement, ExplainDetail... extraDetails) {
return stEnvironment.explainSql(statement,extraDetails);
}
@Override
public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
return stEnvironment.explainSqlRecord(statement,extraDetails);
}
@Override
public String getStreamGraphString(String statement) {
return stEnvironment.getStreamGraphString(statement);
}
@Override
public ObjectNode getStreamGraph(String statement) {
return stEnvironment.getStreamGraph(statement);
}
@Override
public void registerFunction(String name, ScalarFunction function) {
stEnvironment.registerFunction(name,function);
}
@Override
public void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> var2) {
stEnvironment.createTemporarySystemFunction(name,var2);
}
}
package com.dlink.executor.custom;
import com.dlink.result.SqlExplainResult;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.planner.delegation.ExecutorBase;
import org.apache.flink.table.planner.utils.ExecutorUtils;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* Customized TableEnvironmentImpl
*
* @author wenmo
* @since 2021/5/25
**/
public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
private SqlManager sqlManager;
private boolean useSqlFragment = true;
protected CustomTableEnvironmentImpl(CatalogManager catalogManager, SqlManager sqlManager, ModuleManager moduleManager, TableConfig tableConfig, Executor executor, FunctionCatalog functionCatalog, Planner planner, boolean isStreamingMode, ClassLoader userClassLoader) {
super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode, userClassLoader);
this.sqlManager = sqlManager;
}
public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment) {
return create(executionEnvironment, EnvironmentSettings.newInstance().build());
}
static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) {
return create(executionEnvironment, settings, new TableConfig());
}
public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings, TableConfig tableConfig) {
if (!settings.isStreamingMode()) {
throw new TableException("StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
} else {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
ModuleManager moduleManager = new ModuleManager();
SqlManager sqlManager = new SqlManager();
CatalogManager catalogManager = CatalogManager.newBuilder().classLoader(classLoader).config(tableConfig.getConfiguration()).defaultCatalog(settings.getBuiltInCatalogName(), new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName())).executionConfig(executionEnvironment.getConfig()).build();
FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager);
Map<String, String> executorProperties = settings.toExecutorProperties();
Executor executor = lookupExecutor(executorProperties, executionEnvironment);
Map<String, String> plannerProperties = settings.toPlannerProperties();
Planner planner = ((PlannerFactory) ComponentFactoryService.find(PlannerFactory.class, plannerProperties)).create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager);
return new CustomTableEnvironmentImpl(catalogManager, sqlManager, moduleManager, tableConfig, executor, functionCatalog, planner, settings.isStreamingMode(), classLoader);
}
}
private static Executor lookupExecutor(Map<String, String> executorProperties, StreamExecutionEnvironment executionEnvironment) {
try {
ExecutorFactory executorFactory = (ExecutorFactory) ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
Method createMethod = executorFactory.getClass().getMethod("create", Map.class, StreamExecutionEnvironment.class);
return (Executor) createMethod.invoke(executorFactory, executorProperties, executionEnvironment);
} catch (Exception var4) {
throw new TableException("Could not instantiate the executor. Make sure a planner module is on the classpath", var4);
}
}
public void useSqlFragment() {
this.useSqlFragment = true;
}
public void unUseSqlFragment() {
this.useSqlFragment = false;
}
@Override
public String explainSql(String statement, ExplainDetail... extraDetails) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
return "This is a sql fragment.";
}
}
if (checkShowFragments(statement)) {
return "'SHOW FRAGMENTS' can't be explained.";
} else {
return super.explainSql(statement, extraDetails);
}
}
public String getStreamGraphString(String statement) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
return "This is a sql fragment.";
}
}
if (checkShowFragments(statement)) {
return "'SHOW FRAGMENTS' can't be explained.";
}
List<Operation> operations = super.parser.parse(statement);
if (operations.size() != 1) {
throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query.");
} else {
List<ModifyOperation> modifyOperations = new ArrayList<>();
for (int i = 0; i < operations.size(); i++) {
if(operations.get(i) instanceof ModifyOperation){
modifyOperations.add((ModifyOperation)operations.get(i));
}
}
List<Transformation<?>> trans = super.planner.translate(modifyOperations);
if(execEnv instanceof ExecutorBase){
return ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans).getStreamingPlanAsJSON();
}else{
return "Unsupported SQL query! explainSql() need a single SQL to query.";
}
}
}
public ObjectNode getStreamGraph(String statement) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
throw new TableException("This is a sql fragment.");
}
}
if (checkShowFragments(statement)) {
throw new TableException("'SHOW FRAGMENTS' can't be explained.");
}
List<Operation> operations = super.parser.parse(statement);
if (operations.size() != 1) {
throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query.");
} else {
List<ModifyOperation> modifyOperations = new ArrayList<>();
for (int i = 0; i < operations.size(); i++) {
if(operations.get(i) instanceof ModifyOperation){
modifyOperations.add((ModifyOperation)operations.get(i));
}
}
List<Transformation<?>> trans = super.planner.translate(modifyOperations);
if(execEnv instanceof ExecutorBase){
StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
JSONGenerator jsonGenerator = new JSONGenerator(streamGraph);
ObjectNode jsonNode = jsonGenerator.getJSONNode();
return jsonNode;
}else{
throw new TableException("Unsupported SQL query! explainSql() need a single SQL to query.");
}
}
}
public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
SqlExplainResult record = new SqlExplainResult();
if(useSqlFragment) {
String orignSql = statement;
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
record.setParseTrue(true);
record.setType("Sql Fragment");
record.setExplain(orignSql);
record.setExplainTrue(true);
return record;
}
}
List<Operation> operations = parser.parse(statement);
record.setParseTrue(true);
if (operations.size() != 1) {
throw new TableException(
"Unsupported SQL query! explainSql() only accepts a single SQL query.");
}
List<Operation> operationlist = new ArrayList<>(operations);
for (int i = 0; i < operationlist.size(); i++) {
Operation operation = operationlist.get(i);
if (operation instanceof ModifyOperation) {
record.setType("Modify DML");
} else if (operation instanceof ExplainOperation) {
record.setType("Explain DML");
} else if (operation instanceof QueryOperation) {
record.setType("Query DML");
} else {
operationlist.remove(i);
record.setType("DDL");
i=i-1;
}
}
if(operationlist.size()==0){
//record.setExplain("DDL statements are not explained.");
return record;
}
record.setExplain(planner.explain(operationlist, extraDetails));
record.setExplainTrue(true);
return record;
}
@Override
public String[] getCompletionHints(String statement, int position) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
return new String[0];
}
}
return super.getCompletionHints(statement, position);
}
@Override
public Table sqlQuery(String query) {
if(useSqlFragment) {
query = sqlManager.parseVariable(query);
if (query.length() == 0) {
throw new TableException("Unsupported SQL query! The SQL query parsed is null.If it's a sql fragment, and please use executeSql().");
}
if (checkShowFragments(query)) {
return sqlManager.getSqlFragmentsTable(this);
} else {
return super.sqlQuery(query);
}
}else {
return super.sqlQuery(query);
}
}
@Override
public TableResult executeSql(String statement) {
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
return CustomTableResultImpl.TABLE_RESULT_OK;
}
if (checkShowFragments(statement)) {
return sqlManager.getSqlFragments();
} else {
return super.executeSql(statement);
}
}else{
return super.executeSql(statement);
}
}
@Override
public void sqlUpdate(String stmt) {
if(useSqlFragment) {
stmt = sqlManager.parseVariable(stmt);
if (stmt.length() == 0) {
throw new TableException("Unsupported SQL update! The SQL update parsed is null.If it's a sql fragment, and please use executeSql().");
}
}
super.sqlUpdate(stmt);
}
public boolean checkShowFragments(String sql){
return sqlManager.checkShowFragments(sql);
}
}
package com.dlink.executor.custom;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.*;
import org.apache.flink.table.utils.PrintUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.io.PrintWriter;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.*;
/**
* Customized TableResultImpl
* @author wenmo
* @since 2021/5/25
**/
@Internal
class CustomTableResultImpl implements TableResult {
public static final TableResult TABLE_RESULT_OK;
private final JobClient jobClient;
private final TableSchema tableSchema;
private final ResultKind resultKind;
private final CloseableRowIteratorWrapper data;
private final PrintStyle printStyle;
private CustomTableResultImpl(@Nullable JobClient jobClient, TableSchema tableSchema, ResultKind resultKind, CloseableIterator<Row> data, PrintStyle printStyle) {
this.jobClient = jobClient;
this.tableSchema = (TableSchema) Preconditions.checkNotNull(tableSchema, "tableSchema should not be null");
this.resultKind = (ResultKind)Preconditions.checkNotNull(resultKind, "resultKind should not be null");
Preconditions.checkNotNull(data, "data should not be null");
this.data = new CloseableRowIteratorWrapper(data);
this.printStyle = (PrintStyle)Preconditions.checkNotNull(printStyle, "printStyle should not be null");
}
public static TableResult buildTableResult(List<TableSchemaField> fields,List<Row> rows){
Builder builder = builder().resultKind(ResultKind.SUCCESS);
if(fields.size()>0) {
TableSchema.Builder tableSchemaBuild = TableSchema.builder();
for (int i = 0; i < fields.size(); i++) {
tableSchemaBuild.field(fields.get(i).getName(),fields.get(i).getType());
}
builder.tableSchema(tableSchemaBuild.build()).data(rows);
}
return builder.build();
}
public Optional<JobClient> getJobClient() {
return Optional.ofNullable(this.jobClient);
}
public void await() throws InterruptedException, ExecutionException {
try {
this.awaitInternal(-1L, TimeUnit.MILLISECONDS);
} catch (TimeoutException var2) {
;
}
}
public void await(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
this.awaitInternal(timeout, unit);
}
private void awaitInternal(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (this.jobClient != null) {
ExecutorService executor = Executors.newFixedThreadPool(1, (r) -> {
return new Thread(r, "TableResult-await-thread");
});
try {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
while(!this.data.isFirstRowReady()) {
try {
Thread.sleep(100L);
} catch (InterruptedException var2) {
throw new TableException("Thread is interrupted");
}
}
}, executor);
if (timeout >= 0L) {
future.get(timeout, unit);
} else {
future.get();
}
} finally {
executor.shutdown();
}
}
}
public TableSchema getTableSchema() {
return this.tableSchema;
}
public ResultKind getResultKind() {
return this.resultKind;
}
public CloseableIterator<Row> collect() {
return this.data;
}
public void print() {
Iterator<Row> it = this.collect();
if (this.printStyle instanceof TableauStyle) {
int maxColumnWidth = ((TableauStyle)this.printStyle).getMaxColumnWidth();
String nullColumn = ((TableauStyle)this.printStyle).getNullColumn();
boolean deriveColumnWidthByType = ((TableauStyle)this.printStyle).isDeriveColumnWidthByType();
boolean printRowKind = ((TableauStyle)this.printStyle).isPrintRowKind();
PrintUtils.printAsTableauForm(this.getTableSchema(), it, new PrintWriter(System.out), maxColumnWidth, nullColumn, deriveColumnWidthByType, printRowKind);
} else {
if (!(this.printStyle instanceof RawContentStyle)) {
throw new TableException("Unsupported print style: " + this.printStyle);
}
while(it.hasNext()) {
System.out.println(String.join(",", PrintUtils.rowToString((Row)it.next())));
}
}
}
public static Builder builder() {
return new Builder();
}
static {
TABLE_RESULT_OK = builder().resultKind(ResultKind.SUCCESS).tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build()).data(Collections.singletonList(Row.of(new Object[]{"OK"}))).build();
}
private static final class CloseableRowIteratorWrapper implements CloseableIterator<Row> {
private final CloseableIterator<Row> iterator;
private boolean isFirstRowReady;
private CloseableRowIteratorWrapper(CloseableIterator<Row> iterator) {
this.isFirstRowReady = false;
this.iterator = iterator;
}
public void close() throws Exception {
this.iterator.close();
}
public boolean hasNext() {
boolean hasNext = this.iterator.hasNext();
this.isFirstRowReady = this.isFirstRowReady || hasNext;
return hasNext;
}
public Row next() {
Row next = (Row)this.iterator.next();
this.isFirstRowReady = true;
return next;
}
public boolean isFirstRowReady() {
return this.isFirstRowReady || this.hasNext();
}
}
private static final class RawContentStyle implements PrintStyle {
private RawContentStyle() {
}
}
private static final class TableauStyle implements PrintStyle {
private final boolean deriveColumnWidthByType;
private final int maxColumnWidth;
private final String nullColumn;
private final boolean printRowKind;
private TableauStyle(int maxColumnWidth, String nullColumn, boolean deriveColumnWidthByType, boolean printRowKind) {
this.deriveColumnWidthByType = deriveColumnWidthByType;
this.maxColumnWidth = maxColumnWidth;
this.nullColumn = nullColumn;
this.printRowKind = printRowKind;
}
public boolean isDeriveColumnWidthByType() {
return this.deriveColumnWidthByType;
}
int getMaxColumnWidth() {
return this.maxColumnWidth;
}
String getNullColumn() {
return this.nullColumn;
}
public boolean isPrintRowKind() {
return this.printRowKind;
}
}
public interface PrintStyle {
static PrintStyle tableau(int maxColumnWidth, String nullColumn, boolean deriveColumnWidthByType, boolean printRowKind) {
Preconditions.checkArgument(maxColumnWidth > 0, "maxColumnWidth should be greater than 0");
Preconditions.checkNotNull(nullColumn, "nullColumn should not be null");
return new TableauStyle(maxColumnWidth, nullColumn, deriveColumnWidthByType, printRowKind);
}
static PrintStyle rawContent() {
return new RawContentStyle();
}
}
public static class Builder {
private JobClient jobClient;
private TableSchema tableSchema;
private ResultKind resultKind;
private CloseableIterator<Row> data;
private PrintStyle printStyle;
private Builder() {
this.jobClient = null;
this.tableSchema = null;
this.resultKind = null;
this.data = null;
this.printStyle = PrintStyle.tableau(2147483647, "(NULL)", false, false);
}
public Builder jobClient(JobClient jobClient) {
this.jobClient = jobClient;
return this;
}
public Builder tableSchema(TableSchema tableSchema) {
Preconditions.checkNotNull(tableSchema, "tableSchema should not be null");
this.tableSchema = tableSchema;
return this;
}
public Builder resultKind(ResultKind resultKind) {
Preconditions.checkNotNull(resultKind, "resultKind should not be null");
this.resultKind = resultKind;
return this;
}
public Builder data(CloseableIterator<Row> rowIterator) {
Preconditions.checkNotNull(rowIterator, "rowIterator should not be null");
this.data = rowIterator;
return this;
}
public Builder data(List<Row> rowList) {
Preconditions.checkNotNull(rowList, "listRows should not be null");
this.data = CloseableIterator.adapterForIterator(rowList.iterator());
return this;
}
public Builder setPrintStyle(PrintStyle printStyle) {
Preconditions.checkNotNull(printStyle, "printStyle should not be null");
this.printStyle = printStyle;
return this;
}
public TableResult build() {
return new CustomTableResultImpl(this.jobClient, this.tableSchema, this.resultKind, this.data, this.printStyle);
}
}
}
package com.dlink.executor.custom;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ExpressionParserException;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.types.Row;
import org.apache.flink.util.StringUtils;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static java.lang.String.format;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Flink Sql Fragment Manager
* @author wenmo
* @since 2021/5/25
**/
@Internal
public final class SqlManager {
private Map<String, String> sqlFragments;
static final String SHOW_FRAGMENTS = "SHOW FRAGMENTS";
public SqlManager() {
sqlFragments = new HashMap<>();
}
/**
* Get names of sql fragments loaded.
*
* @return a list of names of sql fragments loaded
*/
public List<String> listSqlFragments() {
return new ArrayList<>(sqlFragments.keySet());
}
/**
* Registers a fragment of sql under the given name. The sql fragment name must be unique.
*
* @param sqlFragmentName name under which to register the given sql fragment
* @param sqlFragment a fragment of sql to register
* @throws CatalogException if the registration of the sql fragment under the given name failed.
* But at the moment, with CatalogException, not SqlException
*/
public void registerSqlFragment(String sqlFragmentName, String sqlFragment) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
"sql fragment name cannot be null or empty.");
checkNotNull(sqlFragment, "sql fragment cannot be null");
if (sqlFragments.containsKey(sqlFragmentName)) {
throw new CatalogException(
format("The fragment of sql %s already exists.", sqlFragmentName));
}
sqlFragments.put(sqlFragmentName, sqlFragment);
}
/**
* Unregisters a fragment of sql under the given name. The sql fragment name must already exist.
*
* @param sqlFragmentName name under which to unregister the given sql fragment.
* @param ignoreIfNotExists If false exception will be thrown if the fragment of sql to be
* altered does not exist.
* @throws CatalogException if the unregistration of the sql fragment under the given name
* failed. But at the moment, with CatalogException, not SqlException
*/
public void unregisterSqlFragment(String sqlFragmentName, boolean ignoreIfNotExists) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
"sql fragmentName name cannot be null or empty.");
if (sqlFragments.containsKey(sqlFragmentName)) {
sqlFragments.remove(sqlFragmentName);
} else if (!ignoreIfNotExists) {
throw new CatalogException(
format("The fragment of sql %s does not exist.", sqlFragmentName));
}
}
/**
* Get a fragment of sql under the given name. The sql fragment name must already exist.
*
* @param sqlFragmentName name of the sql fragment to look up.
* @throws CatalogException if no sql fragment is registered under the given name
*     (currently a CatalogException is thrown rather than a SqlException).
*/
public String getSqlFragment(String sqlFragmentName) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
"sql fragmentName name cannot be null or empty.");
if (sqlFragments.containsKey(sqlFragmentName)) {
return sqlFragments.get(sqlFragmentName);
} else {
throw new CatalogException(
format("The fragment of sql %s does not exist.", sqlFragmentName));
}
}
/**
* Get all registered sql fragments as a map of fragment name to sql.
*/
public Map<String, String> getSqlFragment() {
return sqlFragments;
}
public TableResult getSqlFragments() {
List<Row> rows = new ArrayList<>();
for (String key : sqlFragments.keySet()) {
rows.add(Row.of(key));
}
return CustomTableResultImpl.buildTableResult(new ArrayList<>(Arrays.asList(new TableSchemaField("sql fragment name", DataTypes.STRING()))), rows);
}
public Iterator<Map.Entry<String, String>> getSqlFragmentsIterator() {
return sqlFragments.entrySet().iterator();
}
public Table getSqlFragmentsTable(CustomTableEnvironmentImpl environment) {
List<String> keys = new ArrayList<>();
for (String key : sqlFragments.keySet()) {
keys.add(key);
}
return environment.fromValues(keys);
}
public boolean checkShowFragments(String sql){
return SHOW_FRAGMENTS.equals(sql.trim().toUpperCase());
}
/**
* Parses variable definitions of the form "name := value" out of the given sql, registers them
* as sql fragments, and returns the remaining statements with ${name} references replaced.
*
* @param statement the sql to be parsed.
* @return the sql with variable definitions removed and variable references substituted.
* @throws ExpressionParserException if a variable definition has an empty or illegal name.
*/
public String parseVariable(String statement) {
if (statement == null || "".equals(statement)) {
return statement;
}
String[] strs = statement.split(";");
StringBuilder sb = new StringBuilder();
for (int i = 0; i < strs.length; i++) {
String str = strs[i].trim();
if (str.length() == 0) {
continue;
}
if (str.contains(":=")) {
String[] strs2 = str.split(":=");
if (strs2.length >= 2) {
if (strs2[0].length() == 0) {
throw new ExpressionParserException("Illegal variable name.");
}
String valueString = str.substring(str.indexOf(":=") + 2);
this.registerSqlFragment(strs2[0], replaceVariable(valueString));
} else {
throw new ExpressionParserException("Illegal variable definition.");
}
} else {
sb.append(replaceVariable(str));
}
}
return sb.toString();
}
/**
* Replaces ${name} variable references in the given sql with the registered fragment values.
*
* @param statement the sql in which to replace variable references.
*/
private String replaceVariable(String statement) {
String pattern = "\\$\\{(.+?)\\}";
Pattern p = Pattern.compile(pattern);
Matcher m = p.matcher(statement);
StringBuffer sb = new StringBuffer();
while (m.find()) {
String key = m.group(1);
String value = this.getSqlFragment(key);
m.appendReplacement(sb, value == null ? "" : Matcher.quoteReplacement(value));
}
m.appendTail(sb);
return sb.toString();
}
}
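A minimal usage sketch (not part of the original sources) of how SqlManager handles fragments: parseVariable registers every "name := value" definition it finds and substitutes ${name} references in the remaining statements. The table name below is a made-up example.
SqlManager sqlManager = new SqlManager();
String parsed = sqlManager.parseVariable("tb := my_db.orders;select * from ${tb}");
// the "tb := ..." definition is consumed and registered as a fragment, and the ${tb}
// reference is expanded, so parsed reads "select * from my_db.orders"
// (give or take whitespace carried over from the definition)
String fragment = sqlManager.getSqlFragment("tb");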
package com.dlink.executor.custom;
import org.apache.flink.table.types.DataType;
/**
* @author wenmo
* @since 2021/5/11 14:04
**/
public class TableSchemaField {
private String name;
private DataType type;
public TableSchemaField(String name, DataType type) {
this.name = name;
this.type = type;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public DataType getType() {
return type;
}
public void setType(DataType type) {
this.type = type;
}
}
package com.dlink.job;
import com.dlink.constant.FlinkConstant;
import com.dlink.constant.FlinkSQLConstant;
import com.dlink.executor.EnvironmentSetting;
import com.dlink.executor.Executor;
import com.dlink.executor.ExecutorSetting;
import com.dlink.result.*;
import com.dlink.session.ExecutorEntity;
import com.dlink.session.SessionPool;
import com.dlink.trans.Operations;
import org.apache.flink.api.common.JobID;
import org.apache.flink.table.api.TableResult;
import java.time.LocalDate;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* JobManager
*
* @author wenmo
* @since 2021/5/25 15:27
**/
public class JobManager {
private String flinkHost;
private Integer port;
private String sessionId;
private Integer maxRowNum = 100;
public JobManager(String flinkHost, Integer port) {
this.flinkHost = flinkHost;
this.port = port;
}
public JobManager(String flinkHost, Integer port, String sessionId, Integer maxRowNum) {
this.flinkHost = flinkHost;
this.sessionId = sessionId;
this.maxRowNum = maxRowNum;
this.port = port;
}
public RunResult execute(String statement) {
RunResult runResult = new RunResult(sessionId, statement, flinkHost);
Executor executor = null;
ExecutorEntity executorEntity = SessionPool.get(sessionId);
if (executorEntity != null) {
executor = executorEntity.getExecutor();
} else {
executor = Executor.build(new EnvironmentSetting(flinkHost, FlinkConstant.PORT), new ExecutorSetting(Executor.REMOTE));
SessionPool.push(new ExecutorEntity(sessionId, executor));
}
String[] statements = statement.split(";");
int currentIndex = 0;
// Currently only the results of SHOW/SELECT operations can be queried; other cases can be added later.
try {
for (String item : statements) {
currentIndex++;
if (item.trim().isEmpty()) {
continue;
}
String operationType = Operations.getOperationType(item);
long start = System.currentTimeMillis();
TableResult tableResult = executor.executeSql(item);
long finish = System.currentTimeMillis();
long timeElapsed = finish - start;
IResult result = ResultBuilder.build(operationType, maxRowNum, "", false).getResult(tableResult);
runResult.setResult(result);
runResult.setTime(timeElapsed);
runResult.setFinishDate(LocalDate.now());
}
} catch (Exception e) {
e.printStackTrace();
StackTraceElement[] trace = e.getStackTrace();
StringBuilder resMsg = new StringBuilder();
for (StackTraceElement s : trace) {
resMsg.append(" </br> " + s + " ");
}
runResult.setError(LocalDate.now().toString() + ": an exception occurred while running sql statement " + currentIndex + ": " + e.getMessage() + "</br> >>>stack trace<<<" + resMsg.toString());
return runResult;
}
return runResult;
}
public SubmitResult submit(List<String> sqlList, ExecutorSetting executerSetting) {
SubmitResult result = new SubmitResult(sessionId,sqlList,flinkHost);
Map<String, String> map = new HashMap<>();
int currentIndex = 0;
try {
if (sqlList != null && sqlList.size() > 0) {
Executor executor = Executor.build(new EnvironmentSetting(flinkHost, port), executerSetting);
for (String sqlText : sqlList) {
currentIndex++;
String operationType = Operations.getOperationType(sqlText);
if (operationType.equalsIgnoreCase(FlinkSQLConstant.INSERT)) {
long start = System.currentTimeMillis();
TableResult tableResult = executor.executeSql(sqlText);
long finish = System.currentTimeMillis();
long timeElapsed = finish - start;
JobID jobID = tableResult.getJobClient().get().getJobID();
result.setSuccess(true);
result.setTime(timeElapsed);
result.setFinishDate(LocalDate.now());
InsertResult insertResult = new InsertResult(sqlText,(jobID == null ? "" : jobID.toHexString()),true,timeElapsed,LocalDate.now());
result.setResult(insertResult);
} else {
executor.executeSql(sqlText);
}
}
} else {
result.setSuccess(false);
result.setMsg(LocalDate.now().toString() + ": the sql statement list to execute is empty.");
return result;
}
} catch (Exception e) {
e.printStackTrace();
StackTraceElement[] trace = e.getStackTrace();
StringBuilder resMsg = new StringBuilder();
for (StackTraceElement s : trace) {
resMsg.append(" </br> " + s + " ");
}
result.setError(LocalDate.now().toString() + ": an exception occurred while running sql statement " + currentIndex + ": " + e.getMessage() + "</br> >>>stack trace<<<" + resMsg.toString());
return result;
}
result.setSuccess(true);
result.setMsg(LocalDate.now().toString() + ": the job was submitted successfully!");
return result;
}
}
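A hedged usage sketch (not part of the original sources): the host, port, session id and sql below are placeholders. execute() reuses a session Executor from SessionPool when one exists, while submit() always builds a fresh remote Executor.
JobManager jobManager = new JobManager("192.168.0.100", 8081, "test-session", 100);
RunResult runResult = jobManager.execute("show tables");
SubmitResult submitResult = jobManager.submit(
java.util.Arrays.asList("insert into sink_table select * from source_table"),
new ExecutorSetting(Executor.REMOTE));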
package com.dlink.result;
/**
* AbstractBuilder
*
* @author wenmo
* @since 2021/5/25 16:11
**/
public class AbstractBuilder {
protected String operationType;
protected Integer maxRowNum;
protected boolean printRowKind;
protected String nullColumn;
}
package com.dlink.result;
/**
* IResult
*
* @author wenmo
* @since 2021/5/25 16:22
**/
public interface IResult {
}
package com.dlink.result;
import java.time.LocalDate;
/**
* InsertResult
*
* @author wenmo
* @since 2021/5/25 19:08
**/
public class InsertResult implements IResult {
private String statement;
private String jobID;
private boolean success;
private long time;
private LocalDate finishDate;
public InsertResult(String statement, String jobID, boolean success, long time, LocalDate finishDate) {
this.statement = statement;
this.jobID = jobID;
this.success = success;
this.time = time;
this.finishDate = finishDate;
}
public String getStatement() {
return statement;
}
public void setStatement(String statement) {
this.statement = statement;
}
public String getJobID() {
return jobID;
}
public void setJobID(String jobID) {
this.jobID = jobID;
}
public boolean isSuccess() {
return success;
}
public void setSuccess(boolean success) {
this.success = success;
}
public long getTime() {
return time;
}
public void setTime(long time) {
this.time = time;
}
public LocalDate getFinishDate() {
return finishDate;
}
public void setFinishDate(LocalDate finishDate) {
this.finishDate = finishDate;
}
}
package com.dlink.result;
/**
* JobSubmitResult
*
* @author wenmo
* @since 2021/5/25 15:32
**/
public class JobSubmitResult {
}
package com.dlink.result;
import org.apache.flink.table.api.TableResult;
/**
* ResultBuilder
*
* @author wenmo
* @since 2021/5/25 15:59
**/
public interface ResultBuilder {
static ResultBuilder build(String operationType, Integer maxRowNum, String nullColumn, boolean printRowKind){
switch (operationType.toUpperCase()){
case SelectBuilder.OPERATION_TYPE:
return new SelectBuilder(operationType,maxRowNum,nullColumn,printRowKind);
default:
return new SelectBuilder(operationType,maxRowNum,nullColumn,printRowKind);
}
}
IResult getResult(TableResult tableResult);
}
package com.dlink.result;
import java.time.LocalDate;
/**
* RunResult
*
* @author wenmo
* @since 2021/5/25 16:46
**/
public class RunResult {
private String sessionId;
private String statement;
private String flinkHost;
private boolean success;
private long time;
private LocalDate finishDate;
private String msg;
private String error;
private IResult result;
public RunResult() {
}
public RunResult(String sessionId, String statement, String flinkHost) {
this.sessionId = sessionId;
this.statement = statement;
this.flinkHost = flinkHost;
}
public String getSessionId() {
return sessionId;
}
public void setSessionId(String sessionId) {
this.sessionId = sessionId;
}
public String getStatement() {
return statement;
}
public void setStatement(String statement) {
this.statement = statement;
}
public boolean isSuccess() {
return success;
}
public void setSuccess(boolean success) {
this.success = success;
}
public String getError() {
return error;
}
public void setError(String error) {
this.error = error;
}
public IResult getResult() {
return result;
}
public void setResult(IResult result) {
this.result = result;
}
public String getFlinkHost() {
return flinkHost;
}
public void setFlinkHost(String flinkHost) {
this.flinkHost = flinkHost;
}
public long getTime() {
return time;
}
public void setTime(long time) {
this.time = time;
}
public LocalDate getFinishDate() {
return finishDate;
}
public void setFinishDate(LocalDate finishDate) {
this.finishDate = finishDate;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
}
package com.dlink.result;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.StringUtils;
import java.util.*;
import java.util.stream.Stream;
/**
* SelectBuilder
*
* @author wenmo
* @since 2021/5/25 16:03
**/
public class SelectBuilder extends AbstractBuilder implements ResultBuilder {
public static final String OPERATION_TYPE = "SELECT";
public SelectBuilder(String operationType, Integer maxRowNum,String nullColumn,boolean printRowKind) {
this.operationType = operationType;
this.maxRowNum = maxRowNum;
this.printRowKind = printRowKind;
this.nullColumn = nullColumn;
}
@Override
public IResult getResult(TableResult tableResult) {
List<TableColumn> columns = tableResult.getTableSchema().getTableColumns();
int totalCount = 0;
Set<String> column = new LinkedHashSet<>();
String[] columnNames = columns.stream().map(TableColumn::getName).map(s -> s.replace(" ", "")).toArray(String[]::new);
if (printRowKind) {
columnNames = Stream.concat(Stream.of("op"), Arrays.stream(columnNames)).toArray(String[]::new);
}
long numRows;
List<Map<String,Object>> rows = new ArrayList<>();
Iterator<Row> it = tableResult.collect();
for (numRows = 0L; it.hasNext() ; ++numRows) {
if (numRows < maxRowNum) {
String[] cols = rowToString((Row) it.next());
Map<String,Object> row = new HashMap<>();
for (int i = 0; i < cols.length; i++) {
if (i >= columnNames.length) {
column.add("UKN" + i);
row.put("UKN" + i, cols[i]);
} else {
column.add(columnNames[i]);
row.put(columnNames[i], cols[i]);
}
}
rows.add(row);
}else {
it.next();
}
totalCount++;
}
return new SelectResult(rows,totalCount,rows.size(),column);
}
public String[] rowToString(Row row) {
int len = printRowKind ? row.getArity() + 1 : row.getArity();
List<String> fields = new ArrayList<>(len);
if (printRowKind) {
fields.add(row.getKind().shortString());
}
for (int i = 0; i < row.getArity(); ++i) {
Object field = row.getField(i);
if (field == null) {
fields.add(nullColumn);
} else {
fields.add(StringUtils.arrayAwareToString(field));
}
}
return fields.toArray(new String[0]);
}
}
package com.dlink.result;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* SelectResult
*
* @author wenmo
* @since 2021/5/25 16:01
**/
public class SelectResult implements IResult{
private List<Map<String,Object>> rowData;
private Integer total;
private Integer currentCount;
private Set<String> columns;
public SelectResult(List<Map<String, Object>> rowData, Integer total, Integer currentCount, Set<String> columns) {
this.rowData = rowData;
this.total = total;
this.currentCount = currentCount;
this.columns = columns;
}
}
package com.dlink.result;
import java.util.Date;
/**
* Sql explain result
*
* @author wenmo
* @since 2021/5/25 11:41
**/
public class SqlExplainResult {
private Integer index;
private String type;
private String sql;
private String parse;
private String explain;
private String error;
private boolean parseTrue;
private boolean explainTrue;
private Date explainTime;
public Integer getIndex() {
return index;
}
public void setIndex(Integer index) {
this.index = index;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getSql() {
return sql;
}
public void setSql(String sql) {
this.sql = sql;
}
public String getParse() {
return parse;
}
public void setParse(String parse) {
this.parse = parse;
}
public String getExplain() {
return explain;
}
public void setExplain(String explain) {
this.explain = explain;
}
public String getError() {
return error;
}
public void setError(String error) {
this.error = error;
}
public boolean isParseTrue() {
return parseTrue;
}
public void setParseTrue(boolean parseTrue) {
this.parseTrue = parseTrue;
}
public boolean isExplainTrue() {
return explainTrue;
}
public void setExplainTrue(boolean explainTrue) {
this.explainTrue = explainTrue;
}
public Date getExplainTime() {
return explainTime;
}
public void setExplainTime(Date explainTime) {
this.explainTime = explainTime;
}
@Override
public String toString() {
return "SqlExplainRecord{" +
"index=" + index +
", type='" + type + '\'' +
", sql='" + sql + '\'' +
", parse='" + parse + '\'' +
", explain='" + explain + '\'' +
", error='" + error + '\'' +
", parseTrue=" + parseTrue +
", explainTrue=" + explainTrue +
", explainTime=" + explainTime +
'}';
}
}
package com.dlink.result;
import java.time.LocalDate;
import java.util.List;
/**
* SubmitResult
*
* @author wenmo
* @since 2021/5/25 19:04
**/
public class SubmitResult {
private String sessionId;
private List<String> statements;
private String flinkHost;
private boolean success;
private long time;
private LocalDate finishDate;
private String msg;
private String error;
private IResult result;
public SubmitResult() {
}
public SubmitResult(String sessionId, List<String> statements, String flinkHost) {
this.sessionId = sessionId;
this.statements = statements;
this.flinkHost = flinkHost;
}
public String getSessionId() {
return sessionId;
}
public void setSessionId(String sessionId) {
this.sessionId = sessionId;
}
public List<String> getStatements() {
return statements;
}
public void setStatements(List<String> statements) {
this.statements = statements;
}
public String getFlinkHost() {
return flinkHost;
}
public void setFlinkHost(String flinkHost) {
this.flinkHost = flinkHost;
}
public boolean isSuccess() {
return success;
}
public void setSuccess(boolean success) {
this.success = success;
}
public long getTime() {
return time;
}
public void setTime(long time) {
this.time = time;
}
public LocalDate getFinishDate() {
return finishDate;
}
public void setFinishDate(LocalDate finishDate) {
this.finishDate = finishDate;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public String getError() {
return error;
}
public void setError(String error) {
this.error = error;
}
public IResult getResult() {
return result;
}
public void setResult(IResult result) {
this.result = result;
}
}
package com.dlink.session;
import com.dlink.executor.Executor;
/**
* ExecutorEntity
*
* @author wenmo
* @since 2021/5/25 14:45
**/
public class ExecutorEntity {
private String sessionId;
private Executor executor;
public ExecutorEntity(String sessionId, Executor executor) {
this.sessionId = sessionId;
this.executor = executor;
}
public String getSessionId() {
return sessionId;
}
public void setSessionId(String sessionId) {
this.sessionId = sessionId;
}
public Executor getExecutor() {
return executor;
}
public void setExecutor(Executor executor) {
this.executor = executor;
}
}
package com.dlink.session;
import com.dlink.constant.FlinkConstant;
import java.util.List;
import java.util.Vector;
/**
* SessionPool
*
* @author wenmo
* @since 2021/5/25 14:32
**/
public class SessionPool {
private static volatile List<ExecutorEntity> executorList = new Vector<>(FlinkConstant.DEFAULT_SESSION_COUNT);
public static Integer push(ExecutorEntity executorEntity){
if (executorList.size() >= FlinkConstant.DEFAULT_SESSION_COUNT * FlinkConstant.DEFAULT_FACTOR) {
executorList.remove(0);
}else if(executorList.size() >= FlinkConstant.DEFAULT_SESSION_COUNT){
executorList.clear();
}
executorList.add(executorEntity);
return executorList.size();
}
public static Integer remove(String sessionId) {
int count = executorList.size();
for (int i = 0; i < executorList.size(); i++) {
if (sessionId.equals(executorList.get(i).getSessionId())) {
executorList.remove(i);
break;
}
}
return count - executorList.size();
}
public static ExecutorEntity get(String sessionId) {
for (ExecutorEntity executorEntity : executorList) {
if (executorEntity.getSessionId().equals(sessionId)) {
return executorEntity;
}
}
return null;
}
}
package com.dlink.trans;
import com.dlink.constant.FlinkSQLConstant;
/**
* Operations
*
* @author wenmo
* @since 2021/5/25 15:50
**/
public class Operations {
/**
* Gets the operation type of the given sql statement by inspecting its leading keyword.
*
* @param sql the sql statement to inspect
* @return the matching FlinkSQLConstant operation type, or UNKNOWN_TYPE if none matches
*/
public static String getOperationType(String sql) {
String sqlTrim = sql.replaceAll("[\\s\\t\\n\\r]", "").toUpperCase();
if (sqlTrim.startsWith(FlinkSQLConstant.CREATE)) {
return FlinkSQLConstant.CREATE;
}
if (sqlTrim.startsWith(FlinkSQLConstant.ALTER)) {
return FlinkSQLConstant.ALTER;
}
if (sqlTrim.startsWith(FlinkSQLConstant.INSERT)) {
return FlinkSQLConstant.INSERT;
}
if (sqlTrim.startsWith(FlinkSQLConstant.DROP)) {
return FlinkSQLConstant.DROP;
}
if (sqlTrim.startsWith(FlinkSQLConstant.SELECT)) {
return FlinkSQLConstant.SELECT;
}
if (sqlTrim.startsWith(FlinkSQLConstant.SHOW)) {
return FlinkSQLConstant.SHOW;
}
return FlinkSQLConstant.UNKNOWN_TYPE;
}
}
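For illustration (assuming the FlinkSQLConstant values are the corresponding upper-case keywords), getOperationType only looks at the leading keyword after stripping whitespace:
Operations.getOperationType("select * from orders"); // FlinkSQLConstant.SELECT
Operations.getOperationType(" insert into sink select * from src"); // FlinkSQLConstant.INSERT
Operations.getOperationType("explain plan for select 1"); // FlinkSQLConstant.UNKNOWN_TYPE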
package com.dlink.ud.udf;
import org.apache.flink.table.functions.ScalarFunction;
/**
* GetKey
*
* @author wenmo
* @since 2021/5/25 15:50
**/
public class GetKey extends ScalarFunction {
public String eval(String map, String key, String defaultValue) {
if (map == null || !map.contains(key)) {
return defaultValue;
}
String[] maps = map.replaceAll("\\{", "").replaceAll("\\}", "").split(",");
for (int i = 0; i < maps.length; i++) {
String[] items = maps[i].split("=");
if (items.length >= 2) {
if (key.equals(items[0].trim())) {
return items[1];
}
}
}
return defaultValue;
}
}
\ No newline at end of file
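A hedged sketch of registering and calling the GetKey scalar function (the table environment, function name, and columns are assumptions, and createTemporarySystemFunction requires a Flink 1.12+ TableEnvironment):
tableEnv.createTemporarySystemFunction("GET_KEY", GetKey.class);
tableEnv.executeSql("select GET_KEY(tags, 'region', 'unknown') as region from events");
// for a tags value of "{region=cn, env=prod}" the call returns "cn";
// if the key is absent the default value "unknown" is returned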
package com.dlink.ud.udtaf;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
/**
* RowsToMap
*
* @author wenmo
* @since 2021/5/25 15:50
**/
public class RowsToMap extends TableAggregateFunction<String, Map> {
@Override
public Map createAccumulator() {
return new HashMap();
}
public void accumulate(Map acc, String cls, Object v, String key) {
String[] keys = key.split(",");
for (int i = 0; i < keys.length; i++) {
if (keys[i].equals(cls)) {
acc.put(cls, v);
}
}
}
public void accumulate(Map acc, String cls, Object v) {
acc.put(cls, v);
}
public void merge(Map acc, Iterable<Map> iterable) {
for (Map otherAcc : iterable) {
Iterator iter = otherAcc.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry entry = (Map.Entry) iter.next();
accumulate(acc, entry.getKey().toString(), entry.getValue());
}
}
}
public void emitValue(Map acc, Collector<String> out) {
out.collect(acc.toString());
}
}
\ No newline at end of file
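A small sketch (illustration only) of the accumulator semantics; in a real job the function would be registered on a table environment and used as a table aggregate, but calling it directly shows what each overload does:
RowsToMap rowsToMap = new RowsToMap();
Map acc = rowsToMap.createAccumulator();
rowsToMap.accumulate(acc, "region", "cn", "region,env"); // kept: "region" appears in the key list
rowsToMap.accumulate(acc, "env", "prod", "region"); // dropped: "env" is not in the key list
rowsToMap.accumulate(acc, "host", "node-1"); // the two-value overload always puts
// emitValue collects acc.toString(), e.g. "{region=cn, host=node-1}"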
package com.dlink.utils;
import com.dlink.constant.FlinkHistoryConstant;
import java.util.Date;
import java.util.TimeZone;
public class DateFormatUtil {
/**
* Gets the 00:00:00 timestamp of the day the given date falls in (in the default time zone).
* The date must be later than 00:00:00, otherwise the previous day is returned.
*
* @param date the date whose day start is wanted
* @return the epoch millisecond timestamp of that day's 00:00:00
*/
public static long getZeroTimeStamp(Date date) {
return getZeroTimeStamp(date.getTime());
}
public static long getZeroTimeStamp(Long timestamp) {
timestamp += TimeZone.getDefault().getRawOffset();
return timestamp / FlinkHistoryConstant.ONE_DAY * FlinkHistoryConstant.ONE_DAY - TimeZone.getDefault().getRawOffset();
}
/**
* Gets a timestamp within the last second (23:59:59) of the day the given date falls in
* (in the default time zone). The date must be later than 00:00:00, otherwise the previous
* day is returned.
*
* @param date the date whose day end is wanted
* @return an epoch millisecond timestamp inside that day's last second
*/
public static long getLastTimeStampOfOneday(Date date) {
return getLastTimeStampOfOneday(date.getTime());
}
public static long getLastTimeStampOfOneday(Long timestamp) {
timestamp += TimeZone.getDefault().getRawOffset();
return ( timestamp / FlinkHistoryConstant.ONE_DAY * FlinkHistoryConstant.ONE_DAY + FlinkHistoryConstant.ONE_DAY - 100)- TimeZone.getDefault().getRawOffset();
}
}
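A worked example (assuming FlinkHistoryConstant.ONE_DAY is one day in milliseconds, 86_400_000L, and a UTC+8 default time zone): both methods shift the epoch timestamp by the local zone offset, truncate to whole days, and shift back.
long ts = 1621908930000L; // 2021-05-25 10:15:30 in UTC+8
long zero = DateFormatUtil.getZeroTimeStamp(ts); // 2021-05-25 00:00:00 local time
long last = DateFormatUtil.getLastTimeStampOfOneday(ts); // 2021-05-25 23:59:59.900 local time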
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.graph;
import org.apache.flink.annotation.Internal;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.*;
/** Helper class for generating a JSON representation from a {@link StreamGraph}. */
@Internal
public class JSONGenerator {
public static final String STEPS = "step_function";
public static final String ID = "id";
public static final String SIDE = "side";
public static final String SHIP_STRATEGY = "ship_strategy";
public static final String PREDECESSORS = "predecessors";
public static final String TYPE = "type";
public static final String PACT = "pact";
public static final String CONTENTS = "contents";
public static final String PARALLELISM = "parallelism";
private StreamGraph streamGraph;
private final ObjectMapper mapper = new ObjectMapper();
public JSONGenerator(StreamGraph streamGraph) {
this.streamGraph = streamGraph;
}
public String getJSON() {
return getJSONNode().toPrettyString();
}
public ObjectNode getJSONNode() {
ObjectNode json = mapper.createObjectNode();
ArrayNode nodes = mapper.createArrayNode();
json.put("nodes", nodes);
List<Integer> operatorIDs = new ArrayList<>(streamGraph.getVertexIDs());
Comparator<Integer> operatorIDComparator =
Comparator.comparingInt(
(Integer id) -> streamGraph.getSinkIDs().contains(id) ? 1 : 0)
.thenComparingInt(id -> id);
operatorIDs.sort(operatorIDComparator);
visit(nodes, operatorIDs, new HashMap<>());
return json;
}
private void visit(
ArrayNode jsonArray, List<Integer> toVisit, Map<Integer, Integer> edgeRemapings) {
Integer vertexID = toVisit.get(0);
StreamNode vertex = streamGraph.getStreamNode(vertexID);
if (streamGraph.getSourceIDs().contains(vertexID)
|| Collections.disjoint(vertex.getInEdges(), toVisit)) {
ObjectNode node = mapper.createObjectNode();
decorateNode(vertexID, node);
if (!streamGraph.getSourceIDs().contains(vertexID)) {
ArrayNode inputs = mapper.createArrayNode();
node.put(PREDECESSORS, inputs);
for (StreamEdge inEdge : vertex.getInEdges()) {
int inputID = inEdge.getSourceId();
Integer mappedID =
(edgeRemapings.keySet().contains(inputID))
? edgeRemapings.get(inputID)
: inputID;
decorateEdge(inputs, inEdge, mappedID);
}
}
jsonArray.add(node);
toVisit.remove(vertexID);
} else {
Integer iterationHead = -1;
for (StreamEdge inEdge : vertex.getInEdges()) {
int operator = inEdge.getSourceId();
if (streamGraph.vertexIDtoLoopTimeout.containsKey(operator)) {
iterationHead = operator;
}
}
ObjectNode obj = mapper.createObjectNode();
ArrayNode iterationSteps = mapper.createArrayNode();
obj.put(STEPS, iterationSteps);
obj.put(ID, iterationHead);
obj.put(PACT, "IterativeDataStream");
obj.put(PARALLELISM, streamGraph.getStreamNode(iterationHead).getParallelism());
obj.put(CONTENTS, "Stream Iteration");
ArrayNode iterationInputs = mapper.createArrayNode();
obj.put(PREDECESSORS, iterationInputs);
toVisit.remove(iterationHead);
visitIteration(iterationSteps, toVisit, iterationHead, edgeRemapings, iterationInputs);
jsonArray.add(obj);
}
if (!toVisit.isEmpty()) {
visit(jsonArray, toVisit, edgeRemapings);
}
}
private void visitIteration(
ArrayNode jsonArray,
List<Integer> toVisit,
int headId,
Map<Integer, Integer> edgeRemapings,
ArrayNode iterationInEdges) {
Integer vertexID = toVisit.get(0);
StreamNode vertex = streamGraph.getStreamNode(vertexID);
toVisit.remove(vertexID);
// Ignoring head and tail to avoid redundancy
if (!streamGraph.vertexIDtoLoopTimeout.containsKey(vertexID)) {
ObjectNode obj = mapper.createObjectNode();
jsonArray.add(obj);
decorateNode(vertexID, obj);
ArrayNode inEdges = mapper.createArrayNode();
obj.put(PREDECESSORS, inEdges);
for (StreamEdge inEdge : vertex.getInEdges()) {
int inputID = inEdge.getSourceId();
if (edgeRemapings.keySet().contains(inputID)) {
decorateEdge(inEdges, inEdge, inputID);
} else if (!streamGraph.vertexIDtoLoopTimeout.containsKey(inputID)) {
decorateEdge(iterationInEdges, inEdge, inputID);
}
}
edgeRemapings.put(vertexID, headId);
visitIteration(jsonArray, toVisit, headId, edgeRemapings, iterationInEdges);
}
}
private void decorateEdge(ArrayNode inputArray, StreamEdge inEdge, int mappedInputID) {
ObjectNode input = mapper.createObjectNode();
inputArray.add(input);
input.put(ID, mappedInputID);
input.put(SHIP_STRATEGY, inEdge.getPartitioner().toString());
input.put(SIDE, (inputArray.size() == 0) ? "first" : "second");
}
private void decorateNode(Integer vertexID, ObjectNode node) {
StreamNode vertex = streamGraph.getStreamNode(vertexID);
node.put(ID, vertexID);
node.put(TYPE, vertex.getOperatorName());
if (streamGraph.getSourceIDs().contains(vertexID)) {
node.put(PACT, "Data Source");
} else if (streamGraph.getSinkIDs().contains(vertexID)) {
node.put(PACT, "Data Sink");
} else {
node.put(PACT, "Operator");
}
node.put(CONTENTS, vertex.getOperatorName());
node.put(PARALLELISM, streamGraph.getStreamNode(vertexID).getParallelism());
}
}
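A short usage sketch (the pipeline below is a placeholder): the generator walks the StreamGraph from sources toward sinks and emits one JSON node per operator, so it can be pointed at any environment's graph.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2, 3).map(i -> i * 2).print();
StreamGraph streamGraph = env.getStreamGraph();
String planJson = new JSONGenerator(streamGraph).getJSON();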
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.dlink</groupId>
<artifactId>dlink</artifactId>
<packaging>pom</packaging>
<version>0.1-SNAPSHOT</version>
<modules>
<module>dlink-core</module>
<module>dlink-admin</module>
</modules>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot-dependencies.version>2.3.8.RELEASE</spring-boot-dependencies.version>
<hutool.version>5.1.4</hutool.version>
<druid-starter>1.1.22</druid-starter>
<mybatis-plus-boot-starter.version>3.4.0</mybatis-plus-boot-starter.version>
<lombok.version>1.18.16</lombok.version>
<jackjson.version>2.11.4</jackjson.version>
<guava.version>21.0</guava.version>
</properties>
<dependencyManagement>
<dependencies>
<!-- hutool java工具类库 -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>${hutool.version}</version>
</dependency>
<!-- mybatis-plus start -->
<!--<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus-boot-starter.version}</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-extension</artifactId>
<version>${mybatis-plus-boot-starter.version}</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus</artifactId>
<version>${mybatis-plus-boot-starter.version}</version>
</dependency>-->
<!-- druid 官方 starter -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>${druid-starter}</version>
</dependency>
<!-- mybatis-plus start -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus-boot-starter.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot-dependencies.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackjson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackjson.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>
\ No newline at end of file