Commit 86ec757c authored by Zack Young's avatar Zack Young

修复 TimestampType的值可能是String引发的类型强转bug

parent 13843f5a
......@@ -67,9 +67,9 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
}
private DataStream<Row> buildRow(
SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]);
final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]);
......@@ -77,51 +77,51 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, columnNames);
return filterOperator
.flatMap(new FlatMapFunction<Map, Row>() {
@Override
public void flatMap(Map value, Collector<Row> out) throws Exception {
switch (value.get("op").toString()) {
case "r":
case "c":
Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size());
Map idata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(irow);
break;
case "d":
Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size());
Map ddata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(drow);
break;
case "u":
Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size());
Map ubdata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(ubrow);
Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size());
Map uadata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(uarow);
break;
.flatMap(new FlatMapFunction<Map, Row>() {
@Override
public void flatMap(Map value, Collector<Row> out) throws Exception {
switch (value.get("op").toString()) {
case "r":
case "c":
Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size());
Map idata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(irow);
break;
case "d":
Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size());
Map ddata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(drow);
break;
case "u":
Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size());
Map ubdata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(ubrow);
Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size());
Map uadata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(uarow);
break;
}
}
}
}, rowTypeInfo);
}, rowTypeInfo);
}
private void addTableSink(
CustomTableEnvironment customTableEnvironment,
DataStream<Row> rowDataDataStream,
Table table,
List<String> columnNameList) {
CustomTableEnvironment customTableEnvironment,
DataStream<Row> rowDataDataStream,
Table table,
List<String> columnNameList) {
String sinkTableName = getSinkTableName(table);
String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline();
......@@ -155,10 +155,10 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
@Override
public DataStreamSource build(
CDCBuilder cdcBuilder,
StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) {
CDCBuilder cdcBuilder,
StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) {
final List<Schema> schemaList = config.getSchemaList();
final String schemaFieldName = config.getSchemaFieldName();
if (Asserts.isNotNullCollection(schemaList)) {
......@@ -176,7 +176,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
logger.info("Build " + table.getSchemaTableName() + " flatMap successful...");
logger.info("Start build " + table.getSchemaTableName() + " sink...");
addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
}catch (Exception e) {
} catch (Exception e) {
logger.error("Build " + table.getSchemaTableName() + " cdc sync failed...");
logger.error(LogUtil.getError(e));
}
......@@ -270,23 +270,25 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return null;
}
if (logicalType instanceof DateType) {
if(value instanceof Integer){
if (value instanceof Integer) {
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDate();
}else {
} else {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate();
}
} else if (logicalType instanceof TimestampType) {
if(value instanceof Integer){
if (value instanceof Integer) {
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDateTime();
}else {
} else if (value instanceof String) {
return Instant.parse((String) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
} else {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
}
} else if (logicalType instanceof DecimalType) {
return new BigDecimal((String) value);
} else if (logicalType instanceof BigIntType) {
if(value instanceof Integer){
if (value instanceof Integer) {
return ((Integer) value).longValue();
}else {
} else {
return value;
}
} else {
......
......@@ -67,9 +67,9 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
}
private DataStream<Row> buildRow(
SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]);
final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]);
......@@ -77,51 +77,51 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, columnNames);
return filterOperator
.flatMap(new FlatMapFunction<Map, Row>() {
@Override
public void flatMap(Map value, Collector<Row> out) throws Exception {
switch (value.get("op").toString()) {
case "r":
case "c":
Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size());
Map idata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(irow);
break;
case "d":
Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size());
Map ddata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(drow);
break;
case "u":
Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size());
Map ubdata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(ubrow);
Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size());
Map uadata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(uarow);
break;
.flatMap(new FlatMapFunction<Map, Row>() {
@Override
public void flatMap(Map value, Collector<Row> out) throws Exception {
switch (value.get("op").toString()) {
case "r":
case "c":
Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size());
Map idata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(irow);
break;
case "d":
Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size());
Map ddata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(drow);
break;
case "u":
Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size());
Map ubdata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(ubrow);
Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size());
Map uadata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(uarow);
break;
}
}
}
}, rowTypeInfo);
}, rowTypeInfo);
}
public void addTableSink(
CustomTableEnvironment customTableEnvironment,
DataStream<Row> rowDataDataStream,
Table table,
List<String> columnNameList) {
CustomTableEnvironment customTableEnvironment,
DataStream<Row> rowDataDataStream,
Table table,
List<String> columnNameList) {
String sinkTableName = getSinkTableName(table);
String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline();
......@@ -155,10 +155,10 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
@Override
public DataStreamSource build(
CDCBuilder cdcBuilder,
StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) {
CDCBuilder cdcBuilder,
StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) {
final List<Schema> schemaList = config.getSchemaList();
final String schemaFieldName = config.getSchemaFieldName();
if (Asserts.isNotNullCollection(schemaList)) {
......@@ -176,7 +176,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
logger.info("Build " + table.getSchemaTableName() + " flatMap successful...");
logger.info("Start build " + table.getSchemaTableName() + " sink...");
addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
}catch (Exception e) {
} catch (Exception e) {
logger.error("Build " + table.getSchemaTableName() + " cdc sync failed...");
logger.error(LogUtil.getError(e));
}
......@@ -270,23 +270,25 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return null;
}
if (logicalType instanceof DateType) {
if(value instanceof Integer){
if (value instanceof Integer) {
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDate();
}else {
} else {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate();
}
} else if (logicalType instanceof TimestampType) {
if(value instanceof Integer){
if (value instanceof Integer) {
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDateTime();
}else {
} else if (value instanceof String) {
return Instant.parse((String) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
} else {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
}
} else if (logicalType instanceof DecimalType) {
return new BigDecimal((String) value);
} else if (logicalType instanceof BigIntType) {
if(value instanceof Integer){
if (value instanceof Integer) {
return ((Integer) value).longValue();
}else {
} else {
return value;
}
} else {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment