Commit d8624bbc authored by gaogao110's avatar gaogao110

fix bug Phoenix Connector

parent d52a6071
...@@ -190,7 +190,7 @@ public class JdbcBatchingOutputFormat< ...@@ -190,7 +190,7 @@ public class JdbcBatchingOutputFormat<
for (int i = 0; i <= executionOptions.getMaxRetries(); i++) { for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
try { try {
attemptFlush(); attemptFlush();
conn.commit(); //conn.commit();
batchCount = 0; batchCount = 0;
break; break;
} catch (SQLException e) { } catch (SQLException e) {
...@@ -236,8 +236,8 @@ public class JdbcBatchingOutputFormat< ...@@ -236,8 +236,8 @@ public class JdbcBatchingOutputFormat<
if (batchCount > 0) { if (batchCount > 0) {
try { try {
flush();
LOG.info("关闭连接前 刷写数据 !!! batchCount: "+batchCount); LOG.info("关闭连接前 刷写数据 !!! batchCount: "+batchCount);
flush();
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Writing records to JDBC failed.", e); LOG.warn("Writing records to JDBC failed.", e);
throw new RuntimeException("Writing records to JDBC failed.", e); throw new RuntimeException("Writing records to JDBC failed.", e);
......
...@@ -76,7 +76,7 @@ class TableJdbcUpsertOutputFormat ...@@ -76,7 +76,7 @@ class TableJdbcUpsertOutputFormat
@Override @Override
public void open(int taskNumber, int numTasks) throws IOException { public void open(int taskNumber, int numTasks) throws IOException {
//super.open(taskNumber, numTasks); super.open(taskNumber, numTasks);
try { try {
conn = connectionProvider.getOrEstablishConnection(); conn = connectionProvider.getOrEstablishConnection();
} catch (Exception e) { } catch (Exception e) {
......
...@@ -60,7 +60,7 @@ class SimpleBatchStatementExecutor<T, V> implements JdbcBatchStatementExecutor<T ...@@ -60,7 +60,7 @@ class SimpleBatchStatementExecutor<T, V> implements JdbcBatchStatementExecutor<T
@Override @Override
public void addToBatch(T record) { public void addToBatch(T record) {
LOG.error("添加数据:" + record.toString()); LOG.info("添加数据:" + record.toString());
batch.add(valueTransformer.apply(record)); batch.add(valueTransformer.apply(record));
} }
...@@ -73,8 +73,10 @@ class SimpleBatchStatementExecutor<T, V> implements JdbcBatchStatementExecutor<T ...@@ -73,8 +73,10 @@ class SimpleBatchStatementExecutor<T, V> implements JdbcBatchStatementExecutor<T
st.executeUpdate(); st.executeUpdate();
} }
//st.executeBatch(); //st.executeBatch();
//batch.clear();
LOG.info("提交数据:" +batch.size() );
connection.commit(); connection.commit();
batch.clear();
} }
} }
......
...@@ -38,7 +38,7 @@ public final class TableBufferedStatementExecutor implements JdbcBatchStatementE ...@@ -38,7 +38,7 @@ public final class TableBufferedStatementExecutor implements JdbcBatchStatementE
private final Function<RowData, RowData> valueTransform; private final Function<RowData, RowData> valueTransform;
private final List<RowData> buffer = new ArrayList<>(); private final List<RowData> buffer = new ArrayList<>();
public TableBufferedStatementExecutor( public TableBufferedStatementExecutor(
JdbcBatchStatementExecutor<RowData> statementExecutor, JdbcBatchStatementExecutor<RowData> statementExecutor,
Function<RowData, RowData> valueTransform) { Function<RowData, RowData> valueTransform) {
this.statementExecutor = statementExecutor; this.statementExecutor = statementExecutor;
......
...@@ -132,6 +132,8 @@ public class PhoenixDynamicTableFactory implements DynamicTableSourceFactory, Dy ...@@ -132,6 +132,8 @@ public class PhoenixDynamicTableFactory implements DynamicTableSourceFactory, Dy
Set<ConfigOption<?>> requiredOptions = new HashSet(); Set<ConfigOption<?>> requiredOptions = new HashSet();
requiredOptions.add(URL); requiredOptions.add(URL);
requiredOptions.add(TABLE_NAME); requiredOptions.add(TABLE_NAME);
requiredOptions.add(SCHEMA_NAMESPACE_MAPPING_ENABLE);
requiredOptions.add(SCHEMA_MAP_SYSTEMTABLE_ENABLE);
return requiredOptions; return requiredOptions;
} }
...@@ -154,8 +156,8 @@ public class PhoenixDynamicTableFactory implements DynamicTableSourceFactory, Dy ...@@ -154,8 +156,8 @@ public class PhoenixDynamicTableFactory implements DynamicTableSourceFactory, Dy
optionalOptions.add(SINK_MAX_RETRIES); optionalOptions.add(SINK_MAX_RETRIES);
optionalOptions.add(FactoryUtil.SINK_PARALLELISM); optionalOptions.add(FactoryUtil.SINK_PARALLELISM);
optionalOptions.add(MAX_RETRY_TIMEOUT); optionalOptions.add(MAX_RETRY_TIMEOUT);
optionalOptions.add(SCHEMA_NAMESPACE_MAPPING_ENABLE); //optionalOptions.add(SCHEMA_NAMESPACE_MAPPING_ENABLE);
optionalOptions.add(SCHEMA_MAP_SYSTEMTABLE_ENABLE); //optionalOptions.add(SCHEMA_MAP_SYSTEMTABLE_ENABLE);
return optionalOptions; return optionalOptions;
} }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*//*
package org.apache.flink.connector.phoenix.table;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.connector.phoenix.JdbcExecutionOptions;
import org.apache.flink.connector.phoenix.internal.AbstractJdbcOutputFormat;
import org.apache.flink.connector.phoenix.internal.PhoenixBatchingOutputFormat;
import org.apache.flink.connector.phoenix.internal.connection.PhoneixJdbcConnectionProvider;
import org.apache.flink.connector.phoenix.internal.executor.JdbcBatchStatementExecutor;
import org.apache.flink.connector.phoenix.internal.options.JdbcOptions;
import org.apache.flink.connector.phoenix.internal.options.PhoenixSinkFunction;
import org.apache.flink.connector.phoenix.utils.JdbcTypeUtil;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sinks.UpsertStreamTableSink;
import org.apache.flink.table.utils.TableConnectorUtils;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
import java.util.Arrays;
import java.util.Objects;
import static org.apache.flink.util.Preconditions.checkNotNull;
*/
/** An upsert {@link UpsertStreamTableSink} for JDBC. *//*
public class PhoenixUpsertTableSink implements UpsertStreamTableSink<Row> {
private final TableSchema schema;
private final JdbcOptions options;
private final int flushMaxSize;
private final long flushIntervalMills;
private final int maxRetryTime;
private String[] keyFields;
private boolean isAppendOnly;
private PhoenixUpsertTableSink(
TableSchema schema,
JdbcOptions options,
int flushMaxSize,
long flushIntervalMills,
int maxRetryTime) {
this.schema = TableSchemaUtils.checkOnlyPhysicalColumns(schema);
this.options = options;
this.flushMaxSize = flushMaxSize;
this.flushIntervalMills = flushIntervalMills;
this.maxRetryTime = maxRetryTime;
}
private PhoenixBatchingOutputFormat<Tuple2<Boolean, Row>, Row, JdbcBatchStatementExecutor<Row>>
newFormat() {
if (!isAppendOnly && (keyFields == null || keyFields.length == 0)) {
throw new UnsupportedOperationException("JdbcUpsertTableSink can not support ");
}
// sql types
int[] jdbcSqlTypes =
Arrays.stream(schema.getFieldTypes())
.mapToInt(JdbcTypeUtil::typeInformationToSqlType)
.toArray();
return PhoenixBatchingOutputFormat.builder()
.setOptions(options)
.setFieldNames(schema.getFieldNames())
.setFlushMaxSize(flushMaxSize)
.setFlushIntervalMills(flushIntervalMills)
.setMaxRetryTimes(maxRetryTime)
.setFieldTypes(jdbcSqlTypes)
.setKeyFields(keyFields)
.build();
}
@Override
public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
return dataStream
.addSink(new PhoenixSinkFunction<>(this.options,new PhoneixJdbcConnectionProvider(this.options,this.options.isNamespaceMappingEnabled(),this.options.isMapSystemTablesEnabled())))
.setParallelism(dataStream.getParallelism())
.name(
TableConnectorUtils.generateRuntimeName(
this.getClass(), schema.getFieldNames()));
}
@Override
public void setKeyFields(String[] keys) {
this.keyFields = keys;
}
@Override
public void setIsAppendOnly(Boolean isAppendOnly) {
this.isAppendOnly = isAppendOnly;
}
@Override
public TypeInformation<Tuple2<Boolean, Row>> getOutputType() {
return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType());
}
@Override
public TypeInformation<Row> getRecordType() {
return new RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames());
}
@Override
public String[] getFieldNames() {
return schema.getFieldNames();
}
@Override
public TypeInformation<?>[] getFieldTypes() {
return schema.getFieldTypes();
}
@Override
public TableSink<Tuple2<Boolean, Row>> configure(
String[] fieldNames, TypeInformation<?>[] fieldTypes) {
if (!Arrays.equals(getFieldNames(), fieldNames)
|| !Arrays.equals(getFieldTypes(), fieldTypes)) {
throw new ValidationException(
"Reconfiguration with different fields is not allowed. "
+ "Expected: "
+ Arrays.toString(getFieldNames())
+ " / "
+ Arrays.toString(getFieldTypes())
+ ". "
+ "But was: "
+ Arrays.toString(fieldNames)
+ " / "
+ Arrays.toString(fieldTypes));
}
PhoenixUpsertTableSink copy =
new PhoenixUpsertTableSink(
schema, options, flushMaxSize, flushIntervalMills, maxRetryTime);
copy.keyFields = keyFields;
return copy;
}
public static Builder builder() {
return new Builder();
}
@Override
public boolean equals(Object o) {
if (o instanceof PhoenixUpsertTableSink) {
PhoenixUpsertTableSink sink = (PhoenixUpsertTableSink) o;
return Objects.equals(schema, sink.schema)
&& Objects.equals(options, sink.options)
&& Objects.equals(flushMaxSize, sink.flushMaxSize)
&& Objects.equals(flushIntervalMills, sink.flushIntervalMills)
&& Objects.equals(maxRetryTime, sink.maxRetryTime)
&& Arrays.equals(keyFields, sink.keyFields)
&& Objects.equals(isAppendOnly, sink.isAppendOnly);
} else {
return false;
}
}
*/
/** Builder for a {@link PhoenixUpsertTableSink}. *//*
public static class Builder {
protected TableSchema schema;
private JdbcOptions options;
protected int flushMaxSize = AbstractJdbcOutputFormat.DEFAULT_FLUSH_MAX_SIZE;
protected long flushIntervalMills = AbstractJdbcOutputFormat.DEFAULT_FLUSH_INTERVAL_MILLS;
protected int maxRetryTimes = JdbcExecutionOptions.DEFAULT_MAX_RETRY_TIMES;
*/
/** required, table schema of this table source. *//*
public Builder setTableSchema(TableSchema schema) {
this.schema = JdbcTypeUtil.normalizeTableSchema(schema);
return this;
}
*/
/** required, jdbc options. *//*
public Builder setOptions(JdbcOptions options) {
this.options = options;
return this;
}
*/
/**
* optional, flush max size (includes all append, upsert and delete records), over this
* number of records, will flush data.
*//*
public Builder setFlushMaxSize(int flushMaxSize) {
this.flushMaxSize = flushMaxSize;
return this;
}
*/
/** optional, flush interval mills, over this time, asynchronous threads will flush data. *//*
public Builder setFlushIntervalMills(long flushIntervalMills) {
this.flushIntervalMills = flushIntervalMills;
return this;
}
*/
/** optional, max retry times for jdbc connector. *//*
public Builder setMaxRetryTimes(int maxRetryTimes) {
this.maxRetryTimes = maxRetryTimes;
return this;
}
public PhoenixUpsertTableSink build() {
checkNotNull(schema, "No schema supplied.");
checkNotNull(options, "No options supplied.");
return new PhoenixUpsertTableSink(
schema, options, flushMaxSize, flushIntervalMills, maxRetryTimes);
}
}
}
*/
...@@ -125,5 +125,13 @@ ...@@ -125,5 +125,13 @@
<artifactId>dlink-metadata-phoenix</artifactId> <artifactId>dlink-metadata-phoenix</artifactId>
<scope>${scope.runtime}</scope> <scope>${scope.runtime}</scope>
</dependency> </dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-connector-phoenix-1.13</artifactId>
<version>0.6.0</version>
<scope>${scope.runtime}</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment