diff --git a/crt/aws-c-io b/crt/aws-c-io index 0642c6842..a3555c86b 160000 --- a/crt/aws-c-io +++ b/crt/aws-c-io @@ -1 +1 @@ -Subproject commit 0642c68425f126e833fcf91bdc53dfc32366d0ba +Subproject commit a3555c86bd10149a1ab0c1b2810756ec54cfbe3a diff --git a/crt/aws-c-mqtt b/crt/aws-c-mqtt index d5c268f70..b777be4aa 160000 --- a/crt/aws-c-mqtt +++ b/crt/aws-c-mqtt @@ -1 +1 @@ -Subproject commit d5c268f70aeccf38e75d3e74ce4eb9629df02e2a +Subproject commit b777be4aa52d233391781256d40006d353e848b9 diff --git a/include/aws/crt/mqtt/Mqtt5Client.h b/include/aws/crt/mqtt/Mqtt5Client.h index 0704546f1..ff7867f1c 100644 --- a/include/aws/crt/mqtt/Mqtt5Client.h +++ b/include/aws/crt/mqtt/Mqtt5Client.h @@ -5,6 +5,7 @@ */ #include #include +#include namespace Aws { @@ -27,6 +28,8 @@ namespace Aws class UnSubAckPacket; class Mqtt5ClientCore; + class Mqtt5to3AdapterOptions; + struct AWS_CRT_CPP_API ReconnectOptions { /** @@ -315,6 +318,13 @@ namespace Aws */ const Mqtt5ClientOperationStatistics &GetOperationStatistics() noexcept; + /** + * Create a new MqttConnection object from the Mqtt5Client. + * + * @return std::shared_ptr + */ + std::shared_ptr NewConnection() noexcept; + virtual ~Mqtt5Client(); private: @@ -324,6 +334,56 @@ namespace Aws std::shared_ptr m_client_core; Mqtt5ClientOperationStatistics m_operationStatistics; + + ScopedResource m_mqtt5to3AdapterOptions; + }; + + /** + * The extra options required to build MqttConnection from Mqtt5Client + */ + class Mqtt5to3AdapterOptions + { + friend class Mqtt5ClientOptions; + friend class Mqtt5ClientCore; + + public: + /* Default constructor */ + Mqtt5to3AdapterOptions(); + + private: + /* Host name of the MQTT server to connect to. */ + Crt::String m_hostName; + + /* Port to connect to */ + uint16_t m_port; + + /* + * If the MqttConnection should overwrite the websocket config. If set to true, m_webSocketInterceptor + * must be set. + */ + bool m_overwriteWebsocket; + + /* + * The transform function invoked during websocket handshake. + */ + Crt::Mqtt::OnWebSocketHandshakeIntercept m_webSocketInterceptor; + + /** + * Controls socket properties of the underlying MQTT connections made by the client. Leave undefined to + * use defaults (no TCP keep alive, 10 second socket timeout). + */ + Crt::Io::SocketOptions m_socketOptions; + + /** + * TLS context for secure socket connections. + * If undefined, a plaintext connection will be used. + */ + Crt::Optional m_tlsConnectionOptions; + + /** + * Configures (tunneling) HTTP proxy usage when establishing MQTT connections + */ + Crt::Optional m_proxyOptions; }; /** @@ -564,6 +624,14 @@ namespace Aws Mqtt5ClientOptions &operator=(Mqtt5ClientOptions &&) = delete; private: + /* + * Allocate and create a new Mqtt5to3AdapterOptions. This function is internally used by Mqtt5Client to + * support the Mqtt5to3Adapter. + * + * @return Mqtt5to3AdapterOptions + */ + ScopedResource NewMqtt5to3AdapterOptions() const noexcept; + /** * This callback allows a custom transformation of the HTTP request that acts as the websocket * handshake. Websockets will be used if this is set to a valid transformation callback. To use @@ -632,7 +700,7 @@ namespace Aws /** * TLS context for secure socket connections. - * If undefined, then a plaintext connection will be used. + * If undefined, a plaintext connection will be used. */ Crt::Optional m_tlsConnectionOptions; diff --git a/include/aws/crt/mqtt/Mqtt5ClientCore.h b/include/aws/crt/mqtt/Mqtt5ClientCore.h index 1f6537b39..77ba3bf1b 100644 --- a/include/aws/crt/mqtt/Mqtt5ClientCore.h +++ b/include/aws/crt/mqtt/Mqtt5ClientCore.h @@ -107,6 +107,17 @@ namespace Aws private: Mqtt5ClientCore(const Mqtt5ClientOptions &options, Allocator *allocator = ApiAllocator()) noexcept; + /** + * Create a new connection object over plain text from the Mqtt5Client. The client must outlive + * all of its connection instances. The Mqtt5 Options will be overwritten by the options passed in here. + * + * @param options the options from Mqtt5Client used to support the MqttConnection + * + * @return std::shared_ptr + */ + std::shared_ptr NewConnection( + const Mqtt5::Mqtt5to3AdapterOptions *options) noexcept; + /* Static Callbacks */ static void s_publishCompletionCallback( enum aws_mqtt5_packet_type packet_type, diff --git a/include/aws/crt/mqtt/MqttClient.h b/include/aws/crt/mqtt/MqttClient.h index 197426c62..4ae6161c1 100644 --- a/include/aws/crt/mqtt/MqttClient.h +++ b/include/aws/crt/mqtt/MqttClient.h @@ -11,6 +11,7 @@ #include #include +#include #include #include @@ -30,6 +31,11 @@ namespace Aws class HttpRequest; } + namespace Mqtt5 + { + class Mqtt5ClientCore; + } + namespace Mqtt { class MqttClient; @@ -212,7 +218,8 @@ namespace Aws }; /** - * Represents a persistent Mqtt Connection. The memory is owned by MqttClient. + * Represents a persistent Mqtt Connection. The memory is owned by MqttClient or + * Mqtt5Client. * To get a new instance of this class, see MqttClient::NewConnection. Unless * specified all function arguments need only to live through the duration of the * function call. @@ -220,6 +227,7 @@ namespace Aws class AWS_CRT_CPP_API MqttConnection final { friend class MqttClient; + friend class Mqtt5::Mqtt5ClientCore; public: ~MqttConnection(); @@ -444,6 +452,7 @@ namespace Aws bool m_useTls; bool m_useWebsocket; MqttConnectionOperationStatistics m_operationStatistics; + Allocator *m_allocator; MqttConnection( aws_mqtt_client *client, @@ -460,6 +469,23 @@ namespace Aws const Io::SocketOptions &socketOptions, bool useWebsocket) noexcept; + MqttConnection( + aws_mqtt5_client *mqtt5Client, + const char *hostName, + uint16_t port, + const Io::SocketOptions &socketOptions, + const Crt::Io::TlsConnectionOptions &tlsConnectionOptions, + bool useWebsocket, + Allocator *allocator) noexcept; + + MqttConnection( + aws_mqtt5_client *mqtt5Client, + const char *hostName, + uint16_t port, + const Io::SocketOptions &socketOptions, + bool useWebsocket, + Allocator *allocator) noexcept; + static void s_onConnectionInterrupted(aws_mqtt_client_connection *, int errorCode, void *userData); static void s_onConnectionCompleted( aws_mqtt_client_connection *, @@ -526,7 +552,8 @@ namespace Aws MqttConnection *self, const char *hostName, uint16_t port, - const Io::SocketOptions &socketOptions); + const Io::SocketOptions &socketOptions, + aws_mqtt5_client *mqtt5Client = nullptr); }; /** diff --git a/include/aws/iot/Mqtt5Client.h b/include/aws/iot/Mqtt5Client.h index 2ebfd46fc..d02877bdc 100644 --- a/include/aws/iot/Mqtt5Client.h +++ b/include/aws/iot/Mqtt5Client.h @@ -522,7 +522,7 @@ namespace Aws /** * TLS context for secure socket connections. - * If undefined, then a plaintext connection will be used. + * If undefined, a plaintext connection will be used. */ Crt::Optional m_tlsConnectionOptions; diff --git a/source/mqtt/Mqtt5Client.cpp b/source/mqtt/Mqtt5Client.cpp index 9b145498e..157abc7ae 100644 --- a/source/mqtt/Mqtt5Client.cpp +++ b/source/mqtt/Mqtt5Client.cpp @@ -21,10 +21,23 @@ namespace Aws { namespace Mqtt5 { + Mqtt5to3AdapterOptions::Mqtt5to3AdapterOptions() : m_port(0), m_overwriteWebsocket(false) {} + Mqtt5Client::Mqtt5Client(const Mqtt5ClientOptions &options, Allocator *allocator) noexcept : m_client_core(nullptr) { m_client_core = Mqtt5ClientCore::NewMqtt5ClientCore(options, allocator); + m_mqtt5to3AdapterOptions = options.NewMqtt5to3AdapterOptions(); + } + + std::shared_ptr Mqtt5Client::NewConnection() noexcept + { + if (m_client_core == nullptr) + { + AWS_LOGF_DEBUG(AWS_LS_MQTT5_CLIENT, "Failed to create mqtt3 connection: Mqtt5 Client is invalid."); + return nullptr; + } + return m_client_core->NewConnection(m_mqtt5to3AdapterOptions.get()); } Mqtt5Client::~Mqtt5Client() @@ -233,6 +246,40 @@ namespace Aws Mqtt5ClientOptions::~Mqtt5ClientOptions() {} + ScopedResource Mqtt5ClientOptions::NewMqtt5to3AdapterOptions() const noexcept + { + Allocator *allocator = m_allocator; + ScopedResource adapterOptions = ScopedResource( + Crt::New(allocator), + [allocator](Mqtt5to3AdapterOptions *options) { Crt::Delete(options, allocator); }); + + adapterOptions->m_hostName = m_hostName; + adapterOptions->m_port = m_port; + adapterOptions->m_socketOptions = m_socketOptions; + if (m_proxyOptions.has_value()) + adapterOptions->m_proxyOptions = m_proxyOptions.value(); + if (m_tlsConnectionOptions.has_value()) + { + adapterOptions->m_tlsConnectionOptions = m_tlsConnectionOptions.value(); + } + if (websocketHandshakeTransform) + { + adapterOptions->m_overwriteWebsocket = true; + + auto signerTransform = [this]( + std::shared_ptr req, + const Crt::Mqtt::OnWebSocketHandshakeInterceptComplete &onComplete) { + this->websocketHandshakeTransform(std::move(req), onComplete); + }; + adapterOptions->m_webSocketInterceptor = std::move(signerTransform); + } + else + { + adapterOptions->m_overwriteWebsocket = false; + } + return adapterOptions; + } + Mqtt5ClientOptions &Mqtt5ClientOptions::WithHostName(Crt::String hostname) { m_hostName = std::move(hostname); diff --git a/source/mqtt/Mqtt5ClientCore.cpp b/source/mqtt/Mqtt5ClientCore.cpp index 38ef27162..fda31c77d 100644 --- a/source/mqtt/Mqtt5ClientCore.cpp +++ b/source/mqtt/Mqtt5ClientCore.cpp @@ -195,6 +195,58 @@ namespace Aws } } + std::shared_ptr Mqtt5ClientCore::NewConnection( + const Mqtt5::Mqtt5to3AdapterOptions *options) noexcept + { + // If you're reading this and asking.... why is this so complicated? Why not use make_shared + // or allocate_shared? Well, MqttConnection constructors are private and stl is dumb like that. + // so, we do it manually. + Allocator *allocator = this->m_allocator; + Crt::Mqtt::MqttConnection *toSeat = reinterpret_cast( + aws_mem_acquire(allocator, sizeof(Crt::Mqtt::MqttConnection))); + if (!toSeat) + { + return nullptr; + } + + if (options->m_tlsConnectionOptions.has_value()) + { + toSeat = new (toSeat) Crt::Mqtt::MqttConnection( + m_client, + options->m_hostName.c_str(), + options->m_port, + options->m_socketOptions, + options->m_tlsConnectionOptions.value(), + options->m_overwriteWebsocket, + allocator); + } + else + { + toSeat = new (toSeat) Crt::Mqtt::MqttConnection( + m_client, + options->m_hostName.c_str(), + options->m_port, + options->m_socketOptions, + options->m_overwriteWebsocket, + allocator); + } + if (options->m_proxyOptions.has_value()) + { + toSeat->SetHttpProxyOptions(options->m_proxyOptions.value()); + } + + if (options->m_overwriteWebsocket) + { + toSeat->WebsocketInterceptor = options->m_webSocketInterceptor; + } + + return std::shared_ptr( + toSeat, [allocator](Crt::Mqtt::MqttConnection *connection) { + connection->~MqttConnection(); + aws_mem_release(allocator, reinterpret_cast(connection)); + }); + } + void Mqtt5ClientCore::s_publishCompletionCallback( enum aws_mqtt5_packet_type packet_type, const void *publishCompletionPacket, @@ -469,7 +521,7 @@ namespace Aws const Mqtt5ClientOptions &options, Allocator *allocator) noexcept { - /* Copied from MqttClient.cpp:ln754 (MqttClient::NewConnection) */ + /* Copied from MqttClient.cpp: MqttClient::NewConnection) */ /* As the constructor is private, make share would not work here. We do make_share manually. */ Mqtt5ClientCore *toSeat = reinterpret_cast(aws_mem_acquire(allocator, sizeof(Mqtt5ClientCore))); diff --git a/source/mqtt/MqttClient.cpp b/source/mqtt/MqttClient.cpp index 82beffa75..8ce04205b 100644 --- a/source/mqtt/MqttClient.cpp +++ b/source/mqtt/MqttClient.cpp @@ -265,14 +265,22 @@ namespace Aws MqttConnection *self, const char *hostName, uint16_t port, - const Io::SocketOptions &socketOptions) + const Io::SocketOptions &socketOptions, + aws_mqtt5_client *mqtt5Client) { self->m_hostName = String(hostName); self->m_port = port; self->m_socketOptions = socketOptions; - self->m_underlyingConnection = aws_mqtt_client_connection_new(self->m_owningClient); + if (mqtt5Client) + { + self->m_underlyingConnection = aws_mqtt_client_connection_new_from_mqtt5_client(mqtt5Client); + } + else + { + self->m_underlyingConnection = aws_mqtt_client_connection_new(self->m_owningClient); + } if (self->m_underlyingConnection) { @@ -293,6 +301,10 @@ namespace Aws aws_mqtt_client_connection_set_connection_closed_handler( self->m_underlyingConnection, MqttConnection::s_onConnectionClosed, self); } + else + { + AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "Failed to initialize Mqtt Connection"); + } } void MqttConnection::s_onWebsocketHandshake( @@ -303,7 +315,7 @@ namespace Aws { auto connection = reinterpret_cast(user_data); - Allocator *allocator = connection->m_owningClient->allocator; + Allocator *allocator = connection->m_allocator; // we have to do this because of private constructors. auto toSeat = reinterpret_cast(aws_mem_acquire(allocator, sizeof(Http::HttpRequest))); @@ -329,7 +341,7 @@ namespace Aws const Crt::Io::TlsContext &tlsContext, bool useWebsocket) noexcept : m_owningClient(client), m_tlsContext(tlsContext), m_tlsOptions(tlsContext.NewConnectionOptions()), - m_onAnyCbData(nullptr), m_useTls(true), m_useWebsocket(useWebsocket) + m_onAnyCbData(nullptr), m_useTls(true), m_useWebsocket(useWebsocket), m_allocator(client->allocator) { s_connectionInit(this, hostName, port, socketOptions); } @@ -340,11 +352,39 @@ namespace Aws uint16_t port, const Io::SocketOptions &socketOptions, bool useWebsocket) noexcept - : m_owningClient(client), m_onAnyCbData(nullptr), m_useTls(false), m_useWebsocket(useWebsocket) + : m_owningClient(client), m_onAnyCbData(nullptr), m_useTls(false), m_useWebsocket(useWebsocket), + m_allocator(client->allocator) { s_connectionInit(this, hostName, port, socketOptions); } + MqttConnection::MqttConnection( + aws_mqtt5_client *mqtt5Client, + const char *hostName, + uint16_t port, + const Io::SocketOptions &socketOptions, + const Crt::Io::TlsConnectionOptions &tlsOptions, + bool useWebsocket, + Allocator *allocator) noexcept + : m_owningClient(nullptr), m_tlsOptions(tlsOptions), m_onAnyCbData(nullptr), m_useTls(true), + m_useWebsocket(useWebsocket), m_allocator(allocator) + { + s_connectionInit(this, hostName, port, socketOptions, mqtt5Client); + } + + MqttConnection::MqttConnection( + aws_mqtt5_client *mqtt5Client, + const char *hostName, + uint16_t port, + const Io::SocketOptions &socketOptions, + bool useWebsocket, + Allocator *allocator) noexcept + : m_owningClient(nullptr), m_onAnyCbData(nullptr), m_useTls(false), m_useWebsocket(useWebsocket), + m_allocator(allocator) + { + s_connectionInit(this, hostName, port, socketOptions, mqtt5Client); + } + MqttConnection::~MqttConnection() { if (*this) @@ -491,7 +531,7 @@ namespace Aws bool MqttConnection::SetOnMessageHandler(OnMessageReceivedHandler &&onMessage) noexcept { - auto pubCallbackData = Aws::Crt::New(m_owningClient->allocator); + auto pubCallbackData = Aws::Crt::New(m_allocator); if (!pubCallbackData) { @@ -500,7 +540,7 @@ namespace Aws pubCallbackData->connection = this; pubCallbackData->onMessageReceived = std::move(onMessage); - pubCallbackData->allocator = m_owningClient->allocator; + pubCallbackData->allocator = m_allocator; if (!aws_mqtt_client_connection_set_on_any_publish_handler( m_underlyingConnection, s_onPublish, pubCallbackData)) @@ -535,7 +575,7 @@ namespace Aws OnMessageReceivedHandler &&onMessage, OnSubAckHandler &&onSubAck) noexcept { - auto pubCallbackData = Crt::New(m_owningClient->allocator); + auto pubCallbackData = Crt::New(m_allocator); if (!pubCallbackData) { @@ -544,21 +584,21 @@ namespace Aws pubCallbackData->connection = this; pubCallbackData->onMessageReceived = std::move(onMessage); - pubCallbackData->allocator = m_owningClient->allocator; + pubCallbackData->allocator = m_allocator; - auto subAckCallbackData = Crt::New(m_owningClient->allocator); + auto subAckCallbackData = Crt::New(m_allocator); if (!subAckCallbackData) { - Crt::Delete(pubCallbackData, m_owningClient->allocator); + Crt::Delete(pubCallbackData, m_allocator); return 0; } subAckCallbackData->connection = this; - subAckCallbackData->allocator = m_owningClient->allocator; + subAckCallbackData->allocator = m_allocator; subAckCallbackData->onSubAck = std::move(onSubAck); subAckCallbackData->topic = nullptr; - subAckCallbackData->allocator = m_owningClient->allocator; + subAckCallbackData->allocator = m_allocator; ByteBuf topicFilterBuf = aws_byte_buf_from_c_str(topicFilter); ByteCursor topicFilterCur = aws_byte_cursor_from_buf(&topicFilterBuf); @@ -608,7 +648,7 @@ namespace Aws OnMultiSubAckHandler &&onSubAck) noexcept { uint16_t packetId = 0; - auto subAckCallbackData = Crt::New(m_owningClient->allocator); + auto subAckCallbackData = Crt::New(m_allocator); if (!subAckCallbackData) { @@ -619,15 +659,15 @@ namespace Aws AWS_ZERO_STRUCT(multiPub); if (aws_array_list_init_dynamic( - &multiPub, m_owningClient->allocator, topicFilters.size(), sizeof(aws_mqtt_topic_subscription))) + &multiPub, m_allocator, topicFilters.size(), sizeof(aws_mqtt_topic_subscription))) { - Crt::Delete(subAckCallbackData, m_owningClient->allocator); + Crt::Delete(subAckCallbackData, m_allocator); return 0; } for (auto &topicFilter : topicFilters) { - auto pubCallbackData = Crt::New(m_owningClient->allocator); + auto pubCallbackData = Crt::New(m_allocator); if (!pubCallbackData) { @@ -636,7 +676,7 @@ namespace Aws pubCallbackData->connection = this; pubCallbackData->onMessageReceived = topicFilter.second; - pubCallbackData->allocator = m_owningClient->allocator; + pubCallbackData->allocator = m_allocator; ByteBuf topicFilterBuf = aws_byte_buf_from_c_str(topicFilter.first); ByteCursor topicFilterCur = aws_byte_cursor_from_buf(&topicFilterBuf); @@ -652,10 +692,10 @@ namespace Aws } subAckCallbackData->connection = this; - subAckCallbackData->allocator = m_owningClient->allocator; + subAckCallbackData->allocator = m_allocator; subAckCallbackData->onSubAck = std::move(onSubAck); subAckCallbackData->topic = nullptr; - subAckCallbackData->allocator = m_owningClient->allocator; + subAckCallbackData->allocator = m_allocator; packetId = aws_mqtt_client_connection_subscribe_multiple( m_underlyingConnection, &multiPub, s_onMultiSubAck, subAckCallbackData); @@ -669,10 +709,10 @@ namespace Aws aws_mqtt_topic_subscription *subscription = NULL; aws_array_list_get_at_ptr(&multiPub, reinterpret_cast(&subscription), i); auto pubCallbackData = reinterpret_cast(subscription->on_publish_ud); - Crt::Delete(pubCallbackData, m_owningClient->allocator); + Crt::Delete(pubCallbackData, m_allocator); } - Crt::Delete(subAckCallbackData, m_owningClient->allocator); + Crt::Delete(subAckCallbackData, m_allocator); } aws_array_list_clean_up(&multiPub); @@ -684,7 +724,7 @@ namespace Aws const char *topicFilter, OnOperationCompleteHandler &&onOpComplete) noexcept { - auto opCompleteCallbackData = Crt::New(m_owningClient->allocator); + auto opCompleteCallbackData = Crt::New(m_allocator); if (!opCompleteCallbackData) { @@ -692,7 +732,7 @@ namespace Aws } opCompleteCallbackData->connection = this; - opCompleteCallbackData->allocator = m_owningClient->allocator; + opCompleteCallbackData->allocator = m_allocator; opCompleteCallbackData->onOperationComplete = std::move(onOpComplete); opCompleteCallbackData->topic = nullptr; ByteBuf topicFilterBuf = aws_byte_buf_from_c_str(topicFilter); @@ -703,7 +743,7 @@ namespace Aws if (!packetId) { - Crt::Delete(opCompleteCallbackData, m_owningClient->allocator); + Crt::Delete(opCompleteCallbackData, m_allocator); } return packetId; @@ -717,25 +757,24 @@ namespace Aws OnOperationCompleteHandler &&onOpComplete) noexcept { - auto opCompleteCallbackData = Crt::New(m_owningClient->allocator); + auto opCompleteCallbackData = Crt::New(m_allocator); if (!opCompleteCallbackData) { return 0; } size_t topicLen = strnlen(topic, AWS_MQTT_MAX_TOPIC_LENGTH) + 1; - char *topicCpy = - reinterpret_cast(aws_mem_calloc(m_owningClient->allocator, topicLen, sizeof(char))); + char *topicCpy = reinterpret_cast(aws_mem_calloc(m_allocator, topicLen, sizeof(char))); if (!topicCpy) { - Crt::Delete(opCompleteCallbackData, m_owningClient->allocator); + Crt::Delete(opCompleteCallbackData, m_allocator); } memcpy(topicCpy, topic, topicLen); opCompleteCallbackData->connection = this; - opCompleteCallbackData->allocator = m_owningClient->allocator; + opCompleteCallbackData->allocator = m_allocator; opCompleteCallbackData->onOperationComplete = std::move(onOpComplete); opCompleteCallbackData->topic = topicCpy; ByteCursor topicCur = aws_byte_cursor_from_array(topicCpy, topicLen - 1); @@ -752,8 +791,8 @@ namespace Aws if (!packetId) { - aws_mem_release(m_owningClient->allocator, reinterpret_cast(topicCpy)); - Crt::Delete(opCompleteCallbackData, m_owningClient->allocator); + aws_mem_release(m_allocator, reinterpret_cast(topicCpy)); + Crt::Delete(opCompleteCallbackData, m_allocator); } return packetId; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 8ab6a68bd..51002156d 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -219,6 +219,14 @@ if(NOT BYO_CRYPTO) add_net_test_case(Mqtt5InterruptUnsub) add_net_test_case(Mqtt5InterruptPublishQoS1) add_net_test_case(Mqtt5OperationStatisticsSimple) + + # Mqtt5-to-3 Adapter + add_net_test_case(Mqtt5to3AdapterNewConnectionMin) + add_net_test_case(Mqtt5to3AdapterDirectConnectionMinimal) + add_net_test_case(Mqtt5to3AdapterDirectConnectionFull) + add_net_test_case(Mqtt5to3AdapterWSConnectionMinimal) + add_net_test_case(Mqtt5to3AdapterWithIoTConnection) + add_net_test_case(Mqtt5to3AdapterDirectConnectionWithMutualTLS) endif() add_test_case(RuleEngine) diff --git a/tests/Mqtt5ClientTest.cpp b/tests/Mqtt5ClientTest.cpp index db902f873..641b5e7eb 100644 --- a/tests/Mqtt5ClientTest.cpp +++ b/tests/Mqtt5ClientTest.cpp @@ -614,14 +614,14 @@ static int s_TestMqtt5DirectConnectionWithMutualTLS(Aws::Crt::Allocator *allocat Aws::Crt::Io::TlsContext tlsContext(tlsCtxOptions, Aws::Crt::Io::TlsMode::CLIENT, allocator); ASSERT_TRUE(tlsContext); Aws::Crt::Io::TlsConnectionOptions tlsConnection = tlsContext.NewConnectionOptions(); - ASSERT_TRUE(tlsConnection); + ASSERT_TRUE(tlsConnection.SetAlpnList("x-amzn-mqtt-ca")); mqtt5Options.WithTlsConnectionOptions(tlsConnection); std::shared_ptr mqtt5Client = Mqtt5::Mqtt5Client::NewMqtt5Client(mqtt5Options, allocator); ASSERT_TRUE(mqtt5Client); ASSERT_TRUE(mqtt5Client->Start()); - connectionPromise.get_future().get(); + ASSERT_TRUE(connectionPromise.get_future().get()); ASSERT_TRUE(mqtt5Client->Stop()); stoppedPromise.get_future().get(); return AWS_OP_SUCCESS; @@ -816,7 +816,7 @@ static int s_TestMqtt5WSConnectionMinimal(Aws::Crt::Allocator *allocator, void * std::shared_ptr mqtt5Client = Mqtt5::Mqtt5Client::NewMqtt5Client(mqtt5Options, allocator); ASSERT_TRUE(mqtt5Client); ASSERT_TRUE(mqtt5Client->Start()); - connectionPromise.get_future().get(); + ASSERT_TRUE(connectionPromise.get_future().get()); ASSERT_TRUE(mqtt5Client->Stop()); stoppedPromise.get_future().get(); return AWS_OP_SUCCESS; @@ -2530,4 +2530,333 @@ static int s_TestMqtt5OperationStatisticsSimple(Aws::Crt::Allocator *allocator, } AWS_TEST_CASE(Mqtt5OperationStatisticsSimple, s_TestMqtt5OperationStatisticsSimple) -#endif // !BYO_CRYPTO +/* Mqtt5-to-Mqtt3 Adapter Test */ + +/* Test Helper Functions */ +static int s_ConnectAndDisconnect(std::shared_ptr connection) +{ + std::promise connectionCompletedPromise; + std::promise connectionClosedPromise; + auto onConnectionCompleted = + [&](Aws::Crt::Mqtt::MqttConnection &, int errorCode, Aws::Crt::Mqtt::ReturnCode returnCode, bool) { + (void)returnCode; + if (errorCode) + { + connectionCompletedPromise.set_value(false); + } + else + { + connectionCompletedPromise.set_value(true); + } + }; + auto onDisconnect = [&](Aws::Crt::Mqtt::MqttConnection &) { connectionClosedPromise.set_value(); }; + connection->OnConnectionCompleted = std::move(onConnectionCompleted); + connection->OnDisconnect = std::move(onDisconnect); + + // Mqtt5 Test client policy only allows client id start with "test-" + Aws::Crt::UUID Uuid; + Aws::Crt::String uuidStr = "test-" + Uuid.ToString(); + + if (!connection->Connect(uuidStr.c_str(), true /*cleanSession*/, 5000 /*keepAliveTimeSecs*/)) + { + printf("Failed to connect"); + return AWS_OP_ERR; + } + if (connectionCompletedPromise.get_future().get() == false) + { + printf("Connection failed"); + return AWS_OP_ERR; + } + if (connection->Disconnect()) + { + connectionClosedPromise.get_future().wait(); + } + return AWS_OP_SUCCESS; +} + +/* + * [Mqtt5to3Adapter-UC1] Happy path. Minimal creation and cleanup + */ +static int s_TestMqtt5to3AdapterNewConnectionMin(Aws::Crt::Allocator *allocator, void *) +{ + ApiHandle apiHandle(allocator); + + Mqtt5::Mqtt5ClientOptions mqtt5Options(allocator); + // Hardcoded the host name and port for creation test + mqtt5Options.WithHostName("localhost").WithPort(1883); + Aws::Crt::Io::SocketOptions socketOptions; + socketOptions.SetConnectTimeoutMs(3000); + std::shared_ptr mqtt5Client = Mqtt5::Mqtt5Client::NewMqtt5Client(mqtt5Options, allocator); + ASSERT_TRUE(mqtt5Client); + std::shared_ptr mqttConnection = mqtt5Client->NewConnection(); + ASSERT_TRUE(mqttConnection); + + return AWS_OP_SUCCESS; +} +AWS_TEST_CASE(Mqtt5to3AdapterNewConnectionMin, s_TestMqtt5to3AdapterNewConnectionMin) + +/* + * [Mqtt5to3Adapter-UC2] Happy path. Minimal direct connection + */ +static int s_TestMqtt5to3AdapterDirectConnectionMinimal(Aws::Crt::Allocator *allocator, void *) +{ + Mqtt5TestEnvVars mqtt5TestVars(allocator, MQTT5CONNECT_DIRECT); + if (!mqtt5TestVars) + { + printf("Environment Variables are not set for the test, skip the test"); + return AWS_OP_SKIP; + } + + ApiHandle apiHandle(allocator); + Aws::Crt::Io::SocketOptions socketOptions; + socketOptions.SetConnectTimeoutMs(3000); + Mqtt5::Mqtt5ClientOptions mqtt5Options(allocator); + + mqtt5Options.WithHostName(mqtt5TestVars.m_hostname_string); + mqtt5Options.WithPort(mqtt5TestVars.m_port_value); + + std::shared_ptr mqtt5Client = Mqtt5::Mqtt5Client::NewMqtt5Client(mqtt5Options, allocator); + ASSERT_TRUE(mqtt5Client); + std::shared_ptr mqttConnection = mqtt5Client->NewConnection(); + ASSERT_TRUE(mqttConnection); + int connectResult = s_ConnectAndDisconnect(mqttConnection); + ASSERT_SUCCESS(connectResult); + return AWS_OP_SUCCESS; +} +AWS_TEST_CASE(Mqtt5to3AdapterDirectConnectionMinimal, s_TestMqtt5to3AdapterDirectConnectionMinimal) + +/* + * [Mqtt5to3Adapter-UC3] Full options client creation and cleanup + */ +static int s_TestMqtt5to3AdapterDirectConnectionFull(Aws::Crt::Allocator *allocator, void *) +{ + Mqtt5TestEnvVars mqtt5TestVars(allocator, MQTT5CONNECT_DIRECT); + if (!mqtt5TestVars) + { + printf("Environment Variables are not set for the test, skip the test"); + return AWS_OP_SKIP; + } + + ApiHandle apiHandle(allocator); + + Mqtt5::Mqtt5ClientOptions mqtt5Options(allocator); + mqtt5Options.WithHostName(mqtt5TestVars.m_hostname_string).WithPort(mqtt5TestVars.m_port_value); + + Aws::Crt::Io::SocketOptions socketOptions; + socketOptions.SetConnectTimeoutMs(3000); + + Aws::Crt::Io::EventLoopGroup eventLoopGroup(0, allocator); + ASSERT_TRUE(eventLoopGroup); + + Aws::Crt::Io::DefaultHostResolver defaultHostResolver(eventLoopGroup, 8, 30, allocator); + ASSERT_TRUE(defaultHostResolver); + + Aws::Crt::Io::ClientBootstrap clientBootstrap(eventLoopGroup, defaultHostResolver, allocator); + ASSERT_TRUE(allocator); + clientBootstrap.EnableBlockingShutdown(); + + // Setup will + const Aws::Crt::String TEST_TOPIC = + "test/MQTT5_Binding_CPP/s_TestMqtt5to3AdapterDirectConnectionFull" + Aws::Crt::UUID().ToString(); + ByteBuf will_payload = Aws::Crt::ByteBufFromCString("Will Test"); + std::shared_ptr will = std::make_shared( + TEST_TOPIC, ByteCursorFromByteBuf(will_payload), Mqtt5::QOS::AWS_MQTT5_QOS_AT_LEAST_ONCE, allocator); + + std::shared_ptr packetConnect = std::make_shared(); + packetConnect->WithClientId("s_TestMqtt5to3AdapterDirectConnectionFull" + Aws::Crt::UUID().ToString()) + .WithKeepAliveIntervalSec(1000) + .WithMaximumPacketSizeBytes(1000L) + .WithReceiveMaximum(1000) + .WithRequestProblemInformation(true) + .WithRequestResponseInformation(true) + .WithSessionExpiryIntervalSec(1000L) + .WithWill(will) + .WithWillDelayIntervalSec(1000); + + Aws::Crt::Mqtt5::UserProperty userProperty("PropertyName", "PropertyValue"); + packetConnect->WithUserProperty(std::move(userProperty)); + + Aws::Crt::Mqtt5::ReconnectOptions reconnectOptions = { + Mqtt5::JitterMode::AWS_EXPONENTIAL_BACKOFF_JITTER_FULL, 1000, 1000, 1000}; + + mqtt5Options.WithConnectOptions(packetConnect); + mqtt5Options.WithBootstrap(&clientBootstrap); + mqtt5Options.WithSocketOptions(socketOptions); + mqtt5Options.WithSessionBehavior(Mqtt5::ClientSessionBehaviorType::AWS_MQTT5_CSBT_REJOIN_POST_SUCCESS); + mqtt5Options.WithClientExtendedValidationAndFlowControl( + Mqtt5::ClientExtendedValidationAndFlowControl::AWS_MQTT5_EVAFCO_NONE); + mqtt5Options.WithOfflineQueueBehavior( + Mqtt5::ClientOperationQueueBehaviorType::AWS_MQTT5_COQBT_FAIL_QOS0_PUBLISH_ON_DISCONNECT); + mqtt5Options.WithReconnectOptions(reconnectOptions); + mqtt5Options.WithPingTimeoutMs(1000); + mqtt5Options.WithConnackTimeoutMs(100); + mqtt5Options.WithAckTimeoutSeconds(1000); + + std::shared_ptr mqtt5Client = Mqtt5::Mqtt5Client::NewMqtt5Client(mqtt5Options, allocator); + ASSERT_TRUE(mqtt5Client); + std::shared_ptr mqttConnection = mqtt5Client->NewConnection(); + ASSERT_TRUE(mqttConnection); + + int connectResult = s_ConnectAndDisconnect(mqttConnection); + ASSERT_SUCCESS(connectResult); + return AWS_OP_SUCCESS; +} +AWS_TEST_CASE(Mqtt5to3AdapterDirectConnectionFull, s_TestMqtt5to3AdapterDirectConnectionFull) + +/* + * [Mqtt5to3Adapter-UC4] Websocket creation and connection + */ +static int s_TestMqtt5to3AdapterWSConnectionMinimal(Aws::Crt::Allocator *allocator, void *) +{ + Mqtt5TestEnvVars mqtt5TestVars(allocator, MQTT5CONNECT_WS); + if (!mqtt5TestVars) + { + printf("Environment Variables are not set for the test, skip the test"); + return AWS_OP_SKIP; + } + + ApiHandle apiHandle(allocator); + + Mqtt5::Mqtt5ClientOptions mqtt5Options(allocator); + mqtt5Options.WithHostName(mqtt5TestVars.m_hostname_string); + mqtt5Options.WithPort(mqtt5TestVars.m_port_value); + + Aws::Crt::Io::SocketOptions socketOptions; + socketOptions.SetConnectTimeoutMs(3000); + + Aws::Crt::Auth::CredentialsProviderChainDefaultConfig defaultConfig; + std::shared_ptr provider = + Aws::Crt::Auth::CredentialsProvider::CreateCredentialsProviderChainDefault(defaultConfig); + + ASSERT_TRUE(provider); + + Aws::Iot::WebsocketConfig config("us-east-1", provider); + + int Mqtt5WebSocket = 0; + int Mqtt3WebSocket = 0; + + mqtt5Options.WithWebsocketHandshakeTransformCallback( + [config, &Mqtt5WebSocket]( + std::shared_ptr req, + const Aws::Crt::Mqtt5::OnWebSocketHandshakeInterceptComplete &onComplete) { + auto signingComplete = + [onComplete](const std::shared_ptr &req1, int errorCode) { + onComplete(req1, errorCode); + }; + + auto signerConfig = config.CreateSigningConfigCb(); + + config.Signer->SignRequest(req, *signerConfig, signingComplete); + Mqtt5WebSocket = 1; + }); + + std::shared_ptr mqtt5Client = Mqtt5::Mqtt5Client::NewMqtt5Client(mqtt5Options, allocator); + ASSERT_TRUE(mqtt5Client); + + std::shared_ptr mqttConnection = mqtt5Client->NewConnection(); + ASSERT_TRUE(mqttConnection); + + mqttConnection->WebsocketInterceptor = + [config, &Mqtt3WebSocket]( + std::shared_ptr req, + const Aws::Crt::Mqtt::OnWebSocketHandshakeInterceptComplete &onComplete) { + auto signingComplete = + [onComplete](const std::shared_ptr &req1, int errorCode) { + onComplete(req1, errorCode); + }; + + auto signerConfig = config.CreateSigningConfigCb(); + + config.Signer->SignRequest(req, *signerConfig, signingComplete); + Mqtt3WebSocket = 1; + }; + + int connectResult = s_ConnectAndDisconnect(mqttConnection); + ASSERT_SUCCESS(connectResult); + ASSERT_TRUE(Mqtt3WebSocket == 1); + ASSERT_TRUE(Mqtt5WebSocket == 0); + return AWS_OP_SUCCESS; +} +AWS_TEST_CASE(Mqtt5to3AdapterWSConnectionMinimal, s_TestMqtt5to3AdapterWSConnectionMinimal) + +/* + * [Mqtt5to3Adapter-UC5] IoT MutalTLS creation and cleanup with Mqtt5ClientBuilder + */ +static int s_TestMqtt5to3AdapterWithIoTConnection(Aws::Crt::Allocator *allocator, void *) +{ + Mqtt5TestEnvVars mqtt5TestVars(allocator, MQTT5CONNECT_IOT_CORE); + if (!mqtt5TestVars) + { + printf("Environment Variables are not set for the test, skip the test"); + return AWS_OP_SKIP; + } + + ApiHandle apiHandle(allocator); + + Aws::Iot::Mqtt5ClientBuilder *builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( + mqtt5TestVars.m_hostname_string, + mqtt5TestVars.m_certificate_path_string.c_str(), + mqtt5TestVars.m_private_key_path_string.c_str(), + allocator); + ASSERT_TRUE(builder); + + std::shared_ptr mqtt5Client = builder->Build(); + ASSERT_TRUE(mqtt5Client); + + // Created a Mqtt311 Connection from mqtt5Client. The options are setup by the builder. + std::shared_ptr mqttConnection = mqtt5Client->NewConnection(); + ASSERT_TRUE(mqttConnection); + delete builder; + int connectResult = s_ConnectAndDisconnect(mqttConnection); + ASSERT_SUCCESS(connectResult); + + return AWS_OP_SUCCESS; +} +AWS_TEST_CASE(Mqtt5to3AdapterWithIoTConnection, s_TestMqtt5to3AdapterWithIoTConnection) + +/* + * [Mqtt5to3Adapter-UC6] MutalTLS connection + */ +static int s_TestMqtt5to3AdapterDirectConnectionWithMutualTLS(Aws::Crt::Allocator *allocator, void *) +{ + Mqtt5TestEnvVars mqtt5TestVars(allocator, MQTT5CONNECT_IOT_CORE); + if (!mqtt5TestVars) + { + printf("Environment Variables are not set for the test, skip the test"); + return AWS_OP_SKIP; + } + + ApiHandle apiHandle(allocator); + + Mqtt5::Mqtt5ClientOptions mqtt5Options(allocator); + mqtt5Options.WithHostName(mqtt5TestVars.m_hostname_string); + mqtt5Options.WithPort(443); + + std::promise connectionPromise; + std::promise stoppedPromise; + + s_setupConnectionLifeCycle(mqtt5Options, connectionPromise, stoppedPromise); + + Aws::Crt::Io::TlsContextOptions tlsCtxOptions = Aws::Crt::Io::TlsContextOptions::InitClientWithMtls( + mqtt5TestVars.m_certificate_path_string.c_str(), mqtt5TestVars.m_private_key_path_string.c_str(), allocator); + + Aws::Crt::Io::TlsContext tlsContext(tlsCtxOptions, Aws::Crt::Io::TlsMode::CLIENT, allocator); + ASSERT_TRUE(tlsContext); + Aws::Crt::Io::TlsConnectionOptions tlsConnection = tlsContext.NewConnectionOptions(); + ASSERT_TRUE(tlsConnection); + ASSERT_TRUE(tlsConnection.SetAlpnList("x-amzn-mqtt-ca")); + mqtt5Options.WithTlsConnectionOptions(tlsConnection); + + std::shared_ptr mqtt5Client = Mqtt5::Mqtt5Client::NewMqtt5Client(mqtt5Options, allocator); + ASSERT_TRUE(mqtt5Client); + // Created a Mqtt311 Connection from mqtt5Client. The options are setup by the builder. + std::shared_ptr mqttConnection = mqtt5Client->NewConnection(); + ASSERT_TRUE(mqttConnection); + int connectResult = s_ConnectAndDisconnect(mqttConnection); + ASSERT_SUCCESS(connectResult); + + return AWS_OP_SUCCESS; +} +AWS_TEST_CASE(Mqtt5to3AdapterDirectConnectionWithMutualTLS, s_TestMqtt5to3AdapterDirectConnectionWithMutualTLS) + +#endif // !BYO_CRYPTO∏ \ No newline at end of file