Skip to content

Commit

Permalink
hotfix-deleteing-filtering-application (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmarchel7bulls authored Sep 25, 2024
1 parent b9f7e4c commit 28b5755
Showing 1 changed file with 10 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -1,26 +1,25 @@
package eu.nebulouscloud.predictionorchestrator.communication.consumers;

import eu.nebulouscloud.exn.core.Consumer;
import eu.nebulouscloud.predictionorchestrator.config.MethodConfig;
import eu.nebulouscloud.exn.core.Context;
import eu.nebulouscloud.exn.core.Handler;
import eu.nebulouscloud.predictionorchestrator.Orchestrator;
import eu.nebulouscloud.predictionorchestrator.Properties;
import eu.nebulouscloud.predictionorchestrator.communication.messages.Metric;
import eu.nebulouscloud.predictionorchestrator.communication.messages.MetricListMessage;
import eu.nebulouscloud.predictionorchestrator.communication.messages.StartForecastingMessage;
import eu.nebulouscloud.predictionorchestrator.communication.publishers.StartForecastingPublisher;
import eu.nebulouscloud.predictionorchestrator.config.MethodConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.qpid.protonj2.client.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import eu.nebulouscloud.exn.core.Handler;
import eu.nebulouscloud.exn.core.Context;

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -83,13 +82,11 @@ public void onMessage(String key, String address, Map body, Message message, Con

long epochStart = System.currentTimeMillis() / 1000; // Ensure consistency by using seconds

if (Objects.equals(metricListMessage.getName(), "_Application1")) {
// Register the application with the orchestrator
orchestrator.addApplication(appName,
LocalDateTime.ofEpochSecond(epochStart, 0, ZoneOffset.UTC),
metricNames
);
}
// Register the application with the orchestrator
orchestrator.addApplication(appName,
LocalDateTime.ofEpochSecond(epochStart, 0, ZoneOffset.UTC),
metricNames
);

// Prepare StartForecastingMessage with consistent timestamping
StartForecastingMessage startForecastingMessage = mapToStartForecastingMessage(
Expand Down Expand Up @@ -132,10 +129,8 @@ public void onMessage(String key, String address, Map body, Message message, Con

// Safely send the message using the publisher
try {
if (Objects.equals(metricListMessage.getName(), "_Application1")) {
startForecastingPublisher.send(startForecastingMessageToMap(startForecastingMessage), metricListMessage.getName());
log.info("Start forecasting event published for application {}; method {}; message: {}", appName, method, startForecastingMessageToMap(startForecastingMessage));
}
startForecastingPublisher.send(startForecastingMessageToMap(startForecastingMessage), metricListMessage.getName());
log.info("Start forecasting event published for application {}; method {}; message: {}", appName, method, startForecastingMessageToMap(startForecastingMessage));
} catch (NullPointerException e) {
log.error("Publisher is null for method {}. Could not send message. Details: {}", method, e.getMessage(), e);
} catch (Exception e) {
Expand Down

0 comments on commit 28b5755

Please sign in to comment.