Commit af3b7796 authored by wenmo

Support job submission in Kubernetes Session and Application modes

parent 3b1e1308
......@@ -21,7 +21,7 @@ Dlink is an interactive FlinkSQL Studio that supports online development, completion, and validation
Note: all features listed below are implemented in the corresponding version and verified in practice.
| Application | Area | Feature | Progress |
| :------: | :------------: | ------------------------------------------------------------ | :---: |
| :------: |:----------:|---------------------------------------| :---: |
| Dev Center | FlinkSQL | Support all sql-client syntax | 0.4.0 |
| | | Support all Flink Configurations | 0.4.0 |
| | | Support all Flink Connectors | 0.4.0 |
......@@ -46,7 +46,10 @@ Dlink is an interactive FlinkSQL Studio that supports online development, completion, and validation
| | | Support FlinkSQL submission in yarn session mode | 0.4.0 |
| | | Support FlinkSQL submission in yarn per-job mode | 0.4.0 |
| | | Support FlinkSQL submission in yarn application mode | 0.4.0 |
| | | Support FlinkSQL submission in kubernetes session mode | 0.4.0 |
| | | Support FlinkSQL submission in kubernetes application mode | 0.4.0 |
| | Flink Jobs | Support Jar submission in yarn application mode | 0.4.0 |
| | | Support Jar submission in k8s application mode | 0.4.0 |
| | | Support job Cancel | 0.4.0 |
| | | Support SavePoint Cancel, Stop, and Trigger for jobs | 0.4.0 |
| | | Add automatic job recovery from SavePoint (latest, earliest, or a specified one) | 0.4.0 |
......@@ -294,7 +297,7 @@ AGG BY TOP2(value) as (value,rank);
20. Support all of Flink's official connectors and plugin extensions; pay attention to version compatibility.
21. When debugging the source in IDEA, adjust the corresponding pom dependency imports under admin and core to load the features.
22. Support StreamGraph-based lineage analysis of executable FlinkSql (Insert into), no matter how complex your sql is or how many views it uses.
23. Dlink currently supports Standalone, Yarn Session, Yarn PerJob, and Yarn Application submission; K8S support will follow.
23. Dlink now supports Standalone, Yarn Session, Yarn Per-Job, Yarn Application, Kubernetes Session, and Kubernetes Application submission modes.
24. A single Dlink instance currently supports only one Flink version; the ability to support multiple Flink versions at once will be open-sourced later.
25. Yarn PerJob and Yarn Application require a cluster configuration, and the cluster instances they auto-register must be reclaimed manually.
26. More content will follow...
......
......@@ -61,7 +61,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), task.getClusterId()));
} else {
Map<String, Object> gatewayConfig = clusterConfigurationService.getGatewayConfig(task.getClusterConfigurationId());
if (GatewayType.YARN_APPLICATION.equalsValue(config.getType())) {
if (GatewayType.YARN_APPLICATION.equalsValue(config.getType()) || GatewayType.KUBERNETES_APPLICATION.equalsValue(config.getType())) {
if(!isJarTask) {
SystemConfiguration systemConfiguration = SystemConfiguration.getInstances();
gatewayConfig.put("userJarPath", systemConfiguration.getSqlSubmitJarPath());
......@@ -105,7 +105,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
}
private boolean isJarTask(Task task){
return GatewayType.YARN_APPLICATION.equalsValue(task.getType())&&Asserts.isNotNull(task.getJarId());
return (GatewayType.YARN_APPLICATION.equalsValue(task.getType()) || GatewayType.KUBERNETES_APPLICATION.equalsValue(task.getType())) && Asserts.isNotNull(task.getJarId());
}
@Override
......
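The application-mode test now appears twice in TaskServiceImpl (the gateway branch and isJarTask above); a hypothetical helper, not part of this commit, could keep the two call sites in sync:
// Hypothetical helper: one place for the "submits through an application-mode gateway" check.
public static boolean isApplicationMode(String type) {
    return GatewayType.YARN_APPLICATION.equalsValue(type)
            || GatewayType.KUBERNETES_APPLICATION.equalsValue(type);
}
// isJarTask(task) would then reduce to:
// return isApplicationMode(task.getType()) && Asserts.isNotNull(task.getJarId());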
......@@ -79,7 +79,7 @@
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>provided</scope>
<!-- <scope>provided</scope>-->
<version>8.0.21</version>
</dependency>
<dependency>
......
......@@ -160,6 +160,13 @@
<include>dlink-app-${project.version}-jar-with-dependencies.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.parent.basedir}/dlink-doc/extends</directory>
<outputDirectory>jar</outputDirectory>
<includes>
<include>Dockerfile</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.parent.basedir}/dlink-web/dist</directory>
<outputDirectory>html</outputDirectory>
......
......@@ -52,7 +52,7 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</exclusion>
<exclusion>
<!--<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
......@@ -67,9 +67,14 @@
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</exclusion>
</exclusion>-->
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-kubernetes_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
......
......@@ -53,7 +53,7 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</exclusion>
<exclusion>
<!--<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
......@@ -68,9 +68,14 @@
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</exclusion>
</exclusion>-->
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-kubernetes_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
......
......@@ -53,7 +53,7 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</exclusion>
<exclusion>
<!--<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
......@@ -68,9 +68,14 @@
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</exclusion>
</exclusion>-->
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-kubernetes_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
......
......@@ -52,24 +52,29 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</exclusion>
<exclusion>
<!--<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</exclusion>
<exclusion>
</exclusion>-->
<!--<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
</exclusion>
<exclusion>
</exclusion>-->
<!--<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</exclusion>
</exclusion>-->
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-kubernetes_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
......
......@@ -137,7 +137,8 @@ public class JobManager {
}
public static boolean useGateway(String type) {
return (GatewayType.YARN_PER_JOB.equalsValue(type) || GatewayType.YARN_APPLICATION.equalsValue(type));
return (GatewayType.YARN_PER_JOB.equalsValue(type) || GatewayType.YARN_APPLICATION.equalsValue(type)
|| GatewayType.KUBERNETES_APPLICATION.equalsValue(type));
}
private Executor createExecutor() {
......@@ -234,7 +235,7 @@ public class JobManager {
currentSql = String.join(sqlSeparator, inserts);
JobGraph jobGraph = executor.getJobGraphFromInserts(inserts);
GatewayResult gatewayResult = null;
if (GatewayType.YARN_APPLICATION.equals(runMode)) {
if (GatewayType.YARN_APPLICATION.equals(runMode) || GatewayType.KUBERNETES_APPLICATION.equals(runMode)) {
config.addGatewayConfig(executor.getSetConfig());
gatewayResult = Gateway.build(config.getGatewayConfig()).submitJar();
} else {
......@@ -271,7 +272,7 @@ public class JobManager {
currentSql = String.join(sqlSeparator, inserts);
JobGraph jobGraph = executor.getJobGraphFromInserts(inserts);
GatewayResult gatewayResult = null;
if (GatewayType.YARN_APPLICATION.equalsValue(config.getType())) {
if (GatewayType.YARN_APPLICATION.equals(runMode) || GatewayType.KUBERNETES_APPLICATION.equals(runMode)) {
config.addGatewayConfig(executor.getSetConfig());
gatewayResult = Gateway.build(config.getGatewayConfig()).submitJar();
} else {
......
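Both hunks above now route application-mode runs through the gateway. A minimal caller-side sketch of the new mode (the JobConfig constructor and setter are assumed names):
// "kubernetes-application" is the enum's long value (see GatewayType below).
JobConfig config = new JobConfig();                                // assumed constructor
config.setType(GatewayType.KUBERNETES_APPLICATION.getLongValue()); // assumed setter
boolean viaGateway = JobManager.useGateway(config.getType());      // true after this commit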
# Image for running dlink-app.jar on Flink in Kubernetes Application mode.
# The Flink and JDK tarballs and dlink-app.jar must sit in the build context.
FROM centos:7
LABEL maintainer="dlink"
ARG FLINK_VERSION=1.14.0
ARG SCALA_VERSION=2.11
# ADD extracts the tarballs into /root automatically.
ADD flink-${FLINK_VERSION}-bin-scala_${SCALA_VERSION}.tgz /root
ADD jdk-8u261-linux-x64.tar.gz /root
ADD dlink-app.jar /root
ENV FLINK_HOME=/root/flink-${FLINK_VERSION}
ENV JAVA_HOME=/root/jdk1.8.0_261
ENV PATH=$JAVA_HOME/bin:$FLINK_HOME/bin:$PATH
# Point Flink at the bundled JDK; the ENV lines above already cover the
# container runtime, and /etc/profile is appended only for interactive shells.
RUN echo "env.java.home: $JAVA_HOME" >> $FLINK_HOME/conf/flink-conf.yaml \
 && echo "FLINK_HOME=$FLINK_HOME" >> /etc/profile \
 && echo "JAVA_HOME=$JAVA_HOME" >> /etc/profile \
 && echo "PATH=$JAVA_HOME/bin:$FLINK_HOME/bin:$PATH" >> /etc/profile
EXPOSE 8081
......@@ -41,21 +41,9 @@
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-client-1.12</artifactId>
<artifactId>dlink-client-1.14</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-yarn_2.11</artifactId>
<version>1.12.5</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -10,7 +10,9 @@ import com.dlink.assertion.Asserts;
**/
public enum GatewayType {
LOCAL("l","local"),STANDALONE("s","standalone"),YARN_SESSION("ys","yarn-session"),YARN_APPLICATION("ya","yarn-application"),YARN_PER_JOB("ypj","yarn-per-job");
LOCAL("l","local"),STANDALONE("s","standalone"),
YARN_SESSION("ys","yarn-session"),YARN_APPLICATION("ya","yarn-application"),
YARN_PER_JOB("ypj","yarn-per-job"),KUBERNETES_APPLICATION("ka","kubernetes-application");
private String value;
private String longValue;
......
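The long value doubles as Flink's execution.target string; KubernetesGateway.initConfig() further down hands it to Flink directly:
// From initConfig(): selects Flink's Kubernetes application deployer.
configuration.set(DeploymentOptions.TARGET, GatewayType.KUBERNETES_APPLICATION.getLongValue()); // "kubernetes-application"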
package com.dlink.gateway.kubernetes;
import com.dlink.assertion.Asserts;
import com.dlink.gateway.GatewayType;
import com.dlink.gateway.config.AppConfig;
import com.dlink.gateway.exception.GatewayException;
import com.dlink.gateway.result.GatewayResult;
import com.dlink.gateway.result.KubernetesResult;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.kubernetes.KubernetesClusterDescriptor;
import org.apache.flink.runtime.jobgraph.JobGraph;
import java.util.Collections;
/**
* KubernetesApplicationGateway
*
* @author wenmo
* @since 2021/12/26 14:59
*/
public class KubernetesApplicationGateway extends KubernetesGateway {
@Override
public GatewayType getType() {
return GatewayType.KUBERNETES_APPLICATION;
}
@Override
public GatewayResult submitJobGraph(JobGraph jobGraph) {
throw new GatewayException("Couldn't deploy Kubernetes Application Cluster with job graph.");
}
@Override
public GatewayResult submitJar() {
if(Asserts.isNull(client)){
init();
}
KubernetesResult result = KubernetesResult.build(getType());
AppConfig appConfig = config.getAppConfig();
configuration.set(PipelineOptions.JARS, Collections.singletonList(appConfig.getUserJarPath()));
ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(appConfig.getUserJarParas(), appConfig.getUserJarMainAppClass());
KubernetesClusterDescriptor kubernetesClusterDescriptor = new KubernetesClusterDescriptor(configuration, client);
try{
ClusterClientProvider<String> clusterClientProvider = kubernetesClusterDescriptor.deployApplicationCluster(clusterSpecification, applicationConfiguration);
ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient();
String clusterId = clusterClient.getClusterId();
result.setClusterId(clusterId);
result.setWebURL(clusterClient.getWebInterfaceURL());
result.success();
}catch (Exception e){
e.printStackTrace();
logger.error(e.getMessage());
result.fail(e.getMessage());
}finally {
kubernetesClusterDescriptor.close();
}
return result;
}
}
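Callers reach this gateway through the same Gateway.build(...) factory seen in the JobManager hunks above. A usage sketch; the GatewayConfig mutators are assumed names, since they are not shown in this diff:
// Sketch: Gateway.build resolves the concrete gateway via the SPI file extended below.
GatewayConfig gatewayConfig = new GatewayConfig();
gatewayConfig.setType(GatewayType.KUBERNETES_APPLICATION);                  // assumed setter
gatewayConfig.getAppConfig().setUserJarPath("local:///root/dlink-app.jar"); // assumed setter; local:// is Flink's in-image jar scheme
GatewayResult result = Gateway.build(gatewayConfig).submitJar();
String clusterId = result.getAppId(); // KubernetesResult exposes the cluster id as getAppId()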
package com.dlink.gateway.kubernetes;
import com.dlink.assertion.Asserts;
import com.dlink.gateway.AbstractGateway;
import com.dlink.gateway.config.ActionType;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.gateway.exception.GatewayException;
import com.dlink.gateway.model.JobInfo;
import com.dlink.gateway.result.SavePointResult;
import com.dlink.gateway.result.TestResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.kubernetes.KubernetesClusterClientFactory;
import org.apache.flink.kubernetes.KubernetesClusterDescriptor;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import java.util.*;
import java.util.concurrent.CompletableFuture;
/**
* KubernetesGateway
*
* @author wenmo
* @since 2021/12/26 14:09
*/
public abstract class KubernetesGateway extends AbstractGateway {
protected FlinkKubeClient client;
public KubernetesGateway() {
}
public KubernetesGateway(GatewayConfig config) {
super(config);
}
public void init(){
initConfig();
initKubeClient();
}
private void initConfig(){
configuration = GlobalConfiguration.loadConfiguration(config.getClusterConfig().getFlinkConfigPath());
if(Asserts.isNotNull(config.getFlinkConfig().getConfiguration())) {
addConfigParas(config.getFlinkConfig().getConfiguration());
}
configuration.set(DeploymentOptions.TARGET, getType().getLongValue());
if(Asserts.isNotNullString(config.getFlinkConfig().getSavePoint())) {
configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, config.getFlinkConfig().getSavePoint());
}
// Note: the two options below are Yarn-specific ("yarn.provided.lib.dirs",
// "yarn.application.name") carried over from the Yarn gateway; the Kubernetes
// deployer ignores them and takes its name from kubernetes.cluster-id instead.
configuration.set(YarnConfigOptions.PROVIDED_LIB_DIRS, Collections.singletonList(config.getClusterConfig().getFlinkLibPath()));
if(Asserts.isNotNullString(config.getFlinkConfig().getJobName())) {
configuration.set(YarnConfigOptions.APPLICATION_NAME, config.getFlinkConfig().getJobName());
}
}
private void initKubeClient(){
client = FlinkKubeClientFactory.getInstance().fromConfiguration(configuration, "client");
}
private void addConfigParas(Map<String, String> configMap){
if(Asserts.isNotNull(configMap)) {
for (Map.Entry<String, String> entry : configMap.entrySet()) {
this.configuration.setString(entry.getKey(), entry.getValue());
}
}
}
public SavePointResult savepointCluster(){
return savepointCluster(null);
}
public SavePointResult savepointCluster(String savePoint){
if(Asserts.isNull(client)){
init();
}
SavePointResult result = SavePointResult.build(getType());
configuration.set(KubernetesConfigOptions.CLUSTER_ID, config.getClusterConfig().getAppId());
KubernetesClusterClientFactory clusterClientFactory = new KubernetesClusterClientFactory();
String clusterId = clusterClientFactory.getClusterId(configuration);
if (Asserts.isNull(clusterId)){
throw new GatewayException("No cluster id was specified. Please specify a cluster to which you would like to connect.");
}
KubernetesClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration);
try(ClusterClient<String> clusterClient = clusterDescriptor.retrieve(
clusterId).getClusterClient()){
List<JobInfo> jobInfos = new ArrayList<>();
CompletableFuture<Collection<JobStatusMessage>> listJobsFuture = clusterClient.listJobs();
for( JobStatusMessage jobStatusMessage: listJobsFuture.get()){
JobInfo jobInfo = new JobInfo(jobStatusMessage.getJobId().toHexString());
jobInfo.setStatus(JobInfo.JobStatus.RUN);
jobInfos.add(jobInfo);
}
runSavePointJob(jobInfos,clusterClient,savePoint);
result.setJobInfos(jobInfos);
}catch (Exception e){
e.printStackTrace();
logger.error(e.getMessage());
result.fail(e.getMessage());
}
return result;
}
public SavePointResult savepointJob(){
return savepointJob(null);
}
public SavePointResult savepointJob(String savePoint){
if(Asserts.isNull(client)){
init();
}
if(Asserts.isNull(config.getFlinkConfig().getJobId())){
throw new GatewayException("No job id was specified. Please specify a job to which you would like to savepont.");
}
// Reload the Flink configuration; the Yarn config path used by the Yarn gateway does not apply on Kubernetes.
if(Asserts.isNotNullString(config.getClusterConfig().getFlinkConfigPath())) {
configuration = GlobalConfiguration.loadConfiguration(config.getClusterConfig().getFlinkConfigPath());
}else {
configuration = new Configuration();
}
SavePointResult result = SavePointResult.build(getType());
configuration.set(KubernetesConfigOptions.CLUSTER_ID, config.getClusterConfig().getAppId());
KubernetesClusterClientFactory clusterClientFactory = new KubernetesClusterClientFactory();
String clusterId = clusterClientFactory.getClusterId(configuration);
if (Asserts.isNull(clusterId)){
throw new GatewayException("No cluster id was specified. Please specify a cluster to which you would like to connect.");
}
KubernetesClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration);
try(ClusterClient<String> clusterClient = clusterDescriptor.retrieve(clusterId).getClusterClient()){
List<JobInfo> jobInfos = new ArrayList<>();
jobInfos.add(new JobInfo(config.getFlinkConfig().getJobId(),JobInfo.JobStatus.FAIL));
runSavePointJob(jobInfos,clusterClient,savePoint);
result.setJobInfos(jobInfos);
}catch (Exception e){
e.printStackTrace();
logger.error(e.getMessage());
result.fail(e.getMessage());
}
return result;
}
private void runSavePointJob(List<JobInfo> jobInfos,ClusterClient<String> clusterClient,String savePoint) throws Exception{
for( JobInfo jobInfo: jobInfos){
if(ActionType.CANCEL== config.getFlinkConfig().getAction()){
clusterClient.cancel(JobID.fromHexString(jobInfo.getJobId()));
jobInfo.setStatus(JobInfo.JobStatus.CANCEL);
continue;
}
switch (config.getFlinkConfig().getSavePointType()){
case TRIGGER:
CompletableFuture<String> triggerFuture = clusterClient.triggerSavepoint(JobID.fromHexString(jobInfo.getJobId()), savePoint);
jobInfo.setSavePoint(triggerFuture.get());
break;
case STOP:
CompletableFuture<String> stopFuture = clusterClient.stopWithSavepoint(JobID.fromHexString(jobInfo.getJobId()), true, savePoint);
jobInfo.setStatus(JobInfo.JobStatus.STOP);
jobInfo.setSavePoint(stopFuture.get());
break;
case CANCEL:
CompletableFuture<String> cancelFuture = clusterClient.cancelWithSavepoint(JobID.fromHexString(jobInfo.getJobId()), savePoint);
jobInfo.setStatus(JobInfo.JobStatus.CANCEL);
jobInfo.setSavePoint(cancelFuture.get());
break;
default:
}
}
}
public TestResult test(){
try {
initConfig();
}catch (Exception e){
logger.error("测试 Flink 配置失败:"+e.getMessage());
return TestResult.fail("测试 Flink 配置失败:"+e.getMessage());
}
try {
initKubeClient();
logger.info("配置连接测试成功");
return TestResult.success();
}catch (Exception e){
logger.error("测试 Kubernetes 配置失败:"+e.getMessage());
return TestResult.fail("测试 Kubernetes 配置失败:"+e.getMessage());
}
}
}
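A savepoint round-trip against a running Kubernetes cluster then looks roughly as follows; the configuration mutators and result getters are assumed names, while the control flow mirrors savepointJob above:
// Sketch: gatewayConfig must carry the kubernetes.cluster-id (clusterConfig.appId),
// the target job id, and a SavePointType of TRIGGER/STOP/CANCEL.
KubernetesGateway gateway = new KubernetesApplicationGateway();
gateway.setConfig(gatewayConfig);                                          // assumed setter on AbstractGateway
SavePointResult result = gateway.savepointJob("hdfs:///flink/savepoints"); // hypothetical savepoint path
result.getJobInfos().forEach(info -> System.out.println(info.getSavePoint())); // assumed getters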
package com.dlink.gateway.result;
import com.dlink.gateway.GatewayType;
import java.time.LocalDateTime;
/**
* KubernetesResult
*
* @author wenmo
* @since 2021/12/26 15:06
*/
public class KubernetesResult extends AbstractGatewayResult {
private String clusterId;
private String webURL;
public KubernetesResult(GatewayType type, LocalDateTime startTime) {
super(type, startTime);
}
public KubernetesResult(String clusterId, LocalDateTime startTime, LocalDateTime endTime, boolean isSuccess, String exceptionMsg) {
super(startTime, endTime, isSuccess, exceptionMsg);
this.clusterId = clusterId;
}
public String getClusterId() {
return clusterId;
}
@Override
public String getAppId() {
return clusterId;
}
public void setClusterId(String clusterId) {
this.clusterId = clusterId;
}
public void setWebURL(String webURL) {
this.webURL = webURL;
}
public String getWebURL() {
return webURL;
}
public static KubernetesResult build(GatewayType type){
return new KubernetesResult(type,LocalDateTime.now());
}
}
......@@ -12,8 +12,6 @@ import java.time.LocalDateTime;
* @author wenmo
* @since 2021/10/29
**/
@Getter
@Setter
public class YarnResult extends AbstractGatewayResult {
private String appId;
......@@ -36,6 +34,14 @@ public class YarnResult extends AbstractGatewayResult {
return webURL;
}
public void setAppId(String appId) {
this.appId = appId;
}
public void setWebURL(String webURL) {
this.webURL = webURL;
}
public static YarnResult build(GatewayType type){
return new YarnResult(type,LocalDateTime.now());
}
......
package com.dlink.gateway.yarn;
import com.dlink.assertion.Asserts;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.gateway.GatewayType;
import com.dlink.gateway.config.AppConfig;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.gateway.exception.GatewayException;
import com.dlink.gateway.result.GatewayResult;
import com.dlink.gateway.result.YarnResult;
......@@ -69,6 +69,8 @@ public class YarnApplicationGateway extends YarnGateway {
e.printStackTrace();
logger.error(e.getMessage());
result.fail(e.getMessage());
}finally {
yarnClusterDescriptor.close();
}
return result;
}
......
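The added finally block releases the descriptor even when the deploy fails. Since Flink's ClusterDescriptor implements AutoCloseable, the same guarantee could be written as try-with-resources; a sketch with the surrounding logic elided:
// Equivalent shape: the descriptor closes itself on every exit path.
try (YarnClusterDescriptor yarnClusterDescriptor = createClusterDescriptor()) { // assumed local factory
    // ... deployApplicationCluster(...) and result handling as in the hunk above ...
}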
com.dlink.gateway.yarn.YarnApplicationGateway
com.dlink.gateway.yarn.YarnPerJobGateway
com.dlink.gateway.kubernetes.KubernetesApplicationGateway
\ No newline at end of file
......@@ -108,9 +108,11 @@ const StudioSetting = (props: any) => {
<Option value={RUN_MODE.YARN_SESSION}>Yarn Session</Option>
<Option value={RUN_MODE.YARN_PER_JOB}>Yarn Per-Job</Option>
<Option value={RUN_MODE.YARN_APPLICATION}>Yarn Application</Option>
<Option value={RUN_MODE.KUBERNETES_SESSION}>Kubernetes Session</Option>
<Option value={RUN_MODE.KUBERNETES_APPLICATION}>Kubernetes Application</Option>
</Select>
</Form.Item>
{(current.task.type === RUN_MODE.YARN_SESSION || current.task.type === RUN_MODE.STANDALONE) ? (
{(current.task.type === RUN_MODE.YARN_SESSION || current.task.type === RUN_MODE.KUBERNETES_SESSION || current.task.type === RUN_MODE.STANDALONE) ? (
<Row>
<Col span={24}>
<Form.Item label="Flink集群" tooltip={`选择Flink集群进行 ${current.task.type} 模式的远程提交任务`} name="clusterId"
......@@ -133,7 +135,7 @@ const StudioSetting = (props: any) => {
</Form.Item>
</Col>
</Row>) : undefined}
{(current.task.type === RUN_MODE.YARN_PER_JOB || current.task.type === RUN_MODE.YARN_APPLICATION) ? (
{(current.task.type === RUN_MODE.YARN_PER_JOB || current.task.type === RUN_MODE.YARN_APPLICATION || current.task.type === RUN_MODE.KUBERNETES_APPLICATION) ? (
<Row>
<Col span={24}>
<Form.Item label="Flink集群配置" tooltip={`选择Flink集群配置进行 ${current.task.type} 模式的远程提交任务`}
......@@ -149,7 +151,7 @@ const StudioSetting = (props: any) => {
</Form.Item>
</Col>
</Row>) : undefined}
{(current.task.type === RUN_MODE.YARN_APPLICATION) ? (
{(current.task.type === RUN_MODE.YARN_APPLICATION || current.task.type === RUN_MODE.KUBERNETES_APPLICATION) ? (
<Row>
<Col span={24}>
<Form.Item label="可执行 Jar"
......
......@@ -4,6 +4,8 @@ export const RUN_MODE = {
YARN_SESSION:'yarn-session',
YARN_PER_JOB:'yarn-per-job',
YARN_APPLICATION:'yarn-application',
KUBERNETES_SESSION:'kubernetes-session',
KUBERNETES_APPLICATION:'kubernetes-application',
};
export const DIALECT = {
......
......@@ -76,6 +76,7 @@ const ClusterConfigurationForm: React.FC<ClusterConfigurationFormProps> = (props
>
<Select defaultValue="Yarn" value="Yarn">
<Option value="Yarn">Flink On Yarn</Option>
<Option value="Kubernetes">Flink On Kubernetes</Option>
</Select>
</Form.Item>
<Divider>Hadoop 配置</Divider>
......
......@@ -505,6 +505,9 @@ export default (): React.ReactNode => {
<li>
<Link>解决perjob和application模式的任务名无法自定义的问题</Link>
</li>
<li>
<Link>支持 Kubernetes Session 和 Application 模式提交任务</Link>
</li>
</ul>
</Paragraph>
</Timeline.Item>
......