Commit 905aef3d authored by godkaikai's avatar godkaikai

[Fix-503][gateway,client] Compatibility of different versions of the interface ClusterClient

parent 081f5ca4
package com.dlink.utils; package com.dlink.utils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ObjectIdentifier;
...@@ -8,6 +10,7 @@ import java.util.ArrayList; ...@@ -8,6 +10,7 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ExecutionException;
/** /**
* FlinkUtil * FlinkUtil
...@@ -31,4 +34,16 @@ public class FlinkUtil { ...@@ -31,4 +34,16 @@ public class FlinkUtil {
public static List<String> catchColumn(TableResult tableResult) { public static List<String> catchColumn(TableResult tableResult) {
return Arrays.asList(tableResult.getTableSchema().getFieldNames()); return Arrays.asList(tableResult.getTableSchema().getFieldNames());
} }
public static String triggerSavepoint(ClusterClient clusterClient, String jobId, String savePoint) throws ExecutionException, InterruptedException {
return clusterClient.triggerSavepoint(JobID.fromHexString(jobId), savePoint).get().toString();
}
public static String stopWithSavepoint(ClusterClient clusterClient, String jobId, String savePoint) throws ExecutionException, InterruptedException {
return clusterClient.stopWithSavepoint(JobID.fromHexString(jobId), true, savePoint).get().toString();
}
public static String cancelWithSavepoint(ClusterClient clusterClient, String jobId, String savePoint) throws ExecutionException, InterruptedException {
return clusterClient.cancelWithSavepoint(JobID.fromHexString(jobId), savePoint).get().toString();
}
} }
package com.dlink.utils; package com.dlink.utils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ObjectIdentifier;
...@@ -8,6 +10,7 @@ import java.util.ArrayList; ...@@ -8,6 +10,7 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ExecutionException;
/** /**
* FlinkUtil * FlinkUtil
...@@ -31,4 +34,16 @@ public class FlinkUtil { ...@@ -31,4 +34,16 @@ public class FlinkUtil {
public static List<String> catchColumn(TableResult tableResult) { public static List<String> catchColumn(TableResult tableResult) {
return Arrays.asList(tableResult.getTableSchema().getFieldNames()); return Arrays.asList(tableResult.getTableSchema().getFieldNames());
} }
public static String triggerSavepoint(ClusterClient clusterClient, String jobId, String savePoint) throws ExecutionException, InterruptedException {
return clusterClient.triggerSavepoint(JobID.fromHexString(jobId), savePoint).get().toString();
}
public static String stopWithSavepoint(ClusterClient clusterClient, String jobId, String savePoint) throws ExecutionException, InterruptedException {
return clusterClient.stopWithSavepoint(JobID.fromHexString(jobId), true, savePoint).get().toString();
}
public static String cancelWithSavepoint(ClusterClient clusterClient, String jobId, String savePoint) throws ExecutionException, InterruptedException {
return clusterClient.cancelWithSavepoint(JobID.fromHexString(jobId), savePoint).get().toString();
}
} }
package com.dlink.utils; package com.dlink.utils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ObjectIdentifier;
...@@ -7,6 +9,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier; ...@@ -7,6 +9,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ExecutionException;
/** /**
* FlinkUtil * FlinkUtil
...@@ -30,4 +33,16 @@ public class FlinkUtil { ...@@ -30,4 +33,16 @@ public class FlinkUtil {
public static List<String> catchColumn(TableResult tableResult) { public static List<String> catchColumn(TableResult tableResult) {
return tableResult.getResolvedSchema().getColumnNames(); return tableResult.getResolvedSchema().getColumnNames();
} }
public static String triggerSavepoint(ClusterClient clusterClient, String jobId, String savePoint) throws ExecutionException, InterruptedException {
return clusterClient.triggerSavepoint(JobID.fromHexString(jobId), savePoint).get().toString();
}
public static String stopWithSavepoint(ClusterClient clusterClient, String jobId, String savePoint) throws ExecutionException, InterruptedException {
return clusterClient.stopWithSavepoint(JobID.fromHexString(jobId), true, savePoint).get().toString();
}
public static String cancelWithSavepoint(ClusterClient clusterClient, String jobId, String savePoint) throws ExecutionException, InterruptedException {
return clusterClient.cancelWithSavepoint(JobID.fromHexString(jobId), savePoint).get().toString();
}
} }
package com.dlink.utils; package com.dlink.utils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ObjectIdentifier;
...@@ -7,6 +9,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier; ...@@ -7,6 +9,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ExecutionException;
/** /**
* FlinkUtil * FlinkUtil
...@@ -27,8 +30,19 @@ public class FlinkUtil { ...@@ -27,8 +30,19 @@ public class FlinkUtil {
} }
} }
public static List<String> catchColumn(TableResult tableResult) { public static List<String> catchColumn(TableResult tableResult) {
return tableResult.getResolvedSchema().getColumnNames(); return tableResult.getResolvedSchema().getColumnNames();
} }
public static String triggerSavepoint(ClusterClient clusterClient, String jobId, String savePoint) throws ExecutionException, InterruptedException {
return clusterClient.triggerSavepoint(JobID.fromHexString(jobId), savePoint).get().toString();
}
public static String stopWithSavepoint(ClusterClient clusterClient, String jobId, String savePoint) throws ExecutionException, InterruptedException {
return clusterClient.stopWithSavepoint(JobID.fromHexString(jobId), true, savePoint).get().toString();
}
public static String cancelWithSavepoint(ClusterClient clusterClient, String jobId, String savePoint) throws ExecutionException, InterruptedException {
return clusterClient.cancelWithSavepoint(JobID.fromHexString(jobId), savePoint).get().toString();
}
} }
package com.dlink.utils; package com.dlink.utils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.ContextResolvedTable; import org.apache.flink.table.catalog.ContextResolvedTable;
...@@ -8,6 +11,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier; ...@@ -8,6 +11,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ExecutionException;
/** /**
* FlinkUtil * FlinkUtil
...@@ -19,7 +23,7 @@ public class FlinkUtil { ...@@ -19,7 +23,7 @@ public class FlinkUtil {
public static List<String> getFieldNamesFromCatalogManager(CatalogManager catalogManager, String catalog, String database, String table) { public static List<String> getFieldNamesFromCatalogManager(CatalogManager catalogManager, String catalog, String database, String table) {
Optional<ContextResolvedTable> tableOpt = catalogManager.getTable( Optional<ContextResolvedTable> tableOpt = catalogManager.getTable(
ObjectIdentifier.of(catalog, database, table) ObjectIdentifier.of(catalog, database, table)
); );
if (tableOpt.isPresent()) { if (tableOpt.isPresent()) {
return tableOpt.get().getResolvedSchema().getColumnNames(); return tableOpt.get().getResolvedSchema().getColumnNames();
...@@ -32,4 +36,16 @@ public class FlinkUtil { ...@@ -32,4 +36,16 @@ public class FlinkUtil {
public static List<String> catchColumn(TableResult tableResult) { public static List<String> catchColumn(TableResult tableResult) {
return tableResult.getResolvedSchema().getColumnNames(); return tableResult.getResolvedSchema().getColumnNames();
} }
public static String triggerSavepoint(ClusterClient clusterClient, String jobId, String savePoint) throws ExecutionException, InterruptedException {
return clusterClient.triggerSavepoint(JobID.fromHexString(jobId), savePoint, SavepointFormatType.DEFAULT).get().toString();
}
public static String stopWithSavepoint(ClusterClient clusterClient, String jobId, String savePoint) throws ExecutionException, InterruptedException {
return clusterClient.stopWithSavepoint(JobID.fromHexString(jobId), true, savePoint, SavepointFormatType.DEFAULT).get().toString();
}
public static String cancelWithSavepoint(ClusterClient clusterClient, String jobId, String savePoint) throws ExecutionException, InterruptedException {
return clusterClient.cancelWithSavepoint(JobID.fromHexString(jobId), savePoint, SavepointFormatType.DEFAULT).get().toString();
}
} }
...@@ -8,6 +8,7 @@ import com.dlink.gateway.exception.GatewayException; ...@@ -8,6 +8,7 @@ import com.dlink.gateway.exception.GatewayException;
import com.dlink.gateway.model.JobInfo; import com.dlink.gateway.model.JobInfo;
import com.dlink.gateway.result.SavePointResult; import com.dlink.gateway.result.SavePointResult;
import com.dlink.gateway.result.TestResult; import com.dlink.gateway.result.TestResult;
import com.dlink.utils.FlinkUtil;
import com.dlink.utils.LogUtil; import com.dlink.utils.LogUtil;
import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobID;
...@@ -163,18 +164,15 @@ public abstract class KubernetesGateway extends AbstractGateway { ...@@ -163,18 +164,15 @@ public abstract class KubernetesGateway extends AbstractGateway {
} }
switch (config.getFlinkConfig().getSavePointType()) { switch (config.getFlinkConfig().getSavePointType()) {
case TRIGGER: case TRIGGER:
CompletableFuture<String> triggerFuture = clusterClient.triggerSavepoint(JobID.fromHexString(jobInfo.getJobId()), savePoint); jobInfo.setSavePoint(FlinkUtil.triggerSavepoint(clusterClient,jobInfo.getJobId(),savePoint));
jobInfo.setSavePoint(triggerFuture.get());
break; break;
case STOP: case STOP:
CompletableFuture<String> stopFuture = clusterClient.stopWithSavepoint(JobID.fromHexString(jobInfo.getJobId()), true, savePoint); jobInfo.setSavePoint(FlinkUtil.stopWithSavepoint(clusterClient,jobInfo.getJobId(),savePoint));
jobInfo.setStatus(JobInfo.JobStatus.STOP); jobInfo.setStatus(JobInfo.JobStatus.STOP);
jobInfo.setSavePoint(stopFuture.get());
break; break;
case CANCEL: case CANCEL:
CompletableFuture<String> cancelFuture = clusterClient.cancelWithSavepoint(JobID.fromHexString(jobInfo.getJobId()), savePoint); jobInfo.setSavePoint(FlinkUtil.cancelWithSavepoint(clusterClient,jobInfo.getJobId(),savePoint));
jobInfo.setStatus(JobInfo.JobStatus.CANCEL); jobInfo.setStatus(JobInfo.JobStatus.CANCEL);
jobInfo.setSavePoint(cancelFuture.get());
break; break;
default: default:
} }
......
package com.dlink.gateway.yarn; package com.dlink.gateway.yarn;
import com.dlink.utils.FlinkUtil;
import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobID;
import org.apache.flink.client.deployment.ClusterRetrieveException; import org.apache.flink.client.deployment.ClusterRetrieveException;
import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.ClusterClient;
...@@ -238,18 +239,15 @@ public abstract class YarnGateway extends AbstractGateway { ...@@ -238,18 +239,15 @@ public abstract class YarnGateway extends AbstractGateway {
} }
switch (config.getFlinkConfig().getSavePointType()) { switch (config.getFlinkConfig().getSavePointType()) {
case TRIGGER: case TRIGGER:
CompletableFuture<String> triggerFuture = clusterClient.triggerSavepoint(JobID.fromHexString(jobInfo.getJobId()), savePoint); jobInfo.setSavePoint(FlinkUtil.triggerSavepoint(clusterClient,jobInfo.getJobId(),savePoint));
jobInfo.setSavePoint(triggerFuture.get());
break; break;
case STOP: case STOP:
CompletableFuture<String> stopFuture = clusterClient.stopWithSavepoint(JobID.fromHexString(jobInfo.getJobId()), true, savePoint); jobInfo.setSavePoint(FlinkUtil.stopWithSavepoint(clusterClient,jobInfo.getJobId(),savePoint));
jobInfo.setStatus(JobInfo.JobStatus.STOP); jobInfo.setStatus(JobInfo.JobStatus.STOP);
jobInfo.setSavePoint(stopFuture.get());
break; break;
case CANCEL: case CANCEL:
CompletableFuture<String> cancelFuture = clusterClient.cancelWithSavepoint(JobID.fromHexString(jobInfo.getJobId()), savePoint); jobInfo.setSavePoint(FlinkUtil.cancelWithSavepoint(clusterClient,jobInfo.getJobId(),savePoint));
jobInfo.setStatus(JobInfo.JobStatus.CANCEL); jobInfo.setStatus(JobInfo.JobStatus.CANCEL);
jobInfo.setSavePoint(cancelFuture.get());
break; break;
default: default:
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment