Commit 03a17f8d authored by wenmo

Add CDCSOURCE multi-source merge syntax

parent 62516d56
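
For context, the new statement added by this commit takes the following form (mirroring SqlParserTest.createCDCSourceTest below; all connection values are sample placeholders). It declares a MySQL CDC source whose change streams for the listed databases and tables are merged and written to a single Kafka topic:

EXECUTE CDCSOURCE demo WITH (
 'hostname'='127.0.0.1',
 'port'='3306',
 'username'='dlink',
 'password'='dlink',
 'checkpoint'='3000',
 'parallelism'='1',
 'database'='dlink:test',
 'table'='',
 'topic'='dlinkcdc',
 'brokers'='127.0.0.1:9092'
);
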
......@@ -35,6 +35,7 @@ Dinky(原 Dlink):
| | | Support for INSERT statement sets | 0.4.0 |
| | | Added SQL fragment syntax | 0.4.0 |
| | | Added AGGTABLE table-valued aggregation syntax and UDATF support | 0.4.0 |
| | | Added CDCSOURCE multi-source merge syntax support | 0.6.0 |
| | | Added FlinkSQLEnv execution environment reuse | 0.5.0 |
| | | Added Flink Catalog interactive queries | 0.4.0 |
| | | Added shared and private session mechanisms for execution environments | 0.4.0 |
......
......@@ -46,48 +46,49 @@ public class Submiter {
private static String getFlinkSQLStatement(Integer id, DBConfig config) {
String statement = "";
try {
statement = DBUtil.getOneByID(getQuerySQL(id),config);
statement = DBUtil.getOneByID(getQuerySQL(id), config);
} catch (IOException | SQLException e) {
e.printStackTrace();
logger.error(LocalDateTime.now().toString() + " --> Failed to get FlinkSQL, ID: "+ id );
logger.error(LocalDateTime.now().toString() + "Connection info: "+ config.toString() );
logger.error(LocalDateTime.now().toString() + "Exception message: "+ e.getMessage() );
logger.error(LocalDateTime.now().toString() + " --> Failed to get FlinkSQL, ID: " + id);
logger.error(LocalDateTime.now().toString() + "Connection info: " + config.toString());
logger.error(LocalDateTime.now().toString() + "Exception message: " + e.getMessage());
}
return statement;
}
public static Map<String,String> getTaskConfig(Integer id, DBConfig config) {
Map<String,String> task = new HashMap<>();
public static Map<String, String> getTaskConfig(Integer id, DBConfig config) {
Map<String, String> task = new HashMap<>();
try {
task = DBUtil.getMapByID(getTaskInfo(id),config);
task = DBUtil.getMapByID(getTaskInfo(id), config);
} catch (IOException | SQLException e) {
e.printStackTrace();
logger.error(LocalDateTime.now().toString() + " --> Failed to get FlinkSQL configuration, ID: "+ id );
logger.error(LocalDateTime.now().toString() + "Connection info: "+ config.toString() );
logger.error(LocalDateTime.now().toString() + "Exception message: "+ e.getMessage() );
logger.error(LocalDateTime.now().toString() + " --> Failed to get FlinkSQL configuration, ID: " + id);
logger.error(LocalDateTime.now().toString() + "Connection info: " + config.toString());
logger.error(LocalDateTime.now().toString() + "Exception message: " + e.getMessage());
}
return task;
}
public static List<String> getStatements(String sql){
public static List<String> getStatements(String sql) {
return Arrays.asList(sql.split(FlinkSQLConstant.SEPARATOR));
}
public static void submit(Integer id,DBConfig dbConfig){
logger.info(LocalDateTime.now() + "Start submitting job -- "+id);
public static void submit(Integer id, DBConfig dbConfig) {
logger.info(LocalDateTime.now() + "Start submitting job -- " + id);
StringBuilder sb = new StringBuilder();
Map<String, String> taskConfig = Submiter.getTaskConfig(id, dbConfig);
if(Asserts.isNotNull(taskConfig.get("envId"))){
if (Asserts.isNotNull(taskConfig.get("envId"))) {
sb.append(getFlinkSQLStatement(Integer.valueOf(taskConfig.get("envId")), dbConfig));
sb.append("\r\n");
}
sb.append(getFlinkSQLStatement(id, dbConfig));
List<String> statements = Submiter.getStatements(sb.toString());
ExecutorSetting executorSetting = ExecutorSetting.build(Submiter.getTaskConfig(id,dbConfig));
logger.info("作业配置如下: "+executorSetting.toString());
ExecutorSetting executorSetting = ExecutorSetting.build(Submiter.getTaskConfig(id, dbConfig));
logger.info("作业配置如下: " + executorSetting.toString());
Executor executor = Executor.buildAppStreamExecutor(executorSetting);
List<StatementParam> ddl = new ArrayList<>();
List<StatementParam> trans = new ArrayList<>();
List<StatementParam> execute = new ArrayList<>();
for (String item : statements) {
String statement = FlinkInterceptor.pretreatStatement(executor, item);
if (statement.isEmpty()) {
......@@ -99,33 +100,57 @@ public class Submiter {
if (!executorSetting.isUseStatementSet()) {
break;
}
} else if (operationType.equals(SqlType.EXECUTE)) {
execute.add(new StatementParam(statement, operationType));
if (!executorSetting.isUseStatementSet()) {
break;
}
} else {
ddl.add(new StatementParam(statement, operationType));
}
}
for (StatementParam item : ddl) {
logger.info("正在执行 FlinkSQL: "+item.getValue());
logger.info("正在执行 FlinkSQL: " + item.getValue());
executor.submitSql(item.getValue());
logger.info("执行成功");
}
if(executorSetting.isUseStatementSet()) {
if (trans.size() > 0) {
if (executorSetting.isUseStatementSet()) {
List<String> inserts = new ArrayList<>();
for (StatementParam item : trans) {
if(item.getType().equals(SqlType.INSERT)) {
if (item.getType().equals(SqlType.INSERT)) {
inserts.add(item.getValue());
}
}
logger.info("正在执行 FlinkSQL 语句集: "+String.join(FlinkSQLConstant.SEPARATOR,inserts));
logger.info("正在执行 FlinkSQL 语句集: " + String.join(FlinkSQLConstant.SEPARATOR, inserts));
executor.submitStatementSet(inserts);
logger.info("执行成功");
}else{
} else {
for (StatementParam item : trans) {
logger.info("正在执行 FlinkSQL: "+item.getValue());
logger.info("正在执行 FlinkSQL: " + item.getValue());
executor.submitSql(item.getValue());
logger.info("执行成功");
break;
}
}
}
if (execute.size() > 0) {
List<String> executes = new ArrayList<>();
for (StatementParam item : execute) {
executes.add(item.getValue());
executor.executeSql(item.getValue());
if(!executorSetting.isUseStatementSet()){
break;
}
}
logger.info("正在执行 FlinkSQL 语句集: " + String.join(FlinkSQLConstant.SEPARATOR, executes));
try {
executor.getEnvironment().execute(executorSetting.getJobName());
} catch (Exception e) {
e.printStackTrace();
}
logger.info("执行成功");
}
logger.info(LocalDateTime.now() + "Job submitted successfully");
System.out.println(LocalDateTime.now() + "Job submitted successfully");
}
......
......@@ -68,22 +68,4 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
throw new TableException("Could not instantiate the executor. Make sure a planner module is on the classpath", var4);
}
}
public <T> void registerFunction(String name, TableFunction<T> tableFunction) {
TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfTableFunction(tableFunction);
this.functionCatalog.registerTempSystemTableFunction(name, tableFunction, typeInfo);
}
public <T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction) {
TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(aggregateFunction);
TypeInformation<ACC> accTypeInfo = UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(aggregateFunction);
this.functionCatalog.registerTempSystemAggregateFunction(name, aggregateFunction, typeInfo, accTypeInfo);
}
public <T, ACC> void registerFunction(String name, TableAggregateFunction<T, ACC> tableAggregateFunction) {
TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(tableAggregateFunction);
TypeInformation<ACC> accTypeInfo = UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(tableAggregateFunction);
this.functionCatalog.registerTempSystemAggregateFunction(name, tableAggregateFunction, typeInfo, accTypeInfo);
}
}
......@@ -14,6 +14,7 @@
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.11.6</flink.version>
<flinkcdc.version>2.1.1</flinkcdc.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
......@@ -21,6 +22,11 @@
</properties>
<dependencies>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-client-base</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
......@@ -79,6 +85,16 @@
<artifactId>flink-kubernetes_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flinkcdc.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
......@@ -88,62 +104,4 @@
<artifactId>dlink-common</artifactId>
</dependency>
</dependencies>
<!--<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>uber</shadedClassifierName>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>-->
</project>
\ No newline at end of file
package com.dlink.cdc;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* CustomerDeserialization
*
* @author wenmo
* @since 2022/1/29 22:16
*/
public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {
private ObjectMapper mapper = new ObjectMapper();
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
//1. Create the JSON object that stores the final record
Map result = new HashMap<String,String>();
//2. Get the database and table names and put them into source
String topic = sourceRecord.topic();
String[] fields = topic.split("\\.");
String database = fields[1];
String tableName = fields[2];
Map source = new HashMap<String,String>();
source.put("database",database);
source.put("table",tableName);
Struct value = (Struct) sourceRecord.value();
//3. Get the "before" data
Struct before = value.getStruct("before");
Map beforeJson = new HashMap<String,String>();
if (before != null) {
Schema beforeSchema = before.schema();
List<Field> beforeFields = beforeSchema.fields();
for (Field field : beforeFields) {
Object beforeValue = before.get(field);
beforeJson.put(field.name(), beforeValue);
}
}
//4. Get the "after" data
Struct after = value.getStruct("after");
Map afterJson = new HashMap<String,String>();
if (after != null) {
Schema afterSchema = after.schema();
List<Field> afterFields = afterSchema.fields();
for (Field field : afterFields) {
Object afterValue = after.get(field);
afterJson.put(field.name(), afterValue);
}
}
//5. Get the operation type (CREATE/UPDATE/DELETE) and map it to the Debezium-style op letter
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String type = operation.toString().toLowerCase();
if ("insert".equals(type)) {
type = "c";
}
if ("update".equals(type)) {
type = "u";
}
if ("delete".equals(type)) {
type = "d";
}
if ("create".equals(type)) {
type = "c";
}
//6. Write the fields into the JSON object
result.put("source", source);
result.put("before", beforeJson);
result.put("after", afterJson);
result.put("op", type);
//7. Emit the record
collector.collect(mapper.writeValueAsString(result));
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
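
A minimal sketch of the payload shape this deserializer emits, assuming an insert into a hypothetical dlink.user table (the class below is illustrative and not part of the commit; only the source/before/after/op keys come from the code above):

public class CustomerDeserializationOutputSketch {
    public static void main(String[] args) throws Exception {
        java.util.Map<String, Object> source = new java.util.HashMap<>();
        source.put("database", "dlink"); // second segment of the SourceRecord topic
        source.put("table", "user");     // third segment of the SourceRecord topic
        java.util.Map<String, Object> result = new java.util.HashMap<>();
        result.put("source", source);
        result.put("before", new java.util.HashMap<>());                   // empty for an insert
        result.put("after", java.util.Collections.singletonMap("id", 1));  // assumed column value
        result.put("op", "c"); // "c" = create/insert, "u" = update, "d" = delete
        // Prints, for example: {"op":"c","before":{},"after":{"id":1},"source":{"database":"dlink","table":"user"}}
        System.out.println(new com.fasterxml.jackson.databind.ObjectMapper().writeValueAsString(result));
    }
}
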
package com.dlink.cdc;
import com.dlink.assertion.Asserts;
import com.dlink.model.FlinkCDCConfig;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
/**
* FlinkCDCMergeBuilder
*
* @author wenmo
* @since 2022/1/29 22:37
*/
public class FlinkCDCMergeBuilder {
public static void buildMySqlCDC(StreamExecutionEnvironment env, FlinkCDCConfig config) throws Exception {
if(Asserts.isNotNull(config.getParallelism())){
env.setParallelism(config.getParallelism());
}
if(Asserts.isNotNull(config.getCheckpoint())){
env.enableCheckpointing(config.getCheckpoint());
}
MySqlSourceBuilder<String> sourceBuilder = MySqlSource.<String>builder()
.hostname(config.getHostname())
.port(config.getPort())
.username(config.getUsername())
.password(config.getPassword());
if(Asserts.isNotNull(config.getDatabase())&&config.getDatabase().size()>0){
sourceBuilder.databaseList(config.getDatabase().toArray(new String[0]));
}
if(Asserts.isNotNull(config.getTable())&&config.getTable().size()>0){
sourceBuilder.tableList(config.getTable().toArray(new String[0]));
}
MySqlSource<String> sourceFunction = sourceBuilder.deserializer(new CustomerDeserialization()) // a custom deserialization format is required here
// .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
// .deserializer(new StringDebeziumDeserializationSchema()) // the default deserialization format
.startupOptions(StartupOptions.latest())
.build();
DataStreamSource<String> streamSource = env.fromSource(sourceFunction, WatermarkStrategy.noWatermarks(), "MySQL Source");
streamSource.print();
streamSource.addSink(getKafkaProducer(config.getBrokers(),config.getTopic()));
// JobExecutionResult jobExecutionResult = env.execute(config.getJobName());
// return jobExecutionResult.getJobID().toHexString();
}
private static FlinkKafkaProducer<String> getKafkaProducer(String brokers, String topic) {
return new FlinkKafkaProducer<String>(brokers,
topic,
new SimpleStringSchema());
}
}
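
A usage sketch for running this builder directly (in Dinky it is normally invoked through CreateCDCSourceOperation); the connection values are placeholders and the job has not been verified against a live MySQL/Kafka setup:

public class FlinkCDCMergeBuilderExample {
    public static void main(String[] args) throws Exception {
        org.apache.flink.streaming.api.environment.StreamExecutionEnvironment env =
                org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment();
        com.dlink.model.FlinkCDCConfig config = new com.dlink.model.FlinkCDCConfig(
                "127.0.0.1", 3306, "dlink", "dlink",      // hostname, port, username, password (placeholders)
                3000, 1,                                  // checkpoint interval in ms, parallelism
                java.util.Arrays.asList("dlink", "test"), // databases to capture
                java.util.Collections.emptyList(),        // empty list -> tableList(...) is skipped by the builder above
                "dlinkcdc", "127.0.0.1:9092");            // target Kafka topic and brokers
        com.dlink.cdc.FlinkCDCMergeBuilder.buildMySqlCDC(env, config); // wires MySqlSource -> print + Kafka sink
        env.execute("cdc-merge-demo");                                 // submit the merged CDC job
    }
}
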
......@@ -15,6 +15,7 @@
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.12.7</flink.version>
<flinkcdc.version>2.1.1</flinkcdc.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
......@@ -22,6 +23,11 @@
</properties>
<dependencies>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-client-base</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
......@@ -80,6 +86,16 @@
<artifactId>flink-kubernetes_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flinkcdc.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
......@@ -89,63 +105,4 @@
<artifactId>dlink-common</artifactId>
</dependency>
</dependencies>
<!--<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
&lt;!&ndash;package the jar&ndash;&gt;
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>uber</shadedClassifierName>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>-->
</project>
\ No newline at end of file
package com.dlink.cdc;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* CustomerDeserialization
*
* @author wenmo
* @since 2022/1/29 22:16
*/
public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {
private ObjectMapper mapper = new ObjectMapper();
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
//1. Create the JSON object that stores the final record
Map result = new HashMap<String,String>();
//2. Get the database and table names and put them into source
String topic = sourceRecord.topic();
String[] fields = topic.split("\\.");
String database = fields[1];
String tableName = fields[2];
Map source = new HashMap<String,String>();
source.put("database",database);
source.put("table",tableName);
Struct value = (Struct) sourceRecord.value();
//3. Get the "before" data
Struct before = value.getStruct("before");
Map beforeJson = new HashMap<String,String>();
if (before != null) {
Schema beforeSchema = before.schema();
List<Field> beforeFields = beforeSchema.fields();
for (Field field : beforeFields) {
Object beforeValue = before.get(field);
beforeJson.put(field.name(), beforeValue);
}
}
//4. Get the "after" data
Struct after = value.getStruct("after");
Map afterJson = new HashMap<String,String>();
if (after != null) {
Schema afterSchema = after.schema();
List<Field> afterFields = afterSchema.fields();
for (Field field : afterFields) {
Object afterValue = after.get(field);
afterJson.put(field.name(), afterValue);
}
}
//5. Get the operation type (CREATE/UPDATE/DELETE) and map it to the Debezium-style op letter
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String type = operation.toString().toLowerCase();
if ("insert".equals(type)) {
type = "c";
}
if ("update".equals(type)) {
type = "u";
}
if ("delete".equals(type)) {
type = "d";
}
if ("create".equals(type)) {
type = "c";
}
//6. Write the fields into the JSON object
result.put("source", source);
result.put("before", beforeJson);
result.put("after", afterJson);
result.put("op", type);
//7. Emit the record
collector.collect(mapper.writeValueAsString(result));
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
package com.dlink.cdc;
import com.dlink.assertion.Asserts;
import com.dlink.model.FlinkCDCConfig;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
/**
* FlinkCDCMergeBuilder
*
* @author wenmo
* @since 2022/1/29 22:37
*/
public class FlinkCDCMergeBuilder {
public static void buildMySqlCDC(StreamExecutionEnvironment env, FlinkCDCConfig config) throws Exception {
if(Asserts.isNotNull(config.getParallelism())){
env.setParallelism(config.getParallelism());
}
if(Asserts.isNotNull(config.getCheckpoint())){
env.enableCheckpointing(config.getCheckpoint());
}
MySqlSourceBuilder<String> sourceBuilder = MySqlSource.<String>builder()
.hostname(config.getHostname())
.port(config.getPort())
.username(config.getUsername())
.password(config.getPassword());
if(Asserts.isNotNull(config.getDatabase())&&config.getDatabase().size()>0){
sourceBuilder.databaseList(config.getDatabase().toArray(new String[0]));
}
if(Asserts.isNotNull(config.getTable())&&config.getTable().size()>0){
sourceBuilder.tableList(config.getTable().toArray(new String[0]));
}
MySqlSource<String> sourceFunction = sourceBuilder.deserializer(new CustomerDeserialization()) // a custom deserialization format is required here
// .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
// .deserializer(new StringDebeziumDeserializationSchema()) // the default deserialization format
.startupOptions(StartupOptions.latest())
.build();
DataStreamSource<String> streamSource = env.fromSource(sourceFunction, WatermarkStrategy.noWatermarks(), "MySQL Source");
streamSource.print();
streamSource.addSink(getKafkaProducer(config.getBrokers(),config.getTopic()));
// JobExecutionResult jobExecutionResult = env.execute(config.getJobName());
// return jobExecutionResult.getJobID().toHexString();
}
private static FlinkKafkaProducer<String> getKafkaProducer(String brokers, String topic) {
return new FlinkKafkaProducer<String>(brokers,
topic,
new SimpleStringSchema());
}
}
......@@ -15,6 +15,7 @@
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.13.5</flink.version>
<flinkcdc.version>2.1.1</flinkcdc.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
......@@ -22,6 +23,11 @@
</properties>
<dependencies>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-client-base</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
......@@ -80,6 +86,16 @@
<artifactId>flink-kubernetes_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flinkcdc.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
......
package com.dlink.cdc;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* CustomerDeserialization
*
* @author wenmo
* @since 2022/1/29 22:16
*/
public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {
private ObjectMapper mapper = new ObjectMapper();
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
//1. Create the JSON object that stores the final record
Map result = new HashMap<String,String>();
//2. Get the database and table names and put them into source
String topic = sourceRecord.topic();
String[] fields = topic.split("\\.");
String database = fields[1];
String tableName = fields[2];
Map source = new HashMap<String,String>();
source.put("database",database);
source.put("table",tableName);
Struct value = (Struct) sourceRecord.value();
//3. Get the "before" data
Struct before = value.getStruct("before");
Map beforeJson = new HashMap<String,String>();
if (before != null) {
Schema beforeSchema = before.schema();
List<Field> beforeFields = beforeSchema.fields();
for (Field field : beforeFields) {
Object beforeValue = before.get(field);
beforeJson.put(field.name(), beforeValue);
}
}
//4. Get the "after" data
Struct after = value.getStruct("after");
Map afterJson = new HashMap<String,String>();
if (after != null) {
Schema afterSchema = after.schema();
List<Field> afterFields = afterSchema.fields();
for (Field field : afterFields) {
Object afterValue = after.get(field);
afterJson.put(field.name(), afterValue);
}
}
//5. Get the operation type (CREATE/UPDATE/DELETE) and map it to the Debezium-style op letter
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String type = operation.toString().toLowerCase();
if ("insert".equals(type)) {
type = "c";
}
if ("update".equals(type)) {
type = "u";
}
if ("delete".equals(type)) {
type = "d";
}
if ("create".equals(type)) {
type = "c";
}
//6. Write the fields into the JSON object
result.put("source", source);
result.put("before", beforeJson);
result.put("after", afterJson);
result.put("op", type);
//7. Emit the record
collector.collect(mapper.writeValueAsString(result));
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
package com.dlink.cdc;
import com.dlink.assertion.Asserts;
import com.dlink.model.FlinkCDCConfig;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
/**
* FlinkCDCMergeBuilder
*
* @author wenmo
* @since 2022/1/29 22:37
*/
public class FlinkCDCMergeBuilder {
public static void buildMySqlCDC(StreamExecutionEnvironment env, FlinkCDCConfig config) throws Exception {
if(Asserts.isNotNull(config.getParallelism())){
env.setParallelism(config.getParallelism());
}
if(Asserts.isNotNull(config.getCheckpoint())){
env.enableCheckpointing(config.getCheckpoint());
}
MySqlSourceBuilder<String> sourceBuilder = MySqlSource.<String>builder()
.hostname(config.getHostname())
.port(config.getPort())
.username(config.getUsername())
.password(config.getPassword());
if(Asserts.isNotNull(config.getDatabase())&&config.getDatabase().size()>0){
sourceBuilder.databaseList(config.getDatabase().toArray(new String[0]));
}
if(Asserts.isNotNull(config.getTable())&&config.getTable().size()>0){
sourceBuilder.tableList(config.getTable().toArray(new String[0]));
}
MySqlSource<String> sourceFunction = sourceBuilder.deserializer(new CustomerDeserialization()) // a custom deserialization format is required here
// .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
// .deserializer(new StringDebeziumDeserializationSchema()) // the default deserialization format
.startupOptions(StartupOptions.latest())
.build();
DataStreamSource<String> streamSource = env.fromSource(sourceFunction, WatermarkStrategy.noWatermarks(), "MySQL Source");
streamSource.print();
streamSource.addSink(getKafkaProducer(config.getBrokers(),config.getTopic()));
// JobExecutionResult jobExecutionResult = env.execute(config.getJobName());
// return jobExecutionResult.getJobID().toHexString();
}
private static FlinkKafkaProducer<String> getKafkaProducer(String brokers, String topic) {
return new FlinkKafkaProducer<String>(brokers,
topic,
new SimpleStringSchema());
}
}
......@@ -14,6 +14,7 @@
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.14.3</flink.version>
<flinkcdc.version>2.1.1</flinkcdc.version>
<commons.version>1.3.1</commons.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>1.8</maven.compiler.source>
......@@ -22,6 +23,11 @@
</properties>
<dependencies>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-client-base</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
......@@ -80,6 +86,16 @@
<artifactId>flink-kubernetes_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flinkcdc.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
......
package com.dlink.cdc;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* CustomerDeserialization
*
* @author wenmo
* @since 2022/1/29 22:16
*/
public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {
private ObjectMapper mapper = new ObjectMapper();
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
//1. Create the JSON object that stores the final record
Map result = new HashMap<String,String>();
//2. Get the database and table names and put them into source
String topic = sourceRecord.topic();
String[] fields = topic.split("\\.");
String database = fields[1];
String tableName = fields[2];
Map source = new HashMap<String,String>();
source.put("database",database);
source.put("table",tableName);
Struct value = (Struct) sourceRecord.value();
//3. Get the "before" data
Struct before = value.getStruct("before");
Map beforeJson = new HashMap<String,String>();
if (before != null) {
Schema beforeSchema = before.schema();
List<Field> beforeFields = beforeSchema.fields();
for (Field field : beforeFields) {
Object beforeValue = before.get(field);
beforeJson.put(field.name(), beforeValue);
}
}
//4. Get the "after" data
Struct after = value.getStruct("after");
Map afterJson = new HashMap<String,String>();
if (after != null) {
Schema afterSchema = after.schema();
List<Field> afterFields = afterSchema.fields();
for (Field field : afterFields) {
Object afterValue = after.get(field);
afterJson.put(field.name(), afterValue);
}
}
//5. Get the operation type (CREATE/UPDATE/DELETE) and map it to the Debezium-style op letter
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String type = operation.toString().toLowerCase();
if ("insert".equals(type)) {
type = "c";
}
if ("update".equals(type)) {
type = "u";
}
if ("delete".equals(type)) {
type = "d";
}
if ("create".equals(type)) {
type = "c";
}
//6. Write the fields into the JSON object
result.put("source", source);
result.put("before", beforeJson);
result.put("after", afterJson);
result.put("op", type);
//7. Emit the record
collector.collect(mapper.writeValueAsString(result));
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
package com.dlink.cdc;
import com.dlink.assertion.Asserts;
import com.dlink.model.FlinkCDCConfig;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
/**
* FlinkCDCMergeBuilder
*
* @author wenmo
* @since 2022/1/29 22:37
*/
public class FlinkCDCMergeBuilder {
public static void buildMySqlCDC(StreamExecutionEnvironment env, FlinkCDCConfig config) throws Exception {
if(Asserts.isNotNull(config.getParallelism())){
env.setParallelism(config.getParallelism());
}
if(Asserts.isNotNull(config.getCheckpoint())){
env.enableCheckpointing(config.getCheckpoint());
}
MySqlSourceBuilder<String> sourceBuilder = MySqlSource.<String>builder()
.hostname(config.getHostname())
.port(config.getPort())
.username(config.getUsername())
.password(config.getPassword());
if(Asserts.isNotNull(config.getDatabase())&&config.getDatabase().size()>0){
sourceBuilder.databaseList(config.getDatabase().toArray(new String[0]));
}
if(Asserts.isNotNull(config.getTable())&&config.getTable().size()>0){
sourceBuilder.tableList(config.getTable().toArray(new String[0]));
}
MySqlSource<String> sourceFunction = sourceBuilder.deserializer(new CustomerDeserialization()) // a custom deserialization format is required here
// .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
// .deserializer(new StringDebeziumDeserializationSchema()) // the default deserialization format
.startupOptions(StartupOptions.latest())
.build();
DataStreamSource<String> streamSource = env.fromSource(sourceFunction, WatermarkStrategy.noWatermarks(), "MySQL Source");
streamSource.print();
streamSource.addSink(getKafkaProducer(config.getBrokers(),config.getTopic()));
// JobExecutionResult jobExecutionResult = env.execute(config.getJobName());
// return jobExecutionResult.getJobID().toHexString();
}
private static FlinkKafkaProducer<String> getKafkaProducer(String brokers, String topic) {
return new FlinkKafkaProducer<String>(brokers,
topic,
new SimpleStringSchema());
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dlink-client</artifactId>
<groupId>com.dlink</groupId>
<version>0.6.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dlink-client-base</artifactId>
<properties>
<source.level>1.8</source.level>
<target.level>1.8</target.level>
<junit.version>4.12</junit.version>
<maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
</dependencies>
<profiles>
<!-- For jdk 11 above JavaEE annotation -->
<profile>
<id>javax.annotation</id>
<activation>
<jdk>[1.11,)</jdk>
</activation>
<dependencies>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>provider</id>
<build>
<plugins>
<plugin>
<!-- Build an executable JAR -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<finalName>provider</finalName>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>com.dlink.BasicProvider</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>consumer</id>
<build>
<plugins>
<plugin>
<!-- Build an executable JAR -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<finalName>consumer</finalName>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>com.dlink.BasicConsumer</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
<configuration>
<source>${source.level}</source>
<target>${target.level}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>prepare-package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>
${project.build.directory}/lib
</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package com.dlink.model;
import java.util.List;
/**
* FlinkCDCConfig
*
* @author wenmo
* @since 2022/1/29 22:50
*/
public class FlinkCDCConfig {
private String hostname;
private Integer port;
private String username;
private String password;
private Integer checkpoint;
private Integer parallelism;
private List<String> database;
private List<String> table;
private String topic;
private String brokers;
public FlinkCDCConfig() {
}
public FlinkCDCConfig(String hostname, int port, String username, String password, int checkpoint, int parallelism, List<String> database, List<String> table, String topic, String brokers) {
this.hostname = hostname;
this.port = port;
this.username = username;
this.password = password;
this.checkpoint = checkpoint;
this.parallelism = parallelism;
this.database = database;
this.table = table;
this.topic = topic;
this.brokers = brokers;
}
public String getHostname() {
return hostname;
}
public void setHostname(String hostname) {
this.hostname = hostname;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public int getCheckpoint() {
return checkpoint;
}
public void setCheckpoint(int checkpoint) {
this.checkpoint = checkpoint;
}
public int getParallelism() {
return parallelism;
}
public void setParallelism(int parallelism) {
this.parallelism = parallelism;
}
public List<String> getDatabase() {
return database;
}
public void setDatabase(List<String> database) {
this.database = database;
}
public List<String> getTable() {
return table;
}
public void setTable(List<String> table) {
this.table = table;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getBrokers() {
return brokers;
}
public void setBrokers(String brokers) {
this.brokers = brokers;
}
}
......@@ -18,5 +18,6 @@
<module>dlink-client-1.11</module>
<module>dlink-client-1.14</module>
<module>dlink-client-hadoop</module>
<module>dlink-client-base</module>
</modules>
</project>
\ No newline at end of file
......@@ -60,6 +60,7 @@ public class Explainer {
public JobParam pretreatStatements(String[] statements) {
List<StatementParam> ddl = new ArrayList<>();
List<StatementParam> trans = new ArrayList<>();
List<StatementParam> execute = new ArrayList<>();
for (String item : statements) {
String statement = executor.pretreatStatement(item);
if (statement.isEmpty()) {
......@@ -72,11 +73,13 @@ public class Explainer {
if (!useStatementSet) {
break;
}
} else if(operationType.equals(SqlType.EXECUTE)){
execute.add(new StatementParam(statement, operationType));
} else {
ddl.add(new StatementParam(statement, operationType));
}
}
return new JobParam(ddl, trans);
return new JobParam(ddl, trans, execute);
}
@Deprecated
......@@ -206,23 +209,63 @@ public class Explainer {
}
}
}
for (StatementParam item : jobParam.getExecute()) {
SqlExplainResult record = new SqlExplainResult();
try {
record = executor.explainSqlRecord(item.getValue());
if (Asserts.isNull(record)) {
record = new SqlExplainResult();
executor.getEnvironment().getStreamGraph();
}else {
executor.executeSql(item.getValue());
}
record.setType("DATASTREAM");
record.setParseTrue(true);
}catch (Exception e){
e.printStackTrace();
record.setError(e.getMessage());
record.setExplainTrue(false);
record.setExplainTime(LocalDateTime.now());
record.setSql(item.getValue());
record.setIndex(index);
sqlExplainRecords.add(record);
correct = false;
break;
}
record.setExplainTrue(true);
record.setExplainTime(LocalDateTime.now());
record.setSql(item.getValue());
record.setIndex(index++);
sqlExplainRecords.add(record);
}
return new ExplainResult(correct,sqlExplainRecords.size(),sqlExplainRecords);
}
public ObjectNode getStreamGraph(String statement){
List<SqlExplainResult> sqlExplainRecords = explainSql(statement).getSqlExplainResults();
List<String> strPlans = new ArrayList<>();
List<String> sqlPlans = new ArrayList<>();
List<String> datastreamPlans = new ArrayList<>();
for (SqlExplainResult item : sqlExplainRecords) {
if (Asserts.isNotNull(item.getType())
&& item.getType().contains(FlinkSQLConstant.DML)) {
String[] statements = SqlUtil.getStatements(item.getSql(),sqlSeparator);
for(String str : statements){
strPlans.add(str);
sqlPlans.add(str);
}
continue;
}
if(Asserts.isNotNull(item.getType())
&& item.getType().equals(FlinkSQLConstant.DATASTREAM)){
String[] statements = SqlUtil.getStatements(item.getSql(),sqlSeparator);
for(String str : statements){
datastreamPlans.add(str);
}
}
}
if(strPlans.size()>0){
return executor.getStreamGraph(strPlans);
if(sqlPlans.size()>0){
return executor.getStreamGraph(sqlPlans);
}else if(datastreamPlans.size()>0){
return executor.getStreamGraphFromDataStream(datastreamPlans);
}else{
return mapper.createObjectNode();
}
......@@ -230,18 +273,29 @@ public class Explainer {
public JobPlanInfo getJobPlanInfo(String statement){
List<SqlExplainResult> sqlExplainRecords = explainSql(statement).getSqlExplainResults();
List<String> strPlans = new ArrayList<>();
List<String> sqlPlans = new ArrayList<>();
List<String> datastreamPlans = new ArrayList<>();
for (SqlExplainResult item : sqlExplainRecords) {
if (Asserts.isNotNull(item.getType())
&& item.getType().contains(FlinkSQLConstant.DML)) {
String[] statements = SqlUtil.getStatements(item.getSql(),sqlSeparator);
for(String str : statements){
strPlans.add(str);
sqlPlans.add(str);
}
continue;
}
if(Asserts.isNotNull(item.getType())
&& item.getType().equals(FlinkSQLConstant.DATASTREAM)){
String[] statements = SqlUtil.getStatements(item.getSql(),sqlSeparator);
for(String str : statements){
datastreamPlans.add(str);
}
}
}
if(strPlans.size()>0){
return executor.getJobPlanInfo(strPlans);
if(sqlPlans.size()>0){
return executor.getJobPlanInfo(sqlPlans);
}else if(datastreamPlans.size()>0){
return executor.getJobPlanInfoFromDataStream(datastreamPlans);
}else{
return new JobPlanInfo("");
}
......
......@@ -30,6 +30,7 @@ import com.dlink.trans.Operations;
import com.dlink.utils.SqlUtil;
import com.dlink.utils.UDFUtil;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
......@@ -37,6 +38,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.slf4j.Logger;
......@@ -318,6 +320,48 @@ public class JobManager {
}
}
}
if (jobParam.getExecute().size() > 0) {
if (useGateway) {
for (StatementParam item : jobParam.getExecute()) {
executor.executeSql(item.getValue());
if(!useStatementSet) {
break;
}
}
StreamGraph streamGraph = executor.getEnvironment().getStreamGraph();
streamGraph.setJobName(config.getJobName());
JobGraph jobGraph = streamGraph.getJobGraph();
GatewayResult gatewayResult = null;
if (GatewayType.YARN_APPLICATION.equals(runMode)||GatewayType.KUBERNETES_APPLICATION.equals(runMode)) {
config.addGatewayConfig(executor.getSetConfig());
gatewayResult = Gateway.build(config.getGatewayConfig()).submitJar();
} else {
config.addGatewayConfig(executor.getSetConfig());
if(Asserts.isNotNullString(config.getSavePointPath())) {
jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(config.getSavePointPath()));
}
gatewayResult = Gateway.build(config.getGatewayConfig()).submitJobGraph(jobGraph);
}
job.setResult(InsertResult.success(gatewayResult.getAppId()));
job.setJobId(gatewayResult.getAppId());
job.setJobManagerAddress(formatAddress(gatewayResult.getWebURL()));
} else {
for (StatementParam item : jobParam.getExecute()) {
executor.executeSql(item.getValue());
if(!useStatementSet) {
break;
}
}
JobExecutionResult jobExecutionResult = executor.getEnvironment().execute(config.getJobName());
if (jobExecutionResult.isJobExecutionResult()) {
job.setJobId(jobExecutionResult.getJobID().toHexString());
}
if (config.isUseResult()) {
IResult result = ResultBuilder.build(SqlType.EXECUTE, config.getMaxRowNum(), config.isUseChangeLog(),config.isUseAutoCancel()).getResult(null);
job.setResult(result);
}
}
}
job.setEndTime(LocalDateTime.now());
job.setStatus(Job.JobStatus.SUCCESS);
success();
......
......@@ -11,11 +11,17 @@ import java.util.List;
public class JobParam {
private List<StatementParam> ddl;
private List<StatementParam> trans;
private List<StatementParam> execute;
public JobParam(List<StatementParam> ddl, List<StatementParam> trans) {
this.ddl = ddl;
this.trans = trans;
}
public JobParam(List<StatementParam> ddl, List<StatementParam> trans, List<StatementParam> execute) {
this.ddl = ddl;
this.trans = trans;
this.execute = execute;
}
public List<StatementParam> getDdl() {
return ddl;
......@@ -32,4 +38,12 @@ public class JobParam {
public void setTrans(List<StatementParam> trans) {
this.trans = trans;
}
public List<StatementParam> getExecute() {
return execute;
}
public void setExecute(List<StatementParam> execute) {
this.execute = execute;
}
}
......@@ -60,4 +60,22 @@ public class SqlParserTest {
sql=sql.replaceAll("--([^'\r\n]{0,}('[^'\r\n]{0,}'){0,1}[^'\r\n]{0,}){0,}","").trim();
System.out.println(sql);
}
@Test
public void createCDCSourceTest(){
String sql = "EXECUTE CDCSOURCE demo WITH (\n" +
" 'hostname'='127.0.0.1',\n" +
" 'port'='3306',\n" +
" 'password'='dlink',\n" +
" 'hostname'='dlink',\n" +
" 'checkpoint'='3000',\n" +
" 'parallelism'='1',\n" +
" 'database'='dlink:test',\n" +
" 'table'='',\n" +
" 'topic'='dlinkcdc',\n" +
" 'brokers'='127.0.0.1:9092'\n" +
");";
Map<String,List<String>> lists = SingleSqlParserFactory.generateParser(sql);
System.out.println(lists.toString());
}
}
......@@ -19,6 +19,10 @@ public interface FlinkSQLConstant {
* DML type
*/
String DML = "DML";
/**
* DATASTREAM type
*/
String DATASTREAM = "DATASTREAM";
/**
* Fragments identifier
*/
......
......@@ -9,6 +9,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator;
......@@ -232,10 +233,36 @@ public abstract class Executor {
}
}
public ObjectNode getStreamGraphFromDataStream(List<String> statements){
for(String statement : statements){
executeSql(statement);
}
StreamGraph streamGraph = environment.getStreamGraph();
JSONGenerator jsonGenerator = new JSONGenerator(streamGraph);
String json = jsonGenerator.getJSON();
ObjectMapper mapper = new ObjectMapper();
ObjectNode objectNode =mapper.createObjectNode();
try {
objectNode = (ObjectNode) mapper.readTree(json);
} catch (JsonProcessingException e) {
e.printStackTrace();
}finally {
return objectNode;
}
}
public JobPlanInfo getJobPlanInfo(List<String> statements){
return stEnvironment.getJobPlanInfo(statements);
}
public JobPlanInfo getJobPlanInfoFromDataStream(List<String> statements){
for(String statement : statements){
executeSql(statement);
}
StreamGraph streamGraph = environment.getStreamGraph();
return new JobPlanInfo(JsonPlanGenerator.generatePlan(streamGraph.getJobGraph()));
}
public void registerFunction(String name, ScalarFunction function){
stEnvironment.registerFunction(name,function);
}
......
......@@ -44,7 +44,8 @@ public abstract class BaseSingleSqlParser {
for (SqlSegment sqlSegment : segments) {
sqlSegment.parse(originalSql);
if(Asserts.isNotNullString(sqlSegment.getStart())) {
map.put(sqlSegment.getStart().toUpperCase(), sqlSegment.getBodyPieces());
// map.put(sqlSegment.getStart().toUpperCase(), sqlSegment.getBodyPieces());
map.put(sqlSegment.getType().toUpperCase(), sqlSegment.getBodyPieces());
}
}
return map;
......
package com.dlink.parser;
/**
* CreateCDCSourceSqlParser
*
* @author wenmo
* @since 2022/1/29 23:39
*/
public class CreateCDCSourceSqlParser extends BaseSingleSqlParser {
public CreateCDCSourceSqlParser(String originalSql) {
super(originalSql);
}
@Override
protected void initializeSegments() {
segments.add(new SqlSegment("CDCSOURCE","(execute\\s+cdcsource\\s+)(.+)(\\s+with\\s+\\()", "[,]"));
segments.add(new SqlSegment("WITH","(with\\s+\\()(.+)(\\))", "[,]"));
}
}
......@@ -21,6 +21,8 @@ public class SingleSqlParserFactory {
tmp = new InsertSelectSqlParser(sql);
} else if (contains(sql, "(create\\s+aggtable)(.+)(as\\s+select)(.+)")) {
tmp = new CreateAggTableSelectSqlParser(sql);
} else if (contains(sql, "(execute\\s+cdcsource)")) {
tmp = new CreateCDCSourceSqlParser(sql);
} else if (contains(sql, "(select)(.+)(from)(.+)")) {
tmp = new SelectSqlParser(sql);
} else if (contains(sql, "(delete\\s+from)(.+)")) {
......
package com.dlink.parser;
import com.dlink.assertion.Asserts;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
......@@ -60,6 +62,16 @@ public class SqlSegment {
this.bodyPieces = new ArrayList<String>();
}
public SqlSegment(String type, String segmentRegExp, String bodySplitPattern) {
this.type = type;
this.start = "";
this.body = "";
this.end = "";
this.segmentRegExp = segmentRegExp;
this.bodySplitPattern = bodySplitPattern;
this.bodyPieces = new ArrayList<String>();
}
/**
* Find the part of the SQL that matches segmentRegExp and assign it to the start, body and end properties
**/
......@@ -70,7 +82,9 @@ public class SqlSegment {
start = matcher.group(1);
body = matcher.group(2);
end = matcher.group(3);
type = start.replace("\n"," ").replaceAll("\\s{1,}", " ").toUpperCase();
if(Asserts.isNullString(type)) {
type = start.replace("\n", " ").replaceAll("\\s{1,}", " ").toUpperCase();
}
parseBody();
}
}
......
......@@ -21,6 +21,7 @@ public enum SqlType {
UNLOAD("UNLOAD"),
SET("SET"),
RESET("RESET"),
EXECUTE("EXECUTE"),
UNKNOWN("UNKNOWN"),
;
......
......@@ -2,6 +2,7 @@ package com.dlink.trans;
import com.dlink.parser.SqlType;
import com.dlink.trans.ddl.CreateAggTableOperation;
import com.dlink.trans.ddl.CreateCDCSourceOperation;
import com.dlink.trans.ddl.SetOperation;
/**
......@@ -15,6 +16,7 @@ public class Operations {
private static Operation[] operations = {
new CreateAggTableOperation()
, new SetOperation()
, new CreateCDCSourceOperation()
};
public static SqlType getSqlTypeFromStatements(String statement){
......
package com.dlink.trans.ddl;
import com.dlink.assertion.Asserts;
import com.dlink.parser.SingleSqlParserFactory;
import org.apache.commons.lang3.StringUtils;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* CDCSource
*
* @author wenmo
* @since 2022/1/29 23:30
*/
public class CDCSource {
private String statement;
private String name;
private String hostname;
private Integer port;
private String username;
private String password;
private Integer checkpoint;
private Integer parallelism;
private List<String> database;
private List<String> table;
private String topic;
private String brokers;
public CDCSource(String statement, String name, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String topic, String brokers) {
this.statement = statement;
this.name = name;
this.hostname = hostname;
this.port = port;
this.username = username;
this.password = password;
this.checkpoint = checkpoint;
this.parallelism = parallelism;
this.topic = topic;
this.brokers = brokers;
}
public CDCSource(String statement, String name, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, List<String> database, List<String> table, String topic, String brokers) {
this.statement = statement;
this.name = name;
this.hostname = hostname;
this.port = port;
this.username = username;
this.password = password;
this.checkpoint = checkpoint;
this.parallelism = parallelism;
this.database = database;
this.table = table;
this.topic = topic;
this.brokers = brokers;
}
public static CDCSource build(String statement) {
Map<String, List<String>> map = SingleSqlParserFactory.generateParser(statement);
Map<String, String> config = getKeyValue(map.get("WITH"));
CDCSource cdcSource = new CDCSource(statement,
map.get("CDCSOURCE").toString(),
config.get("hostname"),
Integer.valueOf(config.get("port")),
config.get("username"),
config.get("password"),
Integer.valueOf(config.get("checkpoint")),
Integer.valueOf(config.get("parallelism")),
config.get("topic"),
config.get("brokers")
);
if(Asserts.isNotNullString(config.get("database"))){
cdcSource.setDatabase(Arrays.asList(config.get("database").split(":")));
}
if(Asserts.isNotNullString(config.get("table"))){
cdcSource.setTable(Arrays.asList(config.get("table").split(":")));
}
return cdcSource;
}
private static Map<String, String> getKeyValue(List<String> list) {
Map<String, String> map = new HashMap<>();
Pattern p = Pattern.compile("'(.*?)'\\s*=\\s*'(.*?)'");
for (int i = 0; i < list.size(); i++) {
Matcher m = p.matcher(list.get(i));
if(m.find()){
map.put(m.group(1),m.group(2));
}
}
return map;
}
public String getStatement() {
return statement;
}
public void setStatement(String statement) {
this.statement = statement;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getHostname() {
return hostname;
}
public void setHostname(String hostname) {
this.hostname = hostname;
}
public Integer getPort() {
return port;
}
public void setPort(Integer port) {
this.port = port;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public Integer getCheckpoint() {
return checkpoint;
}
public void setCheckpoint(Integer checkpoint) {
this.checkpoint = checkpoint;
}
public Integer getParallelism() {
return parallelism;
}
public void setParallelism(Integer parallelism) {
this.parallelism = parallelism;
}
public List<String> getDatabase() {
return database;
}
public void setDatabase(List<String> database) {
this.database = database;
}
public List<String> getTable() {
return table;
}
public void setTable(List<String> table) {
this.table = table;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getBrokers() {
return brokers;
}
public void setBrokers(String brokers) {
this.brokers = brokers;
}
}
package com.dlink.trans.ddl;
import com.dlink.cdc.FlinkCDCMergeBuilder;
import com.dlink.executor.Executor;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.trans.AbstractOperation;
import com.dlink.trans.Operation;
/**
* CreateCDCSourceOperation
*
* @author wenmo
* @since 2022/1/29 23:25
*/
public class CreateCDCSourceOperation extends AbstractOperation implements Operation {
private String KEY_WORD = "EXECUTE CDCSOURCE";
public CreateCDCSourceOperation() {
}
public CreateCDCSourceOperation(String statement) {
super(statement);
}
@Override
public String getHandle() {
return KEY_WORD;
}
@Override
public Operation create(String statement) {
return new CreateCDCSourceOperation(statement);
}
@Override
public void build(Executor executor) {
CDCSource cdcSource = CDCSource.build(statement);
FlinkCDCConfig config = new FlinkCDCConfig(cdcSource.getHostname(),cdcSource.getPort(),cdcSource.getUsername()
,cdcSource.getPassword(),cdcSource.getCheckpoint(),cdcSource.getParallelism(),cdcSource.getDatabase(),cdcSource.getTable()
,cdcSource.getTopic(),cdcSource.getBrokers());
try {
FlinkCDCMergeBuilder.buildMySqlCDC(executor.getEnvironment(),config);
} catch (Exception e) {
e.printStackTrace();
}
}
}
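
Putting the new pieces together, an EXECUTE CDCSOURCE statement flows through the parser and this operation roughly as sketched below (illustrative only; the statement values are placeholders and the Executor is assumed to be obtained the same way Submiter builds one):

public class CdcSourceDispatchSketch {
    public static void run(com.dlink.executor.Executor executor) {
        String statement = "EXECUTE CDCSOURCE demo WITH ("
                + "'hostname'='127.0.0.1','port'='3306','username'='dlink','password'='dlink',"
                + "'checkpoint'='3000','parallelism'='1','database'='dlink','table'='',"
                + "'topic'='dlinkcdc','brokers'='127.0.0.1:9092')";
        // CDCSource.build parses the WITH clause via SingleSqlParserFactory / CreateCDCSourceSqlParser.
        com.dlink.trans.ddl.CDCSource cdcSource = com.dlink.trans.ddl.CDCSource.build(statement);
        System.out.println(cdcSource.getHostname() + ":" + cdcSource.getPort()); // 127.0.0.1:3306
        // The operation turns the parsed fields into a FlinkCDCConfig and registers the
        // MySQL CDC -> Kafka pipeline on the executor's StreamExecutionEnvironment.
        new com.dlink.trans.ddl.CreateCDCSourceOperation(statement).build(executor);
    }
}
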
......@@ -638,6 +638,9 @@ export default (): React.ReactNode => {
<li>
<Link>Added metadata generation of FlinkSQL and SQL</Link>
</li>
<li>
<Link>Added CDCSOURCE multi-source merge task syntax support</Link>
</li>
</ul>
</Paragraph>
</Timeline.Item>
......
......@@ -73,6 +73,7 @@ Dinky 通过已注册的集群配置来获取对应的 YarnClient 实例。对
| | | Support for INSERT statement sets | 0.4.0 |
| | | Added SQL fragment syntax | 0.4.0 |
| | | Added AGGTABLE table-valued aggregation syntax and UDATF support | 0.4.0 |
| | | Added CDCSOURCE multi-source merge syntax support | 0.6.0 |
| | | Added FlinkSQLEnv execution environment reuse | 0.5.0 |
| | | Added Flink Catalog interactive queries | 0.4.0 |
| | | Added shared and private session mechanisms for execution environments | 0.4.0 |
......
......@@ -73,6 +73,7 @@ Dinky 通过已注册的集群配置来获取对应的 YarnClient 实例。对
| | | Support for INSERT statement sets | 0.4.0 |
| | | Added SQL fragment syntax | 0.4.0 |
| | | Added AGGTABLE table-valued aggregation syntax and UDATF support | 0.4.0 |
| | | Added CDCSOURCE multi-source merge syntax support | 0.6.0 |
| | | Added FlinkSQLEnv execution environment reuse | 0.5.0 |
| | | Added Flink Catalog interactive queries | 0.4.0 |
| | | Added shared and private session mechanisms for execution environments | 0.4.0 |
......
......@@ -169,6 +169,11 @@
<artifactId>dlink-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-client-base</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-client-1.12</artifactId>
......