Skip to content

Commit

Permalink
Prediction orchestrator v2 (#7)
Browse files Browse the repository at this point in the history
* [WIP] introducing orchestrator and Ensembler

* New version of prediction orchestrator
  • Loading branch information
jmarchel7bulls authored Sep 25, 2024
1 parent 76983c8 commit b7b43fe
Show file tree
Hide file tree
Showing 39 changed files with 1,481 additions and 150 deletions.
172 changes: 100 additions & 72 deletions prediction-orchestrator/pom.xml
Original file line number Diff line number Diff line change
@@ -1,84 +1,112 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>eu.nebulouscloud</groupId>
<artifactId>predictionorchestrator</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>prediction-orchestrator</name>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.13.0</version> <!-- Use the latest version available -->
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version> <!-- Update to the latest version -->
</dependency>
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>eu.nebulouscloud</groupId>
<artifactId>predictionorchestrator</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>prediction-orchestrator</name>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>

<dependency>
<groupId>eu.nebulouscloud</groupId>
<artifactId>exn-connector-java</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.13.0</version> <!-- Use the latest version available -->
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version> <!-- Update to the latest version -->
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20240303</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>eu.nebulouscloud</groupId>
<artifactId>exn-connector-java</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
<version>6.8.0</version> <!-- Update this to the latest version if necessary -->
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.4</version>
</dependency>
<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
<version>1.1.0.Final</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>

<repositories>
<!-- Repository for SNAPSHOT versions -->
<repository>
<id>oss-sonatype-snapshots</id>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<repositories>
<!-- Repository for SNAPSHOT versions -->
<repository>
<id>oss-sonatype-snapshots</id>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>

</project>

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package eu.nebulouscloud.predictionorchestrator;

import eu.nebulouscloud.exn.core.Context;
import eu.nebulouscloud.exn.core.Publisher;
import eu.nebulouscloud.predictionorchestrator.communication.PublisherFactory;
import eu.nebulouscloud.predictionorchestrator.communication.connectors.BrokerConnectorHandler;
import eu.nebulouscloud.predictionorchestrator.communication.publishers.PredictedMetricsPublisher;
import eu.nebulouscloud.predictionorchestrator.ensembling.EnsemblingMechanism;
import eu.nebulouscloud.predictionorchestrator.ensembling.EnsemblingMechanismFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
@Component
public class Orchestrator {

private final Map<String, LocalDateTime> epochStartMap = new ConcurrentHashMap<>();
private final Map<String, Integer> timeHorizonMap = new ConcurrentHashMap<>();
private final Map<String, List<String>> metricNamesMap = new ConcurrentHashMap<>();
private final TaskScheduler scheduler;
private final EnsemblingMechanism ensemblingMechanism;
private final Properties properties;
private final PredictionRegistry predictionRegistry;

@Autowired
private PublisherFactory publisherFactory;

@Autowired
public Orchestrator(Properties properties, PredictionRegistry predictionRegistry, BrokerConnectorHandler brokerConnectorHandler) {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(properties.getInitial_forward_prediction_number());
taskScheduler.initialize();
this.scheduler = taskScheduler;

this.properties = properties;
this.predictionRegistry = predictionRegistry;
this.ensemblingMechanism = EnsemblingMechanismFactory.getEnsemblingMechanism(properties);
log.info("Orchestrator initialized with a pool size of 10 for the TaskScheduler.");
}

public void addApplication(String appName, LocalDateTime epochStart, List<String> metricNames) {
log.info("Adding application {} with epoch start {} and time horizon {} seconds.", appName, epochStart, properties.getInitial_prediction_horizon());

epochStartMap.put(appName, epochStart);
timeHorizonMap.put(appName, properties.getInitial_prediction_horizon());
metricNamesMap.put(appName, metricNames);

scheduleTasks(appName);
}

private void scheduleTasks(String appName) {
log.info("Scheduling tasks for application {}.", appName);

LocalDateTime epochStart = epochStartMap.get(appName);
int timeHorizon = timeHorizonMap.get(appName);
List<String> metricNames = metricNamesMap.get(appName);

// Start scheduling the next task based on epochStart
scheduleNextTask(appName, epochStart, timeHorizon, metricNames);
}

private void scheduleNextTask(String appName, LocalDateTime adaptationTime, int timeHorizon, List<String> metricNames) {
log.info("Scheduling next task for application {} at {}.", appName, adaptationTime);

// Calculate fetch time
LocalDateTime fetchTime = adaptationTime.minusSeconds(10);
log.debug("Fetch time for application {}: {} (10 seconds before adaptation).", appName, fetchTime);

long adaptationTimeEpoch = adaptationTime.atZone(ZoneId.systemDefault()).toEpochSecond();
log.debug("Epoch time for application {}: {}", appName, adaptationTimeEpoch);

// Schedule ensembling for each metric
for (String metricName : metricNames) {
scheduler.schedule(() -> {
log.debug("Scheduled task running for metric {} at time {}.", metricName, adaptationTime);
pullAndEnsemblePredictions(appName, metricName, adaptationTime);
}, fetchTime.atZone(ZoneId.systemDefault()).toInstant());
}

// Schedule cleanup of old predictions
LocalDateTime cleanupTime = adaptationTime.plusSeconds(60);
log.debug("Scheduling cleanup for application {} at {} (1 minute after adaptation).", appName, cleanupTime);

for (String metricName : metricNames) {
scheduler.schedule(() -> {
log.debug("Cleaning up old predictions for application {}, metric {}, at timestamp {}.", appName, metricName, adaptationTimeEpoch);
predictionRegistry.cleanupOldPredictions(appName, metricName, adaptationTimeEpoch);
}, cleanupTime.atZone(ZoneId.systemDefault()).toInstant());
}

// Calculate next adaptation time and recursively schedule the next task
LocalDateTime nextAdaptationTime = adaptationTime.plusSeconds(timeHorizon);
log.info("Next adaptation time for application {}: {}.", appName, nextAdaptationTime);

scheduler.schedule(() -> scheduleNextTask(appName, nextAdaptationTime, timeHorizon, metricNames),
nextAdaptationTime.atZone(ZoneId.systemDefault()).toInstant());
}


private void pullAndEnsemblePredictions(String appName, String metricName, LocalDateTime adaptationTime) {
long adaptationTimeEpoch = adaptationTime.atZone(ZoneId.systemDefault()).toEpochSecond();
log.info("Pulling predictions for application '{}' and metric '{}' at timestamp {}.", appName, metricName, adaptationTimeEpoch);

if (predictionRegistry != null) {
Map<String, Prediction> predictionsByMethod = predictionRegistry.getPredictionsByMethodAndTimestamp(appName, metricName, adaptationTimeEpoch);

if (!predictionsByMethod.isEmpty()) {
log.debug("Retrieved {} predictions for application '{}' and metric '{}' at timestamp {}.", predictionsByMethod.size(), appName, metricName, adaptationTimeEpoch);

// Perform ensembling
Prediction ensembledPrediction = ensemblingMechanism.poolPredictions(predictionsByMethod, metricName);
log.info("Ensembled prediction created for application '{}' and metric '{}' with value: {}", appName, metricName, ensembledPrediction);
//TODO
// Store the ensembled prediction in the registry
// predictionRegistry.storeEnsembledPrediction(appName, ensembledPrediction);
// log.info("Ensembled prediction for application '{}' and metric '{}' stored successfully.", appName, metricName);

// Retrieve the Publisher via PublisherFactory
Publisher publisher = publisherFactory.getOrCreatePublisher(metricName);

if (publisher != null) {
try {
// Send the ensembled prediction for publishing
publisher.send(Prediction.toMap(ensembledPrediction), appName);
log.info("Ensembled prediction for application '{}' and metric '{}' sent to publisher successfully.", appName, metricName);
} catch (Exception e) {
log.error("Failed to send ensembled prediction for application '{}' and metric '{}'. Exception: {}", appName, metricName, e.getMessage(), e);
}
} else {
log.error("Unable to obtain Publisher for metric '{}' and application '{}'.", metricName, appName);
}

} else {
log.warn("No predictions found for application '{}' and metric '{}' at timestamp {}.", appName, metricName, adaptationTimeEpoch);
}
} else {
log.error("PredictionRegistry is null. Cannot pull predictions for application '{}' and metric '{}'.", appName, metricName);
}
}
}

This file was deleted.

Loading

0 comments on commit b7b43fe

Please sign in to comment.