Commit 730e5822 authored by wenmo

0.2.2-rc2

parent bea8e111
......@@ -93,7 +93,7 @@ The DataLink open-source project and community are under construction; we hope this project can help you
### Latest Version
dlink-0.2.2-rc1
dlink-0.2.2
### Getting Started from the Installation Package
......
......@@ -47,10 +47,18 @@ public class StudioServiceImpl implements StudioService {
clusterService.updateById(cluster);
}
}
JobManager jobManager = new JobManager(host,studioExecuteDTO.getSession(),studioExecuteDTO.getMaxRowNum());
return jobManager.execute(studioExecuteDTO.getStatement(), new ExecutorSetting(
ExecuteType,studioExecuteDTO.getCheckPoint(),studioExecuteDTO.getParallelism(),
studioExecuteDTO.isFragment(),studioExecuteDTO.getSavePointPath(),studioExecuteDTO.getJobName()));
JobManager jobManager = new JobManager(
host,
studioExecuteDTO.getSession(),
studioExecuteDTO.getMaxRowNum(),
new ExecutorSetting(
ExecuteType,
studioExecuteDTO.getCheckPoint(),
studioExecuteDTO.getParallelism(),
studioExecuteDTO.isFragment(),
studioExecuteDTO.getSavePointPath(),
studioExecuteDTO.getJobName()));
return jobManager.execute(studioExecuteDTO.getStatement());
}
@Override
......@@ -70,10 +78,13 @@ public class StudioServiceImpl implements StudioService {
clusterService.updateById(cluster);
}
}
JobManager jobManager = new JobManager(host,studioDDLDTO.getSession(),1000);
return jobManager.execute(studioDDLDTO.getStatement(), new ExecutorSetting(
ExecuteType));
}
JobManager jobManager = new JobManager(
host,
studioDDLDTO.getSession(),
1000,
new ExecutorSetting(ExecuteType));
return jobManager.execute(studioDDLDTO.getStatement());
}
@Override
public boolean clearSession(String session) {
......
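The two hunks above move the ExecutorSetting from the execute call into the JobManager constructor, so a manager is configured once and each call carries only the statement. A minimal sketch of the new call pattern, with hypothetical host/session values (ExecuteType is the field already used by the surrounding service class):

    // Hypothetical values for illustration only.
    JobManager jobManager = new JobManager(
            "127.0.0.1:8081",                   // Flink JobManager address, host[:port]
            "admin_session1",                   // session id; may be null or empty
            100,                                // max rows fetched for query results
            new ExecutorSetting(ExecuteType));
    RunResult runResult = jobManager.execute("SHOW DATABASES");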
......@@ -47,11 +47,11 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
cluster.setJobManagerHost(host);
clusterService.updateById(cluster);
}
JobManager jobManager = new JobManager(host);
return jobManager.submit(statement.getStatement(), task.getRemoteExecutorSetting());
JobManager jobManager = new JobManager(host,task.getRemoteExecutorSetting());
return jobManager.submit(statement.getStatement());
}else if(task.getClusterId()==0){
JobManager jobManager = new JobManager();
return jobManager.submit(statement.getStatement(), task.getLocalExecutorSetting());
JobManager jobManager = new JobManager(task.getLocalExecutorSetting());
return jobManager.submit(statement.getStatement());
}else{
throw new BusException("该任务的集群不存在");
}
......
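Job submission in TaskServiceImpl follows the same pattern: the task's remote or local ExecutorSetting is bound at construction time, and submit() receives only the statement. A condensed sketch of the dispatch (the real code above also throws BusException when the cluster does not exist):

    // clusterId > 0: submit to the remote cluster at `host`; clusterId == 0: run locally.
    JobManager jobManager = task.getClusterId() > 0
            ? new JobManager(host, task.getRemoteExecutorSetting())
            : new JobManager(task.getLocalExecutorSetting());
    SubmitResult result = jobManager.submit(statement.getStatement());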
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dlink-client</artifactId>
<groupId>com.dlink</groupId>
<version>0.3.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<artifactId>dlink-client-1.13</artifactId>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.13.1</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<junit.version>4.12</junit.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!-- Package the jar -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
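A few orientation notes on this new dlink-client-1.13 build file: the slf4j-api exclusions on the Flink dependencies, together with the direct slf4j-api dependency, keep a single copy of the logging API on the classpath; the shade filter drops the META-INF signature files that would otherwise break verification of the shaded jar; and the AppendingTransformer concatenates reference.conf resources from Flink's shaded Akka so that configuration survives the merge.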
package com.dlink.executor.custom;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ExpressionParserException;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.types.Row;
import org.apache.flink.util.StringUtils;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static java.lang.String.format;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Flink SQL Fragment Manager
* @author wenmo
* @since 2021/6/7 22:06
**/
@Internal
public final class SqlManager {
private Map<String, String> sqlFragments;
static final String SHOW_FRAGMENTS = "SHOW FRAGMENTS";
public SqlManager() {
sqlFragments = new HashMap<>();
}
/**
* Get names of sql fragments loaded.
*
* @return a list of names of sql fragments loaded
*/
public List<String> listSqlFragments() {
return new ArrayList<>(sqlFragments.keySet());
}
/**
* Registers a fragment of sql under the given name. The sql fragment name must be unique.
*
* @param sqlFragmentName name under which to register the given sql fragment
* @param sqlFragment a fragment of sql to register
* @throws CatalogException if a sql fragment is already registered under the given name.
* (Currently a CatalogException rather than a dedicated SqlException.)
*/
public void registerSqlFragment(String sqlFragmentName, String sqlFragment) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
"sql fragment name cannot be null or empty.");
checkNotNull(sqlFragment, "sql fragment cannot be null");
if (sqlFragments.containsKey(sqlFragmentName)) {
throw new CatalogException(
format("The fragment of sql %s already exists.", sqlFragmentName));
}
sqlFragments.put(sqlFragmentName, sqlFragment);
}
/**
* Unregisters a fragment of sql under the given name. The sql fragment name must exist.
*
* @param sqlFragmentName name under which to unregister the given sql fragment.
* @param ignoreIfNotExists if false, an exception is thrown when the sql fragment to be
* unregistered does not exist.
* @throws CatalogException if the sql fragment does not exist and ignoreIfNotExists is false.
* (Currently a CatalogException rather than a dedicated SqlException.)
*/
public void unregisterSqlFragment(String sqlFragmentName, boolean ignoreIfNotExists) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
"sql fragmentName name cannot be null or empty.");
if (sqlFragments.containsKey(sqlFragmentName)) {
sqlFragments.remove(sqlFragmentName);
} else if (!ignoreIfNotExists) {
throw new CatalogException(
format("The fragment of sql %s does not exist.", sqlFragmentName));
}
}
/**
* Get a fragment of sql under the given name. The sql fragment name must exist.
*
* @param sqlFragmentName name of the sql fragment to retrieve.
* @throws CatalogException if no sql fragment is registered under the given name.
* (Currently a CatalogException rather than a dedicated SqlException.)
*/
public String getSqlFragment(String sqlFragmentName) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
"sql fragmentName name cannot be null or empty.");
if (sqlFragments.containsKey(sqlFragmentName)) {
return sqlFragments.get(sqlFragmentName);
} else {
throw new CatalogException(
format("The fragment of sql %s does not exist.", sqlFragmentName));
}
}
/**
* Get all sql fragments currently registered.
*
* @return the map of sql fragment names to sql fragments
*/
public Map<String, String> getSqlFragment() {
return sqlFragments;
}
public TableResult getSqlFragments() {
List<Row> rows = new ArrayList<>();
for (String key : sqlFragments.keySet()) {
rows.add(Row.of(key));
}
return CustomTableResultImpl.buildTableResult(new ArrayList<>(Arrays.asList(new TableSchemaField("sql fragment name", DataTypes.STRING()))), rows);
}
public Iterator<Map.Entry<String, String>> getSqlFragmentsIterator() {
return sqlFragments.entrySet().iterator();
}
public Table getSqlFragmentsTable(CustomTableEnvironmentImpl environment) {
List<String> keys = new ArrayList<>();
for (String key : sqlFragments.keySet()) {
keys.add(key);
}
return environment.fromValues(keys);
}
public boolean checkShowFragments(String sql){
return SHOW_FRAGMENTS.equals(sql.trim().toUpperCase());
}
/**
* Parse variable definitions and references in the given sql.
*
* @param statement the sql statement to parse.
* @throws ExpressionParserException if a variable definition in the given sql is illegal.
*/
public String parseVariable(String statement) {
if (statement == null || "".equals(statement)) {
return statement;
}
String[] strs = statement.split(";");
StringBuilder sb = new StringBuilder();
for (int i = 0; i < strs.length; i++) {
String str = strs[i].trim();
if (str.length() == 0) {
continue;
}
if (str.contains(":=")) {
String[] strs2 = str.split(":=");
if (strs2.length >= 2) {
if (strs2[0].length() == 0) {
throw new ExpressionParserException("Illegal variable name.");
}
String valueString = str.substring(str.indexOf(":=") + 2);
this.registerSqlFragment(strs2[0], replaceVariable(valueString));
} else {
throw new ExpressionParserException("Illegal variable definition.");
}
} else {
sb.append(replaceVariable(str));
}
}
return sb.toString();
}
/**
* Replace variable references of the form ${name} in the given sql.
*
* @param statement the sql statement in which to replace variables.
*/
private String replaceVariable(String statement) {
String pattern = "\\$\\{(.+?)\\}";
Pattern p = Pattern.compile(pattern);
Matcher m = p.matcher(statement);
StringBuffer sb = new StringBuffer();
while (m.find()) {
String key = m.group(1);
String value = this.getSqlFragment(key);
m.appendReplacement(sb, value == null ? "" : value);
}
m.appendTail(sb);
return sb.toString();
}
}
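SqlManager gives Flink SQL a small variable layer: within a multi-statement script, name:=value registers a fragment, ${name} expands to a registered fragment, and SHOW FRAGMENTS lists what is registered. A minimal sketch of parseVariable (the fragment and table names are illustrative):

    SqlManager sqlManager = new SqlManager();
    // "tb:=student" registers fragment tb; "${tb}" is expanded in the next statement.
    String sql = sqlManager.parseVariable("tb:=student;SELECT * FROM ${tb}");
    // sql is now "SELECT * FROM student"
    sqlManager.checkShowFragments("show fragments");  // true: handled as SHOW FRAGMENTS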
package com.dlink.executor.custom;
import org.apache.flink.table.types.DataType;
/**
* @author wenmo
* @since 2021/6/7 22:06
**/
public class TableSchemaField {
private String name;
private DataType type;
public TableSchemaField(String name, DataType type) {
this.name = name;
this.type = type;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public DataType getType() {
return type;
}
public void setType(DataType type) {
this.type = type;
}
}
package com.dlink.result;
import java.util.Date;
/**
* Explain result for a single sql statement.
*
* @author wenmo
* @since 2021/6/7 22:06
**/
public class SqlExplainResult {
private Integer index;
private String type;
private String sql;
private String parse;
private String explain;
private String error;
private boolean parseTrue;
private boolean explainTrue;
private Date explainTime;
public Integer getIndex() {
return index;
}
public void setIndex(Integer index) {
this.index = index;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getSql() {
return sql;
}
public void setSql(String sql) {
this.sql = sql;
}
public String getParse() {
return parse;
}
public void setParse(String parse) {
this.parse = parse;
}
public String getExplain() {
return explain;
}
public void setExplain(String explain) {
this.explain = explain;
}
public String getError() {
return error;
}
public void setError(String error) {
this.error = error;
}
public boolean isParseTrue() {
return parseTrue;
}
public void setParseTrue(boolean parseTrue) {
this.parseTrue = parseTrue;
}
public boolean isExplainTrue() {
return explainTrue;
}
public void setExplainTrue(boolean explainTrue) {
this.explainTrue = explainTrue;
}
public Date getExplainTime() {
return explainTime;
}
public void setExplainTime(Date explainTime) {
this.explainTime = explainTime;
}
@Override
public String toString() {
return "SqlExplainRecord{" +
"index=" + index +
", type='" + type + '\'' +
", sql='" + sql + '\'' +
", parse='" + parse + '\'' +
", explain='" + explain + '\'' +
", error='" + error + '\'' +
", parseTrue=" + parseTrue +
", explainTrue=" + explainTrue +
", explainTime=" + explainTime +
'}';
}
}
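SqlExplainResult is a plain carrier for the outcome of explaining one statement: index/type/sql identify the statement, parse and explain hold the outputs, and error plus the two boolean flags record where explanation stopped. A sketch of how a caller might fill it in (values illustrative):

    SqlExplainResult record = new SqlExplainResult();
    record.setIndex(1);
    record.setType("INSERT");
    record.setSql("INSERT INTO sink SELECT * FROM source");
    record.setParseTrue(true);          // statement parsed successfully
    record.setExplainTrue(true);        // plan produced successfully
    record.setExplainTime(new Date());  // java.util.Date, as imported above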
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.graph;
import org.apache.flink.annotation.Internal;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.*;
/** Helper class for generating a JSON representation from a {@link StreamGraph}. */
@Internal
public class JSONGenerator {
public static final String STEPS = "step_function";
public static final String ID = "id";
public static final String SIDE = "side";
public static final String SHIP_STRATEGY = "ship_strategy";
public static final String PREDECESSORS = "predecessors";
public static final String TYPE = "type";
public static final String PACT = "pact";
public static final String CONTENTS = "contents";
public static final String PARALLELISM = "parallelism";
private StreamGraph streamGraph;
private final ObjectMapper mapper = new ObjectMapper();
public JSONGenerator(StreamGraph streamGraph) {
this.streamGraph = streamGraph;
}
public String getJSON() {
return getJSONNode().toPrettyString();
}
public ObjectNode getJSONNode() {
ObjectNode json = mapper.createObjectNode();
ArrayNode nodes = mapper.createArrayNode();
json.put("nodes", nodes);
List<Integer> operatorIDs = new ArrayList<>(streamGraph.getVertexIDs());
Comparator<Integer> operatorIDComparator =
Comparator.comparingInt(
(Integer id) -> streamGraph.getSinkIDs().contains(id) ? 1 : 0)
.thenComparingInt(id -> id);
operatorIDs.sort(operatorIDComparator);
visit(nodes, operatorIDs, new HashMap<>());
return json;
}
private void visit(
ArrayNode jsonArray, List<Integer> toVisit, Map<Integer, Integer> edgeRemapings) {
Integer vertexID = toVisit.get(0);
StreamNode vertex = streamGraph.getStreamNode(vertexID);
if (streamGraph.getSourceIDs().contains(vertexID)
|| Collections.disjoint(vertex.getInEdges(), toVisit)) {
ObjectNode node = mapper.createObjectNode();
decorateNode(vertexID, node);
if (!streamGraph.getSourceIDs().contains(vertexID)) {
ArrayNode inputs = mapper.createArrayNode();
node.put(PREDECESSORS, inputs);
for (StreamEdge inEdge : vertex.getInEdges()) {
int inputID = inEdge.getSourceId();
Integer mappedID =
(edgeRemapings.keySet().contains(inputID))
? edgeRemapings.get(inputID)
: inputID;
decorateEdge(inputs, inEdge, mappedID);
}
}
jsonArray.add(node);
toVisit.remove(vertexID);
} else {
Integer iterationHead = -1;
for (StreamEdge inEdge : vertex.getInEdges()) {
int operator = inEdge.getSourceId();
if (streamGraph.vertexIDtoLoopTimeout.containsKey(operator)) {
iterationHead = operator;
}
}
ObjectNode obj = mapper.createObjectNode();
ArrayNode iterationSteps = mapper.createArrayNode();
obj.put(STEPS, iterationSteps);
obj.put(ID, iterationHead);
obj.put(PACT, "IterativeDataStream");
obj.put(PARALLELISM, streamGraph.getStreamNode(iterationHead).getParallelism());
obj.put(CONTENTS, "Stream Iteration");
ArrayNode iterationInputs = mapper.createArrayNode();
obj.put(PREDECESSORS, iterationInputs);
toVisit.remove(iterationHead);
visitIteration(iterationSteps, toVisit, iterationHead, edgeRemapings, iterationInputs);
jsonArray.add(obj);
}
if (!toVisit.isEmpty()) {
visit(jsonArray, toVisit, edgeRemapings);
}
}
private void visitIteration(
ArrayNode jsonArray,
List<Integer> toVisit,
int headId,
Map<Integer, Integer> edgeRemapings,
ArrayNode iterationInEdges) {
Integer vertexID = toVisit.get(0);
StreamNode vertex = streamGraph.getStreamNode(vertexID);
toVisit.remove(vertexID);
// Ignoring head and tail to avoid redundancy
if (!streamGraph.vertexIDtoLoopTimeout.containsKey(vertexID)) {
ObjectNode obj = mapper.createObjectNode();
jsonArray.add(obj);
decorateNode(vertexID, obj);
ArrayNode inEdges = mapper.createArrayNode();
obj.put(PREDECESSORS, inEdges);
for (StreamEdge inEdge : vertex.getInEdges()) {
int inputID = inEdge.getSourceId();
if (edgeRemapings.keySet().contains(inputID)) {
decorateEdge(inEdges, inEdge, inputID);
} else if (!streamGraph.vertexIDtoLoopTimeout.containsKey(inputID)) {
decorateEdge(iterationInEdges, inEdge, inputID);
}
}
edgeRemapings.put(vertexID, headId);
visitIteration(jsonArray, toVisit, headId, edgeRemapings, iterationInEdges);
}
}
private void decorateEdge(ArrayNode inputArray, StreamEdge inEdge, int mappedInputID) {
ObjectNode input = mapper.createObjectNode();
inputArray.add(input);
input.put(ID, mappedInputID);
input.put(SHIP_STRATEGY, inEdge.getPartitioner().toString());
input.put(SIDE, (inputArray.size() == 0) ? "first" : "second");
}
private void decorateNode(Integer vertexID, ObjectNode node) {
StreamNode vertex = streamGraph.getStreamNode(vertexID);
node.put(ID, vertexID);
node.put(TYPE, vertex.getOperatorName());
if (streamGraph.getSourceIDs().contains(vertexID)) {
node.put(PACT, "Data Source");
} else if (streamGraph.getSinkIDs().contains(vertexID)) {
node.put(PACT, "Data Sink");
} else {
node.put(PACT, "Operator");
}
node.put(CONTENTS, vertex.getOperatorName());
node.put(PARALLELISM, streamGraph.getStreamNode(vertexID).getParallelism());
}
}
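JSONGenerator is lifted from Flink's org.apache.flink.streaming.api.graph package so this 1.13 client module can serialize a StreamGraph into the JSON plan shown in the UI. A sketch of how it might be driven, assuming a Flink 1.13 runtime on the classpath:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.graph.JSONGenerator;

    public class PlanJsonDemo {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromElements(1, 2, 3).print();  // a trivial pipeline
            // Serialize the generated StreamGraph to the pretty-printed JSON plan.
            System.out.println(new JSONGenerator(env.getStreamGraph()).getJSON());
        }
    }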
......@@ -11,6 +11,7 @@
<packaging>pom</packaging>
<modules>
<module>dlink-client-1.12</module>
<module>dlink-client-1.13</module>
</modules>
<artifactId>dlink-client</artifactId>
</project>
\ No newline at end of file
......@@ -31,17 +31,17 @@
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-client-1.12</artifactId>
<scope>provided</scope>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-connector-jdbc</artifactId>
<scope>provided</scope>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-function</artifactId>
<scope>provided</scope>
<!--<scope>provided</scope>-->
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -32,11 +32,16 @@ public class JobManager {
private Integer port;
private String sessionId;
private Integer maxRowNum = 100;
private ExecutorSetting executorSetting;
public JobManager() {
}
public JobManager(String host) {
public JobManager(ExecutorSetting executorSetting) {
this.executorSetting=executorSetting;
}
public JobManager(String host,ExecutorSetting executorSetting) {
if (host != null) {
String[] strs = host.split(":");
if (strs.length >= 2) {
......@@ -46,10 +51,11 @@ public class JobManager {
this.flinkHost = strs[0];
this.port = 8081;
}
this.executorSetting=executorSetting;
}
}
public JobManager(String host, String sessionId, Integer maxRowNum) {
public JobManager(String host, String sessionId, Integer maxRowNum,ExecutorSetting executorSetting) {
if (host != null) {
String[] strs = host.split(":");
if (strs.length >= 2) {
......@@ -57,11 +63,12 @@ public class JobManager {
this.port = Integer.parseInt(strs[1]);
} else {
this.flinkHost = strs[0];
this.port = 8081;
this.port = FlinkConstant.PORT;
}
}
this.sessionId = sessionId;
this.maxRowNum = maxRowNum;
this.executorSetting=executorSetting;
}
public JobManager(String flinkHost, Integer port) {
......@@ -76,23 +83,45 @@ public class JobManager {
this.port = port;
}
public RunResult execute(String statement, ExecutorSetting executorSetting) {
RunResult runResult = new RunResult(sessionId, statement, flinkHost, port, executorSetting, executorSetting.getJobName());
Executor executor = null;
private boolean checkSession(){
if(sessionId!=null&&!"".equals(sessionId)){
String[] strs = sessionId.split("_");
if(strs.length>1&&!"".equals(strs[1])){
return true;
}
}
return false;
}
private Executor createExecutor(){
if (executorSetting.isRemote()) {
return Executor.build(new EnvironmentSetting(flinkHost, port), executorSetting);
} else {
return Executor.build(null, executorSetting);
}
}
private Executor createExecutorWithSession(){
Executor executor;
if(checkSession()){
ExecutorEntity executorEntity = SessionPool.get(sessionId);
if (executorEntity != null) {
executor = executorEntity.getExecutor();
} else {
if (executorSetting.isRemote()) {
executor = Executor.build(new EnvironmentSetting(flinkHost, FlinkConstant.PORT), executorSetting);
} else {
executor = Executor.build(null, executorSetting);
}
executor = createExecutor();
SessionPool.push(new ExecutorEntity(sessionId, executor));
}
}else {
executor = createExecutor();
}
return executor;
}
public RunResult execute(String statement) {
RunResult runResult = new RunResult(sessionId, statement, flinkHost, port, executorSetting, executorSetting.getJobName());
Executor executor = createExecutorWithSession();
String[] Statements = statement.split(";");
int currentIndex = 0;
//Currently only the results of SHOW and SELECT operations can be queried; support for more can be added later
try {
for (String item : Statements) {
currentIndex++;
......@@ -133,24 +162,20 @@ public class JobManager {
return runResult;
}
public SubmitResult submit(String statement, ExecutorSetting executorSetting) {
public SubmitResult submit(String statement) {
if (statement == null || "".equals(statement)) {
return SubmitResult.error("FlinkSql语句不存在");
}
String[] statements = statement.split(FlinkSQLConstant.SEPARATOR);
return submit(Arrays.asList(statements), executorSetting);
return submit(Arrays.asList(statements));
}
public SubmitResult submit(List<String> sqlList, ExecutorSetting executorSetting) {
public SubmitResult submit(List<String> sqlList) {
SubmitResult result = new SubmitResult(sessionId, sqlList, flinkHost, executorSetting.getJobName());
int currentIndex = 0;
try {
if (sqlList != null && sqlList.size() > 0) {
EnvironmentSetting environmentSetting = null;
if (executorSetting.isRemote()) {
environmentSetting = new EnvironmentSetting(flinkHost, port);
}
Executor executor = Executor.build(environmentSetting, executorSetting);
Executor executor = createExecutor();
for (String sqlText : sqlList) {
currentIndex++;
String operationType = Operations.getOperationType(sqlText);
......
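Worth noting in the JobManager changes above: execute() now consults a session pool, and the private checkSession() treats the session id as a real session only when it contains an underscore with a non-empty suffix. Roughly:

    // "admin_test" -> valid session: executor is fetched from (or pushed into) SessionPool
    // "admin"      -> no session: a fresh Executor is created for the call
    // null or ""   -> no session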
......@@ -20,7 +20,8 @@ public class JobManagerTest {
@Test
public void submitJobTest2(){
JobManager jobManager = new JobManager("192.168.123.157",8081,"test2",100);
ExecutorSetting setting = new ExecutorSetting(Executor.REMOTE);
JobManager jobManager = new JobManager("192.168.123.157:8081","test2",100, setting);
String sql1 ="CREATE TABLE student (\n" +
" sid INT,\n" +
" name STRING,\n" +
......@@ -48,14 +49,15 @@ public class JobManagerTest {
sqls.add(sql1);
sqls.add(sql2);
sqls.add(sql3);
ExecutorSetting setting = new ExecutorSetting(Executor.REMOTE);
SubmitResult result = jobManager.submit(sqls, setting);
SubmitResult result = jobManager.submit(sqls);
System.out.println(result.isSuccess());
}
@Test
public void executeJobTest(){
JobManager jobManager = new JobManager("192.168.123.157",8081,"test2",100);
ExecutorSetting setting = new ExecutorSetting(Executor.REMOTE,0,1,false,null);
JobManager jobManager = new JobManager("192.168.123.157:8081","test2",100, setting);
String sql1 ="CREATE TABLE student (\n" +
" sid INT,\n" +
" name STRING,\n" +
......@@ -84,8 +86,7 @@ public class JobManagerTest {
sqls.add(sql2);
sqls.add(sql3);
String sql = sql1+sql2+sql3;
ExecutorSetting setting = new ExecutorSetting(Executor.REMOTE,0,1,false,null);
RunResult result = jobManager.execute(sql, setting);
RunResult result = jobManager.execute(sql);
System.out.println(result.isSuccess());
}
}
......@@ -22,7 +22,7 @@ const menu = (
const StudioMenu = (props: any) => {
const {tabs,current,currentPath,form,dispatch,monaco} = props;
const {tabs,current,currentPath,form,dispatch} = props;
const execute = () => {
let selection = current.monaco.current.editor.getSelection();
......
......@@ -47,12 +47,13 @@ const StudioSetting = (props: any) => {
let newTabs = tabs;
for(let i=0;i<newTabs.panes.length;i++){
if(newTabs.panes[i].key==newTabs.activeKey){
newTabs.panes[i].task={
...all
};
for(let key in change){
newTabs.panes[i].task[key]=change[key];
}
break;
}
}
dispatch&&dispatch({
type: "Studio/saveTabs",
payload: newTabs,
......@@ -144,7 +145,8 @@ const StudioSetting = (props: any) => {
className={styles.form_item}>
<Select
placeholder="选择会话"
defaultValue='admin'
// defaultValue='admin'
allowClear
dropdownRender={menu => (
<div>
{menu}
......
......@@ -136,13 +136,14 @@ const StudioTree: React.FC<StudioTreeProps> = (props) => {
closable: true,
path: node.path,
task:{
session:'admin',
session:'',
maxRowNum: 100,
...result.datas
},
console:{
result:[],
}
},
monaco: {},
};
newTabs.activeKey = node.taskId;
newTabs.panes.push(newPane);
......
......@@ -124,7 +124,7 @@ const Model: ModelType = {
fragment: true,
clusterId: 0,
maxRowNum: 100,
session:'admin',
session:'',
alias:'draft',
},
console:{
......@@ -149,7 +149,7 @@ const Model: ModelType = {
parallelism: 1,
fragment: true,
clusterId: '0',
session:'admin',
session:'',
maxRowNum: 100,
alias:'draft',
},
......@@ -159,7 +159,7 @@ const Model: ModelType = {
monaco: {},
}],
},
session:['admin'],
session:[],
rightClickMenu:false
},
......
......@@ -205,6 +205,15 @@ export default (): React.ReactNode => {
<li>
<Link href="">增加了新增作业自动定位及打开选项卡的功能</Link>
</li>
<li>
<Link href="">增加了在不选择会话值时自动禁用会话的功能</Link>
</li>
<li>
<Link href="">解决了在修改配置后异步提交任务会将作业识别为草稿的问题</Link>
</li>
<li>
<Link href="">扩展了 Flink Client 1.13</Link>
</li>
</ul>
</Paragraph>
</Timeline.Item>
......