Commit 48c9e821 authored by wenmo's avatar wenmo

[Feature-477][client] CDCSOURCE add pkList

parent 73a62595
......@@ -294,4 +294,16 @@ public abstract class AbstractSinkBuilder {
return tableName;
}
protected List<String> getPKList(Table table){
List<String> pks = new ArrayList<>();
if(Asserts.isNullCollection(table.getColumns())){
return pks;
}
for(Column column: table.getColumns()){
if(column.isKeyFlag()){
pks.add(column.getName());
}
}
return pks;
}
}
......@@ -237,6 +237,10 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
protected String getSinkConfigurationString(Table table) {
String configurationString = SqlUtil.replaceAllParam(config.getSinkConfigurationString(), "schemaName", getSinkSchemaName(table));
return SqlUtil.replaceAllParam(configurationString, "tableName", getSinkTableName(table));
configurationString = SqlUtil.replaceAllParam(configurationString, "tableName", getSinkTableName(table));
if (configurationString.contains("${pkList}")) {
configurationString = SqlUtil.replaceAllParam(configurationString, "pkList", StringUtils.join(getPKList(table), "."));
}
return configurationString;
}
}
......@@ -294,4 +294,16 @@ public abstract class AbstractSinkBuilder {
return tableName;
}
protected List<String> getPKList(Table table){
List<String> pks = new ArrayList<>();
if(Asserts.isNullCollection(table.getColumns())){
return pks;
}
for(Column column: table.getColumns()){
if(column.isKeyFlag()){
pks.add(column.getName());
}
}
return pks;
}
}
......@@ -237,6 +237,10 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
protected String getSinkConfigurationString(Table table) {
String configurationString = SqlUtil.replaceAllParam(config.getSinkConfigurationString(), "schemaName", getSinkSchemaName(table));
return SqlUtil.replaceAllParam(configurationString, "tableName", getSinkTableName(table));
configurationString = SqlUtil.replaceAllParam(configurationString, "tableName", getSinkTableName(table));
if (configurationString.contains("${pkList}")) {
configurationString = SqlUtil.replaceAllParam(configurationString, "pkList", StringUtils.join(getPKList(table), "."));
}
return configurationString;
}
}
......@@ -294,4 +294,16 @@ public abstract class AbstractSinkBuilder {
return tableName;
}
protected List<String> getPKList(Table table){
List<String> pks = new ArrayList<>();
if(Asserts.isNullCollection(table.getColumns())){
return pks;
}
for(Column column: table.getColumns()){
if(column.isKeyFlag()){
pks.add(column.getName());
}
}
return pks;
}
}
package com.dlink.cdc.sql;
import com.dlink.model.*;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
......@@ -115,7 +116,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
List<String> columnNameList) {
String sinkTableName = getSinkTableName(table);
String viewName="VIEW_"+table.getSchemaTableNameWithUnderline();
String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline();
customTableEnvironment.createTemporaryView(viewName, rowDataDataStream, StringUtils.join(columnNameList, ","));
customTableEnvironment.executeSql(getFlinkDDL(table, sinkTableName));
......@@ -233,6 +234,10 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
protected String getSinkConfigurationString(Table table) {
String configurationString = SqlUtil.replaceAllParam(config.getSinkConfigurationString(), "schemaName", getSinkSchemaName(table));
return SqlUtil.replaceAllParam(configurationString, "tableName", getSinkTableName(table));
configurationString = SqlUtil.replaceAllParam(configurationString, "tableName", getSinkTableName(table));
if (configurationString.contains("${pkList}")) {
configurationString = SqlUtil.replaceAllParam(configurationString, "pkList", StringUtils.join(getPKList(table), "."));
}
return configurationString;
}
}
......@@ -294,4 +294,16 @@ public abstract class AbstractSinkBuilder {
return tableName;
}
protected List<String> getPKList(Table table){
List<String> pks = new ArrayList<>();
if(Asserts.isNullCollection(table.getColumns())){
return pks;
}
for(Column column: table.getColumns()){
if(column.isKeyFlag()){
pks.add(column.getName());
}
}
return pks;
}
}
......@@ -237,6 +237,10 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
protected String getSinkConfigurationString(Table table) {
String configurationString = SqlUtil.replaceAllParam(config.getSinkConfigurationString(), "schemaName", getSinkSchemaName(table));
return SqlUtil.replaceAllParam(configurationString, "tableName", getSinkTableName(table));
configurationString = SqlUtil.replaceAllParam(configurationString, "tableName", getSinkTableName(table));
if (configurationString.contains("${pkList}")) {
configurationString = SqlUtil.replaceAllParam(configurationString, "pkList", StringUtils.join(getPKList(table), "."));
}
return configurationString;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment