privatestaticfinalConfigOption<String>DRIVER=ConfigOptions.key("driver").stringType().noDefaultValue().withDescription("The class name of the JDBC driver to use to connect to this URL. If not set, it will automatically be derived from the URL.");
publicstaticfinalConfigOption<Duration>MAX_RETRY_TIMEOUT=ConfigOptions.key("connection.max-retry-timeout").durationType().defaultValue(Duration.ofSeconds(60L)).withDescription("Maximum timeout between retries.");
privatestaticfinalConfigOption<String>SCAN_PARTITION_COLUMN=ConfigOptions.key("scan.partition.column").stringType().noDefaultValue().withDescription("The column name used for partitioning the input.");
privatestaticfinalConfigOption<Integer>SCAN_PARTITION_NUM=ConfigOptions.key("scan.partition.num").intType().noDefaultValue().withDescription("The number of partitions.");
privatestaticfinalConfigOption<Long>SCAN_PARTITION_LOWER_BOUND=ConfigOptions.key("scan.partition.lower-bound").longType().noDefaultValue().withDescription("The smallest value of the first partition.");
privatestaticfinalConfigOption<Long>SCAN_PARTITION_UPPER_BOUND=ConfigOptions.key("scan.partition.upper-bound").longType().noDefaultValue().withDescription("The largest value of the last partition.");
privatestaticfinalConfigOption<Integer>SCAN_FETCH_SIZE=ConfigOptions.key("scan.fetch-size").intType().defaultValue(0).withDescription("Gives the reader a hint as to the number of rows that should be fetched from the database per round-trip when reading. If the value is zero, this hint is ignored.");
privatestaticfinalConfigOption<Boolean>SCAN_AUTO_COMMIT=ConfigOptions.key("scan.auto-commit").booleanType().defaultValue(true).withDescription("Sets whether the driver is in auto-commit mode.");
privatestaticfinalConfigOption<Long>LOOKUP_CACHE_MAX_ROWS=ConfigOptions.key("lookup.cache.max-rows").longType().defaultValue(-1L).withDescription("The max number of rows of lookup cache, over this value, the oldest rows will be eliminated. \"cache.max-rows\" and \"cache.ttl\" options must all be specified if any of them is specified.");
privatestaticfinalConfigOption<Duration>LOOKUP_CACHE_TTL=ConfigOptions.key("lookup.cache.ttl").durationType().defaultValue(Duration.ofSeconds(10L)).withDescription("The cache time to live.");
privatestaticfinalConfigOption<Integer>LOOKUP_MAX_RETRIES=ConfigOptions.key("lookup.max-retries").intType().defaultValue(3).withDescription("The max retry times if lookup database failed.");
privatestaticfinalConfigOption<Integer>SINK_BUFFER_FLUSH_MAX_ROWS=ConfigOptions.key("sink.buffer-flush.max-rows").intType().defaultValue(100).withDescription("The flush max size (includes all append, upsert and delete records), over this number of records, will flush data.");
privatestaticfinalConfigOption<Duration>SINK_BUFFER_FLUSH_INTERVAL=ConfigOptions.key("sink.buffer-flush.interval").durationType().defaultValue(Duration.ofSeconds(1L)).withDescription("The flush interval mills, over this time, asynchronous threads will flush data.");
privatestaticfinalConfigOption<Integer>SINK_MAX_RETRIES=ConfigOptions.key("sink.max-retries").intType().defaultValue(3).withDescription("The max retry times if writing records to database failed.");
thrownewIllegalArgumentException(String.format("'%s'='%s' must not be larger than '%s'='%s'.",SCAN_PARTITION_LOWER_BOUND.key(),lowerBound,SCAN_PARTITION_UPPER_BOUND.key(),upperBound));
thrownewIllegalArgumentException(String.format("The value of '%s' option shouldn't be negative, but is %s.",LOOKUP_MAX_RETRIES.key(),config.get(LOOKUP_MAX_RETRIES)));
}elseif((Integer)config.get(SINK_MAX_RETRIES)<0){
thrownewIllegalArgumentException(String.format("The value of '%s' option shouldn't be negative, but is %s.",SINK_MAX_RETRIES.key(),config.get(SINK_MAX_RETRIES)));
thrownewIllegalArgumentException(String.format("The value of '%s' option must be in second granularity and shouldn't be smaller than 1 second, but is %s.",MAX_RETRY_TIMEOUT.key(),config.get(ConfigOptions.key(MAX_RETRY_TIMEOUT.key()).stringType().noDefaultValue())));
Preconditions.checkArgument(configOptions.length==presentCount||presentCount==0,"Either all or none of the following options should be provided:\n"+String.join("\n",propertyNames));
thrownewIllegalArgumentException("open() failed. Parameter "+i+" of type "+param.getClass()+" is not handled (yet).");
}
this.statement.setArray(i+1,(Array)param);
}
}
if(LOG.isDebugEnabled()){
LOG.debug(String.format("Executing '%s' with parameters %s",this.queryTemplate,Arrays.deepToString(this.parameterValues[inputSplit.getSplitNumber()])));
Preconditions.checkArgument(fetchSize==-2147483648||fetchSize>0,"Illegal value %s for fetchSize, has to be positive or Integer.MIN_VALUE.",newObject[]{fetchSize});