Skip to content

Commit

Permalink
New approach to scheduling tasks (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmarchel7bulls authored Oct 28, 2024
1 parent de9be82 commit e266f92
Showing 1 changed file with 92 additions and 40 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package eu.nebulouscloud.predictionorchestrator;

import eu.nebulouscloud.exn.core.Context;
import eu.nebulouscloud.exn.core.Publisher;
import eu.nebulouscloud.predictionorchestrator.communication.PublisherFactory;
import eu.nebulouscloud.predictionorchestrator.communication.connectors.BrokerConnectorHandler;
import eu.nebulouscloud.predictionorchestrator.communication.publishers.PredictedMetricsPublisher;
import eu.nebulouscloud.predictionorchestrator.ensembling.EnsemblingMechanism;
import eu.nebulouscloud.predictionorchestrator.ensembling.EnsemblingMechanismFactory;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -13,6 +11,8 @@
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;
Expand All @@ -30,6 +30,7 @@ public class Orchestrator {
private final EnsemblingMechanism ensemblingMechanism;
private final Properties properties;
private final PredictionRegistry predictionRegistry;
private final ConcurrentHashMap<String, Boolean> scheduledTasks = new ConcurrentHashMap<>();

@Autowired
private PublisherFactory publisherFactory;
Expand All @@ -44,10 +45,15 @@ public Orchestrator(Properties properties, PredictionRegistry predictionRegistry
this.properties = properties;
this.predictionRegistry = predictionRegistry;
this.ensemblingMechanism = EnsemblingMechanismFactory.getEnsemblingMechanism(properties);
log.info("Orchestrator initialized with a pool size of 10 for the TaskScheduler.");
log.info("Orchestrator initialized with a pool size of {} for the TaskScheduler.", properties.getInitial_forward_prediction_number());
}

public void addApplication(String appName, LocalDateTime epochStart, List<String> metricNames) {
if (epochStartMap.containsKey(appName)) {
log.warn("Application {} is already scheduled. Skipping addition.", appName);
return;
}

log.info("Adding application {} with epoch start {} and time horizon {} seconds.", appName, epochStart, properties.getInitial_prediction_horizon());

epochStartMap.put(appName, epochStart);
Expand All @@ -64,48 +70,101 @@ private void scheduleTasks(String appName) {
int timeHorizon = timeHorizonMap.get(appName);
List<String> metricNames = metricNamesMap.get(appName);

// Start scheduling the next task based on epochStart
scheduleNextTask(appName, epochStart, timeHorizon, metricNames);
LocalDateTime now = LocalDateTime.now();
long epochStartEpoch = epochStart.atZone(ZoneId.systemDefault()).toEpochSecond();
long nowEpoch = now.atZone(ZoneId.systemDefault()).toEpochSecond();
long timeHorizonSeconds = timeHorizon;

long elapsed = nowEpoch - epochStartEpoch;

long intervalsPassed;
if (elapsed < 0) {
intervalsPassed = 0;
log.debug("Current time is before epochStart for application {}.", appName);
} else {
intervalsPassed = (elapsed + timeHorizonSeconds) / timeHorizonSeconds;
log.debug("Elapsed time since epochStart: {} seconds. Intervals passed: {}.", elapsed, intervalsPassed);
}

long nextAdaptationEpoch = epochStartEpoch + (intervalsPassed * timeHorizonSeconds);
LocalDateTime nextAdaptationTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(nextAdaptationEpoch), ZoneId.systemDefault());

log.info("Next adaptation time for application {}: {}.", appName, nextAdaptationTime);

scheduleNextTask(appName, nextAdaptationTime, timeHorizon, metricNames);
}

private void scheduleNextTask(String appName, LocalDateTime adaptationTime, int timeHorizon, List<String> metricNames) {
log.info("Scheduling next task for application {} at {}.", appName, adaptationTime);
private void scheduleNextTask(String appName, LocalDateTime adaptationEvaluationTime, int timeHorizon, List<String> metricNames) {
log.info("Scheduling ensembling tasks for application {} at {}.", appName, adaptationEvaluationTime);

// Calculate fetch time
LocalDateTime fetchTime = adaptationTime.minusSeconds(10);
log.debug("Fetch time for application {}: {} (10 seconds before adaptation).", appName, fetchTime);
int bufferSeconds = 10;
int forwardPredictionCount = properties.getInitial_forward_prediction_number();
log.debug("initial_forward_prediction_number: {}", forwardPredictionCount);

long adaptationTimeEpoch = adaptationTime.atZone(ZoneId.systemDefault()).toEpochSecond();
log.debug("Epoch time for application {}: {}", appName, adaptationTimeEpoch);

// Schedule ensembling for each metric
for (String metricName : metricNames) {
scheduler.schedule(() -> {
log.debug("Scheduled task running for metric {} at time {}.", metricName, adaptationTime);
pullAndEnsemblePredictions(appName, metricName, adaptationTime);
}, fetchTime.atZone(ZoneId.systemDefault()).toInstant());
final LocalDateTime now = LocalDateTime.now();

if (adaptationEvaluationTime == null) {
adaptationEvaluationTime = now.plusSeconds(timeHorizon + bufferSeconds);
}

// Schedule cleanup of old predictions
LocalDateTime cleanupTime = adaptationTime.plusSeconds(60);
log.debug("Scheduling cleanup for application {} at {} (1 minute after adaptation).", appName, cleanupTime);
for (int i = 0; i < forwardPredictionCount; i++) {
log.debug("Iteration {} of {} for application {}", i + 1, forwardPredictionCount, appName);
final LocalDateTime adaptationTime = adaptationEvaluationTime.plusSeconds((i + 1) * timeHorizon);

Duration durationUntilAdaptation = Duration.between(now, adaptationTime);
long secondsUntilAdaptation = durationUntilAdaptation.getSeconds();
log.debug("Seconds until adaptation for iteration {}: {}", i + 1, secondsUntilAdaptation);

if (secondsUntilAdaptation <= timeHorizon) {
log.warn("Adaptation time {} for application {} is within the time horizon of {} seconds. Skipping immediate ensembling.", adaptationTime, appName, timeHorizon);
continue;
}

for (String metricName : metricNames) {
scheduler.schedule(() -> {
log.debug("Cleaning up old predictions for application {}, metric {}, at timestamp {}.", appName, metricName, adaptationTimeEpoch);
predictionRegistry.cleanupOldPredictions(appName, metricName, adaptationTimeEpoch);
}, cleanupTime.atZone(ZoneId.systemDefault()).toInstant());
LocalDateTime calculatedEnsembleExecutionTime = adaptationTime.minusSeconds(bufferSeconds + timeHorizon);
final LocalDateTime ensembleExecutionTime = calculatedEnsembleExecutionTime.isBefore(now.plusSeconds(1)) ? now.plusSeconds(1) : calculatedEnsembleExecutionTime;

log.debug("Ensembling execution time for application {}: {} (buffer of {} seconds before adaptation evaluation).", appName, ensembleExecutionTime, bufferSeconds);

for (String metricName : metricNames) {
String taskKey = appName + ":" + metricName + ":" + adaptationTime;
if (scheduledTasks.putIfAbsent(taskKey, true) == null) {
scheduler.schedule(() -> {
try {
log.debug("Scheduled ensembling task running for metric {} at {}.", metricName, ensembleExecutionTime);
pullAndEnsemblePredictions(appName, metricName, adaptationTime);
} finally {
scheduledTasks.remove(taskKey);
}
}, ensembleExecutionTime.atZone(ZoneId.systemDefault()).toInstant());
}
}

final LocalDateTime cleanupTime = adaptationTime.plusSeconds(60);
log.debug("Scheduling cleanup for application {} at {} (60 seconds after adaptation).", appName, cleanupTime);

for (String metricName : metricNames) {
String taskKey = appName + ":cleanup:" + metricName + ":" + adaptationTime;
if (scheduledTasks.putIfAbsent(taskKey, true) == null) {
scheduler.schedule(() -> {
try {
long adaptationTimeEpoch = adaptationTime.atZone(ZoneId.systemDefault()).toEpochSecond();
log.debug("Cleaning up old predictions for application {}, metric {}, at timestamp {}.", appName, metricName, adaptationTimeEpoch);
predictionRegistry.cleanupOldPredictions(appName, metricName, adaptationTimeEpoch);
} finally {
scheduledTasks.remove(taskKey);
}
}, cleanupTime.atZone(ZoneId.systemDefault()).toInstant());
}
}
}

// Calculate next adaptation time and recursively schedule the next task
LocalDateTime nextAdaptationTime = adaptationTime.plusSeconds(timeHorizon);
log.info("Next adaptation time for application {}: {}.", appName, nextAdaptationTime);
final LocalDateTime nextAdaptationEvaluationTime = adaptationEvaluationTime.plusSeconds(timeHorizon);
log.info("Next adaptation evaluation time for application {}: {}.", appName, nextAdaptationEvaluationTime);

scheduler.schedule(() -> scheduleNextTask(appName, nextAdaptationTime, timeHorizon, metricNames),
nextAdaptationTime.atZone(ZoneId.systemDefault()).toInstant());
scheduler.schedule(() -> scheduleNextTask(appName, nextAdaptationEvaluationTime, timeHorizon, metricNames),
nextAdaptationEvaluationTime.atZone(ZoneId.systemDefault()).toInstant());
}


private void pullAndEnsemblePredictions(String appName, String metricName, LocalDateTime adaptationTime) {
long adaptationTimeEpoch = adaptationTime.atZone(ZoneId.systemDefault()).toEpochSecond();
log.info("Pulling predictions for application '{}' and metric '{}' at timestamp {}.", appName, metricName, adaptationTimeEpoch);
Expand All @@ -116,20 +175,13 @@ private void pullAndEnsemblePredictions(String appName, String metricName, Local
if (!predictionsByMethod.isEmpty()) {
log.debug("Retrieved {} predictions for application '{}' and metric '{}' at timestamp {}.", predictionsByMethod.size(), appName, metricName, adaptationTimeEpoch);

// Perform ensembling
Prediction ensembledPrediction = ensemblingMechanism.poolPredictions(predictionsByMethod, metricName);
log.info("Ensembled prediction created for application '{}' and metric '{}' with value: {}", appName, metricName, ensembledPrediction);
//TODO
// Store the ensembled prediction in the registry
// predictionRegistry.storeEnsembledPrediction(appName, ensembledPrediction);
// log.info("Ensembled prediction for application '{}' and metric '{}' stored successfully.", appName, metricName);

// Retrieve the Publisher via PublisherFactory
Publisher publisher = publisherFactory.getOrCreatePublisher(metricName);

if (publisher != null) {
try {
// Send the ensembled prediction for publishing
publisher.send(Prediction.toMap(ensembledPrediction), appName);
log.info("Ensembled prediction for application '{}' and metric '{}' sent to publisher successfully.", appName, metricName);
} catch (Exception e) {
Expand Down

0 comments on commit e266f92

Please sign in to comment.