Skip to content

Commit

Permalink
Run long-running processes in threads
Browse files Browse the repository at this point in the history
- This lets us run multiple app deployments in parallel

- We don't bother setting up a thread pool, since (one assumes) there
  won't be thousands of parallel app deployments going on
  • Loading branch information
rudi committed Jul 17, 2024
1 parent 84b462e commit 96d68d4
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,29 +240,38 @@ public synchronized void stop() {
public class AppCreationMessageHandler extends Handler {
@Override
public void onMessage(String key, String address, Map body, Message message, Context context) {
NebulousApp app = null;
try {
log.info("App creation message received");
JsonNode appMessage = mapper.valueToTree(body);
String appID = appMessage.at("/uuid").asText();
final JsonNode appMessage = mapper.valueToTree(body);
final String appID = appMessage.at("/uuid").asText();
MDC.put("appId", appID);
Main.logFile("app-message-" + appID + ".json", appMessage.toPrettyString());
app = NebulousApp.newFromAppMessage(mapper.valueToTree(body), ExnConnector.this);
String appIdFromMessage = app.getUUID();
// FIXME: there's a race condition here: if the app object
// isn't registered yet when the performance indicators
// arrive, and the performance indicators aren't yet
// registered when we reach the `if` statement below, the app
// will never be deployed...
final NebulousApp app = NebulousApp.newFromAppMessage(mapper.valueToTree(body), ExnConnector.this);
final String appIdFromMessage = app.getUUID();
MDC.put("appId", appIdFromMessage);
MDC.put("clusterName", app.getClusterName());
if (NebulousApps.relevantPerformanceIndicators.containsKey(appIdFromMessage)) {
// If the performance indicators haven't arrived yet, this
// will happen in PerformanceIndicatorMessageHandler below.
app.setStateReady(NebulousApps.relevantPerformanceIndicators.get(appIdFromMessage));
NebulousAppDeployer.deployUnmodifiedApplication(app);
final Map<String, String> contextMap = MDC.getCopyOfContextMap();
new Thread(() -> {
MDC.setContextMap(contextMap);
app.deploy();
// No need to call `MDC.clear()` since we're not
// using thread pools
}).start();
// Not strictly necessary to remove the performance
// indicators, but let's not leave unneeded data around
NebulousApps.relevantPerformanceIndicators.remove(appIdFromMessage);
}
} catch (Exception e) {
} catch (RuntimeException e) {
log.error("Error while receiving app creation message", e);
if (app != null) app.setStateFailed();
} finally {
MDC.clear();
}
Expand All @@ -287,9 +296,13 @@ public void onMessage(String key, String address, Map body, Message message, Con
}
try (MDC.MDCCloseable a = MDC.putCloseable("appId", appId); MDC.MDCCloseable b = MDC.putCloseable("clusterName", app.getClusterName())) {
log.info("Starting to undeploy and redeploy cluster.");
NebulousAppDeployer.undeployApplication(app);
NebulousAppDeployer.deployUnmodifiedApplication(app);
log.info("App redeploy finished.");
final Map<String, String> contextMap = MDC.getCopyOfContextMap();
new Thread(() -> {
MDC.setContextMap(contextMap);
NebulousAppDeployer.undeployApplication(app);
app.deploy();
log.info("App redeploy finished.");
}).start();
}
}
}
Expand All @@ -310,12 +323,14 @@ public void onMessage(String key, String address, Map body, Message message, Con
log.error("App with uuid {} not found, ignoring app reset message.", appId);
return;
}
try (MDC.MDCCloseable a = MDC.putCloseable("appId", appId); MDC.MDCCloseable b = MDC.putCloseable("clusterName", app.getClusterName())) {
log.info("Starting to undeploy cluster and remove app.");
NebulousAppDeployer.undeployApplication(app);
NebulousApps.remove(appId);
log.info("Finished removing app.");
}
new Thread(() -> {
try (MDC.MDCCloseable a = MDC.putCloseable("appId", appId); MDC.MDCCloseable b = MDC.putCloseable("clusterName", app.getClusterName())) {
log.info("Starting to undeploy cluster and remove app.");
NebulousAppDeployer.undeployApplication(app);
NebulousApps.remove(appId);
log.info("Finished removing app.");
}
}).start();
}
}

Expand Down Expand Up @@ -355,9 +370,13 @@ public void onMessage(String key, String address, Map body, Message message, Con
} else {
MDC.put("clusterName", app.getClusterName());
if (app.getState().equals(NebulousApp.State.NEW)) {
log.info("Received performance indicator message, deploying");
app.setStateReady(appMessage);
NebulousAppDeployer.deployUnmodifiedApplication(app);
final Map<String, String> contextMap = MDC.getCopyOfContextMap();
new Thread(() -> {
MDC.setContextMap(contextMap);
log.info("Received performance indicator message, deploying");
app.setStateReady(appMessage);
app.deploy();
}).start();
} else {
log.warn("Received duplicate performance indicator message for app, ignoring");
}
Expand All @@ -369,8 +388,8 @@ public void onMessage(String key, String address, Map body, Message message, Con
}

/**
* A handler that detects when the solver has started for a given
* application.
* A handler that detects when the solver for a given application has
* started, and sends it the AMPL file and metric list.
*/
public class SolverStatusMessageHandler extends Handler {
@Override
Expand Down Expand Up @@ -401,6 +420,7 @@ public void onMessage(String key, String address, Map body, Message message, Con
if (app == null) {
log.info("Received solver status message {} for unknown app object, this should not happen", body);
} else {
// This should be very quick, no need to start a thread
MDC.put("clusterName", app.getClusterName());
app.sendAMPL();
app.sendMetricList(); // re-send for solver
Expand Down Expand Up @@ -438,7 +458,12 @@ public void onMessage(String key, String address, Map body, Message message, Con
MDC.put("clusterName", app.getClusterName());
if (app.getState() == NebulousApp.State.RUNNING) {
log.debug("Sending solver solution to application for redeployment");
app.processSolution(json_body);
final Map<String, String> contextMap = MDC.getCopyOfContextMap();
new Thread(() -> {
MDC.setContextMap(contextMap);
log.debug("Received solver solution for application");
app.redeployWithSolution(json_body);
}).start();
} else {
// app.State==RUNNING gets checked once more inside
// app.redeployWithSolution -- here we discard
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public class LocalExecution implements Callable<Integer> {
if (deploy) {
if (perf_msg != null) {
log.info("Deploying application", connector.getAmplMessagePublisher());
NebulousAppDeployer.deployUnmodifiedApplication(app);
app.deploy();
} else {
log.warn("Performance indicators not supplied, cannot deploy");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,8 @@ private String getObjectiveFunction() {
* "VariableValues" that can be processed by {@link
* NebulousApp#rewriteKubevelaWithSolution}.
*/
public void processSolution(ObjectNode solution) {
@Synchronized
public void redeployWithSolution(ObjectNode solution) {
if (!solution.get("DeploySolution").asBoolean(false)) {
// `asBoolean` returns its argument if node is missing or cannot
// be converted to Boolean
Expand All @@ -663,20 +664,22 @@ public void processSolution(ObjectNode solution) {
ObjectNode variables = solution.withObjectProperty("VariableValues");
ObjectNode kubevela = rewriteKubevelaWithSolution(variables);
if (deployGeneration > 0) {
// We assume that killing a node will confuse the application's
// Kubernetes cluster, therefore:
// 1. Recalculate node sets
// 2. Tell SAL to start fresh nodes, passing in the deployment
// scripts
// 3. Send updated KubeVela for redeployment
// 4. Shut down superfluous nodes
NebulousAppDeployer.redeployApplication(this, kubevela);
} else {
// 1. Calculate node sets, including Nebulous controller node
// 2. Tell SAL to start all nodes, passing in the deployment
// scripts
// 3. Send KubeVela file for deployment
// Since the solver is started as part of the initial deployment
// this branch is effectively dead code -- but in case the overall
// deployment flow changes and we somehow obtain a solution before
// deployment, can't hurt to do this.
log.warn("App received a solver solution before being deployed, this is unexpected. Boldly moving forward with initial deployment.");
NebulousAppDeployer.deployApplication(this, kubevela);
}
}

/**
 * Perform initial deployment.
 *
 * <p>Hands this app together with its original KubeVela (as returned by
 * {@link #getOriginalKubevela()}) to {@code
 * NebulousAppDeployer.deployApplication}.  That method expects the app to
 * be in state READY; otherwise it logs an error and marks the app as
 * failed instead of deploying.
 *
 * <p>The message handlers invoke this method from a freshly started
 * thread so that a long-running deployment does not block message
 * processing.
 *
 * <p>NOTE(review): {@code @Synchronized} is presumably Lombok's
 * annotation, which would serialize this method against the other
 * {@code @Synchronized} methods of the same app object -- confirm.
 */
@Synchronized
public void deploy() {
NebulousAppDeployer.deployApplication(this, getOriginalKubevela());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@ public static void deployApplication(NebulousApp app, JsonNode kubevela) {
String appUUID = app.getUUID();
String clusterName = app.getClusterName();
if (!app.setStateDeploying()) {
// TODO: wait until we got the performance indicators from Marta
log.error("Trying to deploy app that is in state {} (should be READY), aborting deployment",
app.getState().name());
app.setStateFailed();
Expand Down Expand Up @@ -504,16 +503,6 @@ public static void deployApplication(NebulousApp app, JsonNode kubevela) {
log.info("App deployment finished.");
}

/**
 * Deploy an app using the KubeVela file exactly as it was specified in
 * the initial app creation message.
 *
 * <p>This is a convenience wrapper: it looks up the app's original
 * KubeVela and passes it on to {@link #deployApplication}.
 *
 * @param app the NebulOuS app object to deploy.
 */
public static void deployUnmodifiedApplication(NebulousApp app) {
    final JsonNode originalKubevela = app.getOriginalKubevela();
    deployApplication(app, originalKubevela);
}

/**
* Given a KubeVela file, adapt the running application to its
* specification.<p>
Expand Down

0 comments on commit 96d68d4

Please sign in to comment.