Unverified commit 45136385, authored by aiwenmo; committed by GitHub

[Fix-140][core] Fix 'table.local-time-zone' parameter is invalid

[Fix-140][core] Fix 'table.local-time-zone' parameter is invalid
parents b621d2af dc0c45fb
package com.dlink.job;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.dlink.api.FlinkAPI;
import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkSQLConstant;
......@@ -21,7 +44,13 @@ import com.dlink.model.SystemConfiguration;
import com.dlink.parser.SqlType;
import com.dlink.pool.ClassEntity;
import com.dlink.pool.ClassPool;
import com.dlink.result.*;
import com.dlink.result.ErrorResult;
import com.dlink.result.ExplainResult;
import com.dlink.result.IResult;
import com.dlink.result.InsertResult;
import com.dlink.result.ResultBuilder;
import com.dlink.result.ResultPool;
import com.dlink.result.SelectResult;
import com.dlink.session.ExecutorEntity;
import com.dlink.session.SessionConfig;
import com.dlink.session.SessionInfo;
......@@ -31,28 +60,6 @@ import com.dlink.utils.LogUtil;
import com.dlink.utils.SqlUtil;
import com.dlink.utils.UDFUtil;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* JobManager
......@@ -156,7 +163,7 @@ public class JobManager {
public static boolean useGateway(String type) {
return (GatewayType.YARN_PER_JOB.equalsValue(type) || GatewayType.YARN_APPLICATION.equalsValue(type)
|| GatewayType.KUBERNETES_APPLICATION.equalsValue(type));
|| GatewayType.KUBERNETES_APPLICATION.equalsValue(type));
}
private Executor createExecutor() {
......@@ -278,7 +285,8 @@ public class JobManager {
}
if (config.isUseResult()) {
// Build insert result.
IResult result = ResultBuilder.build(SqlType.INSERT, config.getMaxRowNum(), config.isUseChangeLog(), config.isUseAutoCancel()).getResult(tableResult);
IResult result =
ResultBuilder.build(SqlType.INSERT, config.getMaxRowNum(), config.isUseChangeLog(), config.isUseAutoCancel(), executor.getTimeZone()).getResult(tableResult);
job.setResult(result);
}
}
......@@ -301,7 +309,8 @@ public class JobManager {
FlinkInterceptorResult flinkInterceptorResult = FlinkInterceptor.build(executor, item.getValue());
if (Asserts.isNotNull(flinkInterceptorResult.getTableResult())) {
if (config.isUseResult()) {
IResult result = ResultBuilder.build(item.getType(), config.getMaxRowNum(), config.isUseChangeLog(), config.isUseAutoCancel()).getResult(flinkInterceptorResult.getTableResult());
IResult result = ResultBuilder.build(item.getType(), config.getMaxRowNum(), config.isUseChangeLog(), config.isUseAutoCancel(), executor.getTimeZone())
.getResult(flinkInterceptorResult.getTableResult());
job.setResult(result);
}
} else {
......@@ -314,7 +323,8 @@ public class JobManager {
}});
}
if (config.isUseResult()) {
IResult result = ResultBuilder.build(item.getType(), config.getMaxRowNum(), config.isUseChangeLog(), config.isUseAutoCancel()).getResult(tableResult);
IResult result =
ResultBuilder.build(item.getType(), config.getMaxRowNum(), config.isUseChangeLog(), config.isUseAutoCancel(), executor.getTimeZone()).getResult(tableResult);
job.setResult(result);
}
}
......@@ -364,7 +374,7 @@ public class JobManager {
}});
}
if (config.isUseResult()) {
IResult result = ResultBuilder.build(SqlType.EXECUTE, config.getMaxRowNum(), config.isUseChangeLog(), config.isUseAutoCancel()).getResult(null);
IResult result = ResultBuilder.build(SqlType.EXECUTE, config.getMaxRowNum(), config.isUseChangeLog(), config.isUseAutoCancel(), executor.getTimeZone()).getResult(null);
job.setResult(result);
}
}
......@@ -427,7 +437,7 @@ public class JobManager {
}
LocalDateTime startTime = LocalDateTime.now();
TableResult tableResult = executor.executeSql(newStatement);
IResult result = ResultBuilder.build(operationType, config.getMaxRowNum(), false, false).getResult(tableResult);
IResult result = ResultBuilder.build(operationType, config.getMaxRowNum(), false, false, executor.getTimeZone()).getResult(tableResult);
result.setStartTime(startTime);
return result;
}
......@@ -475,7 +485,7 @@ public class JobManager {
public boolean cancel(String jobId) {
if (useGateway && !useRestAPI) {
config.getGatewayConfig().setFlinkConfig(FlinkConfig.build(jobId, ActionType.CANCEL.getValue(),
null, null));
null, null));
Gateway.build(config.getGatewayConfig()).savepointJob();
return true;
} else {
......@@ -491,7 +501,7 @@ public class JobManager {
public SavePointResult savepoint(String jobId, String savePointType, String savePoint) {
if (useGateway && !useRestAPI) {
config.getGatewayConfig().setFlinkConfig(FlinkConfig.build(jobId, ActionType.SAVEPOINT.getValue(),
savePointType, null));
savePointType, null));
return Gateway.build(config.getGatewayConfig()).savepointJob(savePoint);
} else {
return FlinkAPI.build(config.getAddress()).savepoints(jobId, savePointType);
......
package com.dlink.result;
import com.dlink.parser.SqlType;
import org.apache.flink.table.api.TableResult;
import com.dlink.parser.SqlType;
/**
* ResultBuilder
*
......@@ -11,10 +12,10 @@ import org.apache.flink.table.api.TableResult;
**/
public interface ResultBuilder {
static ResultBuilder build(SqlType operationType, Integer maxRowNum, boolean isChangeLog, boolean isAutoCancel) {
static ResultBuilder build(SqlType operationType, Integer maxRowNum, boolean isChangeLog, boolean isAutoCancel, String timeZone) {
switch (operationType) {
case SELECT:
return new SelectResultBuilder(maxRowNum, isChangeLog, isAutoCancel);
return new SelectResultBuilder(maxRowNum, isChangeLog, isAutoCancel, timeZone);
case SHOW:
case DESC:
case DESCRIBE:
......
package com.dlink.result;
import com.dlink.constant.FlinkConstant;
import com.dlink.utils.FlinkUtil;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import java.util.*;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.dlink.constant.FlinkConstant;
import com.dlink.utils.FlinkUtil;
/**
* ResultRunnable
......@@ -20,13 +29,15 @@ public class ResultRunnable implements Runnable {
private Integer maxRowNum;
private boolean isChangeLog;
private boolean isAutoCancel;
private String timeZone = ZoneId.systemDefault().getId();
private String nullColumn = "";
/**
 * Creates a runnable that drains rows from the given {@link TableResult}.
 *
 * @param tableResult  the Flink table result whose rows are collected
 * @param maxRowNum    maximum number of rows to retain
 * @param isChangeLog  whether to keep changelog (+I/-U/+U/-D) semantics
 * @param isAutoCancel whether to cancel the job once maxRowNum is reached
 * @param timeZone     zone id used to render {@code Instant} fields as local date-times
 *                     (fixes 'table.local-time-zone' being ignored)
 */
public ResultRunnable(TableResult tableResult, Integer maxRowNum, boolean isChangeLog, boolean isAutoCancel, String timeZone) {
    this.tableResult = tableResult;
    this.maxRowNum = maxRowNum;
    this.isChangeLog = isChangeLog;
    this.isAutoCancel = isAutoCancel;
    this.timeZone = timeZone;
}
@Override
......@@ -70,7 +81,11 @@ public class ResultRunnable implements Runnable {
if (field == null) {
map.put(columns.get(i + 1), nullColumn);
} else {
map.put(columns.get(i + 1), field);
if (field instanceof Instant) {
map.put(columns.get(i + 1), ((Instant) field).atZone(ZoneId.of(timeZone)).toLocalDateTime().toString());
} else {
map.put(columns.get(i + 1), field);
}
}
}
rows.add(map);
......@@ -94,7 +109,11 @@ public class ResultRunnable implements Runnable {
if (field == null) {
map.put(columns.get(i), nullColumn);
} else {
map.put(columns.get(i), field);
if (field instanceof Instant) {
map.put(columns.get(i), ((Instant) field).atZone(ZoneId.of(timeZone)).toLocalDateTime().toString());
} else {
map.put(columns.get(i), field);
}
}
}
if (RowKind.UPDATE_BEFORE == row.getKind() || RowKind.DELETE == row.getKind()) {
......
......@@ -13,18 +13,20 @@ public class SelectResultBuilder implements ResultBuilder {
private Integer maxRowNum;
private boolean isChangeLog;
private boolean isAutoCancel;
private String timeZone;
/**
 * Creates a builder for SELECT results.
 *
 * @param maxRowNum    maximum number of rows the background collector retains
 * @param isChangeLog  whether results keep changelog semantics
 * @param isAutoCancel whether the job is cancelled once maxRowNum is reached
 * @param timeZone     zone id forwarded to {@code ResultRunnable} for rendering
 *                     {@code Instant} values in the session's local time zone
 */
public SelectResultBuilder(Integer maxRowNum, boolean isChangeLog, boolean isAutoCancel, String timeZone) {
    this.maxRowNum = maxRowNum;
    this.isChangeLog = isChangeLog;
    this.isAutoCancel = isAutoCancel;
    this.timeZone = timeZone;
}
@Override
public IResult getResult(TableResult tableResult) {
if (tableResult.getJobClient().isPresent()) {
String jobId = tableResult.getJobClient().get().getJobID().toHexString();
ResultRunnable runnable = new ResultRunnable(tableResult, maxRowNum, isChangeLog, isAutoCancel);
ResultRunnable runnable = new ResultRunnable(tableResult, maxRowNum, isChangeLog, isAutoCancel, timeZone);
Thread thread = new Thread(runnable, jobId);
thread.start();
return SelectResult.buildSuccess(jobId);
......
package com.dlink.executor;
import com.dlink.assertion.Asserts;
import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.interceptor.FlinkInterceptorResult;
import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
......@@ -20,6 +13,7 @@ import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.CatalogManager;
......@@ -27,6 +21,14 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.dlink.assertion.Asserts;
import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.interceptor.FlinkInterceptorResult;
import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* Executor
*
......@@ -117,6 +119,14 @@ public abstract class Executor {
this.setConfig = setConfig;
}
/**
 * Exposes the {@link TableConfig} of the underlying stream table environment.
 *
 * @return the table environment's configuration object
 */
public TableConfig getTableConfig() {
    final TableConfig tableConfig = stEnvironment.getConfig();
    return tableConfig;
}
/**
 * Returns the id of the session's local time zone, i.e. the effective value of the
 * 'table.local-time-zone' option as resolved by the table environment.
 *
 * @return a zone id string such as "UTC" or "Asia/Shanghai"
 */
public String getTimeZone() {
    final String zoneId = getTableConfig().getLocalTimeZone().getId();
    return zoneId;
}
protected void init() {
initEnvironment();
initStreamExecutionEnvironment();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.