diff --git a/prediction-orchestrator/src/main/java/eu/nebulouscloud/predictionorchestrator/Orchestrator.java b/prediction-orchestrator/src/main/java/eu/nebulouscloud/predictionorchestrator/Orchestrator.java index 80d0f2e..097755c 100644 --- a/prediction-orchestrator/src/main/java/eu/nebulouscloud/predictionorchestrator/Orchestrator.java +++ b/prediction-orchestrator/src/main/java/eu/nebulouscloud/predictionorchestrator/Orchestrator.java @@ -1,10 +1,8 @@ package eu.nebulouscloud.predictionorchestrator; -import eu.nebulouscloud.exn.core.Context; import eu.nebulouscloud.exn.core.Publisher; import eu.nebulouscloud.predictionorchestrator.communication.PublisherFactory; import eu.nebulouscloud.predictionorchestrator.communication.connectors.BrokerConnectorHandler; -import eu.nebulouscloud.predictionorchestrator.communication.publishers.PredictedMetricsPublisher; import eu.nebulouscloud.predictionorchestrator.ensembling.EnsemblingMechanism; import eu.nebulouscloud.predictionorchestrator.ensembling.EnsemblingMechanismFactory; import lombok.extern.slf4j.Slf4j; @@ -13,6 +11,8 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.stereotype.Component; +import java.time.Duration; +import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; import java.util.List; @@ -30,6 +30,7 @@ public class Orchestrator { private final EnsemblingMechanism ensemblingMechanism; private final Properties properties; private final PredictionRegistry predictionRegistry; + private final ConcurrentHashMap scheduledTasks = new ConcurrentHashMap<>(); @Autowired private PublisherFactory publisherFactory; @@ -44,10 +45,15 @@ public Orchestrator(Properties properties, PredictionRegistry predictionRegistry this.properties = properties; this.predictionRegistry = predictionRegistry; this.ensemblingMechanism = EnsemblingMechanismFactory.getEnsemblingMechanism(properties); - log.info("Orchestrator initialized with a pool size of 10 for the TaskScheduler."); + log.info("Orchestrator initialized with a pool size of {} for the TaskScheduler.", properties.getInitial_forward_prediction_number()); } public void addApplication(String appName, LocalDateTime epochStart, List metricNames) { + if (epochStartMap.containsKey(appName)) { + log.warn("Application {} is already scheduled. Skipping addition.", appName); + return; + } + log.info("Adding application {} with epoch start {} and time horizon {} seconds.", appName, epochStart, properties.getInitial_prediction_horizon()); epochStartMap.put(appName, epochStart); @@ -64,48 +70,101 @@ private void scheduleTasks(String appName) { int timeHorizon = timeHorizonMap.get(appName); List metricNames = metricNamesMap.get(appName); - // Start scheduling the next task based on epochStart - scheduleNextTask(appName, epochStart, timeHorizon, metricNames); + LocalDateTime now = LocalDateTime.now(); + long epochStartEpoch = epochStart.atZone(ZoneId.systemDefault()).toEpochSecond(); + long nowEpoch = now.atZone(ZoneId.systemDefault()).toEpochSecond(); + long timeHorizonSeconds = timeHorizon; + + long elapsed = nowEpoch - epochStartEpoch; + + long intervalsPassed; + if (elapsed < 0) { + intervalsPassed = 0; + log.debug("Current time is before epochStart for application {}.", appName); + } else { + intervalsPassed = (elapsed + timeHorizonSeconds) / timeHorizonSeconds; + log.debug("Elapsed time since epochStart: {} seconds. Intervals passed: {}.", elapsed, intervalsPassed); + } + + long nextAdaptationEpoch = epochStartEpoch + (intervalsPassed * timeHorizonSeconds); + LocalDateTime nextAdaptationTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(nextAdaptationEpoch), ZoneId.systemDefault()); + + log.info("Next adaptation time for application {}: {}.", appName, nextAdaptationTime); + + scheduleNextTask(appName, nextAdaptationTime, timeHorizon, metricNames); } - private void scheduleNextTask(String appName, LocalDateTime adaptationTime, int timeHorizon, List metricNames) { - log.info("Scheduling next task for application {} at {}.", appName, adaptationTime); + private void scheduleNextTask(String appName, LocalDateTime adaptationEvaluationTime, int timeHorizon, List metricNames) { + log.info("Scheduling ensembling tasks for application {} at {}.", appName, adaptationEvaluationTime); - // Calculate fetch time - LocalDateTime fetchTime = adaptationTime.minusSeconds(10); - log.debug("Fetch time for application {}: {} (10 seconds before adaptation).", appName, fetchTime); + int bufferSeconds = 10; + int forwardPredictionCount = properties.getInitial_forward_prediction_number(); + log.debug("initial_forward_prediction_number: {}", forwardPredictionCount); - long adaptationTimeEpoch = adaptationTime.atZone(ZoneId.systemDefault()).toEpochSecond(); - log.debug("Epoch time for application {}: {}", appName, adaptationTimeEpoch); - - // Schedule ensembling for each metric - for (String metricName : metricNames) { - scheduler.schedule(() -> { - log.debug("Scheduled task running for metric {} at time {}.", metricName, adaptationTime); - pullAndEnsemblePredictions(appName, metricName, adaptationTime); - }, fetchTime.atZone(ZoneId.systemDefault()).toInstant()); + final LocalDateTime now = LocalDateTime.now(); + + if (adaptationEvaluationTime == null) { + adaptationEvaluationTime = now.plusSeconds(timeHorizon + bufferSeconds); } - // Schedule cleanup of old predictions - LocalDateTime cleanupTime = adaptationTime.plusSeconds(60); - log.debug("Scheduling cleanup for application {} at {} (1 minute after adaptation).", appName, cleanupTime); + for (int i = 0; i < forwardPredictionCount; i++) { + log.debug("Iteration {} of {} for application {}", i + 1, forwardPredictionCount, appName); + final LocalDateTime adaptationTime = adaptationEvaluationTime.plusSeconds((i + 1) * timeHorizon); + + Duration durationUntilAdaptation = Duration.between(now, adaptationTime); + long secondsUntilAdaptation = durationUntilAdaptation.getSeconds(); + log.debug("Seconds until adaptation for iteration {}: {}", i + 1, secondsUntilAdaptation); + + if (secondsUntilAdaptation <= timeHorizon) { + log.warn("Adaptation time {} for application {} is within the time horizon of {} seconds. Skipping immediate ensembling.", adaptationTime, appName, timeHorizon); + continue; + } - for (String metricName : metricNames) { - scheduler.schedule(() -> { - log.debug("Cleaning up old predictions for application {}, metric {}, at timestamp {}.", appName, metricName, adaptationTimeEpoch); - predictionRegistry.cleanupOldPredictions(appName, metricName, adaptationTimeEpoch); - }, cleanupTime.atZone(ZoneId.systemDefault()).toInstant()); + LocalDateTime calculatedEnsembleExecutionTime = adaptationTime.minusSeconds(bufferSeconds + timeHorizon); + final LocalDateTime ensembleExecutionTime = calculatedEnsembleExecutionTime.isBefore(now.plusSeconds(1)) ? now.plusSeconds(1) : calculatedEnsembleExecutionTime; + + log.debug("Ensembling execution time for application {}: {} (buffer of {} seconds before adaptation evaluation).", appName, ensembleExecutionTime, bufferSeconds); + + for (String metricName : metricNames) { + String taskKey = appName + ":" + metricName + ":" + adaptationTime; + if (scheduledTasks.putIfAbsent(taskKey, true) == null) { + scheduler.schedule(() -> { + try { + log.debug("Scheduled ensembling task running for metric {} at {}.", metricName, ensembleExecutionTime); + pullAndEnsemblePredictions(appName, metricName, adaptationTime); + } finally { + scheduledTasks.remove(taskKey); + } + }, ensembleExecutionTime.atZone(ZoneId.systemDefault()).toInstant()); + } + } + + final LocalDateTime cleanupTime = adaptationTime.plusSeconds(60); + log.debug("Scheduling cleanup for application {} at {} (60 seconds after adaptation).", appName, cleanupTime); + + for (String metricName : metricNames) { + String taskKey = appName + ":cleanup:" + metricName + ":" + adaptationTime; + if (scheduledTasks.putIfAbsent(taskKey, true) == null) { + scheduler.schedule(() -> { + try { + long adaptationTimeEpoch = adaptationTime.atZone(ZoneId.systemDefault()).toEpochSecond(); + log.debug("Cleaning up old predictions for application {}, metric {}, at timestamp {}.", appName, metricName, adaptationTimeEpoch); + predictionRegistry.cleanupOldPredictions(appName, metricName, adaptationTimeEpoch); + } finally { + scheduledTasks.remove(taskKey); + } + }, cleanupTime.atZone(ZoneId.systemDefault()).toInstant()); + } + } } - // Calculate next adaptation time and recursively schedule the next task - LocalDateTime nextAdaptationTime = adaptationTime.plusSeconds(timeHorizon); - log.info("Next adaptation time for application {}: {}.", appName, nextAdaptationTime); + final LocalDateTime nextAdaptationEvaluationTime = adaptationEvaluationTime.plusSeconds(timeHorizon); + log.info("Next adaptation evaluation time for application {}: {}.", appName, nextAdaptationEvaluationTime); - scheduler.schedule(() -> scheduleNextTask(appName, nextAdaptationTime, timeHorizon, metricNames), - nextAdaptationTime.atZone(ZoneId.systemDefault()).toInstant()); + scheduler.schedule(() -> scheduleNextTask(appName, nextAdaptationEvaluationTime, timeHorizon, metricNames), + nextAdaptationEvaluationTime.atZone(ZoneId.systemDefault()).toInstant()); } - private void pullAndEnsemblePredictions(String appName, String metricName, LocalDateTime adaptationTime) { long adaptationTimeEpoch = adaptationTime.atZone(ZoneId.systemDefault()).toEpochSecond(); log.info("Pulling predictions for application '{}' and metric '{}' at timestamp {}.", appName, metricName, adaptationTimeEpoch); @@ -116,20 +175,13 @@ private void pullAndEnsemblePredictions(String appName, String metricName, Local if (!predictionsByMethod.isEmpty()) { log.debug("Retrieved {} predictions for application '{}' and metric '{}' at timestamp {}.", predictionsByMethod.size(), appName, metricName, adaptationTimeEpoch); - // Perform ensembling Prediction ensembledPrediction = ensemblingMechanism.poolPredictions(predictionsByMethod, metricName); log.info("Ensembled prediction created for application '{}' and metric '{}' with value: {}", appName, metricName, ensembledPrediction); - //TODO - // Store the ensembled prediction in the registry -// predictionRegistry.storeEnsembledPrediction(appName, ensembledPrediction); -// log.info("Ensembled prediction for application '{}' and metric '{}' stored successfully.", appName, metricName); - // Retrieve the Publisher via PublisherFactory Publisher publisher = publisherFactory.getOrCreatePublisher(metricName); if (publisher != null) { try { - // Send the ensembled prediction for publishing publisher.send(Prediction.toMap(ensembledPrediction), appName); log.info("Ensembled prediction for application '{}' and metric '{}' sent to publisher successfully.", appName, metricName); } catch (Exception e) {