Unverified Commit d0146a4d authored by xiaofeng's avatar xiaofeng Committed by GitHub

Optimized connection pooling and connection creation (#900)

Co-authored-by: 's avatarliuxiaofeng <liuxiaofeng@xcxd.com>
parent c005a002
......@@ -21,6 +21,7 @@
package com.dlink.metadata.driver;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.dlink.assertion.Asserts;
......@@ -48,7 +49,7 @@ public abstract class AbstractJdbcDriver extends AbstractDriver {
private static Logger logger = LoggerFactory.getLogger(AbstractJdbcDriver.class);
protected Connection conn;
protected static ThreadLocal<Connection> conn = new ThreadLocal<>();
private DruidDataSource dataSource;
......@@ -67,13 +68,14 @@ public abstract class AbstractJdbcDriver extends AbstractDriver {
return CommonConstant.HEALTHY;
}
public DruidDataSource getDataSource() throws SQLException {
public DruidDataSource createDataSource() throws SQLException {
if (null == dataSource) {
synchronized (this) {
synchronized (this.getClass()) {
if (null == dataSource) {
this.dataSource = new DruidDataSource();
assembleConfig(config);
dataSource.init();
DruidDataSource ds = new DruidDataSource();
createDataSource(ds, config);
ds.init();
this.dataSource = ds;
}
}
}
......@@ -83,37 +85,39 @@ public abstract class AbstractJdbcDriver extends AbstractDriver {
public Driver setDriverConfig(DriverConfig config) {
this.config = config;
try {
this.dataSource = getDataSource();
this.dataSource = createDataSource();
} catch (SQLException e) {
throw new RuntimeException(e);
}
assembleConfig(config);
createDataSource(dataSource, config);
return this;
}
private void assembleConfig(DriverConfig config) {
dataSource.setName(config.getName());
dataSource.setUrl(config.getUrl());
dataSource.setDriverClassName(getDriverClass());
dataSource.setUsername(config.getUsername());
dataSource.setPassword(config.getPassword());
dataSource.setValidationQuery("select 1");
dataSource.setTestOnBorrow(true);
dataSource.setTestWhileIdle(true);
dataSource.setBreakAfterAcquireFailure(true);
dataSource.setFailFast(true);
dataSource.setInitialSize(1);
dataSource.setMaxActive(8);
dataSource.setMinIdle(5);
private void createDataSource(DruidDataSource ds, DriverConfig config) {
ds.setName(config.getName().replaceAll(":", ""));
ds.setUrl(config.getUrl());
ds.setDriverClassName(getDriverClass());
ds.setUsername(config.getUsername());
ds.setPassword(config.getPassword());
ds.setValidationQuery("select 1");
ds.setTestWhileIdle(true);
ds.setBreakAfterAcquireFailure(true);
ds.setFailFast(true);
ds.setInitialSize(1);
ds.setMaxActive(8);
ds.setMinIdle(5);
}
@Override
public Driver connect() {
try {
Class.forName(getDriverClass());
conn = getDataSource().getConnection();
} catch (ClassNotFoundException | SQLException e) {
throw new RuntimeException(e);
if (null == conn.get()) {
try {
Class.forName(getDriverClass());
DruidPooledConnection connection = createDataSource().getConnection();
conn.set(connection);
} catch (ClassNotFoundException | SQLException e) {
throw new RuntimeException(e);
}
}
return this;
}
......@@ -121,8 +125,8 @@ public abstract class AbstractJdbcDriver extends AbstractDriver {
@Override
public boolean isHealth() {
try {
if (Asserts.isNotNull(conn)) {
return !conn.isClosed();
if (Asserts.isNotNull(conn.get())) {
return !conn.get().isClosed();
}
return false;
} catch (Exception e) {
......@@ -134,8 +138,9 @@ public abstract class AbstractJdbcDriver extends AbstractDriver {
@Override
public void close() {
try {
if (Asserts.isNotNull(conn)) {
conn.close();
if (Asserts.isNotNull(conn.get())) {
conn.get().close();
conn.remove();
}
} catch (SQLException e) {
e.printStackTrace();
......@@ -162,7 +167,7 @@ public abstract class AbstractJdbcDriver extends AbstractDriver {
ResultSet results = null;
String schemasSql = getDBQuery().schemaAllSql();
try {
preparedStatement = conn.prepareStatement(schemasSql);
preparedStatement = conn.get().prepareStatement(schemasSql);
results = preparedStatement.executeQuery();
while (results.next()) {
String schemaName = results.getString(getDBQuery().schemaName());
......@@ -187,7 +192,7 @@ public abstract class AbstractJdbcDriver extends AbstractDriver {
IDBQuery dbQuery = getDBQuery();
String sql = dbQuery.tablesSql(schemaName);
try {
preparedStatement = conn.prepareStatement(sql);
preparedStatement = conn.get().prepareStatement(sql);
results = preparedStatement.executeQuery();
ResultSetMetaData metaData = results.getMetaData();
List<String> columnList = new ArrayList<>();
......@@ -244,7 +249,7 @@ public abstract class AbstractJdbcDriver extends AbstractDriver {
String tableFieldsSql = dbQuery.columnsSql(schemaName, tableName);
tableFieldsSql = String.format(tableFieldsSql, tableName);
try {
preparedStatement = conn.prepareStatement(tableFieldsSql);
preparedStatement = conn.get().prepareStatement(tableFieldsSql);
results = preparedStatement.executeQuery();
ResultSetMetaData metaData = results.getMetaData();
List<String> columnList = new ArrayList<>();
......@@ -352,7 +357,7 @@ public abstract class AbstractJdbcDriver extends AbstractDriver {
ResultSet results = null;
String createTableSql = getDBQuery().createTableSql(table.getSchema(), table.getName());
try {
preparedStatement = conn.prepareStatement(createTableSql);
preparedStatement = conn.get().prepareStatement(createTableSql);
results = preparedStatement.executeQuery();
if (results.next()) {
createTable = results.getString(getDBQuery().createTableName());
......@@ -390,7 +395,8 @@ public abstract class AbstractJdbcDriver extends AbstractDriver {
@Override
public boolean execute(String sql) throws Exception {
Asserts.checkNullString(sql, "Sql 语句为空");
try (Statement statement = conn.createStatement()) {
try (Statement statement = conn.get().createStatement()) {
// logger.info("执行sql的连接id:" + ((DruidPooledConnection) conn).getTransactionInfo().getId());
statement.execute(sql);
}
return true;
......@@ -400,7 +406,7 @@ public abstract class AbstractJdbcDriver extends AbstractDriver {
public int executeUpdate(String sql) throws Exception {
Asserts.checkNullString(sql, "Sql 语句为空");
int res = 0;
try (Statement statement = conn.createStatement()) {
try (Statement statement = conn.get().createStatement()) {
res = statement.executeUpdate(sql);
}
return res;
......@@ -419,7 +425,7 @@ public abstract class AbstractJdbcDriver extends AbstractDriver {
ResultSet results = null;
int count = 0;
try {
preparedStatement = conn.prepareStatement(sql);
preparedStatement = conn.get().prepareStatement(sql);
results = preparedStatement.executeQuery();
if (Asserts.isNull(results)) {
result.setSuccess(true);
......
......@@ -28,6 +28,7 @@ import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.dlink.result.SqlExplainResult;
import javax.sql.DataSource;
import java.util.List;
import java.util.Map;
import java.util.Optional;
......@@ -55,20 +56,32 @@ public interface Driver {
static Driver build(DriverConfig config) {
String key = config.getName();
if (DriverPool.exist(key)) {
Driver driver = DriverPool.get(key);
if (driver.isHealth()) {
return driver;
return getHealthDriver(key);
}
synchronized (Driver.class) {
if (DriverPool.exist(key)) {
return getHealthDriver(key);
}
Optional<Driver> optionalDriver = Driver.get(config);
if (!optionalDriver.isPresent()) {
throw new MetaDataException("缺少数据源类型【" + config.getType() + "】的依赖,请在 lib 下添加对应的扩展依赖");
}
Driver driver = optionalDriver.get().connect();
DriverPool.push(key, driver);
return driver;
}
Optional<Driver> optionalDriver = Driver.get(config);
if (!optionalDriver.isPresent()) {
throw new MetaDataException("缺少数据源类型【" + config.getType() + "】的依赖,请在 lib 下添加对应的扩展依赖");
}
static Driver getHealthDriver(String key) {
Driver driver = DriverPool.get(key);
if (driver.isHealth()) {
return driver;
} else {
return driver.connect();
}
Driver driver = optionalDriver.get().connect();
DriverPool.push(key, driver);
return driver;
}
Driver setDriverConfig(DriverConfig config);
boolean canHandle(String type);
......
......@@ -112,7 +112,7 @@ public class ClickHouseDriver extends AbstractJdbcDriver {
}
continue;
}
preparedStatement = conn.prepareStatement("explain " + current);
preparedStatement = conn.get().prepareStatement("explain " + current);
results = preparedStatement.executeQuery();
while (results.next()) {
explain.append(getTypeConvert().convertValue(results, "explain", "string") + "\r\n");
......
......@@ -106,7 +106,7 @@ public class HiveDriver extends AbstractJdbcDriver implements Driver {
String sql = dbQuery.tablesSql(schemaName);
try {
execute(String.format(HiveConstant.USE_DB, schemaName));
preparedStatement = conn.prepareStatement(sql);
preparedStatement = conn.get().prepareStatement(sql);
results = preparedStatement.executeQuery();
ResultSetMetaData metaData = results.getMetaData();
List<String> columnList = new ArrayList<>();
......@@ -155,7 +155,7 @@ public class HiveDriver extends AbstractJdbcDriver implements Driver {
ResultSet results = null;
String schemasSql = getDBQuery().schemaAllSql();
try {
preparedStatement = conn.prepareStatement(schemasSql);
preparedStatement = conn.get().prepareStatement(schemasSql);
results = preparedStatement.executeQuery();
while (results.next()) {
String schemaName = results.getString(getDBQuery().schemaName());
......@@ -183,7 +183,7 @@ public class HiveDriver extends AbstractJdbcDriver implements Driver {
IDBQuery dbQuery = getDBQuery();
String tableFieldsSql = dbQuery.columnsSql(schemaName, tableName);
try {
preparedStatement = conn.prepareStatement(tableFieldsSql);
preparedStatement = conn.get().prepareStatement(tableFieldsSql);
results = preparedStatement.executeQuery();
ResultSetMetaData metaData = results.getMetaData();
List<String> columnList = new ArrayList<>();
......@@ -227,7 +227,7 @@ public class HiveDriver extends AbstractJdbcDriver implements Driver {
ResultSet results = null;
String createTableSql = getDBQuery().createTableSql(table.getSchema(), table.getName());
try {
preparedStatement = conn.prepareStatement(createTableSql);
preparedStatement = conn.get().prepareStatement(createTableSql);
results = preparedStatement.executeQuery();
while (results.next()) {
createTable.append(results.getString(getDBQuery().createTableName())).append("\n");
......@@ -246,7 +246,7 @@ public class HiveDriver extends AbstractJdbcDriver implements Driver {
Asserts.checkNullString(sql, "Sql 语句为空");
String querySQL = sql.trim().replaceAll(";$", "");
int res = 0;
try (Statement statement = conn.createStatement()) {
try (Statement statement = conn.get().createStatement()) {
res = statement.executeUpdate(querySQL);
}
return res;
......@@ -266,7 +266,7 @@ public class HiveDriver extends AbstractJdbcDriver implements Driver {
int count = 0;
try {
String querySQL = sql.trim().replaceAll(";$", "");
preparedStatement = conn.prepareStatement(querySQL);
preparedStatement = conn.get().prepareStatement(querySQL);
results = preparedStatement.executeQuery();
if (Asserts.isNull(results)) {
result.setSuccess(true);
......
......@@ -30,6 +30,8 @@ import com.dlink.model.Column;
import com.dlink.model.Table;
import org.apache.commons.lang3.StringUtils;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.List;
......@@ -91,9 +93,10 @@ public class PhoenixDriver extends AbstractJdbcDriver {
Properties properties = new Properties();
properties.put("phoenix.schema.isNamespaceMappingEnabled", "true");
properties.put("phoenix.schema.mapSystemTablesToNamespac", "true");
conn = DriverManager.getConnection(config.getUrl(), properties);
Connection connection = DriverManager.getConnection(config.getUrl(), properties);
conn.set(connection);
//设置为自动提交,否则upsert语句不生效
conn.setAutoCommit(true);
connection.setAutoCommit(true);
} catch (ClassNotFoundException | SQLException e) {
throw new RuntimeException(e);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment