Unverified Commit a42c7a0c authored by chrofram's avatar chrofram Committed by GitHub

fix SQLSinkBuilder.buildRow in flink-client-1.11 and flink-client-1.12 (#864)

change Row.withPositions() to new Row()
parent c0355b5f
...@@ -102,7 +102,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -102,7 +102,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
switch (value.get("op").toString()) { switch (value.get("op").toString()) {
case "r": case "r":
case "c": case "c":
Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size()); Row irow = new Row(RowKind.INSERT, columnNameList.size());
Map idata = (Map) value.get("after"); Map idata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) { for (int i = 0; i < columnNameList.size(); i++) {
irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i))); irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
...@@ -110,7 +110,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -110,7 +110,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
out.collect(irow); out.collect(irow);
break; break;
case "d": case "d":
Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size()); Row drow = new Row(RowKind.DELETE, columnNameList.size());
Map ddata = (Map) value.get("before"); Map ddata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) { for (int i = 0; i < columnNameList.size(); i++) {
drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i))); drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
...@@ -118,13 +118,13 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -118,13 +118,13 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
out.collect(drow); out.collect(drow);
break; break;
case "u": case "u":
Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size()); Row ubrow = new Row(RowKind.UPDATE_BEFORE, columnNameList.size());
Map ubdata = (Map) value.get("before"); Map ubdata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) { for (int i = 0; i < columnNameList.size(); i++) {
ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i))); ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
} }
out.collect(ubrow); out.collect(ubrow);
Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size()); Row uarow = new Row(RowKind.UPDATE_AFTER, columnNameList.size());
Map uadata = (Map) value.get("after"); Map uadata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) { for (int i = 0; i < columnNameList.size(); i++) {
uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i))); uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
......
...@@ -102,7 +102,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -102,7 +102,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
switch (value.get("op").toString()) { switch (value.get("op").toString()) {
case "r": case "r":
case "c": case "c":
Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size()); Row irow = new Row(RowKind.INSERT, columnNameList.size());
Map idata = (Map) value.get("after"); Map idata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) { for (int i = 0; i < columnNameList.size(); i++) {
irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i))); irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
...@@ -110,7 +110,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -110,7 +110,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
out.collect(irow); out.collect(irow);
break; break;
case "d": case "d":
Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size()); Row drow = new Row(RowKind.DELETE, columnNameList.size());
Map ddata = (Map) value.get("before"); Map ddata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) { for (int i = 0; i < columnNameList.size(); i++) {
drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i))); drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
...@@ -118,13 +118,13 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, ...@@ -118,13 +118,13 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
out.collect(drow); out.collect(drow);
break; break;
case "u": case "u":
Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size()); Row ubrow = new Row(RowKind.UPDATE_BEFORE, columnNameList.size());
Map ubdata = (Map) value.get("before"); Map ubdata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) { for (int i = 0; i < columnNameList.size(); i++) {
ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i))); ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
} }
out.collect(ubrow); out.collect(ubrow);
Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size()); Row uarow = new Row(RowKind.UPDATE_AFTER, columnNameList.size());
Map uadata = (Map) value.get("after"); Map uadata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) { for (int i = 0; i < columnNameList.size(); i++) {
uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i))); uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment