Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Misc improvements #52

Merged
merged 180 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
180 commits
Select commit Hold shift + click to select a range
2ab28fb
RD: Added device-view.html for viewing and editing device details. Fi…
ipatini Oct 6, 2023
0d0f291
RD: Moved 'metrics' section side-by-side to 'device info' section in …
ipatini Oct 6, 2023
4ba968f
RD: changed mongo db name to 'resource_discovery' in application.yml
ipatini Oct 7, 2023
46c5108
RD: Minor GUI improvements
ipatini Oct 7, 2023
7bdf6ed
RD: Added DeviceMonitorService to watch the status updates from EMS. …
ipatini Oct 7, 2023
4ede903
RD: Created AbstractMonitorService (to factor out common topic monito…
ipatini Oct 8, 2023
19756ca
RD: Added banner
ipatini Oct 8, 2023
299a585
RD: Added DeviceMetricsMonitorService and DeviceMetrics classes. Upda…
ipatini Oct 8, 2023
724802a
RD: Updated application.yml
ipatini Oct 9, 2023
8f6a075
RD: Added device metrics display in device-view.html. Added (EMS) dev…
ipatini Oct 9, 2023
958f200
RD: Updated device-view.html to display device metrics
ipatini Oct 9, 2023
2184a55
RD: Modified Device in monitoring subsystem to not have its credentia…
ipatini Oct 10, 2023
c08e439
RD: Added check for a request's device IP address being in use (in an…
ipatini Oct 10, 2023
69379f6
RD: various minor code improvements
ipatini Oct 10, 2023
1c635ce
RD: Added better error reporting in request-edit.html
ipatini Oct 10, 2023
d12c02b
RD: Various small GUI improvements and a fix
ipatini Oct 10, 2023
5cc0d2c
RD: Fixed registration Device in order toString() not to print creden…
ipatini Oct 11, 2023
f63597e
RD: Minor change in device-view.html to move 'count-' metrics to the …
ipatini Oct 11, 2023
60cd1f6
RD: Changed monitoring Device to not serialize credentials
ipatini Oct 11, 2023
413771c
RD: Code cleanup in RegistrationRequestProcessor
ipatini Oct 11, 2023
cb5e694
RD: Improved GUI (added page shortcuts at top-right)
ipatini Oct 12, 2023
2c6839e
RD: Implementing re-onboarding, off-boarding, and request-for-info (d…
ipatini Oct 12, 2023
0de73a8
RD: Added DeviceLifeCycleRequestService and DeviceLifeCycleResponseSe…
ipatini Oct 13, 2023
8e8205c
RD: Added device archiving and unarchiving in GUI
ipatini Oct 13, 2023
546fa0d
RD: Implemented DeviceProcessor to periodically archive off-boarded d…
ipatini Oct 13, 2023
114c630
RD: Changed devices.html to always display 'Archive' button to Admin …
ipatini Oct 13, 2023
23f5830
RD: Added archived-device-view.html, renamed archived-view.html to ar…
ipatini Oct 13, 2023
287bf41
RD: Fixed DeviceManagementController to allow plain users (device own…
ipatini Oct 13, 2023
550da0a
RD: Extended DeviceProcessor to check for suspect and failed devices.…
ipatini Oct 13, 2023
a54736e
RD: Added shortcuts (top-right corner) to all detail pages
ipatini Oct 13, 2023
f1fab9c
RD: Minor change in index.html (grayed out settings image)
ipatini Oct 13, 2023
21be064
RD: Improved AbstractMonitorService (reuse of single connection, bett…
ipatini Oct 16, 2023
7cd9728
RD: Minor code (mostly logging) and GUI improvements
ipatini Oct 16, 2023
9f20620
RD: Added 'UnknownDeviceRegistrationService' class to monitor for unk…
ipatini Oct 16, 2023
b4e75bb
RD: Added prompting admin to provide device credentials when un-archi…
ipatini Oct 16, 2023
151fb16
RD: Moved REQUEST_TYPE enum to 'common' package and updated code
ipatini Oct 17, 2023
ccd667a
RD: Changed authentication to use BCrypt-encrypted passwords
ipatini Oct 17, 2023
5305269
RD: Moved message broker communication code to BrokerUtil class, and …
ipatini Oct 17, 2023
101f2c8
RD: Minor code tidy up
ipatini Oct 17, 2023
fc3c1af
RD: Added support for SSL connections to ActiveMQ broker in BrokerUti…
ipatini Oct 17, 2023
e75198e
RD: Fixed issue where UnknownDeviceRegistrationService registers a de…
ipatini Oct 18, 2023
362e007
RD: Modified pom.xml to build an image named 'eu.nebulous.resource-di…
ipatini Oct 18, 2023
ede964c
RD: Added 'DeviceLocation' class and 'location' field in both 'Device…
ipatini Oct 23, 2023
024686e
RD: Added StatusController to report current application status (curr…
ipatini Oct 30, 2023
7ad2cf0
RD: Fixed a few minor issues:
ipatini Oct 30, 2023
8960460
RD: Changes in pom.xml and application.yml:
ipatini Oct 30, 2023
6969d68
RD: Renamed 'management' module to 'resource-discovery'
ipatini Oct 30, 2023
0961229
RD:
ipatini Nov 2, 2023
db290cf
RD: Modified base images in Dockerfile
ipatini Nov 2, 2023
1c25f5e
RD: Modified Dockerfile in order to both compile the app and build th…
ipatini Nov 3, 2023
94260c4
RD: Improved Dockerfile
ipatini Nov 3, 2023
7800bde
RD: Improved Dockerfile
ipatini Nov 8, 2023
cb9b3f4
RD: Upgraded SB version to SB 3.2.1 and JRE to version 21. Updated de…
ipatini Jan 15, 2024
ef4e840
RD: Fixed Dockerfile
ipatini Mar 6, 2024
d8dc54d
Initial changes to support registration and deregistration of edge de…
Mar 11, 2024
2aec433
Merge remote-tracking branch 'origin/master'
Mar 11, 2024
15c3aa3
EMS: Added K8sNetdataCollector [WIP]
ipatini Mar 13, 2024
84ede18
RD: Added 'port' in forms and models
ipatini Mar 14, 2024
8170b97
RD: Added two TODOs
ipatini Mar 14, 2024
38ea2cb
RD: Deactivated UnknownDeviceRegistrationService
ipatini Mar 14, 2024
b755df9
Merge remote-tracking branch 'origin/master'
Mar 15, 2024
d807458
Initial changes to support registration and deregistration of edge de…
Mar 22, 2024
e00d3ff
Improvement in the initialization of SALRegistrationService
Mar 23, 2024
cc0943c
Improvement in the initialization of SALRegistrationService
Mar 23, 2024
58e05e2
Merge remote-tracking branch 'origin/add-sal-connectivity' into add-s…
Mar 23, 2024
e9a62f3
Increased logging to debug setting of processorProperties I94a6fdb461…
Mar 23, 2024
da0fee3
Modified logging to debug the setting of processorProperties I94a6fdb…
Mar 23, 2024
b5b7bae
Correction of syntactic error I94a6fdb4612de192c24511445f1236cdce94b000
Mar 23, 2024
6f4e49b
Addition of needed configuration properties I94a6fdb4612de192c2451144…
Mar 23, 2024
40479c1
Testing of alternative initialization of SAL registration service I94…
Mar 23, 2024
13b8c8b
Testing of alternative initialization of SAL registration service I94…
Mar 23, 2024
0669380
Merge remote-tracking branch 'origin/add-sal-connectivity' into add-s…
Mar 26, 2024
8a307b6
Updates on the topic and the payload format used to communicate with …
Apr 2, 2024
6a8d7ec
Allow setting and using a custom broker port
Apr 2, 2024
135fb65
Pass a default port to the configuration of the Resource discovery se…
Apr 2, 2024
ac13693
Log debugging information related to the port of the NebulOuS broker
Apr 2, 2024
0bfe18d
Publishing of the appropriate message to the broker
Apr 2, 2024
bbb7478
RD: Removed truststore settings from application.yml. Upgraded to SB …
ipatini Apr 4, 2024
c1d06dd
RD: Fixed banner.txt
ipatini Apr 4, 2024
d85d653
RD: Modified application.yml
ipatini Apr 4, 2024
31b103c
RD: Added temp. debug messages
ipatini Apr 6, 2024
738f604
RD: Added temp. debug messages 2
ipatini Apr 6, 2024
8b7e5f4
Revert "RD: Added temp. debug messages 2"
ipatini Apr 6, 2024
49ca35c
Revert "RD: Added temp. debug messages"
ipatini Apr 6, 2024
a681f26
RD: Modified SALRegistrationService to run SAL registration in dedica…
ipatini Apr 8, 2024
2dae658
RD: Added temp. log messages
ipatini Apr 9, 2024
0f35b82
Revert "RD: Added temp. log messages"
ipatini Apr 9, 2024
e3bd385
RD: Updated DeviceMetricsMonitorService to match metric events to dev…
ipatini Apr 9, 2024
f8014e7
Merge remote-tracking branch 'origin/master'
Apr 25, 2024
6739c2d
Merge branch 'master' into add-sal-connectivity
Apr 25, 2024
58ae0e1
Improvements on edge device data propagation
Apr 29, 2024
4f27d5e
Merge branch 'add-sal-connectivity' of https://gitlab.com/nebulous-pr…
Apr 29, 2024
c03fcb2
Integration of changes performed in commit e3bd3852 but accidentally …
Apr 30, 2024
8e5ecdb
RD: Updated Dockerfile
ipatini May 1, 2024
5133aa6
RD: Updated run.sh and added wait_for_mongodb.sh
ipatini May 1, 2024
cceea79
RD: Improved wait_for_mongodb.sh script
ipatini May 9, 2024
c5d653e
Various Improvements
May 13, 2024
baa6857
Attempt to fix a problem when publishing to the broker to register in…
May 15, 2024
bf51577
Merge remote-tracking branch 'refs/remotes/ioannis/master'
atsag Oct 7, 2024
644c98e
Miscellaneous improvements
atsag Oct 7, 2024
a635914
RD: Added API Key authentication
ipatini Oct 8, 2024
32dbaa7
RD: Upgraded SB to 3.2.10, and Lombok, commons-lang3 dependencies to …
ipatini Oct 8, 2024
24ac944
RD: Code improvements (esp. wrt @PreAuthorize annotations)
ipatini Oct 8, 2024
ab99d9b
RD: Made SALRegistrationService service conditionally enabled (by 'di…
ipatini Oct 8, 2024
a74a89e
RD: Updated RegistrationRequestService (and its uses) to take authent…
ipatini Oct 8, 2024
e52d8e8
RD: Added 'Device.ref' field and added its initialization. Ref field …
ipatini Oct 8, 2024
9356f40
RD: Updated GUI pages to include the new 'ref' field. A few more impr…
ipatini Oct 8, 2024
b295c6e
RD: Merged main branch changes into add-apikey-and-appid branch
ipatini Oct 8, 2024
6e19d12
Merge branch 'add-apikey-and-appid' into 'main'
ipatini Oct 8, 2024
32146a8
Merge remote-tracking branch 'origin/main'
atsag Oct 8, 2024
05c022c
Use device reference instead of name to register to SAL
atsag Oct 8, 2024
e31fc02
Merge branch 'use_ref_in_sal_request' into 'main'
ipatini Oct 8, 2024
f39977b
Improvements in device registration and component communication with …
atsag Oct 15, 2024
23773ab
Merge branch 'misc_improvements' into 'main'
ipatini Oct 15, 2024
5f02a30
Small improvements in device registration
atsag Oct 15, 2024
25930dd
Merge branch 'small_registration_improvements' into 'main'
ipatini Oct 15, 2024
9ac2a2b
Small improvement in getting device registration details
atsag Oct 15, 2024
a351bf3
Merge branch 'small_registration_improvements' into 'main'
Oct 15, 2024
9ff7121
Merge remote-tracking branch 'origin/main'
atsag Oct 15, 2024
dd55eda
Small improvement in getting device registration details
atsag Oct 15, 2024
10f81c4
Merge remote-tracking branch 'origin/main'
atsag Oct 15, 2024
bc28d48
RD: Fix in SALRegistrationService class
ipatini Oct 17, 2024
9346f01
RD: Fixed fontawesome cdn url
ipatini Oct 17, 2024
919e11d
Initial deregistration support
atsag Oct 21, 2024
ff1a20a
Merge remote-tracking branch 'origin/main'
atsag Oct 21, 2024
ebbbcb2
RD: Made device on-boarding authorization configurable (MANUAL, ALWAY…
ipatini Oct 21, 2024
fb4dd8c
Merge branch '2024-oct/extend-dev-authorization' into 'main'
ipatini Oct 21, 2024
ebc2a27
RD: Updated RegistrationRequestProcessor to include Device Ref in onb…
ipatini Oct 21, 2024
9e6d42d
Merge remote-tracking branch 'origin/main'
atsag Nov 5, 2024
d47887e
Improvements in the handling of AMQP connections
atsag Nov 6, 2024
e9605ac
Merge branch 'main' into 'amqp_improvements'
Nov 6, 2024
4c93dbe
Merge branch 'amqp_improvements' into 'main'
Nov 6, 2024
93a1479
Addition of provider field, miscellaneous improvements
atsag Nov 11, 2024
f287007
Merge remote-tracking branch 'origin/main'
atsag Nov 11, 2024
90c18b0
Minor logging improvements
atsag Nov 11, 2024
898d82d
Merge remote-tracking branch 'origin/main'
atsag Nov 11, 2024
aecb6b5
Deregistration process improvement
atsag Nov 11, 2024
91c62fb
Stopping the device lost publisher
atsag Nov 11, 2024
0bde76b
Refactoring to use the original Connector class instead of ExtendedCo…
atsag Nov 11, 2024
de70790
Merge remote-tracking branch 'origin/main'
atsag Nov 11, 2024
95cb5ff
Merge remote-tracking branch 'origin/main'
atsag Nov 11, 2024
96c1951
Minor logging improvement
atsag Nov 11, 2024
6f34236
Merge branch 'main' of https://gitlab.com/nebulous-project/resource-d…
atsag Nov 11, 2024
3abb2c2
Only try to stop the connector if it has been previously initialized
atsag Nov 12, 2024
32ae5d7
Merge remote-tracking branch 'origin/main'
atsag Nov 12, 2024
e40c231
Miscellaneous improvements
atsag Nov 18, 2024
1da5a61
Merge remote-tracking branch 'origin/main'
atsag Nov 18, 2024
c29abe4
Password improvements
atsag Nov 19, 2024
9b64c7c
Merge remote-tracking branch 'origin/main'
atsag Nov 19, 2024
982b8a6
Debugging message commit
atsag Nov 20, 2024
37d9213
Merge remote-tracking branch 'origin/main'
atsag Nov 20, 2024
bb4bdc2
Work on Deregistration support
atsag Nov 21, 2024
0663266
Added more logging statements to debug device registration (work by i…
atsag Nov 21, 2024
57ca53b
Fix parsing the json response of SAL
atsag Nov 21, 2024
03458d3
Merge remote-tracking branch 'origin/main'
atsag Nov 21, 2024
2c3acd7
Changes to avoid stopping the connector
atsag Nov 21, 2024
ac4c892
Merge remote-tracking branch 'origin/main'
atsag Nov 21, 2024
4beb9b8
Sal response parsing improvement and other improvements (#35)
robert-sanfeliu Nov 22, 2024
cae220b
Avoid starting new connectors, mark device as offboarded
atsag Nov 22, 2024
dd5cd7f
Merge remote-tracking branch 'origin/main'
atsag Nov 22, 2024
eda6077
RD: Fix for missing device info
ipatini Nov 22, 2024
8d6ad82
Merge remote-tracking branch 'origin/main'
atsag Nov 22, 2024
6e83928
Merge branch 'main' of https://gitlab.com/nebulous-project/resource-d…
atsag Nov 22, 2024
9dbffad
Merge changes from cd branch
atsag Nov 22, 2024
052a7a5
R1 candidate changes (#39)
atsag Nov 22, 2024
adecbd4
Merge remote-tracking branch 'origin/r1' into r1
atsag Nov 22, 2024
916c1e0
Merge remote-tracking branch 'origin/main'
atsag Nov 22, 2024
8023dde
Merge branch 'main' into r1
atsag Nov 22, 2024
3ed6586
Candidate r1 improvements (#41)
atsag Nov 22, 2024
1241387
Fix uninitialized connector issue
atsag Nov 22, 2024
2a3bef2
Merge remote-tracking branch 'origin/main'
atsag Nov 22, 2024
4f9148b
Merge remote-tracking branch 'origin/main'
atsag Nov 22, 2024
861354f
Merge remote-tracking branch 'origin/r1' into r1
atsag Nov 22, 2024
ace0544
Merge branch 'r1'
atsag Nov 22, 2024
6fa5c88
Candidate r1 improvements (#44)
atsag Nov 22, 2024
f9a4f6d
Merge latest stable improvements (#47)
atsag Dec 4, 2024
b868e10
Merge remote-tracking branch 'origin/main' into r1
atsag Dec 4, 2024
77f5c2d
Miscellaneous improvements
atsag Dec 5, 2024
20dcd0a
Merge branch 'r1'
atsag Dec 5, 2024
36e5928
Registration process changes
atsag Dec 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package eu.nebulous.resource.discovery;

import eu.nebulous.resource.discovery.registration.controller.RegistrationRequestController;
import jakarta.servlet.Filter;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
Expand All @@ -15,6 +16,7 @@
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.annotation.web.configurers.AbstractHttpConfigurer;
import org.springframework.security.config.http.SessionCreationPolicy;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.authority.SimpleGrantedAuthority;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.security.core.userdetails.User;
Expand All @@ -26,6 +28,7 @@

import java.security.SecureRandom;
import java.util.Collections;
import java.util.HashMap;

import static org.springframework.security.config.Customizer.withDefaults;

Expand All @@ -37,6 +40,8 @@
public class SecurityConfig {
private final static String USERNAME_REQUEST_HEADER = "X-SSO-USER";
private final static String USERNAME_REQUEST_PARAM = "ssoUser";
private final static String NONCE_REQUEST_PARAM = "nonce";
private final static String APPID_REQUEST_PARAM = "appId";
private final static String API_KEY_REQUEST_HEADER = "X-API-KEY";
private final static String API_KEY_REQUEST_PARAM = "apiKey";

Expand All @@ -54,6 +59,7 @@ public SecurityFilterChain securityFilterChain(HttpSecurity httpSecurity) throws
"/discovery/**", "/*.html").authenticated())
.authorizeHttpRequests(authorize -> authorize.anyRequest().permitAll())
.addFilterAfter(apiKeyAuthenticationFilter(), UsernamePasswordAuthenticationFilter.class)
.addFilterAfter(nonceAuthenticationFilter(), UsernamePasswordAuthenticationFilter.class)
.csrf(AbstractHttpConfigurer::disable)
.sessionManagement(sm -> sm.sessionCreationPolicy(SessionCreationPolicy.ALWAYS));

Expand Down Expand Up @@ -100,6 +106,13 @@ public boolean matches(CharSequence rawPassword, String encodedPassword) {

public Filter apiKeyAuthenticationFilter() {
return (servletRequest, servletResponse, filterChain) -> {

Authentication auth = SecurityContextHolder.getContext().getAuthentication();
if (auth!=null && auth.isAuthenticated()) {
filterChain.doFilter(servletRequest, servletResponse);
return;
}

if (properties.isApiKeyAuthenticationEnabled() && StringUtils.isNotBlank(properties.getApiKeyValue())) {
if (servletRequest instanceof HttpServletRequest request && servletResponse instanceof HttpServletResponse) {

Expand Down Expand Up @@ -149,4 +162,65 @@ public Filter apiKeyAuthenticationFilter() {
filterChain.doFilter(servletRequest, servletResponse);
};
}

public Filter nonceAuthenticationFilter(){
return (servletRequest, servletResponse, filterChain) -> {
try {
HttpServletRequest request = ((HttpServletRequest )servletRequest);
// HttpSession session = request.getSession(false);
//
// if(session!=null){
// filterChain.doFilter(servletRequest, servletResponse);
// return;
// }
Authentication auth = SecurityContextHolder.getContext().getAuthentication();
if (auth!=null && auth.isAuthenticated()) {
filterChain.doFilter(servletRequest, servletResponse);
return;
}
StringBuilder requestURL = new StringBuilder(request.getRequestURL().toString());
String queryString = request.getQueryString();

if (queryString == null) {
log.warn( requestURL.toString());
} else {
log.warn(requestURL.append('?').append(queryString).toString());
}
log.warn(servletRequest.toString());

String nonce = servletRequest.getParameter(NONCE_REQUEST_PARAM);
String appId = servletRequest.getParameter(APPID_REQUEST_PARAM);

String username =null;
HashMap<String, String> map = new HashMap<>();
map.put(NONCE_REQUEST_PARAM, nonce);
map.put(APPID_REQUEST_PARAM, appId);
username = RegistrationRequestController.getNonceUsername(map);
// if ((nonce != null && appId != null) && (!nonce.isEmpty())) {
// HashMap<String, String> map = new HashMap<>();
// map.put(NONCE_REQUEST_PARAM, nonce);
// map.put(APPID_REQUEST_PARAM, appId);
// username = RegistrationRequestController.getNonceUsername(map);
// }

if (username != null) {
UsernamePasswordAuthenticationToken authentication =
new UsernamePasswordAuthenticationToken(username, nonce,
Collections.singletonList(new SimpleGrantedAuthority(SSO_USER_ROLE)));
// store completed authentication in security context
SecurityContextHolder.getContext().setAuthentication(authentication);
log.debug("User was authenticated using a nonce token");
}

} catch (Exception e) {
log.error("nonceAuthenticationFilter: EXCEPTION: ", e);
}

// continue down the chain
filterChain.doFilter(servletRequest, servletResponse);

};
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import eu.nebulouscloud.exn.settings.StaticExnConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.qpid.protonj2.client.Message;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.json.simple.JSONValue;

import java.util.*;
Expand All @@ -15,11 +16,13 @@

import static eu.nebulous.resource.discovery.broker_communication.BrokerPublisher.EMPTY;


@Slf4j
public class BrokerSubscriber {

private AtomicBoolean stop_signal = new AtomicBoolean();

private Connector connector;
private final AtomicBoolean stop_signal = new AtomicBoolean(false);

private class MessageProcessingHandler extends Handler {
private BrokerSubscriptionDetails broker_details;
private static final BiFunction temporary_function = (Object o, Object o2) -> {
Expand All @@ -31,8 +34,18 @@ private class MessageProcessingHandler extends Handler {

@Override
public void onMessage(String key, String address, Map body, Message message, Context context) {
log.info("Handling message for address " + address);
processing_function.apply(broker_details, JSONValue.toJSONString(body));
log.warn("Handling message for address " + address);
String precise_topic;
try {
precise_topic = message.to();
} catch (ClientException e) {
throw new RuntimeException(e);
}
if (precise_topic!=null){
broker_details.setTopic(precise_topic);
}

processing_function.apply(new BrokerSubscriptionDetails(broker_ip,broker_port,brokerUsername,brokerPassword,application_name,precise_topic), JSONValue.toJSONString(body));
}

public MessageProcessingHandler(BrokerSubscriptionDetails broker_details) {
Expand All @@ -56,10 +69,11 @@ public void setProcessing_function(BiFunction processing_function) {

private static HashMap<String, HashSet<String>> broker_and_topics_to_subscribe_to = new HashMap<>();
private static HashMap<String, HashMap<String, Consumer>> active_consumers_per_topic_per_broker_ip = new HashMap<>();
private static HashMap<String, Connector> current_connectors = new HashMap<>();
private static final HashMap<String, CustomConnectorHandler> current_connector_handlers = new HashMap<>();
ArrayList<Consumer> consumers = new ArrayList<>();
private String topic;
private String broker_ip;
private String application_name;
private int broker_port;
private String brokerUsername;
private String brokerPassword;
Expand Down Expand Up @@ -101,32 +115,77 @@ public BrokerSubscriber(String topic, String broker_ip, int broker_port, String
}
}
if (subscriber_configuration_changed) {
Consumer current_consumer;
if (application_name != null && !application_name.equals(EMPTY)) { //Create a consumer for one application
log.info("APP level subscriber " + topic);
current_consumer = new Consumer(topic, topic, new MessageProcessingHandler(broker_details), application_name, true, true);
} else { //Allow the consumer to get information from any publisher
current_consumer = new Consumer(topic, topic, new MessageProcessingHandler(broker_details), true, true);
log.info("HIGH level subscriber " + topic);
}
active_consumers_per_topic_per_broker_ip.get(broker_ip).put(topic, current_consumer);

this.application_name = application_name;
this.topic = topic;
this.broker_ip = broker_ip;
this.broker_port = broker_port;
this.brokerUsername = brokerUsername;
this.brokerPassword = brokerPassword;
add_topic_consumer_to_broker_connector(current_consumer);

}
}


public void stop(){
synchronized (stop_signal) {
stop_signal.set(true);
}
}

/**
* This method updates the global connector of Resource manager to the AMQP server, by adding support for one more component
*/
private void add_topic_consumer_to_broker_connector(Consumer new_consumer) {
if (current_connectors.get(broker_ip) != null) {
//current_connectors.get(broker_ip).stop(consumers,new ArrayList<>());
current_connectors.get(broker_ip).stop();
/* Consumer current_consumer;
if (application_name != null && !application_name.equals(EMPTY)) { //Create a consumer for one application
Logger.getAnonymousLogger().log(INFO,"APP level subscriber " + topic);
current_consumer = new Consumer(topic, topic, new MessageProcessingHandler(broker_details), application_name, true, true);
} else { //Allow the consumer to get information from any publisher
current_consumer = new Consumer(topic, topic, new MessageProcessingHandler(broker_details), true, true);
Logger.getAnonymousLogger().log(INFO,"HIGH level subscriber " + topic);
}
current_consumer.setProperty("topic",topic);*/

active_consumers_per_topic_per_broker_ip.get(broker_ip).put(topic, new_consumer);

CustomConnectorHandler current_connector_handler = current_connector_handlers.get(broker_ip);
if (current_connector_handler==null){
current_connector_handler = new CustomConnectorHandler() {};
this.connector = new Connector("slo_violation_detector_consumer",
current_connector_handler,
List.of(),
List.of(new_consumer),
false,
false,
new StaticExnConfig(
broker_ip,
broker_port,
brokerUsername,
brokerPassword,
60,
EMPTY
)
);
connector.start();
}else if (!active_consumers_per_topic_per_broker_ip.get(broker_ip).containsKey(topic)){
//current_connector_handler.remove_consumer_with_key(topic);

synchronized (current_connector_handler.getReady()) {
while (!current_connector_handler.getReady().get()) {
try {
current_connector_handler.getReady().wait();
log.info("Unable to register connector handler as the connector is unexpectedly not ready");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
log.warn("Adding new consumer " + new_consumer.key());
current_connector_handler.add_consumer(new_consumer);
}
/*if (current_connectors.get(broker_ip) != null) {
current_connectors.get(broker_ip).stop(consumers,new ArrayList<>());
}
if (consumers.isEmpty()){
consumers = new ArrayList<>();
Expand All @@ -140,7 +199,7 @@ private void add_topic_consumer_to_broker_connector(Consumer new_consumer) {
if(!do_not_add_new_consumer) {
consumers.add(new_consumer);
}
Connector connector = new Connector("resource_manager",
Connector extended_connector = new Connector("resource_manager",
new CustomConnectorHandler() {
},
List.of(),
Expand All @@ -156,21 +215,13 @@ private void add_topic_consumer_to_broker_connector(Consumer new_consumer) {
EMPTY
)
);
connector.start();
current_connectors.put(broker_ip, connector);
}

private void remove_topic_from_broker_connector(String topic_key) {
if (current_connectors.get(broker_ip) != null) {
//current_connectors.get(broker_ip).remove_consumer_with_key(topic_key);
}
extended_connector.start();
current_connectors.put(broker_ip, extended_connector);*/
}


public int subscribe (BiFunction function, String application_name) {
return subscribe(function,application_name,stop_signal);
}

public int subscribe(BiFunction function, String application_name, AtomicBoolean stop_signal) {

int exit_status = -1;
log.info("ESTABLISHING SUBSCRIPTION for " + topic);
/*
Expand All @@ -181,7 +232,8 @@ public int subscribe(BiFunction function, String application_name, AtomicBoolean
} else {
active_consumers_per_topic_per_broker_ip.put(broker_ip, new HashMap<>());
}
*/

*/
//Then add the new consumer
Consumer new_consumer;
if (application_name != null && !application_name.equals(EMPTY)) {
Expand All @@ -191,7 +243,13 @@ public int subscribe(BiFunction function, String application_name, AtomicBoolean
new_consumer = new Consumer(topic, topic, new MessageProcessingHandler(function, broker_details), true, true);
}
new_consumer.setProperty("topic", topic);
active_consumers_per_topic_per_broker_ip.get(broker_ip).put(topic, new_consumer);

if (active_consumers_per_topic_per_broker_ip.containsKey(broker_ip)){
active_consumers_per_topic_per_broker_ip.get(broker_ip).put(topic, new_consumer);
}else {
active_consumers_per_topic_per_broker_ip.put(broker_ip, new HashMap<>());
}

add_topic_consumer_to_broker_connector(new_consumer);

log.info("ESTABLISHED SUBSCRIPTION to topic " + topic);
Expand All @@ -209,8 +267,9 @@ public int subscribe(BiFunction function, String application_name, AtomicBoolean
}
active_consumers_per_topic_per_broker_ip.get(broker_ip).remove(topic);
//remove_topic_from_broker_connector(topic);
current_connectors.get(broker_ip).stop();
//current_connectors.get(broker_ip).stop();
exit_status = 0;
return exit_status;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package eu.nebulous.resource.discovery.broker_communication;

import eu.nebulouscloud.exn.core.Consumer;
import eu.nebulouscloud.exn.core.Context;
import eu.nebulouscloud.exn.handlers.ConnectorHandler;

Expand All @@ -20,6 +21,10 @@ public void onReady(Context context) {
public void remove_consumer_with_key(String key){
context.unregisterConsumer(key);
}

public void add_consumer(Consumer consumer){
context.registerConsumer(consumer);
}

public Context getContext() {
return context;
Expand Down
Loading