Unverified Commit 74ec170a authored by aiwenmo's avatar aiwenmo Committed by GitHub

[Fix-503][gateway,client] Compatibility of different versions of the …

[Fix-503][gateway,client] Compatibility of different versions of the …
parents 081f5ca4 905aef3d
package com.dlink.utils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.ObjectIdentifier;
......@@ -8,6 +10,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
/**
* FlinkUtil
......@@ -31,4 +34,16 @@ public class FlinkUtil {
/**
 * Extracts the field (column) names of the given table result.
 *
 * @param tableResult the result whose schema is inspected
 * @return the field names, in schema order
 */
public static List<String> catchColumn(TableResult tableResult) {
    String[] fieldNames = tableResult.getTableSchema().getFieldNames();
    return Arrays.asList(fieldNames);
}
/**
 * Triggers a savepoint for the given running job and blocks until it completes.
 *
 * @param clusterClient client connected to the target Flink cluster
 * @param jobId         the job id as a hex string (parsed via {@link JobID#fromHexString})
 * @param savePoint     target savepoint directory; presumably null falls back to the
 *                      cluster-configured default — confirm against the Flink version in use
 * @return the path of the completed savepoint
 * @throws ExecutionException   if the savepoint operation fails on the cluster
 * @throws InterruptedException if the current thread is interrupted while waiting
 */
// Use a wildcard instead of the raw ClusterClient type; the cluster id type is irrelevant here.
public static String triggerSavepoint(ClusterClient<?> clusterClient, String jobId, String savePoint) throws ExecutionException, InterruptedException {
    return clusterClient.triggerSavepoint(JobID.fromHexString(jobId), savePoint).get().toString();
}
/**
 * Stops the given job with a savepoint and blocks until it completes.
 * The second argument is hard-coded to {@code true} — this looks like the
 * "advance to end of event time" (drain) flag; verify against the Flink version in use.
 *
 * @param clusterClient client connected to the target Flink cluster
 * @param jobId         the job id as a hex string (parsed via {@link JobID#fromHexString})
 * @param savePoint     target savepoint directory
 * @return the path of the completed savepoint
 * @throws ExecutionException   if the stop-with-savepoint operation fails on the cluster
 * @throws InterruptedException if the current thread is interrupted while waiting
 */
// Use a wildcard instead of the raw ClusterClient type; the cluster id type is irrelevant here.
public static String stopWithSavepoint(ClusterClient<?> clusterClient, String jobId, String savePoint) throws ExecutionException, InterruptedException {
    return clusterClient.stopWithSavepoint(JobID.fromHexString(jobId), true, savePoint).get().toString();
}
/**
 * Cancels the given job after taking a savepoint, blocking until completion.
 *
 * @param clusterClient client connected to the target Flink cluster
 * @param jobId         the job id as a hex string (parsed via {@link JobID#fromHexString})
 * @param savePoint     target savepoint directory
 * @return the path of the completed savepoint
 * @throws ExecutionException   if the cancel-with-savepoint operation fails on the cluster
 * @throws InterruptedException if the current thread is interrupted while waiting
 */
// Use a wildcard instead of the raw ClusterClient type; the cluster id type is irrelevant here.
public static String cancelWithSavepoint(ClusterClient<?> clusterClient, String jobId, String savePoint) throws ExecutionException, InterruptedException {
    return clusterClient.cancelWithSavepoint(JobID.fromHexString(jobId), savePoint).get().toString();
}
}
package com.dlink.utils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.ObjectIdentifier;
......@@ -8,6 +10,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
/**
* FlinkUtil
......@@ -31,4 +34,16 @@ public class FlinkUtil {
/**
 * Extracts the field (column) names of the given table result.
 *
 * @param tableResult the result whose schema is inspected
 * @return the field names, in schema order
 */
public static List<String> catchColumn(TableResult tableResult) {
    String[] fieldNames = tableResult.getTableSchema().getFieldNames();
    return Arrays.asList(fieldNames);
}
/**
 * Triggers a savepoint for the given running job and blocks until it completes.
 *
 * @param clusterClient client connected to the target Flink cluster
 * @param jobId         the job id as a hex string (parsed via {@link JobID#fromHexString})
 * @param savePoint     target savepoint directory; presumably null falls back to the
 *                      cluster-configured default — confirm against the Flink version in use
 * @return the path of the completed savepoint
 * @throws ExecutionException   if the savepoint operation fails on the cluster
 * @throws InterruptedException if the current thread is interrupted while waiting
 */
// Use a wildcard instead of the raw ClusterClient type; the cluster id type is irrelevant here.
public static String triggerSavepoint(ClusterClient<?> clusterClient, String jobId, String savePoint) throws ExecutionException, InterruptedException {
    return clusterClient.triggerSavepoint(JobID.fromHexString(jobId), savePoint).get().toString();
}
/**
 * Stops the given job with a savepoint and blocks until it completes.
 * The second argument is hard-coded to {@code true} — this looks like the
 * "advance to end of event time" (drain) flag; verify against the Flink version in use.
 *
 * @param clusterClient client connected to the target Flink cluster
 * @param jobId         the job id as a hex string (parsed via {@link JobID#fromHexString})
 * @param savePoint     target savepoint directory
 * @return the path of the completed savepoint
 * @throws ExecutionException   if the stop-with-savepoint operation fails on the cluster
 * @throws InterruptedException if the current thread is interrupted while waiting
 */
// Use a wildcard instead of the raw ClusterClient type; the cluster id type is irrelevant here.
public static String stopWithSavepoint(ClusterClient<?> clusterClient, String jobId, String savePoint) throws ExecutionException, InterruptedException {
    return clusterClient.stopWithSavepoint(JobID.fromHexString(jobId), true, savePoint).get().toString();
}
/**
 * Cancels the given job after taking a savepoint, blocking until completion.
 *
 * @param clusterClient client connected to the target Flink cluster
 * @param jobId         the job id as a hex string (parsed via {@link JobID#fromHexString})
 * @param savePoint     target savepoint directory
 * @return the path of the completed savepoint
 * @throws ExecutionException   if the cancel-with-savepoint operation fails on the cluster
 * @throws InterruptedException if the current thread is interrupted while waiting
 */
// Use a wildcard instead of the raw ClusterClient type; the cluster id type is irrelevant here.
public static String cancelWithSavepoint(ClusterClient<?> clusterClient, String jobId, String savePoint) throws ExecutionException, InterruptedException {
    return clusterClient.cancelWithSavepoint(JobID.fromHexString(jobId), savePoint).get().toString();
}
}
package com.dlink.utils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.ObjectIdentifier;
......@@ -7,6 +9,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
/**
* FlinkUtil
......@@ -30,4 +33,16 @@ public class FlinkUtil {
/**
 * Extracts the column names of the given table result.
 *
 * @param tableResult the result whose resolved schema is inspected
 * @return the column names, in schema order
 */
public static List<String> catchColumn(TableResult tableResult) {
    List<String> columnNames = tableResult.getResolvedSchema().getColumnNames();
    return columnNames;
}
/**
 * Triggers a savepoint for the given running job and blocks until it completes.
 *
 * @param clusterClient client connected to the target Flink cluster
 * @param jobId         the job id as a hex string (parsed via {@link JobID#fromHexString})
 * @param savePoint     target savepoint directory; presumably null falls back to the
 *                      cluster-configured default — confirm against the Flink version in use
 * @return the path of the completed savepoint
 * @throws ExecutionException   if the savepoint operation fails on the cluster
 * @throws InterruptedException if the current thread is interrupted while waiting
 */
