Unverified Commit 73a62595 authored by aiwenmo's avatar aiwenmo Committed by GitHub

[fix-472] [client] Mysqlcdc whole database sync Hudi error

 [fix-472] [client] Mysqlcdc whole database sync Hudi error 
parents ed6477cd d712e0a7
...@@ -32,7 +32,7 @@ import com.dlink.model.Table; ...@@ -32,7 +32,7 @@ import com.dlink.model.Table;
*/ */
public class HudiSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable { public class HudiSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
private final static String KEY_WORD = "hudi"; private final static String KEY_WORD = "datastream-hudi";
private static final long serialVersionUID = 5324199407472847422L; private static final long serialVersionUID = 5324199407472847422L;
public HudiSinkBuilder() { public HudiSinkBuilder() {
......
package com.dlink.cdc.sql; package com.dlink.cdc.sql;
import com.dlink.model.*;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
...@@ -12,10 +13,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ...@@ -12,10 +13,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.DateType; import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.utils.TypeConversions; import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row; import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind; import org.apache.flink.types.RowKind;
...@@ -34,9 +32,6 @@ import com.dlink.cdc.AbstractSinkBuilder; ...@@ -34,9 +32,6 @@ import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder; import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder; import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment; import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.dlink.utils.SqlUtil; import com.dlink.utils.SqlUtil;
/** /**
...@@ -120,10 +115,11 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -120,10 +115,11 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
List<String> columnNameList) { List<String> columnNameList) {
String sinkTableName = getSinkTableName(table); String sinkTableName = getSinkTableName(table);
customTableEnvironment.createTemporaryView(table.getSchemaTableNameWithUnderline(), rowDataDataStream, StringUtils.join(columnNameList, ",")); String viewName="VIEW_"+table.getSchemaTableNameWithUnderline();
customTableEnvironment.createTemporaryView(viewName, rowDataDataStream, StringUtils.join(columnNameList, ","));
customTableEnvironment.executeSql(getFlinkDDL(table, sinkTableName)); customTableEnvironment.executeSql(getFlinkDDL(table, sinkTableName));
List<Operation> operations = customTableEnvironment.getParser().parse(table.getCDCSqlInsert(sinkTableName, table.getSchemaTableNameWithUnderline())); List<Operation> operations = customTableEnvironment.getParser().parse(table.getCDCSqlInsert(sinkTableName, viewName));
if (operations.size() > 0) { if (operations.size() > 0) {
Operation operation = operations.get(0); Operation operation = operations.get(0);
if (operation instanceof ModifyOperation) { if (operation instanceof ModifyOperation) {
......
...@@ -71,19 +71,21 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera ...@@ -71,19 +71,21 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
Driver driver = Driver.build(driverConfig); Driver driver = Driver.build(driverConfig);
final List<Table> tables = driver.listTables(schemaName); final List<Table> tables = driver.listTables(schemaName);
for (Table table : tables) { for (Table table : tables) {
if (Asserts.isNotNullCollection(tableRegList)) { if (!Asserts.isEquals(table.getType(), "VIEW")) {
for (String tableReg : tableRegList) { if (Asserts.isNotNullCollection(tableRegList)) {
if (table.getSchemaTableName().matches(tableReg) && !schema.getTables().contains(Table.build(table.getName()))) { for (String tableReg : tableRegList) {
table.setColumns(driver.listColumns(schemaName, table.getName())); if (table.getSchemaTableName().matches(tableReg) && !schema.getTables().contains(Table.build(table.getName()))) {
schema.getTables().add(table); table.setColumns(driver.listColumns(schemaName, table.getName()));
schemaTableNameList.add(table.getSchemaTableName()); schema.getTables().add(table);
break; schemaTableNameList.add(table.getSchemaTableName());
break;
}
} }
} else {
table.setColumns(driver.listColumns(schemaName, table.getName()));
schemaTableNameList.add(table.getSchemaTableName());
schema.getTables().add(table);
} }
} else {
table.setColumns(driver.listColumns(schemaName, table.getName()));
schemaTableNameList.add(table.getSchemaTableName());
schema.getTables().add(table);
} }
} }
schemaList.add(schema); schemaList.add(schema);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment