Unverified Commit ddd35b5d authored by aiwenmo's avatar aiwenmo Committed by GitHub

[Fix-571] [client] Fix CDCSOURCE String can't be cast to Timestamp

[Fix-571] [client] Fix CDCSOURCE String can't be cast to Timestamp
parents 48420487 86ec757c
...@@ -60,9 +60,9 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -60,9 +60,9 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
} }
private DataStream<Row> buildRow( private DataStream<Row> buildRow(
SingleOutputStreamOperator<Map> filterOperator, SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList, List<String> columnNameList,
List<LogicalType> columnTypeList) { List<LogicalType> columnTypeList) {
final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]); final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]);
final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]); final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]);
...@@ -70,51 +70,51 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -70,51 +70,51 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, columnNames); RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, columnNames);
return filterOperator return filterOperator
.flatMap(new FlatMapFunction<Map, Row>() { .flatMap(new FlatMapFunction<Map, Row>() {
@Override @Override
public void flatMap(Map value, Collector<Row> out) throws Exception { public void flatMap(Map value, Collector<Row> out) throws Exception {
switch (value.get("op").toString()) { switch (value.get("op").toString()) {
case "r": case "r":
case "c": case "c":
Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size()); Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size());
Map idata = (Map) value.get("after"); Map idata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) { for (int i = 0; i < columnNameList.size(); i++) {
irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i))); irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
} }
out.collect(irow); out.collect(irow);
break; break;
case "d": case "d":
Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size()); Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size());
Map ddata = (Map) value.get("before"); Map ddata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) { for (int i = 0; i < columnNameList.size(); i++) {
drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i))); drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
} }
out.collect(drow); out.collect(drow);
break; break;
case "u": case "u":
Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size()); Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size());
Map ubdata = (Map) value.get("before"); Map ubdata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) { for (int i = 0; i < columnNameList.size(); i++) {
ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i))); ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
} }
out.collect(ubrow); out.collect(ubrow);
Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size()); Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size());
Map uadata = (Map) value.get("after"); Map uadata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) { for (int i = 0; i < columnNameList.size(); i++) {
uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i))); uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
} }
out.collect(uarow); out.collect(uarow);
break; break;
}
} }
} }, rowTypeInfo);
}, rowTypeInfo);
} }
private void addTableSink( private void addTableSink(
CustomTableEnvironment customTableEnvironment, CustomTableEnvironment customTableEnvironment,
DataStream<Row> rowDataDataStream, DataStream<Row> rowDataDataStream,
Table table, Table table,
List<String> columnNameList) { List<String> columnNameList) {
String sinkTableName = getSinkTableName(table); String sinkTableName = getSinkTableName(table);
String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline(); String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline();
...@@ -148,10 +148,10 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -148,10 +148,10 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
@Override @Override
public DataStreamSource build( public DataStreamSource build(
CDCBuilder cdcBuilder, CDCBuilder cdcBuilder,
StreamExecutionEnvironment env, StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment, CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) { DataStreamSource<String> dataStreamSource) {
final List<Schema> schemaList = config.getSchemaList(); final List<Schema> schemaList = config.getSchemaList();
final String schemaFieldName = config.getSchemaFieldName(); final String schemaFieldName = config.getSchemaFieldName();
if (Asserts.isNotNullCollection(schemaList)) { if (Asserts.isNotNullCollection(schemaList)) {
...@@ -263,23 +263,25 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -263,23 +263,25 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return null; return null;
} }
if (logicalType instanceof DateType) { if (logicalType instanceof DateType) {
if(value instanceof Integer){ if (value instanceof Integer) {
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDate(); return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDate();
}else { } else {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate(); return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate();
} }
} else if (logicalType instanceof TimestampType) { } else if (logicalType instanceof TimestampType) {
if(value instanceof Integer){ if (value instanceof Integer) {
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDateTime(); return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDateTime();
}else { } else if (value instanceof String) {
return Instant.parse((String) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
} else {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime(); return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
} }
} else if (logicalType instanceof DecimalType) { } else if (logicalType instanceof DecimalType) {
return new BigDecimal((String) value); return new BigDecimal((String) value);
} else if (logicalType instanceof BigIntType) { } else if (logicalType instanceof BigIntType) {
if(value instanceof Integer){ if (value instanceof Integer) {
return ((Integer) value).longValue(); return ((Integer) value).longValue();
}else { } else {
return value; return value;
} }
} else { } else {
......
...@@ -67,9 +67,9 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -67,9 +67,9 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
} }
private DataStream<Row> buildRow( private DataStream<Row> buildRow(
SingleOutputStreamOperator<Map> filterOperator, SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList, List<String> columnNameList,
List<LogicalType> columnTypeList) { List<LogicalType> columnTypeList) {
final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]); final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]);
final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]); final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]);
...@@ -77,51 +77,51 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -77,51 +77,51 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, columnNames); RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, columnNames);
return filterOperator return filterOperator
.flatMap(new FlatMapFunction<Map, Row>() { .flatMap(new FlatMapFunction<Map, Row>() {
@Override @Override
public void flatMap(Map value, Collector<Row> out) throws Exception { public void flatMap(Map value, Collector<Row> out) throws Exception {
switch (value.get("op").toString()) { switch (value.get("op").toString()) {
case "r": case "r":
case "c": case "c":
Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size()); Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size());
Map idata = (Map) value.get("after"); Map idata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) { for (int i = 0; i < columnNameList.size(); i++) {
irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i))); irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
} }
out.collect(irow); out.collect(irow);
break; break;
case "d": case "d":
Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size()); Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size());
Map ddata = (Map) value.get("before"); Map ddata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) { for (int i = 0; i < columnNameList.size(); i++) {
drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i))); drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
} }
out.collect(drow); out.collect(drow);
break; break;
case "u": case "u":
Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size()); Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size());
Map ubdata = (Map) value.get("before"); Map ubdata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) { for (int i = 0; i < columnNameList.size(); i++) {
ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i))); ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
} }
out.collect(ubrow); out.collect(ubrow);
Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size()); Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size());
Map uadata = (Map) value.get("after"); Map uadata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) { for (int i = 0; i < columnNameList.size(); i++) {
uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i))); uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
} }
out.collect(uarow); out.collect(uarow);
break; break;
}
} }
} }, rowTypeInfo);
}, rowTypeInfo);
} }
private void addTableSink( private void addTableSink(
CustomTableEnvironment customTableEnvironment, CustomTableEnvironment customTableEnvironment,
DataStream<Row> rowDataDataStream, DataStream<Row> rowDataDataStream,
Table table, Table table,
List<String> columnNameList) { List<String> columnNameList) {
String sinkTableName = getSinkTableName(table); String sinkTableName = getSinkTableName(table);
String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline(); String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline();
...@@ -155,10 +155,10 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -155,10 +155,10 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
@Override @Override
public DataStreamSource build( public DataStreamSource build(
CDCBuilder cdcBuilder, CDCBuilder cdcBuilder,
StreamExecutionEnvironment env, StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment, CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) { DataStreamSource<String> dataStreamSource) {
final List<Schema> schemaList = config.getSchemaList(); final List<Schema> schemaList = config.getSchemaList();
final String schemaFieldName = config.getSchemaFieldName(); final String schemaFieldName = config.getSchemaFieldName();
if (Asserts.isNotNullCollection(schemaList)) { if (Asserts.isNotNullCollection(schemaList)) {
...@@ -176,7 +176,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -176,7 +176,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
logger.info("Build " + table.getSchemaTableName() + " flatMap successful..."); logger.info("Build " + table.getSchemaTableName() + " flatMap successful...");
logger.info("Start build " + table.getSchemaTableName() + " sink..."); logger.info("Start build " + table.getSchemaTableName() + " sink...");
addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList); addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
}catch (Exception e) { } catch (Exception e) {
logger.error("Build " + table.getSchemaTableName() + " cdc sync failed..."); logger.error("Build " + table.getSchemaTableName() + " cdc sync failed...");
logger.error(LogUtil.getError(e)); logger.error(LogUtil.getError(e));
} }
...@@ -270,23 +270,25 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -270,23 +270,25 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return null; return null;
} }
if (logicalType instanceof DateType) { if (logicalType instanceof DateType) {
if(value instanceof Integer){ if (value instanceof Integer) {
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDate(); return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDate();
}else { } else {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate(); return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate();
} }
} else if (logicalType instanceof TimestampType) { } else if (logicalType instanceof TimestampType) {
if(value instanceof Integer){ if (value instanceof Integer) {
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDateTime(); return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDateTime();
}else { } else if (value instanceof String) {
return Instant.parse((String) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
} else {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime(); return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
} }
} else if (logicalType instanceof DecimalType) { } else if (logicalType instanceof DecimalType) {
return new BigDecimal((String) value); return new BigDecimal((String) value);
} else if (logicalType instanceof BigIntType) { } else if (logicalType instanceof BigIntType) {
if(value instanceof Integer){ if (value instanceof Integer) {
return ((Integer) value).longValue(); return ((Integer) value).longValue();
}else { } else {
return value; return value;
} }
} else { } else {
......
...@@ -67,9 +67,9 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -67,9 +67,9 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
} }
private DataStream<Row> buildRow( private DataStream<Row> buildRow(
SingleOutputStreamOperator<Map> filterOperator, SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList, List<String> columnNameList,
List<LogicalType> columnTypeList) { List<LogicalType> columnTypeList) {
final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]); final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]);
final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]); final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]);
...@@ -77,51 +77,51 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -77,51 +77,51 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, columnNames); RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, columnNames);
return filterOperator return filterOperator
.flatMap(new FlatMapFunction<Map, Row>() { .flatMap(new FlatMapFunction<Map, Row>() {
@Override @Override
public void flatMap(Map value, Collector<Row> out) throws Exception { public void flatMap(Map value, Collector<Row> out) throws Exception {
switch (value.get("op").toString()) { switch (value.get("op").toString()) {
case "r": case "r":
case "c": case "c":
Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size()); Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size());
Map idata = (Map) value.get("after"); Map idata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) { for (int i = 0; i < columnNameList.size(); i++) {
irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i))); irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
} }
out.collect(irow); out.collect(irow);
break; break;
case "d": case "d":
Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size()); Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size());
Map ddata = (Map) value.get("before"); Map ddata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) { for (int i = 0; i < columnNameList.size(); i++) {
drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i))); drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
} }
out.collect(drow); out.collect(drow);
break; break;
case "u": case "u":
Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size()); Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size());
Map ubdata = (Map) value.get("before"); Map ubdata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) { for (int i = 0; i < columnNameList.size(); i++) {
ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i))); ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
} }
out.collect(ubrow); out.collect(ubrow);
Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size()); Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size());
Map uadata = (Map) value.get("after"); Map uadata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) { for (int i = 0; i < columnNameList.size(); i++) {
uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i))); uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
} }
out.collect(uarow); out.collect(uarow);
break; break;
}
} }
} }, rowTypeInfo);
}, rowTypeInfo);
} }
public void addTableSink( public void addTableSink(
CustomTableEnvironment customTableEnvironment, CustomTableEnvironment customTableEnvironment,
DataStream<Row> rowDataDataStream, DataStream<Row> rowDataDataStream,
Table table, Table table,
List<String> columnNameList) { List<String> columnNameList) {
String sinkTableName = getSinkTableName(table); String sinkTableName = getSinkTableName(table);
String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline(); String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline();
...@@ -155,10 +155,10 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -155,10 +155,10 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
@Override @Override
public DataStreamSource build( public DataStreamSource build(
CDCBuilder cdcBuilder, CDCBuilder cdcBuilder,
StreamExecutionEnvironment env, StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment, CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) { DataStreamSource<String> dataStreamSource) {
final List<Schema> schemaList = config.getSchemaList(); final List<Schema> schemaList = config.getSchemaList();
final String schemaFieldName = config.getSchemaFieldName(); final String schemaFieldName = config.getSchemaFieldName();
if (Asserts.isNotNullCollection(schemaList)) { if (Asserts.isNotNullCollection(schemaList)) {
...@@ -176,7 +176,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -176,7 +176,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
logger.info("Build " + table.getSchemaTableName() + " flatMap successful..."); logger.info("Build " + table.getSchemaTableName() + " flatMap successful...");
logger.info("Start build " + table.getSchemaTableName() + " sink..."); logger.info("Start build " + table.getSchemaTableName() + " sink...");
addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList); addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
}catch (Exception e) { } catch (Exception e) {
logger.error("Build " + table.getSchemaTableName() + " cdc sync failed..."); logger.error("Build " + table.getSchemaTableName() + " cdc sync failed...");
logger.error(LogUtil.getError(e)); logger.error(LogUtil.getError(e));
} }
...@@ -270,23 +270,25 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -270,23 +270,25 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return null; return null;
} }
if (logicalType instanceof DateType) { if (logicalType instanceof DateType) {
if(value instanceof Integer){ if (value instanceof Integer) {
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDate(); return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDate();
}else { } else {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate(); return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate();
} }
} else if (logicalType instanceof TimestampType) { } else if (logicalType instanceof TimestampType) {
if(value instanceof Integer){ if (value instanceof Integer) {
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDateTime(); return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDateTime();
}else { } else if (value instanceof String) {
return Instant.parse((String) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
} else {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime(); return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
} }
} else if (logicalType instanceof DecimalType) { } else if (logicalType instanceof DecimalType) {
return new BigDecimal((String) value); return new BigDecimal((String) value);
} else if (logicalType instanceof BigIntType) { } else if (logicalType instanceof BigIntType) {
if(value instanceof Integer){ if (value instanceof Integer) {
return ((Integer) value).longValue(); return ((Integer) value).longValue();
}else { } else {
return value; return value;
} }
} else { } else {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment