Commit 9c53352a authored by shezaixing's avatar shezaixing

处理唯一键update拆分cdc消息

parent ce59d998
......@@ -109,7 +109,7 @@ public class AsyncMysqlDataTransferFunctionNew extends RichAsyncFunction<JSONObj
resultList.add(Tuple3.of(excueteSql,groupKey,ts));
Boolean logEnable = MapUtil.getBool(dbInfoMap, "log_enable", false);
if (logEnable){
String logSql = buildLogData(type, table, pkNameSet, dataObj, ts);
String logSql = buildLogData(type, table, pkNameSet, dataObj, ts, value.toJSONString());
resultList.add(Tuple3.of(logSql,"dsc_cdc_log",ts));
}
resultFuture.complete(resultList);
......@@ -123,16 +123,16 @@ public class AsyncMysqlDataTransferFunctionNew extends RichAsyncFunction<JSONObj
});
}
private static String logSqlFormat = "INSERT INTO dsc_cdc_log (`table`,op_type,pk_columns,pk_values,cdc_ts) values ('%s','%s','%s','%s', %d)";
private static String logSqlFormat = "INSERT INTO dsc_cdc_log (`table`,op_type,pk_columns,pk_values,data_json,cdc_ts) values ('%s','%s','%s','%s','%s', %d)";
private String buildLogData(String type, String table, Set<String> pkNameSet, JSONObject dataObj, long ts) {
private String buildLogData(String type, String table, Set<String> pkNameSet, JSONObject dataObj, long ts, String dataJsonStr) {
List<String> pkValueList = new ArrayList<>();
for (String pk : pkNameSet) {
pkValueList.add(dataObj.getString(pk));
}
String pkColumns = String.join(",",pkNameSet);
String pkValues = String.join("-",pkValueList);
return String.format(logSqlFormat, table, type, pkColumns, pkValues, ts);
return String.format(logSqlFormat, table, type, pkColumns, pkValues, dataJsonStr, ts);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment