Commit 15bbdc26 authored by wenmo

[Feature-518][client] CDCSOURCE add log and fix decimal bug

parent 66eee408
@@ -259,6 +259,20 @@
            <include>dlink-app-1.15-${project.version}-jar-with-dependencies.jar</include>
        </includes>
    </fileSet>
+    <fileSet>
+        <directory>${project.parent.basedir}/dlink-client/dlink-client-base/target</directory>
+        <outputDirectory>jar</outputDirectory>
+        <includes>
+            <include>dlink-client-base-${project.version}.jar</include>
+        </includes>
+    </fileSet>
+    <fileSet>
+        <directory>${project.parent.basedir}/dlink-common/target</directory>
+        <outputDirectory>jar</outputDirectory>
+        <includes>
+            <include>dlink-common-${project.version}.jar</include>
+        </includes>
+    </fileSet>
    <fileSet>
        <directory>${project.parent.basedir}/dlink-doc/extends</directory>
        <outputDirectory>jar</outputDirectory>
...
@@ -39,6 +39,9 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.dlink.assertion.Asserts;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.Column;
@@ -55,6 +58,8 @@ import com.dlink.model.Table;
 **/
public abstract class AbstractSinkBuilder {

+    protected static final Logger logger = LoggerFactory.getLogger(AbstractSinkBuilder.class);

    protected FlinkCDCConfig config;
    protected List<ModifyOperation> modifyOperations = new ArrayList();
@@ -225,7 +230,11 @@ public abstract class AbstractSinkBuilder {
            case JAVA_LANG_DOUBLE:
                return new DoubleType();
            case DECIMAL:
-                return new DecimalType(columnType.getPrecision(), columnType.getScale());
+                if (columnType.getPrecision() == null || columnType.getPrecision() == 0) {
+                    return new DecimalType(38, columnType.getScale());
+                } else {
+                    return new DecimalType(columnType.getPrecision(), columnType.getScale());
+                }
            case INT:
            case INTEGER:
                return new IntType();
...
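The DECIMAL branch above is the actual bug fix: some sources report a DECIMAL column with null or zero precision, and feeding that straight into Flink's DecimalType fails (null unboxes to a NullPointerException, and precision 0 is rejected). The fix falls back to 38, DecimalType's maximum precision. A minimal standalone sketch of the same fallback; the class and method names here are illustrative, not the project's:

import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;

public class DecimalFallbackSketch {
    // When precision metadata is missing or zero, default to the maximum
    // precision (38) rather than constructing an invalid DecimalType.
    static LogicalType toDecimalType(Integer precision, Integer scale) {
        if (precision == null || precision == 0) {
            return new DecimalType(38, scale);
        }
        return new DecimalType(precision, scale);
    }

    public static void main(String[] args) {
        System.out.println(toDecimalType(null, 2)); // DECIMAL(38, 2)
        System.out.println(toDecimalType(10, 2));   // DECIMAL(10, 2)
    }
}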
package com.dlink.cdc.sql;
+import com.dlink.model.Column;
+import com.dlink.model.ColumnType;
+import com.dlink.utils.LogUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -53,7 +56,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
    public SQLSinkBuilder() {
    }

-    public SQLSinkBuilder(FlinkCDCConfig config) {
+    private SQLSinkBuilder(FlinkCDCConfig config) {
        super(config);
    }
@@ -62,7 +65,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
    }

-    protected DataStream<Row> buildRow(
+    private DataStream<Row> buildRow(
            SingleOutputStreamOperator<Map> filterOperator,
            List<String> columnNameList,
            List<LogicalType> columnTypeList) {
@@ -113,17 +116,24 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
        }, rowTypeInfo);
    }

-    public void addTableSink(
+    private void addTableSink(
            CustomTableEnvironment customTableEnvironment,
            DataStream<Row> rowDataDataStream,
            Table table,
            List<String> columnNameList) {
        String sinkTableName = getSinkTableName(table);
-        customTableEnvironment.createTemporaryView(table.getSchemaTableNameWithUnderline(), rowDataDataStream, StringUtils.join(columnNameList, ","));
-        customTableEnvironment.executeSql(getFlinkDDL(table, sinkTableName));
-        List<Operation> operations = customTableEnvironment.getParser().parse(table.getCDCSqlInsert(sinkTableName, table.getSchemaTableNameWithUnderline()));
+        String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline();
+        customTableEnvironment.createTemporaryView(viewName, rowDataDataStream, StringUtils.join(columnNameList, ","));
+        logger.info("Create " + viewName + " temporaryView successful...");
+        String flinkDDL = getFlinkDDL(table, sinkTableName);
+        logger.info(flinkDDL);
+        customTableEnvironment.executeSql(flinkDDL);
+        logger.info("Create " + sinkTableName + " FlinkSQL DDL successful...");
+        String cdcSqlInsert = getCDCSqlInsert(table, sinkTableName, viewName);
+        logger.info(cdcSqlInsert);
+        List<Operation> operations = customTableEnvironment.getParser().parse(cdcSqlInsert);
+        logger.info("Create " + sinkTableName + " FlinkSQL insert into successful...");
        if (operations.size() > 0) {
            Operation operation = operations.get(0);
            if (operation instanceof ModifyOperation) {
@@ -152,25 +162,35 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
        final String schemaFieldName = config.getSchemaFieldName();
        if (Asserts.isNotNullCollection(schemaList)) {
            SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
+            logger.info("Build deserialize successful...");
            for (Schema schema : schemaList) {
                for (Table table : schema.getTables()) {
-                    SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
-                    List<String> columnNameList = new ArrayList<>();
-                    List<LogicalType> columnTypeList = new ArrayList<>();
-                    buildColumn(columnNameList, columnTypeList, table.getColumns());
-                    DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList);
-                    addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
+                    try {
+                        SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
+                        logger.info("Build " + table.getSchemaTableName() + " shunt successful...");
+                        List<String> columnNameList = new ArrayList<>();
+                        List<LogicalType> columnTypeList = new ArrayList<>();
+                        buildColumn(columnNameList, columnTypeList, table.getColumns());
+                        DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList);
+                        logger.info("Build " + table.getSchemaTableName() + " flatMap successful...");
+                        logger.info("Start build " + table.getSchemaTableName() + " sink...");
+                        addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
+                    } catch (Exception e) {
+                        logger.error("Build " + table.getSchemaTableName() + " cdc sync failed...");
+                        logger.error(LogUtil.getError(e));
+                    }
                }
            }
            List<Transformation<?>> trans = customTableEnvironment.getPlanner().translate(modifyOperations);
            for (Transformation<?> item : trans) {
                env.addOperator(item);
            }
+            logger.info("A total of " + trans.size() + " table cdc syncs were built successfully...");
        }
        return dataStreamSource;
    }

-    public String getFlinkDDL(Table table, String tableName) {
+    private String getFlinkDDL(Table table, String tableName) {
        StringBuilder sb = new StringBuilder();
        sb.append("CREATE TABLE IF NOT EXISTS ");
        sb.append(tableName);
@@ -211,7 +231,31 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
        return sb.toString();
    }

-    protected String convertSinkColumnType(String type) {
+    private String getCDCSqlInsert(Table table, String targetName, String sourceName) {
+        StringBuilder sb = new StringBuilder("INSERT INTO ");
+        sb.append(targetName);
+        sb.append(" SELECT\n");
+        for (int i = 0; i < table.getColumns().size(); i++) {
+            sb.append(" ");
+            if (i > 0) {
+                sb.append(",");
+            }
+            sb.append(getColumnProcessing(table.getColumns().get(i)) + " \n");
+        }
+        sb.append(" FROM ");
+        sb.append(sourceName);
+        return sb.toString();
+    }
+
+    private String getColumnProcessing(Column column) {
+        if ("true".equals(config.getSink().get("column.replace.line-break")) && ColumnType.STRING.equals(column.getJavaType())) {
+            return "REGEXP_REPLACE(`" + column.getName() + "`, '\\n', '') AS `" + column.getName() + "`";
+        } else {
+            return "`" + column.getName() + "`";
+        }
+    }
+
+    private String convertSinkColumnType(String type) {
        if (config.getSink().get("connector").equals("hudi")) {
            if (type.equals("TIMESTAMP")) {
                return "TIMESTAMP(3)";
@@ -235,7 +279,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
        }
    }

-    protected String getSinkConfigurationString(Table table) {
+    private String getSinkConfigurationString(Table table) {
        String configurationString = SqlUtil.replaceAllParam(config.getSinkConfigurationString(), "schemaName", getSinkSchemaName(table));
        configurationString = SqlUtil.replaceAllParam(configurationString, "tableName", getSinkTableName(table));
        if (configurationString.contains("${pkList}")) {
...
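To see what the new getCDCSqlInsert/getColumnProcessing pair emits: given a hypothetical source table with columns id and remark (remark being a string column), a view named VIEW_db_orders, a sink table named sink_orders, and the sink option column.replace.line-break set to true, the generated statement has the shape shown below. The sketch reproduces the string-building logic with plain stand-ins for the project's Table/Column model (the table, view, and column names are made up for the example):

import java.util.Arrays;
import java.util.List;

public class CdcInsertSketch {
    // Stand-in for getColumnProcessing: string columns get their line breaks stripped.
    static String columnProcessing(String name, boolean isString, boolean replaceLineBreak) {
        if (replaceLineBreak && isString) {
            return "REGEXP_REPLACE(`" + name + "`, '\\n', '') AS `" + name + "`";
        }
        return "`" + name + "`";
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("id", "remark");
        List<Boolean> strings = Arrays.asList(false, true);
        StringBuilder sb = new StringBuilder("INSERT INTO sink_orders SELECT\n");
        for (int i = 0; i < names.size(); i++) {
            sb.append(" ");
            if (i > 0) {
                sb.append(",");
            }
            sb.append(columnProcessing(names.get(i), strings.get(i), true)).append(" \n");
        }
        sb.append(" FROM VIEW_db_orders");
        System.out.println(sb);
        // INSERT INTO sink_orders SELECT
        //  `id`
        //  ,REGEXP_REPLACE(`remark`, '\n', '') AS `remark`
        //  FROM VIEW_db_orders
    }
}

Logging both flinkDDL and cdcSqlInsert before execution means a failing table sync can now be reproduced by copying the statements straight out of the task log.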
@@ -39,6 +39,9 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.dlink.assertion.Asserts;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.Column;
@@ -55,6 +58,8 @@ import com.dlink.model.Table;
 **/
public abstract class AbstractSinkBuilder {

+    protected static final Logger logger = LoggerFactory.getLogger(AbstractSinkBuilder.class);

    protected FlinkCDCConfig config;
    protected List<ModifyOperation> modifyOperations = new ArrayList();
@@ -225,7 +230,11 @@ public abstract class AbstractSinkBuilder {
            case JAVA_LANG_DOUBLE:
                return new DoubleType();
            case DECIMAL:
-                return new DecimalType(columnType.getPrecision(), columnType.getScale());
+                if (columnType.getPrecision() == null || columnType.getPrecision() == 0) {
+                    return new DecimalType(38, columnType.getScale());
+                } else {
+                    return new DecimalType(columnType.getPrecision(), columnType.getScale());
+                }
            case INT:
            case INTEGER:
                return new IntType();
...
package com.dlink.cdc.sql;
+import com.dlink.model.Column;
+import com.dlink.model.ColumnType;
+import com.dlink.utils.LogUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -53,7 +56,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
    public SQLSinkBuilder() {
    }

-    public SQLSinkBuilder(FlinkCDCConfig config) {
+    private SQLSinkBuilder(FlinkCDCConfig config) {
        super(config);
    }
@@ -62,7 +65,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
    }

-    protected DataStream<Row> buildRow(
+    private DataStream<Row> buildRow(
            SingleOutputStreamOperator<Map> filterOperator,
            List<String> columnNameList,
            List<LogicalType> columnTypeList) {
@@ -113,17 +116,24 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
        }, rowTypeInfo);
    }

-    public void addTableSink(
+    private void addTableSink(
            CustomTableEnvironment customTableEnvironment,
            DataStream<Row> rowDataDataStream,
            Table table,
            List<String> columnNameList) {
        String sinkTableName = getSinkTableName(table);
-        customTableEnvironment.createTemporaryView(table.getSchemaTableNameWithUnderline(), rowDataDataStream, StringUtils.join(columnNameList, ","));
-        customTableEnvironment.executeSql(getFlinkDDL(table, sinkTableName));
-        List<Operation> operations = customTableEnvironment.getParser().parse(table.getCDCSqlInsert(sinkTableName, table.getSchemaTableNameWithUnderline()));
+        String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline();
+        customTableEnvironment.createTemporaryView(viewName, rowDataDataStream, StringUtils.join(columnNameList, ","));
+        logger.info("Create " + viewName + " temporaryView successful...");
+        String flinkDDL = getFlinkDDL(table, sinkTableName);
+        logger.info(flinkDDL);
+        customTableEnvironment.executeSql(flinkDDL);
+        logger.info("Create " + sinkTableName + " FlinkSQL DDL successful...");
+        String cdcSqlInsert = getCDCSqlInsert(table, sinkTableName, viewName);
+        logger.info(cdcSqlInsert);
+        List<Operation> operations = customTableEnvironment.getParser().parse(cdcSqlInsert);
+        logger.info("Create " + sinkTableName + " FlinkSQL insert into successful...");
        if (operations.size() > 0) {
            Operation operation = operations.get(0);
            if (operation instanceof ModifyOperation) {
@@ -152,25 +162,35 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
        final String schemaFieldName = config.getSchemaFieldName();
        if (Asserts.isNotNullCollection(schemaList)) {
            SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
+            logger.info("Build deserialize successful...");
            for (Schema schema : schemaList) {
                for (Table table : schema.getTables()) {
-                    SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
-                    List<String> columnNameList = new ArrayList<>();
-                    List<LogicalType> columnTypeList = new ArrayList<>();
-                    buildColumn(columnNameList, columnTypeList, table.getColumns());
-                    DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList);
-                    addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
+                    try {
+                        SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
+                        logger.info("Build " + table.getSchemaTableName() + " shunt successful...");
+                        List<String> columnNameList = new ArrayList<>();
+                        List<LogicalType> columnTypeList = new ArrayList<>();
+                        buildColumn(columnNameList, columnTypeList, table.getColumns());
+                        DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList);
+                        logger.info("Build " + table.getSchemaTableName() + " flatMap successful...");
+                        logger.info("Start build " + table.getSchemaTableName() + " sink...");
+                        addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
+                    } catch (Exception e) {
+                        logger.error("Build " + table.getSchemaTableName() + " cdc sync failed...");
+                        logger.error(LogUtil.getError(e));
+                    }
                }
            }
            List<Transformation<?>> trans = customTableEnvironment.getPlanner().translate(modifyOperations);
            for (Transformation<?> item : trans) {
                env.addOperator(item);
            }
+            logger.info("A total of " + trans.size() + " table cdc syncs were built successfully...");
        }
        return dataStreamSource;
    }

-    public String getFlinkDDL(Table table, String tableName) {
+    private String getFlinkDDL(Table table, String tableName) {
        StringBuilder sb = new StringBuilder();
        sb.append("CREATE TABLE IF NOT EXISTS ");
        sb.append(tableName);
@@ -211,7 +231,31 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
        return sb.toString();
    }

-    protected String convertSinkColumnType(String type) {
+    private String getCDCSqlInsert(Table table, String targetName, String sourceName) {
+        StringBuilder sb = new StringBuilder("INSERT INTO ");
+        sb.append(targetName);
+        sb.append(" SELECT\n");
+        for (int i = 0; i < table.getColumns().size(); i++) {
+            sb.append(" ");
+            if (i > 0) {
+                sb.append(",");
+            }
+            sb.append(getColumnProcessing(table.getColumns().get(i)) + " \n");
+        }
+        sb.append(" FROM ");
+        sb.append(sourceName);
+        return sb.toString();
+    }
+
+    private String getColumnProcessing(Column column) {
+        if ("true".equals(config.getSink().get("column.replace.line-break")) && ColumnType.STRING.equals(column.getJavaType())) {
+            return "REGEXP_REPLACE(`" + column.getName() + "`, '\\n', '') AS `" + column.getName() + "`";
+        } else {
+            return "`" + column.getName() + "`";
+        }
+    }
+
+    private String convertSinkColumnType(String type) {
        if (config.getSink().get("connector").equals("hudi")) {
            if (type.equals("TIMESTAMP")) {
                return "TIMESTAMP(3)";
@@ -235,7 +279,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
        }
    }

-    protected String getSinkConfigurationString(Table table) {
+    private String getSinkConfigurationString(Table table) {
        String configurationString = SqlUtil.replaceAllParam(config.getSinkConfigurationString(), "schemaName", getSinkSchemaName(table));
        configurationString = SqlUtil.replaceAllParam(configurationString, "tableName", getSinkTableName(table));
        if (configurationString.contains("${pkList}")) {
...
@@ -39,6 +39,9 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.dlink.assertion.Asserts;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.Column;
@@ -55,6 +58,8 @@ import com.dlink.model.Table;
 **/
public abstract class AbstractSinkBuilder {

+    protected static final Logger logger = LoggerFactory.getLogger(AbstractSinkBuilder.class);

    protected FlinkCDCConfig config;
    protected List<ModifyOperation> modifyOperations = new ArrayList();
@@ -225,7 +230,11 @@ public abstract class AbstractSinkBuilder {
            case JAVA_LANG_DOUBLE:
                return new DoubleType();
            case DECIMAL:
-                return new DecimalType(columnType.getPrecision(), columnType.getScale());
+                if (columnType.getPrecision() == null || columnType.getPrecision() == 0) {
+                    return new DecimalType(38, columnType.getScale());
+                } else {
+                    return new DecimalType(columnType.getPrecision(), columnType.getScale());
+                }
            case INT:
            case INTEGER:
                return new IntType();
...
@@ -2,6 +2,7 @@ package com.dlink.cdc.sql;
import com.dlink.model.*;
+import com.dlink.utils.LogUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -49,7 +50,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
    public SQLSinkBuilder() {
    }

-    public SQLSinkBuilder(FlinkCDCConfig config) {
+    private SQLSinkBuilder(FlinkCDCConfig config) {
        super(config);
    }
@@ -58,7 +59,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
    }

-    protected DataStream<Row> buildRow(
+    private DataStream<Row> buildRow(
            SingleOutputStreamOperator<Map> filterOperator,
            List<String> columnNameList,
            List<LogicalType> columnTypeList) {
@@ -109,7 +110,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
        }, rowTypeInfo);
    }

-    public void addTableSink(
+    private void addTableSink(
            CustomTableEnvironment customTableEnvironment,
            DataStream<Row> rowDataDataStream,
            Table table,
@@ -118,9 +119,15 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
        String sinkTableName = getSinkTableName(table);
        String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline();
        customTableEnvironment.createTemporaryView(viewName, rowDataDataStream, StringUtils.join(columnNameList, ","));
-        customTableEnvironment.executeSql(getFlinkDDL(table, sinkTableName));
-        List<Operation> operations = customTableEnvironment.getParser().parse(table.getCDCSqlInsert(sinkTableName, viewName));
+        logger.info("Create " + viewName + " temporaryView successful...");
+        String flinkDDL = getFlinkDDL(table, sinkTableName);
+        logger.info(flinkDDL);
+        customTableEnvironment.executeSql(flinkDDL);
+        logger.info("Create " + sinkTableName + " FlinkSQL DDL successful...");
+        String cdcSqlInsert = getCDCSqlInsert(table, sinkTableName, viewName);
+        logger.info(cdcSqlInsert);
+        List<Operation> operations = customTableEnvironment.getParser().parse(cdcSqlInsert);
+        logger.info("Create " + sinkTableName + " FlinkSQL insert into successful...");
        if (operations.size() > 0) {
            Operation operation = operations.get(0);
            if (operation instanceof ModifyOperation) {
@@ -149,25 +156,35 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
        final String schemaFieldName = config.getSchemaFieldName();
        if (Asserts.isNotNullCollection(schemaList)) {
            SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
+            logger.info("Build deserialize successful...");
            for (Schema schema : schemaList) {
                for (Table table : schema.getTables()) {
-                    SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
-                    List<String> columnNameList = new ArrayList<>();
-                    List<LogicalType> columnTypeList = new ArrayList<>();
-                    buildColumn(columnNameList, columnTypeList, table.getColumns());
-                    DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList);
-                    addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
+                    try {
+                        SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
+                        logger.info("Build " + table.getSchemaTableName() + " shunt successful...");
+                        List<String> columnNameList = new ArrayList<>();
+                        List<LogicalType> columnTypeList = new ArrayList<>();
+                        buildColumn(columnNameList, columnTypeList, table.getColumns());
+                        DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList);
+                        logger.info("Build " + table.getSchemaTableName() + " flatMap successful...");
+                        logger.info("Start build " + table.getSchemaTableName() + " sink...");
+                        addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
+                    } catch (Exception e) {
+                        logger.error("Build " + table.getSchemaTableName() + " cdc sync failed...");
+                        logger.error(LogUtil.getError(e));
+                    }
                }
            }
            List<Transformation<?>> trans = customTableEnvironment.getPlanner().translate(modifyOperations);
            for (Transformation<?> item : trans) {
                env.addOperator(item);
            }
+            logger.info("A total of " + trans.size() + " table cdc syncs were built successfully...");
        }
        return dataStreamSource;
    }

-    public String getFlinkDDL(Table table, String tableName) {
+    private String getFlinkDDL(Table table, String tableName) {
        StringBuilder sb = new StringBuilder();
        sb.append("CREATE TABLE IF NOT EXISTS ");
        sb.append(tableName);
@@ -208,7 +225,31 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
        return sb.toString();
    }

-    protected String convertSinkColumnType(String type) {
+    private String getCDCSqlInsert(Table table, String targetName, String sourceName) {
+        StringBuilder sb = new StringBuilder("INSERT INTO ");
+        sb.append(targetName);
+        sb.append(" SELECT\n");
+        for (int i = 0; i < table.getColumns().size(); i++) {
+            sb.append(" ");
+            if (i > 0) {
+                sb.append(",");
+            }
+            sb.append(getColumnProcessing(table.getColumns().get(i)) + " \n");
+        }
+        sb.append(" FROM ");
+        sb.append(sourceName);
+        return sb.toString();
+    }
+
+    private String getColumnProcessing(Column column) {
+        if ("true".equals(config.getSink().get("column.replace.line-break")) && ColumnType.STRING.equals(column.getJavaType())) {
+            return "REGEXP_REPLACE(`" + column.getName() + "`, '\\n', '') AS `" + column.getName() + "`";
+        } else {
+            return "`" + column.getName() + "`";
+        }
+    }
+
+    private String convertSinkColumnType(String type) {
        if (config.getSink().get("connector").equals("hudi")) {
            if (type.equals("TIMESTAMP")) {
                return "TIMESTAMP(3)";
@@ -232,7 +273,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
        }
    }

-    protected String getSinkConfigurationString(Table table) {
+    private String getSinkConfigurationString(Table table) {
        String configurationString = SqlUtil.replaceAllParam(config.getSinkConfigurationString(), "schemaName", getSinkSchemaName(table));
        configurationString = SqlUtil.replaceAllParam(configurationString, "tableName", getSinkTableName(table));
        if (configurationString.contains("${pkList}")) {
...
@@ -39,6 +39,9 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.dlink.assertion.Asserts;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.Column;
@@ -55,6 +58,8 @@ import com.dlink.model.Table;
 **/
public abstract class AbstractSinkBuilder {

+    protected static final Logger logger = LoggerFactory.getLogger(AbstractSinkBuilder.class);

    protected FlinkCDCConfig config;
    protected List<ModifyOperation> modifyOperations = new ArrayList();
@@ -225,7 +230,11 @@ public abstract class AbstractSinkBuilder {
            case JAVA_LANG_DOUBLE:
                return new DoubleType();
            case DECIMAL:
-                return new DecimalType(columnType.getPrecision(), columnType.getScale());
+                if (columnType.getPrecision() == null || columnType.getPrecision() == 0) {
+                    return new DecimalType(38, columnType.getScale());
+                } else {
+                    return new DecimalType(columnType.getPrecision(), columnType.getScale());
+                }
            case INT:
            case INTEGER:
                return new IntType();
...
package com.dlink.cdc.sql;
+import com.dlink.model.Column;
+import com.dlink.model.ColumnType;
+import com.dlink.utils.LogUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -53,7 +56,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
    public SQLSinkBuilder() {
    }

-    public SQLSinkBuilder(FlinkCDCConfig config) {
+    private SQLSinkBuilder(FlinkCDCConfig config) {
        super(config);
    }
@@ -62,7 +65,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
    }

-    protected DataStream<Row> buildRow(
+    private DataStream<Row> buildRow(
            SingleOutputStreamOperator<Map> filterOperator,
            List<String> columnNameList,
            List<LogicalType> columnTypeList) {
@@ -113,17 +116,24 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
        }, rowTypeInfo);
    }

-    public void addTableSink(
+    private void addTableSink(
            CustomTableEnvironment customTableEnvironment,
            DataStream<Row> rowDataDataStream,
            Table table,
            List<String> columnNameList) {
        String sinkTableName = getSinkTableName(table);
-        customTableEnvironment.createTemporaryView(table.getSchemaTableNameWithUnderline(), rowDataDataStream, StringUtils.join(columnNameList, ","));
-        customTableEnvironment.executeSql(getFlinkDDL(table, sinkTableName));
-        List<Operation> operations = customTableEnvironment.getParser().parse(table.getCDCSqlInsert(sinkTableName, table.getSchemaTableNameWithUnderline()));
+        String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline();
+        customTableEnvironment.createTemporaryView(viewName, rowDataDataStream, StringUtils.join(columnNameList, ","));
+        logger.info("Create " + viewName + " temporaryView successful...");
+        String flinkDDL = getFlinkDDL(table, sinkTableName);
+        logger.info(flinkDDL);
+        customTableEnvironment.executeSql(flinkDDL);
+        logger.info("Create " + sinkTableName + " FlinkSQL DDL successful...");
+        String cdcSqlInsert = getCDCSqlInsert(table, sinkTableName, viewName);
+        logger.info(cdcSqlInsert);
+        List<Operation> operations = customTableEnvironment.getParser().parse(cdcSqlInsert);
+        logger.info("Create " + sinkTableName + " FlinkSQL insert into successful...");
        if (operations.size() > 0) {
            Operation operation = operations.get(0);
            if (operation instanceof ModifyOperation) {
@@ -152,25 +162,35 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
        final String schemaFieldName = config.getSchemaFieldName();
        if (Asserts.isNotNullCollection(schemaList)) {
            SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
+            logger.info("Build deserialize successful...");
            for (Schema schema : schemaList) {
                for (Table table : schema.getTables()) {
-                    SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
-                    List<String> columnNameList = new ArrayList<>();
-                    List<LogicalType> columnTypeList = new ArrayList<>();
-                    buildColumn(columnNameList, columnTypeList, table.getColumns());
-                    DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList);
-                    addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
+                    try {
+                        SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
+                        logger.info("Build " + table.getSchemaTableName() + " shunt successful...");
+                        List<String> columnNameList = new ArrayList<>();
+                        List<LogicalType> columnTypeList = new ArrayList<>();
+                        buildColumn(columnNameList, columnTypeList, table.getColumns());
+                        DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList);
+                        logger.info("Build " + table.getSchemaTableName() + " flatMap successful...");
+                        logger.info("Start build " + table.getSchemaTableName() + " sink...");
+                        addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
+                    } catch (Exception e) {
+                        logger.error("Build " + table.getSchemaTableName() + " cdc sync failed...");
+                        logger.error(LogUtil.getError(e));
+                    }
                }
            }
            List<Transformation<?>> trans = customTableEnvironment.getPlanner().translate(modifyOperations);
            for (Transformation<?> item : trans) {
                env.addOperator(item);
            }
+            logger.info("A total of " + trans.size() + " table cdc syncs were built successfully...");
        }
        return dataStreamSource;
    }

-    public String getFlinkDDL(Table table, String tableName) {
+    private String getFlinkDDL(Table table, String tableName) {
        StringBuilder sb = new StringBuilder();
        sb.append("CREATE TABLE IF NOT EXISTS ");
        sb.append(tableName);
@@ -211,7 +231,31 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
        return sb.toString();
    }

-    protected String convertSinkColumnType(String type) {
+    private String getCDCSqlInsert(Table table, String targetName, String sourceName) {
+        StringBuilder sb = new StringBuilder("INSERT INTO ");
+        sb.append(targetName);
+        sb.append(" SELECT\n");
+        for (int i = 0; i < table.getColumns().size(); i++) {
+            sb.append(" ");
+            if (i > 0) {
+                sb.append(",");
+            }
+            sb.append(getColumnProcessing(table.getColumns().get(i)) + " \n");
+        }
+        sb.append(" FROM ");
+        sb.append(sourceName);
+        return sb.toString();
+    }
+
+    private String getColumnProcessing(Column column) {
+        if ("true".equals(config.getSink().get("column.replace.line-break")) && ColumnType.STRING.equals(column.getJavaType())) {
+            return "REGEXP_REPLACE(`" + column.getName() + "`, '\\n', '') AS `" + column.getName() + "`";
+        } else {
+            return "`" + column.getName() + "`";
+        }
+    }
+
+    private String convertSinkColumnType(String type) {
        if (config.getSink().get("connector").equals("hudi")) {
            if (type.equals("TIMESTAMP")) {
                return "TIMESTAMP(3)";
@@ -235,7 +279,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
        }
    }

-    protected String getSinkConfigurationString(Table table) {
+    private String getSinkConfigurationString(Table table) {
        String configurationString = SqlUtil.replaceAllParam(config.getSinkConfigurationString(), "schemaName", getSinkSchemaName(table));
        configurationString = SqlUtil.replaceAllParam(configurationString, "tableName", getSinkTableName(table));
        if (configurationString.contains("${pkList}")) {
...
@@ -39,6 +39,9 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.dlink.assertion.Asserts;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.Column;
@@ -55,6 +58,8 @@ import com.dlink.model.Table;
 **/
public abstract class AbstractSinkBuilder {

+    protected static final Logger logger = LoggerFactory.getLogger(AbstractSinkBuilder.class);

    protected FlinkCDCConfig config;
    protected List<ModifyOperation> modifyOperations = new ArrayList();
@@ -225,7 +230,11 @@ public abstract class AbstractSinkBuilder {
            case JAVA_LANG_DOUBLE:
                return new DoubleType();
            case DECIMAL:
-                return new DecimalType(columnType.getPrecision(), columnType.getScale());
+                if (columnType.getPrecision() == null || columnType.getPrecision() == 0) {
+                    return new DecimalType(38, columnType.getScale());
+                } else {
+                    return new DecimalType(columnType.getPrecision(), columnType.getScale());
+                }
            case INT:
            case INTEGER:
                return new IntType();
...
package com.dlink.cdc.sql;
+import com.dlink.model.Column;
+import com.dlink.model.ColumnType;
+import com.dlink.utils.LogUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -53,7 +56,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
    public SQLSinkBuilder() {
    }

-    public SQLSinkBuilder(FlinkCDCConfig config) {
+    private SQLSinkBuilder(FlinkCDCConfig config) {
        super(config);
    }
@@ -62,7 +65,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
    }

-    protected DataStream<Row> buildRow(
+    private DataStream<Row> buildRow(
            SingleOutputStreamOperator<Map> filterOperator,
            List<String> columnNameList,
            List<LogicalType> columnTypeList) {
@@ -120,10 +123,17 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
            List<String> columnNameList) {
        String sinkTableName = getSinkTableName(table);
-        customTableEnvironment.createTemporaryView(table.getSchemaTableNameWithUnderline(), rowDataDataStream, StringUtils.join(columnNameList, ","));
-        customTableEnvironment.executeSql(getFlinkDDL(table, sinkTableName));
-        List<Operation> operations = customTableEnvironment.getParser().parse(table.getCDCSqlInsert(sinkTableName, table.getSchemaTableNameWithUnderline()));
+        String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline();
+        customTableEnvironment.createTemporaryView(viewName, rowDataDataStream, StringUtils.join(columnNameList, ","));
+        logger.info("Create " + viewName + " temporaryView successful...");
+        String flinkDDL = getFlinkDDL(table, sinkTableName);
+        logger.info(flinkDDL);
+        customTableEnvironment.executeSql(flinkDDL);
+        logger.info("Create " + sinkTableName + " FlinkSQL DDL successful...");
+        String cdcSqlInsert = getCDCSqlInsert(table, sinkTableName, viewName);
+        logger.info(cdcSqlInsert);
+        List<Operation> operations = customTableEnvironment.getParser().parse(cdcSqlInsert);
+        logger.info("Create " + sinkTableName + " FlinkSQL insert into successful...");
        if (operations.size() > 0) {
            Operation operation = operations.get(0);
            if (operation instanceof ModifyOperation) {
@@ -152,25 +162,35 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
        final String schemaFieldName = config.getSchemaFieldName();
        if (Asserts.isNotNullCollection(schemaList)) {
            SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
+            logger.info("Build deserialize successful...");
            for (Schema schema : schemaList) {
                for (Table table : schema.getTables()) {
-                    SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
-                    List<String> columnNameList = new ArrayList<>();
-                    List<LogicalType> columnTypeList = new ArrayList<>();
-                    buildColumn(columnNameList, columnTypeList, table.getColumns());
-                    DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList);
-                    addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
+                    try {
+                        SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
+                        logger.info("Build " + table.getSchemaTableName() + " shunt successful...");
+                        List<String> columnNameList = new ArrayList<>();
+                        List<LogicalType> columnTypeList = new ArrayList<>();
+                        buildColumn(columnNameList, columnTypeList, table.getColumns());
+                        DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList);
+                        logger.info("Build " + table.getSchemaTableName() + " flatMap successful...");
+                        logger.info("Start build " + table.getSchemaTableName() + " sink...");
+                        addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
+                    } catch (Exception e) {
+                        logger.error("Build " + table.getSchemaTableName() + " cdc sync failed...");
+                        logger.error(LogUtil.getError(e));
+                    }
                }
            }
            List<Transformation<?>> trans = customTableEnvironment.getPlanner().translate(modifyOperations);
            for (Transformation<?> item : trans) {
                env.addOperator(item);
            }
+            logger.info("A total of " + trans.size() + " table cdc syncs were built successfully...");
        }
        return dataStreamSource;
    }

-    public String getFlinkDDL(Table table, String tableName) {
+    private String getFlinkDDL(Table table, String tableName) {
        StringBuilder sb = new StringBuilder();
        sb.append("CREATE TABLE IF NOT EXISTS ");
        sb.append(tableName);
@@ -211,7 +231,31 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
        return sb.toString();
    }

-    protected String convertSinkColumnType(String type) {
+    private String getCDCSqlInsert(Table table, String targetName, String sourceName) {
+        StringBuilder sb = new StringBuilder("INSERT INTO ");
+        sb.append(targetName);
+        sb.append(" SELECT\n");
+        for (int i = 0; i < table.getColumns().size(); i++) {
+            sb.append(" ");
+            if (i > 0) {
+                sb.append(",");
+            }
+            sb.append(getColumnProcessing(table.getColumns().get(i)) + " \n");
+        }
+        sb.append(" FROM ");
+        sb.append(sourceName);
+        return sb.toString();
+    }
+
+    private String getColumnProcessing(Column column) {
+        if ("true".equals(config.getSink().get("column.replace.line-break")) && ColumnType.STRING.equals(column.getJavaType())) {
+            return "REGEXP_REPLACE(`" + column.getName() + "`, '\\n', '') AS `" + column.getName() + "`";
+        } else {
+            return "`" + column.getName() + "`";
+        }
+    }
+
+    private String convertSinkColumnType(String type) {
        if (config.getSink().get("connector").equals("hudi")) {
            if (type.equals("TIMESTAMP")) {
                return "TIMESTAMP(3)";
@@ -235,7 +279,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
        }
    }

-    protected String getSinkConfigurationString(Table table) {
+    private String getSinkConfigurationString(Table table) {
        String configurationString = SqlUtil.replaceAllParam(config.getSinkConfigurationString(), "schemaName", getSinkSchemaName(table));
        configurationString = SqlUtil.replaceAllParam(configurationString, "tableName", getSinkTableName(table));
        if (configurationString.contains("${pkList}")) {
...
@@ -158,6 +158,7 @@ public class FlinkCDCConfig {
            case "table.suffix":
            case "table.upper":
            case "table.lower":
+            case "column.replace.line-break":
                return true;
            default:
                return false;
...
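With this change, column.replace.line-break joins the list of sink keys that configure the sync itself rather than the sink connector, so (presumably, matching the surrounding switch) it is filtered out when the WITH clause of the generated sink DDL is assembled. A small sketch of that whitelist pattern, with hypothetical naming; the key list mirrors the cases visible in the diff above:

import java.util.LinkedHashMap;
import java.util.Map;

public class SinkOptionFilterSketch {
    // Keys consumed by the CDC sync itself; everything else is passed
    // through to the sink connector's WITH clause.
    static boolean isInternalKey(String key) {
        switch (key) {
            case "table.suffix":
            case "table.upper":
            case "table.lower":
            case "column.replace.line-break":
                return true;
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        Map<String, String> sink = new LinkedHashMap<>();
        sink.put("connector", "jdbc");
        sink.put("column.replace.line-break", "true");
        sink.forEach((k, v) -> {
            if (!isInternalKey(k)) {
                System.out.println("'" + k + "' = '" + v + "'"); // only 'connector' = 'jdbc' survives
            }
        });
    }
}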
@@ -60,7 +60,11 @@ public enum ColumnType {
    public String getFlinkType() {
        if (flinkType.equals("DECIMAL")) {
-            return flinkType + "(" + precision + "," + scale + ")";
+            if (precision == null || precision == 0) {
+                return flinkType + "(" + 38 + "," + scale + ")";
+            } else {
+                return flinkType + "(" + precision + "," + scale + ")";
+            }
        } else {
            return flinkType;
        }
...
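The same null/zero-precision guard is applied where the Flink DDL type string is rendered, so the generated CREATE TABLE now says DECIMAL(38,s) instead of the invalid DECIMAL(null,s). A standalone rendering of the rule (not the project's enum, just the string logic):

public class FlinkTypeSketch {
    static String render(String flinkType, Integer precision, Integer scale) {
        if (flinkType.equals("DECIMAL")) {
            // Fall back to the maximum precision when metadata is absent.
            if (precision == null || precision == 0) {
                return flinkType + "(" + 38 + "," + scale + ")";
            }
            return flinkType + "(" + precision + "," + scale + ")";
        }
        return flinkType;
    }

    public static void main(String[] args) {
        System.out.println(render("DECIMAL", null, 2)); // DECIMAL(38,2) -- was DECIMAL(null,2)
        System.out.println(render("DECIMAL", 10, 2));   // DECIMAL(10,2)
        System.out.println(render("INT", null, null));  // INT
    }
}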
@@ -42,31 +42,4 @@ public class BatchTest {
        TableResult tableResult = tEnv.executeSql(select);
        tableResult.print();
    }
-
-    @Test
-    public void batchTest2() {
-        String source = "CREATE TABLE Orders (\n" +
-                " order_number BIGINT,\n" +
-                " price DECIMAL(32,2),\n" +
-                " buyer ROW<first_name STRING, last_name STRING>,\n" +
-                " order_time TIMESTAMP(3)\n" +
-                ") WITH (\n" +
-                " 'connector' = 'datagen',\n" +
-                " 'number-of-rows' = '100'\n" +
-                ")";
-        String select = "select order_number,price,order_time from Orders";
-        // LocalEnvironment environment = ExecutionEnvironment.createLocalEnvironment();
-        StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironment();
-        Configuration configuration = new Configuration();
-        configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
-        // configuration.setString("execution.runtime-mode", "STREAMING");
-        TableConfig tableConfig = new TableConfig();
-        tableConfig.addConfiguration(configuration);
-        CustomTableEnvironmentImpl batchTableEnvironment = CustomTableEnvironmentImpl.create(environment,
-                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(), tableConfig);
-        batchTableEnvironment.executeSql(source);
-        batchTableEnvironment.executeSql(select);
-        // TableResult tableResult = batchTableEnvironment.executeSql(select);
-        // tableResult.print();
-    }
}
...
package com.dlink.trans;
import com.dlink.executor.CustomTableEnvironmentImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
@@ -13,6 +15,8 @@ import java.util.List;
 */
public class AbstractOperation {

+    protected static final Logger logger = LoggerFactory.getLogger(AbstractOperation.class);

    protected String statement;

    public AbstractOperation() {
...
@@ -50,6 +50,7 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Operation {
    @Override
    public TableResult build(Executor executor) {
+        logger.info("Start build CDCSOURCE Task...");
        CDCSource cdcSource = CDCSource.build(statement);
        FlinkCDCConfig config = new FlinkCDCConfig(cdcSource.getConnector(), cdcSource.getHostname(), cdcSource.getPort(), cdcSource.getUsername()
            , cdcSource.getPassword(), cdcSource.getCheckpoint(), cdcSource.getParallelism(), cdcSource.getDatabase(), cdcSource.getSchema()
@@ -90,17 +91,25 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Operation {
            }
            schemaList.add(schema);
        }
+        logger.info("A total of " + schemaTableNameList.size() + " tables were detected...");
+        for (int i = 0; i < schemaTableNameList.size(); i++) {
+            logger.info((i + 1) + ": " + schemaTableNameList.get(i));
+        }
        config.setSchemaTableNameList(schemaTableNameList);
        config.setSchemaList(schemaList);
        StreamExecutionEnvironment streamExecutionEnvironment = executor.getStreamExecutionEnvironment();
        if (Asserts.isNotNull(config.getParallelism())) {
            streamExecutionEnvironment.setParallelism(config.getParallelism());
+            logger.info("Set parallelism: " + config.getParallelism());
        }
        if (Asserts.isNotNull(config.getCheckpoint())) {
            streamExecutionEnvironment.enableCheckpointing(config.getCheckpoint());
+            logger.info("Set checkpoint: " + config.getCheckpoint());
        }
        DataStreamSource<String> streamSource = cdcBuilder.build(streamExecutionEnvironment);
+        logger.info("Build " + config.getType() + " successful...");
        SinkBuilderFactory.buildSinkBuilder(config).build(cdcBuilder, streamExecutionEnvironment, executor.getCustomTableEnvironment(), streamSource);
+        logger.info("Build CDCSOURCE Task successful!");
    } catch (Exception e) {
        e.printStackTrace();
    }
...
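Taken together, the new statements give a CDCSOURCE task a readable build trace. An illustrative run over two tables might log the following (assembled from the messages added above; the table names, connector type, and counts are made up for the example):

Start build CDCSOURCE Task...
A total of 2 tables were detected...
1: db.orders
2: db.customers
Set parallelism: 1
Set checkpoint: 3000
Build mysql-cdc successful...
Build deserialize successful...
Build db.orders shunt successful...
Build db.orders flatMap successful...
Start build db.orders sink...
...
Build CDCSOURCE Task successful!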