Unverified Commit 28694eb3 authored by wanshicheng's avatar wanshicheng Committed by GitHub

fix #674 Support MySQL varbinary and binary in CDCSOURCE. (#675)

parent 6cfe7bae
...@@ -14,18 +14,7 @@ import org.apache.flink.table.data.RowData; ...@@ -14,18 +14,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.RowKind; import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
...@@ -251,6 +240,8 @@ public abstract class AbstractSinkBuilder { ...@@ -251,6 +240,8 @@ public abstract class AbstractSinkBuilder {
case LOCALDATETIME: case LOCALDATETIME:
case TIMESTAMP: case TIMESTAMP:
return new TimestampType(); return new TimestampType();
case BYTES:
return new VarBinaryType(Integer.MAX_VALUE);
default: default:
return new VarCharType(); return new VarCharType();
} }
......
...@@ -12,16 +12,13 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ...@@ -12,16 +12,13 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.utils.TypeConversions; import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row; import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind; import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
import javax.xml.bind.DatatypeConverter;
import java.io.Serializable; import java.io.Serializable;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.time.Instant; import java.time.Instant;
...@@ -223,6 +220,13 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -223,6 +220,13 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
} else { } else {
return value; return value;
} }
} else if (logicalType instanceof VarBinaryType) {
// VARBINARY AND BINARY is converted to String with encoding base64 in FlinkCDC.
if (value instanceof String) {
return DatatypeConverter.parseBase64Binary((String) value);
} else {
return value;
}
} else { } else {
return value; return value;
} }
......
...@@ -14,18 +14,7 @@ import org.apache.flink.table.data.RowData; ...@@ -14,18 +14,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.RowKind; import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
...@@ -251,6 +240,8 @@ public abstract class AbstractSinkBuilder { ...@@ -251,6 +240,8 @@ public abstract class AbstractSinkBuilder {
case LOCALDATETIME: case LOCALDATETIME:
case TIMESTAMP: case TIMESTAMP:
return new TimestampType(); return new TimestampType();
case BYTES:
return new VarBinaryType(Integer.MAX_VALUE);
default: default:
return new VarCharType(); return new VarCharType();
} }
......
...@@ -12,16 +12,13 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ...@@ -12,16 +12,13 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.utils.TypeConversions; import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row; import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind; import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
import javax.xml.bind.DatatypeConverter;
import java.io.Serializable; import java.io.Serializable;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.time.Instant; import java.time.Instant;
...@@ -223,6 +220,13 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -223,6 +220,13 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
} else { } else {
return value; return value;
} }
} else if (logicalType instanceof VarBinaryType) {
// VARBINARY AND BINARY is converted to String with encoding base64 in FlinkCDC.
if (value instanceof String) {
return DatatypeConverter.parseBase64Binary((String) value);
} else {
return value;
}
} else { } else {
return value; return value;
} }
......
...@@ -14,18 +14,7 @@ import org.apache.flink.table.data.RowData; ...@@ -14,18 +14,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.RowKind; import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
...@@ -251,6 +240,8 @@ public abstract class AbstractSinkBuilder { ...@@ -251,6 +240,8 @@ public abstract class AbstractSinkBuilder {
case LOCALDATETIME: case LOCALDATETIME:
case TIMESTAMP: case TIMESTAMP:
return new TimestampType(); return new TimestampType();
case BYTES:
return new VarBinaryType(Integer.MAX_VALUE);
default: default:
return new VarCharType(); return new VarCharType();
} }
......
package com.dlink.cdc.sql; package com.dlink.cdc.sql;
import com.dlink.utils.FlinkBaseUtil;
import com.dlink.utils.LogUtil;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
...@@ -12,11 +14,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ...@@ -12,11 +14,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.utils.TypeConversions; import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row; import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind; import org.apache.flink.types.RowKind;
...@@ -35,12 +33,12 @@ import com.dlink.cdc.AbstractSinkBuilder; ...@@ -35,12 +33,12 @@ import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder; import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder; import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment; import com.dlink.executor.CustomTableEnvironment;
import javax.xml.bind.DatatypeConverter;
import com.dlink.model.FlinkCDCConfig; import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema; import com.dlink.model.Schema;
import com.dlink.model.Table; import com.dlink.model.Table;
import com.dlink.utils.FlinkBaseUtil;
import com.dlink.utils.JSONUtil; import com.dlink.utils.JSONUtil;
import com.dlink.utils.LogUtil;
/** /**
* SQLSinkBuilder * SQLSinkBuilder
...@@ -225,6 +223,13 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -225,6 +223,13 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
} else { } else {
return value; return value;
} }
} else if (logicalType instanceof VarBinaryType) {
// VARBINARY AND BINARY is converted to String with encoding base64 in FlinkCDC.
if (value instanceof String) {
return DatatypeConverter.parseBase64Binary((String) value);
} else {
return value;
}
} else { } else {
return value; return value;
} }
......
...@@ -14,18 +14,7 @@ import org.apache.flink.table.data.RowData; ...@@ -14,18 +14,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.RowKind; import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
...@@ -251,6 +240,8 @@ public abstract class AbstractSinkBuilder { ...@@ -251,6 +240,8 @@ public abstract class AbstractSinkBuilder {
case LOCALDATETIME: case LOCALDATETIME:
case TIMESTAMP: case TIMESTAMP:
return new TimestampType(); return new TimestampType();
case BYTES:
return new VarBinaryType(Integer.MAX_VALUE);
default: default:
return new VarCharType(); return new VarCharType();
} }
......
...@@ -12,16 +12,13 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ...@@ -12,16 +12,13 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.utils.TypeConversions; import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row; import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind; import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
import javax.xml.bind.DatatypeConverter;
import java.io.Serializable; import java.io.Serializable;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.time.Instant; import java.time.Instant;
...@@ -225,6 +222,13 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -225,6 +222,13 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
} else { } else {
return value; return value;
} }
} else if (logicalType instanceof VarBinaryType) {
// VARBINARY AND BINARY is converted to String with encoding base64 in FlinkCDC.
if (value instanceof String) {
return DatatypeConverter.parseBase64Binary((String) value);
} else {
return value;
}
} else { } else {
return value; return value;
} }
......
...@@ -14,18 +14,7 @@ import org.apache.flink.table.data.RowData; ...@@ -14,18 +14,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.RowKind; import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
...@@ -251,6 +240,8 @@ public abstract class AbstractSinkBuilder { ...@@ -251,6 +240,8 @@ public abstract class AbstractSinkBuilder {
case LOCALDATETIME: case LOCALDATETIME:
case TIMESTAMP: case TIMESTAMP:
return new TimestampType(); return new TimestampType();
case BYTES:
return new VarBinaryType(Integer.MAX_VALUE);
default: default:
return new VarCharType(); return new VarCharType();
} }
......
...@@ -12,16 +12,13 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ...@@ -12,16 +12,13 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.utils.TypeConversions; import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row; import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind; import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
import javax.xml.bind.DatatypeConverter;
import java.io.Serializable; import java.io.Serializable;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.time.Instant; import java.time.Instant;
...@@ -225,6 +222,13 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -225,6 +222,13 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
} else { } else {
return value; return value;
} }
} else if (logicalType instanceof VarBinaryType) {
// VARBINARY AND BINARY is converted to String with encoding base64 in FlinkCDC.
if (value instanceof String) {
return DatatypeConverter.parseBase64Binary((String) value);
} else {
return value;
}
} else { } else {
return value; return value;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment