Commit 0bef2d49 authored by wenmo's avatar wenmo

优化 PerJob 和 Application 作业的JID提交检测

parent be150742
...@@ -17,7 +17,7 @@ public class TaskWorker implements Runnable { ...@@ -17,7 +17,7 @@ public class TaskWorker implements Runnable {
@Override @Override
public void run() { public void run() {
log.info("TaskWorker run"); // log.info("TaskWorker run");
while (running) { while (running) {
DaemonTask daemonTask = queue.dequeue(); DaemonTask daemonTask = queue.dequeue();
if (daemonTask != null) { if (daemonTask != null) {
...@@ -31,7 +31,7 @@ public class TaskWorker implements Runnable { ...@@ -31,7 +31,7 @@ public class TaskWorker implements Runnable {
} }
public void shutdown() { public void shutdown() {
log.info(Thread.currentThread().getName() + "TaskWorker shutdown"); // log.info(Thread.currentThread().getName() + "TaskWorker shutdown");
running = false; running = false;
} }
} }
...@@ -53,6 +53,15 @@ public class KubernetesApplicationGateway extends KubernetesGateway { ...@@ -53,6 +53,15 @@ public class KubernetesApplicationGateway extends KubernetesGateway {
ClusterClientProvider<String> clusterClientProvider = kubernetesClusterDescriptor.deployApplicationCluster(clusterSpecification, applicationConfiguration); ClusterClientProvider<String> clusterClientProvider = kubernetesClusterDescriptor.deployApplicationCluster(clusterSpecification, applicationConfiguration);
ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient(); ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient();
Collection<JobStatusMessage> jobStatusMessages = clusterClient.listJobs().get(); Collection<JobStatusMessage> jobStatusMessages = clusterClient.listJobs().get();
int counts = 10;
while (jobStatusMessages.size() == 0 && counts > 0) {
Thread.sleep(1000);
counts--;
jobStatusMessages = clusterClient.listJobs().get();
if (jobStatusMessages.size() > 0) {
break;
}
}
if (jobStatusMessages.size() > 0) { if (jobStatusMessages.size() > 0) {
List<String> jids = new ArrayList<>(); List<String> jids = new ArrayList<>();
for (JobStatusMessage jobStatusMessage : jobStatusMessages) { for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
......
...@@ -8,6 +8,7 @@ import com.dlink.gateway.exception.GatewayException; ...@@ -8,6 +8,7 @@ import com.dlink.gateway.exception.GatewayException;
import com.dlink.gateway.result.GatewayResult; import com.dlink.gateway.result.GatewayResult;
import com.dlink.gateway.result.YarnResult; import com.dlink.gateway.result.YarnResult;
import com.dlink.utils.LogUtil; import com.dlink.utils.LogUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration; import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.ClusterClient;
...@@ -67,12 +68,22 @@ public class YarnApplicationGateway extends YarnGateway { ...@@ -67,12 +68,22 @@ public class YarnApplicationGateway extends YarnGateway {
applicationConfiguration); applicationConfiguration);
ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient(); ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
Collection<JobStatusMessage> jobStatusMessages = clusterClient.listJobs().get(); Collection<JobStatusMessage> jobStatusMessages = clusterClient.listJobs().get();
int counts = 10;
while (jobStatusMessages.size() == 0 && counts > 0) {
Thread.sleep(1000);
counts--;
jobStatusMessages = clusterClient.listJobs().get();
if (jobStatusMessages.size() > 0) {
break;
}
}
if (jobStatusMessages.size() > 0) { if (jobStatusMessages.size() > 0) {
List<String> jids = new ArrayList<>(); List<String> jids = new ArrayList<>();
for (JobStatusMessage jobStatusMessage : jobStatusMessages) { for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
jids.add(jobStatusMessage.getJobId().toHexString()); jids.add(jobStatusMessage.getJobId().toHexString());
} }
result.setJids(jids); result.setJids(jids);
logger.info("JIDS =" + StringUtils.join(jids, ","));
} }
ApplicationId applicationId = clusterClient.getClusterId(); ApplicationId applicationId = clusterClient.getClusterId();
result.setAppId(applicationId.toString()); result.setAppId(applicationId.toString());
......
...@@ -56,6 +56,15 @@ public class YarnPerJobGateway extends YarnGateway { ...@@ -56,6 +56,15 @@ public class YarnPerJobGateway extends YarnGateway {
result.setAppId(applicationId.toString()); result.setAppId(applicationId.toString());
result.setWebURL(clusterClient.getWebInterfaceURL()); result.setWebURL(clusterClient.getWebInterfaceURL());
Collection<JobStatusMessage> jobStatusMessages = clusterClient.listJobs().get(); Collection<JobStatusMessage> jobStatusMessages = clusterClient.listJobs().get();
int counts = 10;
while (jobStatusMessages.size() == 0 && counts > 0) {
Thread.sleep(1000);
counts--;
jobStatusMessages = clusterClient.listJobs().get();
if (jobStatusMessages.size() > 0) {
break;
}
}
if (jobStatusMessages.size() > 0) { if (jobStatusMessages.size() > 0) {
List<String> jids = new ArrayList<>(); List<String> jids = new ArrayList<>();
for (JobStatusMessage jobStatusMessage : jobStatusMessages) { for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
......
...@@ -710,6 +710,9 @@ export default (): React.ReactNode => { ...@@ -710,6 +710,9 @@ export default (): React.ReactNode => {
<li> <li>
<Link>优化 K8S Application 提交配置</Link> <Link>优化 K8S Application 提交配置</Link>
</li> </li>
<li>
<Link>优化 PerJob 和 Application 作业的JID提交检测</Link>
</li>
</ul> </ul>
</Paragraph> </Paragraph>
</Timeline.Item> </Timeline.Item>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment