Commit 88805488 authored by liaowenwu's avatar liaowenwu

修改kafka连接

parent b366e456
......@@ -30,6 +30,7 @@ public class MysqlDataSlideSink extends RichSinkFunction<Tuple6<String,String,St
private transient DruidDataSource dataSource;
private static final int MAX_RETRIES = 3; // 最大重试次数
private static final int RETRY_DELAY_MS = 100; // 重试间隔时间
private static final int QUEUE_CAPACITY = 1000;
private static final String SQL = "INSERT INTO dsc_cdc_log (`table`,op_type,pk_columns,pk_values,data_json,cdc_ts) values (?,?,?,?,?,?)";
public MysqlDataSlideSink(EnvProperties envProps) {
......@@ -38,7 +39,10 @@ public class MysqlDataSlideSink extends RichSinkFunction<Tuple6<String,String,St
@Override
public void open(Configuration parameters) throws Exception {
executorService = new ThreadPoolExecutor(5, 5, 20, TimeUnit.MINUTES, new LinkedBlockingDeque<>());
executorService = new ThreadPoolExecutor(8, 20, 60,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(QUEUE_CAPACITY),
new ThreadPoolExecutor.CallerRunsPolicy());
//初始化获取配置
String configTidbUrl = String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database());
//System.out.println(configTidbUrl);
......@@ -56,8 +60,18 @@ public class MysqlDataSlideSink extends RichSinkFunction<Tuple6<String,String,St
@Override
public void close() throws Exception {
executorService.shutdown();
dataSource.close();
try {
if (executorService != null) {
executorService.shutdown();
if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
}
} finally {
if (dataSource != null) {
dataSource.close();
}
}
}
@Override
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment