Unverified Commit 35810a97 authored by aiwenmo's avatar aiwenmo Committed by GitHub

[Fix] [admin] Fix job monitoring bugs caused by multi tenancy (#1158)

Co-authored-by: 's avatarwenmo <32723967+wenmo@users.noreply.github.com>
parent 8f5e5b8f
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
package com.dlink.configure; package com.dlink.configure;
import com.dlink.context.RequestContext; import com.dlink.context.TenantContextHolder;
import java.util.List; import java.util.List;
...@@ -46,34 +46,21 @@ import net.sf.jsqlparser.expression.NullValue; ...@@ -46,34 +46,21 @@ import net.sf.jsqlparser.expression.NullValue;
public class MybatisPlusConfig { public class MybatisPlusConfig {
private static final List<String> IGNORE_TABLE_NAMES = Lists.newArrayList( private static final List<String> IGNORE_TABLE_NAMES = Lists.newArrayList(
"dlink_namespace" "dlink_namespace", "dlink_alert_group", "dlink_alert_history", "dlink_alert_instance", "dlink_catalogue",
, "dlink_alert_group" "dlink_cluster", "dlink_cluster_configuration", "dlink_database"
, "dlink_alert_history" // ,"dlink_fragment"
, "dlink_alert_instance" , "dlink_history", "dlink_jar", "dlink_job_history", "dlink_job_instance", "dlink_role", "dlink_savepoints",
, "dlink_catalogue" "dlink_task", "dlink_task_statement", "dlink_task_version");
, "dlink_cluster"
, "dlink_cluster_configuration"
, "dlink_database"
//,"dlink_fragment"
, "dlink_history"
, "dlink_jar"
, "dlink_job_history"
, "dlink_job_instance"
,"dlink_role"
, "dlink_savepoints"
, "dlink_task"
, "dlink_task_statement"
, "dlink_task_version"
);
@Bean @Bean
public MybatisPlusInterceptor mybatisPlusInterceptor() { public MybatisPlusInterceptor mybatisPlusInterceptor() {
log.info("mybatis plus interceptor execute"); log.info("mybatis plus interceptor execute");
MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor(); MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
interceptor.addInnerInterceptor(new TenantLineInnerInterceptor(new TenantLineHandler() { interceptor.addInnerInterceptor(new TenantLineInnerInterceptor(new TenantLineHandler() {
@Override @Override
public Expression getTenantId() { public Expression getTenantId() {
Integer tenantId = (Integer) RequestContext.get(); Integer tenantId = (Integer) TenantContextHolder.get();
if (tenantId == null) { if (tenantId == null) {
log.warn("request context tenant id is null"); log.warn("request context tenant id is null");
return new NullValue(); return new NullValue();
...@@ -90,4 +77,4 @@ public class MybatisPlusConfig { ...@@ -90,4 +77,4 @@ public class MybatisPlusConfig {
return interceptor; return interceptor;
} }
} }
\ No newline at end of file
...@@ -20,20 +20,21 @@ ...@@ -20,20 +20,21 @@
package com.dlink.context; package com.dlink.context;
/** /**
* request context * TenantContextHolder
*/ */
public class RequestContext { public class TenantContextHolder {
private static final ThreadLocal<Object> threadLocal = new ThreadLocal<>();
private static final ThreadLocal<Object> TENANT_CONTEXT = new ThreadLocal<>();
public static void set(Object value) { public static void set(Object value) {
threadLocal.set(value); TENANT_CONTEXT.set(value);
} }
public static Object get() { public static Object get() {
return threadLocal.get(); return TENANT_CONTEXT.get();
} }
public static void remove() { public static void clear() {
threadLocal.remove(); TENANT_CONTEXT.remove();
} }
} }
\ No newline at end of file
...@@ -20,7 +20,6 @@ ...@@ -20,7 +20,6 @@
package com.dlink.init; package com.dlink.init;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.context.RequestContext;
import com.dlink.daemon.task.DaemonFactory; import com.dlink.daemon.task.DaemonFactory;
import com.dlink.daemon.task.DaemonTaskConfig; import com.dlink.daemon.task.DaemonTaskConfig;
import com.dlink.job.FlinkJobTask; import com.dlink.job.FlinkJobTask;
...@@ -76,7 +75,6 @@ public class SystemInit implements ApplicationRunner { ...@@ -76,7 +75,6 @@ public class SystemInit implements ApplicationRunner {
List<Tenant> tenants = tenantService.list(); List<Tenant> tenants = tenantService.list();
sysConfigService.initSysConfig(); sysConfigService.initSysConfig();
for (Tenant tenant : tenants) { for (Tenant tenant : tenants) {
RequestContext.set(tenant.getId());
taskService.initDefaultFlinkSQLEnv(tenant.getId()); taskService.initDefaultFlinkSQLEnv(tenant.getId());
} }
initTaskMonitor(); initTaskMonitor();
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
package com.dlink.interceptor; package com.dlink.interceptor;
import com.dlink.context.RequestContext; import com.dlink.context.TenantContextHolder;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
...@@ -41,7 +41,7 @@ public class TenantInterceptor implements HandlerInterceptor { ...@@ -41,7 +41,7 @@ public class TenantInterceptor implements HandlerInterceptor {
Object handler) throws Exception { Object handler) throws Exception {
String tenantId = request.getHeader("tenantId"); String tenantId = request.getHeader("tenantId");
if (!StringUtils.isNullOrEmpty(tenantId)) { if (!StringUtils.isNullOrEmpty(tenantId)) {
RequestContext.set(Integer.valueOf(tenantId)); TenantContextHolder.set(Integer.valueOf(tenantId));
} }
return HandlerInterceptor.super.preHandle(request, response, handler); return HandlerInterceptor.super.preHandle(request, response, handler);
} }
......
...@@ -74,7 +74,7 @@ public class FlinkJobTask implements DaemonTask { ...@@ -74,7 +74,7 @@ public class FlinkJobTask implements DaemonTask {
preDealTime = System.currentTimeMillis(); preDealTime = System.currentTimeMillis();
JobInstance jobInstance = taskService.refreshJobInstance(config.getId(), false); JobInstance jobInstance = taskService.refreshJobInstance(config.getId(), false);
if ((!JobStatus.isDone(jobInstance.getStatus())) || (Asserts.isNotNull(jobInstance.getFinishTime()) if ((!JobStatus.isDone(jobInstance.getStatus())) || (Asserts.isNotNull(jobInstance.getFinishTime())
&& Duration.between(jobInstance.getFinishTime(), LocalDateTime.now()).toMinutes() < 1)) { && Duration.between(jobInstance.getFinishTime(), LocalDateTime.now()).toMinutes() < 1)) {
DefaultThreadPool.getInstance().execute(this); DefaultThreadPool.getInstance().execute(this);
} else { } else {
taskService.handleJobDone(jobInstance); taskService.handleJobDone(jobInstance);
......
...@@ -24,6 +24,8 @@ import com.dlink.model.JobHistory; ...@@ -24,6 +24,8 @@ import com.dlink.model.JobHistory;
import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Mapper;
import com.baomidou.mybatisplus.annotation.InterceptorIgnore;
/** /**
* JobHistoryMapper * JobHistoryMapper
* *
...@@ -33,6 +35,9 @@ import org.apache.ibatis.annotations.Mapper; ...@@ -33,6 +35,9 @@ import org.apache.ibatis.annotations.Mapper;
@Mapper @Mapper
public interface JobHistoryMapper extends SuperMapper<JobHistory> { public interface JobHistoryMapper extends SuperMapper<JobHistory> {
@InterceptorIgnore(tenantLine = "true")
JobHistory getByIdWithoutTenant(Integer id);
int insert(JobHistory jobHistory); int insert(JobHistory jobHistory);
} }
...@@ -27,6 +27,8 @@ import org.apache.ibatis.annotations.Mapper; ...@@ -27,6 +27,8 @@ import org.apache.ibatis.annotations.Mapper;
import java.util.List; import java.util.List;
import com.baomidou.mybatisplus.annotation.InterceptorIgnore;
/** /**
* JobInstanceMapper * JobInstanceMapper
* *
...@@ -36,6 +38,9 @@ import java.util.List; ...@@ -36,6 +38,9 @@ import java.util.List;
@Mapper @Mapper
public interface JobInstanceMapper extends SuperMapper<JobInstance> { public interface JobInstanceMapper extends SuperMapper<JobInstance> {
@InterceptorIgnore(tenantLine = "true")
JobInstance getByIdWithoutTenant(Integer id);
List<JobInstanceCount> countStatus(); List<JobInstanceCount> countStatus();
List<JobInstanceCount> countHistoryStatus(); List<JobInstanceCount> countHistoryStatus();
......
...@@ -27,6 +27,8 @@ import org.apache.ibatis.annotations.Param; ...@@ -27,6 +27,8 @@ import org.apache.ibatis.annotations.Param;
import java.util.List; import java.util.List;
import com.baomidou.mybatisplus.annotation.InterceptorIgnore;
/** /**
* 作业 Mapper 接口 * 作业 Mapper 接口
* *
...@@ -38,9 +40,11 @@ public interface TaskMapper extends SuperMapper<Task> { ...@@ -38,9 +40,11 @@ public interface TaskMapper extends SuperMapper<Task> {
Integer queryAllSizeByName(String name); Integer queryAllSizeByName(String name);
List<Task> queryOnLineTaskByDoneStatus(@Param("parentIds") List<Integer> parentIds List<Task> queryOnLineTaskByDoneStatus(@Param("parentIds") List<Integer> parentIds,
, @Param("stepIds") List<Integer> stepIds, @Param("includeNull") boolean includeNull @Param("stepIds") List<Integer> stepIds,
, @Param("jobStatuses") List<String> jobStatuses); @Param("includeNull") boolean includeNull,
@Param("jobStatuses") List<String> jobStatuses);
@InterceptorIgnore(tenantLine = "true")
Task getTaskByNameAndTenantId(@Param("name") String name, @Param("tenantId") Integer tenantId); Task getTaskByNameAndTenantId(@Param("name") String name, @Param("tenantId") Integer tenantId);
} }
...@@ -30,9 +30,12 @@ import com.dlink.model.JobHistory; ...@@ -30,9 +30,12 @@ import com.dlink.model.JobHistory;
**/ **/
public interface JobHistoryService extends ISuperService<JobHistory> { public interface JobHistoryService extends ISuperService<JobHistory> {
JobHistory getByIdWithoutTenant(Integer id);
JobHistory getJobHistory(Integer id); JobHistory getJobHistory(Integer id);
JobHistory getJobHistoryInfo(JobHistory jobHistory); JobHistory getJobHistoryInfo(JobHistory jobHistory);
JobHistory refreshJobHistory(Integer id, String jobManagerHost, String jobId, boolean needSave); JobHistory refreshJobHistory(Integer id, String jobManagerHost, String jobId, boolean needSave);
} }
...@@ -38,6 +38,8 @@ import com.fasterxml.jackson.databind.JsonNode; ...@@ -38,6 +38,8 @@ import com.fasterxml.jackson.databind.JsonNode;
*/ */
public interface JobInstanceService extends ISuperService<JobInstance> { public interface JobInstanceService extends ISuperService<JobInstance> {
JobInstance getByIdWithoutTenant(Integer id);
JobInstanceStatus getStatusCount(boolean isHistory); JobInstanceStatus getStatusCount(boolean isHistory);
List<JobInstance> listJobInstanceActive(); List<JobInstance> listJobInstanceActive();
......
...@@ -43,6 +43,11 @@ import com.fasterxml.jackson.databind.JsonNode; ...@@ -43,6 +43,11 @@ import com.fasterxml.jackson.databind.JsonNode;
@Service @Service
public class JobHistoryServiceImpl extends SuperServiceImpl<JobHistoryMapper, JobHistory> implements JobHistoryService { public class JobHistoryServiceImpl extends SuperServiceImpl<JobHistoryMapper, JobHistory> implements JobHistoryService {
@Override
public JobHistory getByIdWithoutTenant(Integer id) {
return baseMapper.getByIdWithoutTenant(id);
}
@Override @Override
public JobHistory getJobHistory(Integer id) { public JobHistory getJobHistory(Integer id) {
return getJobHistoryInfo(getById(id)); return getJobHistoryInfo(getById(id));
...@@ -111,11 +116,10 @@ public class JobHistoryServiceImpl extends SuperServiceImpl<JobHistoryMapper, Jo ...@@ -111,11 +116,10 @@ public class JobHistoryServiceImpl extends SuperServiceImpl<JobHistoryMapper, Jo
jobHistory.setCheckpointsConfigJson(JSONUtil.toJsonString(checkPointsConfig)); jobHistory.setCheckpointsConfigJson(JSONUtil.toJsonString(checkPointsConfig));
jobHistory.setConfigJson(JSONUtil.toJsonString(jobsConfig)); jobHistory.setConfigJson(JSONUtil.toJsonString(jobsConfig));
if (needSave) { if (needSave) {
if (Asserts.isNotNull(getById(id))) { updateById(jobHistory);
updateById(jobHistory); /*
} else { * if (Asserts.isNotNull(getById(id))) { updateById(jobHistory); } else { save(jobHistory); }
save(jobHistory); */
}
} }
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
......
...@@ -59,7 +59,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; ...@@ -59,7 +59,9 @@ import com.fasterxml.jackson.databind.ObjectMapper;
* @since 2022/2/2 13:52 * @since 2022/2/2 13:52
*/ */
@Service @Service
public class JobInstanceServiceImpl extends SuperServiceImpl<JobInstanceMapper, JobInstance> implements JobInstanceService { public class JobInstanceServiceImpl extends SuperServiceImpl<JobInstanceMapper, JobInstance>
implements
JobInstanceService {
@Autowired @Autowired
private HistoryService historyService; private HistoryService historyService;
...@@ -70,6 +72,11 @@ public class JobInstanceServiceImpl extends SuperServiceImpl<JobInstanceMapper, ...@@ -70,6 +72,11 @@ public class JobInstanceServiceImpl extends SuperServiceImpl<JobInstanceMapper,
@Autowired @Autowired
private JobHistoryService jobHistoryService; private JobHistoryService jobHistoryService;
@Override
public JobInstance getByIdWithoutTenant(Integer id) {
return baseMapper.getByIdWithoutTenant(id);
}
@Override @Override
public JobInstanceStatus getStatusCount(boolean isHistory) { public JobInstanceStatus getStatusCount(boolean isHistory) {
List<JobInstanceCount> jobInstanceCounts = null; List<JobInstanceCount> jobInstanceCounts = null;
...@@ -153,7 +160,8 @@ public class JobInstanceServiceImpl extends SuperServiceImpl<JobInstanceMapper, ...@@ -153,7 +160,8 @@ public class JobInstanceServiceImpl extends SuperServiceImpl<JobInstanceMapper,
history.setConfig(JSONUtil.parseObject(history.getConfigJson())); history.setConfig(JSONUtil.parseObject(history.getConfigJson()));
jobInfoDetail.setHistory(history); jobInfoDetail.setHistory(history);
if (Asserts.isNotNull(history.getClusterConfigurationId())) { if (Asserts.isNotNull(history.getClusterConfigurationId())) {
jobInfoDetail.setClusterConfiguration(clusterConfigurationService.getClusterConfigById(history.getClusterConfigurationId())); jobInfoDetail.setClusterConfiguration(
clusterConfigurationService.getClusterConfigById(history.getClusterConfigurationId()));
} }
return jobInfoDetail; return jobInfoDetail;
} }
...@@ -174,7 +182,8 @@ public class JobInstanceServiceImpl extends SuperServiceImpl<JobInstanceMapper, ...@@ -174,7 +182,8 @@ public class JobInstanceServiceImpl extends SuperServiceImpl<JobInstanceMapper,
history.setConfig(JSONUtil.parseObject(history.getConfigJson())); history.setConfig(JSONUtil.parseObject(history.getConfigJson()));
jobInfoDetail.setHistory(history); jobInfoDetail.setHistory(history);
if (Asserts.isNotNull(history) && Asserts.isNotNull(history.getClusterConfigurationId())) { if (Asserts.isNotNull(history) && Asserts.isNotNull(history.getClusterConfigurationId())) {
jobInfoDetail.setClusterConfiguration(clusterConfigurationService.getClusterConfigById(history.getClusterConfigurationId())); jobInfoDetail.setClusterConfiguration(
clusterConfigurationService.getClusterConfigById(history.getClusterConfigurationId()));
} }
if (pool.exist(key)) { if (pool.exist(key)) {
pool.refresh(jobInfoDetail); pool.refresh(jobInfoDetail);
...@@ -219,7 +228,8 @@ public class JobInstanceServiceImpl extends SuperServiceImpl<JobInstanceMapper, ...@@ -219,7 +228,8 @@ public class JobInstanceServiceImpl extends SuperServiceImpl<JobInstanceMapper,
list.get(i).setDuration(pool.get(list.get(i).getId().toString()).getInstance().getDuration()); list.get(i).setDuration(pool.get(list.get(i).getId().toString()).getInstance().getDuration());
} }
} }
return ProTableResult.<JobInstance>builder().success(true).data(list).total(page.getTotal()).current(current).pageSize(pageSize).build(); return ProTableResult.<JobInstance>builder().success(true).data(list).total(page.getTotal()).current(current)
.pageSize(pageSize).build();
} }
} }
...@@ -166,7 +166,7 @@ public class StudioServiceImpl implements StudioService { ...@@ -166,7 +166,7 @@ public class StudioServiceImpl implements StudioService {
public JobResult executeSql(StudioExecuteDTO studioExecuteDTO) { public JobResult executeSql(StudioExecuteDTO studioExecuteDTO) {
if (Dialect.notFlinkSql(studioExecuteDTO.getDialect())) { if (Dialect.notFlinkSql(studioExecuteDTO.getDialect())) {
return executeCommonSql(SqlDTO.build(studioExecuteDTO.getStatement(), return executeCommonSql(SqlDTO.build(studioExecuteDTO.getStatement(),
studioExecuteDTO.getDatabaseId(), studioExecuteDTO.getMaxRowNum())); studioExecuteDTO.getDatabaseId(), studioExecuteDTO.getMaxRowNum()));
} else { } else {
return executeFlinkSql(studioExecuteDTO); return executeFlinkSql(studioExecuteDTO);
} }
...@@ -177,7 +177,7 @@ public class StudioServiceImpl implements StudioService { ...@@ -177,7 +177,7 @@ public class StudioServiceImpl implements StudioService {
JobConfig config = studioExecuteDTO.getJobConfig(); JobConfig config = studioExecuteDTO.getJobConfig();
buildSession(config); buildSession(config);
// To initialize java udf, but it only support local mode. // To initialize java udf, but it only support local mode.
UDFPath udfPath = udfService.initUDF(studioExecuteDTO.getStatement(), config.getGatewayConfig() == null ? null : config.getGatewayConfig().getType()); UDFPath udfPath = udfService.initUDF(studioExecuteDTO.getStatement(), GatewayType.get(config.getType()));
config.setJarFiles(udfPath.getJarPaths()); config.setJarFiles(udfPath.getJarPaths());
config.setPyFiles(udfPath.getPyPaths()); config.setPyFiles(udfPath.getPyPaths());
JobManager jobManager = JobManager.build(config); JobManager jobManager = JobManager.build(config);
...@@ -233,7 +233,7 @@ public class StudioServiceImpl implements StudioService { ...@@ -233,7 +233,7 @@ public class StudioServiceImpl implements StudioService {
JobConfig config = studioDDLDTO.getJobConfig(); JobConfig config = studioDDLDTO.getJobConfig();
if (!config.isUseSession()) { if (!config.isUseSession()) {
config.setAddress( config.setAddress(
clusterService.buildEnvironmentAddress(config.isUseRemote(), studioDDLDTO.getClusterId())); clusterService.buildEnvironmentAddress(config.isUseRemote(), studioDDLDTO.getClusterId()));
} }
JobManager jobManager = JobManager.build(config); JobManager jobManager = JobManager.build(config);
return jobManager.executeDDL(studioDDLDTO.getStatement()); return jobManager.executeDDL(studioDDLDTO.getStatement());
...@@ -252,7 +252,7 @@ public class StudioServiceImpl implements StudioService { ...@@ -252,7 +252,7 @@ public class StudioServiceImpl implements StudioService {
Map<String, ProcessEntity> map = ProcessPool.getInstance().getMap(); Map<String, ProcessEntity> map = ProcessPool.getInstance().getMap();
Map<String, StringBuilder> map2 = ConsolePool.getInstance().getMap(); Map<String, StringBuilder> map2 = ConsolePool.getInstance().getMap();
ProcessEntity process = ProcessContextHolder.registerProcess( ProcessEntity process = ProcessContextHolder.registerProcess(
ProcessEntity.init(ProcessType.FLINKEXPLAIN, SaManager.getStpLogic(null).getLoginIdAsInt(), "admin")); ProcessEntity.init(ProcessType.FLINKEXPLAIN, SaManager.getStpLogic(null).getLoginIdAsInt(), "admin"));
addFlinkSQLEnv(studioExecuteDTO); addFlinkSQLEnv(studioExecuteDTO);
...@@ -269,7 +269,7 @@ public class StudioServiceImpl implements StudioService { ...@@ -269,7 +269,7 @@ public class StudioServiceImpl implements StudioService {
process.start(); process.start();
JobManager jobManager = JobManager.buildPlanMode(config); JobManager jobManager = JobManager.buildPlanMode(config);
List<SqlExplainResult> sqlExplainResults = List<SqlExplainResult> sqlExplainResults =
jobManager.explainSql(studioExecuteDTO.getStatement()).getSqlExplainResults(); jobManager.explainSql(studioExecuteDTO.getStatement()).getSqlExplainResults();
process.finish(); process.finish();
return sqlExplainResults; return sqlExplainResults;
} }
...@@ -343,15 +343,15 @@ public class StudioServiceImpl implements StudioService { ...@@ -343,15 +343,15 @@ public class StudioServiceImpl implements StudioService {
if (sessionDTO.isUseRemote()) { if (sessionDTO.isUseRemote()) {
Cluster cluster = clusterService.getById(sessionDTO.getClusterId()); Cluster cluster = clusterService.getById(sessionDTO.getClusterId());
SessionConfig sessionConfig = SessionConfig.build( SessionConfig sessionConfig = SessionConfig.build(
sessionDTO.getType(), true, sessionDTO.getType(), true,
cluster.getId(), cluster.getAlias(), cluster.getId(), cluster.getAlias(),
clusterService.buildEnvironmentAddress(true, sessionDTO.getClusterId())); clusterService.buildEnvironmentAddress(true, sessionDTO.getClusterId()));
return JobManager.createSession(sessionDTO.getSession(), sessionConfig, createUser); return JobManager.createSession(sessionDTO.getSession(), sessionConfig, createUser);
} else { } else {
SessionConfig sessionConfig = SessionConfig.build( SessionConfig sessionConfig = SessionConfig.build(
sessionDTO.getType(), false, sessionDTO.getType(), false,
null, null, null, null,
clusterService.buildEnvironmentAddress(false, null)); clusterService.buildEnvironmentAddress(false, null));
return JobManager.createSession(sessionDTO.getSession(), sessionConfig, createUser); return JobManager.createSession(sessionDTO.getSession(), sessionConfig, createUser);
} }
} }
...@@ -369,7 +369,7 @@ public class StudioServiceImpl implements StudioService { ...@@ -369,7 +369,7 @@ public class StudioServiceImpl implements StudioService {
@Override @Override
public LineageResult getLineage(StudioCADTO studioCADTO) { public LineageResult getLineage(StudioCADTO studioCADTO) {
if (Asserts.isNotNullString(studioCADTO.getDialect()) if (Asserts.isNotNullString(studioCADTO.getDialect())
&& !studioCADTO.getDialect().equalsIgnoreCase("flinksql")) { && !studioCADTO.getDialect().equalsIgnoreCase("flinksql")) {
if (Asserts.isNull(studioCADTO.getDatabaseId())) { if (Asserts.isNull(studioCADTO.getDatabaseId())) {
return null; return null;
} }
...@@ -379,10 +379,10 @@ public class StudioServiceImpl implements StudioService { ...@@ -379,10 +379,10 @@ public class StudioServiceImpl implements StudioService {
} }
if (studioCADTO.getDialect().equalsIgnoreCase("doris")) { if (studioCADTO.getDialect().equalsIgnoreCase("doris")) {
return com.dlink.explainer.sqllineage.LineageBuilder.getSqlLineage(studioCADTO.getStatement(), "mysql", return com.dlink.explainer.sqllineage.LineageBuilder.getSqlLineage(studioCADTO.getStatement(), "mysql",
dataBase.getDriverConfig()); dataBase.getDriverConfig());
} else { } else {
return com.dlink.explainer.sqllineage.LineageBuilder.getSqlLineage(studioCADTO.getStatement(), return com.dlink.explainer.sqllineage.LineageBuilder.getSqlLineage(studioCADTO.getStatement(),
studioCADTO.getDialect().toLowerCase(), dataBase.getDriverConfig()); studioCADTO.getDialect().toLowerCase(), dataBase.getDriverConfig());
} }
} else { } else {
addFlinkSQLEnv(studioCADTO); addFlinkSQLEnv(studioCADTO);
...@@ -414,7 +414,7 @@ public class StudioServiceImpl implements StudioService { ...@@ -414,7 +414,7 @@ public class StudioServiceImpl implements StudioService {
jobConfig.setAddress(cluster.getJobManagerHost()); jobConfig.setAddress(cluster.getJobManagerHost());
if (Asserts.isNotNull(cluster.getClusterConfigurationId())) { if (Asserts.isNotNull(cluster.getClusterConfigurationId())) {
Map<String, Object> gatewayConfig = Map<String, Object> gatewayConfig =
clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId()); clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
jobConfig.buildGatewayConfig(gatewayConfig); jobConfig.buildGatewayConfig(gatewayConfig);
} }
JobManager jobManager = JobManager.build(jobConfig); JobManager jobManager = JobManager.build(jobConfig);
...@@ -433,7 +433,7 @@ public class StudioServiceImpl implements StudioService { ...@@ -433,7 +433,7 @@ public class StudioServiceImpl implements StudioService {
// 如果用户选择用dlink平台来托管集群信息 说明任务一定是从dlink发起提交的 // 如果用户选择用dlink平台来托管集群信息 说明任务一定是从dlink发起提交的
if (Asserts.isNotNull(cluster.getClusterConfigurationId())) { if (Asserts.isNotNull(cluster.getClusterConfigurationId())) {
Map<String, Object> gatewayConfig = Map<String, Object> gatewayConfig =
clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId()); clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
jobConfig.buildGatewayConfig(gatewayConfig); jobConfig.buildGatewayConfig(gatewayConfig);
jobConfig.getGatewayConfig().getClusterConfig().setAppId(cluster.getName()); jobConfig.getGatewayConfig().getClusterConfig().setAppId(cluster.getName());
jobConfig.setTaskId(cluster.getTaskId()); jobConfig.setTaskId(cluster.getTaskId());
...@@ -492,7 +492,7 @@ public class StudioServiceImpl implements StudioService { ...@@ -492,7 +492,7 @@ public class StudioServiceImpl implements StudioService {
} }
for (Catalog catalog : catalogs) { for (Catalog catalog : catalogs) {
String statement = FlinkQuery.useCatalog(catalog.getName()) + FlinkQuery.separator() String statement = FlinkQuery.useCatalog(catalog.getName()) + FlinkQuery.separator()
+ FlinkQuery.showDatabases(); + FlinkQuery.showDatabases();
studioMetaStoreDTO.setStatement(statement); studioMetaStoreDTO.setStatement(statement);
IResult tableResult = executeMSFlinkSql(studioMetaStoreDTO); IResult tableResult = executeMSFlinkSql(studioMetaStoreDTO);
if (result instanceof DDLResult) { if (result instanceof DDLResult) {
...@@ -526,7 +526,7 @@ public class StudioServiceImpl implements StudioService { ...@@ -526,7 +526,7 @@ public class StudioServiceImpl implements StudioService {
} }
} else { } else {
String baseStatement = FlinkQuery.useCatalog(studioMetaStoreDTO.getCatalog()) + FlinkQuery.separator() String baseStatement = FlinkQuery.useCatalog(studioMetaStoreDTO.getCatalog()) + FlinkQuery.separator()
+ FlinkQuery.useDatabase(studioMetaStoreDTO.getDatabase()) + FlinkQuery.separator(); + FlinkQuery.useDatabase(studioMetaStoreDTO.getDatabase()) + FlinkQuery.separator();
// show tables // show tables
String tableStatement = baseStatement + FlinkQuery.showTables(); String tableStatement = baseStatement + FlinkQuery.showTables();
studioMetaStoreDTO.setStatement(tableStatement); studioMetaStoreDTO.setStatement(tableStatement);
...@@ -564,7 +564,7 @@ public class StudioServiceImpl implements StudioService { ...@@ -564,7 +564,7 @@ public class StudioServiceImpl implements StudioService {
// nothing to do // nothing to do
} else { } else {
String baseStatement = FlinkQuery.useCatalog(studioMetaStoreDTO.getCatalog()) + FlinkQuery.separator() String baseStatement = FlinkQuery.useCatalog(studioMetaStoreDTO.getCatalog()) + FlinkQuery.separator()
+ FlinkQuery.useDatabase(studioMetaStoreDTO.getDatabase()) + FlinkQuery.separator(); + FlinkQuery.useDatabase(studioMetaStoreDTO.getDatabase()) + FlinkQuery.separator();
// desc tables // desc tables
String tableStatement = baseStatement + FlinkQuery.descTable(studioMetaStoreDTO.getTable()); String tableStatement = baseStatement + FlinkQuery.descTable(studioMetaStoreDTO.getTable());
studioMetaStoreDTO.setStatement(tableStatement); studioMetaStoreDTO.setStatement(tableStatement);
...@@ -575,12 +575,12 @@ public class StudioServiceImpl implements StudioService { ...@@ -575,12 +575,12 @@ public class StudioServiceImpl implements StudioService {
int i = 1; int i = 1;
for (Map<String, Object> item : rowData) { for (Map<String, Object> item : rowData) {
FlinkColumn column = FlinkColumn.build(i, FlinkColumn column = FlinkColumn.build(i,
item.get(FlinkQuery.columnName()).toString(), item.get(FlinkQuery.columnName()).toString(),
item.get(FlinkQuery.columnType()).toString(), item.get(FlinkQuery.columnType()).toString(),
item.get(FlinkQuery.columnKey()).toString(), item.get(FlinkQuery.columnKey()).toString(),
item.get(FlinkQuery.columnNull()).toString(), item.get(FlinkQuery.columnNull()).toString(),
item.get(FlinkQuery.columnExtras()).toString(), item.get(FlinkQuery.columnExtras()).toString(),
item.get(FlinkQuery.columnWatermark()).toString()); item.get(FlinkQuery.columnWatermark()).toString());
columns.add(column); columns.add(column);
i++; i++;
} }
......
...@@ -21,7 +21,7 @@ package com.dlink.service.impl; ...@@ -21,7 +21,7 @@ package com.dlink.service.impl;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.common.result.Result; import com.dlink.common.result.Result;
import com.dlink.context.RequestContext; import com.dlink.context.TenantContextHolder;
import com.dlink.db.service.impl.SuperServiceImpl; import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.mapper.TenantMapper; import com.dlink.mapper.TenantMapper;
import com.dlink.model.Namespace; import com.dlink.model.Namespace;
...@@ -66,7 +66,7 @@ public class TenantServiceImpl extends SuperServiceImpl<TenantMapper, Tenant> im ...@@ -66,7 +66,7 @@ public class TenantServiceImpl extends SuperServiceImpl<TenantMapper, Tenant> im
} }
tenant.setIsDelete(false); tenant.setIsDelete(false);
if (save(tenant)) { if (save(tenant)) {
RequestContext.set(tenant.getId()); TenantContextHolder.set(tenant.getId());
return Result.succeed("新增成功"); return Result.succeed("新增成功");
} }
return Result.failed("新增失败"); return Result.failed("新增失败");
...@@ -101,12 +101,14 @@ public class TenantServiceImpl extends SuperServiceImpl<TenantMapper, Tenant> im ...@@ -101,12 +101,14 @@ public class TenantServiceImpl extends SuperServiceImpl<TenantMapper, Tenant> im
return Result.failed("租户不存在"); return Result.failed("租户不存在");
} }
Long tenantRoleCount = roleService.getBaseMapper().selectCount(new QueryWrapper<Role>().eq("tenant_id", id)); Long tenantRoleCount =
roleService.getBaseMapper().selectCount(new QueryWrapper<Role>().eq("tenant_id", id));
if (tenantRoleCount > 0) { if (tenantRoleCount > 0) {
return Result.failed("删除租户失败,该租户已绑定角色"); return Result.failed("删除租户失败,该租户已绑定角色");
} }
Long tenantNamespaceCount = namespaceService.getBaseMapper().selectCount(new QueryWrapper<Namespace>().eq("tenant_id", id)); Long tenantNamespaceCount =
namespaceService.getBaseMapper().selectCount(new QueryWrapper<Namespace>().eq("tenant_id", id));
if (tenantNamespaceCount > 0) { if (tenantNamespaceCount > 0) {
return Result.failed("删除租户失败,该租户已绑定名称空间"); return Result.failed("删除租户失败,该租户已绑定名称空间");
} }
...@@ -162,12 +164,12 @@ public class TenantServiceImpl extends SuperServiceImpl<TenantMapper, Tenant> im ...@@ -162,12 +164,12 @@ public class TenantServiceImpl extends SuperServiceImpl<TenantMapper, Tenant> im
public Result switchTenant(JsonNode para) { public Result switchTenant(JsonNode para) {
if (para.size() > 0) { if (para.size() > 0) {
Integer tenantId = para.get("tenantId").asInt(); Integer tenantId = para.get("tenantId").asInt();
RequestContext.remove(); TenantContextHolder.clear();
RequestContext.set(tenantId); TenantContextHolder.set(tenantId);
return Result.succeed("切换租户成功"); return Result.succeed("切换租户成功");
} else { } else {
return Result.failed("无法切换租户,获取不到租户信息"); return Result.failed("无法切换租户,获取不到租户信息");
} }
} }
} }
\ No newline at end of file
...@@ -21,7 +21,7 @@ package com.dlink.service.impl; ...@@ -21,7 +21,7 @@ package com.dlink.service.impl;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.common.result.Result; import com.dlink.common.result.Result;
import com.dlink.context.RequestContext; import com.dlink.context.TenantContextHolder;
import com.dlink.db.service.impl.SuperServiceImpl; import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.dto.LoginUTO; import com.dlink.dto.LoginUTO;
import com.dlink.dto.UserDTO; import com.dlink.dto.UserDTO;
...@@ -146,7 +146,7 @@ public class UserServiceImpl extends SuperServiceImpl<UserMapper, User> implemen ...@@ -146,7 +146,7 @@ public class UserServiceImpl extends SuperServiceImpl<UserMapper, User> implemen
} }
// 将前端入参 租户id 放入上下文 // 将前端入参 租户id 放入上下文
RequestContext.set(loginUTO.getTenantId()); TenantContextHolder.set(loginUTO.getTenantId());
// get user tenants and roles // get user tenants and roles
UserDTO userDTO = getUserALLBaseInfo(loginUTO, user); UserDTO userDTO = getUserALLBaseInfo(loginUTO, user);
...@@ -177,7 +177,8 @@ public class UserServiceImpl extends SuperServiceImpl<UserMapper, User> implemen ...@@ -177,7 +177,8 @@ public class UserServiceImpl extends SuperServiceImpl<UserMapper, User> implemen
}); });
userTenants.forEach(userTenant -> { userTenants.forEach(userTenant -> {
Tenant tenant = tenantService.getBaseMapper().selectOne(new QueryWrapper<Tenant>().eq("id", userTenant.getTenantId())); Tenant tenant = tenantService.getBaseMapper()
.selectOne(new QueryWrapper<Tenant>().eq("id", userTenant.getTenantId()));
if (Asserts.isNotNull(tenant)) { if (Asserts.isNotNull(tenant)) {
tenantList.add(tenant); tenantList.add(tenant);
} }
......
...@@ -2,6 +2,12 @@ ...@@ -2,6 +2,12 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.dlink.mapper.JobHistoryMapper"> <mapper namespace="com.dlink.mapper.JobHistoryMapper">
<select id="getByIdWithoutTenant" resultType="com.dlink.model.JobHistory">
select *
from dlink_job_history
where id = #{id}
limit 1
</select>
<insert id="insert"> <insert id="insert">
insert into dlink_job_history (id,job_json,exceptions_json,checkpoints_json,checkpoints_config_json,config_json, insert into dlink_job_history (id,job_json,exceptions_json,checkpoints_json,checkpoints_config_json,config_json,
......
...@@ -2,6 +2,13 @@ ...@@ -2,6 +2,13 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.dlink.mapper.JobInstanceMapper"> <mapper namespace="com.dlink.mapper.JobInstanceMapper">
<select id="getByIdWithoutTenant" resultType="com.dlink.model.JobInstance">
select *
from dlink_job_instance
where id = #{id}
limit 1
</select>
<select id="selectForProTable" resultType="com.dlink.model.JobInstance"> <select id="selectForProTable" resultType="com.dlink.model.JobInstance">
select select
a.*, a.*,
......
...@@ -59,8 +59,8 @@ ...@@ -59,8 +59,8 @@
select * select *
from dlink_task from dlink_task
where 1 = 1 where 1 = 1
and name = "${name}" and name = #{name}
and tenant_id = ${tenantId} and tenant_id = #{tenantId}
</select> </select>
<select id="queryOnLineTaskByDoneStatus" resultType="com.dlink.model.Task"> <select id="queryOnLineTaskByDoneStatus" resultType="com.dlink.model.Task">
......
...@@ -15,16 +15,14 @@ ...@@ -15,16 +15,14 @@
~ See the License for the specific language governing permissions and ~ See the License for the specific language governing permissions and
~ limitations under the License. ~ limitations under the License.
--> -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent> <parent>
<artifactId>dlink-client</artifactId>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<artifactId>dlink-client</artifactId>
<version>0.6.8-SNAPSHOT</version> <version>0.6.8-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dlink-client-1.14</artifactId> <artifactId>dlink-client-1.14</artifactId>
<properties> <properties>
...@@ -54,5 +52,11 @@ ...@@ -54,5 +52,11 @@
<artifactId>jackson-datatype-jsr310</artifactId> <artifactId>jackson-datatype-jsr310</artifactId>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<version>1.2.3_flink-1.14_2.12</version>
<scope>provided</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>
\ No newline at end of file
...@@ -15,16 +15,14 @@ ...@@ -15,16 +15,14 @@
~ See the License for the specific language governing permissions and ~ See the License for the specific language governing permissions and
~ limitations under the License. ~ limitations under the License.
--> -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent> <parent>
<artifactId>dlink-flink</artifactId>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<artifactId>dlink-flink</artifactId>
<version>0.6.8-SNAPSHOT</version> <version>0.6.8-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dlink-flink-1.14</artifactId> <artifactId>dlink-flink-1.14</artifactId>
...@@ -45,17 +43,19 @@ ...@@ -45,17 +43,19 @@
<dependency> <dependency>
<groupId>org.apache.flink</groupId> <groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId> <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId> <artifactId>slf4j-api</artifactId>
</exclusion> </exclusion>
</exclusions> </exclusions>
<version>${flink.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.flink</groupId> <groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime_${scala.binary.version}</artifactId> <artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
...@@ -66,8 +66,6 @@ ...@@ -66,8 +66,6 @@
<artifactId>flink-shaded-guava</artifactId> <artifactId>flink-shaded-guava</artifactId>
</exclusion> </exclusion>
</exclusions> </exclusions>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.flink</groupId> <groupId>org.apache.flink</groupId>
...@@ -77,13 +75,13 @@ ...@@ -77,13 +75,13 @@
<dependency> <dependency>
<groupId>org.apache.flink</groupId> <groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId> <artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId> <artifactId>slf4j-api</artifactId>
</exclusion> </exclusion>
</exclusions> </exclusions>
<version>${flink.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.flink</groupId> <groupId>org.apache.flink</groupId>
...@@ -143,8 +141,8 @@ ...@@ -143,8 +141,8 @@
<version>${flinkcdc.version}</version> <version>${flinkcdc.version}</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<artifactId>flink-shaded-guava</artifactId>
<groupId>org.apache.flink</groupId> <groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
</exclusion> </exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
...@@ -183,10 +181,5 @@ ...@@ -183,10 +181,5 @@
<artifactId>flink-doris-connector-1.14_${scala.binary.version}</artifactId> <artifactId>flink-doris-connector-1.14_${scala.binary.version}</artifactId>
<version>1.1.0</version> <version>1.1.0</version>
</dependency> </dependency>
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<version>1.2.3_flink-1.14_${scala.binary.version}</version>
</dependency>
</dependencies> </dependencies>
</project> </project>
\ No newline at end of file
...@@ -55,7 +55,8 @@ public class ConsolePool extends AbstractPool<StringBuilder> { ...@@ -55,7 +55,8 @@ public class ConsolePool extends AbstractPool<StringBuilder> {
if (consoleEntityMap.containsKey(user)) { if (consoleEntityMap.containsKey(user)) {
consoleEntityMap.get(user).append(str); consoleEntityMap.get(user).append(str);
} else { } else {
consoleEntityMap.put(user, new StringBuilder(str)); StringBuilder sb = new StringBuilder("Dinky User Console:");
consoleEntityMap.put(user, sb.append(str));
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment