Commit 2ee59f1c authored by gaogao110

Add flink connector phoenix 1.14

parent 6833b22a
...
@@ -143,14 +143,15 @@ public class PhoenixInputFormat extends RichInputFormat<Row, InputSplit>
             dbConn.setAutoCommit(autoCommit);
         }
-        LOG.info("openInputFormat query :" +queryTemplate);
-        LOG.info("openInputFormat resultSetType :" +resultSetType);
-        LOG.info("openInputFormat resultSetConcurrency :" +resultSetConcurrency);
+        LOG.debug("openInputFormat query :" +queryTemplate);
+        // strip the ` backtick character, which Phoenix does not support
         String initQuery = StringUtils.remove(queryTemplate, "\\`");
-        LOG.info("openInputFormat initQuery :" +initQuery);
-        statement = dbConn.prepareStatement(initQuery, resultSetType, resultSetConcurrency);
+        LOG.debug("openInputFormat initQuery :" +initQuery);
+        // replace " double quotes with ' single quotes
+        String replaceQuery = StringUtils.replace(initQuery, "\"", "'");
+        LOG.info("openInputFormat replaceQuery :" +replaceQuery);
+        statement = dbConn.prepareStatement(replaceQuery, resultSetType, resultSetConcurrency);
         if (fetchSize == Integer.MIN_VALUE || fetchSize > 0) {
             statement.setFetchSize(fetchSize);
         }
...
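The hunk above normalizes the planner-generated query before it is prepared: backticks are stripped because Phoenix cannot parse them, and double quotes are rewritten to single quotes so string literals use Phoenix-compatible quoting. A minimal, self-contained sketch of those two string operations, assuming a commons-lang3 StringUtils (the connector may import a different StringUtils) and a made-up query:

import org.apache.commons.lang3.StringUtils;

public class QueryNormalizationSketch {
    public static void main(String[] args) {
        // hypothetical planner output: backtick-quoted identifiers and a double-quoted literal
        String queryTemplate = "SELECT `ID`, `NAME` FROM `USER` WHERE `NAME` = \"bob\"";
        // drop backticks, which Phoenix cannot parse
        String initQuery = StringUtils.remove(queryTemplate, "`");
        // rewrite double quotes to single quotes for Phoenix string literals
        String replaceQuery = StringUtils.replace(initQuery, "\"", "'");
        System.out.println(replaceQuery);
        // prints: SELECT ID, NAME FROM USER WHERE NAME = 'bob'
    }
}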
...
@@ -25,6 +25,12 @@ public class PhoenixDialect extends AbstractDialect {
     private static final int MAX_DECIMAL_PRECISION = 65;
     private static final int MIN_DECIMAL_PRECISION = 1;
+    @Override
+    public String getSelectFromStatement(String tableName, String[] selectFields, String[] conditionFields) {
+        return null;
+    }
+
     @Override
     public boolean canHandle(String url) {
         return url.startsWith("jdbc:phoenix:");
...
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dlink-connectors</artifactId>
<groupId>com.dlink</groupId>
<version>0.6.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dlink-connector-phoenix-1.14</artifactId>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.14.3</flink.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<junit.version>4.12</junit.version>
<!-- compile/provided-->
<phoenix.scope.runtime>provided</phoenix.scope.runtime>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>${phoenix.scope.runtime}</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
<scope>${phoenix.scope.runtime}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- A planner dependency won't be necessary once FLIP-32 has been completed. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<!-- <dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-core</artifactId>
<version>4.14.2-HBase-1.4</version>
<scope>${scope.runtime}</scope>
</dependency>-->
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-core</artifactId>
<version>5.0.0-HBase-2.0</version>
<scope>${scope.runtime}</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.Optional;
/** JDBC connection options. */
@PublicEvolving
public class JdbcConnectionOptions implements Serializable {
private static final long serialVersionUID = 1L;
protected final String url;
@Nullable protected final String driverName;
protected final int connectionCheckTimeoutSeconds;
@Nullable protected final String username;
@Nullable protected final String password;
protected JdbcConnectionOptions(
String url,
String driverName,
String username,
String password,
int connectionCheckTimeoutSeconds) {
Preconditions.checkArgument(connectionCheckTimeoutSeconds > 0);
this.url = Preconditions.checkNotNull(url, "jdbc url is empty");
this.driverName = driverName;
this.username = username;
this.password = password;
this.connectionCheckTimeoutSeconds = connectionCheckTimeoutSeconds;
}
public String getDbURL() {
return url;
}
@Nullable
public String getDriverName() {
return driverName;
}
public Optional<String> getUsername() {
return Optional.ofNullable(username);
}
public Optional<String> getPassword() {
return Optional.ofNullable(password);
}
public int getConnectionCheckTimeoutSeconds() {
return connectionCheckTimeoutSeconds;
}
/** Builder for {@link JdbcConnectionOptions}. */
public static class JdbcConnectionOptionsBuilder {
private String url;
private String driverName;
private String username;
private String password;
private int connectionCheckTimeoutSeconds = 60;
public JdbcConnectionOptionsBuilder withUrl(String url) {
this.url = url;
return this;
}
public JdbcConnectionOptionsBuilder withDriverName(String driverName) {
this.driverName = driverName;
return this;
}
public JdbcConnectionOptionsBuilder withUsername(String username) {
this.username = username;
return this;
}
public JdbcConnectionOptionsBuilder withPassword(String password) {
this.password = password;
return this;
}
/**
* Set the timeout used when checking connection validity; the default is 60 seconds.
*
* @param connectionCheckTimeoutSeconds the timeout in seconds; should not be smaller than 1
* second.
*/
public JdbcConnectionOptionsBuilder withConnectionCheckTimeoutSeconds(
int connectionCheckTimeoutSeconds) {
this.connectionCheckTimeoutSeconds = connectionCheckTimeoutSeconds;
return this;
}
public JdbcConnectionOptions build() {
return new JdbcConnectionOptions(
url, driverName, username, password, connectionCheckTimeoutSeconds);
}
}
}
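For reference, a hedged usage sketch of the options builder above; the URL, driver name, and timeout values are illustrative placeholders, not something this commit mandates:

import org.apache.flink.connector.phoenix.JdbcConnectionOptions;

public class ConnectionOptionsExample {
    public static void main(String[] args) {
        JdbcConnectionOptions options =
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:phoenix:zk-host:2181")                      // placeholder quorum
                        .withDriverName("org.apache.phoenix.jdbc.PhoenixDriver")
                        .withConnectionCheckTimeoutSeconds(30)
                        .build();
        System.out.println(options.getDbURL());
        System.out.println(options.getDriverName());
    }
}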
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.util.Preconditions;
import java.io.Serializable;
import java.util.Optional;
/**
* JDBC exactly once sink options.
*
* <p><b>maxCommitAttempts</b> - maximum number of commit attempts to make per transaction; must be
* > 0; state size is proportional to the product of max number of in-flight snapshots and this
* number.
*
* <p><b>allowOutOfOrderCommits</b> - If true, an attempt will be made to commit all prepared
* transactions regardless of any transient failures during the operation. This may lead to
* inconsistency. Default: false.
*
* <p><b>recoveredAndRollback</b> - whether to rollback prepared transactions known to XA RM on
* startup (after committing <b>known</b> transactions, i.e. restored from state).
*
* <p>NOTE that setting this parameter to true may:
*
* <ol>
* <li>interfere with other subtasks or applications (one subtask rolling back transactions
* prepared by the other one (and known to it))
* <li>block when using with some non-MVCC databases, if there are ended-not-prepared transactions
* </ol>
*
* <p>See also {@link org.apache.flink.connector.jdbc.xa.XaFacade#recover()}
*/
@PublicEvolving
public class JdbcExactlyOnceOptions implements Serializable {
private static final boolean DEFAULT_RECOVERED_AND_ROLLBACK = true;
private static final int DEFAULT_MAX_COMMIT_ATTEMPTS = 3;
private static final boolean DEFAULT_ALLOW_OUT_OF_ORDER_COMMITS = false;
public static final boolean DEFAULT_TRANSACTION_PER_CONNECTION = false;
private final boolean discoverAndRollbackOnRecovery;
private final int maxCommitAttempts;
private final boolean allowOutOfOrderCommits;
private final Integer timeoutSec;
private final boolean transactionPerConnection;
private JdbcExactlyOnceOptions(
boolean discoverAndRollbackOnRecovery,
int maxCommitAttempts,
boolean allowOutOfOrderCommits,
Optional<Integer> timeoutSec,
boolean transactionPerConnection) {
this.discoverAndRollbackOnRecovery = discoverAndRollbackOnRecovery;
this.maxCommitAttempts = maxCommitAttempts;
this.allowOutOfOrderCommits = allowOutOfOrderCommits;
this.timeoutSec = timeoutSec.orElse(null);
this.transactionPerConnection = transactionPerConnection;
Preconditions.checkArgument(this.maxCommitAttempts > 0, "maxCommitAttempts should be > 0");
}
public static JdbcExactlyOnceOptions defaults() {
return builder().build();
}
public boolean isDiscoverAndRollbackOnRecovery() {
return discoverAndRollbackOnRecovery;
}
public boolean isAllowOutOfOrderCommits() {
return allowOutOfOrderCommits;
}
public int getMaxCommitAttempts() {
return maxCommitAttempts;
}
public Integer getTimeoutSec() {
return timeoutSec;
}
public boolean isTransactionPerConnection() {
return transactionPerConnection;
}
public static JDBCExactlyOnceOptionsBuilder builder() {
return new JDBCExactlyOnceOptionsBuilder();
}
/** JDBCExactlyOnceOptionsBuilder. */
public static class JDBCExactlyOnceOptionsBuilder {
private boolean recoveredAndRollback = DEFAULT_RECOVERED_AND_ROLLBACK;
private int maxCommitAttempts = DEFAULT_MAX_COMMIT_ATTEMPTS;
private boolean allowOutOfOrderCommits = DEFAULT_ALLOW_OUT_OF_ORDER_COMMITS;
private Optional<Integer> timeoutSec = Optional.empty();
private boolean transactionPerConnection = DEFAULT_TRANSACTION_PER_CONNECTION;
/**
* Toggle discovery and rollback of prepared transactions upon recovery to prevent new
* transactions from being blocked by the older ones. Each subtask rolls back its own
* transactions. This flag must be disabled when rescaling to prevent data loss.
*/
public JDBCExactlyOnceOptionsBuilder withRecoveredAndRollback(
boolean recoveredAndRollback) {
this.recoveredAndRollback = recoveredAndRollback;
return this;
}
/**
* Set the number of attempts to commit a transaction (takes effect only if a transient
* failure happens).
*/
public JDBCExactlyOnceOptionsBuilder withMaxCommitAttempts(int maxCommitAttempts) {
this.maxCommitAttempts = maxCommitAttempts;
return this;
}
/**
* Set whether transactions may be committed out of order when commits have to be retried.
*/
public JDBCExactlyOnceOptionsBuilder withAllowOutOfOrderCommits(
boolean allowOutOfOrderCommits) {
this.allowOutOfOrderCommits = allowOutOfOrderCommits;
return this;
}
/** Set transaction timeout in seconds (vendor-specific). */
public JDBCExactlyOnceOptionsBuilder withTimeoutSec(Optional<Integer> timeoutSec) {
this.timeoutSec = timeoutSec;
return this;
}
/**
* Set whether the same connection can be used for multiple XA transactions. A transaction
* is prepared each time a checkpoint is performed; it is committed once the checkpoint is
* confirmed. There can be multiple un-confirmed checkpoints and therefore multiple prepared
* transactions.
*
* <p>Some databases support this natively (e.g. Oracle); while others only allow a single
* XA transaction per connection (e.g. MySQL, PostgreSQL).
*
* <p>If enabled, each transaction uses a separate connection from a pool. The database
* limit of open connections might need to be adjusted.
*
* <p>Disabled by default.
*/
public JDBCExactlyOnceOptionsBuilder withTransactionPerConnection(
boolean transactionPerConnection) {
this.transactionPerConnection = transactionPerConnection;
return this;
}
public JdbcExactlyOnceOptions build() {
return new JdbcExactlyOnceOptions(
recoveredAndRollback,
maxCommitAttempts,
allowOutOfOrderCommits,
timeoutSec,
transactionPerConnection);
}
}
}
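A usage sketch of the exactly-once options builder above. The class is carried over from the upstream Flink JDBC connector; whether the Phoenix sink actually wires it into an XA sink is not shown in this commit, and the values below are purely illustrative:

import java.util.Optional;

import org.apache.flink.connector.phoenix.JdbcExactlyOnceOptions;

public class ExactlyOnceOptionsExample {
    public static void main(String[] args) {
        JdbcExactlyOnceOptions options =
                JdbcExactlyOnceOptions.builder()
                        .withMaxCommitAttempts(5)            // retry a commit up to 5 times
                        .withAllowOutOfOrderCommits(false)   // keep the safe default
                        .withTimeoutSec(Optional.of(300))    // vendor-specific XA timeout
                        .withTransactionPerConnection(true)  // one connection per open transaction
                        .build();
        System.out.println(options.getMaxCommitAttempts());
    }
}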
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.util.Preconditions;
import java.io.Serializable;
import java.util.Objects;
/** JDBC sink batch options. */
@PublicEvolving
public class JdbcExecutionOptions implements Serializable {
public static final int DEFAULT_MAX_RETRY_TIMES = 3;
private static final int DEFAULT_INTERVAL_MILLIS = 0;
public static final int DEFAULT_SIZE = 5000;
private final long batchIntervalMs;
private final int batchSize;
private final int maxRetries;
private JdbcExecutionOptions(long batchIntervalMs, int batchSize, int maxRetries) {
Preconditions.checkArgument(maxRetries >= 0);
this.batchIntervalMs = batchIntervalMs;
this.batchSize = batchSize;
this.maxRetries = maxRetries;
}
public long getBatchIntervalMs() {
return batchIntervalMs;
}
public int getBatchSize() {
return batchSize;
}
public int getMaxRetries() {
return maxRetries;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
JdbcExecutionOptions that = (JdbcExecutionOptions) o;
return batchIntervalMs == that.batchIntervalMs
&& batchSize == that.batchSize
&& maxRetries == that.maxRetries;
}
@Override
public int hashCode() {
return Objects.hash(batchIntervalMs, batchSize, maxRetries);
}
public static Builder builder() {
return new Builder();
}
public static JdbcExecutionOptions defaults() {
return builder().build();
}
/** Builder for {@link JdbcExecutionOptions}. */
public static final class Builder {
private long intervalMs = DEFAULT_INTERVAL_MILLIS;
private int size = DEFAULT_SIZE;
private int maxRetries = DEFAULT_MAX_RETRY_TIMES;
public Builder withBatchSize(int size) {
this.size = size;
return this;
}
public Builder withBatchIntervalMs(long intervalMs) {
this.intervalMs = intervalMs;
return this;
}
public Builder withMaxRetries(int maxRetries) {
this.maxRetries = maxRetries;
return this;
}
public JdbcExecutionOptions build() {
return new JdbcExecutionOptions(intervalMs, size, maxRetries);
}
}
}
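A usage sketch of the batching options above; the numbers are arbitrary examples:

import org.apache.flink.connector.phoenix.JdbcExecutionOptions;

public class ExecutionOptionsExample {
    public static void main(String[] args) {
        // flush every 1000 rows or every 2 seconds, retry a failed batch up to 3 times
        JdbcExecutionOptions batchOptions =
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(2000)
                        .withMaxRetries(3)
                        .build();
        System.out.println(batchOptions.equals(JdbcExecutionOptions.defaults()));  // false
    }
}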
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.phoenix.internal.executor.JdbcBatchStatementExecutor;
import org.apache.flink.util.function.BiConsumerWithException;
import java.io.Serializable;
import java.sql.PreparedStatement;
import java.sql.SQLException;
/**
* Sets {@link PreparedStatement} parameters to use in JDBC Sink based on a specific type of
* StreamRecord.
*
* @param <T> type of payload in {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord
* StreamRecord}
* @see JdbcBatchStatementExecutor
*/
@PublicEvolving
public interface JdbcStatementBuilder<T>
extends BiConsumerWithException<PreparedStatement, T, SQLException>, Serializable {}
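Because the interface only adds Serializable on top of BiConsumerWithException, a statement builder is normally written as a lambda. A sketch for a hypothetical (id, name) record; the User type and the column order are made up for illustration:

import java.sql.PreparedStatement;

import org.apache.flink.connector.phoenix.JdbcStatementBuilder;

public class StatementBuilderExample {
    // hypothetical payload type, only for illustration
    static final class User {
        final long id;
        final String name;
        User(long id, String name) { this.id = id; this.name = name; }
    }

    // fills the positional parameters of e.g. "UPSERT INTO USER(ID, NAME) VALUES (?, ?)"
    static final JdbcStatementBuilder<User> USER_SETTER = (PreparedStatement ps, User u) -> {
        ps.setLong(1, u.id);
        ps.setString(2, u.name);
    };
}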
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.dialect;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarBinaryType;
import java.util.List;
abstract class AbstractDialect implements JdbcDialect {
@Override
public void validate(TableSchema schema) throws ValidationException {
for (int i = 0; i < schema.getFieldCount(); i++) {
DataType dt = schema.getFieldDataType(i).get();
String fieldName = schema.getFieldName(i).get();
// TODO: We can't convert VARBINARY(n) data type to
// PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in
// LegacyTypeInfoDataTypeConverter
// when n is smaller than Integer.MAX_VALUE
if (unsupportedTypes().contains(dt.getLogicalType().getTypeRoot())
|| (dt.getLogicalType() instanceof VarBinaryType
&& Integer.MAX_VALUE
!= ((VarBinaryType) dt.getLogicalType()).getLength())) {
throw new ValidationException(
String.format(
"The %s dialect doesn't support type: %s.",
dialectName(), dt.toString()));
}
// only validate precision of DECIMAL type for blink planner
if (dt.getLogicalType() instanceof DecimalType) {
int precision = ((DecimalType) dt.getLogicalType()).getPrecision();
if (precision > maxDecimalPrecision() || precision < minDecimalPrecision()) {
throw new ValidationException(
String.format(
"The precision of field '%s' is out of the DECIMAL "
+ "precision range [%d, %d] supported by %s dialect.",
fieldName,
minDecimalPrecision(),
maxDecimalPrecision(),
dialectName()));
}
}
// only validate precision of TIMESTAMP type for blink planner
if (dt.getLogicalType() instanceof TimestampType) {
int precision = ((TimestampType) dt.getLogicalType()).getPrecision();
if (precision > maxTimestampPrecision() || precision < minTimestampPrecision()) {
throw new ValidationException(
String.format(
"The precision of field '%s' is out of the TIMESTAMP "
+ "precision range [%d, %d] supported by %s dialect.",
fieldName,
minTimestampPrecision(),
maxTimestampPrecision(),
dialectName()));
}
}
}
}
public abstract int maxDecimalPrecision();
public abstract int minDecimalPrecision();
public abstract int maxTimestampPrecision();
public abstract int minTimestampPrecision();
/**
* Defines the unsupported types for the dialect.
*
* @return a list of logical type roots.
*/
public abstract List<LogicalTypeRoot> unsupportedTypes();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.dialect;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.phoenix.internal.converter.JdbcRowConverter;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.logical.RowType;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Collectors;
import static java.lang.String.format;
/** Handle the SQL dialect of jdbc driver. */
@Internal
public interface JdbcDialect extends Serializable {
/**
* Get the name of jdbc dialect.
*
* @return the dialect name.
*/
String dialectName();
/**
* Check if this dialect instance can handle a certain jdbc url.
*
* @param url the jdbc url.
* @return True if the dialect can be applied on the given jdbc url.
*/
boolean canHandle(String url);
/**
* Get converter that convert jdbc object and Flink internal object each other.
*
* @param rowType the given row type
* @return a row converter for the database
*/
JdbcRowConverter getRowConverter(RowType rowType);
/**
* Get the limit clause to limit the number of rows emitted from the jdbc source.
*
* @param limit number of rows to emit. The value of the parameter should be non-negative.
* @return the limit clause.
*/
String getLimitClause(long limit);
/**
* Check if this dialect instance supports a specific data type in the table schema.
*
* @param schema the table schema.
* @exception ValidationException if the table schema contains an unsupported type.
*/
default void validate(TableSchema schema) throws ValidationException {}
/**
* @return the default driver class name; if the user does not configure a driver class name,
* this one will be used.
*/
default Optional<String> defaultDriverName() {
return Optional.empty();
}
/**
* Quotes the identifier. This is used to put quotes around the identifier in case the column
* name is a reserved keyword, or in case it contains characters that require quotes (e.g.
* space). Default using double quotes {@code "} to quote.
*/
default String quoteIdentifier(String identifier) {
return "\"" + identifier + "\"";
}
/**
* Get the dialect's upsert statement. Each database has its own upsert syntax, e.g. MySQL
* uses ON DUPLICATE KEY UPDATE and PostgreSQL uses ON CONFLICT ... DO UPDATE SET ...
*
* @return empty if the dialect does not support an upsert statement; in that case the writer
* degrades to select + update/insert, which performs poorly.
*/
default Optional<String> getUpsertStatement(
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
return Optional.empty();
}
/** Get row exists statement by condition fields. Default use SELECT. */
default String getRowExistsStatement(String tableName, String[] conditionFields) {
String fieldExpressions =
Arrays.stream(conditionFields)
.map(f -> format("%s = :%s", quoteIdentifier(f), f))
.collect(Collectors.joining(" AND "));
return "SELECT 1 FROM " + quoteIdentifier(tableName) + " WHERE " + fieldExpressions;
}
/** Get insert into statement. */
default String getInsertIntoStatement(String tableName, String[] fieldNames) {
String columns =
Arrays.stream(fieldNames)
.map(this::quoteIdentifier)
.collect(Collectors.joining(", "));
String placeholders =
Arrays.stream(fieldNames).map(f -> ":" + f).collect(Collectors.joining(", "));
return "INSERT INTO "
+ quoteIdentifier(tableName)
+ "("
+ columns
+ ")"
+ " VALUES ("
+ placeholders
+ ")";
}
/**
* Get the statement that updates one row by condition fields. LIMIT 1 is not used by default,
* because it is dialect-specific.
*/
default String getUpdateStatement(
String tableName, String[] fieldNames, String[] conditionFields) {
String setClause =
Arrays.stream(fieldNames)
.map(f -> format("%s = :%s", quoteIdentifier(f), f))
.collect(Collectors.joining(", "));
String conditionClause =
Arrays.stream(conditionFields)
.map(f -> format("%s = :%s", quoteIdentifier(f), f))
.collect(Collectors.joining(" AND "));
return "UPDATE "
+ quoteIdentifier(tableName)
+ " SET "
+ setClause
+ " WHERE "
+ conditionClause;
}
/**
* Get the statement that deletes one row by condition fields. LIMIT 1 is not used by default,
* because it is dialect-specific.
*/
default String getDeleteStatement(String tableName, String[] conditionFields) {
String conditionClause =
Arrays.stream(conditionFields)
.map(f -> format("%s = :%s", quoteIdentifier(f), f))
.collect(Collectors.joining(" AND "));
return "DELETE FROM " + quoteIdentifier(tableName) + " WHERE " + conditionClause;
}
/** Get select fields statement by condition fields. Default use SELECT. */
default String getSelectFromStatement(
String tableName, String[] selectFields, String[] conditionFields) {
String selectExpressions =
Arrays.stream(selectFields)
.map(this::quoteIdentifier)
.collect(Collectors.joining(", "));
String fieldExpressions =
Arrays.stream(conditionFields)
.map(f -> format("%s = :%s", quoteIdentifier(f), f))
.collect(Collectors.joining(" AND "));
return "SELECT "
+ selectExpressions
+ " FROM "
+ quoteIdentifier(tableName)
+ (conditionFields.length > 0 ? " WHERE " + fieldExpressions : "");
}
}
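To make the default methods above concrete, here is a throwaway dialect sketch that only fills in the abstract methods; it is not part of the connector and exists solely to print the named-placeholder SQL that FieldNamedPreparedStatementImpl later rewrites into '?' parameters:

import org.apache.flink.connector.phoenix.dialect.JdbcDialect;
import org.apache.flink.connector.phoenix.internal.converter.JdbcRowConverter;
import org.apache.flink.table.types.logical.RowType;

public class DialectDefaultsExample {
    public static void main(String[] args) {
        // minimal throwaway dialect, only to exercise the default statement builders
        JdbcDialect dialect = new JdbcDialect() {
            @Override public String dialectName() { return "Demo"; }
            @Override public boolean canHandle(String url) { return url.startsWith("jdbc:demo:"); }
            @Override public JdbcRowConverter getRowConverter(RowType rowType) { return null; }
            @Override public String getLimitClause(long limit) { return "LIMIT " + limit; }
        };
        System.out.println(dialect.getInsertIntoStatement("USER", new String[]{"id", "name"}));
        // INSERT INTO "USER"("id", "name") VALUES (:id, :name)
        System.out.println(dialect.getDeleteStatement("USER", new String[]{"id"}));
        // DELETE FROM "USER" WHERE "id" = :id
    }
}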
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.dialect;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
/** Default JDBC dialects. */
public final class JdbcDialects {
private static final List<JdbcDialect> DIALECTS =
Arrays.asList(new PhoenixDialect());
/** Fetch the JdbcDialect class corresponding to a given database url. */
public static Optional<JdbcDialect> get(String url) {
for (JdbcDialect dialect : DIALECTS) {
if (dialect.canHandle(url)) {
return Optional.of(dialect);
}
}
return Optional.empty();
}
}
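A lookup sketch; since only PhoenixDialect is registered, any non-Phoenix JDBC url yields an empty Optional (the urls below are placeholders):

import java.util.Optional;

import org.apache.flink.connector.phoenix.dialect.JdbcDialect;
import org.apache.flink.connector.phoenix.dialect.JdbcDialects;

public class DialectLookupExample {
    public static void main(String[] args) {
        Optional<JdbcDialect> dialect = JdbcDialects.get("jdbc:phoenix:zk-host:2181");
        System.out.println(dialect.map(JdbcDialect::dialectName).orElse("unsupported"));      // Phoenix
        System.out.println(JdbcDialects.get("jdbc:mysql://localhost:3306/db").isPresent());   // false
    }
}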
package org.apache.flink.connector.phoenix.dialect;
import org.apache.flink.connector.phoenix.internal.converter.JdbcRowConverter;
import org.apache.flink.connector.phoenix.internal.converter.PhoenixRowConverter;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* PhoenixDialect
*
* @author gy
* @since 2022/3/16 11:19
**/
public class PhoenixDialect extends AbstractDialect {
private static final long serialVersionUID = 1L;
private static final int MAX_TIMESTAMP_PRECISION = 6;
private static final int MIN_TIMESTAMP_PRECISION = 1;
private static final int MAX_DECIMAL_PRECISION = 65;
private static final int MIN_DECIMAL_PRECISION = 1;
@Override
public boolean canHandle(String url) {
return url.startsWith("jdbc:phoenix:");
}
@Override
public JdbcRowConverter getRowConverter(RowType rowType) {
return new PhoenixRowConverter(rowType);
}
@Override
public String getLimitClause(long limit) {
return "LIMIT " + limit;
}
@Override
public Optional<String> defaultDriverName() {
return Optional.of("org.apache.phoenix.jdbc.PhoenixDriver");
}
/**
* Phoenix does not support the backtick (`) character.
* Do not put any quote characters (" or `) around column or table names,
* otherwise Phoenix will fail to parse the statement.
* @param identifier the identifier
* @return the identifier, unquoted
*/
@Override
public String quoteIdentifier(String identifier) {
//return "`" + identifier + "`";
//return super.quoteIdentifier(identifier);
return identifier;
}
@Override
public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
String columns = Arrays.stream(fieldNames).map(this::quoteIdentifier).collect(Collectors.joining(", "));
String placeholders = Arrays.stream(fieldNames).map(f -> ":" + f).collect(Collectors.joining(", "));
String sql = "UPSERT INTO " + this.quoteIdentifier(tableName) + "(" + columns + ") VALUES (" + placeholders + ")";
return Optional.of(sql);
}
@Override
public String getInsertIntoStatement(String tableName, String[] fieldNames) {
return this.getUpsertStatement(tableName,fieldNames,null).get();
}
@Override
public String dialectName() {
return "Phoenix";
}
@Override
public int maxDecimalPrecision() {
return MAX_DECIMAL_PRECISION;
}
@Override
public int minDecimalPrecision() {
return MIN_DECIMAL_PRECISION;
}
@Override
public int maxTimestampPrecision() {
return MAX_TIMESTAMP_PRECISION;
}
@Override
public int minTimestampPrecision() {
return MIN_TIMESTAMP_PRECISION;
}
@Override
public List<LogicalTypeRoot> unsupportedTypes() {
return Arrays.asList(
LogicalTypeRoot.BINARY,
LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
LogicalTypeRoot.INTERVAL_YEAR_MONTH,
LogicalTypeRoot.INTERVAL_DAY_TIME,
LogicalTypeRoot.ARRAY,
LogicalTypeRoot.MULTISET,
LogicalTypeRoot.MAP,
LogicalTypeRoot.ROW,
LogicalTypeRoot.DISTINCT_TYPE,
LogicalTypeRoot.STRUCTURED_TYPE,
LogicalTypeRoot.NULL,
LogicalTypeRoot.RAW,
LogicalTypeRoot.SYMBOL,
LogicalTypeRoot.UNRESOLVED);
}
}
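A sketch of what the dialect above generates; the table and column names are made up. Identifiers come back unquoted (see quoteIdentifier) and getInsertIntoStatement delegates to the same UPSERT:

import org.apache.flink.connector.phoenix.dialect.PhoenixDialect;

public class PhoenixDialectExample {
    public static void main(String[] args) {
        PhoenixDialect dialect = new PhoenixDialect();
        String upsert = dialect
                .getUpsertStatement("TEST.USER", new String[]{"ID", "NAME"}, new String[]{"ID"})
                .get();
        System.out.println(upsert);
        // UPSERT INTO TEST.USER(ID, NAME) VALUES (:ID, :NAME)
        System.out.println(dialect.getInsertIntoStatement("TEST.USER", new String[]{"ID", "NAME"}));
        // same UPSERT, so INSERT-only writes also behave as upserts
    }
}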
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.phoenix.internal.connection.JdbcConnectionProvider;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Flushable;
import java.io.IOException;
import java.sql.Connection;
/** Base jdbc outputFormat. */
public abstract class AbstractJdbcOutputFormat<T> extends RichOutputFormat<T> implements Flushable {
private static final long serialVersionUID = 1L;
public static final int DEFAULT_FLUSH_MAX_SIZE = 5000;
public static final long DEFAULT_FLUSH_INTERVAL_MILLS = 0L;
private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcOutputFormat.class);
protected final JdbcConnectionProvider connectionProvider;
public AbstractJdbcOutputFormat(JdbcConnectionProvider connectionProvider) {
this.connectionProvider = Preconditions.checkNotNull(connectionProvider);
}
@Override
public void configure(Configuration parameters) {}
@Override
public void open(int taskNumber, int numTasks) throws IOException {
try {
connectionProvider.getOrEstablishConnection();
} catch (Exception e) {
throw new IOException("unable to open JDBC writer", e);
}
}
@Override
public void close() {
connectionProvider.closeConnection();
}
@Override
public void flush() throws IOException {}
//@VisibleForTesting
public Connection getConnection() {
return connectionProvider.getConnection();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.internal;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nonnull;
import java.io.IOException;
/** A generic SinkFunction for JDBC. */
@Internal
public class GenericJdbcSinkFunction<T> extends RichSinkFunction<T>
implements CheckpointedFunction {
private final AbstractJdbcOutputFormat<T> outputFormat;
public GenericJdbcSinkFunction(@Nonnull AbstractJdbcOutputFormat<T> outputFormat) {
this.outputFormat = Preconditions.checkNotNull(outputFormat);
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
RuntimeContext ctx = getRuntimeContext();
outputFormat.setRuntimeContext(ctx);
outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
}
@Override
public void invoke(T value, Context context) throws IOException {
outputFormat.writeRecord(value);
}
@Override
public void initializeState(FunctionInitializationContext context) {}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
outputFormat.flush();
}
@Override
public void close() {
outputFormat.close();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.phoenix.JdbcExecutionOptions;
import org.apache.flink.connector.phoenix.internal.connection.JdbcConnectionProvider;
import org.apache.flink.connector.phoenix.internal.executor.InsertOrUpdateJdbcExecutor;
import org.apache.flink.connector.phoenix.internal.executor.JdbcBatchStatementExecutor;
import org.apache.flink.connector.phoenix.internal.options.JdbcDmlOptions;
import org.apache.flink.connector.phoenix.statement.FieldNamedPreparedStatementImpl;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.function.Function;
import static org.apache.flink.connector.phoenix.utils.JdbcUtils.getPrimaryKey;
import static org.apache.flink.connector.phoenix.utils.JdbcUtils.setRecordToStatement;
import static org.apache.flink.util.Preconditions.checkArgument;
class TableJdbcUpsertOutputFormat
extends JdbcBatchingOutputFormat<
Tuple2<Boolean, Row>, Row, JdbcBatchStatementExecutor<Row>> {
private static final Logger LOG = LoggerFactory.getLogger(TableJdbcUpsertOutputFormat.class);
private JdbcBatchStatementExecutor<Row> deleteExecutor;
private final StatementExecutorFactory<JdbcBatchStatementExecutor<Row>>
deleteStatementExecutorFactory;
private Connection conn = null;
TableJdbcUpsertOutputFormat(
JdbcConnectionProvider connectionProvider,
JdbcDmlOptions dmlOptions,
JdbcExecutionOptions batchOptions) {
this(
connectionProvider,
batchOptions,
ctx -> createUpsertRowExecutor(dmlOptions, ctx),
ctx -> createDeleteExecutor(dmlOptions, ctx));
}
@VisibleForTesting
TableJdbcUpsertOutputFormat(
JdbcConnectionProvider connectionProvider,
JdbcExecutionOptions batchOptions,
StatementExecutorFactory<JdbcBatchStatementExecutor<Row>> statementExecutorFactory,
StatementExecutorFactory<JdbcBatchStatementExecutor<Row>>
deleteStatementExecutorFactory) {
super(connectionProvider, batchOptions, statementExecutorFactory, tuple2 -> tuple2.f1);
this.deleteStatementExecutorFactory = deleteStatementExecutorFactory;
}
@Override
public void open(int taskNumber, int numTasks) throws IOException {
super.open(taskNumber, numTasks);
try {
conn = connectionProvider.getOrEstablishConnection();
} catch (Exception e) {
throw new IOException("unable to open JDBC writer", e);
}
deleteExecutor = deleteStatementExecutorFactory.apply(getRuntimeContext());
try {
deleteExecutor.prepareStatements(connectionProvider.getConnection());
} catch (SQLException e) {
throw new IOException(e);
}
}
private static JdbcBatchStatementExecutor<Row> createDeleteExecutor(
JdbcDmlOptions dmlOptions, RuntimeContext ctx) {
int[] pkFields =
Arrays.stream(dmlOptions.getFieldNames())
.mapToInt(Arrays.asList(dmlOptions.getFieldNames())::indexOf)
.toArray();
int[] pkTypes =
dmlOptions.getFieldTypes() == null
? null
: Arrays.stream(pkFields).map(f -> dmlOptions.getFieldTypes()[f]).toArray();
String deleteSql =
FieldNamedPreparedStatementImpl.parseNamedStatement(
dmlOptions
.getDialect()
.getDeleteStatement(
dmlOptions.getTableName(), dmlOptions.getFieldNames()),
new HashMap<>());
return createKeyedRowExecutor(pkFields, pkTypes, deleteSql);
}
@Override
protected void addToBatch(Tuple2<Boolean, Row> original, Row extracted) throws SQLException {
if (original.f0) {
super.addToBatch(original, extracted);
} else {
deleteExecutor.addToBatch(extracted);
}
}
@Override
public synchronized void close() {
try {
super.close();
} finally {
try {
if (deleteExecutor != null) {
deleteExecutor.closeStatements();
}
} catch (SQLException e) {
LOG.warn("unable to close delete statement runner", e);
}
}
}
@Override
protected void attemptFlush() throws SQLException {
super.attemptFlush();
deleteExecutor.executeBatch(conn);
}
@Override
public void updateExecutor(boolean reconnect) throws SQLException, ClassNotFoundException {
super.updateExecutor(reconnect);
deleteExecutor.closeStatements();
deleteExecutor.prepareStatements(connectionProvider.getConnection());
}
private static JdbcBatchStatementExecutor<Row> createKeyedRowExecutor(
int[] pkFields, int[] pkTypes, String sql) {
return JdbcBatchStatementExecutor.keyed(
sql,
createRowKeyExtractor(pkFields),
(st, record) ->
setRecordToStatement(
st, pkTypes, createRowKeyExtractor(pkFields).apply(record)));
}
private static JdbcBatchStatementExecutor<Row> createUpsertRowExecutor(
JdbcDmlOptions opt, RuntimeContext ctx) {
checkArgument(opt.getKeyFields().isPresent());
int[] pkFields =
Arrays.stream(opt.getKeyFields().get())
.mapToInt(Arrays.asList(opt.getFieldNames())::indexOf)
.toArray();
int[] pkTypes =
opt.getFieldTypes() == null
? null
: Arrays.stream(pkFields).map(f -> opt.getFieldTypes()[f]).toArray();
return opt.getDialect()
.getUpsertStatement(
opt.getTableName(), opt.getFieldNames(), opt.getKeyFields().get())
.map(
sql ->
createSimpleRowExecutor(
parseNamedStatement(sql),
opt.getFieldTypes(),
ctx.getExecutionConfig().isObjectReuseEnabled()))
.orElseGet(
() ->
new InsertOrUpdateJdbcExecutor<>(
parseNamedStatement(
opt.getDialect()
.getRowExistsStatement(
opt.getTableName(),
opt.getKeyFields().get())),
parseNamedStatement(
opt.getDialect()
.getInsertIntoStatement(
opt.getTableName(),
opt.getFieldNames())),
parseNamedStatement(
opt.getDialect()
.getUpdateStatement(
opt.getTableName(),
opt.getFieldNames(),
opt.getKeyFields().get())),
createRowJdbcStatementBuilder(pkTypes),
createRowJdbcStatementBuilder(opt.getFieldTypes()),
createRowJdbcStatementBuilder(opt.getFieldTypes()),
createRowKeyExtractor(pkFields),
ctx.getExecutionConfig().isObjectReuseEnabled()
? Row::copy
: Function.identity()));
}
private static String parseNamedStatement(String statement) {
return FieldNamedPreparedStatementImpl.parseNamedStatement(statement, new HashMap<>());
}
private static Function<Row, Row> createRowKeyExtractor(int[] pkFields) {
return row -> getPrimaryKey(row, pkFields);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.internal.connection;
import org.apache.flink.annotation.Internal;
import javax.annotation.Nullable;
import java.sql.Connection;
import java.sql.SQLException;
/** JDBC connection provider. */
@Internal
public interface JdbcConnectionProvider {
/**
* Get existing connection.
*
* @return existing connection
*/
@Nullable
Connection getConnection();
/**
* Check whether possible existing connection is valid or not through {@link
* Connection#isValid(int)}.
*
* @return true if existing connection is valid
* @throws SQLException sql exception throw from {@link Connection#isValid(int)}
*/
boolean isConnectionValid() throws SQLException;
/**
* Get the existing connection or establish a new one if there is none.
*
* @return existing connection or newly established connection
* @throws SQLException sql exception
* @throws ClassNotFoundException driver class not found
*/
Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException;
/** Close possible existing connection. */
void closeConnection();
/**
* Close any existing connection and establish a new one.
*
* @return newly established connection
* @throws SQLException sql exception
* @throws ClassNotFoundException driver class not found
*/
Connection reestablishConnection() throws SQLException, ClassNotFoundException;
}
package org.apache.flink.connector.phoenix.internal.connection;
import org.apache.flink.connector.phoenix.JdbcConnectionOptions;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Enumeration;
import java.util.Properties;
/**
* PhoneixJdbcConnectionProvider
*
* @author gy
* @since 2022/3/17 9:04
**/
public class PhoneixJdbcConnectionProvider implements JdbcConnectionProvider, Serializable {
private static final Logger LOG = LoggerFactory.getLogger(PhoneixJdbcConnectionProvider.class);
private static final long serialVersionUID = 1L;
private final JdbcConnectionOptions jdbcOptions;
private transient Driver loadedDriver;
private transient Connection connection;
private Boolean namespaceMappingEnabled;
private Boolean mapSystemTablesEnabled;
public PhoneixJdbcConnectionProvider(JdbcConnectionOptions jdbcOptions) {
this.jdbcOptions = jdbcOptions;
}
public PhoneixJdbcConnectionProvider(JdbcConnectionOptions jdbcOptions, boolean namespaceMappingEnabled, boolean mapSystemTablesEnabled) {
this.jdbcOptions = jdbcOptions;
this.namespaceMappingEnabled = namespaceMappingEnabled;
this.mapSystemTablesEnabled = mapSystemTablesEnabled;
}
public Connection getConnection() {
return this.connection;
}
public boolean isConnectionValid() throws SQLException {
return this.connection != null && this.connection.isValid(this.jdbcOptions.getConnectionCheckTimeoutSeconds());
}
private static Driver loadDriver(String driverName) throws SQLException, ClassNotFoundException {
Preconditions.checkNotNull(driverName);
Enumeration drivers = DriverManager.getDrivers();
Driver driver;
do {
if (!drivers.hasMoreElements()) {
Class clazz = Class.forName(driverName, true, Thread.currentThread().getContextClassLoader());
try {
return (Driver) clazz.newInstance();
} catch (Exception var4) {
throw new SQLException("Fail to create driver of class " + driverName, var4);
}
}
driver = (Driver) drivers.nextElement();
} while (!driver.getClass().getName().equals(driverName));
return driver;
}
private Driver getLoadedDriver() throws SQLException, ClassNotFoundException {
if (this.loadedDriver == null) {
this.loadedDriver = loadDriver(this.jdbcOptions.getDriverName());
}
return this.loadedDriver;
}
public Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException {
if (this.connection != null) {
return this.connection;
} else {
if (this.jdbcOptions.getDriverName() == null) {
this.connection = DriverManager.getConnection(this.jdbcOptions.getDbURL(), (String) this.jdbcOptions.getUsername().orElse((String) null), (String) this.jdbcOptions.getPassword().orElse((String) null));
} else {
Driver driver = this.getLoadedDriver();
Properties info = new Properties();
this.jdbcOptions.getUsername().ifPresent((user) -> {
info.setProperty("user", user);
});
this.jdbcOptions.getPassword().ifPresent((password) -> {
info.setProperty("password", password);
});
// the Boolean flags may be null when the single-argument constructor is used
if (Boolean.TRUE.equals(this.namespaceMappingEnabled) && Boolean.TRUE.equals(this.mapSystemTablesEnabled)) {
info.setProperty("phoenix.schema.isNamespaceMappingEnabled", "true");
info.setProperty("phoenix.schema.mapSystemTablesToNamespace", "true");
}
this.connection = driver.connect(this.jdbcOptions.getDbURL(), info);
// Driver#connect returns null when it cannot handle the URL, so check before using the connection
if (this.connection == null) {
throw new SQLException("No suitable driver found for " + this.jdbcOptions.getDbURL(), "08001");
}
this.connection.setAutoCommit(false);
}
return this.connection;
}
}
public void closeConnection() {
if (this.connection != null) {
try {
this.connection.close();
} catch (SQLException var5) {
LOG.warn("JDBC connection close failed.", var5);
} finally {
this.connection = null;
}
}
}
public Connection reestablishConnection() throws SQLException, ClassNotFoundException {
this.closeConnection();
return this.getOrEstablishConnection();
}
static {
DriverManager.getDrivers();
}
}
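A hedged usage sketch of the provider above. It genuinely opens a connection, so it assumes a reachable Phoenix/HBase cluster; the ZooKeeper quorum is a placeholder:

import java.sql.Connection;

import org.apache.flink.connector.phoenix.JdbcConnectionOptions;
import org.apache.flink.connector.phoenix.internal.connection.PhoneixJdbcConnectionProvider;

public class ConnectionProviderExample {
    public static void main(String[] args) throws Exception {
        JdbcConnectionOptions options =
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:phoenix:zk-host:2181")                      // placeholder quorum
                        .withDriverName("org.apache.phoenix.jdbc.PhoenixDriver")
                        .build();
        // the three-argument constructor also sets the namespace-mapping client properties
        PhoneixJdbcConnectionProvider provider =
                new PhoneixJdbcConnectionProvider(options, true, true);
        Connection connection = provider.getOrEstablishConnection();
        System.out.println("autoCommit = " + connection.getAutoCommit());  // false, set by the provider
        System.out.println("valid = " + provider.isConnectionValid());
        provider.closeConnection();
    }
}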
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.internal.converter;
import org.apache.flink.connector.phoenix.statement.FieldNamedPreparedStatement;
import org.apache.flink.table.data.RowData;
import java.io.Serializable;
import java.sql.ResultSet;
import java.sql.SQLException;
/**
* Converter that is responsible to convert between JDBC object and Flink SQL internal data
* structure {@link RowData}.
*/
public interface JdbcRowConverter extends Serializable {
/**
* Convert data retrieved from {@link ResultSet} to internal {@link RowData}.
*
* @param resultSet ResultSet from JDBC
*/
RowData toInternal(ResultSet resultSet) throws SQLException;
/**
* Convert data retrieved from Flink internal RowData to JDBC Object.
*
* @param rowData The given internal {@link RowData}.
* @param statement The statement to be filled.
* @return The filled statement.
*/
FieldNamedPreparedStatement toExternal(RowData rowData, FieldNamedPreparedStatement statement)
throws SQLException;
}
package org.apache.flink.connector.phoenix.internal.converter;
import org.apache.flink.table.types.logical.RowType;
/**
* PhoenixRowConverter
*
* @author gy
* @since 2022/3/16 11:21
**/
public class PhoenixRowConverter extends AbstractJdbcRowConverter {
private static final long serialVersionUID = 1L;
@Override
public String converterName() {
return "Phoenix";
}
public PhoenixRowConverter(RowType rowType) {
super(rowType);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.internal.executor;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.phoenix.JdbcStatementBuilder;
import org.apache.flink.connector.phoenix.table.PhoenixUpsertTableSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* {@link JdbcBatchStatementExecutor} that provides upsert semantics by updating row if it exists
* and inserting otherwise. Used in Table API.
*
* @deprecated This has been replaced with {@link TableInsertOrUpdateStatementExecutor}, will remove
* this once {@link PhoenixUpsertTableSink} is removed.
*/
@Internal
public final class InsertOrUpdateJdbcExecutor<R, K, V> implements JdbcBatchStatementExecutor<R> {
private static final Logger LOG = LoggerFactory.getLogger(InsertOrUpdateJdbcExecutor.class);
private final String existSQL;
private final String insertSQL;
private final String updateSQL;
private final JdbcStatementBuilder<K> existSetter;
private final JdbcStatementBuilder<V> insertSetter;
private final JdbcStatementBuilder<V> updateSetter;
private final Function<R, K> keyExtractor;
private final Function<R, V> valueMapper;
private final Map<K, V> batch;
private transient PreparedStatement existStatement;
private transient PreparedStatement insertStatement;
private transient PreparedStatement updateStatement;
public InsertOrUpdateJdbcExecutor(
@Nonnull String existSQL,
@Nonnull String insertSQL,
@Nonnull String updateSQL,
@Nonnull JdbcStatementBuilder<K> existSetter,
@Nonnull JdbcStatementBuilder<V> insertSetter,
@Nonnull JdbcStatementBuilder<V> updateSetter,
@Nonnull Function<R, K> keyExtractor,
@Nonnull Function<R, V> valueExtractor) {
this.existSQL = checkNotNull(existSQL);
this.updateSQL = checkNotNull(updateSQL);
this.existSetter = checkNotNull(existSetter);
this.insertSQL = checkNotNull(insertSQL);
this.insertSetter = checkNotNull(insertSetter);
this.updateSetter = checkNotNull(updateSetter);
this.keyExtractor = checkNotNull(keyExtractor);
this.valueMapper = checkNotNull(valueExtractor);
this.batch = new HashMap<>();
}
@Override
public void prepareStatements(Connection connection) throws SQLException {
existStatement = connection.prepareStatement(existSQL);
insertStatement = connection.prepareStatement(insertSQL);
updateStatement = connection.prepareStatement(updateSQL);
}
@Override
public void addToBatch(R record) {
batch.put(keyExtractor.apply(record), valueMapper.apply(record));
}
@Override
public void executeBatch(Connection conn) throws SQLException {
if (!batch.isEmpty()) {
for (Map.Entry<K, V> entry : batch.entrySet()) {
processOneRowInBatch(entry.getKey(), entry.getValue());
}
conn.commit();
batch.clear();
}
}
private void processOneRowInBatch(K pk, V row) throws SQLException {
if (exist(pk)) {
updateSetter.accept(updateStatement, row);
updateStatement.executeUpdate();
} else {
insertSetter.accept(insertStatement, row);
insertStatement.executeUpdate();
}
}
private boolean exist(K pk) throws SQLException {
existSetter.accept(existStatement, pk);
try (ResultSet resultSet = existStatement.executeQuery()) {
return resultSet.next();
}
}
@Override
public void closeStatements() throws SQLException {
for (PreparedStatement s :
Arrays.asList(existStatement, insertStatement, updateStatement)) {
if (s != null) {
s.close();
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.internal.executor;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.phoenix.JdbcStatementBuilder;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.function.Function;
/** Executes the given JDBC statement in batch for the accumulated records. */
@Internal
public interface JdbcBatchStatementExecutor<T> {
/** Create statements from connection. */
void prepareStatements(Connection connection) throws SQLException;
void addToBatch(T record) throws SQLException;
    /**
     * Submits a batch of commands to the database for execution.
     *
     * @param conn the connection on which the batch is executed and committed
     */
void executeBatch(Connection conn) throws SQLException;
/** Close JDBC related statements. */
void closeStatements() throws SQLException;
static <T, K> JdbcBatchStatementExecutor<T> keyed(
String sql, Function<T, K> keyExtractor, JdbcStatementBuilder<K> statementBuilder) {
return new KeyedBatchStatementExecutor<>(sql, keyExtractor, statementBuilder);
}
static <T, V> JdbcBatchStatementExecutor<T> simple(
String sql, JdbcStatementBuilder<V> paramSetter, Function<T, V> valueTransformer) {
return new SimpleBatchStatementExecutor<>(sql, paramSetter, valueTransformer);
}
}
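/*
 * Illustrative sketch (not part of the committed sources): the typical lifecycle of a
 * JdbcBatchStatementExecutor created via the simple(...) factory. The UPSERT statement, the
 * table name T_IDS and the use of a raw java.sql.Connection are assumptions for this example;
 * auto-commit is disabled because the executors commit the connection themselves.
 */
class BatchExecutorLifecycleExample {
    static void writeIds(java.sql.Connection conn, java.util.List<Long> ids) throws java.sql.SQLException {
        JdbcBatchStatementExecutor<Long> executor =
                JdbcBatchStatementExecutor.<Long, Long>simple(
                        "UPSERT INTO T_IDS (ID) VALUES (?)",
                        (ps, id) -> ps.setLong(1, id),
                        java.util.function.Function.identity());
        conn.setAutoCommit(false);          // the executor commits explicitly
        executor.prepareStatements(conn);   // create the PreparedStatement
        for (Long id : ids) {
            executor.addToBatch(id);        // buffer the record
        }
        executor.executeBatch(conn);        // execute the buffered rows and commit
        executor.closeStatements();
    }
}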
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.internal.executor;
import org.apache.flink.connector.phoenix.JdbcStatementBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;
/**
* A {@link JdbcBatchStatementExecutor} that extracts SQL keys from the supplied stream elements and
* executes a SQL query for them.
*/
class KeyedBatchStatementExecutor<T, K> implements JdbcBatchStatementExecutor<T> {
private static final Logger LOG = LoggerFactory.getLogger(KeyedBatchStatementExecutor.class);
private final String sql;
private final JdbcStatementBuilder<K> parameterSetter;
private final Function<T, K> keyExtractor;
private final Set<K> batch;
private transient PreparedStatement st;
/**
     * Keep object reuse in mind: if object reuse is enabled, the key extractor may need to return a
     * new object.
*/
KeyedBatchStatementExecutor(
String sql, Function<T, K> keyExtractor, JdbcStatementBuilder<K> statementBuilder) {
this.parameterSetter = statementBuilder;
this.keyExtractor = keyExtractor;
this.sql = sql;
this.batch = new HashSet<>();
}
@Override
public void prepareStatements(Connection connection) throws SQLException {
st = connection.prepareStatement(sql);
}
@Override
public void addToBatch(T record) {
batch.add(keyExtractor.apply(record));
}
@Override
public void executeBatch(Connection conn) throws SQLException {
if (!batch.isEmpty()) {
for (K entry : batch) {
parameterSetter.accept(st, entry);
st.executeUpdate();
}
LOG.info("connection commit datasize:" + batch.size());
conn.commit();
batch.clear();
}
}
@Override
public void closeStatements() throws SQLException {
if (st != null) {
st.close();
st = null;
}
}
}
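/*
 * Illustrative sketch (not part of the committed sources): a keyed executor is typically used
 * for deletes, where only the key of each record matters. The DELETE statement, table name and
 * Row layout below are assumptions for this example only.
 */
class KeyedExecutorExample {
    static JdbcBatchStatementExecutor<org.apache.flink.types.Row> buildDeleteExecutor() {
        return JdbcBatchStatementExecutor.<org.apache.flink.types.Row, Long>keyed(
                "DELETE FROM T_USER WHERE ID = ?",
                row -> (Long) row.getField(0),     // extract the key from the record
                (ps, id) -> ps.setLong(1, id));    // bind the key to the statement
    }
}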
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.internal.executor;
import org.apache.flink.connector.phoenix.JdbcStatementBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
/**
 * A {@link JdbcBatchStatementExecutor} that executes the supplied statement for the given records
 * (without any pre-processing).
*/
class SimpleBatchStatementExecutor<T, V> implements JdbcBatchStatementExecutor<T> {
private static final Logger LOG = LoggerFactory.getLogger(SimpleBatchStatementExecutor.class);
private final String sql;
private final JdbcStatementBuilder<V> parameterSetter;
private final Function<T, V> valueTransformer;
private final List<V> batch;
private transient PreparedStatement st;
SimpleBatchStatementExecutor(
String sql, JdbcStatementBuilder<V> statementBuilder, Function<T, V> valueTransformer) {
this.sql = sql;
this.parameterSetter = statementBuilder;
this.valueTransformer = valueTransformer;
this.batch = new ArrayList<>();
}
@Override
public void prepareStatements(Connection connection) throws SQLException {
this.st = connection.prepareStatement(sql);
}
@Override
public void addToBatch(T record) {
batch.add(valueTransformer.apply(record));
}
@Override
public void executeBatch(Connection connection) throws SQLException {
if (!batch.isEmpty()) {
for (V r : batch) {
parameterSetter.accept(st, r);
st.executeUpdate();
}
LOG.info("connection commit dataSize:" + batch.size());
connection.commit();
batch.clear();
}
}
@Override
public void closeStatements() throws SQLException {
if (st != null) {
st.close();
st = null;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.internal.executor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
/**
 * Currently, this statement executor is only used by the Table/SQL API to buffer insert/update/delete
 * events and reduce them in the buffer before submitting them to the external database.
*/
public final class TableBufferReducedStatementExecutor
implements JdbcBatchStatementExecutor<RowData> {
private final JdbcBatchStatementExecutor<RowData> upsertExecutor;
private final JdbcBatchStatementExecutor<RowData> deleteExecutor;
private final Function<RowData, RowData> keyExtractor;
private final Function<RowData, RowData> valueTransform;
// the mapping is [KEY, <+/-, VALUE>]
private final Map<RowData, Tuple2<Boolean, RowData>> reduceBuffer = new HashMap<>();
public TableBufferReducedStatementExecutor(
JdbcBatchStatementExecutor<RowData> upsertExecutor,
JdbcBatchStatementExecutor<RowData> deleteExecutor,
Function<RowData, RowData> keyExtractor,
Function<RowData, RowData> valueTransform) {
this.upsertExecutor = upsertExecutor;
this.deleteExecutor = deleteExecutor;
this.keyExtractor = keyExtractor;
this.valueTransform = valueTransform;
}
@Override
public void prepareStatements(Connection connection) throws SQLException {
upsertExecutor.prepareStatements(connection);
deleteExecutor.prepareStatements(connection);
}
@Override
public void addToBatch(RowData record) throws SQLException {
RowData key = keyExtractor.apply(record);
boolean flag = changeFlag(record.getRowKind());
RowData value = valueTransform.apply(record); // copy or not
reduceBuffer.put(key, Tuple2.of(flag, value));
}
/**
* Returns true if the row kind is INSERT or UPDATE_AFTER, returns false if the row kind is
* DELETE or UPDATE_BEFORE.
*/
private boolean changeFlag(RowKind rowKind) {
switch (rowKind) {
case INSERT:
case UPDATE_AFTER:
return true;
case DELETE:
case UPDATE_BEFORE:
return false;
default:
throw new UnsupportedOperationException(
String.format(
"Unknown row kind, the supported row kinds is: INSERT, UPDATE_BEFORE, UPDATE_AFTER,"
+ " DELETE, but get: %s.",
rowKind));
}
}
@Override
public void executeBatch(Connection conn) throws SQLException {
for (Map.Entry<RowData, Tuple2<Boolean, RowData>> entry : reduceBuffer.entrySet()) {
if (entry.getValue().f0) {
upsertExecutor.addToBatch(entry.getValue().f1);
} else {
// delete by key
deleteExecutor.addToBatch(entry.getKey());
}
}
upsertExecutor.executeBatch(conn);
deleteExecutor.executeBatch(conn);
reduceBuffer.clear();
}
@Override
public void closeStatements() throws SQLException {
upsertExecutor.closeStatements();
deleteExecutor.closeStatements();
}
}
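/*
 * Worked example (illustrative only): how the reduce buffer collapses a changelog per key.
 * For a hypothetical record keyed by ID = 1:
 *
 *   +I (1, "a")   ->  reduceBuffer: 1 -> (true,  (1, "a"))   // changeFlag = true
 *   -U (1, "a")   ->  reduceBuffer: 1 -> (false, (1, "a"))   // changeFlag = false
 *   +U (1, "b")   ->  reduceBuffer: 1 -> (true,  (1, "b"))
 *
 * executeBatch(conn) then hands only (1, "b") to the upsert executor; the intermediate
 * UPDATE_BEFORE never reaches the database. Had the last event for the key been a DELETE,
 * only the key would have been handed to the delete executor instead.
 */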
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.internal.executor;
import org.apache.flink.table.data.RowData;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
/**
 * Currently, this statement executor is only used by the Table/SQL API to buffer records, because
 * {@link PreparedStatement#executeBatch()} may fail and clear the buffered records; we therefore
 * buffer the records ourselves and replay them when retrying {@link JdbcBatchStatementExecutor#executeBatch(Connection)}.
*/
public final class TableBufferedStatementExecutor implements JdbcBatchStatementExecutor<RowData> {
private final JdbcBatchStatementExecutor<RowData> statementExecutor;
private final Function<RowData, RowData> valueTransform;
private final List<RowData> buffer = new ArrayList<>();
public TableBufferedStatementExecutor(
JdbcBatchStatementExecutor<RowData> statementExecutor,
Function<RowData, RowData> valueTransform) {
this.statementExecutor = statementExecutor;
this.valueTransform = valueTransform;
}
@Override
public void prepareStatements(Connection connection) throws SQLException {
statementExecutor.prepareStatements(connection);
}
@Override
public void addToBatch(RowData record) throws SQLException {
RowData value = valueTransform.apply(record); // copy or not
buffer.add(value);
}
@Override
public void executeBatch(Connection conn) throws SQLException {
for (RowData value : buffer) {
statementExecutor.addToBatch(value);
}
statementExecutor.executeBatch(conn);
buffer.clear();
}
@Override
public void closeStatements() throws SQLException {
statementExecutor.closeStatements();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.internal.executor;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.phoenix.internal.converter.JdbcRowConverter;
import org.apache.flink.connector.phoenix.statement.FieldNamedPreparedStatement;
import org.apache.flink.connector.phoenix.statement.StatementFactory;
import org.apache.flink.table.data.RowData;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.function.Function;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
 * {@link JdbcBatchStatementExecutor} that provides upsert semantics by updating a row if it exists
 * and inserting it otherwise. Only used in the Table/SQL API.
*/
@Internal
public final class TableInsertOrUpdateStatementExecutor
implements JdbcBatchStatementExecutor<RowData> {
private final StatementFactory existStmtFactory;
private final StatementFactory insertStmtFactory;
private final StatementFactory updateStmtFactory;
private final JdbcRowConverter existSetter;
private final JdbcRowConverter insertSetter;
private final JdbcRowConverter updateSetter;
private final Function<RowData, RowData> keyExtractor;
private transient FieldNamedPreparedStatement existStatement;
private transient FieldNamedPreparedStatement insertStatement;
private transient FieldNamedPreparedStatement updateStatement;
public TableInsertOrUpdateStatementExecutor(
StatementFactory existStmtFactory,
StatementFactory insertStmtFactory,
StatementFactory updateStmtFactory,
JdbcRowConverter existSetter,
JdbcRowConverter insertSetter,
JdbcRowConverter updateSetter,
Function<RowData, RowData> keyExtractor) {
this.existStmtFactory = checkNotNull(existStmtFactory);
this.insertStmtFactory = checkNotNull(insertStmtFactory);
this.updateStmtFactory = checkNotNull(updateStmtFactory);
this.existSetter = checkNotNull(existSetter);
this.insertSetter = checkNotNull(insertSetter);
this.updateSetter = checkNotNull(updateSetter);
this.keyExtractor = keyExtractor;
}
@Override
public void prepareStatements(Connection connection) throws SQLException {
existStatement = existStmtFactory.createStatement(connection);
insertStatement = insertStmtFactory.createStatement(connection);
updateStatement = updateStmtFactory.createStatement(connection);
}
@Override
public void addToBatch(RowData record) throws SQLException {
processOneRowInBatch(keyExtractor.apply(record), record);
}
private void processOneRowInBatch(RowData pk, RowData row) throws SQLException {
if (exist(pk)) {
updateSetter.toExternal(row, updateStatement);
updateStatement.addBatch();
} else {
insertSetter.toExternal(row, insertStatement);
insertStatement.addBatch();
}
}
private boolean exist(RowData pk) throws SQLException {
existSetter.toExternal(pk, existStatement);
try (ResultSet resultSet = existStatement.executeQuery()) {
return resultSet.next();
}
}
@Override
public void executeBatch(Connection conn) throws SQLException {
conn.commit();
}
@Override
public void closeStatements() throws SQLException {
for (FieldNamedPreparedStatement s :
Arrays.asList(existStatement, insertStatement, updateStatement)) {
if (s != null) {
s.close();
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.internal.executor;
import org.apache.flink.connector.phoenix.internal.converter.JdbcRowConverter;
import org.apache.flink.connector.phoenix.statement.FieldNamedPreparedStatement;
import org.apache.flink.connector.phoenix.statement.StatementFactory;
import org.apache.flink.table.data.RowData;
import java.sql.Connection;
import java.sql.SQLException;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A {@link JdbcBatchStatementExecutor} that simply adds the records into batches of {@link
* java.sql.PreparedStatement} and doesn't buffer records in memory. Only used in Table/SQL API.
*/
public final class TableSimpleStatementExecutor implements JdbcBatchStatementExecutor<RowData> {
private final StatementFactory stmtFactory;
private final JdbcRowConverter converter;
private transient FieldNamedPreparedStatement st;
/**
     * Keep object reuse in mind: if object reuse is enabled, the key extractor may need to return a
     * new object.
*/
public TableSimpleStatementExecutor(StatementFactory stmtFactory, JdbcRowConverter converter) {
this.stmtFactory = checkNotNull(stmtFactory);
this.converter = checkNotNull(converter);
}
@Override
public void prepareStatements(Connection connection) throws SQLException {
st = stmtFactory.createStatement(connection);
}
@Override
public void addToBatch(RowData record) throws SQLException {
converter.toExternal(record, st);
st.addBatch();
}
@Override
public void executeBatch(Connection conn) throws SQLException {
st.executeBatch();
}
@Override
public void closeStatements() throws SQLException {
if (st != null) {
st.close();
st = null;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.internal.options;
import org.apache.flink.connector.phoenix.dialect.JdbcDialect;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;
/** JDBC sink DML options. */
public class JdbcDmlOptions extends JdbcTypedQueryOptions {
private static final long serialVersionUID = 1L;
private final String[] fieldNames;
@Nullable private final String[] keyFields;
private final String tableName;
private final JdbcDialect dialect;
public static JdbcDmlOptionsBuilder builder() {
return new JdbcDmlOptionsBuilder();
}
private JdbcDmlOptions(
String tableName,
JdbcDialect dialect,
String[] fieldNames,
int[] fieldTypes,
String[] keyFields) {
super(fieldTypes);
this.tableName = Preconditions.checkNotNull(tableName, "table is empty");
this.dialect = Preconditions.checkNotNull(dialect, "dialect name is empty");
this.fieldNames = Preconditions.checkNotNull(fieldNames, "field names is empty");
this.keyFields = keyFields;
}
public String getTableName() {
return tableName;
}
public JdbcDialect getDialect() {
return dialect;
}
public String[] getFieldNames() {
return fieldNames;
}
public Optional<String[]> getKeyFields() {
return Optional.ofNullable(keyFields);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
JdbcDmlOptions that = (JdbcDmlOptions) o;
return Arrays.equals(fieldNames, that.fieldNames)
&& Arrays.equals(keyFields, that.keyFields)
&& Objects.equals(tableName, that.tableName)
&& Objects.equals(dialect, that.dialect);
}
@Override
public int hashCode() {
int result = Objects.hash(tableName, dialect);
result = 31 * result + Arrays.hashCode(fieldNames);
result = 31 * result + Arrays.hashCode(keyFields);
return result;
}
/** Builder for {@link JdbcDmlOptions}. */
public static class JdbcDmlOptionsBuilder
extends JdbcUpdateQueryOptionsBuilder<JdbcDmlOptionsBuilder> {
private String tableName;
private String[] fieldNames;
private String[] keyFields;
private JdbcDialect dialect;
@Override
protected JdbcDmlOptionsBuilder self() {
return this;
}
public JdbcDmlOptionsBuilder withFieldNames(String field, String... fieldNames) {
this.fieldNames = concat(field, fieldNames);
return this;
}
public JdbcDmlOptionsBuilder withFieldNames(String[] fieldNames) {
this.fieldNames = fieldNames;
return this;
}
public JdbcDmlOptionsBuilder withKeyFields(String keyField, String... keyFields) {
this.keyFields = concat(keyField, keyFields);
return this;
}
public JdbcDmlOptionsBuilder withKeyFields(String[] keyFields) {
this.keyFields = keyFields;
return this;
}
public JdbcDmlOptionsBuilder withTableName(String tableName) {
this.tableName = tableName;
return self();
}
public JdbcDmlOptionsBuilder withDialect(JdbcDialect dialect) {
this.dialect = dialect;
return self();
}
public JdbcDmlOptions build() {
return new JdbcDmlOptions(tableName, dialect, fieldNames, fieldTypes, keyFields);
}
static String[] concat(String first, String... next) {
if (next == null || next.length == 0) {
return new String[] {first};
} else {
return Stream.concat(Stream.of(new String[] {first}), Stream.of(next))
.toArray(String[]::new);
}
}
}
}
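/*
 * Illustrative sketch (not part of the committed sources): building JdbcDmlOptions for a
 * hypothetical table T_USER(ID, NAME) keyed by ID. PhoenixDialect is the dialect shipped with
 * this connector and is assumed to have a public no-arg constructor; the field types use
 * java.sql.Types constants.
 */
class JdbcDmlOptionsExample {
    static JdbcDmlOptions build() {
        return JdbcDmlOptions.builder()
                .withTableName("T_USER")
                .withDialect(new PhoenixDialect())
                .withFieldNames("ID", "NAME")
                .withKeyFields("ID")
                .withFieldTypes(new int[] {java.sql.Types.BIGINT, java.sql.Types.VARCHAR})
                .build();
    }
}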
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.internal.options;
import org.apache.flink.util.Preconditions;
import java.util.stream.IntStream;
/** JDBC sink insert options. */
public class JdbcInsertOptions extends JdbcTypedQueryOptions {
private static final long serialVersionUID = 1L;
private final String query;
public JdbcInsertOptions(String query, int[] typesArray) {
super(typesArray);
this.query = Preconditions.checkNotNull(query, "query is empty");
}
public String getQuery() {
return query;
}
public static JdbcInsertOptions from(String query, int firstFieldType, int... nextFieldTypes) {
return new JdbcInsertOptions(query, concat(firstFieldType, nextFieldTypes));
}
private static int[] concat(int first, int... next) {
if (next == null || next.length == 0) {
return new int[] {first};
} else {
return IntStream.concat(IntStream.of(new int[] {first}), IntStream.of(next)).toArray();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.internal.options;
import org.apache.flink.connector.phoenix.JdbcExecutionOptions;
import java.io.Serializable;
import java.util.Objects;
/** Options for the JDBC lookup. */
public class JdbcLookupOptions implements Serializable {
private final long cacheMaxSize;
private final long cacheExpireMs;
private final int maxRetryTimes;
public JdbcLookupOptions(long cacheMaxSize, long cacheExpireMs, int maxRetryTimes) {
this.cacheMaxSize = cacheMaxSize;
this.cacheExpireMs = cacheExpireMs;
this.maxRetryTimes = maxRetryTimes;
}
public long getCacheMaxSize() {
return cacheMaxSize;
}
public long getCacheExpireMs() {
return cacheExpireMs;
}
public int getMaxRetryTimes() {
return maxRetryTimes;
}
public static Builder builder() {
return new Builder();
}
@Override
public boolean equals(Object o) {
if (o instanceof JdbcLookupOptions) {
JdbcLookupOptions options = (JdbcLookupOptions) o;
return Objects.equals(cacheMaxSize, options.cacheMaxSize)
&& Objects.equals(cacheExpireMs, options.cacheExpireMs)
&& Objects.equals(maxRetryTimes, options.maxRetryTimes);
} else {
return false;
}
}
/** Builder of {@link JdbcLookupOptions}. */
public static class Builder {
private long cacheMaxSize = -1L;
private long cacheExpireMs = -1L;
private int maxRetryTimes = JdbcExecutionOptions.DEFAULT_MAX_RETRY_TIMES;
        /** optional, maximum number of rows kept in the lookup cache; once exceeded, the oldest entries are evicted. */
public Builder setCacheMaxSize(long cacheMaxSize) {
this.cacheMaxSize = cacheMaxSize;
return this;
}
        /** optional, lookup cache TTL in milliseconds; entries older than this expire. */
public Builder setCacheExpireMs(long cacheExpireMs) {
this.cacheExpireMs = cacheExpireMs;
return this;
}
/** optional, max retry times for jdbc connector. */
public Builder setMaxRetryTimes(int maxRetryTimes) {
this.maxRetryTimes = maxRetryTimes;
return this;
}
public JdbcLookupOptions build() {
return new JdbcLookupOptions(cacheMaxSize, cacheExpireMs, maxRetryTimes);
}
}
}
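/*
 * Illustrative sketch (not part of the committed sources): configuring the lookup cache used by
 * a Phoenix lookup join. The cache size, TTL and retry count below are example values only.
 */
class JdbcLookupOptionsExample {
    static JdbcLookupOptions build() {
        return JdbcLookupOptions.builder()
                .setCacheMaxSize(10_000L)     // keep at most 10,000 rows in the lookup cache
                .setCacheExpireMs(60_000L)    // cached rows expire after one minute
                .setMaxRetryTimes(3)
                .build();
    }
}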
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.internal.options;
import org.apache.flink.connector.phoenix.JdbcConnectionOptions;
import org.apache.flink.connector.phoenix.dialect.JdbcDialect;
import org.apache.flink.connector.phoenix.dialect.JdbcDialects;
import javax.annotation.Nullable;
import java.util.Objects;
import java.util.Optional;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** Options for the JDBC connector. */
public class JdbcOptions extends JdbcConnectionOptions {
private static final long serialVersionUID = 1L;
private String tableName;
private JdbcDialect dialect;
private final @Nullable Integer parallelism;
protected boolean namespaceMappingEnabled;
protected boolean mapSystemTablesEnabled;
protected JdbcOptions(
String dbURL,
String tableName,
String driverName,
String username,
String password,
JdbcDialect dialect,
Integer parallelism,
int connectionCheckTimeoutSeconds,
boolean namespaceMappingEnabled,
boolean mapSystemTablesEnabled
) {
super(dbURL, driverName, username, password, connectionCheckTimeoutSeconds);
this.tableName = tableName;
this.dialect = dialect;
this.parallelism = parallelism;
this.namespaceMappingEnabled = namespaceMappingEnabled;
this.mapSystemTablesEnabled = mapSystemTablesEnabled;
}
protected JdbcOptions(
String dbURL,
String tableName,
String driverName,
String username,
String password,
JdbcDialect dialect,
Integer parallelism,
int connectionCheckTimeoutSeconds) {
super(dbURL, driverName, username, password, connectionCheckTimeoutSeconds);
this.tableName = tableName;
this.dialect = dialect;
this.parallelism = parallelism;
}
public String getTableName() {
return tableName;
}
public JdbcDialect getDialect() {
return dialect;
}
public Integer getParallelism() {
return parallelism;
}
public boolean isNamespaceMappingEnabled() {
return namespaceMappingEnabled;
}
public boolean isMapSystemTablesEnabled() {
return mapSystemTablesEnabled;
}
public static Builder builder() {
return new Builder();
}
@Override
public boolean equals(Object o) {
if (o instanceof JdbcOptions) {
JdbcOptions options = (JdbcOptions) o;
return Objects.equals(url, options.url)
&& Objects.equals(tableName, options.tableName)
&& Objects.equals(driverName, options.driverName)
&& Objects.equals(username, options.username)
&& Objects.equals(password, options.password)
&& Objects.equals(
dialect.getClass().getName(), options.dialect.getClass().getName())
&& Objects.equals(parallelism, options.parallelism)
&& Objects.equals(
connectionCheckTimeoutSeconds, options.connectionCheckTimeoutSeconds)
&& Objects.equals(
namespaceMappingEnabled, options.namespaceMappingEnabled)
&& Objects.equals(
mapSystemTablesEnabled, options.mapSystemTablesEnabled);
} else {
return false;
}
}
@Override
public int hashCode() {
return Objects.hash(
url,
tableName,
driverName,
username,
password,
dialect.getClass().getName(),
parallelism,
connectionCheckTimeoutSeconds,
namespaceMappingEnabled,
mapSystemTablesEnabled
);
}
/** Builder of {@link JdbcOptions}. */
public static class Builder {
private String dbURL;
private String tableName;
private String driverName;
private String username;
private String password;
private JdbcDialect dialect;
private Integer parallelism;
private int connectionCheckTimeoutSeconds = 60;
protected boolean namespaceMappingEnabled;
protected boolean mapSystemTablesEnabled;
/** required, table name. */
public Builder setTableName(String tableName) {
this.tableName = tableName;
return this;
}
/** optional, user name. */
public Builder setUsername(String username) {
this.username = username;
return this;
}
/** optional, password. */
public Builder setPassword(String password) {
this.password = password;
return this;
}
/** optional, connectionCheckTimeoutSeconds. */
public Builder setConnectionCheckTimeoutSeconds(int connectionCheckTimeoutSeconds) {
this.connectionCheckTimeoutSeconds = connectionCheckTimeoutSeconds;
return this;
}
/**
         * optional, driver name; the dialect has a default driver name, see {@link
         * JdbcDialect#defaultDriverName}.
*/
public Builder setDriverName(String driverName) {
this.driverName = driverName;
return this;
}
/** required, JDBC DB url. */
public Builder setDBUrl(String dbURL) {
this.dbURL = dbURL;
return this;
}
/**
         * optional, the SQL dialect of the JDBC driver. If not set, it will be inferred from the
         * DB URL by {@link JdbcDialects#get}.
*/
public Builder setDialect(JdbcDialect dialect) {
this.dialect = dialect;
return this;
}
public Builder setParallelism(Integer parallelism) {
this.parallelism = parallelism;
return this;
}
public Builder setNamespaceMappingEnabled(boolean namespaceMappingEnabled) {
this.namespaceMappingEnabled = namespaceMappingEnabled;
return this;
}
public Builder setMapSystemTablesEnabled(boolean mapSystemTablesEnabled) {
this.mapSystemTablesEnabled = mapSystemTablesEnabled;
return this;
}
public JdbcOptions build() {
checkNotNull(dbURL, "No dbURL supplied.");
checkNotNull(tableName, "No tableName supplied.");
if (this.dialect == null) {
Optional<JdbcDialect> optional = JdbcDialects.get(dbURL);
this.dialect =
optional.orElseGet(
() -> {
throw new NullPointerException(
"Unknown dbURL,can not find proper dialect.");
});
}
if (this.driverName == null) {
Optional<String> optional = dialect.defaultDriverName();
this.driverName =
optional.orElseGet(
() -> {
throw new NullPointerException("No driverName supplied.");
});
}
return new JdbcOptions(
dbURL,
tableName,
driverName,
username,
password,
dialect,
parallelism,
connectionCheckTimeoutSeconds,
namespaceMappingEnabled,
mapSystemTablesEnabled);
}
}
}
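/*
 * Illustrative sketch (not part of the committed sources): building JdbcOptions for a Phoenix
 * table. The ZooKeeper quorum, table name and namespace flags are example values; PhoenixDialect
 * is the dialect shipped with this connector and is assumed to have a public no-arg constructor,
 * and org.apache.phoenix.jdbc.PhoenixDriver is the standard Phoenix JDBC driver class.
 */
class JdbcOptionsExample {
    static JdbcOptions build() {
        return JdbcOptions.builder()
                .setDBUrl("jdbc:phoenix:hd01,hd02,hd03:2181")
                .setDriverName("org.apache.phoenix.jdbc.PhoenixDriver")
                .setDialect(new PhoenixDialect())
                .setTableName("TEST.T_USER")
                .setNamespaceMappingEnabled(true)     // phoenix.schema.isNamespaceMappingEnabled
                .setMapSystemTablesEnabled(true)      // phoenix.schema.mapSystemTablesToNamespace
                .build();
    }
}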
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.internal.options;
import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;
/** Options for the JDBC scan. */
public class JdbcReadOptions implements Serializable {
private final String query;
private final String partitionColumnName;
private final Long partitionLowerBound;
private final Long partitionUpperBound;
private final Integer numPartitions;
private final int fetchSize;
private final boolean autoCommit;
private JdbcReadOptions(
String query,
String partitionColumnName,
Long partitionLowerBound,
Long partitionUpperBound,
Integer numPartitions,
int fetchSize,
boolean autoCommit) {
this.query = query;
this.partitionColumnName = partitionColumnName;
this.partitionLowerBound = partitionLowerBound;
this.partitionUpperBound = partitionUpperBound;
this.numPartitions = numPartitions;
this.fetchSize = fetchSize;
this.autoCommit = autoCommit;
}
public Optional<String> getQuery() {
return Optional.ofNullable(query);
}
public Optional<String> getPartitionColumnName() {
return Optional.ofNullable(partitionColumnName);
}
public Optional<Long> getPartitionLowerBound() {
return Optional.ofNullable(partitionLowerBound);
}
public Optional<Long> getPartitionUpperBound() {
return Optional.ofNullable(partitionUpperBound);
}
public Optional<Integer> getNumPartitions() {
return Optional.ofNullable(numPartitions);
}
public int getFetchSize() {
return fetchSize;
}
public boolean getAutoCommit() {
return autoCommit;
}
public static Builder builder() {
return new Builder();
}
@Override
public boolean equals(Object o) {
if (o instanceof JdbcReadOptions) {
JdbcReadOptions options = (JdbcReadOptions) o;
return Objects.equals(query, options.query)
&& Objects.equals(partitionColumnName, options.partitionColumnName)
&& Objects.equals(partitionLowerBound, options.partitionLowerBound)
&& Objects.equals(partitionUpperBound, options.partitionUpperBound)
&& Objects.equals(numPartitions, options.numPartitions)
&& Objects.equals(fetchSize, options.fetchSize)
&& Objects.equals(autoCommit, options.autoCommit);
} else {
return false;
}
}
/** Builder of {@link JdbcReadOptions}. */
public static class Builder {
protected String query;
protected String partitionColumnName;
protected Long partitionLowerBound;
protected Long partitionUpperBound;
protected Integer numPartitions;
protected int fetchSize = 0;
protected boolean autoCommit = true;
/** optional, SQL query statement for this JDBC source. */
public Builder setQuery(String query) {
this.query = query;
return this;
}
/** optional, name of the column used for partitioning the input. */
public Builder setPartitionColumnName(String partitionColumnName) {
this.partitionColumnName = partitionColumnName;
return this;
}
/** optional, the smallest value of the first partition. */
public Builder setPartitionLowerBound(long partitionLowerBound) {
this.partitionLowerBound = partitionLowerBound;
return this;
}
/** optional, the largest value of the last partition. */
public Builder setPartitionUpperBound(long partitionUpperBound) {
this.partitionUpperBound = partitionUpperBound;
return this;
}
/**
* optional, the maximum number of partitions that can be used for parallelism in table
* reading.
*/
public Builder setNumPartitions(int numPartitions) {
this.numPartitions = numPartitions;
return this;
}
/**
         * optional, the number of rows to fetch per round trip. The default value is 0; according
         * to the JDBC API, 0 means the fetchSize hint will be ignored.
*/
public Builder setFetchSize(int fetchSize) {
this.fetchSize = fetchSize;
return this;
}
/** optional, whether to set auto commit on the JDBC driver. */
public Builder setAutoCommit(boolean autoCommit) {
this.autoCommit = autoCommit;
return this;
}
public JdbcReadOptions build() {
return new JdbcReadOptions(
query,
partitionColumnName,
partitionLowerBound,
partitionUpperBound,
numPartitions,
fetchSize,
autoCommit);
}
}
}
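/*
 * Illustrative sketch (not part of the committed sources): configuring a partitioned scan. The
 * partition column ID and its bounds are example values; the scan is split into numPartitions
 * ranges over [lowerBound, upperBound], each read by one parallel source task.
 */
class JdbcReadOptionsExample {
    static JdbcReadOptions build() {
        return JdbcReadOptions.builder()
                .setPartitionColumnName("ID")
                .setPartitionLowerBound(0L)
                .setPartitionUpperBound(1_000_000L)
                .setNumPartitions(8)        // at most 8 parallel splits
                .setFetchSize(1000)         // rows fetched per round trip
                .build();
    }
}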
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.internal.options;
import javax.annotation.Nullable;
import java.io.Serializable;
/** Jdbc query type options. */
abstract class JdbcTypedQueryOptions implements Serializable {
@Nullable private final int[] fieldTypes;
JdbcTypedQueryOptions(int[] fieldTypes) {
this.fieldTypes = fieldTypes;
}
public int[] getFieldTypes() {
return fieldTypes;
}
public abstract static class JdbcUpdateQueryOptionsBuilder<
T extends JdbcUpdateQueryOptionsBuilder<T>> {
int[] fieldTypes;
protected abstract T self();
public T withFieldTypes(int[] fieldTypes) {
this.fieldTypes = fieldTypes;
return self();
}
}
}
package org.apache.flink.connector.phoenix.internal.options;
import java.io.Serializable;
import java.util.Objects;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.util.Preconditions;
public class PhoenixJdbcExecutionOptions implements Serializable {
public static final int DEFAULT_MAX_RETRY_TIMES = 3;
private static final int DEFAULT_INTERVAL_MILLIS = 0;
public static final int DEFAULT_SIZE = 5000;
private final long batchIntervalMs;
private final int batchSize;
private final int maxRetries;
private PhoenixJdbcExecutionOptions(long batchIntervalMs, int batchSize, int maxRetries) {
Preconditions.checkArgument(maxRetries >= 0);
this.batchIntervalMs = batchIntervalMs;
this.batchSize = batchSize;
this.maxRetries = maxRetries;
}
public long getBatchIntervalMs() {
return this.batchIntervalMs;
}
public int getBatchSize() {
return this.batchSize;
}
public int getMaxRetries() {
return this.maxRetries;
}
public boolean equals(Object o) {
if (this == o) {
return true;
} else if (o != null && this.getClass() == o.getClass()) {
PhoenixJdbcExecutionOptions that = (PhoenixJdbcExecutionOptions)o;
return this.batchIntervalMs == that.batchIntervalMs && this.batchSize == that.batchSize && this.maxRetries == that.maxRetries;
} else {
return false;
}
}
public int hashCode() {
return Objects.hash(new Object[]{this.batchIntervalMs, this.batchSize, this.maxRetries});
}
public static Builder builder() {
return new Builder();
}
public static PhoenixJdbcExecutionOptions defaults() {
return builder().build();
}
public static final class Builder {
private long intervalMs = 0L;
private int size = 5000;
private int maxRetries = 3;
public Builder() {
}
public Builder withBatchSize(int size) {
this.size = size;
return this;
}
public Builder withBatchIntervalMs(long intervalMs) {
this.intervalMs = intervalMs;
return this;
}
public Builder withMaxRetries(int maxRetries) {
this.maxRetries = maxRetries;
return this;
}
public PhoenixJdbcExecutionOptions build() {
return new PhoenixJdbcExecutionOptions(this.intervalMs, this.size, this.maxRetries);
}
}
}
\ No newline at end of file
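/*
 * Illustrative sketch (not part of the committed sources): configuring how the Phoenix sink
 * flushes its buffer. The batch size, flush interval and retry count below are example values.
 */
class PhoenixJdbcExecutionOptionsExample {
    static PhoenixJdbcExecutionOptions build() {
        return PhoenixJdbcExecutionOptions.builder()
                .withBatchSize(1000)          // flush after 1,000 buffered rows
                .withBatchIntervalMs(200L)    // or after 200 ms, whichever comes first
                .withMaxRetries(3)
                .build();
    }
}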
package org.apache.flink.connector.phoenix.internal.options;
import java.io.Serializable;
import java.util.Objects;
public class PhoenixJdbcLookupOptions implements Serializable {
private final long cacheMaxSize;
private final long cacheExpireMs;
private final int maxRetryTimes;
public PhoenixJdbcLookupOptions(long cacheMaxSize, long cacheExpireMs, int maxRetryTimes) {
this.cacheMaxSize = cacheMaxSize;
this.cacheExpireMs = cacheExpireMs;
this.maxRetryTimes = maxRetryTimes;
}
public long getCacheMaxSize() {
return this.cacheMaxSize;
}
public long getCacheExpireMs() {
return this.cacheExpireMs;
}
public int getMaxRetryTimes() {
return this.maxRetryTimes;
}
public static Builder builder() {
return new Builder();
}
public boolean equals(Object o) {
if (!(o instanceof PhoenixJdbcLookupOptions)) {
return false;
} else {
PhoenixJdbcLookupOptions options = (PhoenixJdbcLookupOptions)o;
return Objects.equals(this.cacheMaxSize, options.cacheMaxSize) && Objects.equals(this.cacheExpireMs, options.cacheExpireMs) && Objects.equals(this.maxRetryTimes, options.maxRetryTimes);
}
}
public static class Builder {
private long cacheMaxSize = -1L;
private long cacheExpireMs = -1L;
private int maxRetryTimes = 3;
public Builder() {
}
public Builder setCacheMaxSize(long cacheMaxSize) {
this.cacheMaxSize = cacheMaxSize;
return this;
}
public Builder setCacheExpireMs(long cacheExpireMs) {
this.cacheExpireMs = cacheExpireMs;
return this;
}
public Builder setMaxRetryTimes(int maxRetryTimes) {
this.maxRetryTimes = maxRetryTimes;
return this;
}
public PhoenixJdbcLookupOptions build() {
return new PhoenixJdbcLookupOptions(this.cacheMaxSize, this.cacheExpireMs, this.maxRetryTimes);
}
}
}
\ No newline at end of file
package org.apache.flink.connector.phoenix.internal.options;
import org.apache.flink.connector.phoenix.JdbcConnectionOptions;
import org.apache.flink.connector.phoenix.dialect.JdbcDialect;
import org.apache.flink.connector.phoenix.dialect.JdbcDialects;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.util.Objects;
import java.util.Optional;
/**
* PhoenixJdbcOptions
*
* @author gy
* @since 2022/3/17 9:57
**/
public class PhoenixJdbcOptions extends JdbcConnectionOptions {
private static final long serialVersionUID = 1L;
private String tableName;
private JdbcDialect dialect;
@Nullable
private final Integer parallelism;
    // whether Phoenix schema (namespace) mapping is enabled
private Boolean isNamespaceMappingEnabled;
private Boolean mapSystemTablesToNamespace;
    private PhoenixJdbcOptions(
            String dbURL,
            String tableName,
            String driverName,
            String username,
            String password,
            JdbcDialect dialect,
            Integer parallelism,
            int connectionCheckTimeoutSeconds,
            boolean isNamespaceMappingEnabled,
            boolean mapSystemTablesToNamespace) {
super(dbURL, driverName, username, password, connectionCheckTimeoutSeconds);
this.tableName = tableName;
this.dialect = dialect;
this.parallelism = parallelism;
this.isNamespaceMappingEnabled = isNamespaceMappingEnabled;
this.mapSystemTablesToNamespace = mapSystemTablesToNamespace;
}
public String getTableName() {
return this.tableName;
}
public JdbcDialect getDialect() {
return this.dialect;
}
public Integer getParallelism() {
return this.parallelism;
}
public Boolean getNamespaceMappingEnabled() {
return isNamespaceMappingEnabled;
}
public Boolean getMapSystemTablesToNamespace() {
return mapSystemTablesToNamespace;
}
public static Builder builder() {
return new Builder();
}
public boolean equals(Object o) {
if (!(o instanceof PhoenixJdbcOptions)) {
return false;
} else {
PhoenixJdbcOptions options = (PhoenixJdbcOptions)o;
            return Objects.equals(this.url, options.url)
                    && Objects.equals(this.tableName, options.tableName)
                    && Objects.equals(this.driverName, options.driverName)
                    && Objects.equals(this.username, options.username)
                    && Objects.equals(this.password, options.password)
                    && Objects.equals(this.dialect.getClass().getName(), options.dialect.getClass().getName())
                    && Objects.equals(this.parallelism, options.parallelism)
                    && Objects.equals(this.connectionCheckTimeoutSeconds, options.connectionCheckTimeoutSeconds)
                    && Objects.equals(this.isNamespaceMappingEnabled, options.isNamespaceMappingEnabled)
                    && Objects.equals(this.mapSystemTablesToNamespace, options.mapSystemTablesToNamespace);
}
}
public int hashCode() {
        return Objects.hash(
                this.url,
                this.tableName,
                this.driverName,
                this.username,
                this.password,
                this.dialect.getClass().getName(),
                this.parallelism,
                this.connectionCheckTimeoutSeconds,
                this.isNamespaceMappingEnabled,
                this.mapSystemTablesToNamespace);
}
public static class Builder {
private String dbURL;
private String tableName;
private String driverName;
private String username;
private String password;
private JdbcDialect dialect;
private Integer parallelism;
private int connectionCheckTimeoutSeconds = 60;
private Boolean isNamespaceMappingEnabled;
private Boolean mapSystemTablesToNamespace;
public Builder() {
}
public Builder setTableName(String tableName) {
this.tableName = tableName;
return this;
}
public Builder setUsername(String username) {
this.username = username;
return this;
}
public Builder setPassword(String password) {
this.password = password;
return this;
}
public Builder setConnectionCheckTimeoutSeconds(int connectionCheckTimeoutSeconds) {
this.connectionCheckTimeoutSeconds = connectionCheckTimeoutSeconds;
return this;
}
public Builder setDriverName(String driverName) {
this.driverName = driverName;
return this;
}
public Builder setDBUrl(String dbURL) {
this.dbURL = dbURL;
return this;
}
public Builder setDialect(JdbcDialect dialect) {
this.dialect = dialect;
return this;
}
public Builder setParallelism(Integer parallelism) {
this.parallelism = parallelism;
return this;
}
public Builder setNamespaceMappingEnabled(Boolean namespaceMappingEnabled) {
this.isNamespaceMappingEnabled = namespaceMappingEnabled;
return this;
}
public Builder setMapSystemTablesToNamespace(Boolean mapSystemTablesToNamespace) {
this.mapSystemTablesToNamespace = mapSystemTablesToNamespace;
return this;
}
public PhoenixJdbcOptions build() {
Preconditions.checkNotNull(this.dbURL, "No dbURL supplied.");
Preconditions.checkNotNull(this.tableName, "No tableName supplied.");
Optional optional;
if (this.dialect == null) {
optional = JdbcDialects.get(this.dbURL);
this.dialect = (JdbcDialect)optional.orElseGet(() -> {
                    throw new NullPointerException("Unknown dbURL, cannot find a proper dialect.");
});
}
if (this.driverName == null) {
optional = this.dialect.defaultDriverName();
this.driverName = (String)optional.orElseGet(() -> {
throw new NullPointerException("No driverName supplied.");
});
}
            return new PhoenixJdbcOptions(
                    this.dbURL,
                    this.tableName,
                    this.driverName,
                    this.username,
                    this.password,
                    this.dialect,
                    this.parallelism,
                    this.connectionCheckTimeoutSeconds,
                    this.isNamespaceMappingEnabled,
                    this.mapSystemTablesToNamespace);
}
}
}
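/*
 * Illustrative sketch (not part of the committed sources): building PhoenixJdbcOptions with
 * namespace mapping enabled. The ZooKeeper quorum and table name are example values;
 * PhoenixDialect is assumed to have a public no-arg constructor, and
 * org.apache.phoenix.jdbc.PhoenixDriver is the standard Phoenix JDBC driver class.
 */
class PhoenixJdbcOptionsExample {
    static PhoenixJdbcOptions build() {
        return PhoenixJdbcOptions.builder()
                .setDBUrl("jdbc:phoenix:hd01,hd02,hd03:2181")
                .setDriverName("org.apache.phoenix.jdbc.PhoenixDriver")
                .setDialect(new PhoenixDialect())
                .setTableName("TEST.T_USER")
                .setNamespaceMappingEnabled(true)         // phoenix.schema.isNamespaceMappingEnabled
                .setMapSystemTablesToNamespace(true)      // phoenix.schema.mapSystemTablesToNamespace
                .build();
    }
}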
package org.apache.flink.connector.phoenix.internal.options;
import org.apache.flink.connector.phoenix.internal.options.JdbcReadOptions;
import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;
public class PhoenixJdbcReadOptions implements Serializable {
private final String query;
private final String partitionColumnName;
private final Long partitionLowerBound;
private final Long partitionUpperBound;
private final Integer numPartitions;
private final int fetchSize;
private final boolean autoCommit;
    private PhoenixJdbcReadOptions(
            String query,
            String partitionColumnName,
            Long partitionLowerBound,
            Long partitionUpperBound,
            Integer numPartitions,
            int fetchSize,
            boolean autoCommit) {
this.query = query;
this.partitionColumnName = partitionColumnName;
this.partitionLowerBound = partitionLowerBound;
this.partitionUpperBound = partitionUpperBound;
this.numPartitions = numPartitions;
this.fetchSize = fetchSize;
this.autoCommit = autoCommit;
}
public Optional<String> getQuery() {
return Optional.ofNullable(this.query);
}
public Optional<String> getPartitionColumnName() {
return Optional.ofNullable(this.partitionColumnName);
}
public Optional<Long> getPartitionLowerBound() {
return Optional.ofNullable(this.partitionLowerBound);
}
public Optional<Long> getPartitionUpperBound() {
return Optional.ofNullable(this.partitionUpperBound);
}
public Optional<Integer> getNumPartitions() {
return Optional.ofNullable(this.numPartitions);
}
public int getFetchSize() {
return this.fetchSize;
}
public boolean getAutoCommit() {
return this.autoCommit;
}
public static Builder builder() {
return new Builder();
}
public boolean equals(Object o) {
        if (!(o instanceof PhoenixJdbcReadOptions)) {
return false;
} else {
PhoenixJdbcReadOptions options = (PhoenixJdbcReadOptions)o;
            return Objects.equals(this.query, options.query)
                    && Objects.equals(this.partitionColumnName, options.partitionColumnName)
                    && Objects.equals(this.partitionLowerBound, options.partitionLowerBound)
                    && Objects.equals(this.partitionUpperBound, options.partitionUpperBound)
                    && Objects.equals(this.numPartitions, options.numPartitions)
                    && Objects.equals(this.fetchSize, options.fetchSize)
                    && Objects.equals(this.autoCommit, options.autoCommit);
}
}
public static class Builder {
protected String query;
protected String partitionColumnName;
protected Long partitionLowerBound;
protected Long partitionUpperBound;
protected Integer numPartitions;
protected int fetchSize = 0;
protected boolean autoCommit = true;
public Builder() {
}
public Builder setQuery(String query) {
this.query = query;
return this;
}
public Builder setPartitionColumnName(String partitionColumnName) {
this.partitionColumnName = partitionColumnName;
return this;
}
public Builder setPartitionLowerBound(long partitionLowerBound) {
this.partitionLowerBound = partitionLowerBound;
return this;
}
public Builder setPartitionUpperBound(long partitionUpperBound) {
this.partitionUpperBound = partitionUpperBound;
return this;
}
public Builder setNumPartitions(int numPartitions) {
this.numPartitions = numPartitions;
return this;
}
public Builder setFetchSize(int fetchSize) {
this.fetchSize = fetchSize;
return this;
}
public Builder setAutoCommit(boolean autoCommit) {
this.autoCommit = autoCommit;
return this;
}
public PhoenixJdbcReadOptions build() {
return new PhoenixJdbcReadOptions(this.query, this.partitionColumnName, this.partitionLowerBound, this.partitionUpperBound, this.numPartitions, this.fetchSize, this.autoCommit);
}
}
}
\ No newline at end of file
package org.apache.flink.connector.phoenix.internal.options;/*
package org.apache.flink.connector.phoenix.internal.options;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.phoenix.internal.AbstractJdbcOutputFormat;
import org.apache.flink.connector.phoenix.internal.connection.JdbcConnectionProvider;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Date;
import java.util.Properties;
*/
/**
* PhoenixSinkFunction
*
* @author gy
* @since 2022/3/22 16:27
**//*
public class PhoenixSinkFunction <T> extends RichSinkFunction<T>
implements CheckpointedFunction {
private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcOutputFormat.class);
private final JdbcConnectionProvider jdbcConnectionProvider;
private final JdbcOptions options;
private static Connection connection = null;
private static String tableName = "test.ecgbeats12";
private static PreparedStatement psUp = null;
private static int batchcount = 0;
private static int totalcount = 0;
private static Date startTime;
public PhoenixSinkFunction(JdbcOptions jdbcOptions,JdbcConnectionProvider jdbcConnectionProvider) {
this.options = jdbcOptions;
this.jdbcConnectionProvider = jdbcConnectionProvider;
}
@Override
public void open(Configuration parameters) throws Exception {
Connection connection = jdbcConnectionProvider.getOrEstablishConnection();
//super.open(parameters);
*/
/*RuntimeContext ctx = getRuntimeContext();
outputFormat.setRuntimeContext(ctx);
outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());*//*
*/
/* Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
Properties properties = new Properties();
properties.put("phoenix.schema.isNamespaceMappingEnabled", "true");
properties.put("phoenix.schema.mapSystemTablesToNamespac", "true");
connection = DriverManager.getConnection("jdbc:phoenix:hd01,hd02,hd03:2181",properties);*//*
connection.setAutoCommit(false);
        // Insert data with a PreparedStatement; the matching primary key must be specified
StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append("upsert into " + tableName + "(ecg_id , bindex , btype , bt_flag , af_flag , bmatch , rr, nrr , detpeak , dettresh ) values(?,?,?,?,?,?,?,?,?,?)");
String sqlUp = sqlBuilder.toString();
psUp = connection.prepareStatement(sqlUp);
this.options.getDialect().
}
@Override
public void invoke(T value, Context context) throws IOException {
psUp.executeUpdate();
//psUp.addBatch();
batchcount++;
totalcount++;
if (batchcount == 1000) {
System.out.println("add batch : "+batchcount);
            // Phoenix uses commit() rather than executeBatch() to control batch updates.
//psUp.executeBatch();
connection.commit();
//psUp.clearBatch();
batchcount = 0;
System.out.println("totalcount : "+totalcount);
}
}
@Override
public void initializeState(FunctionInitializationContext context) {}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
outputFormat.flush();
}
@Override
public void close() {
//psUp.executeBatch();
connection.commit();
//psUp.clearBatch();
Date endTime = new Date();
long l = endTime.getTime() - startTime.getTime();
long day = l / (24 * 60 * 60 * 1000);
long hour = (l / (60 * 60 * 1000) - day * 24);
long min = ((l / (60 * 1000)) - day * 24 * 60 - hour * 60);
long s = (l / 1000 - day * 24 * 60 * 60 - hour * 60 * 60 - min * 60);
System.out.println("========结束写入时间: "+ endTime);
System.out.println("========运行时间: " + day + "天" + hour + "小时" + min + "分" + s + "秒");
if (psUp != null ) {
try {
psUp.close();
} catch (SQLException throwables) {
throwables.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (SQLException throwables) {
throwables.printStackTrace();
}
}
}
}*/
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.split;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.connector.phoenix.PhoenixInputFormat;
import java.io.Serializable;
/**
* This splits generator actually does nothing but wrapping the query parameters computed by the
* user before creating the {@link PhoenixInputFormat} instance.
*/
@Experimental
public class JdbcGenericParameterValuesProvider implements JdbcParameterValuesProvider {
private final Serializable[][] parameters;
public JdbcGenericParameterValuesProvider(Serializable[][] parameters) {
this.parameters = parameters;
}
@Override
public Serializable[][] getParameterValues() {
// do nothing...precomputed externally
return parameters;
}
}
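A hedged usage sketch for the provider above: each row of the matrix parameterizes one parallel split of a query such as "SELECT ... WHERE id BETWEEN ? AND ?". The ranges below are placeholders.

Serializable[][] queryParameters = new Serializable[][] {
        {0L, 999L},      // split 0
        {1000L, 1999L},  // split 1
        {2000L, 2999L}   // split 2
};
JdbcParameterValuesProvider provider = new JdbcGenericParameterValuesProvider(queryParameters);
// provider.getParameterValues() simply returns the precomputed matrix, one row per split.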
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.split;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.connector.phoenix.PhoenixInputFormat;
import java.io.Serializable;
/**
* This interface is used by the {@link PhoenixInputFormat} to compute the list of parallel query to
* run (i.e. splits). Each query will be parameterized using a row of the matrix provided by each
* {@link JdbcParameterValuesProvider} implementation.
*/
@Experimental
public interface JdbcParameterValuesProvider {
/** Returns the necessary parameters array to use for query in parallel a table. */
Serializable[][] getParameterValues();
}
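The interface can also be implemented directly. A hedged sketch of a provider that splits a numeric key range into fixed-size chunks, one parameter row per split; the class name and range arithmetic are illustrative and not part of the connector.

import java.io.Serializable;

public class RangeParameterValuesProvider implements JdbcParameterValuesProvider {
    private final long min;
    private final long max;
    private final long chunkSize;

    public RangeParameterValuesProvider(long min, long max, long chunkSize) {
        this.min = min;
        this.max = max;
        this.chunkSize = chunkSize;
    }

    @Override
    public Serializable[][] getParameterValues() {
        int numSplits = (int) ((max - min) / chunkSize) + 1;
        Serializable[][] parameters = new Serializable[numSplits][];
        for (int i = 0; i < numSplits; i++) {
            long start = min + i * chunkSize;
            long end = Math.min(start + chunkSize - 1, max);
            // Each row binds to a "WHERE key BETWEEN ? AND ?" predicate.
            parameters[i] = new Serializable[] {start, end};
        }
        return parameters;
    }
}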
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.statement;
import java.sql.Connection;
import java.sql.SQLException;
/** A factory to create {@link FieldNamedPreparedStatement} with the given {@link Connection}. */
public interface StatementFactory {
/** Creates {@link FieldNamedPreparedStatement} with the given {@link Connection}. */
FieldNamedPreparedStatement createStatement(Connection connection) throws SQLException;
}
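A hedged sketch of how the factory is typically supplied, as a lambda binding a SQL template to a FieldNamedPreparedStatement. The SQL and field names are placeholders, and the prepareStatement(Connection, String, String[]) factory method is assumed to exist here as it does in the upstream Flink JDBC connector.

String sql = "UPSERT INTO example_table (id, val) VALUES (:id, :val)";
String[] fieldNames = new String[] {"id", "val"};
StatementFactory statementFactory =
        connection -> FieldNamedPreparedStatement.prepareStatement(connection, sql, fieldNames);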
package org.apache.flink.connector.phoenix.table;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
/**
* PhoenixJdbcSinkFunction
*
* @author gy
* @since 2022/3/17 17:41
**/
public class PhoenixJdbcSinkFunction<T> extends RichSinkFunction<T> implements CheckpointedFunction {
@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
}
@Override
public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
}
@Override
public void invoke(T value) throws Exception {
}
@Override
public void invoke(T value, Context context) throws Exception {
}
}