Commit 0d80b4ee authored by wenmo's avatar wenmo

[Feature-355][admin,executor] Restore the job from a savepoint path in remote mode

parent a16d9282
package com.dlink.service.impl;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.dlink.alert.Alert;
import com.dlink.alert.AlertConfig;
......@@ -526,15 +528,18 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
Savepoints latestSavepoints = savepointsService.getLatestSavepointByTaskId(task.getId());
if (Asserts.isNotNull(latestSavepoints)) {
config.setSavePointPath(latestSavepoints.getPath());
config.getConfig().put(SavepointConfigOptions.SAVEPOINT_PATH.key(),latestSavepoints.getPath());
}
break;
case EARLIEST:
Savepoints earliestSavepoints = savepointsService.getEarliestSavepointByTaskId(task.getId());
if (Asserts.isNotNull(earliestSavepoints)) {
config.setSavePointPath(earliestSavepoints.getPath());
config.getConfig().put(SavepointConfigOptions.SAVEPOINT_PATH.key(),earliestSavepoints.getPath());
}
break;
case CUSTOM:
config.getConfig().put(SavepointConfigOptions.SAVEPOINT_PATH.key(),config.getSavePointPath());
break;
default:
config.setSavePointPath(null);
......
package com.dlink.executor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.assertion.Asserts;
/**
* AppBatchExecutor
*
......@@ -12,7 +16,12 @@ public class AppBatchExecutor extends Executor {
public AppBatchExecutor(ExecutorSetting executorSetting) {
this.executorSetting = executorSetting;
this.environment = StreamExecutionEnvironment.createLocalEnvironment();
if (Asserts.isNotNull(executorSetting.getConfig())) {
Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
this.environment = StreamExecutionEnvironment.createLocalEnvironment(configuration);
} else {
this.environment = StreamExecutionEnvironment.createLocalEnvironment();
}
init();
}
......
package com.dlink.executor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.assertion.Asserts;
/**
* AppStreamExecutor
*
......@@ -12,7 +16,12 @@ public class AppStreamExecutor extends Executor {
public AppStreamExecutor(ExecutorSetting executorSetting) {
this.executorSetting = executorSetting;
this.environment = StreamExecutionEnvironment.getExecutionEnvironment();
if (Asserts.isNotNull(executorSetting.getConfig())) {
Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
this.environment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
} else {
this.environment = StreamExecutionEnvironment.getExecutionEnvironment();
}
init();
}
......
package com.dlink.executor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.assertion.Asserts;
/**
* LocalBatchExecutor
*
......@@ -12,7 +16,12 @@ public class LocalBatchExecutor extends Executor {
public LocalBatchExecutor(ExecutorSetting executorSetting) {
this.executorSetting = executorSetting;
this.environment = StreamExecutionEnvironment.createLocalEnvironment();
if (Asserts.isNotNull(executorSetting.getConfig())) {
Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
this.environment = StreamExecutionEnvironment.createLocalEnvironment(configuration);
} else {
this.environment = StreamExecutionEnvironment.createLocalEnvironment();
}
init();
}
......
package com.dlink.executor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.assertion.Asserts;
/**
* LocalStreamExecuter
*
......@@ -12,7 +16,12 @@ public class LocalStreamExecutor extends Executor {
public LocalStreamExecutor(ExecutorSetting executorSetting) {
this.executorSetting = executorSetting;
this.environment = StreamExecutionEnvironment.createLocalEnvironment();
if (Asserts.isNotNull(executorSetting.getConfig())) {
Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
this.environment = StreamExecutionEnvironment.createLocalEnvironment(configuration);
} else {
this.environment = StreamExecutionEnvironment.createLocalEnvironment();
}
init();
}
......
package com.dlink.executor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.assertion.Asserts;
/**
* RemoteBatchExecutor
*
......@@ -13,7 +17,12 @@ public class RemoteBatchExecutor extends Executor {
public RemoteBatchExecutor(EnvironmentSetting environmentSetting, ExecutorSetting executorSetting) {
this.environmentSetting = environmentSetting;
this.executorSetting = executorSetting;
this.environment = StreamExecutionEnvironment.createRemoteEnvironment(environmentSetting.getHost(), environmentSetting.getPort());
if (Asserts.isNotNull(executorSetting.getConfig())) {
Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
this.environment = StreamExecutionEnvironment.createRemoteEnvironment(environmentSetting.getHost(), environmentSetting.getPort(), configuration);
} else {
this.environment = StreamExecutionEnvironment.createRemoteEnvironment(environmentSetting.getHost(), environmentSetting.getPort());
}
init();
}
......
package com.dlink.executor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dlink.assertion.Asserts;
/**
* RemoteStreamExecutor
*
......@@ -13,7 +17,12 @@ public class RemoteStreamExecutor extends Executor {
public RemoteStreamExecutor(EnvironmentSetting environmentSetting, ExecutorSetting executorSetting) {
this.environmentSetting = environmentSetting;
this.executorSetting = executorSetting;
this.environment = StreamExecutionEnvironment.createRemoteEnvironment(environmentSetting.getHost(), environmentSetting.getPort());
if (Asserts.isNotNull(executorSetting.getConfig())) {
Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
this.environment = StreamExecutionEnvironment.createRemoteEnvironment(environmentSetting.getHost(), environmentSetting.getPort(), configuration);
} else {
this.environment = StreamExecutionEnvironment.createRemoteEnvironment(environmentSetting.getHost(), environmentSetting.getPort());
}
init();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment