Commit 1001549f authored by wenmo's avatar wenmo

dlink-app改进

parent 9f177ae7
package com.dlink.service; package com.dlink.service;
import com.dlink.common.result.Result;
import com.dlink.db.service.ISuperService; import com.dlink.db.service.ISuperService;
import com.dlink.job.JobResult; import com.dlink.job.JobResult;
import com.dlink.model.Task; import com.dlink.model.Task;
import com.dlink.result.SubmitResult;
/** /**
* 作业 服务类 * 作业 服务类
...@@ -17,8 +15,6 @@ public interface TaskService extends ISuperService<Task> { ...@@ -17,8 +15,6 @@ public interface TaskService extends ISuperService<Task> {
JobResult submitByTaskId(Integer id); JobResult submitByTaskId(Integer id);
// Result submitApplicationByTaskId(Integer id);
Task getTaskInfoById(Integer id); Task getTaskInfoById(Integer id);
boolean saveOrUpdateTask(Task task); boolean saveOrUpdateTask(Task task);
......
...@@ -36,6 +36,19 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -36,6 +36,19 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Autowired @Autowired
private ClusterConfigurationService clusterConfigurationService; private ClusterConfigurationService clusterConfigurationService;
@Value("${spring.datasource.driver-class-name}")
private String driver;
@Value("${spring.datasource.url}")
private String url;
@Value("${spring.datasource.username}")
private String username;
@Value("${spring.datasource.password}")
private String password;
private String buildParas(Integer id) {
return "--id " + id + " --driver " + driver + " --url " + url + " --username " + username + " --password " + password;
}
@Override @Override
public JobResult submitByTaskId(Integer id) { public JobResult submitByTaskId(Integer id) {
Task task = this.getById(id); Task task = this.getById(id);
...@@ -43,15 +56,15 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -43,15 +56,15 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
Statement statement = statementService.getById(id); Statement statement = statementService.getById(id);
Assert.check(statement); Assert.check(statement);
JobConfig config = task.buildSubmitConfig(); JobConfig config = task.buildSubmitConfig();
if(!JobManager.useGateway(config.getType())) { if (!JobManager.useGateway(config.getType())) {
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), task.getClusterId())); config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), task.getClusterId()));
}else{ } else {
Map<String, String> gatewayConfig = clusterConfigurationService.getGatewayConfig(task.getClusterConfigurationId()); Map<String, String> gatewayConfig = clusterConfigurationService.getGatewayConfig(task.getClusterConfigurationId());
if("yarn-application".equals(config.getType())||"ya".equals(config.getType())){ if ("yarn-application".equals(config.getType()) || "ya".equals(config.getType())) {
SystemConfiguration systemConfiguration = SystemConfiguration.getInstances(); SystemConfiguration systemConfiguration = SystemConfiguration.getInstances();
gatewayConfig.put("userJarPath",systemConfiguration.getSqlSubmitJarPath()); gatewayConfig.put("userJarPath", systemConfiguration.getSqlSubmitJarPath());
gatewayConfig.put("userJarParas",systemConfiguration.getSqlSubmitJarParas() + config.getTaskId()); gatewayConfig.put("userJarParas", systemConfiguration.getSqlSubmitJarParas() + buildParas(config.getTaskId()));
gatewayConfig.put("userJarMainAppClass",systemConfiguration.getSqlSubmitJarMainAppClass()); gatewayConfig.put("userJarMainAppClass", systemConfiguration.getSqlSubmitJarMainAppClass());
} }
config.buildGatewayConfig(gatewayConfig); config.buildGatewayConfig(gatewayConfig);
} }
...@@ -59,34 +72,14 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -59,34 +72,14 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
return jobManager.executeSql(statement.getStatement()); return jobManager.executeSql(statement.getStatement());
} }
/*@Override
public Result submitApplicationByTaskId(Integer id) {
Task task = this.getById(id);
Assert.check(task);
Statement statement = statementService.getById(id);
Assert.check(statement);
JobConfig config = task.buildSubmitConfig();
GatewayConfig gatewayConfig = new GatewayConfig();
gatewayConfig.getFlinkConfig().setJobName(config.getJobName());
gatewayConfig.setType(GatewayType.YARN_PER_JOB);
ClusterConfig clusterConfig = ClusterConfig.build(
"/opt/src/flink-1.12.2_pj/conf",
"/opt/src/flink-1.12.2_pj/conf",
"/usr/local/hadoop/hadoop-2.7.7/etc/hadoop/yarn-site.xml");
gatewayConfig.setClusterConfig(clusterConfig);
JobManager jobManager = JobManager.build(config);
SubmitResult result = jobManager.submitGraph(statement.getStatement(), gatewayConfig);
return Result.succeed(result,"提交成功");
}*/
@Override @Override
public Task getTaskInfoById(Integer id) { public Task getTaskInfoById(Integer id) {
Task task = this.getById(id); Task task = this.getById(id);
if (task != null) { if (task != null) {
Statement statement = statementService.getById(id); Statement statement = statementService.getById(id);
if(task.getClusterId()!=null) { if (task.getClusterId() != null) {
Cluster cluster = clusterService.getById(task.getClusterId()); Cluster cluster = clusterService.getById(task.getClusterId());
if(cluster!=null){ if (cluster != null) {
task.setClusterName(cluster.getAlias()); task.setClusterName(cluster.getAlias());
} }
} }
...@@ -108,13 +101,13 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -108,13 +101,13 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
statementService.updateById(statement); statementService.updateById(statement);
} }
} else { } else {
if(task.getCheckPoint()==null){ if (task.getCheckPoint() == null) {
task.setCheckPoint(0); task.setCheckPoint(0);
} }
if(task.getParallelism()==null){ if (task.getParallelism() == null) {
task.setParallelism(1); task.setParallelism(1);
} }
if(task.getClusterId()==null){ if (task.getClusterId() == null) {
task.setClusterId(0); task.setClusterId(0);
} }
this.save(task); this.save(task);
......
...@@ -22,19 +22,19 @@ ...@@ -22,19 +22,19 @@
</properties> </properties>
<dependencies> <dependencies>
<!-- Apache Flink dependencies --> <!-- Apache Flink dependencies -->
<dependency> <!--<dependency>
<groupId>org.apache.flink</groupId> <groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId> <artifactId>flink-core</artifactId>
<version>${flink.version}</version> <version>${flink.version}</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>-->
<dependency> <dependency>
<groupId>org.apache.flink</groupId> <groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId> <artifactId>flink-java</artifactId>
<version>${flink.version}</version> <version>${flink.version}</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency> <!--<dependency>
<groupId>org.apache.flink</groupId> <groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId> <artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version> <version>${flink.version}</version>
...@@ -56,8 +56,8 @@ ...@@ -56,8 +56,8 @@
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version> <version>${flink.version}</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>-->
<dependency> <!--<dependency>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId> <artifactId>slf4j-api</artifactId>
<version>1.7.25</version> <version>1.7.25</version>
...@@ -74,7 +74,7 @@ ...@@ -74,7 +74,7 @@
<artifactId>log4j</artifactId> <artifactId>log4j</artifactId>
<version>${log4j.version}</version> <version>${log4j.version}</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>-->
<dependency> <dependency>
<groupId>mysql</groupId> <groupId>mysql</groupId>
...@@ -82,6 +82,11 @@ ...@@ -82,6 +82,11 @@
<scope>provided</scope> <scope>provided</scope>
<version>8.0.21</version> <version>8.0.21</version>
</dependency> </dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-client-1.13</artifactId>
<scope>provided</scope>
</dependency>
<dependency> <dependency>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<artifactId>dlink-executor</artifactId> <artifactId>dlink-executor</artifactId>
......
...@@ -8,16 +8,11 @@ import com.dlink.executor.ExecutorSetting; ...@@ -8,16 +8,11 @@ import com.dlink.executor.ExecutorSetting;
import com.dlink.interceptor.FlinkInterceptor; import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.parser.SqlType; import com.dlink.parser.SqlType;
import com.dlink.trans.Operations; import com.dlink.trans.Operations;
import org.apache.flink.table.api.StatementSet;
import java.io.IOException; import java.io.IOException;
import java.sql.SQLException; import java.sql.SQLException;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.ArrayList; import java.util.*;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/** /**
* FlinkSQLFactory * FlinkSQLFactory
...@@ -76,7 +71,7 @@ public class FlinkSQLFactory { ...@@ -76,7 +71,7 @@ public class FlinkSQLFactory {
public static void submit(Integer id,DBConfig dbConfig){ public static void submit(Integer id,DBConfig dbConfig){
List<String> statements = FlinkSQLFactory.getStatements(Integer.valueOf(id), dbConfig); List<String> statements = FlinkSQLFactory.getStatements(Integer.valueOf(id), dbConfig);
ExecutorSetting executorSetting = ExecutorSetting.build(FlinkSQLFactory.getTaskConfig(Integer.valueOf(id),dbConfig)); ExecutorSetting executorSetting = ExecutorSetting.build(FlinkSQLFactory.getTaskConfig(Integer.valueOf(id),dbConfig));
Executor executor = Executor.buildLocalExecutor(executorSetting); Executor executor = Executor.buildAppStreamExecutor(executorSetting);
List<StatementParam> ddl = new ArrayList<>(); List<StatementParam> ddl = new ArrayList<>();
List<StatementParam> trans = new ArrayList<>(); List<StatementParam> trans = new ArrayList<>();
for (String item : statements) { for (String item : statements) {
...@@ -96,19 +91,15 @@ public class FlinkSQLFactory { ...@@ -96,19 +91,15 @@ public class FlinkSQLFactory {
} }
if(executorSetting.isUseStatementSet()) { if(executorSetting.isUseStatementSet()) {
List<String> inserts = new ArrayList<>(); List<String> inserts = new ArrayList<>();
StatementSet statementSet = executor.createStatementSet();
for (StatementParam item : trans) { for (StatementParam item : trans) {
if(item.getType().equals(SqlType.INSERT)) { if(item.getType().equals(SqlType.INSERT)) {
statementSet.addInsertSql(item.getValue());
inserts.add(item.getValue()); inserts.add(item.getValue());
} }
} }
if(inserts.size()>0) { executor.submitStatementSet(inserts);
statementSet.execute();
}
}else{ }else{
for (StatementParam item : trans) { for (StatementParam item : trans) {
executor.executeSql(item.getValue()); executor.submitSql(item.getValue());
break; break;
} }
} }
......
package com.dlink.executor.custom;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.UserDefinedFunctionHelper;
import org.apache.flink.table.module.ModuleManager;
import java.lang.reflect.Method;
import java.util.Map;
/**
* 定制TableEnvironmentImpl
*
* @author wenmo
* @since 2021/6/7 22:06
**/
public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
protected CustomTableEnvironmentImpl(CatalogManager catalogManager, ModuleManager moduleManager, TableConfig tableConfig, Executor executor, FunctionCatalog functionCatalog, Planner planner, boolean isStreamingMode, ClassLoader userClassLoader) {
super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode, userClassLoader);
}
public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment) {
return create(executionEnvironment, EnvironmentSettings.newInstance().build());
}
static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) {
return create(executionEnvironment, settings, new TableConfig());
}
public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings, TableConfig tableConfig) {
if (!settings.isStreamingMode()) {
throw new TableException("StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
} else {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
ModuleManager moduleManager = new ModuleManager();
CatalogManager catalogManager = CatalogManager.newBuilder().classLoader(classLoader).config(tableConfig.getConfiguration()).defaultCatalog(settings.getBuiltInCatalogName(), new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName())).executionConfig(executionEnvironment.getConfig()).build();
FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager);
Map<String, String> executorProperties = settings.toExecutorProperties();
Executor executor = lookupExecutor(executorProperties, executionEnvironment);
Map<String, String> plannerProperties = settings.toPlannerProperties();
Planner planner = (ComponentFactoryService.find(PlannerFactory.class, plannerProperties)).create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager);
return new CustomTableEnvironmentImpl(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, settings.isStreamingMode(), classLoader);
}
}
private static Executor lookupExecutor(Map<String, String> executorProperties, StreamExecutionEnvironment executionEnvironment) {
try {
ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
Method createMethod = executorFactory.getClass().getMethod("create", Map.class, StreamExecutionEnvironment.class);
return (Executor) createMethod.invoke(executorFactory, executorProperties, executionEnvironment);
} catch (Exception var4) {
throw new TableException("Could not instantiate the executor. Make sure a planner module is on the classpath", var4);
}
}
public <T> void registerFunction(String name, TableFunction<T> tableFunction) {
TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfTableFunction(tableFunction);
this.functionCatalog.registerTempSystemTableFunction(name, tableFunction, typeInfo);
}
public <T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction) {
TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(aggregateFunction);
TypeInformation<ACC> accTypeInfo = UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(aggregateFunction);
this.functionCatalog.registerTempSystemAggregateFunction(name, aggregateFunction, typeInfo, accTypeInfo);
}
public <T, ACC> void registerFunction(String name, TableAggregateFunction<T, ACC> tableAggregateFunction) {
TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(tableAggregateFunction);
TypeInformation<ACC> accTypeInfo = UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(tableAggregateFunction);
this.functionCatalog.registerTempSystemAggregateFunction(name, tableAggregateFunction, typeInfo, accTypeInfo);
}
}
package com.dlink.executor.custom;
import org.apache.flink.table.types.DataType;
/**
* @author wenmo
* @since 2021/6/7 22:06
**/
public class TableSchemaField {
private String name;
private DataType type;
public TableSchemaField(String name, DataType type) {
this.name = name;
this.type = type;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public DataType getType() {
return type;
}
public void setType(DataType type) {
this.type = type;
}
}
...@@ -2,8 +2,6 @@ package com.dlink.model; ...@@ -2,8 +2,6 @@ package com.dlink.model;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import java.util.Map;
/** /**
* SystemConfiguration * SystemConfiguration
* *
...@@ -28,7 +26,7 @@ public class SystemConfiguration { ...@@ -28,7 +26,7 @@ public class SystemConfiguration {
"sqlSubmitJarParas", "sqlSubmitJarParas",
"FlinkSQL提交Jar参数", "FlinkSQL提交Jar参数",
ValueType.STRING, ValueType.STRING,
"--id ", "",
"用于指定Applcation模式提交FlinkSQL的Jar的参数" "用于指定Applcation模式提交FlinkSQL的Jar的参数"
); );
private Configuration sqlSubmitJarMainAppClass = new Configuration( private Configuration sqlSubmitJarMainAppClass = new Configuration(
......
package com.dlink.executor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* AppStreamExecutor
*
* @author wenmo
* @since 2021/11/18
*/
public class AppStreamExecutor extends Executor{
public AppStreamExecutor(ExecutorSetting executorSetting) {
this.executorSetting = executorSetting;
this.environment = StreamExecutionEnvironment.getExecutionEnvironment();
init();
}
}
...@@ -57,6 +57,10 @@ public abstract class Executor { ...@@ -57,6 +57,10 @@ public abstract class Executor {
return new LocalStreamExecutor(executorSetting); return new LocalStreamExecutor(executorSetting);
} }
public static Executor buildAppStreamExecutor(ExecutorSetting executorSetting){
return new AppStreamExecutor(executorSetting);
}
public static Executor buildRemoteExecutor(EnvironmentSetting environmentSetting,ExecutorSetting executorSetting){ public static Executor buildRemoteExecutor(EnvironmentSetting environmentSetting,ExecutorSetting executorSetting){
environmentSetting.setUseRemote(true); environmentSetting.setUseRemote(true);
return new RemoteStreamExecutor(environmentSetting,executorSetting); return new RemoteStreamExecutor(environmentSetting,executorSetting);
...@@ -207,4 +211,20 @@ public abstract class Executor { ...@@ -207,4 +211,20 @@ public abstract class Executor {
public StatementSet createStatementSet(){ public StatementSet createStatementSet(){
return stEnvironment.createStatementSet(); return stEnvironment.createStatementSet();
} }
public TableResult executeStatementSet(List<String> statements){
StatementSet statementSet = stEnvironment.createStatementSet();
for (String item : statements) {
statementSet.addInsertSql(item);
}
return statementSet.execute();
}
public void submitSql(String statements){
executeSql(statements);
}
public void submitStatementSet(List<String> statements){
executeStatementSet(statements);
}
} }
...@@ -364,7 +364,7 @@ export default (): React.ReactNode => { ...@@ -364,7 +364,7 @@ export default (): React.ReactNode => {
<Link>更新 dlink 的 flink 主版本号为 1.13.3</Link> <Link>更新 dlink 的 flink 主版本号为 1.13.3</Link>
</li> </li>
<li> <li>
<Link>新增 yarn-application 的作业提交方式</Link> <Link>新增 yarn-application 的sql作业提交方式</Link>
</li> </li>
<li> <li>
<Link>新增 yarn-perjob 的作业提交方式</Link> <Link>新增 yarn-perjob 的作业提交方式</Link>
...@@ -381,6 +381,12 @@ export default (): React.ReactNode => { ...@@ -381,6 +381,12 @@ export default (): React.ReactNode => {
<li> <li>
<Link>executor 模块独立并优化增强逻辑</Link> <Link>executor 模块独立并优化增强逻辑</Link>
</li> </li>
<li>
<Link>新增系统配置管理</Link>
</li>
<li>
<Link>新增 yarn-application 的sql作业提交方式</Link>
</li>
</ul> </ul>
</Paragraph> </Paragraph>
</Timeline.Item> </Timeline.Item>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment