Unverified Commit 39b337bb authored by aiwenmo's avatar aiwenmo Committed by GitHub

fix #608 Can't create a flink table when table name is a sql reserved…

fix #608 Can't create a flink table when table name is a sql reserved…
parents ed54c10d 03f7e409
package com.dlink.cdc.sql; package com.dlink.cdc.sql;
import com.dlink.model.Column; import com.dlink.assertion.Asserts;
import com.dlink.model.ColumnType; import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.dlink.utils.FlinkBaseUtil;
import com.dlink.utils.LogUtil; import com.dlink.utils.LogUtil;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.FlatMapFunction;
...@@ -15,11 +22,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ...@@ -15,11 +22,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.utils.TypeConversions; import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row; import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind; import org.apache.flink.types.RowKind;
...@@ -33,16 +36,6 @@ import java.util.ArrayList; ...@@ -33,16 +36,6 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.dlink.utils.SqlUtil;
/** /**
* SQLSinkBuilder * SQLSinkBuilder
* *
...@@ -123,15 +116,17 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -123,15 +116,17 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
Table table, Table table,
List<String> columnNameList) { List<String> columnNameList) {
String sinkSchemaName = getSinkSchemaName(table);
String sinkTableName = getSinkTableName(table); String sinkTableName = getSinkTableName(table);
String pkList = StringUtils.join(getPKList(table), ".");
String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline(); String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline();
customTableEnvironment.createTemporaryView(viewName, rowDataDataStream, StringUtils.join(columnNameList, ",")); customTableEnvironment.createTemporaryView(viewName, rowDataDataStream, StringUtils.join(columnNameList, ","));
logger.info("Create " + viewName + " temporaryView successful..."); logger.info("Create " + viewName + " temporaryView successful...");
String flinkDDL = getFlinkDDL(table, sinkTableName); String flinkDDL = FlinkBaseUtil.getFlinkDDL(table, sinkTableName, config, sinkSchemaName, sinkTableName, pkList);
logger.info(flinkDDL); logger.info(flinkDDL);
customTableEnvironment.executeSql(flinkDDL); customTableEnvironment.executeSql(flinkDDL);
logger.info("Create " + sinkTableName + " FlinkSQL DDL successful..."); logger.info("Create " + sinkTableName + " FlinkSQL DDL successful...");
String cdcSqlInsert = getCDCSqlInsert(table, sinkTableName, viewName); String cdcSqlInsert = FlinkBaseUtil.getCDCSqlInsert(table, sinkTableName, viewName, config);
logger.info(cdcSqlInsert); logger.info(cdcSqlInsert);
List<Operation> operations = customTableEnvironment.getParser().parse(cdcSqlInsert); List<Operation> operations = customTableEnvironment.getParser().parse(cdcSqlInsert);
logger.info("Create " + sinkTableName + " FlinkSQL insert into successful..."); logger.info("Create " + sinkTableName + " FlinkSQL insert into successful...");
...@@ -191,80 +186,6 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -191,80 +186,6 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return dataStreamSource; return dataStreamSource;
} }
private String getFlinkDDL(Table table, String tableName) {
StringBuilder sb = new StringBuilder();
sb.append("CREATE TABLE IF NOT EXISTS ");
sb.append(tableName);
sb.append(" (\n");
List<String> pks = new ArrayList<>();
for (int i = 0; i < table.getColumns().size(); i++) {
String type = table.getColumns().get(i).getJavaType().getFlinkType();
sb.append(" ");
if (i > 0) {
sb.append(",");
}
sb.append("`");
sb.append(table.getColumns().get(i).getName());
sb.append("` ");
sb.append(convertSinkColumnType(type));
sb.append("\n");
if (table.getColumns().get(i).isKeyFlag()) {
pks.add(table.getColumns().get(i).getName());
}
}
StringBuilder pksb = new StringBuilder("PRIMARY KEY ( ");
for (int i = 0; i < pks.size(); i++) {
if (i > 0) {
pksb.append(",");
}
pksb.append("`");
pksb.append(pks.get(i));
pksb.append("`");
}
pksb.append(" ) NOT ENFORCED\n");
if (pks.size() > 0) {
sb.append(" ,");
sb.append(pksb);
}
sb.append(") WITH (\n");
sb.append(getSinkConfigurationString(table));
sb.append(")\n");
return sb.toString();
}
private String getCDCSqlInsert(Table table, String targetName, String sourceName) {
StringBuilder sb = new StringBuilder("INSERT INTO ");
sb.append(targetName);
sb.append(" SELECT\n");
for (int i = 0; i < table.getColumns().size(); i++) {
sb.append(" ");
if (i > 0) {
sb.append(",");
}
sb.append(getColumnProcessing(table.getColumns().get(i)) + " \n");
}
sb.append(" FROM ");
sb.append(sourceName);
return sb.toString();
}
private String getColumnProcessing(Column column) {
if ("true".equals(config.getSink().get("column.replace.line-break")) && ColumnType.STRING.equals(column.getJavaType())) {
return "REGEXP_REPLACE(`" + column.getName() + "`, '\\n', '') AS `" + column.getName() + "`";
} else {
return "`" + column.getName() + "`";
}
}
private String convertSinkColumnType(String type) {
if (config.getSink().get("connector").equals("hudi")) {
if (type.equals("TIMESTAMP")) {
return "TIMESTAMP(3)";
}
}
return type;
}
protected Object convertValue(Object value, LogicalType logicalType) { protected Object convertValue(Object value, LogicalType logicalType) {
if (value == null) { if (value == null) {
return null; return null;
...@@ -293,13 +214,4 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -293,13 +214,4 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return value; return value;
} }
} }
private String getSinkConfigurationString(Table table) {
String configurationString = SqlUtil.replaceAllParam(config.getSinkConfigurationString(), "schemaName", getSinkSchemaName(table));
configurationString = SqlUtil.replaceAllParam(configurationString, "tableName", getSinkTableName(table));
if (configurationString.contains("${pkList}")) {
configurationString = SqlUtil.replaceAllParam(configurationString, "pkList", StringUtils.join(getPKList(table), "."));
}
return configurationString;
}
} }
package com.dlink.cdc.sql; package com.dlink.cdc.sql;
import com.dlink.model.Column; import com.dlink.assertion.Asserts;
import com.dlink.model.ColumnType; import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.dlink.utils.FlinkBaseUtil;
import com.dlink.utils.LogUtil; import com.dlink.utils.LogUtil;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.FlatMapFunction;
...@@ -15,11 +22,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ...@@ -15,11 +22,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.utils.TypeConversions; import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row; import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind; import org.apache.flink.types.RowKind;
...@@ -33,16 +36,6 @@ import java.util.ArrayList; ...@@ -33,16 +36,6 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.dlink.utils.SqlUtil;
/** /**
* SQLSinkBuilder * SQLSinkBuilder
* *
...@@ -118,20 +111,22 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -118,20 +111,22 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
} }
private void addTableSink( private void addTableSink(
CustomTableEnvironment customTableEnvironment, CustomTableEnvironment customTableEnvironment,
DataStream<Row> rowDataDataStream, DataStream<Row> rowDataDataStream,
Table table, Table table,
List<String> columnNameList) { List<String> columnNameList) {
String sinkSchemaName = getSinkSchemaName(table);
String sinkTableName = getSinkTableName(table); String sinkTableName = getSinkTableName(table);
String pkList = StringUtils.join(getPKList(table), ".");
String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline(); String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline();
customTableEnvironment.createTemporaryView(viewName, rowDataDataStream, StringUtils.join(columnNameList, ",")); customTableEnvironment.createTemporaryView(viewName, rowDataDataStream, StringUtils.join(columnNameList, ","));
logger.info("Create " + viewName + " temporaryView successful..."); logger.info("Create " + viewName + " temporaryView successful...");
String flinkDDL = getFlinkDDL(table, sinkTableName); String flinkDDL = FlinkBaseUtil.getFlinkDDL(table, sinkTableName, config, sinkSchemaName, sinkTableName, pkList);
logger.info(flinkDDL); logger.info(flinkDDL);
customTableEnvironment.executeSql(flinkDDL); customTableEnvironment.executeSql(flinkDDL);
logger.info("Create " + sinkTableName + " FlinkSQL DDL successful..."); logger.info("Create " + sinkTableName + " FlinkSQL DDL successful...");
String cdcSqlInsert = getCDCSqlInsert(table, sinkTableName, viewName); String cdcSqlInsert = FlinkBaseUtil.getCDCSqlInsert(table, sinkTableName, viewName, config);
logger.info(cdcSqlInsert); logger.info(cdcSqlInsert);
List<Operation> operations = customTableEnvironment.getParser().parse(cdcSqlInsert); List<Operation> operations = customTableEnvironment.getParser().parse(cdcSqlInsert);
logger.info("Create " + sinkTableName + " FlinkSQL insert into successful..."); logger.info("Create " + sinkTableName + " FlinkSQL insert into successful...");
...@@ -191,80 +186,6 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -191,80 +186,6 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return dataStreamSource; return dataStreamSource;
} }
private String getFlinkDDL(Table table, String tableName) {
StringBuilder sb = new StringBuilder();
sb.append("CREATE TABLE IF NOT EXISTS ");
sb.append(tableName);
sb.append(" (\n");
List<String> pks = new ArrayList<>();
for (int i = 0; i < table.getColumns().size(); i++) {
String type = table.getColumns().get(i).getJavaType().getFlinkType();
sb.append(" ");
if (i > 0) {
sb.append(",");
}
sb.append("`");
sb.append(table.getColumns().get(i).getName());
sb.append("` ");
sb.append(convertSinkColumnType(type));
sb.append("\n");
if (table.getColumns().get(i).isKeyFlag()) {
pks.add(table.getColumns().get(i).getName());
}
}
StringBuilder pksb = new StringBuilder("PRIMARY KEY ( ");
for (int i = 0; i < pks.size(); i++) {
if (i > 0) {
pksb.append(",");
}
pksb.append("`");
pksb.append(pks.get(i));
pksb.append("`");
}
pksb.append(" ) NOT ENFORCED\n");
if (pks.size() > 0) {
sb.append(" ,");
sb.append(pksb);
}
sb.append(") WITH (\n");
sb.append(getSinkConfigurationString(table));
sb.append(")\n");
return sb.toString();
}
private String getCDCSqlInsert(Table table, String targetName, String sourceName) {
StringBuilder sb = new StringBuilder("INSERT INTO ");
sb.append(targetName);
sb.append(" SELECT\n");
for (int i = 0; i < table.getColumns().size(); i++) {
sb.append(" ");
if (i > 0) {
sb.append(",");
}
sb.append(getColumnProcessing(table.getColumns().get(i)) + " \n");
}
sb.append(" FROM ");
sb.append(sourceName);
return sb.toString();
}
private String getColumnProcessing(Column column) {
if ("true".equals(config.getSink().get("column.replace.line-break")) && ColumnType.STRING.equals(column.getJavaType())) {
return "REGEXP_REPLACE(`" + column.getName() + "`, '\\n', '') AS `" + column.getName() + "`";
} else {
return "`" + column.getName() + "`";
}
}
private String convertSinkColumnType(String type) {
if (config.getSink().get("connector").equals("hudi")) {
if (type.equals("TIMESTAMP")) {
return "TIMESTAMP(3)";
}
}
return type;
}
protected Object convertValue(Object value, LogicalType logicalType) { protected Object convertValue(Object value, LogicalType logicalType) {
if (value == null) { if (value == null) {
return null; return null;
...@@ -293,13 +214,4 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -293,13 +214,4 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return value; return value;
} }
} }
private String getSinkConfigurationString(Table table) {
String configurationString = SqlUtil.replaceAllParam(config.getSinkConfigurationString(), "schemaName", getSinkSchemaName(table));
configurationString = SqlUtil.replaceAllParam(configurationString, "tableName", getSinkTableName(table));
if (configurationString.contains("${pkList}")) {
configurationString = SqlUtil.replaceAllParam(configurationString, "pkList", StringUtils.join(getPKList(table), "."));
}
return configurationString;
}
} }
...@@ -2,6 +2,7 @@ package com.dlink.cdc.sql; ...@@ -2,6 +2,7 @@ package com.dlink.cdc.sql;
import com.dlink.model.*; import com.dlink.model.*;
import com.dlink.utils.FlinkBaseUtil;
import com.dlink.utils.LogUtil; import com.dlink.utils.LogUtil;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.FlatMapFunction;
...@@ -116,15 +117,17 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -116,15 +117,17 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
Table table, Table table,
List<String> columnNameList) { List<String> columnNameList) {
String sinkSchemaName = getSinkSchemaName(table);
String sinkTableName = getSinkTableName(table); String sinkTableName = getSinkTableName(table);
String pkList = StringUtils.join(getPKList(table), ".");
String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline(); String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline();
customTableEnvironment.createTemporaryView(viewName, rowDataDataStream, StringUtils.join(columnNameList, ",")); customTableEnvironment.createTemporaryView(viewName, rowDataDataStream, StringUtils.join(columnNameList, ","));
logger.info("Create " + viewName + " temporaryView successful..."); logger.info("Create " + viewName + " temporaryView successful...");
String flinkDDL = getFlinkDDL(table, sinkTableName); String flinkDDL = FlinkBaseUtil.getFlinkDDL(table, sinkTableName, config, sinkSchemaName, sinkTableName, pkList);
logger.info(flinkDDL); logger.info(flinkDDL);
customTableEnvironment.executeSql(flinkDDL); customTableEnvironment.executeSql(flinkDDL);
logger.info("Create " + sinkTableName + " FlinkSQL DDL successful..."); logger.info("Create " + sinkTableName + " FlinkSQL DDL successful...");
String cdcSqlInsert = getCDCSqlInsert(table, sinkTableName, viewName); String cdcSqlInsert = FlinkBaseUtil.getCDCSqlInsert(table, sinkTableName, viewName, config);
logger.info(cdcSqlInsert); logger.info(cdcSqlInsert);
List<Operation> operations = customTableEnvironment.getParser().parse(cdcSqlInsert); List<Operation> operations = customTableEnvironment.getParser().parse(cdcSqlInsert);
logger.info("Create " + sinkTableName + " FlinkSQL insert into successful..."); logger.info("Create " + sinkTableName + " FlinkSQL insert into successful...");
...@@ -184,80 +187,6 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -184,80 +187,6 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return dataStreamSource; return dataStreamSource;
} }
private String getFlinkDDL(Table table, String tableName) {
StringBuilder sb = new StringBuilder();
sb.append("CREATE TABLE IF NOT EXISTS ");
sb.append(tableName);
sb.append(" (\n");
List<String> pks = new ArrayList<>();
for (int i = 0; i < table.getColumns().size(); i++) {
String type = table.getColumns().get(i).getJavaType().getFlinkType();
sb.append(" ");
if (i > 0) {
sb.append(",");
}
sb.append("`");
sb.append(table.getColumns().get(i).getName());
sb.append("` ");
sb.append(convertSinkColumnType(type));
sb.append("\n");
if (table.getColumns().get(i).isKeyFlag()) {
pks.add(table.getColumns().get(i).getName());
}
}
StringBuilder pksb = new StringBuilder("PRIMARY KEY ( ");
for (int i = 0; i < pks.size(); i++) {
if (i > 0) {
pksb.append(",");
}
pksb.append("`");
pksb.append(pks.get(i));
pksb.append("`");
}
pksb.append(" ) NOT ENFORCED\n");
if (pks.size() > 0) {
sb.append(" ,");
sb.append(pksb);
}
sb.append(") WITH (\n");
sb.append(getSinkConfigurationString(table));
sb.append(")\n");
return sb.toString();
}
private String getCDCSqlInsert(Table table, String targetName, String sourceName) {
StringBuilder sb = new StringBuilder("INSERT INTO ");
sb.append(targetName);
sb.append(" SELECT\n");
for (int i = 0; i < table.getColumns().size(); i++) {
sb.append(" ");
if (i > 0) {
sb.append(",");
}
sb.append(getColumnProcessing(table.getColumns().get(i)) + " \n");
}
sb.append(" FROM ");
sb.append(sourceName);
return sb.toString();
}
private String getColumnProcessing(Column column) {
if ("true".equals(config.getSink().get("column.replace.line-break")) && ColumnType.STRING.equals(column.getJavaType())) {
return "REGEXP_REPLACE(`" + column.getName() + "`, '\\n', '') AS `" + column.getName() + "`";
} else {
return "`" + column.getName() + "`";
}
}
private String convertSinkColumnType(String type) {
if (config.getSink().get("connector").equals("hudi")) {
if (type.equals("TIMESTAMP")) {
return "TIMESTAMP(3)";
}
}
return type;
}
protected Object convertValue(Object value, LogicalType logicalType) { protected Object convertValue(Object value, LogicalType logicalType) {
if (value == null) { if (value == null) {
return null; return null;
...@@ -288,13 +217,4 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -288,13 +217,4 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return value; return value;
} }
} }
private String getSinkConfigurationString(Table table) {
String configurationString = SqlUtil.replaceAllParam(config.getSinkConfigurationString(), "schemaName", getSinkSchemaName(table));
configurationString = SqlUtil.replaceAllParam(configurationString, "tableName", getSinkTableName(table));
if (configurationString.contains("${pkList}")) {
configurationString = SqlUtil.replaceAllParam(configurationString, "pkList", StringUtils.join(getPKList(table), "."));
}
return configurationString;
}
} }
package com.dlink.cdc.sql; package com.dlink.cdc.sql;
import com.dlink.model.Column; import com.dlink.assertion.Asserts;
import com.dlink.model.ColumnType; import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.dlink.utils.FlinkBaseUtil;
import com.dlink.utils.LogUtil; import com.dlink.utils.LogUtil;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.FlatMapFunction;
...@@ -15,11 +22,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ...@@ -15,11 +22,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.utils.TypeConversions; import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row; import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind; import org.apache.flink.types.RowKind;
...@@ -33,16 +36,6 @@ import java.util.ArrayList; ...@@ -33,16 +36,6 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.dlink.utils.SqlUtil;
/** /**
* SQLSinkBuilder * SQLSinkBuilder
* *
...@@ -123,15 +116,17 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -123,15 +116,17 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
Table table, Table table,
List<String> columnNameList) { List<String> columnNameList) {
String sinkSchemaName = getSinkSchemaName(table);
String sinkTableName = getSinkTableName(table); String sinkTableName = getSinkTableName(table);
String pkList = StringUtils.join(getPKList(table), ".");
String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline(); String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline();
customTableEnvironment.createTemporaryView(viewName, rowDataDataStream, StringUtils.join(columnNameList, ",")); customTableEnvironment.createTemporaryView(viewName, rowDataDataStream, StringUtils.join(columnNameList, ","));
logger.info("Create " + viewName + " temporaryView successful..."); logger.info("Create " + viewName + " temporaryView successful...");
String flinkDDL = getFlinkDDL(table, sinkTableName); String flinkDDL = FlinkBaseUtil.getFlinkDDL(table, sinkTableName, config, sinkSchemaName, sinkTableName, pkList);
logger.info(flinkDDL); logger.info(flinkDDL);
customTableEnvironment.executeSql(flinkDDL); customTableEnvironment.executeSql(flinkDDL);
logger.info("Create " + sinkTableName + " FlinkSQL DDL successful..."); logger.info("Create " + sinkTableName + " FlinkSQL DDL successful...");
String cdcSqlInsert = getCDCSqlInsert(table, sinkTableName, viewName); String cdcSqlInsert = FlinkBaseUtil.getCDCSqlInsert(table, sinkTableName, viewName, config);
logger.info(cdcSqlInsert); logger.info(cdcSqlInsert);
List<Operation> operations = customTableEnvironment.getParser().parse(cdcSqlInsert); List<Operation> operations = customTableEnvironment.getParser().parse(cdcSqlInsert);
logger.info("Create " + sinkTableName + " FlinkSQL insert into successful..."); logger.info("Create " + sinkTableName + " FlinkSQL insert into successful...");
...@@ -191,80 +186,6 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -191,80 +186,6 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return dataStreamSource; return dataStreamSource;
} }
private String getFlinkDDL(Table table, String tableName) {
StringBuilder sb = new StringBuilder();
sb.append("CREATE TABLE IF NOT EXISTS ");
sb.append(tableName);
sb.append(" (\n");
List<String> pks = new ArrayList<>();
for (int i = 0; i < table.getColumns().size(); i++) {
String type = table.getColumns().get(i).getJavaType().getFlinkType();
sb.append(" ");
if (i > 0) {
sb.append(",");
}
sb.append("`");
sb.append(table.getColumns().get(i).getName());
sb.append("` ");
sb.append(convertSinkColumnType(type));
sb.append("\n");
if (table.getColumns().get(i).isKeyFlag()) {
pks.add(table.getColumns().get(i).getName());
}
}
StringBuilder pksb = new StringBuilder("PRIMARY KEY ( ");
for (int i = 0; i < pks.size(); i++) {
if (i > 0) {
pksb.append(",");
}
pksb.append("`");
pksb.append(pks.get(i));
pksb.append("`");
}
pksb.append(" ) NOT ENFORCED\n");
if (pks.size() > 0) {
sb.append(" ,");
sb.append(pksb);
}
sb.append(") WITH (\n");
sb.append(getSinkConfigurationString(table));
sb.append(")\n");
return sb.toString();
}
private String getCDCSqlInsert(Table table, String targetName, String sourceName) {
StringBuilder sb = new StringBuilder("INSERT INTO ");
sb.append(targetName);
sb.append(" SELECT\n");
for (int i = 0; i < table.getColumns().size(); i++) {
sb.append(" ");
if (i > 0) {
sb.append(",");
}
sb.append(getColumnProcessing(table.getColumns().get(i)) + " \n");
}
sb.append(" FROM ");
sb.append(sourceName);
return sb.toString();
}
private String getColumnProcessing(Column column) {
if ("true".equals(config.getSink().get("column.replace.line-break")) && ColumnType.STRING.equals(column.getJavaType())) {
return "REGEXP_REPLACE(`" + column.getName() + "`, '\\n', '') AS `" + column.getName() + "`";
} else {
return "`" + column.getName() + "`";
}
}
private String convertSinkColumnType(String type) {
if (config.getSink().get("connector").equals("hudi")) {
if (type.equals("TIMESTAMP")) {
return "TIMESTAMP(3)";
}
}
return type;
}
protected Object convertValue(Object value, LogicalType logicalType) { protected Object convertValue(Object value, LogicalType logicalType) {
if (value == null) { if (value == null) {
return null; return null;
...@@ -295,13 +216,4 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -295,13 +216,4 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return value; return value;
} }
} }
private String getSinkConfigurationString(Table table) {
String configurationString = SqlUtil.replaceAllParam(config.getSinkConfigurationString(), "schemaName", getSinkSchemaName(table));
configurationString = SqlUtil.replaceAllParam(configurationString, "tableName", getSinkTableName(table));
if (configurationString.contains("${pkList}")) {
configurationString = SqlUtil.replaceAllParam(configurationString, "pkList", StringUtils.join(getPKList(table), "."));
}
return configurationString;
}
} }
package com.dlink.cdc.sql; package com.dlink.cdc.sql;
import com.dlink.model.Column; import com.dlink.assertion.Asserts;
import com.dlink.model.ColumnType; import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.dlink.utils.FlinkBaseUtil;
import com.dlink.utils.LogUtil; import com.dlink.utils.LogUtil;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.FlatMapFunction;
...@@ -15,11 +22,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ...@@ -15,11 +22,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.utils.TypeConversions; import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row; import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind; import org.apache.flink.types.RowKind;
...@@ -33,16 +36,6 @@ import java.util.ArrayList; ...@@ -33,16 +36,6 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.dlink.utils.SqlUtil;
/** /**
* SQLSinkBuilder * SQLSinkBuilder
* *
...@@ -117,21 +110,23 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -117,21 +110,23 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
}, rowTypeInfo); }, rowTypeInfo);
} }
public void addTableSink( private void addTableSink(
CustomTableEnvironment customTableEnvironment, CustomTableEnvironment customTableEnvironment,
DataStream<Row> rowDataDataStream, DataStream<Row> rowDataDataStream,
Table table, Table table,
List<String> columnNameList) { List<String> columnNameList) {
String sinkSchemaName = getSinkSchemaName(table);
String sinkTableName = getSinkTableName(table); String sinkTableName = getSinkTableName(table);
String pkList = StringUtils.join(getPKList(table), ".");
String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline(); String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline();
customTableEnvironment.createTemporaryView(viewName, rowDataDataStream, StringUtils.join(columnNameList, ",")); customTableEnvironment.createTemporaryView(viewName, rowDataDataStream, StringUtils.join(columnNameList, ","));
logger.info("Create " + viewName + " temporaryView successful..."); logger.info("Create " + viewName + " temporaryView successful...");
String flinkDDL = getFlinkDDL(table, sinkTableName); String flinkDDL = FlinkBaseUtil.getFlinkDDL(table, sinkTableName, config, sinkSchemaName, sinkTableName, pkList);
logger.info(flinkDDL); logger.info(flinkDDL);
customTableEnvironment.executeSql(flinkDDL); customTableEnvironment.executeSql(flinkDDL);
logger.info("Create " + sinkTableName + " FlinkSQL DDL successful..."); logger.info("Create " + sinkTableName + " FlinkSQL DDL successful...");
String cdcSqlInsert = getCDCSqlInsert(table, sinkTableName, viewName); String cdcSqlInsert = FlinkBaseUtil.getCDCSqlInsert(table, sinkTableName, viewName, config);
logger.info(cdcSqlInsert); logger.info(cdcSqlInsert);
List<Operation> operations = customTableEnvironment.getParser().parse(cdcSqlInsert); List<Operation> operations = customTableEnvironment.getParser().parse(cdcSqlInsert);
logger.info("Create " + sinkTableName + " FlinkSQL insert into successful..."); logger.info("Create " + sinkTableName + " FlinkSQL insert into successful...");
...@@ -191,80 +186,6 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -191,80 +186,6 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return dataStreamSource; return dataStreamSource;
} }
private String getFlinkDDL(Table table, String tableName) {
StringBuilder sb = new StringBuilder();
sb.append("CREATE TABLE IF NOT EXISTS ");
sb.append(tableName);
sb.append(" (\n");
List<String> pks = new ArrayList<>();
for (int i = 0; i < table.getColumns().size(); i++) {
String type = table.getColumns().get(i).getJavaType().getFlinkType();
sb.append(" ");
if (i > 0) {
sb.append(",");
}
sb.append("`");
sb.append(table.getColumns().get(i).getName());
sb.append("` ");
sb.append(convertSinkColumnType(type));
sb.append("\n");
if (table.getColumns().get(i).isKeyFlag()) {
pks.add(table.getColumns().get(i).getName());
}
}
StringBuilder pksb = new StringBuilder("PRIMARY KEY ( ");
for (int i = 0; i < pks.size(); i++) {
if (i > 0) {
pksb.append(",");
}
pksb.append("`");
pksb.append(pks.get(i));
pksb.append("`");
}
pksb.append(" ) NOT ENFORCED\n");
if (pks.size() > 0) {
sb.append(" ,");
sb.append(pksb);
}
sb.append(") WITH (\n");
sb.append(getSinkConfigurationString(table));
sb.append(")\n");
return sb.toString();
}
private String getCDCSqlInsert(Table table, String targetName, String sourceName) {
StringBuilder sb = new StringBuilder("INSERT INTO ");
sb.append(targetName);
sb.append(" SELECT\n");
for (int i = 0; i < table.getColumns().size(); i++) {
sb.append(" ");
if (i > 0) {
sb.append(",");
}
sb.append(getColumnProcessing(table.getColumns().get(i)) + " \n");
}
sb.append(" FROM ");
sb.append(sourceName);
return sb.toString();
}
private String getColumnProcessing(Column column) {
if ("true".equals(config.getSink().get("column.replace.line-break")) && ColumnType.STRING.equals(column.getJavaType())) {
return "REGEXP_REPLACE(`" + column.getName() + "`, '\\n', '') AS `" + column.getName() + "`";
} else {
return "`" + column.getName() + "`";
}
}
private String convertSinkColumnType(String type) {
if (config.getSink().get("connector").equals("hudi")) {
if (type.equals("TIMESTAMP")) {
return "TIMESTAMP(3)";
}
}
return type;
}
protected Object convertValue(Object value, LogicalType logicalType) { protected Object convertValue(Object value, LogicalType logicalType) {
if (value == null) { if (value == null) {
return null; return null;
...@@ -295,13 +216,4 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -295,13 +216,4 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return value; return value;
} }
} }
private String getSinkConfigurationString(Table table) {
String configurationString = SqlUtil.replaceAllParam(config.getSinkConfigurationString(), "schemaName", getSinkSchemaName(table));
configurationString = SqlUtil.replaceAllParam(configurationString, "tableName", getSinkTableName(table));
if (configurationString.contains("${pkList}")) {
configurationString = SqlUtil.replaceAllParam(configurationString, "pkList", StringUtils.join(getPKList(table), "."));
}
return configurationString;
}
} }
package com.dlink.utils; package com.dlink.utils;
import com.dlink.constant.FlinkParamConstant; import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.Column;
import com.dlink.model.ColumnType;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Table;
import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.api.java.utils.ParameterTool;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
/** /**
...@@ -24,4 +30,88 @@ public class FlinkBaseUtil { ...@@ -24,4 +30,88 @@ public class FlinkBaseUtil {
params.put(FlinkParamConstant.PASSWORD, parameters.get(FlinkParamConstant.PASSWORD, null)); params.put(FlinkParamConstant.PASSWORD, parameters.get(FlinkParamConstant.PASSWORD, null));
return params; return params;
} }
public static String getCDCSqlInsert(Table table, String targetName, String sourceName, FlinkCDCConfig config) {
StringBuilder sb = new StringBuilder("INSERT INTO `");
sb.append(targetName);
sb.append("` SELECT\n");
for (int i = 0; i < table.getColumns().size(); i++) {
sb.append(" ");
if (i > 0) {
sb.append(",");
}
sb.append(getColumnProcessing(table.getColumns().get(i), config)).append(" \n");
}
sb.append(" FROM `");
sb.append(sourceName);
sb.append("`");
return sb.toString();
}
public static String getFlinkDDL(Table table, String tableName, FlinkCDCConfig config, String sinkSchemaName, String sinkTableName, String pkList) {
StringBuilder sb = new StringBuilder();
sb.append("CREATE TABLE IF NOT EXISTS `");
sb.append(tableName);
sb.append("` (\n");
List<String> pks = new ArrayList<>();
for (int i = 0; i < table.getColumns().size(); i++) {
String type = table.getColumns().get(i).getJavaType().getFlinkType();
sb.append(" ");
if (i > 0) {
sb.append(",");
}
sb.append("`");
sb.append(table.getColumns().get(i).getName());
sb.append("` ");
sb.append(convertSinkColumnType(type, config));
sb.append("\n");
if (table.getColumns().get(i).isKeyFlag()) {
pks.add(table.getColumns().get(i).getName());
}
}
StringBuilder pksb = new StringBuilder("PRIMARY KEY ( ");
for (int i = 0; i < pks.size(); i++) {
if (i > 0) {
pksb.append(",");
}
pksb.append("`");
pksb.append(pks.get(i));
pksb.append("`");
}
pksb.append(" ) NOT ENFORCED\n");
if (pks.size() > 0) {
sb.append(" ,");
sb.append(pksb);
}
sb.append(") WITH (\n");
sb.append(getSinkConfigurationString(table, config, sinkSchemaName, sinkTableName, pkList));
sb.append(")\n");
return sb.toString();
}
public static String getSinkConfigurationString(Table table, FlinkCDCConfig config, String sinkSchemaName, String sinkTableName, String pkList) {
String configurationString = SqlUtil.replaceAllParam(config.getSinkConfigurationString(), "schemaName", sinkSchemaName);
configurationString = SqlUtil.replaceAllParam(configurationString, "tableName", sinkTableName);
if (configurationString.contains("${pkList}")) {
configurationString = SqlUtil.replaceAllParam(configurationString, "pkList", pkList);
}
return configurationString;
}
public static String convertSinkColumnType(String type, FlinkCDCConfig config) {
if (config.getSink().get("connector").equals("hudi")) {
if (type.equals("TIMESTAMP")) {
return "TIMESTAMP(3)";
}
}
return type;
}
public static String getColumnProcessing(Column column, FlinkCDCConfig config) {
if ("true".equals(config.getSink().get("column.replace.line-break")) && ColumnType.STRING.equals(column.getJavaType())) {
return "REGEXP_REPLACE(`" + column.getName() + "`, '\\n', '') AS `" + column.getName() + "`";
} else {
return "`" + column.getName() + "`";
}
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment