Commit 60b13d25 authored by wenmo's avatar wenmo

新增多模式下的语句集提交

parent c8062706
......@@ -20,6 +20,7 @@ import com.dlink.session.SessionConfig;
import com.dlink.session.SessionInfo;
import com.dlink.session.SessionPool;
import com.dlink.trans.Operations;
import com.dlink.utils.SqlUtil;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobGraph;
......@@ -168,6 +169,7 @@ public class JobManager extends RunTime {
return false;
}
@Deprecated
public SubmitResult submit(String statement) {
if (statement == null || "".equals(statement)) {
return SubmitResult.error("FlinkSql语句不存在");
......@@ -176,6 +178,7 @@ public class JobManager extends RunTime {
return submit(Arrays.asList(statements));
}
@Deprecated
public SubmitResult submit(List<String> sqlList) {
SubmitResult result = new SubmitResult(sessionId, sqlList, environmentSetting.getHost(), executorSetting.getJobName());
int currentIndex = 0;
......@@ -231,6 +234,7 @@ public class JobManager extends RunTime {
return result;
}
@Deprecated
public SubmitResult submitGraph(String statement, GatewayConfig gatewayConfig) {
if (statement == null || "".equals(statement)) {
return SubmitResult.error("FlinkSql语句不存在");
......@@ -291,7 +295,8 @@ public class JobManager extends RunTime {
return result;
}
public JobResult executeSql(String statement) {
@Deprecated
public JobResult executeSql2(String statement) {
String address = null;
if(!useGateway){
address = environmentSetting.getAddress();
......@@ -368,6 +373,99 @@ public class JobManager extends RunTime {
return job.getJobResult();
}
public JobResult executeSql(String statement) {
String address = null;
if(!useGateway){
address = environmentSetting.getAddress();
}
Job job = new Job(config,address,
Job.JobStatus.INITIALIZE,statement,executorSetting, LocalDateTime.now(),executor);
JobContextHolder.setJob(job);
job.setType(Operations.getSqlTypeFromStatements(statement));
ready();
String[] statements = statement.split(";");
String currentSql = "";
JobParam jobParam = pretreatStatements(statements);
CustomTableEnvironmentImpl stEnvironment = executor.getCustomTableEnvironmentImpl();
try {
for (StatementParam item : jobParam.getDdl()) {
currentSql = item.getValue();
if (!FlinkInterceptor.build(stEnvironment, item.getValue())) {
executor.executeSql(item.getValue());
}
}
if(config.isUseStatementSet()) {
List<String> inserts = new ArrayList<>();
for (StatementParam item : jobParam.getTrans()) {
if (!FlinkInterceptor.build(stEnvironment, item.getValue())) {
inserts.add(item.getValue());
}
}
currentSql = inserts.toString();
JobGraph jobGraph = executor.getJobGraphFromInserts(inserts);
GatewayResult gatewayResult = Gateway.build(config.getGatewayConfig()).submitJobGraph(jobGraph);
InsertResult insertResult = new InsertResult(gatewayResult.getAppId(), true);
job.setResult(insertResult);
job.setJobId(gatewayResult.getAppId());
job.setJobManagerAddress(gatewayResult.getWebURL());
}else{
for (StatementParam item : jobParam.getTrans()) {
currentSql = item.getValue();
if (!FlinkInterceptor.build(stEnvironment, item.getValue())) {
TableResult tableResult = executor.executeSql(item.getValue());
if (tableResult.getJobClient().isPresent()) {
job.setJobId(tableResult.getJobClient().get().getJobID().toHexString());
}
if (config.isUseResult()) {
IResult result = ResultBuilder.build(item.getType(), maxRowNum, "", true).getResult(tableResult);
job.setResult(result);
}
}
}
}
job.setEndTime(LocalDateTime.now());
job.setStatus(Job.JobStatus.SUCCESS);
success();
} catch (Exception e) {
e.printStackTrace();
StackTraceElement[] trace = e.getStackTrace();
StringBuffer resMsg = new StringBuffer("");
for (StackTraceElement s : trace) {
resMsg.append(" \n " + s + " ");
}
LocalDateTime now = LocalDateTime.now();
job.setEndTime(now);
job.setStatus(Job.JobStatus.FAILED);
String error = now.toString() + ":" + "运行语句:\n" + currentSql + " \n时出现异常:" + e.getMessage() + " \n >>>堆栈信息<<<" + resMsg.toString();
job.setError(error);
failed();
close();
}
close();
return job.getJobResult();
}
private JobParam pretreatStatements(String[] statements){
List<StatementParam> ddl = new ArrayList<>();
List<StatementParam> trans = new ArrayList<>();
for (String item : statements) {
String statement = SqlUtil.removeNote(item);
if (statement.isEmpty()) {
continue;
}
SqlType operationType = Operations.getOperationType(statement);
if (operationType.equals(SqlType.INSERT)||operationType.equals(SqlType.SELECT)) {
trans.add(new StatementParam(statement,operationType));
if(!config.isUseStatementSet()){
break;
}
}else{
ddl.add(new StatementParam(statement,operationType));
}
}
return new JobParam(ddl,trans);
}
public IResult executeDDL(String statement) {
String[] statements = statement.split(";");
try {
......
package com.dlink.job;
import java.util.List;
/**
* JobParam
*
* @author wenmo
* @since 2021/11/16
*/
public class JobParam {
private List<StatementParam> ddl;
private List<StatementParam> trans;
public JobParam(List<StatementParam> ddl, List<StatementParam> trans) {
this.ddl = ddl;
this.trans = trans;
}
public List<StatementParam> getDdl() {
return ddl;
}
public void setDdl(List<StatementParam> ddl) {
this.ddl = ddl;
}
public List<StatementParam> getTrans() {
return trans;
}
public void setTrans(List<StatementParam> trans) {
this.trans = trans;
}
}
package com.dlink.job;
import com.dlink.parser.SqlType;
/**
* StatementParam
*
* @author wenmo
* @since 2021/11/16
*/
public class StatementParam {
private String value;
private SqlType type;
public StatementParam(String value, SqlType type) {
this.value = value;
this.type = type;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
public SqlType getType() {
return type;
}
public void setType(SqlType type) {
this.type = type;
}
}
......@@ -36,4 +36,8 @@ public enum SqlType {
public String getType() {
return type;
}
public boolean equalsValue(String value){
return type.equalsIgnoreCase(value);
}
}
......@@ -17,4 +17,11 @@ public class SqlUtil {
}
return sql.split(FlinkSQLConstant.SEPARATOR);
}
public static String removeNote(String sql){
if(Asserts.isNotNullString(sql)) {
sql = sql.replaceAll("--([^'\\r\\n]{0,}('[^'\\r\\n]{0,}'){0,1}[^'\\r\\n]{0,}){0,}$", "").trim();
}
return sql;
}
}
......@@ -375,6 +375,9 @@ export default (): React.ReactNode => {
<li>
<Link>新增 Jar 管理及维护页面</Link>
</li>
<li>
<Link>新增 FlinkSQL 语句集提交</Link>
</li>
</ul>
</Paragraph>
</Timeline.Item>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment