Commit b366e456 authored by liaowenwu's avatar liaowenwu

修改kafka连接

parent 0ef6a5f8
...@@ -16,10 +16,7 @@ import org.slf4j.LoggerFactory; ...@@ -16,10 +16,7 @@ import org.slf4j.LoggerFactory;
import java.sql.Connection; import java.sql.Connection;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.util.Date; import java.util.Date;
import java.util.concurrent.ExecutorService; import java.util.concurrent.*;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MysqlDataTransferSink extends RichSinkFunction<String> { public class MysqlDataTransferSink extends RichSinkFunction<String> {
...@@ -29,6 +26,7 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> { ...@@ -29,6 +26,7 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> {
private transient DruidDataSource dataSource; private transient DruidDataSource dataSource;
private static final int MAX_RETRIES = 3; // 最大重试次数 private static final int MAX_RETRIES = 3; // 最大重试次数
private static final int RETRY_DELAY_MS = 100; // 重试间隔时间 private static final int RETRY_DELAY_MS = 100; // 重试间隔时间
private static final int QUEUE_CAPACITY = 1000;
public MysqlDataTransferSink(EnvProperties envProps) { public MysqlDataTransferSink(EnvProperties envProps) {
this.envProps = envProps; this.envProps = envProps;
...@@ -36,7 +34,10 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> { ...@@ -36,7 +34,10 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> {
@Override @Override
public void open(Configuration parameters) throws Exception { public void open(Configuration parameters) throws Exception {
executorService = new ThreadPoolExecutor(8, 20, 20, TimeUnit.MINUTES, new LinkedBlockingDeque<>()); executorService = new ThreadPoolExecutor(8, 20, 60,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(QUEUE_CAPACITY),
new ThreadPoolExecutor.CallerRunsPolicy());
//初始化获取配置 //初始化获取配置
String configTidbUrl = String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database()); String configTidbUrl = String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database());
//System.out.println(configTidbUrl); //System.out.println(configTidbUrl);
...@@ -54,31 +55,27 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> { ...@@ -54,31 +55,27 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> {
@Override @Override
public void close() throws Exception { public void close() throws Exception {
try {
if (executorService != null) {
executorService.shutdown(); executorService.shutdown();
if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
}
} finally {
if (dataSource != null) {
dataSource.close(); dataSource.close();
} }
}
}
@Override @Override
public void invoke(String value, Context context) throws Exception { public void invoke(String value, Context context) throws Exception {
executorService.execute(() -> { executorService.submit(() -> {
executeSqlWithRetry(value); executeSqlWithRetry(value);
}); });
} }
/*private void executeSql(String sql){
Connection connection = null;
try {
connection = dataSource.getConnection();
SqlExecutor.execute(connection,sql);
} catch (Exception e) {
//logger.error("------错误时间:{}-----,sql:{}--------异常:{}", DateUtil.now(),sql,e.getMessage());
logger.error("异常信息:",e);
SqlErrorLog errorLog = new SqlErrorLog(new Date(), sql, e.getMessage());
writeErrLogDb(errorLog);
} finally {
DbUtil.close(connection);
}
}*/
private void executeSqlWithRetry(String sql) { private void executeSqlWithRetry(String sql) {
int retries = 0; int retries = 0;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment