Unverified Commit f8c751af authored by Kerwin's avatar Kerwin Committed by GitHub

[Optimization][Style] Added dlink-core module code style (#917)

* Added dlink-core module code style.

* fix AbstractTrans error
parent a95ce812
...@@ -17,11 +17,8 @@ ...@@ -17,11 +17,8 @@
* *
*/ */
package com.dlink.api; package com.dlink.api;
import cn.hutool.http.HttpUtil;
import cn.hutool.http.Method;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkRestAPIConstant; import com.dlink.constant.FlinkRestAPIConstant;
import com.dlink.constant.NetConstant; import com.dlink.constant.NetConstant;
...@@ -29,11 +26,19 @@ import com.dlink.gateway.GatewayType; ...@@ -29,11 +26,19 @@ import com.dlink.gateway.GatewayType;
import com.dlink.gateway.config.SavePointType; import com.dlink.gateway.config.SavePointType;
import com.dlink.gateway.model.JobInfo; import com.dlink.gateway.model.JobInfo;
import com.dlink.gateway.result.SavePointResult; import com.dlink.gateway.result.SavePointResult;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.*; import cn.hutool.http.HttpUtil;
import cn.hutool.http.Method;
/** /**
* FlinkAPI * FlinkAPI
...@@ -238,6 +243,7 @@ public class FlinkAPI { ...@@ -238,6 +243,7 @@ public class FlinkAPI {
public String getJobManagerStdOut() { public String getJobManagerStdOut() {
return getResult(FlinkRestAPIConstant.JOB_MANAGER + FlinkRestAPIConstant.STDOUT); return getResult(FlinkRestAPIConstant.JOB_MANAGER + FlinkRestAPIConstant.STDOUT);
} }
/** /**
* @Author: zhumingye * @Author: zhumingye
* @date: 2022/6/24 * @date: 2022/6/24
...@@ -247,6 +253,7 @@ public class FlinkAPI { ...@@ -247,6 +253,7 @@ public class FlinkAPI {
public JsonNode getJobManagerLogList() { public JsonNode getJobManagerLogList() {
return get(FlinkRestAPIConstant.JOB_MANAGER + FlinkRestAPIConstant.LOGS); return get(FlinkRestAPIConstant.JOB_MANAGER + FlinkRestAPIConstant.LOGS);
} }
/** /**
* @Author: zhumingye * @Author: zhumingye
* @date: 2022/6/24 * @date: 2022/6/24
...@@ -257,6 +264,7 @@ public class FlinkAPI { ...@@ -257,6 +264,7 @@ public class FlinkAPI {
public String getJobManagerLogFileDetail(String logName) { public String getJobManagerLogFileDetail(String logName) {
return getResult(FlinkRestAPIConstant.JOB_MANAGER + FlinkRestAPIConstant.LOGS + logName); return getResult(FlinkRestAPIConstant.JOB_MANAGER + FlinkRestAPIConstant.LOGS + logName);
} }
/** /**
* @Author: zhumingye * @Author: zhumingye
* @date: 2022/6/24 * @date: 2022/6/24
...@@ -278,12 +286,13 @@ public class FlinkAPI { ...@@ -278,12 +286,13 @@ public class FlinkAPI {
JsonNode jsonNode = get(type + FlinkRestAPIConstant.METRICS); JsonNode jsonNode = get(type + FlinkRestAPIConstant.METRICS);
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
Iterator<JsonNode> jsonNodeIterator = jsonNode.elements(); Iterator<JsonNode> jsonNodeIterator = jsonNode.elements();
while(jsonNodeIterator.hasNext()) { while (jsonNodeIterator.hasNext()) {
JsonNode node = jsonNodeIterator.next(); JsonNode node = jsonNodeIterator.next();
sb.append(node.get("id").asText()).append(","); sb.append(node.get("id").asText()).append(",");
} }
return sb.deleteCharAt(sb.length() - 1).toString(); return sb.deleteCharAt(sb.length() - 1).toString();
} }
/** /**
* @Author: zhumingye * @Author: zhumingye
* @date: 2022/6/24 * @date: 2022/6/24
...@@ -291,9 +300,9 @@ public class FlinkAPI { ...@@ -291,9 +300,9 @@ public class FlinkAPI {
* @return JsonNode * @return JsonNode
*/ */
public JsonNode getTaskManagerMetrics(String containerId) { public JsonNode getTaskManagerMetrics(String containerId) {
JsonNode TaskManagerMetricsJsonNode = get(FlinkRestAPIConstant.TASK_MANAGER + containerId + FlinkRestAPIConstant.METRICS + FlinkRestAPIConstant.GET + buildMetricsParms(FlinkRestAPIConstant.JOB_MANAGER)); return get(FlinkRestAPIConstant.TASK_MANAGER + containerId + FlinkRestAPIConstant.METRICS + FlinkRestAPIConstant.GET + buildMetricsParms(FlinkRestAPIConstant.JOB_MANAGER));
return TaskManagerMetricsJsonNode;
} }
/** /**
* @Author: zhumingye * @Author: zhumingye
* @date: 2022/6/24 * @date: 2022/6/24
...@@ -304,6 +313,7 @@ public class FlinkAPI { ...@@ -304,6 +313,7 @@ public class FlinkAPI {
public String getTaskManagerLog(String containerId) { public String getTaskManagerLog(String containerId) {
return getResult(FlinkRestAPIConstant.TASK_MANAGER + containerId + FlinkRestAPIConstant.LOG); return getResult(FlinkRestAPIConstant.TASK_MANAGER + containerId + FlinkRestAPIConstant.LOG);
} }
/** /**
* @Author: zhumingye * @Author: zhumingye
* @date: 2022/6/24 * @date: 2022/6/24
...@@ -314,6 +324,7 @@ public class FlinkAPI { ...@@ -314,6 +324,7 @@ public class FlinkAPI {
public String getTaskManagerStdOut(String containerId) { public String getTaskManagerStdOut(String containerId) {
return getResult(FlinkRestAPIConstant.TASK_MANAGER + containerId + FlinkRestAPIConstant.STDOUT); return getResult(FlinkRestAPIConstant.TASK_MANAGER + containerId + FlinkRestAPIConstant.STDOUT);
} }
/** /**
* @Author: zhumingye * @Author: zhumingye
* @date: 2022/6/24 * @date: 2022/6/24
......
...@@ -17,15 +17,16 @@ ...@@ -17,15 +17,16 @@
* *
*/ */
package com.dlink.cluster; package com.dlink.cluster;
import cn.hutool.core.io.IORuntimeException;
import com.dlink.api.FlinkAPI; import com.dlink.api.FlinkAPI;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import cn.hutool.core.io.IORuntimeException;
/** /**
* FlinkCluster * FlinkCluster
* *
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.cluster; package com.dlink.cluster;
import lombok.Getter; import lombok.Getter;
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.config; package com.dlink.config;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.constant; package com.dlink.constant;
public interface FlinkHistoryConstant { public interface FlinkHistoryConstant {
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.constant; package com.dlink.constant;
/** /**
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.constant; package com.dlink.constant;
/** /**
......
...@@ -17,13 +17,17 @@ ...@@ -17,13 +17,17 @@
* *
*/ */
package com.dlink.explainer; package com.dlink.explainer;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkSQLConstant; import com.dlink.constant.FlinkSQLConstant;
import com.dlink.executor.Executor; import com.dlink.executor.Executor;
import com.dlink.explainer.ca.*; import com.dlink.explainer.ca.ColumnCA;
import com.dlink.explainer.ca.ColumnCAResult;
import com.dlink.explainer.ca.NodeRel;
import com.dlink.explainer.ca.TableCA;
import com.dlink.explainer.ca.TableCAGenerator;
import com.dlink.explainer.ca.TableCAResult;
import com.dlink.explainer.lineage.LineageColumnGenerator; import com.dlink.explainer.lineage.LineageColumnGenerator;
import com.dlink.explainer.lineage.LineageTableGenerator; import com.dlink.explainer.lineage.LineageTableGenerator;
import com.dlink.explainer.trans.Trans; import com.dlink.explainer.trans.Trans;
...@@ -39,8 +43,7 @@ import com.dlink.trans.Operations; ...@@ -39,8 +43,7 @@ import com.dlink.trans.Operations;
import com.dlink.utils.FlinkUtil; import com.dlink.utils.FlinkUtil;
import com.dlink.utils.LogUtil; import com.dlink.utils.LogUtil;
import com.dlink.utils.SqlUtil; import com.dlink.utils.SqlUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.runtime.rest.messages.JobPlanInfo; import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.CatalogManager;
...@@ -49,6 +52,9 @@ import java.util.ArrayList; ...@@ -49,6 +52,9 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
/** /**
* Explainer * Explainer
* *
...@@ -276,7 +282,7 @@ public class Explainer { ...@@ -276,7 +282,7 @@ public class Explainer {
public ObjectNode getStreamGraph(String statement) { public ObjectNode getStreamGraph(String statement) {
JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement, sqlSeparator)); JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement, sqlSeparator));
if (jobParam.getDdl().size() > 0) { if (jobParam.getDdl().size() > 0) {
for(StatementParam statementParam: jobParam.getDdl()){ for (StatementParam statementParam: jobParam.getDdl()) {
executor.executeSql(statementParam.getValue()); executor.executeSql(statementParam.getValue());
} }
} }
...@@ -296,7 +302,7 @@ public class Explainer { ...@@ -296,7 +302,7 @@ public class Explainer {
public JobPlanInfo getJobPlanInfo(String statement) { public JobPlanInfo getJobPlanInfo(String statement) {
JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement, sqlSeparator)); JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement, sqlSeparator));
if (jobParam.getDdl().size() > 0) { if (jobParam.getDdl().size() > 0) {
for(StatementParam statementParam: jobParam.getDdl()){ for (StatementParam statementParam: jobParam.getDdl()) {
executor.executeSql(statementParam.getValue()); executor.executeSql(statementParam.getValue());
} }
} }
...@@ -453,11 +459,12 @@ public class Explainer { ...@@ -453,11 +459,12 @@ public class Explainer {
for (NodeRel nodeRel : columnCAResult.getColumnCASRelChain()) { for (NodeRel nodeRel : columnCAResult.getColumnCASRelChain()) {
if (nodeRel.getPreId().equals(item.getValue().getId())) { if (nodeRel.getPreId().equals(item.getValue().getId())) {
for (NodeRel nodeRel2 : columnCAResult.getColumnCASRelChain()) { for (NodeRel nodeRel2 : columnCAResult.getColumnCASRelChain()) {
if (columnCAResult.getColumnCASMaps().containsKey(nodeRel2.getSufId()) && columnCAResult.getColumnCASMaps().containsKey(nodeRel2.getPreId()) && if (columnCAResult.getColumnCASMaps().containsKey(nodeRel2.getSufId())
columnCAResult.getColumnCASMaps().containsKey(nodeRel.getSufId()) && && columnCAResult.getColumnCASMaps().containsKey(nodeRel2.getPreId())
columnCAResult.getColumnCASMaps().get(nodeRel2.getSufId()).getTableId().equals(columnCAResult.getColumnCASMaps().get(nodeRel.getSufId()).getTableId()) && && columnCAResult.getColumnCASMaps().containsKey(nodeRel.getSufId())
columnCAResult.getColumnCASMaps().get(nodeRel2.getSufId()).getName().equals(columnCAResult.getColumnCASMaps().get(nodeRel.getSufId()).getName()) && && columnCAResult.getColumnCASMaps().get(nodeRel2.getSufId()).getTableId().equals(columnCAResult.getColumnCASMaps().get(nodeRel.getSufId()).getTableId())
!columnCAResult.getColumnCASMaps().get(nodeRel2.getPreId()).getType().equals("Data Sink")) { && columnCAResult.getColumnCASMaps().get(nodeRel2.getSufId()).getName().equals(columnCAResult.getColumnCASMaps().get(nodeRel.getSufId()).getName())
&& !columnCAResult.getColumnCASMaps().get(nodeRel2.getPreId()).getType().equals("Data Sink")) {
addNodeRels.add(new NodeRel(nodeRel2.getPreId(), nodeRel.getPreId())); addNodeRels.add(new NodeRel(nodeRel2.getPreId(), nodeRel.getPreId()));
} }
} }
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.explainer.ca; package com.dlink.explainer.ca;
import com.dlink.plus.FlinkSqlPlus; import com.dlink.plus.FlinkSqlPlus;
...@@ -104,7 +103,7 @@ public class CABuilder { ...@@ -104,7 +103,7 @@ public class CABuilder {
for (NodeRel nodeRel : columnCASRel) { for (NodeRel nodeRel : columnCASRel) {
if (columnId.equals(nodeRel.getSufId())) { if (columnId.equals(nodeRel.getSufId())) {
ColumnCA childca = (ColumnCA) result.getColumnCASMaps().get(nodeRel.getPreId()); ColumnCA childca = (ColumnCA) result.getColumnCASMaps().get(nodeRel.getPreId());
// operation = operation.replaceAll(childca.getAlias().replaceAll("\\$","\\\\$"),childca.getOperation()); //operation = operation.replaceAll(childca.getAlias().replaceAll("\\$","\\\\$"),childca.getOperation());
operation = operation.replaceAll(childca.getAlias() operation = operation.replaceAll(childca.getAlias()
.replaceAll("\\)", ""), childca.getOperation()); .replaceAll("\\)", ""), childca.getOperation());
buildColumnCANodeChildren(children, result, nodeRel.getPreId(), operation); buildColumnCANodeChildren(children, result, nodeRel.getPreId(), operation);
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.explainer.ca; package com.dlink.explainer.ca;
public interface CAGenerator { public interface CAGenerator {
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.explainer.ca; package com.dlink.explainer.ca;
import java.util.List; import java.util.List;
...@@ -55,17 +54,6 @@ public class ColumnCA implements ICA { ...@@ -55,17 +54,6 @@ public class ColumnCA implements ICA {
this.type = tableCA.getType(); this.type = tableCA.getType();
} }
/* public ColumnCA(Integer id, List<Integer> parentId, String name, String alias, String columnName, String familyName, String type, TableCA tableCA) {
this.id = id;
this.parentId = parentId;
this.name = name;
this.alias = alias;
this.columnName = columnName;
this.familyName = familyName;
this.type = type;
this.tableCA = tableCA;
}*/
public Integer getId() { public Integer getId() {
return id; return id;
} }
......
...@@ -17,16 +17,25 @@ ...@@ -17,16 +17,25 @@
* *
*/ */
package com.dlink.explainer.ca; package com.dlink.explainer.ca;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.explainer.trans.*; import com.dlink.explainer.trans.Field;
import com.dlink.explainer.trans.OperatorTrans;
import com.dlink.explainer.trans.SinkTrans;
import com.dlink.explainer.trans.SourceTrans;
import com.dlink.explainer.trans.Trans;
import com.dlink.utils.MapParseUtils; import com.dlink.utils.MapParseUtils;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import java.util.*; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/** /**
* ColumnCAGenerator * ColumnCAGenerator
...@@ -176,7 +185,7 @@ public class ColumnCAGenerator implements CAGenerator { ...@@ -176,7 +185,7 @@ public class ColumnCAGenerator implements CAGenerator {
} }
if (!isHad) { if (!isHad) {
cid = index++; cid = index++;
// String columnOperation = MapParseUtils.replaceField(operation,columnCA.getAlias(),columnCA.getOperation()); //String columnOperation = MapParseUtils.replaceField(operation,columnCA.getAlias(),columnCA.getOperation());
ColumnCA columnCA2 = new ColumnCA(cid, alias, alias, alias, alias, operation, tableCA); ColumnCA columnCA2 = new ColumnCA(cid, alias, alias, alias, alias, operation, tableCA);
this.columnCASMaps.put(cid, columnCA2); this.columnCASMaps.put(cid, columnCA2);
this.columnCAS.add(columnCA2); this.columnCAS.add(columnCA2);
......
...@@ -17,15 +17,14 @@ ...@@ -17,15 +17,14 @@
* *
*/ */
package com.dlink.explainer.ca; package com.dlink.explainer.ca;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable; import java.io.Serializable;
import java.util.List; import java.util.List;
import lombok.Getter;
import lombok.Setter;
/** /**
* ColumnCANode * ColumnCANode
* *
...@@ -43,12 +42,9 @@ public class ColumnCANode implements Serializable { ...@@ -43,12 +42,9 @@ public class ColumnCANode implements Serializable {
private String value; private String value;
private String type; private String type;
private String operation; private String operation;
// private Tables tables;
// private Columns columns;
private List<ColumnCANode> children; private List<ColumnCANode> children;
public ColumnCANode() { public ColumnCANode() {
} }
} }
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.explainer.ca; package com.dlink.explainer.ca;
import com.dlink.explainer.lineage.LineageColumnGenerator; import com.dlink.explainer.lineage.LineageColumnGenerator;
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.explainer.ca; package com.dlink.explainer.ca;
public interface ICA { public interface ICA {
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.explainer.ca; package com.dlink.explainer.ca;
import java.util.Objects; import java.util.Objects;
...@@ -55,11 +54,15 @@ public class NodeRel { ...@@ -55,11 +54,15 @@ public class NodeRel {
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) return true; if (this == o) {
if (o == null || getClass() != o.getClass()) return false; return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
NodeRel nodeRel = (NodeRel) o; NodeRel nodeRel = (NodeRel) o;
return Objects.equals(preId, nodeRel.preId) && return Objects.equals(preId, nodeRel.preId)
Objects.equals(sufId, nodeRel.sufId); && Objects.equals(sufId, nodeRel.sufId);
} }
@Override @Override
......
...@@ -17,19 +17,23 @@ ...@@ -17,19 +17,23 @@
* *
*/ */
package com.dlink.explainer.ca; package com.dlink.explainer.ca;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.explainer.trans.*; import com.dlink.explainer.trans.Field;
import lombok.Getter; import com.dlink.explainer.trans.OperatorTrans;
import lombok.Setter; import com.dlink.explainer.trans.SinkTrans;
import com.dlink.explainer.trans.SourceTrans;
import com.dlink.explainer.trans.Trans;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import lombok.Getter;
import lombok.Setter;
/** /**
* TableCA * TableCA
* *
...@@ -135,17 +139,17 @@ public class TableCA implements ICA { ...@@ -135,17 +139,17 @@ public class TableCA implements ICA {
@Override @Override
public String toString() { public String toString() {
return "TableCA{" + return "TableCA{"
"id=" + id + + "id=" + id
", parentId=" + parentId + + ", parentId=" + parentId
", name='" + name + '\'' + + ", name='" + name + '\''
", catalog='" + catalog + '\'' + + ", catalog='" + catalog + '\''
", database='" + database + '\'' + + ", database='" + database + '\''
", table='" + table + '\'' + + ", table='" + table + '\''
", fields=" + fields + + ", fields=" + fields
", useFields=" + useFields + + ", useFields=" + useFields
", parallelism=" + parallelism + + ", parallelism=" + parallelism
'}'; + '}';
} }
@Override @Override
......
...@@ -17,14 +17,24 @@ ...@@ -17,14 +17,24 @@
* *
*/ */
package com.dlink.explainer.ca; package com.dlink.explainer.ca;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.explainer.trans.*; import com.dlink.explainer.trans.Field;
import com.dlink.explainer.trans.OperatorTrans;
import com.dlink.explainer.trans.SinkTrans;
import com.dlink.explainer.trans.SourceTrans;
import com.dlink.explainer.trans.Trans;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import java.util.*; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
......
...@@ -17,15 +17,14 @@ ...@@ -17,15 +17,14 @@
* *
*/ */
package com.dlink.explainer.ca; package com.dlink.explainer.ca;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable; import java.io.Serializable;
import java.util.List; import java.util.List;
import lombok.Getter;
import lombok.Setter;
/** /**
* TableCANode * TableCANode
* *
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.explainer.ca; package com.dlink.explainer.ca;
import java.util.List; import java.util.List;
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.explainer.lineage; package com.dlink.explainer.lineage;
import com.dlink.explainer.ca.ColumnCAResult; import com.dlink.explainer.ca.ColumnCAResult;
...@@ -25,7 +24,12 @@ import com.dlink.explainer.ca.NodeRel; ...@@ -25,7 +24,12 @@ import com.dlink.explainer.ca.NodeRel;
import com.dlink.explainer.ca.TableCA; import com.dlink.explainer.ca.TableCA;
import com.dlink.plus.FlinkSqlPlus; import com.dlink.plus.FlinkSqlPlus;
import java.util.*; import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/** /**
* LineageBuilder * LineageBuilder
...@@ -51,14 +55,15 @@ public class LineageBuilder { ...@@ -51,14 +55,15 @@ public class LineageBuilder {
} }
Set<String> keySet = new HashSet<>(); Set<String> keySet = new HashSet<>();
for (NodeRel nodeRel : item.getColumnCASRelChain()) { for (NodeRel nodeRel : item.getColumnCASRelChain()) {
if( item.getColumnCASMaps().containsKey(nodeRel.getPreId())&&item.getColumnCASMaps().containsKey(nodeRel.getSufId()) if (item.getColumnCASMaps().containsKey(nodeRel.getPreId())
&& item.getColumnCASMaps().containsKey(nodeRel.getSufId())
&& !item.getColumnCASMaps().get(nodeRel.getPreId()).getTableId().equals(item.getColumnCASMaps().get(nodeRel.getSufId()).getTableId())) { && !item.getColumnCASMaps().get(nodeRel.getPreId()).getTableId().equals(item.getColumnCASMaps().get(nodeRel.getSufId()).getTableId())) {
String key = item.getColumnCASMaps().get(nodeRel.getPreId()).getTableId().toString() + "@" + String key = item.getColumnCASMaps().get(nodeRel.getPreId()).getTableId().toString() + "@"
item.getColumnCASMaps().get(nodeRel.getSufId()).getTableId().toString() + "@" + + item.getColumnCASMaps().get(nodeRel.getSufId()).getTableId().toString() + "@"
item.getColumnCASMaps().get(nodeRel.getPreId()).getName() + "@" + + item.getColumnCASMaps().get(nodeRel.getPreId()).getName() + "@"
item.getColumnCASMaps().get(nodeRel.getSufId()).getName(); + item.getColumnCASMaps().get(nodeRel.getSufId()).getName();
//去重 //去重
if(!keySet.contains(key)){ if (!keySet.contains(key)) {
index++; index++;
relations.add(LineageRelation.build(index + "", relations.add(LineageRelation.build(index + "",
item.getColumnCASMaps().get(nodeRel.getPreId()).getTableId().toString(), item.getColumnCASMaps().get(nodeRel.getPreId()).getTableId().toString(),
...@@ -87,18 +92,18 @@ public class LineageBuilder { ...@@ -87,18 +92,18 @@ public class LineageBuilder {
} }
//重复表合并 //重复表合并
Map<String,String> correctTableIdMap = new HashMap<>(); Map<String,String> correctTableIdMap = new HashMap<>();
for(List<LineageTable> tableList : repeatTablesList){ for (List<LineageTable> tableList : repeatTablesList) {
LineageTable newTable = new LineageTable(); LineageTable newTable = new LineageTable();
Set<String> columnKeySet = new HashSet<>(); Set<String> columnKeySet = new HashSet<>();
for(LineageTable table: tableList){ for (LineageTable table: tableList) {
if(newTable.getId() == null || newTable.getName() == null){ if (newTable.getId() == null || newTable.getName() == null) {
newTable.setId(table.getId()); newTable.setId(table.getId());
newTable.setName(table.getName()); newTable.setName(table.getName());
newTable.setColumns(new ArrayList<>()); newTable.setColumns(new ArrayList<>());
} }
for(LineageColumn column : table.getColumns()){ for (LineageColumn column : table.getColumns()) {
String key = column.getName() + "@&" + column.getTitle(); String key = column.getName() + "@&" + column.getTitle();
if(!columnKeySet.contains(key)){ if (!columnKeySet.contains(key)) {
newTable.getColumns().add(column); newTable.getColumns().add(column);
columnKeySet.add(key); columnKeySet.add(key);
} }
...@@ -109,11 +114,11 @@ public class LineageBuilder { ...@@ -109,11 +114,11 @@ public class LineageBuilder {
tables.add(newTable); tables.add(newTable);
} }
//关系中id重新指向 //关系中id重新指向
for (LineageRelation relation : relations){ for (LineageRelation relation : relations) {
if(correctTableIdMap.containsKey(relation.getSrcTableId())){ if (correctTableIdMap.containsKey(relation.getSrcTableId())) {
relation.setSrcTableId(correctTableIdMap.get(relation.getSrcTableId())); relation.setSrcTableId(correctTableIdMap.get(relation.getSrcTableId()));
} }
if(correctTableIdMap.containsKey(relation.getTgtTableId())){ if (correctTableIdMap.containsKey(relation.getTgtTableId())) {
relation.setTgtTableId(correctTableIdMap.get(relation.getTgtTableId())); relation.setTgtTableId(correctTableIdMap.get(relation.getTgtTableId()));
} }
} }
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.explainer.lineage; package com.dlink.explainer.lineage;
/** /**
......
...@@ -17,16 +17,8 @@ ...@@ -17,16 +17,8 @@
* *
*/ */
package com.dlink.explainer.lineage; package com.dlink.explainer.lineage;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.explainer.ca.ColumnCA; import com.dlink.explainer.ca.ColumnCA;
import com.dlink.explainer.ca.NodeRel; import com.dlink.explainer.ca.NodeRel;
...@@ -38,6 +30,13 @@ import com.dlink.explainer.trans.SinkTrans; ...@@ -38,6 +30,13 @@ import com.dlink.explainer.trans.SinkTrans;
import com.dlink.explainer.trans.Trans; import com.dlink.explainer.trans.Trans;
import com.dlink.utils.MapParseUtils; import com.dlink.utils.MapParseUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/** /**
* LineageColumnGenerator * LineageColumnGenerator
* *
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.explainer.lineage; package com.dlink.explainer.lineage;
/** /**
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.explainer.lineage; package com.dlink.explainer.lineage;
import java.util.List; import java.util.List;
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.explainer.lineage; package com.dlink.explainer.lineage;
import com.dlink.explainer.ca.TableCA; import com.dlink.explainer.ca.TableCA;
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.explainer.lineage; package com.dlink.explainer.lineage;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
......
...@@ -17,16 +17,8 @@ ...@@ -17,16 +17,8 @@
* *
*/ */
package com.dlink.explainer.sqlLineage; package com.dlink.explainer.sqlLineage;
import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
import com.alibaba.druid.sql.ast.statement.SQLInsertStatement;
import com.alibaba.druid.stat.TableStat;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.explainer.lineage.LineageRelation; import com.dlink.explainer.lineage.LineageRelation;
import com.dlink.explainer.lineage.LineageResult; import com.dlink.explainer.lineage.LineageResult;
...@@ -35,10 +27,23 @@ import com.dlink.metadata.driver.Driver; ...@@ -35,10 +27,23 @@ import com.dlink.metadata.driver.Driver;
import com.dlink.metadata.driver.DriverConfig; import com.dlink.metadata.driver.DriverConfig;
import com.dlink.model.Column; import com.dlink.model.Column;
import java.util.*; import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
import com.alibaba.druid.sql.ast.statement.SQLInsertStatement;
import com.alibaba.druid.stat.TableStat;
public class LineageBuilder { public class LineageBuilder {
public static LineageResult getSqlLineageByOne(String statement ,String type) { public static LineageResult getSqlLineageByOne(String statement, String type) {
List<LineageTable> tables = new ArrayList<>(); List<LineageTable> tables = new ArrayList<>();
List<LineageRelation> relations = new ArrayList<>(); List<LineageRelation> relations = new ArrayList<>();
try { try {
...@@ -166,7 +171,7 @@ public class LineageBuilder { ...@@ -166,7 +171,7 @@ public class LineageBuilder {
//处理target表中字段 //处理target表中字段
if (columns.size() <= 0 || sqls[n].contains("*")) { if (columns.size() <= 0 || sqls[n].contains("*")) {
Driver driver = Driver.build(driverConfig); Driver driver = Driver.build(driverConfig);
if(!targetTable.contains(".")){ if (!targetTable.contains(".")) {
return null; return null;
} }
List<Column> columns1 = driver.listColumns(targetTable.split("\\.")[0], targetTable.split("\\.")[1]); List<Column> columns1 = driver.listColumns(targetTable.split("\\.")[0], targetTable.split("\\.")[1]);
......
...@@ -17,10 +17,10 @@ ...@@ -17,10 +17,10 @@
* *
*/ */
package com.dlink.explainer.sqlLineage; package com.dlink.explainer.sqlLineage;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import lombok.Data; import lombok.Data;
@Data @Data
...@@ -83,16 +83,15 @@ public class LineageColumn implements Comparable<LineageColumn> { ...@@ -83,16 +83,15 @@ public class LineageColumn implements Comparable<LineageColumn> {
private Boolean isEnd = false; private Boolean isEnd = false;
public void setSourceTableName(String sourceTableName) { public void setSourceTableName(String sourceTableName) {
sourceTableName = Asserts.isNotNullString(sourceTableName) ? sourceTableName.replace("`", sourceTableName = Asserts.isNotNullString(sourceTableName) ? sourceTableName.replace("`", "") : sourceTableName;
"") : sourceTableName; if (sourceTableName.contains(" ")) {
if(sourceTableName.contains(" ")){
sourceTableName = sourceTableName.substring(0,sourceTableName.indexOf(" ")); sourceTableName = sourceTableName.substring(0,sourceTableName.indexOf(" "));
} }
if (sourceTableName.contains(".")) { if (sourceTableName.contains(".")) {
if(Asserts.isNullString(this.sourceDbName)){ if (Asserts.isNullString(this.sourceDbName)) {
this.sourceDbName = sourceTableName.substring(0, sourceTableName.indexOf(".")); this.sourceDbName = sourceTableName.substring(0, sourceTableName.indexOf("."));
} }
// this.sourceDbName = sourceTableName.substring(0, sourceTableName.indexOf(".")); //this.sourceDbName = sourceTableName.substring(0, sourceTableName.indexOf("."));
this.sourceTableName = sourceTableName.substring(sourceTableName.indexOf(".") + 1); this.sourceTableName = sourceTableName.substring(sourceTableName.indexOf(".") + 1);
} else { } else {
this.sourceTableName = sourceTableName; this.sourceTableName = sourceTableName;
...@@ -100,12 +99,16 @@ public class LineageColumn implements Comparable<LineageColumn> { ...@@ -100,12 +99,16 @@ public class LineageColumn implements Comparable<LineageColumn> {
} }
public int compareTo(LineageColumn o) { public int compareTo(LineageColumn o) {
if(Asserts.isNotNullString(this.getSourceDbName())&& Asserts.isNotNullString(this.getSourceTableName())){ if (Asserts.isNotNullString(this.getSourceDbName())
if(this.getSourceDbName().equals(o.getSourceDbName())&&this.getSourceTableName().equals(o.getSourceTableName())&&this.getTargetColumnName().equals(o.getTargetColumnName())){ && Asserts.isNotNullString(this.getSourceTableName())) {
if (this.getSourceDbName().equals(o.getSourceDbName())
&& this.getSourceTableName().equals(o.getSourceTableName())
&& this.getTargetColumnName().equals(o.getTargetColumnName())) {
return 0; return 0;
} }
} else if(Asserts.isNotNullString(this.getSourceTableName())){ } else if (Asserts.isNotNullString(this.getSourceTableName())) {
if(this.getSourceTableName().equals(o.getSourceTableName())&&this.getTargetColumnName().equals(o.getTargetColumnName())){ if (this.getSourceTableName().equals(o.getSourceTableName())
&& this.getTargetColumnName().equals(o.getTargetColumnName())) {
return 0; return 0;
} }
} else { } else {
......
...@@ -17,20 +17,37 @@ ...@@ -17,20 +17,37 @@
* *
*/ */
package com.dlink.explainer.sqlLineage; package com.dlink.explainer.sqlLineage;
import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.expr.*;
import com.alibaba.druid.sql.ast.statement.*;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.expr.SQLAggregateExpr;
import com.alibaba.druid.sql.ast.expr.SQLBinaryOpExpr;
import com.alibaba.druid.sql.ast.expr.SQLCaseExpr;
import com.alibaba.druid.sql.ast.expr.SQLCharExpr;
import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
import com.alibaba.druid.sql.ast.expr.SQLIntegerExpr;
import com.alibaba.druid.sql.ast.expr.SQLMethodInvokeExpr;
import com.alibaba.druid.sql.ast.expr.SQLNumberExpr;
import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
import com.alibaba.druid.sql.ast.statement.SQLExprTableSource;
import com.alibaba.druid.sql.ast.statement.SQLJoinTableSource;
import com.alibaba.druid.sql.ast.statement.SQLSelectItem;
import com.alibaba.druid.sql.ast.statement.SQLSelectQuery;
import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock;
import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
import com.alibaba.druid.sql.ast.statement.SQLSubqueryTableSource;
import com.alibaba.druid.sql.ast.statement.SQLTableSource;
import com.alibaba.druid.sql.ast.statement.SQLUnionQuery;
import com.alibaba.druid.sql.ast.statement.SQLUnionQueryTableSource;
public class LineageUtils { public class LineageUtils {
public static void columnLineageAnalyzer(String sql, String type, TreeNode<LineageColumn> node) { public static void columnLineageAnalyzer(String sql, String type, TreeNode<LineageColumn> node) {
...@@ -59,8 +76,7 @@ public class LineageUtils { ...@@ -59,8 +76,7 @@ public class LineageUtils {
SQLSelectQueryBlock sqlSelectQueryBlock = (SQLSelectQueryBlock) sqlSelectQuery; SQLSelectQueryBlock sqlSelectQueryBlock = (SQLSelectQueryBlock) sqlSelectQuery;
// 获取字段列表 // 获取字段列表
List<SQLSelectItem> selectItems = sqlSelectQueryBlock.getSelectList(); List<SQLSelectItem> selectItems = sqlSelectQueryBlock.getSelectList();
selectItems.forEach(x -> selectItems.forEach(x -> {
{
// 处理--------------------- // 处理---------------------
String column = Asserts.isNullString(x.getAlias()) ? x.toString() : x.getAlias(); String column = Asserts.isNullString(x.getAlias()) ? x.toString() : x.getAlias();
...@@ -122,8 +138,7 @@ public class LineageUtils { ...@@ -122,8 +138,7 @@ public class LineageUtils {
* @param table * @param table
*/ */
private static void handlerSQLUnionQueryTableSource(TreeNode<LineageColumn> node, SQLUnionQueryTableSource table, String type) { private static void handlerSQLUnionQueryTableSource(TreeNode<LineageColumn> node, SQLUnionQueryTableSource table, String type) {
node.getAllLeafs().stream().filter(e -> !e.getData().getIsEnd()).forEach(e -> node.getAllLeafs().stream().filter(e -> !e.getData().getIsEnd()).forEach(e -> {
{
columnLineageAnalyzer(table.getUnion().toString(), type, e); columnLineageAnalyzer(table.getUnion().toString(), type, e);
}); });
} }
...@@ -135,8 +150,7 @@ public class LineageUtils { ...@@ -135,8 +150,7 @@ public class LineageUtils {
* @param table * @param table
*/ */
private static void handlerSQLSubqueryTableSource(TreeNode<LineageColumn> node, SQLTableSource table, String type) { private static void handlerSQLSubqueryTableSource(TreeNode<LineageColumn> node, SQLTableSource table, String type) {
node.getAllLeafs().stream().filter(e -> !e.getData().getIsEnd()).forEach(e -> node.getAllLeafs().stream().filter(e -> !e.getData().getIsEnd()).forEach(e -> {
{
if (Asserts.isNotNullString(e.getData().getSourceTableName())) { if (Asserts.isNotNullString(e.getData().getSourceTableName())) {
if (e.getData().getSourceTableName().equals(table.getAlias())) { if (e.getData().getSourceTableName().equals(table.getAlias())) {
columnLineageAnalyzer(((SQLSubqueryTableSource) table).getSelect().toString(), type, e); columnLineageAnalyzer(((SQLSubqueryTableSource) table).getSelect().toString(), type, e);
...@@ -157,8 +171,7 @@ public class LineageUtils { ...@@ -157,8 +171,7 @@ public class LineageUtils {
SQLJoinTableSource table, String type) { SQLJoinTableSource table, String type) {
// 处理--------------------- // 处理---------------------
// 子查询作为表 // 子查询作为表
node.getAllLeafs().stream().filter(e -> !e.getData().getIsEnd()).forEach(e -> node.getAllLeafs().stream().filter(e -> !e.getData().getIsEnd()).forEach(e -> {
{
if (table.getLeft() instanceof SQLJoinTableSource) { if (table.getLeft() instanceof SQLJoinTableSource) {
handlerSQLJoinTableSource(node, (SQLJoinTableSource) table.getLeft(), type); handlerSQLJoinTableSource(node, (SQLJoinTableSource) table.getLeft(), type);
} else if (table.getLeft() instanceof SQLExprTableSource) { } else if (table.getLeft() instanceof SQLExprTableSource) {
...@@ -171,8 +184,7 @@ public class LineageUtils { ...@@ -171,8 +184,7 @@ public class LineageUtils {
handlerSQLUnionQueryTableSource(node, (SQLUnionQueryTableSource) table.getLeft(), type); handlerSQLUnionQueryTableSource(node, (SQLUnionQueryTableSource) table.getLeft(), type);
} }
}); });
node.getAllLeafs().stream().filter(e -> !e.getData().getIsEnd()).forEach(e -> node.getAllLeafs().stream().filter(e -> !e.getData().getIsEnd()).forEach(e -> {
{
if (table.getRight() instanceof SQLJoinTableSource) { if (table.getRight() instanceof SQLJoinTableSource) {
handlerSQLJoinTableSource(node, (SQLJoinTableSource) table.getRight(), type); handlerSQLJoinTableSource(node, (SQLJoinTableSource) table.getRight(), type);
} else if (table.getRight() instanceof SQLExprTableSource) { } else if (table.getRight() instanceof SQLExprTableSource) {
...@@ -199,8 +211,7 @@ public class LineageUtils { ...@@ -199,8 +211,7 @@ public class LineageUtils {
String tableName = tableSource.getExpr() instanceof SQLPropertyExpr ? (( String tableName = tableSource.getExpr() instanceof SQLPropertyExpr ? ((
SQLPropertyExpr) tableSource.getExpr()).getName().replace("`", "").replace("\"", "") : ""; SQLPropertyExpr) tableSource.getExpr()).getName().replace("`", "").replace("\"", "") : "";
String alias = Asserts.isNotNullString(tableSource.getAlias()) ? tableSource.getAlias().replace("`", "").replace("\"", "") : ""; String alias = Asserts.isNotNullString(tableSource.getAlias()) ? tableSource.getAlias().replace("`", "").replace("\"", "") : "";
node.getChildren().forEach(e -> node.getChildren().forEach(e -> {
{
e.getChildren().forEach(f -> { e.getChildren().forEach(f -> {
if (!f.getData().getIsEnd() && (f.getData().getSourceTableName() == null || f.getData().getSourceTableName().equals(tableName) || f if (!f.getData().getIsEnd() && (f.getData().getSourceTableName() == null || f.getData().getSourceTableName().equals(tableName) || f
.getData().getSourceTableName().equals(alias))) { .getData().getSourceTableName().equals(alias))) {
...@@ -271,8 +282,7 @@ public class LineageUtils { ...@@ -271,8 +282,7 @@ public class LineageUtils {
node.getData().setIsEnd(true); node.getData().setIsEnd(true);
} }
} else { } else {
expr.getArguments().forEach(expr1 -> expr.getArguments().forEach(expr1 -> {
{
handlerExpr(expr1, node); handlerExpr(expr1, node);
}); });
} }
...@@ -285,8 +295,7 @@ public class LineageUtils { ...@@ -285,8 +295,7 @@ public class LineageUtils {
* @param node * @param node
*/ */
public static void visitSQLAggregateExpr(SQLAggregateExpr expr, TreeNode<LineageColumn> node) { public static void visitSQLAggregateExpr(SQLAggregateExpr expr, TreeNode<LineageColumn> node) {
expr.getArguments().forEach(expr1 -> expr.getArguments().forEach(expr1 -> {
{
handlerExpr(expr1, node); handlerExpr(expr1, node);
}); });
} }
...@@ -299,8 +308,7 @@ public class LineageUtils { ...@@ -299,8 +308,7 @@ public class LineageUtils {
*/ */
public static void visitSQLCaseExpr(SQLCaseExpr expr, TreeNode<LineageColumn> node) { public static void visitSQLCaseExpr(SQLCaseExpr expr, TreeNode<LineageColumn> node) {
handlerExpr(expr.getValueExpr(), node); handlerExpr(expr.getValueExpr(), node);
expr.getItems().forEach(expr1 -> expr.getItems().forEach(expr1 -> {
{
handlerExpr(expr1.getValueExpr(), node); handlerExpr(expr1.getValueExpr(), node);
}); });
handlerExpr(expr.getElseExpr(), node); handlerExpr(expr.getElseExpr(), node);
......
...@@ -17,10 +17,13 @@ ...@@ -17,10 +17,13 @@
* *
*/ */
package com.dlink.explainer.sqlLineage; package com.dlink.explainer.sqlLineage;
import java.util.*; import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
public class TreeNode<T> implements Iterable<TreeNode<T>> { public class TreeNode<T> implements Iterable<TreeNode<T>> {
......
...@@ -17,9 +17,10 @@ ...@@ -17,9 +17,10 @@
* *
*/ */
package com.dlink.explainer.sqlLineage; package com.dlink.explainer.sqlLineage;
import java.util.Iterator; import java.util.Iterator;
public class TreeNodeIterator<T> implements Iterator<TreeNode<T>> { public class TreeNodeIterator<T> implements Iterator<TreeNode<T>> {
private ProcessStages doNext; private ProcessStages doNext;
......
...@@ -17,16 +17,15 @@ ...@@ -17,16 +17,15 @@
* *
*/ */
package com.dlink.explainer.trans; package com.dlink.explainer.trans;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import com.fasterxml.jackson.databind.JsonNode;
/** /**
* AbstractTrans * AbstractTrans
* *
...@@ -45,7 +44,6 @@ public abstract class AbstractTrans { ...@@ -45,7 +44,6 @@ public abstract class AbstractTrans {
protected Integer parallelism; protected Integer parallelism;
protected List<Predecessor> predecessors; protected List<Predecessor> predecessors;
public void build(JsonNode node) { public void build(JsonNode node) {
id = node.get("id").asInt(); id = node.get("id").asInt();
text = node.toPrettyString(); text = node.toPrettyString();
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.explainer.trans; package com.dlink.explainer.trans;
/** /**
......
...@@ -17,10 +17,10 @@ ...@@ -17,10 +17,10 @@
* *
*/ */
package com.dlink.explainer.trans; package com.dlink.explainer.trans;
import com.dlink.utils.MapParseUtils; import com.dlink.utils.MapParseUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -44,8 +44,8 @@ public class OperatorTrans extends AbstractTrans implements Trans { ...@@ -44,8 +44,8 @@ public class OperatorTrans extends AbstractTrans implements Trans {
private List<String> leftInputSpec; private List<String> leftInputSpec;
private List<String> rightInputSpec; private List<String> rightInputSpec;
public final static String TRANS_TYPE = "Operator"; public static final String TRANS_TYPE = "Operator";
private final static String FIELD_AS = " AS "; private static final String FIELD_AS = " AS ";
public List<Field> getSelect() { public List<Field> getSelect() {
return select; return select;
...@@ -89,7 +89,6 @@ public class OperatorTrans extends AbstractTrans implements Trans { ...@@ -89,7 +89,6 @@ public class OperatorTrans extends AbstractTrans implements Trans {
return TRANS_TYPE.equals(pact); return TRANS_TYPE.equals(pact);
} }
@Override @Override
public void translate() { public void translate() {
name = pact; name = pact;
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.explainer.trans; package com.dlink.explainer.trans;
import lombok.Getter; import lombok.Getter;
...@@ -44,10 +43,10 @@ public class Predecessor { ...@@ -44,10 +43,10 @@ public class Predecessor {
@Override @Override
public String toString() { public String toString() {
return "Predecessor{" + return "Predecessor{"
"id=" + id + + "id=" + id
", shipStrategy='" + shipStrategy + '\'' + + ", shipStrategy='" + shipStrategy + '\''
", side='" + side + '\'' + + ", side='" + side + '\''
'}'; + '}';
} }
} }
...@@ -17,10 +17,8 @@ ...@@ -17,10 +17,8 @@
* *
*/ */
package com.dlink.explainer.trans; package com.dlink.explainer.trans;
import com.dlink.utils.MapParseUtils; import com.dlink.utils.MapParseUtils;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -40,7 +38,7 @@ public class SinkTrans extends AbstractTrans implements Trans { ...@@ -40,7 +38,7 @@ public class SinkTrans extends AbstractTrans implements Trans {
private String table; private String table;
private List<String> fields; private List<String> fields;
public final static String TRANS_TYPE = "Data Sink"; public static final String TRANS_TYPE = "Data Sink";
public SinkTrans() { public SinkTrans() {
} }
......
...@@ -17,15 +17,19 @@ ...@@ -17,15 +17,19 @@
* *
*/ */
package com.dlink.explainer.trans; package com.dlink.explainer.trans;
import com.dlink.utils.MapParseUtils; import com.dlink.utils.MapParseUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.OperationUtils; import org.apache.flink.table.operations.OperationUtils;
import java.util.*; import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/** /**
* SourceTrans * SourceTrans
...@@ -41,7 +45,7 @@ public class SourceTrans extends AbstractTrans implements Trans { ...@@ -41,7 +45,7 @@ public class SourceTrans extends AbstractTrans implements Trans {
private List<String> project; private List<String> project;
private List<String> fields; private List<String> fields;
public final static String TRANS_TYPE = "Data Source"; public static final String TRANS_TYPE = "Data Source";
public SourceTrans() { public SourceTrans() {
} }
......
...@@ -17,13 +17,12 @@ ...@@ -17,13 +17,12 @@
* *
*/ */
package com.dlink.explainer.trans; package com.dlink.explainer.trans;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.List; import java.util.List;
import com.fasterxml.jackson.databind.JsonNode;
/** /**
* Trans * Trans
* *
......
...@@ -17,18 +17,18 @@ ...@@ -17,18 +17,18 @@
* *
*/ */
package com.dlink.explainer.trans; package com.dlink.explainer.trans;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
/** /**
* TransGenerator * TransGenerator
* *
......
...@@ -17,19 +17,19 @@ ...@@ -17,19 +17,19 @@
* *
*/ */
package com.dlink.job; package com.dlink.job;
import com.dlink.executor.Executor; import com.dlink.executor.Executor;
import com.dlink.executor.ExecutorSetting; import com.dlink.executor.ExecutorSetting;
import com.dlink.gateway.GatewayType; import com.dlink.gateway.GatewayType;
import com.dlink.result.IResult; import com.dlink.result.IResult;
import lombok.Getter;
import lombok.Setter;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.List; import java.util.List;
import lombok.Getter;
import lombok.Setter;
/** /**
* Job * Job
* *
......
...@@ -17,12 +17,8 @@ ...@@ -17,12 +17,8 @@
* *
*/ */
package com.dlink.job; package com.dlink.job;
import java.util.List;
import java.util.Map;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.executor.ExecutorSetting; import com.dlink.executor.ExecutorSetting;
import com.dlink.gateway.GatewayType; import com.dlink.gateway.GatewayType;
...@@ -33,6 +29,9 @@ import com.dlink.gateway.config.GatewayConfig; ...@@ -33,6 +29,9 @@ import com.dlink.gateway.config.GatewayConfig;
import com.dlink.gateway.config.SavePointStrategy; import com.dlink.gateway.config.SavePointStrategy;
import com.dlink.session.SessionConfig; import com.dlink.session.SessionConfig;
import java.util.List;
import java.util.Map;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.job; package com.dlink.job;
/** /**
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.job; package com.dlink.job;
import com.dlink.exception.JobException; import com.dlink.exception.JobException;
......
...@@ -17,32 +17,8 @@ ...@@ -17,32 +17,8 @@
* *
*/ */
package com.dlink.job; package com.dlink.job;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.dlink.api.FlinkAPI; import com.dlink.api.FlinkAPI;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkSQLConstant; import com.dlink.constant.FlinkSQLConstant;
...@@ -79,6 +55,30 @@ import com.dlink.trans.Operations; ...@@ -79,6 +55,30 @@ import com.dlink.trans.Operations;
import com.dlink.utils.LogUtil; import com.dlink.utils.LogUtil;
import com.dlink.utils.SqlUtil; import com.dlink.utils.SqlUtil;
import com.dlink.utils.UDFUtil; import com.dlink.utils.UDFUtil;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
/** /**
...@@ -300,9 +300,11 @@ public class JobManager { ...@@ -300,9 +300,11 @@ public class JobManager {
TableResult tableResult = executor.executeStatementSet(inserts); TableResult tableResult = executor.executeStatementSet(inserts);
if (tableResult.getJobClient().isPresent()) { if (tableResult.getJobClient().isPresent()) {
job.setJobId(tableResult.getJobClient().get().getJobID().toHexString()); job.setJobId(tableResult.getJobClient().get().getJobID().toHexString());
job.setJids(new ArrayList<String>() {{ job.setJids(new ArrayList<String>() {
{
add(job.getJobId()); add(job.getJobId());
}}); }
});
} }
if (config.isUseResult()) { if (config.isUseResult()) {
// Build insert result. // Build insert result.
...@@ -339,9 +341,11 @@ public class JobManager { ...@@ -339,9 +341,11 @@ public class JobManager {
TableResult tableResult = executor.executeSql(item.getValue()); TableResult tableResult = executor.executeSql(item.getValue());
if (tableResult.getJobClient().isPresent()) { if (tableResult.getJobClient().isPresent()) {
job.setJobId(tableResult.getJobClient().get().getJobID().toHexString()); job.setJobId(tableResult.getJobClient().get().getJobID().toHexString());
job.setJids(new ArrayList<String>() {{ job.setJids(new ArrayList<String>() {
{
add(job.getJobId()); add(job.getJobId());
}}); }
});
} }
if (config.isUseResult()) { if (config.isUseResult()) {
IResult result = IResult result =
...@@ -390,9 +394,11 @@ public class JobManager { ...@@ -390,9 +394,11 @@ public class JobManager {
JobClient jobClient = executor.executeAsync(config.getJobName()); JobClient jobClient = executor.executeAsync(config.getJobName());
if (Asserts.isNotNull(jobClient)) { if (Asserts.isNotNull(jobClient)) {
job.setJobId(jobClient.getJobID().toHexString()); job.setJobId(jobClient.getJobID().toHexString());
job.setJids(new ArrayList<String>() {{ job.setJids(new ArrayList<String>() {
{
add(job.getJobId()); add(job.getJobId());
}}); }
});
} }
if (config.isUseResult()) { if (config.isUseResult()) {
IResult result = ResultBuilder.build(SqlType.EXECUTE, config.getMaxRowNum(), config.isUseChangeLog(), config.isUseAutoCancel(), executor.getTimeZone()).getResult(null); IResult result = ResultBuilder.build(SqlType.EXECUTE, config.getMaxRowNum(), config.isUseChangeLog(), config.isUseAutoCancel(), executor.getTimeZone()).getResult(null);
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.job; package com.dlink.job;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -69,7 +68,7 @@ public class JobParam { ...@@ -69,7 +68,7 @@ public class JobParam {
public List<String> getTransStatement() { public List<String> getTransStatement() {
List<String> statementList = new ArrayList<>(); List<String> statementList = new ArrayList<>();
for(StatementParam statementParam: trans){ for (StatementParam statementParam: trans) {
statementList.add(statementParam.getValue()); statementList.add(statementParam.getValue());
} }
return statementList; return statementList;
......
...@@ -17,15 +17,15 @@ ...@@ -17,15 +17,15 @@
* *
*/ */
package com.dlink.job; package com.dlink.job;
import com.dlink.result.IResult; import com.dlink.result.IResult;
import lombok.Getter;
import lombok.Setter;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import lombok.Getter;
import lombok.Setter;
/** /**
* JobResult * JobResult
* *
...@@ -51,7 +51,8 @@ public class JobResult { ...@@ -51,7 +51,8 @@ public class JobResult {
public JobResult() { public JobResult() {
} }
public JobResult(Integer id, Integer jobInstanceId, JobConfig jobConfig, String jobManagerAddress, Job.JobStatus status, String statement, String jobId, String error, IResult result, LocalDateTime startTime, LocalDateTime endTime) { public JobResult(Integer id, Integer jobInstanceId, JobConfig jobConfig, String jobManagerAddress, Job.JobStatus status,
String statement, String jobId, String error, IResult result, LocalDateTime startTime, LocalDateTime endTime) {
this.id = id; this.id = id;
this.jobInstanceId = jobInstanceId; this.jobInstanceId = jobInstanceId;
this.jobConfig = jobConfig; this.jobConfig = jobConfig;
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.job; package com.dlink.job;
/** /**
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.job; package com.dlink.job;
import com.dlink.parser.SqlType; import com.dlink.parser.SqlType;
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.plus; package com.dlink.plus;
import com.dlink.executor.Executor; import com.dlink.executor.Executor;
...@@ -25,12 +24,14 @@ import com.dlink.explainer.Explainer; ...@@ -25,12 +24,14 @@ import com.dlink.explainer.Explainer;
import com.dlink.explainer.ca.ColumnCAResult; import com.dlink.explainer.ca.ColumnCAResult;
import com.dlink.explainer.ca.TableCAResult; import com.dlink.explainer.ca.TableCAResult;
import com.dlink.result.SqlExplainResult; import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.runtime.rest.messages.JobPlanInfo; import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import com.fasterxml.jackson.databind.node.ObjectNode;
/** /**
* FlinkSqlPlus * FlinkSqlPlus
* *
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.plus; package com.dlink.plus;
import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableResult;
......
...@@ -17,17 +17,16 @@ ...@@ -17,17 +17,16 @@
* *
*/ */
package com.dlink.result; package com.dlink.result;
import lombok.Getter;
import lombok.Setter;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import lombok.Getter;
import lombok.Setter;
/** /**
* DDLResult * DDLResult
* *
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.result; package com.dlink.result;
import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableResult;
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.result; package com.dlink.result;
import java.time.LocalDateTime; import java.time.LocalDateTime;
......
...@@ -17,14 +17,13 @@ ...@@ -17,14 +17,13 @@
* *
*/ */
package com.dlink.result; package com.dlink.result;
import java.time.LocalDateTime;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import java.time.LocalDateTime;
/** /**
* InsertResult * InsertResult
* *
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.result; package com.dlink.result;
import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableResult;
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.result; package com.dlink.result;
/** /**
......
...@@ -17,13 +17,12 @@ ...@@ -17,13 +17,12 @@
* *
*/ */
package com.dlink.result; package com.dlink.result;
import org.apache.flink.table.api.TableResult;
import com.dlink.parser.SqlType; import com.dlink.parser.SqlType;
import org.apache.flink.table.api.TableResult;
/** /**
* ResultBuilder * ResultBuilder
* *
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.result; package com.dlink.result;
import java.util.HashMap; import java.util.HashMap;
......
...@@ -17,9 +17,11 @@ ...@@ -17,9 +17,11 @@
* *
*/ */
package com.dlink.result; package com.dlink.result;
import com.dlink.constant.FlinkConstant;
import com.dlink.utils.FlinkUtil;
import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row; import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind; import org.apache.flink.types.RowKind;
...@@ -34,9 +36,6 @@ import java.util.List; ...@@ -34,9 +36,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import com.dlink.constant.FlinkConstant;
import com.dlink.utils.FlinkUtil;
/** /**
* ResultRunnable * ResultRunnable
* *
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.result; package com.dlink.result;
import com.dlink.executor.ExecutorSetting; import com.dlink.executor.ExecutorSetting;
......
...@@ -17,17 +17,16 @@ ...@@ -17,17 +17,16 @@
* *
*/ */
package com.dlink.result; package com.dlink.result;
import lombok.Getter;
import lombok.Setter;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import lombok.Getter;
import lombok.Setter;
/** /**
* SelectResult * SelectResult
* *
...@@ -53,7 +52,7 @@ public class SelectResult extends AbstractResult implements IResult { ...@@ -53,7 +52,7 @@ public class SelectResult extends AbstractResult implements IResult {
this.columns = columns; this.columns = columns;
this.jobID = jobID; this.jobID = jobID;
this.success = success; this.success = success;
// this.endTime = LocalDateTime.now(); //this.endTime = LocalDateTime.now();
this.isDestroyed = false; this.isDestroyed = false;
} }
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.result; package com.dlink.result;
import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableResult;
......
...@@ -17,14 +17,20 @@ ...@@ -17,14 +17,20 @@
* *
*/ */
package com.dlink.result; package com.dlink.result;
import com.dlink.utils.FlinkUtil; import com.dlink.utils.FlinkUtil;
import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row; import org.apache.flink.types.Row;
import java.util.*; import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/** /**
* ShowResultBuilder * ShowResultBuilder
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.result; package com.dlink.result;
import java.time.LocalDateTime; import java.time.LocalDateTime;
......
...@@ -17,15 +17,15 @@ ...@@ -17,15 +17,15 @@
* *
*/ */
package com.dlink.session; package com.dlink.session;
import com.dlink.executor.Executor; import com.dlink.executor.Executor;
import lombok.Getter;
import lombok.Setter;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import lombok.Getter;
import lombok.Setter;
/** /**
* FlinkEntity * FlinkEntity
* *
......
...@@ -17,10 +17,10 @@ ...@@ -17,10 +17,10 @@
* *
*/ */
package com.dlink.session; package com.dlink.session;
import com.dlink.executor.ExecutorSetting; import com.dlink.executor.ExecutorSetting;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
......
...@@ -17,14 +17,13 @@ ...@@ -17,14 +17,13 @@
* *
*/ */
package com.dlink.session; package com.dlink.session;
import java.time.LocalDateTime;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import java.time.LocalDateTime;
/** /**
* SessionInfo * SessionInfo
* *
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
* *
*/ */
package com.dlink.session; package com.dlink.session;
import com.dlink.constant.FlinkConstant; import com.dlink.constant.FlinkConstant;
......
...@@ -17,10 +17,8 @@ ...@@ -17,10 +17,8 @@
* *
*/ */
package com.dlink.utils; package com.dlink.utils;
import javax.tools.*;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
...@@ -32,6 +30,17 @@ import java.util.concurrent.ConcurrentHashMap; ...@@ -32,6 +30,17 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import javax.tools.Diagnostic;
import javax.tools.DiagnosticCollector;
import javax.tools.FileObject;
import javax.tools.ForwardingJavaFileManager;
import javax.tools.JavaCompiler;
import javax.tools.JavaFileManager;
import javax.tools.JavaFileObject;
import javax.tools.SimpleJavaFileObject;
import javax.tools.StandardJavaFileManager;
import javax.tools.ToolProvider;
/** /**
* CustomStringJavaCompiler * CustomStringJavaCompiler
* *
......
...@@ -17,10 +17,16 @@ ...@@ -17,10 +17,16 @@
* *
*/ */
package com.dlink.utils; package com.dlink.utils;
import java.util.*; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Stack;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
...@@ -293,8 +299,6 @@ public class MapParseUtils { ...@@ -293,8 +299,6 @@ public class MapParseUtils {
* @date 2021/8/20 15:03 * @date 2021/8/20 15:03
*/ */
public static Map parseForSelect(String inStr) { public static Map parseForSelect(String inStr) {
// Map map = new HashMap();
// map.put(getMapKeyOnlySelectOrField(inStr), getSelectList(inStr));
return getKeyAndValues(inStr); return getKeyAndValues(inStr);
} }
......
...@@ -17,14 +17,15 @@ ...@@ -17,14 +17,15 @@
* *
*/ */
package com.dlink.utils; package com.dlink.utils;
import com.dlink.pool.ClassEntity; import com.dlink.pool.ClassEntity;
import com.dlink.pool.ClassPool; import com.dlink.pool.ClassPool;
import groovy.lang.GroovyClassLoader;
import org.codehaus.groovy.control.CompilerConfiguration; import org.codehaus.groovy.control.CompilerConfiguration;
import groovy.lang.GroovyClassLoader;
/** /**
* UDFUtil * UDFUtil
* *
...@@ -58,6 +59,6 @@ public class UDFUtil { ...@@ -58,6 +59,6 @@ public class UDFUtil {
groovyClassLoader.setShouldRecompile(true); groovyClassLoader.setShouldRecompile(true);
groovyClassLoader.defineClass(classEntity.getName(), classEntity.getClassByte()); groovyClassLoader.defineClass(classEntity.getName(), classEntity.getClassByte());
Thread.currentThread().setContextClassLoader(groovyClassLoader); Thread.currentThread().setContextClassLoader(groovyClassLoader);
// Class<?> clazz = groovyClassLoader.parseClass(codeSource,"com.dlink.ud.udf.SubstringFunction"); //Class<?> clazz = groovyClassLoader.parseClass(codeSource,"com.dlink.ud.udf.SubstringFunction");
} }
} }
...@@ -17,18 +17,12 @@ ...@@ -17,18 +17,12 @@
* *
*/ */
package com.dlink.core; package com.dlink.core;
import com.dlink.executor.CustomTableEnvironmentImpl;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableResult;
import org.junit.Test; import org.junit.Test;
/** /**
...@@ -40,20 +34,20 @@ import org.junit.Test; ...@@ -40,20 +34,20 @@ import org.junit.Test;
public class BatchTest { public class BatchTest {
@Test @Test
public void batchTest() { public void batchTest() {
String source = "CREATE TABLE Orders (\n" + String source = "CREATE TABLE Orders (\n"
" order_number BIGINT,\n" + + " order_number BIGINT,\n"
" price DECIMAL(32,2),\n" + + " price DECIMAL(32,2),\n"
" buyer ROW<first_name STRING, last_name STRING>,\n" + + " buyer ROW<first_name STRING, last_name STRING>,\n"
" order_time TIMESTAMP(3)\n" + + " order_time TIMESTAMP(3)\n"
") WITH (\n" + + ") WITH (\n"
" 'connector' = 'datagen',\n" + + " 'connector' = 'datagen',\n"
" 'number-of-rows' = '100'\n" + + " 'number-of-rows' = '100'\n"
")"; + ")";
String select = "select order_number,price,order_time from Orders"; String select = "select order_number,price,order_time from Orders";
// LocalEnvironment environment = ExecutionEnvironment.createLocalEnvironment(); //LocalEnvironment environment = ExecutionEnvironment.createLocalEnvironment();
EnvironmentSettings settings = EnvironmentSettings EnvironmentSettings settings = EnvironmentSettings
.newInstance() .newInstance()
// .inStreamingMode() // 声明为流任务 //.inStreamingMode() // 声明为流任务
.inBatchMode() // 声明为批任务 .inBatchMode() // 声明为批任务
.build(); .build();
......
...@@ -17,16 +17,17 @@ ...@@ -17,16 +17,17 @@
* *
*/ */
package com.dlink.core; package com.dlink.core;
import com.dlink.api.FlinkAPI; import com.dlink.api.FlinkAPI;
import com.dlink.gateway.result.SavePointResult; import com.dlink.gateway.result.SavePointResult;
import com.fasterxml.jackson.databind.JsonNode;
import org.junit.Test;
import java.util.List; import java.util.List;
import org.junit.Test;
import com.fasterxml.jackson.databind.JsonNode;
/** /**
* FlinkRestAPITest * FlinkRestAPITest
* *
...@@ -56,7 +57,6 @@ public class FlinkRestAPITest { ...@@ -56,7 +57,6 @@ public class FlinkRestAPITest {
FlinkAPI.build(address).stop("0727f796fcf9e07d89e724f7e15598cf"); FlinkAPI.build(address).stop("0727f796fcf9e07d89e724f7e15598cf");
} }
@Test @Test
public void getCheckPointsDetailInfoTest() { public void getCheckPointsDetailInfoTest() {
JsonNode checkPointsDetailInfo = FlinkAPI.build(address).getCheckPointsConfig("9b0910c865874430b98d3817a248eb24"); JsonNode checkPointsDetailInfo = FlinkAPI.build(address).getCheckPointsConfig("9b0910c865874430b98d3817a248eb24");
...@@ -87,14 +87,12 @@ public class FlinkRestAPITest { ...@@ -87,14 +87,12 @@ public class FlinkRestAPITest {
System.out.println(jobManagerConfig.toString()); System.out.println(jobManagerConfig.toString());
} }
@Test @Test
public void getJobManagerLogTest() { public void getJobManagerLogTest() {
String jobManagerLog = FlinkAPI.build(address).getJobManagerLog(); String jobManagerLog = FlinkAPI.build(address).getJobManagerLog();
System.out.println(jobManagerLog); System.out.println(jobManagerLog);
} }
@Test @Test
public void getJobManagerStdOutTest() { public void getJobManagerStdOutTest() {
String jobManagerLogs = FlinkAPI.build(address).getJobManagerStdOut(); String jobManagerLogs = FlinkAPI.build(address).getJobManagerStdOut();
...@@ -106,6 +104,7 @@ public class FlinkRestAPITest { ...@@ -106,6 +104,7 @@ public class FlinkRestAPITest {
JsonNode jobManagerLogList = FlinkAPI.build(address).getJobManagerLogList(); JsonNode jobManagerLogList = FlinkAPI.build(address).getJobManagerLogList();
System.out.println(jobManagerLogList.toString()); System.out.println(jobManagerLogList.toString());
} }
@Test @Test
public void getJobManagerLogListToDetailTest() { public void getJobManagerLogListToDetailTest() {
String jobManagerLogList = FlinkAPI.build(address).getJobManagerLogFileDetail("jobmanager.log"); String jobManagerLogList = FlinkAPI.build(address).getJobManagerLogFileDetail("jobmanager.log");
...@@ -124,7 +123,6 @@ public class FlinkRestAPITest { ...@@ -124,7 +123,6 @@ public class FlinkRestAPITest {
System.out.println(taskManagerMetrics.toString()); System.out.println(taskManagerMetrics.toString());
} }
@Test @Test
public void getTaskManagerLogTest() { public void getTaskManagerLogTest() {
String taskManagerLog = FlinkAPI.build(address).getTaskManagerLog("container_e46_1655948912029_0061_01_000002"); String taskManagerLog = FlinkAPI.build(address).getTaskManagerLog("container_e46_1655948912029_0061_01_000002");
...@@ -142,6 +140,7 @@ public class FlinkRestAPITest { ...@@ -142,6 +140,7 @@ public class FlinkRestAPITest {
JsonNode taskManagerLogList = FlinkAPI.build(address).getTaskManagerLogList("container_e46_1655948912029_0061_01_000002"); JsonNode taskManagerLogList = FlinkAPI.build(address).getTaskManagerLogList("container_e46_1655948912029_0061_01_000002");
System.out.println(taskManagerLogList.toString()); System.out.println(taskManagerLogList.toString());
} }
@Test @Test
public void getTaskManagerLogListToDetail() { public void getTaskManagerLogListToDetail() {
String taskManagerLogDetail = FlinkAPI.build(address).getTaskManagerLogFileDeatil("container_e46_1655948912029_0061_01_000002","taskmanager.log"); String taskManagerLogDetail = FlinkAPI.build(address).getTaskManagerLogFileDeatil("container_e46_1655948912029_0061_01_000002","taskmanager.log");
......
...@@ -17,11 +17,12 @@ ...@@ -17,11 +17,12 @@
* *
*/ */
package com.dlink.core; package com.dlink.core;
import com.dlink.plus.FlinkSqlPlus; import com.dlink.plus.FlinkSqlPlus;
import org.apache.flink.runtime.rest.messages.JobPlanInfo; import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.junit.Test; import org.junit.Test;
/** /**
...@@ -34,48 +35,48 @@ public class FlinkSqlPlusTest { ...@@ -34,48 +35,48 @@ public class FlinkSqlPlusTest {
@Test @Test
public void getJobPlanInfo() { public void getJobPlanInfo() {
String sql = "jdbcconfig:='connector' = 'jdbc',\n" + String sql = "jdbcconfig:='connector' = 'jdbc',\n"
" 'url' = 'jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true',\n" + + " 'url' = 'jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false"
" 'username'='dlink',\n" + + "&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true',\n"
" 'password'='dlink',;\n" + + " 'username'='dlink',\n"
"create temporary function TOP2 as 'com.dlink.ud.udtaf.Top2';\n" + + " 'password'='dlink',;\n"
"CREATE TABLE student (\n" + + "create temporary function TOP2 as 'com.dlink.ud.udtaf.Top2';\n"
" sid INT,\n" + + "CREATE TABLE student (\n"
" name STRING,\n" + + " sid INT,\n"
" PRIMARY KEY (sid) NOT ENFORCED\n" + + " name STRING,\n"
") WITH (\n" + + " PRIMARY KEY (sid) NOT ENFORCED\n"
" ${jdbcconfig}\n" + + ") WITH (\n"
" 'table-name' = 'student'\n" + + " ${jdbcconfig}\n"
");\n" + + " 'table-name' = 'student'\n"
"CREATE TABLE score (\n" + + ");\n"
" cid INT,\n" + + "CREATE TABLE score (\n"
" sid INT,\n" + + " cid INT,\n"
" cls STRING,\n" + + " sid INT,\n"
" score INT,\n" + + " cls STRING,\n"
" PRIMARY KEY (cid) NOT ENFORCED\n" + + " score INT,\n"
") WITH (\n" + + " PRIMARY KEY (cid) NOT ENFORCED\n"
" ${jdbcconfig}\n" + + ") WITH (\n"
" 'table-name' = 'score'\n" + + " ${jdbcconfig}\n"
");\n" + + " 'table-name' = 'score'\n"
"CREATE TABLE scoretop2 (\n" + + ");\n"
" cls STRING,\n" + + "CREATE TABLE scoretop2 (\n"
" score INT,\n" + + " cls STRING,\n"
" `rank` INT,\n" + + " score INT,\n"
" PRIMARY KEY (cls,`rank`) NOT ENFORCED\n" + + " `rank` INT,\n"
") WITH (\n" + + " PRIMARY KEY (cls,`rank`) NOT ENFORCED\n"
" ${jdbcconfig}\n" + + ") WITH (\n"
" 'table-name' = 'scoretop2'\n" + + " ${jdbcconfig}\n"
");\n" + + " 'table-name' = 'scoretop2'\n"
"CREATE AGGTABLE aggscore AS \n" + + ");\n"
"SELECT cls,score,rank\n" + + "CREATE AGGTABLE aggscore AS \n"
"FROM score\n" + + "SELECT cls,score,rank\n"
"GROUP BY cls\n" + + "FROM score\n"
"AGG BY TOP2(score) as (score,rank);\n" + + "GROUP BY cls\n"
"\n" + + "AGG BY TOP2(score) as (score,rank);\n"
"insert into scoretop2\n" + + "insert into scoretop2\n"
"select \n" + + "select \n"
"b.cls,b.score,b.`rank`\n" + + "b.cls,b.score,b.`rank`\n"
"from aggscore b"; + "from aggscore b";
FlinkSqlPlus plus = FlinkSqlPlus.build(); FlinkSqlPlus plus = FlinkSqlPlus.build();
JobPlanInfo jobPlanInfo = plus.getJobPlanInfo(sql); JobPlanInfo jobPlanInfo = plus.getJobPlanInfo(sql);
......
...@@ -17,19 +17,18 @@ ...@@ -17,19 +17,18 @@
* *
*/ */
package com.dlink.core; package com.dlink.core;
import java.util.HashMap;
import org.junit.Test;
import com.dlink.job.JobConfig; import com.dlink.job.JobConfig;
import com.dlink.job.JobManager; import com.dlink.job.JobManager;
import com.dlink.job.JobResult; import com.dlink.job.JobResult;
import com.dlink.result.ResultPool; import com.dlink.result.ResultPool;
import com.dlink.result.SelectResult; import com.dlink.result.SelectResult;
import java.util.HashMap;
import org.junit.Test;
/** /**
* JobManagerTest * JobManagerTest
* *
...@@ -48,14 +47,14 @@ public class JobManagerTest { ...@@ -48,14 +47,14 @@ public class JobManagerTest {
config.setAddress("192.168.123.157:8081"); config.setAddress("192.168.123.157:8081");
} }
JobManager jobManager = JobManager.build(config); JobManager jobManager = JobManager.build(config);
String sql1 = "CREATE TABLE Orders (\n" + String sql1 = "CREATE TABLE Orders (\n"
" order_number BIGINT,\n" + + " order_number BIGINT,\n"
" price DECIMAL(32,2),\n" + + " price DECIMAL(32,2),\n"
" order_time TIMESTAMP(3)\n" + + " order_time TIMESTAMP(3)\n"
") WITH (\n" + + ") WITH (\n"
" 'connector' = 'datagen',\n" + + " 'connector' = 'datagen',\n"
" 'rows-per-second' = '1'\n" + + " 'rows-per-second' = '1'\n"
");"; + ");";
String sql3 = "select order_number,price,order_time from Orders"; String sql3 = "select order_number,price,order_time from Orders";
String sql = sql1 + sql3; String sql = sql1 + sql3;
JobResult result = jobManager.executeSql(sql); JobResult result = jobManager.executeSql(sql);
......
...@@ -17,11 +17,11 @@ ...@@ -17,11 +17,11 @@
* *
*/ */
package com.dlink.core; package com.dlink.core;
import com.dlink.explainer.lineage.LineageBuilder; import com.dlink.explainer.lineage.LineageBuilder;
import com.dlink.explainer.lineage.LineageResult; import com.dlink.explainer.lineage.LineageResult;
import org.junit.Test; import org.junit.Test;
/** /**
...@@ -34,21 +34,21 @@ public class LineageTest { ...@@ -34,21 +34,21 @@ public class LineageTest {
@Test @Test
public void sumTest() { public void sumTest() {
String sql = "CREATE TABLE ST (\n" + String sql = "CREATE TABLE ST (\n"
" a STRING,\n" + + " a STRING,\n"
" b STRING,\n" + + " b STRING,\n"
" c STRING\n" + + " c STRING\n"
") WITH (\n" + + ") WITH (\n"
" 'connector' = 'datagen',\n" + + " 'connector' = 'datagen',\n"
" 'rows-per-second' = '1'\n" + + " 'rows-per-second' = '1'\n"
");\n" + + ");\n"
"CREATE TABLE TT (\n" + + "CREATE TABLE TT (\n"
" A STRING,\n" + + " A STRING,\n"
" B STRING\n" + + " B STRING\n"
") WITH (\n" + + ") WITH (\n"
" 'connector' = 'print'\n" + + " 'connector' = 'print'\n"
");\n" + + ");\n"
"insert into TT select a||c A ,b||c B from ST"; + "insert into TT select a||c A ,b||c B from ST";
LineageResult result = LineageBuilder.getLineage(sql); LineageResult result = LineageBuilder.getLineage(sql);
System.out.println("end"); System.out.println("end");
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment