Commit 28c5e425 authored by godkaikai's avatar godkaikai

application改进

parent 1001549f
package com.dlink.app; package com.dlink.app;
import com.dlink.app.db.DBConfig; import com.dlink.app.db.DBConfig;
import com.dlink.app.flinksql.FlinkSQLFactory; import com.dlink.app.flinksql.Submiter;
import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.api.java.utils.ParameterTool;
import java.io.IOException; import java.io.IOException;
...@@ -16,12 +16,11 @@ import java.time.LocalDateTime; ...@@ -16,12 +16,11 @@ import java.time.LocalDateTime;
public class MainApp { public class MainApp {
public static void main(String[] args) throws IOException { public static void main(String[] args) throws IOException {
System.out.println(LocalDateTime.now() + "任务开始");
ParameterTool parameters = ParameterTool.fromArgs(args); ParameterTool parameters = ParameterTool.fromArgs(args);
String id = parameters.get("id", null); String id = parameters.get("id", null);
if (id!=null&&!"".equals(id)) { if (id!=null&&!"".equals(id)) {
DBConfig dbConfig = DBConfig.build(parameters); DBConfig dbConfig = DBConfig.build(parameters);
FlinkSQLFactory.submit(Integer.valueOf(id),dbConfig); Submiter.submit(Integer.valueOf(id),dbConfig);
} }
} }
} }
...@@ -8,6 +8,8 @@ import com.dlink.executor.ExecutorSetting; ...@@ -8,6 +8,8 @@ import com.dlink.executor.ExecutorSetting;
import com.dlink.interceptor.FlinkInterceptor; import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.parser.SqlType; import com.dlink.parser.SqlType;
import com.dlink.trans.Operations; import com.dlink.trans.Operations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.sql.SQLException; import java.sql.SQLException;
...@@ -20,7 +22,9 @@ import java.util.*; ...@@ -20,7 +22,9 @@ import java.util.*;
* @author wenmo * @author wenmo
* @since 2021/10/27 * @since 2021/10/27
**/ **/
public class FlinkSQLFactory { public class Submiter {
private static final Logger logger = LoggerFactory.getLogger(Submiter.class);
private static String getQuerySQL(Integer id) throws SQLException { private static String getQuerySQL(Integer id) throws SQLException {
if (id == null) { if (id == null) {
...@@ -44,9 +48,9 @@ public class FlinkSQLFactory { ...@@ -44,9 +48,9 @@ public class FlinkSQLFactory {
statement = DBUtil.getOneByID(getQuerySQL(id),config); statement = DBUtil.getOneByID(getQuerySQL(id),config);
} catch (IOException | SQLException e) { } catch (IOException | SQLException e) {
e.printStackTrace(); e.printStackTrace();
System.err.println(LocalDateTime.now().toString() + " --> 获取 FlinkSQL 异常,ID 为"+ id ); logger.error(LocalDateTime.now().toString() + " --> 获取 FlinkSQL 异常,ID 为"+ id );
System.err.println(LocalDateTime.now().toString() + "连接信息为:"+ config.toString() ); logger.error(LocalDateTime.now().toString() + "连接信息为:"+ config.toString() );
System.err.println(LocalDateTime.now().toString() + "异常信息为:"+ e.getMessage() ); logger.error(LocalDateTime.now().toString() + "异常信息为:"+ e.getMessage() );
} }
return statement; return statement;
} }
...@@ -54,12 +58,12 @@ public class FlinkSQLFactory { ...@@ -54,12 +58,12 @@ public class FlinkSQLFactory {
public static Map<String,String> getTaskConfig(Integer id, DBConfig config) { public static Map<String,String> getTaskConfig(Integer id, DBConfig config) {
Map<String,String> task = new HashMap<>(); Map<String,String> task = new HashMap<>();
try { try {
task = DBUtil.getMapByID(getQuerySQL(id),config); task = DBUtil.getMapByID(getTaskInfo(id),config);
} catch (IOException | SQLException e) { } catch (IOException | SQLException e) {
e.printStackTrace(); e.printStackTrace();
System.err.println(LocalDateTime.now().toString() + " --> 获取 FlinkSQL 配置异常,ID 为"+ id ); logger.error(LocalDateTime.now().toString() + " --> 获取 FlinkSQL 配置异常,ID 为"+ id );
System.err.println(LocalDateTime.now().toString() + "连接信息为:"+ config.toString() ); logger.error(LocalDateTime.now().toString() + "连接信息为:"+ config.toString() );
System.err.println(LocalDateTime.now().toString() + "异常信息为:"+ e.getMessage() ); logger.error(LocalDateTime.now().toString() + "异常信息为:"+ e.getMessage() );
} }
return task; return task;
} }
...@@ -69,8 +73,10 @@ public class FlinkSQLFactory { ...@@ -69,8 +73,10 @@ public class FlinkSQLFactory {
} }
public static void submit(Integer id,DBConfig dbConfig){ public static void submit(Integer id,DBConfig dbConfig){
List<String> statements = FlinkSQLFactory.getStatements(Integer.valueOf(id), dbConfig); logger.info(LocalDateTime.now() + "开始提交作业 -- "+id);
ExecutorSetting executorSetting = ExecutorSetting.build(FlinkSQLFactory.getTaskConfig(Integer.valueOf(id),dbConfig)); List<String> statements = Submiter.getStatements(id, dbConfig);
ExecutorSetting executorSetting = ExecutorSetting.build(Submiter.getTaskConfig(id,dbConfig));
logger.info("作业配置如下: "+executorSetting.toString());
Executor executor = Executor.buildAppStreamExecutor(executorSetting); Executor executor = Executor.buildAppStreamExecutor(executorSetting);
List<StatementParam> ddl = new ArrayList<>(); List<StatementParam> ddl = new ArrayList<>();
List<StatementParam> trans = new ArrayList<>(); List<StatementParam> trans = new ArrayList<>();
...@@ -89,6 +95,11 @@ public class FlinkSQLFactory { ...@@ -89,6 +95,11 @@ public class FlinkSQLFactory {
ddl.add(new StatementParam(statement, operationType)); ddl.add(new StatementParam(statement, operationType));
} }
} }
for (StatementParam item : ddl) {
logger.info("正在执行 FlinkSQL: "+item.getValue());
executor.submitSql(item.getValue());
logger.info("执行成功");
}
if(executorSetting.isUseStatementSet()) { if(executorSetting.isUseStatementSet()) {
List<String> inserts = new ArrayList<>(); List<String> inserts = new ArrayList<>();
for (StatementParam item : trans) { for (StatementParam item : trans) {
...@@ -96,13 +107,18 @@ public class FlinkSQLFactory { ...@@ -96,13 +107,18 @@ public class FlinkSQLFactory {
inserts.add(item.getValue()); inserts.add(item.getValue());
} }
} }
logger.info("正在执行 FlinkSQL 语句集: "+String.join(FlinkSQLConstant.SEPARATOR,inserts));
executor.submitStatementSet(inserts); executor.submitStatementSet(inserts);
logger.info("执行成功");
}else{ }else{
for (StatementParam item : trans) { for (StatementParam item : trans) {
logger.info("正在执行 FlinkSQL: "+item.getValue());
executor.submitSql(item.getValue()); executor.submitSql(item.getValue());
logger.info("执行成功");
break; break;
} }
} }
logger.info(LocalDateTime.now() + "任务提交成功");
System.out.println(LocalDateTime.now() + "任务提交成功"); System.out.println(LocalDateTime.now() + "任务提交成功");
} }
} }
...@@ -176,7 +176,7 @@ ...@@ -176,7 +176,7 @@
<directory>${project.parent.basedir}/dlink-app/target</directory> <directory>${project.parent.basedir}/dlink-app/target</directory>
<outputDirectory>jar</outputDirectory> <outputDirectory>jar</outputDirectory>
<includes> <includes>
<include>dlink-app-${project.version}.jar</include> <include>dlink-app-${project.version}-jar-with-dependencies.jar</include>
</includes> </includes>
</fileSet> </fileSet>
</fileSets> </fileSets>
......
...@@ -116,4 +116,17 @@ public class ExecutorSetting { ...@@ -116,4 +116,17 @@ public class ExecutorSetting {
settingMap.get("jobName"), settingMap.get("jobName"),
settingMap.get("config")); settingMap.get("config"));
} }
@Override
public String toString() {
return "ExecutorSetting{" +
"checkpoint=" + checkpoint +
", parallelism=" + parallelism +
", useSqlFragment=" + useSqlFragment +
", useStatementSet=" + useStatementSet +
", savePointPath='" + savePointPath + '\'' +
", jobName='" + jobName + '\'' +
", config=" + config +
'}';
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment