Unverified commit 84d7797e authored by aiwenmo, committed by GitHub

[Feature-666][client] Capture column type conversion exception details in CDCSOURCE (#667)

* [Feature-654][web] Add task info tab

* [Feature-666][client] Capture column type conversion exception details in CDCSOURCE
Co-authored-by: wenmo <32723967+wenmo@users.noreply.github.com>
parent 11b97adb
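
The change is identical in each client module's copy of AbstractSinkBuilder and SQLSinkBuilder: the row-building flatMap body is wrapped in a try/catch that logs the schema-qualified table name, the offending change-event row (serialized with JSONUtil.toJsonString), and the conversion error before rethrowing, and buildRowData/buildRow gain a schemaTableName parameter threaded in from table.getSchemaTableName(). A minimal self-contained sketch of the pattern, with a hypothetical convert helper standing in for the builders' convertValue and a printf standing in for the SLF4J logger:

import java.util.Map;

// Sketch only: `convert` stands in for the builders' convertValue, and the
// printf stands in for logger.error with SLF4J-style placeholders.
public class CapturePatternSketch {

    public Object[] buildRow(Map<String, Object> after, String[] columnNames, String schemaTableName) {
        Object[] row = new Object[columnNames.length];
        try {
            for (int i = 0; i < columnNames.length; i++) {
                row[i] = convert(after.get(columnNames[i])); // may throw on an unconvertible value
            }
        } catch (Exception e) {
            // Record which table and which raw row failed, then rethrow so the job still fails.
            System.err.printf("SchemaTable: %s - Row: %s - Exception: %s%n",
                    schemaTableName, after, e.getMessage());
            throw e;
        }
        return row;
    }

    private Object convert(Object value) {
        return value; // placeholder for per-LogicalType conversion
    }
}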
@@ -49,6 +49,7 @@ import com.dlink.model.ColumnType;
 import com.dlink.model.FlinkCDCConfig;
 import com.dlink.model.Schema;
 import com.dlink.model.Table;
+import com.dlink.utils.JSONUtil;
 
 /**
  * AbstractCDCBuilder
@@ -118,47 +119,53 @@ public abstract class AbstractSinkBuilder {
     protected DataStream<RowData> buildRowData(
         SingleOutputStreamOperator<Map> filterOperator,
         List<String> columnNameList,
-        List<LogicalType> columnTypeList) {
+        List<LogicalType> columnTypeList,
+        String schemaTableName) {
         return filterOperator
             .flatMap(new FlatMapFunction<Map, RowData>() {
                 @Override
                 public void flatMap(Map value, Collector<RowData> out) throws Exception {
+                    try {
                         switch (value.get("op").toString()) {
                             case "r":
                             case "c":
                                 GenericRowData igenericRowData = new GenericRowData(columnNameList.size());
                                 igenericRowData.setRowKind(RowKind.INSERT);
                                 Map idata = (Map) value.get("after");
                                 for (int i = 0; i < columnNameList.size(); i++) {
                                     igenericRowData.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
                                 }
                                 out.collect(igenericRowData);
                                 break;
                             case "d":
                                 GenericRowData dgenericRowData = new GenericRowData(columnNameList.size());
                                 dgenericRowData.setRowKind(RowKind.DELETE);
                                 Map ddata = (Map) value.get("before");
                                 for (int i = 0; i < columnNameList.size(); i++) {
                                     dgenericRowData.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
                                 }
                                 out.collect(dgenericRowData);
                                 break;
                             case "u":
                                 GenericRowData ubgenericRowData = new GenericRowData(columnNameList.size());
                                 ubgenericRowData.setRowKind(RowKind.UPDATE_BEFORE);
                                 Map ubdata = (Map) value.get("before");
                                 for (int i = 0; i < columnNameList.size(); i++) {
                                     ubgenericRowData.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
                                 }
                                 out.collect(ubgenericRowData);
                                 GenericRowData uagenericRowData = new GenericRowData(columnNameList.size());
                                 uagenericRowData.setRowKind(RowKind.UPDATE_AFTER);
                                 Map uadata = (Map) value.get("after");
                                 for (int i = 0; i < columnNameList.size(); i++) {
                                     uagenericRowData.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
                                 }
                                 out.collect(uagenericRowData);
                                 break;
                         }
+                    } catch (Exception e) {
+                        logger.error("SchemaTable: {} - Row: {} - Exception: {}", schemaTableName, JSONUtil.toJsonString(value), e.getCause().getMessage());
+                        throw e;
+                    }
                 }
             });
@@ -191,7 +198,7 @@ public abstract class AbstractSinkBuilder {
         buildColumn(columnNameList, columnTypeList, table.getColumns());
 
-        DataStream<RowData> rowDataDataStream = buildRowData(filterOperator, columnNameList, columnTypeList);
+        DataStream<RowData> rowDataDataStream = buildRowData(filterOperator, columnNameList, columnTypeList, table.getSchemaTableName());
 
         addSink(env, rowDataDataStream, table, columnNameList, columnTypeList);
     }
@@ -230,9 +237,9 @@ public abstract class AbstractSinkBuilder {
             case JAVA_LANG_DOUBLE:
                 return new DoubleType();
             case DECIMAL:
-                if(columnType.getPrecision() == null || columnType.getPrecision() == 0){
+                if (columnType.getPrecision() == null || columnType.getPrecision() == 0) {
                     return new DecimalType(38, columnType.getScale());
-                }else{
+                } else {
                     return new DecimalType(columnType.getPrecision(), columnType.getScale());
                 }
             case INT:
@@ -303,13 +310,13 @@ public abstract class AbstractSinkBuilder {
         return tableName;
     }
 
-    protected List<String> getPKList(Table table){
+    protected List<String> getPKList(Table table) {
         List<String> pks = new ArrayList<>();
-        if(Asserts.isNullCollection(table.getColumns())){
+        if (Asserts.isNullCollection(table.getColumns())) {
             return pks;
         }
-        for(Column column: table.getColumns()){
+        for (Column column : table.getColumns()) {
-            if(column.isKeyFlag()){
+            if (column.isKeyFlag()) {
                 pks.add(column.getName());
             }
         }
...
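
One caveat in the new catch block: Throwable.getCause() returns null when an exception was thrown without a wrapped cause, so e.getCause().getMessage() can itself raise a NullPointerException and mask the original conversion failure. A self-contained sketch of a defensive fallback (hypothetical, not what this commit ships):

public class SafeCauseDemo {
    public static void main(String[] args) {
        try {
            throw new NumberFormatException("For input string: \"abc\""); // no cause attached
        } catch (Exception e) {
            // e.getCause() is null here, so calling getMessage() on it would NPE;
            // prefer the cause when present, else fall back to the exception itself.
            Throwable root = e.getCause() != null ? e.getCause() : e;
            System.err.println("Exception: " + root.getMessage());
        }
    }
}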
 package com.dlink.cdc.sql;
 
-import com.dlink.assertion.Asserts;
-import com.dlink.cdc.AbstractSinkBuilder;
-import com.dlink.cdc.CDCBuilder;
-import com.dlink.cdc.SinkBuilder;
-import com.dlink.executor.CustomTableEnvironment;
-import com.dlink.model.FlinkCDCConfig;
-import com.dlink.model.Schema;
-import com.dlink.model.Table;
-import com.dlink.utils.FlinkBaseUtil;
-import com.dlink.utils.LogUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -22,7 +12,11 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.Operation;
-import org.apache.flink.table.types.logical.*;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
@@ -36,6 +30,18 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import com.dlink.assertion.Asserts;
+import com.dlink.cdc.AbstractSinkBuilder;
+import com.dlink.cdc.CDCBuilder;
+import com.dlink.cdc.SinkBuilder;
+import com.dlink.executor.CustomTableEnvironment;
+import com.dlink.model.FlinkCDCConfig;
+import com.dlink.model.Schema;
+import com.dlink.model.Table;
+import com.dlink.utils.FlinkBaseUtil;
+import com.dlink.utils.JSONUtil;
+import com.dlink.utils.LogUtil;
+
 /**
  * SQLSinkBuilder
  *
@@ -62,7 +68,8 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
     private DataStream<Row> buildRow(
         SingleOutputStreamOperator<Map> filterOperator,
         List<String> columnNameList,
-        List<LogicalType> columnTypeList) {
+        List<LogicalType> columnTypeList,
+        String schemaTableName) {
         final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]);
         final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]);
@@ -73,38 +80,43 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
             .flatMap(new FlatMapFunction<Map, Row>() {
                 @Override
                 public void flatMap(Map value, Collector<Row> out) throws Exception {
+                    try {
                         switch (value.get("op").toString()) {
                             case "r":
                             case "c":
                                 Row irow = Row.ofKind(RowKind.INSERT);
                                 Map idata = (Map) value.get("after");
                                 for (int i = 0; i < columnNameList.size(); i++) {
                                     irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
                                 }
                                 out.collect(irow);
                                 break;
                             case "d":
                                 Row drow = Row.ofKind(RowKind.DELETE);
                                 Map ddata = (Map) value.get("before");
                                 for (int i = 0; i < columnNameList.size(); i++) {
                                     drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
                                 }
                                 out.collect(drow);
                                 break;
                             case "u":
                                 Row ubrow = Row.ofKind(RowKind.UPDATE_BEFORE);
                                 Map ubdata = (Map) value.get("before");
                                 for (int i = 0; i < columnNameList.size(); i++) {
                                     ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
                                 }
                                 out.collect(ubrow);
                                 Row uarow = Row.ofKind(RowKind.UPDATE_AFTER);
                                 Map uadata = (Map) value.get("after");
                                 for (int i = 0; i < columnNameList.size(); i++) {
                                     uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
                                 }
                                 out.collect(uarow);
                                 break;
                         }
+                    } catch (Exception e) {
+                        logger.error("SchemaTable: {} - Row: {} - Exception: {}", schemaTableName, JSONUtil.toJsonString(value), e.getCause().getMessage());
+                        throw e;
+                    }
                 }
             }, rowTypeInfo);
@@ -161,18 +173,19 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
         logger.info("Build deserialize successful...");
         for (Schema schema : schemaList) {
             for (Table table : schema.getTables()) {
+                final String schemaTableName = table.getSchemaTableName();
                 try {
                     SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
-                    logger.info("Build " + table.getSchemaTableName() + " shunt successful...");
+                    logger.info("Build " + schemaTableName + " shunt successful...");
                     List<String> columnNameList = new ArrayList<>();
                     List<LogicalType> columnTypeList = new ArrayList<>();
                     buildColumn(columnNameList, columnTypeList, table.getColumns());
-                    DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList);
+                    DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList, schemaTableName);
-                    logger.info("Build " + table.getSchemaTableName() + " flatMap successful...");
+                    logger.info("Build " + schemaTableName + " flatMap successful...");
-                    logger.info("Start build " + table.getSchemaTableName() + " sink...");
+                    logger.info("Start build " + schemaTableName + " sink...");
                     addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
                 } catch (Exception e) {
-                    logger.error("Build " + table.getSchemaTableName() + " cdc sync failed...");
+                    logger.error("Build " + schemaTableName + " cdc sync failed...");
                     logger.error(LogUtil.getError(e));
                 }
             }
@@ -191,23 +204,23 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
             return null;
         }
         if (logicalType instanceof DateType) {
-            if(value instanceof Integer){
+            if (value instanceof Integer) {
                 return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDate();
-            }else {
+            } else {
                 return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate();
             }
         } else if (logicalType instanceof TimestampType) {
-            if(value instanceof Integer){
+            if (value instanceof Integer) {
                 return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDateTime();
-            }else {
+            } else {
                 return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
             }
         } else if (logicalType instanceof DecimalType) {
             return new BigDecimal((String) value);
         } else if (logicalType instanceof BigIntType) {
-            if(value instanceof Integer){
+            if (value instanceof Integer) {
                 return ((Integer) value).longValue();
-            }else {
+            } else {
                 return value;
            }
        } else {
...
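
The convertValue branches shown above are where these failures originate; the DecimalType branch, for example, blindly casts the incoming value to String, so a numeric payload raises a ClassCastException that previously surfaced without any table or row context. A tiny standalone illustration (assumed input, not project code):

import java.math.BigDecimal;

public class ConvertFailureDemo {
    public static void main(String[] args) {
        Object value = Integer.valueOf(42); // a DECIMAL column that arrived as a number
        try {
            // Mirrors the DecimalType branch of convertValue: new BigDecimal((String) value).
            BigDecimal decimal = new BigDecimal((String) value);
            System.out.println(decimal);
        } catch (ClassCastException e) {
            // Without the commit's logging, only this bare message reached the user.
            System.err.println("Conversion failed: " + e);
        }
    }
}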
 package com.dlink.cdc.sql;
 
-import com.dlink.model.*;
-import com.dlink.utils.FlinkBaseUtil;
-import com.dlink.utils.LogUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -16,7 +12,11 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.Operation;
-import org.apache.flink.table.types.logical.*;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
@@ -35,7 +35,12 @@ import com.dlink.cdc.AbstractSinkBuilder;
 import com.dlink.cdc.CDCBuilder;
 import com.dlink.cdc.SinkBuilder;
 import com.dlink.executor.CustomTableEnvironment;
-import com.dlink.utils.SqlUtil;
+import com.dlink.model.FlinkCDCConfig;
+import com.dlink.model.Schema;
+import com.dlink.model.Table;
+import com.dlink.utils.FlinkBaseUtil;
+import com.dlink.utils.JSONUtil;
+import com.dlink.utils.LogUtil;
 
 /**
  * SQLSinkBuilder
@@ -61,9 +66,10 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
     }
 
     private DataStream<Row> buildRow(
         SingleOutputStreamOperator<Map> filterOperator,
         List<String> columnNameList,
-        List<LogicalType> columnTypeList) {
+        List<LogicalType> columnTypeList,
+        String schemaTableName) {
         final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]);
         final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]);
@@ -71,9 +77,10 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
         RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, columnNames);
         return filterOperator
             .flatMap(new FlatMapFunction<Map, Row>() {
                 @Override
                 public void flatMap(Map value, Collector<Row> out) throws Exception {
+                    try {
                         switch (value.get("op").toString()) {
                             case "r":
                             case "c":
@@ -107,15 +114,19 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
                                 out.collect(uarow);
                                 break;
                         }
+                    } catch (Exception e) {
+                        logger.error("SchemaTable: {} - Row: {} - Exception: {}", schemaTableName, JSONUtil.toJsonString(value), e.getCause().getMessage());
+                        throw e;
+                    }
                 }
             }, rowTypeInfo);
     }
 
     private void addTableSink(
         CustomTableEnvironment customTableEnvironment,
         DataStream<Row> rowDataDataStream,
         Table table,
         List<String> columnNameList) {
         String sinkSchemaName = getSinkSchemaName(table);
         String sinkTableName = getSinkTableName(table);
@@ -151,10 +162,10 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
     @Override
     public DataStreamSource build(
-        CDCBuilder cdcBuilder,
-        StreamExecutionEnvironment env,
-        CustomTableEnvironment customTableEnvironment,
-        DataStreamSource<String> dataStreamSource) {
+            CDCBuilder cdcBuilder,
+            StreamExecutionEnvironment env,
+            CustomTableEnvironment customTableEnvironment,
+            DataStreamSource<String> dataStreamSource) {
         final List<Schema> schemaList = config.getSchemaList();
         final String schemaFieldName = config.getSchemaFieldName();
         if (Asserts.isNotNullCollection(schemaList)) {
@@ -162,18 +173,19 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
         logger.info("Build deserialize successful...");
         for (Schema schema : schemaList) {
             for (Table table : schema.getTables()) {
+                final String schemaTableName = table.getSchemaTableName();
                 try {
                     SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
-                    logger.info("Build " + table.getSchemaTableName() + " shunt successful...");
+                    logger.info("Build " + schemaTableName + " shunt successful...");
                     List<String> columnNameList = new ArrayList<>();
                     List<LogicalType> columnTypeList = new ArrayList<>();
                     buildColumn(columnNameList, columnTypeList, table.getColumns());
-                    DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList);
+                    DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList, schemaTableName);
-                    logger.info("Build " + table.getSchemaTableName() + " flatMap successful...");
+                    logger.info("Build " + schemaTableName + " flatMap successful...");
-                    logger.info("Start build " + table.getSchemaTableName() + " sink...");
+                    logger.info("Start build " + schemaTableName + " sink...");
                     addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
                 } catch (Exception e) {
-                    logger.error("Build " + table.getSchemaTableName() + " cdc sync failed...");
+                    logger.error("Build " + schemaTableName + " cdc sync failed...");
                     logger.error(LogUtil.getError(e));
                }
            }
...
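
A closing note for anyone extending these builders: schemaTableName is visible inside the anonymous FlatMapFunction only because Java anonymous classes may capture effectively-final variables from the enclosing method, the same rule that already let columnNameList and columnTypeList appear in the loop bodies. A stripped-down sketch of that capture rule:

import java.util.function.Supplier;

public class CaptureDemo {
    // The parameter is effectively final, so the lambda below may capture it,
    // just as the anonymous FlatMapFunction captures schemaTableName.
    static Supplier<String> label(String schemaTableName) {
        return () -> "SchemaTable: " + schemaTableName;
    }

    public static void main(String[] args) {
        System.out.println(label("test.student").get());
    }
}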
package com.dlink.cdc.sql;

-import com.dlink.assertion.Asserts;
-import com.dlink.cdc.AbstractSinkBuilder;
-import com.dlink.cdc.CDCBuilder;
-import com.dlink.cdc.SinkBuilder;
-import com.dlink.executor.CustomTableEnvironment;
-import com.dlink.model.FlinkCDCConfig;
-import com.dlink.model.Schema;
-import com.dlink.model.Table;
-import com.dlink.utils.FlinkBaseUtil;
-import com.dlink.utils.LogUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -22,7 +12,11 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
-import org.apache.flink.table.types.logical.*;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
@@ -36,6 +30,18 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;

+import com.dlink.assertion.Asserts;
+import com.dlink.cdc.AbstractSinkBuilder;
+import com.dlink.cdc.CDCBuilder;
+import com.dlink.cdc.SinkBuilder;
+import com.dlink.executor.CustomTableEnvironment;
+import com.dlink.model.FlinkCDCConfig;
+import com.dlink.model.Schema;
+import com.dlink.model.Table;
+import com.dlink.utils.FlinkBaseUtil;
+import com.dlink.utils.JSONUtil;
+import com.dlink.utils.LogUtil;
/**
 * SQLSinkBuilder
 *
@@ -60,9 +66,10 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
    }

    private DataStream<Row> buildRow(
        SingleOutputStreamOperator<Map> filterOperator,
        List<String> columnNameList,
-       List<LogicalType> columnTypeList) {
+       List<LogicalType> columnTypeList,
+       String schemaTableName) {
        final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]);
        final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]);
@@ -70,9 +77,10 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
        RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, columnNames);
        return filterOperator
            .flatMap(new FlatMapFunction<Map, Row>() {
                @Override
                public void flatMap(Map value, Collector<Row> out) throws Exception {
+                   try {
                        switch (value.get("op").toString()) {
                            case "r":
                            case "c":
@@ -106,15 +114,19 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
                                out.collect(uarow);
                                break;
                        }
+                   } catch (Exception e) {
+                       logger.error("SchemaTable: {} - Row: {} - Exception: {}", schemaTableName, JSONUtil.toJsonString(value), e.getCause().getMessage());
+                       throw e;
+                   }
                }
            }, rowTypeInfo);
    }
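This hunk is the core of the feature: every per-row conversion is wrapped in try/catch, so a column type conversion failure now logs the schema-qualified table name and the raw change-event map before rethrowing. One hedge worth noting: e.getCause() is null for exceptions thrown without a wrapped cause, which would turn the logging line itself into a NullPointerException. A defensive variant of the same pattern, as a sketch only; the logger is assumed to be an slf4j logger, and plain Map.toString() stands in for the project's JSONUtil:

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RowErrorCaptureSketch {

    private static final Logger logger = LoggerFactory.getLogger(RowErrorCaptureSketch.class);

    // Hypothetical stand-in for the flatMap body: convert one change-event map,
    // and on failure log the table plus the raw row before rethrowing.
    static void processRow(String schemaTableName, Map<String, Object> value) throws Exception {
        try {
            // ... per-column convertValue(...) calls would run here ...
        } catch (Exception e) {
            // getCause() may be null; fall back to the exception itself.
            Throwable root = e.getCause() != null ? e.getCause() : e;
            logger.error("SchemaTable: {} - Row: {} - Exception: {}",
                    schemaTableName, value, root.getMessage());
            throw e; // rethrow so the job still fails visibly instead of silently dropping the row
        }
    }
}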
    private void addTableSink(
        CustomTableEnvironment customTableEnvironment,
        DataStream<Row> rowDataDataStream,
        Table table,
        List<String> columnNameList) {
        String sinkSchemaName = getSinkSchemaName(table);
        String sinkTableName = getSinkTableName(table);
@@ -150,10 +162,10 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
    @Override
    public DataStreamSource build(
        CDCBuilder cdcBuilder,
        StreamExecutionEnvironment env,
        CustomTableEnvironment customTableEnvironment,
        DataStreamSource<String> dataStreamSource) {
        final List<Schema> schemaList = config.getSchemaList();
        final String schemaFieldName = config.getSchemaFieldName();
        if (Asserts.isNotNullCollection(schemaList)) {
@@ -161,18 +173,19 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
            logger.info("Build deserialize successful...");
            for (Schema schema : schemaList) {
                for (Table table : schema.getTables()) {
+                   final String schemaTableName = table.getSchemaTableName();
                    try {
                        SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
-                       logger.info("Build " + table.getSchemaTableName() + " shunt successful...");
+                       logger.info("Build " + schemaTableName + " shunt successful...");
                        List<String> columnNameList = new ArrayList<>();
                        List<LogicalType> columnTypeList = new ArrayList<>();
                        buildColumn(columnNameList, columnTypeList, table.getColumns());
-                       DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList);
+                       DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList, schemaTableName);
-                       logger.info("Build " + table.getSchemaTableName() + " flatMap successful...");
+                       logger.info("Build " + schemaTableName + " flatMap successful...");
-                       logger.info("Start build " + table.getSchemaTableName() + " sink...");
+                       logger.info("Start build " + schemaTableName + " sink...");
                        addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
                    } catch (Exception e) {
-                       logger.error("Build " + table.getSchemaTableName() + " cdc sync failed...");
+                       logger.error("Build " + schemaTableName + " cdc sync failed...");
                        logger.error(LogUtil.getError(e));
                    }
                }
...
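Beyond tidier logging, hoisting table.getSchemaTableName() into a final local has a second effect: the anonymous FlatMapFunction in buildRow serializes its captured variables and ships them to the task managers, so capturing a small final String keeps the closure light and serializable instead of dragging the whole Table object along. A toy illustration of that capture, assuming nothing beyond vanilla Flink:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

public class ClosureCaptureSketch {

    // The final String is baked into the serialized closure; only the name
    // travels with the operator, not the Table object it came from.
    static FlatMapFunction<String, String> tagWithTable(final String schemaTableName) {
        return new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) {
                out.collect(schemaTableName + ": " + value);
            }
        };
    }
}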
...@@ -230,9 +237,9 @@ public abstract class AbstractSinkBuilder { ...@@ -230,9 +237,9 @@ public abstract class AbstractSinkBuilder {
case JAVA_LANG_DOUBLE: case JAVA_LANG_DOUBLE:
return new DoubleType(); return new DoubleType();
case DECIMAL: case DECIMAL:
if(columnType.getPrecision() == null || columnType.getPrecision() == 0){ if (columnType.getPrecision() == null || columnType.getPrecision() == 0) {
return new DecimalType(38, columnType.getScale()); return new DecimalType(38, columnType.getScale());
}else{ } else {
return new DecimalType(columnType.getPrecision(), columnType.getScale()); return new DecimalType(columnType.getPrecision(), columnType.getScale());
} }
case INT: case INT:
...@@ -303,13 +310,13 @@ public abstract class AbstractSinkBuilder { ...@@ -303,13 +310,13 @@ public abstract class AbstractSinkBuilder {
return tableName; return tableName;
} }
protected List<String> getPKList(Table table){ protected List<String> getPKList(Table table) {
List<String> pks = new ArrayList<>(); List<String> pks = new ArrayList<>();
if(Asserts.isNullCollection(table.getColumns())){ if (Asserts.isNullCollection(table.getColumns())) {
return pks; return pks;
} }
for(Column column: table.getColumns()){ for (Column column : table.getColumns()) {
if(column.isKeyFlag()){ if (column.isKeyFlag()) {
pks.add(column.getName()); pks.add(column.getName());
} }
} }
......
package com.dlink.cdc.sql; package com.dlink.cdc.sql;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.dlink.utils.FlinkBaseUtil;
import com.dlink.utils.LogUtil;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
...@@ -22,7 +12,11 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ...@@ -22,7 +12,11 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.*; import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.utils.TypeConversions; import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row; import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind; import org.apache.flink.types.RowKind;
...@@ -36,6 +30,18 @@ import java.util.ArrayList; ...@@ -36,6 +30,18 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.dlink.utils.FlinkBaseUtil;
import com.dlink.utils.JSONUtil;
import com.dlink.utils.LogUtil;
/** /**
* SQLSinkBuilder * SQLSinkBuilder
* *
...@@ -60,9 +66,10 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -60,9 +66,10 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
} }
private DataStream<Row> buildRow( private DataStream<Row> buildRow(
SingleOutputStreamOperator<Map> filterOperator, SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList, List<String> columnNameList,
List<LogicalType> columnTypeList) { List<LogicalType> columnTypeList,
String schemaTableName) {
final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]); final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]);
final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]); final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]);
...@@ -70,9 +77,10 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -70,9 +77,10 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, columnNames); RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, columnNames);
return filterOperator return filterOperator
.flatMap(new FlatMapFunction<Map, Row>() { .flatMap(new FlatMapFunction<Map, Row>() {
@Override @Override
public void flatMap(Map value, Collector<Row> out) throws Exception { public void flatMap(Map value, Collector<Row> out) throws Exception {
try {
switch (value.get("op").toString()) { switch (value.get("op").toString()) {
case "r": case "r":
case "c": case "c":
...@@ -106,15 +114,19 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -106,15 +114,19 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
out.collect(uarow); out.collect(uarow);
break; break;
} }
} catch (Exception e) {
logger.error("SchameTable: {} - Row: {} - Exception: {}", schemaTableName, JSONUtil.toJsonString(value), e.getCause().getMessage());
throw e;
} }
}, rowTypeInfo); }
}, rowTypeInfo);
} }
private void addTableSink( private void addTableSink(
CustomTableEnvironment customTableEnvironment, CustomTableEnvironment customTableEnvironment,
DataStream<Row> rowDataDataStream, DataStream<Row> rowDataDataStream,
Table table, Table table,
List<String> columnNameList) { List<String> columnNameList) {
String sinkSchemaName = getSinkSchemaName(table); String sinkSchemaName = getSinkSchemaName(table);
String sinkTableName = getSinkTableName(table); String sinkTableName = getSinkTableName(table);
...@@ -150,10 +162,10 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -150,10 +162,10 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
@Override @Override
public DataStreamSource build( public DataStreamSource build(
CDCBuilder cdcBuilder, CDCBuilder cdcBuilder,
StreamExecutionEnvironment env, StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment, CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) { DataStreamSource<String> dataStreamSource) {
final List<Schema> schemaList = config.getSchemaList(); final List<Schema> schemaList = config.getSchemaList();
final String schemaFieldName = config.getSchemaFieldName(); final String schemaFieldName = config.getSchemaFieldName();
if (Asserts.isNotNullCollection(schemaList)) { if (Asserts.isNotNullCollection(schemaList)) {
...@@ -161,18 +173,19 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -161,18 +173,19 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
logger.info("Build deserialize successful..."); logger.info("Build deserialize successful...");
for (Schema schema : schemaList) { for (Schema schema : schemaList) {
for (Table table : schema.getTables()) { for (Table table : schema.getTables()) {
final String schemaTableName = table.getSchemaTableName();
try { try {
SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName); SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
logger.info("Build " + table.getSchemaTableName() + " shunt successful..."); logger.info("Build " + schemaTableName + " shunt successful...");
List<String> columnNameList = new ArrayList<>(); List<String> columnNameList = new ArrayList<>();
List<LogicalType> columnTypeList = new ArrayList<>(); List<LogicalType> columnTypeList = new ArrayList<>();
buildColumn(columnNameList, columnTypeList, table.getColumns()); buildColumn(columnNameList, columnTypeList, table.getColumns());
DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList); DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList, schemaTableName);
logger.info("Build " + table.getSchemaTableName() + " flatMap successful..."); logger.info("Build " + schemaTableName + " flatMap successful...");
logger.info("Start build " + table.getSchemaTableName() + " sink..."); logger.info("Start build " + schemaTableName + " sink...");
addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList); addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
} catch (Exception e) { } catch (Exception e) {
logger.error("Build " + table.getSchemaTableName() + " cdc sync failed..."); logger.error("Build " + schemaTableName + " cdc sync failed...");
logger.error(LogUtil.getError(e)); logger.error(LogUtil.getError(e));
} }
} }
......