Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in / Register
Toggle navigation
D
dlink
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
zhaowei
dlink
Commits
d712e0a7
Commit
d712e0a7
authored
May 04, 2022
by
zhu-mingye
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
fix hudi Sink
parent
ed6477cd
Changes
3
Show whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
19 additions
and
21 deletions
+19
-21
HudiSinkBuilder.java
....13/src/main/java/com/dlink/cdc/hudi/HudiSinkBuilder.java
+1
-1
SQLSinkBuilder.java
...-1.13/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java
+5
-9
CreateCDCSourceOperation.java
...in/java/com/dlink/trans/ddl/CreateCDCSourceOperation.java
+13
-11
No files found.
dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/hudi/HudiSinkBuilder.java
View file @
d712e0a7
...
...
@@ -32,7 +32,7 @@ import com.dlink.model.Table;
*/
public
class
HudiSinkBuilder
extends
AbstractSinkBuilder
implements
SinkBuilder
,
Serializable
{
private
final
static
String
KEY_WORD
=
"hudi"
;
private
final
static
String
KEY_WORD
=
"
datastream-
hudi"
;
private
static
final
long
serialVersionUID
=
5324199407472847422L
;
public
HudiSinkBuilder
()
{
...
...
dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java
View file @
d712e0a7
package
com
.
dlink
.
cdc
.
sql
;
import
com.dlink.model.*
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.flink.api.common.functions.FlatMapFunction
;
import
org.apache.flink.api.common.typeinfo.TypeInformation
;
...
...
@@ -12,10 +13,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.table.data.RowData
;
import
org.apache.flink.table.operations.ModifyOperation
;
import
org.apache.flink.table.operations.Operation
;
import
org.apache.flink.table.types.logical.DateType
;
import
org.apache.flink.table.types.logical.DecimalType
;
import
org.apache.flink.table.types.logical.LogicalType
;
import
org.apache.flink.table.types.logical.TimestampType
;
import
org.apache.flink.table.types.logical.*
;
import
org.apache.flink.table.types.utils.TypeConversions
;
import
org.apache.flink.types.Row
;
import
org.apache.flink.types.RowKind
;
...
...
@@ -34,9 +32,6 @@ import com.dlink.cdc.AbstractSinkBuilder;
import
com.dlink.cdc.CDCBuilder
;
import
com.dlink.cdc.SinkBuilder
;
import
com.dlink.executor.CustomTableEnvironment
;
import
com.dlink.model.FlinkCDCConfig
;
import
com.dlink.model.Schema
;
import
com.dlink.model.Table
;
import
com.dlink.utils.SqlUtil
;
/**
...
...
@@ -120,10 +115,11 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
List
<
String
>
columnNameList
)
{
String
sinkTableName
=
getSinkTableName
(
table
);
customTableEnvironment
.
createTemporaryView
(
table
.
getSchemaTableNameWithUnderline
(),
rowDataDataStream
,
StringUtils
.
join
(
columnNameList
,
","
));
String
viewName
=
"VIEW_"
+
table
.
getSchemaTableNameWithUnderline
();
customTableEnvironment
.
createTemporaryView
(
viewName
,
rowDataDataStream
,
StringUtils
.
join
(
columnNameList
,
","
));
customTableEnvironment
.
executeSql
(
getFlinkDDL
(
table
,
sinkTableName
));
List
<
Operation
>
operations
=
customTableEnvironment
.
getParser
().
parse
(
table
.
getCDCSqlInsert
(
sinkTableName
,
table
.
getSchemaTableNameWithUnderline
()
));
List
<
Operation
>
operations
=
customTableEnvironment
.
getParser
().
parse
(
table
.
getCDCSqlInsert
(
sinkTableName
,
viewName
));
if
(
operations
.
size
()
>
0
)
{
Operation
operation
=
operations
.
get
(
0
);
if
(
operation
instanceof
ModifyOperation
)
{
...
...
dlink-executor/src/main/java/com/dlink/trans/ddl/CreateCDCSourceOperation.java
View file @
d712e0a7
...
...
@@ -71,6 +71,7 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
Driver
driver
=
Driver
.
build
(
driverConfig
);
final
List
<
Table
>
tables
=
driver
.
listTables
(
schemaName
);
for
(
Table
table
:
tables
)
{
if
(!
Asserts
.
isEquals
(
table
.
getType
(),
"VIEW"
))
{
if
(
Asserts
.
isNotNullCollection
(
tableRegList
))
{
for
(
String
tableReg
:
tableRegList
)
{
if
(
table
.
getSchemaTableName
().
matches
(
tableReg
)
&&
!
schema
.
getTables
().
contains
(
Table
.
build
(
table
.
getName
())))
{
...
...
@@ -86,6 +87,7 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
schema
.
getTables
().
add
(
table
);
}
}
}
schemaList
.
add
(
schema
);
}
config
.
setSchemaTableNameList
(
schemaTableNameList
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment