Unverified commit 0765de2c authored by iamhungry, committed by GitHub

Whole-database sync with automatic database and table creation -- MySQL implementation (#796)

* Whole-database sync with automatic database and table creation -- MySQL implementation

* Table add clone method

* add sink.auto.create

* add schema.changes

* fix auto.create

* merge fix

* refactor && merge fix

* fix pulsar getKey

* merge to dev & fix

* for debug

* cdcsource merge dev

* fix pulsar getKey

* cdcsource sink mysql auto create database and tables

* fix checkstyle & log

* reset application.yml
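Taken together, these commits let a CDCSOURCE job provision its MySQL sink before the Flink job starts. A minimal sketch of a statement exercising the new options (host names, credentials, and database names are hypothetical; 'sink.auto.create' and 'schema.changes' are the keys introduced in this change, and the 'sink.' prefix convention for sink options is assumed from dlink's existing CDCSOURCE syntax):

    EXECUTE CDCSOURCE demo WITH (
      'connector' = 'mysql-cdc',
      'hostname' = '127.0.0.1',
      'port' = '3306',
      'username' = 'root',
      'password' = '123456',
      'database-name' = 'app',
      'schema.changes' = 'true',
      'sink.connector' = 'jdbc',
      'sink.url' = 'jdbc:mysql://127.0.0.1:3306',
      'sink.username' = 'root',
      'sink.password' = '123456',
      'sink.sink.db' = 'ods_${schemaName}',
      'sink.auto.create' = 'true'
    )

With 'sink.auto.create' = 'true', the operation changed below creates the target database and tables on the sink before streaming begins.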
parent bb0d3cf7
@@ -77,7 +77,7 @@ import org.slf4j.LoggerFactory;
  * @author wenmo
  * @since 2022/4/12 21:28
  **/
-public abstract class AbstractSinkBuilder {
+public abstract class AbstractSinkBuilder implements SinkBuilder {

    protected static final Logger logger = LoggerFactory.getLogger(AbstractSinkBuilder.class);
@@ -307,7 +307,7 @@ public abstract class AbstractSinkBuilder {
        }
    }

-    protected String getSinkSchemaName(Table table) {
+    public String getSinkSchemaName(Table table) {
        String schemaName = table.getSchema();
        if (config.getSink().containsKey("sink.db")) {
            schemaName = config.getSink().get("sink.db");
@@ -315,7 +315,7 @@ public abstract class AbstractSinkBuilder {
        return schemaName;
    }

-    protected String getSinkTableName(Table table) {
+    public String getSinkTableName(Table table) {
        String tableName = table.getName();
        if (config.getSink().containsKey("table.prefix.schema")) {
            if (Boolean.valueOf(config.getSink().get("table.prefix.schema"))) {
......
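For orientation, a hedged sketch of what these resolvers return (table and option values hypothetical; the prefix/suffix logic is collapsed behind the "......" above):

    // Source table app.student, sink option sink.db=ods
    sinkBuilder.getSinkSchemaName(table); // -> "ods": sink.db overrides the source schema
    sinkBuilder.getSinkTableName(table);  // -> "student", or a schema-prefixed name such as
                                          //    "app_student" when table.prefix.schema=true

Making both methods public, and part of the SinkBuilder interface below, lets CreateCDCSourceOperation compute the same sink names when it pre-creates tables.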
@@ -21,6 +21,7 @@ package com.dlink.cdc;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
+import com.dlink.model.Table;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -38,4 +39,8 @@ public interface SinkBuilder {
    SinkBuilder create(FlinkCDCConfig config);

    DataStreamSource build(CDCBuilder cdcBuilder, StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource<String> dataStreamSource);

+    String getSinkSchemaName(Table table);
+
+    String getSinkTableName(Table table);
}
@@ -43,7 +43,7 @@ import java.util.Map;
  * @author wenmo
  * @since 2022/4/20 19:20
  **/
-public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
+public class DorisSinkBuilder extends AbstractSinkBuilder implements Serializable {

    private static final String KEY_WORD = "datastream-doris";
    private static final long serialVersionUID = 8330362249137471854L;
......
@@ -49,7 +49,7 @@ import java.util.Map;
  * @author wenmo
  * @since 2022/4/22 23:50
  */
-public class HudiSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
+public class HudiSinkBuilder extends AbstractSinkBuilder implements Serializable {

    private static final String KEY_WORD = "datastream-hudi";
    private static final long serialVersionUID = 5324199407472847422L;
......
@@ -53,7 +53,7 @@ import java.util.Map;
  * @author wenmo
  * @since 2022/4/12 21:29
  **/
-public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
+public class KafkaSinkBuilder extends AbstractSinkBuilder implements Serializable {

    private static final String KEY_WORD = "datastream-kafka";
......
@@ -84,6 +84,7 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
        String distributionFactorLower = config.getSource().get("chunk-key.even-distribution.factor.lower-bound");
        String distributionFactorUpper = config.getSource().get("chunk-key.even-distribution.factor.upper-bound");
        String scanNewlyAddedTableEnabled = config.getSource().get("scan.newly-added-table.enabled");
+        String schemaChanges = config.getSource().get("schema.changes");

        Properties debeziumProperties = new Properties();
        // Add default values for some of the conversions
@@ -110,6 +111,10 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
                .username(config.getUsername())
                .password(config.getPassword());

+        if (Asserts.isEqualsIgnoreCase(schemaChanges, "true")) {
+            sourceBuilder.includeSchemaChanges(true);
+        }
+
        if (Asserts.isNotNullString(database)) {
            String[] databases = database.split(FlinkParamConstant.SPLIT);
            sourceBuilder.databaseList(databases);
......
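The new schema.changes switch maps onto Flink CDC's includeSchemaChanges option, which makes the source emit DDL events (CREATE/ALTER/DROP) alongside row changes. A minimal standalone sketch of the same option on the Flink CDC builder (connection values hypothetical; flink-cdc 2.x API assumed):

    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

    MySqlSource<String> source = MySqlSource.<String>builder()
            .hostname("127.0.0.1")
            .port(3306)
            .databaseList("app")
            .tableList("app.student")
            .username("root")
            .password("123456")
            // emit schema-change (DDL) records in addition to data changes
            .includeSchemaChanges(true)
            .deserializer(new JsonDebeziumDeserializationSchema())
            .build();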
@@ -75,7 +75,7 @@ import javax.xml.bind.DatatypeConverter;
  * @author wenmo
  * @since 2022/4/25 23:02
  */
-public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
+public class SQLSinkBuilder extends AbstractSinkBuilder implements Serializable {

    private static final String KEY_WORD = "sql";
    private static final long serialVersionUID = -3699685106324048226L;
......
@@ -78,7 +78,7 @@ import org.slf4j.LoggerFactory;
  * @author wenmo
  * @since 2022/4/12 21:28
  **/
-public abstract class AbstractSinkBuilder {
+public abstract class AbstractSinkBuilder implements SinkBuilder {

    protected static final Logger logger = LoggerFactory.getLogger(AbstractSinkBuilder.class);
@@ -342,7 +342,7 @@ public abstract class AbstractSinkBuilder {
        }
    }

-    protected String getSinkSchemaName(Table table) {
+    public String getSinkSchemaName(Table table) {
        String schemaName = table.getSchema();
        if (config.getSink().containsKey("sink.db")) {
            schemaName = config.getSink().get("sink.db");
@@ -350,7 +350,7 @@ public abstract class AbstractSinkBuilder {
        return schemaName;
    }

-    protected String getSinkTableName(Table table) {
+    public String getSinkTableName(Table table) {
        String tableName = table.getName();
        if (config.getSink().containsKey("table.prefix.schema")) {
            if (Boolean.valueOf(config.getSink().get("table.prefix.schema"))) {
......
@@ -21,6 +21,7 @@ package com.dlink.cdc;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
+import com.dlink.model.Table;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -38,4 +39,8 @@ public interface SinkBuilder {
    SinkBuilder create(FlinkCDCConfig config);

    DataStreamSource build(CDCBuilder cdcBuilder, StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource<String> dataStreamSource);

+    String getSinkSchemaName(Table table);
+
+    String getSinkTableName(Table table);
}
@@ -45,7 +45,7 @@ import java.util.Properties;
/**
 * DorisSinkBuilder
 **/
-public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
+public class DorisSinkBuilder extends AbstractSinkBuilder implements Serializable {

    private static final String KEY_WORD = "datastream-doris";
    private static final long serialVersionUID = 8330362249137471854L;
......
@@ -55,7 +55,7 @@ import java.util.Map;
  * @author wenmo
  * @since 2022/4/12 21:29
  **/
-public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
+public class KafkaSinkBuilder extends AbstractSinkBuilder implements Serializable {

    private static final String KEY_WORD = "datastream-kafka";
......
@@ -39,7 +39,7 @@ import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
/**
 * @className: com.dlink.cdc.kafka.KafkaSinkSimpleBuilder
 */
-public class KafkaSinkJsonBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
+public class KafkaSinkJsonBuilder extends AbstractSinkBuilder implements Serializable {

    private static final String KEY_WORD = "datastream-kafka-json";
    private transient ObjectMapper objectMapper;
......
@@ -81,7 +81,8 @@ import javax.xml.bind.DatatypeConverter;
  * @author wenmo
  * @since 2022/4/25 23:02
  */
-public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
+public class SQLSinkBuilder extends AbstractSinkBuilder implements Serializable {

    private static final String KEY_WORD = "sql";
    private static final long serialVersionUID = -3699685106324048226L;
    private static AtomicInteger atomicInteger = new AtomicInteger(0);
......
@@ -35,7 +35,7 @@ import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
 * StarrocksSinkBuilder
 *
 **/
-public class StarrocksSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
+public class StarrocksSinkBuilder extends AbstractSinkBuilder implements Serializable {

    private static final String KEY_WORD = "datastream-starrocks";
    private static final long serialVersionUID = 8330362249137431824L;
......
@@ -74,6 +74,27 @@ public class FlinkCDCConfig {
        this.jdbc = jdbc;
    }

+    public void init(String type, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String database, String schema, String table,
+                     String startupMode,
+                     Map<String, String> split, Map<String, String> debezium, Map<String, String> source, Map<String, String> sink, Map<String, String> jdbc) {
+        this.type = type;
+        this.hostname = hostname;
+        this.port = port;
+        this.username = username;
+        this.password = password;
+        this.checkpoint = checkpoint;
+        this.parallelism = parallelism;
+        this.database = database;
+        this.schema = schema;
+        this.table = table;
+        this.startupMode = startupMode;
+        this.split = split;
+        this.debezium = debezium;
+        this.source = source;
+        this.sink = sink;
+        this.jdbc = jdbc;
+    }

    public String getType() {
        return type;
    }
@@ -177,6 +198,7 @@ public class FlinkCDCConfig {
    private boolean skip(String key) {
        switch (key) {
            case "sink.db":
+            case "auto.create":
            case "table.prefix":
            case "table.suffix":
            case "table.upper":
......
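skip() filters dlink-only options, now including the new auto.create key, out of the sink map before the remaining keys are forwarded to the actual Flink connector; auto.create drives table creation inside CreateCDCSourceOperation but means nothing to a connector. A hedged sketch of the calling pattern (the concrete caller is collapsed in this diff, so the loop below is illustrative only):

    StringBuilder sinkOptions = new StringBuilder();
    for (Map.Entry<String, String> entry : config.getSink().entrySet()) {
        if (skip(entry.getKey())) {
            continue; // dlink-internal key: consumed by dlink itself, never forwarded
        }
        sinkOptions.append(String.format("'%s' = '%s',%n", entry.getKey(), entry.getValue()));
    }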
@@ -41,7 +41,7 @@ import lombok.Setter;
 */
@Getter
@Setter
-public class Table implements Serializable, Comparable<Table> {
+public class Table implements Serializable, Comparable<Table>, Cloneable {

    private static final long serialVersionUID = 4209205512472367171L;
@@ -263,4 +263,15 @@ public class Table implements Serializable, Comparable<Table> {
        sb.append(sourceName);
        return sb.toString();
    }

+    @Override
+    public Object clone() {
+        Table table = null;
+        try {
+            table = (Table) super.clone();
+        } catch (CloneNotSupportedException e) {
+            e.printStackTrace();
+        }
+        return table;
+    }
}
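Note that Object.clone() is shallow: the cloned Table shares its columns list (and any other mutable fields) with the original. That is sufficient for this change, which only reassigns scalar fields on the copy, as in this hedged sketch mirroring the CreateCDCSourceOperation code further down (names hypothetical):

    Table sinkTable = (Table) table.clone();
    sinkTable.setSchema("ods");   // safe: overwrites a field on the copy only
    sinkTable.setName("student"); // safe for the same reason
    // NOT safe without a deep copy: sinkTable.getColumns().add(...) would also
    // mutate the source table's column list, since the list itself is shared.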
@@ -19,10 +19,12 @@
package com.dlink.trans.ddl;

+import static com.dlink.cdc.SinkBuilderFactory.buildSinkBuilder;

import com.dlink.assertion.Asserts;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.CDCBuilderFactory;
-import com.dlink.cdc.SinkBuilderFactory;
+import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.Executor;
import com.dlink.metadata.driver.Driver;
import com.dlink.metadata.driver.DriverConfig;
@@ -32,6 +34,7 @@ import com.dlink.model.Table;
import com.dlink.trans.AbstractOperation;
import com.dlink.trans.Operation;
import com.dlink.utils.SplitUtil;
+import com.dlink.utils.SqlUtil;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -81,6 +84,7 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
            CDCBuilder cdcBuilder = CDCBuilderFactory.buildCDCBuilder(config);
            Map<String, Map<String, String>> allConfigMap = cdcBuilder.parseMetaDataConfigs();
            config.setSchemaFieldName(cdcBuilder.getSchemaFieldName());
+            SinkBuilder sinkBuilder = buildSinkBuilder(config);
            List<Schema> schemaList = new ArrayList<>();
            final List<String> schemaNameList = cdcBuilder.getSchemaList();
            final List<String> tableRegList = cdcBuilder.getTableList();
@@ -92,6 +96,8 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
                // Pass the regex straight through here
                schemaTableNameList.addAll(tableRegList.stream().map(x -> x.replaceFirst("\\\\.", ".")).collect(Collectors.toList()));

+                Driver sinkDriver = checkAndCreateSinkSchema(config, schemaTableNameList.get(0));

                Set<Table> tables = driver.getSplitTables(tableRegList, cdcSource.getSplit());

                for (Table table : tables) {
@@ -105,14 +111,23 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
-                        table.setColumns(driver.listColumnsSortByPK(schemaName, tableName));
+                        table.setColumns(driver.listColumnsSortByPK(schemaName, table.getName()));
                        schemaList.add(schema);
+                    }
+                    if (null != sinkDriver) {
+                        Table sinkTable = (Table) table.clone();
+                        sinkTable.setSchema(sinkBuilder.getSinkSchemaName(table));
+                        sinkTable.setName(sinkBuilder.getSinkTableName(table));
+                        checkAndCreateSinkTable(sinkDriver, sinkTable);
+                    }
+                }
            } else {
                for (String schemaName : schemaNameList) {
                    Schema schema = Schema.build(schemaName);
                    if (!allConfigMap.containsKey(schemaName)) {
                        continue;
                    }
+                    Driver sinkDriver = checkAndCreateSinkSchema(config, schemaName);
                    DriverConfig driverConfig = DriverConfig.build(allConfigMap.get(schemaName));
                    Driver driver = Driver.build(driverConfig);
                    final List<Table> tables = driver.listTables(schemaName);
@@ -134,6 +149,15 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
                            }
                        }
                    }
+                    if (null != sinkDriver) {
+                        for (Table table : schema.getTables()) {
+                            Table sinkTable = (Table) table.clone();
+                            sinkTable.setSchema(sinkBuilder.getSinkSchemaName(table));
+                            sinkTable.setName(sinkBuilder.getSinkTableName(table));
+                            checkAndCreateSinkTable(sinkDriver, sinkTable);
+                        }
+                    }
                    schemaList.add(schema);
                }
            }
@@ -156,11 +180,11 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
            DataStreamSource<String> streamSource = cdcBuilder.build(streamExecutionEnvironment);
            logger.info("Build " + config.getType() + " successful...");
            if (cdcSource.getSinks() == null || cdcSource.getSinks().size() == 0) {
-                SinkBuilderFactory.buildSinkBuilder(config).build(cdcBuilder, streamExecutionEnvironment, executor.getCustomTableEnvironment(), streamSource);
+                sinkBuilder.build(cdcBuilder, streamExecutionEnvironment, executor.getCustomTableEnvironment(), streamSource);
            } else {
                for (Map<String, String> sink : cdcSource.getSinks()) {
                    config.setSink(sink);
-                    SinkBuilderFactory.buildSinkBuilder(config).build(cdcBuilder, streamExecutionEnvironment, executor.getCustomTableEnvironment(), streamSource);
+                    sinkBuilder.build(cdcBuilder, streamExecutionEnvironment, executor.getCustomTableEnvironment(), streamSource);
                }
            }
            logger.info("Build CDCSOURCE Task successful!");
@@ -170,4 +194,26 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
        return null;
    }

+    Driver checkAndCreateSinkSchema(FlinkCDCConfig config, String schemaName) throws Exception {
+        Map<String, String> sink = config.getSink();
+        String autoCreate = sink.get("auto.create");
+        if (!Asserts.isEqualsIgnoreCase(autoCreate, "true") || Asserts.isNullString(schemaName)) {
+            return null;
+        }
+        String url = sink.get("url");
+        String schema = SqlUtil.replaceAllParam(sink.get("sink.db"), "schemaName", schemaName);
+        Driver driver = Driver.build(sink.get("connector"), url, sink.get("username"), sink.get("password"));
+        if (null != driver && !driver.existSchema(schema)) {
+            driver.createSchema(schema);
+        }
+        sink.put("sink.db", schema);
+        sink.put("url", url + "/" + schema);
+        return driver;
+    }

+    void checkAndCreateSinkTable(Driver driver, Table table) throws Exception {
+        if (null != driver && !driver.existTable(table)) {
+            driver.generateCreateTable(table);
+        }
+    }
}
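The two helpers implement the auto-create flow: checkAndCreateSinkSchema resolves the sink database name (sink.db may contain a ${schemaName} placeholder that SqlUtil.replaceAllParam substitutes with the source schema), creates the database if absent, and rewrites the sink url so later connections land in it; checkAndCreateSinkTable then issues the generated DDL per table. A hedged walk-through with hypothetical values:

    // sink options: auto.create=true, connector=jdbc,
    //               url=jdbc:mysql://127.0.0.1:3306, sink.db=ods_${schemaName}
    Driver sinkDriver = checkAndCreateSinkSchema(config, "app");
    // -> resolves sink.db to "ods_app", runs CREATE DATABASE ods_app if it is
    //    missing (see the AbstractJdbcDriver hunk below), and rewrites url to
    //    jdbc:mysql://127.0.0.1:3306/ods_app
    checkAndCreateSinkTable(sinkDriver, sinkTable);
    // -> executes the CREATE TABLE IF NOT EXISTS statement generated by the
    //    MySqlDriver override further down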
@@ -73,9 +73,9 @@ import com.alibaba.druid.sql.ast.SQLStatement;
 **/
public abstract class AbstractJdbcDriver extends AbstractDriver {

-    private static Logger logger = LoggerFactory.getLogger(AbstractJdbcDriver.class);
+    protected static Logger logger = LoggerFactory.getLogger(AbstractJdbcDriver.class);

-    protected static ThreadLocal<Connection> conn = new ThreadLocal<>();
+    protected ThreadLocal<Connection> conn = new ThreadLocal<>();

    private DruidDataSource dataSource;
@@ -211,6 +211,28 @@ public abstract class AbstractJdbcDriver extends AbstractDriver {
        return schemas;
    }

+    @Override
+    public boolean existSchema(String schemaName) {
+        return listSchemas().stream().anyMatch(schemaItem -> Asserts.isEquals(schemaItem.getName(), schemaName));
+    }

+    @Override
+    public boolean createSchema(String schemaName) throws Exception {
+        String sql = generateCreateSchemaSql(schemaName).replaceAll("\r\n", " ");
+        if (Asserts.isNotNull(sql)) {
+            return execute(sql);
+        } else {
+            return false;
+        }
+    }

+    @Override
+    public String generateCreateSchemaSql(String schemaName) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("CREATE DATABASE ").append(schemaName);
+        return sb.toString();
+    }

    @Override
    public List<Table> listTables(String schemaName) {
        List<Table> tableList = new ArrayList<>();
@@ -357,6 +379,16 @@ public abstract class AbstractJdbcDriver extends AbstractDriver {
        }
    }

+    @Override
+    public boolean generateCreateTable(Table table) throws Exception {
+        String sql = generateCreateTableSql(table).replaceAll("\r\n", " ");
+        if (Asserts.isNotNull(sql)) {
+            return execute(sql);
+        } else {
+            return false;
+        }
+    }

    @Override
    public boolean dropTable(Table table) throws Exception {
        String sql = getDropTableSql(table).replaceAll("\r\n", " ");
@@ -430,6 +462,13 @@ public abstract class AbstractJdbcDriver extends AbstractDriver {
        return sb.toString();
    }

+    // todo: base implementation returns an empty string; subclasses such as
+    // MySqlDriver override this to generate real DDL
+    @Override
+    public String generateCreateTableSql(Table table) {
+        StringBuilder sb = new StringBuilder();
+        return sb.toString();
+    }

    @Override
    public boolean execute(String sql) throws Exception {
        Asserts.checkNullString(sql, "The SQL statement is empty");
......
@@ -60,9 +60,6 @@ public interface Driver {
            return getHealthDriver(key);
        }
        synchronized (Driver.class) {
-            if (DriverPool.exist(key)) {
-                return getHealthDriver(key);
-            }
            Optional<Driver> optionalDriver = Driver.get(config);
            if (!optionalDriver.isPresent()) {
                throw new MetaDataException("Missing dependency for data source type [" + config.getType() + "]; add the corresponding extension under lib");
@@ -82,6 +79,36 @@ public interface Driver {
        }
    }
+    static Driver build(String connector, String url, String username, String password) {
+        String type = null;
+        if (Asserts.isEqualsIgnoreCase(connector, "doris")) {
+            type = "Doris";
+        } else if (Asserts.isEqualsIgnoreCase(connector, "starrocks")) {
+            type = "StarRocks";
+        } else if (Asserts.isEqualsIgnoreCase(connector, "clickhouse")) {
+            type = "ClickHouse";
+        } else if (Asserts.isEqualsIgnoreCase(connector, "jdbc")) {
+            if (url.startsWith("jdbc:mysql")) {
+                type = "MySQL";
+            } else if (url.startsWith("jdbc:postgresql")) {
+                type = "PostgreSql";
+            } else if (url.startsWith("jdbc:oracle")) {
+                type = "Oracle";
+            } else if (url.startsWith("jdbc:sqlserver")) {
+                type = "SQLServer";
+            } else if (url.startsWith("jdbc:phoenix")) {
+                type = "Phoenix";
+            } else if (url.startsWith("jdbc:pivotal")) {
+                type = "Greenplum";
+            }
+        }
+        if (Asserts.isNull(type)) {
+            throw new MetaDataException("Missing data source type: [" + connector + "]");
+        }
+        DriverConfig driverConfig = new DriverConfig(url, type, url, username, password);
+        return build(driverConfig);
+    }
    Driver setDriverConfig(DriverConfig config);

    boolean canHandle(String type);
@@ -100,6 +127,12 @@ public interface Driver {
    List<Schema> listSchemas();

+    boolean existSchema(String schemaName);
+
+    boolean createSchema(String schemaName) throws Exception;
+
+    String generateCreateSchemaSql(String schemaName);

    List<Table> listTables(String schemaName);

    List<Column> listColumns(String schemaName, String tableName);
@@ -116,6 +149,8 @@ public interface Driver {
    boolean createTable(Table table) throws Exception;

+    boolean generateCreateTable(Table table) throws Exception;

    boolean dropTable(Table table) throws Exception;

    boolean truncateTable(Table table) throws Exception;
@@ -126,6 +161,8 @@ public interface Driver {
    String getTruncateTableSql(Table table);

+    String generateCreateTableSql(Table table);

    /* boolean insert(Table table, JsonNode data);
    boolean update(Table table, JsonNode data);
......
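A hedged usage sketch of the new overload (connection values hypothetical): a 'jdbc' connector is narrowed by url prefix, so a MySQL JDBC url resolves to the MySQL metadata driver, which CreateCDCSourceOperation then uses to provision the sink:

    Driver sinkDriver = Driver.build("jdbc", "jdbc:mysql://127.0.0.1:3306", "root", "123456");
    if (!sinkDriver.existSchema("ods_app")) {
        sinkDriver.createSchema("ods_app"); // executes the generated CREATE DATABASE statement
    }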
@@ -19,10 +19,13 @@
package com.dlink.metadata.driver;

+import com.dlink.assertion.Asserts;
import com.dlink.metadata.convert.ITypeConvert;
import com.dlink.metadata.convert.MySqlTypeConvert;
import com.dlink.metadata.query.IDBQuery;
import com.dlink.metadata.query.MySqlQuery;
+import com.dlink.model.Column;
+import com.dlink.model.Table;

import java.util.HashMap;
import java.util.Map;
@@ -69,4 +72,74 @@ public class MySqlDriver extends AbstractJdbcDriver {
        map.put("DATETIME", "TIMESTAMP");
        return map;
    }
+    @Override
+    public String generateCreateTableSql(Table table) {
+        StringBuilder key = new StringBuilder();
+        StringBuilder sb = new StringBuilder();
+        sb.append("CREATE TABLE IF NOT EXISTS ").append(table.getSchemaTableName()).append(" (\n");
+        for (int i = 0; i < table.getColumns().size(); i++) {
+            Column column = table.getColumns().get(i);
+            sb.append("    `")
+                .append(column.getName()).append("` ")
+                .append(column.getType());
+            // todo: temporary handling for varchar columns without a recorded
+            // length; columns with a recorded precision fall through to the
+            // precision/scale branch instead of getting both lengths
+            if (Asserts.isEqualsIgnoreCase(column.getType(), "varchar") && column.getPrecision() <= 0) {
+                sb.append("(255)");
+            } else if (column.getPrecision() > 0) {
+                sb.append("(").append(column.getPrecision());
+                if (column.getScale() > 0) {
+                    sb.append(",").append(column.getScale());
+                }
+                sb.append(")");
+            }
+            if (Asserts.isNotNull(column.getCharacterSet())) {
+                sb.append(" CHARACTER SET ").append(column.getCharacterSet());
+            }
+            if (Asserts.isNotNull(column.getCollation())) {
+                sb.append(" COLLATE ").append(column.getCollation());
+            }
+            if (Asserts.isNotNull(column.getDefaultValue())) {
+                sb.append(" DEFAULT ").append(column.getDefaultValue());
+            } else {
+                if (!column.isNullable()) {
+                    sb.append(" NOT");
+                }
+                sb.append(" NULL");
+            }
+            if (column.isAutoIncrement()) {
+                sb.append(" AUTO_INCREMENT");
+            }
+            if (Asserts.isNotNullString(column.getComment())) {
+                sb.append(" COMMENT '").append(column.getComment()).append("'");
+            }
+            if (column.isKeyFlag()) {
+                key.append("`").append(column.getName()).append("`,");
+            }
+            // a trailing comma is only valid when another column or the PRIMARY
+            // KEY clause follows (the original "i < size()" test appended one
+            // after the last column even when no key exists)
+            if (i < table.getColumns().size() - 1 || key.length() > 0) {
+                sb.append(",");
+            }
+            sb.append("\n");
+        }
+        if (key.length() > 0) {
+            sb.append("    PRIMARY KEY (");
+            sb.append(key.substring(0, key.length() - 1));
+            sb.append(")\n");
+        }
+        sb.append(")");
+        // guard against "ENGINE=null" when no engine is recorded for the table
+        if (Asserts.isNotNullString(table.getEngine())) {
+            sb.append("\n ENGINE=").append(table.getEngine());
+        }
+        if (Asserts.isNotNullString(table.getOptions())) {
+            sb.append(" ").append(table.getOptions());
+        }
+        if (Asserts.isNotNullString(table.getComment())) {
+            sb.append(" COMMENT='").append(table.getComment()).append("'");
+        }
+        sb.append(";");
+        logger.info("Auto generateCreateTableSql {}", sb);
+        return sb.toString();
+    }
}
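For reference, a hedged example of the DDL shape this produces for a small table (identifiers hypothetical; assumes an int primary key with no recorded precision and a varchar column with no recorded length):

    CREATE TABLE IF NOT EXISTS ods_app.student (
        `id` int NOT NULL,
        `name` varchar(255) NULL,
        PRIMARY KEY (`id`)
    )
     ENGINE=InnoDB;

generateCreateTable strips "\r\n" before execution, although the builder above only emits "\n"; the statement runs correctly either way, since MySQL accepts multi-line DDL.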
@@ -63,4 +63,11 @@ public class PostgreSqlDriver extends AbstractJdbcDriver {
    public Map<String, String> getFlinkColumnTypeConversion() {
        return new HashMap<>();
    }

+    @Override
+    public String generateCreateSchemaSql(String schemaName) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("CREATE SCHEMA ").append(schemaName);
+        return sb.toString();
+    }
}