Commit e2cad0bc authored by liaowenwu's avatar liaowenwu

优化代码

parent 33761772
...@@ -5,11 +5,11 @@ import cn.hutool.core.date.DateUtil; ...@@ -5,11 +5,11 @@ import cn.hutool.core.date.DateUtil;
import cn.hutool.core.map.MapUtil; import cn.hutool.core.map.MapUtil;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.dsk.flink.dsc.utils.EnvProperties;
import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
import java.text.SimpleDateFormat;
import java.util.*; import java.util.*;
/** /**
* @author lww * @author lww
...@@ -18,6 +18,7 @@ import java.util.*; ...@@ -18,6 +18,7 @@ import java.util.*;
public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple3<String,String,Long>> { public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple3<String,String,Long>> {
private static final Map<String,Integer> STR_SQL_TYPE; private static final Map<String,Integer> STR_SQL_TYPE;
private EnvProperties dbInfoMap;
static { static {
STR_SQL_TYPE = MapUtil.newHashMap(); STR_SQL_TYPE = MapUtil.newHashMap();
...@@ -36,6 +37,12 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple ...@@ -36,6 +37,12 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple
STR_SQL_TYPE.put("JSON",1); STR_SQL_TYPE.put("JSON",1);
} }
public MysqlDataTransferFunction(EnvProperties envProps) {
this.dbInfoMap = envProps;
}
private static String tranferInsertSql(String table, JSONObject dataObj, JSONObject mysqlType) { private static String tranferInsertSql(String table, JSONObject dataObj, JSONObject mysqlType) {
Set<String> columnSet = mysqlType.keySet(); Set<String> columnSet = mysqlType.keySet();
StringBuilder sb = new StringBuilder("REPLACE INTO ").append(table).append(" ("); StringBuilder sb = new StringBuilder("REPLACE INTO ").append(table).append(" (");
...@@ -105,11 +112,11 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple ...@@ -105,11 +112,11 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple
String excueteSql; String excueteSql;
JSONObject dataObj = value.getJSONArray("data").getJSONObject(0); JSONObject dataObj = value.getJSONArray("data").getJSONObject(0);
/*Boolean logicalDelete = MapUtil.getBool(dbInfoMap, "logical_delete", false); Boolean logicalDelete = MapUtil.getBool(dbInfoMap, "logical_delete", false);
if(logicalDelete){ if(logicalDelete){
mysqlType.put("is_del", "int"); mysqlType.put("is_del", "int");
dataObj.put("is_del", "DELETE".equals(type) ? 1 : 0); dataObj.put("is_del", "DELETE".equals(type) ? 1 : 0);
}*/ }
//处理先后顺序 //处理先后顺序
//获取该条数据的表名和主键作为唯一的groupKey //获取该条数据的表名和主键作为唯一的groupKey
...@@ -120,6 +127,10 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple ...@@ -120,6 +127,10 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple
} }
String groupKey = groupKeyBuilder.toString(); String groupKey = groupKeyBuilder.toString();
//添加分表参数
//String shardingRule = dataObj.getString("shardingRule");
//String shardedTable = dbInfoMap.getSharded_table();
if("INSERT".equals(type) || "UPDATE".equals(type)){ if("INSERT".equals(type) || "UPDATE".equals(type)){
excueteSql = tranferInsertSql(table,dataObj,mysqlType); excueteSql = tranferInsertSql(table,dataObj,mysqlType);
} else { } else {
......
...@@ -89,7 +89,7 @@ public class SyncCustomerDataSource { ...@@ -89,7 +89,7 @@ public class SyncCustomerDataSource {
//tsGroupStream.print("source==>"); //tsGroupStream.print("source==>");
SingleOutputStreamOperator<Tuple3<String, String, Long>> slide = tsGroupStream SingleOutputStreamOperator<Tuple3<String, String, Long>> slide = tsGroupStream
.process(new MysqlDataTransferFunction()) .process(new MysqlDataTransferFunction(envProps))
.name("dsc-sql") .name("dsc-sql")
.uid("dsc-sql"); .uid("dsc-sql");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment