Skip to content

Commit

Permalink
Add event enable/disable feature
Browse files Browse the repository at this point in the history
Signed-off-by: Rafael Silva <[email protected]>
  • Loading branch information
perigoso committed Jun 3, 2022
1 parent c589ff4 commit 38f31ac
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 22 deletions.
78 changes: 63 additions & 15 deletions include/mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -1090,19 +1090,36 @@ struct mqtt_queued_message* mqtt_mq_find(struct mqtt_message_queue *mq, enum MQT
* @ingroup details
*/
enum MQTTCallbackEvent {
MQTT_EVENT_RECONNECT,
MQTT_EVENT_CONNECTION_REFUSED,
MQTT_EVENT_CONNECTED,
MQTT_EVENT_DISCONNECTED,
MQTT_EVENT_RECEIVE,
MQTT_EVENT_PUBLISH,
MQTT_EVENT_SUBSCRIBE,
MQTT_EVENT_UNSUBSCRIBE,
MQTT_EVENT_PING,
MQTT_EVENT_PUBLISH_TIMEOUT,
MQTT_EVENT_ERROR
MQTT_EVENT_RECONNECT = (1 << 0), // bit 0
MQTT_EVENT_CONNECTION_REFUSED = (1 << 1), // bit 1
MQTT_EVENT_CONNECTED = (1 << 2), // bit 2
MQTT_EVENT_DISCONNECTED = (1 << 3), // bit 3
MQTT_EVENT_RECEIVED = (1 << 4), // bit 4
MQTT_EVENT_PUBLISHED = (1 << 5), // bit 5
MQTT_EVENT_SUBSCRIBED = (1 << 6), // bit 6
MQTT_EVENT_UNSUBSCRIBED = (1 << 7), // bit 7
MQTT_EVENT_PING = (1 << 8), // bit 8
MQTT_EVENT_PUBLISH_TIMEOUT = (1 << 9), // bit 9
MQTT_EVENT_ERROR = (1 << 10), // bit 10
};

//TODO: brief
/**
* @brief
*
*/
#define MQTT_EVENT_MASK (MQTT_EVENT_RECONNECT | \
MQTT_EVENT_CONNECTION_REFUSED | \
MQTT_EVENT_CONNECTED | \
MQTT_EVENT_DISCONNECTED | \
MQTT_EVENT_RECEIVED | \
MQTT_EVENT_PUBLISHED | \
MQTT_EVENT_SUBSCRIBED | \
MQTT_EVENT_UNSUBSCRIBED | \
MQTT_EVENT_PING | \
MQTT_EVENT_PUBLISH_TIMEOUT | \
MQTT_EVENT_ERROR)

/**
* @brief union to serve as proxy to multiple datatypes on one pointer.
* @ingroup details
Expand Down Expand Up @@ -1189,7 +1206,7 @@ struct mqtt_client {
*
* Any topics that you have subscribed to will be returned from the broker as
* mqtt_response_publish messages. All the publishes received from the broker will
* be passed to this function on a MQTT_EVENT_RECEIVE.
* be passed to this function on a MQTT_EVENT_RECEIVED.
*
* - reconnect is called whenever the client enters an error state
* that requires reinitialization.
Expand All @@ -1207,7 +1224,7 @@ struct mqtt_client {
*
* - publish is called whenver a message WE published is successful, ie acknowledged
*
* MQTT_EVENT_PUBLISH is called when
* MQTT_EVENT_PUBLISHED is called when
* on QoS == 0: when the message is sent
* on QoS == 1: when the message is acknowledged by the broker
* on QoS == 2: when the message is acknowledged by the broker
Expand All @@ -1217,8 +1234,8 @@ struct mqtt_client {
*
* - (un)subscribe is called when a (un)subscription is ackowledged
*
* MQTT_EVENT_SUBSCRIBE on sub
* MQTT_EVENT_UNSUBSCRIBE on unsub
* MQTT_EVENT_SUBSCRIBED on sub
* MQTT_EVENT_UNSUBSCRIBED on unsub
*
* - ping is called when we get a ping response
*
Expand Down Expand Up @@ -1259,6 +1276,14 @@ struct mqtt_client {
*/
enum MQTTErrors (*inspector_callback)(struct mqtt_client*);

/**
* @brief Event enable flag
*
* this is a bit field of the events, where each bit represents an event which is enabled when set to 1
* the bit positions correspond to \ref enum MQTTCallbackEvent
*/
uint16_t event_enable;

/**
* @brief The buffer where ingress data is temporarily stored.
*/
Expand Down Expand Up @@ -1348,6 +1373,7 @@ ssize_t __mqtt_recv(struct mqtt_client *client);
*/
enum MQTTErrors mqtt_sync(struct mqtt_client *client);

//TODO: doc new fields
/**
* @brief Initializes an MQTT client.
* @ingroup api
Expand Down Expand Up @@ -1400,8 +1426,11 @@ enum MQTTErrors mqtt_init(struct mqtt_client *client,
mqtt_pal_socket_handle sockfd,
uint8_t *sendbuf, size_t sendbufsz,
uint8_t *recvbuf, size_t recvbufsz,
uint16_t event_flags,
void *callback_state,
void (*callback)(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state));

//TODO: doc new fields
/**
* @brief Briefly initializes an MQTT client, expecting full init in a reconnect event.
* @ingroup api
Expand Down Expand Up @@ -1440,6 +1469,7 @@ enum MQTTErrors mqtt_init(struct mqtt_client *client,
*
*/
void mqtt_init_reconnect(struct mqtt_client *client,
uint16_t event_flags,
void *callback_state,
void (*callback)(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state));

Expand Down Expand Up @@ -1510,6 +1540,24 @@ enum MQTTErrors mqtt_connect(struct mqtt_client *client,
todo: will_message should be a void*
*/

//TODO: docs
/**
* @brief
*
* @param event_flags
* @return enum MQTTErrors
*/
void mqtt_event_enable(struct mqtt_client *client, uint16_t event_flags);

//TODO: docs
/**
* @brief
*
* @param event_flags
* @return enum MQTTErrors
*/
void mqtt_event_disable(struct mqtt_client *client, uint16_t event_flags);

/**
* @brief Publish an application message.
* @ingroup api
Expand Down
31 changes: 24 additions & 7 deletions src/mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ enum MQTTErrors mqtt_init(struct mqtt_client *client,
mqtt_pal_socket_handle sockfd,
uint8_t *sendbuf, size_t sendbufsz,
uint8_t *recvbuf, size_t recvbufsz,
uint16_t event_flags,
void *callback_state,
void (*callback)(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state))
{
if (client == NULL || sendbuf == NULL || recvbuf == NULL) {
Expand Down Expand Up @@ -152,13 +154,17 @@ enum MQTTErrors mqtt_init(struct mqtt_client *client,
client->send_offset = 0;

client->inspector_callback = NULL;
client->user_callback_state = NULL;
client->user_callback_state = callback_state;
client->user_callback = callback;

/* RECEIVED event enabled by default */
client->event_enable = MQTT_EVENT_RECEIVED | (event_flags & MQTT_EVENT_MASK);

return MQTT_OK;
}

void mqtt_init_reconnect(struct mqtt_client *client,
uint16_t event_flags,
void *callback_state,
void (*callback)(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state))
{
Expand All @@ -184,6 +190,9 @@ void mqtt_init_reconnect(struct mqtt_client *client,
client->inspector_callback = NULL;
client->user_callback_state = callback_state;
client->user_callback = callback;

/* RECEIVED and RECONNECT event enabled by default */
client->event_enable = MQTT_EVENT_RECEIVED | MQTT_EVENT_RECONNECT | (event_flags & MQTT_EVENT_MASK);
}

void mqtt_reinit(struct mqtt_client* client,
Expand Down Expand Up @@ -274,6 +283,14 @@ enum MQTTErrors mqtt_connect(struct mqtt_client *client,
return MQTT_OK;
}

void mqtt_event_enable(struct mqtt_client *client, uint16_t event_flags) {
client->event_enable |= (event_flags & MQTT_EVENT_MASK);
}

void mqtt_event_disable(struct mqtt_client *client, uint16_t event_flags) {
client->event_enable &= ~(event_flags & MQTT_EVENT_MASK);
}

enum MQTTErrors mqtt_publish(struct mqtt_client *client,
const char* topic_name,
const void* application_message,
Expand Down Expand Up @@ -628,7 +645,7 @@ ssize_t __mqtt_send(struct mqtt_client *client)
if (client->user_callback != NULL) {
/* call publish callback */
union MQTTCallbackData data = {.queued_msg = msg};
client->user_callback(client, MQTT_EVENT_PUBLISH, &data, &client->user_callback_state);
client->user_callback(client, MQTT_EVENT_PUBLISHED, &data, &client->user_callback_state);
}
} else if (inspected == 1) {
msg->state = MQTT_QUEUED_AWAITING_ACK;
Expand Down Expand Up @@ -802,7 +819,7 @@ ssize_t __mqtt_recv(struct mqtt_client *client)
if (client->user_callback != NULL) {
/* call receive callback */
union MQTTCallbackData data = {.received_msg = &response.decoded.publish};
client->user_callback(client, MQTT_EVENT_RECEIVE, &data, &client->user_callback_state);
client->user_callback(client, MQTT_EVENT_RECEIVED, &data, &client->user_callback_state);
}
break;
case MQTT_CONTROL_PUBACK:
Expand All @@ -819,7 +836,7 @@ ssize_t __mqtt_recv(struct mqtt_client *client)
if (client->user_callback != NULL) {
/* call publish callback */
union MQTTCallbackData data = {.queued_msg = msg};
client->user_callback(client, MQTT_EVENT_PUBLISH, &data, &client->user_callback_state);
client->user_callback(client, MQTT_EVENT_PUBLISHED, &data, &client->user_callback_state);
}
break;
case MQTT_CONTROL_PUBREC:
Expand Down Expand Up @@ -847,7 +864,7 @@ ssize_t __mqtt_recv(struct mqtt_client *client)
if (client->user_callback != NULL) {
/* call publish callback */
union MQTTCallbackData data = {.queued_msg = msg};
client->user_callback(client, MQTT_EVENT_PUBLISH, &data, &client->user_callback_state);
client->user_callback(client, MQTT_EVENT_PUBLISHED, &data, &client->user_callback_state);
}
break;
case MQTT_CONTROL_PUBREL:
Expand Down Expand Up @@ -901,7 +918,7 @@ ssize_t __mqtt_recv(struct mqtt_client *client)
if (client->user_callback != NULL) {
/* call subscribed callback */
union MQTTCallbackData data = {.queued_msg = msg};
client->user_callback(client, MQTT_EVENT_SUBSCRIBE, &data, &client->user_callback_state);
client->user_callback(client, MQTT_EVENT_SUBSCRIBED, &data, &client->user_callback_state);
}
break;
case MQTT_CONTROL_UNSUBACK:
Expand All @@ -918,7 +935,7 @@ ssize_t __mqtt_recv(struct mqtt_client *client)
if (client->user_callback != NULL) {
/* call unsubscribed callback */
union MQTTCallbackData data = {.queued_msg = msg};
client->user_callback(client, MQTT_EVENT_UNSUBSCRIBE, &data, &client->user_callback_state);
client->user_callback(client, MQTT_EVENT_UNSUBSCRIBED, &data, &client->user_callback_state);
}
break;
case MQTT_CONTROL_PINGRESP:
Expand Down

0 comments on commit 38f31ac

Please sign in to comment.