Skip to content

Commit

Permalink
Merge branch 'r1'
Browse files Browse the repository at this point in the history
# Conflicts:
#	resource-discovery/src/main/java/eu/nebulous/resource/discovery/common/BrokerUtil.java
  • Loading branch information
atsag committed Dec 5, 2024
2 parents ace0544 + 77f5c2d commit 20dcd0a
Show file tree
Hide file tree
Showing 5 changed files with 294 additions and 42 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package eu.nebulous.resource.discovery;

import eu.nebulous.resource.discovery.registration.controller.RegistrationRequestController;
import jakarta.servlet.Filter;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
Expand All @@ -15,6 +16,7 @@
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.annotation.web.configurers.AbstractHttpConfigurer;
import org.springframework.security.config.http.SessionCreationPolicy;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.authority.SimpleGrantedAuthority;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.security.core.userdetails.User;
Expand All @@ -26,6 +28,7 @@

import java.security.SecureRandom;
import java.util.Collections;
import java.util.HashMap;

import static org.springframework.security.config.Customizer.withDefaults;

Expand All @@ -37,6 +40,8 @@
public class SecurityConfig {
private final static String USERNAME_REQUEST_HEADER = "X-SSO-USER";
private final static String USERNAME_REQUEST_PARAM = "ssoUser";
private final static String NONCE_REQUEST_PARAM = "nonce";
private final static String APPID_REQUEST_PARAM = "appId";
private final static String API_KEY_REQUEST_HEADER = "X-API-KEY";
private final static String API_KEY_REQUEST_PARAM = "apiKey";

Expand All @@ -54,6 +59,7 @@ public SecurityFilterChain securityFilterChain(HttpSecurity httpSecurity) throws
"/discovery/**", "/*.html").authenticated())
.authorizeHttpRequests(authorize -> authorize.anyRequest().permitAll())
.addFilterAfter(apiKeyAuthenticationFilter(), UsernamePasswordAuthenticationFilter.class)
.addFilterAfter(nonceAuthenticationFilter(), UsernamePasswordAuthenticationFilter.class)
.csrf(AbstractHttpConfigurer::disable)
.sessionManagement(sm -> sm.sessionCreationPolicy(SessionCreationPolicy.ALWAYS));

Expand Down Expand Up @@ -100,6 +106,13 @@ public boolean matches(CharSequence rawPassword, String encodedPassword) {

public Filter apiKeyAuthenticationFilter() {
return (servletRequest, servletResponse, filterChain) -> {

Authentication auth = SecurityContextHolder.getContext().getAuthentication();
if (auth!=null && auth.isAuthenticated()) {
filterChain.doFilter(servletRequest, servletResponse);
return;
}

if (properties.isApiKeyAuthenticationEnabled() && StringUtils.isNotBlank(properties.getApiKeyValue())) {
if (servletRequest instanceof HttpServletRequest request && servletResponse instanceof HttpServletResponse) {

Expand Down Expand Up @@ -149,4 +162,65 @@ public Filter apiKeyAuthenticationFilter() {
filterChain.doFilter(servletRequest, servletResponse);
};
}

public Filter nonceAuthenticationFilter(){
return (servletRequest, servletResponse, filterChain) -> {
try {
HttpServletRequest request = ((HttpServletRequest )servletRequest);
// HttpSession session = request.getSession(false);
//
// if(session!=null){
// filterChain.doFilter(servletRequest, servletResponse);
// return;
// }
Authentication auth = SecurityContextHolder.getContext().getAuthentication();
if (auth!=null && auth.isAuthenticated()) {
filterChain.doFilter(servletRequest, servletResponse);
return;
}
StringBuilder requestURL = new StringBuilder(request.getRequestURL().toString());
String queryString = request.getQueryString();

if (queryString == null) {
log.warn( requestURL.toString());
} else {
log.warn(requestURL.append('?').append(queryString).toString());
}
log.warn(servletRequest.toString());

String nonce = servletRequest.getParameter(NONCE_REQUEST_PARAM);
String appId = servletRequest.getParameter(APPID_REQUEST_PARAM);

String username =null;
HashMap<String, String> map = new HashMap<>();
map.put(NONCE_REQUEST_PARAM, nonce);
map.put(APPID_REQUEST_PARAM, appId);
username = RegistrationRequestController.getNonceUsername(map);
// if ((nonce != null && appId != null) && (!nonce.isEmpty())) {
// HashMap<String, String> map = new HashMap<>();
// map.put(NONCE_REQUEST_PARAM, nonce);
// map.put(APPID_REQUEST_PARAM, appId);
// username = RegistrationRequestController.getNonceUsername(map);
// }

if (username != null) {
UsernamePasswordAuthenticationToken authentication =
new UsernamePasswordAuthenticationToken(username, nonce,
Collections.singletonList(new SimpleGrantedAuthority(SSO_USER_ROLE)));
// store completed authentication in security context
SecurityContextHolder.getContext().setAuthentication(authentication);
log.debug("User was authenticated using a nonce token");
}

} catch (Exception e) {
log.error("nonceAuthenticationFilter: EXCEPTION: ", e);
}

// continue down the chain
filterChain.doFilter(servletRequest, servletResponse);

};
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import eu.nebulouscloud.exn.settings.StaticExnConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.qpid.protonj2.client.Message;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.json.simple.JSONValue;

import java.util.*;
Expand All @@ -15,11 +16,13 @@

import static eu.nebulous.resource.discovery.broker_communication.BrokerPublisher.EMPTY;


@Slf4j
public class BrokerSubscriber {

private AtomicBoolean stop_signal = new AtomicBoolean();

private Connector connector;
private final AtomicBoolean stop_signal = new AtomicBoolean(false);

private class MessageProcessingHandler extends Handler {
private BrokerSubscriptionDetails broker_details;
private static final BiFunction temporary_function = (Object o, Object o2) -> {
Expand All @@ -31,8 +34,18 @@ private class MessageProcessingHandler extends Handler {

@Override
public void onMessage(String key, String address, Map body, Message message, Context context) {
log.info("Handling message for address " + address);
processing_function.apply(broker_details, JSONValue.toJSONString(body));
log.warn("Handling message for address " + address);
String precise_topic;
try {
precise_topic = message.to();
} catch (ClientException e) {
throw new RuntimeException(e);
}
if (precise_topic!=null){
broker_details.setTopic(precise_topic);
}

processing_function.apply(new BrokerSubscriptionDetails(broker_ip,broker_port,brokerUsername,brokerPassword,application_name,precise_topic), JSONValue.toJSONString(body));
}

public MessageProcessingHandler(BrokerSubscriptionDetails broker_details) {
Expand All @@ -56,10 +69,11 @@ public void setProcessing_function(BiFunction processing_function) {

private static HashMap<String, HashSet<String>> broker_and_topics_to_subscribe_to = new HashMap<>();
private static HashMap<String, HashMap<String, Consumer>> active_consumers_per_topic_per_broker_ip = new HashMap<>();
private static HashMap<String, Connector> current_connectors = new HashMap<>();
private static final HashMap<String, CustomConnectorHandler> current_connector_handlers = new HashMap<>();
ArrayList<Consumer> consumers = new ArrayList<>();
private String topic;
private String broker_ip;
private String application_name;
private int broker_port;
private String brokerUsername;
private String brokerPassword;
Expand Down Expand Up @@ -101,32 +115,77 @@ public BrokerSubscriber(String topic, String broker_ip, int broker_port, String
}
}
if (subscriber_configuration_changed) {
Consumer current_consumer;
if (application_name != null && !application_name.equals(EMPTY)) { //Create a consumer for one application
log.info("APP level subscriber " + topic);
current_consumer = new Consumer(topic, topic, new MessageProcessingHandler(broker_details), application_name, true, true);
} else { //Allow the consumer to get information from any publisher
current_consumer = new Consumer(topic, topic, new MessageProcessingHandler(broker_details), true, true);
log.info("HIGH level subscriber " + topic);
}
active_consumers_per_topic_per_broker_ip.get(broker_ip).put(topic, current_consumer);

this.application_name = application_name;
this.topic = topic;
this.broker_ip = broker_ip;
this.broker_port = broker_port;
this.brokerUsername = brokerUsername;
this.brokerPassword = brokerPassword;
add_topic_consumer_to_broker_connector(current_consumer);

}
}


public void stop(){
synchronized (stop_signal) {
stop_signal.set(true);
}
}

/**
* This method updates the global connector of Resource manager to the AMQP server, by adding support for one more component
*/
private void add_topic_consumer_to_broker_connector(Consumer new_consumer) {
if (current_connectors.get(broker_ip) != null) {
//current_connectors.get(broker_ip).stop(consumers,new ArrayList<>());
current_connectors.get(broker_ip).stop();
/* Consumer current_consumer;
if (application_name != null && !application_name.equals(EMPTY)) { //Create a consumer for one application
Logger.getAnonymousLogger().log(INFO,"APP level subscriber " + topic);
current_consumer = new Consumer(topic, topic, new MessageProcessingHandler(broker_details), application_name, true, true);
} else { //Allow the consumer to get information from any publisher
current_consumer = new Consumer(topic, topic, new MessageProcessingHandler(broker_details), true, true);
Logger.getAnonymousLogger().log(INFO,"HIGH level subscriber " + topic);
}
current_consumer.setProperty("topic",topic);*/

active_consumers_per_topic_per_broker_ip.get(broker_ip).put(topic, new_consumer);

CustomConnectorHandler current_connector_handler = current_connector_handlers.get(broker_ip);
if (current_connector_handler==null){
current_connector_handler = new CustomConnectorHandler() {};
this.connector = new Connector("slo_violation_detector_consumer",
current_connector_handler,
List.of(),
List.of(new_consumer),
false,
false,
new StaticExnConfig(
broker_ip,
broker_port,
brokerUsername,
brokerPassword,
60,
EMPTY
)
);
connector.start();
}else if (!active_consumers_per_topic_per_broker_ip.get(broker_ip).containsKey(topic)){
//current_connector_handler.remove_consumer_with_key(topic);

synchronized (current_connector_handler.getReady()) {
while (!current_connector_handler.getReady().get()) {
try {
current_connector_handler.getReady().wait();
log.info("Unable to register connector handler as the connector is unexpectedly not ready");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
log.warn("Adding new consumer " + new_consumer.key());
current_connector_handler.add_consumer(new_consumer);
}
/*if (current_connectors.get(broker_ip) != null) {
current_connectors.get(broker_ip).stop(consumers,new ArrayList<>());
}
if (consumers.isEmpty()){
consumers = new ArrayList<>();
Expand All @@ -140,7 +199,7 @@ private void add_topic_consumer_to_broker_connector(Consumer new_consumer) {
if(!do_not_add_new_consumer) {
consumers.add(new_consumer);
}
Connector connector = new Connector("resource_manager",
Connector extended_connector = new Connector("resource_manager",
new CustomConnectorHandler() {
},
List.of(),
Expand All @@ -156,21 +215,13 @@ private void add_topic_consumer_to_broker_connector(Consumer new_consumer) {
EMPTY
)
);
connector.start();
current_connectors.put(broker_ip, connector);
}

private void remove_topic_from_broker_connector(String topic_key) {
if (current_connectors.get(broker_ip) != null) {
//current_connectors.get(broker_ip).remove_consumer_with_key(topic_key);
}
extended_connector.start();
current_connectors.put(broker_ip, extended_connector);*/
}


public int subscribe (BiFunction function, String application_name) {
return subscribe(function,application_name,stop_signal);
}

public int subscribe(BiFunction function, String application_name, AtomicBoolean stop_signal) {

int exit_status = -1;
log.info("ESTABLISHING SUBSCRIPTION for " + topic);
/*
Expand All @@ -181,7 +232,8 @@ public int subscribe(BiFunction function, String application_name, AtomicBoolean
} else {
active_consumers_per_topic_per_broker_ip.put(broker_ip, new HashMap<>());
}
*/
*/
//Then add the new consumer
Consumer new_consumer;
if (application_name != null && !application_name.equals(EMPTY)) {
Expand All @@ -191,7 +243,13 @@ public int subscribe(BiFunction function, String application_name, AtomicBoolean
new_consumer = new Consumer(topic, topic, new MessageProcessingHandler(function, broker_details), true, true);
}
new_consumer.setProperty("topic", topic);
active_consumers_per_topic_per_broker_ip.get(broker_ip).put(topic, new_consumer);

if (active_consumers_per_topic_per_broker_ip.containsKey(broker_ip)){
active_consumers_per_topic_per_broker_ip.get(broker_ip).put(topic, new_consumer);
}else {
active_consumers_per_topic_per_broker_ip.put(broker_ip, new HashMap<>());
}

add_topic_consumer_to_broker_connector(new_consumer);

log.info("ESTABLISHED SUBSCRIPTION to topic " + topic);
Expand All @@ -209,8 +267,9 @@ public int subscribe(BiFunction function, String application_name, AtomicBoolean
}
active_consumers_per_topic_per_broker_ip.get(broker_ip).remove(topic);
//remove_topic_from_broker_connector(topic);
current_connectors.get(broker_ip).stop();
//current_connectors.get(broker_ip).stop();
exit_status = 0;
return exit_status;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package eu.nebulous.resource.discovery.broker_communication;

import eu.nebulouscloud.exn.core.Consumer;
import eu.nebulouscloud.exn.core.Context;
import eu.nebulouscloud.exn.handlers.ConnectorHandler;

Expand All @@ -20,6 +21,10 @@ public void onReady(Context context) {
public void remove_consumer_with_key(String key){
context.unregisterConsumer(key);
}

public void add_consumer(Consumer consumer){
context.registerConsumer(consumer);
}

public Context getContext() {
return context;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,16 @@ public void onMessage(Message message) {
Object obj = objectMapper.readerFor(typeRef).readValue(payload);

if (obj instanceof Map<?,?> dataMap) {
log.warn("BrokerUtil: Received a new message: topic: {}", ((ActiveMQTextMessage) message).getDestination().getPhysicalName());
log.warn("BrokerUtil: Received a new message: payload: {}", dataMap);
handlePayload(((ActiveMQTextMessage) message).getDestination().getPhysicalName(), dataMap);
String topic = ((ActiveMQTextMessage) message).getDestination().getPhysicalName();
// Print response messages except the EMS node status reports (_ui_instance_info, _client_metrics)
if (StringUtils.isNotBlank(topic)
&& ! topic.equals(properties.getDeviceStatusMonitorTopic())
&& ! topic.equals(properties.getDeviceMetricsMonitorTopic()))
{
log.warn("BrokerUtil: Received a new message: topic: {}", topic);
log.warn("BrokerUtil: Received a new message: payload: {}", dataMap);
}
handlePayload(topic, dataMap);
} else {
log.warn("BrokerUtil: Message payload is not recognized. Expected Map but got: type={}, object={}", obj.getClass().getName(), obj);
}
Expand Down
Loading

0 comments on commit 20dcd0a

Please sign in to comment.