Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Debugging tests on CI #1684

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,9 @@ private void handleMessageSent(EntityChangedMessage message) {
connect();
}
client.publish(topicName, bytes, qosLevel, false);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(" Sent a {} message for a {}", message.getEventType(), message.getEntityType());
}
} catch (MqttException | JsonProcessingException ex) {
LOGGER.error("Failed to publish message to bus.", ex);
}
Expand All @@ -345,6 +348,9 @@ public void messageArrived(String topic, MqttMessage mqttMessage) throws IOExcep
LOGGER.debug("Failed to decode message: {}", serialisedEcMessage, ex);
return;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(" Received a {} message for a {}", ecMessage.getEventType(), ecMessage.getEntityType());
}
if (!recvQueue.offer(ecMessage)) {
LOGGER.error("Failed to add message to receive-queue. Increase {}{} (currently {}) to allow a bigger buffer, or increase {}{} (currently {}) to empty the buffer quicker.",
PREFIX_BUS, TAG_RECV_QUEUE_SIZE, recvQueueSize, PREFIX_BUS, TAG_RECV_WORKER_COUNT, recvPoolSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ public class MqttManager implements SubscriptionListener, MessageListener, Entit

private static final Logger LOGGER = LoggerFactory.getLogger(MqttManager.class);

/**
* Listeners for integration-test use only. Not thread safe.
*/
private static List<SubscriptionListener> TEST_LISTENERS;

private final Map<EntityType, SubscriptionManager> subscriptions = new HashMap<>();
private final CoreSettings settings;
private final SubscriptionFactory subscriptionFactory;
Expand Down Expand Up @@ -167,7 +172,7 @@ private void handleEntityChangedEvent(EntityChangedMessage message) {
logStatus.setEntityChangedQueueSize(entityChangedQueueSize.decrementAndGet());
final EntityChangedMessage.Type eventType = message.getEventType();
EntityType entityType = message.getEntityType();
LOGGER.trace("Received a {} message for a {}.", eventType, entityType);
LOGGER.debug(" Handling a {} message for a {}.", eventType, entityType);
if (eventType == EntityChangedMessage.Type.DELETE) {
// v1.0 does not do delete notification.
return;
Expand Down Expand Up @@ -249,6 +254,7 @@ public void onSubscribe(SubscriptionEvent e) {
subscriptions.get(subscription.getEntityType())
.addSubscription(subscription);
logStatus.setTopicCount(topicCount.get());
fireTestSubscriptionAdded(e);
}

@Override
Expand Down Expand Up @@ -409,4 +415,46 @@ public LoggingStatus setTopicCount(Integer count) {
}

}

private static void fireTestSubscriptionAdded(SubscriptionEvent s) {
if (TEST_LISTENERS == null) {
return;
}
for (SubscriptionListener l : TEST_LISTENERS) {
l.onSubscribe(s);
}

}

/**
* For test use only.
*
* @param l the listener to add.
*/
public static void addTestSubscriptionListener(SubscriptionListener l) {
if (TEST_LISTENERS == null) {
TEST_LISTENERS = new ArrayList<>();
}
TEST_LISTENERS.add(l);
}

/**
* For test use only.
*
* @param l the listener to remove.
*/
public static void removeTestSubscriptionListener(SubscriptionListener l) {
if (TEST_LISTENERS == null) {
return;
}
TEST_LISTENERS.remove(l);
}

/**
* For test use only.
*/
public static void clearTestSubscriptionListeners() {
TEST_LISTENERS = null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A manger for subscriptions for a single entity type.
Expand All @@ -35,6 +37,8 @@
*/
class SubscriptionManager {

private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionManager.class.getName());

/**
* The main entity type of the subscriptions in this manager.
*/
Expand Down Expand Up @@ -75,6 +79,7 @@ public void handleEntityChanged(PersistenceManager persistenceManager, Entity en
}

public synchronized void addSubscription(Subscription subscription) {
LOGGER.debug(" Adding subscription to {}", subscription);
NavigationPropertyMain parentRelation = subscription.getParentRelation();
if (parentRelation != null) {
SubscriptionSetDirectParent parentSet = parentedSubscriptions.computeIfAbsent(parentRelation, t -> new SubscriptionSetDirectParent(mqttManager, parentRelation, topicCount));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,4 +208,9 @@ public boolean equals(Object obj) {
return this.entityType == other.entityType;
}

@Override
public String toString() {
return getTopic();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ public class ServerSettings {
public static final Requirement TASKING_REQ = Requirement.IOT_TASKING_1_0_TASKING_CAPABILITY_PROPERTIES;
public static final Requirement MULTIDATA_REQ = Requirement.IOT_SENSING_1_1_MULTI_DATASTREAM_CONSTRAINTS;

/**
* The timeout to use when waiting for MQTT messages.
*/
public static final long MQTT_TIMEOUT = 30000;

/**
* The root of FROST, without the v1.0.
*/
Expand All @@ -48,11 +53,6 @@ public class ServerSettings {
private final Set<Extension> extensions = EnumSet.noneOf(Extension.class);
private final Set<EntityType> enabledEntityTypes = EnumSet.noneOf(EntityType.class);

/**
* The timeout to use when waiting for MQTT messages.
*/
private final long mqttTimeOut = 30000;

public void setServiceRootUrl(String serviceRootUrl) {
if (serviceRootUrl.endsWith("/")) {
this.serviceRootUrl = serviceRootUrl.substring(0, serviceRootUrl.length() - 1);
Expand Down Expand Up @@ -177,7 +177,7 @@ public boolean implementsVersion(ServerVersion version) {
* @return the mqttTimeOut
*/
public long getMqttTimeOut() {
return mqttTimeOut;
return MQTT_TIMEOUT;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,55 +35,10 @@
import de.fraunhofer.iosb.ilt.statests.c01sensingcore.Capability1CoreOnlyTests11;
import de.fraunhofer.iosb.ilt.statests.c01sensingcore.Capability1Tests10;
import de.fraunhofer.iosb.ilt.statests.c01sensingcore.Capability1Tests11;
import de.fraunhofer.iosb.ilt.statests.c02cud.AdditionalTests10;
import de.fraunhofer.iosb.ilt.statests.c02cud.AdditionalTests11;
import de.fraunhofer.iosb.ilt.statests.c02cud.Capability2Tests10;
import de.fraunhofer.iosb.ilt.statests.c02cud.Capability2Tests11;
import de.fraunhofer.iosb.ilt.statests.c02cud.DeleteFilterTests10;
import de.fraunhofer.iosb.ilt.statests.c02cud.DeleteFilterTests11;
import de.fraunhofer.iosb.ilt.statests.c02cud.JsonPatchTests10;
import de.fraunhofer.iosb.ilt.statests.c02cud.JsonPatchTests11;
import de.fraunhofer.iosb.ilt.statests.c02cud.ResultTypesTests10;
import de.fraunhofer.iosb.ilt.statests.c02cud.ResultTypesTests11;
import de.fraunhofer.iosb.ilt.statests.c03filtering.Capability3Tests10;
import de.fraunhofer.iosb.ilt.statests.c03filtering.Capability3Tests11;
import de.fraunhofer.iosb.ilt.statests.c03filtering.DateTimeTests10;
import de.fraunhofer.iosb.ilt.statests.c03filtering.DateTimeTests11;
import de.fraunhofer.iosb.ilt.statests.c03filtering.FilterTests10;
import de.fraunhofer.iosb.ilt.statests.c03filtering.FilterTests11;
import de.fraunhofer.iosb.ilt.statests.c03filtering.GeoTests10;
import de.fraunhofer.iosb.ilt.statests.c03filtering.GeoTests11;
import de.fraunhofer.iosb.ilt.statests.c03filtering.JsonPropertiesTests10;
import de.fraunhofer.iosb.ilt.statests.c03filtering.JsonPropertiesTests11;
import de.fraunhofer.iosb.ilt.statests.c04batch.BatchTests10;
import de.fraunhofer.iosb.ilt.statests.c04batch.BatchTests11;
import de.fraunhofer.iosb.ilt.statests.c05multidatastream.MdDateTimeTests10;
import de.fraunhofer.iosb.ilt.statests.c05multidatastream.MdDateTimeTests11;
import de.fraunhofer.iosb.ilt.statests.c05multidatastream.MultiDatastreamObsPropTests10;
import de.fraunhofer.iosb.ilt.statests.c05multidatastream.MultiDatastreamObsPropTests11;
import de.fraunhofer.iosb.ilt.statests.c05multidatastream.MultiDatastreamTests10;
import de.fraunhofer.iosb.ilt.statests.c05multidatastream.MultiDatastreamTests11;
import de.fraunhofer.iosb.ilt.statests.c06dataarrays.DataArrayTests10;
import de.fraunhofer.iosb.ilt.statests.c06dataarrays.DataArrayTests11;
import de.fraunhofer.iosb.ilt.statests.c07mqttcreate.Capability7Tests10;
import de.fraunhofer.iosb.ilt.statests.c07mqttcreate.Capability7Tests11;
import de.fraunhofer.iosb.ilt.statests.c08mqttsubscribe.Capability8Tests10;
import de.fraunhofer.iosb.ilt.statests.c08mqttsubscribe.Capability8Tests11;
import de.fraunhofer.iosb.ilt.statests.f01auth.BasicAuthAnonReadTests10;
import de.fraunhofer.iosb.ilt.statests.f01auth.BasicAuthAnonReadTests11;
import de.fraunhofer.iosb.ilt.statests.f01auth.BasicAuthCryptPwTests10;
import de.fraunhofer.iosb.ilt.statests.f01auth.BasicAuthCryptPwTests11;
import de.fraunhofer.iosb.ilt.statests.f01auth.BasicAuthTests10;
import de.fraunhofer.iosb.ilt.statests.f01auth.BasicAuthTests11;
import de.fraunhofer.iosb.ilt.statests.f01auth.FineGrainedAuthTests11;
import de.fraunhofer.iosb.ilt.statests.f01auth.KeyCloakAnonReadTests10;
import de.fraunhofer.iosb.ilt.statests.f01auth.KeyCloakAnonReadTests11;
import de.fraunhofer.iosb.ilt.statests.f01auth.KeyCloakTests10;
import de.fraunhofer.iosb.ilt.statests.f01auth.KeyCloakTests11;
import de.fraunhofer.iosb.ilt.statests.f02customlinks.CustomLinksTests10;
import de.fraunhofer.iosb.ilt.statests.f02customlinks.CustomLinksTests11;
import de.fraunhofer.iosb.ilt.statests.f03metadata.MetadataTests10;
import de.fraunhofer.iosb.ilt.statests.f03metadata.MetadataTests11;
import de.fraunhofer.iosb.ilt.statests.util.HTTPMethods;
import de.fraunhofer.iosb.ilt.statests.util.HTTPMethods.HttpResponse;
import java.io.IOException;
Expand Down Expand Up @@ -131,55 +86,10 @@
Capability1CoreOnlyTests11.class,
Capability1Tests10.class,
Capability1Tests11.class,
Capability2Tests10.class,
Capability2Tests11.class,
AdditionalTests10.class,
AdditionalTests11.class,
DeleteFilterTests10.class,
DeleteFilterTests11.class,
JsonPatchTests10.class,
JsonPatchTests11.class,
ResultTypesTests10.class,
ResultTypesTests11.class,
Capability3Tests10.class,
Capability3Tests11.class,
DateTimeTests10.class,
DateTimeTests11.class,
FilterTests10.class,
FilterTests11.class,
GeoTests10.class,
GeoTests11.class,
JsonPropertiesTests10.class,
JsonPropertiesTests11.class,
BatchTests10.class,
BatchTests11.class,
MultiDatastreamTests10.class,
MultiDatastreamTests11.class,
MultiDatastreamObsPropTests10.class,
MultiDatastreamObsPropTests11.class,
MdDateTimeTests10.class,
MdDateTimeTests11.class,
DataArrayTests10.class,
DataArrayTests11.class,
Capability7Tests10.class,
Capability7Tests11.class,
Capability8Tests10.class,
Capability8Tests11.class,
BasicAuthTests10.class,
BasicAuthTests11.class,
BasicAuthAnonReadTests10.class,
BasicAuthAnonReadTests11.class,
BasicAuthCryptPwTests10.class,
BasicAuthCryptPwTests11.class,
FineGrainedAuthTests11.class,
KeyCloakTests10.class,
KeyCloakTests11.class,
KeyCloakAnonReadTests10.class,
KeyCloakAnonReadTests11.class,
CustomLinksTests10.class,
CustomLinksTests11.class,
MetadataTests10.class,
MetadataTests11.class,
TestSuite.SuiteFinaliser.class
})
@Suite
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static de.fraunhofer.iosb.ilt.statests.util.EntityType.THING;
import static org.junit.jupiter.api.Assertions.fail;

import de.fraunhofer.iosb.ilt.frostserver.mqtt.MqttManager;
import de.fraunhofer.iosb.ilt.statests.ServerVersion;
import de.fraunhofer.iosb.ilt.statests.util.EntityType;
import de.fraunhofer.iosb.ilt.statests.util.Utils;
Expand Down Expand Up @@ -61,7 +62,6 @@ public class MqttHelper {
*/
public static final int WAIT_AFTER_INSERT = 100;
public static final int WAIT_AFTER_CLEANUP = 500;
public static final int WAIT_AFTER_SUBSCRIBE = 200;
public static final int QOS = 2;
public static final String CLIENT_ID = "STA-test_suite";
private final String mqttServerUri;
Expand Down Expand Up @@ -125,11 +125,8 @@ public <T> MqttBatchResult<T> executeRequests(Callable<T> action, String... topi
tempResult.put(topic, executor.submit(listener));
}

// Give the MQTT server time to process the subscriptions.
waitMillis(WAIT_AFTER_SUBSCRIBE);

try {
LOGGER.debug(" Calling action...");
LOGGER.debug(" Calling action...");
result.setActionResult(action.call());
} catch (Exception ex) {
LOGGER.error("Exception on server {} :", mqttServerUri, ex);
Expand All @@ -148,6 +145,7 @@ public <T> MqttBatchResult<T> executeRequests(Callable<T> action, String... topi
} finally {
executor.shutdownNow();
}
MqttManager.clearTestSubscriptionListeners();
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@

import static org.junit.jupiter.api.Assertions.fail;

import de.fraunhofer.iosb.ilt.frostserver.mqtt.MqttManager;
import de.fraunhofer.iosb.ilt.frostserver.mqtt.subscription.SubscriptionEvent;
import de.fraunhofer.iosb.ilt.frostserver.mqtt.subscription.SubscriptionListener;
import de.fraunhofer.iosb.ilt.statests.ServerSettings;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -62,10 +66,22 @@ public MqttListener(String mqttServer, String topic) {

public void connect() {
try {
final CountDownLatch connectBarrier = new CountDownLatch(1);
final CountDownLatch connectBarrier = new CountDownLatch(2);
mqttClient = new MqttAsyncClient(mqttServerUri, MqttHelper.CLIENT_ID + "-" + topic + "-" + UUID.randomUUID(), new MemoryPersistence());
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
MqttManager.addTestSubscriptionListener(new SubscriptionListener() {
@Override
public void onSubscribe(SubscriptionEvent subscription) {
if (topic.equals(subscription.getTopic())) {
connectBarrier.countDown();
}
}

@Override
public void onUnsubscribe(SubscriptionEvent subscription) {
}
});
mqttClient.connect(connOpts, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Expand All @@ -81,9 +97,9 @@ public void messageArrived(String topic, MqttMessage mm) {
if (barrier.getCount() > 0) {
result = new JSONObject(new String(mm.getPayload(), StandardCharsets.UTF_8));
barrier.countDown();
LOGGER.debug("Received on {}. To go: {}", topic, barrier.getCount());
LOGGER.debug(" Received on {}. To go: {}", topic, barrier.getCount());
} else {
LOGGER.error("Received on {}. Barrier already empty!", topic);
LOGGER.error(" Received on {}. Barrier already empty!", topic);
}
}

Expand All @@ -96,7 +112,7 @@ public void deliveryComplete(IMqttDeliveryToken imdt) {
mqttClient.subscribe(topic, MqttHelper.QOS, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken imt) {
LOGGER.debug("Subscribed to {}", topic);
LOGGER.debug(" Subscribed to {}", topic);
connectBarrier.countDown();
}

Expand All @@ -119,7 +135,7 @@ public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
}
});
try {
connectBarrier.await();
connectBarrier.await(ServerSettings.MQTT_TIMEOUT, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
LOGGER.error("Exception:", ex);
}
Expand Down
5 changes: 4 additions & 1 deletion FROST-Server.Tests/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
</appender>

<logger name="de.fraunhofer.iosb.ilt.frostserver.parser" level="OFF"/>
<logger name="de.fraunhofer.iosb.ilt.statests" level="INFO"/>
<logger name="io.moquette" level="WARN"/>
<logger name="io.moquette.broker.metrics.MQTTMessageLogger" level="WARN"/>
<logger name="io.moquette.broker.SessionEventLoop" level="WARN"/>
Expand All @@ -18,6 +17,10 @@
<logger name="de.fraunhofer.iosb.ilt.frostclient.query.Query" level="INFO"/>
<logger name="de.fraunhofer.iosb.ilt.frostserver.persistence.pgjooq" level="INFO"/>
<logger name="de.fraunhofer.iosb.ilt.statests" level="INFO"/>
<logger name="de.fraunhofer.iosb.ilt.statests.util.mqtt" level="DEBUG"/>
<logger name="de.fraunhofer.iosb.ilt.frostserver.mqtt.SubscriptionManager" level="DEBUG"/>
<logger name="de.fraunhofer.iosb.ilt.frostserver.mqtt.MqttManager" level="DEBUG"/>
<logger name="de.fraunhofer.iosb.ilt.frostserver.messagebus" level="DEBUG"/>

<root level="INFO">
<appender-ref ref="STDOUT" />
Expand Down