Unverified commit 6a58b9a7 authored by 金鑫, committed by GitHub

Support SQL Server CDC; synchronization to Doris tested successfully. (#951)

* Add sqlserver-cdc support: SQL Server CDC synchronization can be done with hand-written FlinkSQL (see the sketch after the commit metadata below), but the CDCSOURCE mode is not yet supported.

* Change the package and class names to sqlserver

* Add sqlserver-cdc support: SQL Server CDC synchronization can be done with hand-written FlinkSQL, but the CDCSOURCE mode is not yet supported.

* Improve the try/catch exception logging code

* Support SQL Server CDC
Co-authored-by: 金鑫 <jinyanhui@huansi.net>
parent 844adb47
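Since CDCSOURCE is not yet supported for SQL Server, the synchronization described above has to be written by hand. Below is a minimal sketch of what such a hand-written FlinkSQL job could look like; the table layout, column names, host, credentials, and the Doris sink options are illustrative assumptions, not taken from this commit, and the Doris option names may differ by connector version.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SqlServerCdcToDorisExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Source table backed by the sqlserver-cdc connector added in this commit's pom change.
        tableEnv.executeSql(
            "CREATE TABLE orders_source (\n"
            + "  id INT,\n"
            + "  amount DECIMAL(10, 2),\n"
            + "  PRIMARY KEY (id) NOT ENFORCED\n"
            + ") WITH (\n"
            + "  'connector' = 'sqlserver-cdc',\n"
            + "  'hostname' = 'localhost',\n"
            + "  'port' = '1433',\n"
            + "  'username' = 'sa',\n"
            + "  'password' = '***',\n"
            + "  'database-name' = 'inventory',\n"
            + "  'schema-name' = 'dbo',\n"
            + "  'table-name' = 'orders'\n"
            + ")");

        // Doris sink (illustrative options only).
        tableEnv.executeSql(
            "CREATE TABLE orders_sink (\n"
            + "  id INT,\n"
            + "  amount DECIMAL(10, 2)\n"
            + ") WITH (\n"
            + "  'connector' = 'doris',\n"
            + "  'fenodes' = 'doris-fe:8030',\n"
            + "  'table.identifier' = 'demo.orders',\n"
            + "  'username' = 'root',\n"
            + "  'password' = ''\n"
            + ")");

        // Continuously replicate change events from SQL Server into Doris.
        tableEnv.executeSql("INSERT INTO orders_sink SELECT id, amount FROM orders_source");
    }
}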
@@ -28,7 +28,6 @@ import com.dlink.executor.ExecutorSetting;
 import com.dlink.interceptor.FlinkInterceptor;
 import com.dlink.parser.SqlType;
 import com.dlink.trans.Operations;
 import org.apache.flink.configuration.CheckpointingOptions;
 import java.io.IOException;
@@ -51,7 +50,6 @@ import org.slf4j.LoggerFactory;
  * @since 2021/10/27
  **/
 public class Submiter {
     private static final Logger logger = LoggerFactory.getLogger(Submiter.class);
     private static String getQuerySQL(Integer id) throws SQLException {
@@ -75,10 +73,9 @@ public class Submiter {
         try {
             statement = DBUtil.getOneByID(getQuerySQL(id), config);
         } catch (IOException | SQLException e) {
-            e.printStackTrace();
-            logger.error(LocalDateTime.now().toString() + " --> 获取 FlinkSQL 异常,ID 为" + id);
-            logger.error(LocalDateTime.now().toString() + "连接信息为:" + config.toString());
-            logger.error(LocalDateTime.now().toString() + "异常信息为:" + e.getMessage());
+            logger.error("{} --> 获取 FlinkSQL 配置异常,ID 为 {}, \n" +
+                    "连接信息为:{} \n" +
+                    "异常信息为:{} ", LocalDateTime.now(), id, config.toString(), e.getMessage(), e);
         }
         return statement;
     }
@@ -88,10 +85,9 @@ public class Submiter {
         try {
             task = DBUtil.getMapByID(getTaskInfo(id), config);
         } catch (IOException | SQLException e) {
-            e.printStackTrace();
-            logger.error(LocalDateTime.now().toString() + " --> 获取 FlinkSQL 配置异常,ID 为" + id);
-            logger.error(LocalDateTime.now().toString() + "连接信息为:" + config.toString());
-            logger.error(LocalDateTime.now().toString() + "异常信息为:" + e.getMessage());
+            logger.error("{} --> 获取 FlinkSQL 配置异常,ID 为 {}, \n" +
+                    "连接信息为:{} \n" +
+                    "异常信息为:{} ", LocalDateTime.now(), id, config.toString(), e.getMessage(), e);
         }
         return task;
     }
@@ -123,7 +119,7 @@ public class Submiter {
             executorSetting.getConfig().put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
                     executorSetting.getConfig().get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key()) + "/" + uuid);
         }
-        logger.info("作业配置如下: " + executorSetting.toString());
+        logger.info("作业配置如下: {}", executorSetting);
         Executor executor = Executor.buildAppStreamExecutor(executorSetting);
         List<StatementParam> ddl = new ArrayList<>();
         List<StatementParam> trans = new ArrayList<>();
@@ -185,10 +181,10 @@
             logger.info("正在执行 FlinkSQL 语句集: " + String.join(FlinkSQLConstant.SEPARATOR, executes));
             try {
                 executor.execute(executorSetting.getJobName());
+                logger.info("执行成功");
             } catch (Exception e) {
-                e.printStackTrace();
+                logger.error("执行失败, {}", e.getMessage(), e);
             }
-            logger.info("执行成功");
         }
         logger.info(LocalDateTime.now() + "任务提交成功");
         System.out.println(LocalDateTime.now() + "任务提交成功");
......
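The Submiter changes above replace e.printStackTrace() with SLF4J parameterized logging, passing the exception as the last argument so the full stack trace is logged. A minimal sketch of the pattern (class and message are illustrative):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class LoggingPatternSketch {
    private static final Logger LOG = LoggerFactory.getLogger(LoggingPatternSketch.class);

    void run() {
        try {
            throw new IllegalStateException("boom");
        } catch (Exception e) {
            // Placeholders are filled left to right; the trailing throwable is not
            // consumed by a placeholder, so SLF4J appends its stack trace.
            LOG.error("task {} failed: {}", 42, e.getMessage(), e);
        }
    }
}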
@@ -23,6 +23,7 @@ import com.dlink.assertion.Asserts;
 import com.dlink.cdc.mysql.MysqlCDCBuilder;
 import com.dlink.cdc.oracle.OracleCDCBuilder;
 import com.dlink.cdc.postgres.PostgresCDCBuilder;
+import com.dlink.cdc.sqlserver.SqlServerCDCBuilder;
 import com.dlink.exception.FlinkClientException;
 import com.dlink.model.FlinkCDCConfig;
@@ -33,10 +34,10 @@ import com.dlink.model.FlinkCDCConfig;
  * @since 2022/4/12 21:12
  **/
 public class CDCBuilderFactory {
     private static CDCBuilder[] cdcBuilders = {
         new MysqlCDCBuilder(),
         new OracleCDCBuilder(),
+        new SqlServerCDCBuilder(),
         new PostgresCDCBuilder()
     };
......
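The CDCBuilderFactory hunk only registers the new builder in the array; the lookup that maps a connector keyword to a builder is outside this diff. A hypothetical sketch of how such a keyed lookup could use the new entry (the method name, FlinkCDCConfig.getType(), and the FlinkClientException constructor are assumptions, not taken from this commit):

// Hypothetical lookup inside CDCBuilderFactory: match the configured type against each handle.
public static CDCBuilder buildCDCBuilder(FlinkCDCConfig config) { // method name assumed
    for (CDCBuilder builder : cdcBuilders) {
        if (builder.getHandle().equalsIgnoreCase(config.getType())) { // getType() assumed
            return builder.create(config);
        }
    }
    throw new FlinkClientException("Unsupported CDC source type: " + config.getType()); // constructor assumed
}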
@@ -233,6 +233,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
             OutputTag<Map> outputTag = tagMap.get(table);
             ctx.output(outputTag, map);
         } catch (Exception e) {
+            logger.error(e.getMessage(), e);
             out.collect(map);
         }
     }
......
package com.dlink.cdc.sqlserver;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.ClientConstant;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;
import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
* sql server CDC
*
* @author 郑文豪
* @date 2022/8/12 18:00
*/
public class SqlServerCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
protected static final Logger logger = LoggerFactory.getLogger(SqlServerCDCBuilder.class);
private final static String KEY_WORD = "sqlserver-cdc";
private final static String METADATA_TYPE = "SqlServer";
public SqlServerCDCBuilder() {
}
public SqlServerCDCBuilder(FlinkCDCConfig config) {
super(config);
}
@Override
public String getHandle() {
return KEY_WORD;
}
@Override
public CDCBuilder create(FlinkCDCConfig config) {
return new SqlServerCDCBuilder(config);
}
@Override
public DataStreamSource<String> build(StreamExecutionEnvironment env) {
String database = config.getDatabase();
Properties debeziumProperties = new Properties();
// Default values for some type conversions (currently commented out)
// debeziumProperties.setProperty("bigint.unsigned.handling.mode", "long");
// debeziumProperties.setProperty("decimal.handling.mode", "string");
for (Map.Entry<String, String> entry : config.getDebezium().entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
debeziumProperties.setProperty(entry.getKey(), entry.getValue());
}
}
// Inject additional JDBC connection parameters
Properties jdbcProperties = new Properties();
for (Map.Entry<String, String> entry : config.getJdbc().entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
jdbcProperties.setProperty(entry.getKey(), entry.getValue());
}
}
final SqlServerSource.Builder<String> sourceBuilder = SqlServerSource.<String>builder()
.hostname(config.getHostname())
.port(config.getPort())
.username(config.getUsername())
.password(config.getPassword());
if (Asserts.isNotNullString(database)) {
String[] databases = database.split(FlinkParamConstant.SPLIT);
sourceBuilder.database(databases[0]);
} else {
sourceBuilder.database("");
}
List<String> schemaTableNameList = config.getSchemaTableNameList();
if (Asserts.isNotNullCollection(schemaTableNameList)) {
sourceBuilder.tableList(schemaTableNameList.toArray(new String[schemaTableNameList.size()]));
} else {
sourceBuilder.tableList(new String[0]);
}
// sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
sourceBuilder.deserializer(new SqlServerJsonDebeziumDeserializationSchema());
if (Asserts.isNotNullString(config.getStartupMode())) {
switch (config.getStartupMode().toLowerCase()) {
case "initial":
sourceBuilder.startupOptions(StartupOptions.initial());
break;
case "latest-offset":
sourceBuilder.startupOptions(StartupOptions.latest());
break;
}
} else {
sourceBuilder.startupOptions(StartupOptions.latest());
}
sourceBuilder.debeziumProperties(debeziumProperties);
final DataStreamSource<String> sqlServer_cdc_source = env.addSource(sourceBuilder.build(), "SqlServer CDC Source");
return sqlServer_cdc_source;
}
@Override
public List<String> getSchemaList() {
List<String> schemaList = new ArrayList<>();
String schema = config.getDatabase();
if (Asserts.isNotNullString(schema)) {
String[] schemas = schema.split(FlinkParamConstant.SPLIT);
Collections.addAll(schemaList, schemas);
}
List<String> tableList = getTableList();
for (String tableName : tableList) {
tableName = tableName.trim();
if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
String[] names = tableName.split("\\.");
if (!schemaList.contains(names[0])) {
schemaList.add(names[0]);
}
}
}
return schemaList;
}
@Override
public Map<String, Map<String, String>> parseMetaDataConfigs() {
Map<String, Map<String, String>> allConfigMap = new HashMap<>();
List<String> schemaList = getSchemaList();
for (String schema : schemaList) {
Map<String, String> configMap = new HashMap<>();
configMap.put(ClientConstant.METADATA_TYPE, METADATA_TYPE);
StringBuilder sb = new StringBuilder("jdbc:sqlserver://");
sb.append(config.getHostname());
sb.append(":");
sb.append(config.getPort());
sb.append(";database=");
sb.append(config.getDatabase());
configMap.put(ClientConstant.METADATA_NAME, sb.toString());
configMap.put(ClientConstant.METADATA_URL, sb.toString());
configMap.put(ClientConstant.METADATA_USERNAME, config.getUsername());
configMap.put(ClientConstant.METADATA_PASSWORD, config.getPassword());
allConfigMap.put(schema, configMap);
}
return allConfigMap;
}
@Override
public String getSchemaFieldName() {
return "schema";
}
}
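A minimal driver sketch for the builder above, assuming a FlinkCDCConfig has already been populated with host, port, credentials, database, and schema.table entries; the job name and print sink are illustrative and only methods defined in the class are used.

import com.dlink.model.FlinkCDCConfig;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

class SqlServerCDCBuilderUsageSketch {
    static void preview(FlinkCDCConfig config) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The config-bound constructor stores the settings; build() wires the SqlServerSource.
        SqlServerCDCBuilder builder = new SqlServerCDCBuilder(config);
        DataStreamSource<String> changes = builder.build(env);
        changes.print(); // each record is a Debezium change event rendered as JSON
        env.execute("sqlserver-cdc-preview");
    }
}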
package com.dlink.cdc.sqlserver;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverter;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.ConverterType;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
/**
* @version 1.0
 * @className: com.dlink.cdc.sqlserver.SqlServerJsonDebeziumDeserializationSchema
 * @Description: JSON Debezium deserialization schema for the SQL Server CDC source
 * @author: jack zhong
 * @date 8/2/22 1:43 PM
*/
public class SqlServerJsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
private static final long serialVersionUID = 1L;
private transient JsonConverter jsonConverter;
private final Boolean includeSchema;
private Map<String, Object> customConverterConfigs;
public SqlServerJsonDebeziumDeserializationSchema() {
this(false);
}
public SqlServerJsonDebeziumDeserializationSchema(Boolean includeSchema) {
this.includeSchema = includeSchema;
}
public SqlServerJsonDebeziumDeserializationSchema(Boolean includeSchema, Map<String, Object> customConverterConfigs) {
this.includeSchema = includeSchema;
this.customConverterConfigs = customConverterConfigs;
}
public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
if (this.jsonConverter == null) {
this.initializeJsonConverter();
}
byte[] bytes = this.jsonConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
out.collect(new String(bytes, StandardCharsets.UTF_8));
}
private void initializeJsonConverter() {
this.jsonConverter = new JsonConverter();
HashMap<String, Object> configs = new HashMap<>(2);
configs.put("converter.type", ConverterType.VALUE.getName());
configs.put("schemas.enable", this.includeSchema);
if (this.customConverterConfigs != null) {
configs.putAll(this.customConverterConfigs);
}
this.jsonConverter.configure(configs);
}
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
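The schema above mirrors Flink CDC's JsonDebeziumDeserializationSchema but resolves the Kafka Connect classes from the connector's shaded package. A short sketch of the three constructor options, based only on the code above; whether extra converter keys such as "decimal.format" are honored depends on the JsonConverter version bundled with the connector.

import java.util.HashMap;
import java.util.Map;

class DeserializerOptionsSketch {
    static SqlServerJsonDebeziumDeserializationSchema payloadOnly() {
        // Default constructor: payload only, no Connect schema envelope ("schemas.enable" = false).
        return new SqlServerJsonDebeziumDeserializationSchema();
    }

    static SqlServerJsonDebeziumDeserializationSchema withSchemaEnvelope() {
        // Emit the Connect schema alongside the payload ("schemas.enable" = true).
        return new SqlServerJsonDebeziumDeserializationSchema(true);
    }

    static SqlServerJsonDebeziumDeserializationSchema withConverterOverrides() {
        // Extra JsonConverter settings are merged into the converter config;
        // "decimal.format" support is assumed, not verified against the shaded converter.
        Map<String, Object> extra = new HashMap<>();
        extra.put("decimal.format", "NUMERIC");
        return new SqlServerJsonDebeziumDeserializationSchema(false, extra);
    }
}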
@@ -143,6 +143,11 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-sql-connector-sqlserver-cdc</artifactId>
+            <version>${flinkcdc.version}</version>
+        </dependency>
         <dependency>
             <groupId>com.ververica</groupId>
             <artifactId>flink-sql-connector-oracle-cdc</artifactId>
......