Unverified commit ee28a13b authored by Kerwin, committed by GitHub

Added code-style fixes to the dlink-connectors module. (#913)

parent 82f4b640
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.doris.flink.cfg;
import org.apache.doris.flink.table.DorisDynamicOutputFormat;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
......@@ -28,7 +48,6 @@ public class GenericDorisSinkFunction<T> extends RichSinkFunction<T>
outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
}
@Override
public void invoke(T value, Context context) throws Exception {
outputFormat.writeRecord(value);
......
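For reference, the Doris sink above follows a common Flink pattern: a RichSinkFunction that forwards its lifecycle and records to an OutputFormat. A minimal sketch of that pattern (the class name SimpleOutputFormatSink is illustrative, not part of this PR):

import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class SimpleOutputFormatSink<T> extends RichSinkFunction<T> {
    private final OutputFormat<T> outputFormat;

    public SimpleOutputFormatSink(OutputFormat<T> outputFormat) {
        this.outputFormat = outputFormat;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Mirror the sink above: hand subtask information to the format.
        outputFormat.configure(parameters);
        outputFormat.open(getRuntimeContext().getIndexOfThisSubtask(),
                getRuntimeContext().getNumberOfParallelSubtasks());
    }

    @Override
    public void invoke(T value, Context context) throws Exception {
        // Each record is delegated straight to the wrapped format.
        outputFormat.writeRecord(value);
    }

    @Override
    public void close() throws Exception {
        outputFormat.close();
    }
}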
......@@ -17,12 +17,11 @@
*
*/
package org.apache.doris.flink.serialization;
import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.rest.models.Schema;
import org.apache.doris.shaded.org.apache.arrow.memory.RootAllocator;
import org.apache.doris.shaded.org.apache.arrow.vector.BigIntVector;
import org.apache.doris.shaded.org.apache.arrow.vector.BitVector;
import org.apache.doris.shaded.org.apache.arrow.vector.DecimalVector;
......@@ -37,15 +36,10 @@ import org.apache.doris.shaded.org.apache.arrow.vector.VarCharVector;
import org.apache.doris.shaded.org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.doris.shaded.org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.doris.shaded.org.apache.arrow.vector.types.Types;
import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.rest.models.Schema;
import org.apache.doris.thrift.TScanBatchResult;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.IOException;
......@@ -54,6 +48,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Row batch data container.
*/
......@@ -141,8 +138,7 @@ public class RowBatch {
private void addValueToRow(int rowIndex, Object obj) {
if (rowIndex > rowCountInOneBatch) {
String errMsg = "Get row offset: " + rowIndex + " larger than row size: " +
rowCountInOneBatch;
String errMsg = "Get row offset: " + rowIndex + " larger than row size: " + rowCountInOneBatch;
logger.error(errMsg);
throw new NoSuchElementException(errMsg);
}
......
......@@ -17,7 +17,6 @@
*
*/
package org.apache.flink.connector.jdbc.dialect;
import org.apache.flink.connector.jdbc.internal.converter.ClickHouseRowConverter;
......
......@@ -17,7 +17,6 @@
*
*/
package org.apache.flink.connector.jdbc.dialect;
import java.util.Arrays;
......
......@@ -17,7 +17,6 @@
*
*/
package org.apache.flink.connector.jdbc.dialect;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
......@@ -25,7 +24,11 @@ import org.apache.flink.connector.jdbc.internal.converter.OracleRowConverter;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
......
......@@ -17,7 +17,6 @@
*
*/
package org.apache.flink.connector.jdbc.dialect;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
......@@ -25,7 +24,11 @@ import org.apache.flink.connector.jdbc.internal.converter.SQLServerRowConverter;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
......
......@@ -17,8 +17,6 @@
*
*/
package org.apache.flink.connector.jdbc.internal.converter;
import org.apache.flink.table.types.logical.RowType;
......
......@@ -17,12 +17,8 @@
*
*/
package org.apache.flink.connector.jdbc.internal.converter;
import oracle.sql.CLOB;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
......@@ -37,6 +33,8 @@ import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import oracle.sql.CLOB;
/**
* Runtime converter responsible for converting between JDBC objects and Flink internal objects for
* Oracle.
......@@ -96,21 +94,21 @@ public class OracleRowConverter extends AbstractJdbcRowConverter {
case TIMESTAMP_WITH_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
return (val) -> {
if(val instanceof LocalDateTime){
if (val instanceof LocalDateTime) {
return TimestampData.fromLocalDateTime((LocalDateTime)val);
}else if(val instanceof oracle.sql.TIMESTAMP){
} else if (val instanceof oracle.sql.TIMESTAMP) {
return TimestampData.fromTimestamp(Timestamp.valueOf(((oracle.sql.TIMESTAMP) val).stringValue()));
}else{
} else {
return TimestampData.fromTimestamp((Timestamp) val);
}
};
case CHAR:
case VARCHAR:
return (val) -> {
if(val instanceof CLOB){
if (val instanceof CLOB) {
CLOB clob = (CLOB) val;
return StringData.fromString(clob == null ? null : clob.stringValue());
}else {
} else {
return StringData.fromString((String) val);
}
};
......
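A note on the branches above: the Oracle JDBC driver can hand back its proprietary oracle.sql.TIMESTAMP and CLOB types instead of the standard java.sql equivalents, so routing those through stringValue() avoids a ClassCastException at runtime. (Incidentally, the clob == null check after the instanceof test is dead code: a null value can never pass instanceof.)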
......@@ -17,7 +17,6 @@
*
*/
package org.apache.flink.connector.jdbc.internal.converter;
import org.apache.flink.table.types.logical.RowType;
......
......@@ -17,8 +17,6 @@
*
*/
package org.apache.flink.connector.jdbc.dialect;
import org.apache.flink.connector.jdbc.internal.converter.ClickHouseRowConverter;
......
......@@ -17,7 +17,6 @@
*
*/
package org.apache.flink.connector.jdbc.dialect;
import java.util.Arrays;
......
......@@ -17,8 +17,6 @@
*
*/
package org.apache.flink.connector.jdbc.dialect;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
......@@ -26,7 +24,11 @@ import org.apache.flink.connector.jdbc.internal.converter.OracleRowConverter;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
......
......@@ -17,7 +17,6 @@
*
*/
package org.apache.flink.connector.jdbc.dialect;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
......@@ -25,7 +24,11 @@ import org.apache.flink.connector.jdbc.internal.converter.SQLServerRowConverter;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
......
......@@ -17,8 +17,6 @@
*
*/
package org.apache.flink.connector.jdbc.internal.converter;
import org.apache.flink.table.types.logical.RowType;
......
......@@ -17,11 +17,8 @@
*
*/
package org.apache.flink.connector.jdbc.internal.converter;
import oracle.sql.CLOB;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
......@@ -36,6 +33,8 @@ import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import oracle.sql.CLOB;
/**
* Runtime converter responsible for converting between JDBC objects and Flink internal objects for
* Oracle.
......@@ -95,21 +94,21 @@ public class OracleRowConverter extends AbstractJdbcRowConverter {
case TIMESTAMP_WITH_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
return (val) -> {
if(val instanceof LocalDateTime){
if (val instanceof LocalDateTime) {
return TimestampData.fromLocalDateTime((LocalDateTime)val);
}else if(val instanceof oracle.sql.TIMESTAMP){
} else if (val instanceof oracle.sql.TIMESTAMP) {
return TimestampData.fromTimestamp(Timestamp.valueOf(((oracle.sql.TIMESTAMP) val).stringValue()));
}else{
} else {
return TimestampData.fromTimestamp((Timestamp) val);
}
};
case CHAR:
case VARCHAR:
return (val) -> {
if(val instanceof CLOB){
if (val instanceof CLOB) {
CLOB clob = (CLOB) val;
return StringData.fromString(clob == null ? null : clob.stringValue());
}else {
} else {
return StringData.fromString((String) val);
}
};
......
......@@ -17,7 +17,6 @@
*
*/
package org.apache.flink.connector.jdbc.internal.converter;
import org.apache.flink.table.types.logical.RowType;
......
......@@ -17,7 +17,6 @@
*
*/
package org.apache.flink.connector.jdbc.dialect;
import org.apache.flink.connector.jdbc.internal.converter.ClickHouseRowConverter;
......
......@@ -17,7 +17,6 @@
*
*/
package org.apache.flink.connector.jdbc.dialect;
import java.util.Arrays;
......
......@@ -17,8 +17,6 @@
*
*/
package org.apache.flink.connector.jdbc.dialect;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
......@@ -26,7 +24,11 @@ import org.apache.flink.connector.jdbc.internal.converter.OracleRowConverter;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
......
......@@ -17,7 +17,6 @@
*
*/
package org.apache.flink.connector.jdbc.dialect;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
......@@ -25,7 +24,11 @@ import org.apache.flink.connector.jdbc.internal.converter.SQLServerRowConverter;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
......
......@@ -17,8 +17,6 @@
*
*/
package org.apache.flink.connector.jdbc.internal.converter;
import org.apache.flink.table.types.logical.RowType;
......
......@@ -17,11 +17,8 @@
*
*/
package org.apache.flink.connector.jdbc.internal.converter;
import oracle.sql.CLOB;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
......@@ -36,6 +33,8 @@ import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import oracle.sql.CLOB;
/**
* Runtime converter responsible for converting between JDBC objects and Flink internal objects for
* Oracle.
......@@ -95,21 +94,21 @@ public class OracleRowConverter extends AbstractJdbcRowConverter {
case TIMESTAMP_WITH_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
return (val) -> {
if(val instanceof LocalDateTime){
if (val instanceof LocalDateTime) {
return TimestampData.fromLocalDateTime((LocalDateTime)val);
}else if(val instanceof oracle.sql.TIMESTAMP){
} else if (val instanceof oracle.sql.TIMESTAMP) {
return TimestampData.fromTimestamp(Timestamp.valueOf(((oracle.sql.TIMESTAMP) val).stringValue()));
}else{
} else {
return TimestampData.fromTimestamp((Timestamp) val);
}
};
case CHAR:
case VARCHAR:
return (val) -> {
if(val instanceof CLOB){
if (val instanceof CLOB) {
CLOB clob = (CLOB) val;
return StringData.fromString(clob == null ? null : clob.stringValue());
}else {
} else {
return StringData.fromString((String) val);
}
};
......
......@@ -17,7 +17,6 @@
*
*/
package org.apache.flink.connector.jdbc.internal.converter;
import org.apache.flink.table.types.logical.RowType;
......
......@@ -17,9 +17,6 @@
*
*/
package org.apache.flink.connector.jdbc.dialect;
import org.apache.flink.connector.jdbc.internal.converter.ClickHouseRowConverter;
......
......@@ -17,7 +17,6 @@
*
*/
package org.apache.flink.connector.jdbc.dialect;
import java.util.Arrays;
......
......@@ -17,7 +17,6 @@
*
*/
package org.apache.flink.connector.jdbc.dialect;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
......@@ -25,7 +24,11 @@ import org.apache.flink.connector.jdbc.internal.converter.OracleRowConverter;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
......
......@@ -17,7 +17,6 @@
*
*/
package org.apache.flink.connector.jdbc.dialect;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
......@@ -25,7 +24,11 @@ import org.apache.flink.connector.jdbc.internal.converter.SQLServerRowConverter;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
......
......@@ -17,9 +17,6 @@
*
*/
package org.apache.flink.connector.jdbc.internal.converter;
import org.apache.flink.table.types.logical.RowType;
......
......@@ -17,11 +17,8 @@
*
*/
package org.apache.flink.connector.jdbc.internal.converter;
import oracle.sql.CLOB;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
......@@ -36,6 +33,8 @@ import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import oracle.sql.CLOB;
/**
* Runtime converter responsible for converting between JDBC objects and Flink internal objects for
* Oracle.
......@@ -95,21 +94,21 @@ public class OracleRowConverter extends AbstractJdbcRowConverter {
case TIMESTAMP_WITH_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
return (val) -> {
if(val instanceof LocalDateTime){
if (val instanceof LocalDateTime) {
return TimestampData.fromLocalDateTime((LocalDateTime)val);
}else if(val instanceof oracle.sql.TIMESTAMP){
} else if (val instanceof oracle.sql.TIMESTAMP) {
return TimestampData.fromTimestamp(Timestamp.valueOf(((oracle.sql.TIMESTAMP) val).stringValue()));
}else{
} else {
return TimestampData.fromTimestamp((Timestamp) val);
}
};
case CHAR:
case VARCHAR:
return (val) -> {
if(val instanceof CLOB){
if (val instanceof CLOB) {
CLOB clob = (CLOB) val;
return StringData.fromString(clob == null ? null : clob.stringValue());
}else {
} else {
return StringData.fromString((String) val);
}
};
......
......@@ -17,7 +17,6 @@
*
*/
package org.apache.flink.connector.jdbc.internal.converter;
import org.apache.flink.table.types.logical.RowType;
......
......@@ -17,16 +17,16 @@
*
*/
package org.apache.flink.connector.phoenix;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.Optional;
import javax.annotation.Nullable;
/** JDBC connection options. */
@PublicEvolving
public class JdbcConnectionOptions implements Serializable {
......
......@@ -17,8 +17,6 @@
*
*/
package org.apache.flink.connector.phoenix;
import org.apache.flink.annotation.PublicEvolving;
......
......@@ -17,8 +17,6 @@
*
*/
package org.apache.flink.connector.phoenix;
import org.apache.flink.annotation.PublicEvolving;
......
......@@ -17,8 +17,6 @@
*
*/
package org.apache.flink.connector.phoenix;
import org.apache.flink.annotation.PublicEvolving;
......
......@@ -17,7 +17,6 @@
*
*/
package org.apache.flink.connector.phoenix;
import org.apache.commons.lang.StringUtils;
......@@ -38,14 +37,23 @@ import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.math.BigDecimal;
import java.sql.*;
import java.sql.Array;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Arrays;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* InputFormat to read data from a database and generate Rows. The InputFormat has to be configured
* using the supplied InputFormatBuilder. A valid RowTypeInfo must be properly configured in the
......@@ -145,13 +153,13 @@ public class PhoenixInputFormat extends RichInputFormat<Row, InputSplit>
dbConn.setAutoCommit(autoCommit);
}
LOG.debug("openInputFormat query :" +queryTemplate);
LOG.debug("openInputFormat query :" + queryTemplate);
// Remove backticks (`), which Phoenix does not support
String initQuery = StringUtils.remove(queryTemplate, "\\`");
LOG.debug("openInputFormat initQuery :" +initQuery);
LOG.debug("openInputFormat initQuery :" + initQuery);
// Replace double quotes (") with single quotes (')
String replaceQuery = StringUtils.replace(initQuery, "\"", "'");
LOG.info("openInputFormat replaceQuery :" +replaceQuery);
LOG.info("openInputFormat replaceQuery :" + replaceQuery);
statement = dbConn.prepareStatement(replaceQuery, resultSetType, resultSetConcurrency);
if (fetchSize == Integer.MIN_VALUE || fetchSize > 0) {
......@@ -432,7 +440,6 @@ public class PhoenixInputFormat extends RichInputFormat<Row, InputSplit>
return this;
}
public PhoenixInputFormat finish() {
format.connectionProvider =
//new SimpleJdbcConnectionProvider(connOptionsBuilder.build());
......
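The PhoenixInputFormat Javadoc above says the format "has to be configured using the supplied InputFormatBuilder". A hypothetical usage sketch, assuming the builder mirrors Flink's JdbcInputFormat (the buildPhoenixInputFormat/set* method names are assumptions; only finish() is visible in this diff):

// rowTypeInfo is a RowTypeInfo matching the query's result columns.
PhoenixInputFormat format = PhoenixInputFormat.buildPhoenixInputFormat()
        .setDrivername("org.apache.phoenix.jdbc.PhoenixDriver")
        .setDBUrl("jdbc:phoenix:zookeeper-host:2181")
        .setQuery("SELECT ID, NAME FROM MY_TABLE")
        .setRowTypeInfo(rowTypeInfo)
        .finish();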
......@@ -17,7 +17,6 @@
*
*/
package org.apache.flink.connector.phoenix.dialect;
import org.apache.flink.table.api.TableSchema;
......
......@@ -17,10 +17,10 @@
*
*/
package org.apache.flink.connector.phoenix.dialect;
import static java.lang.String.format;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.phoenix.internal.converter.JdbcRowConverter;
import org.apache.flink.table.api.TableSchema;
......@@ -32,8 +32,6 @@ import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Collectors;
import static java.lang.String.format;
/** Handle the SQL dialect of jdbc driver. */
@Internal
public interface JdbcDialect extends Serializable {
......
......@@ -17,8 +17,6 @@
*
*/
package org.apache.flink.connector.phoenix.dialect;
import java.util.Arrays;
......
......@@ -17,7 +17,6 @@
*
*/
package org.apache.flink.connector.phoenix.dialect;
import org.apache.flink.connector.phoenix.internal.converter.JdbcRowConverter;
......@@ -127,7 +126,6 @@ public class PhoenixDialect extends AbstractDialect {
@Override
public List<LogicalTypeRoot> unsupportedTypes() {
return Arrays.asList(
LogicalTypeRoot.BINARY,
LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
......
......@@ -17,20 +17,20 @@
*
*/
package org.apache.flink.connector.phoenix.internal;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.phoenix.internal.connection.JdbcConnectionProvider;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Flushable;
import java.io.IOException;
import java.sql.Connection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Base jdbc outputFormat. */
public abstract class AbstractJdbcOutputFormat<T> extends RichOutputFormat<T> implements Flushable {
......
......@@ -17,7 +17,6 @@
*
*/
package org.apache.flink.connector.phoenix.internal;
import org.apache.flink.annotation.Internal;
......@@ -29,9 +28,10 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nonnull;
import java.io.IOException;
import javax.annotation.Nonnull;
/** A generic SinkFunction for JDBC. */
@Internal
public class GenericJdbcSinkFunction<T> extends RichSinkFunction<T>
......
......@@ -17,10 +17,11 @@
*
*/
package org.apache.flink.connector.phoenix.internal;
import static org.apache.flink.connector.phoenix.utils.JdbcUtils.setRecordToStatement;
import static org.apache.flink.util.Preconditions.checkNotNull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
......@@ -36,10 +37,7 @@ import org.apache.flink.connector.phoenix.utils.JdbcUtils;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.io.Serializable;
import java.sql.Connection;
......@@ -51,8 +49,10 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import static org.apache.flink.connector.phoenix.utils.JdbcUtils.setRecordToStatement;
import static org.apache.flink.util.Preconditions.checkNotNull;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** A JDBC outputFormat that supports batching records before writing records to database. */
@Internal
......@@ -148,7 +148,6 @@ public class JdbcBatchingOutputFormat<
TimeUnit.MILLISECONDS);
}
}
private JdbcExec createAndOpenStatementExecutor(
......@@ -242,7 +241,7 @@ public class JdbcBatchingOutputFormat<
if (batchCount > 0) {
try {
LOG.info("关闭连接前 刷写数据 !!! batchCount: "+batchCount);
LOG.info("关闭连接前 刷写数据 !!! batchCount: " + batchCount);
flush();
} catch (Exception e) {
LOG.warn("Writing records to JDBC failed.", e);
......@@ -338,7 +337,6 @@ public class JdbcBatchingOutputFormat<
.withFieldTypes(fieldTypes)
.build();
if (dml.getKeyFields().isPresent() && dml.getKeyFields().get().length > 0) {
return new TableJdbcUpsertOutputFormat(
new PhoneixJdbcConnectionProvider(options,this.options.isNamespaceMappingEnabled(),this.options.isMapSystemTablesEnabled()),
......
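As its Javadoc says, JdbcBatchingOutputFormat batches records before writing them, flushing on a timer, when the batch fills, and once more before closing (the translated log line above). A minimal self-contained sketch of that batching pattern, with illustrative names that are not the connector's API:

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class BatchingWriter<T> {
    private final int batchSize;
    private final Consumer<List<T>> flusher; // e.g. executes a JDBC batch
    private final List<T> buffer = new ArrayList<>();

    BatchingWriter(int batchSize, Consumer<List<T>> flusher) {
        this.batchSize = batchSize;
        this.flusher = flusher;
    }

    synchronized void write(T record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush(); // size-triggered flush; a timer would call flush() too
        }
    }

    synchronized void flush() {
        if (!buffer.isEmpty()) {
            flusher.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    synchronized void close() {
        flush(); // mirror the "flush before closing" behavior above
    }
}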
......@@ -17,9 +17,12 @@
*
*/
package org.apache.flink.connector.phoenix.internal;
import static org.apache.flink.connector.phoenix.utils.JdbcUtils.getPrimaryKey;
import static org.apache.flink.connector.phoenix.utils.JdbcUtils.setRecordToStatement;
import static org.apache.flink.util.Preconditions.checkArgument;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
......@@ -31,9 +34,6 @@ import org.apache.flink.connector.phoenix.internal.options.JdbcDmlOptions;
import org.apache.flink.connector.phoenix.statement.FieldNamedPreparedStatementImpl;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
......@@ -41,9 +41,8 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.function.Function;
import static org.apache.flink.connector.phoenix.utils.JdbcUtils.getPrimaryKey;
import static org.apache.flink.connector.phoenix.utils.JdbcUtils.setRecordToStatement;
import static org.apache.flink.util.Preconditions.checkArgument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class TableJdbcUpsertOutputFormat
extends JdbcBatchingOutputFormat<
......@@ -123,8 +122,6 @@ class TableJdbcUpsertOutputFormat
}
}
@Override
public synchronized void close() {
try {
......
......@@ -17,16 +17,15 @@
*
*/
package org.apache.flink.connector.phoenix.internal.connection;
import org.apache.flink.annotation.Internal;
import javax.annotation.Nullable;
import java.sql.Connection;
import java.sql.SQLException;
import javax.annotation.Nullable;
/** JDBC connection provider. */
@Internal
public interface JdbcConnectionProvider {
......
......@@ -17,13 +17,10 @@
*
*/
package org.apache.flink.connector.phoenix.internal.connection;
import org.apache.flink.connector.phoenix.JdbcConnectionOptions;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.sql.Connection;
......@@ -33,6 +30,9 @@ import java.sql.SQLException;
import java.util.Enumeration;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* PhoneixJdbcConnectionProvider
*
......@@ -103,7 +103,9 @@ public class PhoneixJdbcConnectionProvider implements JdbcConnectionProvider, Se
return this.connection;
} else {
if (this.jdbcOptions.getDriverName() == null) {
this.connection = DriverManager.getConnection(this.jdbcOptions.getDbURL(), (String) this.jdbcOptions.getUsername().orElse((String) null), (String) this.jdbcOptions.getPassword().orElse((String) null));
this.connection = DriverManager.getConnection(this.jdbcOptions.getDbURL(),
(String) this.jdbcOptions.getUsername().orElse((String) null),
(String) this.jdbcOptions.getPassword().orElse((String) null));
} else {
Driver driver = this.getLoadedDriver();
Properties info = new Properties();
......
......@@ -19,22 +19,35 @@
package org.apache.flink.connector.phoenix.internal.converter;
import static org.apache.flink.util.Preconditions.checkNotNull;
import org.apache.flink.connector.phoenix.statement.FieldNamedPreparedStatement;
import org.apache.flink.connector.phoenix.utils.JdbcTypeUtil;
import org.apache.flink.table.data.*;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.utils.TypeConversions;
import java.io.Serializable;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.*;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** Base class for all converters that convert between JDBC object and Flink internal object. */
public abstract class AbstractJdbcRowConverter implements JdbcRowConverter {
......
......@@ -17,9 +17,6 @@
*
*/
package org.apache.flink.connector.phoenix.internal.converter;
import org.apache.flink.connector.phoenix.statement.FieldNamedPreparedStatement;
......
......@@ -17,10 +17,8 @@
*
*/
package org.apache.flink.connector.phoenix.internal.converter;
import org.apache.flink.table.types.logical.RowType;
/**
......
......@@ -17,17 +17,13 @@
*
*/
package org.apache.flink.connector.phoenix.internal.executor;
import static org.apache.flink.util.Preconditions.checkNotNull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.phoenix.JdbcStatementBuilder;
import org.apache.flink.connector.phoenix.table.PhoenixUpsertTableSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.sql.Connection;
import java.sql.PreparedStatement;
......@@ -38,7 +34,10 @@ import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import static org.apache.flink.util.Preconditions.checkNotNull;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* {@link JdbcBatchStatementExecutor} that provides upsert semantics by updating the row if it exists
......
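The contract the Javadoc above describes is "update if the row exists, otherwise insert". A minimal sketch of that decision, assuming an existence-check statement alongside insert and update statements (all names and the two-column schema are illustrative):

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Illustrative only: check existence by key, then route to update or insert.
static void upsert(PreparedStatement existStmt,
                   PreparedStatement insertStmt,
                   PreparedStatement updateStmt,
                   long key, String value) throws SQLException {
    existStmt.setLong(1, key);
    boolean exists;
    try (ResultSet rs = existStmt.executeQuery()) {
        exists = rs.next();
    }
    if (exists) {
        updateStmt.setString(1, value);
        updateStmt.setLong(2, key);
        updateStmt.addBatch();
    } else {
        insertStmt.setLong(1, key);
        insertStmt.setString(2, value);
        insertStmt.addBatch();
    }
}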
......@@ -17,8 +17,6 @@
*
*/
package org.apache.flink.connector.phoenix.internal.executor;
import org.apache.flink.annotation.Internal;
......
......@@ -17,15 +17,10 @@
*
*/
package org.apache.flink.connector.phoenix.internal.executor;
import org.apache.flink.connector.phoenix.JdbcStatementBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
......@@ -33,6 +28,9 @@ import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A {@link JdbcBatchStatementExecutor} that extracts SQL keys from the supplied stream elements and
* executes a SQL query for them.
......
......@@ -17,14 +17,10 @@
*
*/
package org.apache.flink.connector.phoenix.internal.executor;
import org.apache.flink.connector.phoenix.JdbcStatementBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
......@@ -32,6 +28,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A {@link JdbcBatchStatementExecutor} that executes the supplied statement for the given records
* (without any pre-processing).
......
......@@ -17,7 +17,6 @@
*
*/
package org.apache.flink.connector.phoenix.internal.executor;
import org.apache.flink.api.java.tuple.Tuple2;
......
......@@ -17,7 +17,6 @@
*
*/
package org.apache.flink.connector.phoenix.internal.executor;
import org.apache.flink.table.data.RowData;
......
......@@ -17,10 +17,10 @@
*
*/
package org.apache.flink.connector.phoenix.internal.executor;
import static org.apache.flink.util.Preconditions.checkNotNull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.phoenix.internal.converter.JdbcRowConverter;
import org.apache.flink.connector.phoenix.statement.FieldNamedPreparedStatement;
......@@ -33,8 +33,6 @@ import java.sql.SQLException;
import java.util.Arrays;
import java.util.function.Function;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* {@link JdbcBatchStatementExecutor} that provides upsert semantics by updating the row if it exists
* and inserting otherwise. Only used in Table/SQL API.
......
......@@ -17,10 +17,10 @@
*
*/
package org.apache.flink.connector.phoenix.internal.executor;
import static org.apache.flink.util.Preconditions.checkNotNull;
import org.apache.flink.connector.phoenix.internal.converter.JdbcRowConverter;
import org.apache.flink.connector.phoenix.statement.FieldNamedPreparedStatement;
import org.apache.flink.connector.phoenix.statement.StatementFactory;
......@@ -29,8 +29,6 @@ import org.apache.flink.table.data.RowData;
import java.sql.Connection;
import java.sql.SQLException;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A {@link JdbcBatchStatementExecutor} that simply adds the records into batches of {@link
* java.sql.PreparedStatement} and doesn't buffer records in memory. Only used in Table/SQL API.
......
......@@ -17,18 +17,18 @@
*
*/
package org.apache.flink.connector.phoenix.internal.options;
import org.apache.flink.connector.phoenix.dialect.JdbcDialect;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;
import javax.annotation.Nullable;
/** JDBC sink DML options. */
public class JdbcDmlOptions extends JdbcTypedQueryOptions {
......
......@@ -17,8 +17,6 @@
*
*/
package org.apache.flink.connector.phoenix.internal.options;
import org.apache.flink.util.Preconditions;
......
......@@ -17,7 +17,6 @@
*
*/
package org.apache.flink.connector.phoenix.internal.options;
import org.apache.flink.connector.phoenix.JdbcExecutionOptions;
......
......@@ -17,19 +17,18 @@
*
*/
package org.apache.flink.connector.phoenix.internal.options;
import static org.apache.flink.util.Preconditions.checkNotNull;
import org.apache.flink.connector.phoenix.JdbcConnectionOptions;
import org.apache.flink.connector.phoenix.dialect.JdbcDialect;
import org.apache.flink.connector.phoenix.dialect.JdbcDialects;
import javax.annotation.Nullable;
import java.util.Objects;
import java.util.Optional;
import static org.apache.flink.util.Preconditions.checkNotNull;
import javax.annotation.Nullable;
/** Options for the JDBC connector. */
public class JdbcOptions extends JdbcConnectionOptions {
......
......@@ -17,8 +17,6 @@
*
*/
package org.apache.flink.connector.phoenix.internal.options;
import java.io.Serializable;
......
......@@ -17,13 +17,12 @@
*
*/
package org.apache.flink.connector.phoenix.internal.options;
import javax.annotation.Nullable;
import java.io.Serializable;
import javax.annotation.Nullable;
/** Jdbc query type options. */
abstract class JdbcTypedQueryOptions implements Serializable {
......
......@@ -17,14 +17,12 @@
*
*/
package org.apache.flink.connector.phoenix.internal.options;
import java.io.Serializable;
import java.util.Objects;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.util.Preconditions;
import java.io.Serializable;
import java.util.Objects;
public class PhoenixJdbcExecutionOptions implements Serializable {
public static final int DEFAULT_MAX_RETRY_TIMES = 3;
......
......@@ -17,7 +17,6 @@
*
*/
package org.apache.flink.connector.phoenix.internal.options;
import java.io.Serializable;
......
......@@ -17,7 +17,6 @@
*
*/
package org.apache.flink.connector.phoenix.internal.options;
import org.apache.flink.connector.phoenix.JdbcConnectionOptions;
......@@ -25,10 +24,11 @@ import org.apache.flink.connector.phoenix.dialect.JdbcDialect;
import org.apache.flink.connector.phoenix.dialect.JdbcDialects;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.util.Objects;
import java.util.Optional;
import javax.annotation.Nullable;
/**
* PhoenixJdbcOptions
*
......@@ -45,7 +45,8 @@ public class PhoenixJdbcOptions extends JdbcConnectionOptions {
private Boolean isNamespaceMappingEnabled;
private Boolean mapSystemTablesToNamespace;
private PhoenixJdbcOptions(String dbURL, String tableName, String driverName, String username, String password, JdbcDialect dialect, Integer parallelism, int connectionCheckTimeoutSeconds,boolean isNamespaceMappingEnabled, boolean mapSystemTablesToNamespace) {
private PhoenixJdbcOptions(String dbURL, String tableName, String driverName, String username, String password, JdbcDialect dialect,
Integer parallelism, int connectionCheckTimeoutSeconds, boolean isNamespaceMappingEnabled, boolean mapSystemTablesToNamespace) {
super(dbURL, driverName, username, password, connectionCheckTimeoutSeconds);
this.tableName = tableName;
this.dialect = dialect;
......@@ -82,13 +83,23 @@ public class PhoenixJdbcOptions extends JdbcConnectionOptions {
if (!(o instanceof PhoenixJdbcOptions)) {
return false;
} else {
PhoenixJdbcOptions options = (PhoenixJdbcOptions)o;
return Objects.equals(this.url, options.url) && Objects.equals(this.tableName, options.tableName) && Objects.equals(this.driverName, options.driverName) && Objects.equals(this.username, options.username) && Objects.equals(this.password, options.password) && Objects.equals(this.dialect.getClass().getName(), options.dialect.getClass().getName()) && Objects.equals(this.parallelism, options.parallelism) && Objects.equals(this.connectionCheckTimeoutSeconds, options.connectionCheckTimeoutSeconds)&& Objects.equals(this.isNamespaceMappingEnabled, options.isNamespaceMappingEnabled)&& Objects.equals(this.mapSystemTablesToNamespace, options.mapSystemTablesToNamespace);
PhoenixJdbcOptions options = (PhoenixJdbcOptions) o;
return Objects.equals(this.url, options.url)
&& Objects.equals(this.tableName, options.tableName)
&& Objects.equals(this.driverName, options.driverName)
&& Objects.equals(this.username, options.username)
&& Objects.equals(this.password, options.password)
&& Objects.equals(this.dialect.getClass().getName(), options.dialect.getClass().getName())
&& Objects.equals(this.parallelism, options.parallelism)
&& Objects.equals(this.connectionCheckTimeoutSeconds, options.connectionCheckTimeoutSeconds)
&& Objects.equals(this.isNamespaceMappingEnabled, options.isNamespaceMappingEnabled)
&& Objects.equals(this.mapSystemTablesToNamespace, options.mapSystemTablesToNamespace);
}
}
public int hashCode() {
return Objects.hash(new Object[]{this.url, this.tableName, this.driverName, this.username, this.password, this.dialect.getClass().getName(), this.parallelism, this.connectionCheckTimeoutSeconds,this.isNamespaceMappingEnabled,this.mapSystemTablesToNamespace});
return Objects.hash(new Object[] {this.url, this.tableName, this.driverName, this.username, this.password, this.dialect.getClass().getName(),
this.parallelism, this.connectionCheckTimeoutSeconds, this.isNamespaceMappingEnabled, this.mapSystemTablesToNamespace});
}
public static class Builder {
......@@ -162,21 +173,20 @@ public class PhoenixJdbcOptions extends JdbcConnectionOptions {
Optional optional;
if (this.dialect == null) {
optional = JdbcDialects.get(this.dbURL);
this.dialect = (JdbcDialect)optional.orElseGet(() -> {
this.dialect = (JdbcDialect) optional.orElseGet(() -> {
throw new NullPointerException("Unknown dbURL,can not find proper dialect.");
});
}
if (this.driverName == null) {
optional = this.dialect.defaultDriverName();
this.driverName = (String)optional.orElseGet(() -> {
this.driverName = (String) optional.orElseGet(() -> {
throw new NullPointerException("No driverName supplied.");
});
}
return new PhoenixJdbcOptions(this.dbURL, this.tableName, this.driverName, this.username, this.password, this.dialect, this.parallelism, this.connectionCheckTimeoutSeconds,this.isNamespaceMappingEnabled,this.mapSystemTablesToNamespace);
return new PhoenixJdbcOptions(this.dbURL, this.tableName, this.driverName, this.username, this.password, this.dialect,
this.parallelism, this.connectionCheckTimeoutSeconds, this.isNamespaceMappingEnabled, this.mapSystemTablesToNamespace);
}
}
}
......@@ -17,11 +17,8 @@
*
*/
package org.apache.flink.connector.phoenix.internal.options;
import org.apache.flink.connector.phoenix.internal.options.JdbcReadOptions;
import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;
......@@ -83,7 +80,13 @@ public class PhoenixJdbcReadOptions implements Serializable {
return false;
} else {
PhoenixJdbcReadOptions options = (PhoenixJdbcReadOptions)o;
return Objects.equals(this.query, options.query) && Objects.equals(this.partitionColumnName, options.partitionColumnName) && Objects.equals(this.partitionLowerBound, options.partitionLowerBound) && Objects.equals(this.partitionUpperBound, options.partitionUpperBound) && Objects.equals(this.numPartitions, options.numPartitions) && Objects.equals(this.fetchSize, options.fetchSize) && Objects.equals(this.autoCommit, options.autoCommit);
return Objects.equals(this.query, options.query)
&& Objects.equals(this.partitionColumnName, options.partitionColumnName)
&& Objects.equals(this.partitionLowerBound, options.partitionLowerBound)
&& Objects.equals(this.partitionUpperBound, options.partitionUpperBound)
&& Objects.equals(this.numPartitions, options.numPartitions)
&& Objects.equals(this.fetchSize, options.fetchSize)
&& Objects.equals(this.autoCommit, options.autoCommit);
}
}
......
......@@ -17,7 +17,6 @@
*
*/
package org.apache.flink.connector.phoenix.split;
import org.apache.flink.annotation.Experimental;
......
......@@ -17,7 +17,6 @@
*
*/
package org.apache.flink.connector.phoenix.split;
import org.apache.flink.annotation.Experimental;
......
......@@ -17,7 +17,6 @@
*
*/
package org.apache.flink.connector.phoenix.split;
import org.apache.flink.annotation.Experimental;
......
......@@ -17,11 +17,16 @@
*
*/
package org.apache.flink.connector.phoenix.statement;
import java.math.BigDecimal;
import java.sql.*;
import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
/**
* This is a wrapper around {@link PreparedStatement} and allows the users to set parameters by name
......
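The idea behind the named-parameter wrapper described above is to rewrite ":field" placeholders to plain "?" while remembering which positional indexes each name maps to. A minimal sketch of that parsing step (illustrative only; the connector's real parser is FieldNamedPreparedStatementImpl below):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Rewrites ":name" placeholders to "?" and records name -> positional indexes.
static String parseNamedStatement(String sql, Map<String, List<Integer>> paramMap) {
    Matcher m = Pattern.compile(":(\\w+)").matcher(sql);
    StringBuffer parsed = new StringBuffer();
    int index = 1;
    while (m.find()) {
        paramMap.computeIfAbsent(m.group(1), k -> new ArrayList<>()).add(index++);
        m.appendReplacement(parsed, "?");
    }
    m.appendTail(parsed);
    return parsed.toString();
}

For example, parseNamedStatement("UPDATE T SET v = :v WHERE id = :id", map) yields "UPDATE T SET v = ? WHERE id = ?" with map = {v=[1], id=[2]}, so callers can set parameters by name rather than by index.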
......@@ -17,20 +17,24 @@
*
*/
package org.apache.flink.connector.phoenix.statement;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import java.math.BigDecimal;
import java.sql.*;
import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** Simple implementation of {@link FieldNamedPreparedStatement}. */
public class FieldNamedPreparedStatementImpl implements FieldNamedPreparedStatement {
......
......@@ -17,8 +17,6 @@
*
*/
package org.apache.flink.connector.phoenix.statement;
import java.sql.Connection;
......
......@@ -17,7 +17,6 @@
*
*/
package org.apache.flink.connector.phoenix.table;
import org.apache.flink.api.common.typeinfo.TypeInformation;
......@@ -62,9 +61,9 @@ public class PhoenixDynamicTableSink implements DynamicTableSink {
return ChangelogMode.newBuilder().addContainedKind(RowKind.INSERT).addContainedKind(RowKind.DELETE).addContainedKind(RowKind.UPDATE_AFTER).build();
}
private void validatePrimaryKey(ChangelogMode requestedMode) {
Preconditions.checkState(ChangelogMode.insertOnly().equals(requestedMode) || this.dmlOptions.getKeyFields().isPresent(), "please declare primary key for sink table when query contains update/delete record.");
Preconditions.checkState(ChangelogMode.insertOnly().equals(requestedMode)
|| this.dmlOptions.getKeyFields().isPresent(), "please declare primary key for sink table when query contains update/delete record.");
}
@Override
......@@ -86,7 +85,7 @@ public class PhoenixDynamicTableSink implements DynamicTableSink {
@Override
public String asSummaryString() {
return "Phoenix Table Sink " ;
return "Phoenix Table Sink ";
}
public boolean equals(Object o) {
......@@ -96,7 +95,11 @@ public class PhoenixDynamicTableSink implements DynamicTableSink {
return false;
} else {
PhoenixDynamicTableSink that = (PhoenixDynamicTableSink)o;
return Objects.equals(this.jdbcOptions, that.jdbcOptions) && Objects.equals(this.executionOptions, that.executionOptions) && Objects.equals(this.dmlOptions, that.dmlOptions) && Objects.equals(this.tableSchema, that.tableSchema) && Objects.equals(this.dialectName, that.dialectName);
return Objects.equals(this.jdbcOptions, that.jdbcOptions)
&& Objects.equals(this.executionOptions, that.executionOptions)
&& Objects.equals(this.dmlOptions, that.dmlOptions)
&& Objects.equals(this.tableSchema, that.tableSchema)
&& Objects.equals(this.dialectName, that.dialectName);
}
}
......
......@@ -17,10 +17,8 @@
*
*/
package org.apache.flink.connector.phoenix.table;
import org.apache.flink.connector.phoenix.dialect.JdbcDialect;
import org.apache.flink.connector.phoenix.internal.options.JdbcLookupOptions;
import org.apache.flink.connector.phoenix.internal.options.JdbcReadOptions;
......@@ -28,7 +26,11 @@ import org.apache.flink.connector.phoenix.internal.options.PhoenixJdbcOptions;
import org.apache.flink.connector.phoenix.split.JdbcNumericBetweenParametersProvider;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.*;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.types.logical.RowType;
......@@ -126,10 +128,12 @@ public class PhoenixDynamicTableSource implements ScanTableSource, LookupTableSo
public ChangelogMode getChangelogMode() {
return ChangelogMode.insertOnly();
}
@Override
public boolean supportsNestedProjection() {
return false;
}
@Override
public void applyProjection(int[][] projectedFields) {
this.physicalSchema = TableSchemaUtils.projectSchema(this.physicalSchema, projectedFields);
......@@ -150,7 +154,10 @@ public class PhoenixDynamicTableSource implements ScanTableSource, LookupTableSo
return false;
} else {
PhoenixDynamicTableSource that = (PhoenixDynamicTableSource)o;
return Objects.equals(this.options, that.options) && Objects.equals(this.physicalSchema, that.physicalSchema) && Objects.equals(this.dialectName, that.dialectName) && Objects.equals(this.limit, that.limit);
return Objects.equals(this.options, that.options)
&& Objects.equals(this.physicalSchema, that.physicalSchema)
&& Objects.equals(this.dialectName, that.dialectName)
&& Objects.equals(this.limit, that.limit);
}
}
......
......@@ -17,26 +17,28 @@
*
*/
package org.apache.flink.connector.phoenix.table;
import static org.apache.flink.table.data.RowData.createFieldGetter;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.connector.phoenix.JdbcExecutionOptions;
import org.apache.flink.connector.phoenix.dialect.JdbcDialect;
import org.apache.flink.connector.phoenix.internal.JdbcBatchingOutputFormat;
import org.apache.flink.connector.phoenix.internal.connection.PhoneixJdbcConnectionProvider;
import org.apache.flink.connector.phoenix.internal.converter.JdbcRowConverter;
import org.apache.flink.connector.phoenix.internal.executor.JdbcBatchStatementExecutor;
import org.apache.flink.connector.phoenix.internal.executor.TableBufferReducedStatementExecutor;
import org.apache.flink.connector.phoenix.internal.executor.TableBufferedStatementExecutor;
import org.apache.flink.connector.phoenix.internal.executor.TableInsertOrUpdateStatementExecutor;
import org.apache.flink.connector.phoenix.internal.executor.TableSimpleStatementExecutor;
import org.apache.flink.connector.phoenix.statement.FieldNamedPreparedStatement;
import org.apache.flink.connector.phoenix.JdbcExecutionOptions;
import org.apache.flink.connector.phoenix.internal.connection.PhoneixJdbcConnectionProvider;
import org.apache.flink.connector.phoenix.internal.executor.JdbcBatchStatementExecutor;
import org.apache.flink.connector.phoenix.internal.executor.TableBufferedStatementExecutor;
import org.apache.flink.connector.phoenix.internal.options.JdbcDmlOptions;
import org.apache.flink.connector.phoenix.internal.options.PhoenixJdbcOptions;
import org.apache.flink.connector.phoenix.statement.FieldNamedPreparedStatement;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
......@@ -47,10 +49,6 @@ import java.io.Serializable;
import java.util.Arrays;
import java.util.function.Function;
import static org.apache.flink.table.data.RowData.createFieldGetter;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* PhoenixJdbcDynamicOutputFormatBuilder
*
......
......@@ -17,7 +17,6 @@
*
*/
package org.apache.flink.connector.phoenix.table;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
......@@ -36,14 +35,22 @@ import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.math.BigDecimal;
import java.sql.*;
import java.sql.Array;
import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Arrays;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* PhoenixJdbcRowDataInputFormat
*
......@@ -127,7 +134,7 @@ public class PhoenixJdbcRowDataInputFormat extends RichInputFormat<RowData, Inpu
public void open(InputSplit inputSplit) throws IOException {
try {
if (inputSplit != null && this.parameterValues != null) {
for(int i = 0; i < this.parameterValues[inputSplit.getSplitNumber()].length; ++i) {
for (int i = 0; i < this.parameterValues[inputSplit.getSplitNumber()].length; ++i) {
Object param = this.parameterValues[inputSplit.getSplitNumber()][i];
if (param instanceof String) {
this.statement.setString(i + 1, (String)param);
......@@ -219,7 +226,7 @@ public class PhoenixJdbcRowDataInputFormat extends RichInputFormat<RowData, Inpu
} else {
GenericInputSplit[] ret = new GenericInputSplit[this.parameterValues.length];
for(int i = 0; i < ret.length; ++i) {
for (int i = 0; i < ret.length; ++i) {
ret[i] = new GenericInputSplit(i, ret.length);
}
......@@ -248,7 +255,6 @@ public class PhoenixJdbcRowDataInputFormat extends RichInputFormat<RowData, Inpu
private boolean namespaceMappingEnabled;
private boolean mapSystemTablesEnabled;
public Builder() {
}
......@@ -317,6 +323,7 @@ public class PhoenixJdbcRowDataInputFormat extends RichInputFormat<RowData, Inpu
this.namespaceMappingEnabled = namespaceMappingEnabled;
return this;
}
public Builder setMapSystemTablesToNamespace(Boolean mapSystemTablesEnabled) {
this.mapSystemTablesEnabled = mapSystemTablesEnabled;
return this;
......@@ -332,7 +339,9 @@ public class PhoenixJdbcRowDataInputFormat extends RichInputFormat<RowData, Inpu
PhoenixJdbcRowDataInputFormat.LOG.debug("No input splitting configured (data will be read with parallelism 1).");
}
return new PhoenixJdbcRowDataInputFormat(new PhoneixJdbcConnectionProvider(this.connOptionsBuilder.build(),this.namespaceMappingEnabled,this.mapSystemTablesEnabled), this.fetchSize, this.autoCommit, this.parameterValues, this.queryTemplate, this.resultSetType, this.resultSetConcurrency, this.rowConverter, this.rowDataTypeInfo,this.namespaceMappingEnabled,this.mapSystemTablesEnabled);
return new PhoenixJdbcRowDataInputFormat(new PhoneixJdbcConnectionProvider(this.connOptionsBuilder.build(),this.namespaceMappingEnabled,this.mapSystemTablesEnabled),
this.fetchSize, this.autoCommit, this.parameterValues, this.queryTemplate, this.resultSetType, this.resultSetConcurrency, this.rowConverter, this.rowDataTypeInfo,
this.namespaceMappingEnabled,this.mapSystemTablesEnabled);
}
}
......
......@@ -17,7 +17,6 @@
*
*/
package org.apache.flink.connector.phoenix.table;
import org.apache.flink.runtime.state.FunctionInitializationContext;
......
......@@ -17,9 +17,12 @@
*
*/
package org.apache.flink.connector.phoenix.table;
import static org.apache.flink.connector.phoenix.utils.JdbcUtils.getFieldFromResultSet;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
......@@ -35,8 +38,6 @@ import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Connection;
......@@ -49,9 +50,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.connector.phoenix.utils.JdbcUtils.getFieldFromResultSet;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A {@link TableFunction} to query fields from JDBC by keys. The query template like:
......
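The Javadoc above is truncated by the diff view; it presumably follows the same shape as Flink's JdbcLookupFunction, whose template reads like SELECT c, d, e, f FROM T WHERE a = ? AND b = ?, with one placeholder per lookup key that each eval() call binds before collecting the matching rows.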
......@@ -17,9 +17,11 @@
*
*/
package org.apache.flink.connector.phoenix.table;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.phoenix.dialect.JdbcDialect;
......@@ -39,8 +41,6 @@ import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Connection;
......@@ -51,8 +51,8 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** A lookup function for {@link PhoenixDynamicTableSource}. */
@Internal
......@@ -210,7 +210,7 @@ public class PhoenixRowDataLookupFunction extends TableFunction<RowData> {
private void establishConnectionAndStatement() throws SQLException, ClassNotFoundException {
Connection dbConn = connectionProvider.getOrEstablishConnection();
statement = FieldNamedPreparedStatement.prepareStatement(dbConn, query, keyNames);
LOG.info("executor query SQL : "+query);
LOG.info("executor query SQL : " + query);
}
@Override
......
......@@ -17,9 +17,11 @@
*
*/
package org.apache.flink.connector.phoenix.table;
import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;
import static org.apache.flink.util.Preconditions.checkNotNull;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.phoenix.PhoenixInputFormat;
import org.apache.flink.connector.phoenix.dialect.JdbcDialect;
......@@ -46,9 +48,6 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Objects;
import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** {@link TableSource} for JDBC. */
public class PhoenixTableSource
implements StreamTableSource<Row>, ProjectableTableSource<Row>, LookupableTableSource<Row> {
......
......@@ -17,10 +17,41 @@
*
*/
package org.apache.flink.connector.phoenix.table;
import static org.apache.flink.connector.phoenix.utils.PhoenixJdbcValidator.CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT;
import static org.apache.flink.connector.phoenix.utils.PhoenixJdbcValidator.CONNECTOR_DRIVER;
import static org.apache.flink.connector.phoenix.utils.PhoenixJdbcValidator.CONNECTOR_LOOKUP_CACHE_MAX_ROWS;
import static org.apache.flink.connector.phoenix.utils.PhoenixJdbcValidator.CONNECTOR_LOOKUP_CACHE_TTL;
import static org.apache.flink.connector.phoenix.utils.PhoenixJdbcValidator.CONNECTOR_LOOKUP_MAX_RETRIES;
import static org.apache.flink.connector.phoenix.utils.PhoenixJdbcValidator.CONNECTOR_PASSWORD;
import static org.apache.flink.connector.phoenix.utils.PhoenixJdbcValidator.CONNECTOR_READ_FETCH_SIZE;
import static org.apache.flink.connector.phoenix.utils.PhoenixJdbcValidator.CONNECTOR_READ_PARTITION_COLUMN;
import static org.apache.flink.connector.phoenix.utils.PhoenixJdbcValidator.CONNECTOR_READ_PARTITION_LOWER_BOUND;
import static org.apache.flink.connector.phoenix.utils.PhoenixJdbcValidator.CONNECTOR_READ_PARTITION_NUM;
import static org.apache.flink.connector.phoenix.utils.PhoenixJdbcValidator.CONNECTOR_READ_PARTITION_UPPER_BOUND;
import static org.apache.flink.connector.phoenix.utils.PhoenixJdbcValidator.CONNECTOR_READ_QUERY;
import static org.apache.flink.connector.phoenix.utils.PhoenixJdbcValidator.CONNECTOR_TABLE;
import static org.apache.flink.connector.phoenix.utils.PhoenixJdbcValidator.CONNECTOR_TYPE_VALUE_JDBC;
import static org.apache.flink.connector.phoenix.utils.PhoenixJdbcValidator.CONNECTOR_URL;
import static org.apache.flink.connector.phoenix.utils.PhoenixJdbcValidator.CONNECTOR_USERNAME;
import static org.apache.flink.connector.phoenix.utils.PhoenixJdbcValidator.CONNECTOR_WRITE_FLUSH_INTERVAL;
import static org.apache.flink.connector.phoenix.utils.PhoenixJdbcValidator.CONNECTOR_WRITE_FLUSH_MAX_ROWS;
import static org.apache.flink.connector.phoenix.utils.PhoenixJdbcValidator.CONNECTOR_WRITE_MAX_RETRIES;
import static org.apache.flink.connector.phoenix.utils.PhoenixJdbcValidator.PHOENIX_SCHEMA_MAP_SYSTEMTABLE_ENABLE;
import static org.apache.flink.connector.phoenix.utils.PhoenixJdbcValidator.PHOENIX_SCHEMA_NAMESPACE_MAPPING_ENABLE;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
import static org.apache.flink.table.descriptors.Schema.SCHEMA;
import static org.apache.flink.table.descriptors.Schema.SCHEMA_DATA_TYPE;
import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.phoenix.dialect.JdbcDialects;
import org.apache.flink.connector.phoenix.internal.options.JdbcLookupOptions;
......@@ -37,17 +68,11 @@ import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
import java.util.*;
import static org.apache.flink.connector.phoenix.utils.PhoenixJdbcValidator.*;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
import static org.apache.flink.table.descriptors.DescriptorProperties.*;
import static org.apache.flink.table.descriptors.Schema.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public class PhoenixTableSourceSinkFactory
implements StreamTableSourceFactory<Row>, StreamTableSinkFactory<Tuple2<Boolean, Row>> {
......@@ -115,10 +140,6 @@ public class PhoenixTableSourceSinkFactory
// comment
//properties.add(COMMENT);
return properties;
}
......
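The long static-import list above enumerates every descriptor key the factory validates. Assuming the conventional key strings of Flink's legacy "connector.*" descriptor API (the exact strings live in PhoenixJdbcValidator and are an assumption here), a property map handed to this factory might look like:

import java.util.HashMap;
import java.util.Map;

public class ConnectorPropertiesSketch {
    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        // Key and value strings below are assumed, following Flink's legacy
        // descriptor conventions; check PhoenixJdbcValidator for the real ones.
        properties.put("connector.type", "jdbc");
        properties.put("connector.url", "jdbc:phoenix:localhost:2181");
        properties.put("connector.table", "TEST");
        properties.put("connector.username", "user");
        properties.put("connector.password", "secret");
        properties.put("connector.lookup.cache.max-rows", "1000");
        properties.put("connector.lookup.cache.ttl", "10s");
        properties.put("connector.write.flush.max-rows", "100");
        properties.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}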
......@@ -17,10 +17,10 @@
*
*/
package org.apache.flink.connector.phoenix.table;
import static org.apache.flink.util.Preconditions.checkNotNull;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
......@@ -46,8 +46,6 @@ import org.apache.flink.types.Row;
import java.util.Arrays;
import java.util.Objects;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** An upsert {@link UpsertStreamTableSink} for JDBC. */
public class PhoenixUpsertTableSink implements UpsertStreamTableSink<Row> {
......
......@@ -17,10 +17,19 @@
*
*/
package org.apache.flink.connector.phoenix.utils;
import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BIG_DEC_TYPE_INFO;
import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BOOLEAN_TYPE_INFO;
import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BYTE_TYPE_INFO;
import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.DOUBLE_TYPE_INFO;
import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.FLOAT_TYPE_INFO;
import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO;
import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.SHORT_TYPE_INFO;
import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
import static org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
......@@ -37,9 +46,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.*;
import static org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
/** Utils for jdbc type. */
@Internal
public class JdbcTypeUtil {
......
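JdbcTypeUtil's job is a static lookup from Flink type information to java.sql.Types constants. A stripped-down sketch of that shape, using plain Java classes as keys instead of Flink's TypeInformation; the entries shown are an illustrative subset:

import java.sql.Types;
import java.util.HashMap;
import java.util.Map;

public class TypeMappingSketch {
    private static final Map<Class<?>, Integer> TYPE_MAPPING = new HashMap<>();

    static {
        TYPE_MAPPING.put(String.class, Types.VARCHAR);
        TYPE_MAPPING.put(Boolean.class, Types.BOOLEAN);
        TYPE_MAPPING.put(Integer.class, Types.INTEGER);
        TYPE_MAPPING.put(Long.class, Types.BIGINT);
        TYPE_MAPPING.put(Double.class, Types.DOUBLE);
        TYPE_MAPPING.put(byte[].class, Types.BINARY);
    }

    static int toSqlType(Class<?> fieldType) {
        Integer sqlType = TYPE_MAPPING.get(fieldType);
        if (sqlType == null) {
            throw new IllegalArgumentException("Unsupported type: " + fieldType);
        }
        return sqlType;
    }
}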
......@@ -17,19 +17,17 @@
*
*/
package org.apache.flink.connector.phoenix.utils;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Utils for jdbc connectors. */
public class JdbcUtils {
......
......@@ -17,9 +17,10 @@
*
*/
package org.apache.flink.connector.phoenix.utils;
import static org.apache.flink.table.descriptors.Schema.SCHEMA;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.phoenix.dialect.JdbcDialect;
import org.apache.flink.connector.phoenix.dialect.JdbcDialects;
......@@ -31,8 +32,6 @@ import org.apache.flink.util.Preconditions;
import java.util.Optional;
import static org.apache.flink.table.descriptors.Schema.SCHEMA;
/** The validator for JDBC. */
@Internal
public class PhoenixJdbcValidator extends ConnectorDescriptorValidator {
......
......@@ -17,16 +17,16 @@
*
*/
package org.apache.flink.connector.phoenix;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.Optional;
import javax.annotation.Nullable;
/** JDBC connection options. */
@PublicEvolving
public class JdbcConnectionOptions implements Serializable {
......
......@@ -17,8 +17,6 @@
*
*/
package org.apache.flink.connector.phoenix;
import org.apache.flink.annotation.PublicEvolving;
......
......@@ -17,8 +17,6 @@
*
*/
package org.apache.flink.connector.phoenix;
import org.apache.flink.annotation.PublicEvolving;
......
......@@ -17,8 +17,6 @@
*
*/
package org.apache.flink.connector.phoenix;
import org.apache.flink.annotation.PublicEvolving;
......
......@@ -17,8 +17,6 @@
*
*/
package org.apache.flink.connector.phoenix;
import org.apache.commons.lang.StringUtils;
......@@ -39,14 +37,23 @@ import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.math.BigDecimal;
import java.sql.*;
import java.sql.Array;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Arrays;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* InputFormat to read data from a database and generate Rows. The InputFormat has to be configured
* using the supplied InputFormatBuilder. A valid RowTypeInfo must be properly configured in the
......@@ -146,13 +153,13 @@ public class PhoenixInputFormat extends RichInputFormat<Row, InputSplit>
dbConn.setAutoCommit(autoCommit);
}
LOG.debug("openInputFormat query :" +queryTemplate);
LOG.debug("openInputFormat query :" + queryTemplate);
// Remove backtick characters, which Phoenix does not support
String initQuery = StringUtils.remove(queryTemplate, "\\`");
LOG.debug("openInputFormat initQuery :" +initQuery);
LOG.debug("openInputFormat initQuery :" + initQuery);
//将 " 双引号替换成 ' 单引号
String replaceQuery = StringUtils.replace(initQuery, "\"", "'");
LOG.info("openInputFormat replaceQuery :" +replaceQuery);
LOG.info("openInputFormat replaceQuery :" + replaceQuery);
statement = dbConn.prepareStatement(replaceQuery, resultSetType, resultSetConcurrency);
if (fetchSize == Integer.MIN_VALUE || fetchSize > 0) {
......@@ -433,7 +440,6 @@ public class PhoenixInputFormat extends RichInputFormat<Row, InputSplit>
return this;
}
public PhoenixInputFormat finish() {
format.connectionProvider =
//new SimpleJdbcConnectionProvider(connOptionsBuilder.build());
......
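The open path above sanitizes the planner-generated query before handing it to Phoenix: backtick identifier quoting is removed (the production code matches a backslash-escaped backtick) and double quotes become single quotes, since Phoenix reserves double quotes for identifiers. A self-contained sketch of that rewrite with the same commons-lang helpers and a made-up query:

import org.apache.commons.lang.StringUtils;

public class QuerySanitizeSketch {
    public static void main(String[] args) {
        String queryTemplate = "SELECT `id`, `name` FROM TEST WHERE name = \"bob\"";
        // Phoenix does not understand backtick-quoted identifiers; drop them.
        String initQuery = StringUtils.remove(queryTemplate, "`");
        // Phoenix treats double quotes as identifier quoting, so string
        // literals must use single quotes instead.
        String replaceQuery = StringUtils.replace(initQuery, "\"", "'");
        System.out.println(replaceQuery);
        // prints: SELECT id, name FROM TEST WHERE name = 'bob'
    }
}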
......@@ -17,8 +17,6 @@
*
*/
package org.apache.flink.connector.phoenix.dialect;
import org.apache.flink.table.api.TableSchema;
......
......@@ -17,10 +17,10 @@
*
*/
package org.apache.flink.connector.phoenix.dialect;
import static java.lang.String.format;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.phoenix.internal.converter.JdbcRowConverter;
import org.apache.flink.table.api.TableSchema;
......@@ -32,8 +32,6 @@ import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Collectors;
import static java.lang.String.format;
/** Handle the SQL dialect of jdbc driver. */
@Internal
public interface JdbcDialect extends Serializable {
......
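A dialect centralizes the SQL fragments that differ between databases; for Phoenix the notable one is that writes always go through UPSERT INTO rather than separate INSERT and UPDATE statements. A sketch of assembling such a statement from field names (this helper is hypothetical, not the interface's actual method signature):

import java.util.Arrays;
import java.util.stream.Collectors;

public class UpsertStatementSketch {
    // Phoenix writes are always UPSERT INTO; there is no separate UPDATE path.
    static String getUpsertStatement(String tableName, String[] fieldNames) {
        String columns = String.join(", ", fieldNames);
        String placeholders = Arrays.stream(fieldNames)
                .map(f -> "?")
                .collect(Collectors.joining(", "));
        return "UPSERT INTO " + tableName + " (" + columns + ") VALUES (" + placeholders + ")";
    }

    public static void main(String[] args) {
        System.out.println(getUpsertStatement("TEST", new String[] {"id", "name"}));
        // prints: UPSERT INTO TEST (id, name) VALUES (?, ?)
    }
}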
......@@ -17,8 +17,6 @@
*
*/
package org.apache.flink.connector.phoenix.dialect;
import java.util.Arrays;
......
......@@ -17,7 +17,6 @@
*
*/
package org.apache.flink.connector.phoenix.dialect;
import org.apache.flink.connector.phoenix.internal.converter.JdbcRowConverter;
......@@ -121,7 +120,6 @@ public class PhoenixDialect extends AbstractDialect {
@Override
public List<LogicalTypeRoot> unsupportedTypes() {
return Arrays.asList(
LogicalTypeRoot.BINARY,
LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
......
......@@ -17,20 +17,20 @@
*
*/
package org.apache.flink.connector.phoenix.internal;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.phoenix.internal.connection.JdbcConnectionProvider;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Flushable;
import java.io.IOException;
import java.sql.Connection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Base jdbc outputFormat. */
public abstract class AbstractJdbcOutputFormat<T> extends RichOutputFormat<T> implements Flushable {
......
......@@ -17,7 +17,6 @@
*
*/
package org.apache.flink.connector.phoenix.internal;
import org.apache.flink.annotation.Internal;
......@@ -29,9 +28,10 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nonnull;
import java.io.IOException;
import javax.annotation.Nonnull;
/** A generic SinkFunction for JDBC. */
@Internal
public class GenericJdbcSinkFunction<T> extends RichSinkFunction<T>
......
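GenericJdbcSinkFunction is a thin adapter: each sink lifecycle call is forwarded to an underlying output format. A schematic of that delegation, assuming only Flink's public RichSinkFunction and OutputFormat contracts (the class itself is a simplified stand-in, not the connector's implementation):

import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class DelegatingSinkSketch<T> extends RichSinkFunction<T> {
    private final OutputFormat<T> outputFormat;

    public DelegatingSinkSketch(OutputFormat<T> outputFormat) {
        this.outputFormat = outputFormat;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        outputFormat.configure(parameters);
        // Hand the subtask coordinates to the format so it can shard work.
        outputFormat.open(getRuntimeContext().getIndexOfThisSubtask(),
                getRuntimeContext().getNumberOfParallelSubtasks());
    }

    @Override
    public void invoke(T value, Context context) throws Exception {
        outputFormat.writeRecord(value); // every record is delegated as-is
    }

    @Override
    public void close() throws Exception {
        outputFormat.close();
    }
}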
......@@ -17,9 +17,11 @@
*
*/
package org.apache.flink.connector.phoenix.internal;
import static org.apache.flink.connector.phoenix.utils.JdbcUtils.setRecordToStatement;
import static org.apache.flink.util.Preconditions.checkNotNull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
......@@ -32,14 +34,10 @@ import org.apache.flink.connector.phoenix.internal.options.JdbcDmlOptions;
import org.apache.flink.connector.phoenix.internal.options.JdbcOptions;
import org.apache.flink.connector.phoenix.statement.FieldNamedPreparedStatementImpl;
import org.apache.flink.connector.phoenix.utils.JdbcUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.io.Serializable;
import java.sql.Connection;
......@@ -51,8 +49,10 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import static org.apache.flink.connector.phoenix.utils.JdbcUtils.setRecordToStatement;
import static org.apache.flink.util.Preconditions.checkNotNull;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** A JDBC outputFormat that supports batching records before writing records to database. */
@Internal
......@@ -148,7 +148,6 @@ public class JdbcBatchingOutputFormat<
TimeUnit.MILLISECONDS);
}
}
private JdbcExec createAndOpenStatementExecutor(
......@@ -243,7 +242,7 @@ public class JdbcBatchingOutputFormat<
if (batchCount > 0) {
try {
LOG.info("关闭连接前 刷写数据 !!! batchCount: "+batchCount);
LOG.info("关闭连接前 刷写数据 !!! batchCount: " + batchCount);
flush();
} catch (Exception e) {
LOG.warn("Writing records to JDBC failed.", e);
......@@ -339,7 +338,6 @@ public class JdbcBatchingOutputFormat<
.withFieldTypes(fieldTypes)
.build();
if (dml.getKeyFields().isPresent() && dml.getKeyFields().get().length > 0) {
return new TableJdbcUpsertOutputFormat(
new PhoneixJdbcConnectionProvider(options,this.options.isNamespaceMappingEnabled(),this.options.isMapSystemTablesEnabled()),
......
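The batching output format buffers records and flushes on two triggers: a row-count threshold and a timer, so slow streams still reach the database; close() drains whatever is left, which is what the log line above records. A condensed sketch of that discipline over plain JDBC (table, SQL, and thresholds are illustrative; the connection is assumed to have autoCommit disabled, as Phoenix defaults to):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class BatchingWriterSketch implements AutoCloseable {
    private final PreparedStatement statement;
    private final int flushMaxRows;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private int batchCount = 0;

    public BatchingWriterSketch(Connection conn, int flushMaxRows, long flushIntervalMs)
            throws SQLException {
        // Phoenix-style upsert; table and columns are placeholders.
        this.statement = conn.prepareStatement("UPSERT INTO TEST (id, name) VALUES (?, ?)");
        this.flushMaxRows = flushMaxRows;
        // Timer-based flush so a slow stream still reaches the database.
        scheduler.scheduleWithFixedDelay(this::flushQuietly,
                flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
    }

    public synchronized void write(long id, String name) throws SQLException {
        statement.setLong(1, id);
        statement.setString(2, name);
        statement.addBatch();
        if (++batchCount >= flushMaxRows) { // size-based flush
            flush();
        }
    }

    public synchronized void flush() throws SQLException {
        if (batchCount > 0) {
            statement.executeBatch();
            // Phoenix connections default to autoCommit=false, so commit here.
            statement.getConnection().commit();
            batchCount = 0;
        }
    }

    private void flushQuietly() {
        try {
            flush();
        } catch (SQLException e) {
            // In the real format this is logged and retried on the next tick.
        }
    }

    @Override
    public synchronized void close() throws SQLException {
        // Drain the remaining batch before closing, mirroring close() above.
        scheduler.shutdown();
        flush();
        statement.close();
    }
}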
......@@ -19,6 +19,10 @@
package org.apache.flink.connector.phoenix.internal;
import static org.apache.flink.connector.phoenix.utils.JdbcUtils.getPrimaryKey;
import static org.apache.flink.connector.phoenix.utils.JdbcUtils.setRecordToStatement;
import static org.apache.flink.util.Preconditions.checkArgument;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
......@@ -30,9 +34,6 @@ import org.apache.flink.connector.phoenix.internal.options.JdbcDmlOptions;
import org.apache.flink.connector.phoenix.statement.FieldNamedPreparedStatementImpl;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
......@@ -40,9 +41,8 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.function.Function;
import static org.apache.flink.connector.phoenix.utils.JdbcUtils.getPrimaryKey;
import static org.apache.flink.connector.phoenix.utils.JdbcUtils.setRecordToStatement;
import static org.apache.flink.util.Preconditions.checkArgument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class TableJdbcUpsertOutputFormat
extends JdbcBatchingOutputFormat<
......@@ -122,8 +122,6 @@ class TableJdbcUpsertOutputFormat
}
}
@Override
public synchronized void close() {
try {
......
......@@ -17,16 +17,15 @@
*
*/
package org.apache.flink.connector.phoenix.internal.connection;
import org.apache.flink.annotation.Internal;
import javax.annotation.Nullable;
import java.sql.Connection;
import java.sql.SQLException;
import javax.annotation.Nullable;
/** JDBC connection provider. */
@Internal
public interface JdbcConnectionProvider {
......
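A connection provider hides the lazy creation and reuse of a single JDBC connection behind one getOrEstablishConnection() call. A minimal sketch of that contract (URL handling and validity checking are deliberately simplified):

import javax.annotation.Nullable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class SimpleConnectionProviderSketch {
    private final String url;
    @Nullable private Connection connection;

    public SimpleConnectionProviderSketch(String url) {
        this.url = url;
    }

    // Reuse the open connection if it is still usable; otherwise (re)connect.
    public synchronized Connection getOrEstablishConnection() throws SQLException {
        if (connection == null || connection.isClosed()) {
            connection = DriverManager.getConnection(url);
        }
        return connection;
    }

    public synchronized void close() throws SQLException {
        if (connection != null) {
            connection.close();
            connection = null;
        }
    }
}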