Commit dc0c45fb authored by wenmo's avatar wenmo

[Fix-140][core] Fix 'table.local-time-zone' parameter is invalid

parent b621d2af
package com.dlink.job; package com.dlink.job;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.dlink.api.FlinkAPI; import com.dlink.api.FlinkAPI;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkSQLConstant; import com.dlink.constant.FlinkSQLConstant;
...@@ -21,7 +44,13 @@ import com.dlink.model.SystemConfiguration; ...@@ -21,7 +44,13 @@ import com.dlink.model.SystemConfiguration;
import com.dlink.parser.SqlType; import com.dlink.parser.SqlType;
import com.dlink.pool.ClassEntity; import com.dlink.pool.ClassEntity;
import com.dlink.pool.ClassPool; import com.dlink.pool.ClassPool;
import com.dlink.result.*; import com.dlink.result.ErrorResult;
import com.dlink.result.ExplainResult;
import com.dlink.result.IResult;
import com.dlink.result.InsertResult;
import com.dlink.result.ResultBuilder;
import com.dlink.result.ResultPool;
import com.dlink.result.SelectResult;
import com.dlink.session.ExecutorEntity; import com.dlink.session.ExecutorEntity;
import com.dlink.session.SessionConfig; import com.dlink.session.SessionConfig;
import com.dlink.session.SessionInfo; import com.dlink.session.SessionInfo;
...@@ -31,28 +60,6 @@ import com.dlink.utils.LogUtil; ...@@ -31,28 +60,6 @@ import com.dlink.utils.LogUtil;
import com.dlink.utils.SqlUtil; import com.dlink.utils.SqlUtil;
import com.dlink.utils.UDFUtil; import com.dlink.utils.UDFUtil;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/** /**
* JobManager * JobManager
...@@ -278,7 +285,8 @@ public class JobManager { ...@@ -278,7 +285,8 @@ public class JobManager {
} }
if (config.isUseResult()) { if (config.isUseResult()) {
// Build insert result. // Build insert result.
IResult result = ResultBuilder.build(SqlType.INSERT, config.getMaxRowNum(), config.isUseChangeLog(), config.isUseAutoCancel()).getResult(tableResult); IResult result =
ResultBuilder.build(SqlType.INSERT, config.getMaxRowNum(), config.isUseChangeLog(), config.isUseAutoCancel(), executor.getTimeZone()).getResult(tableResult);
job.setResult(result); job.setResult(result);
} }
} }
...@@ -301,7 +309,8 @@ public class JobManager { ...@@ -301,7 +309,8 @@ public class JobManager {
FlinkInterceptorResult flinkInterceptorResult = FlinkInterceptor.build(executor, item.getValue()); FlinkInterceptorResult flinkInterceptorResult = FlinkInterceptor.build(executor, item.getValue());
if (Asserts.isNotNull(flinkInterceptorResult.getTableResult())) { if (Asserts.isNotNull(flinkInterceptorResult.getTableResult())) {
if (config.isUseResult()) { if (config.isUseResult()) {
IResult result = ResultBuilder.build(item.getType(), config.getMaxRowNum(), config.isUseChangeLog(), config.isUseAutoCancel()).getResult(flinkInterceptorResult.getTableResult()); IResult result = ResultBuilder.build(item.getType(), config.getMaxRowNum(), config.isUseChangeLog(), config.isUseAutoCancel(), executor.getTimeZone())
.getResult(flinkInterceptorResult.getTableResult());
job.setResult(result); job.setResult(result);
} }
} else { } else {
...@@ -314,7 +323,8 @@ public class JobManager { ...@@ -314,7 +323,8 @@ public class JobManager {
}}); }});
} }
if (config.isUseResult()) { if (config.isUseResult()) {
IResult result = ResultBuilder.build(item.getType(), config.getMaxRowNum(), config.isUseChangeLog(), config.isUseAutoCancel()).getResult(tableResult); IResult result =
ResultBuilder.build(item.getType(), config.getMaxRowNum(), config.isUseChangeLog(), config.isUseAutoCancel(), executor.getTimeZone()).getResult(tableResult);
job.setResult(result); job.setResult(result);
} }
} }
...@@ -364,7 +374,7 @@ public class JobManager { ...@@ -364,7 +374,7 @@ public class JobManager {
}}); }});
} }
if (config.isUseResult()) { if (config.isUseResult()) {
IResult result = ResultBuilder.build(SqlType.EXECUTE, config.getMaxRowNum(), config.isUseChangeLog(), config.isUseAutoCancel()).getResult(null); IResult result = ResultBuilder.build(SqlType.EXECUTE, config.getMaxRowNum(), config.isUseChangeLog(), config.isUseAutoCancel(), executor.getTimeZone()).getResult(null);
job.setResult(result); job.setResult(result);
} }
} }
...@@ -427,7 +437,7 @@ public class JobManager { ...@@ -427,7 +437,7 @@ public class JobManager {
} }
LocalDateTime startTime = LocalDateTime.now(); LocalDateTime startTime = LocalDateTime.now();
TableResult tableResult = executor.executeSql(newStatement); TableResult tableResult = executor.executeSql(newStatement);
IResult result = ResultBuilder.build(operationType, config.getMaxRowNum(), false, false).getResult(tableResult); IResult result = ResultBuilder.build(operationType, config.getMaxRowNum(), false, false, executor.getTimeZone()).getResult(tableResult);
result.setStartTime(startTime); result.setStartTime(startTime);
return result; return result;
} }
......
package com.dlink.result; package com.dlink.result;
import com.dlink.parser.SqlType;
import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableResult;
import com.dlink.parser.SqlType;
/** /**
* ResultBuilder * ResultBuilder
* *
...@@ -11,10 +12,10 @@ import org.apache.flink.table.api.TableResult; ...@@ -11,10 +12,10 @@ import org.apache.flink.table.api.TableResult;
**/ **/
public interface ResultBuilder { public interface ResultBuilder {
static ResultBuilder build(SqlType operationType, Integer maxRowNum, boolean isChangeLog, boolean isAutoCancel) { static ResultBuilder build(SqlType operationType, Integer maxRowNum, boolean isChangeLog, boolean isAutoCancel, String timeZone) {
switch (operationType) { switch (operationType) {
case SELECT: case SELECT:
return new SelectResultBuilder(maxRowNum, isChangeLog, isAutoCancel); return new SelectResultBuilder(maxRowNum, isChangeLog, isAutoCancel, timeZone);
case SHOW: case SHOW:
case DESC: case DESC:
case DESCRIBE: case DESCRIBE:
......
package com.dlink.result; package com.dlink.result;
import com.dlink.constant.FlinkConstant;
import com.dlink.utils.FlinkUtil;
import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row; import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind; import org.apache.flink.types.RowKind;
import java.util.*; import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.dlink.constant.FlinkConstant;
import com.dlink.utils.FlinkUtil;
/** /**
* ResultRunnable * ResultRunnable
...@@ -20,13 +29,15 @@ public class ResultRunnable implements Runnable { ...@@ -20,13 +29,15 @@ public class ResultRunnable implements Runnable {
private Integer maxRowNum; private Integer maxRowNum;
private boolean isChangeLog; private boolean isChangeLog;
private boolean isAutoCancel; private boolean isAutoCancel;
private String timeZone = ZoneId.systemDefault().getId();
private String nullColumn = ""; private String nullColumn = "";
public ResultRunnable(TableResult tableResult, Integer maxRowNum, boolean isChangeLog, boolean isAutoCancel) { public ResultRunnable(TableResult tableResult, Integer maxRowNum, boolean isChangeLog, boolean isAutoCancel, String timeZone) {
this.tableResult = tableResult; this.tableResult = tableResult;
this.maxRowNum = maxRowNum; this.maxRowNum = maxRowNum;
this.isChangeLog = isChangeLog; this.isChangeLog = isChangeLog;
this.isAutoCancel = isAutoCancel; this.isAutoCancel = isAutoCancel;
this.timeZone = timeZone;
} }
@Override @Override
...@@ -69,10 +80,14 @@ public class ResultRunnable implements Runnable { ...@@ -69,10 +80,14 @@ public class ResultRunnable implements Runnable {
Object field = row.getField(i); Object field = row.getField(i);
if (field == null) { if (field == null) {
map.put(columns.get(i + 1), nullColumn); map.put(columns.get(i + 1), nullColumn);
} else {
if (field instanceof Instant) {
map.put(columns.get(i + 1), ((Instant) field).atZone(ZoneId.of(timeZone)).toLocalDateTime().toString());
} else { } else {
map.put(columns.get(i + 1), field); map.put(columns.get(i + 1), field);
} }
} }
}
rows.add(map); rows.add(map);
} }
} }
...@@ -93,10 +108,14 @@ public class ResultRunnable implements Runnable { ...@@ -93,10 +108,14 @@ public class ResultRunnable implements Runnable {
Object field = row.getField(i); Object field = row.getField(i);
if (field == null) { if (field == null) {
map.put(columns.get(i), nullColumn); map.put(columns.get(i), nullColumn);
} else {
if (field instanceof Instant) {
map.put(columns.get(i), ((Instant) field).atZone(ZoneId.of(timeZone)).toLocalDateTime().toString());
} else { } else {
map.put(columns.get(i), field); map.put(columns.get(i), field);
} }
} }
}
if (RowKind.UPDATE_BEFORE == row.getKind() || RowKind.DELETE == row.getKind()) { if (RowKind.UPDATE_BEFORE == row.getKind() || RowKind.DELETE == row.getKind()) {
rows.remove(map); rows.remove(map);
} else { } else {
......
...@@ -13,18 +13,20 @@ public class SelectResultBuilder implements ResultBuilder { ...@@ -13,18 +13,20 @@ public class SelectResultBuilder implements ResultBuilder {
private Integer maxRowNum; private Integer maxRowNum;
private boolean isChangeLog; private boolean isChangeLog;
private boolean isAutoCancel; private boolean isAutoCancel;
private String timeZone;
public SelectResultBuilder(Integer maxRowNum, boolean isChangeLog, boolean isAutoCancel) { public SelectResultBuilder(Integer maxRowNum, boolean isChangeLog, boolean isAutoCancel, String timeZone) {
this.maxRowNum = maxRowNum; this.maxRowNum = maxRowNum;
this.isChangeLog = isChangeLog; this.isChangeLog = isChangeLog;
this.isAutoCancel = isAutoCancel; this.isAutoCancel = isAutoCancel;
this.timeZone = timeZone;
} }
@Override @Override
public IResult getResult(TableResult tableResult) { public IResult getResult(TableResult tableResult) {
if (tableResult.getJobClient().isPresent()) { if (tableResult.getJobClient().isPresent()) {
String jobId = tableResult.getJobClient().get().getJobID().toHexString(); String jobId = tableResult.getJobClient().get().getJobID().toHexString();
ResultRunnable runnable = new ResultRunnable(tableResult, maxRowNum, isChangeLog, isAutoCancel); ResultRunnable runnable = new ResultRunnable(tableResult, maxRowNum, isChangeLog, isAutoCancel, timeZone);
Thread thread = new Thread(runnable, jobId); Thread thread = new Thread(runnable, jobId);
thread.start(); thread.start();
return SelectResult.buildSuccess(jobId); return SelectResult.buildSuccess(jobId);
......
package com.dlink.executor; package com.dlink.executor;
import com.dlink.assertion.Asserts;
import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.interceptor.FlinkInterceptorResult;
import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
...@@ -20,6 +13,7 @@ import org.apache.flink.streaming.api.graph.JSONGenerator; ...@@ -20,6 +13,7 @@ import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.ExplainDetail; import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.StatementSet; import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.CatalogManager;
...@@ -27,6 +21,14 @@ import java.util.HashMap; ...@@ -27,6 +21,14 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import com.dlink.assertion.Asserts;
import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.interceptor.FlinkInterceptorResult;
import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
/** /**
* Executor * Executor
* *
...@@ -117,6 +119,14 @@ public abstract class Executor { ...@@ -117,6 +119,14 @@ public abstract class Executor {
this.setConfig = setConfig; this.setConfig = setConfig;
} }
/**
 * Returns the {@link TableConfig} of the underlying stream table environment.
 *
 * @return the table configuration backing {@code stEnvironment}
 */
public TableConfig getTableConfig() {
    final TableConfig tableConfig = stEnvironment.getConfig();
    return tableConfig;
}
/**
 * Returns the session time zone configured via 'table.local-time-zone'.
 *
 * @return the zone id string (e.g. "Asia/Shanghai") from the table config
 */
public String getTimeZone() {
    final TableConfig tableConfig = getTableConfig();
    // Resolve the configured local time zone and expose its textual id.
    return tableConfig.getLocalTimeZone().getId();
}
protected void init() { protected void init() {
initEnvironment(); initEnvironment();
initStreamExecutionEnvironment(); initStreamExecutionEnvironment();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment