Unverified Commit f7f7810f authored by aiwenmo's avatar aiwenmo Committed by GitHub

[Fix-499] [connector] Flink oracle connector can't cast CLOB to String

[Fix-499] [connector] Flink oracle connector can't cast CLOB to String
parents 44083ff6 8fcf21fd
...@@ -18,6 +18,8 @@ ...@@ -18,6 +18,8 @@
package org.apache.flink.connector.jdbc.internal.converter; package org.apache.flink.connector.jdbc.internal.converter;
import oracle.sql.CLOB;
import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.data.TimestampData;
...@@ -101,7 +103,14 @@ public class OracleRowConverter extends AbstractJdbcRowConverter { ...@@ -101,7 +103,14 @@ public class OracleRowConverter extends AbstractJdbcRowConverter {
}; };
case CHAR: case CHAR:
case VARCHAR: case VARCHAR:
return val -> StringData.fromString((String) val); return (val) -> {
if(val instanceof CLOB){
CLOB clob = (CLOB) val;
return StringData.fromString(clob == null ? null : clob.stringValue());
}else {
return StringData.fromString((String) val);
}
};
case BINARY: case BINARY:
case VARBINARY: case VARBINARY:
return val -> (byte[]) val; return val -> (byte[]) val;
......
...@@ -18,6 +18,8 @@ ...@@ -18,6 +18,8 @@
package org.apache.flink.connector.jdbc.internal.converter; package org.apache.flink.connector.jdbc.internal.converter;
import oracle.sql.CLOB;
import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.data.TimestampData;
...@@ -101,7 +103,14 @@ public class OracleRowConverter extends AbstractJdbcRowConverter { ...@@ -101,7 +103,14 @@ public class OracleRowConverter extends AbstractJdbcRowConverter {
}; };
case CHAR: case CHAR:
case VARCHAR: case VARCHAR:
return val -> StringData.fromString((String) val); return (val) -> {
if(val instanceof CLOB){
CLOB clob = (CLOB) val;
return StringData.fromString(clob == null ? null : clob.stringValue());
}else {
return StringData.fromString((String) val);
}
};
case BINARY: case BINARY:
case VARBINARY: case VARBINARY:
return val -> (byte[]) val; return val -> (byte[]) val;
......
package org.apache.flink.connector.jdbc.internal.converter; package org.apache.flink.connector.jdbc.internal.converter;
import oracle.sql.CLOB;
import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.data.TimestampData;
...@@ -83,7 +85,14 @@ public class OracleRowConverter extends AbstractJdbcRowConverter { ...@@ -83,7 +85,14 @@ public class OracleRowConverter extends AbstractJdbcRowConverter {
}; };
case CHAR: case CHAR:
case VARCHAR: case VARCHAR:
return val -> StringData.fromString((String) val); return (val) -> {
if(val instanceof CLOB){
CLOB clob = (CLOB) val;
return StringData.fromString(clob == null ? null : clob.stringValue());
}else {
return StringData.fromString((String) val);
}
};
case BINARY: case BINARY:
case VARBINARY: case VARBINARY:
return val -> (byte[]) val; return val -> (byte[]) val;
......
package org.apache.flink.connector.jdbc.internal.converter; package org.apache.flink.connector.jdbc.internal.converter;
import oracle.sql.CLOB;
import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.data.TimestampData;
...@@ -83,7 +85,14 @@ public class OracleRowConverter extends AbstractJdbcRowConverter { ...@@ -83,7 +85,14 @@ public class OracleRowConverter extends AbstractJdbcRowConverter {
}; };
case CHAR: case CHAR:
case VARCHAR: case VARCHAR:
return val -> StringData.fromString((String) val); return (val) -> {
if(val instanceof CLOB){
CLOB clob = (CLOB) val;
return StringData.fromString(clob == null ? null : clob.stringValue());
}else {
return StringData.fromString((String) val);
}
};
case BINARY: case BINARY:
case VARBINARY: case VARBINARY:
return val -> val; return val -> val;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment