Skip to content

Commit

Permalink
Mqtt5to3 adapter (#528)
Browse files Browse the repository at this point in the history
* adapter binding

* clang-format

* update submodule

* update cr

* update CR

* use move to pass the share_ptr
  • Loading branch information
xiazhvera authored Aug 4, 2023
1 parent abaa85f commit 3dc6b3d
Show file tree
Hide file tree
Showing 11 changed files with 624 additions and 43 deletions.
70 changes: 69 additions & 1 deletion include/aws/crt/mqtt/Mqtt5Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
#include <aws/crt/http/HttpConnection.h>
#include <aws/crt/mqtt/Mqtt5Types.h>
#include <aws/crt/mqtt/MqttClient.h>

namespace Aws
{
Expand All @@ -27,6 +28,8 @@ namespace Aws
class UnSubAckPacket;
class Mqtt5ClientCore;

class Mqtt5to3AdapterOptions;

struct AWS_CRT_CPP_API ReconnectOptions
{
/**
Expand Down Expand Up @@ -315,6 +318,13 @@ namespace Aws
*/
const Mqtt5ClientOperationStatistics &GetOperationStatistics() noexcept;

/**
* Create a new MqttConnection object from the Mqtt5Client.
*
* @return std::shared_ptr<Crt::Mqtt::MqttConnection>
*/
std::shared_ptr<Crt::Mqtt::MqttConnection> NewConnection() noexcept;

virtual ~Mqtt5Client();

private:
Expand All @@ -324,6 +334,56 @@ namespace Aws
std::shared_ptr<Mqtt5ClientCore> m_client_core;

Mqtt5ClientOperationStatistics m_operationStatistics;

ScopedResource<Mqtt5to3AdapterOptions> m_mqtt5to3AdapterOptions;
};

/**
* The extra options required to build MqttConnection from Mqtt5Client
*/
class Mqtt5to3AdapterOptions
{
friend class Mqtt5ClientOptions;
friend class Mqtt5ClientCore;

public:
/* Default constructor */
Mqtt5to3AdapterOptions();

private:
/* Host name of the MQTT server to connect to. */
Crt::String m_hostName;

/* Port to connect to */
uint16_t m_port;

/*
* If the MqttConnection should overwrite the websocket config. If set to true, m_webSocketInterceptor
* must be set.
*/
bool m_overwriteWebsocket;

/*
* The transform function invoked during websocket handshake.
*/
Crt::Mqtt::OnWebSocketHandshakeIntercept m_webSocketInterceptor;

/**
* Controls socket properties of the underlying MQTT connections made by the client. Leave undefined to
* use defaults (no TCP keep alive, 10 second socket timeout).
*/
Crt::Io::SocketOptions m_socketOptions;

/**
* TLS context for secure socket connections.
* If undefined, a plaintext connection will be used.
*/
Crt::Optional<Crt::Io::TlsConnectionOptions> m_tlsConnectionOptions;

/**
* Configures (tunneling) HTTP proxy usage when establishing MQTT connections
*/
Crt::Optional<Crt::Http::HttpClientConnectionProxyOptions> m_proxyOptions;
};

/**
Expand Down Expand Up @@ -564,6 +624,14 @@ namespace Aws
Mqtt5ClientOptions &operator=(Mqtt5ClientOptions &&) = delete;

private:
/*
* Allocate and create a new Mqtt5to3AdapterOptions. This function is internally used by Mqtt5Client to
* support the Mqtt5to3Adapter.
*
* @return Mqtt5to3AdapterOptions
*/
ScopedResource<Mqtt5to3AdapterOptions> NewMqtt5to3AdapterOptions() const noexcept;

/**
* This callback allows a custom transformation of the HTTP request that acts as the websocket
* handshake. Websockets will be used if this is set to a valid transformation callback. To use
Expand Down Expand Up @@ -632,7 +700,7 @@ namespace Aws

/**
* TLS context for secure socket connections.
* If undefined, then a plaintext connection will be used.
* If undefined, a plaintext connection will be used.
*/
Crt::Optional<Crt::Io::TlsConnectionOptions> m_tlsConnectionOptions;

Expand Down
11 changes: 11 additions & 0 deletions include/aws/crt/mqtt/Mqtt5ClientCore.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,17 @@ namespace Aws
private:
Mqtt5ClientCore(const Mqtt5ClientOptions &options, Allocator *allocator = ApiAllocator()) noexcept;

/**
* Create a new connection object over plain text from the Mqtt5Client. The client must outlive
* all of its connection instances. The Mqtt5 Options will be overwritten by the options passed in here.
*
* @param options the options from Mqtt5Client used to support the MqttConnection
*
* @return std::shared_ptr<Crt::Mqtt::MqttConnection>
*/
std::shared_ptr<Crt::Mqtt::MqttConnection> NewConnection(
const Mqtt5::Mqtt5to3AdapterOptions *options) noexcept;

/* Static Callbacks */
static void s_publishCompletionCallback(
enum aws_mqtt5_packet_type packet_type,
Expand Down
31 changes: 29 additions & 2 deletions include/aws/crt/mqtt/MqttClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <aws/crt/io/TlsOptions.h>

#include <aws/mqtt/client.h>
#include <aws/mqtt/v5/mqtt5_client.h>

#include <atomic>
#include <functional>
Expand All @@ -30,6 +31,11 @@ namespace Aws
class HttpRequest;
}

namespace Mqtt5
{
class Mqtt5ClientCore;
}

namespace Mqtt
{
class MqttClient;
Expand Down Expand Up @@ -212,14 +218,16 @@ namespace Aws
};

/**
* Represents a persistent Mqtt Connection. The memory is owned by MqttClient.
* Represents a persistent Mqtt Connection. The memory is owned by MqttClient or
* Mqtt5Client.
* To get a new instance of this class, see MqttClient::NewConnection. Unless
* specified all function arguments need only to live through the duration of the
* function call.
*/
class AWS_CRT_CPP_API MqttConnection final
{
friend class MqttClient;
friend class Mqtt5::Mqtt5ClientCore;

public:
~MqttConnection();
Expand Down Expand Up @@ -444,6 +452,7 @@ namespace Aws
bool m_useTls;
bool m_useWebsocket;
MqttConnectionOperationStatistics m_operationStatistics;
Allocator *m_allocator;

MqttConnection(
aws_mqtt_client *client,
Expand All @@ -460,6 +469,23 @@ namespace Aws
const Io::SocketOptions &socketOptions,
bool useWebsocket) noexcept;

MqttConnection(
aws_mqtt5_client *mqtt5Client,
const char *hostName,
uint16_t port,
const Io::SocketOptions &socketOptions,
const Crt::Io::TlsConnectionOptions &tlsConnectionOptions,
bool useWebsocket,
Allocator *allocator) noexcept;

MqttConnection(
aws_mqtt5_client *mqtt5Client,
const char *hostName,
uint16_t port,
const Io::SocketOptions &socketOptions,
bool useWebsocket,
Allocator *allocator) noexcept;

static void s_onConnectionInterrupted(aws_mqtt_client_connection *, int errorCode, void *userData);
static void s_onConnectionCompleted(
aws_mqtt_client_connection *,
Expand Down Expand Up @@ -526,7 +552,8 @@ namespace Aws
MqttConnection *self,
const char *hostName,
uint16_t port,
const Io::SocketOptions &socketOptions);
const Io::SocketOptions &socketOptions,
aws_mqtt5_client *mqtt5Client = nullptr);
};

/**
Expand Down
2 changes: 1 addition & 1 deletion include/aws/iot/Mqtt5Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ namespace Aws

/**
* TLS context for secure socket connections.
* If undefined, then a plaintext connection will be used.
* If undefined, a plaintext connection will be used.
*/
Crt::Optional<Crt::Io::TlsContextOptions> m_tlsConnectionOptions;

Expand Down
47 changes: 47 additions & 0 deletions source/mqtt/Mqtt5Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,23 @@ namespace Aws
{
namespace Mqtt5
{
Mqtt5to3AdapterOptions::Mqtt5to3AdapterOptions() : m_port(0), m_overwriteWebsocket(false) {}

Mqtt5Client::Mqtt5Client(const Mqtt5ClientOptions &options, Allocator *allocator) noexcept
: m_client_core(nullptr)
{
m_client_core = Mqtt5ClientCore::NewMqtt5ClientCore(options, allocator);
m_mqtt5to3AdapterOptions = options.NewMqtt5to3AdapterOptions();
}

std::shared_ptr<Crt::Mqtt::MqttConnection> Mqtt5Client::NewConnection() noexcept
{
if (m_client_core == nullptr)
{
AWS_LOGF_DEBUG(AWS_LS_MQTT5_CLIENT, "Failed to create mqtt3 connection: Mqtt5 Client is invalid.");
return nullptr;
}
return m_client_core->NewConnection(m_mqtt5to3AdapterOptions.get());
}

Mqtt5Client::~Mqtt5Client()
Expand Down Expand Up @@ -233,6 +246,40 @@ namespace Aws

Mqtt5ClientOptions::~Mqtt5ClientOptions() {}

ScopedResource<Mqtt5to3AdapterOptions> Mqtt5ClientOptions::NewMqtt5to3AdapterOptions() const noexcept
{
Allocator *allocator = m_allocator;
ScopedResource<Mqtt5to3AdapterOptions> adapterOptions = ScopedResource<Mqtt5to3AdapterOptions>(
Crt::New<Mqtt5to3AdapterOptions>(allocator),
[allocator](Mqtt5to3AdapterOptions *options) { Crt::Delete(options, allocator); });

adapterOptions->m_hostName = m_hostName;
adapterOptions->m_port = m_port;
adapterOptions->m_socketOptions = m_socketOptions;
if (m_proxyOptions.has_value())
adapterOptions->m_proxyOptions = m_proxyOptions.value();
if (m_tlsConnectionOptions.has_value())
{
adapterOptions->m_tlsConnectionOptions = m_tlsConnectionOptions.value();
}
if (websocketHandshakeTransform)
{
adapterOptions->m_overwriteWebsocket = true;

auto signerTransform = [this](
std::shared_ptr<Crt::Http::HttpRequest> req,
const Crt::Mqtt::OnWebSocketHandshakeInterceptComplete &onComplete) {
this->websocketHandshakeTransform(std::move(req), onComplete);
};
adapterOptions->m_webSocketInterceptor = std::move(signerTransform);
}
else
{
adapterOptions->m_overwriteWebsocket = false;
}
return adapterOptions;
}

Mqtt5ClientOptions &Mqtt5ClientOptions::WithHostName(Crt::String hostname)
{
m_hostName = std::move(hostname);
Expand Down
54 changes: 53 additions & 1 deletion source/mqtt/Mqtt5ClientCore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,58 @@ namespace Aws
}
}

std::shared_ptr<Crt::Mqtt::MqttConnection> Mqtt5ClientCore::NewConnection(
const Mqtt5::Mqtt5to3AdapterOptions *options) noexcept
{
// If you're reading this and asking.... why is this so complicated? Why not use make_shared
// or allocate_shared? Well, MqttConnection constructors are private and stl is dumb like that.
// so, we do it manually.
Allocator *allocator = this->m_allocator;
Crt::Mqtt::MqttConnection *toSeat = reinterpret_cast<Crt::Mqtt::MqttConnection *>(
aws_mem_acquire(allocator, sizeof(Crt::Mqtt::MqttConnection)));
if (!toSeat)
{
return nullptr;
}

if (options->m_tlsConnectionOptions.has_value())
{
toSeat = new (toSeat) Crt::Mqtt::MqttConnection(
m_client,
options->m_hostName.c_str(),
options->m_port,
options->m_socketOptions,
options->m_tlsConnectionOptions.value(),
options->m_overwriteWebsocket,
allocator);
}
else
{
toSeat = new (toSeat) Crt::Mqtt::MqttConnection(
m_client,
options->m_hostName.c_str(),
options->m_port,
options->m_socketOptions,
options->m_overwriteWebsocket,
allocator);
}
if (options->m_proxyOptions.has_value())
{
toSeat->SetHttpProxyOptions(options->m_proxyOptions.value());
}

if (options->m_overwriteWebsocket)
{
toSeat->WebsocketInterceptor = options->m_webSocketInterceptor;
}

return std::shared_ptr<Crt::Mqtt::MqttConnection>(
toSeat, [allocator](Crt::Mqtt::MqttConnection *connection) {
connection->~MqttConnection();
aws_mem_release(allocator, reinterpret_cast<void *>(connection));
});
}

void Mqtt5ClientCore::s_publishCompletionCallback(
enum aws_mqtt5_packet_type packet_type,
const void *publishCompletionPacket,
Expand Down Expand Up @@ -469,7 +521,7 @@ namespace Aws
const Mqtt5ClientOptions &options,
Allocator *allocator) noexcept
{
/* Copied from MqttClient.cpp:ln754 (MqttClient::NewConnection) */
/* Copied from MqttClient.cpp: MqttClient::NewConnection) */
/* As the constructor is private, make share would not work here. We do make_share manually. */
Mqtt5ClientCore *toSeat =
reinterpret_cast<Mqtt5ClientCore *>(aws_mem_acquire(allocator, sizeof(Mqtt5ClientCore)));
Expand Down
Loading

0 comments on commit 3dc6b3d

Please sign in to comment.