Commit 0a0fd085 authored by liaowenwu's avatar liaowenwu

添加日志

parent fc995aee
...@@ -30,8 +30,9 @@ public class MysqlDataTransferSinkBatch extends RichSinkFunction<String> { ...@@ -30,8 +30,9 @@ public class MysqlDataTransferSinkBatch extends RichSinkFunction<String> {
EnvProperties envProps; EnvProperties envProps;
private static transient ExecutorService executorService; private static transient ExecutorService executorService;
private static transient DruidDataSource dataSource; private static transient DruidDataSource dataSource;
private static final int BATCH_SIZE = 1000; private static final int BATCH_SIZE = 2000;
private static final int FLUSH_INTERVAL = 500; private static final int FLUSH_INTERVAL = 500;
private static final int INIT_CAPACITY = (int)(BATCH_SIZE / 0.75 + 1);
private ArrayBlockingQueue<String> sqlBatch = new ArrayBlockingQueue<>(BATCH_SIZE * 2); private ArrayBlockingQueue<String> sqlBatch = new ArrayBlockingQueue<>(BATCH_SIZE * 2);
private static transient ScheduledExecutorService scheduledExecutorService; private static transient ScheduledExecutorService scheduledExecutorService;
private AtomicBoolean flushing = new AtomicBoolean(false); private AtomicBoolean flushing = new AtomicBoolean(false);
...@@ -42,7 +43,7 @@ public class MysqlDataTransferSinkBatch extends RichSinkFunction<String> { ...@@ -42,7 +43,7 @@ public class MysqlDataTransferSinkBatch extends RichSinkFunction<String> {
@Override @Override
public void open(Configuration parameters) throws Exception { public void open(Configuration parameters) throws Exception {
executorService = new ThreadPoolExecutor(10, 10, 20, TimeUnit.MINUTES, new ArrayBlockingQueue<>(1000)); executorService = new ThreadPoolExecutor(15, 20, 20, TimeUnit.MINUTES, new ArrayBlockingQueue<>(1000));
// 初始化获取配置 // 初始化获取配置
String configTidbUrl = String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database()); String configTidbUrl = String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database());
dataSource = new DruidDataSource(); dataSource = new DruidDataSource();
...@@ -50,8 +51,8 @@ public class MysqlDataTransferSinkBatch extends RichSinkFunction<String> { ...@@ -50,8 +51,8 @@ public class MysqlDataTransferSinkBatch extends RichSinkFunction<String> {
dataSource.setUsername(envProps.getDb_username()); dataSource.setUsername(envProps.getDb_username());
dataSource.setPassword(envProps.getDb_password()); dataSource.setPassword(envProps.getDb_password());
dataSource.setUrl(configTidbUrl); dataSource.setUrl(configTidbUrl);
dataSource.setMaxActive(30); dataSource.setMaxActive(50);
dataSource.setInitialSize(20); dataSource.setInitialSize(30);
dataSource.setTestWhileIdle(true); dataSource.setTestWhileIdle(true);
dataSource.setMaxWait(20000); dataSource.setMaxWait(20000);
dataSource.setValidationQuery("select 1"); dataSource.setValidationQuery("select 1");
...@@ -81,7 +82,7 @@ public class MysqlDataTransferSinkBatch extends RichSinkFunction<String> { ...@@ -81,7 +82,7 @@ public class MysqlDataTransferSinkBatch extends RichSinkFunction<String> {
flushing.set(false); flushing.set(false);
return; return;
} }
List<String> batch = new ArrayList<>(1334); List<String> batch = new ArrayList<>(INIT_CAPACITY);
sqlBatch.drainTo(batch, BATCH_SIZE); sqlBatch.drainTo(batch, BATCH_SIZE);
if (batch.isEmpty()) { if (batch.isEmpty()) {
flushing.set(false); flushing.set(false);
...@@ -103,7 +104,8 @@ public class MysqlDataTransferSinkBatch extends RichSinkFunction<String> { ...@@ -103,7 +104,8 @@ public class MysqlDataTransferSinkBatch extends RichSinkFunction<String> {
logger.error("事务回滚异常", ex); logger.error("事务回滚异常", ex);
} }
logger.error("------错误时间:{}-------------异常:", new Date(), e); logger.error("------错误时间:{}-------------异常:", new Date(), e);
SqlErrorLog errorLog = new SqlErrorLog(new Date(), String.join(";", batch), e.getMessage()); //SqlErrorLog errorLog = new SqlErrorLog(new Date(), String.join(";", batch), e.getMessage());
SqlErrorLog errorLog = new SqlErrorLog(new Date(), batch.get(0), e.getMessage());
writeErrLogDb(errorLog); writeErrLogDb(errorLog);
} }
} catch (Exception e) { } catch (Exception e) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment