Unverified Commit 1ba9edf5 authored by Kerwin's avatar Kerwin Committed by GitHub

Added dlink-catalog module code style. (#895)

parent 4eb1afc1
......@@ -17,12 +17,13 @@
*
*/
package com.dlink.flink.catalog;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
......@@ -72,8 +73,6 @@ import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions;
/**
* 自定义 catalog
* 检查connection done.
......@@ -100,7 +99,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
/**
* 判断是否发生过SQL异常,如果发生过,那么conn可能失效。要注意判断
*/
private boolean SQLExceptionHappened = false;
private boolean sqlExceptionHappened = false;
/**
* 对象类型,例如 库、表、视图等
......@@ -202,7 +201,6 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
this.pwd = pwd;
}
public DlinkMysqlCatalog(String name) {
super(name, defaultDatabase);
this.url = DlinkMysqlCatalogFactoryOptions.URL.defaultValue();
......@@ -232,7 +230,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
connection.close();
connection = null;
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException("Fail to close connection.", e);
}
}
......@@ -247,8 +245,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
if (connection == null) {
connection = DriverManager.getConnection(url, user, pwd);
}
if (SQLExceptionHappened) {
SQLExceptionHappened = false;
if (sqlExceptionHappened) {
sqlExceptionHappened = false;
if (!connection.isValid(10)) {
connection.close();
}
......@@ -289,8 +287,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
@Override
public CatalogDatabase getDatabase(String databaseName)
throws DatabaseNotExistException, CatalogException {
String querySql = "SELECT id, database_name,description " +
" FROM metadata_database where database_name=?";
String querySql = "SELECT id, database_name,description FROM metadata_database where database_name=?";
Connection conn = getConnection();
try (PreparedStatement ps = conn.prepareStatement(querySql)) {
ps.setString(1, databaseName);
......@@ -302,9 +299,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
Map<String, String> map = new HashMap<>();
String sql = "select `key`,`value` " +
"from metadata_database_property " +
"where database_id=? ";
String sql = "select `key`,`value` from metadata_database_property where database_id=? ";
try (PreparedStatement pStat = conn.prepareStatement(sql)) {
pStat.setInt(1, id);
ResultSet prs = pStat.executeQuery();
......@@ -312,7 +307,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
map.put(rs.getString("key"), rs.getString("value"));
}
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException(
String.format("Failed get database properties in catalog %s", getName()), e);
}
......@@ -322,7 +317,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
throw new DatabaseNotExistException(getName(), databaseName);
}
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException(
String.format("Failed get database in catalog %s", getName()), e);
}
......@@ -351,7 +346,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
return id;
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException(String.format("获取 database 信息失败:%s.%s", getName(), databaseName), e);
}
}
......@@ -380,8 +375,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
ResultSet idRs = stat.getGeneratedKeys();
if (idRs.next() && db.getProperties() != null && db.getProperties().size() > 0) {
int id = idRs.getInt(1);
String propInsertSql = "insert into metadata_database_property(database_id, " +
"`key`,`value`) values (?,?,?)";
String propInsertSql = "insert into metadata_database_property(database_id, `key`,`value`) values (?,?,?)";
PreparedStatement pstat = conn.prepareStatement(propInsertSql);
for (Map.Entry<String, String> entry : db.getProperties().entrySet()) {
pstat.setInt(1, id);
......@@ -394,7 +388,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
conn.commit();
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
logger.error("创建 database 信息失败:", e);
}
}
......@@ -446,7 +440,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
dStat.close();
conn.commit();
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException("删除 database 信息失败:", e);
}
}
......@@ -476,9 +470,9 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
uState.executeUpdate();
uState.close();
if (newDb.getProperties() != null && newDb.getProperties().size() > 0) {
String upsertSql = "insert into metadata_database_property (database_id, `key`,`value`) \n" +
"values (?,?,?)\n" +
"on duplicate key update `value` =?, update_time = sysdate()\n";
String upsertSql = "insert into metadata_database_property (database_id, `key`,`value`) \n"
+ "values (?,?,?)\n"
+ "on duplicate key update `value` =?, update_time = sysdate()\n";
PreparedStatement pstat = conn.prepareStatement(upsertSql);
for (Map.Entry<String, String> entry : newDb.getProperties().entrySet()) {
pstat.setInt(1, id);
......@@ -492,7 +486,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
conn.commit();
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException("修改 database 信息失败:", e);
}
}
......@@ -552,10 +546,10 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
Connection conn = getConnection();
try {
String queryTable = "SELECT table_name " +
" ,description, table_type " +
" FROM metadata_table " +
" where id=?";
String queryTable = "SELECT table_name "
+ " ,description, table_type "
+ " FROM metadata_table "
+ " where id=?";
PreparedStatement ps = conn.prepareStatement(queryTable);
ps.setInt(1, id);
ResultSet rs = ps.executeQuery();
......@@ -571,8 +565,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
if (tableType.equals(ObjectType.TABLE)) {
// 这个是 table
String propSql = "SELECT `key`, `value` from metadata_table_property " +
"WHERE table_id=?";
String propSql = "SELECT `key`, `value` from metadata_table_property "
+ "WHERE table_id=?";
PreparedStatement pState = conn.prepareStatement(propSql);
pState.setInt(1, id);
ResultSet prs = pState.executeQuery();
......@@ -588,9 +582,9 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
} else if (tableType.equals(ObjectType.VIEW)) {
// 1、从库中取出table信息。(前面已做)
// 2、取出字段。
String colSql = "SELECT column_name, column_type, data_type, description " +
" FROM metadata_column WHERE " +
" table_id=?";
String colSql = "SELECT column_name, column_type, data_type, description "
+ " FROM metadata_column WHERE "
+ " table_id=?";
PreparedStatement cStat = conn.prepareStatement(colSql);
cStat.setInt(1, id);
ResultSet crs = cStat.executeQuery();
......@@ -604,8 +598,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
cStat.close();
// 3、取出query
String qSql = "SELECT `key`, value FROM metadata_table_property" +
" WHERE table_id=? ";
String qSql = "SELECT `key`, value FROM metadata_table_property"
+ " WHERE table_id=? ";
PreparedStatement qStat = conn.prepareStatement(qSql);
qStat.setInt(1, id);
ResultSet qrs = qStat.executeQuery();
......@@ -630,7 +624,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
throw new CatalogException("不支持的数据类型。" + tableType);
}
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException("获取 表信息失败。", e);
}
......@@ -648,8 +642,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
return null;
}
// 获取id
String getIdSql = "select id from metadata_table " +
" where table_name=? and database_id=?";
String getIdSql = "select id from metadata_table "
+ " where table_name=? and database_id=?";
Connection conn = getConnection();
try (PreparedStatement gStat = conn.prepareStatement(getIdSql)) {
gStat.setString(1, tablePath.getObjectName());
......@@ -659,7 +653,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
return rs.getInt(1);
}
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
logger.error("get table fail", e);
throw new CatalogException("get table fail.", e);
}
......@@ -678,27 +672,27 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
try {
// todo: 现在是真实删除,后续设计是否做记录保留。
conn.setAutoCommit(false);
String deletePropSql = "delete from metadata_table_property " +
" where table_id=?";
String deletePropSql = "delete from metadata_table_property "
+ " where table_id=?";
PreparedStatement dStat = conn.prepareStatement(deletePropSql);
dStat.setInt(1, id);
dStat.executeUpdate();
dStat.close();
String deleteColSql = "delete from metadata_column " +
" where table_id=?";
String deleteColSql = "delete from metadata_column "
+ " where table_id=?";
dStat = conn.prepareStatement(deleteColSql);
dStat.setInt(1, id);
dStat.executeUpdate();
dStat.close();
String deleteDbSql = "delete from metadata_table " +
" where id=?";
String deleteDbSql = "delete from metadata_table "
+ " where id=?";
dStat = conn.prepareStatement(deleteDbSql);
dStat.setInt(1, id);
dStat.executeUpdate();
dStat.close();
conn.commit();
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
logger.error("drop table fail", e);
throw new CatalogException("drop table fail.", e);
}
......@@ -723,7 +717,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
ps.setInt(2, id);
ps.executeUpdate();
} catch (SQLException ex) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException("修改表名失败", ex);
}
}
......@@ -731,8 +725,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
Integer db_id = getDatabaseId(tablePath.getDatabaseName());
if (null == db_id) {
Integer dbId = getDatabaseId(tablePath.getDatabaseName());
if (null == dbId) {
throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName());
}
if (tableExists(tablePath)) {
......@@ -754,16 +748,16 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
// 首先插入表信息
CatalogBaseTable.TableKind kind = table.getTableKind();
String insertSql = "insert into metadata_table(\n" +
" table_name," +
" table_type," +
" database_id," +
" description)" +
" values(?,?,?,?)";
String insertSql = "insert into metadata_table(\n"
+ " table_name,"
+ " table_type,"
+ " database_id,"
+ " description)"
+ " values(?,?,?,?)";
PreparedStatement iStat = conn.prepareStatement(insertSql, Statement.RETURN_GENERATED_KEYS);
iStat.setString(1, tablePath.getObjectName());
iStat.setString(2, kind.toString());
iStat.setInt(3, db_id);
iStat.setInt(3, dbId);
iStat.setString(4, table.getComment());
iStat.executeUpdate();
ResultSet idRs = iStat.getGeneratedKeys();
......@@ -777,8 +771,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
if (table instanceof ResolvedCatalogTable) {
// table 就可以直接拿properties了。
Map<String, String> props = ((ResolvedCatalogTable) table).toProperties();
String propInsertSql = "insert into metadata_table_property(table_id," +
"`key`,`value`) values (?,?,?)";
String propInsertSql = "insert into metadata_table_property(table_id,"
+ "`key`,`value`) values (?,?,?)";
PreparedStatement pStat = conn.prepareStatement(propInsertSql);
for (Map.Entry<String, String> entry : props.entrySet()) {
pStat.setInt(1, id);
......@@ -795,13 +789,13 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
ResolvedCatalogView view = (ResolvedCatalogView) table;
List<Schema.UnresolvedColumn> cols = view.getUnresolvedSchema().getColumns();
if (cols.size() > 0) {
String colInsertSql = "insert into metadata_column(" +
" column_name, column_type, data_type" +
" , `expr`" +
" , description" +
" , table_id" +
" , `primary`) " +
" values(?,?,?,?,?,?,?)";
String colInsertSql = "insert into metadata_column("
+ " column_name, column_type, data_type"
+ " , `expr`"
+ " , description"
+ " , table_id"
+ " , `primary`) "
+ " values(?,?,?,?,?,?,?)";
PreparedStatement colIStat = conn.prepareStatement(colInsertSql);
for (Schema.UnresolvedColumn col : cols) {
if (col instanceof Schema.UnresolvedPhysicalColumn) {
......@@ -840,8 +834,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
option.put("OriginalQuery", view.getOriginalQuery());
option.put("ExpandedQuery", view.getExpandedQuery());
String propInsertSql = "insert into metadata_table_property(table_id," +
"`key`,`value`) values (?,?,?)";
String propInsertSql = "insert into metadata_table_property(table_id,"
+ "`key`,`value`) values (?,?,?)";
PreparedStatement pStat = conn.prepareStatement(propInsertSql);
for (Map.Entry<String, String> entry : option.entrySet()) {
pStat.setInt(1, id);
......@@ -855,7 +849,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
conn.commit();
} catch (SQLException ex) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
logger.error("插入数据库失败", ex);
throw new CatalogException("插入数据库失败", ex);
}
......@@ -872,9 +866,9 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
Map<String, String> opts = newTable.getOptions();
if (opts != null && opts.size() > 0) {
String updateSql = "INSERT INTO metadata_table_property(table_id," +
"`key`,`value`) values (?,?,?) " +
"on duplicate key update `value` =?, update_time = sysdate()";
String updateSql = "INSERT INTO metadata_table_property(table_id,"
+ "`key`,`value`) values (?,?,?) "
+ "on duplicate key update `value` =?, update_time = sysdate()";
Connection conn = getConnection();
try (PreparedStatement ps = conn.prepareStatement(updateSql)) {
for (Map.Entry<String, String> entry : opts.entrySet()) {
......@@ -886,7 +880,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
ps.executeBatch();
} catch (SQLException ex) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException("修改表名失败", ex);
}
}
......@@ -931,19 +925,19 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
@Override
public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
// todo: 补充完成该方法。
// todo: 补充完成该方法。
throw new UnsupportedOperationException("该方法尚未完成");
}
@Override
public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
// todo: 补充完成该方法。
// todo: 补充完成该方法。
throw new UnsupportedOperationException("该方法尚未完成");
}
@Override
public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
// todo: 补充完成该方法。
// todo: 补充完成该方法。
throw new UnsupportedOperationException("该方法尚未完成");
}
......@@ -955,8 +949,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
if (null == dbId) {
throw new DatabaseNotExistException(getName(), dbName);
}
String querySql = "SELECT function_name from metadata_function " +
" WHERE database_id=?";
String querySql = "SELECT function_name from metadata_function "
+ " WHERE database_id=?";
Connection conn = getConnection();
try (PreparedStatement gStat = conn.prepareStatement(querySql)) {
......@@ -969,7 +963,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
return functions;
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException("获取 UDF 列表失败");
}
}
......@@ -982,8 +976,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
throw new FunctionNotExistException(getName(), functionPath);
}
String querySql = "SELECT class_name,function_language from metadata_function " +
" WHERE id=?";
String querySql = "SELECT class_name,function_language from metadata_function "
+ " WHERE id=?";
Connection conn = getConnection();
try (PreparedStatement gStat = conn.prepareStatement(querySql)) {
gStat.setInt(1, id);
......@@ -997,7 +991,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
throw new FunctionNotExistException(getName(), functionPath);
}
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException("获取 UDF 失败:"
+ functionPath.getDatabaseName() + "."
+ functionPath.getObjectName());
......@@ -1016,8 +1010,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
return null;
}
// 获取id
String getIdSql = "select id from metadata_function " +
" where function_name=? and database_id=?";
String getIdSql = "select id from metadata_function "
+ " where function_name=? and database_id=?";
Connection conn = getConnection();
try (PreparedStatement gStat = conn.prepareStatement(getIdSql)) {
gStat.setString(1, functionPath.getObjectName());
......@@ -1028,7 +1022,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
return id;
}
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
logger.error("get function fail", e);
throw new CatalogException("get function fail.", e);
}
......@@ -1049,9 +1043,9 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
Connection conn = getConnection();
String insertSql = "Insert into metadata_function " +
"(function_name,class_name,database_id,function_language) " +
" values (?,?,?,?)";
String insertSql = "Insert into metadata_function "
+ "(function_name,class_name,database_id,function_language) "
+ " values (?,?,?,?)";
try (PreparedStatement ps = conn.prepareStatement(insertSql)) {
ps.setString(1, functionPath.getObjectName());
ps.setString(2, function.getClassName());
......@@ -1059,7 +1053,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
ps.setString(4, function.getFunctionLanguage().toString());
ps.executeUpdate();
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException("创建 函数 失败", e);
}
}
......@@ -1076,16 +1070,16 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
Connection conn = getConnection();
String insertSql = "update metadata_function " +
"set (class_name =?, function_language=?) " +
" where id=?";
String insertSql = "update metadata_function "
+ "set (class_name =?, function_language=?) "
+ " where id=?";
try (PreparedStatement ps = conn.prepareStatement(insertSql)) {
ps.setString(1, newFunction.getClassName());
ps.setString(2, newFunction.getFunctionLanguage().toString());
ps.setInt(3, id);
ps.executeUpdate();
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException("修改 函数 失败", e);
}
}
......@@ -1102,13 +1096,12 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
Connection conn = getConnection();
String insertSql = "delete from metadata_function " +
" where id=?";
String insertSql = "delete from metadata_function where id=?";
try (PreparedStatement ps = conn.prepareStatement(insertSql)) {
ps.setInt(1, id);
ps.executeUpdate();
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException("删除 函数 失败", e);
}
}
......@@ -1165,21 +1158,21 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
@Override
public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException, TablePartitionedException {
// todo: 补充完成该方法。
// todo: 补充完成该方法。
throw new UnsupportedOperationException("该方法尚未完成");
}
@Override
public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists)
throws PartitionNotExistException, CatalogException {
// todo: 补充完成该方法。
// todo: 补充完成该方法。
throw new UnsupportedOperationException("该方法尚未完成");
}
@Override
public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists)
throws PartitionNotExistException, CatalogException {
// todo: 补充完成该方法。
// todo: 补充完成该方法。
throw new UnsupportedOperationException("该方法尚未完成");
}
}
......@@ -17,11 +17,16 @@
*
*/
package com.dlink.flink.catalog.factory;
import static com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions.PASSWORD;
import static com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions.URL;
import static com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions.USERNAME;
package com.dlink.flink.catalog.factory;
import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
import com.dlink.flink.catalog.DlinkMysqlCatalog;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.CatalogFactory;
......@@ -30,9 +35,6 @@ import org.apache.flink.table.factories.FactoryUtil;
import java.util.HashSet;
import java.util.Set;
import static com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions.*;
import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
/**
* Factory for {@link DlinkMysqlCatalog}.
*/
......
......@@ -17,11 +17,10 @@
*
*/
package com.dlink.flink.catalog.factory;
import com.dlink.flink.catalog.DlinkMysqlCatalog;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
......
......@@ -17,13 +17,13 @@
*
*/
package com.dlink.flink.catalog;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
......@@ -73,8 +73,6 @@ import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions;
/**
* 自定义 catalog
* 检查connection done.
......@@ -101,7 +99,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
/**
* 判断是否发生过SQL异常,如果发生过,那么conn可能失效。要注意判断
*/
private boolean SQLExceptionHappened = false;
private boolean sqlExceptionHappened = false;
/**
* 对象类型,例如 库、表、视图等
......@@ -203,7 +201,6 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
this.pwd = pwd;
}
public DlinkMysqlCatalog(String name) {
super(name, defaultDatabase);
this.url = DlinkMysqlCatalogFactoryOptions.URL.defaultValue();
......@@ -233,7 +230,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
connection.close();
connection = null;
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException("Fail to close connection.", e);
}
}
......@@ -248,8 +245,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
if (connection == null) {
connection = DriverManager.getConnection(url, user, pwd);
}
if (SQLExceptionHappened) {
SQLExceptionHappened = false;
if (sqlExceptionHappened) {
sqlExceptionHappened = false;
if (!connection.isValid(10)) {
connection.close();
}
......@@ -290,8 +287,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
@Override
public CatalogDatabase getDatabase(String databaseName)
throws DatabaseNotExistException, CatalogException {
String querySql = "SELECT id, database_name,description " +
" FROM metadata_database where database_name=?";
String querySql = "SELECT id, database_name,description "
+ " FROM metadata_database where database_name=?";
Connection conn = getConnection();
try (PreparedStatement ps = conn.prepareStatement(querySql)) {
ps.setString(1, databaseName);
......@@ -303,9 +300,9 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
Map<String, String> map = new HashMap<>();
String sql = "select `key`,`value` " +
"from metadata_database_property " +
"where database_id=? ";
String sql = "select `key`,`value` "
+ "from metadata_database_property "
+ "where database_id=? ";
try (PreparedStatement pStat = conn.prepareStatement(sql)) {
pStat.setInt(1, id);
ResultSet prs = pStat.executeQuery();
......@@ -313,7 +310,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
map.put(rs.getString("key"), rs.getString("value"));
}
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException(
String.format("Failed get database properties in catalog %s", getName()), e);
}
......@@ -323,7 +320,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
throw new DatabaseNotExistException(getName(), databaseName);
}
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException(
String.format("Failed get database in catalog %s", getName()), e);
}
......@@ -352,7 +349,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
return id;
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException(String.format("获取 database 信息失败:%s.%s", getName(), databaseName), e);
}
}
......@@ -381,8 +378,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
ResultSet idRs = stat.getGeneratedKeys();
if (idRs.next() && db.getProperties() != null && db.getProperties().size() > 0) {
int id = idRs.getInt(1);
String propInsertSql = "insert into metadata_database_property(database_id, " +
"`key`,`value`) values (?,?,?)";
String propInsertSql = "insert into metadata_database_property(database_id, "
+ "`key`,`value`) values (?,?,?)";
PreparedStatement pstat = conn.prepareStatement(propInsertSql);
for (Map.Entry<String, String> entry : db.getProperties().entrySet()) {
pstat.setInt(1, id);
......@@ -395,7 +392,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
conn.commit();
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
logger.error("创建 database 信息失败:", e);
}
}
......@@ -447,7 +444,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
dStat.close();
conn.commit();
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException("删除 database 信息失败:", e);
}
}
......@@ -477,9 +474,9 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
uState.executeUpdate();
uState.close();
if (newDb.getProperties() != null && newDb.getProperties().size() > 0) {
String upsertSql = "insert into metadata_database_property (database_id, `key`,`value`) \n" +
"values (?,?,?)\n" +
"on duplicate key update `value` =?, update_time = sysdate()\n";
String upsertSql = "insert into metadata_database_property (database_id, `key`,`value`) \n"
+ "values (?,?,?)\n"
+ "on duplicate key update `value` =?, update_time = sysdate()\n";
PreparedStatement pstat = conn.prepareStatement(upsertSql);
for (Map.Entry<String, String> entry : newDb.getProperties().entrySet()) {
pstat.setInt(1, id);
......@@ -493,7 +490,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
conn.commit();
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException("修改 database 信息失败:", e);
}
}
......@@ -553,10 +550,10 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
Connection conn = getConnection();
try {
String queryTable = "SELECT table_name " +
" ,description, table_type " +
" FROM metadata_table " +
" where id=?";
String queryTable = "SELECT table_name "
+ " ,description, table_type "
+ " FROM metadata_table "
+ " where id=?";
PreparedStatement ps = conn.prepareStatement(queryTable);
ps.setInt(1, id);
ResultSet rs = ps.executeQuery();
......@@ -572,8 +569,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
if (tableType.equals(ObjectType.TABLE)) {
// 这个是 table
String propSql = "SELECT `key`, `value` from metadata_table_property " +
"WHERE table_id=?";
String propSql = "SELECT `key`, `value` from metadata_table_property "
+ "WHERE table_id=?";
PreparedStatement pState = conn.prepareStatement(propSql);
pState.setInt(1, id);
ResultSet prs = pState.executeQuery();
......@@ -589,9 +586,9 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
} else if (tableType.equals(ObjectType.VIEW)) {
// 1、从库中取出table信息。(前面已做)
// 2、取出字段。
String colSql = "SELECT column_name, column_type, data_type, description " +
" FROM metadata_column WHERE " +
" table_id=?";
String colSql = "SELECT column_name, column_type, data_type, description "
+ " FROM metadata_column WHERE "
+ " table_id=?";
PreparedStatement cStat = conn.prepareStatement(colSql);
cStat.setInt(1, id);
ResultSet crs = cStat.executeQuery();
......@@ -609,8 +606,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
cStat.close();
// 3、取出query
String qSql = "SELECT `key`, value FROM metadata_table_property" +
" WHERE table_id=? ";
String qSql = "SELECT `key`, value FROM metadata_table_property"
+ " WHERE table_id=? ";
PreparedStatement qStat = conn.prepareStatement(qSql);
qStat.setInt(1, id);
ResultSet qrs = qStat.executeQuery();
......@@ -635,7 +632,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
throw new CatalogException("不支持的数据类型。" + tableType);
}
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException("获取 表信息失败。", e);
}
......@@ -653,8 +650,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
return null;
}
// 获取id
String getIdSql = "select id from metadata_table " +
" where table_name=? and database_id=?";
String getIdSql = "select id from metadata_table "
+ " where table_name=? and database_id=?";
Connection conn = getConnection();
try (PreparedStatement gStat = conn.prepareStatement(getIdSql)) {
gStat.setString(1, tablePath.getObjectName());
......@@ -664,7 +661,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
return rs.getInt(1);
}
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
logger.error("get table fail", e);
throw new CatalogException("get table fail.", e);
}
......@@ -683,27 +680,27 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
try {
// todo: 现在是真实删除,后续设计是否做记录保留。
conn.setAutoCommit(false);
String deletePropSql = "delete from metadata_table_property " +
" where table_id=?";
String deletePropSql = "delete from metadata_table_property "
+ " where table_id=?";
PreparedStatement dStat = conn.prepareStatement(deletePropSql);
dStat.setInt(1, id);
dStat.executeUpdate();
dStat.close();
String deleteColSql = "delete from metadata_column " +
" where table_id=?";
String deleteColSql = "delete from metadata_column "
+ " where table_id=?";
dStat = conn.prepareStatement(deleteColSql);
dStat.setInt(1, id);
dStat.executeUpdate();
dStat.close();
String deleteDbSql = "delete from metadata_table " +
" where id=?";
String deleteDbSql = "delete from metadata_table "
+ " where id=?";
dStat = conn.prepareStatement(deleteDbSql);
dStat.setInt(1, id);
dStat.executeUpdate();
dStat.close();
conn.commit();
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
logger.error("drop table fail", e);
throw new CatalogException("drop table fail.", e);
}
......@@ -728,7 +725,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
ps.setInt(2, id);
ps.executeUpdate();
} catch (SQLException ex) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException("修改表名失败", ex);
}
}
......@@ -736,8 +733,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
Integer db_id = getDatabaseId(tablePath.getDatabaseName());
if (null == db_id) {
Integer dbId = getDatabaseId(tablePath.getDatabaseName());
if (null == dbId) {
throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName());
}
if (tableExists(tablePath)) {
......@@ -759,16 +756,16 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
// 首先插入表信息
CatalogBaseTable.TableKind kind = table.getTableKind();
String insertSql = "insert into metadata_table(\n" +
" table_name," +
" table_type," +
" database_id," +
" description)" +
" values(?,?,?,?)";
String insertSql = "insert into metadata_table(\n"
+ " table_name,"
+ " table_type,"
+ " database_id,"
+ " description)"
+ " values(?,?,?,?)";
PreparedStatement iStat = conn.prepareStatement(insertSql, Statement.RETURN_GENERATED_KEYS);
iStat.setString(1, tablePath.getObjectName());
iStat.setString(2, kind.toString());
iStat.setInt(3, db_id);
iStat.setInt(3, dbId);
iStat.setString(4, table.getComment());
iStat.executeUpdate();
ResultSet idRs = iStat.getGeneratedKeys();
......@@ -782,8 +779,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
if (table instanceof ResolvedCatalogTable) {
// table 就可以直接拿properties了。
Map<String, String> props = ((ResolvedCatalogTable) table).toProperties();
String propInsertSql = "insert into metadata_table_property(table_id," +
"`key`,`value`) values (?,?,?)";
String propInsertSql = "insert into metadata_table_property(table_id,"
+ "`key`,`value`) values (?,?,?)";
PreparedStatement pStat = conn.prepareStatement(propInsertSql);
for (Map.Entry<String, String> entry : props.entrySet()) {
pStat.setInt(1, id);
......@@ -800,13 +797,13 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
ResolvedCatalogView view = (ResolvedCatalogView) table;
List<Schema.UnresolvedColumn> cols = view.getUnresolvedSchema().getColumns();
if (cols.size() > 0) {
String colInsertSql = "insert into metadata_column(" +
" column_name, column_type, data_type" +
" , `expr`" +
" , description" +
" , table_id" +
" , `primary`) " +
" values(?,?,?,?,?,?,?)";
String colInsertSql = "insert into metadata_column("
+ " column_name, column_type, data_type"
+ " , `expr`"
+ " , description"
+ " , table_id"
+ " , `primary`) "
+ " values(?,?,?,?,?,?,?)";
PreparedStatement colIStat = conn.prepareStatement(colInsertSql);
for (Schema.UnresolvedColumn col : cols) {
if (col instanceof Schema.UnresolvedPhysicalColumn) {
......@@ -845,8 +842,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
option.put("OriginalQuery", view.getOriginalQuery());
option.put("ExpandedQuery", view.getExpandedQuery());
String propInsertSql = "insert into metadata_table_property(table_id," +
"`key`,`value`) values (?,?,?)";
String propInsertSql = "insert into metadata_table_property(table_id,"
+ "`key`,`value`) values (?,?,?)";
PreparedStatement pStat = conn.prepareStatement(propInsertSql);
for (Map.Entry<String, String> entry : option.entrySet()) {
pStat.setInt(1, id);
......@@ -860,7 +857,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
conn.commit();
} catch (SQLException ex) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
logger.error("插入数据库失败", ex);
throw new CatalogException("插入数据库失败", ex);
}
......@@ -877,9 +874,9 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
Map<String, String> opts = newTable.getOptions();
if (opts != null && opts.size() > 0) {
String updateSql = "INSERT INTO metadata_table_property(table_id," +
"`key`,`value`) values (?,?,?) " +
"on duplicate key update `value` =?, update_time = sysdate()";
String updateSql = "INSERT INTO metadata_table_property(table_id,"
+ "`key`,`value`) values (?,?,?) "
+ "on duplicate key update `value` =?, update_time = sysdate()";
Connection conn = getConnection();
try (PreparedStatement ps = conn.prepareStatement(updateSql)) {
for (Map.Entry<String, String> entry : opts.entrySet()) {
......@@ -891,7 +888,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
ps.executeBatch();
} catch (SQLException ex) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException("修改表名失败", ex);
}
}
......@@ -936,19 +933,19 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
@Override
public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
// todo: 补充完成该方法。
// todo: 补充完成该方法。
throw new UnsupportedOperationException("该方法尚未完成");
}
@Override
public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
// todo: 补充完成该方法。
// todo: 补充完成该方法。
throw new UnsupportedOperationException("该方法尚未完成");
}
@Override
public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
// todo: 补充完成该方法。
// todo: 补充完成该方法。
throw new UnsupportedOperationException("该方法尚未完成");
}
......@@ -960,8 +957,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
if (null == dbId) {
throw new DatabaseNotExistException(getName(), dbName);
}
String querySql = "SELECT function_name from metadata_function " +
" WHERE database_id=?";
String querySql = "SELECT function_name from metadata_function "
+ " WHERE database_id=?";
Connection conn = getConnection();
try (PreparedStatement gStat = conn.prepareStatement(querySql)) {
......@@ -974,7 +971,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
return functions;
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException("获取 UDF 列表失败");
}
}
......@@ -987,8 +984,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
throw new FunctionNotExistException(getName(), functionPath);
}
String querySql = "SELECT class_name,function_language from metadata_function " +
" WHERE id=?";
String querySql = "SELECT class_name,function_language from metadata_function "
+ " WHERE id=?";
Connection conn = getConnection();
try (PreparedStatement gStat = conn.prepareStatement(querySql)) {
gStat.setInt(1, id);
......@@ -1002,7 +999,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
throw new FunctionNotExistException(getName(), functionPath);
}
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException("获取 UDF 失败:"
+ functionPath.getDatabaseName() + "."
+ functionPath.getObjectName());
......@@ -1021,8 +1018,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
return null;
}
// 获取id
String getIdSql = "select id from metadata_function " +
" where function_name=? and database_id=?";
String getIdSql = "select id from metadata_function "
+ " where function_name=? and database_id=?";
Connection conn = getConnection();
try (PreparedStatement gStat = conn.prepareStatement(getIdSql)) {
gStat.setString(1, functionPath.getObjectName());
......@@ -1033,7 +1030,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
return id;
}
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
logger.error("get function fail", e);
throw new CatalogException("get function fail.", e);
}
......@@ -1054,9 +1051,9 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
Connection conn = getConnection();
String insertSql = "Insert into metadata_function " +
"(function_name,class_name,database_id,function_language) " +
" values (?,?,?,?)";
String insertSql = "Insert into metadata_function "
+ "(function_name,class_name,database_id,function_language) "
+ " values (?,?,?,?)";
try (PreparedStatement ps = conn.prepareStatement(insertSql)) {
ps.setString(1, functionPath.getObjectName());
ps.setString(2, function.getClassName());
......@@ -1064,7 +1061,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
ps.setString(4, function.getFunctionLanguage().toString());
ps.executeUpdate();
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException("创建 函数 失败", e);
}
}
......@@ -1081,16 +1078,16 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
Connection conn = getConnection();
String insertSql = "update metadata_function " +
"set (class_name =?, function_language=?) " +
" where id=?";
String insertSql = "update metadata_function "
+ "set (class_name =?, function_language=?) "
+ " where id=?";
try (PreparedStatement ps = conn.prepareStatement(insertSql)) {
ps.setString(1, newFunction.getClassName());
ps.setString(2, newFunction.getFunctionLanguage().toString());
ps.setInt(3, id);
ps.executeUpdate();
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException("修改 函数 失败", e);
}
}
......@@ -1107,13 +1104,13 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
Connection conn = getConnection();
String insertSql = "delete from metadata_function " +
" where id=?";
String insertSql = "delete from metadata_function "
+ " where id=?";
try (PreparedStatement ps = conn.prepareStatement(insertSql)) {
ps.setInt(1, id);
ps.executeUpdate();
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException("删除 函数 失败", e);
}
}
......@@ -1170,21 +1167,21 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
@Override
public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException, TablePartitionedException {
// todo: 补充完成该方法。
// todo: 补充完成该方法。
throw new UnsupportedOperationException("该方法尚未完成");
}
@Override
public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists)
throws PartitionNotExistException, CatalogException {
// todo: 补充完成该方法。
// todo: 补充完成该方法。
throw new UnsupportedOperationException("该方法尚未完成");
}
@Override
public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists)
throws PartitionNotExistException, CatalogException {
// todo: 补充完成该方法。
// todo: 补充完成该方法。
throw new UnsupportedOperationException("该方法尚未完成");
}
}
......@@ -19,7 +19,14 @@
package com.dlink.flink.catalog.factory;
import static com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions.PASSWORD;
import static com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions.URL;
import static com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions.USERNAME;
import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
import com.dlink.flink.catalog.DlinkMysqlCatalog;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.CatalogFactory;
......@@ -28,9 +35,6 @@ import org.apache.flink.table.factories.FactoryUtil;
import java.util.HashSet;
import java.util.Set;
import static com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions.*;
import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
/**
* Factory for {@link DlinkMysqlCatalog}.
*/
......
......@@ -17,10 +17,10 @@
*
*/
package com.dlink.flink.catalog.factory;
import com.dlink.flink.catalog.DlinkMysqlCatalog;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
......
......@@ -17,16 +17,16 @@
*
*/
package com.dlink.flink.catalog;
import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.junit.Before;
import org.junit.Test;
import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
public class DlinkMysqlCatalogTest {
protected static String url;
......@@ -56,12 +56,12 @@ public class DlinkMysqlCatalogTest {
@Test
public void testSqlCatalog() {
String createSql = "create catalog myCatalog \n" +
" with('type'='dlink_mysql',\n" +
" 'username'='dlink',\n" +
" 'password'='dlink',\n" +
" 'url'='jdbc:mysql://127.0.0.1:3306/" +
"dlink?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC')";
String createSql = "create catalog myCatalog \n"
+ " with('type'='dlink_mysql',\n"
+ " 'username'='dlink',\n"
+ " 'password'='dlink',\n"
+ " 'url'='jdbc:mysql://127.0.0.1:3306/"
+ "dlink?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC')";
tableEnv.executeSql(createSql);
tableEnv.executeSql("use catalog myCatalog");
}
......
......@@ -17,23 +17,24 @@
*
*/
package com.dlink.flink.catalog.com.dlink.flink.catalog.factory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import com.dlink.flink.catalog.DlinkMysqlCatalog;
import com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.flink.table.factories.FactoryUtil;
import org.junit.BeforeClass;
import org.junit.Test;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.junit.BeforeClass;
import org.junit.Test;
public class DlinkMysqlCatalogFactoryTest {
......
......@@ -22,6 +22,8 @@ package com.dlink.flink.catalog;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
......@@ -71,8 +73,6 @@ import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions;
/**
* 自定义 catalog
* 检查connection done.
......@@ -99,7 +99,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
/**
* 判断是否发生过SQL异常,如果发生过,那么conn可能失效。要注意判断
*/
private boolean SQLExceptionHappened = false;
private boolean sqlExceptionHappened = false;
/**
* 对象类型,例如 库、表、视图等
......@@ -201,7 +201,6 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
this.pwd = pwd;
}
public DlinkMysqlCatalog(String name) {
super(name, defaultDatabase);
this.url = DlinkMysqlCatalogFactoryOptions.URL.defaultValue();
......@@ -231,7 +230,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
connection.close();
connection = null;
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException("Fail to close connection.", e);
}
}
......@@ -246,8 +245,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
if (connection == null) {
connection = DriverManager.getConnection(url, user, pwd);
}
if (SQLExceptionHappened) {
SQLExceptionHappened = false;
if (sqlExceptionHappened) {
sqlExceptionHappened = false;
if (!connection.isValid(10)) {
connection.close();
}
......@@ -288,8 +287,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
@Override
public CatalogDatabase getDatabase(String databaseName)
throws DatabaseNotExistException, CatalogException {
String querySql = "SELECT id, database_name,description " +
" FROM metadata_database where database_name=?";
String querySql = "SELECT id, database_name,description "
+ " FROM metadata_database where database_name=?";
Connection conn = getConnection();
try (PreparedStatement ps = conn.prepareStatement(querySql)) {
ps.setString(1, databaseName);
......@@ -301,9 +300,9 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
Map<String, String> map = new HashMap<>();
String sql = "select `key`,`value` " +
"from metadata_database_property " +
"where database_id=? ";
String sql = "select `key`,`value` "
+ "from metadata_database_property "
+ "where database_id=? ";
try (PreparedStatement pStat = conn.prepareStatement(sql)) {
pStat.setInt(1, id);
ResultSet prs = pStat.executeQuery();
......@@ -311,7 +310,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
map.put(rs.getString("key"), rs.getString("value"));
}
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException(
String.format("Failed get database properties in catalog %s", getName()), e);
}
......@@ -321,7 +320,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
throw new DatabaseNotExistException(getName(), databaseName);
}
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException(
String.format("Failed get database in catalog %s", getName()), e);
}
......@@ -350,7 +349,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
return id;
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException(String.format("获取 database 信息失败:%s.%s", getName(), databaseName), e);
}
}
......@@ -379,8 +378,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
ResultSet idRs = stat.getGeneratedKeys();
if (idRs.next() && db.getProperties() != null && db.getProperties().size() > 0) {
int id = idRs.getInt(1);
String propInsertSql = "insert into metadata_database_property(database_id, " +
"`key`,`value`) values (?,?,?)";
String propInsertSql = "insert into metadata_database_property(database_id, "
+ "`key`,`value`) values (?,?,?)";
PreparedStatement pstat = conn.prepareStatement(propInsertSql);
for (Map.Entry<String, String> entry : db.getProperties().entrySet()) {
pstat.setInt(1, id);
......@@ -393,7 +392,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
conn.commit();
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
logger.error("创建 database 信息失败:", e);
}
}
......@@ -445,7 +444,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
dStat.close();
conn.commit();
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException("删除 database 信息失败:", e);
}
}
......@@ -475,9 +474,9 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
uState.executeUpdate();
uState.close();
if (newDb.getProperties() != null && newDb.getProperties().size() > 0) {
String upsertSql = "insert into metadata_database_property (database_id, `key`,`value`) \n" +
"values (?,?,?)\n" +
"on duplicate key update `value` =?, update_time = sysdate()\n";
String upsertSql = "insert into metadata_database_property (database_id, `key`,`value`) \n"
+ "values (?,?,?)\n"
+ "on duplicate key update `value` =?, update_time = sysdate()\n";
PreparedStatement pstat = conn.prepareStatement(upsertSql);
for (Map.Entry<String, String> entry : newDb.getProperties().entrySet()) {
pstat.setInt(1, id);
......@@ -491,7 +490,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
conn.commit();
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException("修改 database 信息失败:", e);
}
}
......@@ -551,10 +550,10 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
Connection conn = getConnection();
try {
String queryTable = "SELECT table_name " +
" ,description, table_type " +
" FROM metadata_table " +
" where id=?";
String queryTable = "SELECT table_name "
+ " ,description, table_type "
+ " FROM metadata_table "
+ " where id=?";
PreparedStatement ps = conn.prepareStatement(queryTable);
ps.setInt(1, id);
ResultSet rs = ps.executeQuery();
......@@ -570,8 +569,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
if (tableType.equals(ObjectType.TABLE)) {
// 这个是 table
String propSql = "SELECT `key`, `value` from metadata_table_property " +
"WHERE table_id=?";
String propSql = "SELECT `key`, `value` from metadata_table_property "
+ "WHERE table_id=?";
PreparedStatement pState = conn.prepareStatement(propSql);
pState.setInt(1, id);
ResultSet prs = pState.executeQuery();
......@@ -587,9 +586,9 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
} else if (tableType.equals(ObjectType.VIEW)) {
// 1、从库中取出table信息。(前面已做)
// 2、取出字段。
String colSql = "SELECT column_name, column_type, data_type, description " +
" FROM metadata_column WHERE " +
" table_id=?";
String colSql = "SELECT column_name, column_type, data_type, description "
+ " FROM metadata_column WHERE "
+ " table_id=?";
PreparedStatement cStat = conn.prepareStatement(colSql);
cStat.setInt(1, id);
ResultSet crs = cStat.executeQuery();
......@@ -607,8 +606,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
cStat.close();
// 3、取出query
String qSql = "SELECT `key`, value FROM metadata_table_property" +
" WHERE table_id=? ";
String qSql = "SELECT `key`, value FROM metadata_table_property"
+ " WHERE table_id=? ";
PreparedStatement qStat = conn.prepareStatement(qSql);
qStat.setInt(1, id);
ResultSet qrs = qStat.executeQuery();
......@@ -633,7 +632,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
throw new CatalogException("不支持的数据类型。" + tableType);
}
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException("获取 表信息失败。", e);
}
......@@ -651,8 +650,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
return null;
}
// 获取id
String getIdSql = "select id from metadata_table " +
" where table_name=? and database_id=?";
String getIdSql = "select id from metadata_table "
+ " where table_name=? and database_id=?";
Connection conn = getConnection();
try (PreparedStatement gStat = conn.prepareStatement(getIdSql)) {
gStat.setString(1, tablePath.getObjectName());
......@@ -662,7 +661,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
return rs.getInt(1);
}
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
logger.error("get table fail", e);
throw new CatalogException("get table fail.", e);
}
......@@ -681,27 +680,27 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
try {
// todo: 现在是真实删除,后续设计是否做记录保留。
conn.setAutoCommit(false);
String deletePropSql = "delete from metadata_table_property " +
" where table_id=?";
String deletePropSql = "delete from metadata_table_property "
+ " where table_id=?";
PreparedStatement dStat = conn.prepareStatement(deletePropSql);
dStat.setInt(1, id);
dStat.executeUpdate();
dStat.close();
String deleteColSql = "delete from metadata_column " +
" where table_id=?";
String deleteColSql = "delete from metadata_column "
+ " where table_id=?";
dStat = conn.prepareStatement(deleteColSql);
dStat.setInt(1, id);
dStat.executeUpdate();
dStat.close();
String deleteDbSql = "delete from metadata_table " +
" where id=?";
String deleteDbSql = "delete from metadata_table "
+ " where id=?";
dStat = conn.prepareStatement(deleteDbSql);
dStat.setInt(1, id);
dStat.executeUpdate();
dStat.close();
conn.commit();
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
logger.error("drop table fail", e);
throw new CatalogException("drop table fail.", e);
}
......@@ -726,7 +725,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
ps.setInt(2, id);
ps.executeUpdate();
} catch (SQLException ex) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException("修改表名失败", ex);
}
}
......@@ -734,8 +733,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
Integer db_id = getDatabaseId(tablePath.getDatabaseName());
if (null == db_id) {
Integer dbId = getDatabaseId(tablePath.getDatabaseName());
if (null == dbId) {
throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName());
}
if (tableExists(tablePath)) {
......@@ -757,16 +756,16 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
// 首先插入表信息
CatalogBaseTable.TableKind kind = table.getTableKind();
String insertSql = "insert into metadata_table(\n" +
" table_name," +
" table_type," +
" database_id," +
" description)" +
" values(?,?,?,?)";
String insertSql = "insert into metadata_table(\n"
+ " table_name,"
+ " table_type,"
+ " database_id,"
+ " description)"
+ " values(?,?,?,?)";
PreparedStatement iStat = conn.prepareStatement(insertSql, Statement.RETURN_GENERATED_KEYS);
iStat.setString(1, tablePath.getObjectName());
iStat.setString(2, kind.toString());
iStat.setInt(3, db_id);
iStat.setInt(3, dbId);
iStat.setString(4, table.getComment());
iStat.executeUpdate();
ResultSet idRs = iStat.getGeneratedKeys();
......@@ -780,8 +779,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
if (table instanceof ResolvedCatalogTable) {
// table 就可以直接拿properties了。
Map<String, String> props = ((ResolvedCatalogTable) table).toProperties();
String propInsertSql = "insert into metadata_table_property(table_id," +
"`key`,`value`) values (?,?,?)";
String propInsertSql = "insert into metadata_table_property(table_id,"
+ "`key`,`value`) values (?,?,?)";
PreparedStatement pStat = conn.prepareStatement(propInsertSql);
for (Map.Entry<String, String> entry : props.entrySet()) {
pStat.setInt(1, id);
......@@ -798,13 +797,13 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
ResolvedCatalogView view = (ResolvedCatalogView) table;
List<Schema.UnresolvedColumn> cols = view.getUnresolvedSchema().getColumns();
if (cols.size() > 0) {
String colInsertSql = "insert into metadata_column(" +
" column_name, column_type, data_type" +
" , `expr`" +
" , description" +
" , table_id" +
" , `primary`) " +
" values(?,?,?,?,?,?,?)";
String colInsertSql = "insert into metadata_column("
+ " column_name, column_type, data_type"
+ " , `expr`"
+ " , description"
+ " , table_id"
+ " , `primary`) "
+ " values(?,?,?,?,?,?,?)";
PreparedStatement colIStat = conn.prepareStatement(colInsertSql);
for (Schema.UnresolvedColumn col : cols) {
if (col instanceof Schema.UnresolvedPhysicalColumn) {
......@@ -843,8 +842,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
option.put("OriginalQuery", view.getOriginalQuery());
option.put("ExpandedQuery", view.getExpandedQuery());
String propInsertSql = "insert into metadata_table_property(table_id," +
"`key`,`value`) values (?,?,?)";
String propInsertSql = "insert into metadata_table_property(table_id,"
+ "`key`,`value`) values (?,?,?)";
PreparedStatement pStat = conn.prepareStatement(propInsertSql);
for (Map.Entry<String, String> entry : option.entrySet()) {
pStat.setInt(1, id);
......@@ -858,7 +857,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
conn.commit();
} catch (SQLException ex) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
logger.error("插入数据库失败", ex);
throw new CatalogException("插入数据库失败", ex);
}
......@@ -875,9 +874,9 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
Map<String, String> opts = newTable.getOptions();
if (opts != null && opts.size() > 0) {
String updateSql = "INSERT INTO metadata_table_property(table_id," +
"`key`,`value`) values (?,?,?) " +
"on duplicate key update `value` =?, update_time = sysdate()";
String updateSql = "INSERT INTO metadata_table_property(table_id,"
+ "`key`,`value`) values (?,?,?) "
+ "on duplicate key update `value` =?, update_time = sysdate()";
Connection conn = getConnection();
try (PreparedStatement ps = conn.prepareStatement(updateSql)) {
for (Map.Entry<String, String> entry : opts.entrySet()) {
......@@ -889,7 +888,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
ps.executeBatch();
} catch (SQLException ex) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException("修改表名失败", ex);
}
}
......@@ -934,19 +933,19 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
@Override
public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
// todo: 补充完成该方法。
// todo: 补充完成该方法。
throw new UnsupportedOperationException("该方法尚未完成");
}
@Override
public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
// todo: 补充完成该方法。
// todo: 补充完成该方法。
throw new UnsupportedOperationException("该方法尚未完成");
}
@Override
public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
// todo: 补充完成该方法。
// todo: 补充完成该方法。
throw new UnsupportedOperationException("该方法尚未完成");
}
......@@ -958,8 +957,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
if (null == dbId) {
throw new DatabaseNotExistException(getName(), dbName);
}
String querySql = "SELECT function_name from metadata_function " +
" WHERE database_id=?";
String querySql = "SELECT function_name from metadata_function "
+ " WHERE database_id=?";
Connection conn = getConnection();
try (PreparedStatement gStat = conn.prepareStatement(querySql)) {
......@@ -972,7 +971,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
return functions;
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException("获取 UDF 列表失败");
}
}
......@@ -985,8 +984,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
throw new FunctionNotExistException(getName(), functionPath);
}
String querySql = "SELECT class_name,function_language from metadata_function " +
" WHERE id=?";
String querySql = "SELECT class_name,function_language from metadata_function "
+ " WHERE id=?";
Connection conn = getConnection();
try (PreparedStatement gStat = conn.prepareStatement(querySql)) {
gStat.setInt(1, id);
......@@ -1000,7 +999,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
throw new FunctionNotExistException(getName(), functionPath);
}
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException("获取 UDF 失败:"
+ functionPath.getDatabaseName() + "."
+ functionPath.getObjectName());
......@@ -1019,8 +1018,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
return null;
}
// 获取id
String getIdSql = "select id from metadata_function " +
" where function_name=? and database_id=?";
String getIdSql = "select id from metadata_function "
+ " where function_name=? and database_id=?";
Connection conn = getConnection();
try (PreparedStatement gStat = conn.prepareStatement(getIdSql)) {
gStat.setString(1, functionPath.getObjectName());
......@@ -1031,7 +1030,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
return id;
}
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
logger.error("get function fail", e);
throw new CatalogException("get function fail.", e);
}
......@@ -1052,9 +1051,9 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
Connection conn = getConnection();
String insertSql = "Insert into metadata_function " +
"(function_name,class_name,database_id,function_language) " +
" values (?,?,?,?)";
String insertSql = "Insert into metadata_function "
+ "(function_name,class_name,database_id,function_language) "
+ " values (?,?,?,?)";
try (PreparedStatement ps = conn.prepareStatement(insertSql)) {
ps.setString(1, functionPath.getObjectName());
ps.setString(2, function.getClassName());
......@@ -1062,7 +1061,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
ps.setString(4, function.getFunctionLanguage().toString());
ps.executeUpdate();
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException("创建 函数 失败", e);
}
}
......@@ -1079,16 +1078,16 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
Connection conn = getConnection();
String insertSql = "update metadata_function " +
"set (class_name =?, function_language=?) " +
" where id=?";
String insertSql = "update metadata_function "
+ "set (class_name =?, function_language=?) "
+ " where id=?";
try (PreparedStatement ps = conn.prepareStatement(insertSql)) {
ps.setString(1, newFunction.getClassName());
ps.setString(2, newFunction.getFunctionLanguage().toString());
ps.setInt(3, id);
ps.executeUpdate();
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException("修改 函数 失败", e);
}
}
......@@ -1105,13 +1104,13 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
Connection conn = getConnection();
String insertSql = "delete from metadata_function " +
" where id=?";
String insertSql = "delete from metadata_function "
+ " where id=?";
try (PreparedStatement ps = conn.prepareStatement(insertSql)) {
ps.setInt(1, id);
ps.executeUpdate();
} catch (SQLException e) {
SQLExceptionHappened = true;
sqlExceptionHappened = true;
throw new CatalogException("删除 函数 失败", e);
}
}
......@@ -1168,21 +1167,21 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
@Override
public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException, TablePartitionedException {
// todo: 补充完成该方法。
// todo: 补充完成该方法。
throw new UnsupportedOperationException("该方法尚未完成");
}
@Override
public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists)
throws PartitionNotExistException, CatalogException {
// todo: 补充完成该方法。
// todo: 补充完成该方法。
throw new UnsupportedOperationException("该方法尚未完成");
}
@Override
public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists)
throws PartitionNotExistException, CatalogException {
// todo: 补充完成该方法。
// todo: 补充完成该方法。
throw new UnsupportedOperationException("该方法尚未完成");
}
}
......@@ -17,10 +17,16 @@
*
*/
package com.dlink.flink.catalog.factory;
import static com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions.PASSWORD;
import static com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions.URL;
import static com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions.USERNAME;
import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
import com.dlink.flink.catalog.DlinkMysqlCatalog;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.CatalogFactory;
......@@ -29,9 +35,6 @@ import org.apache.flink.table.factories.FactoryUtil;
import java.util.HashSet;
import java.util.Set;
import static com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions.*;
import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
/**
* Factory for {@link DlinkMysqlCatalog}.
*/
......
......@@ -17,10 +17,10 @@
*
*/
package com.dlink.flink.catalog.factory;
import com.dlink.flink.catalog.DlinkMysqlCatalog;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment