Unverified Commit a06a7900 authored by chrofram's avatar chrofram Committed by GitHub

[fix 833] [dlink-client] Error with SQLSinkBuilder.buildRow (#833) (#842)

Co-authored-by: 's avatarqiaoyiming <wenpin@tianmagroup.com>
parent dc2ce689
......@@ -101,7 +101,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
switch (value.get("op").toString()) {
case "r":
case "c":
Row irow = Row.ofKind(RowKind.INSERT);
Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size());
Map idata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
......@@ -109,7 +109,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
out.collect(irow);
break;
case "d":
Row drow = Row.ofKind(RowKind.DELETE);
Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size());
Map ddata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
......@@ -117,13 +117,13 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
out.collect(drow);
break;
case "u":
Row ubrow = Row.ofKind(RowKind.UPDATE_BEFORE);
Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size());
Map ubdata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(ubrow);
Row uarow = Row.ofKind(RowKind.UPDATE_AFTER);
Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size());
Map uadata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
......
......@@ -101,7 +101,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
switch (value.get("op").toString()) {
case "r":
case "c":
Row irow = Row.ofKind(RowKind.INSERT);
Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size());
Map idata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
......@@ -109,7 +109,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
out.collect(irow);
break;
case "d":
Row drow = Row.ofKind(RowKind.DELETE);
Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size());
Map ddata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
......@@ -117,13 +117,13 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
out.collect(drow);
break;
case "u":
Row ubrow = Row.ofKind(RowKind.UPDATE_BEFORE);
Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size());
Map ubdata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(ubrow);
Row uarow = Row.ofKind(RowKind.UPDATE_AFTER);
Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size());
Map uadata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
......
......@@ -82,62 +82,6 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
}
private DataStream<Row> buildRow(
SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList,
List<LogicalType> columnTypeList,
String schemaTableName) {
final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]);
final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]);
TypeInformation<?>[] typeInformations = TypeConversions.fromDataTypeToLegacyInfo(TypeConversions.fromLogicalToDataType(columnTypes));
RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, columnNames);
return filterOperator
.flatMap(new FlatMapFunction<Map, Row>() {
@Override
public void flatMap(Map value, Collector<Row> out) throws Exception {
try {
switch (value.get("op").toString()) {
case "r":
case "c":
Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size());
Map idata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(irow);
break;
case "d":
Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size());
Map ddata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(drow);
break;
case "u":
Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size());
Map ubdata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(ubrow);
Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size());
Map uadata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(uarow);
break;
}
} catch (Exception e) {
logger.error("SchameTable: {} - Row: {} - Exception: {}", schemaTableName, JSONUtil.toJsonString(value), e.getCause().getMessage());
throw e;
}
}
}, rowTypeInfo);
}
private DataStream<Row> buildRow(
DataStream<Map> filterOperator,
List<String> columnNameList,
......@@ -157,7 +101,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
switch (value.get("op").toString()) {
case "r":
case "c":
Row irow = Row.ofKind(RowKind.INSERT);
Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size());
Map idata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
......@@ -165,7 +109,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
out.collect(irow);
break;
case "d":
Row drow = Row.ofKind(RowKind.DELETE);
Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size());
Map ddata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
......@@ -173,13 +117,13 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
out.collect(drow);
break;
case "u":
Row ubrow = Row.ofKind(RowKind.UPDATE_BEFORE);
Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size());
Map ubdata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(ubrow);
Row uarow = Row.ofKind(RowKind.UPDATE_AFTER);
Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size());
Map uadata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
......
......@@ -102,7 +102,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
switch (value.get("op").toString()) {
case "r":
case "c":
Row irow = Row.ofKind(RowKind.INSERT);
Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size());
Map idata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
......@@ -110,7 +110,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
out.collect(irow);
break;
case "d":
Row drow = Row.ofKind(RowKind.DELETE);
Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size());
Map ddata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
......@@ -118,13 +118,13 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
out.collect(drow);
break;
case "u":
Row ubrow = Row.ofKind(RowKind.UPDATE_BEFORE);
Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size());
Map ubdata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(ubrow);
Row uarow = Row.ofKind(RowKind.UPDATE_AFTER);
Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size());
Map uadata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
......
......@@ -101,7 +101,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
switch (value.get("op").toString()) {
case "r":
case "c":
Row irow = Row.ofKind(RowKind.INSERT);
Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size());
Map idata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
......@@ -109,7 +109,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
out.collect(irow);
break;
case "d":
Row drow = Row.ofKind(RowKind.DELETE);
Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size());
Map ddata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
......@@ -117,13 +117,13 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
out.collect(drow);
break;
case "u":
Row ubrow = Row.ofKind(RowKind.UPDATE_BEFORE);
Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size());
Map ubdata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(ubrow);
Row uarow = Row.ofKind(RowKind.UPDATE_AFTER);
Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size());
Map uadata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment