Commit 28890f5a authored by liaowenwu's avatar liaowenwu

添加日志

parent ba62a7c4
......@@ -9,8 +9,10 @@ import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.dsk.flink.dsc.utils.EnvProperties;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.*;
/**
......@@ -21,7 +23,8 @@ import java.util.*;
public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple3<String,String,Long>> {
private static final Map<String,Integer> STR_SQL_TYPE;
private EnvProperties dbInfoMap;
private final EnvProperties dbInfoMap;
private final OutputTag<Tuple6<String,String,String,String,String,Long>> logSlideTag;
static {
STR_SQL_TYPE = MapUtil.newHashMap();
......@@ -40,53 +43,9 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple
STR_SQL_TYPE.put("JSON",1);
}
public MysqlDataTransferFunction(EnvProperties envProps) {
public MysqlDataTransferFunction(EnvProperties envProps, OutputTag<Tuple6<String,String,String,String,String,Long>> logSlideTag) {
this.dbInfoMap = envProps;
}
private static String tranferInsertSql(String table, JSONObject dataObj, JSONObject mysqlType) {
Set<String> columnSet = mysqlType.keySet();
StringBuilder sb = new StringBuilder("REPLACE INTO ").append(table).append(" (");
List<String> valueList = new ArrayList<>();
for (String s : columnSet) {
sb.append("`").append(s).append("`,");
valueList.add(getValueString(dataObj, s, mysqlType.getString(s)));
}
sb.setLength(sb.length() - 1);
sb.append(") values (");
sb.append(String.join(",", valueList));
sb.append(")");
return sb.toString();
}
private static String transferDeleteSql(String table, JSONObject dataObj, JSONObject mysqlType, Set<String> pkNameSet) {
StringBuilder whereClauseBuilder = new StringBuilder();
for (String pk : pkNameSet) {
if (whereClauseBuilder.length() > 0) {
whereClauseBuilder.append(" and ");
}
whereClauseBuilder.append(pk).append(" = ").append(getValueString(dataObj, pk, mysqlType.getString(pk)));
}
return String.format("DELETE FROM %s WHERE %s",table, whereClauseBuilder);
}
private static String getValueString(JSONObject dataObj,String columnKey,String mysqlType){
if(null == dataObj.get(columnKey)){
return "null";
}
String upperCase = mysqlType.toUpperCase();
//需要处理成字符串加引号的类型
if(STR_SQL_TYPE.containsKey(upperCase)){
return String.format("'%s'", dataObj.getString(columnKey).replace("\\","\\\\").replace("'", "\\'") );
}
//时间字段处理
if("DATE".equals(upperCase) || "DATETIME".equals(upperCase)){
String date = "DATETIME".equals(upperCase) ? DateUtil.format(dataObj.getDate(columnKey),"yyyy-MM-dd HH:mm:ss") : DateUtil.format(dataObj.getDate(columnKey),"yyyy-MM-dd");
return String.format("\"%s\"",date);
}
return dataObj.getString(columnKey);
this.logSlideTag = logSlideTag;
}
@Override
......@@ -110,9 +69,13 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple
dataObj.put("is_del", "DELETE".equals(type) ? 1 : 0);
}
StringBuilder groupKeyBuilder = new StringBuilder(table);
StringBuilder pkColumns = new StringBuilder();
StringBuilder pkColumnVals = new StringBuilder();
for (String pk : pkNameSet) {
String pkValue = getValueString(dataObj, pk, mysqlType.getString(pk));
groupKeyBuilder.append("-").append(pkValue);
pkColumns.append(pk).append(",");
pkColumnVals.append(pkValue).append("-");
}
String groupKey = groupKeyBuilder.toString();
......@@ -135,10 +98,62 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple
} else {
excueteSql = transferDeleteSql(table,dataObj,mysqlType,pkNameSet);
}
if (MapUtil.getBool(dbInfoMap, "log_enable", false)){
ctx.output(logSlideTag,buildLogData(type, table, pkColumns, pkColumnVals, ts, value.toJSONString()));
}
out.collect(Tuple3.of(excueteSql,groupKey,ts));
/*if (MapUtil.getBool(dbInfoMap, "log_enable", false)){
String logSql = buildLogData(type, table, pkNameSet, dataObj, ts, value.toJSONString());
ctx.output(toSlideTag,Tuple3.of(logSql,"dsc_cdc_log",ts));
}*/
}
private static Tuple6<String,String,String,String,String,Long> buildLogData(String type, String table, StringBuilder pkColumns, StringBuilder pkValues, long ts, String dataJsonStr) {
if (pkColumns.length() > 0) {
pkColumns.setLength(pkColumns.length()-1);
pkValues.setLength(pkValues.length()-1);
}
return Tuple6.of(table,type, pkColumns.toString(), pkValues.toString(),dataJsonStr.replace("\\","\\\\").replace("'", "\\'"),ts);
}
private static String tranferInsertSql(String table, JSONObject dataObj, JSONObject mysqlType) {
Set<String> columnSet = mysqlType.keySet();
StringBuilder sb = new StringBuilder("REPLACE INTO ").append(table).append(" (");
List<String> valueList = new ArrayList<>();
for (String s : columnSet) {
sb.append("`").append(s).append("`,");
valueList.add(getValueString(dataObj, s, mysqlType.getString(s)));
}
sb.setLength(sb.length() - 1);
sb.append(") values (");
sb.append(String.join(",", valueList));
sb.append(")");
return sb.toString();
}
private static String transferDeleteSql(String table, JSONObject dataObj, JSONObject mysqlType, Set<String> pkNameSet) {
StringBuilder whereClauseBuilder = new StringBuilder();
for (String pk : pkNameSet) {
if (whereClauseBuilder.length() > 0) {
whereClauseBuilder.append(" and ");
}
whereClauseBuilder.append(pk).append(" = ").append(getValueString(dataObj, pk, mysqlType.getString(pk)));
}
return String.format("DELETE FROM %s WHERE %s",table, whereClauseBuilder);
}
private static String getValueString(JSONObject dataObj,String columnKey,String mysqlType){
if(null == dataObj.get(columnKey)){
return "null";
}
String upperCase = mysqlType.toUpperCase();
//需要处理成字符串加引号的类型
if(STR_SQL_TYPE.containsKey(upperCase)){
return String.format("'%s'", dataObj.getString(columnKey).replace("\\","\\\\").replace("'", "\\'") );
}
//时间字段处理
if("DATE".equals(upperCase) || "DATETIME".equals(upperCase)){
String date = "DATETIME".equals(upperCase) ? DateUtil.format(dataObj.getDate(columnKey),"yyyy-MM-dd HH:mm:ss") : DateUtil.format(dataObj.getDate(columnKey),"yyyy-MM-dd");
return String.format("\"%s\"",date);
}
return dataObj.getString(columnKey);
}
}
......@@ -12,8 +12,13 @@ import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
......@@ -24,6 +29,7 @@ import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.time.LocalDateTime;
import java.time.ZoneId;
......@@ -55,7 +61,6 @@ public class SyncCustomerDataSource {
String offsetTimestamp = parameterTool.get("offsetTimestamp");
String propertiesPath = parameterTool.get("propertiesPath");
EnvProperties envProps = EnvPropertiesUtil.getPropertiesFromArgsPath(propertiesPath);
//envProps.put("providerImpl", JdbcConnectionProviderFactory.HikariDataSourceJdbcConnectionProvider.class.getName());
System.out.println("读取到的配置文件:-> " + envProps.toString());
System.out.println("读取到的数据连接配置:->" + String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database()));
System.out.println("获取到的kafka消费组:->" + EtlUtils.getKafkaGroup(envProps));
......@@ -87,9 +92,10 @@ public class SyncCustomerDataSource {
.uid("dsc-source");
//tsGroupStream.print("source==>");
OutputTag<Tuple6<String,String,String,String,String,Long>> logSlideTag = new OutputTag<Tuple6<String,String,String,String,String,Long>>("log_slide") {};
SingleOutputStreamOperator<Tuple3<String, String, Long>> slide = tsGroupStream
.process(new MysqlDataTransferFunction(envProps))
.process(new MysqlDataTransferFunction(envProps,logSlideTag))
.name("dsc-sql")
.uid("dsc-sql");
......@@ -118,11 +124,40 @@ public class SyncCustomerDataSource {
.uid("dsc-max");
//groupWindowSqlResultStream.print("dsc-max==>");
groupWindowSqlResultStream.addSink(new MysqlDataTransferSinkBatch(envProps))
.name("dsc-sink")
.uid("dsc-sink");
DataStream<Tuple6<String,String,String,String,String,Long>> sideOutput = slide.getSideOutput(logSlideTag);
sideOutput.addSink(JdbcSink.sink(
"INSERT INTO dsc_cdc_log (`table`,op_type,pk_columns,pk_values,data_json,cdc_ts) values (?,?,?,?,?,?)",
(ps,t) -> {
ps.setString(1,t.f0);
ps.setString(2,t.f1);
ps.setString(3,t.f2);
ps.setString(4,t.f3);
ps.setString(5,t.f4);
ps.setLong(6,t.f5);
},
JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.withMaxRetries(3)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withDriverName("com.mysql.cj.jdbc.Driver")
.withUrl(getSinkUrl(envProps))
.withUsername(envProps.getDb_username())
.withPassword(envProps.getDb_password())
.build()
)).uid("deleteProject")
.name("deleteProject");
env.execute("dsc-client");
}
private static String getSinkUrl(EnvProperties envProps) {
return String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database());
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment