Unverified Commit 081f5ca4 authored by aiwenmo's avatar aiwenmo Committed by GitHub

[Feature-506][client] The CDCSOURCE table parameter supports line fee…

[Feature-506][client] The CDCSOURCE table parameter supports line fee…
parents e7b6f509 e9ea98f2
...@@ -42,6 +42,7 @@ public abstract class AbstractCDCBuilder { ...@@ -42,6 +42,7 @@ public abstract class AbstractCDCBuilder {
} }
List<String> tableList = getTableList(); List<String> tableList = getTableList();
for (String tableName : tableList) { for (String tableName : tableList) {
tableName = tableName.trim();
if (Asserts.isNotNullString(tableName) && tableName.contains(".")) { if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
String[] names = tableName.split("\\\\."); String[] names = tableName.split("\\\\.");
if (!schemaList.contains(names[0])) { if (!schemaList.contains(names[0])) {
......
...@@ -87,6 +87,7 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder { ...@@ -87,6 +87,7 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
} }
List<String> tableList = getTableList(); List<String> tableList = getTableList();
for (String tableName : tableList) { for (String tableName : tableList) {
tableName = tableName.trim();
if (Asserts.isNotNullString(tableName) && tableName.contains(".")) { if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
String[] names = tableName.split("\\\\."); String[] names = tableName.split("\\\\.");
if (!schemaList.contains(names[0])) { if (!schemaList.contains(names[0])) {
......
...@@ -42,6 +42,7 @@ public abstract class AbstractCDCBuilder { ...@@ -42,6 +42,7 @@ public abstract class AbstractCDCBuilder {
} }
List<String> tableList = getTableList(); List<String> tableList = getTableList();
for (String tableName : tableList) { for (String tableName : tableList) {
tableName = tableName.trim();
if (Asserts.isNotNullString(tableName) && tableName.contains(".")) { if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
String[] names = tableName.split("\\\\."); String[] names = tableName.split("\\\\.");
if (!schemaList.contains(names[0])) { if (!schemaList.contains(names[0])) {
......
...@@ -100,6 +100,7 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder { ...@@ -100,6 +100,7 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
} }
List<String> tableList = getTableList(); List<String> tableList = getTableList();
for (String tableName : tableList) { for (String tableName : tableList) {
tableName = tableName.trim();
if (Asserts.isNotNullString(tableName) && tableName.contains(".")) { if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
String[] names = tableName.split("\\\\."); String[] names = tableName.split("\\\\.");
if (!schemaList.contains(names[0])) { if (!schemaList.contains(names[0])) {
......
...@@ -42,6 +42,7 @@ public abstract class AbstractCDCBuilder { ...@@ -42,6 +42,7 @@ public abstract class AbstractCDCBuilder {
} }
List<String> tableList = getTableList(); List<String> tableList = getTableList();
for (String tableName : tableList) { for (String tableName : tableList) {
tableName = tableName.trim();
if (Asserts.isNotNullString(tableName) && tableName.contains(".")) { if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
String[] names = tableName.split("\\\\."); String[] names = tableName.split("\\\\.");
if (!schemaList.contains(names[0])) { if (!schemaList.contains(names[0])) {
......
...@@ -145,6 +145,7 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder { ...@@ -145,6 +145,7 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
} }
List<String> tableList = getTableList(); List<String> tableList = getTableList();
for (String tableName : tableList) { for (String tableName : tableList) {
tableName = tableName.trim();
if (Asserts.isNotNullString(tableName) && tableName.contains(".")) { if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
String[] names = tableName.split("\\\\."); String[] names = tableName.split("\\\\.");
if (!schemaList.contains(names[0])) { if (!schemaList.contains(names[0])) {
......
...@@ -42,6 +42,7 @@ public abstract class AbstractCDCBuilder { ...@@ -42,6 +42,7 @@ public abstract class AbstractCDCBuilder {
} }
List<String> tableList = getTableList(); List<String> tableList = getTableList();
for (String tableName : tableList) { for (String tableName : tableList) {
tableName = tableName.trim();
if (Asserts.isNotNullString(tableName) && tableName.contains(".")) { if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
String[] names = tableName.split("\\\\."); String[] names = tableName.split("\\\\.");
if (!schemaList.contains(names[0])) { if (!schemaList.contains(names[0])) {
......
...@@ -145,6 +145,7 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder { ...@@ -145,6 +145,7 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
} }
List<String> tableList = getTableList(); List<String> tableList = getTableList();
for (String tableName : tableList) { for (String tableName : tableList) {
tableName = tableName.trim();
if (Asserts.isNotNullString(tableName) && tableName.contains(".")) { if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
String[] names = tableName.split("\\\\."); String[] names = tableName.split("\\\\.");
if (!schemaList.contains(names[0])) { if (!schemaList.contains(names[0])) {
......
...@@ -42,6 +42,7 @@ public abstract class AbstractCDCBuilder { ...@@ -42,6 +42,7 @@ public abstract class AbstractCDCBuilder {
} }
List<String> tableList = getTableList(); List<String> tableList = getTableList();
for (String tableName : tableList) { for (String tableName : tableList) {
tableName = tableName.trim();
if (Asserts.isNotNullString(tableName) && tableName.contains(".")) { if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
String[] names = tableName.split("\\\\."); String[] names = tableName.split("\\\\.");
if (!schemaList.contains(names[0])) { if (!schemaList.contains(names[0])) {
......
...@@ -145,6 +145,7 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder { ...@@ -145,6 +145,7 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
} }
List<String> tableList = getTableList(); List<String> tableList = getTableList();
for (String tableName : tableList) { for (String tableName : tableList) {
tableName = tableName.trim();
if (Asserts.isNotNullString(tableName) && tableName.contains(".")) { if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
String[] names = tableName.split("\\\\."); String[] names = tableName.split("\\\\.");
if (!schemaList.contains(names[0])) { if (!schemaList.contains(names[0])) {
......
...@@ -66,17 +66,30 @@ public class SqlParserTest { ...@@ -66,17 +66,30 @@ public class SqlParserTest {
@Test @Test
public void createCDCSourceTest() { public void createCDCSourceTest() {
String sql = "EXECUTE CDCSOURCE demo WITH (\n" + String sql = "EXECUTE CDCSOURCE demo WITH (\n" +
" 'type'='mysql-cdc',\n" + " 'connector' = 'mysql-cdc',\n" +
" 'hostname'='127.0.0.1',\n" + " 'hostname' = '10.1.51.25',\n" +
" 'port'='3306',\n" + " 'port' = '3306',\n" +
" 'password'='dlink',\n" + " 'username' = 'dfly',\n" +
" 'hostname'='dlink',\n" + " 'password' = 'Dareway@2020',\n" +
" 'checkpoint'='3000',\n" + " 'checkpoint' = '3000',\n" +
" 'parallelism'='1',\n" + " 'scan.startup.mode' = 'initial',\n" +
" 'database'='dlink,test',\n" + " 'parallelism' = '1',\n" +
" 'table'='',\n" + " -- 'database-name'='test',\n" +
" 'topic'='dlinkcdc',\n" + " 'table-name' = 'test\\.student,\n" +
" 'brokers'='127.0.0.1:9092'\n" + " test\\.score',\n" +
" -- 'sink.connector'='datastream-doris',\n" +
" 'sink.connector' = 'doris',\n" +
" 'sink.fenodes' = '10.1.51.26:8030',\n" +
" 'sink.username' = 'root',\n" +
" 'sink.password' = 'dw123456',\n" +
" 'sink.sink.batch.size' = '1',\n" +
" 'sink.sink.max-retries' = '1',\n" +
" 'sink.sink.batch.interval' = '60000',\n" +
" 'sink.sink.db' = 'test',\n" +
" 'sink.table.prefix' = 'ODS_',\n" +
" 'sink.table.upper' = 'true',\n" +
" 'sink.table.identifier' = '${schemaName}.${tableName}',\n" +
" 'sink.sink.enable-delete' = 'true'\n" +
");"; ");";
Map<String, List<String>> lists = SingleSqlParserFactory.generateParser(sql); Map<String, List<String>> lists = SingleSqlParserFactory.generateParser(sql);
System.out.println(lists.toString()); System.out.println(lists.toString());
......
...@@ -74,15 +74,15 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera ...@@ -74,15 +74,15 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
if (!Asserts.isEquals(table.getType(), "VIEW")) { if (!Asserts.isEquals(table.getType(), "VIEW")) {
if (Asserts.isNotNullCollection(tableRegList)) { if (Asserts.isNotNullCollection(tableRegList)) {
for (String tableReg : tableRegList) { for (String tableReg : tableRegList) {
if (table.getSchemaTableName().matches(tableReg) && !schema.getTables().contains(Table.build(table.getName()))) { if (table.getSchemaTableName().matches(tableReg.trim()) && !schema.getTables().contains(Table.build(table.getName()))) {
table.setColumns(driver.listColumns(schemaName, table.getName())); table.setColumns(driver.listColumnsSortByPK(schemaName, table.getName()));
schema.getTables().add(table); schema.getTables().add(table);
schemaTableNameList.add(table.getSchemaTableName()); schemaTableNameList.add(table.getSchemaTableName());
break; break;
} }
} }
} else { } else {
table.setColumns(driver.listColumns(schemaName, table.getName())); table.setColumns(driver.listColumnsSortByPK(schemaName, table.getName()));
schemaTableNameList.add(table.getSchemaTableName()); schemaTableNameList.add(table.getSchemaTableName());
schema.getTables().add(table); schema.getTables().add(table);
} }
......
...@@ -235,6 +235,23 @@ public abstract class AbstractJdbcDriver extends AbstractDriver { ...@@ -235,6 +235,23 @@ public abstract class AbstractJdbcDriver extends AbstractDriver {
return columns; return columns;
} }
@Override
public List<Column> listColumnsSortByPK(String schemaName, String tableName) {
List<Column> columnList = listColumns(schemaName, tableName);
List<Column> columnListSortByPK = new ArrayList<>();
for(Column column: columnList){
if(column.isKeyFlag()){
columnListSortByPK.add(column);
}
}
for(Column column: columnList){
if(!column.isKeyFlag()){
columnListSortByPK.add(column);
}
}
return columnListSortByPK;
}
@Override @Override
public boolean createTable(Table table) throws Exception { public boolean createTable(Table table) throws Exception {
String sql = getCreateTableSql(table).replaceAll("\r\n", " "); String sql = getCreateTableSql(table).replaceAll("\r\n", " ");
......
...@@ -71,6 +71,8 @@ public interface Driver { ...@@ -71,6 +71,8 @@ public interface Driver {
List<Column> listColumns(String schemaName, String tableName); List<Column> listColumns(String schemaName, String tableName);
List<Column> listColumnsSortByPK(String schemaName, String tableName);
List<Schema> getSchemasAndTables(); List<Schema> getSchemasAndTables();
List<Table> getTablesAndColumns(String schemaName); List<Table> getTablesAndColumns(String schemaName);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment