Unverified commit 04b37e6e authored by aiwenmo, committed by GitHub

Fix 0.6.7-rc1 (#1017)

* [Fix][admin,web,app,core] Fix rc1 bugs (#1007)
Co-authored-by: wenmo <32723967+wenmo@users.noreply.github.com>

* alter error (#1011)

* [Optimization-1014][client] Optimize Doris sink and type conversion and upgrade flink to 1.15.2 (#1015)
Co-authored-by: wenmo <32723967+wenmo@users.noreply.github.com>

* UI page display misalignment fix (#1013)

Co-authored-by: steve <woai1998>

* out.collect did not propagate an error upward when a field failed, and e.getCause().getMessage() cannot print the stack trace; logging changed to logger.error("..", e). (#1016)
Co-authored-by: wenmo <32723967+wenmo@users.noreply.github.com>
Co-authored-by: toms <94617906+Toms1999@users.noreply.github.com>
Co-authored-by: ikiler <34333571+ikiler@users.noreply.github.com>
Co-authored-by: mengyejiang <403905717@qq.com>
parent a70e0209
@@ -72,4 +72,19 @@ public class FileUploadController {
        }
    }
+
+    /**
+     * Upload hdfs file<br>
+     *
+     * @param files            Multi files
+     * @param dir              Target dir, defaults to empty. If not provided, the 'fileType' value must be provided
+     * @param hadoopConfigPath Hadoop config dir, refer to {@link UploadFileConstant}, defaults to -1. If not provided, the 'dir' value must be provided
+     * @return {@link Result}
+     */
+    @PostMapping(value = "hdfs")
+    public Result uploadHdfs(@RequestPart("files") MultipartFile[] files,
+                             @RequestParam(value = "dir", defaultValue = "", required = false) String dir,
+                             @RequestParam(value = "hadoopConfigPath", required = false) String hadoopConfigPath) {
+        return fileUploadService.uploadHdfs(files, dir, hadoopConfigPath);
+    }
}
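The new endpoint pairs with the interface and service changes below. Here is a minimal exercise of it through Spring's MockMvc, as a sketch only — the test scaffolding, file name, and parameter values are hypothetical, while the part name 'files' and the two parameter names mirror the handler above:

```java
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.multipart;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

import org.springframework.mock.web.MockMultipartFile;
import org.springframework.test.web.servlet.MockMvc;

public class FileUploadHdfsSketch {

    // POSTs one file to /api/fileUpload/hdfs (the action used by the web form further below).
    void uploadJarToHdfs(MockMvc mockMvc) throws Exception {
        MockMultipartFile jar = new MockMultipartFile(
                "files",                        // must match @RequestPart("files")
                "flink-connector.jar",          // hypothetical file name
                "application/octet-stream",
                new byte[]{0x50, 0x4b});        // placeholder content
        mockMvc.perform(multipart("/api/fileUpload/hdfs")
                        .file(jar)
                        .param("dir", "hdfs:///flink/lib")
                        .param("hadoopConfigPath", "/etc/hadoop/conf"))
                .andExpect(status().isOk());
    }
}
```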
@@ -34,7 +34,7 @@ public interface FileUploadService {
     *
     * @param file     {@link MultipartFile} instance
     * @param fileType Upload file's type, refer ${@link UploadFileConstant}
-     * @return {@link com.dlink.common.result.Result}
+     * @return {@link Result}
     */
    Result upload(MultipartFile file, Byte fileType);
@@ -43,7 +43,7 @@ public interface FileUploadService {
     *
     * @param files    {@link MultipartFile} instance
     * @param fileType Upload file's type, refer ${@link UploadFileConstant}
-     * @return {@link com.dlink.common.result.Result}
+     * @return {@link Result}
     */
    Result upload(MultipartFile[] files, Byte fileType);
@@ -53,7 +53,7 @@ public interface FileUploadService {
     * @param file     {@link MultipartFile} instance
     * @param dir      Local absolute dir
     * @param fileType Upload file's type, refer ${@link UploadFileConstant}
-     * @return {@link com.dlink.common.result.Result}
+     * @return {@link Result}
     */
    Result upload(MultipartFile file, String dir, Byte fileType);
@@ -63,8 +63,25 @@ public interface FileUploadService {
     * @param files    {@link MultipartFile} instance
     * @param dir      Local absolute dir
     * @param fileType Upload file's type, refer ${@link UploadFileConstant}
-     * @return {@link com.dlink.common.result.Result}
+     * @return {@link Result}
     */
    Result upload(MultipartFile[] files, String dir, Byte fileType);
+
+    /**
+     * Upload one hdfs file; if the target file exists, it will be deleted first
+     *
+     * @param file             {@link MultipartFile} instance
+     * @param dir              Target HDFS dir
+     * @param hadoopConfigPath Dir containing core-site.xml, hdfs-site.xml, yarn-site.xml
+     * @return {@link Result}
+     */
+    Result uploadHdfs(MultipartFile file, String dir, String hadoopConfigPath);
+
+    /**
+     * Upload multiple hdfs files; if a target file exists, it will be deleted first
+     *
+     * @param files            {@link MultipartFile} instance
+     * @param dir              Target HDFS dir
+     * @param hadoopConfigPath Dir containing core-site.xml, hdfs-site.xml, yarn-site.xml
+     * @return {@link Result}
+     */
+    Result uploadHdfs(MultipartFile[] files, String dir, String hadoopConfigPath);
}
@@ -19,6 +19,7 @@
package com.dlink.service.impl;
+import com.dlink.assertion.Asserts;
import com.dlink.common.result.Result;
import com.dlink.constant.UploadFileConstant;
import com.dlink.model.CodeEnum;
@@ -48,8 +49,6 @@ import lombok.extern.slf4j.Slf4j;
@Service
public class FileUploadServiceImpl implements FileUploadService {
-    @Resource
-    private HdfsUtil hdfsUtil;
    @Resource
    private UploadFileRecordService uploadFileRecordService;
@@ -76,7 +75,7 @@ public class FileUploadServiceImpl implements FileUploadService {
                }
            }
            case UploadFileConstant.TARGET_HDFS: {
-                Result result = hdfsUtil.uploadFile(filePath, file);
+                Result result = HdfsUtil.uploadFile(filePath, file);
                if (Objects.equals(result.getCode(), CodeEnum.SUCCESS.getCode())) {
                    if (uploadFileRecordService.saveOrUpdateFile(file.getOriginalFilename(), dir, filePath, fileType, UploadFileConstant.TARGET_HDFS)) {
                        return Result.succeed("上传成功");
@@ -119,6 +118,42 @@ public class FileUploadServiceImpl implements FileUploadService {
        }
    }
+
+    @Override
+    public Result uploadHdfs(MultipartFile file, String dir, String hadoopConfigPath) {
+        String filePath = FilePathUtil.addFileSeparator(dir) + file.getOriginalFilename();
+        Result result = HdfsUtil.uploadFile(filePath, file, hadoopConfigPath);
+        if (Objects.equals(result.getCode(), CodeEnum.SUCCESS.getCode())) {
+            if (uploadFileRecordService.saveOrUpdateFile(file.getOriginalFilename(), dir, filePath, UploadFileConstant.FLINK_LIB_ID, UploadFileConstant.TARGET_HDFS)) {
+                return Result.succeed("上传成功");
+            } else {
+                return Result.failed("数据库异常");
+            }
+        } else {
+            return result;
+        }
+    }
+
+    @Override
+    public Result uploadHdfs(MultipartFile[] files, String dir, String hadoopConfigPath) {
+        if (Asserts.isNullString(dir)) {
+            dir = UploadFileConstant.getDirPath(UploadFileConstant.FLINK_LIB_ID);
+        }
+        if (files.length > 0) {
+            for (MultipartFile file : files) {
+                Result uploadResult = uploadHdfs(file, dir, hadoopConfigPath);
+                if (Objects.equals(uploadResult.getCode(), CodeEnum.ERROR.getCode())) {
+                    return uploadResult;
+                }
+            }
+            if (!uploadFileRecordService.saveOrUpdateDir(dir, UploadFileConstant.FLINK_LIB_ID, UploadFileConstant.TARGET_HDFS)) {
+                return Result.failed("数据库异常");
+            }
+            return Result.succeed("全部上传成功");
+        } else {
+            return Result.succeed("没有检测到要上传的文件");
+        }
+    }
+
    @Override
    public Result upload(MultipartFile[] files, Byte fileType) {
        String dir = UploadFileConstant.getDirPath(fileType);
......
package com.dlink.utils;
+import com.dlink.assertion.Asserts;
import com.dlink.common.result.Result;
import com.dlink.constant.UploadFileConstant;
import com.dlink.model.CodeEnum;
@@ -13,9 +14,6 @@ import java.io.File;
import java.io.IOException;
import java.util.Objects;
-import javax.annotation.PostConstruct;
-import org.springframework.stereotype.Component;
import org.springframework.web.multipart.MultipartFile;
import cn.hutool.core.exceptions.ExceptionUtil;
@@ -25,24 +23,27 @@ import lombok.extern.slf4j.Slf4j;
 * Hdfs Handle
 **/
@Slf4j
-@Component
public class HdfsUtil {
-    private final Configuration configuration = new Configuration();
-    private FileSystem hdfs = null;
+    private static FileSystem hdfs = null;

    /**
     * Init internal hdfs client
+     *
+     * @param hadoopConfigPath HDFS config path
     */
-    @PostConstruct
-    private Result init() {
+    private static Result init(String hadoopConfigPath) {
        if (hdfs == null) {
-            String coreSiteFilePath = FilePathUtil.addFileSeparator(UploadFileConstant.HADOOP_CONF_DIR) + "core-site.xml";
-            String hdfsSiteFilePath = FilePathUtil.addFileSeparator(UploadFileConstant.HADOOP_CONF_DIR) + "hdfs-site.xml";
+            if (Asserts.isNullString(hadoopConfigPath)) {
+                hadoopConfigPath = FilePathUtil.removeFileSeparator(UploadFileConstant.HADOOP_CONF_DIR);
+            }
+            String coreSiteFilePath = hadoopConfigPath + "/core-site.xml";
+            String hdfsSiteFilePath = hadoopConfigPath + "/hdfs-site.xml";
            if (!new File(coreSiteFilePath).exists() || !new File(hdfsSiteFilePath).exists()) {
                return Result.failed("在项目根目录下没有找到 core-site.xml/hdfs-site.xml/yarn-site.xml 文件,请先上传这些文件");
            }
            try {
+                final Configuration configuration = new Configuration();
                configuration.addResource(new Path(coreSiteFilePath));
                configuration.addResource(new Path(hdfsSiteFilePath));
                hdfs = FileSystem.get(configuration);
@@ -60,10 +61,22 @@ public class HdfsUtil {
     *
     * @param path  HDFS path
     * @param bytes File byte content
-     * @return {@link com.dlink.common.result.Result}
+     * @return {@link Result}
+     */
+    public static Result uploadFile(String path, byte[] bytes) {
+        return uploadFile(path, bytes, null);
+    }
+
+    /**
+     * Upload file byte content to HDFS
+     *
+     * @param path             HDFS path
+     * @param bytes            File byte content
+     * @param hadoopConfigPath hdfs config path
+     * @return {@link Result}
     */
-    public Result uploadFile(String path, byte[] bytes) {
-        Result initResult = init();
+    public static Result uploadFile(String path, byte[] bytes, String hadoopConfigPath) {
+        Result initResult = init(hadoopConfigPath);
        if (Objects.equals(initResult.getCode(), CodeEnum.SUCCESS.getCode())) {
            try (FSDataOutputStream stream = hdfs.create(new Path(path), true)) {
                stream.write(bytes);
@@ -83,9 +96,9 @@ public class HdfsUtil {
     *
     * @param path HDFS path
     * @param file MultipartFile instance
-     * @return {@link com.dlink.common.result.Result}
+     * @return {@link Result}
     */
-    public Result uploadFile(String path, MultipartFile file) {
+    public static Result uploadFile(String path, MultipartFile file) {
        try {
            return uploadFile(path, file.getBytes());
        } catch (IOException e) {
@@ -94,4 +107,21 @@ public class HdfsUtil {
        }
    }
+
+    /**
+     * Upload file byte content to HDFS
+     *
+     * @param path             HDFS path
+     * @param file             MultipartFile instance
+     * @param hadoopConfigPath hdfs config path
+     * @return {@link Result}
+     */
+    public static Result uploadFile(String path, MultipartFile file, String hadoopConfigPath) {
+        try {
+            return uploadFile(path, file.getBytes(), hadoopConfigPath);
+        } catch (IOException e) {
+            log.error(ExceptionUtil.stacktraceToString(e));
+            return Result.failed("文件上传失败");
+        }
+    }
}
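With the @Component annotation and the injected field gone, HdfsUtil is now called statically and the Hadoop config directory travels as an argument. A minimal usage sketch — the paths shown are placeholders; passing null for hadoopConfigPath falls back to UploadFileConstant.HADOOP_CONF_DIR, as init() above shows:

```java
import com.dlink.common.result.Result;
import com.dlink.utils.HdfsUtil;

public class HdfsUploadSketch {
    public static void main(String[] args) {
        byte[] content = "hello dinky".getBytes();
        // Explicit config dir; it must contain core-site.xml and hdfs-site.xml.
        Result result = HdfsUtil.uploadFile("/flink/lib/hello.txt", content, "/etc/hadoop/conf");
        System.out.println(result.getCode());
    }
}
```

One design note: init() lazily assigns the shared static FileSystem without synchronization, so two first-time callers can race. That appears tolerable here because FileSystem.get caches instances per configuration, but a synchronized init would make it airtight.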
@@ -28,6 +28,7 @@ import com.dlink.executor.ExecutorSetting;
import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.parser.SqlType;
import com.dlink.trans.Operations;
+import com.dlink.utils.SqlUtil;
import org.apache.flink.configuration.CheckpointingOptions;
@@ -90,7 +91,7 @@ public class Submiter {
    }
    public static List<String> getStatements(String sql) {
-        return Arrays.asList(sql.split(FlinkSQLConstant.SEPARATOR));
+        return Arrays.asList(SqlUtil.getStatements(sql));
    }
    public static void submit(Integer id, DBConfig dbConfig) {
......
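The replaced line split blindly on FlinkSQLConstant.SEPARATOR; routing through SqlUtil.getStatements presumably centralizes the statement-splitting rules so every entry point behaves the same (the exact rules live in dlink-core). A toy illustration of why a bare split is fragile in general, assuming a plain ";" separator:

```java
public class NaiveSplitSketch {
    public static void main(String[] args) {
        String sql = "INSERT INTO t SELECT ';' AS c;\nSELECT 2";
        // A bare split yields 3 fragments and cuts the first statement in half:
        // "INSERT INTO t SELECT '" | "' AS c" | "\nSELECT 2"
        String[] broken = sql.split(";");
        System.out.println(broken.length); // 3, but there are only 2 statements
    }
}
```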
@@ -193,7 +193,7 @@ public abstract class AbstractSinkBuilder {
                default:
            }
        } catch (Exception e) {
-            logger.error("SchameTable: {} - Row: {} - Exception: {}", schemaTableName, JSONUtil.toJsonString(value), e.getCause().getMessage());
+            logger.error("SchameTable: {} - Row: {} - Exception:", schemaTableName, JSONUtil.toJsonString(value), e);
            throw e;
        }
    }
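This hunk, repeated below in what looks like one copy per client module, implements the fix described in the commit message: e.getCause() may be null, and even when it is not, getMessage() discards the stack trace. SLF4J treats a trailing Throwable with no matching {} placeholder specially and prints the full trace. A minimal sketch of the contrast:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingContrastSketch {
    private static final Logger logger = LoggerFactory.getLogger(LoggingContrastSketch.class);

    public static void main(String[] args) {
        Exception e = new IllegalStateException("column type mismatch"); // no cause attached
        // Old style: NPE if getCause() is null, and no stack trace either way.
        // logger.error("Row: {} - Exception: {}", "row1", e.getCause().getMessage());

        // New style: the trailing Throwable has no placeholder, so SLF4J
        // logs the formatted message plus the full stack trace.
        logger.error("Row: {} - Exception:", "row1", e);
    }
}
```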
......
@@ -144,7 +144,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
                default:
            }
        } catch (Exception e) {
-            logger.error("SchameTable: {} - Row: {} - Exception: {}", schemaTableName, JSONUtil.toJsonString(value), e.getCause().getMessage());
+            logger.error("SchameTable: {} - Row: {} - Exception:", schemaTableName, JSONUtil.toJsonString(value), e);
            throw e;
        }
    }
......
@@ -193,7 +193,7 @@ public abstract class AbstractSinkBuilder {
                default:
            }
        } catch (Exception e) {
-            logger.error("SchameTable: {} - Row: {} - Exception: {}", schemaTableName, JSONUtil.toJsonString(value), e.getCause().getMessage());
+            logger.error("SchameTable: {} - Row: {} - Exception:", schemaTableName, JSONUtil.toJsonString(value), e);
            throw e;
        }
    }
......
@@ -144,7 +144,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
                default:
            }
        } catch (Exception e) {
-            logger.error("SchameTable: {} - Row: {} - Exception: {}", schemaTableName, JSONUtil.toJsonString(value), e.getCause().getMessage());
+            logger.error("SchameTable: {} - Row: {} - Exception:", schemaTableName, JSONUtil.toJsonString(value), e);
            throw e;
        }
    }
......
@@ -193,7 +193,7 @@ public abstract class AbstractSinkBuilder {
                default:
            }
        } catch (Exception e) {
-            logger.error("SchameTable: {} - Row: {} - Exception: {}", schemaTableName, JSONUtil.toJsonString(value), e.getCause().getMessage());
+            logger.error("SchameTable: {} - Row: {} - Exception:", schemaTableName, JSONUtil.toJsonString(value), e);
            throw e;
        }
    }
......
@@ -144,7 +144,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
                default:
            }
        } catch (Exception e) {
-            logger.error("SchameTable: {} - Row: {} - Exception: {}", schemaTableName, JSONUtil.toJsonString(value), e.getCause().getMessage());
+            logger.error("SchameTable: {} - Row: {} - Exception:", schemaTableName, JSONUtil.toJsonString(value), e);
            throw e;
        }
    }
......
@@ -59,7 +59,6 @@ import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.math.BigDecimal;
-import java.sql.Timestamp;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
@@ -68,6 +67,8 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
+
+import javax.xml.bind.DatatypeConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,6 +84,7 @@ public abstract class AbstractSinkBuilder {
    protected FlinkCDCConfig config;
    protected List<ModifyOperation> modifyOperations = new ArrayList();
+    private ZoneId sinkTimeZone = ZoneId.of("UTC");
    public AbstractSinkBuilder() {
    }
@@ -104,7 +106,7 @@ public abstract class AbstractSinkBuilder {
        Map<String, String> sink = config.getSink();
        for (Map.Entry<String, String> entry : sink.entrySet()) {
            if (Asserts.isNotNullString(entry.getKey()) && entry.getKey().startsWith("sink.properties") && Asserts.isNotNullString(entry.getValue())) {
-                properties.setProperty(entry.getKey().replace("sink.properties.",""), entry.getValue());
+                properties.setProperty(entry.getKey().replace("sink.properties.", ""), entry.getValue());
            }
        }
        return properties;
@@ -193,7 +195,7 @@ public abstract class AbstractSinkBuilder {
                default:
            }
        } catch (Exception e) {
-            logger.error("SchameTable: {} - Row: {} - Exception: {}", schemaTableName, JSONUtil.toJsonString(value), e.getCause().getMessage());
+            logger.error("SchameTable: {} - Row: {} - Exception: {}", schemaTableName, JSONUtil.toJsonString(value), e);
            throw e;
        }
    }
@@ -213,6 +215,12 @@ public abstract class AbstractSinkBuilder {
            CustomTableEnvironment customTableEnvironment,
            DataStreamSource<String> dataStreamSource) {
+        final String timeZone = config.getSink().get("timezone");
+        config.getSink().remove("timezone");
+        if (Asserts.isNotNullString(timeZone)) {
+            sinkTimeZone = ZoneId.of(timeZone);
+        }
        final List<Schema> schemaList = config.getSchemaList();
        final String schemaFieldName = config.getSchemaFieldName();
@@ -294,14 +302,41 @@ public abstract class AbstractSinkBuilder {
        if (logicalType instanceof VarCharType) {
            return StringData.fromString((String) value);
        } else if (logicalType instanceof DateType) {
-            return StringData.fromString(Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate().toString());
+            return value;
        } else if (logicalType instanceof TimestampType) {
-            return TimestampData.fromTimestamp(Timestamp.from(Instant.ofEpochMilli((long) value)));
+            if (value instanceof Integer) {
+                return TimestampData.fromLocalDateTime(Instant.ofEpochMilli(((Integer) value).longValue()).atZone(sinkTimeZone).toLocalDateTime());
+            } else if (value instanceof Long) {
+                return TimestampData.fromLocalDateTime(Instant.ofEpochMilli((long) value).atZone(sinkTimeZone).toLocalDateTime());
+            } else {
+                return TimestampData.fromLocalDateTime(Instant.parse(value.toString()).atZone(sinkTimeZone).toLocalDateTime());
+            }
        } else if (logicalType instanceof DecimalType) {
            final DecimalType decimalType = ((DecimalType) logicalType);
            final int precision = decimalType.getPrecision();
            final int scale = decimalType.getScale();
            return DecimalData.fromBigDecimal(new BigDecimal((String) value), precision, scale);
+        } else if (logicalType instanceof FloatType) {
+            if (value instanceof Float) {
+                return value;
+            } else if (value instanceof Double) {
+                return ((Double) value).floatValue();
+            } else {
+                return Float.parseFloat(value.toString());
+            }
+        } else if (logicalType instanceof BigIntType) {
+            if (value instanceof Integer) {
+                return ((Integer) value).longValue();
+            } else {
+                return value;
+            }
+        } else if (logicalType instanceof VarBinaryType) {
+            // VARBINARY and BINARY are converted to base64-encoded Strings in FlinkCDC.
+            if (value instanceof String) {
+                return DatatypeConverter.parseBase64Binary(value.toString());
+            } else {
+                return value;
+            }
        } else {
            return value;
        }
......
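For reference, the new TimestampType branch normalizes the three shapes FlinkCDC may emit (int epoch-millis, long epoch-millis, or an ISO-8601 string) into a LocalDateTime in the configured sink time zone, and VARBINARY arrives base64-encoded. A standalone sketch of that normalization — the values are illustrative, and javax.xml.bind is available on Java 8 as the diff above assumes:

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import javax.xml.bind.DatatypeConverter;

public class ConvertSketch {
    private static final ZoneId SINK_ZONE = ZoneId.of("UTC");

    static LocalDateTime toLocalDateTime(Object value) {
        if (value instanceof Integer) {
            return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(SINK_ZONE).toLocalDateTime();
        } else if (value instanceof Long) {
            return Instant.ofEpochMilli((Long) value).atZone(SINK_ZONE).toLocalDateTime();
        } else {
            // e.g. "2022-09-01T12:00:00Z"
            return Instant.parse(value.toString()).atZone(SINK_ZONE).toLocalDateTime();
        }
    }

    public static void main(String[] args) {
        System.out.println(toLocalDateTime(1662033600000L));         // from epoch millis
        System.out.println(toLocalDateTime("2022-09-01T12:00:00Z")); // from ISO string
        // VARBINARY/BINARY arrive base64-encoded from FlinkCDC:
        byte[] raw = DatatypeConverter.parseBase64Binary("aGVsbG8="); // "hello"
        System.out.println(raw.length); // 5
    }
}
```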
@@ -40,6 +40,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
/**
 * DorisSinkBuilder
@@ -137,6 +138,8 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
        }
        if (sink.containsKey(DorisSinkOptions.SINK_ENABLE_DELETE.key())) {
            executionBuilder.setDeletable(Boolean.valueOf(sink.get(DorisSinkOptions.SINK_ENABLE_DELETE.key())));
+        } else {
+            executionBuilder.setDeletable(true);
        }
        if (sink.containsKey(DorisSinkOptions.SINK_LABEL_PREFIX.key())) {
            executionBuilder.setLabelPrefix(getSinkSchemaName(table) + "_" + getSinkTableName(table) + sink.get(DorisSinkOptions.SINK_LABEL_PREFIX.key()));
@@ -144,7 +147,12 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
        if (sink.containsKey(DorisSinkOptions.SINK_MAX_RETRIES.key())) {
            executionBuilder.setMaxRetries(Integer.valueOf(sink.get(DorisSinkOptions.SINK_MAX_RETRIES.key())));
        }
-        executionBuilder.setStreamLoadProp(getProperties());
+
+        Properties properties = getProperties();
+        // Doris 1.1 needs this property to support delete.
+        properties.setProperty("columns", String.join(",", columnNameList) + ",__DORIS_DELETE_SIGN__");
+        executionBuilder.setStreamLoadProp(properties);
        // Create DorisSink.
        DorisSink.Builder<RowData> builder = DorisSink.builder();
@@ -153,6 +161,7 @@ public class DorisSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
                .setSerializer(RowDataSerializer.builder()
                        .setFieldNames(columnNames)
                        .setType("json")
+                        .enableDelete(true)
                        .setFieldType(columnTypes).build())
                .setDorisOptions(dorisBuilder.build());
......
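Per the comment in this hunk, Doris 1.1 requires the hidden __DORIS_DELETE_SIGN__ column to be listed in the stream-load columns header for deletes to take effect, and enableDelete(true) on the serializer fills that column per row. A condensed, self-contained view of the columns property construction (the sample column names are hypothetical):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class DorisColumnsSketch {
    public static void main(String[] args) {
        List<String> columnNameList = Arrays.asList("id", "name", "ts"); // stand-in for the builder's list
        Properties properties = new Properties();
        // Append Doris's hidden delete-sign column after the physical columns.
        properties.setProperty("columns",
                String.join(",", columnNameList) + ",__DORIS_DELETE_SIGN__");
        System.out.println(properties.getProperty("columns"));
        // -> id,name,ts,__DORIS_DELETE_SIGN__
    }
}
```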
@@ -145,7 +145,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
                default:
            }
        } catch (Exception e) {
-            logger.error("SchameTable: {} - Row: {} - Exception: {}", schemaTableName, JSONUtil.toJsonString(value), e.getCause().getMessage());
+            logger.error("SchameTable: {} - Row: {} - Exception:", schemaTableName, JSONUtil.toJsonString(value), e);
            throw e;
        }
    }
......
@@ -193,7 +193,7 @@ public abstract class AbstractSinkBuilder {
                default:
            }
        } catch (Exception e) {
-            logger.error("SchameTable: {} - Row: {} - Exception: {}", schemaTableName, JSONUtil.toJsonString(value), e.getCause().getMessage());
+            logger.error("SchameTable: {} - Row: {} - Exception:", schemaTableName, JSONUtil.toJsonString(value), e);
            throw e;
        }
    }
......
@@ -144,7 +144,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
                default:
            }
        } catch (Exception e) {
-            logger.error("SchameTable: {} - Row: {} - Exception: {}", schemaTableName, JSONUtil.toJsonString(value), e.getCause().getMessage());
+            logger.error("SchameTable: {} - Row: {} - Exception:", schemaTableName, JSONUtil.toJsonString(value), e);
            throw e;
        }
    }
......
@@ -207,82 +207,82 @@ public class FlinkAPI {
    }

    /**
+     * @return JsonNode
     * @Author: zhumingye
     * @date: 2022/6/24
     * @Description: getJobManagerMetrics fetches the JobManager metrics
-     * @return JsonNode
     */
    public JsonNode getJobManagerMetrics() {
        return get(FlinkRestAPIConstant.JOB_MANAGER + FlinkRestAPIConstant.METRICS + FlinkRestAPIConstant.GET + buildMetricsParms(FlinkRestAPIConstant.JOB_MANAGER));
    }

    /**
+     * @return JsonNode
     * @Author: zhumingye
     * @date: 2022/6/24
     * @Description: getJobManagerConfig fetches the JobManager configuration
-     * @return JsonNode
     */
    public JsonNode getJobManagerConfig() {
        return get(FlinkRestAPIConstant.JOB_MANAGER + FlinkRestAPIConstant.CONFIG);
    }

    /**
+     * @return JsonNode
     * @Author: zhumingye
     * @date: 2022/6/24
     * @Description: getJobManagerLog fetches the JobManager log
-     * @return JsonNode
     */
    public String getJobManagerLog() {
        return getResult(FlinkRestAPIConstant.JOB_MANAGER + FlinkRestAPIConstant.LOG);
    }

    /**
+     * @return String
     * @Author: zhumingye
     * @date: 2022/6/24
     * @Description: getJobManagerStdOut fetches the JobManager stdout log
-     * @return String
     */
    public String getJobManagerStdOut() {
        return getResult(FlinkRestAPIConstant.JOB_MANAGER + FlinkRestAPIConstant.STDOUT);
    }

    /**
+     * @return JsonNode
     * @Author: zhumingye
     * @date: 2022/6/24
     * @Description: getJobManagerLogList fetches the JobManager log list
-     * @return JsonNode
     */
    public JsonNode getJobManagerLogList() {
        return get(FlinkRestAPIConstant.JOB_MANAGER + FlinkRestAPIConstant.LOGS);
    }

    /**
+     * @param logName log file name
+     * @return String
     * @Author: zhumingye
     * @date: 2022/6/24
     * @Description: getJobManagerLogFileDetail fetches the details of a JobManager log file
-     * @param logName log file name
-     * @return String
     */
    public String getJobManagerLogFileDetail(String logName) {
        return getResult(FlinkRestAPIConstant.JOB_MANAGER + FlinkRestAPIConstant.LOGS + logName);
    }

    /**
+     * @return JsonNode
     * @Author: zhumingye
     * @date: 2022/6/24
     * @Description: getTaskManagers fetches the TaskManager list
-     * @return JsonNode
     */
    public JsonNode getTaskManagers() {
        return get(FlinkRestAPIConstant.TASK_MANAGER);
    }

    /**
+     * @return String
     * @Author: zhumingye
     * @date: 2022/6/24
     * @Description: buildMetricsParms builds the metrics query parameters
     * @Params: type: parameter type, allowed values: task-manager, job-manager
-     * @return String
     */
    public String buildMetricsParms(String type) {
        JsonNode jsonNode = get(type + FlinkRestAPIConstant.METRICS);
@@ -290,70 +290,75 @@ public class FlinkAPI {
        Iterator<JsonNode> jsonNodeIterator = jsonNode.elements();
        while (jsonNodeIterator.hasNext()) {
            JsonNode node = jsonNodeIterator.next();
-            sb.append(node.get("id").asText()).append(",");
+            if (Asserts.isNotNull(node) && Asserts.isNotNull(node.get("id"))) {
+                if (sb.length() > 0) {
+                    sb.append(",");
+                }
+                sb.append(node.get("id").asText());
+            }
        }
-        return sb.deleteCharAt(sb.length() - 1).toString();
+        return sb.toString();
    }
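The old body appended a trailing comma unconditionally and then called deleteCharAt, which throws StringIndexOutOfBoundsException when no metric has an id. An equivalent, slightly tighter formulation with StringJoiner — a sketch only, with the surrounding Jackson types as in the method above:

```java
import java.util.Iterator;
import java.util.StringJoiner;

import com.fasterxml.jackson.databind.JsonNode;

public class MetricIdJoiner {
    // StringJoiner inserts the delimiter only between elements,
    // so neither the trailing comma nor the deleteCharAt call is needed.
    static String joinMetricIds(JsonNode jsonNode) {
        StringJoiner ids = new StringJoiner(",");
        Iterator<JsonNode> it = jsonNode.elements();
        while (it.hasNext()) {
            JsonNode node = it.next();
            if (node != null && node.get("id") != null) {
                ids.add(node.get("id").asText());
            }
        }
        // Empty input yields "" instead of an exception.
        return ids.toString();
    }
}
```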
    /**
+     * @return JsonNode
     * @Author: zhumingye
     * @date: 2022/6/24
     * @Description: getTaskManagerMetrics fetches the TaskManager metrics
-     * @return JsonNode
     */
    public JsonNode getTaskManagerMetrics(String containerId) {
        return get(FlinkRestAPIConstant.TASK_MANAGER + containerId + FlinkRestAPIConstant.METRICS + FlinkRestAPIConstant.GET + buildMetricsParms(FlinkRestAPIConstant.JOB_MANAGER));
    }

    /**
+     * @param containerId container id
+     * @return String
     * @Author: zhumingye
     * @date: 2022/6/24
     * @Description: getTaskManagerLog fetches the TaskManager log
-     * @param containerId container id
-     * @return String
     */
    public String getTaskManagerLog(String containerId) {
        return getResult(FlinkRestAPIConstant.TASK_MANAGER + containerId + FlinkRestAPIConstant.LOG);
    }

    /**
+     * @param containerId container id
+     * @return JsonNode
     * @Author: zhumingye
     * @date: 2022/6/24
     * @Description: getTaskManagerStdOut fetches the TaskManager stdout log
-     * @param containerId container id
-     * @return JsonNode
     */
    public String getTaskManagerStdOut(String containerId) {
        return getResult(FlinkRestAPIConstant.TASK_MANAGER + containerId + FlinkRestAPIConstant.STDOUT);
    }

    /**
+     * @param containerId container id
+     * @return JsonNode
     * @Author: zhumingye
     * @date: 2022/6/24
     * @Description: getTaskManagerLogList fetches the TaskManager log list
-     * @param containerId container id
-     * @return JsonNode
     */
    public JsonNode getTaskManagerLogList(String containerId) {
        return get(FlinkRestAPIConstant.TASK_MANAGER + containerId + FlinkRestAPIConstant.LOGS);
    }

    /**
+     * @param logName log file name
+     * @return String
     * @Author: zhumingye
     * @date: 2022/6/24
     * @Description: getTaskManagerLogFileDeatil fetches the details of a specific log file
-     * @param logName log file name
-     * @return String
     */
-    public String getTaskManagerLogFileDeatil(String containerId,String logName) {
+    public String getTaskManagerLogFileDeatil(String containerId, String logName) {
        return getResult(FlinkRestAPIConstant.TASK_MANAGER + containerId + FlinkRestAPIConstant.LOGS + logName);
    }

    /**
+     * @return JsonNode
     * @Author: zhumingye
     * @date: 2022/6/24
     * @Description: getTaskManagerThreadDump fetches the TaskManager thread dump
-     * @return JsonNode
     */
    public JsonNode getTaskManagerThreadDump(String containerId) {
        return get(FlinkRestAPIConstant.TASK_MANAGER + containerId + FlinkRestAPIConstant.THREAD_DUMP);
......
@@ -31,7 +31,7 @@
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <flink.version>1.15.1</flink.version>
+        <flink.version>1.15.2</flink.version>
        <flink.guava.version>15.0</flink.guava.version>
        <flinkcdc.version>2.2.1</flinkcdc.version>
        <commons.version>1.3.1</commons.version>
......
@@ -82,11 +82,11 @@ ol {
  margin-bottom: 0!important;
}

-div .ant-pro-card-body {
-  padding: 2px;
-}
+//div .ant-pro-card-body {
+//  padding: 2px;
+//}

div .ant-pro-page-container{
-  padding-top: 1px;
+  padding-top: 0px;
}

div .ant-page-header {
@@ -98,5 +98,5 @@ div .ant-pro-page-container-children-content{
}

.ant-tabs-top > .ant-tabs-nav, .ant-tabs-bottom > .ant-tabs-nav, .ant-tabs-top > div > .ant-tabs-nav, .ant-tabs-bottom > div > .ant-tabs-nav {
-  margin: 0 0 4px 0!important;
+  margin: 0 0 0px 0!important;
}
@@ -102,7 +102,6 @@ const ClusterConfigurationForm: React.FC<ClusterConfigurationFormProps> = (props
    return {
      name: 'files',
      action: '/api/fileUpload',
-      // accept: 'application/json',
      headers: {
        authorization: 'authorization-text',
      },
@@ -124,6 +123,32 @@ const ClusterConfigurationForm: React.FC<ClusterConfigurationFormProps> = (props
    }
  };

+  const getUploadHdfsProps = (dir: string) => {
+    return {
+      name: 'files',
+      action: '/api/fileUpload/hdfs',
+      headers: {
+        authorization: 'authorization-text',
+      },
+      data: {
+        dir,
+        hadoopConfigPath
+      },
+      showUploadList: true,
+      onChange(info) {
+        if (info.file.status === 'done') {
+          if (info.file.response.code == CODE.SUCCESS) {
+            message.success(info.file.response.msg);
+          } else {
+            message.warn(info.file.response.msg);
+          }
+        } else if (info.file.status === 'error') {
+          message.error(`${info.file.name} 上传失败`);
+        }
+      },
+    }
+  };

  const renderContent = (formValsPara: Partial<ClusterConfigurationTableListItem>) => {
    return (
      <>
@@ -146,7 +171,7 @@ const ClusterConfigurationForm: React.FC<ClusterConfigurationFormProps> = (props
          >
            <Input placeholder="值如 /etc/hadoop/conf" addonAfter={
              <Form.Item name="suffix" noStyle>
-                <Upload {...getUploadProps(hadoopConfigPath)}>
+                <Upload {...getUploadProps(hadoopConfigPath)} multiple>
                  <UploadOutlined/>
                </Upload>
              </Form.Item>}/>
@@ -235,7 +260,7 @@ const ClusterConfigurationForm: React.FC<ClusterConfigurationFormProps> = (props
          >
            <Input placeholder="值如 hdfs:///flink/lib" addonAfter={
              <Form.Item name="suffix" noStyle>
-                <Upload {...getUploadProps(flinkLibPath)}>
+                <Upload {...getUploadHdfsProps(flinkLibPath)} multiple>
                  <UploadOutlined/>
                </Upload>
              </Form.Item>}/>
......
@@ -10,7 +10,7 @@ title: DB SQL 作业快速入门
### Create a data source

-Select **注册中心 >> 数据源管理 >> 新建** (Registration Center >> Data Source Management >> New), assuming you are connecting to Starrocks.
+Select **注册中心 >> 数据源管理 >> 新建** (Registration Center >> Data Source Management >> New), assuming you are connecting to Doris.
![createSource](http://www.aiwenmo.com/dinky/docs/zh-CN/quick_start/dbql_quick_start/createSource.png)
......