Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mqtt5to3 adapter #528

Merged
merged 6 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 69 additions & 1 deletion include/aws/crt/mqtt/Mqtt5Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
#include <aws/crt/http/HttpConnection.h>
#include <aws/crt/mqtt/Mqtt5Types.h>
#include <aws/crt/mqtt/MqttClient.h>

namespace Aws
{
Expand All @@ -27,6 +28,8 @@ namespace Aws
class UnSubAckPacket;
class Mqtt5ClientCore;

class Mqtt5to3AdapterOptions;

struct AWS_CRT_CPP_API ReconnectOptions
{
/**
Expand Down Expand Up @@ -315,6 +318,13 @@ namespace Aws
*/
const Mqtt5ClientOperationStatistics &GetOperationStatistics() noexcept;

/**
* Create a new MqttConnection object from the Mqtt5Client.
*
* @return std::shared_ptr<Crt::Mqtt::MqttConnection>
*/
std::shared_ptr<Crt::Mqtt::MqttConnection> NewConnection() noexcept;
xiazhvera marked this conversation as resolved.
Show resolved Hide resolved

virtual ~Mqtt5Client();

private:
Expand All @@ -324,6 +334,56 @@ namespace Aws
std::shared_ptr<Mqtt5ClientCore> m_client_core;

Mqtt5ClientOperationStatistics m_operationStatistics;

ScopedResource<Mqtt5to3AdapterOptions> m_mqtt5to3AdapterOptions;
};

/**
* The extra options required to build MqttConnection from Mqtt5Client
*/
class Mqtt5to3AdapterOptions
{
friend class Mqtt5ClientOptions;
friend class Mqtt5ClientCore;

public:
/* Default constructor */
Mqtt5to3AdapterOptions();

private:
/* Host name of the MQTT server to connect to. */
Crt::String m_hostName;

/* Port to connect to */
uint16_t m_port;

/*
* If the MqttConnection should overwrite the websocket config. If set to true, m_webSocketInterceptor
* must be set.
*/
bool m_overwriteWebsocket;
xiazhvera marked this conversation as resolved.
Show resolved Hide resolved

/*
* The transform function invoked during websocket handshake.
*/
Crt::Mqtt::OnWebSocketHandshakeIntercept m_webSocketInterceptor;

/**
* Controls socket properties of the underlying MQTT connections made by the client. Leave undefined to
* use defaults (no TCP keep alive, 10 second socket timeout).
*/
Crt::Io::SocketOptions m_socketOptions;

/**
* TLS context for secure socket connections.
* If undefined, a plaintext connection will be used.
*/
Crt::Optional<Crt::Io::TlsConnectionOptions> m_tlsConnectionOptions;

/**
* Configures (tunneling) HTTP proxy usage when establishing MQTT connections
*/
Crt::Optional<Crt::Http::HttpClientConnectionProxyOptions> m_proxyOptions;
};

/**
Expand Down Expand Up @@ -564,6 +624,14 @@ namespace Aws
Mqtt5ClientOptions &operator=(Mqtt5ClientOptions &&) = delete;

private:
/*
* Allocate and create a new Mqtt5to3AdapterOptions. This function is internally used by Mqtt5Client to
* support the Mqtt5to3Adapter.
*
* @return Mqtt5to3AdapterOptions
*/
ScopedResource<Mqtt5to3AdapterOptions> NewMqtt5to3AdapterOptions() const noexcept;

/**
* This callback allows a custom transformation of the HTTP request that acts as the websocket
* handshake. Websockets will be used if this is set to a valid transformation callback. To use
Expand Down Expand Up @@ -632,7 +700,7 @@ namespace Aws

/**
* TLS context for secure socket connections.
* If undefined, then a plaintext connection will be used.
* If undefined, a plaintext connection will be used.
*/
Crt::Optional<Crt::Io::TlsConnectionOptions> m_tlsConnectionOptions;

Expand Down
11 changes: 11 additions & 0 deletions include/aws/crt/mqtt/Mqtt5ClientCore.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,17 @@ namespace Aws
private:
Mqtt5ClientCore(const Mqtt5ClientOptions &options, Allocator *allocator = ApiAllocator()) noexcept;

/**
* Create a new connection object over plain text from the Mqtt5Client. The client must outlive
* all of its connection instances. The Mqtt5 Options will be overwritten by the options passed in here.
*
* @param options the options from Mqtt5Client used to support the MqttConnection
*
* @return std::shared_ptr<Crt::Mqtt::MqttConnection>
*/
std::shared_ptr<Crt::Mqtt::MqttConnection> NewConnection(
const Mqtt5::Mqtt5to3AdapterOptions *options) noexcept;

/* Static Callbacks */
static void s_publishCompletionCallback(
enum aws_mqtt5_packet_type packet_type,
Expand Down
31 changes: 29 additions & 2 deletions include/aws/crt/mqtt/MqttClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <aws/crt/io/TlsOptions.h>

#include <aws/mqtt/client.h>
#include <aws/mqtt/v5/mqtt5_client.h>

#include <atomic>
#include <functional>
Expand All @@ -30,6 +31,11 @@ namespace Aws
class HttpRequest;
}

namespace Mqtt5
{
class Mqtt5ClientCore;
}

namespace Mqtt
{
class MqttClient;
Expand Down Expand Up @@ -212,14 +218,16 @@ namespace Aws
};

/**
* Represents a persistent Mqtt Connection. The memory is owned by MqttClient.
* Represents a persistent Mqtt Connection. The memory is owned by MqttClient or
* Mqtt5Client.
* To get a new instance of this class, see MqttClient::NewConnection. Unless
* specified all function arguments need only to live through the duration of the
* function call.
*/
class AWS_CRT_CPP_API MqttConnection final
{
friend class MqttClient;
friend class Mqtt5::Mqtt5ClientCore;

public:
~MqttConnection();
Expand Down Expand Up @@ -444,6 +452,7 @@ namespace Aws
bool m_useTls;
bool m_useWebsocket;
MqttConnectionOperationStatistics m_operationStatistics;
Allocator *m_allocator;

MqttConnection(
aws_mqtt_client *client,
Expand All @@ -460,6 +469,23 @@ namespace Aws
const Io::SocketOptions &socketOptions,
bool useWebsocket) noexcept;

MqttConnection(
aws_mqtt5_client *mqtt5Client,
const char *hostName,
uint16_t port,
const Io::SocketOptions &socketOptions,
const Crt::Io::TlsConnectionOptions &tlsConnectionOptions,
bool useWebsocket,
Allocator *allocator) noexcept;

MqttConnection(
aws_mqtt5_client *mqtt5Client,
const char *hostName,
uint16_t port,
const Io::SocketOptions &socketOptions,
bool useWebsocket,
Allocator *allocator) noexcept;

static void s_onConnectionInterrupted(aws_mqtt_client_connection *, int errorCode, void *userData);
static void s_onConnectionCompleted(
aws_mqtt_client_connection *,
Expand Down Expand Up @@ -526,7 +552,8 @@ namespace Aws
MqttConnection *self,
const char *hostName,
uint16_t port,
const Io::SocketOptions &socketOptions);
const Io::SocketOptions &socketOptions,
aws_mqtt5_client *mqtt5Client = nullptr);
};

/**
Expand Down
2 changes: 1 addition & 1 deletion include/aws/iot/Mqtt5Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ namespace Aws

/**
* TLS context for secure socket connections.
* If undefined, then a plaintext connection will be used.
* If undefined, a plaintext connection will be used.
*/
Crt::Optional<Crt::Io::TlsContextOptions> m_tlsConnectionOptions;

Expand Down
47 changes: 47 additions & 0 deletions source/mqtt/Mqtt5Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,23 @@ namespace Aws
{
namespace Mqtt5
{
Mqtt5to3AdapterOptions::Mqtt5to3AdapterOptions() : m_port(0), m_overwriteWebsocket(false) {}

Mqtt5Client::Mqtt5Client(const Mqtt5ClientOptions &options, Allocator *allocator) noexcept
: m_client_core(nullptr)
{
m_client_core = Mqtt5ClientCore::NewMqtt5ClientCore(options, allocator);
m_mqtt5to3AdapterOptions = options.NewMqtt5to3AdapterOptions();
}

std::shared_ptr<Crt::Mqtt::MqttConnection> Mqtt5Client::NewConnection() noexcept
{
if (m_client_core == nullptr)
{
AWS_LOGF_DEBUG(AWS_LS_MQTT5_CLIENT, "Failed to create mqtt3 connection: Mqtt5 Client is invalid.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't it be the ERROR log level?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is a good idea to set the error level to ERROR for those "invalid mqtt5 client" issue here. As if the issue happened (the client core is nullptr), it usually means the Mqtt5Client is unusable, and we should expose it to user.
Since we've been using DEBUG log for all other operations, I will keep it here for now and create a separate PR to update the log level all together.

return nullptr;
}
return m_client_core->NewConnection(m_mqtt5to3AdapterOptions.get());
}

Mqtt5Client::~Mqtt5Client()
Expand Down Expand Up @@ -233,6 +246,40 @@ namespace Aws

Mqtt5ClientOptions::~Mqtt5ClientOptions() {}

ScopedResource<Mqtt5to3AdapterOptions> Mqtt5ClientOptions::NewMqtt5to3AdapterOptions() const noexcept
{
Allocator *allocator = m_allocator;
ScopedResource<Mqtt5to3AdapterOptions> adapterOptions = ScopedResource<Mqtt5to3AdapterOptions>(
Crt::New<Mqtt5to3AdapterOptions>(allocator),
[allocator](Mqtt5to3AdapterOptions *options) { Crt::Delete(options, allocator); });

adapterOptions->m_hostName = m_hostName;
adapterOptions->m_port = m_port;
adapterOptions->m_socketOptions = m_socketOptions;
if (m_proxyOptions.has_value())
adapterOptions->m_proxyOptions = m_proxyOptions.value();
if (m_tlsConnectionOptions.has_value())
{
adapterOptions->m_tlsConnectionOptions = m_tlsConnectionOptions.value();
}
if (websocketHandshakeTransform)
{
adapterOptions->m_overwriteWebsocket = true;

auto signerTransform = [this](
std::shared_ptr<Crt::Http::HttpRequest> req,
const Crt::Mqtt::OnWebSocketHandshakeInterceptComplete &onComplete) {
this->websocketHandshakeTransform(std::move(req), onComplete);
};
adapterOptions->m_webSocketInterceptor = std::move(signerTransform);
}
else
{
adapterOptions->m_overwriteWebsocket = false;
}
return adapterOptions;
}

Mqtt5ClientOptions &Mqtt5ClientOptions::WithHostName(Crt::String hostname)
{
m_hostName = std::move(hostname);
Expand Down
54 changes: 53 additions & 1 deletion source/mqtt/Mqtt5ClientCore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,58 @@ namespace Aws
}
}

std::shared_ptr<Crt::Mqtt::MqttConnection> Mqtt5ClientCore::NewConnection(
const Mqtt5::Mqtt5to3AdapterOptions *options) noexcept
{
// If you're reading this and asking.... why is this so complicated? Why not use make_shared
// or allocate_shared? Well, MqttConnection constructors are private and stl is dumb like that.
// so, we do it manually.
Allocator *allocator = this->m_allocator;
Crt::Mqtt::MqttConnection *toSeat = reinterpret_cast<Crt::Mqtt::MqttConnection *>(
aws_mem_acquire(allocator, sizeof(Crt::Mqtt::MqttConnection)));
if (!toSeat)
{
return nullptr;
}

if (options->m_tlsConnectionOptions.has_value())
{
toSeat = new (toSeat) Crt::Mqtt::MqttConnection(
m_client,
options->m_hostName.c_str(),
options->m_port,
options->m_socketOptions,
options->m_tlsConnectionOptions.value(),
options->m_overwriteWebsocket,
allocator);
}
else
{
toSeat = new (toSeat) Crt::Mqtt::MqttConnection(
m_client,
options->m_hostName.c_str(),
options->m_port,
options->m_socketOptions,
options->m_overwriteWebsocket,
allocator);
}
if (options->m_proxyOptions.has_value())
{
toSeat->SetHttpProxyOptions(options->m_proxyOptions.value());
}

if (options->m_overwriteWebsocket)
{
toSeat->WebsocketInterceptor = options->m_webSocketInterceptor;
}

return std::shared_ptr<Crt::Mqtt::MqttConnection>(
toSeat, [allocator](Crt::Mqtt::MqttConnection *connection) {
connection->~MqttConnection();
aws_mem_release(allocator, reinterpret_cast<void *>(connection));
});
}

void Mqtt5ClientCore::s_publishCompletionCallback(
enum aws_mqtt5_packet_type packet_type,
const void *publishCompletionPacket,
Expand Down Expand Up @@ -469,7 +521,7 @@ namespace Aws
const Mqtt5ClientOptions &options,
Allocator *allocator) noexcept
{
/* Copied from MqttClient.cpp:ln754 (MqttClient::NewConnection) */
/* Copied from MqttClient.cpp: MqttClient::NewConnection) */
/* As the constructor is private, make share would not work here. We do make_share manually. */
Mqtt5ClientCore *toSeat =
reinterpret_cast<Mqtt5ClientCore *>(aws_mem_acquire(allocator, sizeof(Mqtt5ClientCore)));
Expand Down
Loading