// Use a wildcard instead of the raw ClusterClient type; the cluster id type is irrelevant here.
public static String triggerSavepoint(ClusterClient<?> clusterClient, String jobId, String savePoint) throws ExecutionException, InterruptedException {
    return clusterClient.triggerSavepoint(JobID.fromHexString(jobId), savePoint).get().toString();
}
/**
 * Stops the given job with a savepoint and blocks until it completes.
 * The second argument is hard-coded to {@code true} — this looks like the
 * "advance to end of event time" (drain) flag; verify against the Flink version in use.
 *
 * @param clusterClient client connected to the target Flink cluster
 * @param jobId         the job id as a hex string (parsed via {@link JobID#fromHexString})
 * @param savePoint     target savepoint directory
 * @return the path of the completed savepoint
 * @throws ExecutionException   if the stop-with-savepoint operation fails on the cluster
 * @throws InterruptedException if the current thread is interrupted while waiting
 */
// Use a wildcard instead of the raw ClusterClient type; the cluster id type is irrelevant here.
public static String stopWithSavepoint(ClusterClient<?> clusterClient, String jobId, String savePoint) throws ExecutionException, InterruptedException {
    return clusterClient.stopWithSavepoint(JobID.fromHexString(jobId), true, savePoint).get().toString();
}
/**
 * Cancels the given job after taking a savepoint, blocking until completion.
 *
 * @param clusterClient client connected to the target Flink cluster
 * @param jobId         the job id as a hex string (parsed via {@link JobID#fromHexString})
 * @param savePoint     target savepoint directory
 * @return the path of the completed savepoint
 * @throws ExecutionException   if the cancel-with-savepoint operation fails on the cluster
 * @throws InterruptedException if the current thread is interrupted while waiting
 */
// Use a wildcard instead of the raw ClusterClient type; the cluster id type is irrelevant here.
public static String cancelWithSavepoint(ClusterClient<?> clusterClient, String jobId, String savePoint) throws ExecutionException, InterruptedException {
    return clusterClient.cancelWithSavepoint(JobID.fromHexString(jobId), savePoint).get().toString();
}
}
package com.dlink.utils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.ObjectIdentifier;
......@@ -7,6 +9,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
/**
* FlinkUtil
......@@ -27,8 +30,19 @@ public class FlinkUtil {
}
}
/**
 * Extracts the column names of the given table result.
 *
 * @param tableResult the result whose resolved schema is inspected
 * @return the column names, in schema order
 */
public static List<String> catchColumn(TableResult tableResult) {
    List<String> columnNames = tableResult.getResolvedSchema().getColumnNames();
    return columnNames;
}
/**
 * Triggers a savepoint for the given running job and blocks until it completes.
 *
 * @param clusterClient client connected to the target Flink cluster
 * @param jobId         the job id as a hex string (parsed via {@link JobID#fromHexString})
 * @param savePoint     target savepoint directory; presumably null falls back to the
 *                      cluster-configured default — confirm against the Flink version in use
 * @return the path of the completed savepoint
 * @throws ExecutionException   if the savepoint operation fails on the cluster
 * @throws InterruptedException if the current thread is interrupted while waiting
 */
// Use a wildcard instead of the raw ClusterClient type; the cluster id type is irrelevant here.
public static String triggerSavepoint(ClusterClient<?> clusterClient, String jobId, String savePoint) throws ExecutionException, InterruptedException {
    return clusterClient.triggerSavepoint(JobID.fromHexString(jobId), savePoint).get().toString();
}
/**
 * Stops the given job with a savepoint and blocks until it completes.
 * The second argument is hard-coded to {@code true} — this looks like the
 * "advance to end of event time" (drain) flag; verify against the Flink version in use.
 *
 * @param clusterClient client connected to the target Flink cluster
 * @param jobId         the job id as a hex string (parsed via {@link JobID#fromHexString})
 * @param savePoint     target savepoint directory
 * @return the path of the completed savepoint
 * @throws ExecutionException   if the stop-with-savepoint operation fails on the cluster
 * @throws InterruptedException if the current thread is interrupted while waiting
 */
