Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in / Register
Toggle navigation
D
dlink
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
zhaowei
dlink
Commits
bd5fc13c
Unverified
Commit
bd5fc13c
authored
Apr 15, 2022
by
aiwenmo
Committed by
GitHub
Apr 15, 2022
Browse files
Options
Browse Files
Download
Plain Diff
Fix Bug
Fix Bug
parents
2d03d13b
b91d42b7
Changes
9
Hide whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
309 additions
and
11 deletions
+309
-11
pom.xml
dlink-connectors/dlink-connector-jdbc-1.11/pom.xml
+1
-1
OracleRowConverter.java
...connector/jdbc/internal/converter/OracleRowConverter.java
+75
-0
pom.xml
dlink-connectors/dlink-connector-jdbc-1.12/pom.xml
+1
-1
OracleRowConverter.java
...connector/jdbc/internal/converter/OracleRowConverter.java
+75
-0
pom.xml
dlink-connectors/dlink-connector-jdbc-1.13/pom.xml
+1
-1
OracleRowConverter.java
...connector/jdbc/internal/converter/OracleRowConverter.java
+75
-0
pom.xml
dlink-connectors/dlink-connector-jdbc-1.14/pom.xml
+1
-1
OracleRowConverter.java
...connector/jdbc/internal/converter/OracleRowConverter.java
+75
-0
DaemonTask.java
...aemon/src/main/java/com/dlink/daemon/task/DaemonTask.java
+5
-7
No files found.
dlink-connectors/dlink-connector-jdbc-1.11/pom.xml
View file @
bd5fc13c
...
...
@@ -136,7 +136,7 @@
<dependency>
<groupId>
com.oracle.database.jdbc
</groupId>
<artifactId>
ojdbc8
</artifactId>
<scope>
test
</scope>
<scope>
provided
</scope>
</dependency>
<!-- SQLServer test dependencies -->
<dependency>
...
...
dlink-connectors/dlink-connector-jdbc-1.11/src/main/java/org/apache/flink/connector/jdbc/internal/converter/OracleRowConverter.java
View file @
bd5fc13c
...
...
@@ -18,8 +18,20 @@
package
org
.
apache
.
flink
.
connector
.
jdbc
.
internal
.
converter
;
import
org.apache.flink.table.data.DecimalData
;
import
org.apache.flink.table.data.StringData
;
import
org.apache.flink.table.data.TimestampData
;
import
org.apache.flink.table.types.logical.DecimalType
;
import
org.apache.flink.table.types.logical.LogicalType
;
import
org.apache.flink.table.types.logical.RowType
;
import
java.math.BigDecimal
;
import
java.math.BigInteger
;
import
java.sql.Date
;
import
java.sql.Time
;
import
java.sql.Timestamp
;
import
java.time.LocalDateTime
;
/**
* Runtime converter responsible for converting between JDBC objects and Flink internal objects for
* Oracle.
...
...
@@ -39,4 +51,67 @@ public class OracleRowConverter extends AbstractJdbcRowConverter {
public
OracleRowConverter
(
RowType
rowType
)
{
super
(
rowType
);
}
@Override
protected
JdbcDeserializationConverter
createInternalConverter
(
LogicalType
type
)
{
switch
(
type
.
getTypeRoot
())
{
case
NULL:
return
val
->
null
;
case
BOOLEAN:
case
FLOAT:
case
DOUBLE:
case
INTERVAL_YEAR_MONTH:
case
INTERVAL_DAY_TIME:
return
val
->
val
;
case
TINYINT:
return
val
->
((
Integer
)
val
).
byteValue
();
case
SMALLINT:
// Converter for small type that casts value to int and then return short value,
// since
// JDBC 1.0 use int type for small values.
return
val
->
val
instanceof
Integer
?
((
Integer
)
val
).
shortValue
()
:
val
;
case
INTEGER:
case
BIGINT:
return
val
->
val
;
case
DECIMAL:
final
int
precision
=
((
DecimalType
)
type
).
getPrecision
();
final
int
scale
=
((
DecimalType
)
type
).
getScale
();
// using decimal(20, 0) to support db type bigint unsigned, user should define
// decimal(20, 0) in SQL,
// but other precision like decimal(30, 0) can work too from lenient consideration.
return
val
->
val
instanceof
BigInteger
?
DecimalData
.
fromBigDecimal
(
new
BigDecimal
((
BigInteger
)
val
,
0
),
precision
,
scale
)
:
DecimalData
.
fromBigDecimal
((
BigDecimal
)
val
,
precision
,
scale
);
case
DATE:
return
val
->
(
int
)
(((
Date
)
val
).
toLocalDate
().
toEpochDay
());
case
TIME_WITHOUT_TIME_ZONE:
return
val
->
(
int
)
(((
Time
)
val
).
toLocalTime
().
toNanoOfDay
()
/
1_000_000L
);
case
TIMESTAMP_WITH_TIME_ZONE:
case
TIMESTAMP_WITHOUT_TIME_ZONE:
return
(
val
)
->
{
if
(
val
instanceof
LocalDateTime
){
return
TimestampData
.
fromLocalDateTime
((
LocalDateTime
)
val
);
}
else
if
(
val
instanceof
oracle
.
sql
.
TIMESTAMP
){
return
TimestampData
.
fromTimestamp
(
Timestamp
.
valueOf
(((
oracle
.
sql
.
TIMESTAMP
)
val
).
stringValue
()));
}
else
{
return
TimestampData
.
fromTimestamp
((
Timestamp
)
val
);
}
};
case
CHAR:
case
VARCHAR:
return
val
->
StringData
.
fromString
((
String
)
val
);
case
BINARY:
case
VARBINARY:
return
val
->
(
byte
[])
val
;
case
ARRAY:
case
ROW:
case
MAP:
case
MULTISET:
case
RAW:
default
:
throw
new
UnsupportedOperationException
(
"Unsupported type:"
+
type
);
}
}
}
dlink-connectors/dlink-connector-jdbc-1.12/pom.xml
View file @
bd5fc13c
...
...
@@ -136,7 +136,7 @@
<dependency>
<groupId>
com.oracle.database.jdbc
</groupId>
<artifactId>
ojdbc8
</artifactId>
<scope>
test
</scope>
<scope>
provided
</scope>
</dependency>
<!-- SQLServer test dependencies -->
<dependency>
...
...
dlink-connectors/dlink-connector-jdbc-1.12/src/main/java/org/apache/flink/connector/jdbc/internal/converter/OracleRowConverter.java
View file @
bd5fc13c
...
...
@@ -18,8 +18,20 @@
package
org
.
apache
.
flink
.
connector
.
jdbc
.
internal
.
converter
;
import
org.apache.flink.table.data.DecimalData
;
import
org.apache.flink.table.data.StringData
;
import
org.apache.flink.table.data.TimestampData
;
import
org.apache.flink.table.types.logical.DecimalType
;
import
org.apache.flink.table.types.logical.LogicalType
;
import
org.apache.flink.table.types.logical.RowType
;
import
java.math.BigDecimal
;
import
java.math.BigInteger
;
import
java.sql.Date
;
import
java.sql.Time
;
import
java.sql.Timestamp
;
import
java.time.LocalDateTime
;
/**
* Runtime converter responsible for converting between JDBC objects and Flink internal objects for
* Oracle.
...
...
@@ -39,4 +51,67 @@ public class OracleRowConverter extends AbstractJdbcRowConverter {
public
OracleRowConverter
(
RowType
rowType
)
{
super
(
rowType
);
}
@Override
protected
JdbcDeserializationConverter
createInternalConverter
(
LogicalType
type
)
{
switch
(
type
.
getTypeRoot
())
{
case
NULL:
return
val
->
null
;
case
BOOLEAN:
case
FLOAT:
case
DOUBLE:
case
INTERVAL_YEAR_MONTH:
case
INTERVAL_DAY_TIME:
return
val
->
val
;
case
TINYINT:
return
val
->
((
Integer
)
val
).
byteValue
();
case
SMALLINT:
// Converter for small type that casts value to int and then return short value,
// since
// JDBC 1.0 use int type for small values.
return
val
->
val
instanceof
Integer
?
((
Integer
)
val
).
shortValue
()
:
val
;
case
INTEGER:
case
BIGINT:
return
val
->
val
;
case
DECIMAL:
final
int
precision
=
((
DecimalType
)
type
).
getPrecision
();
final
int
scale
=
((
DecimalType
)
type
).
getScale
();
// using decimal(20, 0) to support db type bigint unsigned, user should define
// decimal(20, 0) in SQL,
// but other precision like decimal(30, 0) can work too from lenient consideration.
return
val
->
val
instanceof
BigInteger
?
DecimalData
.
fromBigDecimal
(
new
BigDecimal
((
BigInteger
)
val
,
0
),
precision
,
scale
)
:
DecimalData
.
fromBigDecimal
((
BigDecimal
)
val
,
precision
,
scale
);
case
DATE:
return
val
->
(
int
)
(((
Date
)
val
).
toLocalDate
().
toEpochDay
());
case
TIME_WITHOUT_TIME_ZONE:
return
val
->
(
int
)
(((
Time
)
val
).
toLocalTime
().
toNanoOfDay
()
/
1_000_000L
);
case
TIMESTAMP_WITH_TIME_ZONE:
case
TIMESTAMP_WITHOUT_TIME_ZONE:
return
(
val
)
->
{
if
(
val
instanceof
LocalDateTime
){
return
TimestampData
.
fromLocalDateTime
((
LocalDateTime
)
val
);
}
else
if
(
val
instanceof
oracle
.
sql
.
TIMESTAMP
){
return
TimestampData
.
fromTimestamp
(
Timestamp
.
valueOf
(((
oracle
.
sql
.
TIMESTAMP
)
val
).
stringValue
()));
}
else
{
return
TimestampData
.
fromTimestamp
((
Timestamp
)
val
);
}
};
case
CHAR:
case
VARCHAR:
return
val
->
StringData
.
fromString
((
String
)
val
);
case
BINARY:
case
VARBINARY:
return
val
->
(
byte
[])
val
;
case
ARRAY:
case
ROW:
case
MAP:
case
MULTISET:
case
RAW:
default
:
throw
new
UnsupportedOperationException
(
"Unsupported type:"
+
type
);
}
}
}
dlink-connectors/dlink-connector-jdbc-1.13/pom.xml
View file @
bd5fc13c
...
...
@@ -128,7 +128,7 @@
<dependency>
<groupId>
com.oracle.database.jdbc
</groupId>
<artifactId>
ojdbc8
</artifactId>
<scope>
test
</scope>
<scope>
provided
</scope>
</dependency>
<!-- SQLServer test dependencies -->
...
...
dlink-connectors/dlink-connector-jdbc-1.13/src/main/java/org/apache/flink/connector/jdbc/internal/converter/OracleRowConverter.java
View file @
bd5fc13c
package
org
.
apache
.
flink
.
connector
.
jdbc
.
internal
.
converter
;
import
org.apache.flink.table.data.DecimalData
;
import
org.apache.flink.table.data.StringData
;
import
org.apache.flink.table.data.TimestampData
;
import
org.apache.flink.table.types.logical.DecimalType
;
import
org.apache.flink.table.types.logical.LogicalType
;
import
org.apache.flink.table.types.logical.RowType
;
import
java.math.BigDecimal
;
import
java.math.BigInteger
;
import
java.sql.Date
;
import
java.sql.Time
;
import
java.sql.Timestamp
;
import
java.time.LocalDateTime
;
/**
* Runtime converter responsible for converting between JDBC objects and Flink internal objects for
* Oracle.
...
...
@@ -21,4 +33,67 @@ public class OracleRowConverter extends AbstractJdbcRowConverter {
public
OracleRowConverter
(
RowType
rowType
)
{
super
(
rowType
);
}
@Override
protected
JdbcDeserializationConverter
createInternalConverter
(
LogicalType
type
)
{
switch
(
type
.
getTypeRoot
())
{
case
NULL:
return
val
->
null
;
case
BOOLEAN:
case
FLOAT:
case
DOUBLE:
case
INTERVAL_YEAR_MONTH:
case
INTERVAL_DAY_TIME:
return
val
->
val
;
case
TINYINT:
return
val
->
((
Integer
)
val
).
byteValue
();
case
SMALLINT:
// Converter for small type that casts value to int and then return short value,
// since
// JDBC 1.0 use int type for small values.
return
val
->
val
instanceof
Integer
?
((
Integer
)
val
).
shortValue
()
:
val
;
case
INTEGER:
case
BIGINT:
return
val
->
val
;
case
DECIMAL:
final
int
precision
=
((
DecimalType
)
type
).
getPrecision
();
final
int
scale
=
((
DecimalType
)
type
).
getScale
();
// using decimal(20, 0) to support db type bigint unsigned, user should define
// decimal(20, 0) in SQL,
// but other precision like decimal(30, 0) can work too from lenient consideration.
return
val
->
val
instanceof
BigInteger
?
DecimalData
.
fromBigDecimal
(
new
BigDecimal
((
BigInteger
)
val
,
0
),
precision
,
scale
)
:
DecimalData
.
fromBigDecimal
((
BigDecimal
)
val
,
precision
,
scale
);
case
DATE:
return
val
->
(
int
)
(((
Date
)
val
).
toLocalDate
().
toEpochDay
());
case
TIME_WITHOUT_TIME_ZONE:
return
val
->
(
int
)
(((
Time
)
val
).
toLocalTime
().
toNanoOfDay
()
/
1_000_000L
);
case
TIMESTAMP_WITH_TIME_ZONE:
case
TIMESTAMP_WITHOUT_TIME_ZONE:
return
(
val
)
->
{
if
(
val
instanceof
LocalDateTime
){
return
TimestampData
.
fromLocalDateTime
((
LocalDateTime
)
val
);
}
else
if
(
val
instanceof
oracle
.
sql
.
TIMESTAMP
){
return
TimestampData
.
fromTimestamp
(
Timestamp
.
valueOf
(((
oracle
.
sql
.
TIMESTAMP
)
val
).
stringValue
()));
}
else
{
return
TimestampData
.
fromTimestamp
((
Timestamp
)
val
);
}
};
case
CHAR:
case
VARCHAR:
return
val
->
StringData
.
fromString
((
String
)
val
);
case
BINARY:
case
VARBINARY:
return
val
->
(
byte
[])
val
;
case
ARRAY:
case
ROW:
case
MAP:
case
MULTISET:
case
RAW:
default
:
throw
new
UnsupportedOperationException
(
"Unsupported type:"
+
type
);
}
}
}
dlink-connectors/dlink-connector-jdbc-1.14/pom.xml
View file @
bd5fc13c
...
...
@@ -113,7 +113,7 @@
<dependency>
<groupId>
com.oracle.database.jdbc
</groupId>
<artifactId>
ojdbc8
</artifactId>
<scope>
test
</scope>
<scope>
provided
</scope>
</dependency>
<!-- SQLServer test dependencies -->
...
...
dlink-connectors/dlink-connector-jdbc-1.14/src/main/java/org/apache/flink/connector/jdbc/internal/converter/OracleRowConverter.java
View file @
bd5fc13c
package
org
.
apache
.
flink
.
connector
.
jdbc
.
internal
.
converter
;
import
org.apache.flink.table.data.DecimalData
;
import
org.apache.flink.table.data.StringData
;
import
org.apache.flink.table.data.TimestampData
;
import
org.apache.flink.table.types.logical.DecimalType
;
import
org.apache.flink.table.types.logical.LogicalType
;
import
org.apache.flink.table.types.logical.RowType
;
import
java.math.BigDecimal
;
import
java.math.BigInteger
;
import
java.sql.Date
;
import
java.sql.Time
;
import
java.sql.Timestamp
;
import
java.time.LocalDateTime
;
/**
* Runtime converter responsible for converting between JDBC objects and Flink internal objects for
* Oracle.
...
...
@@ -21,4 +33,67 @@ public class OracleRowConverter extends AbstractJdbcRowConverter {
public
OracleRowConverter
(
RowType
rowType
)
{
super
(
rowType
);
}
@Override
protected
JdbcDeserializationConverter
createInternalConverter
(
LogicalType
type
)
{
switch
(
type
.
getTypeRoot
())
{
case
NULL:
return
val
->
null
;
case
BOOLEAN:
case
FLOAT:
case
DOUBLE:
case
INTERVAL_YEAR_MONTH:
case
INTERVAL_DAY_TIME:
return
val
->
val
;
case
TINYINT:
return
val
->
((
Integer
)
val
).
byteValue
();
case
SMALLINT:
// Converter for small type that casts value to int and then return short value,
// since
// JDBC 1.0 use int type for small values.
return
val
->
val
instanceof
Integer
?
((
Integer
)
val
).
shortValue
()
:
val
;
case
INTEGER:
case
BIGINT:
return
val
->
val
;
case
DECIMAL:
final
int
precision
=
((
DecimalType
)
type
).
getPrecision
();
final
int
scale
=
((
DecimalType
)
type
).
getScale
();
// using decimal(20, 0) to support db type bigint unsigned, user should define
// decimal(20, 0) in SQL,
// but other precision like decimal(30, 0) can work too from lenient consideration.
return
val
->
val
instanceof
BigInteger
?
DecimalData
.
fromBigDecimal
(
new
BigDecimal
((
BigInteger
)
val
,
0
),
precision
,
scale
)
:
DecimalData
.
fromBigDecimal
((
BigDecimal
)
val
,
precision
,
scale
);
case
DATE:
return
val
->
(
int
)
(((
Date
)
val
).
toLocalDate
().
toEpochDay
());
case
TIME_WITHOUT_TIME_ZONE:
return
val
->
(
int
)
(((
Time
)
val
).
toLocalTime
().
toNanoOfDay
()
/
1_000_000L
);
case
TIMESTAMP_WITH_TIME_ZONE:
case
TIMESTAMP_WITHOUT_TIME_ZONE:
return
(
val
)
->
{
if
(
val
instanceof
LocalDateTime
){
return
TimestampData
.
fromLocalDateTime
((
LocalDateTime
)
val
);
}
else
if
(
val
instanceof
oracle
.
sql
.
TIMESTAMP
){
return
TimestampData
.
fromTimestamp
(
Timestamp
.
valueOf
(((
oracle
.
sql
.
TIMESTAMP
)
val
).
stringValue
()));
}
else
{
return
TimestampData
.
fromTimestamp
((
Timestamp
)
val
);
}
};
case
CHAR:
case
VARCHAR:
return
val
->
StringData
.
fromString
((
String
)
val
);
case
BINARY:
case
VARBINARY:
return
val
->
val
;
case
ARRAY:
case
ROW:
case
MAP:
case
MULTISET:
case
RAW:
default
:
throw
new
UnsupportedOperationException
(
"Unsupported type:"
+
type
);
}
}
}
dlink-daemon/src/main/java/com/dlink/daemon/task/DaemonTask.java
View file @
bd5fc13c
package
com
.
dlink
.
daemon
.
task
;
import
java.util.Optional
;
import
java.util.ServiceLoader
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.daemon.exception.DaemonTaskException
;
import
sun.misc.Service
;
import
java.util.Iterator
;
import
java.util.Optional
;
public
interface
DaemonTask
{
static
Optional
<
DaemonTask
>
get
(
DaemonTaskConfig
config
)
{
Asserts
.
checkNotNull
(
config
,
"线程任务配置不能为空"
);
Iterator
<
DaemonTask
>
providers
=
Service
.
providers
(
DaemonTask
.
class
);
while
(
providers
.
hasNext
())
{
DaemonTask
daemonTask
=
providers
.
next
();
ServiceLoader
<
DaemonTask
>
daemonTasks
=
ServiceLoader
.
load
(
DaemonTask
.
class
);
for
(
DaemonTask
daemonTask
:
daemonTasks
)
{
if
(
daemonTask
.
canHandle
(
config
.
getType
()))
{
return
Optional
.
of
(
daemonTask
.
setConfig
(
config
));
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment