Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in / Register
Toggle navigation
D
dlink
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
zhaowei
dlink
Commits
45ea7bdf
Commit
45ea7bdf
authored
May 19, 2022
by
wenmo
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
[Fix-522] [client] Fix CDCSOURCE OracleCDC number can't be cast to Long
parent
63ed66e8
Changes
6
Show whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
85 additions
and
11 deletions
+85
-11
SQLSinkBuilder.java
...-1.11/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java
+17
-2
SQLSinkBuilder.java
...-1.12/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java
+17
-2
SQLSinkBuilder.java
...-1.13/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java
+16
-2
SQLSinkBuilder.java
...-1.14/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java
+17
-2
SQLSinkBuilder.java
...-1.15/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java
+17
-2
OracleTypeConvert.java
...in/java/com/dlink/metadata/convert/OracleTypeConvert.java
+1
-1
No files found.
dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java
View file @
45ea7bdf
...
@@ -15,6 +15,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
...
@@ -15,6 +15,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.table.data.RowData
;
import
org.apache.flink.table.data.RowData
;
import
org.apache.flink.table.operations.ModifyOperation
;
import
org.apache.flink.table.operations.ModifyOperation
;
import
org.apache.flink.table.operations.Operation
;
import
org.apache.flink.table.operations.Operation
;
import
org.apache.flink.table.types.logical.BigIntType
;
import
org.apache.flink.table.types.logical.DateType
;
import
org.apache.flink.table.types.logical.DateType
;
import
org.apache.flink.table.types.logical.DecimalType
;
import
org.apache.flink.table.types.logical.DecimalType
;
import
org.apache.flink.table.types.logical.LogicalType
;
import
org.apache.flink.table.types.logical.LogicalType
;
...
@@ -269,11 +270,25 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
...
@@ -269,11 +270,25 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return
null
;
return
null
;
}
}
if
(
logicalType
instanceof
DateType
)
{
if
(
logicalType
instanceof
DateType
)
{
if
(
value
instanceof
Integer
){
return
Instant
.
ofEpochMilli
(((
Integer
)
value
).
longValue
()).
atZone
(
ZoneId
.
systemDefault
()).
toLocalDate
();
}
else
{
return
Instant
.
ofEpochMilli
((
long
)
value
).
atZone
(
ZoneId
.
systemDefault
()).
toLocalDate
();
return
Instant
.
ofEpochMilli
((
long
)
value
).
atZone
(
ZoneId
.
systemDefault
()).
toLocalDate
();
}
}
else
if
(
logicalType
instanceof
TimestampType
)
{
}
else
if
(
logicalType
instanceof
TimestampType
)
{
if
(
value
instanceof
Integer
){
return
Instant
.
ofEpochMilli
(((
Integer
)
value
).
longValue
()).
atZone
(
ZoneId
.
systemDefault
()).
toLocalDateTime
();
}
else
{
return
Instant
.
ofEpochMilli
((
long
)
value
).
atZone
(
ZoneId
.
systemDefault
()).
toLocalDateTime
();
return
Instant
.
ofEpochMilli
((
long
)
value
).
atZone
(
ZoneId
.
systemDefault
()).
toLocalDateTime
();
}
}
else
if
(
logicalType
instanceof
DecimalType
)
{
}
else
if
(
logicalType
instanceof
DecimalType
)
{
return
new
BigDecimal
((
String
)
value
);
return
new
BigDecimal
((
String
)
value
);
}
else
if
(
logicalType
instanceof
BigIntType
)
{
if
(
value
instanceof
Integer
){
return
((
Integer
)
value
).
longValue
();
}
else
{
return
value
;
}
}
else
{
}
else
{
return
value
;
return
value
;
}
}
...
...
dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java
View file @
45ea7bdf
...
@@ -15,6 +15,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
...
@@ -15,6 +15,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.table.data.RowData
;
import
org.apache.flink.table.data.RowData
;
import
org.apache.flink.table.operations.ModifyOperation
;
import
org.apache.flink.table.operations.ModifyOperation
;
import
org.apache.flink.table.operations.Operation
;
import
org.apache.flink.table.operations.Operation
;
import
org.apache.flink.table.types.logical.BigIntType
;
import
org.apache.flink.table.types.logical.DateType
;
import
org.apache.flink.table.types.logical.DateType
;
import
org.apache.flink.table.types.logical.DecimalType
;
import
org.apache.flink.table.types.logical.DecimalType
;
import
org.apache.flink.table.types.logical.LogicalType
;
import
org.apache.flink.table.types.logical.LogicalType
;
...
@@ -269,11 +270,25 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
...
@@ -269,11 +270,25 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return
null
;
return
null
;
}
}
if
(
logicalType
instanceof
DateType
)
{
if
(
logicalType
instanceof
DateType
)
{
if
(
value
instanceof
Integer
){
return
Instant
.
ofEpochMilli
(((
Integer
)
value
).
longValue
()).
atZone
(
ZoneId
.
systemDefault
()).
toLocalDate
();
}
else
{
return
Instant
.
ofEpochMilli
((
long
)
value
).
atZone
(
ZoneId
.
systemDefault
()).
toLocalDate
();
return
Instant
.
ofEpochMilli
((
long
)
value
).
atZone
(
ZoneId
.
systemDefault
()).
toLocalDate
();
}
}
else
if
(
logicalType
instanceof
TimestampType
)
{
}
else
if
(
logicalType
instanceof
TimestampType
)
{
if
(
value
instanceof
Integer
){
return
Instant
.
ofEpochMilli
(((
Integer
)
value
).
longValue
()).
atZone
(
ZoneId
.
systemDefault
()).
toLocalDateTime
();
}
else
{
return
Instant
.
ofEpochMilli
((
long
)
value
).
atZone
(
ZoneId
.
systemDefault
()).
toLocalDateTime
();
return
Instant
.
ofEpochMilli
((
long
)
value
).
atZone
(
ZoneId
.
systemDefault
()).
toLocalDateTime
();
}
}
else
if
(
logicalType
instanceof
DecimalType
)
{
}
else
if
(
logicalType
instanceof
DecimalType
)
{
return
new
BigDecimal
((
String
)
value
);
return
new
BigDecimal
((
String
)
value
);
}
else
if
(
logicalType
instanceof
BigIntType
)
{
if
(
value
instanceof
Integer
){
return
((
Integer
)
value
).
longValue
();
}
else
{
return
value
;
}
}
else
{
}
else
{
return
value
;
return
value
;
}
}
...
...
dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java
View file @
45ea7bdf
...
@@ -263,11 +263,25 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
...
@@ -263,11 +263,25 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return
null
;
return
null
;
}
}
if
(
logicalType
instanceof
DateType
)
{
if
(
logicalType
instanceof
DateType
)
{
if
(
value
instanceof
Integer
){
return
Instant
.
ofEpochMilli
(((
Integer
)
value
).
longValue
()).
atZone
(
ZoneId
.
systemDefault
()).
toLocalDate
();
}
else
{
return
Instant
.
ofEpochMilli
((
long
)
value
).
atZone
(
ZoneId
.
systemDefault
()).
toLocalDate
();
return
Instant
.
ofEpochMilli
((
long
)
value
).
atZone
(
ZoneId
.
systemDefault
()).
toLocalDate
();
}
}
else
if
(
logicalType
instanceof
TimestampType
)
{
}
else
if
(
logicalType
instanceof
TimestampType
)
{
if
(
value
instanceof
Integer
){
return
Instant
.
ofEpochMilli
(((
Integer
)
value
).
longValue
()).
atZone
(
ZoneId
.
systemDefault
()).
toLocalDateTime
();
}
else
{
return
Instant
.
ofEpochMilli
((
long
)
value
).
atZone
(
ZoneId
.
systemDefault
()).
toLocalDateTime
();
return
Instant
.
ofEpochMilli
((
long
)
value
).
atZone
(
ZoneId
.
systemDefault
()).
toLocalDateTime
();
}
}
else
if
(
logicalType
instanceof
DecimalType
)
{
}
else
if
(
logicalType
instanceof
DecimalType
)
{
return
new
BigDecimal
((
String
)
value
);
return
new
BigDecimal
((
String
)
value
);
}
else
if
(
logicalType
instanceof
BigIntType
)
{
if
(
value
instanceof
Integer
){
return
((
Integer
)
value
).
longValue
();
}
else
{
return
value
;
}
}
else
{
}
else
{
return
value
;
return
value
;
}
}
...
...
dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java
View file @
45ea7bdf
...
@@ -15,6 +15,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
...
@@ -15,6 +15,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.table.data.RowData
;
import
org.apache.flink.table.data.RowData
;
import
org.apache.flink.table.operations.ModifyOperation
;
import
org.apache.flink.table.operations.ModifyOperation
;
import
org.apache.flink.table.operations.Operation
;
import
org.apache.flink.table.operations.Operation
;
import
org.apache.flink.table.types.logical.BigIntType
;
import
org.apache.flink.table.types.logical.DateType
;
import
org.apache.flink.table.types.logical.DateType
;
import
org.apache.flink.table.types.logical.DecimalType
;
import
org.apache.flink.table.types.logical.DecimalType
;
import
org.apache.flink.table.types.logical.LogicalType
;
import
org.apache.flink.table.types.logical.LogicalType
;
...
@@ -269,11 +270,25 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
...
@@ -269,11 +270,25 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return
null
;
return
null
;
}
}
if
(
logicalType
instanceof
DateType
)
{
if
(
logicalType
instanceof
DateType
)
{
if
(
value
instanceof
Integer
){
return
Instant
.
ofEpochMilli
(((
Integer
)
value
).
longValue
()).
atZone
(
ZoneId
.
systemDefault
()).
toLocalDate
();
}
else
{
return
Instant
.
ofEpochMilli
((
long
)
value
).
atZone
(
ZoneId
.
systemDefault
()).
toLocalDate
();
return
Instant
.
ofEpochMilli
((
long
)
value
).
atZone
(
ZoneId
.
systemDefault
()).
toLocalDate
();
}
}
else
if
(
logicalType
instanceof
TimestampType
)
{
}
else
if
(
logicalType
instanceof
TimestampType
)
{
if
(
value
instanceof
Integer
){
return
Instant
.
ofEpochMilli
(((
Integer
)
value
).
longValue
()).
atZone
(
ZoneId
.
systemDefault
()).
toLocalDateTime
();
}
else
{
return
Instant
.
ofEpochMilli
((
long
)
value
).
atZone
(
ZoneId
.
systemDefault
()).
toLocalDateTime
();
return
Instant
.
ofEpochMilli
((
long
)
value
).
atZone
(
ZoneId
.
systemDefault
()).
toLocalDateTime
();
}
}
else
if
(
logicalType
instanceof
DecimalType
)
{
}
else
if
(
logicalType
instanceof
DecimalType
)
{
return
new
BigDecimal
((
String
)
value
);
return
new
BigDecimal
((
String
)
value
);
}
else
if
(
logicalType
instanceof
BigIntType
)
{
if
(
value
instanceof
Integer
){
return
((
Integer
)
value
).
longValue
();
}
else
{
return
value
;
}
}
else
{
}
else
{
return
value
;
return
value
;
}
}
...
...
dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java
View file @
45ea7bdf
...
@@ -15,6 +15,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
...
@@ -15,6 +15,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.table.data.RowData
;
import
org.apache.flink.table.data.RowData
;
import
org.apache.flink.table.operations.ModifyOperation
;
import
org.apache.flink.table.operations.ModifyOperation
;
import
org.apache.flink.table.operations.Operation
;
import
org.apache.flink.table.operations.Operation
;
import
org.apache.flink.table.types.logical.BigIntType
;
import
org.apache.flink.table.types.logical.DateType
;
import
org.apache.flink.table.types.logical.DateType
;
import
org.apache.flink.table.types.logical.DecimalType
;
import
org.apache.flink.table.types.logical.DecimalType
;
import
org.apache.flink.table.types.logical.LogicalType
;
import
org.apache.flink.table.types.logical.LogicalType
;
...
@@ -269,11 +270,25 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
...
@@ -269,11 +270,25 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return
null
;
return
null
;
}
}
if
(
logicalType
instanceof
DateType
)
{
if
(
logicalType
instanceof
DateType
)
{
if
(
value
instanceof
Integer
){
return
Instant
.
ofEpochMilli
(((
Integer
)
value
).
longValue
()).
atZone
(
ZoneId
.
systemDefault
()).
toLocalDate
();
}
else
{
return
Instant
.
ofEpochMilli
((
long
)
value
).
atZone
(
ZoneId
.
systemDefault
()).
toLocalDate
();
return
Instant
.
ofEpochMilli
((
long
)
value
).
atZone
(
ZoneId
.
systemDefault
()).
toLocalDate
();
}
}
else
if
(
logicalType
instanceof
TimestampType
)
{
}
else
if
(
logicalType
instanceof
TimestampType
)
{
if
(
value
instanceof
Integer
){
return
Instant
.
ofEpochMilli
(((
Integer
)
value
).
longValue
()).
atZone
(
ZoneId
.
systemDefault
()).
toLocalDateTime
();
}
else
{
return
Instant
.
ofEpochMilli
((
long
)
value
).
atZone
(
ZoneId
.
systemDefault
()).
toLocalDateTime
();
return
Instant
.
ofEpochMilli
((
long
)
value
).
atZone
(
ZoneId
.
systemDefault
()).
toLocalDateTime
();
}
}
else
if
(
logicalType
instanceof
DecimalType
)
{
}
else
if
(
logicalType
instanceof
DecimalType
)
{
return
new
BigDecimal
((
String
)
value
);
return
new
BigDecimal
((
String
)
value
);
}
else
if
(
logicalType
instanceof
BigIntType
)
{
if
(
value
instanceof
Integer
){
return
((
Integer
)
value
).
longValue
();
}
else
{
return
value
;
}
}
else
{
}
else
{
return
value
;
return
value
;
}
}
...
...
dlink-metadata/dlink-metadata-oracle/src/main/java/com/dlink/metadata/convert/OracleTypeConvert.java
View file @
45ea7bdf
...
@@ -28,7 +28,7 @@ public class OracleTypeConvert implements ITypeConvert {
...
@@ -28,7 +28,7 @@ public class OracleTypeConvert implements ITypeConvert {
if
(
t
.
matches
(
"number\\(+\\d\\)"
))
{
if
(
t
.
matches
(
"number\\(+\\d\\)"
))
{
columnType
=
ColumnType
.
INTEGER
;
columnType
=
ColumnType
.
INTEGER
;
}
else
if
(
t
.
matches
(
"number\\(+\\d{2}+\\)"
))
{
}
else
if
(
t
.
matches
(
"number\\(+\\d{2}+\\)"
))
{
columnType
=
ColumnType
.
LONG
;
columnType
=
ColumnType
.
JAVA_LANG_
LONG
;
}
else
{
}
else
{
columnType
=
ColumnType
.
DECIMAL
;
columnType
=
ColumnType
.
DECIMAL
;
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment