Unverified Commit c83f5fe5 authored by aiwenmo's avatar aiwenmo Committed by GitHub

[Feature-355][admin,executor] Restore the job from a savepoint path i…

[Feature-355][admin,executor] Restore the job from a savepoint path i…
parents a16d9282 0d80b4ee
package com.dlink.service.impl; package com.dlink.service.impl;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.dlink.alert.Alert; import com.dlink.alert.Alert;
import com.dlink.alert.AlertConfig; import com.dlink.alert.AlertConfig;
...@@ -526,15 +528,18 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -526,15 +528,18 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
Savepoints latestSavepoints = savepointsService.getLatestSavepointByTaskId(task.getId()); Savepoints latestSavepoints = savepointsService.getLatestSavepointByTaskId(task.getId());
if (Asserts.isNotNull(latestSavepoints)) { if (Asserts.isNotNull(latestSavepoints)) {
config.setSavePointPath(latestSavepoints.getPath()); config.setSavePointPath(latestSavepoints.getPath());
config.getConfig().put(SavepointConfigOptions.SAVEPOINT_PATH.key(),latestSavepoints.getPath());
} }
break; break;
case EARLIEST: case EARLIEST:
Savepoints earliestSavepoints = savepointsService.getEarliestSavepointByTaskId(task.getId()); Savepoints earliestSavepoints = savepointsService.getEarliestSavepointByTaskId(task.getId());
if (Asserts.isNotNull(earliestSavepoints)) { if (Asserts.isNotNull(earliestSavepoints)) {
config.setSavePointPath(earliestSavepoints.getPath()); config.setSavePointPath(earliestSavepoints.getPath());
config.getConfig().put(SavepointConfigOptions.SAVEPOINT_PATH.key(),earliestSavepoints.getPath());
} }
break; break;
case CUSTOM: case CUSTOM:
config.getConfig().put(SavepointConfigOptions.SAVEPOINT_PATH.key(),config.getSavePointPath());
break; break;
default: default:
config.setSavePointPath(null); config.setSavePointPath(null);
......
package com.dlink.executor; package com.dlink.executor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.assertion.Asserts;
/** /**
* AppBatchExecutor * AppBatchExecutor
* *
...@@ -12,7 +16,12 @@ public class AppBatchExecutor extends Executor { ...@@ -12,7 +16,12 @@ public class AppBatchExecutor extends Executor {
public AppBatchExecutor(ExecutorSetting executorSetting) { public AppBatchExecutor(ExecutorSetting executorSetting) {
this.executorSetting = executorSetting; this.executorSetting = executorSetting;
this.environment = StreamExecutionEnvironment.createLocalEnvironment(); if (Asserts.isNotNull(executorSetting.getConfig())) {
Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
this.environment = StreamExecutionEnvironment.createLocalEnvironment(configuration);
} else {
this.environment = StreamExecutionEnvironment.createLocalEnvironment();
}
init(); init();
} }
......
package com.dlink.executor; package com.dlink.executor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.assertion.Asserts;
/** /**
* AppStreamExecutor * AppStreamExecutor
* *
...@@ -12,7 +16,12 @@ public class AppStreamExecutor extends Executor { ...@@ -12,7 +16,12 @@ public class AppStreamExecutor extends Executor {
public AppStreamExecutor(ExecutorSetting executorSetting) { public AppStreamExecutor(ExecutorSetting executorSetting) {
this.executorSetting = executorSetting; this.executorSetting = executorSetting;
this.environment = StreamExecutionEnvironment.getExecutionEnvironment(); if (Asserts.isNotNull(executorSetting.getConfig())) {
Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
this.environment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
} else {
this.environment = StreamExecutionEnvironment.getExecutionEnvironment();
}
init(); init();
} }
......
package com.dlink.executor; package com.dlink.executor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.assertion.Asserts;
/** /**
* LocalBatchExecutor * LocalBatchExecutor
* *
...@@ -12,7 +16,12 @@ public class LocalBatchExecutor extends Executor { ...@@ -12,7 +16,12 @@ public class LocalBatchExecutor extends Executor {
public LocalBatchExecutor(ExecutorSetting executorSetting) { public LocalBatchExecutor(ExecutorSetting executorSetting) {
this.executorSetting = executorSetting; this.executorSetting = executorSetting;
this.environment = StreamExecutionEnvironment.createLocalEnvironment(); if (Asserts.isNotNull(executorSetting.getConfig())) {
Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
this.environment = StreamExecutionEnvironment.createLocalEnvironment(configuration);
} else {
this.environment = StreamExecutionEnvironment.createLocalEnvironment();
}
init(); init();
} }
......
package com.dlink.executor; package com.dlink.executor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.assertion.Asserts;
/** /**
* LocalStreamExecuter * LocalStreamExecuter
* *
...@@ -12,7 +16,12 @@ public class LocalStreamExecutor extends Executor { ...@@ -12,7 +16,12 @@ public class LocalStreamExecutor extends Executor {
public LocalStreamExecutor(ExecutorSetting executorSetting) { public LocalStreamExecutor(ExecutorSetting executorSetting) {
this.executorSetting = executorSetting; this.executorSetting = executorSetting;
this.environment = StreamExecutionEnvironment.createLocalEnvironment(); if (Asserts.isNotNull(executorSetting.getConfig())) {
Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
this.environment = StreamExecutionEnvironment.createLocalEnvironment(configuration);
} else {
this.environment = StreamExecutionEnvironment.createLocalEnvironment();
}
init(); init();
} }
......
package com.dlink.executor; package com.dlink.executor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.assertion.Asserts;
/** /**
* RemoteBatchExecutor * RemoteBatchExecutor
* *
...@@ -13,7 +17,12 @@ public class RemoteBatchExecutor extends Executor { ...@@ -13,7 +17,12 @@ public class RemoteBatchExecutor extends Executor {
public RemoteBatchExecutor(EnvironmentSetting environmentSetting, ExecutorSetting executorSetting) { public RemoteBatchExecutor(EnvironmentSetting environmentSetting, ExecutorSetting executorSetting) {
this.environmentSetting = environmentSetting; this.environmentSetting = environmentSetting;
this.executorSetting = executorSetting; this.executorSetting = executorSetting;
this.environment = StreamExecutionEnvironment.createRemoteEnvironment(environmentSetting.getHost(), environmentSetting.getPort()); if (Asserts.isNotNull(executorSetting.getConfig())) {
Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
this.environment = StreamExecutionEnvironment.createRemoteEnvironment(environmentSetting.getHost(), environmentSetting.getPort(), configuration);
} else {
this.environment = StreamExecutionEnvironment.createRemoteEnvironment(environmentSetting.getHost(), environmentSetting.getPort());
}
init(); init();
} }
......
package com.dlink.executor; package com.dlink.executor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.assertion.Asserts;
/** /**
* RemoteStreamExecutor * RemoteStreamExecutor
* *
...@@ -13,7 +17,12 @@ public class RemoteStreamExecutor extends Executor { ...@@ -13,7 +17,12 @@ public class RemoteStreamExecutor extends Executor {
public RemoteStreamExecutor(EnvironmentSetting environmentSetting, ExecutorSetting executorSetting) { public RemoteStreamExecutor(EnvironmentSetting environmentSetting, ExecutorSetting executorSetting) {
this.environmentSetting = environmentSetting; this.environmentSetting = environmentSetting;
this.executorSetting = executorSetting; this.executorSetting = executorSetting;
this.environment = StreamExecutionEnvironment.createRemoteEnvironment(environmentSetting.getHost(), environmentSetting.getPort()); if (Asserts.isNotNull(executorSetting.getConfig())) {
Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
this.environment = StreamExecutionEnvironment.createRemoteEnvironment(environmentSetting.getHost(), environmentSetting.getPort(), configuration);
} else {
this.environment = StreamExecutionEnvironment.createRemoteEnvironment(environmentSetting.getHost(), environmentSetting.getPort());
}
init(); init();
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment