Commit 9521bf62 authored by wenmo

[Feature-461][client] CDCSource sync add sink table-name RegExp

parent 405f934d
@@ -36,15 +36,14 @@ public abstract class AbstractCDCBuilder {
     public List<String> getSchemaList() {
         List<String> schemaList = new ArrayList<>();
         String schema = config.getSchema();
-        if (Asserts.isNullString(schema)) {
-            return schemaList;
+        if (Asserts.isNotNullString(schema)) {
+            String[] schemas = schema.split(FlinkParamConstant.SPLIT);
+            Collections.addAll(schemaList, schemas);
         }
-        String[] schemas = schema.split(FlinkParamConstant.SPLIT);
-        Collections.addAll(schemaList, schemas);
         List<String> tableList = getTableList();
         for (String tableName : tableList) {
             if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
-                String[] names = tableName.split("\\.");
+                String[] names = tableName.split("\\\\.");
                 if (!schemaList.contains(names[0])) {
                     schemaList.add(names[0]);
                 }
@@ -65,6 +64,6 @@ public abstract class AbstractCDCBuilder {
     }

     public String getSchemaFieldName() {
-        return "db";
+        return "schema";
     }
 }
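Two behavioral points in this hunk: an empty schema config no longer short-circuits the method (schemas named only via table entries are still collected), and table names are now split on an escaped dot, since with the new RegExp support a literal dot in a pattern arrives escaped. A minimal sketch of the split, with a hypothetical pattern:

    // Minimal sketch (hypothetical pattern): the Java literal "\\\\." is the
    // regex \\. (a backslash followed by any character), so it splits a
    // regex-style table pattern such as mydb\.orders.* at the escaped dot.
    String tableName = "mydb\\.orders.*";      // the string mydb\.orders.*
    String[] names = tableName.split("\\\\.");
    // names[0] -> "mydb", names[1] -> "orders.*"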
@@ -12,21 +12,27 @@ import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.DoubleType;
 import org.apache.flink.table.types.logical.FloatType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.TinyIntType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;

 import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -113,6 +119,7 @@ public abstract class AbstractSinkBuilder {
             @Override
             public void flatMap(Map value, Collector<RowData> out) throws Exception {
                 switch (value.get("op").toString()) {
+                    case "r":
                     case "c":
                         GenericRowData igenericRowData = new GenericRowData(columnNameList.size());
                         igenericRowData.setRowKind(RowKind.INSERT);
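The added case "r" covers Debezium's snapshot-read records: the op field of a change event is c (create), u (update), d (delete), or r (read, emitted for rows captured during the initial snapshot). A snapshot row, like an insert, carries only an "after" image, so it is emitted as RowKind.INSERT. A hedged sketch of the event shape being matched:

    import java.util.HashMap;
    import java.util.Map;

    // Hedged sketch (hypothetical values) of a Debezium-style change event:
    // snapshot rows arrive with op = "r" and only an "after" row image.
    Map<String, Object> event = new HashMap<>();
    event.put("op", "r");                 // snapshot read, handled like "c"
    event.put("after", new HashMap<>());  // column name -> value map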
@@ -164,17 +171,23 @@ public abstract class AbstractSinkBuilder {
             StreamExecutionEnvironment env,
             CustomTableEnvironment customTableEnvironment,
             DataStreamSource<String> dataStreamSource) {
         final List<Schema> schemaList = config.getSchemaList();
         final String schemaFieldName = config.getSchemaFieldName();
         if (Asserts.isNotNullCollection(schemaList)) {
             SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
             for (Schema schema : schemaList) {
                 for (Table table : schema.getTables()) {
                     SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
                     List<String> columnNameList = new ArrayList<>();
                     List<LogicalType> columnTypeList = new ArrayList<>();
                     buildColumn(columnNameList, columnTypeList, table.getColumns());
                     DataStream<RowData> rowDataDataStream = buildRowData(filterOperator, columnNameList, columnTypeList);
                     addSink(env, rowDataDataStream, table, columnNameList, columnTypeList);
                 }
             }
@@ -189,7 +202,7 @@ public abstract class AbstractSinkBuilder {
         }
     }

-    protected LogicalType getLogicalType(ColumnType columnType) {
+    public LogicalType getLogicalType(ColumnType columnType) {
         switch (columnType) {
             case STRING:
                 return new VarCharType();
@@ -212,10 +225,16 @@ public abstract class AbstractSinkBuilder {
             case JAVA_LANG_DOUBLE:
                 return new DoubleType();
             case DECIMAL:
-                return new DecimalType();
+                return new DecimalType(columnType.getPrecision(), columnType.getScale());
             case INT:
             case INTEGER:
                 return new IntType();
+            case DATE:
+            case LOCALDATE:
+                return new DateType();
+            case LOCALDATETIME:
+            case TIMESTAMP:
+                return new TimestampType();
             default:
                 return new VarCharType();
         }
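Passing the column's precision and scale through to DecimalType matters downstream: convertValue builds DecimalData against exactly those bounds, and the no-arg DecimalType() defaults to DECIMAL(10, 0), which drops fractional digits. A small sketch, assuming a DECIMAL(10, 2) source column:

    import org.apache.flink.table.data.DecimalData;
    import org.apache.flink.table.types.logical.DecimalType;

    import java.math.BigDecimal;

    // Small sketch (assumed DECIMAL(10, 2) column): the scale survives, whereas
    // the old new DecimalType() meant DECIMAL(10, 0) and the cents were lost.
    DecimalType type = new DecimalType(10, 2);
    DecimalData data = DecimalData.fromBigDecimal(
            new BigDecimal("12345.67"), type.getPrecision(), type.getScale());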
@@ -227,11 +246,15 @@ public abstract class AbstractSinkBuilder {
         }
         if (logicalType instanceof VarCharType) {
             return StringData.fromString((String) value);
+        } else if (logicalType instanceof DateType) {
+            return StringData.fromString(Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate().toString());
+        } else if (logicalType instanceof TimestampType) {
+            return TimestampData.fromTimestamp(Timestamp.from(Instant.ofEpochMilli((long) value)));
         } else if (logicalType instanceof DecimalType) {
             final DecimalType decimalType = ((DecimalType) logicalType);
             final int precision = decimalType.getPrecision();
-            final int scala = decimalType.getScale();
-            return DecimalData.fromBigDecimal(new BigDecimal((String) value), precision, scala);
+            final int scale = decimalType.getScale();
+            return DecimalData.fromBigDecimal(new BigDecimal((String) value), precision, scale);
         } else {
             return value;
         }
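Both new branches treat the incoming value as an epoch-millisecond long, converted through the JVM default time zone. A worked example under that assumption:

    import java.sql.Timestamp;
    import java.time.Instant;
    import java.time.ZoneId;

    // Worked example (assuming epoch-millisecond encoding; with zone = UTC):
    // 1640995200000L is 2022-01-01T00:00:00Z.
    long value = 1640995200000L;
    String date = Instant.ofEpochMilli(value)
            .atZone(ZoneId.systemDefault()).toLocalDate().toString(); // "2022-01-01"
    Timestamp ts = Timestamp.from(Instant.ofEpochMilli(value));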
@@ -270,4 +293,5 @@ public abstract class AbstractSinkBuilder {
         }
         return tableName;
     }
 }
@@ -18,7 +18,6 @@ import com.dlink.cdc.CDCBuilder;
 import com.dlink.constant.ClientConstant;
 import com.dlink.constant.FlinkParamConstant;
 import com.dlink.model.FlinkCDCConfig;
-import com.dlink.model.Table;

 /**
  * MysqlCDCBuilder
@@ -61,11 +60,18 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
                 .port(config.getPort())
                 .username(config.getUsername())
                 .password(config.getPassword());
-        if (Asserts.isNotNullString(config.getDatabase())) {
-            sourceBuilder.databaseList(config.getDatabase().split(FlinkParamConstant.SPLIT));
+        String database = config.getDatabase();
+        if (Asserts.isNotNullString(database)) {
+            String[] databases = database.split(FlinkParamConstant.SPLIT);
+            sourceBuilder.databaseList(databases);
+        } else {
+            sourceBuilder.databaseList(new String[0]);
         }
-        if (Asserts.isNotNullString(config.getTable())) {
-            sourceBuilder.tableList(config.getTable().split(FlinkParamConstant.SPLIT));
+        List<String> schemaTableNameList = config.getSchemaTableNameList();
+        if (Asserts.isNotNullCollection(schemaTableNameList)) {
+            sourceBuilder.tableList(schemaTableNameList.toArray(new String[schemaTableNameList.size()]));
+        } else {
+            sourceBuilder.tableList(new String[0]);
         }
         sourceBuilder.deserializer(new StringDebeziumDeserializationSchema());
         sourceBuilder.debeziumProperties(properties);
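The source now receives config.getSchemaTableNameList() instead of a comma-split config.getTable(); the MySQL CDC connector matches each tableList entry as a regular expression against the fully qualified database.table name, which is what the commit's sink table-name RegExp support builds on. A hedged fragment reusing the builder from the hunk above (patterns hypothetical):

    // Hedged fragment (hypothetical patterns): each entry is a regex matched
    // against "database.table", so one pattern can capture many shard tables.
    sourceBuilder.databaseList("mydb");
    sourceBuilder.tableList("mydb\\.orders_.*"); // matches mydb.orders_0, mydb.orders_1, ...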
@@ -75,11 +81,19 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
     public List<String> getSchemaList() {
         List<String> schemaList = new ArrayList<>();
         String schema = config.getDatabase();
-        if (Asserts.isNullString(schema)) {
-            return schemaList;
+        if (Asserts.isNotNullString(schema)) {
+            String[] schemas = schema.split(FlinkParamConstant.SPLIT);
+            Collections.addAll(schemaList, schemas);
+        }
+        List<String> tableList = getTableList();
+        for (String tableName : tableList) {
+            if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
+                String[] names = tableName.split("\\\\.");
+                if (!schemaList.contains(names[0])) {
+                    schemaList.add(names[0]);
+                }
+            }
         }
-        String[] schemas = schema.split(FlinkParamConstant.SPLIT);
-        Collections.addAll(schemaList, schemas);
         return schemaList;
     }
...
@@ -9,13 +9,13 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.types.logical.DateType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
@@ -23,6 +23,8 @@ import org.apache.flink.util.Collector;

 import java.io.Serializable;
 import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -75,6 +77,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
             @Override
             public void flatMap(Map value, Collector<Row> out) throws Exception {
                 switch (value.get("op").toString()) {
+                    case "r":
                     case "c":
                         Row irow = Row.ofKind(RowKind.INSERT);
                         Map idata = (Map) value.get("after");
@@ -117,9 +120,8 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
             List<String> columnNameList) {
         String sinkTableName = getSinkTableName(table);
         customTableEnvironment.createTemporaryView(table.getSchemaTableNameWithUnderline(), rowDataDataStream, StringUtils.join(columnNameList, ","));
-        customTableEnvironment.executeSql(table.getFlinkDDL(getSinkConfigurationString(table), sinkTableName));
+        customTableEnvironment.executeSql(getFlinkDDL(table, sinkTableName));
         List<Operation> operations = customTableEnvironment.getParser().parse(table.getCDCSqlInsert(sinkTableName, table.getSchemaTableNameWithUnderline()));
         if (operations.size() > 0) {
@@ -168,17 +170,66 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
         return dataStreamSource;
     }

+    public String getFlinkDDL(Table table, String tableName) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("CREATE TABLE IF NOT EXISTS ");
+        sb.append(tableName);
+        sb.append(" (\n");
+        List<String> pks = new ArrayList<>();
+        for (int i = 0; i < table.getColumns().size(); i++) {
+            String type = table.getColumns().get(i).getJavaType().getFlinkType();
+            sb.append(" ");
+            if (i > 0) {
+                sb.append(",");
+            }
+            sb.append("`");
+            sb.append(table.getColumns().get(i).getName());
+            sb.append("` ");
+            sb.append(convertSinkColumnType(type));
+            sb.append("\n");
+            if (table.getColumns().get(i).isKeyFlag()) {
+                pks.add(table.getColumns().get(i).getName());
+            }
+        }
+        StringBuilder pksb = new StringBuilder("PRIMARY KEY ( ");
+        for (int i = 0; i < pks.size(); i++) {
+            if (i > 0) {
+                pksb.append(",");
+            }
+            pksb.append("`");
+            pksb.append(pks.get(i));
+            pksb.append("`");
+        }
+        pksb.append(" ) NOT ENFORCED\n");
+        if (pks.size() > 0) {
+            sb.append(" ,");
+            sb.append(pksb);
+        }
+        sb.append(") WITH (\n");
+        sb.append(getSinkConfigurationString(table));
+        sb.append(")\n");
+        return sb.toString();
+    }
+
+    protected String convertSinkColumnType(String type) {
+        if (config.getSink().get("connector").equals("hudi")) {
+            if (type.equals("TIMESTAMP")) {
+                return "TIMESTAMP(3)";
+            }
+        }
+        return type;
+    }
+
     protected Object convertValue(Object value, LogicalType logicalType) {
         if (value == null) {
             return null;
         }
-        if (logicalType instanceof VarCharType) {
-            return value;
+        if (logicalType instanceof DateType) {
+            return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate();
+        } else if (logicalType instanceof TimestampType) {
+            return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
         } else if (logicalType instanceof DecimalType) {
-            final DecimalType decimalType = ((DecimalType) logicalType);
-            final int precision = decimalType.getPrecision();
-            final int scala = decimalType.getScale();
-            return DecimalData.fromBigDecimal(new BigDecimal((String) value), precision, scala);
+            return new BigDecimal((String) value);
         } else {
             return value;
         }
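For reference, the DDL that getFlinkDDL assembles for a hypothetical sink table with columns id (key) and name, assuming their Flink types resolve to INT and STRING, looks roughly like this; the WITH body comes verbatim from getSinkConfigurationString(table):

    CREATE TABLE IF NOT EXISTS sink_orders (
     `id` INT
     ,`name` STRING
     ,PRIMARY KEY ( `id` ) NOT ENFORCED
    ) WITH (
    -- options emitted by getSinkConfigurationString(table)
    )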
...
@@ -36,15 +36,14 @@ public abstract class AbstractCDCBuilder {
     public List<String> getSchemaList() {
         List<String> schemaList = new ArrayList<>();
         String schema = config.getSchema();
-        if (Asserts.isNullString(schema)) {
-            return schemaList;
+        if (Asserts.isNotNullString(schema)) {
+            String[] schemas = schema.split(FlinkParamConstant.SPLIT);
+            Collections.addAll(schemaList, schemas);
         }
-        String[] schemas = schema.split(FlinkParamConstant.SPLIT);
-        Collections.addAll(schemaList, schemas);
         List<String> tableList = getTableList();
         for (String tableName : tableList) {
             if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
-                String[] names = tableName.split("\\.");
+                String[] names = tableName.split("\\\\.");
                 if (!schemaList.contains(names[0])) {
                     schemaList.add(names[0]);
                 }
@@ -65,6 +64,6 @@ public abstract class AbstractCDCBuilder {
     }

     public String getSchemaFieldName() {
-        return "db";
+        return "schema";
     }
 }
@@ -12,21 +12,27 @@ import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.DoubleType;
 import org.apache.flink.table.types.logical.FloatType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.TinyIntType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;

 import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -113,6 +119,7 @@ public abstract class AbstractSinkBuilder {
             @Override
             public void flatMap(Map value, Collector<RowData> out) throws Exception {
                 switch (value.get("op").toString()) {
+                    case "r":
                     case "c":
                         GenericRowData igenericRowData = new GenericRowData(columnNameList.size());
                         igenericRowData.setRowKind(RowKind.INSERT);
@@ -164,17 +171,23 @@ public abstract class AbstractSinkBuilder {
             StreamExecutionEnvironment env,
             CustomTableEnvironment customTableEnvironment,
             DataStreamSource<String> dataStreamSource) {
         final List<Schema> schemaList = config.getSchemaList();
         final String schemaFieldName = config.getSchemaFieldName();
         if (Asserts.isNotNullCollection(schemaList)) {
             SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
             for (Schema schema : schemaList) {
                 for (Table table : schema.getTables()) {
                     SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
                     List<String> columnNameList = new ArrayList<>();
                     List<LogicalType> columnTypeList = new ArrayList<>();
                     buildColumn(columnNameList, columnTypeList, table.getColumns());
                     DataStream<RowData> rowDataDataStream = buildRowData(filterOperator, columnNameList, columnTypeList);
                     addSink(env, rowDataDataStream, table, columnNameList, columnTypeList);
                 }
             }
@@ -189,7 +202,7 @@ public abstract class AbstractSinkBuilder {
         }
     }

-    protected LogicalType getLogicalType(ColumnType columnType) {
+    public LogicalType getLogicalType(ColumnType columnType) {
         switch (columnType) {
             case STRING:
                 return new VarCharType();
@@ -212,10 +225,16 @@ public abstract class AbstractSinkBuilder {
             case JAVA_LANG_DOUBLE:
                 return new DoubleType();
             case DECIMAL:
-                return new DecimalType();
+                return new DecimalType(columnType.getPrecision(), columnType.getScale());
             case INT:
             case INTEGER:
                 return new IntType();
+            case DATE:
+            case LOCALDATE:
+                return new DateType();
+            case LOCALDATETIME:
+            case TIMESTAMP:
+                return new TimestampType();
             default:
                 return new VarCharType();
         }
@@ -227,11 +246,15 @@ public abstract class AbstractSinkBuilder {
         }
         if (logicalType instanceof VarCharType) {
             return StringData.fromString((String) value);
+        } else if (logicalType instanceof DateType) {
+            return StringData.fromString(Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate().toString());
+        } else if (logicalType instanceof TimestampType) {
+            return TimestampData.fromTimestamp(Timestamp.from(Instant.ofEpochMilli((long) value)));
         } else if (logicalType instanceof DecimalType) {
             final DecimalType decimalType = ((DecimalType) logicalType);
             final int precision = decimalType.getPrecision();
-            final int scala = decimalType.getScale();
-            return DecimalData.fromBigDecimal(new BigDecimal((String) value), precision, scala);
+            final int scale = decimalType.getScale();
+            return DecimalData.fromBigDecimal(new BigDecimal((String) value), precision, scale);
         } else {
             return value;
         }
@@ -270,4 +293,5 @@ public abstract class AbstractSinkBuilder {
         }
         return tableName;
     }
 }
@@ -61,27 +61,29 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
                 .port(config.getPort())
                 .username(config.getUsername())
                 .password(config.getPassword());
-        if (Asserts.isNotNullString(config.getDatabase())) {
-            sourceBuilder.databaseList(config.getDatabase().split(FlinkParamConstant.SPLIT));
+        String database = config.getDatabase();
+        if (Asserts.isNotNullString(database)) {
+            String[] databases = database.split(FlinkParamConstant.SPLIT);
+            sourceBuilder.databaseList(databases);
+        } else {
+            sourceBuilder.databaseList(new String[0]);
         }
-        if (Asserts.isNotNullString(config.getTable())) {
-            sourceBuilder.tableList(config.getTable().split(FlinkParamConstant.SPLIT));
+        List<String> schemaTableNameList = config.getSchemaTableNameList();
+        if (Asserts.isNotNullCollection(schemaTableNameList)) {
+            sourceBuilder.tableList(schemaTableNameList.toArray(new String[schemaTableNameList.size()]));
+        } else {
+            sourceBuilder.tableList(new String[0]);
         }
         sourceBuilder.deserializer(new StringDebeziumDeserializationSchema());
         sourceBuilder.debeziumProperties(properties);
         if (Asserts.isNotNullString(config.getStartupMode())) {
-            switch (config.getStartupMode().toUpperCase()) {
-                case "INITIAL":
+            switch (config.getStartupMode().toLowerCase()) {
+                case "initial":
                     sourceBuilder.startupOptions(StartupOptions.initial());
                     break;
-                case "EARLIEST":
-                    sourceBuilder.startupOptions(StartupOptions.earliest());
-                    break;
-                case "LATEST":
+                case "latest-offset":
                     sourceBuilder.startupOptions(StartupOptions.latest());
                     break;
+                default:
+                    sourceBuilder.startupOptions(StartupOptions.latest());
             }
         } else {
             sourceBuilder.startupOptions(StartupOptions.latest());
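The startup modes are now matched in lower case using the connector option spellings initial and latest-offset; the old EARLIEST branch is gone, and any unrecognized value falls through to latest(). A minimal sketch of the normalization (input value hypothetical):

    // Minimal sketch (hypothetical input): user-supplied modes are lower-cased,
    // so "INITIAL", "Initial" and "initial" all select a snapshot-then-binlog
    // read; anything else falls back to reading from the latest offset.
    String startupMode = "Initial";
    boolean snapshotFirst = "initial".equals(startupMode.toLowerCase());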
@@ -92,11 +94,19 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
     public List<String> getSchemaList() {
         List<String> schemaList = new ArrayList<>();
         String schema = config.getDatabase();
-        if (Asserts.isNullString(schema)) {
-            return schemaList;
+        if (Asserts.isNotNullString(schema)) {
+            String[] schemas = schema.split(FlinkParamConstant.SPLIT);
+            Collections.addAll(schemaList, schemas);
+        }
+        List<String> tableList = getTableList();
+        for (String tableName : tableList) {
+            if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
+                String[] names = tableName.split("\\\\.");
+                if (!schemaList.contains(names[0])) {
+                    schemaList.add(names[0]);
+                }
+            }
         }
-        String[] schemas = schema.split(FlinkParamConstant.SPLIT);
-        Collections.addAll(schemaList, schemas);
         return schemaList;
     }
...
@@ -9,13 +9,13 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.types.logical.DateType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
@@ -23,6 +23,8 @@ import org.apache.flink.util.Collector;

 import java.io.Serializable;
 import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -75,6 +77,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
             @Override
             public void flatMap(Map value, Collector<Row> out) throws Exception {
                 switch (value.get("op").toString()) {
+                    case "r":
                     case "c":
                         Row irow = Row.ofKind(RowKind.INSERT);
                         Map idata = (Map) value.get("after");
@@ -117,9 +120,8 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
             List<String> columnNameList) {
         String sinkTableName = getSinkTableName(table);
         customTableEnvironment.createTemporaryView(table.getSchemaTableNameWithUnderline(), rowDataDataStream, StringUtils.join(columnNameList, ","));
-        customTableEnvironment.executeSql(table.getFlinkDDL(getSinkConfigurationString(table), sinkTableName));
+        customTableEnvironment.executeSql(getFlinkDDL(table, sinkTableName));
         List<Operation> operations = customTableEnvironment.getParser().parse(table.getCDCSqlInsert(sinkTableName, table.getSchemaTableNameWithUnderline()));
         if (operations.size() > 0) {
@@ -168,17 +170,66 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
         return dataStreamSource;
     }

+    public String getFlinkDDL(Table table, String tableName) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("CREATE TABLE IF NOT EXISTS ");
+        sb.append(tableName);
+        sb.append(" (\n");
+        List<String> pks = new ArrayList<>();
+        for (int i = 0; i < table.getColumns().size(); i++) {
+            String type = table.getColumns().get(i).getJavaType().getFlinkType();
+            sb.append(" ");
+            if (i > 0) {
+                sb.append(",");
+            }
+            sb.append("`");
+            sb.append(table.getColumns().get(i).getName());
+            sb.append("` ");
+            sb.append(convertSinkColumnType(type));
+            sb.append("\n");
+            if (table.getColumns().get(i).isKeyFlag()) {
+                pks.add(table.getColumns().get(i).getName());
+            }
+        }
+        StringBuilder pksb = new StringBuilder("PRIMARY KEY ( ");
+        for (int i = 0; i < pks.size(); i++) {
+            if (i > 0) {
+                pksb.append(",");
+            }
+            pksb.append("`");
+            pksb.append(pks.get(i));
+            pksb.append("`");
+        }
+        pksb.append(" ) NOT ENFORCED\n");
+        if (pks.size() > 0) {
+            sb.append(" ,");
+            sb.append(pksb);
+        }
+        sb.append(") WITH (\n");
+        sb.append(getSinkConfigurationString(table));
+        sb.append(")\n");
+        return sb.toString();
+    }
+
+    protected String convertSinkColumnType(String type) {
+        if (config.getSink().get("connector").equals("hudi")) {
+            if (type.equals("TIMESTAMP")) {
+                return "TIMESTAMP(3)";
+            }
+        }
+        return type;
+    }
+
     protected Object convertValue(Object value, LogicalType logicalType) {
         if (value == null) {
             return null;
         }
-        if (logicalType instanceof VarCharType) {
-            return value;
+        if (logicalType instanceof DateType) {
+            return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate();
+        } else if (logicalType instanceof TimestampType) {
+            return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
         } else if (logicalType instanceof DecimalType) {
-            final DecimalType decimalType = ((DecimalType) logicalType);
-            final int precision = decimalType.getPrecision();
-            final int scala = decimalType.getScale();
-            return DecimalData.fromBigDecimal(new BigDecimal((String) value), precision, scala);
+            return new BigDecimal((String) value);
         } else {
             return value;
         }
...
@@ -36,15 +36,14 @@ public abstract class AbstractCDCBuilder {
     public List<String> getSchemaList() {
         List<String> schemaList = new ArrayList<>();
         String schema = config.getSchema();
-        if (Asserts.isNullString(schema)) {
-            return schemaList;
+        if (Asserts.isNotNullString(schema)) {
+            String[] schemas = schema.split(FlinkParamConstant.SPLIT);
+            Collections.addAll(schemaList, schemas);
         }
-        String[] schemas = schema.split(FlinkParamConstant.SPLIT);
-        Collections.addAll(schemaList, schemas);
         List<String> tableList = getTableList();
         for (String tableName : tableList) {
             if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
-                String[] names = tableName.split("\\.");
+                String[] names = tableName.split("\\\\.");
                 if (!schemaList.contains(names[0])) {
                     schemaList.add(names[0]);
                 }
...
@@ -119,6 +119,7 @@ public abstract class AbstractSinkBuilder {
             @Override
             public void flatMap(Map value, Collector<RowData> out) throws Exception {
                 switch (value.get("op").toString()) {
+                    case "r":
                     case "c":
                         GenericRowData igenericRowData = new GenericRowData(columnNameList.size());
                         igenericRowData.setRowKind(RowKind.INSERT);
...
@@ -8,9 +8,12 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.sink.utils.Pipelines;
 import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.StreamerUtil;

 import java.io.Serializable;
 import java.util.List;
@@ -29,7 +32,7 @@ import com.dlink.model.Table;
  */
 public class HudiSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {

-    private final static String KEY_WORD = "datastream-hudi";
+    private final static String KEY_WORD = "hudi";
     private static final long serialVersionUID = 5324199407472847422L;

     public HudiSinkBuilder() {
@@ -78,16 +81,38 @@ public class HudiSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
             configuration.set(FlinkOptions.TABLE_NAME, tableName);
             configuration.set(FlinkOptions.HIVE_SYNC_DB, getSinkSchemaName(table));
             configuration.set(FlinkOptions.HIVE_SYNC_TABLE, tableName);
+            long ckpTimeout = rowDataDataStream.getExecutionEnvironment()
+                    .getCheckpointConfig().getCheckpointTimeout();
+            configuration.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
             RowType rowType = RowType.of(false, columnTypes, columnNames);
             configuration.setString(FlinkOptions.SOURCE_AVRO_SCHEMA,
-                    AvroSchemaConverter.convertToSchema(rowType, tableName).toString());
+                    AvroSchemaConverter.convertToSchema(rowType).toString());
-            DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(configuration, rowType, parallelism, rowDataDataStream);
-            DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(configuration, parallelism, hoodieRecordDataStream);
-            if (isMor) {
-                Pipelines.clean(configuration, pipeline);
-                Pipelines.compact(configuration, pipeline);
-            }
+            // bulk_insert mode
+            final String writeOperation = configuration.get(FlinkOptions.OPERATION);
+            if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
+                Pipelines.bulkInsert(configuration, rowType, rowDataDataStream);
+            } else
+            // Append mode
+            if (OptionsResolver.isAppendMode(configuration)) {
+                Pipelines.append(configuration, rowType, rowDataDataStream);
+            } else {
+                DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(configuration, rowType, parallelism, rowDataDataStream);
+                DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(configuration, parallelism, hoodieRecordDataStream);
+                // compaction
+                if (StreamerUtil.needsAsyncCompaction(configuration)) {
+                    Pipelines.compact(configuration, pipeline);
+                } else {
+                    Pipelines.clean(configuration, pipeline);
+                }
+                if (isMor) {
+                    Pipelines.clean(configuration, pipeline);
+                    Pipelines.compact(configuration, pipeline);
+                }
+            }
         }
     }
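The Hudi sink now dispatches on the configured write operation instead of always bootstrapping an upsert pipeline: bulk_insert and append-only writes get their dedicated Pipelines entry points, and only the stream-write path schedules compaction or cleaning. A hedged sketch of selecting the operation up front (option and enum names per Hudi's FlinkOptions and WriteOperationType; the value chosen is hypothetical):

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.common.model.WriteOperationType;
    import org.apache.hudi.configuration.FlinkOptions;

    // Hedged sketch (hypothetical choice): setting the operation to bulk_insert
    // routes the stream through Pipelines.bulkInsert above instead of the
    // bootstrap + hoodieStreamWrite upsert path.
    Configuration configuration = new Configuration();
    configuration.setString(FlinkOptions.OPERATION, WriteOperationType.BULK_INSERT.value());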
@@ -67,22 +67,23 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
         if (Asserts.isNotNullString(database)) {
             String[] databases = database.split(FlinkParamConstant.SPLIT);
             sourceBuilder.databaseList(databases);
+        } else {
+            sourceBuilder.databaseList(new String[0]);
         }
-        String table = config.getTable();
-        if (Asserts.isNotNullString(table)) {
-            sourceBuilder.tableList(table);
+        List<String> schemaTableNameList = config.getSchemaTableNameList();
+        if (Asserts.isNotNullCollection(schemaTableNameList)) {
+            sourceBuilder.tableList(schemaTableNameList.toArray(new String[schemaTableNameList.size()]));
+        } else {
+            sourceBuilder.tableList(new String[0]);
         }
         sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
         sourceBuilder.debeziumProperties(properties);
         if (Asserts.isNotNullString(config.getStartupMode())) {
-            switch (config.getStartupMode().toUpperCase()) {
-                case "INITIAL":
+            switch (config.getStartupMode().toLowerCase()) {
+                case "initial":
                     sourceBuilder.startupOptions(StartupOptions.initial());
                     break;
-                case "EARLIEST":
-                    sourceBuilder.startupOptions(StartupOptions.earliest());
-                    break;
-                case "LATEST":
+                case "latest-offset":
                     sourceBuilder.startupOptions(StartupOptions.latest());
                     break;
             }
@@ -95,11 +96,19 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
     public List<String> getSchemaList() {
         List<String> schemaList = new ArrayList<>();
         String schema = config.getDatabase();
-        if (Asserts.isNullString(schema)) {
-            return schemaList;
+        if (Asserts.isNotNullString(schema)) {
+            String[] schemas = schema.split(FlinkParamConstant.SPLIT);
+            Collections.addAll(schemaList, schemas);
+        }
+        List<String> tableList = getTableList();
+        for (String tableName : tableList) {
+            if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
+                String[] names = tableName.split("\\\\.");
+                if (!schemaList.contains(names[0])) {
+                    schemaList.add(names[0]);
+                }
+            }
         }
-        String[] schemas = schema.split(FlinkParamConstant.SPLIT);
-        Collections.addAll(schemaList, schemas);
         return schemaList;
     }
...
@@ -12,6 +12,7 @@ import com.dlink.assertion.Asserts;
 import com.dlink.cdc.AbstractCDCBuilder;
 import com.dlink.cdc.CDCBuilder;
 import com.dlink.constant.ClientConstant;
+import com.dlink.constant.FlinkParamConstant;
 import com.dlink.model.FlinkCDCConfig;
 import com.ververica.cdc.connectors.oracle.OracleSource;
 import com.ververica.cdc.connectors.oracle.table.StartupOptions;
@@ -61,20 +62,25 @@ public class OracleCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
                 .database(config.getDatabase());
         String schema = config.getSchema();
         if (Asserts.isNotNullString(schema)) {
-            sourceBuilder.schemaList(schema);
+            String[] schemas = schema.split(FlinkParamConstant.SPLIT);
+            sourceBuilder.schemaList(schemas);
+        } else {
+            sourceBuilder.schemaList(new String[0]);
         }
-        String table = config.getTable();
-        if (Asserts.isNotNullString(table)) {
-            sourceBuilder.tableList(table);
+        List<String> schemaTableNameList = config.getSchemaTableNameList();
+        if (Asserts.isNotNullCollection(schemaTableNameList)) {
+            sourceBuilder.tableList(schemaTableNameList.toArray(new String[schemaTableNameList.size()]));
+        } else {
+            sourceBuilder.tableList(new String[0]);
         }
         sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
         sourceBuilder.debeziumProperties(properties);
         if (Asserts.isNotNullString(config.getStartupMode())) {
-            switch (config.getStartupMode().toUpperCase()) {
-                case "INITIAL":
+            switch (config.getStartupMode().toLowerCase()) {
+                case "initial":
                     sourceBuilder.startupOptions(StartupOptions.initial());
                     break;
-                case "LATEST":
+                case "latest-offset":
                     sourceBuilder.startupOptions(StartupOptions.latest());
                     break;
             }
...
@@ -77,6 +77,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
             @Override
             public void flatMap(Map value, Collector<Row> out) throws Exception {
                 switch (value.get("op").toString()) {
+                    case "r":
                     case "c":
                         Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size());
                         Map idata = (Map) value.get("after");
@@ -119,9 +120,8 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
             List<String> columnNameList) {
         String sinkTableName = getSinkTableName(table);
-        // Boolean dateToString = Boolean.valueOf(config.getSink().get("field.convertType.dateToString"));
         customTableEnvironment.createTemporaryView(table.getSchemaTableNameWithUnderline(), rowDataDataStream, StringUtils.join(columnNameList, ","));
-        customTableEnvironment.executeSql(table.getFlinkDDL(getSinkConfigurationString(table), sinkTableName));
+        customTableEnvironment.executeSql(getFlinkDDL(table, sinkTableName));
         List<Operation> operations = customTableEnvironment.getParser().parse(table.getCDCSqlInsert(sinkTableName, table.getSchemaTableNameWithUnderline()));
         if (operations.size() > 0) {
@@ -170,6 +170,56 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
         return dataStreamSource;
     }

+    public String getFlinkDDL(Table table, String tableName) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("CREATE TABLE IF NOT EXISTS ");
+        sb.append(tableName);
+        sb.append(" (\n");
+        List<String> pks = new ArrayList<>();
+        for (int i = 0; i < table.getColumns().size(); i++) {
+            String type = table.getColumns().get(i).getJavaType().getFlinkType();
+            sb.append(" ");
+            if (i > 0) {
+                sb.append(",");
+            }
+            sb.append("`");
+            sb.append(table.getColumns().get(i).getName());
+            sb.append("` ");
+            sb.append(convertSinkColumnType(type));
+            sb.append("\n");
+            if (table.getColumns().get(i).isKeyFlag()) {
+                pks.add(table.getColumns().get(i).getName());
+            }
+        }
+        StringBuilder pksb = new StringBuilder("PRIMARY KEY ( ");
+        for (int i = 0; i < pks.size(); i++) {
+            if (i > 0) {
+                pksb.append(",");
+            }
+            pksb.append("`");
+            pksb.append(pks.get(i));
+            pksb.append("`");
+        }
+        pksb.append(" ) NOT ENFORCED\n");
+        if (pks.size() > 0) {
+            sb.append(" ,");
+            sb.append(pksb);
+        }
+        sb.append(") WITH (\n");
+        sb.append(getSinkConfigurationString(table));
+        sb.append(")\n");
+        return sb.toString();
+    }
+
+    protected String convertSinkColumnType(String type) {
+        if (config.getSink().get("connector").equals("hudi")) {
+            if (type.equals("TIMESTAMP")) {
+                return "TIMESTAMP(3)";
+            }
+        }
+        return type;
+    }
+
     protected Object convertValue(Object value, LogicalType logicalType) {
         if (value == null) {
             return null;
...
@@ -36,15 +36,14 @@ public abstract class AbstractCDCBuilder {
     public List<String> getSchemaList() {
         List<String> schemaList = new ArrayList<>();
         String schema = config.getSchema();
-        if (Asserts.isNullString(schema)) {
-            return schemaList;
+        if (Asserts.isNotNullString(schema)) {
+            String[] schemas = schema.split(FlinkParamConstant.SPLIT);
+            Collections.addAll(schemaList, schemas);
         }
-        String[] schemas = schema.split(FlinkParamConstant.SPLIT);
-        Collections.addAll(schemaList, schemas);
         List<String> tableList = getTableList();
         for (String tableName : tableList) {
             if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
-                String[] names = tableName.split("\\.");
+                String[] names = tableName.split("\\\\.");
                 if (!schemaList.contains(names[0])) {
                     schemaList.add(names[0]);
                 }
...
...@@ -12,21 +12,27 @@ import org.apache.flink.table.data.DecimalData; ...@@ -12,21 +12,27 @@ import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType; import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType; import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType; import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType; import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.SmallIntType; import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType; import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType; import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.RowKind; import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
...@@ -113,6 +119,7 @@ public abstract class AbstractSinkBuilder { ...@@ -113,6 +119,7 @@ public abstract class AbstractSinkBuilder {
@Override @Override
public void flatMap(Map value, Collector<RowData> out) throws Exception { public void flatMap(Map value, Collector<RowData> out) throws Exception {
switch (value.get("op").toString()) { switch (value.get("op").toString()) {
case "r":
case "c": case "c":
GenericRowData igenericRowData = new GenericRowData(columnNameList.size()); GenericRowData igenericRowData = new GenericRowData(columnNameList.size());
igenericRowData.setRowKind(RowKind.INSERT); igenericRowData.setRowKind(RowKind.INSERT);
...@@ -195,7 +202,7 @@ public abstract class AbstractSinkBuilder { ...@@ -195,7 +202,7 @@ public abstract class AbstractSinkBuilder {
} }
} }
protected LogicalType getLogicalType(ColumnType columnType) { public LogicalType getLogicalType(ColumnType columnType) {
switch (columnType) { switch (columnType) {
case STRING: case STRING:
return new VarCharType(); return new VarCharType();
...@@ -218,10 +225,16 @@ public abstract class AbstractSinkBuilder { ...@@ -218,10 +225,16 @@ public abstract class AbstractSinkBuilder {
case JAVA_LANG_DOUBLE: case JAVA_LANG_DOUBLE:
return new DoubleType(); return new DoubleType();
case DECIMAL: case DECIMAL:
return new DecimalType(); return new DecimalType(columnType.getPrecision(), columnType.getScale());
case INT: case INT:
case INTEGER: case INTEGER:
return new IntType(); return new IntType();
case DATE:
case LOCALDATE:
return new DateType();
case LOCALDATETIME:
case TIMESTAMP:
return new TimestampType();
default: default:
return new VarCharType(); return new VarCharType();
} }
@@ -233,11 +246,15 @@ public abstract class AbstractSinkBuilder {
         }
         if (logicalType instanceof VarCharType) {
             return StringData.fromString((String) value);
+        } else if (logicalType instanceof DateType) {
+            return StringData.fromString(Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate().toString());
+        } else if (logicalType instanceof TimestampType) {
+            return TimestampData.fromTimestamp(Timestamp.from(Instant.ofEpochMilli((long) value)));
         } else if (logicalType instanceof DecimalType) {
             final DecimalType decimalType = ((DecimalType) logicalType);
             final int precision = decimalType.getPrecision();
-            final int scala = decimalType.getScale();
-            return DecimalData.fromBigDecimal(new BigDecimal((String) value), precision, scala);
+            final int scale = decimalType.getScale();
+            return DecimalData.fromBigDecimal(new BigDecimal((String) value), precision, scale);
         } else {
             return value;
         }
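The new branches assume the CDC JSON delivers temporal fields as epoch milliseconds in a long: DATE columns are rendered as ISO date strings, TIMESTAMP columns are wrapped into Flink's internal TimestampData. A standalone sketch of both conversions under that assumption:

    import org.apache.flink.table.data.StringData;
    import org.apache.flink.table.data.TimestampData;

    import java.sql.Timestamp;
    import java.time.Instant;
    import java.time.ZoneId;

    public final class TemporalConvertDemo {
        public static void main(String[] args) {
            long epochMillis = 1_640_995_200_000L; // 2022-01-01T00:00:00Z

            // DATE columns: ISO local date string in the system zone
            StringData date = StringData.fromString(
                    Instant.ofEpochMilli(epochMillis).atZone(ZoneId.systemDefault()).toLocalDate().toString());

            // TIMESTAMP columns: Flink's internal TimestampData
            TimestampData ts = TimestampData.fromTimestamp(Timestamp.from(Instant.ofEpochMilli(epochMillis)));

            System.out.println(date + " / " + ts);
        }
    }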
@@ -276,4 +293,5 @@ public abstract class AbstractSinkBuilder {
         }
         return tableName;
     }
 }
@@ -30,7 +30,7 @@ import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
 **/
 public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
 
-    private String KEY_WORD = "mysql-cdc";
+    private final static String KEY_WORD = "mysql-cdc";
     private final static String METADATA_TYPE = "MySql";
 
     public MysqlCDCBuilder() {
@@ -67,26 +67,25 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
         if (Asserts.isNotNullString(database)) {
             String[] databases = database.split(FlinkParamConstant.SPLIT);
             sourceBuilder.databaseList(databases);
+        } else {
+            sourceBuilder.databaseList(new String[0]);
         }
-        String table = config.getTable();
-        if (Asserts.isNotNullString(table)) {
-            sourceBuilder.tableList(table);
+        List<String> schemaTableNameList = config.getSchemaTableNameList();
+        if (Asserts.isNotNullCollection(schemaTableNameList)) {
+            sourceBuilder.tableList(schemaTableNameList.toArray(new String[schemaTableNameList.size()]));
+        } else {
+            sourceBuilder.tableList(new String[0]);
         }
         sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
         sourceBuilder.debeziumProperties(properties);
         if (Asserts.isNotNullString(config.getStartupMode())) {
-            switch (config.getStartupMode().toUpperCase()) {
-                case "INITIAL":
+            switch (config.getStartupMode().toLowerCase()) {
+                case "initial":
                     sourceBuilder.startupOptions(StartupOptions.initial());
                     break;
-                case "EARLIEST":
-                    sourceBuilder.startupOptions(StartupOptions.earliest());
-                    break;
-                case "LATEST":
+                case "latest-offset":
                     sourceBuilder.startupOptions(StartupOptions.latest());
                     break;
+                default:
+                    sourceBuilder.startupOptions(StartupOptions.latest());
             }
         } else {
             sourceBuilder.startupOptions(StartupOptions.latest());
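The startup keywords now follow the connector's scan.startup.mode option values (initial, latest-offset), matched case-insensitively, with unknown values falling back to latest; the earliest mode is dropped. A minimal sketch of the mapping, assuming the flink-cdc 2.x StartupOptions API used by this module:

    import com.ververica.cdc.connectors.mysql.table.StartupOptions;

    public final class StartupModeDemo {
        static StartupOptions resolve(String startupMode) {
            switch (startupMode.toLowerCase()) {
                case "initial":       // full snapshot, then binlog
                    return StartupOptions.initial();
                case "latest-offset": // binlog from the current position only
                    return StartupOptions.latest();
                default:              // anything else falls back to latest
                    return StartupOptions.latest();
            }
        }

        public static void main(String[] args) {
            System.out.println(resolve("INITIAL")); // case-insensitive, like the patch
        }
    }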
@@ -97,11 +96,19 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
     public List<String> getSchemaList() {
         List<String> schemaList = new ArrayList<>();
         String schema = config.getDatabase();
-        if (Asserts.isNullString(schema)) {
-            return schemaList;
+        if (Asserts.isNotNullString(schema)) {
+            String[] schemas = schema.split(FlinkParamConstant.SPLIT);
+            Collections.addAll(schemaList, schemas);
+        }
+        List<String> tableList = getTableList();
+        for (String tableName : tableList) {
+            if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
+                String[] names = tableName.split("\\\\.");
+                if (!schemaList.contains(names[0])) {
+                    schemaList.add(names[0]);
+                }
+            }
         }
-        String[] schemas = schema.split(FlinkParamConstant.SPLIT);
-        Collections.addAll(schemaList, schemas);
         return schemaList;
     }
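One thing worth flagging: in Java source, split("\\\\.") compiles to the regex \\., which matches a backslash followed by any character, so a name like db.table is never split here; splitting on a literal dot is split("\\."). A quick standalone check of the two patterns used in this commit:

    import java.util.Arrays;

    public final class SplitDemo {
        public static void main(String[] args) {
            String tableName = "inventory.orders";
            System.out.println(Arrays.toString(tableName.split("\\.")));   // [inventory, orders]
            System.out.println(Arrays.toString(tableName.split("\\\\."))); // [inventory.orders] - no match, no split
        }
    }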
......
@@ -12,6 +12,7 @@ import com.dlink.assertion.Asserts;
 import com.dlink.cdc.AbstractCDCBuilder;
 import com.dlink.cdc.CDCBuilder;
 import com.dlink.constant.ClientConstant;
+import com.dlink.constant.FlinkParamConstant;
 import com.dlink.model.FlinkCDCConfig;
 import com.ververica.cdc.connectors.oracle.OracleSource;
 import com.ververica.cdc.connectors.oracle.table.StartupOptions;
@@ -25,9 +26,8 @@ import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
 **/
 public class OracleCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
 
-    private String KEY_WORD = "oracle-cdc";
-    private final static String METADATA_TYPE = "MySql";
+    private final static String KEY_WORD = "oracle-cdc";
+    private final static String METADATA_TYPE = "Oracle";
 
     public OracleCDCBuilder() {
     }
@@ -62,24 +62,27 @@ public class OracleCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
                 .database(config.getDatabase());
         String schema = config.getSchema();
         if (Asserts.isNotNullString(schema)) {
-            sourceBuilder.schemaList(schema);
+            String[] schemas = schema.split(FlinkParamConstant.SPLIT);
+            sourceBuilder.schemaList(schemas);
+        } else {
+            sourceBuilder.schemaList(new String[0]);
         }
-        String table = config.getTable();
-        if (Asserts.isNotNullString(table)) {
-            sourceBuilder.tableList(table);
+        List<String> schemaTableNameList = config.getSchemaTableNameList();
+        if (Asserts.isNotNullCollection(schemaTableNameList)) {
+            sourceBuilder.tableList(schemaTableNameList.toArray(new String[schemaTableNameList.size()]));
+        } else {
+            sourceBuilder.tableList(new String[0]);
         }
         sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
         sourceBuilder.debeziumProperties(properties);
         if (Asserts.isNotNullString(config.getStartupMode())) {
-            switch (config.getStartupMode().toUpperCase()) {
-                case "INITIAL":
+            switch (config.getStartupMode().toLowerCase()) {
+                case "initial":
                     sourceBuilder.startupOptions(StartupOptions.initial());
                     break;
-                case "LATEST":
+                case "latest-offset":
                     sourceBuilder.startupOptions(StartupOptions.latest());
                     break;
+                default:
+                    sourceBuilder.startupOptions(StartupOptions.latest());
             }
         } else {
             sourceBuilder.startupOptions(StartupOptions.latest());
......
@@ -9,13 +9,13 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.types.logical.DateType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
@@ -23,6 +23,8 @@ import org.apache.flink.util.Collector;
 import java.io.Serializable;
 import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -75,33 +77,34 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
     @Override
     public void flatMap(Map value, Collector<Row> out) throws Exception {
         switch (value.get("op").toString()) {
+            case "r":
             case "c":
-                Row irow = Row.withNames(RowKind.INSERT);
+                Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size());
                 Map idata = (Map) value.get("after");
                 for (int i = 0; i < columnNameList.size(); i++) {
-                    irow.setField(columnNameList.get(i), convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
+                    irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
                 }
                 out.collect(irow);
                 break;
             case "d":
-                Row drow = Row.withNames(RowKind.DELETE);
+                Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size());
                 Map ddata = (Map) value.get("before");
                 for (int i = 0; i < columnNameList.size(); i++) {
-                    drow.setField(columnNameList.get(i), convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
+                    drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
                }
                 out.collect(drow);
                 break;
             case "u":
-                Row ubrow = Row.withNames(RowKind.UPDATE_BEFORE);
+                Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size());
                 Map ubdata = (Map) value.get("before");
                 for (int i = 0; i < columnNameList.size(); i++) {
-                    ubrow.setField(columnNameList.get(i), convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
+                    ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
                 }
                 out.collect(ubrow);
-                Row uarow = Row.withNames(RowKind.UPDATE_AFTER);
+                Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size());
                 Map uadata = (Map) value.get("after");
                 for (int i = 0; i < columnNameList.size(); i++) {
-                    uarow.setField(columnNameList.get(i), convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
+                    uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
                }
                 out.collect(uarow);
                 break;
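Switching from Row.withNames to Row.withPositions ties each field to its index, which matches the column order later passed to createTemporaryView, and the added case "r": replays snapshot rows as inserts here too. A small standalone contrast of the two Row factories (field values are illustrative):

    import org.apache.flink.types.Row;
    import org.apache.flink.types.RowKind;

    public final class RowDemo {
        public static void main(String[] args) {
            Row named = Row.withNames(RowKind.INSERT);
            named.setField("id", 1);               // field identified by name

            Row positional = Row.withPositions(RowKind.INSERT, 2);
            positional.setField(0, 1);             // field identified by position
            positional.setField(1, "flink");

            System.out.println(named + " / " + positional);
        }
    }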
@@ -117,9 +120,8 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
             List<String> columnNameList) {
         String sinkTableName = getSinkTableName(table);
         customTableEnvironment.createTemporaryView(table.getSchemaTableNameWithUnderline(), rowDataDataStream, StringUtils.join(columnNameList, ","));
-        customTableEnvironment.executeSql(table.getFlinkDDL(getSinkConfigurationString(table), sinkTableName));
+        customTableEnvironment.executeSql(getFlinkDDL(table, sinkTableName));
         List<Operation> operations = customTableEnvironment.getParser().parse(table.getCDCSqlInsert(sinkTableName, table.getSchemaTableNameWithUnderline()));
         if (operations.size() > 0) {
@@ -168,17 +170,66 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
         return dataStreamSource;
     }
+    public String getFlinkDDL(Table table, String tableName) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("CREATE TABLE IF NOT EXISTS ");
+        sb.append(tableName);
+        sb.append(" (\n");
+        List<String> pks = new ArrayList<>();
+        for (int i = 0; i < table.getColumns().size(); i++) {
+            String type = table.getColumns().get(i).getJavaType().getFlinkType();
+            sb.append(" ");
+            if (i > 0) {
+                sb.append(",");
+            }
+            sb.append("`");
+            sb.append(table.getColumns().get(i).getName());
+            sb.append("` ");
+            sb.append(convertSinkColumnType(type));
+            sb.append("\n");
+            if (table.getColumns().get(i).isKeyFlag()) {
+                pks.add(table.getColumns().get(i).getName());
+            }
+        }
+        StringBuilder pksb = new StringBuilder("PRIMARY KEY ( ");
+        for (int i = 0; i < pks.size(); i++) {
+            if (i > 0) {
+                pksb.append(",");
+            }
+            pksb.append("`");
+            pksb.append(pks.get(i));
+            pksb.append("`");
+        }
+        pksb.append(" ) NOT ENFORCED\n");
+        if (pks.size() > 0) {
+            sb.append(" ,");
+            sb.append(pksb);
+        }
+        sb.append(") WITH (\n");
+        sb.append(getSinkConfigurationString(table));
+        sb.append(")\n");
+        return sb.toString();
+    }
+
+    protected String convertSinkColumnType(String type) {
+        if (config.getSink().get("connector").equals("hudi")) {
+            if (type.equals("TIMESTAMP")) {
+                return "TIMESTAMP(3)";
+            }
+        }
+        return type;
+    }
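The new getFlinkDDL assembles a CREATE TABLE ... WITH (...) statement from the captured column metadata, with key columns collected into a NOT ENFORCED primary key, and convertSinkColumnType narrows TIMESTAMP to TIMESTAMP(3) when the sink connector is hudi. A self-contained sketch of the DDL shape it produces, for a hypothetical two-column table with `id` as the key (names, types, the jdbc sink, and the whitespace are illustrative):

    import java.util.Arrays;
    import java.util.List;

    public final class FlinkDdlShapeDemo {
        public static void main(String[] args) {
            // {name, flink type, "pk" marker}; all values are made up
            List<String[]> cols = Arrays.asList(
                    new String[]{"id", "BIGINT", "pk"},
                    new String[]{"name", "STRING", ""});
            StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS sink_products (\n");
            StringBuilder pks = new StringBuilder();
            for (int i = 0; i < cols.size(); i++) {
                sb.append("    ").append(i > 0 ? "," : "")
                  .append("`").append(cols.get(i)[0]).append("` ").append(cols.get(i)[1]).append("\n");
                if ("pk".equals(cols.get(i)[2])) {
                    pks.append("`").append(cols.get(i)[0]).append("`");
                }
            }
            if (pks.length() > 0) {
                sb.append("    ,PRIMARY KEY ( ").append(pks).append(" ) NOT ENFORCED\n");
            }
            sb.append(") WITH (\n    'connector' = 'jdbc'\n)\n");
            System.out.println(sb);
        }
    }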
     protected Object convertValue(Object value, LogicalType logicalType) {
         if (value == null) {
             return null;
         }
-        if (logicalType instanceof VarCharType) {
-            return value;
+        if (logicalType instanceof DateType) {
+            return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate();
+        } else if (logicalType instanceof TimestampType) {
+            return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
         } else if (logicalType instanceof DecimalType) {
-            final DecimalType decimalType = ((DecimalType) logicalType);
-            final int precision = decimalType.getPrecision();
-            final int scala = decimalType.getScale();
-            return DecimalData.fromBigDecimal(new BigDecimal((String) value), precision, scala);
+            return new BigDecimal((String) value);
         } else {
             return value;
         }
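Unlike the RowData path in AbstractSinkBuilder, this Row-based path returns plain external Java objects (LocalDate, LocalDateTime, BigDecimal) rather than Flink's internal StringData/TimestampData/DecimalData, since external Row fields take external types. A standalone sketch of the conversions, again assuming epoch-millisecond longs and string-encoded decimals in the CDC JSON:

    import java.math.BigDecimal;
    import java.time.Instant;
    import java.time.LocalDateTime;
    import java.time.ZoneId;

    public final class ExternalConvertDemo {
        public static void main(String[] args) {
            long epochMillis = 1_640_995_200_000L; // 2022-01-01T00:00:00Z
            LocalDateTime ldt = Instant.ofEpochMilli(epochMillis)
                    .atZone(ZoneId.systemDefault()).toLocalDateTime();
            BigDecimal dec = new BigDecimal("12.34"); // DECIMAL arrives as a string
            System.out.println(ldt + " / " + dec);
        }
    }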
......
@@ -21,6 +21,7 @@ public class FlinkCDCConfig {
     private String database;
     private String schema;
     private String table;
+    private List<String> schemaTableNameList;
     private String startupMode;
     private Map<String, String> debezium;
     private Map<String, String> sink;
@@ -131,6 +132,14 @@ public class FlinkCDCConfig {
         return sink;
     }
 
+    public List<String> getSchemaTableNameList() {
+        return schemaTableNameList;
+    }
+
+    public void setSchemaTableNameList(List<String> schemaTableNameList) {
+        this.schemaTableNameList = schemaTableNameList;
+    }
+
     private boolean skip(String key) {
         switch (key) {
             case "sink.db":
......
@@ -25,18 +25,18 @@ public enum ColumnType {
     DOUBLE("double", "DOUBLE NOT NULL"),
     DATE("java.sql.Date", "DATE"),
     LOCALDATE("java.time.LocalDate", "DATE"),
-    TIME("java.sql.Time", "TIME(0)"),
-    LOCALTIME("java.time.LocalTime", "TIME(9)"),
-    TIMESTAMP("java.sql.Timestamp", "TIMESTAMP(9)"),
-    LOCALDATETIME("java.time.LocalDateTime", "TIMESTAMP(9)"),
-    OFFSETDATETIME("java.time.OffsetDateTime", "TIMESTAMP(9) WITH TIME ZONE"),
-    INSTANT("java.time.Instant", "TIMESTAMP_LTZ(9)"),
-    DURATION("java.time.Duration", "INVERVAL SECOND(9)"),
-    PERIOD("java.time.Period", "INTERVAL YEAR(4) TO MONTH"),
+    TIME("java.sql.Time", "TIME"),
+    LOCALTIME("java.time.LocalTime", "TIME"),
+    TIMESTAMP("java.sql.Timestamp", "TIMESTAMP"),
+    LOCALDATETIME("java.time.LocalDateTime", "TIMESTAMP"),
+    OFFSETDATETIME("java.time.OffsetDateTime", "TIMESTAMP WITH TIME ZONE"),
+    INSTANT("java.time.Instant", "TIMESTAMP_LTZ"),
+    DURATION("java.time.Duration", "INVERVAL SECOND"),
+    PERIOD("java.time.Period", "INTERVAL YEAR TO MONTH"),
     DECIMAL("java.math.BigDecimal", "DECIMAL"),
     BYTES("byte[]", "BYTES"),
-    T("T[]", "ARRAY<T>"),
-    MAP("java.util.Map<K, V>", "MAP<K, V>");
+    T("T[]", "ARRAY"),
+    MAP("java.util.Map<K, V>", "MAP");
 
     private String javaType;
     private String flinkType;
@@ -59,7 +59,11 @@ public enum ColumnType {
     }
 
     public String getFlinkType() {
-        return flinkType;
+        if (flinkType.equals("DECIMAL")) {
+            return flinkType + "(" + precision + "," + scale + ")";
+        } else {
+            return flinkType;
+        }
     }
 
     public Integer getPrecision() {
......
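Dropping the hard-coded precisions (TIME(9), TIMESTAMP(9), TIMESTAMP_LTZ(9), and so on) lets the Flink defaults apply in generated DDL, while getFlinkType now renders DECIMAL with its captured precision and scale. A quick standalone illustration of the rendered type string (values are illustrative):

    public final class FlinkTypeDemo {
        public static void main(String[] args) {
            int precision = 10;
            int scale = 2;
            String flinkType = "DECIMAL";
            // mirrors the getFlinkType branch above; other types return their bare name
            String rendered = flinkType.equals("DECIMAL")
                    ? flinkType + "(" + precision + "," + scale + ")"
                    : flinkType;
            System.out.println(rendered); // DECIMAL(10,2)
        }
    }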
@@ -17,7 +17,7 @@ import com.dlink.parser.SingleSqlParserFactory;
 */
 public class CDCSource {
 
-    private String type;
+    private String connector;
     private String statement;
     private String name;
     private String hostname;
@@ -33,9 +33,9 @@ public class CDCSource {
     private Map<String, String> debezium;
     private Map<String, String> sink;
 
-    public CDCSource(String type, String statement, String name, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String startupMode,
+    public CDCSource(String connector, String statement, String name, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String startupMode,
                      Map<String, String> debezium, Map<String, String> sink) {
-        this.type = type;
+        this.connector = connector;
         this.statement = statement;
         this.name = name;
         this.hostname = hostname;
@@ -73,7 +73,7 @@ public class CDCSource {
             }
         }
         CDCSource cdcSource = new CDCSource(
-                config.get("type"),
+                config.get("connector"),
                 statement,
                 map.get("CDCSOURCE").toString(),
                 config.get("hostname"),
@@ -82,7 +82,7 @@ public class CDCSource {
                 config.get("password"),
                 Integer.valueOf(config.get("checkpoint")),
                 Integer.valueOf(config.get("parallelism")),
-                config.get("startup"),
+                config.get("scan.startup.mode"),
                 debezium,
                 sink
         );
@@ -110,12 +110,12 @@ public class CDCSource {
         return map;
     }
 
-    public String getType() {
-        return type;
+    public String getConnector() {
+        return connector;
     }
 
-    public void setType(String type) {
-        this.type = type;
+    public void setConnector(String connector) {
+        this.connector = connector;
    }
 
     public String getStatement() {
......
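With type renamed to connector and startup to scan.startup.mode, a CDCSOURCE statement reads like a regular Flink SQL WITH clause. A hypothetical example, embedded here as a Java string; the host, credentials, and the 'table'/'sink.*' key names are illustrative, only 'connector' and 'scan.startup.mode' are confirmed by this commit:

    public final class CdcSourceStatementExample {
        static final String STATEMENT =
                "EXECUTE CDCSOURCE demo WITH (\n"
                + "  'connector' = 'mysql-cdc',\n"        // was 'type'
                + "  'hostname' = 'localhost',\n"
                + "  'port' = '3306',\n"
                + "  'username' = 'root',\n"
                + "  'password' = '123456',\n"
                + "  'checkpoint' = '3000',\n"
                + "  'parallelism' = '1',\n"
                + "  'scan.startup.mode' = 'initial',\n"  // was 'startup'
                + "  'table' = 'inventory\\..*',\n"       // regexp matched against schema.table
                + "  'sink.connector' = 'jdbc'\n"
                + ")";

        public static void main(String[] args) {
            System.out.println(STATEMENT);
        }
    }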
@@ -51,7 +51,7 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
     @Override
     public TableResult build(Executor executor) {
         CDCSource cdcSource = CDCSource.build(statement);
-        FlinkCDCConfig config = new FlinkCDCConfig(cdcSource.getType(), cdcSource.getHostname(), cdcSource.getPort(), cdcSource.getUsername()
+        FlinkCDCConfig config = new FlinkCDCConfig(cdcSource.getConnector(), cdcSource.getHostname(), cdcSource.getPort(), cdcSource.getUsername()
             , cdcSource.getPassword(), cdcSource.getCheckpoint(), cdcSource.getParallelism(), cdcSource.getDatabase(), cdcSource.getSchema()
             , cdcSource.getTable(), cdcSource.getStartupMode(), cdcSource.getDebezium(), cdcSource.getSink());
         try {
@@ -61,6 +61,7 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
             List<Schema> schemaList = new ArrayList<>();
             final List<String> schemaNameList = cdcBuilder.getSchemaList();
             final List<String> tableRegList = cdcBuilder.getTableList();
+            final List<String> schemaTableNameList = new ArrayList<>();
             for (String schemaName : schemaNameList) {
                 Schema schema = Schema.build(schemaName);
                 if (!allConfigMap.containsKey(schemaName)) {
@@ -75,16 +76,19 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
                         if (table.getSchemaTableName().matches(tableReg) && !schema.getTables().contains(Table.build(table.getName()))) {
                             table.setColumns(driver.listColumns(schemaName, table.getName()));
                             schema.getTables().add(table);
+                            schemaTableNameList.add(table.getSchemaTableName());
                             break;
                         }
                     }
                 } else {
                     table.setColumns(driver.listColumns(schemaName, table.getName()));
+                    schemaTableNameList.add(table.getSchemaTableName());
                     schema.getTables().add(table);
                 }
             }
             schemaList.add(schema);
         }
+        config.setSchemaTableNameList(schemaTableNameList);
         config.setSchemaList(schemaList);
         StreamExecutionEnvironment streamExecutionEnvironment = executor.getStreamExecutionEnvironment();
         if (Asserts.isNotNull(config.getParallelism())) {
......
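This is the heart of the feature: each discovered schema.table name is matched against the user-supplied regexps, and only the exact matches are collected into schemaTableNameList, which the source builders now receive instead of the raw pattern. A standalone sketch of that matching step (the sample names are made up):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public final class TableRegMatchDemo {
        public static void main(String[] args) {
            List<String> discovered = Arrays.asList("inventory.products", "inventory.orders", "test.tmp");
            List<String> tableRegList = Arrays.asList("inventory\\..*");
            List<String> schemaTableNameList = new ArrayList<>();
            for (String schemaTableName : discovered) {
                for (String tableReg : tableRegList) {
                    if (schemaTableName.matches(tableReg)) {
                        schemaTableNameList.add(schemaTableName);
                        break; // first matching pattern wins, as in the patch
                    }
                }
            }
            System.out.println(schemaTableNameList); // [inventory.products, inventory.orders]
        }
    }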