diff --git a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/ExnConnector.java b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/ExnConnector.java index 2341c18..4f200ce 100644 --- a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/ExnConnector.java +++ b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/ExnConnector.java @@ -240,29 +240,38 @@ public synchronized void stop() { public class AppCreationMessageHandler extends Handler { @Override public void onMessage(String key, String address, Map body, Message message, Context context) { - NebulousApp app = null; try { log.info("App creation message received"); - JsonNode appMessage = mapper.valueToTree(body); - String appID = appMessage.at("/uuid").asText(); + final JsonNode appMessage = mapper.valueToTree(body); + final String appID = appMessage.at("/uuid").asText(); MDC.put("appId", appID); Main.logFile("app-message-" + appID + ".json", appMessage.toPrettyString()); - app = NebulousApp.newFromAppMessage(mapper.valueToTree(body), ExnConnector.this); - String appIdFromMessage = app.getUUID(); + // FIXME: here's a race condition here: if the app object + // isn't registered yet when the performance indicators + // arrive, and the performance indicators aren't yet + // registered when we reach the `if` statement below, the app + // will never be deployed... + final NebulousApp app = NebulousApp.newFromAppMessage(mapper.valueToTree(body), ExnConnector.this); + final String appIdFromMessage = app.getUUID(); MDC.put("appId", appIdFromMessage); MDC.put("clusterName", app.getClusterName()); if (NebulousApps.relevantPerformanceIndicators.containsKey(appIdFromMessage)) { // If the performance indicators haven't arrived yet, this // will happen in PerformanceIndicatorMessageHandler below. app.setStateReady(NebulousApps.relevantPerformanceIndicators.get(appIdFromMessage)); - NebulousAppDeployer.deployUnmodifiedApplication(app); + final Map contextMap = MDC.getCopyOfContextMap(); + new Thread(() -> { + MDC.setContextMap(contextMap); + app.deploy(); + // No need to call `MDC.cear()` since we're not + // using thread pools + }).start(); // Not strictly necessary to remove the performance // indicators, but let's not leave unneeded data around NebulousApps.relevantPerformanceIndicators.remove(appIdFromMessage); } - } catch (Exception e) { + } catch (RuntimeException e) { log.error("Error while receiving app creation message", e); - if (app != null) app.setStateFailed(); } finally { MDC.clear(); } @@ -287,9 +296,13 @@ public void onMessage(String key, String address, Map body, Message message, Con } try (MDC.MDCCloseable a = MDC.putCloseable("appId", appId); MDC.MDCCloseable b = MDC.putCloseable("clusterName", app.getClusterName())) { log.info("Starting to undeploy and redeploy cluster."); - NebulousAppDeployer.undeployApplication(app); - NebulousAppDeployer.deployUnmodifiedApplication(app); - log.info("App redeploy finished."); + final Map contextMap = MDC.getCopyOfContextMap(); + new Thread(() -> { + MDC.setContextMap(contextMap); + NebulousAppDeployer.undeployApplication(app); + app.deploy(); + log.info("App redeploy finished."); + }).start(); } } } @@ -310,12 +323,14 @@ public void onMessage(String key, String address, Map body, Message message, Con log.error("App with uuid {} not found, ignoring app reset message.", appId); return; } - try (MDC.MDCCloseable a = MDC.putCloseable("appId", appId); MDC.MDCCloseable b = MDC.putCloseable("clusterName", app.getClusterName())) { - log.info("Starting to undeploy cluster and remove app."); - NebulousAppDeployer.undeployApplication(app); - NebulousApps.remove(appId); - log.info("Finished removing app."); - } + new Thread(() -> { + try (MDC.MDCCloseable a = MDC.putCloseable("appId", appId); MDC.MDCCloseable b = MDC.putCloseable("clusterName", app.getClusterName())) { + log.info("Starting to undeploy cluster and remove app."); + NebulousAppDeployer.undeployApplication(app); + NebulousApps.remove(appId); + log.info("Finished removing app."); + } + }).start(); } } @@ -355,9 +370,13 @@ public void onMessage(String key, String address, Map body, Message message, Con } else { MDC.put("clusterName", app.getClusterName()); if (app.getState().equals(NebulousApp.State.NEW)) { - log.info("Received performance indicator message, deploying"); - app.setStateReady(appMessage); - NebulousAppDeployer.deployUnmodifiedApplication(app); + final Map contextMap = MDC.getCopyOfContextMap(); + new Thread(() -> { + MDC.setContextMap(contextMap); + log.info("Received performance indicator message, deploying"); + app.setStateReady(appMessage); + app.deploy(); + }).start(); } else { log.warn("Received duplicate performance indicator message for app, ignoring"); } @@ -369,8 +388,8 @@ public void onMessage(String key, String address, Map body, Message message, Con } /** - * A handler that detects when the solver has started for a given - * application. + * A handler that detects when the solver for a given application has + * started, and sends it the AMPL file and metric list. */ public class SolverStatusMessageHandler extends Handler { @Override @@ -401,6 +420,7 @@ public void onMessage(String key, String address, Map body, Message message, Con if (app == null) { log.info("Received solver status message {} for unknown app object, this should not happen", body); } else { + // This should be very quick, no need to start a thread MDC.put("clusterName", app.getClusterName()); app.sendAMPL(); app.sendMetricList(); // re-send for solver @@ -438,7 +458,12 @@ public void onMessage(String key, String address, Map body, Message message, Con MDC.put("clusterName", app.getClusterName()); if (app.getState() == NebulousApp.State.RUNNING) { log.debug("Sending solver solution to application for redeployment"); - app.processSolution(json_body); + final Map contextMap = MDC.getCopyOfContextMap(); + new Thread(() -> { + MDC.setContextMap(contextMap); + log.debug("Received solver solution for application"); + app.redeployWithSolution(json_body); + }).start(); } else { // app.State==RUNNING gets checked once more inside // app.processSolution -- here we discard diff --git a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/LocalExecution.java b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/LocalExecution.java index d688905..eee8dbc 100644 --- a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/LocalExecution.java +++ b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/LocalExecution.java @@ -76,7 +76,7 @@ public class LocalExecution implements Callable { if (deploy) { if (perf_msg != null) { log.info("Deploying application", connector.getAmplMessagePublisher()); - NebulousAppDeployer.deployUnmodifiedApplication(app); + app.deploy(); } else { log.warn("Performance indicators not supplied, cannot deploy"); } diff --git a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousApp.java b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousApp.java index 856b1f6..2529fae 100644 --- a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousApp.java +++ b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousApp.java @@ -654,7 +654,8 @@ private String getObjectiveFunction() { * "VariableValues" that can be processed by {@link * NebulousApp#rewriteKubevelaWithSolution}. */ - public void processSolution(ObjectNode solution) { + @Synchronized + public void redeployWithSolution(ObjectNode solution) { if (!solution.get("DeploySolution").asBoolean(false)) { // `asBoolean` returns its argument if node is missing or cannot // be converted to Boolean @@ -663,20 +664,22 @@ public void processSolution(ObjectNode solution) { ObjectNode variables = solution.withObjectProperty("VariableValues"); ObjectNode kubevela = rewriteKubevelaWithSolution(variables); if (deployGeneration > 0) { - // We assume that killing a node will confuse the application's - // Kubernetes cluster, therefore: - // 1. Recalculate node sets - // 2. Tell SAL to start fresh nodes, passing in the deployment - // scripts - // 3. Send updated KubeVela for redeployment - // 4. Shut down superfluous nodes NebulousAppDeployer.redeployApplication(this, kubevela); } else { - // 1. Calculate node sets, including Nebulous controller node - // 2. Tell SAL to start all nodes, passing in the deployment - // scripts - // 3. Send KubeVela file for deployment + // Since the solver is started as part of the initial deployment + // this branch is effectively dead code -- but in case the overall + // deployment flow changes and we somehow obtain a solution before + // deployment, can't hurt to do this. + log.warn("App received a solver solution before being deployed, this is unexpected. Boldly moving forward with initial deployment."); NebulousAppDeployer.deployApplication(this, kubevela); } } + + /** + * Perform initial deployment. + */ + @Synchronized + public void deploy() { + NebulousAppDeployer.deployApplication(this, getOriginalKubevela()); + } } diff --git a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousAppDeployer.java b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousAppDeployer.java index 4109cc9..056987e 100644 --- a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousAppDeployer.java +++ b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousAppDeployer.java @@ -219,7 +219,6 @@ public static void deployApplication(NebulousApp app, JsonNode kubevela) { String appUUID = app.getUUID(); String clusterName = app.getClusterName(); if (!app.setStateDeploying()) { - // TODO: wait until we got the performance indicators from Marta log.error("Trying to deploy app that is in state {} (should be READY), aborting deployment", app.getState().name()); app.setStateFailed(); @@ -504,16 +503,6 @@ public static void deployApplication(NebulousApp app, JsonNode kubevela) { log.info("App deployment finished."); } - /** - * Given an app, deploy the Kubevela file as specified in the initial app - * creation message. - * - * @param app the NebulOuS app object to deploy. - */ - public static void deployUnmodifiedApplication(NebulousApp app) { - deployApplication(app, app.getOriginalKubevela()); - } - /** * Given a KubeVela file, adapt the running application to its * specification.