Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in / Register
Toggle navigation
D
dlink
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
zhaowei
dlink
Commits
ec3ae8b2
Unverified
Commit
ec3ae8b2
authored
Apr 16, 2022
by
xiaoguaiguai
Committed by
GitHub
Apr 16, 2022
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'DataLinkDC:dev' into dev
parents
337e12a3
684d7fe8
Changes
17
Hide whitespace changes
Inline
Side-by-side
Showing
17 changed files
with
325 additions
and
27 deletions
+325
-27
ClusterConfigurationController.java
.../com/dlink/controller/ClusterConfigurationController.java
+1
-1
Cluster.java
dlink-admin/src/main/java/com/dlink/model/Cluster.java
+1
-1
ClusterConfiguration.java
...n/src/main/java/com/dlink/model/ClusterConfiguration.java
+1
-1
DataBase.java
dlink-admin/src/main/java/com/dlink/model/DataBase.java
+1
-1
Task.java
dlink-admin/src/main/java/com/dlink/model/Task.java
+3
-3
User.java
dlink-admin/src/main/java/com/dlink/model/User.java
+3
-3
TaskServiceImpl.java
...src/main/java/com/dlink/service/impl/TaskServiceImpl.java
+1
-1
UserServiceImpl.java
...src/main/java/com/dlink/service/impl/UserServiceImpl.java
+5
-5
pom.xml
dlink-connectors/dlink-connector-jdbc-1.11/pom.xml
+1
-1
OracleRowConverter.java
...connector/jdbc/internal/converter/OracleRowConverter.java
+75
-0
pom.xml
dlink-connectors/dlink-connector-jdbc-1.12/pom.xml
+1
-1
OracleRowConverter.java
...connector/jdbc/internal/converter/OracleRowConverter.java
+75
-0
pom.xml
dlink-connectors/dlink-connector-jdbc-1.13/pom.xml
+1
-1
OracleRowConverter.java
...connector/jdbc/internal/converter/OracleRowConverter.java
+75
-0
pom.xml
dlink-connectors/dlink-connector-jdbc-1.14/pom.xml
+1
-1
OracleRowConverter.java
...connector/jdbc/internal/converter/OracleRowConverter.java
+75
-0
DaemonTask.java
...aemon/src/main/java/com/dlink/daemon/task/DaemonTask.java
+5
-7
No files found.
dlink-admin/src/main/java/com/dlink/controller/ClusterConfigurationController.java
View file @
ec3ae8b2
...
@@ -39,7 +39,7 @@ public class ClusterConfigurationController {
...
@@ -39,7 +39,7 @@ public class ClusterConfigurationController {
@PutMapping
@PutMapping
public
Result
saveOrUpdate
(
@RequestBody
ClusterConfiguration
clusterConfiguration
)
{
public
Result
saveOrUpdate
(
@RequestBody
ClusterConfiguration
clusterConfiguration
)
{
TestResult
testResult
=
clusterConfigurationService
.
testGateway
(
clusterConfiguration
);
TestResult
testResult
=
clusterConfigurationService
.
testGateway
(
clusterConfiguration
);
clusterConfiguration
.
setAvailable
(
testResult
.
isAvailable
());
clusterConfiguration
.
set
Is
Available
(
testResult
.
isAvailable
());
if
(
clusterConfigurationService
.
saveOrUpdate
(
clusterConfiguration
))
{
if
(
clusterConfigurationService
.
saveOrUpdate
(
clusterConfiguration
))
{
return
Result
.
succeed
(
Asserts
.
isNotNull
(
clusterConfiguration
.
getId
())
?
"修改成功"
:
"新增成功"
);
return
Result
.
succeed
(
Asserts
.
isNotNull
(
clusterConfiguration
.
getId
())
?
"修改成功"
:
"新增成功"
);
}
else
{
}
else
{
...
...
dlink-admin/src/main/java/com/dlink/model/Cluster.java
View file @
ec3ae8b2
...
@@ -35,7 +35,7 @@ public class Cluster extends SuperEntity {
...
@@ -35,7 +35,7 @@ public class Cluster extends SuperEntity {
private
String
note
;
private
String
note
;
private
b
oolean
autoRegisters
;
private
B
oolean
autoRegisters
;
private
Integer
clusterConfigurationId
;
private
Integer
clusterConfigurationId
;
...
...
dlink-admin/src/main/java/com/dlink/model/ClusterConfiguration.java
View file @
ec3ae8b2
...
@@ -33,7 +33,7 @@ public class ClusterConfiguration extends SuperEntity {
...
@@ -33,7 +33,7 @@ public class ClusterConfiguration extends SuperEntity {
private
String
configJson
;
private
String
configJson
;
private
b
oolean
isAvailable
;
private
B
oolean
isAvailable
;
private
String
note
;
private
String
note
;
...
...
dlink-admin/src/main/java/com/dlink/model/DataBase.java
View file @
ec3ae8b2
...
@@ -44,7 +44,7 @@ public class DataBase extends SuperEntity {
...
@@ -44,7 +44,7 @@ public class DataBase extends SuperEntity {
private
String
dbVersion
;
private
String
dbVersion
;
private
b
oolean
status
;
private
B
oolean
status
;
private
LocalDateTime
healthTime
;
private
LocalDateTime
healthTime
;
...
...
dlink-admin/src/main/java/com/dlink/model/Task.java
View file @
ec3ae8b2
...
@@ -44,11 +44,11 @@ public class Task extends SuperEntity {
...
@@ -44,11 +44,11 @@ public class Task extends SuperEntity {
private
Integer
parallelism
;
private
Integer
parallelism
;
private
b
oolean
fragment
;
private
B
oolean
fragment
;
private
b
oolean
statementSet
;
private
B
oolean
statementSet
;
private
b
oolean
batchModel
;
private
B
oolean
batchModel
;
private
Integer
clusterId
;
private
Integer
clusterId
;
...
...
dlink-admin/src/main/java/com/dlink/model/User.java
View file @
ec3ae8b2
...
@@ -38,9 +38,9 @@ public class User implements Serializable {
...
@@ -38,9 +38,9 @@ public class User implements Serializable {
private
String
mobile
;
private
String
mobile
;
private
b
oolean
enabled
;
private
B
oolean
enabled
;
private
b
oolean
isDelete
;
private
B
oolean
isDelete
;
@TableField
(
fill
=
FieldFill
.
INSERT
)
@TableField
(
fill
=
FieldFill
.
INSERT
)
private
LocalDateTime
createTime
;
private
LocalDateTime
createTime
;
...
@@ -49,5 +49,5 @@ public class User implements Serializable {
...
@@ -49,5 +49,5 @@ public class User implements Serializable {
private
LocalDateTime
updateTime
;
private
LocalDateTime
updateTime
;
@TableField
(
exist
=
false
)
@TableField
(
exist
=
false
)
private
b
oolean
isAdmin
;
private
B
oolean
isAdmin
;
}
}
dlink-admin/src/main/java/com/dlink/service/impl/TaskServiceImpl.java
View file @
ec3ae8b2
...
@@ -487,7 +487,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
...
@@ -487,7 +487,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
private
JobConfig
buildJobConfig
(
Task
task
)
{
private
JobConfig
buildJobConfig
(
Task
task
)
{
boolean
isJarTask
=
Dialect
.
FLINKJAR
.
equalsVal
(
task
.
getDialect
());
boolean
isJarTask
=
Dialect
.
FLINKJAR
.
equalsVal
(
task
.
getDialect
());
if
(!
isJarTask
&&
task
.
is
Fragment
())
{
if
(!
isJarTask
&&
task
.
get
Fragment
())
{
String
flinkWithSql
=
dataBaseService
.
getEnabledFlinkWithSql
();
String
flinkWithSql
=
dataBaseService
.
getEnabledFlinkWithSql
();
if
(
Asserts
.
isNotNullString
(
flinkWithSql
))
{
if
(
Asserts
.
isNotNullString
(
flinkWithSql
))
{
task
.
setStatement
(
flinkWithSql
+
"\r\n"
+
task
.
getStatement
());
task
.
setStatement
(
flinkWithSql
+
"\r\n"
+
task
.
getStatement
());
...
...
dlink-admin/src/main/java/com/dlink/service/impl/UserServiceImpl.java
View file @
ec3ae8b2
...
@@ -33,7 +33,7 @@ public class UserServiceImpl extends SuperServiceImpl<UserMapper, User> implemen
...
@@ -33,7 +33,7 @@ public class UserServiceImpl extends SuperServiceImpl<UserMapper, User> implemen
}
}
user
.
setPassword
(
SaSecureUtil
.
md5
(
user
.
getPassword
()));
user
.
setPassword
(
SaSecureUtil
.
md5
(
user
.
getPassword
()));
user
.
setEnabled
(
true
);
user
.
setEnabled
(
true
);
user
.
setDelete
(
false
);
user
.
set
Is
Delete
(
false
);
if
(
save
(
user
))
{
if
(
save
(
user
))
{
return
Result
.
succeed
(
"注册成功"
);
return
Result
.
succeed
(
"注册成功"
);
}
else
{
}
else
{
...
@@ -69,7 +69,7 @@ public class UserServiceImpl extends SuperServiceImpl<UserMapper, User> implemen
...
@@ -69,7 +69,7 @@ public class UserServiceImpl extends SuperServiceImpl<UserMapper, User> implemen
public
boolean
removeUser
(
Integer
id
)
{
public
boolean
removeUser
(
Integer
id
)
{
User
user
=
new
User
();
User
user
=
new
User
();
user
.
setId
(
id
);
user
.
setId
(
id
);
user
.
setDelete
(
true
);
user
.
set
Is
Delete
(
true
);
return
updateById
(
user
);
return
updateById
(
user
);
}
}
...
@@ -84,10 +84,10 @@ public class UserServiceImpl extends SuperServiceImpl<UserMapper, User> implemen
...
@@ -84,10 +84,10 @@ public class UserServiceImpl extends SuperServiceImpl<UserMapper, User> implemen
return
Result
.
failed
(
"密码不能为空"
);
return
Result
.
failed
(
"密码不能为空"
);
}
}
if
(
Asserts
.
isEquals
(
SaSecureUtil
.
md5
(
password
),
userPassword
))
{
if
(
Asserts
.
isEquals
(
SaSecureUtil
.
md5
(
password
),
userPassword
))
{
if
(
user
.
i
sDelete
())
{
if
(
user
.
getI
sDelete
())
{
return
Result
.
failed
(
"账号不存在"
);
return
Result
.
failed
(
"账号不存在"
);
}
}
if
(!
user
.
is
Enabled
())
{
if
(!
user
.
get
Enabled
())
{
return
Result
.
failed
(
"账号已被禁用"
);
return
Result
.
failed
(
"账号已被禁用"
);
}
}
StpUtil
.
login
(
user
.
getId
(),
isRemember
);
StpUtil
.
login
(
user
.
getId
(),
isRemember
);
...
@@ -102,7 +102,7 @@ public class UserServiceImpl extends SuperServiceImpl<UserMapper, User> implemen
...
@@ -102,7 +102,7 @@ public class UserServiceImpl extends SuperServiceImpl<UserMapper, User> implemen
public
User
getUserByUsername
(
String
username
)
{
public
User
getUserByUsername
(
String
username
)
{
User
user
=
getOne
(
new
QueryWrapper
<
User
>().
eq
(
"username"
,
username
).
eq
(
"is_delete"
,
0
));
User
user
=
getOne
(
new
QueryWrapper
<
User
>().
eq
(
"username"
,
username
).
eq
(
"is_delete"
,
0
));
if
(
Asserts
.
isNotNull
(
user
))
{
if
(
Asserts
.
isNotNull
(
user
))
{
user
.
setAdmin
(
Asserts
.
isEqualsIgnoreCase
(
username
,
"admin"
));
user
.
set
Is
Admin
(
Asserts
.
isEqualsIgnoreCase
(
username
,
"admin"
));
}
}
return
user
;
return
user
;
}
}
...
...
dlink-connectors/dlink-connector-jdbc-1.11/pom.xml
View file @
ec3ae8b2
...
@@ -136,7 +136,7 @@
...
@@ -136,7 +136,7 @@
<dependency>
<dependency>
<groupId>
com.oracle.database.jdbc
</groupId>
<groupId>
com.oracle.database.jdbc
</groupId>
<artifactId>
ojdbc8
</artifactId>
<artifactId>
ojdbc8
</artifactId>
<scope>
test
</scope>
<scope>
provided
</scope>
</dependency>
</dependency>
<!-- SQLServer test dependencies -->
<!-- SQLServer test dependencies -->
<dependency>
<dependency>
...
...
dlink-connectors/dlink-connector-jdbc-1.11/src/main/java/org/apache/flink/connector/jdbc/internal/converter/OracleRowConverter.java
View file @
ec3ae8b2
...
@@ -18,8 +18,20 @@
...
@@ -18,8 +18,20 @@
package
org
.
apache
.
flink
.
connector
.
jdbc
.
internal
.
converter
;
package
org
.
apache
.
flink
.
connector
.
jdbc
.
internal
.
converter
;
import
org.apache.flink.table.data.DecimalData
;
import
org.apache.flink.table.data.StringData
;
import
org.apache.flink.table.data.TimestampData
;
import
org.apache.flink.table.types.logical.DecimalType
;
import
org.apache.flink.table.types.logical.LogicalType
;
import
org.apache.flink.table.types.logical.RowType
;
import
org.apache.flink.table.types.logical.RowType
;
import
java.math.BigDecimal
;
import
java.math.BigInteger
;
import
java.sql.Date
;
import
java.sql.Time
;
import
java.sql.Timestamp
;
import
java.time.LocalDateTime
;
/**
/**
* Runtime converter that responsible to convert between JDBC object and Flink internal object for
* Runtime converter that responsible to convert between JDBC object and Flink internal object for
* Oracle.
* Oracle.
...
@@ -39,4 +51,67 @@ public class OracleRowConverter extends AbstractJdbcRowConverter {
...
@@ -39,4 +51,67 @@ public class OracleRowConverter extends AbstractJdbcRowConverter {
public
OracleRowConverter
(
RowType
rowType
)
{
public
OracleRowConverter
(
RowType
rowType
)
{
super
(
rowType
);
super
(
rowType
);
}
}
@Override
protected
JdbcDeserializationConverter
createInternalConverter
(
LogicalType
type
)
{
switch
(
type
.
getTypeRoot
())
{
case
NULL:
return
val
->
null
;
case
BOOLEAN:
case
FLOAT:
case
DOUBLE:
case
INTERVAL_YEAR_MONTH:
case
INTERVAL_DAY_TIME:
return
val
->
val
;
case
TINYINT:
return
val
->
((
Integer
)
val
).
byteValue
();
case
SMALLINT:
// Converter for small type that casts value to int and then return short value,
// since
// JDBC 1.0 use int type for small values.
return
val
->
val
instanceof
Integer
?
((
Integer
)
val
).
shortValue
()
:
val
;
case
INTEGER:
case
BIGINT:
return
val
->
val
;
case
DECIMAL:
final
int
precision
=
((
DecimalType
)
type
).
getPrecision
();
final
int
scale
=
((
DecimalType
)
type
).
getScale
();
// using decimal(20, 0) to support db type bigint unsigned, user should define
// decimal(20, 0) in SQL,
// but other precision like decimal(30, 0) can work too from lenient consideration.
return
val
->
val
instanceof
BigInteger
?
DecimalData
.
fromBigDecimal
(
new
BigDecimal
((
BigInteger
)
val
,
0
),
precision
,
scale
)
:
DecimalData
.
fromBigDecimal
((
BigDecimal
)
val
,
precision
,
scale
);
case
DATE:
return
val
->
(
int
)
(((
Date
)
val
).
toLocalDate
().
toEpochDay
());
case
TIME_WITHOUT_TIME_ZONE:
return
val
->
(
int
)
(((
Time
)
val
).
toLocalTime
().
toNanoOfDay
()
/
1_000_000L
);
case
TIMESTAMP_WITH_TIME_ZONE:
case
TIMESTAMP_WITHOUT_TIME_ZONE:
return
(
val
)
->
{
if
(
val
instanceof
LocalDateTime
){
return
TimestampData
.
fromLocalDateTime
((
LocalDateTime
)
val
);
}
else
if
(
val
instanceof
oracle
.
sql
.
TIMESTAMP
){
return
TimestampData
.
fromTimestamp
(
Timestamp
.
valueOf
(((
oracle
.
sql
.
TIMESTAMP
)
val
).
stringValue
()));
}
else
{
return
TimestampData
.
fromTimestamp
((
Timestamp
)
val
);
}
};
case
CHAR:
case
VARCHAR:
return
val
->
StringData
.
fromString
((
String
)
val
);
case
BINARY:
case
VARBINARY:
return
val
->
(
byte
[])
val
;
case
ARRAY:
case
ROW:
case
MAP:
case
MULTISET:
case
RAW:
default
:
throw
new
UnsupportedOperationException
(
"Unsupported type:"
+
type
);
}
}
}
}
dlink-connectors/dlink-connector-jdbc-1.12/pom.xml
View file @
ec3ae8b2
...
@@ -136,7 +136,7 @@
...
@@ -136,7 +136,7 @@
<dependency>
<dependency>
<groupId>
com.oracle.database.jdbc
</groupId>
<groupId>
com.oracle.database.jdbc
</groupId>
<artifactId>
ojdbc8
</artifactId>
<artifactId>
ojdbc8
</artifactId>
<scope>
test
</scope>
<scope>
provided
</scope>
</dependency>
</dependency>
<!-- SQLServer test dependencies -->
<!-- SQLServer test dependencies -->
<dependency>
<dependency>
...
...
dlink-connectors/dlink-connector-jdbc-1.12/src/main/java/org/apache/flink/connector/jdbc/internal/converter/OracleRowConverter.java
View file @
ec3ae8b2
...
@@ -18,8 +18,20 @@
...
@@ -18,8 +18,20 @@
package
org
.
apache
.
flink
.
connector
.
jdbc
.
internal
.
converter
;
package
org
.
apache
.
flink
.
connector
.
jdbc
.
internal
.
converter
;
import
org.apache.flink.table.data.DecimalData
;
import
org.apache.flink.table.data.StringData
;
import
org.apache.flink.table.data.TimestampData
;
import
org.apache.flink.table.types.logical.DecimalType
;
import
org.apache.flink.table.types.logical.LogicalType
;
import
org.apache.flink.table.types.logical.RowType
;
import
org.apache.flink.table.types.logical.RowType
;
import
java.math.BigDecimal
;
import
java.math.BigInteger
;
import
java.sql.Date
;
import
java.sql.Time
;
import
java.sql.Timestamp
;
import
java.time.LocalDateTime
;
/**
/**
* Runtime converter that responsible to convert between JDBC object and Flink internal object for
* Runtime converter that responsible to convert between JDBC object and Flink internal object for
* Oracle.
* Oracle.
...
@@ -39,4 +51,67 @@ public class OracleRowConverter extends AbstractJdbcRowConverter {
...
@@ -39,4 +51,67 @@ public class OracleRowConverter extends AbstractJdbcRowConverter {
public
OracleRowConverter
(
RowType
rowType
)
{
public
OracleRowConverter
(
RowType
rowType
)
{
super
(
rowType
);
super
(
rowType
);
}
}
@Override
protected
JdbcDeserializationConverter
createInternalConverter
(
LogicalType
type
)
{
switch
(
type
.
getTypeRoot
())
{
case
NULL:
return
val
->
null
;
case
BOOLEAN:
case
FLOAT:
case
DOUBLE:
case
INTERVAL_YEAR_MONTH:
case
INTERVAL_DAY_TIME:
return
val
->
val
;
case
TINYINT:
return
val
->
((
Integer
)
val
).
byteValue
();
case
SMALLINT:
// Converter for small type that casts value to int and then return short value,
// since
// JDBC 1.0 use int type for small values.
return
val
->
val
instanceof
Integer
?
((
Integer
)
val
).
shortValue
()
:
val
;
case
INTEGER:
case
BIGINT:
return
val
->
val
;
case
DECIMAL:
final
int
precision
=
((
DecimalType
)
type
).
getPrecision
();
final
int
scale
=
((
DecimalType
)
type
).
getScale
();
// using decimal(20, 0) to support db type bigint unsigned, user should define
// decimal(20, 0) in SQL,
// but other precision like decimal(30, 0) can work too from lenient consideration.
return
val
->
val
instanceof
BigInteger
?
DecimalData
.
fromBigDecimal
(
new
BigDecimal
((
BigInteger
)
val
,
0
),
precision
,
scale
)
:
DecimalData
.
fromBigDecimal
((
BigDecimal
)
val
,
precision
,
scale
);
case
DATE:
return
val
->
(
int
)
(((
Date
)
val
).
toLocalDate
().
toEpochDay
());
case
TIME_WITHOUT_TIME_ZONE:
return
val
->
(
int
)
(((
Time
)
val
).
toLocalTime
().
toNanoOfDay
()
/
1_000_000L
);
case
TIMESTAMP_WITH_TIME_ZONE:
case
TIMESTAMP_WITHOUT_TIME_ZONE:
return
(
val
)
->
{
if
(
val
instanceof
LocalDateTime
){
return
TimestampData
.
fromLocalDateTime
((
LocalDateTime
)
val
);
}
else
if
(
val
instanceof
oracle
.
sql
.
TIMESTAMP
){
return
TimestampData
.
fromTimestamp
(
Timestamp
.
valueOf
(((
oracle
.
sql
.
TIMESTAMP
)
val
).
stringValue
()));
}
else
{
return
TimestampData
.
fromTimestamp
((
Timestamp
)
val
);
}
};
case
CHAR:
case
VARCHAR:
return
val
->
StringData
.
fromString
((
String
)
val
);
case
BINARY:
case
VARBINARY:
return
val
->
(
byte
[])
val
;
case
ARRAY:
case
ROW:
case
MAP:
case
MULTISET:
case
RAW:
default
:
throw
new
UnsupportedOperationException
(
"Unsupported type:"
+
type
);
}
}
}
}
dlink-connectors/dlink-connector-jdbc-1.13/pom.xml
View file @
ec3ae8b2
...
@@ -128,7 +128,7 @@
...
@@ -128,7 +128,7 @@
<dependency>
<dependency>
<groupId>
com.oracle.database.jdbc
</groupId>
<groupId>
com.oracle.database.jdbc
</groupId>
<artifactId>
ojdbc8
</artifactId>
<artifactId>
ojdbc8
</artifactId>
<scope>
test
</scope>
<scope>
provided
</scope>
</dependency>
</dependency>
<!-- SQLServer test dependencies -->
<!-- SQLServer test dependencies -->
...
...
dlink-connectors/dlink-connector-jdbc-1.13/src/main/java/org/apache/flink/connector/jdbc/internal/converter/OracleRowConverter.java
View file @
ec3ae8b2
package
org
.
apache
.
flink
.
connector
.
jdbc
.
internal
.
converter
;
package
org
.
apache
.
flink
.
connector
.
jdbc
.
internal
.
converter
;
import
org.apache.flink.table.data.DecimalData
;
import
org.apache.flink.table.data.StringData
;
import
org.apache.flink.table.data.TimestampData
;
import
org.apache.flink.table.types.logical.DecimalType
;
import
org.apache.flink.table.types.logical.LogicalType
;
import
org.apache.flink.table.types.logical.RowType
;
import
org.apache.flink.table.types.logical.RowType
;
import
java.math.BigDecimal
;
import
java.math.BigInteger
;
import
java.sql.Date
;
import
java.sql.Time
;
import
java.sql.Timestamp
;
import
java.time.LocalDateTime
;
/**
/**
* Runtime converter that responsible to convert between JDBC object and Flink internal object for
* Runtime converter that responsible to convert between JDBC object and Flink internal object for
* Oracle.
* Oracle.
...
@@ -21,4 +33,67 @@ public class OracleRowConverter extends AbstractJdbcRowConverter {
...
@@ -21,4 +33,67 @@ public class OracleRowConverter extends AbstractJdbcRowConverter {
public
OracleRowConverter
(
RowType
rowType
)
{
public
OracleRowConverter
(
RowType
rowType
)
{
super
(
rowType
);
super
(
rowType
);
}
}
@Override
protected
JdbcDeserializationConverter
createInternalConverter
(
LogicalType
type
)
{
switch
(
type
.
getTypeRoot
())
{
case
NULL:
return
val
->
null
;
case
BOOLEAN:
case
FLOAT:
case
DOUBLE:
case
INTERVAL_YEAR_MONTH:
case
INTERVAL_DAY_TIME:
return
val
->
val
;
case
TINYINT:
return
val
->
((
Integer
)
val
).
byteValue
();
case
SMALLINT:
// Converter for small type that casts value to int and then return short value,
// since
// JDBC 1.0 use int type for small values.
return
val
->
val
instanceof
Integer
?
((
Integer
)
val
).
shortValue
()
:
val
;
case
INTEGER:
case
BIGINT:
return
val
->
val
;
case
DECIMAL:
final
int
precision
=
((
DecimalType
)
type
).
getPrecision
();
final
int
scale
=
((
DecimalType
)
type
).
getScale
();
// using decimal(20, 0) to support db type bigint unsigned, user should define
// decimal(20, 0) in SQL,
// but other precision like decimal(30, 0) can work too from lenient consideration.
return
val
->
val
instanceof
BigInteger
?
DecimalData
.
fromBigDecimal
(
new
BigDecimal
((
BigInteger
)
val
,
0
),
precision
,
scale
)
:
DecimalData
.
fromBigDecimal
((
BigDecimal
)
val
,
precision
,
scale
);
case
DATE:
return
val
->
(
int
)
(((
Date
)
val
).
toLocalDate
().
toEpochDay
());
case
TIME_WITHOUT_TIME_ZONE:
return
val
->
(
int
)
(((
Time
)
val
).
toLocalTime
().
toNanoOfDay
()
/
1_000_000L
);
case
TIMESTAMP_WITH_TIME_ZONE:
case
TIMESTAMP_WITHOUT_TIME_ZONE:
return
(
val
)
->
{
if
(
val
instanceof
LocalDateTime
){
return
TimestampData
.
fromLocalDateTime
((
LocalDateTime
)
val
);
}
else
if
(
val
instanceof
oracle
.
sql
.
TIMESTAMP
){
return
TimestampData
.
fromTimestamp
(
Timestamp
.
valueOf
(((
oracle
.
sql
.
TIMESTAMP
)
val
).
stringValue
()));
}
else
{
return
TimestampData
.
fromTimestamp
((
Timestamp
)
val
);
}
};
case
CHAR:
case
VARCHAR:
return
val
->
StringData
.
fromString
((
String
)
val
);
case
BINARY:
case
VARBINARY:
return
val
->
(
byte
[])
val
;
case
ARRAY:
case
ROW:
case
MAP:
case
MULTISET:
case
RAW:
default
:
throw
new
UnsupportedOperationException
(
"Unsupported type:"
+
type
);
}
}
}
}
dlink-connectors/dlink-connector-jdbc-1.14/pom.xml
View file @
ec3ae8b2
...
@@ -113,7 +113,7 @@
...
@@ -113,7 +113,7 @@
<dependency>
<dependency>
<groupId>
com.oracle.database.jdbc
</groupId>
<groupId>
com.oracle.database.jdbc
</groupId>
<artifactId>
ojdbc8
</artifactId>
<artifactId>
ojdbc8
</artifactId>
<scope>
test
</scope>
<scope>
provided
</scope>
</dependency>
</dependency>
<!-- SQLServer test dependencies -->
<!-- SQLServer test dependencies -->
...
...
dlink-connectors/dlink-connector-jdbc-1.14/src/main/java/org/apache/flink/connector/jdbc/internal/converter/OracleRowConverter.java
View file @
ec3ae8b2
package
org
.
apache
.
flink
.
connector
.
jdbc
.
internal
.
converter
;
package
org
.
apache
.
flink
.
connector
.
jdbc
.
internal
.
converter
;
import
org.apache.flink.table.data.DecimalData
;
import
org.apache.flink.table.data.StringData
;
import
org.apache.flink.table.data.TimestampData
;
import
org.apache.flink.table.types.logical.DecimalType
;
import
org.apache.flink.table.types.logical.LogicalType
;
import
org.apache.flink.table.types.logical.RowType
;
import
org.apache.flink.table.types.logical.RowType
;
import
java.math.BigDecimal
;
import
java.math.BigInteger
;
import
java.sql.Date
;
import
java.sql.Time
;
import
java.sql.Timestamp
;
import
java.time.LocalDateTime
;
/**
/**
* Runtime converter that responsible to convert between JDBC object and Flink internal object for
* Runtime converter that responsible to convert between JDBC object and Flink internal object for
* Oracle.
* Oracle.
...
@@ -21,4 +33,67 @@ public class OracleRowConverter extends AbstractJdbcRowConverter {
...
@@ -21,4 +33,67 @@ public class OracleRowConverter extends AbstractJdbcRowConverter {
public
OracleRowConverter
(
RowType
rowType
)
{
public
OracleRowConverter
(
RowType
rowType
)
{
super
(
rowType
);
super
(
rowType
);
}
}
@Override
protected
JdbcDeserializationConverter
createInternalConverter
(
LogicalType
type
)
{
switch
(
type
.
getTypeRoot
())
{
case
NULL:
return
val
->
null
;
case
BOOLEAN:
case
FLOAT:
case
DOUBLE:
case
INTERVAL_YEAR_MONTH:
case
INTERVAL_DAY_TIME:
return
val
->
val
;
case
TINYINT:
return
val
->
((
Integer
)
val
).
byteValue
();
case
SMALLINT:
// Converter for small type that casts value to int and then return short value,
// since
// JDBC 1.0 use int type for small values.
return
val
->
val
instanceof
Integer
?
((
Integer
)
val
).
shortValue
()
:
val
;
case
INTEGER:
case
BIGINT:
return
val
->
val
;
case
DECIMAL:
final
int
precision
=
((
DecimalType
)
type
).
getPrecision
();
final
int
scale
=
((
DecimalType
)
type
).
getScale
();
// using decimal(20, 0) to support db type bigint unsigned, user should define
// decimal(20, 0) in SQL,
// but other precision like decimal(30, 0) can work too from lenient consideration.
return
val
->
val
instanceof
BigInteger
?
DecimalData
.
fromBigDecimal
(
new
BigDecimal
((
BigInteger
)
val
,
0
),
precision
,
scale
)
:
DecimalData
.
fromBigDecimal
((
BigDecimal
)
val
,
precision
,
scale
);
case
DATE:
return
val
->
(
int
)
(((
Date
)
val
).
toLocalDate
().
toEpochDay
());
case
TIME_WITHOUT_TIME_ZONE:
return
val
->
(
int
)
(((
Time
)
val
).
toLocalTime
().
toNanoOfDay
()
/
1_000_000L
);
case
TIMESTAMP_WITH_TIME_ZONE:
case
TIMESTAMP_WITHOUT_TIME_ZONE:
return
(
val
)
->
{
if
(
val
instanceof
LocalDateTime
){
return
TimestampData
.
fromLocalDateTime
((
LocalDateTime
)
val
);
}
else
if
(
val
instanceof
oracle
.
sql
.
TIMESTAMP
){
return
TimestampData
.
fromTimestamp
(
Timestamp
.
valueOf
(((
oracle
.
sql
.
TIMESTAMP
)
val
).
stringValue
()));
}
else
{
return
TimestampData
.
fromTimestamp
((
Timestamp
)
val
);
}
};
case
CHAR:
case
VARCHAR:
return
val
->
StringData
.
fromString
((
String
)
val
);
case
BINARY:
case
VARBINARY:
return
val
->
val
;
case
ARRAY:
case
ROW:
case
MAP:
case
MULTISET:
case
RAW:
default
:
throw
new
UnsupportedOperationException
(
"Unsupported type:"
+
type
);
}
}
}
}
dlink-daemon/src/main/java/com/dlink/daemon/task/DaemonTask.java
View file @
ec3ae8b2
package
com
.
dlink
.
daemon
.
task
;
package
com
.
dlink
.
daemon
.
task
;
import
java.util.Optional
;
import
java.util.ServiceLoader
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.daemon.exception.DaemonTaskException
;
import
com.dlink.daemon.exception.DaemonTaskException
;
import
sun.misc.Service
;
import
java.util.Iterator
;
import
java.util.Optional
;
public
interface
DaemonTask
{
public
interface
DaemonTask
{
static
Optional
<
DaemonTask
>
get
(
DaemonTaskConfig
config
)
{
static
Optional
<
DaemonTask
>
get
(
DaemonTaskConfig
config
)
{
Asserts
.
checkNotNull
(
config
,
"线程任务配置不能为空"
);
Asserts
.
checkNotNull
(
config
,
"线程任务配置不能为空"
);
Iterator
<
DaemonTask
>
providers
=
Service
.
providers
(
DaemonTask
.
class
);
ServiceLoader
<
DaemonTask
>
daemonTasks
=
ServiceLoader
.
load
(
DaemonTask
.
class
);
while
(
providers
.
hasNext
())
{
for
(
DaemonTask
daemonTask
:
daemonTasks
)
{
DaemonTask
daemonTask
=
providers
.
next
();
if
(
daemonTask
.
canHandle
(
config
.
getType
()))
{
if
(
daemonTask
.
canHandle
(
config
.
getType
()))
{
return
Optional
.
of
(
daemonTask
.
setConfig
(
config
));
return
Optional
.
of
(
daemonTask
.
setConfig
(
config
));
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment