Skip to content

Commit

Permalink
adapter binding
Browse files Browse the repository at this point in the history
  • Loading branch information
xiazhvera committed Aug 3, 2023
1 parent abaa85f commit 0b75217
Show file tree
Hide file tree
Showing 10 changed files with 627 additions and 36 deletions.
53 changes: 53 additions & 0 deletions include/aws/crt/mqtt/Mqtt5Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
#include <aws/crt/http/HttpConnection.h>
#include <aws/crt/mqtt/Mqtt5Types.h>
#include <aws/crt/mqtt/MqttClient.h>

namespace Aws
{
Expand All @@ -27,6 +28,8 @@ namespace Aws
class UnSubAckPacket;
class Mqtt5ClientCore;

class Mqtt5to3AdapterOptions;

struct AWS_CRT_CPP_API ReconnectOptions
{
/**
Expand Down Expand Up @@ -315,6 +318,13 @@ namespace Aws
*/
const Mqtt5ClientOperationStatistics &GetOperationStatistics() noexcept;

/**
* Create a new connection object from the client5.
*
* @return std::shared_ptr<Crt::Mqtt::MqttConnection>
*/
std::shared_ptr<Crt::Mqtt::MqttConnection> NewConnection() noexcept;

virtual ~Mqtt5Client();

private:
Expand All @@ -324,6 +334,47 @@ namespace Aws
std::shared_ptr<Mqtt5ClientCore> m_client_core;

Mqtt5ClientOperationStatistics m_operationStatistics;

Mqtt5to3AdapterOptions *m_mqtt5to3AdapterOptions;
};

/**
* The extra option required to build MqttConnection
*/
class Mqtt5to3AdapterOptions
{
friend class Mqtt5ClientOptions;
friend class Mqtt5ClientCore;

public:
// Default constructor
Mqtt5to3AdapterOptions();

private:
Crt::String m_hostName;

uint16_t m_port;

bool m_overwriteWebsocket;

Crt::Mqtt::OnWebSocketHandshakeIntercept m_webSocketInterceptor;

/**
* Controls socket properties of the underlying MQTT connections made by the client. Leave undefined to
* use defaults (no TCP keep alive, 10 second socket timeout).
*/
Crt::Io::SocketOptions m_socketOptions;

/**
* TLS context for secure socket connections.
* If undefined, then a plaintext connection will be used.
*/
Crt::Optional<Crt::Io::TlsConnectionOptions> m_tlsConnectionOptions;

/**
* Configures (tunneling) HTTP proxy usage when establishing MQTT connections
*/
Crt::Optional<Crt::Http::HttpClientConnectionProxyOptions> m_proxyOptions;
};

/**
Expand Down Expand Up @@ -564,6 +615,8 @@ namespace Aws
Mqtt5ClientOptions &operator=(Mqtt5ClientOptions &&) = delete;

private:
Mqtt5to3AdapterOptions *NewMqtt5to3AdapterOptions() const noexcept;

/**
* This callback allows a custom transformation of the HTTP request that acts as the websocket
* handshake. Websockets will be used if this is set to a valid transformation callback. To use
Expand Down
11 changes: 11 additions & 0 deletions include/aws/crt/mqtt/Mqtt5ClientCore.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,17 @@ namespace Aws
private:
Mqtt5ClientCore(const Mqtt5ClientOptions &options, Allocator *allocator = ApiAllocator()) noexcept;

/**
* Create a new connection object over plain text from the client5. The client must outlive
* all of its connection instances.The Mqtt5 Options will be overwritten by the options,
* passed in here.
*
* @param options the options from Mqtt5Client used to support the MqttConnection
*
* @return std::shared_ptr<Crt::Mqtt::MqttConnection>
*/
std::shared_ptr<Crt::Mqtt::MqttConnection> NewConnection(Mqtt5::Mqtt5to3AdapterOptions* options) noexcept;

/* Static Callbacks */
static void s_publishCompletionCallback(
enum aws_mqtt5_packet_type packet_type,
Expand Down
31 changes: 31 additions & 0 deletions include/aws/crt/mqtt/MqttClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <aws/crt/io/TlsOptions.h>

#include <aws/mqtt/client.h>
#include <aws/mqtt/v5/mqtt5_client.h>

#include <atomic>
#include <functional>
Expand All @@ -30,6 +31,11 @@ namespace Aws
class HttpRequest;
}

namespace Mqtt5
{
class Mqtt5ClientCore;
}

namespace Mqtt
{
class MqttClient;
Expand Down Expand Up @@ -220,6 +226,7 @@ namespace Aws
class AWS_CRT_CPP_API MqttConnection final
{
friend class MqttClient;
friend class Mqtt5::Mqtt5ClientCore;

public:
~MqttConnection();
Expand Down Expand Up @@ -444,6 +451,7 @@ namespace Aws
bool m_useTls;
bool m_useWebsocket;
MqttConnectionOperationStatistics m_operationStatistics;
Allocator *m_allocator;

MqttConnection(
aws_mqtt_client *client,
Expand All @@ -460,6 +468,23 @@ namespace Aws
const Io::SocketOptions &socketOptions,
bool useWebsocket) noexcept;

MqttConnection(
aws_mqtt5_client *client,
const char *hostName,
uint16_t port,
const Io::SocketOptions &socketOptions,
const Crt::Io::TlsConnectionOptions &tlsConnectionOptions,
bool useWebsocket,
aws_allocator *allocaotr) noexcept;

MqttConnection(
aws_mqtt5_client *client,
const char *hostName,
uint16_t port,
const Io::SocketOptions &socketOptions,
bool useWebsocket,
aws_allocator *allocaotr) noexcept;

static void s_onConnectionInterrupted(aws_mqtt_client_connection *, int errorCode, void *userData);
static void s_onConnectionCompleted(
aws_mqtt_client_connection *,
Expand Down Expand Up @@ -527,6 +552,12 @@ namespace Aws
const char *hostName,
uint16_t port,
const Io::SocketOptions &socketOptions);
static void s_connectionInit(
MqttConnection *self,
const char *hostName,
uint16_t port,
const Io::SocketOptions &socketOptions,
aws_mqtt5_client *mqtt5client);
};

/**
Expand Down
44 changes: 44 additions & 0 deletions source/mqtt/Mqtt5Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,23 @@ namespace Aws
{
namespace Mqtt5
{
Mqtt5to3AdapterOptions::Mqtt5to3AdapterOptions() : m_port(0), m_overwriteWebsocket(false) {}

Mqtt5Client::Mqtt5Client(const Mqtt5ClientOptions &options, Allocator *allocator) noexcept
: m_client_core(nullptr)
{
m_client_core = Mqtt5ClientCore::NewMqtt5ClientCore(options, allocator);
m_mqtt5to3AdapterOptions = options.NewMqtt5to3AdapterOptions();
}

std::shared_ptr<Crt::Mqtt::MqttConnection> Mqtt5Client::NewConnection() noexcept
{
if (m_client_core == nullptr)
{
AWS_LOGF_DEBUG(AWS_LS_MQTT5_CLIENT, "Failed to create mqtt3 connection: Mqtt5 Client is invalid.");
return nullptr;
}
return m_client_core->NewConnection(m_mqtt5to3AdapterOptions);
}

Mqtt5Client::~Mqtt5Client()
Expand All @@ -34,6 +47,7 @@ namespace Aws
m_client_core->Close();
m_client_core.reset();
}
delete m_mqtt5to3AdapterOptions;
}

std::shared_ptr<Mqtt5Client> Mqtt5Client::NewMqtt5Client(
Expand Down Expand Up @@ -233,6 +247,36 @@ namespace Aws

Mqtt5ClientOptions::~Mqtt5ClientOptions() {}

Mqtt5to3AdapterOptions *Mqtt5ClientOptions::NewMqtt5to3AdapterOptions() const noexcept
{
Mqtt5to3AdapterOptions *adapterOptions = new Mqtt5to3AdapterOptions();
adapterOptions->m_hostName = m_hostName;
adapterOptions->m_port = m_port;
adapterOptions->m_socketOptions = m_socketOptions;
if (m_proxyOptions.has_value())
adapterOptions->m_proxyOptions = m_proxyOptions.value();
if (m_tlsConnectionOptions.has_value())
{
adapterOptions->m_tlsConnectionOptions = m_tlsConnectionOptions.value();
}
if (websocketHandshakeTransform)
{
adapterOptions->m_overwriteWebsocket = true;

auto signerTransform = [this](
std::shared_ptr<Crt::Http::HttpRequest> req,
const Crt::Mqtt::OnWebSocketHandshakeInterceptComplete &onComplete) {
this->websocketHandshakeTransform(req, onComplete);
};
adapterOptions->m_webSocketInterceptor = std::move(signerTransform);
}
else
{
adapterOptions->m_overwriteWebsocket = false;
}
return adapterOptions;
}

Mqtt5ClientOptions &Mqtt5ClientOptions::WithHostName(Crt::String hostname)
{
m_hostName = std::move(hostname);
Expand Down
52 changes: 52 additions & 0 deletions source/mqtt/Mqtt5ClientCore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,58 @@ namespace Aws
}
}

std::shared_ptr<Crt::Mqtt::MqttConnection> Mqtt5ClientCore::NewConnection(
Mqtt5::Mqtt5to3AdapterOptions *options) noexcept
{
// If you're reading this and asking.... why is this so complicated? Why not use make_shared
// or allocate_shared? Well, MqttConnection constructors are private and stl is dumb like that.
// so, we do it manually.
Allocator *allocator = this->m_allocator;
Crt::Mqtt::MqttConnection *toSeat = reinterpret_cast<Crt::Mqtt::MqttConnection *>(
aws_mem_acquire(allocator, sizeof(Crt::Mqtt::MqttConnection)));
if (!toSeat)
{
return nullptr;
}

if (options->m_tlsConnectionOptions.has_value())
{
toSeat = new (toSeat) Crt::Mqtt::MqttConnection(
m_client,
options->m_hostName.c_str(),
options->m_port,
options->m_socketOptions,
options->m_tlsConnectionOptions.value(),
options->m_overwriteWebsocket,
allocator);
}
else
{
toSeat = new (toSeat) Crt::Mqtt::MqttConnection(
m_client,
options->m_hostName.c_str(),
options->m_port,
options->m_socketOptions,
options->m_overwriteWebsocket,
allocator);
}
if (options->m_proxyOptions.has_value())
{
toSeat->SetHttpProxyOptions(options->m_proxyOptions.value());
}

if (options->m_overwriteWebsocket)
{
toSeat->WebsocketInterceptor = options->m_webSocketInterceptor;
}

return std::shared_ptr<Crt::Mqtt::MqttConnection>(
toSeat, [allocator](Crt::Mqtt::MqttConnection *connection) {
connection->~MqttConnection();
aws_mem_release(allocator, reinterpret_cast<void *>(connection));
});
}

void Mqtt5ClientCore::s_publishCompletionCallback(
enum aws_mqtt5_packet_type packet_type,
const void *publishCompletionPacket,
Expand Down
Loading

0 comments on commit 0b75217

Please sign in to comment.