Unverified Commit 55e50632 authored by aiwenmo's avatar aiwenmo Committed by GitHub

[Fix-522] [client] Fix CDCSOURCE OracleCDC number can't be cast to Long

[Fix-522] [client] Fix CDCSOURCE OracleCDC number can't be cast to Long
parents 63ed66e8 45ea7bdf
...@@ -15,6 +15,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ...@@ -15,6 +15,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.DateType; import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalType;
...@@ -269,11 +270,25 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -269,11 +270,25 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return null; return null;
} }
if (logicalType instanceof DateType) { if (logicalType instanceof DateType) {
if(value instanceof Integer){
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDate();
}else {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate(); return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate();
}
} else if (logicalType instanceof TimestampType) { } else if (logicalType instanceof TimestampType) {
if(value instanceof Integer){
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDateTime();
}else {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime(); return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
}
} else if (logicalType instanceof DecimalType) { } else if (logicalType instanceof DecimalType) {
return new BigDecimal((String) value); return new BigDecimal((String) value);
} else if (logicalType instanceof BigIntType) {
if(value instanceof Integer){
return ((Integer) value).longValue();
}else {
return value;
}
} else { } else {
return value; return value;
} }
......
...@@ -15,6 +15,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ...@@ -15,6 +15,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.DateType; import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalType;
...@@ -269,11 +270,25 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -269,11 +270,25 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return null; return null;
} }
if (logicalType instanceof DateType) { if (logicalType instanceof DateType) {
if(value instanceof Integer){
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDate();
}else {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate(); return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate();
}
} else if (logicalType instanceof TimestampType) { } else if (logicalType instanceof TimestampType) {
if(value instanceof Integer){
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDateTime();
}else {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime(); return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
}
} else if (logicalType instanceof DecimalType) { } else if (logicalType instanceof DecimalType) {
return new BigDecimal((String) value); return new BigDecimal((String) value);
} else if (logicalType instanceof BigIntType) {
if(value instanceof Integer){
return ((Integer) value).longValue();
}else {
return value;
}
} else { } else {
return value; return value;
} }
......
...@@ -263,11 +263,25 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -263,11 +263,25 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return null; return null;
} }
if (logicalType instanceof DateType) { if (logicalType instanceof DateType) {
if(value instanceof Integer){
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDate();
}else {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate(); return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate();
}
} else if (logicalType instanceof TimestampType) { } else if (logicalType instanceof TimestampType) {
if(value instanceof Integer){
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDateTime();
}else {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime(); return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
}
} else if (logicalType instanceof DecimalType) { } else if (logicalType instanceof DecimalType) {
return new BigDecimal((String) value); return new BigDecimal((String) value);
} else if (logicalType instanceof BigIntType) {
if(value instanceof Integer){
return ((Integer) value).longValue();
}else {
return value;
}
} else { } else {
return value; return value;
} }
......
...@@ -15,6 +15,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ...@@ -15,6 +15,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.DateType; import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalType;
...@@ -269,11 +270,25 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -269,11 +270,25 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return null; return null;
} }
if (logicalType instanceof DateType) { if (logicalType instanceof DateType) {
if(value instanceof Integer){
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDate();
}else {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate(); return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate();
}
} else if (logicalType instanceof TimestampType) { } else if (logicalType instanceof TimestampType) {
if(value instanceof Integer){
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDateTime();
}else {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime(); return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
}
} else if (logicalType instanceof DecimalType) { } else if (logicalType instanceof DecimalType) {
return new BigDecimal((String) value); return new BigDecimal((String) value);
} else if (logicalType instanceof BigIntType) {
if(value instanceof Integer){
return ((Integer) value).longValue();
}else {
return value;
}
} else { } else {
return value; return value;
} }
......
...@@ -15,6 +15,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ...@@ -15,6 +15,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.DateType; import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalType;
...@@ -269,11 +270,25 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -269,11 +270,25 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return null; return null;
} }
if (logicalType instanceof DateType) { if (logicalType instanceof DateType) {
if(value instanceof Integer){
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDate();
}else {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate(); return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate();
}
} else if (logicalType instanceof TimestampType) { } else if (logicalType instanceof TimestampType) {
if(value instanceof Integer){
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDateTime();
}else {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime(); return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
}
} else if (logicalType instanceof DecimalType) { } else if (logicalType instanceof DecimalType) {
return new BigDecimal((String) value); return new BigDecimal((String) value);
} else if (logicalType instanceof BigIntType) {
if(value instanceof Integer){
return ((Integer) value).longValue();
}else {
return value;
}
} else { } else {
return value; return value;
} }
......
...@@ -28,7 +28,7 @@ public class OracleTypeConvert implements ITypeConvert { ...@@ -28,7 +28,7 @@ public class OracleTypeConvert implements ITypeConvert {
if (t.matches("number\\(+\\d\\)")) { if (t.matches("number\\(+\\d\\)")) {
columnType = ColumnType.INTEGER; columnType = ColumnType.INTEGER;
} else if (t.matches("number\\(+\\d{2}+\\)")) { } else if (t.matches("number\\(+\\d{2}+\\)")) {
columnType = ColumnType.LONG; columnType = ColumnType.JAVA_LANG_LONG;
} else { } else {
columnType = ColumnType.DECIMAL; columnType = ColumnType.DECIMAL;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment