zhaowei / dlink · Commits · c0e9de42

Unverified commit c0e9de42, authored Apr 21, 2022 by aiwenmo and committed by GitHub on Apr 21, 2022.
Parents: e27af06a, 564b1547

[Optimization-439][client] Optimize CDCSource sync doris

Showing 19 changed files with 1062 additions and 877 deletions (+1062 / -877):
- ...-1.11/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java (+1 / -1)
- ...1.11/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java (+204 / -0)
- ...1/src/main/java/com/dlink/cdc/doris/DorisSinkBuilder.java (+22 / -177)
- ...1/src/main/java/com/dlink/cdc/kafka/KafkaSinkBuilder.java (+13 / -1)
- ...-1.12/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java (+1 / -1)
- ...1.12/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java (+204 / -0)
- ...2/src/main/java/com/dlink/cdc/doris/DorisSinkBuilder.java (+22 / -177)
- ...2/src/main/java/com/dlink/cdc/kafka/KafkaSinkBuilder.java (+13 / -1)
- dlink-client/dlink-client-1.13/pom.xml (+5 / -58)
- ...-1.13/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java (+1 / -1)
- ...1.13/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java (+204 / -0)
- ...3/src/main/java/com/dlink/cdc/doris/DorisSinkBuilder.java (+22 / -177)
- ....13/src/main/java/com/dlink/cdc/jdbc/JdbcSinkBuilder.java (+86 / -17)
- ...3/src/main/java/com/dlink/cdc/kafka/KafkaSinkBuilder.java (+16 / -1)
- ...-1.14/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java (+1 / -1)
- ...1.14/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java (+204 / -0)
- ...4/src/main/java/com/dlink/cdc/doris/DorisSinkBuilder.java (+22 / -177)
- ....14/src/main/java/com/dlink/cdc/jdbc/JdbcSinkBuilder.java (+8 / -86)
- ...4/src/main/java/com/dlink/cdc/kafka/KafkaSinkBuilder.java (+13 / -1)
dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java

@@ -44,7 +44,7 @@ public abstract class AbstractCDCBuilder {
         List<String> tableList = getTableList();
         for (String tableName : tableList) {
             if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
-                String[] names = tableName.split(".");
+                String[] names = tableName.split("\\.");
                 if (!schemaList.contains(names[0])) {
                     schemaList.add(names[0]);
                 }
 ...
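For context on this one-character fix: String.split takes a regular expression, so an unescaped "." matches any character and the schema name can never be extracted, while "\\." splits on the literal dot. A minimal standalone check (the table name is only an illustrative value):

import java.util.Arrays;

public class SplitDotDemo {
    public static void main(String[] args) {
        String tableName = "mydb.orders"; // illustrative "schema.table" value

        // "." as a regex matches every character; all fragments are empty strings,
        // trailing empty strings are dropped, so the result is an empty array.
        System.out.println(tableName.split(".").length);          // 0

        // "\\." escapes the dot and splits on the literal '.', as the builder expects.
        System.out.println(Arrays.toString(tableName.split("\\."))); // [mydb, orders]
    }
}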
dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java

package com.dlink.cdc;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import com.dlink.assertion.Asserts;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.Column;
import com.dlink.model.ColumnType;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;

/**
 * AbstractCDCBuilder
 ...

@@ -26,4 +64,170 @@ public abstract class AbstractSinkBuilder {

    public void setConfig(FlinkCDCConfig config) {
        this.config = config;
    }

    protected Properties getProperties() {
        Properties properties = new Properties();
        Map<String, String> sink = config.getSink();
        for (Map.Entry<String, String> entry : sink.entrySet()) {
            if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
                properties.setProperty(entry.getKey(), entry.getValue());
            }
        }
        return properties;
    }

    protected SingleOutputStreamOperator<Map> deserialize(DataStreamSource<String> dataStreamSource) {
        return dataStreamSource.map(new MapFunction<String, Map>() {
            @Override
            public Map map(String value) throws Exception {
                ObjectMapper objectMapper = new ObjectMapper();
                return objectMapper.readValue(value, Map.class);
            }
        });
    }

    protected SingleOutputStreamOperator<Map> shunt(SingleOutputStreamOperator<Map> mapOperator, Table table, String schemaFieldName) {
        final String tableName = table.getName();
        final String schemaName = table.getSchema();
        return mapOperator.filter(new FilterFunction<Map>() {
            @Override
            public boolean filter(Map value) throws Exception {
                LinkedHashMap source = (LinkedHashMap) value.get("source");
                return tableName.equals(source.get("table").toString())
                    && schemaName.equals(source.get(schemaFieldName).toString());
            }
        });
    }

    protected DataStream<RowData> buildRowData(SingleOutputStreamOperator<Map> filterOperator, List<String> columnNameList, List<LogicalType> columnTypeList) {
        return filterOperator.flatMap(new FlatMapFunction<Map, RowData>() {
            @Override
            public void flatMap(Map value, Collector<RowData> out) throws Exception {
                switch (value.get("op").toString()) {
                    case "c":
                        GenericRowData igenericRowData = new GenericRowData(columnNameList.size());
                        igenericRowData.setRowKind(RowKind.INSERT);
                        Map idata = (Map) value.get("after");
                        for (int i = 0; i < columnNameList.size(); i++) {
                            igenericRowData.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
                        }
                        out.collect(igenericRowData);
                        break;
                    case "d":
                        GenericRowData dgenericRowData = new GenericRowData(columnNameList.size());
                        dgenericRowData.setRowKind(RowKind.DELETE);
                        Map ddata = (Map) value.get("before");
                        for (int i = 0; i < columnNameList.size(); i++) {
                            dgenericRowData.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
                        }
                        out.collect(dgenericRowData);
                        break;
                    case "u":
                        GenericRowData ubgenericRowData = new GenericRowData(columnNameList.size());
                        ubgenericRowData.setRowKind(RowKind.UPDATE_BEFORE);
                        Map ubdata = (Map) value.get("before");
                        for (int i = 0; i < columnNameList.size(); i++) {
                            ubgenericRowData.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
                        }
                        out.collect(ubgenericRowData);
                        GenericRowData uagenericRowData = new GenericRowData(columnNameList.size());
                        uagenericRowData.setRowKind(RowKind.UPDATE_AFTER);
                        Map uadata = (Map) value.get("after");
                        for (int i = 0; i < columnNameList.size(); i++) {
                            uagenericRowData.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
                        }
                        out.collect(uagenericRowData);
                        break;
                }
            }
        });
    }

    public abstract void addSink(DataStream<RowData> rowDataDataStream, String schemaTableName, List<String> columnNameList, List<LogicalType> columnTypeList);

    public DataStreamSource build(CDCBuilder cdcBuilder, StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource<String> dataStreamSource) {
        final List<Schema> schemaList = config.getSchemaList();
        final String schemaFieldName = config.getSchemaFieldName();
        if (Asserts.isNotNullCollection(schemaList)) {
            SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
            for (Schema schema : schemaList) {
                for (Table table : schema.getTables()) {
                    SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
                    List<String> columnNameList = new ArrayList<>();
                    List<LogicalType> columnTypeList = new ArrayList<>();
                    buildColumn(columnNameList, columnTypeList, table.getColumns());
                    DataStream<RowData> rowDataDataStream = buildRowData(filterOperator, columnNameList, columnTypeList);
                    addSink(rowDataDataStream, table.getSchemaTableName(), columnNameList, columnTypeList);
                }
            }
        }
        return dataStreamSource;
    }

    protected void buildColumn(List<String> columnNameList, List<LogicalType> columnTypeList, List<Column> columns) {
        for (Column column : columns) {
            columnNameList.add(column.getName());
            columnTypeList.add(getLogicalType(column.getJavaType()));
        }
    }

    protected LogicalType getLogicalType(ColumnType columnType) {
        switch (columnType) {
            case STRING:
                return new VarCharType();
            case BOOLEAN:
            case JAVA_LANG_BOOLEAN:
                return new BooleanType();
            case BYTE:
            case JAVA_LANG_BYTE:
                return new TinyIntType();
            case SHORT:
            case JAVA_LANG_SHORT:
                return new SmallIntType();
            case LONG:
            case JAVA_LANG_LONG:
                return new BigIntType();
            case FLOAT:
            case JAVA_LANG_FLOAT:
                return new FloatType();
            case DOUBLE:
            case JAVA_LANG_DOUBLE:
                return new DoubleType();
            case DECIMAL:
                return new DecimalType();
            case INT:
            case INTEGER:
                return new IntType();
            default:
                return new VarCharType();
        }
    }

    protected Object convertValue(Object value, LogicalType logicalType) {
        if (logicalType instanceof VarCharType) {
            return StringData.fromString((String) value);
        } else if (logicalType instanceof DecimalType) {
            final DecimalType decimalType = ((DecimalType) logicalType);
            final int precision = decimalType.getPrecision();
            final int scala = decimalType.getScale();
            return DecimalData.fromBigDecimal(new BigDecimal((String) value), precision, scala);
        } else {
            return value;
        }
    }
}
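The hunk above turns AbstractSinkBuilder into a shared template: build() deserializes the change-log JSON, shunts it per table, converts each record into RowData with the right RowKind, and hands the result to the abstract addSink(). A rough sketch of what a concrete builder now has to provide; this PrintSinkBuilder class is hypothetical and not part of the commit, it only illustrates the extension point:

import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;

import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.model.FlinkCDCConfig;

// Hypothetical subclass, only to illustrate the addSink() hook introduced here.
public class PrintSinkBuilder extends AbstractSinkBuilder {

    public PrintSinkBuilder() {
    }

    public PrintSinkBuilder(FlinkCDCConfig config) {
        super(config); // assumes the same config constructor the Kafka builder calls
    }

    @Override
    public void addSink(
            DataStream<RowData> rowDataDataStream,
            String schemaTableName,
            List<String> columnNameList,
            List<LogicalType> columnTypeList) {
        // build() has already filtered the stream per table and converted it to RowData;
        // a concrete builder only decides where that per-table stream is written.
        rowDataDataStream.print(schemaTableName);
    }
}

The Doris builder below follows this shape, while the Kafka builder keeps overriding build() because it forwards the raw JSON string rather than RowData.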
dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/doris/DorisSinkBuilder.java

@@ -4,49 +4,17 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.cfg.DorisSink;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import java.io.Serializable;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.Column;
import com.dlink.model.ColumnType;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;

/**
 * DorisSinkBuilder
 ...

@@ -77,15 +45,14 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
    }

    @Override
    public DataStreamSource build(CDCBuilder cdcBuilder, StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource<String> dataStreamSource) {
        Map<String, String> sink = config.getSink();
        Properties properties = new Properties();
        for (Map.Entry<String, String> entry : sink.entrySet()) {
            if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
                properties.setProperty(entry.getKey(), entry.getValue());
            }
        }
    public void addSink(DataStream<RowData> rowDataDataStream, String schemaTableName, List<String> columnNameList, List<LogicalType> columnTypeList) {
        DorisExecutionOptions.Builder dorisExecutionOptionsBuilder = DorisExecutionOptions.builder();
        Map<String, String> sink = config.getSink();
        if (sink.containsKey("sink.batch.size")) {
            dorisExecutionOptionsBuilder.setBatchSize(Integer.valueOf(sink.get("sink.batch.size")));
        }
 ...

@@ -98,77 +65,11 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
        if (sink.containsKey("sink.enable-delete")) {
            dorisExecutionOptionsBuilder.setEnableDelete(Boolean.valueOf(sink.get("sink.enable-delete")));
        }
        dorisExecutionOptionsBuilder.setStreamLoadProp(properties);
        final List<Schema> schemaList = config.getSchemaList();
        final String schemaFieldName = config.getSchemaFieldName();
        if (Asserts.isNotNullCollection(schemaList)) {
            SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(new MapFunction<String, Map>() {
                @Override
                public Map map(String value) throws Exception {
                    ObjectMapper objectMapper = new ObjectMapper();
                    return objectMapper.readValue(value, Map.class);
                }
            });
            for (Schema schema : schemaList) {
                for (Table table : schema.getTables()) {
                    final String tableName = table.getName();
                    final String schemaName = table.getSchema();
                    final String schemaTableName = table.getSchemaTableName();
                    List<String> columnNameList = new ArrayList<>();
                    List<LogicalType> columnTypeList = new ArrayList<>();
                    buildColumn(columnNameList, columnTypeList, table.getColumns());
        dorisExecutionOptionsBuilder.setStreamLoadProp(getProperties());
        final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]);
        final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]);
                    SingleOutputStreamOperator<Map> filterOperator = mapOperator.filter(new FilterFunction<Map>() {
                        @Override
                        public boolean filter(Map value) throws Exception {
                            LinkedHashMap source = (LinkedHashMap) value.get("source");
                            return tableName.equals(source.get("table").toString())
                                && schemaName.equals(source.get(schemaFieldName).toString());
                        }
                    });
                    DataStream<RowData> rowDataDataStream = filterOperator.flatMap(new FlatMapFunction<Map, RowData>() {
                        @Override
                        public void flatMap(Map value, Collector<RowData> out) throws Exception {
                            switch (value.get("op").toString()) {
                                case "r":
                                    GenericRowData igenericRowData = new GenericRowData(columnNameList.size());
                                    igenericRowData.setRowKind(RowKind.INSERT);
                                    Map idata = (Map) value.get("after");
                                    for (int i = 0; i < columnNameList.size(); i++) {
                                        igenericRowData.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
                                    }
                                    out.collect(igenericRowData);
                                    break;
                                case "d":
                                    GenericRowData dgenericRowData = new GenericRowData(columnNameList.size());
                                    dgenericRowData.setRowKind(RowKind.DELETE);
                                    Map ddata = (Map) value.get("before");
                                    for (int i = 0; i < columnNameList.size(); i++) {
                                        dgenericRowData.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
                                    }
                                    out.collect(dgenericRowData);
                                    break;
                                case "u":
                                    GenericRowData ubgenericRowData = new GenericRowData(columnNameList.size());
                                    ubgenericRowData.setRowKind(RowKind.UPDATE_BEFORE);
                                    Map ubdata = (Map) value.get("before");
                                    for (int i = 0; i < columnNameList.size(); i++) {
                                        ubgenericRowData.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
                                    }
                                    out.collect(ubgenericRowData);
                                    GenericRowData uagenericRowData = new GenericRowData(columnNameList.size());
                                    uagenericRowData.setRowKind(RowKind.UPDATE_AFTER);
                                    Map uadata = (Map) value.get("after");
                                    for (int i = 0; i < columnNameList.size(); i++) {
                                        uagenericRowData.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
                                    }
                                    out.collect(uagenericRowData);
                                    break;
                            }
                        }
                    });
                    rowDataDataStream.addSink(DorisSink.sink(columnNames,
 ...

@@ -182,60 +83,4 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
                        .setPassword(config.getSink().get("password")).build()));
                }
            }
        }
        return dataStreamSource;
    }

    private void buildColumn(List<String> columnNameList, List<LogicalType> columnTypeList, List<Column> columns) {
        for (Column column : columns) {
            columnNameList.add(column.getName());
            columnTypeList.add(getLogicalType(column.getJavaType()));
        }
    }

    private LogicalType getLogicalType(ColumnType columnType) {
        switch (columnType) {
            case STRING: return new VarCharType();
            case BOOLEAN:
            case JAVA_LANG_BOOLEAN: return new BooleanType();
            case BYTE:
            case JAVA_LANG_BYTE: return new TinyIntType();
            case SHORT:
            case JAVA_LANG_SHORT: return new SmallIntType();
            case LONG:
            case JAVA_LANG_LONG: return new BigIntType();
            case FLOAT:
            case JAVA_LANG_FLOAT: return new FloatType();
            case DOUBLE:
            case JAVA_LANG_DOUBLE: return new DoubleType();
            case DECIMAL: return new DecimalType();
            case INT:
            case INTEGER: return new IntType();
            default: return new VarCharType();
        }
    }

    private Object convertValue(Object value, LogicalType logicalType) {
        if (logicalType instanceof VarCharType) {
            return StringData.fromString((String) value);
        } else if (logicalType instanceof DecimalType) {
            final DecimalType decimalType = ((DecimalType) logicalType);
            final int precision = decimalType.getPrecision();
            final int scala = decimalType.getScale();
            return DecimalData.fromBigDecimal((BigDecimal) value, precision, scala);
        } else {
            return value;
        }
    }
}
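With the shared pipeline in place, DorisSinkBuilder drops its hand-written build() body (JSON parsing, per-table filtering and the op-code switch, the bulk of the -177 lines) and keeps only the Doris-specific wiring in addSink(), taking the stream-load properties from the inherited getProperties() instead of rebuilding them inline. A standalone sketch of how a sink configuration map would flow through that logic; apart from sink.batch.size and sink.enable-delete, which the diff itself reads, the keys and fallback values here are assumptions:

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class DorisSinkConfigDemo {
    public static void main(String[] args) {
        // Illustrative stand-in for FlinkCDCConfig.getSink(); only sink.batch.size and
        // sink.enable-delete are keys the diff reads, the rest are assumed properties.
        Map<String, String> sink = new HashMap<>();
        sink.put("sink.batch.size", "1000");
        sink.put("sink.enable-delete", "true");
        sink.put("format", "json");

        // Mirrors AbstractSinkBuilder.getProperties(): every non-empty entry is copied
        // verbatim and later handed to DorisExecutionOptions.Builder.setStreamLoadProp().
        Properties streamLoadProps = new Properties();
        for (Map.Entry<String, String> entry : sink.entrySet()) {
            if (entry.getKey() != null && !entry.getKey().isEmpty()
                    && entry.getValue() != null && !entry.getValue().isEmpty()) {
                streamLoadProps.setProperty(entry.getKey(), entry.getValue());
            }
        }

        // Mirrors the option handling kept in addSink(); the defaults are assumed.
        int batchSize = sink.containsKey("sink.batch.size")
            ? Integer.valueOf(sink.get("sink.batch.size")) : 100;
        boolean enableDelete = sink.containsKey("sink.enable-delete")
            && Boolean.valueOf(sink.get("sink.enable-delete"));

        System.out.println(streamLoadProps + " batchSize=" + batchSize + " enableDelete=" + enableDelete);
    }
}

Note that the old private convertValue cast the decimal value directly to BigDecimal, while the shared convertValue in AbstractSinkBuilder parses it from a String before calling DecimalData.fromBigDecimal, a behavioral difference worth keeping in mind when reviewing this hunk.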
dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/kafka/KafkaSinkBuilder.java

@@ -4,10 +4,13 @@ import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import java.util.LinkedHashMap;
import java.util.List;
 ...

@@ -39,6 +42,11 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
        super(config);
    }

    @Override
    public void addSink(DataStream<RowData> rowDataDataStream, String schemaTableName, List<String> columnNameList, List<LogicalType> columnTypeList) {
    }

    @Override
    public String getHandle() {
        return KEY_WORD;
 ...

@@ -50,7 +58,11 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
    }

    @Override
    public DataStreamSource build(CDCBuilder cdcBuilder, StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource<String> dataStreamSource) {
        if (Asserts.isNotNullString(config.getSink().get("topic"))) {
            dataStreamSource.addSink(new FlinkKafkaProducer<String>(
                config.getSink().get("brokers"),
                config.getSink().get("topic"),
 ...
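KafkaSinkBuilder gets an intentionally empty addSink() (it forwards the raw change-log JSON rather than RowData) and its build() now writes the source stream straight to Kafka when a topic is configured. A minimal, self-contained sketch of that producer wiring; the broker address, topic name and sample record are placeholders for the values read from config.getSink(), and the three-argument FlinkKafkaProducer constructor with SimpleStringSchema is assumed from the imports shown in the hunk:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class KafkaJsonSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the CDC source: in dlink this is the Debezium-style JSON stream.
        DataStreamSource<String> dataStreamSource =
            env.fromElements("{\"op\":\"c\",\"after\":{\"id\":1}}");

        // Mirrors the producer created in KafkaSinkBuilder.build(); broker and topic
        // are placeholders for config.getSink().get("brokers") / .get("topic").
        dataStreamSource.addSink(new FlinkKafkaProducer<String>(
            "localhost:9092",
            "dlink-cdc-topic",
            new SimpleStringSchema()));

        env.execute("cdc-to-kafka-demo");
    }
}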
dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java

Same one-line fix as the dlink-client-1.11 AbstractCDCBuilder.java above: hunk @@ -44,7 +44,7 @@ replaces tableName.split(".") with tableName.split("\\.").
dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java

Identical to the dlink-client-1.11 AbstractSinkBuilder.java above: the same imports are added and hunk @@ -26,4 +64,170 @@ introduces the same getProperties, deserialize, shunt, buildRowData, abstract addSink, build, buildColumn, getLogicalType and convertValue methods.
dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/doris/DorisSinkBuilder.java

Identical to the dlink-client-1.11 DorisSinkBuilder.java changes above (hunks @@ -4,49 +4,17 @@, @@ -77,15 +45,14 @@, @@ -98,77 +65,11 @@ and @@ -182,60 +83,4 @@).
dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/kafka/KafkaSinkBuilder.java

Identical to the dlink-client-1.11 KafkaSinkBuilder.java changes above (hunks @@ -4,10 +4,13 @@, @@ -39,6 +42,11 @@ and @@ -50,7 +58,11 @@).
dlink-client/dlink-client-1.13/pom.xml

@@ -32,63 +32,10 @@
        <artifactId>dlink-flink-1.13</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.dlink</groupId>
        <artifactId>dlink-connector-jdbc-1.13</artifactId>
        <scope>provided</scope>
    </dependency>
</dependencies>
<!--<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <!– package as a jar –>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.1</version>
            <configuration>
                <filters>
                    <filter>
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                    </filter>
                </filters>
            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>reference.conf</resource>
                            </transformer>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                        </transformers>
                        <shadedArtifactAttached>true</shadedArtifactAttached>
                        <shadedClassifierName>uber</shadedClassifierName>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>-->
</project>
\ No newline at end of file
dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java

Same one-line fix as the dlink-client-1.11 AbstractCDCBuilder.java above: hunk @@ -44,7 +44,7 @@ replaces tableName.split(".") with tableName.split("\\.").
dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java

Identical to the dlink-client-1.11 AbstractSinkBuilder.java above: the same imports are added and hunk @@ -26,4 +64,170 @@ introduces the same getProperties, deserialize, shunt, buildRowData, abstract addSink, build, buildColumn, getLogicalType and convertValue methods.
dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/doris/DorisSinkBuilder.java
View file @
c0e9de42
...
...
@@ -4,49 +4,17 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions;
import
org.apache.doris.flink.cfg.DorisOptions
;
import
org.apache.doris.flink.cfg.DorisReadOptions
;
import
org.apache.doris.flink.cfg.DorisSink
;
import
org.apache.flink.api.common.functions.FilterFunction
;
import
org.apache.flink.api.common.functions.FlatMapFunction
;
import
org.apache.flink.api.common.functions.MapFunction
;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper
;
import
org.apache.flink.streaming.api.datastream.DataStream
;
import
org.apache.flink.streaming.api.datastream.DataStreamSource
;
import
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.apache.flink.table.data.DecimalData
;
import
org.apache.flink.table.data.GenericRowData
;
import
org.apache.flink.table.data.RowData
;
import
org.apache.flink.table.data.StringData
;
import
org.apache.flink.table.types.logical.BigIntType
;
import
org.apache.flink.table.types.logical.BooleanType
;
import
org.apache.flink.table.types.logical.DecimalType
;
import
org.apache.flink.table.types.logical.DoubleType
;
import
org.apache.flink.table.types.logical.FloatType
;
import
org.apache.flink.table.types.logical.IntType
;
import
org.apache.flink.table.types.logical.LogicalType
;
import
org.apache.flink.table.types.logical.SmallIntType
;
import
org.apache.flink.table.types.logical.TinyIntType
;
import
org.apache.flink.table.types.logical.VarCharType
;
import
org.apache.flink.types.RowKind
;
import
org.apache.flink.util.Collector
;
import
java.io.Serializable
;
import
java.math.BigDecimal
;
import
java.util.ArrayList
;
import
java.util.LinkedHashMap
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Properties
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.cdc.AbstractSinkBuilder
;
import
com.dlink.cdc.CDCBuilder
;
import
com.dlink.cdc.SinkBuilder
;
import
com.dlink.executor.CustomTableEnvironment
;
import
com.dlink.model.Column
;
import
com.dlink.model.ColumnType
;
import
com.dlink.model.FlinkCDCConfig
;
import
com.dlink.model.Schema
;
import
com.dlink.model.Table
;
/**
* DorisSinkBuilder
...
...
@@ -77,15 +45,14 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
}
@Override
public
DataStreamSource
build
(
CDCBuilder
cdcBuilder
,
StreamExecutionEnvironment
env
,
CustomTableEnvironment
customTableEnvironment
,
DataStreamSource
<
String
>
dataStreamSource
)
{
Map
<
String
,
String
>
sink
=
config
.
getSink
();
Properties
properties
=
new
Properties
();
for
(
Map
.
Entry
<
String
,
String
>
entry
:
sink
.
entrySet
())
{
if
(
Asserts
.
isNotNullString
(
entry
.
getKey
())
&&
Asserts
.
isNotNullString
(
entry
.
getValue
()))
{
properties
.
setProperty
(
entry
.
getKey
(),
entry
.
getValue
());
}
}
public
void
addSink
(
DataStream
<
RowData
>
rowDataDataStream
,
String
schemaTableName
,
List
<
String
>
columnNameList
,
List
<
LogicalType
>
columnTypeList
)
{
DorisExecutionOptions
.
Builder
dorisExecutionOptionsBuilder
=
DorisExecutionOptions
.
builder
();
Map
<
String
,
String
>
sink
=
config
.
getSink
();
if
(
sink
.
containsKey
(
"sink.batch.size"
))
{
dorisExecutionOptionsBuilder
.
setBatchSize
(
Integer
.
valueOf
(
sink
.
get
(
"sink.batch.size"
)));
}
...
...
@@ -98,77 +65,11 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
        if (sink.containsKey("sink.enable-delete")) {
            dorisExecutionOptionsBuilder.setEnableDelete(Boolean.valueOf(sink.get("sink.enable-delete")));
        }
        dorisExecutionOptionsBuilder.setStreamLoadProp(properties);
        final List<Schema> schemaList = config.getSchemaList();
        final String schemaFieldName = config.getSchemaFieldName();
        if (Asserts.isNotNullCollection(schemaList)) {
            SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(new MapFunction<String, Map>() {
                @Override
                public Map map(String value) throws Exception {
                    ObjectMapper objectMapper = new ObjectMapper();
                    return objectMapper.readValue(value, Map.class);
                }
            });
            for (Schema schema : schemaList) {
                for (Table table : schema.getTables()) {
                    final String tableName = table.getName();
                    final String schemaName = table.getSchema();
                    final String schemaTableName = table.getSchemaTableName();
                    List<String> columnNameList = new ArrayList<>();
                    List<LogicalType> columnTypeList = new ArrayList<>();
                    buildColumn(columnNameList, columnTypeList, table.getColumns());
        dorisExecutionOptionsBuilder.setStreamLoadProp(getProperties());
        final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]);
        final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]);
                    SingleOutputStreamOperator<Map> filterOperator = mapOperator.filter(new FilterFunction<Map>() {
                        @Override
                        public boolean filter(Map value) throws Exception {
                            LinkedHashMap source = (LinkedHashMap) value.get("source");
                            return tableName.equals(source.get("table").toString()) && schemaName.equals(source.get(schemaFieldName).toString());
                        }
                    });
                    DataStream<RowData> rowDataDataStream = filterOperator.flatMap(new FlatMapFunction<Map, RowData>() {
                        @Override
                        public void flatMap(Map value, Collector<RowData> out) throws Exception {
                            switch (value.get("op").toString()) {
                                case "r":
                                    GenericRowData igenericRowData = new GenericRowData(columnNameList.size());
                                    igenericRowData.setRowKind(RowKind.INSERT);
                                    Map idata = (Map) value.get("after");
                                    for (int i = 0; i < columnNameList.size(); i++) {
                                        igenericRowData.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
                                    }
                                    out.collect(igenericRowData);
                                    break;
                                case "d":
                                    GenericRowData dgenericRowData = new GenericRowData(columnNameList.size());
                                    dgenericRowData.setRowKind(RowKind.DELETE);
                                    Map ddata = (Map) value.get("before");
                                    for (int i = 0; i < columnNameList.size(); i++) {
                                        dgenericRowData.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
                                    }
                                    out.collect(dgenericRowData);
                                    break;
                                case "u":
                                    GenericRowData ubgenericRowData = new GenericRowData(columnNameList.size());
                                    ubgenericRowData.setRowKind(RowKind.UPDATE_BEFORE);
                                    Map ubdata = (Map) value.get("before");
                                    for (int i = 0; i < columnNameList.size(); i++) {
                                        ubgenericRowData.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
                                    }
                                    out.collect(ubgenericRowData);
                                    GenericRowData uagenericRowData = new GenericRowData(columnNameList.size());
                                    uagenericRowData.setRowKind(RowKind.UPDATE_AFTER);
                                    Map uadata = (Map) value.get("after");
                                    for (int i = 0; i < columnNameList.size(); i++) {
                                        uagenericRowData.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
                                    }
                                    out.collect(uagenericRowData);
                                    break;
                            }
                        }
                    });
                    rowDataDataStream.addSink(DorisSink.sink(columnNames,
...
...
@@ -182,60 +83,4 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
                        .setPassword(config.getSink().get("password")).build()));
                }
            }
        }
        return dataStreamSource;
    }

    private void buildColumn(List<String> columnNameList, List<LogicalType> columnTypeList, List<Column> columns) {
        for (Column column : columns) {
            columnNameList.add(column.getName());
            columnTypeList.add(getLogicalType(column.getJavaType()));
        }
    }

    private LogicalType getLogicalType(ColumnType columnType) {
        switch (columnType) {
            case STRING:
                return new VarCharType();
            case BOOLEAN:
            case JAVA_LANG_BOOLEAN:
                return new BooleanType();
            case BYTE:
            case JAVA_LANG_BYTE:
                return new TinyIntType();
            case SHORT:
            case JAVA_LANG_SHORT:
                return new SmallIntType();
            case LONG:
            case JAVA_LANG_LONG:
                return new BigIntType();
            case FLOAT:
            case JAVA_LANG_FLOAT:
                return new FloatType();
            case DOUBLE:
            case JAVA_LANG_DOUBLE:
                return new DoubleType();
            case DECIMAL:
                return new DecimalType();
            case INT:
            case INTEGER:
                return new IntType();
            default:
                return new VarCharType();
        }
    }

    private Object convertValue(Object value, LogicalType logicalType) {
        if (logicalType instanceof VarCharType) {
            return StringData.fromString((String) value);
        } else if (logicalType instanceof DecimalType) {
            final DecimalType decimalType = ((DecimalType) logicalType);
            final int precision = decimalType.getPrecision();
            final int scala = decimalType.getScale();
            return DecimalData.fromBigDecimal((BigDecimal) value, precision, scala);
        } else {
            return value;
        }
    }
}
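For readers tracing the filter and flatMap above: the stream carries Debezium-style change records, and a single update ("u") event holds both row images, which is why it fans out into an UPDATE_BEFORE and an UPDATE_AFTER row. A minimal standalone sketch of the assumed record shape follows; the field names are the ones the code reads, while the values and the "db" schema field are invented for illustration.

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Map;

public class ChangeRecordShapeDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical change event; only the fields read by the sink builders above are shown.
        String event = "{"
            + "\"op\":\"u\","
            + "\"source\":{\"db\":\"mydb\",\"table\":\"users\"},"
            + "\"before\":{\"id\":1,\"name\":\"old\"},"
            + "\"after\":{\"id\":1,\"name\":\"new\"}"
            + "}";
        Map value = new ObjectMapper().readValue(event, Map.class);
        // The filter keys on source.table and source.<schemaFieldName>;
        // the flatMap turns op "u" into UPDATE_BEFORE (from "before") plus UPDATE_AFTER (from "after").
        System.out.println(value.get("op") + " -> " + value.get("before") + " / " + value.get("after"));
    }
}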
dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/jdbc/JdbcSinkBuilder.java
View file @
c0e9de42
package com.dlink.cdc.jdbc;

import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.dialect.ClickHouseDialect;
import org.apache.flink.connector.jdbc.dialect.MySQLDialect;
import org.apache.flink.connector.jdbc.dialect.OracleDialect;
import org.apache.flink.connector.jdbc.dialect.PostgresDialect;
import org.apache.flink.connector.jdbc.dialect.SQLServerDialect;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.connector.jdbc.table.JdbcUpsertTableSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.LogicalType;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
/**
* MysqlCDCBuilder
*
...
...
@@ -54,6 +58,71 @@ public class JdbcSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
        return new JdbcSinkBuilder(config);
    }

    @Override
    public void addSink(DataStream<RowData> rowDataDataStream, String schemaTableName, List<String> columnNameList, List<LogicalType> columnTypeList) {
        JdbcUpsertTableSink.Builder builder = JdbcUpsertTableSink.builder();
        Map<String, String> sink = config.getSink();
        if (sink.containsKey("sink.buffer-flush.interval")) {
            builder.setFlushIntervalMills(Integer.valueOf(sink.get("sink.buffer-flush.interval")));
        }
        if (sink.containsKey("sink.buffer-flush.max-rows")) {
            builder.setFlushMaxSize(Integer.valueOf(sink.get("sink.buffer-flush.max-rows")));
        }
        if (sink.containsKey("sink.max-retries")) {
            builder.setMaxRetryTimes(Integer.valueOf(sink.get("sink.max-retries")));
        }
        JdbcOptions.Builder jdbcOptionsBuilder = JdbcOptions.builder();
        if (sink.containsKey("connection.max-retry-timeout")) {
            jdbcOptionsBuilder.setConnectionCheckTimeoutSeconds(Integer.valueOf(sink.get("connection.max-retry-timeout")));
        }
        if (sink.containsKey("url")) {
            jdbcOptionsBuilder.setDBUrl(sink.get("url"));
        }
        if (sink.containsKey("dialect")) {
            switch (sink.get("dialect")) {
                case "MySql":
                    jdbcOptionsBuilder.setDialect(new MySQLDialect());
                    break;
                case "Oracle":
                    jdbcOptionsBuilder.setDialect(new OracleDialect());
                    break;
                case "ClickHouse":
                    jdbcOptionsBuilder.setDialect(new ClickHouseDialect());
                    break;
                case "SQLServer":
                    jdbcOptionsBuilder.setDialect(new SQLServerDialect());
                    break;
                case "Postgres":
                    jdbcOptionsBuilder.setDialect(new PostgresDialect());
                    break;
            }
        }
        if (sink.containsKey("driver")) {
            jdbcOptionsBuilder.setDriverName(sink.get("driver"));
        }
        if (sink.containsKey("sink.parallelism")) {
            jdbcOptionsBuilder.setParallelism(Integer.valueOf(sink.get("sink.parallelism")));
        }
        if (sink.containsKey("password")) {
            jdbcOptionsBuilder.setPassword(sink.get("password"));
        }
        if (sink.containsKey("username")) {
            jdbcOptionsBuilder.setUsername(sink.get("username"));
        }
        jdbcOptionsBuilder.setTableName(schemaTableName);
        builder.setOptions(jdbcOptionsBuilder.build());
        builder.setTableSchema(TableSchema.fromTypeInfo(rowDataDataStream.getType()));
        /*JdbcUpsertTableSink build = builder.build();
        build.consumeDataStream(rowDataDataStream);
        rowDataDataStream.addSink(build.);*/
    }
/*@Override
public DataStreamSource build(StreamExecutionEnvironment env, DataStreamSource<String> dataStreamSource) {
final List<Schema> schemaList = config.getSchemaList();
...
...
@@ -111,7 +180,7 @@ public class JdbcSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
    }
        for (Schema schema : schemaList) {
            for (Table item : schema.getTables()) {
                customTableEnvironment.executeSql(item.getFlinkTableSql(sb.toString() + "'table-name' = '" + item.getSchemaTableName()+ "'\n"));
                customTableEnvironment.executeSql(item.getFlinkTableSql(sb.toString() + "'table-name' = '" + item.getSchemaTableName() + "'\n"));
                List<Operation> operations = customTableEnvironment.getParser().parse(cdcBuilder.getInsertSQL(item, TABLE_NAME));
                if (operations.size() > 0) {
                    Operation operation = operations.get(0);
...
...
dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/kafka/KafkaSinkBuilder.java
View file @
c0e9de42
...
...
@@ -4,10 +4,13 @@ import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import java.util.LinkedHashMap;
import java.util.List;
...
...
@@ -50,7 +53,11 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
    }

    @Override
    public DataStreamSource build(CDCBuilder cdcBuilder, StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource<String> dataStreamSource) {
    public DataStreamSource build(CDCBuilder cdcBuilder, StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource<String> dataStreamSource) {
        if (Asserts.isNotNullString(config.getSink().get("topic"))) {
            dataStreamSource.addSink(new FlinkKafkaProducer<String>(
                config.getSink().get("brokers"),
                config.getSink().get("topic"),
...
...
@@ -94,4 +101,12 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
        }
        return dataStreamSource;
    }

    @Override
    public void addSink(DataStream<RowData> rowDataDataStream, String schemaTableName, List<String> columnNameList, List<LogicalType> columnTypeList) {
    }
}
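For context, the 1.13 builder still writes the raw change-record JSON to a single topic through the legacy FlinkKafkaProducer. A hedged, self-contained sketch of that wiring is below; the broker address, topic, and the SimpleStringSchema third argument are placeholders/assumptions, since the truncated hunk above only shows the first two constructor arguments coming from config.getSink().

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class LegacyKafkaSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Hypothetical broker/topic; in KafkaSinkBuilder they come from config.getSink().
        DataStreamSource<String> source = env.fromElements("{\"op\":\"c\"}");
        source.addSink(new FlinkKafkaProducer<String>(
                "localhost:9092",
                "dlink_cdc_topic",
                new SimpleStringSchema()));
        env.execute("legacy-kafka-sink-sketch");
    }
}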
dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/AbstractCDCBuilder.java
View file @
c0e9de42
...
...
@@ -44,7 +44,7 @@ public abstract class AbstractCDCBuilder {
        List<String> tableList = getTableList();
        for (String tableName : tableList) {
            if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
                String[] names = tableName.split(".");
                String[] names = tableName.split("\\.");
                if (!schemaList.contains(names[0])) {
                    schemaList.add(names[0]);
                }
...
...
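The one-character change in this hunk is the heart of the fix: String.split takes a regular expression, so the unescaped dot matched every character and names[0] was never the schema. A standalone sketch (table name invented) showing the difference:

import java.util.Arrays;

public class SplitEscapeDemo {
    public static void main(String[] args) {
        String tableName = "mydb.users";
        // "." as a regex matches every character, so only empty segments remain and are trimmed away.
        System.out.println(Arrays.toString(tableName.split(".")));   // []
        // "\\." escapes the dot, splitting schema from table as intended.
        System.out.println(Arrays.toString(tableName.split("\\."))); // [mydb, users]
    }
}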
dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java
View file @
c0e9de42
package com.dlink.cdc;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.dlink.assertion.Asserts;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.Column;
import com.dlink.model.ColumnType;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
/**
* AbstractCDCBuilder
...
...
@@ -26,4 +64,170 @@ public abstract class AbstractSinkBuilder {
    public void setConfig(FlinkCDCConfig config) {
        this.config = config;
    }

    protected Properties getProperties() {
        Properties properties = new Properties();
        Map<String, String> sink = config.getSink();
        for (Map.Entry<String, String> entry : sink.entrySet()) {
            if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
                properties.setProperty(entry.getKey(), entry.getValue());
            }
        }
        return properties;
    }

    protected SingleOutputStreamOperator<Map> deserialize(DataStreamSource<String> dataStreamSource) {
        return dataStreamSource.map(new MapFunction<String, Map>() {
            @Override
            public Map map(String value) throws Exception {
                ObjectMapper objectMapper = new ObjectMapper();
                return objectMapper.readValue(value, Map.class);
            }
        });
    }

    protected SingleOutputStreamOperator<Map> shunt(SingleOutputStreamOperator<Map> mapOperator, Table table, String schemaFieldName) {
        final String tableName = table.getName();
        final String schemaName = table.getSchema();
        return mapOperator.filter(new FilterFunction<Map>() {
            @Override
            public boolean filter(Map value) throws Exception {
                LinkedHashMap source = (LinkedHashMap) value.get("source");
                return tableName.equals(source.get("table").toString()) && schemaName.equals(source.get(schemaFieldName).toString());
            }
        });
    }

    protected DataStream<RowData> buildRowData(SingleOutputStreamOperator<Map> filterOperator, List<String> columnNameList, List<LogicalType> columnTypeList) {
        return filterOperator.flatMap(new FlatMapFunction<Map, RowData>() {
            @Override
            public void flatMap(Map value, Collector<RowData> out) throws Exception {
                switch (value.get("op").toString()) {
                    case "c":
                        GenericRowData igenericRowData = new GenericRowData(columnNameList.size());
                        igenericRowData.setRowKind(RowKind.INSERT);
                        Map idata = (Map) value.get("after");
                        for (int i = 0; i < columnNameList.size(); i++) {
                            igenericRowData.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
                        }
                        out.collect(igenericRowData);
                        break;
                    case "d":
                        GenericRowData dgenericRowData = new GenericRowData(columnNameList.size());
                        dgenericRowData.setRowKind(RowKind.DELETE);
                        Map ddata = (Map) value.get("before");
                        for (int i = 0; i < columnNameList.size(); i++) {
                            dgenericRowData.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
                        }
                        out.collect(dgenericRowData);
                        break;
                    case "u":
                        GenericRowData ubgenericRowData = new GenericRowData(columnNameList.size());
                        ubgenericRowData.setRowKind(RowKind.UPDATE_BEFORE);
                        Map ubdata = (Map) value.get("before");
                        for (int i = 0; i < columnNameList.size(); i++) {
                            ubgenericRowData.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
                        }
                        out.collect(ubgenericRowData);
                        GenericRowData uagenericRowData = new GenericRowData(columnNameList.size());
                        uagenericRowData.setRowKind(RowKind.UPDATE_AFTER);
                        Map uadata = (Map) value.get("after");
                        for (int i = 0; i < columnNameList.size(); i++) {
                            uagenericRowData.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
                        }
                        out.collect(uagenericRowData);
                        break;
                }
            }
        });
    }

    public abstract void addSink(DataStream<RowData> rowDataDataStream, String schemaTableName, List<String> columnNameList, List<LogicalType> columnTypeList);

    public DataStreamSource build(CDCBuilder cdcBuilder, StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource<String> dataStreamSource) {
        final List<Schema> schemaList = config.getSchemaList();
        final String schemaFieldName = config.getSchemaFieldName();
        if (Asserts.isNotNullCollection(schemaList)) {
            SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
            for (Schema schema : schemaList) {
                for (Table table : schema.getTables()) {
                    SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
                    List<String> columnNameList = new ArrayList<>();
                    List<LogicalType> columnTypeList = new ArrayList<>();
                    buildColumn(columnNameList, columnTypeList, table.getColumns());
                    DataStream<RowData> rowDataDataStream = buildRowData(filterOperator, columnNameList, columnTypeList);
                    addSink(rowDataDataStream, table.getSchemaTableName(), columnNameList, columnTypeList);
                }
            }
        }
        return dataStreamSource;
    }

    protected void buildColumn(List<String> columnNameList, List<LogicalType> columnTypeList, List<Column> columns) {
        for (Column column : columns) {
            columnNameList.add(column.getName());
            columnTypeList.add(getLogicalType(column.getJavaType()));
        }
    }

    protected LogicalType getLogicalType(ColumnType columnType) {
        switch (columnType) {
            case STRING:
                return new VarCharType();
            case BOOLEAN:
            case JAVA_LANG_BOOLEAN:
                return new BooleanType();
            case BYTE:
            case JAVA_LANG_BYTE:
                return new TinyIntType();
            case SHORT:
            case JAVA_LANG_SHORT:
                return new SmallIntType();
            case LONG:
            case JAVA_LANG_LONG:
                return new BigIntType();
            case FLOAT:
            case JAVA_LANG_FLOAT:
                return new FloatType();
            case DOUBLE:
            case JAVA_LANG_DOUBLE:
                return new DoubleType();
            case DECIMAL:
                return new DecimalType();
            case INT:
            case INTEGER:
                return new IntType();
            default:
                return new VarCharType();
        }
    }

    protected Object convertValue(Object value, LogicalType logicalType) {
        if (logicalType instanceof VarCharType) {
            return StringData.fromString((String) value);
        } else if (logicalType instanceof DecimalType) {
            final DecimalType decimalType = ((DecimalType) logicalType);
            final int precision = decimalType.getPrecision();
            final int scala = decimalType.getScale();
            return DecimalData.fromBigDecimal(new BigDecimal((String) value), precision, scala);
        } else {
            return value;
        }
    }
}
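To see the contract the new template method imposes, a concrete sink only has to consume the per-table DataStream<RowData> that build() prepares via deserialize, shunt, buildColumn and buildRowData. The class below is a hypothetical illustration, not part of the commit; the real implementations are the Doris/Jdbc/Kafka builders in this changeset.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;

import java.util.List;

import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.model.FlinkCDCConfig;

// Hypothetical example of a concrete sink extending the abstract builder.
public class PrintSinkBuilder extends AbstractSinkBuilder {

    public PrintSinkBuilder(FlinkCDCConfig config) {
        super(config);
    }

    @Override
    public void addSink(DataStream<RowData> rowDataDataStream, String schemaTableName,
                        List<String> columnNameList, List<LogicalType> columnTypeList) {
        // build() has already filtered this stream down to one table and converted
        // the changelog to RowData, so a sink only attaches its own operator.
        rowDataDataStream.print(schemaTableName);
    }
}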
dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/doris/DorisSinkBuilder.java
View file @
c0e9de42
...
...
@@ -4,49 +4,17 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.cfg.DorisSink;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import java.io.Serializable;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.Column;
import com.dlink.model.ColumnType;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
/**
* DorisSinkBuilder
...
...
@@ -77,15 +45,14 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
    }

    @Override
    public DataStreamSource build(CDCBuilder cdcBuilder, StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource<String> dataStreamSource) {
        Map<String, String> sink = config.getSink();
        Properties properties = new Properties();
        for (Map.Entry<String, String> entry : sink.entrySet()) {
            if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
                properties.setProperty(entry.getKey(), entry.getValue());
            }
        }
    public void addSink(DataStream<RowData> rowDataDataStream, String schemaTableName, List<String> columnNameList, List<LogicalType> columnTypeList) {
        DorisExecutionOptions.Builder dorisExecutionOptionsBuilder = DorisExecutionOptions.builder();
        Map<String, String> sink = config.getSink();
        if (sink.containsKey("sink.batch.size")) {
            dorisExecutionOptionsBuilder.setBatchSize(Integer.valueOf(sink.get("sink.batch.size")));
        }
...
...
@@ -98,77 +65,11 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
        if (sink.containsKey("sink.enable-delete")) {
            dorisExecutionOptionsBuilder.setEnableDelete(Boolean.valueOf(sink.get("sink.enable-delete")));
        }
        dorisExecutionOptionsBuilder.setStreamLoadProp(properties);
        final List<Schema> schemaList = config.getSchemaList();
        final String schemaFieldName = config.getSchemaFieldName();
        if (Asserts.isNotNullCollection(schemaList)) {
            SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(new MapFunction<String, Map>() {
                @Override
                public Map map(String value) throws Exception {
                    ObjectMapper objectMapper = new ObjectMapper();
                    return objectMapper.readValue(value, Map.class);
                }
            });
            for (Schema schema : schemaList) {
                for (Table table : schema.getTables()) {
                    final String tableName = table.getName();
                    final String schemaName = table.getSchema();
                    final String schemaTableName = table.getSchemaTableName();
                    List<String> columnNameList = new ArrayList<>();
                    List<LogicalType> columnTypeList = new ArrayList<>();
                    buildColumn(columnNameList, columnTypeList, table.getColumns());
        dorisExecutionOptionsBuilder.setStreamLoadProp(getProperties());
        final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]);
        final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]);
                    SingleOutputStreamOperator<Map> filterOperator = mapOperator.filter(new FilterFunction<Map>() {
                        @Override
                        public boolean filter(Map value) throws Exception {
                            LinkedHashMap source = (LinkedHashMap) value.get("source");
                            return tableName.equals(source.get("table").toString()) && schemaName.equals(source.get(schemaFieldName).toString());
                        }
                    });
                    DataStream<RowData> rowDataDataStream = filterOperator.flatMap(new FlatMapFunction<Map, RowData>() {
                        @Override
                        public void flatMap(Map value, Collector<RowData> out) throws Exception {
                            switch (value.get("op").toString()) {
                                case "r":
                                    GenericRowData igenericRowData = new GenericRowData(columnNameList.size());
                                    igenericRowData.setRowKind(RowKind.INSERT);
                                    Map idata = (Map) value.get("after");
                                    for (int i = 0; i < columnNameList.size(); i++) {
                                        igenericRowData.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
                                    }
                                    out.collect(igenericRowData);
                                    break;
                                case "d":
                                    GenericRowData dgenericRowData = new GenericRowData(columnNameList.size());
                                    dgenericRowData.setRowKind(RowKind.DELETE);
                                    Map ddata = (Map) value.get("before");
                                    for (int i = 0; i < columnNameList.size(); i++) {
                                        dgenericRowData.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
                                    }
                                    out.collect(dgenericRowData);
                                    break;
                                case "u":
                                    GenericRowData ubgenericRowData = new GenericRowData(columnNameList.size());
                                    ubgenericRowData.setRowKind(RowKind.UPDATE_BEFORE);
                                    Map ubdata = (Map) value.get("before");
                                    for (int i = 0; i < columnNameList.size(); i++) {
                                        ubgenericRowData.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
                                    }
                                    out.collect(ubgenericRowData);
                                    GenericRowData uagenericRowData = new GenericRowData(columnNameList.size());
                                    uagenericRowData.setRowKind(RowKind.UPDATE_AFTER);
                                    Map uadata = (Map) value.get("after");
                                    for (int i = 0; i < columnNameList.size(); i++) {
                                        uagenericRowData.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
                                    }
                                    out.collect(uagenericRowData);
                                    break;
                            }
                        }
                    });
                    rowDataDataStream.addSink(DorisSink.sink(columnNames,
...
...
@@ -182,60 +83,4 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
                        .setPassword(config.getSink().get("password")).build()));
                }
            }
        }
        return dataStreamSource;
    }

    private void buildColumn(List<String> columnNameList, List<LogicalType> columnTypeList, List<Column> columns) {
        for (Column column : columns) {
            columnNameList.add(column.getName());
            columnTypeList.add(getLogicalType(column.getJavaType()));
        }
    }

    private LogicalType getLogicalType(ColumnType columnType) {
        switch (columnType) {
            case STRING:
                return new VarCharType();
            case BOOLEAN:
            case JAVA_LANG_BOOLEAN:
                return new BooleanType();
            case BYTE:
            case JAVA_LANG_BYTE:
                return new TinyIntType();
            case SHORT:
            case JAVA_LANG_SHORT:
                return new SmallIntType();
            case LONG:
            case JAVA_LANG_LONG:
                return new BigIntType();
            case FLOAT:
            case JAVA_LANG_FLOAT:
                return new FloatType();
            case DOUBLE:
            case JAVA_LANG_DOUBLE:
                return new DoubleType();
            case DECIMAL:
                return new DecimalType();
            case INT:
            case INTEGER:
                return new IntType();
            default:
                return new VarCharType();
        }
    }

    private Object convertValue(Object value, LogicalType logicalType) {
        if (logicalType instanceof VarCharType) {
            return StringData.fromString((String) value);
        } else if (logicalType instanceof DecimalType) {
            final DecimalType decimalType = ((DecimalType) logicalType);
            final int precision = decimalType.getPrecision();
            final int scala = decimalType.getScale();
            return DecimalData.fromBigDecimal((BigDecimal) value, precision, scala);
        } else {
            return value;
        }
    }
}
dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/jdbc/JdbcSinkBuilder.java
View file @
c0e9de42
package com.dlink.cdc.jdbc;

import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
/**
* MysqlCDCBuilder
...
...
@@ -37,6 +28,11 @@ public class JdbcSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
        super(config);
    }

    @Override
    public void addSink(DataStream<RowData> rowDataDataStream, String schemaTableName, List<String> columnNameList, List<LogicalType> columnTypeList) {
    }

    @Override
    public String getHandle() {
        return KEY_WORD;
...
...
@@ -47,78 +43,4 @@ public class JdbcSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
        return new JdbcSinkBuilder(config);
    }
/*@Override
public DataStreamSource build(StreamExecutionEnvironment env, DataStreamSource<String> dataStreamSource) {
final List<Schema> schemaList = config.getSchemaList();
if (Asserts.isNotNullCollection(schemaList)) {
for (Schema schema : schemaList) {
for (Table table : schema.getTables()) {
*//*dataStreamSource.filter(new FilterFunction<Map>() {
@Override
public boolean filter(Map value) throws Exception {
return value.containsKey("table_name") && table.getName().equals(value.get("table_name"));
}
});
dataStreamSource.addSink(
JdbcSink.sink(
"insert into books (id, title, authors, year) values (?, ?, ?, ?)",
(statement, book) -> {
statement.setLong(1, book.id);
statement.setString(2, book.title);
statement.setString(3, book.authors);
statement.setInt(4, book.year);
},
JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:postgresql://dbhost:5432/postgresdb")
.withDriverName("org.postgresql.Driver")
.withUsername("someUser")
.withPassword("somePassword")
.build()
));*//*
}
}
}
return dataStreamSource;
}*/
    @Override
    public DataStreamSource build(CDCBuilder cdcBuilder, StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource<String> dataStreamSource) {
        final List<Schema> schemaList = config.getSchemaList();
        if (Asserts.isNotNullCollection(schemaList)) {
            /*org.apache.flink.table.api.Table table = env.fromChangelogStream(dataStreamSource);
            env.registerTable("cdc_table",table);*/
            customTableEnvironment.registerDataStream(TABLE_NAME, dataStreamSource);
            List<ModifyOperation> modifyOperations = new ArrayList();
            StringBuilder sb = new StringBuilder();
            for (Map.Entry<String, String> entry : config.getSink().entrySet()) {
                sb.append("'");
                sb.append(entry.getKey());
                sb.append("' = '");
                sb.append(entry.getValue());
                sb.append("',\n");
            }
            for (Schema schema : schemaList) {
                for (Table item : schema.getTables()) {
                    customTableEnvironment.executeSql(item.getFlinkTableSql(sb.toString() + "'table-name' = '" + item.getSchemaTableName() + "'\n"));
                    List<Operation> operations = customTableEnvironment.getParser().parse(cdcBuilder.getInsertSQL(item, TABLE_NAME));
                    if (operations.size() > 0) {
                        Operation operation = operations.get(0);
                        if (operation instanceof ModifyOperation) {
                            modifyOperations.add((ModifyOperation) operation);
                        }
                    }
                }
            }
            List<Transformation<?>> trans = customTableEnvironment.getPlanner().translate(modifyOperations);
            for (Transformation<?> item : trans) {
                env.addOperator(item);
            }
        }
        return dataStreamSource;
    }
}
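For intuition about the StringBuilder loop in build() above, the sink configuration map is flattened into the WITH-clause body of a Flink connector table, and a per-table 'table-name' option is appended before the DDL is handed to getFlinkTableSql() and executed. The standalone sketch below uses invented option values; in practice they come from the CDCSOURCE sink configuration.

import java.util.LinkedHashMap;
import java.util.Map;

public class WithClauseSketch {
    public static void main(String[] args) {
        // Hypothetical sink options.
        Map<String, String> sink = new LinkedHashMap<>();
        sink.put("connector", "jdbc");
        sink.put("url", "jdbc:mysql://127.0.0.1:3306/target");
        sink.put("username", "root");

        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> entry : sink.entrySet()) {
            sb.append("'").append(entry.getKey()).append("' = '").append(entry.getValue()).append("',\n");
        }
        // Per table, the builder appends the table name before executing the DDL.
        System.out.println(sb + "'table-name' = 'mydb.users'\n");
    }
}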
dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/kafka/KafkaSinkBuilder.java
View file @
c0e9de42
...
...
@@ -6,9 +6,12 @@ import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import java.util.LinkedHashMap;
import java.util.List;
...
@@ -40,6 +43,11 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
        super(config);
    }

    @Override
    public void addSink(DataStream<RowData> rowDataDataStream, String schemaTableName, List<String> columnNameList, List<LogicalType> columnTypeList) {
    }

    @Override
    public String getHandle() {
        return KEY_WORD;
...
...
@@ -51,7 +59,11 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
    }

    @Override
    public DataStreamSource build(CDCBuilder cdcBuilder, StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource<String> dataStreamSource) {
    public DataStreamSource build(CDCBuilder cdcBuilder, StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource<String> dataStreamSource) {
        if (Asserts.isNotNullString(config.getSink().get("topic"))) {
            dataStreamSource.sinkTo(KafkaSink.<String>builder()
                .setBootstrapServers(config.getSink().get("brokers"))
...
...
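The 1.14 builder switches from the legacy FlinkKafkaProducer to the unified KafkaSink being configured above. A hedged sketch of the full builder chain with placeholder broker/topic values follows; the truncated hunk does not show the remaining calls, so the record-serializer wiring is an assumption based on the KafkaRecordSerializationSchema and SimpleStringSchema imports in this file.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnifiedKafkaSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.fromElements("{\"op\":\"c\"}");
        // Placeholder broker/topic; KafkaSinkBuilder reads them from config.getSink().
        source.sinkTo(KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("dlink_cdc_topic")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .build());
        env.execute("unified-kafka-sink-sketch");
    }
}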