diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/ResourceDiscoveryProperties.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/ResourceDiscoveryProperties.java index 7ff3345..0702c7b 100644 --- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/ResourceDiscoveryProperties.java +++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/ResourceDiscoveryProperties.java @@ -67,7 +67,8 @@ public class ResourceDiscoveryProperties { private boolean salRegistrationEnabled = true; private long salRegistrationTimeout = 60*1000; private String registration_topic_name = "eu.nebulouscloud.exn.sal.node.create"; - + private String deregistration_topic_prefix = "eu.nebulouscloud.exn.sal.node.delete"; + // Failed devices detection private boolean automaticFailedDetection = true; private long suspectDeviceThreshold = 5; // in minutes diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/SecurityConfig.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/SecurityConfig.java index 4849cb3..9b838af 100644 --- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/SecurityConfig.java +++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/SecurityConfig.java @@ -1,5 +1,6 @@ package eu.nebulous.resource.discovery; +import eu.nebulous.resource.discovery.registration.controller.RegistrationRequestController; import jakarta.servlet.Filter; import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletResponse; @@ -15,6 +16,7 @@ import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity; import org.springframework.security.config.annotation.web.configurers.AbstractHttpConfigurer; import org.springframework.security.config.http.SessionCreationPolicy; +import org.springframework.security.core.Authentication; import org.springframework.security.core.authority.SimpleGrantedAuthority; import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.security.core.userdetails.User; @@ -26,6 +28,7 @@ import java.security.SecureRandom; import java.util.Collections; +import java.util.HashMap; import static org.springframework.security.config.Customizer.withDefaults; @@ -37,6 +40,8 @@ public class SecurityConfig { private final static String USERNAME_REQUEST_HEADER = "X-SSO-USER"; private final static String USERNAME_REQUEST_PARAM = "ssoUser"; + private final static String NONCE_REQUEST_PARAM = "nonce"; + private final static String APPID_REQUEST_PARAM = "appId"; private final static String API_KEY_REQUEST_HEADER = "X-API-KEY"; private final static String API_KEY_REQUEST_PARAM = "apiKey"; @@ -54,6 +59,7 @@ public SecurityFilterChain securityFilterChain(HttpSecurity httpSecurity) throws "/discovery/**", "/*.html").authenticated()) .authorizeHttpRequests(authorize -> authorize.anyRequest().permitAll()) .addFilterAfter(apiKeyAuthenticationFilter(), UsernamePasswordAuthenticationFilter.class) + .addFilterAfter(nonceAuthenticationFilter(), UsernamePasswordAuthenticationFilter.class) .csrf(AbstractHttpConfigurer::disable) .sessionManagement(sm -> sm.sessionCreationPolicy(SessionCreationPolicy.ALWAYS)); @@ -100,6 +106,13 @@ public boolean matches(CharSequence rawPassword, String encodedPassword) { public Filter apiKeyAuthenticationFilter() { return (servletRequest, servletResponse, filterChain) -> { + + Authentication auth = SecurityContextHolder.getContext().getAuthentication(); + if (auth!=null && auth.isAuthenticated()) { + filterChain.doFilter(servletRequest, servletResponse); + return; + } + if (properties.isApiKeyAuthenticationEnabled() && StringUtils.isNotBlank(properties.getApiKeyValue())) { if (servletRequest instanceof HttpServletRequest request && servletResponse instanceof HttpServletResponse) { @@ -149,4 +162,65 @@ public Filter apiKeyAuthenticationFilter() { filterChain.doFilter(servletRequest, servletResponse); }; } + + public Filter nonceAuthenticationFilter(){ + return (servletRequest, servletResponse, filterChain) -> { + try { + HttpServletRequest request = ((HttpServletRequest )servletRequest); +// HttpSession session = request.getSession(false); +// +// if(session!=null){ +// filterChain.doFilter(servletRequest, servletResponse); +// return; +// } + Authentication auth = SecurityContextHolder.getContext().getAuthentication(); + if (auth!=null && auth.isAuthenticated()) { + filterChain.doFilter(servletRequest, servletResponse); + return; + } + StringBuilder requestURL = new StringBuilder(request.getRequestURL().toString()); + String queryString = request.getQueryString(); + + if (queryString == null) { + log.warn( requestURL.toString()); + } else { + log.warn(requestURL.append('?').append(queryString).toString()); + } + log.warn(servletRequest.toString()); + + String nonce = servletRequest.getParameter(NONCE_REQUEST_PARAM); + String appId = servletRequest.getParameter(APPID_REQUEST_PARAM); + + String username =null; + HashMap map = new HashMap<>(); + map.put(NONCE_REQUEST_PARAM, nonce); + map.put(APPID_REQUEST_PARAM, appId); + username = RegistrationRequestController.getNonceUsername(map); +// if ((nonce != null && appId != null) && (!nonce.isEmpty())) { +// HashMap map = new HashMap<>(); +// map.put(NONCE_REQUEST_PARAM, nonce); +// map.put(APPID_REQUEST_PARAM, appId); +// username = RegistrationRequestController.getNonceUsername(map); +// } + + if (username != null) { + UsernamePasswordAuthenticationToken authentication = + new UsernamePasswordAuthenticationToken(username, nonce, + Collections.singletonList(new SimpleGrantedAuthority(SSO_USER_ROLE))); + // store completed authentication in security context + SecurityContextHolder.getContext().setAuthentication(authentication); + log.debug("User was authenticated using a nonce token"); + } + + } catch (Exception e) { + log.error("nonceAuthenticationFilter: EXCEPTION: ", e); + } + + // continue down the chain + filterChain.doFilter(servletRequest, servletResponse); + + }; + } + + } diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/BrokerPublisher.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/BrokerPublisher.java index 6863080..42a59ee 100644 --- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/BrokerPublisher.java +++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/BrokerPublisher.java @@ -63,7 +63,11 @@ public BrokerPublisher(String topic, String broker_ip, int broker_port, String b throw new RuntimeException(e); } } - active_connector.stop(); + try { + active_connector.stop(); //TODO reassure expected stop() functionality is working here when this is necessary + }catch (Exception e){ + e.printStackTrace(); + } } } @@ -104,23 +108,27 @@ public BrokerPublisher(String topic, String broker_ip, int broker_port, String b //TODO The methods below assume that the only content to be sent is json-like public void publish (String json_string_content, Collection application_names){ - for (String application_name : application_names) { - JSONParser parser = new JSONParser(); - JSONObject json_object = new JSONObject(); - try { - json_object = (JSONObject) parser.parse(json_string_content); - } catch (ParseException p) { - log.warn( "publish: Could not parse the string content to be published to the broker as json, which is the following: "+json_string_content); - } - if (!is_publisher_null()) { - private_publisher_instance.send(json_object); - log.info("Sent new message\n"+json_object.toJSONString()); - } else { - log.error( "Could not send message to AMQP broker, as the publisher instance is null"); - } + publish(json_string_content); } } + + public void publish (String json_string_content){ + JSONParser parser = new JSONParser(); + JSONObject json_object = new JSONObject(); + try { + json_object = (JSONObject) parser.parse(json_string_content); + } catch (ParseException p) { + log.warn( "publish: Could not parse the string content to be published to the broker as json, which is the following: "+json_string_content); + } + if (!is_publisher_null()) { + private_publisher_instance.send(json_object); + log.info("Sent new message\n"+json_object.toJSONString()); + } else { + log.error( "Could not send message to AMQP broker, as the publisher instance is null"); + } + } + public boolean is_publisher_null(){ return (private_publisher_instance == null); } diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/BrokerSubscriber.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/BrokerSubscriber.java index 8963f9d..cbc0622 100644 --- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/BrokerSubscriber.java +++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/BrokerSubscriber.java @@ -7,6 +7,7 @@ import eu.nebulouscloud.exn.settings.StaticExnConfig; import lombok.extern.slf4j.Slf4j; import org.apache.qpid.protonj2.client.Message; +import org.apache.qpid.protonj2.client.exceptions.ClientException; import org.json.simple.JSONValue; import java.util.*; @@ -15,11 +16,13 @@ import static eu.nebulous.resource.discovery.broker_communication.BrokerPublisher.EMPTY; + @Slf4j public class BrokerSubscriber { - private AtomicBoolean stop_signal = new AtomicBoolean(); - + private Connector connector; + private final AtomicBoolean stop_signal = new AtomicBoolean(false); + private class MessageProcessingHandler extends Handler { private BrokerSubscriptionDetails broker_details; private static final BiFunction temporary_function = (Object o, Object o2) -> { @@ -31,8 +34,18 @@ private class MessageProcessingHandler extends Handler { @Override public void onMessage(String key, String address, Map body, Message message, Context context) { - log.info("Handling message for address " + address); - processing_function.apply(broker_details, JSONValue.toJSONString(body)); + log.warn("Handling message for address " + address); + String precise_topic; + try { + precise_topic = message.to(); + } catch (ClientException e) { + throw new RuntimeException(e); + } + if (precise_topic!=null){ + broker_details.setTopic(precise_topic); + } + + processing_function.apply(new BrokerSubscriptionDetails(broker_ip,broker_port,brokerUsername,brokerPassword,application_name,precise_topic), JSONValue.toJSONString(body)); } public MessageProcessingHandler(BrokerSubscriptionDetails broker_details) { @@ -56,10 +69,11 @@ public void setProcessing_function(BiFunction processing_function) { private static HashMap> broker_and_topics_to_subscribe_to = new HashMap<>(); private static HashMap> active_consumers_per_topic_per_broker_ip = new HashMap<>(); - private static HashMap current_connectors = new HashMap<>(); + private static final HashMap current_connector_handlers = new HashMap<>(); ArrayList consumers = new ArrayList<>(); private String topic; private String broker_ip; + private String application_name; private int broker_port; private String brokerUsername; private String brokerPassword; @@ -101,32 +115,77 @@ public BrokerSubscriber(String topic, String broker_ip, int broker_port, String } } if (subscriber_configuration_changed) { - Consumer current_consumer; - if (application_name != null && !application_name.equals(EMPTY)) { //Create a consumer for one application - log.info("APP level subscriber " + topic); - current_consumer = new Consumer(topic, topic, new MessageProcessingHandler(broker_details), application_name, true, true); - } else { //Allow the consumer to get information from any publisher - current_consumer = new Consumer(topic, topic, new MessageProcessingHandler(broker_details), true, true); - log.info("HIGH level subscriber " + topic); - } - active_consumers_per_topic_per_broker_ip.get(broker_ip).put(topic, current_consumer); + this.application_name = application_name; this.topic = topic; this.broker_ip = broker_ip; this.broker_port = broker_port; this.brokerUsername = brokerUsername; this.brokerPassword = brokerPassword; - add_topic_consumer_to_broker_connector(current_consumer); + } } + + public void stop(){ + synchronized (stop_signal) { + stop_signal.set(true); + } + } + /** * This method updates the global connector of Resource manager to the AMQP server, by adding support for one more component */ private void add_topic_consumer_to_broker_connector(Consumer new_consumer) { - if (current_connectors.get(broker_ip) != null) { - //current_connectors.get(broker_ip).stop(consumers,new ArrayList<>()); - current_connectors.get(broker_ip).stop(); +/* Consumer current_consumer; + if (application_name != null && !application_name.equals(EMPTY)) { //Create a consumer for one application + Logger.getAnonymousLogger().log(INFO,"APP level subscriber " + topic); + current_consumer = new Consumer(topic, topic, new MessageProcessingHandler(broker_details), application_name, true, true); + } else { //Allow the consumer to get information from any publisher + current_consumer = new Consumer(topic, topic, new MessageProcessingHandler(broker_details), true, true); + Logger.getAnonymousLogger().log(INFO,"HIGH level subscriber " + topic); + } + current_consumer.setProperty("topic",topic);*/ + + active_consumers_per_topic_per_broker_ip.get(broker_ip).put(topic, new_consumer); + + CustomConnectorHandler current_connector_handler = current_connector_handlers.get(broker_ip); + if (current_connector_handler==null){ + current_connector_handler = new CustomConnectorHandler() {}; + this.connector = new Connector("slo_violation_detector_consumer", + current_connector_handler, + List.of(), + List.of(new_consumer), + false, + false, + new StaticExnConfig( + broker_ip, + broker_port, + brokerUsername, + brokerPassword, + 60, + EMPTY + ) + ); + connector.start(); + }else if (!active_consumers_per_topic_per_broker_ip.get(broker_ip).containsKey(topic)){ + //current_connector_handler.remove_consumer_with_key(topic); + + synchronized (current_connector_handler.getReady()) { + while (!current_connector_handler.getReady().get()) { + try { + current_connector_handler.getReady().wait(); + log.info("Unable to register connector handler as the connector is unexpectedly not ready"); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + log.warn("Adding new consumer " + new_consumer.key()); + current_connector_handler.add_consumer(new_consumer); + } + /*if (current_connectors.get(broker_ip) != null) { + current_connectors.get(broker_ip).stop(consumers,new ArrayList<>()); } if (consumers.isEmpty()){ consumers = new ArrayList<>(); @@ -140,7 +199,7 @@ private void add_topic_consumer_to_broker_connector(Consumer new_consumer) { if(!do_not_add_new_consumer) { consumers.add(new_consumer); } - Connector connector = new Connector("resource_manager", + Connector extended_connector = new Connector("resource_manager", new CustomConnectorHandler() { }, List.of(), @@ -156,21 +215,13 @@ private void add_topic_consumer_to_broker_connector(Consumer new_consumer) { EMPTY ) ); - connector.start(); - current_connectors.put(broker_ip, connector); - } - - private void remove_topic_from_broker_connector(String topic_key) { - if (current_connectors.get(broker_ip) != null) { - //current_connectors.get(broker_ip).remove_consumer_with_key(topic_key); - } + extended_connector.start(); + current_connectors.put(broker_ip, extended_connector);*/ } + public int subscribe (BiFunction function, String application_name) { - return subscribe(function,application_name,stop_signal); - } - - public int subscribe(BiFunction function, String application_name, AtomicBoolean stop_signal) { + int exit_status = -1; log.info("ESTABLISHING SUBSCRIPTION for " + topic); /* @@ -181,7 +232,8 @@ public int subscribe(BiFunction function, String application_name, AtomicBoolean } else { active_consumers_per_topic_per_broker_ip.put(broker_ip, new HashMap<>()); } - */ + + */ //Then add the new consumer Consumer new_consumer; if (application_name != null && !application_name.equals(EMPTY)) { @@ -191,7 +243,13 @@ public int subscribe(BiFunction function, String application_name, AtomicBoolean new_consumer = new Consumer(topic, topic, new MessageProcessingHandler(function, broker_details), true, true); } new_consumer.setProperty("topic", topic); - active_consumers_per_topic_per_broker_ip.get(broker_ip).put(topic, new_consumer); + + if (active_consumers_per_topic_per_broker_ip.containsKey(broker_ip)){ + active_consumers_per_topic_per_broker_ip.get(broker_ip).put(topic, new_consumer); + }else { + active_consumers_per_topic_per_broker_ip.put(broker_ip, new HashMap<>()); + } + add_topic_consumer_to_broker_connector(new_consumer); log.info("ESTABLISHED SUBSCRIPTION to topic " + topic); @@ -209,8 +267,9 @@ public int subscribe(BiFunction function, String application_name, AtomicBoolean } active_consumers_per_topic_per_broker_ip.get(broker_ip).remove(topic); //remove_topic_from_broker_connector(topic); - current_connectors.get(broker_ip).stop(); + //current_connectors.get(broker_ip).stop(); exit_status = 0; return exit_status; } + } \ No newline at end of file diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/CustomConnectorHandler.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/CustomConnectorHandler.java index 93c9cdd..1ea0b62 100644 --- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/CustomConnectorHandler.java +++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/CustomConnectorHandler.java @@ -1,5 +1,6 @@ package eu.nebulous.resource.discovery.broker_communication; +import eu.nebulouscloud.exn.core.Consumer; import eu.nebulouscloud.exn.core.Context; import eu.nebulouscloud.exn.handlers.ConnectorHandler; @@ -20,6 +21,10 @@ public void onReady(Context context) { public void remove_consumer_with_key(String key){ context.unregisterConsumer(key); } + + public void add_consumer(Consumer consumer){ + context.registerConsumer(consumer); + } public Context getContext() { return context; diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/SALCommunicator.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/SALCommunicator.java index b7a598e..526525e 100644 --- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/SALCommunicator.java +++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/SALCommunicator.java @@ -102,9 +102,9 @@ private static void register_devices(String request_body_file, String sessionID, ((JSONObject) ((JSONArray) json.get("ipAddresses")).get(0)).put("value", internal_ip_address + counter); ((JSONObject) json.get("nodeProperties")).put("disk", new Random().nextInt(1, 101)); - ((JSONObject) json.get("nodeProperties")).put("memory", new Random().nextInt(1, 17)); + ((JSONObject) json.get("nodeProperties")).put("ram", new Random().nextInt(1, 17)); ((JSONObject) json.get("nodeProperties")).put("providerId", String.valueOf(new Random().nextInt(1, 21))); - ((JSONObject) json.get("nodeProperties")).put("numberOfCores", new Random().nextInt(1, 17)); + ((JSONObject) json.get("nodeProperties")).put("cores", new Random().nextInt(1, 17)); String[] country_choices = {"Greece", "Poland", "France"}; String[] city_choices = {"Athens", "Warsaw", "Nice"}; @@ -119,9 +119,9 @@ private static void register_devices(String request_body_file, String sessionID, ((JSONObject) ((JSONArray) json.get("ipAddresses")).get(0)).put("value", internal_ip_address); ((JSONObject) ((JSONArray) json.get("ipAddresses")).get(1)).put("value", external_ip_address); ((JSONObject) json.get("nodeProperties")).put("disk", disk_gb); - ((JSONObject) json.get("nodeProperties")).put("memory", ram_gb); + ((JSONObject) json.get("nodeProperties")).put("ram", ram_gb); ((JSONObject) json.get("nodeProperties")).put("providerId", provider_id); - ((JSONObject) json.get("nodeProperties")).put("numberOfCores", cpu_cores); + ((JSONObject) json.get("nodeProperties")).put("cores", cpu_cores); ((JSONObject) ((JSONObject) json.get("nodeProperties")).get("geoLocation")).put("country", country_name); ((JSONObject) ((JSONObject) json.get("nodeProperties")).get("geoLocation")).put("city", city_name); ((JSONObject) ((JSONObject) json.get("nodeProperties")).get("geoLocation")).put("latitude", new Random().nextFloat(-90, 90)); @@ -144,11 +144,11 @@ private static void register_devices(String request_body_file, String sessionID, public static String get_device_deregistration_json(Device device){ JSONObject root_json_object = new JSONObject(); - root_json_object.put("name",device.getRef()); + //root_json_object.put("name",device.getRef()); return root_json_object.toJSONString(); } - public static String get_device_registration_json(String internal_ip_address, String external_ip_address, int external_access_port, String os_family, String os_architecture, String jar_url, int os_version, int cpu_cores, long ram_gb, long disk_gb, String device_name,Double price, String provider_id, String city_name, String country_name, String device_username, String device_password, String private_key, double device_longitude, double device_latitude) { + public static String get_device_registration_json(String internal_ip_address, String external_ip_address, int external_access_port, String os_family, String os_architecture, String jar_url, int os_version, int cpu_cores, long ram_gb, long disk_gb, int number_of_gpus, String device_name,Double price, String provider_id, String city_name, String country_name, String device_username, String device_password, String private_key, double device_longitude, double device_latitude) { JSONObject root_json_object = new JSONObject(); JSONObject loginCredential = new JSONObject(); @@ -182,10 +182,11 @@ public static String get_device_registration_json(String internal_ip_address, St geoLocation.put("longitude", device_latitude); nodeProperties.put("providerId", provider_id); - nodeProperties.put("numberOfCores", cpu_cores); - nodeProperties.put("memory", ram_gb); + nodeProperties.put("cores", cpu_cores); + nodeProperties.put("ram", ram_gb); nodeProperties.put("disk", disk_gb); nodeProperties.put("price", price); + nodeProperties.put("gpu", number_of_gpus); nodeProperties.put("operatingSystem", operatingSystem); nodeProperties.put("geoLocation", geoLocation); @@ -209,9 +210,9 @@ public static String get_device_registration_json(String internal_ip_address, St //((JSONObject) ((JSONArray) root_json_object.get("ipAddresses")).get(0)).put("value", internal_ip_address); //((JSONObject) ((JSONArray) root_json_object.get("ipAddresses")).get(1)).put("value", external_ip_address); //((JSONObject) root_json_object.get("nodeProperties")).put("disk", disk_gb); - //((JSONObject) root_json_object.get("nodeProperties")).put("memory", ram_gb); + //((JSONObject) root_json_object.get("nodeProperties")).put("ram", ram_gb); //((JSONObject) root_json_object.get("nodeProperties")).put("providerId", provider_id); - //((JSONObject) root_json_object.get("nodeProperties")).put("numberOfCores", cpu_cores); + //((JSONObject) root_json_object.get("nodeProperties")).put("cores", cpu_cores); //((JSONObject) ((JSONObject) root_json_object.get("nodeProperties")).get("geoLocation")).put("country", country_name); //((JSONObject) ((JSONObject) root_json_object.get("nodeProperties")).get("geoLocation")).put("city", city_name); //((JSONObject) ((JSONObject) root_json_object.get("nodeProperties")).get("geoLocation")).put("latitude", new Random().nextFloat(-90, 90)); diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/common/BrokerUtil.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/common/BrokerUtil.java index d308d64..964e966 100644 --- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/common/BrokerUtil.java +++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/common/BrokerUtil.java @@ -200,9 +200,18 @@ public void onMessage(Message message) { Object obj = objectMapper.readerFor(typeRef).readValue(payload); if (obj instanceof Map dataMap) { - handlePayload(((ActiveMQTextMessage) message).getDestination().getPhysicalName(), dataMap); + String topic = ((ActiveMQTextMessage) message).getDestination().getPhysicalName(); + // Print response messages except the EMS node status reports (_ui_instance_info, _client_metrics) + if (StringUtils.isNotBlank(topic) + && ! topic.equals(properties.getDeviceStatusMonitorTopic()) + && ! topic.equals(properties.getDeviceMetricsMonitorTopic())) + { + log.warn("BrokerUtil: Received a new message: topic: {}", topic); + log.warn("BrokerUtil: Received a new message: payload: {}", dataMap); + } + handlePayload(topic, dataMap); } else { - log.debug("BrokerUtil: Message payload is not recognized. Expected Map but got: type={}, object={}", obj.getClass().getName(), obj); + log.warn("BrokerUtil: Message payload is not recognized. Expected Map but got: type={}, object={}", obj.getClass().getName(), obj); } } else { log.debug("BrokerUtil: Message type is not supported: {}", message); diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/DeviceProcessor.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/DeviceProcessor.java index 2ea34c3..eaa5c79 100644 --- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/DeviceProcessor.java +++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/DeviceProcessor.java @@ -49,6 +49,7 @@ public class DeviceProcessor implements InitializingBean { private final TaskScheduler taskScheduler; private final AtomicBoolean isRunning = new AtomicBoolean(false); private final Optional salDeregistrationService; + private BrokerPublisher device_lost_publisher; @Override @@ -144,13 +145,13 @@ private void processFailedDevices() { lost_device_message.put("device_name",device.getName()); Clock clock = Clock.systemUTC(); lost_device_message.put("timestamp",(int)(clock.millis()/1000)); - log.info("Creating new BrokerPublisher to publish device lost message"); - BrokerPublisher device_lost_publisher = new BrokerPublisher(processorProperties.getLost_device_topic(), processorProperties.getNebulous_broker_ip_address(), processorProperties.getNebulous_broker_port(), processorProperties.getNebulous_broker_username(), processorProperties.getNebulous_broker_password(), ""); + log.info("Trying to use existing BrokerPublisher to publish device lost message"); + //device_lost_publisher = new BrokerPublisher(processorProperties.getLost_device_topic(), processorProperties.getNebulous_broker_ip_address(), processorProperties.getNebulous_broker_port(), processorProperties.getNebulous_broker_username(), processorProperties.getNebulous_broker_password(), ""); int sending_attempt = 1; - while (device_lost_publisher.is_publisher_null()){ + while (device_lost_publisher==null || device_lost_publisher.is_publisher_null()){ try { - log.warn("Will now make attempt No "+sending_attempt+" to recreate the BrokerPublisher connector for the lost device topic"); + log.warn("Will now make attempt No "+sending_attempt+" to (re)create the BrokerPublisher connector for the lost device topic"); log.info("The topic name is "+processorProperties.getLost_device_topic()+", the broker ip is "+ processorProperties.getNebulous_broker_ip_address()+", the broker port is "+ processorProperties.getNebulous_broker_port()+", the username is "+ processorProperties.getNebulous_broker_username()+", and the password is "+ processorProperties.getNebulous_broker_password()); device_lost_publisher = new BrokerPublisher(processorProperties.getLost_device_topic(), processorProperties.getNebulous_broker_ip_address(), processorProperties.getNebulous_broker_port(), processorProperties.getNebulous_broker_username(), processorProperties.getNebulous_broker_password(), ""); @@ -163,9 +164,9 @@ private void processFailedDevices() { } device_lost_publisher.publish(lost_device_message.toJSONString(), Collections.singleton("")); log.warn("processFailedDevices: Marked as FAILED device with Id: {}", device.getId()); - device_lost_publisher.stop(); + //device_lost_publisher.stop(); } - + device.setStatus(DeviceStatus.OFFBOARDED); deviceManagementService.update(device); } diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/controller/DeviceManagementController.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/controller/DeviceManagementController.java index 48dbc45..8755e78 100644 --- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/controller/DeviceManagementController.java +++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/controller/DeviceManagementController.java @@ -91,6 +91,7 @@ public Device getDeviceByIpAddress(@PathVariable String ipAddress) { @PutMapping(value = "/device", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE) public Device createDevice(@RequestBody Device device) { + log.warn("DeviceManagementController: createDevice: device: {}", device); salRegistrationService.ifPresent(salRegistrationService -> salRegistrationService.register(device)); return deviceService.save(device); diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/model/Device.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/model/Device.java index f3df47e..b1eafa8 100644 --- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/model/Device.java +++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/model/Device.java @@ -16,8 +16,9 @@ @NoArgsConstructor @Document(collection = "device") public class Device { - private String id; - private String ref; + private String id; //This is the internal id of the device in the Resource Discovery component + private String sal_id; //This identifier is used by SAL, and is used for deregistration + private String ref; //This identifier is used by the Cloud Fog Service Broker, to ascertain which devices are available for which applications private Double price; private String os; private String name; diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/service/UnknownDeviceRegistrationService.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/service/UnknownDeviceRegistrationService.java index b544fcc..65fe627 100644 --- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/service/UnknownDeviceRegistrationService.java +++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/service/UnknownDeviceRegistrationService.java @@ -253,7 +253,7 @@ private void processDeviceDetailsResponses() { log.info("UnknownDeviceRegistrationService: Registered device: {}", newDevice); if (salRegistrationService.isPresent()) { - log.info("Registering the device {} to SAL...", newDevice); + log.info("Registering the device to SAL: {}", newDevice); salRegistrationService.get().register(newDevice); } diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/RegistrationRequestProcessor.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/RegistrationRequestProcessor.java index df5e583..bee88c4 100644 --- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/RegistrationRequestProcessor.java +++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/RegistrationRequestProcessor.java @@ -240,6 +240,7 @@ private void processResponse(@NonNull Map response) { long timestamp = Long.parseLong(response.getOrDefault("timestamp", "-1").toString().trim()); RegistrationRequest registrationRequest = registrationRequestService.getById(requestId).orElse(null); + log.warn("RegistrationRequestProcessor: processResponse: request: {}", registrationRequest); if (registrationRequest!=null) { RegistrationRequestStatus currStatus = registrationRequest.getStatus(); RegistrationRequestStatus newStatus = switch (currStatus) { @@ -288,6 +289,20 @@ private void processResponse(@NonNull Map response) { boolean doArchive = false; Object obj = response.get("nodeInfo"); + log.warn("RegistrationRequestProcessor: processResponse: nodeInfo: {} {}", obj==null?null:obj.getClass().getTypeName(), obj); + + // If device info are missing copy them from the registration request + if (obj==null || obj instanceof Map && ((Map) obj).isEmpty()) { + log.warn("RegistrationRequestProcessor: processResponse: nodeInfo: ** NO DEVICE INFO IN RESPONSE **"); + if (! registrationRequest.getDevice().getDeviceInfo().isEmpty()) { + obj = registrationRequest.getDevice().getDeviceInfo(); + log.warn("RegistrationRequestProcessor: processResponse: nodeInfo: ** DEVICE INFO COPIED FROM REGISTRATION REQUEST **"); + log.warn("RegistrationRequestProcessor: processResponse: nodeInfo: {}", obj); + } else { + log.warn("RegistrationRequestProcessor: processResponse: nodeInfo: ** PROBLEM: REGISTRATION REQUEST DOES NOT CONTAIN DEVICE INFO EITHER **"); + } + } + if (obj instanceof Map devInfo) { // Update request info registrationRequest.setLastUpdateDate(Instant.ofEpochMilli(timestamp)); @@ -297,12 +312,14 @@ private void processResponse(@NonNull Map response) { boolean allowAllKeys = processorProperties.getAllowedDeviceInfoKeys().contains("*"); final Map processedDevInfo = new LinkedHashMap<>(); devInfo.forEach((key, value) -> { + log.warn("RegistrationRequestProcessor: processResponse: Dev-info pair: {} = {}", key, value); if (key!=null && value!=null) { String k = key.toString().trim(); String v = value.toString().trim(); if (StringUtils.isNotBlank(k) && StringUtils.isNotBlank(v)) { if (allowAllKeys || processorProperties.getAllowedDeviceInfoKeys().contains(k.toUpperCase())) { processedDevInfo.put(k, v); + log.warn("RegistrationRequestProcessor: processResponse: Dev-info pair ADDED: {} = {}", key, value); } else { log.debug("processResponse: Not allowed device info key for request: id={}, key={}", requestId, k); } @@ -364,6 +381,8 @@ private RegistrationRequestStatus getNextStatus(RegistrationRequestStatus currSt } private void copyDeviceToMonitoring(RegistrationRequest registrationRequest) { + log.warn("RegistrationRequestProcessor: copyDeviceToMonitoring: BEGIN: request: {}", registrationRequest); + log.warn("RegistrationRequestProcessor: copyDeviceToMonitoring: request-DEVICE: {}", registrationRequest.getDevice()); Device device = objectMapper.convertValue(registrationRequest.getDevice(), Device.class); // override values device.setId(null); @@ -379,6 +398,7 @@ private void copyDeviceToMonitoring(RegistrationRequest registrationRequest) { device.setRequestId(registrationRequest.getId()); device.setNodeReference(registrationRequest.getNodeReference()); deviceManagementService.save(device); + log.warn("RegistrationRequestProcessor: copyDeviceToMonitoring: COPIED-DEVICE: {}", device); if (processorProperties.isSalRegistrationEnabled()) salRegistrationService.ifPresent(salRegistrationService -> salRegistrationService.queueForRegistration(device)); } diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/controller/RegistrationRequestController.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/controller/RegistrationRequestController.java index 55f084a..1da46ce 100644 --- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/controller/RegistrationRequestController.java +++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/controller/RegistrationRequestController.java @@ -1,5 +1,9 @@ package eu.nebulous.resource.discovery.registration.controller; +import eu.nebulous.resource.discovery.ResourceDiscoveryProperties; +import eu.nebulous.resource.discovery.broker_communication.BrokerPublisher; +import eu.nebulous.resource.discovery.broker_communication.BrokerSubscriber; +import eu.nebulous.resource.discovery.broker_communication.BrokerSubscriptionDetails; import eu.nebulous.resource.discovery.registration.IRegistrationRequestProcessor; import eu.nebulous.resource.discovery.registration.model.ArchivedRegistrationRequest; import eu.nebulous.resource.discovery.registration.model.RegistrationRequest; @@ -9,28 +13,72 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.json.simple.parser.ParseException; +import org.springframework.beans.factory.InitializingBean; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.security.core.Authentication; import org.springframework.web.bind.annotation.*; -import java.util.Collections; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.function.BiFunction; @Slf4j @RestController @RequiredArgsConstructor @RequestMapping("/discovery") -public class RegistrationRequestController { +public class RegistrationRequestController implements InitializingBean { private final static String REQUIRES_ADMIN_ROLE = "hasAuthority('ROLE_ADMIN')"; - + private final static String GET_USER_TOPIC = "eu.nebulouscloud.ui.user.get"; + private static ResourceDiscoveryProperties processorPropertiesStatic; + private final ResourceDiscoveryProperties processorProperties; private final RegistrationRequestService registrationRequestService; private final IRegistrationRequestProcessor registrationRequestProcessor; + + private static final Map nonce_messages = Collections.synchronizedMap(new HashMap<>()); + private static final Set nonce_message_published = Collections.synchronizedSet(new HashSet<>()); + private static boolean has_initialized_nonce_connector = false; + private static BrokerSubscriber nonce_subscriber; + private static BrokerPublisher nonce_publisher; + private static final int TIMEOUT_DURATION_SECONDS = 5; + + + @Override + public void afterPropertiesSet() throws Exception { + processorPropertiesStatic = processorProperties; + + log.debug("Initializing connector"); + if (!has_initialized_nonce_connector){ + nonce_publisher = new BrokerPublisher(GET_USER_TOPIC,processorPropertiesStatic.getNebulous_broker_ip_address(), processorPropertiesStatic.getNebulous_broker_port(), processorPropertiesStatic.getNebulous_broker_username(), processorPropertiesStatic.getNebulous_broker_password(), ""); + + nonce_subscriber = new BrokerSubscriber(GET_USER_TOPIC+".>",processorPropertiesStatic.getNebulous_broker_ip_address(), processorPropertiesStatic.getNebulous_broker_port(), processorPropertiesStatic.getNebulous_broker_username(),processorPropertiesStatic.getNebulous_broker_password(), "",""); + + log.debug("Defining function"); + BiFunction function = (broker_subscription_details, message_body) -> { + + String topic_suffix = broker_subscription_details.getTopic().replace("topic://"+GET_USER_TOPIC,""); + if (topic_suffix!=null && !topic_suffix.isEmpty()){ + String nonce_from_topic = StringUtils.substringAfterLast(topic_suffix,"."); + log.warn("Received message"+message_body+" at "+broker_subscription_details.getTopic()); + nonce_messages.put(nonce_from_topic,message_body); + nonce_message_published.add(nonce_from_topic); + } + return message_body; + }; + log.debug("Starting subscription thread"); + Thread subscriber_thread = new Thread (()-> nonce_subscriber.subscribe(function,""));//Could have a particular application name instead of empty here if needed + subscriber_thread.start(); + has_initialized_nonce_connector = true; + } + + } + @GetMapping(value = "/whoami", produces = MediaType.APPLICATION_JSON_VALUE) public Map whoami(Authentication authentication) { List roles = authentication != null @@ -70,6 +118,64 @@ public RegistrationRequest getRequest(@PathVariable String id, Authentication au .orElseThrow(() -> new RegistrationRequestException("Not found registration request with id: "+id)); } + //@GetMapping(value= "/nonce_username", produces = MediaType.APPLICATION_JSON_VALUE) + public static String getNonceUsername(@RequestParam Map data) { + + //Get the nonce from the provided data, along with the appId + //initialize the BrokerPublisher + //Parse the response of the SyncedPublisher + //Return an appropriate json object to the client + log.debug("Initializing processing"); + String nonce = (String) data.get("nonce"); + String appId = (String) data.get("appId"); + + JSONObject json_request = new JSONObject(); + json_request.put("nonce",nonce); + json_request.put("appId",appId); + + String empty_response = null; //"{\"nonce\": \"" + nonce + "\", \"username\": \"" + "" + "\"}"; + + log.debug("Sending nonce message to middleware"); + nonce_publisher.publish(json_request.toJSONString(),List.of("")); + + + int cumulative_sleep = 0; + int sleep_duration_millis = 500; + + JSONParser parser = new JSONParser(); + + while (!nonce_message_published.contains(nonce)){ + log.debug("While iteration, nonce messages {}",nonce_messages.keySet()); + log.debug("While iteration, nonce message published {}",nonce_message_published); + if (cumulative_sleep>= TIMEOUT_DURATION_SECONDS *1000){ + //nonce_message_published.remove(nonce); + //nonce_messages.remove(nonce); + return empty_response; + } + try { + Thread.sleep(sleep_duration_millis); + cumulative_sleep = cumulative_sleep + sleep_duration_millis; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + try { + + //nonce_message_published.remove(nonce); + //nonce_messages.remove(nonce); + + JSONObject response = (JSONObject) parser.parse(nonce_messages.get(nonce)); + String username = (String) response.get("username"); + + // Return an appropriate JSON object to the client + //return "{\"nonce\": \"" + nonce + "\", \"username\": \"" + username + "\"}"; + return username; + } catch (ParseException e) { + throw new RuntimeException(e); + } + + } + @PutMapping(value = "/request", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE) public RegistrationRequest createRequest(@RequestBody RegistrationRequest registrationRequest, Authentication authentication) { return registrationRequestService.saveAsUser(registrationRequest, authentication); @@ -159,4 +265,5 @@ public Map handleRegistrationRequestException(RegistrationRequest "message", exception.getMessage() ); } + } diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/service/SALDeregistrationService.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/service/SALDeregistrationService.java index 95099a2..cbad55c 100644 --- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/service/SALDeregistrationService.java +++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/service/SALDeregistrationService.java @@ -62,7 +62,7 @@ public void deregister(Device device) { if (processorProperties.isDeregistration_emulated()){ return; } - SynchronousBrokerPublisher deregister_device_publisher = new SynchronousBrokerPublisher(get_deregistration_topic_name(application_name), processorProperties.getNebulous_broker_ip_address(), processorProperties.getNebulous_broker_port(), processorProperties.getNebulous_broker_username(), processorProperties.getNebulous_broker_password(), ""); + SynchronousBrokerPublisher deregister_device_publisher = new SynchronousBrokerPublisher(get_deregistration_topic_name(device.getSal_id()), processorProperties.getNebulous_broker_ip_address(), processorProperties.getNebulous_broker_port(), processorProperties.getNebulous_broker_username(), processorProperties.getNebulous_broker_password(), ""); int sending_attempt = 1; while (deregister_device_publisher.is_publisher_null()) { if (sending_attempt <= 2) { @@ -81,7 +81,7 @@ public void deregister(Device device) { //TODO handle the response here if (deregister_device_message_string!=null && !deregister_device_message_string.isEmpty()) { Map response = deregister_device_publisher.publish_for_response(deregister_device_message_string, Collections.singleton(application_name)); - log.info("The response received while trying to deregister device " + device.getRef() + " is " + response.toString()); + log.warn("The response received while trying to deregister device " + device.getRef() + " is " + response.toString()); }else{ log.warn("Deregistration was to be initiated with an empty deregistration payload"); } @@ -100,9 +100,8 @@ public void deregister(Device device) { } - private String get_deregistration_topic_name(String application_name) { - return processorProperties.getRegistration_topic_name(); - //return ("eu.nebulouscloud.exn.sal.edge." + application_name); + private String get_deregistration_topic_name(String sal_id) { + return processorProperties.getDeregistration_topic_prefix()+"."+sal_id; } @Override diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/service/SALRegistrationService.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/service/SALRegistrationService.java index a5b769a..7d3faf0 100644 --- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/service/SALRegistrationService.java +++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/service/SALRegistrationService.java @@ -8,6 +8,9 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.json.simple.parser.ParseException; import org.springframework.beans.factory.InitializingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.core.task.TaskExecutor; @@ -37,20 +40,21 @@ public void queueForRegistration(@NonNull Device device) { queue.add(device); } - public void register(Device device) { - + public String register(Device device) { + log.warn("SALRegistrationService: register: BEGIN: device: {}", device); String application_name = device.getRef().split("\\|")[1]; if (application_name.equals("all_applications")){ application_name=""; } - String public_ip = System.getenv("NEBULOUS_IP"); + /*String public_ip = System.getenv("NEBULOUS_IP"); if (public_ip==null || public_ip.isEmpty()){ - log.warn("Using default IP address ("+processorProperties.getNebulous_server_ip_address()+") to fetch Proactive client jar files from, as the environmental variable was not set or found"); public_ip = processorProperties.getNebulous_server_ip_address(); - } + log.warn("Using default IP address ({}) to fetch Proactive client jar files from, as the environmental variable was not set or found", public_ip);}*/ + String public_ip = processorProperties.getNebulous_server_ip_address(); Map device_info = device.getDeviceInfo(); + log.warn("SALRegistrationService: register: DEVICE-INFO: {}", device_info); /* Information available from the EMS, based on https://gitlab.com/nebulous-project/ems-main/-/blob/master/ems-core/bin/detect.sh?ref_type=heads echo CPU_SOCKETS=$TMP_NUM_CPUS echo CPU_CORES=$TMP_NUM_CORES @@ -71,6 +75,7 @@ public void register(Device device) { String device_name = device.getRef(); Double price = device.getPrice(); + int number_of_gpus = 0; int cores = Integer.parseInt(device_info.get("CPU_PROCESSORS")); long ram_mb = Math.round(Integer.parseInt(device_info.get("RAM_TOTAL_KB"))*1.0/1000); long disk_mb = Math.round(Integer.parseInt(device_info.get("DISK_TOTAL_KB"))*1.0/1000); @@ -117,7 +122,7 @@ public void register(Device device) { //register_device_message.put("timestamp",(int)(clock.millis()/1000)); - String register_device_message_string = get_device_registration_json(internal_ip,external_ip_address,external_access_port,os_family,os_architecture,jar_url,os_version,cores,ram_mb,disk_mb,device_name,price,provider_id,city_name,country_name, device_username, device_password,private_key,device_longitude, device_latitude); + String register_device_message_string = get_device_registration_json(internal_ip,external_ip_address,external_access_port,os_family,os_architecture,jar_url,os_version,cores,ram_mb,disk_mb,number_of_gpus,device_name,price,provider_id,city_name,country_name, device_username, device_password,private_key,device_longitude, device_latitude); log.info("topic is {}", get_registration_topic_name(application_name)); log.info("broker ip is {}", processorProperties.getNebulous_broker_ip_address()); log.info("broker port is {}", processorProperties.getNebulous_broker_port()); @@ -144,7 +149,17 @@ public void register(Device device) { } //TODO handle the response here Map response = register_device_publisher.publish_for_response(register_device_message_string, Collections.singleton(application_name)); - log.info("The response received while trying to register device " + device_name + " is "+response.toString()); + log.warn("The response received while trying to register device " + device_name + " is "+response.toString()); + JSONObject response_json = new JSONObject(response); + JSONParser json_parser = new JSONParser(); + JSONObject response_json_body = null; + try { + response_json_body = (JSONObject) json_parser.parse(String.valueOf(response_json.get("body"))); + } catch (ParseException e) { + throw new RuntimeException(e); + } + String device_id = (String) response_json_body.get("id"); + return device_id; //} /* This is some realtime information, could be retrieved with a different call to the EMS. @@ -197,7 +212,8 @@ public void processQueue() { device = queue.take(); log.warn("SALRegistrationService: processQueue(): Will register device: {}", device); lastRegistrationStartTimestamp = System.currentTimeMillis(); - register(device); + String device_sal_id = register(device); + device.setSal_id(device_sal_id); lastRegistrationStartTimestamp = -1L; device.setRegisteredToSAL(true); deviceManagementService.update(device);