Commit 179cd24e authored by wenmo's avatar wenmo

batch model: add batch execution mode support for FlinkSQL tasks

parent caaa087c
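
This commit threads a new useBatchModel flag from the Studio UI and the dlink_task table down to the executor layer, where it selects a DataSet-based batch executor instead of the streaming one. A minimal sketch of the selection path, using only the factory methods added in this commit (the job name, the sourceDdl variable, and the query are illustrative placeholders):

    // useBatchModel = true, so buildLocalExecutor returns a LocalBatchExecutor
    ExecutorSetting setting = ExecutorSetting.build(0, 1, false, false, true, null, "batch-demo", null);
    Executor executor = Executor.buildLocalExecutor(setting);
    executor.executeSql(sourceDdl); // sourceDdl: any CREATE TABLE statement, e.g. the datagen source in BatchTest below
    executor.executeSql("SELECT order_number, price FROM Orders");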
......@@ -109,6 +109,10 @@
<groupId>com.dlink</groupId>
<artifactId>dlink-common</artifactId>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-client-base</artifactId>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-metadata-base</artifactId>
......@@ -124,7 +128,7 @@
<!--<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-metadata-mysql</artifactId>
<version>0.5.0</version>
<version>0.6.0-SNAPSHOT</version>
</dependency>-->
<!--<dependency>
<groupId>org.apache.flink</groupId>
......
......@@ -27,6 +27,7 @@ public class StudioExecuteDTO extends AbstractStatementDTO{
private boolean useChangeLog;
private boolean useAutoCancel;
private boolean statementSet;
private boolean batchModel;
private boolean useSession;
private String session;
private Integer clusterId;
......@@ -60,7 +61,7 @@ public class StudioExecuteDTO extends AbstractStatementDTO{
}
return new JobConfig(
type,useResult,useChangeLog,useAutoCancel, useSession, session, clusterId,
clusterConfigurationId,jarId, taskId, jobName, fragment,statementSet,
clusterConfigurationId,jarId, taskId, jobName, fragment,statementSet,batchModel,
maxRowNum, checkPoint, parallelism,savePointStrategy, savePointPath,config);
}
}
......@@ -25,7 +25,7 @@ import java.util.Map;
@Data
@EqualsAndHashCode(callSuper = false)
@TableName("dlink_task")
public class Task extends SuperEntity{
public class Task extends SuperEntity {
private static final long serialVersionUID = 5988972129893667154L;
......@@ -48,6 +48,8 @@ public class Task extends SuperEntity{
private boolean statementSet;
private boolean batchModel;
private Integer clusterId;
private Integer clusterConfigurationId;
......@@ -74,13 +76,13 @@ public class Task extends SuperEntity{
private List<Savepoints> savepoints;
@TableField(exist = false)
private List<Map<String,String>> config = new ArrayList<>();
private List<Map<String, String>> config = new ArrayList<>();
public List<Map<String,String>> parseConfig(){
public List<Map<String, String>> parseConfig() {
ObjectMapper objectMapper = new ObjectMapper();
try {
if(Asserts.isNotNullString(configJson)) {
if (Asserts.isNotNullString(configJson)) {
config = objectMapper.readValue(configJson, ArrayList.class);
}
} catch (JsonProcessingException e) {
......@@ -89,16 +91,17 @@ public class Task extends SuperEntity{
return config;
}
public JobConfig buildSubmitConfig(){
public JobConfig buildSubmitConfig() {
boolean useRemote = true;
if(clusterId==null||clusterId==0){
if (clusterId == null || clusterId == 0) {
useRemote = false;
}
Map<String,String> map = new HashMap<>();
for(Map<String,String> item : config){
map.put(item.get("key"),item.get("value"));
Map<String, String> map = new HashMap<>();
for (Map<String, String> item : config) {
map.put(item.get("key"), item.get("value"));
}
return new JobConfig(type,false,false,useRemote,clusterId,clusterConfigurationId,jarId,getId(),alias,fragment,statementSet,checkPoint,parallelism,savePointStrategy,savePointPath,map);
return new JobConfig(type, false, false, useRemote, clusterId, clusterConfigurationId, jarId, getId(),
alias, fragment, statementSet, batchModel, checkPoint, parallelism, savePointStrategy, savePointPath, map);
}
}
......@@ -2,32 +2,6 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.dlink.mapper.TaskMapper">
<!-- Generic query result mapping -->
<resultMap id="BaseResultMap" type="com.dlink.model.Task">
<id column="id" property="id" />
<result column="name" property="name" />
<result column="alias" property="alias" />
<result column="dialect" property="dialect" />
<result column="type" property="type" />
<result column="check_point" property="checkPoint" />
<result column="save_point_strategy" property="savePointStrategy" />
<result column="save_point_path" property="savePointPath" />
<result column="parallelism" property="parallelism" />
<result column="fragment" property="fragment" />
<result column="statement_set" property="statementSet" />
<result column="cluster_id" property="clusterId" />
<result column="cluster_configuration_id" property="clusterConfigurationId" />
<result column="database_id" property="databaseId" />
<result column="jar_id" property="jarId" />
<result column="env_id" property="envId" />
<result column="config_json" property="configJson" />
<result column="note" property="note" />
<result column="step" property="step" />
<result column="enabled" property="enabled" />
<result column="create_time" property="createTime" />
<result column="update_time" property="updateTime" />
</resultMap>
<select id="selectForProTable" resultType="com.dlink.model.Task">
select
a.*
......
......@@ -145,7 +145,7 @@ public class Submiter {
}
logger.info("Executing FlinkSQL statement set: " + String.join(FlinkSQLConstant.SEPARATOR, executes));
try {
executor.getEnvironment().execute(executorSetting.getJobName());
executor.execute(executorSetting.getJobName());
} catch (Exception e) {
e.printStackTrace();
}
......
......@@ -28,6 +28,17 @@
<artifactId>dlink-client-base</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
......
package com.dlink.executor;
import com.dlink.result.SqlExplainResult;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
......@@ -14,7 +15,10 @@ import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
......@@ -37,7 +41,6 @@ import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;
import org.apache.flink.table.planner.delegation.ExecutorBase;
import org.apache.flink.table.planner.utils.ExecutorUtils;
import org.apache.flink.types.Row;
import java.lang.reflect.Method;
import java.util.ArrayList;
......@@ -51,7 +54,7 @@ import java.util.Map;
* @author wenmo
* @since 2021/6/7 22:06
**/
public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements CustomTableEnvironment {
protected CustomTableEnvironmentImpl(CatalogManager catalogManager, ModuleManager moduleManager, TableConfig tableConfig, Executor executor, FunctionCatalog functionCatalog, Planner planner, boolean isStreamingMode, ClassLoader userClassLoader) {
super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode, userClassLoader);
......@@ -207,21 +210,21 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
this.functionCatalog.registerTempSystemAggregateFunction(name, tableAggregateFunction, typeInfo, accTypeInfo);
}
public boolean parseAndLoadConfiguration(String statement,StreamExecutionEnvironment environment,Map<String,Object> setMap){
public boolean parseAndLoadConfiguration(String statement, ExecutionConfig executionConfig, Map<String,Object> setMap){
List<Operation> operations = getParser().parse(statement);
for(Operation operation : operations){
if(operation instanceof SetOperation){
callSet((SetOperation)operation,environment,setMap);
callSet((SetOperation)operation,executionConfig,setMap);
return true;
} else if (operation instanceof ResetOperation){
callReset((ResetOperation)operation,environment,setMap);
callReset((ResetOperation)operation,executionConfig,setMap);
return true;
}
}
return false;
}
private void callSet(SetOperation setOperation,StreamExecutionEnvironment environment,Map<String,Object> setMap){
private void callSet(SetOperation setOperation, ExecutionConfig executionConfig,Map<String,Object> setMap){
if (setOperation.getKey().isPresent() && setOperation.getValue().isPresent()) {
String key = setOperation.getKey().get().trim();
String value = setOperation.getValue().get().trim();
......@@ -229,19 +232,19 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
confMap.put(key,value);
setMap.put(key,value);
Configuration configuration = Configuration.fromMap(confMap);
environment.getConfig().configure(configuration,null);
executionConfig.configure(configuration,null);
getConfig().addConfiguration(configuration);
}
}
private void callReset(ResetOperation resetOperation,StreamExecutionEnvironment environment,Map<String,Object> setMap) {
private void callReset(ResetOperation resetOperation, ExecutionConfig executionConfig,Map<String,Object> setMap) {
if (resetOperation.getKey().isPresent()) {
String key = resetOperation.getKey().get().trim();
Map<String,String> confMap = new HashMap<>();
confMap.put(key,null);
setMap.remove(key);
Configuration configuration = Configuration.fromMap(confMap);
environment.getConfig().configure(configuration,null);
executionConfig.configure(configuration,null);
getConfig().addConfiguration(configuration);
}else {
setMap.clear();
......
......@@ -14,14 +14,110 @@
<properties>
<source.level>1.8</source.level>
<target.level>1.8</target.level>
<dubbo.version>3.0.2.1</dubbo.version>
<flink.version>1.13.5</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<junit.version>4.12</junit.version>
<spring.version>4.3.16.RELEASE</spring.version>
<maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-framework-bom</artifactId>
<version>${spring.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-bom</artifactId>
<version>${dubbo.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-dependencies-zookeeper</artifactId>
<version>${dubbo.version}</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-dependencies-zookeeper</artifactId>
<type>pom</type>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>
......
package com.dlink.executor;
import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.*;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogManager;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* CustomTableEnvironment
*
* @author wenmo
* @since 2022/2/5 10:35
*/
public interface CustomTableEnvironment {
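// Shared contract implemented by both the stream (CustomTableEnvironmentImpl) and the batch (CustomBatchTableEnvironmentImpl) table environments.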
TableConfig getConfig();
CatalogManager getCatalogManager();
void registerCatalog(String catalogName, Catalog catalog);
String[] listCatalogs();
Optional<Catalog> getCatalog(String catalogName);
TableResult executeSql(String statement);
Table sqlQuery(String statement);
void registerTable(String name,Table table);
String explainSql(String statement, ExplainDetail... extraDetails);
ObjectNode getStreamGraph(String statement);
JobPlanInfo getJobPlanInfo(List<String> statements);
StreamGraph getStreamGraphFromInserts(List<String> statements);
JobGraph getJobGraphFromInserts(List<String> statements);
SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails);
boolean parseAndLoadConfiguration(String statement, ExecutionConfig config, Map<String,Object> setMap);
StatementSet createStatementSet();
}
......@@ -215,7 +215,7 @@ public class Explainer {
record = executor.explainSqlRecord(item.getValue());
if (Asserts.isNull(record)) {
record = new SqlExplainResult();
executor.getEnvironment().getStreamGraph();
executor.getStreamGraph();
}else {
executor.executeSql(item.getValue());
}
......
......@@ -42,6 +42,7 @@ public class JobConfig {
private String jobName;
private boolean useSqlFragment;
private boolean useStatementSet;
private boolean useBatchModel;
private Integer maxRowNum;
private Integer checkpoint;
private Integer parallelism;
......@@ -66,7 +67,7 @@ public class JobConfig {
public JobConfig(String type, boolean useResult, boolean useChangeLog, boolean useAutoCancel, boolean useSession, String session, Integer clusterId,
Integer clusterConfigurationId, Integer jarId, Integer taskId, String jobName, boolean useSqlFragment,
boolean useStatementSet, Integer maxRowNum, Integer checkpoint, Integer parallelism,
boolean useStatementSet, boolean useBatchModel, Integer maxRowNum, Integer checkpoint, Integer parallelism,
Integer savePointStrategyValue, String savePointPath, Map<String,String> config) {
this.type = type;
this.useResult = useResult;
......@@ -82,6 +83,7 @@ public class JobConfig {
this.jobName = jobName;
this.useSqlFragment = useSqlFragment;
this.useStatementSet = useStatementSet;
this.useBatchModel = useBatchModel;
this.maxRowNum = maxRowNum;
this.checkpoint = checkpoint;
this.parallelism = parallelism;
......@@ -126,7 +128,7 @@ public class JobConfig {
public JobConfig(String type,boolean useResult, boolean useSession, boolean useRemote, Integer clusterId,
Integer clusterConfigurationId, Integer jarId, Integer taskId, String jobName, boolean useSqlFragment,
boolean useStatementSet,Integer checkpoint, Integer parallelism, Integer savePointStrategyValue,
boolean useStatementSet,boolean useBatchModel,Integer checkpoint, Integer parallelism, Integer savePointStrategyValue,
String savePointPath,Map<String,String> config) {
this.type = type;
this.useResult = useResult;
......@@ -139,6 +141,7 @@ public class JobConfig {
this.jobName = jobName;
this.useSqlFragment = useSqlFragment;
this.useStatementSet = useStatementSet;
this.useBatchModel = useBatchModel;
this.checkpoint = checkpoint;
this.parallelism = parallelism;
this.savePointStrategy = SavePointStrategy.get(savePointStrategyValue);
......@@ -147,7 +150,7 @@ public class JobConfig {
}
public ExecutorSetting getExecutorSetting(){
return new ExecutorSetting(checkpoint,parallelism,useSqlFragment,useStatementSet,savePointPath,jobName,config);
return new ExecutorSetting(checkpoint,parallelism,useSqlFragment,useStatementSet,useBatchModel,savePointPath,jobName,config);
}
public void setSessionConfig(SessionConfig sessionConfig){
......
......@@ -328,7 +328,7 @@ public class JobManager {
break;
}
}
StreamGraph streamGraph = executor.getEnvironment().getStreamGraph();
StreamGraph streamGraph = executor.getStreamGraph();
streamGraph.setJobName(config.getJobName());
JobGraph jobGraph = streamGraph.getJobGraph();
GatewayResult gatewayResult = null;
......@@ -352,7 +352,7 @@ public class JobManager {
break;
}
}
JobExecutionResult jobExecutionResult = executor.getEnvironment().execute(config.getJobName());
JobExecutionResult jobExecutionResult = executor.execute(config.getJobName());
if (jobExecutionResult.isJobExecutionResult()) {
job.setJobId(jobExecutionResult.getJobID().toHexString());
}
......
package com.dlink.core;
import com.dlink.executor.CustomBatchTableEnvironmentImpl;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl;
import org.junit.Test;
/**
* BatchTest
*
* @author wenmo
* @since 2022/2/7 23:15
*/
public class BatchTest {
@Test
public void batchTest(){
String source = "CREATE TABLE Orders (\n" +
" order_number BIGINT,\n" +
" price DECIMAL(32,2),\n" +
" buyer ROW<first_name STRING, last_name STRING>,\n" +
" order_time TIMESTAMP(3)\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'number-of-rows' = '100'\n" +
")";
String select = "select order_number,price,order_time from Orders";
// LocalEnvironment environment = ExecutionEnvironment.createLocalEnvironment();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
// .inStreamingMode() // declare as a streaming job
.inBatchMode() // declare as a batch job
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.executeSql(source);
TableResult tableResult = tEnv.executeSql(select);
tableResult.print();
}
@Test
public void batchTest2(){
String source = "CREATE TABLE Orders (\n" +
" order_number BIGINT,\n" +
" price DECIMAL(32,2),\n" +
" buyer ROW<first_name STRING, last_name STRING>,\n" +
" order_time TIMESTAMP(3)\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'number-of-rows' = '100'\n" +
")";
String select = "select order_number,price,order_time from Orders";
LocalEnvironment environment = ExecutionEnvironment.createLocalEnvironment();
CustomBatchTableEnvironmentImpl batchTableEnvironment = CustomBatchTableEnvironmentImpl.create(environment);
batchTableEnvironment.executeSql(source);
TableResult tableResult = batchTableEnvironment.executeSql(select);
tableResult.print();
}
@Test
public void batchTest3(){
String source = "CREATE TABLE Orders (\n" +
" order_number BIGINT,\n" +
" price DECIMAL(32,2),\n" +
" buyer ROW<first_name STRING, last_name STRING>,\n" +
" order_time TIMESTAMP(3)\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'number-of-rows' = '100'\n" +
")";
String select = "select order_number,price,order_time from Orders";
LocalEnvironment environment = ExecutionEnvironment.createLocalEnvironment();
CustomBatchTableEnvironmentImpl batchTableEnvironment = CustomBatchTableEnvironmentImpl.create(environment);
batchTableEnvironment.executeSql(source);
TableResult tableResult = batchTableEnvironment.executeSql(select);
tableResult.print();
}
}
......@@ -27,7 +27,7 @@ public class JobManagerTest {
public void cancelJobSelect(){
JobConfig config = new JobConfig("session-yarn",true,true, true,true, "s1", 2,
null, null,null, "test", false,false, 100, 0,
null, null,null, "test", false,false,false, 100, 0,
1, 0,null,new HashMap<>());
if(config.isUseRemote()) {
config.setAddress("192.168.123.157:8081");
......
......@@ -254,6 +254,7 @@ CREATE TABLE `dlink_task` (
`parallelism` int(4) NULL DEFAULT NULL COMMENT 'parallelism',
`fragment` tinyint(1) NULL DEFAULT NULL COMMENT 'fragment',
`statement_set` tinyint(1) NULL DEFAULT NULL COMMENT 'enable statement set',
`batch_model` tinyint(1) NULL DEFAULT 0 COMMENT 'use batch mode',
`cluster_id` int(11) NULL DEFAULT NULL COMMENT 'Flink cluster ID',
`cluster_configuration_id` int(11) NULL DEFAULT NULL COMMENT 'cluster configuration ID',
`database_id` int(11) NULL DEFAULT NULL COMMENT 'data source ID',
......
......@@ -524,4 +524,10 @@ ALTER TABLE `dlink_task`
update dlink_task set dialect = 'FlinkJar' where jar_id is not null;
update dlink_catalogue set type = 'FlinkJar' where task_id in (select id as task_id from dlink_task where jar_id is not null);
-- ----------------------------
-- 0.6.0-SNAPSHOT 2022-02-07
-- ----------------------------
ALTER TABLE `dlink_task`
ADD COLUMN `batch_model` tinyint(1) NULL DEFAULT 0 COMMENT 'use batch mode' AFTER `statement_set`;
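-- batch_model = 1 makes the task run through the batch executor (ExecutionEnvironment / CustomBatchTableEnvironmentImpl) instead of the streaming executor.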
SET FOREIGN_KEY_CHECKS = 1;
package com.dlink.executor;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.program.OptimizerPlanEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.TableException;
/**
* AbstractBatchExecutor
*
* @author wenmo
* @since 2022/2/7 20:05
*/
public abstract class AbstractBatchExecutor extends Executor{
protected ExecutionEnvironment environment;
public void initEnvironment(){
if(executorSetting.getParallelism()!=null&&executorSetting.getParallelism()>0){
environment.setParallelism(executorSetting.getParallelism());
}
if(executorSetting.getConfig()!=null) {
Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
environment.getConfig().configure(configuration, null);
}
}
public void updateEnvironment(ExecutorSetting executorSetting){
if(executorSetting.getParallelism()!=null&&executorSetting.getParallelism()>0){
environment.setParallelism(executorSetting.getParallelism());
}
if(executorSetting.getConfig()!=null) {
Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
environment.getConfig().configure(configuration, null);
}
}
public JobExecutionResult execute(String jobName) throws Exception {
return environment.execute(jobName);
}
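// Batch jobs run on the DataSet ExecutionEnvironment, so there is no StreamGraph to expose here.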
public StreamGraph getStreamGraph(){
throw new TableException("Batch model can't get StreamGraph.");
}
public StreamExecutionEnvironment getStreamExecutionEnvironment(){
return null;
}
public ExecutionConfig getExecutionConfig(){
return environment.getConfig();
}
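// SET/RESET statements configure the batch ExecutionConfig rather than a StreamExecutionEnvironment.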
public boolean parseAndLoadConfiguration(String statement){
return stEnvironment.parseAndLoadConfiguration(statement,getExecutionConfig(),setConfig);
}
}
package com.dlink.executor;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
/**
* AbstractStreamExecutor
*
* @author wenmo
* @since 2022/2/7 20:03
*/
public abstract class AbstractStreamExecutor extends Executor{
protected StreamExecutionEnvironment environment;
public void initEnvironment(){
if(executorSetting.getCheckpoint()!=null&&executorSetting.getCheckpoint()>0){
environment.enableCheckpointing(executorSetting.getCheckpoint());
}
if(executorSetting.getParallelism()!=null&&executorSetting.getParallelism()>0){
environment.setParallelism(executorSetting.getParallelism());
}
if(executorSetting.getConfig()!=null) {
Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
environment.getConfig().configure(configuration, null);
}
}
public void updateEnvironment(ExecutorSetting executorSetting){
if(executorSetting.getCheckpoint()!=null&&executorSetting.getCheckpoint()>0){
environment.enableCheckpointing(executorSetting.getCheckpoint());
}
if(executorSetting.getParallelism()!=null&&executorSetting.getParallelism()>0){
environment.setParallelism(executorSetting.getParallelism());
}
if(executorSetting.getConfig()!=null) {
Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
environment.getConfig().configure(configuration, null);
}
}
public JobExecutionResult execute(String jobName) throws Exception {
return environment.execute(jobName);
}
public StreamGraph getStreamGraph(){
return environment.getStreamGraph();
}
public StreamExecutionEnvironment getStreamExecutionEnvironment(){
return environment;
}
public ExecutionConfig getExecutionConfig(){
return environment.getConfig();
}
public boolean parseAndLoadConfiguration(String statement){
return stEnvironment.parseAndLoadConfiguration(statement,getExecutionConfig(),setConfig);
}
}
package com.dlink.executor;
import org.apache.flink.api.java.ExecutionEnvironment;
/**
* AppBatchExecutor
*
* @author wenmo
* @since 2022/2/7 22:14
*/
public class AppBatchExecutor extends AbstractBatchExecutor {
public AppBatchExecutor(ExecutorSetting executorSetting) {
this.executorSetting = executorSetting;
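// Note: unlike AppStreamExecutor (StreamExecutionEnvironment.getExecutionEnvironment()), this builds a local DataSet ExecutionEnvironment.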
this.environment = ExecutionEnvironment.createLocalEnvironment();
init();
}
@Override
CustomTableEnvironment createCustomTableEnvironment() {
return CustomBatchTableEnvironmentImpl.create(environment);
}
}
......@@ -8,11 +8,16 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
* @author wenmo
* @since 2021/11/18
*/
public class AppStreamExecutor extends Executor{
public class AppStreamExecutor extends AbstractStreamExecutor{
public AppStreamExecutor(ExecutorSetting executorSetting) {
this.executorSetting = executorSetting;
this.environment = StreamExecutionEnvironment.getExecutionEnvironment();
init();
}
@Override
CustomTableEnvironment createCustomTableEnvironment() {
return CustomTableEnvironmentImpl.create(environment);
}
}
......@@ -18,6 +18,7 @@ public class EnvironmentSetting {
private String host;
private int port;
private boolean useRemote;
public static final EnvironmentSetting LOCAL = new EnvironmentSetting(false);
public EnvironmentSetting(boolean useRemote) {
......
......@@ -6,6 +6,8 @@ import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
......@@ -33,7 +35,7 @@ import java.util.Map;
public abstract class Executor {
protected StreamExecutionEnvironment environment;
protected CustomTableEnvironmentImpl stEnvironment;
protected CustomTableEnvironment stEnvironment;
protected EnvironmentSetting environmentSetting;
protected ExecutorSetting executorSetting;
protected Map<String,Object> setConfig = new HashMap<>();
......@@ -62,23 +64,39 @@ public abstract class Executor {
}
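// Each factory method now checks ExecutorSetting.isUseBatchModel() and returns the batch or stream executor accordingly.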
public static Executor buildLocalExecutor(ExecutorSetting executorSetting){
if(executorSetting.isUseBatchModel()){
return new LocalBatchExecutor(executorSetting);
}else{
return new LocalStreamExecutor(executorSetting);
}
}
public static Executor buildAppStreamExecutor(ExecutorSetting executorSetting){
if(executorSetting.isUseBatchModel()){
return new AppBatchExecutor(executorSetting);
}else{
return new AppStreamExecutor(executorSetting);
}
}
public static Executor buildRemoteExecutor(EnvironmentSetting environmentSetting,ExecutorSetting executorSetting){
environmentSetting.setUseRemote(true);
if(executorSetting.isUseBatchModel()){
return new RemoteBatchExecutor(environmentSetting,executorSetting);
}else{
return new RemoteStreamExecutor(environmentSetting,executorSetting);
}
}
public abstract ExecutionConfig getExecutionConfig();
public abstract StreamExecutionEnvironment getStreamExecutionEnvironment();
public StreamExecutionEnvironment getEnvironment(){
return environment;
}
public CustomTableEnvironmentImpl getCustomTableEnvironmentImpl(){
public CustomTableEnvironment getCustomTableEnvironment(){
return stEnvironment;
}
......@@ -108,35 +126,15 @@ public abstract class Executor {
updateStreamExecutionEnvironment(executorSetting);
}
private void initEnvironment(){
if(executorSetting.getCheckpoint()!=null&&executorSetting.getCheckpoint()>0){
environment.enableCheckpointing(executorSetting.getCheckpoint());
}
if(executorSetting.getParallelism()!=null&&executorSetting.getParallelism()>0){
environment.setParallelism(executorSetting.getParallelism());
}
if(executorSetting.getConfig()!=null) {
Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
environment.getConfig().configure(configuration, null);
}
}
public abstract void initEnvironment();
private void updateEnvironment(ExecutorSetting executorSetting){
if(executorSetting.getCheckpoint()!=null&&executorSetting.getCheckpoint()>0){
environment.enableCheckpointing(executorSetting.getCheckpoint());
}
if(executorSetting.getParallelism()!=null&&executorSetting.getParallelism()>0){
environment.setParallelism(executorSetting.getParallelism());
}
if(executorSetting.getConfig()!=null) {
Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
environment.getConfig().configure(configuration, null);
}
}
public abstract void updateEnvironment(ExecutorSetting executorSetting);
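// Implemented by the stream/batch subclasses to create CustomTableEnvironmentImpl or CustomBatchTableEnvironmentImpl.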
abstract CustomTableEnvironment createCustomTableEnvironment();
private void initStreamExecutionEnvironment(){
useSqlFragment = executorSetting.isUseSqlFragment();
stEnvironment = CustomTableEnvironmentImpl.create(environment);
stEnvironment = createCustomTableEnvironment();
if(executorSetting.getJobName()!=null&&!"".equals(executorSetting.getJobName())){
stEnvironment.getConfig().getConfiguration().setString(PipelineOptions.NAME.key(), executorSetting.getJobName());
}
......@@ -164,7 +162,7 @@ public abstract class Executor {
private void copyCatalog(){
String[] catalogs = stEnvironment.listCatalogs();
CustomTableEnvironmentImpl newstEnvironment = CustomTableEnvironmentImpl.create(environment);
CustomTableEnvironment newstEnvironment = createCustomTableEnvironment();
for (int i = 0; i < catalogs.length; i++) {
if(stEnvironment.getCatalog(catalogs[i]).isPresent()) {
newstEnvironment.getCatalogManager().unregisterCatalog(catalogs[i],true);
......@@ -182,6 +180,8 @@ public abstract class Executor {
return !FlinkInterceptor.build(this,statement);
}
public abstract JobExecutionResult execute(String jobName) throws Exception ;
public TableResult executeSql(String statement){
statement = pretreatStatement(statement);
if(pretreatExecute(statement)) {
......@@ -233,11 +233,13 @@ public abstract class Executor {
}
}
public abstract StreamGraph getStreamGraph();
public ObjectNode getStreamGraphFromDataStream(List<String> statements){
for(String statement : statements){
executeSql(statement);
}
StreamGraph streamGraph = environment.getStreamGraph();
StreamGraph streamGraph = getStreamGraph();
JSONGenerator jsonGenerator = new JSONGenerator(streamGraph);
String json = jsonGenerator.getJSON();
ObjectMapper mapper = new ObjectMapper();
......@@ -259,17 +261,17 @@ public abstract class Executor {
for(String statement : statements){
executeSql(statement);
}
StreamGraph streamGraph = environment.getStreamGraph();
StreamGraph streamGraph = getStreamGraph();
return new JobPlanInfo(JsonPlanGenerator.generatePlan(streamGraph.getJobGraph()));
}
public void registerFunction(String name, ScalarFunction function){
/*public void registerFunction(String name, ScalarFunction function){
stEnvironment.registerFunction(name,function);
}
public void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> var2){
stEnvironment.createTemporarySystemFunction(name,var2);
}
}*/
public CatalogManager getCatalogManager(){
return stEnvironment.getCatalogManager();
......@@ -307,7 +309,5 @@ public abstract class Executor {
executeStatementSet(statements);
}
public boolean parseAndLoadConfiguration(String statement){
return stEnvironment.parseAndLoadConfiguration(statement,environment,setConfig);
}
public abstract boolean parseAndLoadConfiguration(String statement);
}
......@@ -21,6 +21,7 @@ import java.util.Map;
@Getter
public class ExecutorSetting {
private boolean useBatchModel = false;
private Integer checkpoint;
private Integer parallelism;
private boolean useSqlFragment;
......@@ -74,17 +75,19 @@ public class ExecutorSetting {
this.config = config;
}
public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment,boolean useStatementSet, String savePointPath, String jobName, Map<String, String> config) {
public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment,boolean useStatementSet,
boolean useBatchModel, String savePointPath, String jobName, Map<String, String> config) {
this.checkpoint = checkpoint;
this.parallelism = parallelism;
this.useSqlFragment = useSqlFragment;
this.useStatementSet = useStatementSet;
this.useBatchModel = useBatchModel;
this.savePointPath = savePointPath;
this.jobName = jobName;
this.config = config;
}
public static ExecutorSetting build(Integer checkpoint, Integer parallelism, boolean useSqlFragment,boolean useStatementSet, String savePointPath, String jobName, String configJson){
public static ExecutorSetting build(Integer checkpoint, Integer parallelism, boolean useSqlFragment,boolean useStatementSet,boolean useBatchModel, String savePointPath, String jobName, String configJson){
List<Map<String,String>> configList = new ArrayList<>();
if(Asserts.isNotNullString(configJson)) {
try {
......@@ -97,7 +100,7 @@ public class ExecutorSetting {
for(Map<String,String> item : configList){
config.put(item.get("key"),item.get("value"));
}
return new ExecutorSetting(checkpoint,parallelism,useSqlFragment,useStatementSet,savePointPath,jobName,config);
return new ExecutorSetting(checkpoint,parallelism,useSqlFragment,useStatementSet,useBatchModel,savePointPath,jobName,config);
}
public static ExecutorSetting build(Map<String,String> settingMap){
......@@ -113,6 +116,7 @@ public class ExecutorSetting {
parallelism,
"1".equals(settingMap.get("useSqlFragment")),
"1".equals(settingMap.get("useStatementSet")),
"1".equals(settingMap.get("useBatchModel")),
settingMap.get("savePointPath"),
settingMap.get("jobName"),
settingMap.get("config"));
......
package com.dlink.executor;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
/**
* LocalBatchExecutor
*
* @author wenmo
* @since 2022/2/4 0:04
*/
public class LocalBatchExecutor extends AbstractBatchExecutor {
public LocalBatchExecutor(ExecutorSetting executorSetting) {
this.executorSetting = executorSetting;
this.environment = ExecutionEnvironment.createLocalEnvironment();
init();
}
@Override
CustomTableEnvironment createCustomTableEnvironment() {
return CustomBatchTableEnvironmentImpl.create(environment);
}
}
......@@ -8,7 +8,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
* @author wenmo
* @since 2021/5/25 13:48
**/
public class LocalStreamExecutor extends Executor {
public class LocalStreamExecutor extends AbstractStreamExecutor {
public LocalStreamExecutor(ExecutorSetting executorSetting) {
this.executorSetting = executorSetting;
......@@ -16,4 +16,8 @@ public class LocalStreamExecutor extends Executor {
init();
}
@Override
CustomTableEnvironment createCustomTableEnvironment() {
return CustomTableEnvironmentImpl.create(environment);
}
}
package com.dlink.executor;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* RemoteBatchExecutor
*
* @author wenmo
* @since 2022/2/7 22:10
*/
public class RemoteBatchExecutor extends AbstractBatchExecutor {
public RemoteBatchExecutor(EnvironmentSetting environmentSetting,ExecutorSetting executorSetting) {
this.environmentSetting = environmentSetting;
this.executorSetting = executorSetting;
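// Batch jobs reach the remote cluster through a remote DataSet ExecutionEnvironment rather than a StreamExecutionEnvironment.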
this.environment = ExecutionEnvironment.createRemoteEnvironment(environmentSetting.getHost(), environmentSetting.getPort());
init();
}
@Override
CustomTableEnvironment createCustomTableEnvironment() {
return CustomBatchTableEnvironmentImpl.create(environment);
}
}
......@@ -8,7 +8,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
* @author wenmo
* @since 2021/5/25 14:05
**/
public class RemoteStreamExecutor extends Executor {
public class RemoteStreamExecutor extends AbstractStreamExecutor {
public RemoteStreamExecutor(EnvironmentSetting environmentSetting,ExecutorSetting executorSetting) {
this.environmentSetting = environmentSetting;
......@@ -17,4 +17,8 @@ public class RemoteStreamExecutor extends Executor {
init();
}
@Override
CustomTableEnvironment createCustomTableEnvironment() {
return CustomTableEnvironmentImpl.create(environment);
}
}
......@@ -37,7 +37,7 @@ public class CreateAggTableOperation extends AbstractOperation implements Operat
@Override
public void build(Executor executor) {
AggTable aggTable = AggTable.build(statement);
Table source = executor.getCustomTableEnvironmentImpl().sqlQuery("select * from "+ aggTable.getTable());
Table source = executor.getCustomTableEnvironment().sqlQuery("select * from "+ aggTable.getTable());
List<String> wheres = aggTable.getWheres();
if(wheres!=null&&wheres.size()>0) {
for (String s : wheres) {
......@@ -47,6 +47,6 @@ public class CreateAggTableOperation extends AbstractOperation implements Operat
Table sink = source.groupBy(aggTable.getGroupBy())
.flatAggregate(aggTable.getAggBy())
.select(aggTable.getColumns());
executor.getCustomTableEnvironmentImpl().registerTable(aggTable.getName(), sink);
executor.getCustomTableEnvironment().registerTable(aggTable.getName(), sink);
}
}
......@@ -40,7 +40,7 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
,cdcSource.getPassword(),cdcSource.getCheckpoint(),cdcSource.getParallelism(),cdcSource.getDatabase(),cdcSource.getTable()
,cdcSource.getTopic(),cdcSource.getBrokers());
try {
FlinkCDCMergeBuilder.buildMySqlCDC(executor.getEnvironment(),config);
FlinkCDCMergeBuilder.buildMySqlCDC(executor.getStreamExecutionEnvironment(),config);
} catch (Exception e) {
e.printStackTrace();
}
......
......@@ -52,10 +52,10 @@ public class SetOperation extends AbstractOperation implements Operation {
if(Asserts.isNotNullMap(map)&&map.size()==2) {
Map<String, String> confMap = new HashMap<>();
confMap.put(StringUtils.join(map.get("SET"), "."), StringUtils.join(map.get("="), ","));
executor.getCustomTableEnvironmentImpl().getConfig().addConfiguration(Configuration.fromMap(confMap));
executor.getCustomTableEnvironment().getConfig().addConfiguration(Configuration.fromMap(confMap));
Configuration configuration = Configuration.fromMap(confMap);
executor.getEnvironment().getConfig().configure(configuration,null);
executor.getCustomTableEnvironmentImpl().getConfig().addConfiguration(configuration);
executor.getExecutionConfig().configure(configuration,null);
executor.getCustomTableEnvironment().getConfig().addConfiguration(configuration);
}
}
}
......@@ -14,7 +14,7 @@ const {Text} = Typography;
const StudioSetting = (props: any) => {
const {sessionCluster, clusterConfiguration, current, form, dispatch, tabs, currentSession, jars,env, toolHeight} = props;
const {sessionCluster, clusterConfiguration, current, form, dispatch, tabs, currentSession, env, toolHeight} = props;
const getClusterOptions = () => {
const itemList = [];
......@@ -209,6 +209,17 @@ const StudioSetting = (props: any) => {
</Form.Item>
</Col>
</Row>
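{/* New batch mode switch: maps to Task.batchModel and is passed to JobConfig.useBatchModel when the task is submitted. */}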
<Row>
<Col span={12}>
<Form.Item
label="批模式" className={styles.form_item} name="batchModel" valuePropName="checked"
tooltip={{title: '使用批模式', icon: <InfoCircleOutlined/>}}
>
<Switch checkedChildren="启用" unCheckedChildren="禁用"
/>
</Form.Item>
</Col>
</Row>
<Form.Item
label="SavePoint策略" className={styles.form_item} name="savePointStrategy"
tooltip='指定 SavePoint策略,默认为禁用'
......
......@@ -70,6 +70,7 @@ export type TaskType = {
parallelism?: number,
fragment?: boolean,
statementSet?: boolean,
batchModel?: boolean,
config?: [],
clusterId?: any,
clusterName?: string,
......@@ -355,7 +356,7 @@ const Model: ModelType = {
const {deleteType, current} = payload;
const newTabs = state.tabs;
const firstKey = newTabs.panes[0].key;
const newCurrent = newTabs.panes[0];
let newCurrent = newTabs.panes[0];
if (deleteType == 'CLOSE_OTHER') {
const keys = [firstKey, current.key];
newCurrent = current;
......