// Use a wildcard instead of the raw ClusterClient type; the cluster id type is irrelevant here.
public static String stopWithSavepoint(ClusterClient<?> clusterClient, String jobId, String savePoint) throws ExecutionException, InterruptedException {
    return clusterClient.stopWithSavepoint(JobID.fromHexString(jobId), true, savePoint).get().toString();
}
/**
 * Cancels the given job after taking a savepoint, blocking until completion.
 *
 * @param clusterClient client connected to the target Flink cluster
 * @param jobId         the job id as a hex string (parsed via {@link JobID#fromHexString})
 * @param savePoint     target savepoint directory
 * @return the path of the completed savepoint
 * @throws ExecutionException   if the cancel-with-savepoint operation fails on the cluster
 * @throws InterruptedException if the current thread is interrupted while waiting
 */
// Use a wildcard instead of the raw ClusterClient type; the cluster id type is irrelevant here.
public static String cancelWithSavepoint(ClusterClient<?> clusterClient, String jobId, String savePoint) throws ExecutionException, InterruptedException {
    return clusterClient.cancelWithSavepoint(JobID.fromHexString(jobId), savePoint).get().toString();
}
}
package com.dlink.utils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.ContextResolvedTable;
......@@ -8,6 +11,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
/**
* FlinkUtil
......@@ -19,7 +23,7 @@ public class FlinkUtil {
public static List<String> getFieldNamesFromCatalogManager(CatalogManager catalogManager, String catalog, String database, String table) {
Optional<ContextResolvedTable> tableOpt = catalogManager.getTable(
ObjectIdentifier.of(catalog, database, table)
ObjectIdentifier.of(catalog, database, table)
);
if (tableOpt.isPresent()) {
return tableOpt.get().getResolvedSchema().getColumnNames();
......@@ -32,4 +36,16 @@ public class FlinkUtil {
/**
 * Extracts the column names of the given table result.
 *
 * @param tableResult the result whose resolved schema is inspected
 * @return the column names, in schema order
 */
public static List<String> catchColumn(TableResult tableResult) {
    List<String> columnNames = tableResult.getResolvedSchema().getColumnNames();
    return columnNames;
}
/**
 * Triggers a savepoint (in the default savepoint format) for the given running job
 * and blocks until it completes.
 *
 * @param clusterClient client connected to the target Flink cluster
 * @param jobId         the job id as a hex string (parsed via {@link JobID#fromHexString})
 * @param savePoint     target savepoint directory; presumably null falls back to the
 *                      cluster-configured default — confirm against the Flink version in use
 * @return the path of the completed savepoint
 * @throws ExecutionException   if the savepoint operation fails on the cluster
 * @throws InterruptedException if the current thread is interrupted while waiting
 */
// Use a wildcard instead of the raw ClusterClient type; the cluster id type is irrelevant here.
public static String triggerSavepoint(ClusterClient<?> clusterClient, String jobId, String savePoint) throws ExecutionException, InterruptedException {
    return clusterClient.triggerSavepoint(JobID.fromHexString(jobId), savePoint, SavepointFormatType.DEFAULT).get().toString();
}
/**
 * Stops the given job with a savepoint (in the default savepoint format) and blocks
 * until it completes. The second argument is hard-coded to {@code true} — this looks
 * like the "advance to end of event time" (drain) flag; verify against the Flink
 * version in use.
 *
 * @param clusterClient client connected to the target Flink cluster
 * @param jobId         the job id as a hex string (parsed via {@link JobID#fromHexString})
 * @param savePoint     target savepoint directory
 * @return the path of the completed savepoint
 * @throws ExecutionException   if the stop-with-savepoint operation fails on the cluster
 * @throws InterruptedException if the current thread is interrupted while waiting
 */
// Use a wildcard instead of the raw ClusterClient type; the cluster id type is irrelevant here.
public static String stopWithSavepoint(ClusterClient<?> clusterClient, String jobId, String savePoint) throws ExecutionException, InterruptedException {
    return clusterClient.stopWithSavepoint(JobID.fromHexString(jobId), true, savePoint, SavepointFormatType.DEFAULT).get().toString();
}
/**
 * Cancels the given job after taking a savepoint (in the default savepoint format),
 * blocking until completion.
 *
 * @param clusterClient client connected to the target Flink cluster
 * @param jobId         the job id as a hex string (parsed via {@link JobID#fromHexString})
 * @param savePoint     target savepoint directory
 * @return the path of the completed savepoint
 * @throws ExecutionException   if the cancel-with-savepoint operation fails on the cluster
 * @throws InterruptedException if the current thread is interrupted while waiting
 */
