Commit 9f177ae7 authored by godkaikai's avatar godkaikai

sql的yarn-application提交和系统配置

parent 3c11f876
package com.dlink.controller;
import com.dlink.common.result.ProTableResult;
import com.dlink.common.result.Result;
import com.dlink.model.SysConfig;
import com.dlink.service.SysConfigService;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.List;
/**
* SysConfigController
*
* @author wenmo
* @since 2021/11/18
**/
@Slf4j
@RestController
@RequestMapping("/api/sysConfig")
public class SysConfigController {
@Autowired
private SysConfigService sysConfigService;
/**
* 新增或者更新
*/
@PutMapping
public Result saveOrUpdate(@RequestBody SysConfig sysConfig) throws Exception {
if(sysConfigService.saveOrUpdate(sysConfig)){
return Result.succeed("新增成功");
}else {
return Result.failed("新增失败");
}
}
/**
* 动态查询列表
*/
@PostMapping
public ProTableResult<SysConfig> listSysConfigs(@RequestBody JsonNode para) {
return sysConfigService.selectForProTable(para);
}
/**
* 批量删除
*/
@DeleteMapping
public Result deleteMul(@RequestBody JsonNode para) {
if (para.size()>0){
List<Integer> error = new ArrayList<>();
for (final JsonNode item : para){
Integer id = item.asInt();
if(!sysConfigService.removeById(id)){
error.add(id);
}
}
if(error.size()==0) {
return Result.succeed("删除成功");
}else {
return Result.succeed("删除部分成功,但"+error.toString()+"删除失败,共"+error.size()+"次失败。");
}
}else{
return Result.failed("请选择要删除的记录");
}
}
/**
* 获取指定ID的信息
*/
@PostMapping("/getOneById")
public Result getOneById(@RequestBody SysConfig sysConfig) throws Exception {
sysConfig = sysConfigService.getById(sysConfig.getId());
return Result.succeed(sysConfig,"获取成功");
}
/**
* 批量更新配置
*/
@PostMapping("/updateSysConfigByJson")
public Result updateSysConfigByJson(@RequestBody JsonNode para) throws Exception {
sysConfigService.updateSysConfigByJson(para);
return Result.succeed("更新配置成功");
}
}
package com.dlink.init;
import com.dlink.service.SysConfigService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
/**
* SystemInit
*
* @author wenmo
* @since 2021/11/18
**/
@Component
@Order(value = 1)
public class SystemInit implements ApplicationRunner {
@Autowired
private SysConfigService sysConfigService;
@Override
public void run(ApplicationArguments args) throws Exception {
sysConfigService.initSysConfig();
}
}
package com.dlink.mapper;
import com.dlink.db.mapper.SuperMapper;
import com.dlink.model.SysConfig;
import org.apache.ibatis.annotations.Mapper;
/**
* SysConfig
*
* @author wenmo
* @since 2021/11/18
**/
@Mapper
public interface SysConfigMapper extends SuperMapper<SysConfig> {
}
......@@ -7,6 +7,8 @@ import com.dlink.db.model.SuperEntity;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.io.Serializable;
/**
* Jar
*
......
package com.dlink.model;
import com.baomidou.mybatisplus.annotation.FieldFill;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.dlink.db.annotation.Save;
import com.dlink.db.model.SuperEntity;
import lombok.Data;
import lombok.EqualsAndHashCode;
import javax.validation.constraints.NotNull;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* SysConfig
*
* @author wenmo
* @since 2021/11/18
**/
@Data
@EqualsAndHashCode(callSuper = false)
@TableName("dlink_sys_config")
public class SysConfig implements Serializable {
private static final long serialVersionUID = 3769276772487490408L;
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
@NotNull(message = "配置名不能为空", groups = {Save.class})
private String name;
private String value;
@TableField(fill = FieldFill.INSERT)
private LocalDateTime createTime;
@TableField(fill = FieldFill.INSERT_UPDATE)
private LocalDateTime updateTime;
protected Serializable pkVal() {
return this.id;
}
}
package com.dlink.service;
import com.dlink.db.service.ISuperService;
import com.dlink.model.SysConfig;
import com.fasterxml.jackson.databind.JsonNode;
/**
* SysConfig
*
* @author wenmo
* @since 2021/11/18
**/
public interface SysConfigService extends ISuperService<SysConfig> {
void initSysConfig();
void updateSysConfigByJson(JsonNode node);
}
package com.dlink.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.dlink.assertion.Asserts;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.mapper.SysConfigMapper;
import com.dlink.model.SysConfig;
import com.dlink.model.SystemConfiguration;
import com.dlink.service.SysConfigService;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
* SysConfigServiceImpl
*
* @author wenmo
* @since 2021/11/18
**/
@Service
public class SysConfigServiceImpl extends SuperServiceImpl<SysConfigMapper, SysConfig> implements SysConfigService {
private static final ObjectMapper mapper = new ObjectMapper();
@Override
public void initSysConfig() {
List<SysConfig> sysConfigs = list();
if(sysConfigs.size()==0){
return;
}
Map<String,String> map = new HashMap<>();
for(SysConfig item : sysConfigs){
map.put(item.getName(),item.getValue());
}
SystemConfiguration.getInstances().setConfiguration(mapper.valueToTree(map));
}
@Override
public void updateSysConfigByJson(JsonNode node) {
if (node!=null&&node.isObject()) {
Iterator<Map.Entry<String, JsonNode>> it = node.fields();
while (it.hasNext()) {
Map.Entry<String, JsonNode> entry = it.next();
String name = entry.getKey();
String value = entry.getValue().asText();
SysConfig config = getOne(new QueryWrapper<SysConfig>().eq("name", name));
SysConfig newConfig = new SysConfig();
newConfig.setValue(value);
if(Asserts.isNull(config)){
newConfig.setName(name);
save(newConfig);
}else {
newConfig.setId(config.getId());
updateById(newConfig);
}
}
}
SystemConfiguration.getInstances().setConfiguration(node);
}
}
......@@ -8,14 +8,18 @@ import com.dlink.job.JobResult;
import com.dlink.mapper.TaskMapper;
import com.dlink.model.Cluster;
import com.dlink.model.Statement;
import com.dlink.model.SystemConfiguration;
import com.dlink.model.Task;
import com.dlink.service.ClusterConfigurationService;
import com.dlink.service.ClusterService;
import com.dlink.service.StatementService;
import com.dlink.service.TaskService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.Map;
/**
* 任务 服务实现类
*
......@@ -42,7 +46,14 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
if(!JobManager.useGateway(config.getType())) {
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), task.getClusterId()));
}else{
config.buildGatewayConfig(clusterConfigurationService.getGatewayConfig(task.getClusterConfigurationId()));
Map<String, String> gatewayConfig = clusterConfigurationService.getGatewayConfig(task.getClusterConfigurationId());
if("yarn-application".equals(config.getType())||"ya".equals(config.getType())){
SystemConfiguration systemConfiguration = SystemConfiguration.getInstances();
gatewayConfig.put("userJarPath",systemConfiguration.getSqlSubmitJarPath());
gatewayConfig.put("userJarParas",systemConfiguration.getSqlSubmitJarParas() + config.getTaskId());
gatewayConfig.put("userJarMainAppClass",systemConfiguration.getSqlSubmitJarMainAppClass());
}
config.buildGatewayConfig(gatewayConfig);
}
JobManager jobManager = JobManager.build(config);
return jobManager.executeSql(statement.getStatement());
......
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.dlink.mapper.SysConfigMapper">
<select id="selectForProTable" resultType="com.dlink.model.SysConfig">
select
a.*
from
dlink_sys_config a
<where>
1=1
<if test='param.name!=null and param.name!=""'>
and a.name like "%${param.name}%"
</if>
<if test='param.createTime!=null and param.createTime!=""'>
and a.create_time <![CDATA[>=]]> str_to_date( #{param.createTime},'%Y-%m-%d %H:%i:%s')
</if>
<if test='param.updateTime!=null and param.updateTime!=""'>
and a.update_time <![CDATA[>=]]> str_to_date( #{param.updateTime},'%Y-%m-%d %H:%i:%s')
</if>
<if test='ew.sqlSegment!=null and ew.sqlSegment!="" and !ew.sqlSegment.startsWith(" ORDER BY")'>
and
</if>
<if test='ew.sqlSegment!=null and ew.sqlSegment!=""'>
${ew.sqlSegment}
</if>
</where>
</select>
</mapper>
......@@ -13,7 +13,7 @@
<properties>
<mainClass>com.dlink.app.MainApp</mainClass>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.12.5</flink.version>
<flink.version>1.13.2</flink.version>
<slf4j.version>1.7.7</slf4j.version>
<log4j.version>1.2.17</log4j.version>
<scala.binary.version>2.11</scala.binary.version>
......@@ -26,21 +26,25 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
......@@ -51,28 +55,37 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>provided</scope>
<version>8.0.21</version>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-executor</artifactId>
</dependency>
</dependencies>
<build>
......
package com.dlink.app;
import com.dlink.app.assertion.Asserts;
import com.dlink.app.db.DBConfig;
import com.dlink.app.executor.Executor;
import com.dlink.app.flinksql.FlinkSQLFactory;
import org.apache.flink.api.java.utils.ParameterTool;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
/**
* MainApp
......@@ -23,8 +19,9 @@ public class MainApp {
System.out.println(LocalDateTime.now() + "任务开始");
ParameterTool parameters = ParameterTool.fromArgs(args);
String id = parameters.get("id", null);
if (Asserts.isNotNullString(id)) {
Executor.build().submit(FlinkSQLFactory.getStatements(Integer.valueOf(id), DBConfig.build(parameters)));
if (id!=null&&!"".equals(id)) {
DBConfig dbConfig = DBConfig.build(parameters);
FlinkSQLFactory.submit(Integer.valueOf(id),dbConfig);
}
}
}
package com.dlink.app.assertion;
import java.util.Collection;
import java.util.Map;
/**
* Asserts
*
* @author wenmo
* @since 2021/7/5 21:57
*/
public class Asserts {
public static boolean isNotNull(Object object){
return object!=null;
}
public static boolean isNull(Object object){
return object==null;
}
public static boolean isNullString(String str){
return isNull(str)||"".equals(str);
}
public static boolean isNotNullString(String str){
return !isNullString(str);
}
public static boolean isEquals(String str1,String str2){
if(isNull(str1)&&isNull(str2)){
return true;
}else if(isNull(str1)||isNull(str2)){
return false;
}else{
return str1.equals(str2);
}
}
public static boolean isEqualsIgnoreCase(String str1,String str2){
if(isNull(str1)&&isNull(str2)){
return true;
}else if(isNull(str1)||isNull(str2)){
return false;
}else{
return str1.equalsIgnoreCase(str2);
}
}
public static boolean isNullCollection(Collection collection) {
if (isNull(collection)||collection.size()==0) {
return true;
}
return false;
}
public static boolean isNotNullCollection(Collection collection) {
return !isNullCollection(collection);
}
public static boolean isNullMap(Map map) {
if (isNull(map)||map.size()==0) {
return true;
}
return false;
}
public static boolean isNotNullMap(Map map) {
return !isNullMap(map);
}
public static void checkNull(String key,String msg) {
if (key == null||"".equals(key)) {
throw new RuntimeException(msg);
}
}
public static void checkNotNull(Object object,String msg) {
if (isNull(object)) {
throw new RuntimeException(msg);
}
}
public static void checkNullString(String key,String msg) {
if (isNull(key)||isEquals("",key)) {
throw new RuntimeException(msg);
}
}
public static void checkNullCollection(Collection collection,String msg) {
if(isNullCollection(collection)){
throw new RuntimeException(msg);
}
}
public static void checkNullMap(Map map,String msg) {
if(isNullMap(map)){
throw new RuntimeException(msg);
}
}
}
package com.dlink.app.constant;
/**
* AppConstant
*
* @author wenmo
* @since 2021/10/27
**/
public class AppConstant {
public static final String FLINKSQL_SEPARATOR = ";";
}
......@@ -6,8 +6,8 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -59,4 +59,44 @@ public class DBUtil {
}*/
return result;
}
public static Map<String,String> getMapByID(String sql,DBConfig config) throws SQLException, IOException {
Connection conn = getConnection(config);
HashMap<String,String> map = new HashMap();
try (Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(sql)) {
List<String> columnList = new ArrayList<>();
for(int i =0;i<rs.getMetaData().getColumnCount();i++){
columnList.add(rs.getMetaData().getColumnName(i));
}
if (rs.next()) {
for(int i =0;i<columnList.size();i++){
map.put(columnList.get(i),rs.getString(i));
}
}
}
close(conn);
return map;
}
public static List<Map<String,String>> getListByID(String sql,DBConfig config) throws SQLException, IOException {
Connection conn = getConnection(config);
List<Map<String,String>> list = new ArrayList<>();
try (Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(sql)) {
List<String> columnList = new ArrayList<>();
for(int i =0;i<rs.getMetaData().getColumnCount();i++){
columnList.add(rs.getMetaData().getColumnName(i));
}
while (rs.next()) {
HashMap<String,String> map = new HashMap();
for(int i =0;i<columnList.size();i++){
map.put(columnList.get(i),rs.getString(i));
}
list.add(map);
}
}
close(conn);
return list;
}
}
package com.dlink.app.executor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import java.util.List;
/**
* Executor
*
* @author wenmo
* @since 2021/10/27
**/
public class Executor {
private StreamExecutionEnvironment environment;
private StreamTableEnvironment stEnvironment;
private ExecutorSetting executorSetting;
private SqlManager sqlManager;
public static Executor build(){
return new Executor(ExecutorSetting.DEFAULT);
}
public static Executor build(ExecutorSetting setting){
return new Executor(setting);
}
private Executor(ExecutorSetting executorSetting) {
this.executorSetting = executorSetting;
this.sqlManager = new SqlManager();
init(executorSetting);
}
private void init(ExecutorSetting setting){
this.environment = StreamExecutionEnvironment.getExecutionEnvironment();
this.stEnvironment = StreamTableEnvironment.create(this.environment);
}
private void executeSql(String statement){
if(executorSetting.isUseSqlFragment()) {
statement = sqlManager.parseVariable(statement);
if(statement.length() > 0 && checkShowFragments(statement)){
stEnvironment.executeSql(statement);
}
}else{
stEnvironment.executeSql(statement);
}
}
public void submit(List<String> statements){
for(String statement : statements){
if(statement==null||"".equals(statement.trim())){
continue;
}
executeSql(statement);
}
}
private boolean checkShowFragments(String sql){
return sqlManager.checkShowFragments(sql);
}
}
package com.dlink.app.executor;
import org.apache.flink.api.java.utils.ParameterTool;
import java.util.Map;
/**
* ExecutorSetting
*
* @author wenmo
* @since 2021/5/25 13:43
**/
public class ExecutorSetting {
private Integer checkpoint;
private Integer parallelism;
private boolean useSqlFragment;
private String savePointPath;
private String jobName;
private Map<String,String> config;
public static final ExecutorSetting DEFAULT = new ExecutorSetting(0,1,true);
public ExecutorSetting(boolean useSqlFragment) {
this.useSqlFragment = useSqlFragment;
}
public ExecutorSetting(Integer checkpoint) {
this.checkpoint = checkpoint;
}
public ExecutorSetting(Integer checkpoint, boolean useSqlFragment) {
this.checkpoint = checkpoint;
this.useSqlFragment = useSqlFragment;
}
public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment) {
this.checkpoint = checkpoint;
this.parallelism = parallelism;
this.useSqlFragment = useSqlFragment;
}
public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath, String jobName) {
this.checkpoint = checkpoint;
this.parallelism = parallelism;
this.useSqlFragment = useSqlFragment;
this.savePointPath = savePointPath;
this.jobName = jobName;
}
public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath) {
this.checkpoint = checkpoint;
this.parallelism = parallelism;
this.useSqlFragment = useSqlFragment;
this.savePointPath = savePointPath;
}
public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath, String jobName, Map<String, String> config) {
this.checkpoint = checkpoint;
this.parallelism = parallelism;
this.useSqlFragment = useSqlFragment;
this.savePointPath = savePointPath;
this.jobName = jobName;
this.config = config;
}
public Integer getCheckpoint() {
return checkpoint;
}
public Integer getParallelism() {
return parallelism;
}
public boolean isUseSqlFragment() {
return useSqlFragment;
}
public String getSavePointPath() {
return savePointPath;
}
public String getJobName() {
return jobName;
}
public Map<String, String> getConfig() {
return config;
}
}
package com.dlink.app.executor;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.ExpressionParserException;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.util.StringUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static java.lang.String.format;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Flink Sql Fragment Manager
* @author wenmo
* @since 2021/6/7 22:06
**/
@Internal
public final class SqlManager {
private Map<String, String> sqlFragments;
static final String SHOW_FRAGMENTS = "SHOW FRAGMENTS";
public SqlManager() {
sqlFragments = new HashMap<>();
}
/**
* Get names of sql fragments loaded.
*
* @return a list of names of sql fragments loaded
*/
public List<String> listSqlFragments() {
return new ArrayList<>(sqlFragments.keySet());
}
/**
* Registers a fragment of sql under the given name. The sql fragment name must be unique.
*
* @param sqlFragmentName name under which to register the given sql fragment
* @param sqlFragment a fragment of sql to register
* @throws CatalogException if the registration of the sql fragment under the given name failed.
* But at the moment, with CatalogException, not SqlException
*/
public void registerSqlFragment(String sqlFragmentName, String sqlFragment) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
"sql fragment name cannot be null or empty.");
checkNotNull(sqlFragment, "sql fragment cannot be null");
/*if (sqlFragments.containsKey(sqlFragmentName)) {
throw new CatalogException(
format("The fragment of sql %s already exists.", sqlFragmentName));
}*/
sqlFragments.put(sqlFragmentName, sqlFragment);
}
/**
* Unregisters a fragment of sql under the given name. The sql fragment name must be existed.
*
* @param sqlFragmentName name under which to unregister the given sql fragment.
* @param ignoreIfNotExists If false exception will be thrown if the fragment of sql to be
* altered does not exist.
* @throws CatalogException if the unregistration of the sql fragment under the given name
* failed. But at the moment, with CatalogException, not SqlException
*/
public void unregisterSqlFragment(String sqlFragmentName, boolean ignoreIfNotExists) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
"sql fragmentName name cannot be null or empty.");
if (sqlFragments.containsKey(sqlFragmentName)) {
sqlFragments.remove(sqlFragmentName);
} else if (!ignoreIfNotExists) {
throw new CatalogException(
format("The fragment of sql %s does not exist.", sqlFragmentName));
}
}
/**
* Get a fragment of sql under the given name. The sql fragment name must be existed.
*
* @param sqlFragmentName name under which to unregister the given sql fragment.
* @throws CatalogException if the unregistration of the sql fragment under the given name
* failed. But at the moment, with CatalogException, not SqlException
*/
public String getSqlFragment(String sqlFragmentName) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
"sql fragmentName name cannot be null or empty.");
if (sqlFragments.containsKey(sqlFragmentName)) {
return sqlFragments.get(sqlFragmentName);
} else {
throw new CatalogException(
format("The fragment of sql %s does not exist.", sqlFragmentName));
}
}
/**
* Get a fragment of sql under the given name. The sql fragment name must be existed.
*
* @throws CatalogException if the unregistration of the sql fragment under the given name
* failed. But at the moment, with CatalogException, not SqlException
*/
public Map<String, String> getSqlFragment() {
return sqlFragments;
}
public Iterator getSqlFragmentsIterator() {
return sqlFragments.entrySet().iterator();
}
public boolean checkShowFragments(String sql){
return SHOW_FRAGMENTS.equals(sql.trim().toUpperCase());
}
/**
* Parse some variables under the given sql.
*
* @param statement A sql will be parsed.
* @throws ExpressionParserException if the name of the variable under the given sql failed.
*/
public String parseVariable(String statement) {
if (statement == null || "".equals(statement)) {
return statement;
}
String[] strs = statement.split(";");
StringBuilder sb = new StringBuilder();
for (int i = 0; i < strs.length; i++) {
String str = strs[i].trim();
if (str.length() == 0) {
continue;
}
if (str.contains(":=")) {
String[] strs2 = str.split(":=");
if (strs2.length >= 2) {
if (strs2[0].length() == 0) {
throw new ExpressionParserException("Illegal variable name.");
}
String valueString = str.substring(str.indexOf(":=") + 2);
this.registerSqlFragment(strs2[0], replaceVariable(valueString));
} else {
throw new ExpressionParserException("Illegal variable definition.");
}
} else {
sb.append(replaceVariable(str));
}
}
return sb.toString();
}
/**
* Replace some variables under the given sql.
*
* @param statement A sql will be replaced.
*/
private String replaceVariable(String statement) {
String pattern = "\\$\\{(.+?)\\}";
Pattern p = Pattern.compile(pattern);
Matcher m = p.matcher(statement);
StringBuffer sb = new StringBuffer();
while (m.find()) {
String key = m.group(1);
String value = this.getSqlFragment(key);
m.appendReplacement(sb, value == null ? "" : value);
}
m.appendTail(sb);
return sb.toString();
}
}
package com.dlink.app.flinksql;
import com.dlink.app.constant.AppConstant;
import com.dlink.app.db.DBConfig;
import com.dlink.app.db.DBUtil;
import com.dlink.constant.FlinkSQLConstant;
import com.dlink.executor.Executor;
import com.dlink.executor.ExecutorSetting;
import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.parser.SqlType;
import com.dlink.trans.Operations;
import org.apache.flink.table.api.StatementSet;
import java.io.IOException;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* FlinkSQLFactory
......@@ -25,6 +34,15 @@ public class FlinkSQLFactory {
return "select statement from dlink_task_statement where id = " + id;
}
private static String getTaskInfo(Integer id) throws SQLException {
if (id == null) {
throw new SQLException("请指定任务ID");
}
return "select id, name, alias, type,check_point as checkPoint," +
"save_point_path as savePointPath, parallelism,fragment,statement_set as statementSet,config" +
" from dlink_task where id = " + id;
}
private static String getFlinkSQLStatement(Integer id, DBConfig config) {
String statement = "";
try {
......@@ -38,7 +56,62 @@ public class FlinkSQLFactory {
return statement;
}
public static Map<String,String> getTaskConfig(Integer id, DBConfig config) {
Map<String,String> task = new HashMap<>();
try {
task = DBUtil.getMapByID(getQuerySQL(id),config);
} catch (IOException | SQLException e) {
e.printStackTrace();
System.err.println(LocalDateTime.now().toString() + " --> 获取 FlinkSQL 配置异常,ID 为"+ id );
System.err.println(LocalDateTime.now().toString() + "连接信息为:"+ config.toString() );
System.err.println(LocalDateTime.now().toString() + "异常信息为:"+ e.getMessage() );
}
return task;
}
public static List<String> getStatements(Integer id, DBConfig config){
return Arrays.asList(getFlinkSQLStatement(id, config).split(AppConstant.FLINKSQL_SEPARATOR));
return Arrays.asList(getFlinkSQLStatement(id, config).split(FlinkSQLConstant.SEPARATOR));
}
public static void submit(Integer id,DBConfig dbConfig){
List<String> statements = FlinkSQLFactory.getStatements(Integer.valueOf(id), dbConfig);
ExecutorSetting executorSetting = ExecutorSetting.build(FlinkSQLFactory.getTaskConfig(Integer.valueOf(id),dbConfig));
Executor executor = Executor.buildLocalExecutor(executorSetting);
List<StatementParam> ddl = new ArrayList<>();
List<StatementParam> trans = new ArrayList<>();
for (String item : statements) {
String statement = FlinkInterceptor.pretreatStatement(executor, item);
if (statement.isEmpty()) {
continue;
}
SqlType operationType = Operations.getOperationType(statement);
if (operationType.equals(SqlType.INSERT) || operationType.equals(SqlType.SELECT)) {
trans.add(new StatementParam(statement, operationType));
if (!executorSetting.isUseStatementSet()) {
break;
}
} else {
ddl.add(new StatementParam(statement, operationType));
}
}
if(executorSetting.isUseStatementSet()) {
List<String> inserts = new ArrayList<>();
StatementSet statementSet = executor.createStatementSet();
for (StatementParam item : trans) {
if(item.getType().equals(SqlType.INSERT)) {
statementSet.addInsertSql(item.getValue());
inserts.add(item.getValue());
}
}
if(inserts.size()>0) {
statementSet.execute();
}
}else{
for (StatementParam item : trans) {
executor.executeSql(item.getValue());
break;
}
}
System.out.println(LocalDateTime.now() + "任务提交成功");
}
}
package com.dlink.app.flinksql;
import com.dlink.parser.SqlType;
/**
* StatementParam
*
* @author wenmo
* @since 2021/11/16
*/
public class StatementParam {
private String value;
private SqlType type;
public StatementParam(String value, SqlType type) {
this.value = value;
this.type = type;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
public SqlType getType() {
return type;
}
public void setType(SqlType type) {
this.type = type;
}
}
......@@ -172,5 +172,12 @@
<include>dlink-executor-${project.version}.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.parent.basedir}/dlink-app/target</directory>
<outputDirectory>jar</outputDirectory>
<includes>
<include>dlink-app-${project.version}.jar</include>
</includes>
</fileSet>
</fileSets>
</assembly>
\ No newline at end of file
package com.dlink.model;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.Map;
/**
* SystemConfiguration
*
* @author wenmo
* @since 2021/11/18
**/
public class SystemConfiguration {
private static volatile SystemConfiguration systemConfiguration = new SystemConfiguration();
public static SystemConfiguration getInstances() {
return systemConfiguration;
}
private Configuration sqlSubmitJarPath = new Configuration(
"sqlSubmitJarPath",
"FlinkSQL提交Jar路径",
ValueType.STRING,
"hdfs:///dlink/jar/dlink-app.jar",
"用于指定Applcation模式提交FlinkSQL的Jar的路径"
);
private Configuration sqlSubmitJarParas = new Configuration(
"sqlSubmitJarParas",
"FlinkSQL提交Jar参数",
ValueType.STRING,
"--id ",
"用于指定Applcation模式提交FlinkSQL的Jar的参数"
);
private Configuration sqlSubmitJarMainAppClass = new Configuration(
"sqlSubmitJarMainAppClass",
"FlinkSQL提交Jar主类",
ValueType.STRING,
"com.dlink.app.MainApp",
"用于指定Applcation模式提交FlinkSQL的Jar的主类"
);
public void setConfiguration(JsonNode jsonNode){
if(jsonNode.has("sqlSubmitJarPath")){
setSqlSubmitJarPath(jsonNode.get("sqlSubmitJarPath").asText());
}
if(jsonNode.has("sqlSubmitJarParas")){
setSqlSubmitJarParas(jsonNode.get("sqlSubmitJarParas").asText());
}
if(jsonNode.has("sqlSubmitJarMainAppClass")){
setSqlSubmitJarMainAppClass(jsonNode.get("sqlSubmitJarMainAppClass").asText());
}
}
public String getSqlSubmitJarParas() {
return sqlSubmitJarParas.getValue().toString();
}
public void setSqlSubmitJarParas(String sqlSubmitJarParas) {
this.sqlSubmitJarParas.setValue(sqlSubmitJarParas);
}
public String getSqlSubmitJarPath() {
return sqlSubmitJarPath.getValue().toString();
}
public void setSqlSubmitJarPath(String sqlSubmitJarPath) {
this.sqlSubmitJarPath.setValue(sqlSubmitJarPath);
}
public String getSqlSubmitJarMainAppClass() {
return sqlSubmitJarMainAppClass.getValue().toString();
}
public void setSqlSubmitJarMainAppClass(String sqlSubmitJarMainAppClass) {
this.sqlSubmitJarMainAppClass.setValue(sqlSubmitJarMainAppClass);
}
enum ValueType{
STRING,INT,DOUBLE,FLOAT,BOOLEAN,DATE
}
public class Configuration{
private String name;
private String label;
private ValueType type;
private Object defaultValue;
private Object value;
private String note;
public Configuration(String name, String label, ValueType type, Object defaultValue, String note) {
this.name = name;
this.label = label;
this.type = type;
this.defaultValue = defaultValue;
this.value = defaultValue;
this.note = note;
}
public void setValue(Object value) {
this.value = value;
}
public Object getValue() {
return value;
}
}
}
package com.dlink.job;
import com.dlink.executor.ExecutorSetting;
import com.dlink.gateway.config.AppConfig;
import com.dlink.gateway.config.ClusterConfig;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.session.SessionConfig;
......@@ -100,5 +101,12 @@ public class JobConfig {
gatewayConfig.setClusterConfig(ClusterConfig.build(config.get("flinkConfigPath"),
config.get("flinkLibPath"),
config.get("hadoopConfigPath")));
if(config.containsKey("userJarPath")){
gatewayConfig.setAppConfig(AppConfig.build(
config.get("userJarPath"),
config.get("userJarParas"),
config.get("userJarMainAppClass")
));
}
}
}
......@@ -9,6 +9,7 @@ import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.explainer.Explainer;
import com.dlink.gateway.Gateway;
import com.dlink.gateway.GatewayType;
import com.dlink.gateway.config.AppConfig;
import com.dlink.gateway.config.FlinkConfig;
import com.dlink.gateway.result.GatewayResult;
import com.dlink.interceptor.FlinkInterceptor;
......@@ -388,22 +389,21 @@ public class JobManager extends RunTime {
try {
for (StatementParam item : jobParam.getDdl()) {
currentSql = item.getValue();
/*if (!FlinkInterceptor.build(executor, item.getValue())) {
executor.executeSql(item.getValue());
}*/
executor.executeSql(item.getValue());
}
if(config.isUseStatementSet()&&useGateway) {
List<String> inserts = new ArrayList<>();
for (StatementParam item : jobParam.getTrans()) {
/*if (!FlinkInterceptor.build(executor, item.getValue())) {
inserts.add(item.getValue());
}*/
inserts.add(item.getValue());
}
currentSql = String.join(FlinkSQLConstant.SEPARATOR,inserts);
JobGraph jobGraph = executor.getJobGraphFromInserts(inserts);
GatewayResult gatewayResult = Gateway.build(config.getGatewayConfig()).submitJobGraph(jobGraph);
GatewayResult gatewayResult = null;
if(GatewayType.YARN_APPLICATION.equalsValue(config.getType())){
gatewayResult = Gateway.build(config.getGatewayConfig()).submitJar();
}else{
gatewayResult = Gateway.build(config.getGatewayConfig()).submitJobGraph(jobGraph);
}
InsertResult insertResult = new InsertResult(gatewayResult.getAppId(), true);
job.setResult(insertResult);
job.setJobId(gatewayResult.getAppId());
......@@ -413,10 +413,6 @@ public class JobManager extends RunTime {
StatementSet statementSet = stEnvironment.createStatementSet();
for (StatementParam item : jobParam.getTrans()) {
if(item.getType().equals(SqlType.INSERT)) {
/*if (!FlinkInterceptor.build(executor, item.getValue())) {
statementSet.addInsertSql(item.getValue());
inserts.add(item.getValue());
}*/
statementSet.addInsertSql(item.getValue());
inserts.add(item.getValue());
}
......@@ -435,16 +431,17 @@ public class JobManager extends RunTime {
}else if(!config.isUseStatementSet()&&useGateway) {
List<String> inserts = new ArrayList<>();
for (StatementParam item : jobParam.getTrans()) {
/*if (!FlinkInterceptor.build(executor, item.getValue())) {
inserts.add(item.getValue());
break;
}*/
inserts.add(item.getValue());
break;
}
currentSql = String.join(FlinkSQLConstant.SEPARATOR,inserts);
JobGraph jobGraph = executor.getJobGraphFromInserts(inserts);
GatewayResult gatewayResult = Gateway.build(config.getGatewayConfig()).submitJobGraph(jobGraph);
GatewayResult gatewayResult = null;
if(GatewayType.YARN_APPLICATION.equalsValue(config.getType())){
gatewayResult = Gateway.build(config.getGatewayConfig()).submitJar();
}else{
gatewayResult = Gateway.build(config.getGatewayConfig()).submitJobGraph(jobGraph);
}
InsertResult insertResult = new InsertResult(gatewayResult.getAppId(), true);
job.setResult(insertResult);
job.setJobId(gatewayResult.getAppId());
......@@ -491,7 +488,6 @@ public class JobManager extends RunTime {
List<StatementParam> trans = new ArrayList<>();
for (String item : statements) {
String statement = FlinkInterceptor.pretreatStatement(executor,item);
// String statement = SqlUtil.removeNote(item);
if (statement.isEmpty()) {
continue;
}
......
......@@ -30,10 +30,6 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
......
......@@ -9,6 +9,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.functions.ScalarFunction;
......@@ -202,4 +203,8 @@ public abstract class Executor {
public JobGraph getJobGraphFromInserts(List<String> statements){
return stEnvironment.getJobGraphFromInserts(statements);
}
public StatementSet createStatementSet(){
return stEnvironment.createStatementSet();
}
}
package com.dlink.executor;
import com.dlink.assertion.Asserts;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.Setter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
......@@ -17,10 +25,12 @@ public class ExecutorSetting {
private Integer checkpoint;
private Integer parallelism;
private boolean useSqlFragment;
private boolean useStatementSet;
private String savePointPath;
private String jobName;
private Map<String,String> config;
public static final ExecutorSetting DEFAULT = new ExecutorSetting(0,1,true);
private static final ObjectMapper mapper = new ObjectMapper();
public ExecutorSetting(boolean useSqlFragment) {
this.useSqlFragment = useSqlFragment;
......@@ -64,4 +74,46 @@ public class ExecutorSetting {
this.jobName = jobName;
this.config = config;
}
public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment,boolean useStatementSet, String savePointPath, String jobName, Map<String, String> config) {
this.checkpoint = checkpoint;
this.parallelism = parallelism;
this.useSqlFragment = useSqlFragment;
this.useStatementSet = useStatementSet;
this.savePointPath = savePointPath;
this.jobName = jobName;
this.config = config;
}
public static ExecutorSetting build(Integer checkpoint, Integer parallelism, boolean useSqlFragment,boolean useStatementSet, String savePointPath, String jobName, String configJson){
JsonNode paras = null;
Map<String,String> config = new HashMap<>();
if(Asserts.isNotNullString(configJson)) {
try {
paras = mapper.readTree(configJson);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
config = mapper.convertValue(paras, new TypeReference<Map<String, String>>(){});
}
return new ExecutorSetting(checkpoint,parallelism,useSqlFragment,useStatementSet,savePointPath,jobName,config);
}
public static ExecutorSetting build(Map<String,String> settingMap){
Integer checkpoint = null;
Integer parallelism = null;
if(settingMap.containsKey("checkpoint")&&!"".equals(settingMap.get("checkpoint"))){
checkpoint = Integer.valueOf(settingMap.get("checkpoint"));
}
if(settingMap.containsKey("parallelism")&&!"".equals(settingMap.get("parallelism"))){
parallelism = Integer.valueOf(settingMap.get("parallelism"));
}
return build(checkpoint,
parallelism,
"1".equals(settingMap.get("useSqlFragment")),
"1".equals(settingMap.get("useStatementSet")),
settingMap.get("savePointPath"),
settingMap.get("jobName"),
settingMap.get("config"));
}
}
package com.dlink.gateway.config;
import com.dlink.assertion.Asserts;
import lombok.Getter;
import lombok.Setter;
......@@ -18,4 +19,19 @@ public class AppConfig {
public AppConfig() {
}
public AppConfig(String userJarPath, String[] userJarParas, String userJarMainAppClass) {
this.userJarPath = userJarPath;
this.userJarParas = userJarParas;
this.userJarMainAppClass = userJarMainAppClass;
}
public static AppConfig build(String userJarPath, String userJarParasStr, String userJarMainAppClass){
if(Asserts.isNotNullString(userJarParasStr)){
return new AppConfig(userJarPath,userJarParasStr.split(" "),userJarMainAppClass);
}else{
return new AppConfig(userJarPath,new String[]{},userJarMainAppClass);
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment