Unverified Commit 14adfd6b authored by aiwenmo, committed by GitHub

[Feature-518][client] CDCSOURCE add log and fix decimal bug

parents 66eee408 15bbdc26
......@@ -259,6 +259,20 @@
<include>dlink-app-1.15-${project.version}-jar-with-dependencies.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.parent.basedir}/dlink-client/dlink-client-base/target</directory>
<outputDirectory>jar</outputDirectory>
<includes>
<include>dlink-client-base-${project.version}.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.parent.basedir}/dlink-common/target</directory>
<outputDirectory>jar</outputDirectory>
<includes>
<include>dlink-common-${project.version}.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.parent.basedir}/dlink-doc/extends</directory>
<outputDirectory>jar</outputDirectory>
......
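The two new fileSets presumably exist to ship dlink-client-base and dlink-common with the release package: the sink-builder changes below use classes from both modules (com.dlink.assertion.Asserts and com.dlink.utils.LogUtil, for example), so those jars need to be deployable alongside the app jars.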
......@@ -39,6 +39,9 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.dlink.assertion.Asserts;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.Column;
......@@ -55,6 +58,8 @@ import com.dlink.model.Table;
**/
public abstract class AbstractSinkBuilder {
protected static final Logger logger = LoggerFactory.getLogger(AbstractSinkBuilder.class);
protected FlinkCDCConfig config;
protected List<ModifyOperation> modifyOperations = new ArrayList<>();
......@@ -225,7 +230,11 @@ public abstract class AbstractSinkBuilder {
case JAVA_LANG_DOUBLE:
return new DoubleType();
case DECIMAL:
return new DecimalType(columnType.getPrecision(), columnType.getScale());
if (columnType.getPrecision() == null || columnType.getPrecision() == 0) {
return new DecimalType(38, columnType.getScale());
} else {
return new DecimalType(columnType.getPrecision(), columnType.getScale());
}
case INT:
case INTEGER:
return new IntType();
......
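The DECIMAL branch above is the actual bug fix: some sources (an unqualified Oracle NUMBER, for instance) report decimal columns with a null or zero precision, and passing that straight to DecimalType fails, since a null precision breaks auto-unboxing and a zero precision fails DecimalType's own range check (1 to 38). A minimal standalone sketch of the fallback, with the ColumnType accessors replaced by plain Integer parameters:

import org.apache.flink.table.types.logical.DecimalType;

public class DecimalFallbackSketch {

    // Mirrors the branch added in the diff: a null or zero precision falls
    // back to Flink's maximum DECIMAL precision of 38. The scale is assumed
    // to be non-null, as it is in the original code.
    static DecimalType toDecimalType(Integer precision, Integer scale) {
        if (precision == null || precision == 0) {
            return new DecimalType(38, scale);
        }
        return new DecimalType(precision, scale);
    }

    public static void main(String[] args) {
        System.out.println(toDecimalType(null, 2)); // DECIMAL(38, 2)
        System.out.println(toDecimalType(0, 2));    // DECIMAL(38, 2)
        System.out.println(toDecimalType(10, 2));   // DECIMAL(10, 2)
    }
}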
package com.dlink.cdc.sql;
import com.dlink.model.Column;
import com.dlink.model.ColumnType;
import com.dlink.utils.LogUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
......@@ -53,7 +56,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
public SQLSinkBuilder() {
}
public SQLSinkBuilder(FlinkCDCConfig config) {
private SQLSinkBuilder(FlinkCDCConfig config) {
super(config);
}
......@@ -62,7 +65,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
}
protected DataStream<Row> buildRow(
private DataStream<Row> buildRow(
SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
......@@ -113,17 +116,24 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
}, rowTypeInfo);
}
public void addTableSink(
private void addTableSink(
CustomTableEnvironment customTableEnvironment,
DataStream<Row> rowDataDataStream,
Table table,
List<String> columnNameList) {
String sinkTableName = getSinkTableName(table);
customTableEnvironment.createTemporaryView(table.getSchemaTableNameWithUnderline(), rowDataDataStream, StringUtils.join(columnNameList, ","));
customTableEnvironment.executeSql(getFlinkDDL(table, sinkTableName));
List<Operation> operations = customTableEnvironment.getParser().parse(table.getCDCSqlInsert(sinkTableName, table.getSchemaTableNameWithUnderline()));
String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline();
customTableEnvironment.createTemporaryView(viewName, rowDataDataStream, StringUtils.join(columnNameList, ","));
logger.info("Create " + viewName + " temporaryView successful...");
String flinkDDL = getFlinkDDL(table, sinkTableName);
logger.info(flinkDDL);
customTableEnvironment.executeSql(flinkDDL);
logger.info("Create " + sinkTableName + " FlinkSQL DDL successful...");
String cdcSqlInsert = getCDCSqlInsert(table, sinkTableName, viewName);
logger.info(cdcSqlInsert);
List<Operation> operations = customTableEnvironment.getParser().parse(cdcSqlInsert);
logger.info("Create " + sinkTableName + " FlinkSQL insert into successful...");
if (operations.size() > 0) {
Operation operation = operations.get(0);
if (operation instanceof ModifyOperation) {
......@@ -152,25 +162,35 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
final String schemaFieldName = config.getSchemaFieldName();
if (Asserts.isNotNullCollection(schemaList)) {
SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
logger.info("Build deserialize successful...");
for (Schema schema : schemaList) {
for (Table table : schema.getTables()) {
SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
List<String> columnNameList = new ArrayList<>();
List<LogicalType> columnTypeList = new ArrayList<>();
buildColumn(columnNameList, columnTypeList, table.getColumns());
DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList);
addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
try {
SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
logger.info("Build " + table.getSchemaTableName() + " shunt successful...");
List<String> columnNameList = new ArrayList<>();
List<LogicalType> columnTypeList = new ArrayList<>();
buildColumn(columnNameList, columnTypeList, table.getColumns());
DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList);
logger.info("Build " + table.getSchemaTableName() + " flatMap successful...");
logger.info("Start build " + table.getSchemaTableName() + " sink...");
addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
} catch (Exception e) {
logger.error("Build " + table.getSchemaTableName() + " cdc sync failed...");
logger.error(LogUtil.getError(e));
}
}
}
List<Transformation<?>> trans = customTableEnvironment.getPlanner().translate(modifyOperations);
for (Transformation<?> item : trans) {
env.addOperator(item);
}
logger.info("A total of " + trans.size() + " table cdc sync were build successfull...");
}
return dataStreamSource;
}
public String getFlinkDDL(Table table, String tableName) {
private String getFlinkDDL(Table table, String tableName) {
StringBuilder sb = new StringBuilder();
sb.append("CREATE TABLE IF NOT EXISTS ");
sb.append(tableName);
......@@ -211,7 +231,31 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return sb.toString();
}
protected String convertSinkColumnType(String type) {
private String getCDCSqlInsert(Table table, String targetName, String sourceName) {
StringBuilder sb = new StringBuilder("INSERT INTO ");
sb.append(targetName);
sb.append(" SELECT\n");
for (int i = 0; i < table.getColumns().size(); i++) {
sb.append(" ");
if (i > 0) {
sb.append(",");
}
sb.append(getColumnProcessing(table.getColumns().get(i))).append(" \n");
}
sb.append(" FROM ");
sb.append(sourceName);
return sb.toString();
}
private String getColumnProcessing(Column column) {
if ("true".equals(config.getSink().get("column.replace.line-break")) && ColumnType.STRING.equals(column.getJavaType())) {
return "REGEXP_REPLACE(`" + column.getName() + "`, '\\n', '') AS `" + column.getName() + "`";
} else {
return "`" + column.getName() + "`";
}
}
private String convertSinkColumnType(String type) {
if ("hudi".equals(config.getSink().get("connector"))) {
if (type.equals("TIMESTAMP")) {
return "TIMESTAMP(3)";
......@@ -235,7 +279,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
}
}
protected String getSinkConfigurationString(Table table) {
private String getSinkConfigurationString(Table table) {
String configurationString = SqlUtil.replaceAllParam(config.getSinkConfigurationString(), "schemaName", getSinkSchemaName(table));
configurationString = SqlUtil.replaceAllParam(configurationString, "tableName", getSinkTableName(table));
if (configurationString.contains("${pkList}")) {
......
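getCDCSqlInsert and getColumnProcessing together emit one INSERT ... SELECT per table, reading from the temporary view registered in addTableSink. A self-contained sketch of the string assembly; the column names, view name, and boolean flags are stand-ins for table.getColumns() and the column.replace.line-break sink option:

import java.util.Arrays;
import java.util.List;

public class CdcInsertSketch {

    // Mirrors getColumnProcessing: STRING columns optionally get their line
    // breaks stripped with REGEXP_REPLACE, keeping the original name via AS.
    static String columnProcessing(String name, boolean isString, boolean replaceLineBreak) {
        if (replaceLineBreak && isString) {
            return "REGEXP_REPLACE(`" + name + "`, '\\n', '') AS `" + name + "`";
        }
        return "`" + name + "`";
    }

    public static void main(String[] args) {
        List<String> exprs = Arrays.asList(
                columnProcessing("id", false, true),
                columnProcessing("remark", true, true));
        StringBuilder sb = new StringBuilder("INSERT INTO sink_orders SELECT\n");
        for (int i = 0; i < exprs.size(); i++) {
            sb.append("    ");
            if (i > 0) {
                sb.append(",");
            }
            sb.append(exprs.get(i)).append(" \n");
        }
        sb.append(" FROM VIEW_test_orders");
        // Prints (modulo trailing spaces):
        // INSERT INTO sink_orders SELECT
        //     `id`
        //     ,REGEXP_REPLACE(`remark`, '\n', '') AS `remark`
        //  FROM VIEW_test_orders
        System.out.println(sb);
    }
}

In FlinkSQL, REGEXP_REPLACE interprets '\n' as a regex newline, so rows reach the sink with embedded line breaks removed while the column keeps its original name.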
......@@ -39,6 +39,9 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.dlink.assertion.Asserts;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.Column;
......@@ -55,6 +58,8 @@ import com.dlink.model.Table;
**/
public abstract class AbstractSinkBuilder {
protected static final Logger logger = LoggerFactory.getLogger(AbstractSinkBuilder.class);
protected FlinkCDCConfig config;
protected List<ModifyOperation> modifyOperations = new ArrayList<>();
......@@ -225,7 +230,11 @@ public abstract class AbstractSinkBuilder {
case JAVA_LANG_DOUBLE:
return new DoubleType();
case DECIMAL:
return new DecimalType(columnType.getPrecision(), columnType.getScale());
if (columnType.getPrecision() == null || columnType.getPrecision() == 0) {
return new DecimalType(38, columnType.getScale());
} else {
return new DecimalType(columnType.getPrecision(), columnType.getScale());
}
case INT:
case INTEGER:
return new IntType();
......
package com.dlink.cdc.sql;
import com.dlink.model.Column;
import com.dlink.model.ColumnType;
import com.dlink.utils.LogUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
......@@ -53,7 +56,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
public SQLSinkBuilder() {
}
public SQLSinkBuilder(FlinkCDCConfig config) {
private SQLSinkBuilder(FlinkCDCConfig config) {
super(config);
}
......@@ -62,7 +65,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
}
protected DataStream<Row> buildRow(
private DataStream<Row> buildRow(
SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
......@@ -113,17 +116,24 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
}, rowTypeInfo);
}
public void addTableSink(
private void addTableSink(
CustomTableEnvironment customTableEnvironment,
DataStream<Row> rowDataDataStream,
Table table,
List<String> columnNameList) {
String sinkTableName = getSinkTableName(table);
customTableEnvironment.createTemporaryView(table.getSchemaTableNameWithUnderline(), rowDataDataStream, StringUtils.join(columnNameList, ","));
customTableEnvironment.executeSql(getFlinkDDL(table, sinkTableName));
List<Operation> operations = customTableEnvironment.getParser().parse(table.getCDCSqlInsert(sinkTableName, table.getSchemaTableNameWithUnderline()));
String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline();
customTableEnvironment.createTemporaryView(viewName, rowDataDataStream, StringUtils.join(columnNameList, ","));
logger.info("Create " + viewName + " temporaryView successful...");
String flinkDDL = getFlinkDDL(table, sinkTableName);
logger.info(flinkDDL);
customTableEnvironment.executeSql(flinkDDL);
logger.info("Create " + sinkTableName + " FlinkSQL DDL successful...");
String cdcSqlInsert = getCDCSqlInsert(table, sinkTableName, viewName);
logger.info(cdcSqlInsert);
List<Operation> operations = customTableEnvironment.getParser().parse(cdcSqlInsert);
logger.info("Create " + sinkTableName + " FlinkSQL insert into successful...");
if (operations.size() > 0) {
Operation operation = operations.get(0);
if (operation instanceof ModifyOperation) {
......@@ -152,25 +162,35 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
final String schemaFieldName = config.getSchemaFieldName();
if (Asserts.isNotNullCollection(schemaList)) {
SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
logger.info("Build deserialize successful...");
for (Schema schema : schemaList) {
for (Table table : schema.getTables()) {
SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
List<String> columnNameList = new ArrayList<>();
List<LogicalType> columnTypeList = new ArrayList<>();
buildColumn(columnNameList, columnTypeList, table.getColumns());
DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList);
addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
try {
SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
logger.info("Build " + table.getSchemaTableName() + " shunt successful...");
List<String> columnNameList = new ArrayList<>();
List<LogicalType> columnTypeList = new ArrayList<>();
buildColumn(columnNameList, columnTypeList, table.getColumns());
DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList);
logger.info("Build " + table.getSchemaTableName() + " flatMap successful...");
logger.info("Start build " + table.getSchemaTableName() + " sink...");
addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
} catch (Exception e) {
logger.error("Build " + table.getSchemaTableName() + " cdc sync failed...");
logger.error(LogUtil.getError(e));
}
}
}
List<Transformation<?>> trans = customTableEnvironment.getPlanner().translate(modifyOperations);
for (Transformation<?> item : trans) {
env.addOperator(item);
}
logger.info("A total of " + trans.size() + " table cdc sync were build successfull...");
}
return dataStreamSource;
}
public String getFlinkDDL(Table table, String tableName) {
private String getFlinkDDL(Table table, String tableName) {
StringBuilder sb = new StringBuilder();
sb.append("CREATE TABLE IF NOT EXISTS ");
sb.append(tableName);
......@@ -211,7 +231,31 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return sb.toString();
}
protected String convertSinkColumnType(String type) {
private String getCDCSqlInsert(Table table, String targetName, String sourceName) {
StringBuilder sb = new StringBuilder("INSERT INTO ");
sb.append(targetName);
sb.append(" SELECT\n");
for (int i = 0; i < table.getColumns().size(); i++) {
sb.append(" ");
if (i > 0) {
sb.append(",");
}
sb.append(getColumnProcessing(table.getColumns().get(i))).append(" \n");
}
sb.append(" FROM ");
sb.append(sourceName);
return sb.toString();
}
private String getColumnProcessing(Column column) {
if ("true".equals(config.getSink().get("column.replace.line-break")) && ColumnType.STRING.equals(column.getJavaType())) {
return "REGEXP_REPLACE(`" + column.getName() + "`, '\\n', '') AS `" + column.getName() + "`";
} else {
return "`" + column.getName() + "`";
}
}
private String convertSinkColumnType(String type) {
if ("hudi".equals(config.getSink().get("connector"))) {
if (type.equals("TIMESTAMP")) {
return "TIMESTAMP(3)";
......@@ -235,7 +279,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
}
}
protected String getSinkConfigurationString(Table table) {
private String getSinkConfigurationString(Table table) {
String configurationString = SqlUtil.replaceAllParam(config.getSinkConfigurationString(), "schemaName", getSinkSchemaName(table));
configurationString = SqlUtil.replaceAllParam(configurationString, "tableName", getSinkTableName(table));
if (configurationString.contains("${pkList}")) {
......
......@@ -39,6 +39,9 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.dlink.assertion.Asserts;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.Column;
......@@ -55,6 +58,8 @@ import com.dlink.model.Table;
**/
public abstract class AbstractSinkBuilder {
protected static final Logger logger = LoggerFactory.getLogger(AbstractSinkBuilder.class);
protected FlinkCDCConfig config;
protected List<ModifyOperation> modifyOperations = new ArrayList<>();
......@@ -225,7 +230,11 @@ public abstract class AbstractSinkBuilder {
case JAVA_LANG_DOUBLE:
return new DoubleType();
case DECIMAL:
return new DecimalType(columnType.getPrecision(), columnType.getScale());
if (columnType.getPrecision() == null || columnType.getPrecision() == 0) {
return new DecimalType(38, columnType.getScale());
} else {
return new DecimalType(columnType.getPrecision(), columnType.getScale());
}
case INT:
case INTEGER:
return new IntType();
......
......@@ -2,6 +2,7 @@ package com.dlink.cdc.sql;
import com.dlink.model.*;
import com.dlink.utils.LogUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
......@@ -49,7 +50,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
public SQLSinkBuilder() {
}
public SQLSinkBuilder(FlinkCDCConfig config) {
private SQLSinkBuilder(FlinkCDCConfig config) {
super(config);
}
......@@ -58,7 +59,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
}
protected DataStream<Row> buildRow(
private DataStream<Row> buildRow(
SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
......@@ -109,7 +110,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
}, rowTypeInfo);
}
public void addTableSink(
private void addTableSink(
CustomTableEnvironment customTableEnvironment,
DataStream<Row> rowDataDataStream,
Table table,
......@@ -118,9 +119,15 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
String sinkTableName = getSinkTableName(table);
String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline();
customTableEnvironment.createTemporaryView(viewName, rowDataDataStream, StringUtils.join(columnNameList, ","));
customTableEnvironment.executeSql(getFlinkDDL(table, sinkTableName));
List<Operation> operations = customTableEnvironment.getParser().parse(table.getCDCSqlInsert(sinkTableName, viewName));
logger.info("Create " + viewName + " temporaryView successful...");
String flinkDDL = getFlinkDDL(table, sinkTableName);
logger.info(flinkDDL);
customTableEnvironment.executeSql(flinkDDL);
logger.info("Create " + sinkTableName + " FlinkSQL DDL successful...");
String cdcSqlInsert = getCDCSqlInsert(table, sinkTableName, viewName);
logger.info(cdcSqlInsert);
List<Operation> operations = customTableEnvironment.getParser().parse(cdcSqlInsert);
logger.info("Create " + sinkTableName + " FlinkSQL insert into successful...");
if (operations.size() > 0) {
Operation operation = operations.get(0);
if (operation instanceof ModifyOperation) {
......@@ -149,25 +156,35 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
final String schemaFieldName = config.getSchemaFieldName();
if (Asserts.isNotNullCollection(schemaList)) {
SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
logger.info("Build deserialize successful...");
for (Schema schema : schemaList) {
for (Table table : schema.getTables()) {
SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
List<String> columnNameList = new ArrayList<>();
List<LogicalType> columnTypeList = new ArrayList<>();
buildColumn(columnNameList, columnTypeList, table.getColumns());
DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList);
addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
try {
SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
logger.info("Build " + table.getSchemaTableName() + " shunt successful...");
List<String> columnNameList = new ArrayList<>();
List<LogicalType> columnTypeList = new ArrayList<>();
buildColumn(columnNameList, columnTypeList, table.getColumns());
DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList);
logger.info("Build " + table.getSchemaTableName() + " flatMap successful...");
logger.info("Start build " + table.getSchemaTableName() + " sink...");
addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
} catch (Exception e) {
logger.error("Build " + table.getSchemaTableName() + " cdc sync failed...");
logger.error(LogUtil.getError(e));
}
}
}
List<Transformation<?>> trans = customTableEnvironment.getPlanner().translate(modifyOperations);
for (Transformation<?> item : trans) {
env.addOperator(item);
}
logger.info("A total of " + trans.size() + " table cdc sync were build successfull...");
}
return dataStreamSource;
}
public String getFlinkDDL(Table table, String tableName) {
private String getFlinkDDL(Table table, String tableName) {
StringBuilder sb = new StringBuilder();
sb.append("CREATE TABLE IF NOT EXISTS ");
sb.append(tableName);
......@@ -208,7 +225,31 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return sb.toString();
}
protected String convertSinkColumnType(String type) {
private String getCDCSqlInsert(Table table, String targetName, String sourceName) {
StringBuilder sb = new StringBuilder("INSERT INTO ");
sb.append(targetName);
sb.append(" SELECT\n");
for (int i = 0; i < table.getColumns().size(); i++) {
sb.append(" ");
if (i > 0) {
sb.append(",");
}
sb.append(getColumnProcessing(table.getColumns().get(i))).append(" \n");
}
sb.append(" FROM ");
sb.append(sourceName);
return sb.toString();
}
private String getColumnProcessing(Column column) {
if ("true".equals(config.getSink().get("column.replace.line-break")) && ColumnType.STRING.equals(column.getJavaType())) {
return "REGEXP_REPLACE(`" + column.getName() + "`, '\\n', '') AS `" + column.getName() + "`";
} else {
return "`" + column.getName() + "`";
}
}
private String convertSinkColumnType(String type) {
if ("hudi".equals(config.getSink().get("connector"))) {
if (type.equals("TIMESTAMP")) {
return "TIMESTAMP(3)";
......@@ -232,7 +273,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
}
}
protected String getSinkConfigurationString(Table table) {
private String getSinkConfigurationString(Table table) {
String configurationString = SqlUtil.replaceAllParam(config.getSinkConfigurationString(), "schemaName", getSinkSchemaName(table));
configurationString = SqlUtil.replaceAllParam(configurationString, "tableName", getSinkTableName(table));
if (configurationString.contains("${pkList}")) {
......
......@@ -39,6 +39,9 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.dlink.assertion.Asserts;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.Column;
......@@ -55,6 +58,8 @@ import com.dlink.model.Table;
**/
public abstract class AbstractSinkBuilder {
protected static final Logger logger = LoggerFactory.getLogger(AbstractSinkBuilder.class);
protected FlinkCDCConfig config;
protected List<ModifyOperation> modifyOperations = new ArrayList<>();
......@@ -225,7 +230,11 @@ public abstract class AbstractSinkBuilder {
case JAVA_LANG_DOUBLE:
return new DoubleType();
case DECIMAL:
return new DecimalType(columnType.getPrecision(), columnType.getScale());
if (columnType.getPrecision() == null || columnType.getPrecision() == 0) {
return new DecimalType(38, columnType.getScale());
} else {
return new DecimalType(columnType.getPrecision(), columnType.getScale());
}
case INT:
case INTEGER:
return new IntType();
......
package com.dlink.cdc.sql;
import com.dlink.model.Column;
import com.dlink.model.ColumnType;
import com.dlink.utils.LogUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
......@@ -53,7 +56,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
public SQLSinkBuilder() {
}
public SQLSinkBuilder(FlinkCDCConfig config) {
private SQLSinkBuilder(FlinkCDCConfig config) {
super(config);
}
......@@ -62,7 +65,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
}
protected DataStream<Row> buildRow(
private DataStream<Row> buildRow(
SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
......@@ -113,17 +116,24 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
}, rowTypeInfo);
}
public void addTableSink(
private void addTableSink(
CustomTableEnvironment customTableEnvironment,
DataStream<Row> rowDataDataStream,
Table table,
List<String> columnNameList) {
String sinkTableName = getSinkTableName(table);
customTableEnvironment.createTemporaryView(table.getSchemaTableNameWithUnderline(), rowDataDataStream, StringUtils.join(columnNameList, ","));
customTableEnvironment.executeSql(getFlinkDDL(table, sinkTableName));
List<Operation> operations = customTableEnvironment.getParser().parse(table.getCDCSqlInsert(sinkTableName, table.getSchemaTableNameWithUnderline()));
String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline();
customTableEnvironment.createTemporaryView(viewName, rowDataDataStream, StringUtils.join(columnNameList, ","));
logger.info("Create " + viewName + " temporaryView successful...");
String flinkDDL = getFlinkDDL(table, sinkTableName);
logger.info(flinkDDL);
customTableEnvironment.executeSql(flinkDDL);
logger.info("Create " + sinkTableName + " FlinkSQL DDL successful...");
String cdcSqlInsert = getCDCSqlInsert(table, sinkTableName, viewName);
logger.info(cdcSqlInsert);
List<Operation> operations = customTableEnvironment.getParser().parse(cdcSqlInsert);
logger.info("Create " + sinkTableName + " FlinkSQL insert into successful...");
if (operations.size() > 0) {
Operation operation = operations.get(0);
if (operation instanceof ModifyOperation) {
......@@ -152,25 +162,35 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
final String schemaFieldName = config.getSchemaFieldName();
if (Asserts.isNotNullCollection(schemaList)) {
SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
logger.info("Build deserialize successful...");
for (Schema schema : schemaList) {
for (Table table : schema.getTables()) {
SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
List<String> columnNameList = new ArrayList<>();
List<LogicalType> columnTypeList = new ArrayList<>();
buildColumn(columnNameList, columnTypeList, table.getColumns());
DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList);
addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
try {
SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
logger.info("Build " + table.getSchemaTableName() + " shunt successful...");
List<String> columnNameList = new ArrayList<>();
List<LogicalType> columnTypeList = new ArrayList<>();
buildColumn(columnNameList, columnTypeList, table.getColumns());
DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList);
logger.info("Build " + table.getSchemaTableName() + " flatMap successful...");
logger.info("Start build " + table.getSchemaTableName() + " sink...");
addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
} catch (Exception e) {
logger.error("Build " + table.getSchemaTableName() + " cdc sync failed...");
logger.error(LogUtil.getError(e));
}
}
}
List<Transformation<?>> trans = customTableEnvironment.getPlanner().translate(modifyOperations);
for (Transformation<?> item : trans) {
env.addOperator(item);
}
logger.info("A total of " + trans.size() + " table cdc sync were build successfull...");
}
return dataStreamSource;
}
public String getFlinkDDL(Table table, String tableName) {
private String getFlinkDDL(Table table, String tableName) {
StringBuilder sb = new StringBuilder();
sb.append("CREATE TABLE IF NOT EXISTS ");
sb.append(tableName);
......@@ -211,7 +231,31 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return sb.toString();
}
protected String convertSinkColumnType(String type) {
private String getCDCSqlInsert(Table table, String targetName, String sourceName) {
StringBuilder sb = new StringBuilder("INSERT INTO ");
sb.append(targetName);
sb.append(" SELECT\n");
for (int i = 0; i < table.getColumns().size(); i++) {
sb.append(" ");
if (i > 0) {
sb.append(",");
}
sb.append(getColumnProcessing(table.getColumns().get(i))).append(" \n");
}
sb.append(" FROM ");
sb.append(sourceName);
return sb.toString();
}
private String getColumnProcessing(Column column) {
if ("true".equals(config.getSink().get("column.replace.line-break")) && ColumnType.STRING.equals(column.getJavaType())) {
return "REGEXP_REPLACE(`" + column.getName() + "`, '\\n', '') AS `" + column.getName() + "`";
} else {
return "`" + column.getName() + "`";
}
}
private String convertSinkColumnType(String type) {
if ("hudi".equals(config.getSink().get("connector"))) {
if (type.equals("TIMESTAMP")) {
return "TIMESTAMP(3)";
......@@ -235,7 +279,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
}
}
protected String getSinkConfigurationString(Table table) {
private String getSinkConfigurationString(Table table) {
String configurationString = SqlUtil.replaceAllParam(config.getSinkConfigurationString(), "schemaName", getSinkSchemaName(table));
configurationString = SqlUtil.replaceAllParam(configurationString, "tableName", getSinkTableName(table));
if (configurationString.contains("${pkList}")) {
......
......@@ -39,6 +39,9 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.dlink.assertion.Asserts;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.Column;
......@@ -55,6 +58,8 @@ import com.dlink.model.Table;
**/
public abstract class AbstractSinkBuilder {
protected static final Logger logger = LoggerFactory.getLogger(AbstractSinkBuilder.class);
protected FlinkCDCConfig config;
protected List<ModifyOperation> modifyOperations = new ArrayList<>();
......@@ -225,7 +230,11 @@ public abstract class AbstractSinkBuilder {
case JAVA_LANG_DOUBLE:
return new DoubleType();
case DECIMAL:
return new DecimalType(columnType.getPrecision(), columnType.getScale());
if (columnType.getPrecision() == null || columnType.getPrecision() == 0) {
return new DecimalType(38, columnType.getScale());
} else {
return new DecimalType(columnType.getPrecision(), columnType.getScale());
}
case INT:
case INTEGER:
return new IntType();
......
package com.dlink.cdc.sql;
import com.dlink.model.Column;
import com.dlink.model.ColumnType;
import com.dlink.utils.LogUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
......@@ -53,7 +56,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
public SQLSinkBuilder() {
}
public SQLSinkBuilder(FlinkCDCConfig config) {
private SQLSinkBuilder(FlinkCDCConfig config) {
super(config);
}
......@@ -62,7 +65,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
}
protected DataStream<Row> buildRow(
private DataStream<Row> buildRow(
SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
......@@ -120,10 +123,17 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
List<String> columnNameList) {
String sinkTableName = getSinkTableName(table);
customTableEnvironment.createTemporaryView(table.getSchemaTableNameWithUnderline(), rowDataDataStream, StringUtils.join(columnNameList, ","));
customTableEnvironment.executeSql(getFlinkDDL(table, sinkTableName));
List<Operation> operations = customTableEnvironment.getParser().parse(table.getCDCSqlInsert(sinkTableName, table.getSchemaTableNameWithUnderline()));
String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline();
customTableEnvironment.createTemporaryView(viewName, rowDataDataStream, StringUtils.join(columnNameList, ","));
logger.info("Create " + viewName + " temporaryView successful...");
String flinkDDL = getFlinkDDL(table, sinkTableName);
logger.info(flinkDDL);
customTableEnvironment.executeSql(flinkDDL);
logger.info("Create " + sinkTableName + " FlinkSQL DDL successful...");
String cdcSqlInsert = getCDCSqlInsert(table, sinkTableName, viewName);
logger.info(cdcSqlInsert);
List<Operation> operations = customTableEnvironment.getParser().parse(cdcSqlInsert);
logger.info("Create " + sinkTableName + " FlinkSQL insert into successful...");
if (operations.size() > 0) {
Operation operation = operations.get(0);
if (operation instanceof ModifyOperation) {
......@@ -152,25 +162,35 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
final String schemaFieldName = config.getSchemaFieldName();
if (Asserts.isNotNullCollection(schemaList)) {
SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
logger.info("Build deserialize successful...");
for (Schema schema : schemaList) {
for (Table table : schema.getTables()) {
SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
List<String> columnNameList = new ArrayList<>();
List<LogicalType> columnTypeList = new ArrayList<>();
buildColumn(columnNameList, columnTypeList, table.getColumns());
DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList);
addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
try {
SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
logger.info("Build " + table.getSchemaTableName() + " shunt successful...");
List<String> columnNameList = new ArrayList<>();
List<LogicalType> columnTypeList = new ArrayList<>();
buildColumn(columnNameList, columnTypeList, table.getColumns());
DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList);
logger.info("Build " + table.getSchemaTableName() + " flatMap successful...");
logger.info("Start build " + table.getSchemaTableName() + " sink...");
addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
} catch (Exception e) {
logger.error("Build " + table.getSchemaTableName() + " cdc sync failed...");
logger.error(LogUtil.getError(e));
}
}
}
List<Transformation<?>> trans = customTableEnvironment.getPlanner().translate(modifyOperations);
for (Transformation<?> item : trans) {
env.addOperator(item);
}
logger.info("A total of " + trans.size() + " table cdc sync were build successfull...");
}
return dataStreamSource;
}
public String getFlinkDDL(Table table, String tableName) {
private String getFlinkDDL(Table table, String tableName) {
StringBuilder sb = new StringBuilder();
sb.append("CREATE TABLE IF NOT EXISTS ");
sb.append(tableName);
......@@ -211,7 +231,31 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return sb.toString();
}
protected String convertSinkColumnType(String type) {
private String getCDCSqlInsert(Table table, String targetName, String sourceName) {
StringBuilder sb = new StringBuilder("INSERT INTO ");
sb.append(targetName);
sb.append(" SELECT\n");
for (int i = 0; i < table.getColumns().size(); i++) {
sb.append(" ");
if (i > 0) {
sb.append(",");
}
sb.append(getColumnProcessing(table.getColumns().get(i))).append(" \n");
}
sb.append(" FROM ");
sb.append(sourceName);
return sb.toString();
}
private String getColumnProcessing(Column column) {
if ("true".equals(config.getSink().get("column.replace.line-break")) && ColumnType.STRING.equals(column.getJavaType())) {
return "REGEXP_REPLACE(`" + column.getName() + "`, '\\n', '') AS `" + column.getName() + "`";
} else {
return "`" + column.getName() + "`";
}
}
private String convertSinkColumnType(String type) {
if ("hudi".equals(config.getSink().get("connector"))) {
if (type.equals("TIMESTAMP")) {
return "TIMESTAMP(3)";
......@@ -235,7 +279,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
}
}
protected String getSinkConfigurationString(Table table) {
private String getSinkConfigurationString(Table table) {
String configurationString = SqlUtil.replaceAllParam(config.getSinkConfigurationString(), "schemaName", getSinkSchemaName(table));
configurationString = SqlUtil.replaceAllParam(configurationString, "tableName", getSinkTableName(table));
if (configurationString.contains("${pkList}")) {
......
......@@ -158,6 +158,7 @@ public class FlinkCDCConfig {
case "table.suffix":
case "table.upper":
case "table.lower":
case "column.replace.line-break":
return true;
default:
return false;
......
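This whitelist is what keeps column.replace.line-break inside the builder (it drives getColumnProcessing above) rather than forwarding it to the sink connector. For illustration, a statement enabling it might look like the sketch below; the WITH-clause keys and the sink. prefix are assumptions based on the CDCSOURCE syntax of this period, and every value is a placeholder:

// Hypothetical statement of the kind parsed by CDCSource.build(statement).
String statement =
        "EXECUTE CDCSOURCE demo WITH (\n"
        + "  'connector' = 'mysql-cdc',\n"
        + "  'hostname' = '127.0.0.1',\n"
        + "  'port' = '3306',\n"
        + "  'username' = 'root',\n"
        + "  'password' = '123456',\n"
        + "  'checkpoint' = '3000',\n"
        + "  'parallelism' = '1',\n"
        + "  'database' = 'test',\n"
        + "  'sink.connector' = 'jdbc',\n"
        + "  'sink.url' = 'jdbc:mysql://127.0.0.1:3306/sinkdb',\n"
        + "  'sink.column.replace.line-break' = 'true'\n"
        + ")";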
......@@ -60,7 +60,11 @@ public enum ColumnType {
public String getFlinkType() {
if (flinkType.equals("DECIMAL")) {
return flinkType + "(" + precision + "," + scale + ")";
if (precision == null || precision == 0) {
return flinkType + "(" + 38 + "," + scale + ")";
} else {
return flinkType + "(" + precision + "," + scale + ")";
}
} else {
return flinkType;
}
......
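For illustration, with scale 2 the rendered type strings become DECIMAL(38,2) for a null precision, DECIMAL(38,2) for a zero precision, and DECIMAL(10,2) when the source reports precision 10. Before the fix, a null precision concatenated into "DECIMAL(null,2)", which the Flink SQL parser rejects when the generated DDL is executed.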
......@@ -42,31 +42,4 @@ public class BatchTest {
TableResult tableResult = tEnv.executeSql(select);
tableResult.print();
}
@Test
public void batchTest2() {
String source = "CREATE TABLE Orders (\n" +
" order_number BIGINT,\n" +
" price DECIMAL(32,2),\n" +
" buyer ROW<first_name STRING, last_name STRING>,\n" +
" order_time TIMESTAMP(3)\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'number-of-rows' = '100'\n" +
")";
String select = "select order_number,price,order_time from Orders";
// LocalEnvironment environment = ExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironment();
Configuration configuration = new Configuration();
configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
// configuration.setString("execution.runtime-mode", "STREAMING");
TableConfig tableConfig = new TableConfig();
tableConfig.addConfiguration(configuration);
CustomTableEnvironmentImpl batchTableEnvironment = CustomTableEnvironmentImpl.create(environment,
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(), tableConfig);
batchTableEnvironment.executeSql(source);
batchTableEnvironment.executeSql(select);
// TableResult tableResult = batchTableEnvironment.executeSql(select);
// tableResult.print();
}
}
package com.dlink.trans;
import com.dlink.executor.CustomTableEnvironmentImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
......@@ -13,6 +15,8 @@ import java.util.List;
*/
public class AbstractOperation {
protected static final Logger logger = LoggerFactory.getLogger(AbstractOperation.class);
protected String statement;
public AbstractOperation() {
......
......@@ -50,6 +50,7 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
@Override
public TableResult build(Executor executor) {
logger.info("Start build CDCSOURCE Task...");
CDCSource cdcSource = CDCSource.build(statement);
FlinkCDCConfig config = new FlinkCDCConfig(cdcSource.getConnector(), cdcSource.getHostname(), cdcSource.getPort(), cdcSource.getUsername()
, cdcSource.getPassword(), cdcSource.getCheckpoint(), cdcSource.getParallelism(), cdcSource.getDatabase(), cdcSource.getSchema()
......@@ -90,17 +91,25 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
}
schemaList.add(schema);
}
logger.info("A total of " + schemaTableNameList.size() + " tables were detected...");
for (int i = 0; i < schemaTableNameList.size(); i++) {
logger.info((i + 1) + ": " + schemaTableNameList.get(i));
}
config.setSchemaTableNameList(schemaTableNameList);
config.setSchemaList(schemaList);
StreamExecutionEnvironment streamExecutionEnvironment = executor.getStreamExecutionEnvironment();
if (Asserts.isNotNull(config.getParallelism())) {
streamExecutionEnvironment.setParallelism(config.getParallelism());
logger.info("Set parallelism: " + config.getParallelism());
}
if (Asserts.isNotNull(config.getCheckpoint())) {
streamExecutionEnvironment.enableCheckpointing(config.getCheckpoint());
logger.info("Set checkpoint: " + config.getCheckpoint());
}
DataStreamSource<String> streamSource = cdcBuilder.build(streamExecutionEnvironment);
logger.info("Build " + config.getType() + " successful...");
SinkBuilderFactory.buildSinkBuilder(config).build(cdcBuilder, streamExecutionEnvironment, executor.getCustomTableEnvironment(), streamSource);
logger.info("Build CDCSOURCE Task successful!");
} catch (Exception e) {
e.printStackTrace();
}
......
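With these additions, a successful run leaves a readable trace. The following is assembled from the logger calls above, with two hypothetical tables and placeholder settings (config.getType() is assumed to render the connector name):

Start build CDCSOURCE Task...
A total of 2 tables were detected...
1: test.products
2: test.orders
Set parallelism: 1
Set checkpoint: 3000
Build mysql-cdc successful...
Build deserialize successful...
Build test.products shunt successful...
Build test.products flatMap successful...
Start build test.products sink...
Create VIEW_test_products temporaryView successful...
...
A total of 2 table CDC syncs were built successfully...
Build CDCSOURCE Task successful!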