Unverified Commit f713e0a6 authored by chrofram's avatar chrofram Committed by GitHub

[Optimization-819][dlink-client] CDCSOURCE with timestamp and timezone (#843)

change default timezone = UTC ,
set timezone with setting sink.timezone
(#819)
Co-authored-by: 's avatarqiaoyiming <wenpin@tianmagroup.com>
parent a06a7900
......@@ -69,6 +69,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
private final static String KEY_WORD = "sql";
private static final long serialVersionUID = -3699685106324048226L;
private ZoneId sinkTimeZone = ZoneId.of("UTC");
public SQLSinkBuilder() {
}
......@@ -188,6 +189,11 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) {
final String timeZone = config.getSink().get("timezone");
config.getSink().remove("timezone");
if (Asserts.isNotNullString(timeZone)){
sinkTimeZone = ZoneId.of(timeZone);
}
final List<Schema> schemaList = config.getSchemaList();
if (Asserts.isNotNullCollection(schemaList)) {
......@@ -254,15 +260,15 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
}
if (logicalType instanceof DateType) {
if (value instanceof Integer) {
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDate();
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(sinkTimeZone).toLocalDate();
} else {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate();
return Instant.ofEpochMilli((long) value).atZone(sinkTimeZone).toLocalDate();
}
} else if (logicalType instanceof TimestampType) {
if (value instanceof Integer) {
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDateTime();
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(sinkTimeZone).toLocalDateTime();
} else {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
return Instant.ofEpochMilli((long) value).atZone(sinkTimeZone).toLocalDateTime();
}
} else if (logicalType instanceof DecimalType) {
return new BigDecimal((String) value);
......
......@@ -69,6 +69,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
private final static String KEY_WORD = "sql";
private static final long serialVersionUID = -3699685106324048226L;
private ZoneId sinkTimeZone = ZoneId.of("UTC");
public SQLSinkBuilder() {
}
......@@ -188,6 +189,11 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) {
final String timeZone = config.getSink().get("timezone");
config.getSink().remove("timezone");
if (Asserts.isNotNullString(timeZone)){
sinkTimeZone = ZoneId.of(timeZone);
}
final List<Schema> schemaList = config.getSchemaList();
if (Asserts.isNotNullCollection(schemaList)) {
......@@ -254,15 +260,15 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
}
if (logicalType instanceof DateType) {
if (value instanceof Integer) {
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDate();
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(sinkTimeZone).toLocalDate();
} else {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate();
return Instant.ofEpochMilli((long) value).atZone(sinkTimeZone).toLocalDate();
}
} else if (logicalType instanceof TimestampType) {
if (value instanceof Integer) {
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDateTime();
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(sinkTimeZone).toLocalDateTime();
} else {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
return Instant.ofEpochMilli((long) value).atZone(sinkTimeZone).toLocalDateTime();
}
} else if (logicalType instanceof DecimalType) {
return new BigDecimal((String) value);
......
......@@ -69,6 +69,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
private final static String KEY_WORD = "sql";
private static final long serialVersionUID = -3699685106324048226L;
private ZoneId sinkTimeZone = ZoneId.of("UTC");
public SQLSinkBuilder() {
}
......@@ -187,6 +188,11 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) {
final String timeZone = config.getSink().get("timezone");
config.getSink().remove("timezone");
if (Asserts.isNotNullString(timeZone)){
sinkTimeZone = ZoneId.of(timeZone);
}
final List<Schema> schemaList = config.getSchemaList();
if (Asserts.isNotNullCollection(schemaList)) {
......@@ -254,17 +260,17 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
}
if (logicalType instanceof DateType) {
if (value instanceof Integer) {
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDate();
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(sinkTimeZone).toLocalDate();
} else {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate();
}
} else if (logicalType instanceof TimestampType) {
if (value instanceof Integer) {
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDateTime();
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(sinkTimeZone).toLocalDateTime();
} else if (value instanceof String) {
return Instant.parse((String) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
return Instant.parse((String) value).atZone(sinkTimeZone).toLocalDateTime();
} else {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
return Instant.ofEpochMilli((long) value).atZone(sinkTimeZone).toLocalDateTime();
}
} else if (logicalType instanceof DecimalType) {
return new BigDecimal((String) value);
......
......@@ -70,6 +70,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
private final static String KEY_WORD = "sql";
private static final long serialVersionUID = -3699685106324048226L;
private ZoneId sinkTimeZone = ZoneId.of("UTC");
public SQLSinkBuilder() {
}
......@@ -189,6 +190,11 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) {
final String timeZone = config.getSink().get("timezone");
config.getSink().remove("timezone");
if (Asserts.isNotNullString(timeZone)){
sinkTimeZone = ZoneId.of(timeZone);
}
final List<Schema> schemaList = config.getSchemaList();
if (Asserts.isNotNullCollection(schemaList)) {
......@@ -255,17 +261,17 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
}
if (logicalType instanceof DateType) {
if (value instanceof Integer) {
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDate();
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(sinkTimeZone).toLocalDate();
} else {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate();
return Instant.ofEpochMilli((long) value).atZone(sinkTimeZone).toLocalDate();
}
} else if (logicalType instanceof TimestampType) {
if (value instanceof Integer) {
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDateTime();
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(sinkTimeZone).toLocalDateTime();
} else if (value instanceof String) {
return Instant.parse((String) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
return Instant.parse((String) value).atZone(sinkTimeZone).toLocalDateTime();
} else {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
return Instant.ofEpochMilli((long) value).atZone(sinkTimeZone).toLocalDateTime();
}
} else if (logicalType instanceof DecimalType) {
return new BigDecimal((String) value);
......
......@@ -69,6 +69,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
private final static String KEY_WORD = "sql";
private static final long serialVersionUID = -3699685106324048226L;
private ZoneId sinkTimeZone = ZoneId.of("UTC");
public SQLSinkBuilder() {
}
......@@ -188,6 +189,11 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) {
final String timeZone = config.getSink().get("timezone");
config.getSink().remove("timezone");
if (Asserts.isNotNullString(timeZone)){
sinkTimeZone = ZoneId.of(timeZone);
}
final List<Schema> schemaList = config.getSchemaList();
if (Asserts.isNotNullCollection(schemaList)) {
......@@ -254,17 +260,17 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
}
if (logicalType instanceof DateType) {
if (value instanceof Integer) {
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDate();
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(sinkTimeZone).toLocalDate();
} else {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate();
return Instant.ofEpochMilli((long) value).atZone(sinkTimeZone).toLocalDate();
}
} else if (logicalType instanceof TimestampType) {
if (value instanceof Integer) {
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDateTime();
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(sinkTimeZone).toLocalDateTime();
} else if (value instanceof String) {
return Instant.parse((String) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
return Instant.parse((String) value).atZone(sinkTimeZone).toLocalDateTime();
} else {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
return Instant.ofEpochMilli((long) value).atZone(sinkTimeZone).toLocalDateTime();
}
} else if (logicalType instanceof DecimalType) {
return new BigDecimal((String) value);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment