Skip to content

Commit

Permalink
More robust polling for cluster start
Browse files Browse the repository at this point in the history
- Give up instead of polling endlessly in these conditions:
  - Fail when getCluster response includes "status":"failed"
  - Fail when getCluster returns null or response without "status" field
    three times within 30s
- Decrease frequency of logging while in the polling loop

Change-Id: Ib40732f7095c253b1a1134d9a97e31813e879549
  • Loading branch information
rudi committed Apr 25, 2024
1 parent bb1951f commit 847cdad
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -397,12 +397,11 @@ public void onMessage(String key, String address, Map body, Message message, Con
* }</pre>
*
* @param responseMessage The response from exn-middleware.
* @param appID The application ID, used for logging only.
* @param caller Caller information, used for logging only.
* @return The SAL response as a parsed JsonNode, or a node where {@code
* isMissingNode()} will return true if SAL reported an error.
*/
private static JsonNode extractPayloadFromExnResponse(Map<String, Object> responseMessage, String appID, String caller) {
private static JsonNode extractPayloadFromExnResponse(Map<String, Object> responseMessage, String caller) {
JsonNode response = mapper.valueToTree(responseMessage);
String salRawResponse = response.at("/body").asText(); // it's already a string, asText() is for the type system
JsonNode metadata = response.at("/metaData");
Expand Down Expand Up @@ -511,7 +510,7 @@ public List<NodeCandidate> findNodeCandidatesFromSal(List<Requirement> requireme
return null;
}
Map<String, Object> response = findSalNodeCandidates.sendSync(msg, appID, null, false);
JsonNode payload = extractPayloadFromExnResponse(response, appID, "findNodeCandidatesFromSal");
JsonNode payload = extractPayloadFromExnResponse(response, "findNodeCandidatesFromSal");
if (payload.isMissingNode()) return null;
if (!payload.isArray()) return null;
List<NodeCandidate> candidates = Arrays.asList(mapper.convertValue(payload, NodeCandidate[].class));
Expand Down Expand Up @@ -578,7 +577,7 @@ public boolean defineCluster(String appID, String clusterName, ObjectNode cluste
return false;
}
Map<String, Object> response = defineCluster.sendSync(msg, appID, null, false);
JsonNode payload = extractPayloadFromExnResponse(response, appID, "defineCluster");
JsonNode payload = extractPayloadFromExnResponse(response, "defineCluster");
return payload.asBoolean();
}

Expand All @@ -591,7 +590,7 @@ public boolean defineCluster(String appID, String clusterName, ObjectNode cluste
public JsonNode getCluster(String clusterName) {
Map<String, Object> msg = Map.of("metaData", Map.of("user", "admin", "clusterName", clusterName));
Map<String, Object> response = getCluster.sendSync(msg, clusterName, null, false);
JsonNode payload = extractPayloadFromExnResponse(response, clusterName, "getCluster");
JsonNode payload = extractPayloadFromExnResponse(response, "getCluster");
return payload.isMissingNode() ? null : payload;
}

Expand All @@ -613,7 +612,7 @@ public boolean labelNodes(String appID, String clusterID, JsonNode labels) {
return false;
}
Map<String, Object> response = labelNodes.sendSync(msg, appID, null, false);
JsonNode payload = extractPayloadFromExnResponse(response, appID, "labelNodes");
JsonNode payload = extractPayloadFromExnResponse(response, "labelNodes");
return payload.isMissingNode() ? false : true;
}

Expand All @@ -632,7 +631,7 @@ public boolean deployCluster(String appID, String clusterName) {
Map<String, Object> msg = Map.of("metaData",
Map.of("user", "admin", "clusterName", clusterName));
Map<String, Object> response = deployCluster.sendSync(msg, appID, null, false);
JsonNode payload = extractPayloadFromExnResponse(response, appID, "deployCluster");
JsonNode payload = extractPayloadFromExnResponse(response, "deployCluster");
return payload.asBoolean();
}

Expand Down Expand Up @@ -665,7 +664,7 @@ public long deployApplication(String appID, String clusterName, String appName,
return -1;
}
Map<String, Object> response = deployApplication.sendSync(msg, appID, null, false);
JsonNode payload = extractPayloadFromExnResponse(response, appID, "deployApplication");
JsonNode payload = extractPayloadFromExnResponse(response, "deployApplication");
return payload.asLong();
}

Expand Down Expand Up @@ -694,7 +693,7 @@ public void scaleOut(String appID, String clusterName, ArrayNode nodesToAdd) {
// value from scaleOut is the same as getCluster, but since we have to
// poll for cluster status anyway to make sure the new machines are
// running, we do not return it here.
JsonNode payload = extractPayloadFromExnResponse(response, appID, "scaleOut");
JsonNode payload = extractPayloadFromExnResponse(response, "scaleOut");
}

/**
Expand All @@ -718,7 +717,7 @@ public boolean scaleIn(String appID, String clusterName, List<String> superfluou
return false;
}
Map<String, Object> response = scaleIn.sendSync(msg, appID, null, false);
JsonNode payload = extractPayloadFromExnResponse(response, appID, "scaleIn");
JsonNode payload = extractPayloadFromExnResponse(response, "scaleIn");
return payload.asBoolean();
}

Expand All @@ -734,7 +733,7 @@ public boolean deleteCluster(String appID, String clusterName) {
Map<String, Object> msg = Map.of("metaData",
Map.of("user", "admin", "clusterName", clusterName));
Map<String, Object> response = deleteCluster.sendSync(msg, appID, null, false);
JsonNode payload = extractPayloadFromExnResponse(response, appID, "deleteCluster");
JsonNode payload = extractPayloadFromExnResponse(response, "deleteCluster");
return payload.asBoolean();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,64 +125,82 @@ public static String createNodeName(String clusterName, String componentName, in
}

/**
* Given a cluster definition (as returned by {@link
* ExnConnector#getCluster}), return if all nodes are ready, i.e., are in
* state {@code "Finished"}. Once this method returns {@code true}, it is
* safe to call {@link ExnConnector#labelNodes} and {@link
* ExnConnector#deployApplication}.
*
* <p>Note that this approach will not detect when a call to the scaleIn
* endpoint has finished, since there are no nodes to start in that case.
* But for our workflow this does not matter, since scaling in is done
* after scaling out, relabeling and redeploying the application, so there
is no other step that needs to wait for scaleIn to finish.
*
* @param clusterStatus The cluster status, as returned by {@link
* ExnConnector#getCluster}.
* @return {@code true} if all nodes are in state {@code "Finished"},
* {@code false} otherwise.
*/
private static boolean isClusterDeploymentFinished(JsonNode clusterStatus) {
if (clusterStatus == null || !clusterStatus.isObject())
// Catch various failure states, e.g., SAL spuriously returning
// null. Persistent getClusterStatus failures need to be handled outside
// this method.
return false;
return clusterStatus.withArray("/nodes")
.findParents("state")
.stream()
.allMatch(node -> node.get("state").asText().equals("Finished"));
}

/**
* Wait until all nodes in cluster are in state "Finished".
* Wait until cluster deployment is finished.
*
* <p>Note: Cluster deployment includes provisioning and booting VMs,
* installing various software packages, bringing up a Kubernetes cluster
* and installing the NebulOuS runtime. This can take some minutes.
Depending on the value of the {@code status} field in the getCluster
endpoint return value, we do the following:
*
* <ul>
* <li> {@code submited}: wait 10 seconds, then poll again.
* <li> {@code deployed}: return {@code true}.
* <li> {@code failed}: return {@code false}.
<li> others: warn about the unknown value and handle like {@code submited}.
* <li> getCluster returns {@code null}: If more than 3 times in a row,
* return {@code false}. Else wait 10 seconds, then poll again.
* </ul>
*
* @param conn The exn connector.
* @param clusterName The name of the cluster to poll.
*/
private static boolean waitForClusterDeploymentFinished(ExnConnector conn, String clusterName, String appUUID) {
// TODO: find out what state node(s) or the whole cluster are in when
// cluster start fails, and return false in that case.
try {
// Sleep a little at the beginning so that SAL has a chance to
// initialize its data structures etc. -- don't want to call
// getCluster immediately after deployCluster
Thread.sleep(10000);
} catch (InterruptedException e1) {
// ignore
}
JsonNode clusterState = conn.getCluster(clusterName);
while (clusterState == null || !isClusterDeploymentFinished(clusterState)) {
log.info("Waiting for cluster deployment to finish, cluster state = {}", clusterState);
private static boolean waitForClusterDeploymentFinished(ExnConnector conn, String clusterName) {
final int pollInterval = 10000; // Check status every 10s
int callsSincePrinting = 0; // number of intervals since we last logged what we're doing
int failedCalls = 0;
final int maxFailedCalls = 3; // number of retries if getCluster returns null
while (true) {
// Note: values for the "status" field come from SAL source:
// https://github.com/ow2-proactive/scheduling-abstraction-layer/blob/887b19b1c1f991b517a3983133bd8857e7e9cc2b/sal-service/src/main/java/org/ow2/proactive/sal/service/service/ClusterService.java#L200

try {
Thread.sleep(10000);
// Immediately sleep on first loop iteration, so SAL has a chance to catch up
Thread.sleep(pollInterval);
} catch (InterruptedException e1) {
// ignore
}
clusterState = conn.getCluster(clusterName);
JsonNode clusterState = conn.getCluster(clusterName);
final String status;
if (clusterState != null) {
JsonNode jsonState = clusterState.at("/status");
status = jsonState.isMissingNode() ? null : jsonState.asText();
} else {
status = null;
}
if (status == null) {
failedCalls++;
if (failedCalls >= maxFailedCalls) {
log.warn("getCluster returned invalid result (null or structure without 'status' field) too many times, giving up");
return false;
} else {
log.warn("getCluster returned invalid result (null or structure without 'status' field), retrying");
continue;
}
} else {
// Forget about intermittent failures
failedCalls = 0;
}
if (status.equals("deployed")) {
log.info("Cluster deployment finished successfully");
return true;
} else if (status.equals("failed")) {
log.warn("Cluster deployment failed");
return false;
} else {
if (!status.equals("submited" /* [sic] */)) {
// Better paranoid than sorry
log.warn("Unknown 'status' value in getCluster result: {}", status);
}
// still waiting, log every minute
if (callsSincePrinting < 5) {
callsSincePrinting++;
} else {
log.info("Waiting for cluster deployment to finish, cluster state = {}", clusterState);
callsSincePrinting = 0;
}
}
}
return true;
}

/**
Expand Down Expand Up @@ -409,7 +427,7 @@ public static void deployApplication(NebulousApp app, JsonNode kubevela) {
return;
}

if (!waitForClusterDeploymentFinished(conn, clusterName, appUUID)) {
if (!waitForClusterDeploymentFinished(conn, clusterName)) {
log.error("Error while waiting for deployCluster to finish, trying to delete cluster {} and aborting deployment",
cluster);
app.setStateFailed();
Expand Down Expand Up @@ -620,7 +638,7 @@ public static void redeployApplication(NebulousApp app, ObjectNode updatedKubeve
log.info("Starting scaleout: {}", nodesToAdd);
Main.logFile("redeploy-scaleout-" + appUUID + ".json", nodesToAdd.toPrettyString());
conn.scaleOut(appUUID, clusterName, nodesToAdd);
waitForClusterDeploymentFinished(conn, clusterName, appUUID);
waitForClusterDeploymentFinished(conn, clusterName);
} else {
log.info("No nodes added, skipping scaleout");
}
Expand Down

0 comments on commit 847cdad

Please sign in to comment.