Commit 1343462e authored by wenmo's avatar wenmo

元数据base

parent 34885d7e
......@@ -78,6 +78,10 @@
<groupId>com.dlink</groupId>
<artifactId>dlink-core</artifactId>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-common</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dlink</artifactId>
<groupId>com.dlink</groupId>
<version>0.3.0-SANPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dlink-common</artifactId>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.dlink.assertion;
import com.dlink.exception.JobException;
/**
* Asserts
*
......@@ -19,7 +17,7 @@ public class Asserts {
}
public static boolean isNullString(String str){
return str==null||"".equals(str);
return isNull(str)||"".equals(str);
}
public static boolean isEquals(String str1,String str2){
......@@ -32,15 +30,25 @@ public class Asserts {
}
}
public static boolean isEqualsIgnoreCase(String str1,String str2){
if(isNull(str1)&&isNull(str2)){
return true;
}else if(isNull(str1)||isNull(str2)){
return false;
}else{
return str1.equalsIgnoreCase(str2);
}
}
public static void checkNull(String key,String msg) {
if (key == null||"".equals(key)) {
throw new JobException(msg);
throw new NullPointerException(msg);
}
}
public static void checkNotNull(Object object,String msg) {
if (isNull(object)) {
throw new JobException(msg);
throw new NullPointerException(msg);
}
}
......
package com.dlink.exception;
import org.apache.flink.annotation.PublicEvolving;
/**
* JobException
......@@ -8,8 +7,8 @@ import org.apache.flink.annotation.PublicEvolving;
* @author wenmo
* @since 2021/6/27
**/
@PublicEvolving
public class JobException extends RuntimeException {
public JobException(String message, Throwable cause) {
super(message, cause);
}
......
package com.dlink.exception;
/**
* JobException
*
* @author wenmo
* @since 2021/6/27
**/
public class MetaDataException extends RuntimeException {
public MetaDataException(String message, Throwable cause) {
super(message, cause);
}
public MetaDataException(String message) {
super(message);
}
}
\ No newline at end of file
package com.dlink.exception;
/**
* RunTimeException
*
* @author wenmo
* @since 2021/6/27
**/
public class RunTimeException extends RuntimeException {
public RunTimeException(String message, Throwable cause) {
super(message, cause);
}
public RunTimeException(String message) {
super(message);
}
}
\ No newline at end of file
package com.dlink.exception;
import org.apache.flink.annotation.PublicEvolving;
/**
* SqlException
*
* @author wenmo
* @since 2021/6/22
**/
@PublicEvolving
public class SqlException extends RuntimeException {
public SqlException(String message, Throwable cause) {
super(message, cause);
}
......
package com.dlink.model;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
/**
* Column
*
* @author wenmo
* @since 2021/7/19 23:26
*/
@Setter
@Getter
public class Column implements Serializable {
private static final long serialVersionUID = 6438514547501611599L;
private boolean convert;
private boolean keyFlag;
/**
* 主键是否为自增类型
*/
private boolean keyIdentityFlag;
private String name;
private String type;
private String propertyName;
private String columnType;
private String comment;
private String fill;
private String isNotNull;
private boolean keyWords;
private String columnName;
private String columnFamily;
}
\ No newline at end of file
package com.dlink.model;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/**
* Schema
*
* @author wenmo
* @since 2021/7/19 23:27
*/
@Getter
@Setter
public class Schema implements Serializable, Comparable<Schema> {
private static final long serialVersionUID = 4278304357661271040L;
private String name;
private List<Table> tables = new ArrayList<>();
public Schema(String name) {
this.name = name;
}
public Schema(String name, List<Table> tables) {
this.name = name;
this.tables = tables;
}
@Override
public int compareTo(Schema o) {
return this.name.compareTo(o.getName());
}
}
\ No newline at end of file
package com.dlink.model;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
import java.util.List;
/**
* Table
*
* @author wenmo
* @since 2021/7/19 23:27
*/
@Getter
@Setter
public class Table implements Serializable, Comparable<Table> {
private static final long serialVersionUID = 4209205512472367171L;
private String name;
private String schema;
private String comment;
private List<Column> columns;
public Table() {
}
public Table(String name, String schema, List<Column> columns) {
this.name = name;
this.schema = schema;
this.columns = columns;
}
@Override
public int compareTo(Table o) {
return this.name.compareTo(o.getName());
}
public static Table build(String name) {
return new Table(name, null, null);
}
public static Table build(String name, String schema) {
return new Table(name, schema, null);
}
public static Table build(String name, String schema, List<Column> columns) {
return new Table(name, schema, columns);
}
}
......@@ -21,12 +21,12 @@
<dependencies>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<groupId>com.dlink</groupId>
<artifactId>dlink-common</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
......@@ -37,17 +37,17 @@
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-client-1.12</artifactId>
<scope>provided</scope>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-connector-jdbc</artifactId>
<scope>provided</scope>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-function</artifactId>
<scope>provided</scope>
<!--<scope>provided</scope>-->
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.graph;
import org.apache.flink.annotation.Internal;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.*;
/** Helper class for generating a JSON representation from a {@link StreamGraph}. */
@Internal
public class JSONGenerator {
public static final String STEPS = "step_function";
public static final String ID = "id";
public static final String SIDE = "side";
public static final String SHIP_STRATEGY = "ship_strategy";
public static final String PREDECESSORS = "predecessors";
public static final String TYPE = "type";
public static final String PACT = "pact";
public static final String CONTENTS = "contents";
public static final String PARALLELISM = "parallelism";
private StreamGraph streamGraph;
private final ObjectMapper mapper = new ObjectMapper();
public JSONGenerator(StreamGraph streamGraph) {
this.streamGraph = streamGraph;
}
public String getJSON() {
return getJSONNode().toPrettyString();
}
public ObjectNode getJSONNode() {
ObjectNode json = mapper.createObjectNode();
ArrayNode nodes = mapper.createArrayNode();
json.put("nodes", nodes);
List<Integer> operatorIDs = new ArrayList<>(streamGraph.getVertexIDs());
Comparator<Integer> operatorIDComparator =
Comparator.comparingInt(
(Integer id) -> streamGraph.getSinkIDs().contains(id) ? 1 : 0)
.thenComparingInt(id -> id);
operatorIDs.sort(operatorIDComparator);
visit(nodes, operatorIDs, new HashMap<>());
return json;
}
private void visit(
ArrayNode jsonArray, List<Integer> toVisit, Map<Integer, Integer> edgeRemapings) {
Integer vertexID = toVisit.get(0);
StreamNode vertex = streamGraph.getStreamNode(vertexID);
if (streamGraph.getSourceIDs().contains(vertexID)
|| Collections.disjoint(vertex.getInEdges(), toVisit)) {
ObjectNode node = mapper.createObjectNode();
decorateNode(vertexID, node);
if (!streamGraph.getSourceIDs().contains(vertexID)) {
ArrayNode inputs = mapper.createArrayNode();
node.put(PREDECESSORS, inputs);
for (StreamEdge inEdge : vertex.getInEdges()) {
int inputID = inEdge.getSourceId();
Integer mappedID =
(edgeRemapings.keySet().contains(inputID))
? edgeRemapings.get(inputID)
: inputID;
decorateEdge(inputs, inEdge, mappedID);
}
}
jsonArray.add(node);
toVisit.remove(vertexID);
} else {
Integer iterationHead = -1;
for (StreamEdge inEdge : vertex.getInEdges()) {
int operator = inEdge.getSourceId();
if (streamGraph.vertexIDtoLoopTimeout.containsKey(operator)) {
iterationHead = operator;
}
}
ObjectNode obj = mapper.createObjectNode();
ArrayNode iterationSteps = mapper.createArrayNode();
obj.put(STEPS, iterationSteps);
obj.put(ID, iterationHead);
obj.put(PACT, "IterativeDataStream");
obj.put(PARALLELISM, streamGraph.getStreamNode(iterationHead).getParallelism());
obj.put(CONTENTS, "Stream Iteration");
ArrayNode iterationInputs = mapper.createArrayNode();
obj.put(PREDECESSORS, iterationInputs);
toVisit.remove(iterationHead);
visitIteration(iterationSteps, toVisit, iterationHead, edgeRemapings, iterationInputs);
jsonArray.add(obj);
}
if (!toVisit.isEmpty()) {
visit(jsonArray, toVisit, edgeRemapings);
}
}
private void visitIteration(
ArrayNode jsonArray,
List<Integer> toVisit,
int headId,
Map<Integer, Integer> edgeRemapings,
ArrayNode iterationInEdges) {
Integer vertexID = toVisit.get(0);
StreamNode vertex = streamGraph.getStreamNode(vertexID);
toVisit.remove(vertexID);
// Ignoring head and tail to avoid redundancy
if (!streamGraph.vertexIDtoLoopTimeout.containsKey(vertexID)) {
ObjectNode obj = mapper.createObjectNode();
jsonArray.add(obj);
decorateNode(vertexID, obj);
ArrayNode inEdges = mapper.createArrayNode();
obj.put(PREDECESSORS, inEdges);
for (StreamEdge inEdge : vertex.getInEdges()) {
int inputID = inEdge.getSourceId();
if (edgeRemapings.keySet().contains(inputID)) {
decorateEdge(inEdges, inEdge, inputID);
} else if (!streamGraph.vertexIDtoLoopTimeout.containsKey(inputID)) {
decorateEdge(iterationInEdges, inEdge, inputID);
}
}
edgeRemapings.put(vertexID, headId);
visitIteration(jsonArray, toVisit, headId, edgeRemapings, iterationInEdges);
}
}
private void decorateEdge(ArrayNode inputArray, StreamEdge inEdge, int mappedInputID) {
ObjectNode input = mapper.createObjectNode();
inputArray.add(input);
input.put(ID, mappedInputID);
input.put(SHIP_STRATEGY, inEdge.getPartitioner().toString());
input.put(SIDE, (inputArray.size() == 0) ? "first" : "second");
}
private void decorateNode(Integer vertexID, ObjectNode node) {
StreamNode vertex = streamGraph.getStreamNode(vertexID);
node.put(ID, vertexID);
node.put(TYPE, vertex.getOperatorName());
if (streamGraph.getSourceIDs().contains(vertexID)) {
node.put(PACT, "Data Source");
} else if (streamGraph.getSinkIDs().contains(vertexID)) {
node.put(PACT, "Data Sink");
} else {
node.put(PACT, "Operator");
}
node.put(CONTENTS, vertex.getOperatorName());
node.put(PARALLELISM, streamGraph.getStreamNode(vertexID).getParallelism());
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dlink-metadata</artifactId>
<groupId>com.dlink</groupId>
<version>0.3.0-SANPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dlink-metadata-base</artifactId>
<dependencies>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-common</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.dlink.metadata;
import com.dlink.assertion.Asserts;
import com.dlink.metadata.result.SelectResult;
import com.dlink.model.Column;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.fasterxml.jackson.databind.JsonNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.stream.Collectors;
/**
* AbstractDriver
*
* @author wenmo
* @since 2021/7/19 23:32
*/
public abstract class AbstractDriver implements Driver{
public Logger logger = LoggerFactory.getLogger(this.getClass());
protected DriverConfig config;
public boolean canHandle(String type){
return Asserts.isEqualsIgnoreCase(getType(),type);
}
public Driver setDriverConfig(DriverConfig config) {
this.config = config;
return this;
}
public abstract String getType();
public abstract boolean test();
public abstract Driver connect();
public abstract void close();
public abstract List<Schema> listSchemas();
public abstract List<Table> listTables(String schema);
public abstract List<Column> listColumns(String schema, String table);
public List<Schema> getSchemasAndTables(){
return listSchemas().stream().peek(schema -> schema.setTables(listTables(schema.getName()))).sorted().collect(Collectors.toList());
}
public List<Table> getTablesAndColumns(String schema){
return listTables(schema).stream().peek(table -> table.setColumns(listColumns(schema,table.getName()))).sorted().collect(Collectors.toList());
}
public abstract boolean existTable(Table table);
public abstract boolean createTable(Table table);
public abstract String getCreateTableSql(Table table);
public abstract boolean deleteTable(Table table);
public abstract boolean truncateTable(Table table);
public abstract boolean insert(Table table, JsonNode data);
public abstract boolean update(Table table, JsonNode data);
public abstract boolean delete(Table table, JsonNode data);
public abstract SelectResult select(String sql);
}
\ No newline at end of file
package com.dlink.metadata;
import com.dlink.assertion.Asserts;
import com.dlink.exception.MetaDataException;
import com.dlink.metadata.result.SelectResult;
import com.dlink.model.Column;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.fasterxml.jackson.databind.JsonNode;
import sun.misc.Service;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
/**
* Driver
*
* @author wenmo
* @since 2021/7/19 23:15
*/
public interface Driver {
static Optional<Driver> get(DriverConfig config) {
Asserts.checkNotNull(config, "配置不能为空");
Iterator<Driver> providers = Service.providers(Driver.class);
while (providers.hasNext()) {
Driver gainer = providers.next();
if (gainer.canHandle(config.getType())) {
return Optional.of(gainer.setDriverConfig(config));
}
}
return Optional.empty();
}
static Driver build(DriverConfig config) {
Optional<Driver> optionalDriver = Driver.get(config);
if (!optionalDriver.isPresent()) {
throw new MetaDataException("不支持数据源类型【" + config.getType() + "】");
}
return optionalDriver.get();
}
Driver setDriverConfig(DriverConfig config);
boolean canHandle(String type);
String getType();
boolean test();
Driver connect();
void close();
List<Schema> listSchemas();
List<Table> listTables(String schema);
List<Column> listColumns(String schema, String table);
List<Schema> getSchemasAndTables();
List<Table> getTablesAndColumns(String schema);
boolean existTable(Table table);
boolean createTable(Table table);
boolean deleteTable(Table table);
boolean truncateTable(Table table);
String getCreateTableSql(Table table);
boolean insert(Table table, JsonNode data);
boolean update(Table table, JsonNode data);
boolean delete(Table table, JsonNode data);
SelectResult select(String sql);
}
package com.dlink.metadata;
import lombok.Getter;
import lombok.Setter;
/**
* DriverConfig
*
* @author wenmo
* @since 2021/7/19 23:21
*/
@Getter
@Setter
public class DriverConfig {
private String type;
private String driverClassName;
private String ip;
private Integer port;
private String url;
private String username;
private String password;
}
package com.dlink.metadata.result;
import lombok.Getter;
import lombok.Setter;
import java.util.HashMap;
import java.util.List;
/**
* SelectResult
*
* @author wenmo
* @since 2021/7/19 23:31
*/
@Setter
@Getter
public class SelectResult {
private List<String> columns;
private List<HashMap<String,Object>> datas;
private Integer total;
private Integer page;
private Integer limit;
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dlink</artifactId>
<groupId>com.dlink</groupId>
<version>0.3.0-SANPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dlink-metadata</artifactId>
<packaging>pom</packaging>
<modules>
<module>dlink-metadata-base</module>
</modules>
</project>
\ No newline at end of file
......@@ -17,6 +17,8 @@
<module>dlink-web</module>
<module>dlink-admin</module>
<module>dlink-assembly</module>
<module>dlink-common</module>
<module>dlink-metadata</module>
</modules>
<properties>
......@@ -151,6 +153,11 @@
<artifactId>dlink-function</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-common</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<build>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment