Skip to content

Commit

Permalink
Test Shared Subscription (#587)
Browse files Browse the repository at this point in the history
  • Loading branch information
alfred2g authored Dec 21, 2023
1 parent 0bd6389 commit 5908519
Show file tree
Hide file tree
Showing 2 changed files with 223 additions and 8 deletions.
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ if(NOT BYO_CRYPTO)
add_net_test_case(Mqtt5InterruptUnsub)
add_net_test_case(Mqtt5InterruptPublishQoS1)
add_net_test_case(Mqtt5OperationStatisticsSimple)
add_net_test_case(Mqtt5SharedSubscriptionTest)

# Mqtt5-to-3 Adapter
add_test_case(Mqtt5to3AdapterNewConnectionMin)
Expand Down
230 changes: 222 additions & 8 deletions tests/Mqtt5ClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <aws/iot/MqttCommon.h>
#include <aws/testing/aws_test_harness.h>

#include <atomic>
#include <utility>

using namespace Aws::Crt;
Expand All @@ -29,18 +30,18 @@ static void s_setupConnectionLifeCycle(
{
mqtt5Options.WithClientConnectionSuccessCallback(
[&connectionPromise, clientName](const OnConnectionSuccessEventData &) {
printf("[MQTT5]%s Connection Success.", clientName);
printf("[MQTT5]%s Connection Success.\n", clientName);
connectionPromise.set_value(true);
});

mqtt5Options.WithClientConnectionFailureCallback(
[&connectionPromise, clientName](const OnConnectionFailureEventData &eventData) {
printf("[MQTT5]%s Connection failed with error : %s", clientName, aws_error_debug_str(eventData.errorCode));
connectionPromise.set_value(false);
});
mqtt5Options.WithClientConnectionFailureCallback([&connectionPromise,
clientName](const OnConnectionFailureEventData &eventData) {
printf("[MQTT5]%s Connection failed with error : %s\n", clientName, aws_error_debug_str(eventData.errorCode));
connectionPromise.set_value(false);
});

mqtt5Options.WithClientStoppedCallback([&stoppedPromise, clientName](const OnStoppedEventData &) {
printf("[MQTT5]%s Stopped", clientName);
printf("[MQTT5]%s Stopped\n", clientName);
stoppedPromise.set_value();
});
}
Expand Down Expand Up @@ -1914,7 +1915,9 @@ static int s_TestMqtt5WillTest(Aws::Crt::Allocator *allocator, void *)
/* Subscribe to test topic */
Mqtt5::Subscription subscription(TEST_TOPIC, Mqtt5::QOS::AWS_MQTT5_QOS_AT_LEAST_ONCE, allocator);
std::shared_ptr<Mqtt5::SubscribePacket> subscribe = std::make_shared<Mqtt5::SubscribePacket>(allocator);

subscribe->WithSubscription(std::move(subscription));

std::promise<void> subscribed;
ASSERT_TRUE(subscriber->Subscribe(
subscribe, [&subscribed](int, std::shared_ptr<Mqtt5::SubAckPacket>) { subscribed.set_value(); }));
Expand All @@ -1936,6 +1939,217 @@ static int s_TestMqtt5WillTest(Aws::Crt::Allocator *allocator, void *)
}
AWS_TEST_CASE(Mqtt5WillTest, s_TestMqtt5WillTest)

/*
* Shared Subscription test
*/
static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, void *)
{
Mqtt5TestEnvVars mqtt5TestVars(allocator, MQTT5CONNECT_IOT_CORE);
if (!mqtt5TestVars)
{
printf("Environment Variables are not set for the test, skip the test");
return AWS_OP_SKIP;
}

ApiHandle apiHandle(allocator);

String currentUUID = Aws::Crt::UUID().ToString();

const String TEST_TOPIC = "test/MQTT5_Binding_CPP_" + currentUUID;
const String sharedTopicFilter = "$share/crttest/test/MQTT5_Binding_CPP_" + currentUUID;

const int MESSAGE_NUMBER = 10;
std::atomic<int> client_messages(0);
bool client1_received = false;
bool client2_received = false;

std::vector<int> receivedMessages(MESSAGE_NUMBER);
for (int i = 0; i < MESSAGE_NUMBER; i++)
{
receivedMessages.push_back(0);
}

Aws::Iot::Mqtt5ClientBuilder *subscribe_builder =
Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(
mqtt5TestVars.m_hostname_string,
mqtt5TestVars.m_certificate_path_string.c_str(),
mqtt5TestVars.m_private_key_path_string.c_str(),
allocator);
ASSERT_TRUE(subscribe_builder);

std::promise<void> client_received;

auto get_on_message_callback = [&](bool &received) {
return [&](const PublishReceivedEventData &eventData) -> int {
String topic = eventData.publishPacket->getTopic();
if (topic == TEST_TOPIC)
{
ByteCursor payload = eventData.publishPacket->getPayload();
String message_string = String((const char *)payload.ptr, payload.len);
int message_int = atoi(message_string.c_str());
ASSERT_TRUE(message_int < MESSAGE_NUMBER);
++receivedMessages[message_int];
received = true; // this line has changed

bool exchanged = false;
int desired = 11;
int tested = 10;
client_messages++;
exchanged = client_messages.compare_exchange_strong(tested, desired);
if (exchanged == true)
{
client_received.set_value();
}
}
return 0;
};
};
auto onMessage_client1 = get_on_message_callback(client1_received);
auto onMessage_client2 = get_on_message_callback(client2_received);

subscribe_builder->WithPublishReceivedCallback(onMessage_client1);

Aws::Iot::Mqtt5ClientBuilder *subscribe_builder2 =
Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(
mqtt5TestVars.m_hostname_string,
mqtt5TestVars.m_certificate_path_string.c_str(),
mqtt5TestVars.m_private_key_path_string.c_str(),
allocator);
ASSERT_TRUE(subscribe_builder2);

subscribe_builder2->WithPublishReceivedCallback(onMessage_client2);

Aws::Iot::Mqtt5ClientBuilder *publish_builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(
mqtt5TestVars.m_hostname_string,
mqtt5TestVars.m_certificate_path_string.c_str(),
mqtt5TestVars.m_private_key_path_string.c_str(),
allocator);
ASSERT_TRUE(publish_builder);

std::promise<bool> connectionPromise;
std::promise<void> stoppedPromise;

std::promise<bool> connectionPromise2;
std::promise<void> stoppedPromise2;

std::promise<bool> connectionPromise3;
std::promise<void> stoppedPromise3;

std::shared_ptr<Aws::Crt::Mqtt5::ConnectPacket> packetConnect = std::make_shared<Aws::Crt::Mqtt5::ConnectPacket>();

/* first subscriber */
/* set a different client id so we are not disconnected from the server */
packetConnect->WithClientId("s_TestMqtt5SharedSubscriptionTest" + Aws::Crt::UUID().ToString());
subscribe_builder->WithConnectOptions(packetConnect);
s_setupConnectionLifeCycle(subscribe_builder, connectionPromise, stoppedPromise, "Subscriber 1");
std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> mqtt5Client = subscribe_builder->Build();
ASSERT_TRUE(mqtt5Client);

/* second subscriber */
/* set a different client id so we are not disconnected from the server */
packetConnect->WithClientId("s_TestMqtt5SharedSubscriptionTest" + Aws::Crt::UUID().ToString());
subscribe_builder2->WithConnectOptions(packetConnect);
s_setupConnectionLifeCycle(subscribe_builder2, connectionPromise2, stoppedPromise2, "Subscriber 2");
std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> mqtt5Client2 = subscribe_builder2->Build();
ASSERT_TRUE(mqtt5Client2);

/* publisher */
/* set a different client id so we are not disconnected from the server */
packetConnect->WithClientId("s_TestMqtt5SharedSubscriptionTest" + Aws::Crt::UUID().ToString());
publish_builder->WithConnectOptions(packetConnect);
s_setupConnectionLifeCycle(publish_builder, connectionPromise3, stoppedPromise3, "Publisher");
std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> mqtt5Publisher = publish_builder->Build();
ASSERT_TRUE(mqtt5Publisher);

/* Connect all clients */
ASSERT_TRUE(mqtt5Client->Start());
ASSERT_TRUE(mqtt5Client2->Start());
ASSERT_TRUE(mqtt5Publisher->Start());

/* Wait for all clents to connect */
ASSERT_TRUE(connectionPromise.get_future().get());
ASSERT_TRUE(connectionPromise2.get_future().get());
ASSERT_TRUE(connectionPromise3.get_future().get());

/* Subscribe to test topic */
Mqtt5::Subscription subscription(sharedTopicFilter, Mqtt5::QOS::AWS_MQTT5_QOS_AT_MOST_ONCE, allocator);
std::shared_ptr<Mqtt5::SubscribePacket> subscribe = std::make_shared<Mqtt5::SubscribePacket>(allocator);
subscribe->WithSubscription(std::move(subscription));

/* Subscribe to test topic */
Mqtt5::Subscription subscription2(sharedTopicFilter, Mqtt5::QOS::AWS_MQTT5_QOS_AT_MOST_ONCE, allocator);
std::shared_ptr<Mqtt5::SubscribePacket> subscribe2 = std::make_shared<Mqtt5::SubscribePacket>(allocator);
subscribe2->WithSubscription(std::move(subscription2));

std::promise<void> suback;
auto onSubAck = [&](int, std::shared_ptr<SubAckPacket>) { suback.set_value(); };

/* subscribe first client */
ASSERT_TRUE(mqtt5Client->Subscribe(subscribe, onSubAck));
suback.get_future().wait();

suback = std::promise<void>();

/* subscribe second client */
ASSERT_TRUE(mqtt5Client2->Subscribe(subscribe2, onSubAck));
suback.get_future().wait();

/* Publish message 10 to test topic */
for (int i = 0; i < MESSAGE_NUMBER; i++)
{
std::string payload = std::to_string(i);
std::shared_ptr<Mqtt5::PublishPacket> publish = std::make_shared<Mqtt5::PublishPacket>(
TEST_TOPIC, ByteCursorFromCString(payload.c_str()), Mqtt5::QOS::AWS_MQTT5_QOS_AT_LEAST_ONCE, allocator);
ASSERT_TRUE(mqtt5Publisher->Publish(publish));
}

/* Wait for all packets to be received on both clients */
client_received.get_future().wait();

/* Unsubscribe from the topic from both clients*/
Vector<String> unsubList;
unsubList.push_back(TEST_TOPIC);
std::shared_ptr<Mqtt5::UnsubscribePacket> unsubscribe_client1 =
std::make_shared<Mqtt5::UnsubscribePacket>(allocator);
unsubscribe_client1->WithTopicFilters(unsubList);
ASSERT_TRUE(mqtt5Client->Unsubscribe(unsubscribe_client1));

std::shared_ptr<Mqtt5::UnsubscribePacket> unsubscribe_client2 =
std::make_shared<Mqtt5::UnsubscribePacket>(allocator);
unsubscribe_client2->WithTopicFilters(unsubList);
ASSERT_TRUE(mqtt5Client2->Unsubscribe(unsubscribe_client2));

/* make sure all messages are received */
ASSERT_INT_EQUALS(MESSAGE_NUMBER + 1, client_messages); /* We are adding one at the end, so 10 messages received */

/* makes sure both clients received at least one message */
ASSERT_TRUE(client1_received);
ASSERT_TRUE(client2_received);

/* make sure all messages are received with no duplicates*/
for (int i = 0; i < MESSAGE_NUMBER; i++)
{
ASSERT_TRUE(receivedMessages[i] > 0);
}
/* Stop all clients */
ASSERT_TRUE(mqtt5Client->Stop());
ASSERT_TRUE(mqtt5Client2->Stop());
ASSERT_TRUE(mqtt5Publisher->Stop());

/* Wait for all clents to disconnect */
stoppedPromise.get_future().get();
stoppedPromise2.get_future().get();
stoppedPromise3.get_future().get();

delete subscribe_builder;
delete subscribe_builder2;
delete publish_builder;

return AWS_OP_SUCCESS;
}

AWS_TEST_CASE(Mqtt5SharedSubscriptionTest, s_TestMqtt5SharedSubscriptionTest)

//////////////////////////////////////////////////////////
// Error Operation Tests [ErrorOp-UC]
//////////////////////////////////////////////////////////
Expand Down Expand Up @@ -2146,7 +2360,7 @@ static int s_TestMqtt5QoS1SubPub(Aws::Crt::Allocator *allocator, void *)

subscribed.get_future().get();

/* Publish message 10 to test topic */
/* Publish 10 messages to test topic */
for (int i = 0; i < MESSAGE_NUMBER; i++)
{
std::string payload = std::to_string(i);
Expand Down

0 comments on commit 5908519

Please sign in to comment.