Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in / Register
Toggle navigation
D
dlink
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
zhaowei
dlink
Commits
07dfec49
Commit
07dfec49
authored
Mar 20, 2022
by
wenmo
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
executor format
parent
3bc3deb2
Changes
6
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
32 additions
and
33 deletions
+32
-33
FunctionManager.java
...main/java/com/dlink/catalog/function/FunctionManager.java
+5
-5
EnvironmentSetting.java
.../src/main/java/com/dlink/executor/EnvironmentSetting.java
+5
-5
ExecutorSetting.java
...tor/src/main/java/com/dlink/executor/ExecutorSetting.java
+13
-13
AbstractOperation.java
...utor/src/main/java/com/dlink/trans/AbstractOperation.java
+4
-4
CreateOperation.java
...ecutor/src/main/java/com/dlink/trans/CreateOperation.java
+1
-1
CDCSource.java
...executor/src/main/java/com/dlink/trans/ddl/CDCSource.java
+4
-5
No files found.
dlink-executor/src/main/java/com/dlink/catalog/function/FunctionManager.java
View file @
07dfec49
...
...
@@ -17,7 +17,7 @@ import java.util.Map;
@Deprecated
public
class
FunctionManager
{
private
static
Map
<
String
,
UDFunction
>
functions
=
new
HashMap
<
String
,
UDFunction
>()
{
private
static
Map
<
String
,
UDFunction
>
functions
=
new
HashMap
<
String
,
UDFunction
>()
{
{
put
(
FlinkFunctionConstant
.
GET_KEY
,
new
UDFunction
(
FlinkFunctionConstant
.
GET_KEY
,
...
...
@@ -34,12 +34,12 @@ public class FunctionManager {
}
};
public
static
Map
<
String
,
UDFunction
>
getUsedFunctions
(
String
statement
)
{
Map
<
String
,
UDFunction
>
map
=
new
HashMap
<>();
public
static
Map
<
String
,
UDFunction
>
getUsedFunctions
(
String
statement
)
{
Map
<
String
,
UDFunction
>
map
=
new
HashMap
<>();
String
sql
=
statement
.
toLowerCase
();
for
(
Map
.
Entry
<
String
,
UDFunction
>
entry
:
functions
.
entrySet
())
{
if
(
sql
.
contains
(
entry
.
getKey
().
toLowerCase
()))
{
map
.
put
(
entry
.
getKey
(),
entry
.
getValue
());
if
(
sql
.
contains
(
entry
.
getKey
().
toLowerCase
()))
{
map
.
put
(
entry
.
getKey
(),
entry
.
getValue
());
}
}
return
map
;
...
...
dlink-executor/src/main/java/com/dlink/executor/EnvironmentSetting.java
View file @
07dfec49
...
...
@@ -31,17 +31,17 @@ public class EnvironmentSetting {
this
.
useRemote
=
true
;
}
public
static
EnvironmentSetting
build
(
String
address
){
Asserts
.
checkNull
(
address
,
"Flink 地址不能为空"
);
public
static
EnvironmentSetting
build
(
String
address
)
{
Asserts
.
checkNull
(
address
,
"Flink 地址不能为空"
);
String
[]
strs
=
address
.
split
(
NetConstant
.
COLON
);
if
(
strs
.
length
>=
2
)
{
return
new
EnvironmentSetting
(
strs
[
0
],
Integer
.
parseInt
(
strs
[
1
]));
return
new
EnvironmentSetting
(
strs
[
0
],
Integer
.
parseInt
(
strs
[
1
]));
}
else
{
return
new
EnvironmentSetting
(
strs
[
0
],
FlinkConstant
.
FLINK_REST_DEFAULT_PORT
);
return
new
EnvironmentSetting
(
strs
[
0
],
FlinkConstant
.
FLINK_REST_DEFAULT_PORT
);
}
}
public
String
getAddress
(){
public
String
getAddress
()
{
return
host
+
NetConstant
.
COLON
+
port
;
}
...
...
dlink-executor/src/main/java/com/dlink/executor/ExecutorSetting.java
View file @
07dfec49
...
...
@@ -28,8 +28,8 @@ public class ExecutorSetting {
private
boolean
useStatementSet
;
private
String
savePointPath
;
private
String
jobName
;
private
Map
<
String
,
String
>
config
;
public
static
final
ExecutorSetting
DEFAULT
=
new
ExecutorSetting
(
0
,
1
,
true
);
private
Map
<
String
,
String
>
config
;
public
static
final
ExecutorSetting
DEFAULT
=
new
ExecutorSetting
(
0
,
1
,
true
);
private
static
final
ObjectMapper
mapper
=
new
ObjectMapper
();
public
ExecutorSetting
(
boolean
useSqlFragment
)
{
...
...
@@ -75,7 +75,7 @@ public class ExecutorSetting {
this
.
config
=
config
;
}
public
ExecutorSetting
(
Integer
checkpoint
,
Integer
parallelism
,
boolean
useSqlFragment
,
boolean
useStatementSet
,
public
ExecutorSetting
(
Integer
checkpoint
,
Integer
parallelism
,
boolean
useSqlFragment
,
boolean
useStatementSet
,
boolean
useBatchModel
,
String
savePointPath
,
String
jobName
,
Map
<
String
,
String
>
config
)
{
this
.
checkpoint
=
checkpoint
;
this
.
parallelism
=
parallelism
;
...
...
@@ -87,29 +87,29 @@ public class ExecutorSetting {
this
.
config
=
config
;
}
public
static
ExecutorSetting
build
(
Integer
checkpoint
,
Integer
parallelism
,
boolean
useSqlFragment
,
boolean
useStatementSet
,
boolean
useBatchModel
,
String
savePointPath
,
String
jobName
,
String
configJson
)
{
List
<
Map
<
String
,
String
>>
configList
=
new
ArrayList
<>();
if
(
Asserts
.
isNotNullString
(
configJson
))
{
public
static
ExecutorSetting
build
(
Integer
checkpoint
,
Integer
parallelism
,
boolean
useSqlFragment
,
boolean
useStatementSet
,
boolean
useBatchModel
,
String
savePointPath
,
String
jobName
,
String
configJson
)
{
List
<
Map
<
String
,
String
>>
configList
=
new
ArrayList
<>();
if
(
Asserts
.
isNotNullString
(
configJson
))
{
try
{
configList
=
mapper
.
readValue
(
configJson
,
ArrayList
.
class
);
}
catch
(
JsonProcessingException
e
)
{
e
.
printStackTrace
();
}
}
Map
<
String
,
String
>
config
=
new
HashMap
<>();
for
(
Map
<
String
,
String
>
item
:
configList
)
{
config
.
put
(
item
.
get
(
"key"
),
item
.
get
(
"value"
));
Map
<
String
,
String
>
config
=
new
HashMap
<>();
for
(
Map
<
String
,
String
>
item
:
configList
)
{
config
.
put
(
item
.
get
(
"key"
),
item
.
get
(
"value"
));
}
return
new
ExecutorSetting
(
checkpoint
,
parallelism
,
useSqlFragment
,
useStatementSet
,
useBatchModel
,
savePointPath
,
jobName
,
config
);
return
new
ExecutorSetting
(
checkpoint
,
parallelism
,
useSqlFragment
,
useStatementSet
,
useBatchModel
,
savePointPath
,
jobName
,
config
);
}
public
static
ExecutorSetting
build
(
Map
<
String
,
String
>
settingMap
)
{
public
static
ExecutorSetting
build
(
Map
<
String
,
String
>
settingMap
)
{
Integer
checkpoint
=
null
;
Integer
parallelism
=
null
;
if
(
settingMap
.
containsKey
(
"checkpoint"
)&&!
""
.
equals
(
settingMap
.
get
(
"checkpoint"
)))
{
if
(
settingMap
.
containsKey
(
"checkpoint"
)
&&
!
""
.
equals
(
settingMap
.
get
(
"checkpoint"
)))
{
checkpoint
=
Integer
.
valueOf
(
settingMap
.
get
(
"checkpoint"
));
}
if
(
settingMap
.
containsKey
(
"parallelism"
)&&!
""
.
equals
(
settingMap
.
get
(
"parallelism"
)))
{
if
(
settingMap
.
containsKey
(
"parallelism"
)
&&
!
""
.
equals
(
settingMap
.
get
(
"parallelism"
)))
{
parallelism
=
Integer
.
valueOf
(
settingMap
.
get
(
"parallelism"
));
}
return
build
(
checkpoint
,
...
...
dlink-executor/src/main/java/com/dlink/trans/AbstractOperation.java
View file @
07dfec49
...
...
@@ -30,17 +30,17 @@ public class AbstractOperation {
this
.
statement
=
statement
;
}
public
boolean
checkFunctionExist
(
CustomTableEnvironmentImpl
stEnvironment
,
String
key
)
{
public
boolean
checkFunctionExist
(
CustomTableEnvironmentImpl
stEnvironment
,
String
key
)
{
String
[]
udfs
=
stEnvironment
.
listUserDefinedFunctions
();
List
<
String
>
udflist
=
Arrays
.
asList
(
udfs
);
if
(
udflist
.
contains
(
key
.
toLowerCase
()))
{
if
(
udflist
.
contains
(
key
.
toLowerCase
()))
{
return
true
;
}
else
{
}
else
{
return
false
;
}
}
public
boolean
noExecute
(){
public
boolean
noExecute
()
{
return
true
;
}
}
dlink-executor/src/main/java/com/dlink/trans/CreateOperation.java
View file @
07dfec49
...
...
@@ -6,7 +6,7 @@ package com.dlink.trans;
* @author wenmo
* @since 2021/6/13 19:34
*/
public
interface
CreateOperation
extends
Operation
{
public
interface
CreateOperation
extends
Operation
{
//void create(CustomTableEnvironmentImpl stEnvironment);
}
dlink-executor/src/main/java/com/dlink/trans/ddl/CDCSource.java
View file @
07dfec49
...
...
@@ -2,7 +2,6 @@ package com.dlink.trans.ddl;
import
com.dlink.assertion.Asserts
;
import
com.dlink.parser.SingleSqlParserFactory
;
import
org.apache.commons.lang3.StringUtils
;
import
java.util.Arrays
;
import
java.util.HashMap
;
...
...
@@ -62,10 +61,10 @@ public class CDCSource {
config
.
get
(
"topic"
),
config
.
get
(
"brokers"
)
);
if
(
Asserts
.
isNotNullString
(
config
.
get
(
"database"
)))
{
if
(
Asserts
.
isNotNullString
(
config
.
get
(
"database"
)))
{
cdcSource
.
setDatabase
(
Arrays
.
asList
(
config
.
get
(
"database"
).
split
(
":"
)));
}
if
(
Asserts
.
isNotNullString
(
config
.
get
(
"table"
)))
{
if
(
Asserts
.
isNotNullString
(
config
.
get
(
"table"
)))
{
cdcSource
.
setTable
(
Arrays
.
asList
(
config
.
get
(
"table"
).
split
(
":"
)));
}
return
cdcSource
;
...
...
@@ -76,8 +75,8 @@ public class CDCSource {
Pattern
p
=
Pattern
.
compile
(
"'(.*?)'\\s*=\\s*'(.*?)'"
);
for
(
int
i
=
0
;
i
<
list
.
size
();
i
++)
{
Matcher
m
=
p
.
matcher
(
list
.
get
(
i
));
if
(
m
.
find
())
{
map
.
put
(
m
.
group
(
1
),
m
.
group
(
2
));
if
(
m
.
find
())
{
map
.
put
(
m
.
group
(
1
),
m
.
group
(
2
));
}
}
return
map
;
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment