From 7a16b538a37b406a35466324597602fad285405b Mon Sep 17 00:00:00 2001 From: Hylke van der Schaaf Date: Mon, 21 Aug 2023 21:27:23 +0200 Subject: [PATCH 1/5] Debugging tests on CI --- .../iosb/ilt/statests/TestSuite.java | 90 ------------------- .../ilt/statests/util/mqtt/MqttHelper.java | 2 +- .../ilt/statests/util/mqtt/MqttListener.java | 6 +- .../src/test/resources/logback-test.xml | 1 + 4 files changed, 5 insertions(+), 94 deletions(-) diff --git a/FROST-Server.Tests/src/test/java/de/fraunhofer/iosb/ilt/statests/TestSuite.java b/FROST-Server.Tests/src/test/java/de/fraunhofer/iosb/ilt/statests/TestSuite.java index 331c14bfb..4dcd3e863 100644 --- a/FROST-Server.Tests/src/test/java/de/fraunhofer/iosb/ilt/statests/TestSuite.java +++ b/FROST-Server.Tests/src/test/java/de/fraunhofer/iosb/ilt/statests/TestSuite.java @@ -35,55 +35,10 @@ import de.fraunhofer.iosb.ilt.statests.c01sensingcore.Capability1CoreOnlyTests11; import de.fraunhofer.iosb.ilt.statests.c01sensingcore.Capability1Tests10; import de.fraunhofer.iosb.ilt.statests.c01sensingcore.Capability1Tests11; -import de.fraunhofer.iosb.ilt.statests.c02cud.AdditionalTests10; -import de.fraunhofer.iosb.ilt.statests.c02cud.AdditionalTests11; -import de.fraunhofer.iosb.ilt.statests.c02cud.Capability2Tests10; -import de.fraunhofer.iosb.ilt.statests.c02cud.Capability2Tests11; -import de.fraunhofer.iosb.ilt.statests.c02cud.DeleteFilterTests10; -import de.fraunhofer.iosb.ilt.statests.c02cud.DeleteFilterTests11; -import de.fraunhofer.iosb.ilt.statests.c02cud.JsonPatchTests10; -import de.fraunhofer.iosb.ilt.statests.c02cud.JsonPatchTests11; -import de.fraunhofer.iosb.ilt.statests.c02cud.ResultTypesTests10; -import de.fraunhofer.iosb.ilt.statests.c02cud.ResultTypesTests11; -import de.fraunhofer.iosb.ilt.statests.c03filtering.Capability3Tests10; -import de.fraunhofer.iosb.ilt.statests.c03filtering.Capability3Tests11; -import de.fraunhofer.iosb.ilt.statests.c03filtering.DateTimeTests10; -import de.fraunhofer.iosb.ilt.statests.c03filtering.DateTimeTests11; -import de.fraunhofer.iosb.ilt.statests.c03filtering.FilterTests10; -import de.fraunhofer.iosb.ilt.statests.c03filtering.FilterTests11; -import de.fraunhofer.iosb.ilt.statests.c03filtering.GeoTests10; -import de.fraunhofer.iosb.ilt.statests.c03filtering.GeoTests11; -import de.fraunhofer.iosb.ilt.statests.c03filtering.JsonPropertiesTests10; -import de.fraunhofer.iosb.ilt.statests.c03filtering.JsonPropertiesTests11; -import de.fraunhofer.iosb.ilt.statests.c04batch.BatchTests10; -import de.fraunhofer.iosb.ilt.statests.c04batch.BatchTests11; -import de.fraunhofer.iosb.ilt.statests.c05multidatastream.MdDateTimeTests10; -import de.fraunhofer.iosb.ilt.statests.c05multidatastream.MdDateTimeTests11; -import de.fraunhofer.iosb.ilt.statests.c05multidatastream.MultiDatastreamObsPropTests10; -import de.fraunhofer.iosb.ilt.statests.c05multidatastream.MultiDatastreamObsPropTests11; -import de.fraunhofer.iosb.ilt.statests.c05multidatastream.MultiDatastreamTests10; -import de.fraunhofer.iosb.ilt.statests.c05multidatastream.MultiDatastreamTests11; -import de.fraunhofer.iosb.ilt.statests.c06dataarrays.DataArrayTests10; -import de.fraunhofer.iosb.ilt.statests.c06dataarrays.DataArrayTests11; import de.fraunhofer.iosb.ilt.statests.c07mqttcreate.Capability7Tests10; import de.fraunhofer.iosb.ilt.statests.c07mqttcreate.Capability7Tests11; import de.fraunhofer.iosb.ilt.statests.c08mqttsubscribe.Capability8Tests10; import de.fraunhofer.iosb.ilt.statests.c08mqttsubscribe.Capability8Tests11; -import de.fraunhofer.iosb.ilt.statests.f01auth.BasicAuthAnonReadTests10; -import de.fraunhofer.iosb.ilt.statests.f01auth.BasicAuthAnonReadTests11; -import de.fraunhofer.iosb.ilt.statests.f01auth.BasicAuthCryptPwTests10; -import de.fraunhofer.iosb.ilt.statests.f01auth.BasicAuthCryptPwTests11; -import de.fraunhofer.iosb.ilt.statests.f01auth.BasicAuthTests10; -import de.fraunhofer.iosb.ilt.statests.f01auth.BasicAuthTests11; -import de.fraunhofer.iosb.ilt.statests.f01auth.FineGrainedAuthTests11; -import de.fraunhofer.iosb.ilt.statests.f01auth.KeyCloakAnonReadTests10; -import de.fraunhofer.iosb.ilt.statests.f01auth.KeyCloakAnonReadTests11; -import de.fraunhofer.iosb.ilt.statests.f01auth.KeyCloakTests10; -import de.fraunhofer.iosb.ilt.statests.f01auth.KeyCloakTests11; -import de.fraunhofer.iosb.ilt.statests.f02customlinks.CustomLinksTests10; -import de.fraunhofer.iosb.ilt.statests.f02customlinks.CustomLinksTests11; -import de.fraunhofer.iosb.ilt.statests.f03metadata.MetadataTests10; -import de.fraunhofer.iosb.ilt.statests.f03metadata.MetadataTests11; import de.fraunhofer.iosb.ilt.statests.util.HTTPMethods; import de.fraunhofer.iosb.ilt.statests.util.HTTPMethods.HttpResponse; import java.io.IOException; @@ -131,55 +86,10 @@ Capability1CoreOnlyTests11.class, Capability1Tests10.class, Capability1Tests11.class, - Capability2Tests10.class, - Capability2Tests11.class, - AdditionalTests10.class, - AdditionalTests11.class, - DeleteFilterTests10.class, - DeleteFilterTests11.class, - JsonPatchTests10.class, - JsonPatchTests11.class, - ResultTypesTests10.class, - ResultTypesTests11.class, - Capability3Tests10.class, - Capability3Tests11.class, - DateTimeTests10.class, - DateTimeTests11.class, - FilterTests10.class, - FilterTests11.class, - GeoTests10.class, - GeoTests11.class, - JsonPropertiesTests10.class, - JsonPropertiesTests11.class, - BatchTests10.class, - BatchTests11.class, - MultiDatastreamTests10.class, - MultiDatastreamTests11.class, - MultiDatastreamObsPropTests10.class, - MultiDatastreamObsPropTests11.class, - MdDateTimeTests10.class, - MdDateTimeTests11.class, - DataArrayTests10.class, - DataArrayTests11.class, Capability7Tests10.class, Capability7Tests11.class, Capability8Tests10.class, Capability8Tests11.class, - BasicAuthTests10.class, - BasicAuthTests11.class, - BasicAuthAnonReadTests10.class, - BasicAuthAnonReadTests11.class, - BasicAuthCryptPwTests10.class, - BasicAuthCryptPwTests11.class, - FineGrainedAuthTests11.class, - KeyCloakTests10.class, - KeyCloakTests11.class, - KeyCloakAnonReadTests10.class, - KeyCloakAnonReadTests11.class, - CustomLinksTests10.class, - CustomLinksTests11.class, - MetadataTests10.class, - MetadataTests11.class, TestSuite.SuiteFinaliser.class }) @Suite diff --git a/FROST-Server.Tests/src/test/java/de/fraunhofer/iosb/ilt/statests/util/mqtt/MqttHelper.java b/FROST-Server.Tests/src/test/java/de/fraunhofer/iosb/ilt/statests/util/mqtt/MqttHelper.java index da33e7175..b9db3b7a9 100644 --- a/FROST-Server.Tests/src/test/java/de/fraunhofer/iosb/ilt/statests/util/mqtt/MqttHelper.java +++ b/FROST-Server.Tests/src/test/java/de/fraunhofer/iosb/ilt/statests/util/mqtt/MqttHelper.java @@ -129,7 +129,7 @@ public MqttBatchResult executeRequests(Callable action, String... topi waitMillis(WAIT_AFTER_SUBSCRIBE); try { - LOGGER.debug(" Calling action..."); + LOGGER.debug(" Calling action..."); result.setActionResult(action.call()); } catch (Exception ex) { LOGGER.error("Exception on server {} :", mqttServerUri, ex); diff --git a/FROST-Server.Tests/src/test/java/de/fraunhofer/iosb/ilt/statests/util/mqtt/MqttListener.java b/FROST-Server.Tests/src/test/java/de/fraunhofer/iosb/ilt/statests/util/mqtt/MqttListener.java index f8fe74737..fff06b6ff 100644 --- a/FROST-Server.Tests/src/test/java/de/fraunhofer/iosb/ilt/statests/util/mqtt/MqttListener.java +++ b/FROST-Server.Tests/src/test/java/de/fraunhofer/iosb/ilt/statests/util/mqtt/MqttListener.java @@ -81,9 +81,9 @@ public void messageArrived(String topic, MqttMessage mm) { if (barrier.getCount() > 0) { result = new JSONObject(new String(mm.getPayload(), StandardCharsets.UTF_8)); barrier.countDown(); - LOGGER.debug("Received on {}. To go: {}", topic, barrier.getCount()); + LOGGER.debug(" Received on {}. To go: {}", topic, barrier.getCount()); } else { - LOGGER.error("Received on {}. Barrier already empty!", topic); + LOGGER.error(" Received on {}. Barrier already empty!", topic); } } @@ -96,7 +96,7 @@ public void deliveryComplete(IMqttDeliveryToken imdt) { mqttClient.subscribe(topic, MqttHelper.QOS, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken imt) { - LOGGER.debug("Subscribed to {}", topic); + LOGGER.debug(" Subscribed to {}", topic); connectBarrier.countDown(); } diff --git a/FROST-Server.Tests/src/test/resources/logback-test.xml b/FROST-Server.Tests/src/test/resources/logback-test.xml index 42f6c00fa..698bccd44 100644 --- a/FROST-Server.Tests/src/test/resources/logback-test.xml +++ b/FROST-Server.Tests/src/test/resources/logback-test.xml @@ -18,6 +18,7 @@ + From 37c0424be3371a00cea2dd1981f4135f78053ed6 Mon Sep 17 00:00:00 2001 From: Hylke van der Schaaf Date: Mon, 21 Aug 2023 22:48:27 +0200 Subject: [PATCH 2/5] Added more debug output --- .../iosb/ilt/frostserver/mqtt/SubscriptionManager.java | 5 +++++ FROST-Server.Tests/src/test/resources/logback-test.xml | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/mqtt/SubscriptionManager.java b/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/mqtt/SubscriptionManager.java index 4b5b79652..8fff40fa3 100644 --- a/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/mqtt/SubscriptionManager.java +++ b/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/mqtt/SubscriptionManager.java @@ -27,6 +27,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A manger for subscriptions for a single entity type. @@ -35,6 +37,8 @@ */ class SubscriptionManager { + private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionManager.class.getName()); + /** * The main entity type of the subscriptions in this manager. */ @@ -75,6 +79,7 @@ public void handleEntityChanged(PersistenceManager persistenceManager, Entity en } public synchronized void addSubscription(Subscription subscription) { + LOGGER.debug("Adding subscription to {}", subscription); NavigationPropertyMain parentRelation = subscription.getParentRelation(); if (parentRelation != null) { SubscriptionSetDirectParent parentSet = parentedSubscriptions.computeIfAbsent(parentRelation, t -> new SubscriptionSetDirectParent(mqttManager, parentRelation, topicCount)); diff --git a/FROST-Server.Tests/src/test/resources/logback-test.xml b/FROST-Server.Tests/src/test/resources/logback-test.xml index 698bccd44..bc0633b52 100644 --- a/FROST-Server.Tests/src/test/resources/logback-test.xml +++ b/FROST-Server.Tests/src/test/resources/logback-test.xml @@ -8,7 +8,6 @@ - @@ -19,6 +18,7 @@ + From 3c75975ce867657811dbc5d8d843279d52e34c19 Mon Sep 17 00:00:00 2001 From: Hylke van der Schaaf Date: Tue, 22 Aug 2023 20:36:47 +0200 Subject: [PATCH 3/5] Changed debug output --- .../de/fraunhofer/iosb/ilt/frostserver/mqtt/MqttManager.java | 2 +- .../iosb/ilt/frostserver/mqtt/SubscriptionManager.java | 2 +- .../frostserver/mqtt/subscription/AbstractSubscription.java | 5 +++++ FROST-Server.Tests/src/test/resources/logback-test.xml | 1 + 4 files changed, 8 insertions(+), 2 deletions(-) diff --git a/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/mqtt/MqttManager.java b/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/mqtt/MqttManager.java index 5fc8f25eb..b3bbf8db0 100644 --- a/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/mqtt/MqttManager.java +++ b/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/mqtt/MqttManager.java @@ -167,7 +167,7 @@ private void handleEntityChangedEvent(EntityChangedMessage message) { logStatus.setEntityChangedQueueSize(entityChangedQueueSize.decrementAndGet()); final EntityChangedMessage.Type eventType = message.getEventType(); EntityType entityType = message.getEntityType(); - LOGGER.trace("Received a {} message for a {}.", eventType, entityType); + LOGGER.debug(" Received a {} message for a {}.", eventType, entityType); if (eventType == EntityChangedMessage.Type.DELETE) { // v1.0 does not do delete notification. return; diff --git a/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/mqtt/SubscriptionManager.java b/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/mqtt/SubscriptionManager.java index 8fff40fa3..accd7486c 100644 --- a/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/mqtt/SubscriptionManager.java +++ b/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/mqtt/SubscriptionManager.java @@ -79,7 +79,7 @@ public void handleEntityChanged(PersistenceManager persistenceManager, Entity en } public synchronized void addSubscription(Subscription subscription) { - LOGGER.debug("Adding subscription to {}", subscription); + LOGGER.debug(" Adding subscription to {}", subscription); NavigationPropertyMain parentRelation = subscription.getParentRelation(); if (parentRelation != null) { SubscriptionSetDirectParent parentSet = parentedSubscriptions.computeIfAbsent(parentRelation, t -> new SubscriptionSetDirectParent(mqttManager, parentRelation, topicCount)); diff --git a/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/mqtt/subscription/AbstractSubscription.java b/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/mqtt/subscription/AbstractSubscription.java index b2e80cf1b..274cc44b7 100644 --- a/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/mqtt/subscription/AbstractSubscription.java +++ b/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/mqtt/subscription/AbstractSubscription.java @@ -208,4 +208,9 @@ public boolean equals(Object obj) { return this.entityType == other.entityType; } + @Override + public String toString() { + return getTopic(); + } + } diff --git a/FROST-Server.Tests/src/test/resources/logback-test.xml b/FROST-Server.Tests/src/test/resources/logback-test.xml index bc0633b52..c5b4514ab 100644 --- a/FROST-Server.Tests/src/test/resources/logback-test.xml +++ b/FROST-Server.Tests/src/test/resources/logback-test.xml @@ -19,6 +19,7 @@ + From 0274f5c0603fc3327ad57706864ce69820e4db83 Mon Sep 17 00:00:00 2001 From: Hylke van der Schaaf Date: Tue, 22 Aug 2023 23:58:54 +0200 Subject: [PATCH 4/5] Changed debug output --- .../iosb/ilt/frostserver/messagebus/MqttMessageBus.java | 6 ++++++ .../fraunhofer/iosb/ilt/frostserver/mqtt/MqttManager.java | 2 +- FROST-Server.Tests/src/test/resources/logback-test.xml | 1 + 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/messagebus/MqttMessageBus.java b/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/messagebus/MqttMessageBus.java index eec16ae08..9901209b2 100644 --- a/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/messagebus/MqttMessageBus.java +++ b/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/messagebus/MqttMessageBus.java @@ -319,6 +319,9 @@ private void handleMessageSent(EntityChangedMessage message) { connect(); } client.publish(topicName, bytes, qosLevel, false); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(" Sent a {} message for a {}", message.getEventType(), message.getEntityType()); + } } catch (MqttException | JsonProcessingException ex) { LOGGER.error("Failed to publish message to bus.", ex); } @@ -345,6 +348,9 @@ public void messageArrived(String topic, MqttMessage mqttMessage) throws IOExcep LOGGER.debug("Failed to decode message: {}", serialisedEcMessage, ex); return; } + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(" Received a {} message for a {}", ecMessage.getEventType(), ecMessage.getEntityType()); + } if (!recvQueue.offer(ecMessage)) { LOGGER.error("Failed to add message to receive-queue. Increase {}{} (currently {}) to allow a bigger buffer, or increase {}{} (currently {}) to empty the buffer quicker.", PREFIX_BUS, TAG_RECV_QUEUE_SIZE, recvQueueSize, PREFIX_BUS, TAG_RECV_WORKER_COUNT, recvPoolSize); diff --git a/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/mqtt/MqttManager.java b/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/mqtt/MqttManager.java index b3bbf8db0..b02317be1 100644 --- a/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/mqtt/MqttManager.java +++ b/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/mqtt/MqttManager.java @@ -167,7 +167,7 @@ private void handleEntityChangedEvent(EntityChangedMessage message) { logStatus.setEntityChangedQueueSize(entityChangedQueueSize.decrementAndGet()); final EntityChangedMessage.Type eventType = message.getEventType(); EntityType entityType = message.getEntityType(); - LOGGER.debug(" Received a {} message for a {}.", eventType, entityType); + LOGGER.debug(" Handling a {} message for a {}.", eventType, entityType); if (eventType == EntityChangedMessage.Type.DELETE) { // v1.0 does not do delete notification. return; diff --git a/FROST-Server.Tests/src/test/resources/logback-test.xml b/FROST-Server.Tests/src/test/resources/logback-test.xml index c5b4514ab..5ffb737cf 100644 --- a/FROST-Server.Tests/src/test/resources/logback-test.xml +++ b/FROST-Server.Tests/src/test/resources/logback-test.xml @@ -20,6 +20,7 @@ + From 98f25ea4f651a8910b438c37659f70e8b35544b0 Mon Sep 17 00:00:00 2001 From: Hylke van der Schaaf Date: Wed, 23 Aug 2023 13:53:09 +0200 Subject: [PATCH 5/5] Replaced wait-after-subscribe with listener --- .../ilt/frostserver/mqtt/MqttManager.java | 48 +++++++++++++++++++ .../iosb/ilt/statests/ServerSettings.java | 12 ++--- .../ilt/statests/util/mqtt/MqttHelper.java | 6 +-- .../ilt/statests/util/mqtt/MqttListener.java | 20 +++++++- 4 files changed, 74 insertions(+), 12 deletions(-) diff --git a/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/mqtt/MqttManager.java b/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/mqtt/MqttManager.java index b02317be1..446cd0eb7 100644 --- a/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/mqtt/MqttManager.java +++ b/FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/mqtt/MqttManager.java @@ -73,6 +73,11 @@ public class MqttManager implements SubscriptionListener, MessageListener, Entit private static final Logger LOGGER = LoggerFactory.getLogger(MqttManager.class); + /** + * Listeners for integration-test use only. Not thread safe. + */ + private static List TEST_LISTENERS; + private final Map subscriptions = new HashMap<>(); private final CoreSettings settings; private final SubscriptionFactory subscriptionFactory; @@ -249,6 +254,7 @@ public void onSubscribe(SubscriptionEvent e) { subscriptions.get(subscription.getEntityType()) .addSubscription(subscription); logStatus.setTopicCount(topicCount.get()); + fireTestSubscriptionAdded(e); } @Override @@ -409,4 +415,46 @@ public LoggingStatus setTopicCount(Integer count) { } } + + private static void fireTestSubscriptionAdded(SubscriptionEvent s) { + if (TEST_LISTENERS == null) { + return; + } + for (SubscriptionListener l : TEST_LISTENERS) { + l.onSubscribe(s); + } + + } + + /** + * For test use only. + * + * @param l the listener to add. + */ + public static void addTestSubscriptionListener(SubscriptionListener l) { + if (TEST_LISTENERS == null) { + TEST_LISTENERS = new ArrayList<>(); + } + TEST_LISTENERS.add(l); + } + + /** + * For test use only. + * + * @param l the listener to remove. + */ + public static void removeTestSubscriptionListener(SubscriptionListener l) { + if (TEST_LISTENERS == null) { + return; + } + TEST_LISTENERS.remove(l); + } + + /** + * For test use only. + */ + public static void clearTestSubscriptionListeners() { + TEST_LISTENERS = null; + } + } diff --git a/FROST-Server.Tests/src/test/java/de/fraunhofer/iosb/ilt/statests/ServerSettings.java b/FROST-Server.Tests/src/test/java/de/fraunhofer/iosb/ilt/statests/ServerSettings.java index e79e2d904..399ec9478 100644 --- a/FROST-Server.Tests/src/test/java/de/fraunhofer/iosb/ilt/statests/ServerSettings.java +++ b/FROST-Server.Tests/src/test/java/de/fraunhofer/iosb/ilt/statests/ServerSettings.java @@ -35,6 +35,11 @@ public class ServerSettings { public static final Requirement TASKING_REQ = Requirement.IOT_TASKING_1_0_TASKING_CAPABILITY_PROPERTIES; public static final Requirement MULTIDATA_REQ = Requirement.IOT_SENSING_1_1_MULTI_DATASTREAM_CONSTRAINTS; + /** + * The timeout to use when waiting for MQTT messages. + */ + public static final long MQTT_TIMEOUT = 30000; + /** * The root of FROST, without the v1.0. */ @@ -48,11 +53,6 @@ public class ServerSettings { private final Set extensions = EnumSet.noneOf(Extension.class); private final Set enabledEntityTypes = EnumSet.noneOf(EntityType.class); - /** - * The timeout to use when waiting for MQTT messages. - */ - private final long mqttTimeOut = 30000; - public void setServiceRootUrl(String serviceRootUrl) { if (serviceRootUrl.endsWith("/")) { this.serviceRootUrl = serviceRootUrl.substring(0, serviceRootUrl.length() - 1); @@ -177,7 +177,7 @@ public boolean implementsVersion(ServerVersion version) { * @return the mqttTimeOut */ public long getMqttTimeOut() { - return mqttTimeOut; + return MQTT_TIMEOUT; } } diff --git a/FROST-Server.Tests/src/test/java/de/fraunhofer/iosb/ilt/statests/util/mqtt/MqttHelper.java b/FROST-Server.Tests/src/test/java/de/fraunhofer/iosb/ilt/statests/util/mqtt/MqttHelper.java index b9db3b7a9..b6d286b6e 100644 --- a/FROST-Server.Tests/src/test/java/de/fraunhofer/iosb/ilt/statests/util/mqtt/MqttHelper.java +++ b/FROST-Server.Tests/src/test/java/de/fraunhofer/iosb/ilt/statests/util/mqtt/MqttHelper.java @@ -27,6 +27,7 @@ import static de.fraunhofer.iosb.ilt.statests.util.EntityType.THING; import static org.junit.jupiter.api.Assertions.fail; +import de.fraunhofer.iosb.ilt.frostserver.mqtt.MqttManager; import de.fraunhofer.iosb.ilt.statests.ServerVersion; import de.fraunhofer.iosb.ilt.statests.util.EntityType; import de.fraunhofer.iosb.ilt.statests.util.Utils; @@ -61,7 +62,6 @@ public class MqttHelper { */ public static final int WAIT_AFTER_INSERT = 100; public static final int WAIT_AFTER_CLEANUP = 500; - public static final int WAIT_AFTER_SUBSCRIBE = 200; public static final int QOS = 2; public static final String CLIENT_ID = "STA-test_suite"; private final String mqttServerUri; @@ -125,9 +125,6 @@ public MqttBatchResult executeRequests(Callable action, String... topi tempResult.put(topic, executor.submit(listener)); } - // Give the MQTT server time to process the subscriptions. - waitMillis(WAIT_AFTER_SUBSCRIBE); - try { LOGGER.debug(" Calling action..."); result.setActionResult(action.call()); @@ -148,6 +145,7 @@ public MqttBatchResult executeRequests(Callable action, String... topi } finally { executor.shutdownNow(); } + MqttManager.clearTestSubscriptionListeners(); return result; } diff --git a/FROST-Server.Tests/src/test/java/de/fraunhofer/iosb/ilt/statests/util/mqtt/MqttListener.java b/FROST-Server.Tests/src/test/java/de/fraunhofer/iosb/ilt/statests/util/mqtt/MqttListener.java index fff06b6ff..f414d5827 100644 --- a/FROST-Server.Tests/src/test/java/de/fraunhofer/iosb/ilt/statests/util/mqtt/MqttListener.java +++ b/FROST-Server.Tests/src/test/java/de/fraunhofer/iosb/ilt/statests/util/mqtt/MqttListener.java @@ -19,6 +19,10 @@ import static org.junit.jupiter.api.Assertions.fail; +import de.fraunhofer.iosb.ilt.frostserver.mqtt.MqttManager; +import de.fraunhofer.iosb.ilt.frostserver.mqtt.subscription.SubscriptionEvent; +import de.fraunhofer.iosb.ilt.frostserver.mqtt.subscription.SubscriptionListener; +import de.fraunhofer.iosb.ilt.statests.ServerSettings; import java.nio.charset.StandardCharsets; import java.util.UUID; import java.util.concurrent.Callable; @@ -62,10 +66,22 @@ public MqttListener(String mqttServer, String topic) { public void connect() { try { - final CountDownLatch connectBarrier = new CountDownLatch(1); + final CountDownLatch connectBarrier = new CountDownLatch(2); mqttClient = new MqttAsyncClient(mqttServerUri, MqttHelper.CLIENT_ID + "-" + topic + "-" + UUID.randomUUID(), new MemoryPersistence()); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(true); + MqttManager.addTestSubscriptionListener(new SubscriptionListener() { + @Override + public void onSubscribe(SubscriptionEvent subscription) { + if (topic.equals(subscription.getTopic())) { + connectBarrier.countDown(); + } + } + + @Override + public void onUnsubscribe(SubscriptionEvent subscription) { + } + }); mqttClient.connect(connOpts, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { @@ -119,7 +135,7 @@ public void onFailure(IMqttToken asyncActionToken, Throwable exception) { } }); try { - connectBarrier.await(); + connectBarrier.await(ServerSettings.MQTT_TIMEOUT, TimeUnit.MILLISECONDS); } catch (InterruptedException ex) { LOGGER.error("Exception:", ex); }