From 38f31accc64446aa485381c8914be7d9f3a35015 Mon Sep 17 00:00:00 2001 From: Rafael Silva Date: Wed, 22 Dec 2021 15:28:33 +0000 Subject: [PATCH] Add event enable/disable feature Signed-off-by: Rafael Silva --- include/mqtt.h | 78 ++++++++++++++++++++++++++++++++++++++++---------- src/mqtt.c | 31 +++++++++++++++----- 2 files changed, 87 insertions(+), 22 deletions(-) diff --git a/include/mqtt.h b/include/mqtt.h index 9513c6c..5ff29bb 100644 --- a/include/mqtt.h +++ b/include/mqtt.h @@ -1090,19 +1090,36 @@ struct mqtt_queued_message* mqtt_mq_find(struct mqtt_message_queue *mq, enum MQT * @ingroup details */ enum MQTTCallbackEvent { - MQTT_EVENT_RECONNECT, - MQTT_EVENT_CONNECTION_REFUSED, - MQTT_EVENT_CONNECTED, - MQTT_EVENT_DISCONNECTED, - MQTT_EVENT_RECEIVE, - MQTT_EVENT_PUBLISH, - MQTT_EVENT_SUBSCRIBE, - MQTT_EVENT_UNSUBSCRIBE, - MQTT_EVENT_PING, - MQTT_EVENT_PUBLISH_TIMEOUT, - MQTT_EVENT_ERROR + MQTT_EVENT_RECONNECT = (1 << 0), // bit 0 + MQTT_EVENT_CONNECTION_REFUSED = (1 << 1), // bit 1 + MQTT_EVENT_CONNECTED = (1 << 2), // bit 2 + MQTT_EVENT_DISCONNECTED = (1 << 3), // bit 3 + MQTT_EVENT_RECEIVED = (1 << 4), // bit 4 + MQTT_EVENT_PUBLISHED = (1 << 5), // bit 5 + MQTT_EVENT_SUBSCRIBED = (1 << 6), // bit 6 + MQTT_EVENT_UNSUBSCRIBED = (1 << 7), // bit 7 + MQTT_EVENT_PING = (1 << 8), // bit 8 + MQTT_EVENT_PUBLISH_TIMEOUT = (1 << 9), // bit 9 + MQTT_EVENT_ERROR = (1 << 10), // bit 10 }; +//TODO: brief +/** + * @brief + * + */ +#define MQTT_EVENT_MASK (MQTT_EVENT_RECONNECT | \ + MQTT_EVENT_CONNECTION_REFUSED | \ + MQTT_EVENT_CONNECTED | \ + MQTT_EVENT_DISCONNECTED | \ + MQTT_EVENT_RECEIVED | \ + MQTT_EVENT_PUBLISHED | \ + MQTT_EVENT_SUBSCRIBED | \ + MQTT_EVENT_UNSUBSCRIBED | \ + MQTT_EVENT_PING | \ + MQTT_EVENT_PUBLISH_TIMEOUT | \ + MQTT_EVENT_ERROR) + /** * @brief union to serve as proxy to multiple datatypes on one pointer. * @ingroup details @@ -1189,7 +1206,7 @@ struct mqtt_client { * * Any topics that you have subscribed to will be returned from the broker as * mqtt_response_publish messages. All the publishes received from the broker will - * be passed to this function on a MQTT_EVENT_RECEIVE. + * be passed to this function on a MQTT_EVENT_RECEIVED. * * - reconnect is called whenever the client enters an error state * that requires reinitialization. @@ -1207,7 +1224,7 @@ struct mqtt_client { * * - publish is called whenver a message WE published is successful, ie acknowledged * - * MQTT_EVENT_PUBLISH is called when + * MQTT_EVENT_PUBLISHED is called when * on QoS == 0: when the message is sent * on QoS == 1: when the message is acknowledged by the broker * on QoS == 2: when the message is acknowledged by the broker @@ -1217,8 +1234,8 @@ struct mqtt_client { * * - (un)subscribe is called when a (un)subscription is ackowledged * - * MQTT_EVENT_SUBSCRIBE on sub - * MQTT_EVENT_UNSUBSCRIBE on unsub + * MQTT_EVENT_SUBSCRIBED on sub + * MQTT_EVENT_UNSUBSCRIBED on unsub * * - ping is called when we get a ping response * @@ -1259,6 +1276,14 @@ struct mqtt_client { */ enum MQTTErrors (*inspector_callback)(struct mqtt_client*); + /** + * @brief Event enable flag + * + * this is a bit field of the events, where each bit represents an event which is enabled when set to 1 + * the bit positions correspond to \ref enum MQTTCallbackEvent + */ + uint16_t event_enable; + /** * @brief The buffer where ingress data is temporarily stored. */ @@ -1348,6 +1373,7 @@ ssize_t __mqtt_recv(struct mqtt_client *client); */ enum MQTTErrors mqtt_sync(struct mqtt_client *client); +//TODO: doc new fields /** * @brief Initializes an MQTT client. * @ingroup api @@ -1400,8 +1426,11 @@ enum MQTTErrors mqtt_init(struct mqtt_client *client, mqtt_pal_socket_handle sockfd, uint8_t *sendbuf, size_t sendbufsz, uint8_t *recvbuf, size_t recvbufsz, + uint16_t event_flags, + void *callback_state, void (*callback)(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state)); +//TODO: doc new fields /** * @brief Briefly initializes an MQTT client, expecting full init in a reconnect event. * @ingroup api @@ -1440,6 +1469,7 @@ enum MQTTErrors mqtt_init(struct mqtt_client *client, * */ void mqtt_init_reconnect(struct mqtt_client *client, + uint16_t event_flags, void *callback_state, void (*callback)(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state)); @@ -1510,6 +1540,24 @@ enum MQTTErrors mqtt_connect(struct mqtt_client *client, todo: will_message should be a void* */ +//TODO: docs +/** + * @brief + * + * @param event_flags + * @return enum MQTTErrors + */ +void mqtt_event_enable(struct mqtt_client *client, uint16_t event_flags); + +//TODO: docs +/** + * @brief + * + * @param event_flags + * @return enum MQTTErrors + */ +void mqtt_event_disable(struct mqtt_client *client, uint16_t event_flags); + /** * @brief Publish an application message. * @ingroup api diff --git a/src/mqtt.c b/src/mqtt.c index 1172133..868e176 100644 --- a/src/mqtt.c +++ b/src/mqtt.c @@ -124,6 +124,8 @@ enum MQTTErrors mqtt_init(struct mqtt_client *client, mqtt_pal_socket_handle sockfd, uint8_t *sendbuf, size_t sendbufsz, uint8_t *recvbuf, size_t recvbufsz, + uint16_t event_flags, + void *callback_state, void (*callback)(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state)) { if (client == NULL || sendbuf == NULL || recvbuf == NULL) { @@ -152,13 +154,17 @@ enum MQTTErrors mqtt_init(struct mqtt_client *client, client->send_offset = 0; client->inspector_callback = NULL; - client->user_callback_state = NULL; + client->user_callback_state = callback_state; client->user_callback = callback; + /* RECEIVED event enabled by default */ + client->event_enable = MQTT_EVENT_RECEIVED | (event_flags & MQTT_EVENT_MASK); + return MQTT_OK; } void mqtt_init_reconnect(struct mqtt_client *client, + uint16_t event_flags, void *callback_state, void (*callback)(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state)) { @@ -184,6 +190,9 @@ void mqtt_init_reconnect(struct mqtt_client *client, client->inspector_callback = NULL; client->user_callback_state = callback_state; client->user_callback = callback; + + /* RECEIVED and RECONNECT event enabled by default */ + client->event_enable = MQTT_EVENT_RECEIVED | MQTT_EVENT_RECONNECT | (event_flags & MQTT_EVENT_MASK); } void mqtt_reinit(struct mqtt_client* client, @@ -274,6 +283,14 @@ enum MQTTErrors mqtt_connect(struct mqtt_client *client, return MQTT_OK; } +void mqtt_event_enable(struct mqtt_client *client, uint16_t event_flags) { + client->event_enable |= (event_flags & MQTT_EVENT_MASK); +} + +void mqtt_event_disable(struct mqtt_client *client, uint16_t event_flags) { + client->event_enable &= ~(event_flags & MQTT_EVENT_MASK); +} + enum MQTTErrors mqtt_publish(struct mqtt_client *client, const char* topic_name, const void* application_message, @@ -628,7 +645,7 @@ ssize_t __mqtt_send(struct mqtt_client *client) if (client->user_callback != NULL) { /* call publish callback */ union MQTTCallbackData data = {.queued_msg = msg}; - client->user_callback(client, MQTT_EVENT_PUBLISH, &data, &client->user_callback_state); + client->user_callback(client, MQTT_EVENT_PUBLISHED, &data, &client->user_callback_state); } } else if (inspected == 1) { msg->state = MQTT_QUEUED_AWAITING_ACK; @@ -802,7 +819,7 @@ ssize_t __mqtt_recv(struct mqtt_client *client) if (client->user_callback != NULL) { /* call receive callback */ union MQTTCallbackData data = {.received_msg = &response.decoded.publish}; - client->user_callback(client, MQTT_EVENT_RECEIVE, &data, &client->user_callback_state); + client->user_callback(client, MQTT_EVENT_RECEIVED, &data, &client->user_callback_state); } break; case MQTT_CONTROL_PUBACK: @@ -819,7 +836,7 @@ ssize_t __mqtt_recv(struct mqtt_client *client) if (client->user_callback != NULL) { /* call publish callback */ union MQTTCallbackData data = {.queued_msg = msg}; - client->user_callback(client, MQTT_EVENT_PUBLISH, &data, &client->user_callback_state); + client->user_callback(client, MQTT_EVENT_PUBLISHED, &data, &client->user_callback_state); } break; case MQTT_CONTROL_PUBREC: @@ -847,7 +864,7 @@ ssize_t __mqtt_recv(struct mqtt_client *client) if (client->user_callback != NULL) { /* call publish callback */ union MQTTCallbackData data = {.queued_msg = msg}; - client->user_callback(client, MQTT_EVENT_PUBLISH, &data, &client->user_callback_state); + client->user_callback(client, MQTT_EVENT_PUBLISHED, &data, &client->user_callback_state); } break; case MQTT_CONTROL_PUBREL: @@ -901,7 +918,7 @@ ssize_t __mqtt_recv(struct mqtt_client *client) if (client->user_callback != NULL) { /* call subscribed callback */ union MQTTCallbackData data = {.queued_msg = msg}; - client->user_callback(client, MQTT_EVENT_SUBSCRIBE, &data, &client->user_callback_state); + client->user_callback(client, MQTT_EVENT_SUBSCRIBED, &data, &client->user_callback_state); } break; case MQTT_CONTROL_UNSUBACK: @@ -918,7 +935,7 @@ ssize_t __mqtt_recv(struct mqtt_client *client) if (client->user_callback != NULL) { /* call unsubscribed callback */ union MQTTCallbackData data = {.queued_msg = msg}; - client->user_callback(client, MQTT_EVENT_UNSUBSCRIBE, &data, &client->user_callback_state); + client->user_callback(client, MQTT_EVENT_UNSUBSCRIBED, &data, &client->user_callback_state); } break; case MQTT_CONTROL_PINGRESP: