Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New approach to scheduling tasks #12

Merged
merged 1 commit into from
Oct 28, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package eu.nebulouscloud.predictionorchestrator;

import eu.nebulouscloud.exn.core.Context;
import eu.nebulouscloud.exn.core.Publisher;
import eu.nebulouscloud.predictionorchestrator.communication.PublisherFactory;
import eu.nebulouscloud.predictionorchestrator.communication.connectors.BrokerConnectorHandler;
import eu.nebulouscloud.predictionorchestrator.communication.publishers.PredictedMetricsPublisher;
import eu.nebulouscloud.predictionorchestrator.ensembling.EnsemblingMechanism;
import eu.nebulouscloud.predictionorchestrator.ensembling.EnsemblingMechanismFactory;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -13,6 +11,8 @@
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;
Expand All @@ -30,6 +30,7 @@ public class Orchestrator {
private final EnsemblingMechanism ensemblingMechanism;
private final Properties properties;
private final PredictionRegistry predictionRegistry;
private final ConcurrentHashMap<String, Boolean> scheduledTasks = new ConcurrentHashMap<>();

@Autowired
private PublisherFactory publisherFactory;
Expand All @@ -44,10 +45,15 @@ public Orchestrator(Properties properties, PredictionRegistry predictionRegistry
this.properties = properties;
this.predictionRegistry = predictionRegistry;
this.ensemblingMechanism = EnsemblingMechanismFactory.getEnsemblingMechanism(properties);
log.info("Orchestrator initialized with a pool size of 10 for the TaskScheduler.");
log.info("Orchestrator initialized with a pool size of {} for the TaskScheduler.", properties.getInitial_forward_prediction_number());
}

public void addApplication(String appName, LocalDateTime epochStart, List<String> metricNames) {
if (epochStartMap.containsKey(appName)) {
log.warn("Application {} is already scheduled. Skipping addition.", appName);
return;
}

log.info("Adding application {} with epoch start {} and time horizon {} seconds.", appName, epochStart, properties.getInitial_prediction_horizon());

epochStartMap.put(appName, epochStart);
Expand All @@ -64,48 +70,101 @@ private void scheduleTasks(String appName) {
int timeHorizon = timeHorizonMap.get(appName);
List<String> metricNames = metricNamesMap.get(appName);

// Start scheduling the next task based on epochStart
scheduleNextTask(appName, epochStart, timeHorizon, metricNames);
LocalDateTime now = LocalDateTime.now();
long epochStartEpoch = epochStart.atZone(ZoneId.systemDefault()).toEpochSecond();
long nowEpoch = now.atZone(ZoneId.systemDefault()).toEpochSecond();
long timeHorizonSeconds = timeHorizon;

long elapsed = nowEpoch - epochStartEpoch;

long intervalsPassed;
if (elapsed < 0) {
intervalsPassed = 0;
log.debug("Current time is before epochStart for application {}.", appName);
} else {
intervalsPassed = (elapsed + timeHorizonSeconds) / timeHorizonSeconds;
log.debug("Elapsed time since epochStart: {} seconds. Intervals passed: {}.", elapsed, intervalsPassed);
}

long nextAdaptationEpoch = epochStartEpoch + (intervalsPassed * timeHorizonSeconds);
LocalDateTime nextAdaptationTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(nextAdaptationEpoch), ZoneId.systemDefault());

log.info("Next adaptation time for application {}: {}.", appName, nextAdaptationTime);

scheduleNextTask(appName, nextAdaptationTime, timeHorizon, metricNames);
}

private void scheduleNextTask(String appName, LocalDateTime adaptationTime, int timeHorizon, List<String> metricNames) {
log.info("Scheduling next task for application {} at {}.", appName, adaptationTime);
private void scheduleNextTask(String appName, LocalDateTime adaptationEvaluationTime, int timeHorizon, List<String> metricNames) {
log.info("Scheduling ensembling tasks for application {} at {}.", appName, adaptationEvaluationTime);

// Calculate fetch time
LocalDateTime fetchTime = adaptationTime.minusSeconds(10);
log.debug("Fetch time for application {}: {} (10 seconds before adaptation).", appName, fetchTime);
int bufferSeconds = 10;
int forwardPredictionCount = properties.getInitial_forward_prediction_number();
log.debug("initial_forward_prediction_number: {}", forwardPredictionCount);

long adaptationTimeEpoch = adaptationTime.atZone(ZoneId.systemDefault()).toEpochSecond();
log.debug("Epoch time for application {}: {}", appName, adaptationTimeEpoch);

// Schedule ensembling for each metric
for (String metricName : metricNames) {
scheduler.schedule(() -> {
log.debug("Scheduled task running for metric {} at time {}.", metricName, adaptationTime);
pullAndEnsemblePredictions(appName, metricName, adaptationTime);
}, fetchTime.atZone(ZoneId.systemDefault()).toInstant());
final LocalDateTime now = LocalDateTime.now();

if (adaptationEvaluationTime == null) {
adaptationEvaluationTime = now.plusSeconds(timeHorizon + bufferSeconds);
}

// Schedule cleanup of old predictions
LocalDateTime cleanupTime = adaptationTime.plusSeconds(60);
log.debug("Scheduling cleanup for application {} at {} (1 minute after adaptation).", appName, cleanupTime);
for (int i = 0; i < forwardPredictionCount; i++) {
log.debug("Iteration {} of {} for application {}", i + 1, forwardPredictionCount, appName);
final LocalDateTime adaptationTime = adaptationEvaluationTime.plusSeconds((i + 1) * timeHorizon);

Duration durationUntilAdaptation = Duration.between(now, adaptationTime);
long secondsUntilAdaptation = durationUntilAdaptation.getSeconds();
log.debug("Seconds until adaptation for iteration {}: {}", i + 1, secondsUntilAdaptation);

if (secondsUntilAdaptation <= timeHorizon) {
log.warn("Adaptation time {} for application {} is within the time horizon of {} seconds. Skipping immediate ensembling.", adaptationTime, appName, timeHorizon);
continue;
}

for (String metricName : metricNames) {
scheduler.schedule(() -> {
log.debug("Cleaning up old predictions for application {}, metric {}, at timestamp {}.", appName, metricName, adaptationTimeEpoch);
predictionRegistry.cleanupOldPredictions(appName, metricName, adaptationTimeEpoch);
}, cleanupTime.atZone(ZoneId.systemDefault()).toInstant());
LocalDateTime calculatedEnsembleExecutionTime = adaptationTime.minusSeconds(bufferSeconds + timeHorizon);
final LocalDateTime ensembleExecutionTime = calculatedEnsembleExecutionTime.isBefore(now.plusSeconds(1)) ? now.plusSeconds(1) : calculatedEnsembleExecutionTime;

log.debug("Ensembling execution time for application {}: {} (buffer of {} seconds before adaptation evaluation).", appName, ensembleExecutionTime, bufferSeconds);

for (String metricName : metricNames) {
String taskKey = appName + ":" + metricName + ":" + adaptationTime;
if (scheduledTasks.putIfAbsent(taskKey, true) == null) {
scheduler.schedule(() -> {
try {
log.debug("Scheduled ensembling task running for metric {} at {}.", metricName, ensembleExecutionTime);
pullAndEnsemblePredictions(appName, metricName, adaptationTime);
} finally {
scheduledTasks.remove(taskKey);
}
}, ensembleExecutionTime.atZone(ZoneId.systemDefault()).toInstant());
}
}

final LocalDateTime cleanupTime = adaptationTime.plusSeconds(60);
log.debug("Scheduling cleanup for application {} at {} (60 seconds after adaptation).", appName, cleanupTime);

for (String metricName : metricNames) {
String taskKey = appName + ":cleanup:" + metricName + ":" + adaptationTime;
if (scheduledTasks.putIfAbsent(taskKey, true) == null) {
scheduler.schedule(() -> {
try {
long adaptationTimeEpoch = adaptationTime.atZone(ZoneId.systemDefault()).toEpochSecond();
log.debug("Cleaning up old predictions for application {}, metric {}, at timestamp {}.", appName, metricName, adaptationTimeEpoch);
predictionRegistry.cleanupOldPredictions(appName, metricName, adaptationTimeEpoch);
} finally {
scheduledTasks.remove(taskKey);
}
}, cleanupTime.atZone(ZoneId.systemDefault()).toInstant());
}
}
}

// Calculate next adaptation time and recursively schedule the next task
LocalDateTime nextAdaptationTime = adaptationTime.plusSeconds(timeHorizon);
log.info("Next adaptation time for application {}: {}.", appName, nextAdaptationTime);
final LocalDateTime nextAdaptationEvaluationTime = adaptationEvaluationTime.plusSeconds(timeHorizon);
log.info("Next adaptation evaluation time for application {}: {}.", appName, nextAdaptationEvaluationTime);

scheduler.schedule(() -> scheduleNextTask(appName, nextAdaptationTime, timeHorizon, metricNames),
nextAdaptationTime.atZone(ZoneId.systemDefault()).toInstant());
scheduler.schedule(() -> scheduleNextTask(appName, nextAdaptationEvaluationTime, timeHorizon, metricNames),
nextAdaptationEvaluationTime.atZone(ZoneId.systemDefault()).toInstant());
}


private void pullAndEnsemblePredictions(String appName, String metricName, LocalDateTime adaptationTime) {
long adaptationTimeEpoch = adaptationTime.atZone(ZoneId.systemDefault()).toEpochSecond();
log.info("Pulling predictions for application '{}' and metric '{}' at timestamp {}.", appName, metricName, adaptationTimeEpoch);
Expand All @@ -116,20 +175,13 @@ private void pullAndEnsemblePredictions(String appName, String metricName, Local
if (!predictionsByMethod.isEmpty()) {
log.debug("Retrieved {} predictions for application '{}' and metric '{}' at timestamp {}.", predictionsByMethod.size(), appName, metricName, adaptationTimeEpoch);

// Perform ensembling
Prediction ensembledPrediction = ensemblingMechanism.poolPredictions(predictionsByMethod, metricName);
log.info("Ensembled prediction created for application '{}' and metric '{}' with value: {}", appName, metricName, ensembledPrediction);
//TODO
// Store the ensembled prediction in the registry
// predictionRegistry.storeEnsembledPrediction(appName, ensembledPrediction);
// log.info("Ensembled prediction for application '{}' and metric '{}' stored successfully.", appName, metricName);

// Retrieve the Publisher via PublisherFactory
Publisher publisher = publisherFactory.getOrCreatePublisher(metricName);

if (publisher != null) {
try {
// Send the ensembled prediction for publishing
publisher.send(Prediction.toMap(ensembledPrediction), appName);
log.info("Ensembled prediction for application '{}' and metric '{}' sent to publisher successfully.", appName, metricName);
} catch (Exception e) {
Expand Down