Commit fdbab0fa authored by coderTomato's avatar coderTomato

savepoint的restapi实现

parent 26fb0d36
......@@ -4,12 +4,17 @@ import cn.hutool.http.HttpUtil;
import cn.hutool.http.Method;
import com.dlink.constant.FlinkRestAPIConstant;
import com.dlink.constant.NetConstant;
import com.dlink.gateway.GatewayType;
import com.dlink.gateway.config.SavePointType;
import com.dlink.gateway.model.JobInfo;
import com.dlink.gateway.result.SavePointResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* FlinkAPI
......@@ -52,7 +57,6 @@ public class FlinkAPI {
}
private JsonNode patch(String route, String body) {
String res = HttpUtil.createRequest(Method.PATCH,NetConstant.HTTP + address + NetConstant.SLASH + route).timeout(NetConstant.SERVER_TIME_OUT_ACTIVE).body(body).execute().body();
return parse(res);
}
......@@ -74,6 +78,60 @@ public class FlinkAPI {
return true;
}
public SavePointResult savepoints(String jobId, String savePointType){
SavePointType type = SavePointType.get(savePointType);
String paramType = null;
SavePointResult result = SavePointResult.build(GatewayType.YARN_PER_JOB);
JobInfo jobInfo = new JobInfo(jobId);
Map<String, Object> paramMap = new HashMap<>();
switch (type){
case CANCEL:
jobInfo.setStatus(JobInfo.JobStatus.CANCEL);
case STOP:
paramMap.put("drain",false);
paramType = FlinkRestAPIConstant.STOP;
jobInfo.setStatus(JobInfo.JobStatus.STOP);
break;
case TRIGGER:
paramMap.put("cancel-job",false);
//paramMap.put("target-directory","hdfs:///flink13/ss1");
paramType = FlinkRestAPIConstant.SAVEPOINTS;
jobInfo.setStatus(JobInfo.JobStatus.RUN);
}
ObjectMapper mapper = new ObjectMapper();
JsonNode json = null;
try {
String s = mapper.writeValueAsString(paramMap);
json = post(FlinkRestAPIConstant.JOBS + jobId + paramType, s);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String triggerid = json.get("request-id").asText();
while (triggerid != null)
{
try {
JsonNode node = get(FlinkRestAPIConstant.JOBS + jobId + FlinkRestAPIConstant.SAVEPOINTS + NetConstant.SLASH + triggerid);
JsonNode operation = node.get("operation");
String location = operation.get("location").toString();
List<JobInfo> jobInfos = new ArrayList<>();
jobInfo.setSavePoint(location);
jobInfos.add(jobInfo);
result.setJobInfos(jobInfos);
break;
} catch (Exception e) {
e.printStackTrace();
result.fail(e.getMessage());
break;
}
}
return result;
}
public String getVersion() {
JsonNode result = get(FlinkRestAPIConstant.CONFIG);
return result.get("flink-version").asText();
......
......@@ -23,4 +23,12 @@ public interface FlinkRestAPIConstant {
* cancel
*/
String CANCEL = "/yarn-cancel";
/**
* savepoints
*/
String SAVEPOINTS = "/savepoints";
/**
* stop
*/
String STOP = "/stop";
}
......@@ -415,7 +415,7 @@ public class JobManager {
savePointType, null));
return Gateway.build(config.getGatewayConfig()).savepointJob();
} else {
return null;
return FlinkAPI.build(config.getAddress()).savepoints(jobId,savePointType);
}
}
......
package com.dlink.core;
import com.dlink.api.FlinkAPI;
import com.dlink.gateway.result.SavePointResult;
import com.fasterxml.jackson.databind.JsonNode;
import org.junit.Test;
......@@ -14,7 +15,16 @@ import java.util.List;
**/
public class FlinkRestAPITest {
private String address = "192.168.123.157:8081";
//private String address = "192.168.123.157:8081";
private String address = "node02:45659";
@Test
public void savepointTest(){
//JsonNode savepointInfo = FlinkAPI.build(address).getSavepointInfo("602ad9d03b872dba44267432d1a2a3b2","04044589477a973a32e7dd53e1eb20fd");
SavePointResult savepoints = FlinkAPI.build(address).savepoints("243b97597448edbd2e635fc3d25b1064", "trigger");
System.out.println(savepoints.toString());
}
@Test
public void selectTest(){
List<JsonNode> jobs = FlinkAPI.build(address).listJobs();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment