Skip to content

Commit

Permalink
update CR
Browse files Browse the repository at this point in the history
  • Loading branch information
xiazhvera committed Aug 4, 2023
1 parent da3171b commit 6fac5d6
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 56 deletions.
17 changes: 13 additions & 4 deletions include/aws/crt/mqtt/Mqtt5Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -335,28 +335,37 @@ namespace Aws

Mqtt5ClientOperationStatistics m_operationStatistics;

Mqtt5to3AdapterOptions *m_mqtt5to3AdapterOptions;
ScopedResource<Mqtt5to3AdapterOptions> m_mqtt5to3AdapterOptions;
};

/**
* The extra options required to build MqttConnection
* The extra options required to build MqttConnection from Mqtt5Client
*/
class Mqtt5to3AdapterOptions
{
friend class Mqtt5ClientOptions;
friend class Mqtt5ClientCore;

public:
// Default constructor
/* Default constructor */
Mqtt5to3AdapterOptions();

private:
/* Host name of the MQTT server to connect to. */
Crt::String m_hostName;

/* Port to connect to */
uint16_t m_port;

/*
* If the MqttConnection should overwrite the websocket config. If set to true, m_webSocketInterceptor
* must be set.
*/
bool m_overwriteWebsocket;

/*
* The transform function invoked during websocket handshake.
*/
Crt::Mqtt::OnWebSocketHandshakeIntercept m_webSocketInterceptor;

/**
Expand Down Expand Up @@ -621,7 +630,7 @@ namespace Aws
*
* @return Mqtt5to3AdapterOptions
*/
Mqtt5to3AdapterOptions *NewMqtt5to3AdapterOptions() const noexcept;
ScopedResource<Mqtt5to3AdapterOptions> NewMqtt5to3AdapterOptions() const noexcept;

/**
* This callback allows a custom transformation of the HTTP request that acts as the websocket
Expand Down
2 changes: 1 addition & 1 deletion include/aws/crt/mqtt/Mqtt5ClientCore.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ namespace Aws
* @return std::shared_ptr<Crt::Mqtt::MqttConnection>
*/
std::shared_ptr<Crt::Mqtt::MqttConnection> NewConnection(
Mqtt5::Mqtt5to3AdapterOptions *options) noexcept;
const Mqtt5::Mqtt5to3AdapterOptions *options) noexcept;

/* Static Callbacks */
static void s_publishCompletionCallback(
Expand Down
11 changes: 3 additions & 8 deletions include/aws/crt/mqtt/MqttClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -476,15 +476,15 @@ namespace Aws
const Io::SocketOptions &socketOptions,
const Crt::Io::TlsConnectionOptions &tlsConnectionOptions,
bool useWebsocket,
aws_allocator *allocaotr) noexcept;
Allocator *allocator) noexcept;

MqttConnection(
aws_mqtt5_client *mqtt5Client,
const char *hostName,
uint16_t port,
const Io::SocketOptions &socketOptions,
bool useWebsocket,
aws_allocator *allocaotr) noexcept;
Allocator *allocator) noexcept;

static void s_onConnectionInterrupted(aws_mqtt_client_connection *, int errorCode, void *userData);
static void s_onConnectionCompleted(
Expand Down Expand Up @@ -548,17 +548,12 @@ namespace Aws
aws_mqtt_transform_websocket_handshake_complete_fn *complete_fn,
void *complete_ctx);

static void s_connectionInit(
MqttConnection *self,
const char *hostName,
uint16_t port,
const Io::SocketOptions &socketOptions);
static void s_connectionInit(
MqttConnection *self,
const char *hostName,
uint16_t port,
const Io::SocketOptions &socketOptions,
aws_mqtt5_client *mqtt5client);
aws_mqtt5_client *mqtt5Client = nullptr);
};

/**
Expand Down
11 changes: 7 additions & 4 deletions source/mqtt/Mqtt5Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ namespace Aws
AWS_LOGF_DEBUG(AWS_LS_MQTT5_CLIENT, "Failed to create mqtt3 connection: Mqtt5 Client is invalid.");
return nullptr;
}
return m_client_core->NewConnection(m_mqtt5to3AdapterOptions);
return m_client_core->NewConnection(m_mqtt5to3AdapterOptions.get());
}

Mqtt5Client::~Mqtt5Client()
Expand All @@ -47,7 +47,6 @@ namespace Aws
m_client_core->Close();
m_client_core.reset();
}
delete m_mqtt5to3AdapterOptions;
}

std::shared_ptr<Mqtt5Client> Mqtt5Client::NewMqtt5Client(
Expand Down Expand Up @@ -247,9 +246,13 @@ namespace Aws

Mqtt5ClientOptions::~Mqtt5ClientOptions() {}

Mqtt5to3AdapterOptions *Mqtt5ClientOptions::NewMqtt5to3AdapterOptions() const noexcept
ScopedResource<Mqtt5to3AdapterOptions> Mqtt5ClientOptions::NewMqtt5to3AdapterOptions() const noexcept
{
Mqtt5to3AdapterOptions *adapterOptions = new Mqtt5to3AdapterOptions();
Allocator *allocator = m_allocator;
ScopedResource<Mqtt5to3AdapterOptions> adapterOptions = ScopedResource<Mqtt5to3AdapterOptions>(
Crt::New<Mqtt5to3AdapterOptions>(allocator),
[allocator](Mqtt5to3AdapterOptions *options) { Crt::Delete(options, allocator); });

adapterOptions->m_hostName = m_hostName;
adapterOptions->m_port = m_port;
adapterOptions->m_socketOptions = m_socketOptions;
Expand Down
4 changes: 2 additions & 2 deletions source/mqtt/Mqtt5ClientCore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ namespace Aws
}

std::shared_ptr<Crt::Mqtt::MqttConnection> Mqtt5ClientCore::NewConnection(
Mqtt5::Mqtt5to3AdapterOptions *options) noexcept
const Mqtt5::Mqtt5to3AdapterOptions *options) noexcept
{
// If you're reading this and asking.... why is this so complicated? Why not use make_shared
// or allocate_shared? Well, MqttConnection constructors are private and stl is dumb like that.
Expand Down Expand Up @@ -521,7 +521,7 @@ namespace Aws
const Mqtt5ClientOptions &options,
Allocator *allocator) noexcept
{
/* Copied from MqttClient.cpp:ln754 (MqttClient::NewConnection) */
/* Copied from MqttClient.cpp: MqttClient::NewConnection) */
/* As the constructor is private, make share would not work here. We do make_share manually. */
Mqtt5ClientCore *toSeat =
reinterpret_cast<Mqtt5ClientCore *>(aws_mem_acquire(allocator, sizeof(Mqtt5ClientCore)));
Expand Down
51 changes: 14 additions & 37 deletions source/mqtt/MqttClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,49 +265,22 @@ namespace Aws
MqttConnection *self,
const char *hostName,
uint16_t port,
const Io::SocketOptions &socketOptions)
const Io::SocketOptions &socketOptions,
aws_mqtt5_client *mqtt5Client)
{

self->m_hostName = String(hostName);
self->m_port = port;
self->m_socketOptions = socketOptions;

self->m_underlyingConnection = aws_mqtt_client_connection_new(self->m_owningClient);

if (self->m_underlyingConnection)
if (mqtt5Client)
{
aws_mqtt_client_connection_set_connection_result_handlers(
self->m_underlyingConnection,
MqttConnection::s_onConnectionSuccess,
self,
MqttConnection::s_onConnectionFailure,
self);

aws_mqtt_client_connection_set_connection_interruption_handlers(
self->m_underlyingConnection,
MqttConnection::s_onConnectionInterrupted,
self,
MqttConnection::s_onConnectionResumed,
self);

aws_mqtt_client_connection_set_connection_closed_handler(
self->m_underlyingConnection, MqttConnection::s_onConnectionClosed, self);
self->m_underlyingConnection = aws_mqtt_client_connection_new_from_mqtt5_client(mqtt5Client);
}
else
{
self->m_underlyingConnection = aws_mqtt_client_connection_new(self->m_owningClient);
}
}

void MqttConnection::s_connectionInit(
MqttConnection *self,
const char *hostName,
uint16_t port,
const Io::SocketOptions &socketOptions,
aws_mqtt5_client *mqtt5client)
{

self->m_hostName = String(hostName);
self->m_port = port;
self->m_socketOptions = socketOptions;

self->m_underlyingConnection = aws_mqtt_client_connection_new_from_mqtt5_client(mqtt5client);

if (self->m_underlyingConnection)
{
Expand All @@ -328,6 +301,10 @@ namespace Aws
aws_mqtt_client_connection_set_connection_closed_handler(
self->m_underlyingConnection, MqttConnection::s_onConnectionClosed, self);
}
else
{
AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "Failed to initialize Mqtt Connection");
}
}

void MqttConnection::s_onWebsocketHandshake(
Expand Down Expand Up @@ -388,7 +365,7 @@ namespace Aws
const Io::SocketOptions &socketOptions,
const Crt::Io::TlsConnectionOptions &tlsOptions,
bool useWebsocket,
aws_allocator *allocator) noexcept
Allocator *allocator) noexcept
: m_owningClient(nullptr), m_tlsOptions(tlsOptions), m_onAnyCbData(nullptr), m_useTls(true),
m_useWebsocket(useWebsocket), m_allocator(allocator)
{
Expand All @@ -401,7 +378,7 @@ namespace Aws
uint16_t port,
const Io::SocketOptions &socketOptions,
bool useWebsocket,
aws_allocator *allocator) noexcept
Allocator *allocator) noexcept
: m_owningClient(nullptr), m_onAnyCbData(nullptr), m_useTls(false), m_useWebsocket(useWebsocket),
m_allocator(allocator)
{
Expand Down

0 comments on commit 6fac5d6

Please sign in to comment.