Unverified Commit 5fe68097 authored by DarrenDa, committed by GitHub

add dlink-connector-pulsar (#723)

* add dlink-connector-pulsar

* replace json resolve
Co-authored-by: fada.yu <fada.yu@zhaopin.com.cn>
parent 04937cb6
## dlink-connector-pulsar
> Overview:
> Based on: https://gitee.com/apache/flink/tree/release-1.14/flink-connectors
* Flink has officially supported flink-connector-pulsar since version 1.14 (Flink SQL is not supported there yet).
* Before that release we maintained our own Flink-Pulsar connector; this Flink SQL implementation is aligned with the official flink-connector-pulsar for better compatibility and performance.
* It incorporates production experience to avoid common pitfalls.
* Versions used here: Pulsar 2.8.2, Flink 1.14.3.
* The Pulsar connector is widely used and plays an important role in message-queue scenarios and Flink SQL development.
## ★ Pulsar SQL Connector in Detail
### Dependencies
In order to use the Pulsar connector the following dependencies are required for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.
* Maven dependency
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-pulsar_2.11</artifactId>
    <version>1.14.3</version>
</dependency>
```
### How to create a Pulsar table
```sql
CREATE TABLE source_pulsar_n (
  requestId VARCHAR,
  `timestamp` BIGINT,
  `date` VARCHAR,
  appId VARCHAR,
  appName VARCHAR,
  forwardTimeMs VARCHAR,
  processingTimeMs INT,
  errCode VARCHAR,
  userIp VARCHAR,
  createTime BIGINT,
  b_create_time as TO_TIMESTAMP(FROM_UNIXTIME(createTime/1000,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss')
) WITH (
  'connector' = 'pulsar',
  'connector.version' = 'universal',
  'connector.topic' = 'persistent://dlink/dev/context.pulsar',
  'connector.service-url' = 'pulsar://pulsar-dlink-n.stream.com:6650',
  'connector.subscription-name' = 'tmp_print_detail',
  'connector.subscription-type' = 'Shared',
  'connector.subscription-initial-position' = 'Latest',
  'update-mode' = 'append',
  'format' = 'json',
  'format.derive-schema' = 'true'
);
```
### Data Type Mapping
This connector reads and writes Pulsar message values as raw bytes and does not rely on Pulsar's own schema. Messages are deserialized and serialized by formats, e.g. csv, json, avro, so the data type mapping is determined by the chosen format. Please refer to the Formats pages for more details.
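The concrete options understood by each format come from Flink's built-in formats rather than from this connector. A minimal sketch, assuming Flink's standard `json` format is on the classpath; the table name is illustrative and the topic/addresses are reused from the example above:
```sql
CREATE TABLE pulsar_json_example (
  requestId VARCHAR,
  `timestamp` BIGINT
) WITH (
  'connector' = 'pulsar',
  'connector.topic' = 'persistent://dlink/dev/context.pulsar',
  'connector.service-url' = 'pulsar://pulsar-dlink-n.stream.com:6650',
  'connector.subscription-name' = 'tmp_print_detail',
  'format' = 'json',
  -- options of the json format itself are prefixed with the format identifier
  'json.ignore-parse-errors' = 'true',
  'json.fail-on-missing-field' = 'false'
);
```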
### Connector Options
| Option | Required | Default | Type | Description |
| --------------------------------------- | -------- | ------- | ------ | ------------------------------------------------------------ |
| connector | required | (none) | String | Specify which connector to use; for Pulsar use `'pulsar'`. |
| connector.version | optional | (none) | String | Kept for compatibility with older SQL; set to `'universal'`. Not used by the connector. |
| connector.topic | required | (none) | String | Topic name to read from when the table is used as a source, or to write to when it is used as a sink. |
| connector.service-url | required | (none) | String | The service URL of the Pulsar broker(s). |
| connector.subscription-name | required | (none) | String | The Pulsar subscription name. |
| connector.subscription-type | optional | Shared | String | The Pulsar subscription type: `Shared`, `Exclusive`, `Key_Shared` or `Failover`. |
| connector.subscription-initial-position | optional | Latest | String | Initial position: `Earliest`, `Latest` or `Timestamp`. |
| update-mode | optional | (none) | String | `append` or `upsert`; required when the table is used as a sink. |
| format | required | (none) | String | The format used to deserialize and serialize the message value, e.g. `json`, `csv`. |
| format.derive-schema | optional | (none) | String | `true` or `false`. Kept for compatibility; not used by the connector. |
## 🚀 Quick Start
```shell
git clone https://github.com/DataLinkDC/dlink.git
cd dlink/dlink-connectors/dlink-connector-pulsar-1.14
mvn clean install -DskipTests -Dflink.version=$version
```
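After the build, the connector jar has to be visible to Flink before the `pulsar` factory can be used from SQL. A hedged sketch, assuming the default build output name from this module's pom and a standard Flink distribution layout (`$FLINK_HOME` is illustrative):
```shell
# copy the built connector jar onto Flink's classpath (path and name assume the defaults above)
cp target/dlink-connector-pulsar-1.14-0.6.6-SNAPSHOT.jar $FLINK_HOME/lib/
# restart the cluster or SQL Client session so the new factory is picked up
$FLINK_HOME/bin/sql-client.sh
```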
## 🎉 Features
* Key and Value Formats
  Both the key and value part of a Pulsar record can be serialized to and deserialized from raw bytes using one of the configured formats.
* Value Format
  Since a key is optional in Pulsar records, a table can be declared with a configured value format but without a key format, as in the sketch below. The `'format'` option is a synonym for `'value.format'`. All format options are prefixed with the format identifier.
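A minimal sketch of such a table, mirroring the DDL used elsewhere in this README (table name is illustrative, topic and addresses reused from the examples above); only a value format is configured and no key format is set:
```sql
CREATE TABLE pulsar_value_only (
  requestId VARCHAR,
  appId VARCHAR
) WITH (
  'connector' = 'pulsar',
  'connector.topic' = 'persistent://dlink/dev/context.pulsar',
  'connector.service-url' = 'pulsar://pulsar-dlink-n.stream.com:6650',
  'connector.subscription-name' = 'tmp_print_detail',
  -- the value format; all of its own options would be prefixed with 'json.'
  'format' = 'json'
);
```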
## 👻 Usage
```sql
-- Pulsar multi-cluster setup:
-- two clusters are used here, n and b
-- declare the sources
CREATE TABLE source_pulsar_n(
requestId VARCHAR,
`timestamp` BIGINT,
`date` VARCHAR,
appId VARCHAR,
appName VARCHAR,
forwardTimeMs VARCHAR,
processingTimeMs INT,
errCode VARCHAR,
userIp VARCHAR,
createTime BIGINT,
b_create_time as TO_TIMESTAMP(FROM_UNIXTIME(createTime/1000,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss')
) WITH (
'connector' = 'pulsar',
'connector.version' = 'universal',
'connector.topic' = 'persistent://dlink/dev/context.pulsar',
'connector.service-url' = 'pulsar://pulsar-dlink-n.stream.com:6650',
'connector.subscription-name' = 'tmp_print_detail',
'connector.subscription-type' = 'Shared',
'connector.subscription-initial-position' = 'Latest',
'update-mode' = 'append',
'format' = 'json',
'format.derive-schema' = 'true'
);
CREATE TABLE source_pulsar_b(
requestId VARCHAR,
`timestamp` BIGINT,
`date` VARCHAR,
appId VARCHAR,
appName VARCHAR,
forwardTimeMs VARCHAR,
processingTimeMs INT,
errCode VARCHAR,
userIp VARCHAR,
createTime BIGINT,
b_create_time as TO_TIMESTAMP(FROM_UNIXTIME(createTime/1000,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss')
) WITH (
'connector' = 'pulsar',
'connector.version' = 'universal',
'connector.topic' = 'persistent://dlink/dev/context.pulsar',
'connector.service-url' = 'pulsar://pulsar-dlink-b.stream.com:6650',
'connector.subscription-name' = 'tmp_print_detail',
'connector.subscription-type' = 'Shared',
'connector.subscription-initial-position' = 'Latest',
'update-mode' = 'append',
'format' = 'json',
'format.derive-schema' = 'true'
);
-- union the sources
create view pulsar_source_all AS
select
requestId ,
`timestamp`,
`date`,
appId,
appName,
forwardTimeMs,
processingTimeMs,
errCode,
userIp,
b_create_time
from source_pulsar_n
union all
select
requestId ,
`timestamp`,
`date`,
appId,
appName,
forwardTimeMs,
processingTimeMs,
errCode,
userIp,
b_create_time
from source_pulsar_b;
-- create the sink
create table sink_pulsar_result(
requestId VARCHAR,
`timestamp` BIGINT,
`date` VARCHAR,
appId VARCHAR,
appName VARCHAR,
forwardTimeMs VARCHAR,
processingTimeMs INT,
errCode VARCHAR,
userIp VARCHAR,
b_create_time TIMESTAMP(3)
) with (
'connector' = 'print'
);
-- execution logic:
-- print the detail data from the Pulsar topics
insert into sink_pulsar_result
select
requestId ,
`timestamp`,
`date`,
appId,
appName,
forwardTimeMs,
processingTimeMs,
errCode,
userIp,
b_create_time
from pulsar_source_all;
```
### Background: comparison with Kafka
| Aspect | Kafka | Pulsar |
| --------------------------------------- | ----------------- | ------- |
| Model | producer – topic – consumer group – consumer | producer – topic – subscription – consumer |
| Consumption modes | Mainly stream-oriented; each partition is consumed exclusively, with no shared (queue) consumption mode | Unified messaging model and API: stream mode via exclusive and failover subscriptions; queue mode via shared subscriptions |
| Message acknowledgement | Uses offsets | Uses dedicated cursor management; cumulative acknowledgement behaves like Kafka offsets, and individual/selective acknowledgement is also supported |
| Message retention | Deletes messages according to the configured retention period, possibly before they are consumed; no per-message TTL | Deletes a message only after all subscriptions have consumed it, so data is not lost; a retention period can additionally keep consumed data, and TTL is supported |

Fundamental difference: Apache Kafka is partition-centric in its storage, whereas Apache Pulsar is segment-centric.

Performance comparison: Pulsar has been reported to outperform Kafka significantly, with up to five times the throughput and roughly 40% lower latency.
### Additional Pulsar background (message structure)
Core concepts for reading from and writing to the message queue — Messages:
#### Value / data payload:
The data carried by the message. All Pulsar messages carry raw bytes, but the message data may also conform to a data schema.
#### Key:
Messages can be tagged with a key, which is useful for things like topic compaction.
#### Properties:
An optional key/value map of user-defined properties.
#### Producer name:
The name of the producer that produced the message (producers are automatically given a default name, but you can also specify your own).
#### Sequence ID:
Each Pulsar message belongs to an ordered sequence within its topic; the sequence ID of a message is its position in that sequence.
#### Publish time:
The timestamp at which the message was published.
#### Event time:
An optional timestamp that an application can attach to a message, representing when some event occurred, for example when the message was processed. If not explicitly set, the event time is 0.
#### TypedMessageBuilder:
Used to construct messages. With TypedMessageBuilder you set message properties such as the message key and message value. Set the key as a String; if you set the key to another type, for example an AVRO object, the key is sent as bytes and it becomes hard for consumers to get the AVRO object back.
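A minimal client-side sketch of TypedMessageBuilder usage, assuming a reachable Pulsar cluster; the topic and service URL are reused from the examples above and the key/value contents are illustrative:
```java
import java.nio.charset.StandardCharsets;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

public class TypedMessageBuilderExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://pulsar-dlink-n.stream.com:6650")
                .build();
        Producer<byte[]> producer = client.newProducer()
                .topic("persistent://dlink/dev/context.pulsar")
                .create();
        producer.newMessage()                                             // TypedMessageBuilder<byte[]>
                .key("request-42")                                        // keep the key a plain String
                .value("{\"requestId\":\"42\"}".getBytes(StandardCharsets.UTF_8))
                .property("source", "dlink")                              // optional user-defined property
                .eventTime(System.currentTimeMillis())                    // optional event time
                .send();                                                  // or sendAsync() for async publishing
        producer.close();
        client.close();
    }
}
```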
### Subscriptions (subscription modes)
The connector selects the mode via the `connector.subscription-type` option; a client-side sketch follows the list.
* 1 Exclusive (exclusive mode)
* 2 Failover (failover mode)
* 3 Shared (shared mode)
* 4 Key_Shared (key-shared mode)
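A hedged consumer-side sketch of how these modes map onto the Pulsar client API, assuming a reachable cluster; topic, subscription name and service URL are reused from the examples above:
```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

public class SubscriptionTypeExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://pulsar-dlink-n.stream.com:6650")
                .build();
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://dlink/dev/context.pulsar")
                .subscriptionName("tmp_print_detail")
                // Exclusive, Failover, Shared or Key_Shared; the SQL connector maps
                // 'connector.subscription-type' onto this same enum.
                .subscriptionType(SubscriptionType.Key_Shared)
                .subscribe();
        consumer.close();
        client.close();
    }
}
```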
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dlink-connectors</artifactId>
<groupId>com.dlink</groupId>
<version>0.6.6-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dlink-connector-pulsar-1.14</artifactId>
<properties>
<pulsar.version>2.8.2</pulsar.version>
<!--<scala.version>2.11.12</scala.version>-->
<scala.binary.version>2.11</scala.binary.version>
<hbase.version>1.4.3</hbase.version>
<hadoop.version>2.4.1</hadoop.version>
<!--<flink.shaded.version>9.0</flink.shaded.version>-->
<fastjson.version>1.2.7</fastjson.version>
<flink.version>1.14.5</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-common</artifactId>
<version>0.6.6-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-pulsar_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- test -->
<!--<dependency>-->
<!--<groupId>org.apache.flink</groupId>-->
<!--<artifactId>flink-streaming-java_2.11</artifactId>-->
<!--<version>${flink.version}</version>-->
<!--&lt;!&ndash;<scope>test</scope>&ndash;&gt;-->
<!--</dependency>-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- core dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- flink-java dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- flink-scala dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Table ecosystem -->
<!-- Projects depending on this project won't depend on flink-table-*. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<exclusions>
<!-- Remove unneeded dependency, which is conflicting with our jetty-util version. -->
<exclusion>
<groupId>io.netty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
<scope>provided</scope>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-include-yarn_${scala.binary.version}</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--Test dependencies are only available for Hadoop-2.-->
<!--<dependency>-->
<!--<groupId>org.apache.hbase</groupId>-->
<!--<artifactId>hbase-server</artifactId>-->
<!--<version>${hbase.version}</version>-->
<!--<type>test-jar</type>-->
<!--<scope>test</scope>-->
<!--</dependency>-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<!-- This dependency is no longer shipped with the JDK since Java 9.-->
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
</exclusion>
<exclusion>
<!-- This dependency is no longer shipped with the JDK since Java 9.-->
<groupId>io.netty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--<dependency>-->
<!--<groupId>org.apache.hbase</groupId>-->
<!--<artifactId>hbase-hadoop-compat</artifactId>-->
<!--<version>${hbase.version}</version>-->
<!--<scope>test</scope>-->
<!--<type>test-jar</type>-->
<!--</dependency>-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!--<dependency>-->
<!--<groupId>org.apache.hbase</groupId>-->
<!--<artifactId>hbase-hadoop2-compat</artifactId>-->
<!--<version>${hbase.version}</version>-->
<!--<scope>test</scope>-->
<!--<type>test-jar</type>-->
<!--</dependency>-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!--<dependency>-->
<!--<groupId>org.apache.pulsar</groupId>-->
<!--<artifactId>pulsar-client</artifactId>-->
<!--<version>${pulsar.version}</version>-->
<!--<exclusions>-->
<!--<exclusion>-->
<!--<groupId>com.google.protobuf</groupId>-->
<!--<artifactId>protobuf-java</artifactId>-->
<!--</exclusion>-->
<!--</exclusions>-->
<!--</dependency>-->
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.2</version>
<executions>
<execution>
<id>copy-resources</id>
<phase>package</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<tasks>
<!--suppress UnresolvedMavenProperty -->
<copy todir="${buildJarPath}/../build/connectors">
<fileset dir="target/">
<include name="${project.artifactId}-${project.version}.jar"/>
</fileset>
</copy>
<!--<move file="${buildJarPath}/../build/connectors/${project.artifactId}-${project.version}.jar"-->
<!--tofile="${buildJarPath}/../build/connectors/${project.artifactId}.jar" />-->
</tasks>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dlink.connector.pulsar;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* @author DarrenDa
* @version 1.0
**/
/**
* A version-agnostic Pulsar {@link DynamicTableSink}.
*/
@Internal
public class PulsarDynamicSink implements DynamicTableSink {
// --------------------------------------------------------------------------------------------
// Mutable attributes
// --------------------------------------------------------------------------------------------
/**
* Metadata that is appended at the end of a physical sink row.
*/
protected List<String> metadataKeys;
// --------------------------------------------------------------------------------------------
// Format attributes
// --------------------------------------------------------------------------------------------
/**
* Data type of consumed data type.
*/
protected DataType consumedDataType;
/**
* Data type to configure the formats.
*/
protected final DataType physicalDataType;
/**
* Optional format for encoding to Pulsar.
*/
protected final @Nullable
EncodingFormat<SerializationSchema<RowData>> encodingFormat;
// --------------------------------------------------------------------------------------------
// Pulsar-specific attributes
// --------------------------------------------------------------------------------------------
/**
* The Pulsar topic to write to.
*/
protected final String topic;
/**
* The Pulsar service url config.
*/
protected final String serviceUrl;
/**
* The Pulsar update mode.
*/
protected final String updateMode;
/**
* Properties for the Pulsar producer.
*/
protected final Properties pulsarProducerProperties;
/**
* Properties for the Pulsar client.
*/
protected final Properties pulsarClientProperties;
/**
* Parallelism of the Pulsar sink.
*/
protected final Integer sinkParallelism;
public PulsarDynamicSink(
DataType physicalDataType,
@Nullable EncodingFormat<SerializationSchema<RowData>> encodingFormat,
String topic,
String service_url,
String update_mode,
Properties pulsarProducerProperties,
Properties pulsarClientProperties,
Integer sinkParallelism) {
// Format attributes
this.physicalDataType =
checkNotNull(physicalDataType, "Physical data type must not be null.");
this.encodingFormat = encodingFormat;
// Mutable attributes
this.metadataKeys = Collections.emptyList();
// Pulsar-specific attributes
this.topic = checkNotNull(topic, "Topic must not be null.");
this.serviceUrl = checkNotNull(service_url, "Service url must not be null.");
this.updateMode = checkNotNull(update_mode, "Update mode must not be null.");
this.pulsarProducerProperties = checkNotNull(pulsarProducerProperties, "pulsarProducerProperties must not be null.");
this.pulsarClientProperties = checkNotNull(pulsarClientProperties, "pulsarClientProperties must not be null.");
this.sinkParallelism = sinkParallelism;
}
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
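// 'append' update-mode produces INSERT-only changes; any other update-mode additionally
// passes UPDATE_AFTER through, which matches upsert-style writes.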
if (updateMode.equals("append")) {
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.build();
} else {
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
// .addContainedKind(RowKind.UPDATE_BEFORE)
// .addContainedKind(RowKind.DELETE)
.addContainedKind(RowKind.UPDATE_AFTER)
.build();
}
// return encodingFormat.getChangelogMode();
}
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
SerializationSchema<RowData> runtimeEncoder = encodingFormat.createRuntimeEncoder(context, physicalDataType);
PulsarSinkFunction<RowData> sinkFunction =
new PulsarSinkFunction<>(
topic,
serviceUrl,
pulsarProducerProperties,
pulsarClientProperties,
runtimeEncoder);
// set the sink parallelism if configured
if (sinkParallelism != null) {
return SinkFunctionProvider.of(sinkFunction, sinkParallelism);
} else {
return SinkFunctionProvider.of(sinkFunction);
}
}
@Override
public DynamicTableSink copy() {
final PulsarDynamicSink copy =
new PulsarDynamicSink(
physicalDataType,
encodingFormat,
topic,
serviceUrl,
updateMode,
pulsarProducerProperties,
pulsarClientProperties,
sinkParallelism);
copy.metadataKeys = metadataKeys;
return copy;
}
@Override
public String asSummaryString() {
return "Pulsar table sink";
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dlink.connector.pulsar;
import com.dlink.connector.pulsar.util.PulsarConnectorOptions;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.pulsar.source.PulsarSource;
import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Properties;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* @author DarrenDa
* @version 1.0
**/
/** A version-agnostic Pulsar {@link ScanTableSource}. */
@Internal
public class PulsarDynamicSource
implements ScanTableSource, SupportsWatermarkPushDown {
private static final Logger LOG = LoggerFactory.getLogger(PulsarDynamicSource.class);
private final String serviceUrl;
private final String adminUrl;
private final String subscriptionName;
private final SubscriptionType subscriptionType;
private final PulsarConnectorOptions.ScanStartupMode startupMode;
private final String topic;
private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
private final DataType producedDataType;
private final String tableIdentifier;
private final Properties properties;
private final Long timestamp;
private final Integer sourceParallelism;
/** Watermark strategy that is used to generate per-partition watermark. */
protected WatermarkStrategy<RowData> watermarkStrategy;
public PulsarDynamicSource(
String serviceUrl,
String adminUrl,
String subscriptionName,
SubscriptionType subscriptionType,
PulsarConnectorOptions.ScanStartupMode startupMode,
Long timestamp,
String topic,
DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
DataType producedDataType,
String tableIdentifier,
Properties properties,
Integer sourceParallelism) {
this.serviceUrl = serviceUrl;
this.adminUrl = adminUrl;
this.subscriptionName = subscriptionName;
this.subscriptionType = subscriptionType;
this.startupMode = startupMode;
this.timestamp = timestamp;
this.topic = topic;
this.decodingFormat = decodingFormat;
this.producedDataType = producedDataType;
this.tableIdentifier = tableIdentifier;
this.properties = properties;
this.sourceParallelism = sourceParallelism;
this.watermarkStrategy = null;
}
@Override
public ChangelogMode getChangelogMode() {
// in our example the format decides about the changelog mode
// but it could also be the source itself
return decodingFormat.getChangelogMode();
}
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
// create runtime classes that are shipped to the cluster
final DeserializationSchema<RowData> deserializer =
decodingFormat.createRuntimeDecoder(runtimeProviderContext, producedDataType);
final PulsarSource<RowData> pulsarSource =
createPulsarSource(deserializer);
return new DataStreamScanProvider() {
@Override
public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
if (watermarkStrategy == null) {
LOG.info("WatermarkStrategy 为空");
watermarkStrategy = WatermarkStrategy.noWatermarks();
} else {
LOG.info("WatermarkStrategy 不为空");
}
DataStreamSource<RowData> rowDataDataStreamSource = execEnv.fromSource(
pulsarSource, watermarkStrategy, "PulsarSource-" + tableIdentifier);
// set the source parallelism if configured
if (sourceParallelism != null) {
rowDataDataStreamSource.setParallelism(sourceParallelism);
}
return rowDataDataStreamSource;
}
@Override
public boolean isBounded() {
return pulsarSource.getBoundedness() == Boundedness.BOUNDED;
}
};
}
@Override
public DynamicTableSource copy() {
return new PulsarDynamicSource(
serviceUrl,
adminUrl,
subscriptionName,
subscriptionType,
startupMode,
timestamp,
topic,
decodingFormat,
producedDataType,
tableIdentifier,
properties,
sourceParallelism
);
}
@Override
public String asSummaryString() {
return "Pulsar Table Source";
}
//---------------------------------------------------------------------------------------------
protected PulsarSource<RowData> createPulsarSource(
DeserializationSchema<RowData> deserializer) {
final PulsarSourceBuilder<RowData> pulsarSourceBuilder = PulsarSource.builder();
pulsarSourceBuilder
.setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true)
.setServiceUrl(serviceUrl)
.setAdminUrl(adminUrl)
.setTopics(topic)
.setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(deserializer))
.setConfig(Configuration.fromMap((Map) properties))
.setSubscriptionName(subscriptionName);
switch (subscriptionType) {
case Shared:
pulsarSourceBuilder.setSubscriptionType(SubscriptionType.Shared);
break;
case Exclusive:
pulsarSourceBuilder.setSubscriptionType(SubscriptionType.Exclusive);
break;
case Key_Shared:
pulsarSourceBuilder.setSubscriptionType(SubscriptionType.Key_Shared);
break;
case Failover:
pulsarSourceBuilder.setSubscriptionType(SubscriptionType.Failover);
break;
default:
throw new TableException(
"Unsupported subscriptionType. Validator should have checked that.");
}
switch (startupMode) {
case EARLIEST:
pulsarSourceBuilder.setStartCursor(StartCursor.earliest());
break;
case LATEST:
pulsarSourceBuilder.setStartCursor(StartCursor.latest());
break;
case TIMESTAMP:
checkNotNull(timestamp, "No timestamp supplied.");
pulsarSourceBuilder.setStartCursor(StartCursor.fromMessageTime(timestamp));
break;
default:
throw new TableException(
"Unsupported startup mode. Validator should have checked that.");
}
return pulsarSourceBuilder.build();
}
@Override
public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
this.watermarkStrategy = watermarkStrategy;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dlink.connector.pulsar;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.format.Format;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.*;
import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
import org.apache.flink.table.types.DataType;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import static com.dlink.connector.pulsar.util.PulsarConnectorOptions.*;
import static com.dlink.connector.pulsar.util.PulsarConnectorOptionsUtil.*;
/**
* Factory for creating configured instances of {@link PulsarDynamicSource} and {@link PulsarDynamicSink}.
*
* @author DarrenDa
* @version 1.0
*/
@Internal
public class PulsarDynamicTableFactory
implements DynamicTableSourceFactory, DynamicTableSinkFactory {
private static final Logger LOG = LoggerFactory.getLogger(PulsarDynamicTableFactory.class);
public static final String IDENTIFIER = "pulsar";
@Override
public String factoryIdentifier() {
return IDENTIFIER;
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(SERVICE_URL);
return options;
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(ADMIN_URL);
options.add(SUBSCRIPTION_NAME);
options.add(SUBSCRIPTION_TYPE);
options.add(SUBSCRIPTION_INITIAL_POSITION);
options.add(SUBSCRIPTION_INITIAL_POSITION_TIMESTAMP);
options.add(FactoryUtil.FORMAT);
options.add(TOPIC);
options.add(UPDATE_MODE);
options.add(SOURCE_PARALLELISM);
options.add(SINK_PARALLELISM);
options.add(VERSION);
options.add(DERIVE_SCHEMA);
return options;
}
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
// either implement your custom validation logic here ...
// or use the provided helper utility
final TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
// discover a suitable decoding format
final DecodingFormat<DeserializationSchema<RowData>> decodingFormat =
helper.discoverDecodingFormat(
DeserializationFormatFactory.class, FactoryUtil.FORMAT);
// validate all options
// helper.validate();
helper.validateExcept(PROPERTIES_PREFIX, PROPERTIES_CLIENT_PREFIX);
// get the validated options
final ReadableConfig tableOptions = helper.getOptions();
final String serviceUrl = tableOptions.get(SERVICE_URL);
final String adminUrl = tableOptions.get(ADMIN_URL);
final String subscriptionName = tableOptions.get(SUBSCRIPTION_NAME);
final SubscriptionType subscriptionType = tableOptions.get(SUBSCRIPTION_TYPE);
final ScanStartupMode startupMode = tableOptions.get(SUBSCRIPTION_INITIAL_POSITION);
final Long timestamp = tableOptions.get(SUBSCRIPTION_INITIAL_POSITION_TIMESTAMP);
final String topic = tableOptions.get(TOPIC);
final Integer sourceParallelism = tableOptions.get(SOURCE_PARALLELISM);
// derive the produced data type (excluding computed columns) from the catalog table
final DataType producedDataType =
context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
// create and return dynamic table source
return new PulsarDynamicSource(
serviceUrl,
adminUrl,
subscriptionName,
subscriptionType,
startupMode,
timestamp,
topic,
decodingFormat,
producedDataType,
context.getObjectIdentifier().asSummaryString(),
getPulsarProperties(context.getCatalogTable().getOptions(), PROPERTIES_PREFIX),
sourceParallelism
);
}
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
final TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(
this, context);
final ReadableConfig tableOptions = helper.getOptions();
final String update_mode = tableOptions.get(UPDATE_MODE);
final Integer sinkParallelism = tableOptions.get(SINK_PARALLELISM);
helper.validateExcept(PROPERTIES_PREFIX, PROPERTIES_CLIENT_PREFIX);
final EncodingFormat<SerializationSchema<RowData>> encodingFormat =
helper.discoverEncodingFormat(
SerializationFormatFactory.class, FactoryUtil.FORMAT);
// Validate whether a primary key constraint was declared in the CREATE TABLE statement.
// We normally rely on the primary key that Flink derives automatically instead of declaring one
// explicitly, so this validation method is not used for now.
// validatePKConstraints(
// update_mode, context.getObjectIdentifier(), context.getCatalogTable(), encodingFormat);
final DataType physicalDataType =
context.getCatalogTable().getSchema().toPhysicalRowDataType();
return createPulsarTableSink(
physicalDataType,
encodingFormat,
tableOptions.get(TOPIC),
tableOptions.get(SERVICE_URL),
update_mode,
getPulsarProperties(context.getCatalogTable().getOptions(), PROPERTIES_PREFIX),
getPulsarProperties(context.getCatalogTable().getOptions(), PROPERTIES_CLIENT_PREFIX),
sinkParallelism
);
}
// Validates the primary key constraint declared in CREATE TABLE against the update mode.
private static void validatePKConstraints(
@Nullable String update_mode, ObjectIdentifier tableName, CatalogTable catalogTable, Format format) {
if (!update_mode.equals("append") && !update_mode.equals("upsert")) {
throw new ValidationException(
String.format(
"The Pulsar table '%s' with update-mode should be 'append' or 'upsert'",
tableName.asSummaryString()));
} else if (catalogTable.getSchema().getPrimaryKey().isPresent()
&& update_mode.equals("append")) {
throw new ValidationException(
String.format(
"The Pulsar table '%s' with append update-mode doesn't support defining PRIMARY KEY constraint"
+ " on the table, because it can't guarantee the semantic of primary key.",
tableName.asSummaryString()));
} else if (!catalogTable.getSchema().getPrimaryKey().isPresent()
&& update_mode.equals("upsert")) {
throw new ValidationException(
"'upsert' tables require to define a PRIMARY KEY constraint. "
+ "The PRIMARY KEY specifies which columns should be read from or write to the Pulsar message key. "
+ "The PRIMARY KEY also defines records in the 'upsert' table should update or delete on which keys.");
}
}
protected PulsarDynamicSink createPulsarTableSink(
DataType physicalDataType,
@Nullable EncodingFormat<SerializationSchema<RowData>> encodingFormat,
String topic,
String service_url,
String update_mode,
Properties pulsarProducerProperties,
Properties pulsarClientProperties,
Integer sinkParallelism) {
return new PulsarDynamicSink(
physicalDataType,
encodingFormat,
topic,
service_url,
update_mode,
pulsarProducerProperties,
pulsarClientProperties,
sinkParallelism);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dlink.connector.pulsar;
import com.dlink.connector.pulsar.util.PulsarConnectionHolder;
import com.dlink.connector.pulsar.util.PulsarProducerHolder;
import com.dlink.utils.JSONUtil;
import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializableObject;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
/**
* The sink function for Pulsar.
*
* @author DarrenDa
* @version 1.0
*/
@Internal
public class PulsarSinkFunction<T> extends RichSinkFunction<T>
implements CheckpointedFunction {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(PulsarSinkFunction.class);
private final String topic;
private final String serviceUrl;
private final Properties pulsarProducerProperties;
private final Properties pulsarClientProperties;
SerializationSchema<T> runtimeEncoder;
// private transient PulsarClient pulsarClient;
private transient Producer producer;
private transient volatile boolean closed = false;
/**
* Flag indicating whether to accept failures (and log them), or to fail on failures. Default is False.
*/
protected boolean logFailuresOnly;
/**
* If true, the producer will wait until all outstanding records have been sent to the broker. Default is True.
*/
protected boolean flushOnCheckpoint = true;
/**
* The callback that handles error propagation or logging.
*/
protected transient BiConsumer<MessageId, Throwable> sendCallback;
/**
* Errors encountered in the async producer are stored here.
*/
protected transient volatile Exception asyncException;
/**
* Lock for accessing the pending records.
*/
protected final SerializableObject pendingRecordsLock = new SerializableObject();
/**
* Number of unacknowledged records.
*/
protected long pendingRecords;
public PulsarSinkFunction(
String topic,
String serviceUrl,
Properties pulsarProducerProperties,
Properties pulsarClientProperties,
SerializationSchema<T> runtimeEncoder
) {
this.topic = topic;
this.serviceUrl = serviceUrl;
this.pulsarProducerProperties = pulsarProducerProperties;
this.pulsarClientProperties = pulsarClientProperties;
this.runtimeEncoder = runtimeEncoder;
}
@Override
public void open(Configuration parameters) throws Exception {
LOG.info("start open ...");
try {
RuntimeContext ctx = getRuntimeContext();
LOG.info("Starting FlinkPulsarProducer ({}/{}) to produce into (※) pulsar topic {}",
ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), topic);
// this.producer = createProducer();
this.producer = createReusedProducer();
LOG.info("Pulsar producer has been created.");
} catch (IOException ioe) {
LOG.error("Exception while creating connection to Pulsar.", ioe);
throw new RuntimeException("Cannot create connection to Pulsar.", ioe);
} catch (Exception ex) {
LOG.error("Exception while creating connection to Pulsar.", ex);
throw new RuntimeException("Cannot create connection to Pulsar.", ex);
}
if (flushOnCheckpoint
&& !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
LOG.warn(
"Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
flushOnCheckpoint = false;
}
if (logFailuresOnly) {
this.sendCallback =
(t, u) -> {
if (u != null) {
LOG.error(
"Error while sending message to Pulsar: {}",
ExceptionUtils.stringifyException(u));
}
acknowledgeMessage();
};
} else {
this.sendCallback =
(t, u) -> {
if (asyncException == null && u != null) {
asyncException = new Exception(u);
}
acknowledgeMessage();
};
}
LOG.info("end open.");
}
@Override
public void invoke(T value, Context context) throws Exception {
LOG.info("start to invoke, send pular message.");
// propagate asynchronous errors
checkErroneous();
byte[] serializeValue = runtimeEncoder.serialize(value);
String strValue = new String(serializeValue);
TypedMessageBuilder<byte[]> typedMessageBuilder = producer.newMessage();
typedMessageBuilder.value(serializeValue);
typedMessageBuilder.key(getKey(strValue));
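// Count this record as in flight so that snapshotState() can wait for all broker
// acknowledgements before the checkpoint completes.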
if (flushOnCheckpoint) {
synchronized (pendingRecordsLock) {
pendingRecords++;
}
}
// send asynchronously
CompletableFuture<MessageId> messageIdCompletableFuture = typedMessageBuilder.sendAsync();
messageIdCompletableFuture.whenComplete(sendCallback);
}
@Override
public void close() throws Exception {
// The Pulsar producer is reused (cached), so close() must not actually close it;
// otherwise the shared producer would be shut down for all tasks.
LOG.error("PulsarProducerBase Class close function called");
// closed = true;
//
// if (producer != null) {
// try {
// producer.close();
// } catch (IOException e) {
// LOG.warn("Exception occurs while closing Pulsar producer.", e);
// }
// this.producer = null;
// }
checkErroneous();
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
if (flushOnCheckpoint) {
synchronized (pendingRecordsLock) {
if (pendingRecords != 0) {
try {
LOG.info("等待notify");
pendingRecordsLock.wait();
checkErroneous();
flush();
LOG.info("等待waite之后");
} catch (InterruptedException e) {
// this can be interrupted when the Task has been cancelled.
// by throwing an exception, we ensure that this checkpoint doesn't get
// confirmed
throw new IllegalStateException(
"Flushing got interrupted while checkpointing", e);
}
}
}
}
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// nothing to do.
}
public String getKey(String strValue) {
// JSONObject jsonObject = JSONObject.parseObject(strValue);
// JSONObject jsonObject = JSONUtil.parseObject(strValue);
// String key = jsonObject.getString("key");
ArrayNode jsonNodes = JSONUtil.parseArray(strValue);
String key = String.valueOf(jsonNodes.get("key"));
return key == null ? "" : key;
}
// Create a new Pulsar producer
public Producer createProducer() throws Exception {
LOG.info("current pulsar version is " + PulsarVersion.getVersion());
ClientBuilder builder = PulsarClient.builder();
ProducerBuilder producerBuilder = builder.serviceUrl(serviceUrl)
.maxNumberOfRejectedRequestPerConnection(50)
.loadConf((Map) pulsarClientProperties)
.build()
.newProducer()
.topic(topic)
.blockIfQueueFull(Boolean.TRUE)
.compressionType(CompressionType.LZ4)
.hashingScheme(HashingScheme.JavaStringHash)
// .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
.loadConf((Map) pulsarProducerProperties); // pass the producer configuration through
Producer producer = producerBuilder.create();
return producer;
// return PulsarClient.builder()
// .serviceUrl(serviceUrl)
// .build()
// .newProducer()
// .loadConf((Map)properties)//实现配置透传功能
// .topic(topic)
// .blockIfQueueFull(Boolean.TRUE)
// .compressionType(CompressionType.LZ4)
// .hashingScheme(HashingScheme.JavaStringHash)
// .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
// .create();
}
// Get a reused (cached) Pulsar producer
public Producer createReusedProducer() throws Exception {
LOG.info("now create client, serviceUrl is :" + serviceUrl);
PulsarClientImpl client = PulsarConnectionHolder.getProducerClient(serviceUrl, pulsarClientProperties);
LOG.info("current pulsar version is " + PulsarVersion.getVersion());
LOG.info("now create producer, topic is :" + topic);
// ProducerConfigurationData configuration = new ProducerConfigurationData();
// configuration.setHashingScheme(HashingScheme.JavaStringHash);
return PulsarProducerHolder.getProducer(topic, pulsarProducerProperties, client);
}
/**
* Defines whether the producer should fail on errors, or only log them. If this is set to true,
* then exceptions will be only logged, if set to false, exceptions will be eventually thrown
* and cause the streaming program to fail (and enter recovery).
*
* @param logFailuresOnly The flag to indicate logging-only on exceptions.
*/
public void setLogFailuresOnly(boolean logFailuresOnly) {
this.logFailuresOnly = logFailuresOnly;
}
/**
* If set to true, the Flink producer will wait for all outstanding messages in the Pulsar
* buffers to be acknowledged by the Pulsar producer on a checkpoint. This way, the producer can
* guarantee that messages in the Pulsar buffers are part of the checkpoint.
*
* @param flush Flag indicating the flushing mode (true = flush on checkpoint)
*/
public void setFlushOnCheckpoint(boolean flush) {
this.flushOnCheckpoint = flush;
}
protected void checkErroneous() throws Exception {
Exception e = asyncException;
if (e != null) {
// prevent double throwing
asyncException = null;
throw new Exception("Failed to send data to Pulsar: " + e.getMessage(), e);
}
}
private void acknowledgeMessage() {
if (flushOnCheckpoint) {
synchronized (pendingRecordsLock) {
LOG.info("pendingRecords:" + pendingRecords);
pendingRecords--;
if (pendingRecords == 0) {
pendingRecordsLock.notifyAll();
LOG.info("notify完成");
}
}
}
}
/**
* Flush pending records.
*/
protected void flush() throws Exception {
producer.flush();
}
}
package com.dlink.connector.pulsar.util;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author DarrenDa
* @version 1.0
*/
public class PulsarConnectionHolder {
private static final Logger LOG = LoggerFactory.getLogger(PulsarConnectionHolder.class);
private static final Map<String, PulsarClientImpl> PULSAR_CLIENT_MAP = new ConcurrentHashMap<>();
public static PulsarClientImpl getConsumerClient(String serviceUrl, Properties properties) throws Exception {
return get(serviceUrl, true, properties);
}
public static PulsarClientImpl getProducerClient(String serviceUrl, Properties properties) throws Exception {
return get(serviceUrl, false, properties);
}
private static PulsarClientImpl get(String serviceUrl, boolean consumer, Properties properties) throws Exception {
synchronized (PulsarConnectionHolder.class) {
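// Cache one PulsarClient per (serviceUrl, consumer/producer) key so every task in the same
// JVM reuses a single client instead of opening a new connection each time.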
String pulsarClientCacheKey = getPulsarClientCacheKey(serviceUrl, consumer);
PulsarClientImpl pulsarClient = PULSAR_CLIENT_MAP.get(pulsarClientCacheKey);
if (null != pulsarClient) {
return pulsarClient;
}
// return PULSAR_CLIENT_MAP.computeIfAbsent(pulsarClientCacheKey, serviceUrlTag -> createPulsarClient(serviceUrl));
PulsarClientImpl pulsarClientImpl = createPulsarClient(serviceUrl, properties);
PulsarClientImpl newPulsarClientImpl = PULSAR_CLIENT_MAP.putIfAbsent(pulsarClientCacheKey, pulsarClientImpl);
if (newPulsarClientImpl == null) {
return pulsarClientImpl;
}
return newPulsarClientImpl;
}
}
private static String getPulsarClientCacheKey(String serviceUrl, boolean consumer) {
return serviceUrl + consumer;
}
private static PulsarClientImpl createPulsarClient(String serviceUrl, Properties properties) {
try {
LOG.info("create client, and ID is " + UUID.randomUUID() + ", and cache map size is " + PULSAR_CLIENT_MAP.size());
return (PulsarClientImpl) PulsarClient
.builder()
.serviceUrl(serviceUrl)
.maxNumberOfRejectedRequestPerConnection(50)
.loadConf((Map) properties)
.build();
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("创建PulsarClient失败", e);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dlink.connector.pulsar.util;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.DescribedEnum;
import org.apache.flink.configuration.description.InlineElement;
import org.apache.pulsar.client.api.SubscriptionType;
import static org.apache.flink.configuration.description.TextElement.text;
/**
* @author DarrenDa
* @version 1.0
**/
/** Options for the Pulsar connector. */
@PublicEvolving
public class PulsarConnectorOptions {
// --------------------------------------------------------------------------------------------
// Format options
// --------------------------------------------------------------------------------------------
public static final ConfigOption<String> SERVICE_URL =
ConfigOptions.key("connector.service-url")
.stringType()
.noDefaultValue()
.withDescription(
"Defines pulsar service url. ");
public static final ConfigOption<String> ADMIN_URL =
ConfigOptions.key("connector.admin-url")
.stringType()
.defaultValue("http://pulsar-dlink-qa.dlink.com:8080")
.withDescription(
"Defines pulsar admin url. ");
public static final ConfigOption<String> TOPIC =
ConfigOptions.key("connector.topic")
.stringType()
.noDefaultValue()
.withDescription(
"Defines pulsar topic. ");
public static final ConfigOption<String> SUBSCRIPTION_NAME =
ConfigOptions.key("connector.subscription-name")
.stringType()
.noDefaultValue()
.withDescription(
"Defines pulsar subscription name. ");
public static final ConfigOption<SubscriptionType> SUBSCRIPTION_TYPE =
ConfigOptions.key("connector.subscription-type")
.enumType(SubscriptionType.class)
.defaultValue(SubscriptionType.Shared)
.withDescription(
"Defines pulsar subscription type. ");
public static final ConfigOption<ScanStartupMode> SUBSCRIPTION_INITIAL_POSITION =
ConfigOptions.key("connector.subscription-initial-position")
.enumType(ScanStartupMode.class)
.defaultValue(ScanStartupMode.LATEST)
.withDescription("Startup mode for Pulsar consumer.");
public static final ConfigOption<Long> SUBSCRIPTION_INITIAL_POSITION_TIMESTAMP =
ConfigOptions.key("connector.subscription-initial-position.timestamp")
.longType()
.noDefaultValue()
.withDescription("Start from the specified message time by Message<byte[]>.getPublishTime().");
public static final ConfigOption<String> UPDATE_MODE =
ConfigOptions.key("update-mode")
.stringType()
.noDefaultValue()
.withDescription(
"Defines pulsar update mode. ");
public static final ConfigOption<Integer> SOURCE_PARALLELISM =
ConfigOptions.key("source-parallelism")
.intType()
.noDefaultValue()
.withDescription(
"Defines pulsar sink parallelism. ");
public static final ConfigOption<Integer> SINK_PARALLELISM =
ConfigOptions.key("sink-parallelism")
.intType()
.noDefaultValue()
.withDescription(
"Defines pulsar sink parallelism. ");
// Kept for compatibility with SQL written for the old platform (before 1.14.3); the option is not actually used.
public static final ConfigOption<String> VERSION =
ConfigOptions.key("connector.version")
.stringType()
.noDefaultValue()
.withDescription(
"Defines pulsar version. ");
// Kept for compatibility with SQL written for the old platform (before 1.14.3); the option is not actually used.
public static final ConfigOption<String> DERIVE_SCHEMA =
ConfigOptions.key("format.derive-schema")
.stringType()
.noDefaultValue()
.withDescription(
"Defines pulsar derive schema. ");
// public static final ConfigOption<Boolean> PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE =
// ConfigOptions.key("pulsar.source.enableAutoAcknowledgeMessage")
// .booleanType()
// .noDefaultValue()
// .withDescription(
// "Defines pulsar enable auto acknowledge message. ");
// --------------------------------------------------------------------------------------------
// Enums
// --------------------------------------------------------------------------------------------
/** Startup mode for the Pulsar consumer, see {@link #SUBSCRIPTION_INITIAL_POSITION}. */
public enum ScanStartupMode implements DescribedEnum {
EARLIEST("Earliest", text("Start from the earliest available message in the topic..")),
LATEST("Latest", text("Start from the latest available message in the topic.")),
TIMESTAMP("Timestamp", text("Start from the specified message time by Message<byte[]>.getPublishTime()."));
private final String value;
private final InlineElement description;
ScanStartupMode(String value, InlineElement description) {
this.value = value;
this.description = description;
}
@Override
public String toString() {
return value;
}
@Override
public InlineElement getDescription() {
return description;
}
}
private PulsarConnectorOptions() {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dlink.connector.pulsar.util;
import org.apache.flink.annotation.PublicEvolving;
import java.util.Map;
import java.util.Properties;
/**
* @author DarrenDa
* @version 1.0
**/
/** Utilities for {@link PulsarConnectorOptions}. */
@PublicEvolving
public class PulsarConnectorOptionsUtil {
// Prefix for Pulsar specific properties.
public static final String PROPERTIES_PREFIX = "properties.";
public static final String PROPERTIES_CLIENT_PREFIX = "properties_client.";
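// Table options starting with 'properties.' are forwarded to the Pulsar source/producer
// configuration, while options starting with 'properties_client.' are forwarded to the
// PulsarClient builder.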
public static Properties getPulsarProperties(Map<String, String> tableOptions, String prefix) {
final Properties pulsarProperties = new Properties();
if (hasPulsarClientProperties(tableOptions)) {
tableOptions.keySet().stream()
.filter(key -> key.startsWith(prefix))
.forEach(
key -> {
final String value = tableOptions.get(key);
final String subKey = key.substring((prefix).length());
pulsarProperties.put(subKey, value);
});
}
return pulsarProperties;
}
/**
 * Decides whether the table options contain Pulsar properties, i.e. keys that start with
 * the prefix 'properties.'.
 */
private static boolean hasPulsarClientProperties(Map<String, String> tableOptions) {
return tableOptions.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX));
}
private PulsarConnectorOptionsUtil() {
}
}
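
For illustration, the example below (not part of the connector; the keys `batchingEnabled` and `connectionTimeoutMs` are made-up placeholders) shows how keys under the two prefixes are split and have their prefix stripped:

```java
package com.dlink.connector.pulsar.util;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Illustrative usage of PulsarConnectorOptionsUtil; the property keys below are
// hypothetical examples, not required or validated settings.
public class PulsarPropertiesExample {
    public static void main(String[] args) {
        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put("connector", "pulsar");
        // Keys under 'properties.' are forwarded with the prefix stripped.
        tableOptions.put("properties.batchingEnabled", "false");
        // Keys under 'properties_client.' are intended for the Pulsar client.
        tableOptions.put("properties_client.connectionTimeoutMs", "10000");

        Properties producerProps = PulsarConnectorOptionsUtil.getPulsarProperties(
                tableOptions, PulsarConnectorOptionsUtil.PROPERTIES_PREFIX);
        Properties clientProps = PulsarConnectorOptionsUtil.getPulsarProperties(
                tableOptions, PulsarConnectorOptionsUtil.PROPERTIES_CLIENT_PREFIX);

        System.out.println(producerProps); // {batchingEnabled=false}
        System.out.println(clientProps);   // {connectionTimeoutMs=10000}
    }
}
```

Note that `getPulsarProperties` only collects anything when at least one key starts with `properties.` (see `hasPulsarClientProperties`), even if the requested prefix is `properties_client.`.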
package com.dlink.connector.pulsar.util;
import org.apache.pulsar.client.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Caches one Pulsar {@link Producer} per topic in a static map so that producers are
 * reused within the JVM.
 *
 * @author DarrenDa
 * @version 1.0
 */
public class PulsarProducerHolder {
private static final Logger LOG = LoggerFactory.getLogger(PulsarProducerHolder.class);
private static final Map<String, Producer> PULSAR_PRODUCER_MAP = new ConcurrentHashMap<>();
public static Producer getProducer(String defaultTopicName, Properties properties, PulsarClient client) throws Exception {
return get(defaultTopicName, properties, client);
}
private static Producer get(String defaultTopicName, Properties properties, PulsarClient client) throws Exception {
synchronized (PulsarProducerHolder.class) {
String pulsarProducerCacheKey = defaultTopicName;
Producer pulsarProducer = PULSAR_PRODUCER_MAP.get(pulsarProducerCacheKey);
LOG.info("get pulsarProducer from map result is " + pulsarProducer);
if (null != pulsarProducer) {
return pulsarProducer;
}
Producer producer = createPulsarProducer(defaultTopicName, properties, client);
Producer newPulsarProducer = PULSAR_PRODUCER_MAP.putIfAbsent(pulsarProducerCacheKey, producer);
if (newPulsarProducer == null) {
return producer;
}
return newPulsarProducer;
}
}
private static Producer createPulsarProducer(String defaultTopicName, Properties properties, PulsarClient client) {
try {
LOG.info("create producer, and ID is " + UUID.randomUUID() + ", and cache map size is " + PULSAR_PRODUCER_MAP.size());
LOG.info("now defaultTopicName is " + defaultTopicName + ", and map content is " + PULSAR_PRODUCER_MAP.get(defaultTopicName));
ProducerBuilder<byte[]> producerBuilder = client.newProducer()
.blockIfQueueFull(Boolean.TRUE)
.compressionType(CompressionType.LZ4)
.topic(defaultTopicName)
.hashingScheme(HashingScheme.JavaStringHash)
// .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
.loadConf((Map) properties);
return producerBuilder.create();
} catch (Exception e) {
LOG.error("Failed to create Pulsar producer for topic " + defaultTopicName, e);
throw new RuntimeException("Failed to create Pulsar producer", e);
}
}
}
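
A stand-alone usage sketch of `PulsarProducerHolder` (not taken from the connector itself); the service URL and topic below are placeholders:

```java
package com.dlink.connector.pulsar.util;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

// Illustrative, stand-alone use of PulsarProducerHolder; broker address and
// topic are placeholders, not values used by the connector.
public class PulsarProducerHolderExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // placeholder broker address
                .build();
        try {
            // Extra producer settings could be passed here; they are applied via loadConf.
            Properties producerProps = new Properties();

            // Repeated calls with the same topic return the cached producer instance.
            Producer producer = PulsarProducerHolder.getProducer(
                    "persistent://public/default/demo-topic", producerProps, client);

            producer.send("hello pulsar".getBytes(StandardCharsets.UTF_8));
            producer.flush();
        } finally {
            client.close();
        }
    }
}
```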
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
com.dlink.connector.pulsar.PulsarDynamicTableFactory
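
This service file is what lets Flink discover the factory at runtime: the planner locates `Factory` implementations on the classpath through the JDK `ServiceLoader` and matches them by identifier. A minimal check (illustrative only, assuming the connector jar is on the classpath) could look like:

```java
import java.util.ServiceLoader;

import org.apache.flink.table.factories.Factory;

// Illustrative check (not part of the build): lists all table factories visible
// on the classpath via the JDK ServiceLoader, the same mechanism Flink uses.
public class FactoryDiscoveryCheck {
    public static void main(String[] args) {
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            // With the connector jar on the classpath, 'pulsar' should appear here.
            System.out.println(factory.factoryIdentifier() + " -> " + factory.getClass().getName());
        }
    }
}
```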
package com.dlink.connector.pulsar;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.junit.Test;
/**
* @author DarrenDa
* @version 1.0
* @Desc: Test case
*/
public class PulsarSqlCase {
@Test
public void testCase() {
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode()
.build();
TableEnvironment tableEnvironment = TableEnvironment.create(settings);
tableEnvironment.executeSql("create table source_gen_data(\n" +
" f_sequence INT,\n" +
" f_random INT,\n" +
" f_random_str STRING,\n" +
" ts AS localtimestamp,\n" +
" WATERMARK FOR ts AS ts\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
"\n" +
" -- optional options --\n" +
"\n" +
" 'rows-per-second'='5',\n" +
"\n" +
" 'fields.f_sequence.kind'='sequence',\n" +
" 'fields.f_sequence.start'='1',\n" +
" 'fields.f_sequence.end'='1000',\n" +
"\n" +
" 'fields.f_random.min'='1',\n" +
" 'fields.f_random.max'='1000',\n" +
"\n" +
" 'fields.f_random_str.length'='10'\n" +
")")
;
tableEnvironment.executeSql("create table sink_table(\n" +
" f_sequence INT,\n" +
" f_random INT,\n" +
" f_random_str STRING,\n" +
" ts string\n" +
") with (\n" +
" 'connector' = 'print'\n" +
")");
TableResult tableResult = tableEnvironment.executeSql("insert into sink_table\n" +
"select\n" +
" f_sequence ,\n" +
" f_random ,\n" +
" f_random_str ,\n" +
" cast(ts as string)\n" +
"from source_gen_data");
tableResult.print();
// Table tb = tableEnvironment.sqlQuery("select * from source_gen_data");
// tb.execute().print();
}
@Test
public void pulsarTest() throws Exception {
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode()
.build();
TableEnvironment tableEnvironment = TableEnvironment.create(settings);
tableEnvironment.executeSql("" +
"CREATE TABLE source_pulsar(\n" +
" requestId VARCHAR,\n" +
" `timestamp` BIGINT,\n" +
" `date` VARCHAR,\n" +
" appId VARCHAR,\n" +
" appName VARCHAR,\n" +
" forwardTimeMs VARCHAR,\n" +
" processingTimeMs INT,\n" +
" errCode VARCHAR,\n" +
" userIp VARCHAR,\n" +
" createTime bigint,\n" +
" b_create_time as TO_TIMESTAMP(FROM_UNIXTIME(createTime/1000,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss')\n" +
") WITH (\n" +
" 'connector' = 'pulsar',\n" +
" 'connector.version' = 'universal',\n" +
" 'connector.topic' = 'persistent://dlink/dev/context.pulsar',\n" +
" 'connector.service-url' = 'pulsar://pulsar-dlink-n.stream.com:6650',\n" +
" 'connector.subscription-name' = 'tmp_print_detail',\n" +
" 'connector.subscription-type' = 'Shared',\n" +
" 'connector.subscription-initial-position' = 'Latest',\n" +
" 'update-mode' = 'append',\n" +
" 'format' = 'json',\n" +
" 'format.derive-schema' = 'true'\n" +
")")
;
tableEnvironment.executeSql("" +
"create table sink_pulsar_result(\n" +
" requestId VARCHAR,\n" +
" `timestamp` BIGINT,\n" +
" `date` VARCHAR,\n" +
" appId VARCHAR,\n" +
" appName VARCHAR,\n" +
" forwardTimeMs VARCHAR,\n" +
" processingTimeMs INT,\n" +
" errCode VARCHAR,\n" +
" userIp VARCHAR\n" +
") with (\n" +
" 'connector' = 'print'\n" +
")");
TableResult tableResult = tableEnvironment.executeSql("" +
"insert into sink_pulsar_result\n" +
"select \n" +
" requestId ,\n" +
" `timestamp`,\n" +
" `date`,\n" +
" appId,\n" +
" appName,\n" +
" forwardTimeMs,\n" +
" processingTimeMs,\n" +
" errCode,\n" +
" userIp\n" +
"from source_pulsar");
tableResult.print();
}
}
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
com.dlink.connector.pulsar.PulsarDynamicTableFactory
@@ -17,6 +17,7 @@
<module>dlink-connector-phoenix-1.13</module>
<module>dlink-connector-phoenix-1.14</module>
<module>dlink-connector-doris-1.13</module>
<module>dlink-connector-pulsar-1.14</module>
</modules>
<artifactId>dlink-connectors</artifactId>
</project>
\ No newline at end of file
</project>