// Use a wildcard instead of the raw ClusterClient type; the cluster id type is irrelevant here.
public static String cancelWithSavepoint(ClusterClient<?> clusterClient, String jobId, String savePoint) throws ExecutionException, InterruptedException {
    return clusterClient.cancelWithSavepoint(JobID.fromHexString(jobId), savePoint, SavepointFormatType.DEFAULT).get().toString();
}
}
......@@ -8,6 +8,7 @@ import com.dlink.gateway.exception.GatewayException;
import com.dlink.gateway.model.JobInfo;
import com.dlink.gateway.result.SavePointResult;
import com.dlink.gateway.result.TestResult;
import com.dlink.utils.FlinkUtil;
import com.dlink.utils.LogUtil;
import org.apache.flink.api.common.JobID;
......@@ -163,18 +164,15 @@ public abstract class KubernetesGateway extends AbstractGateway {
}
switch (config.getFlinkConfig().getSavePointType()) {
case TRIGGER:
CompletableFuture<String> triggerFuture = clusterClient.triggerSavepoint(JobID.fromHexString(jobInfo.getJobId()), savePoint);
jobInfo.setSavePoint(triggerFuture.get());
jobInfo.setSavePoint(FlinkUtil.triggerSavepoint(clusterClient,jobInfo.getJobId(),savePoint));
break;
case STOP:
CompletableFuture<String> stopFuture = clusterClient.stopWithSavepoint(JobID.fromHexString(jobInfo.getJobId()), true, savePoint);
jobInfo.setSavePoint(FlinkUtil.stopWithSavepoint(clusterClient,jobInfo.getJobId(),savePoint));
jobInfo.setStatus(JobInfo.JobStatus.STOP);
jobInfo.setSavePoint(stopFuture.get());
break;
case CANCEL:
CompletableFuture<String> cancelFuture = clusterClient.cancelWithSavepoint(JobID.fromHexString(jobInfo.getJobId()), savePoint);
jobInfo.setSavePoint(FlinkUtil.cancelWithSavepoint(clusterClient,jobInfo.getJobId(),savePoint));
jobInfo.setStatus(JobInfo.JobStatus.CANCEL);
jobInfo.setSavePoint(cancelFuture.get());
break;
default:
}
......
package com.dlink.gateway.yarn;
import com.dlink.utils.FlinkUtil;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.deployment.ClusterRetrieveException;
import org.apache.flink.client.program.ClusterClient;
......@@ -238,18 +239,15 @@ public abstract class YarnGateway extends AbstractGateway {
}
switch (config.getFlinkConfig().getSavePointType()) {
case TRIGGER:
CompletableFuture<String> triggerFuture = clusterClient.triggerSavepoint(JobID.fromHexString(jobInfo.getJobId()), savePoint);
jobInfo.setSavePoint(triggerFuture.get());
jobInfo.setSavePoint(FlinkUtil.triggerSavepoint(clusterClient,jobInfo.getJobId(),savePoint));
break;
case STOP:
CompletableFuture<String> stopFuture = clusterClient.stopWithSavepoint(JobID.fromHexString(jobInfo.getJobId()), true, savePoint);
jobInfo.setSavePoint(FlinkUtil.stopWithSavepoint(clusterClient,jobInfo.getJobId(),savePoint));
jobInfo.setStatus(JobInfo.JobStatus.STOP);
jobInfo.setSavePoint(stopFuture.get());
break;
case CANCEL:
CompletableFuture<String> cancelFuture = clusterClient.cancelWithSavepoint(JobID.fromHexString(jobInfo.getJobId()), savePoint);
jobInfo.setSavePoint(FlinkUtil.cancelWithSavepoint(clusterClient,jobInfo.getJobId(),savePoint));
jobInfo.setStatus(JobInfo.JobStatus.CANCEL);
jobInfo.setSavePoint(cancelFuture.get());
break;
default:
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment