Commit f7d6c383 authored by wenmo's avatar wenmo

[Feature-451][client] CDCSource sync field type convertion in Flink1.13

parent 2ff10ac9
...@@ -12,21 +12,27 @@ import org.apache.flink.table.data.DecimalData; ...@@ -12,21 +12,27 @@ import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType; import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType; import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType; import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType; import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.SmallIntType; import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType; import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType; import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.RowKind; import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
...@@ -195,7 +201,7 @@ public abstract class AbstractSinkBuilder { ...@@ -195,7 +201,7 @@ public abstract class AbstractSinkBuilder {
} }
} }
protected LogicalType getLogicalType(ColumnType columnType) { public LogicalType getLogicalType(ColumnType columnType) {
switch (columnType) { switch (columnType) {
case STRING: case STRING:
return new VarCharType(); return new VarCharType();
...@@ -218,10 +224,16 @@ public abstract class AbstractSinkBuilder { ...@@ -218,10 +224,16 @@ public abstract class AbstractSinkBuilder {
case JAVA_LANG_DOUBLE: case JAVA_LANG_DOUBLE:
return new DoubleType(); return new DoubleType();
case DECIMAL: case DECIMAL:
return new DecimalType(); return new DecimalType(columnType.getPrecision(), columnType.getScale());
case INT: case INT:
case INTEGER: case INTEGER:
return new IntType(); return new IntType();
case DATE:
case LOCALDATE:
return new DateType();
case LOCALDATETIME:
case TIMESTAMP:
return new TimestampType();
default: default:
return new VarCharType(); return new VarCharType();
} }
...@@ -233,11 +245,15 @@ public abstract class AbstractSinkBuilder { ...@@ -233,11 +245,15 @@ public abstract class AbstractSinkBuilder {
} }
if (logicalType instanceof VarCharType) { if (logicalType instanceof VarCharType) {
return StringData.fromString((String) value); return StringData.fromString((String) value);
} else if (logicalType instanceof DateType) {
return StringData.fromString(Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate().toString());
} else if (logicalType instanceof TimestampType) {
return TimestampData.fromTimestamp(Timestamp.from(Instant.ofEpochMilli((long) value)));
} else if (logicalType instanceof DecimalType) { } else if (logicalType instanceof DecimalType) {
final DecimalType decimalType = ((DecimalType) logicalType); final DecimalType decimalType = ((DecimalType) logicalType);
final int precision = decimalType.getPrecision(); final int precision = decimalType.getPrecision();
final int scala = decimalType.getScale(); final int scale = decimalType.getScale();
return DecimalData.fromBigDecimal(new BigDecimal((String) value), precision, scala); return DecimalData.fromBigDecimal(new BigDecimal((String) value), precision, scale);
} else { } else {
return value; return value;
} }
......
...@@ -85,8 +85,6 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder { ...@@ -85,8 +85,6 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
case "LATEST": case "LATEST":
sourceBuilder.startupOptions(StartupOptions.latest()); sourceBuilder.startupOptions(StartupOptions.latest());
break; break;
default:
sourceBuilder.startupOptions(StartupOptions.latest());
} }
} else { } else {
sourceBuilder.startupOptions(StartupOptions.latest()); sourceBuilder.startupOptions(StartupOptions.latest());
......
...@@ -13,7 +13,6 @@ import com.dlink.cdc.AbstractCDCBuilder; ...@@ -13,7 +13,6 @@ import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder; import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.ClientConstant; import com.dlink.constant.ClientConstant;
import com.dlink.model.FlinkCDCConfig; import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Table;
import com.ververica.cdc.connectors.oracle.OracleSource; import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.connectors.oracle.table.StartupOptions; import com.ververica.cdc.connectors.oracle.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
...@@ -78,8 +77,6 @@ public class OracleCDCBuilder extends AbstractCDCBuilder implements CDCBuilder { ...@@ -78,8 +77,6 @@ public class OracleCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
case "LATEST": case "LATEST":
sourceBuilder.startupOptions(StartupOptions.latest()); sourceBuilder.startupOptions(StartupOptions.latest());
break; break;
default:
sourceBuilder.startupOptions(StartupOptions.latest());
} }
} else { } else {
sourceBuilder.startupOptions(StartupOptions.latest()); sourceBuilder.startupOptions(StartupOptions.latest());
......
...@@ -9,13 +9,13 @@ import org.apache.flink.streaming.api.datastream.DataStream; ...@@ -9,13 +9,13 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType; import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.utils.TypeConversions; import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row; import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind; import org.apache.flink.types.RowKind;
...@@ -23,6 +23,8 @@ import org.apache.flink.util.Collector; ...@@ -23,6 +23,8 @@ import org.apache.flink.util.Collector;
import java.io.Serializable; import java.io.Serializable;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
...@@ -76,32 +78,32 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -76,32 +78,32 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
public void flatMap(Map value, Collector<Row> out) throws Exception { public void flatMap(Map value, Collector<Row> out) throws Exception {
switch (value.get("op").toString()) { switch (value.get("op").toString()) {
case "c": case "c":
Row irow = Row.withNames(RowKind.INSERT); Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size());
Map idata = (Map) value.get("after"); Map idata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) { for (int i = 0; i < columnNameList.size(); i++) {
irow.setField(columnNameList.get(i), convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i))); irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
} }
out.collect(irow); out.collect(irow);
break; break;
case "d": case "d":
Row drow = Row.withNames(RowKind.DELETE); Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size());
Map ddata = (Map) value.get("before"); Map ddata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) { for (int i = 0; i < columnNameList.size(); i++) {
drow.setField(columnNameList.get(i), convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i))); drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
} }
out.collect(drow); out.collect(drow);
break; break;
case "u": case "u":
Row ubrow = Row.withNames(RowKind.UPDATE_BEFORE); Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size());
Map ubdata = (Map) value.get("before"); Map ubdata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) { for (int i = 0; i < columnNameList.size(); i++) {
ubrow.setField(columnNameList.get(i), convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i))); ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
} }
out.collect(ubrow); out.collect(ubrow);
Row uarow = Row.withNames(RowKind.UPDATE_AFTER); Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size());
Map uadata = (Map) value.get("after"); Map uadata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) { for (int i = 0; i < columnNameList.size(); i++) {
uarow.setField(columnNameList.get(i), convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i))); uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
} }
out.collect(uarow); out.collect(uarow);
break; break;
...@@ -117,7 +119,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -117,7 +119,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
List<String> columnNameList) { List<String> columnNameList) {
String sinkTableName = getSinkTableName(table); String sinkTableName = getSinkTableName(table);
// Boolean dateToString = Boolean.valueOf(config.getSink().get("field.convertType.dateToString"));
customTableEnvironment.createTemporaryView(table.getSchemaTableNameWithUnderline(), rowDataDataStream, StringUtils.join(columnNameList, ",")); customTableEnvironment.createTemporaryView(table.getSchemaTableNameWithUnderline(), rowDataDataStream, StringUtils.join(columnNameList, ","));
customTableEnvironment.executeSql(table.getFlinkDDL(getSinkConfigurationString(table), sinkTableName)); customTableEnvironment.executeSql(table.getFlinkDDL(getSinkConfigurationString(table), sinkTableName));
...@@ -172,13 +174,12 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -172,13 +174,12 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
if (value == null) { if (value == null) {
return null; return null;
} }
if (logicalType instanceof VarCharType) { if (logicalType instanceof DateType) {
return value; return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate();
} else if (logicalType instanceof TimestampType) {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
} else if (logicalType instanceof DecimalType) { } else if (logicalType instanceof DecimalType) {
final DecimalType decimalType = ((DecimalType) logicalType); return new BigDecimal((String) value);
final int precision = decimalType.getPrecision();
final int scala = decimalType.getScale();
return DecimalData.fromBigDecimal(new BigDecimal((String) value), precision, scala);
} else { } else {
return value; return value;
} }
......
...@@ -133,7 +133,7 @@ public class FlinkCDCConfig { ...@@ -133,7 +133,7 @@ public class FlinkCDCConfig {
private boolean skip(String key) { private boolean skip(String key) {
switch (key) { switch (key) {
case "db": case "sink.db":
case "table.prefix": case "table.prefix":
case "table.suffix": case "table.suffix":
case "table.upper": case "table.upper":
......
...@@ -40,12 +40,20 @@ public enum ColumnType { ...@@ -40,12 +40,20 @@ public enum ColumnType {
private String javaType; private String javaType;
private String flinkType; private String flinkType;
private Integer precision;
private Integer scale;
ColumnType(String javaType, String flinkType) { ColumnType(String javaType, String flinkType) {
this.javaType = javaType; this.javaType = javaType;
this.flinkType = flinkType; this.flinkType = flinkType;
} }
public ColumnType setPrecisionAndScale(Integer precision, Integer scale) {
this.precision = precision;
this.scale = scale;
return this;
}
public String getJavaType() { public String getJavaType() {
return javaType; return javaType;
} }
...@@ -53,4 +61,20 @@ public enum ColumnType { ...@@ -53,4 +61,20 @@ public enum ColumnType {
public String getFlinkType() { public String getFlinkType() {
return flinkType; return flinkType;
} }
public Integer getPrecision() {
return precision;
}
public void setPrecision(Integer precision) {
this.precision = precision;
}
public Integer getScale() {
return scale;
}
public void setScala(Integer scale) {
this.scale = scale;
}
} }
...@@ -53,7 +53,7 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera ...@@ -53,7 +53,7 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
CDCSource cdcSource = CDCSource.build(statement); CDCSource cdcSource = CDCSource.build(statement);
FlinkCDCConfig config = new FlinkCDCConfig(cdcSource.getType(), cdcSource.getHostname(), cdcSource.getPort(), cdcSource.getUsername() FlinkCDCConfig config = new FlinkCDCConfig(cdcSource.getType(), cdcSource.getHostname(), cdcSource.getPort(), cdcSource.getUsername()
, cdcSource.getPassword(), cdcSource.getCheckpoint(), cdcSource.getParallelism(), cdcSource.getDatabase(), cdcSource.getSchema() , cdcSource.getPassword(), cdcSource.getCheckpoint(), cdcSource.getParallelism(), cdcSource.getDatabase(), cdcSource.getSchema()
, cdcSource.getTable(), cdcSource.getStartupMode(),cdcSource.getDebezium(), cdcSource.getSink()); , cdcSource.getTable(), cdcSource.getStartupMode(), cdcSource.getDebezium(), cdcSource.getSink());
try { try {
CDCBuilder cdcBuilder = CDCBuilderFactory.buildCDCBuilder(config); CDCBuilder cdcBuilder = CDCBuilderFactory.buildCDCBuilder(config);
Map<String, Map<String, String>> allConfigMap = cdcBuilder.parseMetaDataConfigs(); Map<String, Map<String, String>> allConfigMap = cdcBuilder.parseMetaDataConfigs();
...@@ -68,16 +68,18 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera ...@@ -68,16 +68,18 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
} }
DriverConfig driverConfig = DriverConfig.build(allConfigMap.get(schemaName)); DriverConfig driverConfig = DriverConfig.build(allConfigMap.get(schemaName));
Driver driver = Driver.build(driverConfig); Driver driver = Driver.build(driverConfig);
final List<Table> tables = driver.getTablesAndColumns(schemaName); final List<Table> tables = driver.listTables(schemaName);
for (Table table : tables) { for (Table table : tables) {
if(Asserts.isNotNullCollection(tableRegList)){ if (Asserts.isNotNullCollection(tableRegList)) {
for (String tableReg : tableRegList) { for (String tableReg : tableRegList) {
if (table.getSchemaTableName().matches(tableReg) && !schema.getTables().contains(Table.build(table.getName()))) { if (table.getSchemaTableName().matches(tableReg) && !schema.getTables().contains(Table.build(table.getName()))) {
table.setColumns(driver.listColumns(schemaName, table.getName()));
schema.getTables().add(table); schema.getTables().add(table);
break; break;
} }
} }
}else { } else {
table.setColumns(driver.listColumns(schemaName, table.getName()));
schema.getTables().add(table); schema.getTables().add(table);
} }
} }
......
...@@ -13,31 +13,34 @@ import com.dlink.model.ColumnType; ...@@ -13,31 +13,34 @@ import com.dlink.model.ColumnType;
public class OracleTypeConvert implements ITypeConvert { public class OracleTypeConvert implements ITypeConvert {
@Override @Override
public ColumnType convert(Column column) { public ColumnType convert(Column column) {
ColumnType columnType = ColumnType.STRING;
if (Asserts.isNull(column)) { if (Asserts.isNull(column)) {
return ColumnType.STRING; return columnType;
} }
String t = column.getType().toLowerCase(); String t = column.getType().toLowerCase();
if (t.contains("char")) { if (t.contains("char")) {
return ColumnType.STRING; columnType = ColumnType.STRING;
} else if (t.contains("date")) { } else if (t.contains("date")) {
return ColumnType.DATE; columnType = ColumnType.LOCALDATETIME;
} else if (t.contains("timestamp")) { } else if (t.contains("timestamp")) {
return ColumnType.TIMESTAMP; columnType = ColumnType.TIMESTAMP;
} else if (t.contains("number")) { } else if (t.contains("number")) {
if (t.matches("number\\(+\\d\\)")) { if (t.matches("number\\(+\\d\\)")) {
return ColumnType.INTEGER; columnType = ColumnType.INTEGER;
} else if (t.matches("number\\(+\\d{2}+\\)")) { } else if (t.matches("number\\(+\\d{2}+\\)")) {
return ColumnType.LONG; columnType = ColumnType.LONG;
} else {
columnType = ColumnType.DECIMAL;
} }
return ColumnType.DECIMAL;
} else if (t.contains("float")) { } else if (t.contains("float")) {
return ColumnType.FLOAT; columnType = ColumnType.FLOAT;
} else if (t.contains("clob")) { } else if (t.contains("clob")) {
return ColumnType.STRING; columnType = ColumnType.STRING;
} else if (t.contains("blob")) { } else if (t.contains("blob")) {
return ColumnType.BYTES; columnType = ColumnType.BYTES;
} }
return ColumnType.STRING; columnType.setPrecisionAndScale(column.getPrecision(), column.getScale());
return columnType;
} }
@Override @Override
......
...@@ -24,7 +24,8 @@ public class OracleQuery extends AbstractDBQuery { ...@@ -24,7 +24,8 @@ public class OracleQuery extends AbstractDBQuery {
+ "(CASE WHEN A.DATA_PRECISION IS NULL THEN A.DATA_TYPE " + "(CASE WHEN A.DATA_PRECISION IS NULL THEN A.DATA_TYPE "
+ "WHEN NVL(A.DATA_SCALE, 0) > 0 THEN A.DATA_TYPE||'('||A.DATA_PRECISION||','||A.DATA_SCALE||')' " + "WHEN NVL(A.DATA_SCALE, 0) > 0 THEN A.DATA_TYPE||'('||A.DATA_PRECISION||','||A.DATA_SCALE||')' "
+ "ELSE A.DATA_TYPE||'('||A.DATA_PRECISION||')' END) " + "ELSE A.DATA_TYPE||'('||A.DATA_PRECISION||')' END) "
+ "ELSE A.DATA_TYPE END DATA_TYPE, B.COMMENTS,A.NULLABLE,DECODE((select count(1) from all_constraints pc,all_cons_columns pcc" + "ELSE A.DATA_TYPE END DATA_TYPE,A.DATA_PRECISION NUMERIC_PRECISION,A.DATA_SCALE NUMERIC_SCALE,"
+ " B.COMMENTS,A.NULLABLE,DECODE((select count(1) from all_constraints pc,all_cons_columns pcc"
+ " where pcc.column_name = A.column_name" + " where pcc.column_name = A.column_name"
+ " and pcc.constraint_name = pc.constraint_name" + " and pcc.constraint_name = pc.constraint_name"
+ " and pc.constraint_type ='P'" + " and pc.constraint_type ='P'"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment