Unverified Commit da3e00bc authored by gaogao110's avatar gaogao110 Committed by GitHub

[Fix][connctor] flink-connector-phoenix and Update PhoenixDynamicTableFactory (#682)

parent 3062cecd
......@@ -129,10 +129,13 @@ public class JdbcBatchingOutputFormat<
() -> {
synchronized (JdbcBatchingOutputFormat.this) {
if (!closed) {
try {
flush();
} catch (Exception e) {
flushException = e;
//if batch count > 0 to flush
if (batchCount > 0) {
try {
flush();
} catch (Exception e) {
flushException = e;
}
}
}
}
......
......@@ -61,7 +61,8 @@ public final class TableSimpleStatementExecutor implements JdbcBatchStatementExe
@Override
public void executeBatch(Connection conn) throws SQLException {
st.executeBatch();
//st.executeBatch();
conn.commit();
}
@Override
......
......@@ -52,8 +52,8 @@ public class PhoenixDynamicTableFactory implements DynamicTableSourceFactory, Dy
private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions.key("sink.buffer-flush.interval").durationType().defaultValue(Duration.ofSeconds(1L)).withDescription("The flush interval mills, over this time, asynchronous threads will flush data.");
private static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions.key("sink.max-retries").intType().defaultValue(3).withDescription("The max retry times if writing records to database failed.");
public static final ConfigOption<Boolean> SCHEMA_NAMESPACE_MAPPING_ENABLE = ConfigOptions.key("phoenix.schema.isNamespaceMappingEnabled").booleanType().defaultValue(false).withDescription("The JDBC phoenix Schema isNamespaceMappingEnabled.");
public static final ConfigOption<Boolean> SCHEMA_MAP_SYSTEMTABLE_ENABLE = ConfigOptions.key("phoenix.schema.mapSystemTablesToNamespace").booleanType().defaultValue(false).withDescription("The JDBC phoenix mapSystemTablesToNamespace.");
public static final ConfigOption<Boolean> SCHEMA_NAMESPACE_MAPPING_ENABLE = ConfigOptions.key("phoenix.schema.isnamespacemappingenabled").booleanType().defaultValue(false).withDescription("The JDBC phoenix Schema isNamespaceMappingEnabled.");
public static final ConfigOption<Boolean> SCHEMA_MAP_SYSTEMTABLE_ENABLE = ConfigOptions.key("phoenix.schema.mapsystemtablestonamespace").booleanType().defaultValue(false).withDescription("The JDBC phoenix mapSystemTablesToNamespace.");
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
......@@ -73,7 +73,6 @@ public class PhoenixDynamicTableFactory implements DynamicTableSourceFactory, Dy
helper.validate();
this.validateConfigOptions(config);
TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
//return new JdbcDynamicTableSource(this.getJdbcOptions(helper.getOptions()), this.getJdbcReadOptions(helper.getOptions()), this.getJdbcLookupOptions(helper.getOptions()), physicalSchema);
return new PhoenixDynamicTableSource(this.getJdbcOptions(helper.getOptions()), this.getJdbcReadOptions(helper.getOptions()), this.getJdbcLookupOptions(helper.getOptions()),physicalSchema);
}
......@@ -82,10 +81,13 @@ public class PhoenixDynamicTableFactory implements DynamicTableSourceFactory, Dy
private PhoenixJdbcOptions getJdbcOptions(ReadableConfig readableConfig) {
String url = (String)readableConfig.get(URL);
PhoenixJdbcOptions.Builder builder = PhoenixJdbcOptions.builder().setDBUrl(url).setTableName((String)readableConfig.get(TABLE_NAME)).setDialect((JdbcDialect)JdbcDialects.get(url).get()).setParallelism((Integer)readableConfig.getOptional(FactoryUtil.SINK_PARALLELISM).orElse((Integer) null)).setConnectionCheckTimeoutSeconds((int)((Duration)readableConfig.get(MAX_RETRY_TIMEOUT)).getSeconds()).setNamespaceMappingEnabled(readableConfig.get(SCHEMA_NAMESPACE_MAPPING_ENABLE)).setMapSystemTablesToNamespace(readableConfig.get(SCHEMA_MAP_SYSTEMTABLE_ENABLE));
PhoenixJdbcOptions.Builder builder = PhoenixJdbcOptions.builder().setDBUrl(url).setTableName((String)readableConfig.get(TABLE_NAME)).setDialect((JdbcDialect)JdbcDialects.get(url).get()).setParallelism((Integer)readableConfig.getOptional(FactoryUtil.SINK_PARALLELISM).orElse((Integer) null)).setConnectionCheckTimeoutSeconds((int)((Duration)readableConfig.get(MAX_RETRY_TIMEOUT)).getSeconds());
readableConfig.getOptional(DRIVER).ifPresent(builder::setDriverName);
readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
readableConfig.getOptional(SCHEMA_NAMESPACE_MAPPING_ENABLE).ifPresent(builder::setNamespaceMappingEnabled);
readableConfig.getOptional(SCHEMA_MAP_SYSTEMTABLE_ENABLE).ifPresent(builder::setMapSystemTablesToNamespace);
return builder.build();
}
......
......@@ -87,7 +87,7 @@ public class PhoenixJdbcDynamicOutputFormatBuilder implements Serializable {
if (dmlOptions.getKeyFields().isPresent() && dmlOptions.getKeyFields().get().length > 0) {
// upsert query
return new JdbcBatchingOutputFormat<>(
new PhoneixJdbcConnectionProvider(jdbcOptions),
new PhoneixJdbcConnectionProvider(jdbcOptions,jdbcOptions.getNamespaceMappingEnabled(),jdbcOptions.getMapSystemTablesToNamespace()),
executionOptions,
ctx ->
createBufferReduceExecutor(
......@@ -101,7 +101,7 @@ public class PhoenixJdbcDynamicOutputFormatBuilder implements Serializable {
.getInsertIntoStatement(
dmlOptions.getTableName(), dmlOptions.getFieldNames());
return new JdbcBatchingOutputFormat<>(
new PhoneixJdbcConnectionProvider(jdbcOptions),
new PhoneixJdbcConnectionProvider(jdbcOptions,jdbcOptions.getNamespaceMappingEnabled(),jdbcOptions.getMapSystemTablesToNamespace()),
executionOptions,
ctx ->
createSimpleBufferedExecutor(
......
......@@ -52,7 +52,7 @@ import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** A lookup function for {@link JdbcDynamicTableSource}. */
/** A lookup function for {@link PhoenixDynamicTableSource}. */
@Internal
public class PhoenixRowDataLookupFunction extends TableFunction<RowData> {
......@@ -208,6 +208,7 @@ public class PhoenixRowDataLookupFunction extends TableFunction<RowData> {
private void establishConnectionAndStatement() throws SQLException, ClassNotFoundException {
Connection dbConn = connectionProvider.getOrEstablishConnection();
statement = FieldNamedPreparedStatement.prepareStatement(dbConn, query, keyNames);
LOG.info("executor query SQL : "+query);
}
@Override
......
......@@ -130,10 +130,13 @@ public class JdbcBatchingOutputFormat<
() -> {
synchronized (JdbcBatchingOutputFormat.this) {
if (!closed) {
try {
flush();
} catch (Exception e) {
flushException = e;
//if batch count > 0 to flush
if (batchCount > 0) {
try {
flush();
} catch (Exception e) {
flushException = e;
}
}
}
}
......@@ -190,6 +193,7 @@ public class JdbcBatchingOutputFormat<
for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
try {
LOG.debug("pre flush size = {} , retry times = {}", batchCount,i);
attemptFlush();
//conn.commit();
batchCount = 0;
......
......@@ -61,7 +61,8 @@ public final class TableSimpleStatementExecutor implements JdbcBatchStatementExe
@Override
public void executeBatch(Connection conn) throws SQLException {
st.executeBatch();
//st.executeBatch();
conn.commit();
}
@Override
......
......@@ -52,8 +52,8 @@ public class PhoenixDynamicTableFactory implements DynamicTableSourceFactory, Dy
private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions.key("sink.buffer-flush.interval").durationType().defaultValue(Duration.ofSeconds(1L)).withDescription("The flush interval mills, over this time, asynchronous threads will flush data.");
private static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions.key("sink.max-retries").intType().defaultValue(3).withDescription("The max retry times if writing records to database failed.");
public static final ConfigOption<Boolean> SCHEMA_NAMESPACE_MAPPING_ENABLE = ConfigOptions.key("phoenix.schema.isNamespaceMappingEnabled").booleanType().defaultValue(false).withDescription("The JDBC phoenix Schema isNamespaceMappingEnabled.");
public static final ConfigOption<Boolean> SCHEMA_MAP_SYSTEMTABLE_ENABLE = ConfigOptions.key("phoenix.schema.mapSystemTablesToNamespace").booleanType().defaultValue(false).withDescription("The JDBC phoenix mapSystemTablesToNamespace.");
public static final ConfigOption<Boolean> SCHEMA_NAMESPACE_MAPPING_ENABLE = ConfigOptions.key("phoenix.schema.isnamespacemappingenabled").booleanType().defaultValue(false).withDescription("The JDBC phoenix Schema isNamespaceMappingEnabled.");
public static final ConfigOption<Boolean> SCHEMA_MAP_SYSTEMTABLE_ENABLE = ConfigOptions.key("phoenix.schema.mapsystemtablestonamespace").booleanType().defaultValue(false).withDescription("The JDBC phoenix mapSystemTablesToNamespace.");
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
......@@ -73,7 +73,6 @@ public class PhoenixDynamicTableFactory implements DynamicTableSourceFactory, Dy
helper.validate();
this.validateConfigOptions(config);
TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
//return new JdbcDynamicTableSource(this.getJdbcOptions(helper.getOptions()), this.getJdbcReadOptions(helper.getOptions()), this.getJdbcLookupOptions(helper.getOptions()), physicalSchema);
return new PhoenixDynamicTableSource(this.getJdbcOptions(helper.getOptions()), this.getJdbcReadOptions(helper.getOptions()), this.getJdbcLookupOptions(helper.getOptions()),physicalSchema);
}
......
......@@ -87,7 +87,7 @@ public class PhoenixJdbcDynamicOutputFormatBuilder implements Serializable {
if (dmlOptions.getKeyFields().isPresent() && dmlOptions.getKeyFields().get().length > 0) {
// upsert query
return new JdbcBatchingOutputFormat<>(
new PhoneixJdbcConnectionProvider(jdbcOptions),
new PhoneixJdbcConnectionProvider(jdbcOptions,jdbcOptions.getNamespaceMappingEnabled(),jdbcOptions.getMapSystemTablesToNamespace()),
executionOptions,
ctx ->
createBufferReduceExecutor(
......@@ -101,7 +101,7 @@ public class PhoenixJdbcDynamicOutputFormatBuilder implements Serializable {
.getInsertIntoStatement(
dmlOptions.getTableName(), dmlOptions.getFieldNames());
return new JdbcBatchingOutputFormat<>(
new PhoneixJdbcConnectionProvider(jdbcOptions),
new PhoneixJdbcConnectionProvider(jdbcOptions,jdbcOptions.getNamespaceMappingEnabled(),jdbcOptions.getMapSystemTablesToNamespace()),
executionOptions,
ctx ->
createSimpleBufferedExecutor(
......
......@@ -52,7 +52,7 @@ import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** A lookup function for {@link JdbcDynamicTableSource}. */
/** A lookup function for {@link PhoenixRowDataLookupFunction}. */
@Internal
public class PhoenixRowDataLookupFunction extends TableFunction<RowData> {
......@@ -208,6 +208,7 @@ public class PhoenixRowDataLookupFunction extends TableFunction<RowData> {
private void establishConnectionAndStatement() throws SQLException, ClassNotFoundException {
Connection dbConn = connectionProvider.getOrEstablishConnection();
statement = FieldNamedPreparedStatement.prepareStatement(dbConn, query, keyNames);
LOG.info("executor query SQL : "+query);
}
@Override
......
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.flink.connector.phoenix.table.PhoenixDynamicTableFactory
\ No newline at end of file
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.flink.connector.phoenix.table.PhoenixTableSourceSinkFactory
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment