From 08fff71a73fdf01a16ab2160dd1588ae4626b540 Mon Sep 17 00:00:00 2001 From: Rafael Silva Date: Tue, 14 Dec 2021 14:58:03 +0000 Subject: [PATCH] Add event callback system Signed-off-by: Rafael Silva --- include/mqtt.h | 155 +++++++++++++++++++++++++++++++------------------ src/mqtt.c | 101 ++++++++++++++++++++++++++------ tests.c | 11 ++-- 3 files changed, 188 insertions(+), 79 deletions(-) diff --git a/include/mqtt.h b/include/mqtt.h index 34c879d..07c7674 100644 --- a/include/mqtt.h +++ b/include/mqtt.h @@ -120,7 +120,7 @@ extern "C" { */ - /** +/** * @brief An enumeration of the MQTT control packet types. * @ingroup unpackers * @@ -128,7 +128,7 @@ extern "C" { * MQTT v3.1.1: MQTT Control Packet Types * */ - enum MQTTControlPacketType { +enum MQTTControlPacketType { MQTT_CONTROL_CONNECT=1u, MQTT_CONTROL_CONNACK=2u, MQTT_CONTROL_PUBLISH=3u, @@ -1084,6 +1084,34 @@ struct mqtt_queued_message* mqtt_mq_find(struct mqtt_message_queue *mq, enum MQT /* CLIENT */ +/** + * @brief An enumeration of callback events. + * @ingroup details + */ +enum MQTTCallbackEvent { + MQTT_EVENT_RECONNECT, + MQTT_EVENT_CONNECTION_REFUSED, + MQTT_EVENT_CONNECTED, + MQTT_EVENT_DISCONNECTED, + MQTT_EVENT_RECEIVE, + MQTT_EVENT_PUBLISH, + MQTT_EVENT_SUBSCRIBE, + MQTT_EVENT_UNSUBSCRIBE, + MQTT_EVENT_PING, + MQTT_EVENT_PUBLISH_TIMEOUT, + MQTT_EVENT_ERROR +}; + +/** + * @brief union to serve as proxy to multiple datatypes on one pointer. + * @ingroup details + */ +union MQTTCallbackData { + struct mqtt_response_publish *received_msg; + struct mqtt_queued_message *queued_msg; + enum MQTTErrors *error; +}; + /** * @brief An MQTT client. * @ingroup details @@ -1154,17 +1182,56 @@ struct mqtt_client { double typical_response_time; /** - * @brief The callback that is called whenever a publish is received from the broker. + * @brief The callback that is called whenever an event happens + * events happen when: + * - publish is received from the broker. + * + * Any topics that you have subscribed to will be returned from the broker as + * mqtt_response_publish messages. All the publishes received from the broker will + * be passed to this function on a MQTT_EVENT_RECEIVE. + * + * - reconnect is called whenever the client enters an error state + * that requires reinitialization. + * + * The job of the MQTT_EVENT_RECONNECT is to: (1) perform error handling/logging, + * (2) clean up the old connection (i.e. close client->socketfd), (3) \ref mqtt_reinit the + * client, and (4) reconfigure the MQTT session by calling \ref mqtt_connect followed by other + * API calls such as \ref mqtt_subscribe. + * + * - (dis)connect (refused) is called whenever a connection is complete, is refused, or disconnected. + * + * MQTT_EVENT_CONNECTION_REFUSED is called on a connection refused error + * MQTT_EVENT_CONNECTED is called whenever a connection is acknowledged and accepted + * MQTT_EVENT_DISCONNECTED is called whenver a disconnect is sent by the client * - * Any topics that you have subscribed to will be returned from the broker as - * mqtt_response_publish messages. All the publishes received from the broker will - * be passed to this function. + * - publish is called whenver a message WE published is successful, ie acknowledged * - * @note A pointer to publish_response_callback_state is always passed to the callback. - * Use publish_response_callback_state to keep track of any state information you + * MQTT_EVENT_PUBLISH is called when + * on QoS == 0: when the message is sent + * on QoS == 1: when the message is acknowledged by the broker + * on QoS == 2: when the message is acknowledged by the broker + * + * MQTT_EVENT_PUBLISH_TIMEOUT is called whenever a message that requires acknowledgement is not so + * for a response_timeout period, the message is requeued automatically + * + * - (un)subscribe is called when a (un)subscription is ackowledged + * + * MQTT_EVENT_SUBSCRIBE on sub + * MQTT_EVENT_UNSUBSCRIBE on unsub + * + * - ping is called when we get a ping response + * + * MQTT_EVENT_PING + * + * - error is called when an error state not handled by any of the other events happens + * + * MQTT_EVENT_ERROR + * + * @note A pointer to user_callback_state is always passed to the callback. + * Use user_callback_state to keep track of any state information you * need. */ - void (*publish_response_callback)(void** state, struct mqtt_response_publish *publish); + void (*user_callback)(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state); /** * @brief A pointer to any publish_response_callback state information you need. @@ -1172,7 +1239,7 @@ struct mqtt_client { * @note A pointer to this pointer will always be publish_response_callback upon * receiving a publish message from the broker. */ - void* publish_response_callback_state; + void* user_callback_state; /** * @brief A user-specified callback, triggered on each \ref mqtt_sync, allowing @@ -1191,21 +1258,6 @@ struct mqtt_client { */ enum MQTTErrors (*inspector_callback)(struct mqtt_client*); - /** - * @brief A callback that is called whenever the client is in an error state. - * - * This callback is responsible for: application level error handling, closing - * previous sockets, and reestabilishing the connection to the broker and - * session configurations (i.e. subscriptions). - */ - void (*reconnect_callback)(struct mqtt_client*, void**); - - /** - * @brief A pointer to some state. A pointer to this member is passed to - * \ref mqtt_client.reconnect_callback. - */ - void* reconnect_state; - /** * @brief The buffer where ingress data is temporarily stored. */ @@ -1276,8 +1328,8 @@ ssize_t __mqtt_recv(struct mqtt_client *client); * being sent to the broker. This function does the actual sending of * those messages. Additionally this function receives traffic (responses and * acknowledgements) from the broker and responds to that traffic accordingly. - * Lastly this function also calls the \c publish_response_callback when - * any \c MQTT_CONTROL_PUBLISH messages are received. + * Lastly this function also calls the \c user_callback when + * any \c MQTTCallbackEvent events happen. * * @pre mqtt_init must have been called. * @@ -1310,12 +1362,11 @@ enum MQTTErrors mqtt_sync(struct mqtt_client *client); * @param[in] sendbufsz The size of \p sendbuf in bytes. * @param[in] recvbuf A buffer that will be used for receiving messages from the broker. * @param[in] recvbufsz The size of \p recvbuf in bytes. - * @param[in] publish_response_callback The callback to call whenever application messages - * are received from the broker. + * @param[in] callback The callback to call whenever events happen. * * @post mqtt_connect must be called. * - * @note \p sockfd is a non-blocking TCP connection. + * @note \p sockfd is a non-blocking socket connection. * @note If \p sendbuf fills up completely during runtime a \c MQTT_ERROR_SEND_BUFFER_IS_FULL * error will be set. Similarly if \p recvbuf is ever to small to receive a message from * the broker an MQTT_ERROR_RECV_BUFFER_TOO_SMALL error will be set. @@ -1348,56 +1399,48 @@ enum MQTTErrors mqtt_init(struct mqtt_client *client, mqtt_pal_socket_handle sockfd, uint8_t *sendbuf, size_t sendbufsz, uint8_t *recvbuf, size_t recvbufsz, - void (*publish_response_callback)(void** state, struct mqtt_response_publish *publish)); + void (*callback)(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state)); /** - * @brief Initializes an MQTT client and enables automatic reconnections. + * @brief Briefly initializes an MQTT client, expecting full init in a reconnect event. * @ingroup api * - * An alternative to \ref mqtt_init that allows the client to automatically reconnect to the - * broker after an error occurs (e.g. socket error or internal buffer overflows). - * - * This is accomplished by calling the \p reconnect_callback whenever the client enters an error - * state. The job of the \p reconnect_callback is to: (1) perform error handling/logging, - * (2) clean up the old connection (i.e. close client->socketfd), (3) \ref mqtt_reinit the - * client, and (4) reconfigure the MQTT session by calling \ref mqtt_connect followed by other - * API calls such as \ref mqtt_subscribe. + * An alternative to \ref mqtt_init that expects the client to automatically reconnect to the + * broker in the reconnect event after an error occurs (e.g. socket error or internal buffer overflows).. * - * The first argument to the \p reconnect_callback is the client (which will be in an error - * state) and the second argument is a pointer to a void pointer where you can store some state + * The first argument to the \p user_callback is the client (which will be in an error + * state) and the second argument is an MQTTCallbackEvent which identifies the event type + * here we care about the MQTT_EVENT_RECONNECT event + * lastly a pointer to a void pointer where you can store some state * information. Internally, MQTT-C calls the reconnect callback like so: * * \code - * client->reconnect_callback(client, &client->reconnect_state) + * client->user_callback(client, MQTT_EVENT_RECONNECT, NULL, &client->user_state) * \endcode * - * Note that the \p reconnect_callback is also called to setup the initial session. After + * Note that the \p user_callback is also called to setup the initial session. After * calling \ref mqtt_init_reconnect the client will be in the error state * \c MQTT_ERROR_INITIAL_RECONNECT. * * @pre None. * * @param[in,out] client The MQTT client that will be initialized. - * @param[in] reconnect_callback The callback that will be called to connect/reconnect the - * client to the broker and perform application level error handling. - * @param[in] reconnect_state A pointer to some state data for your \p reconnect_callback. - * If your \p reconnect_callback does not require any state information set this + * @param[in] callback_state A pointer to some state data for your \p user_callback. + * If your \p user_callback does not require any state information set this * to NULL. A pointer to the memory address where the client stores a copy of this - * pointer is passed as the second argumnet to \p reconnect_callback. - * @param[in] publish_response_callback The callback to call whenever application messages - * are received from the broker. + * pointer is passed as an argumnet to \p user_callback. + * @param[in] callback The callback that will be called to connect/reconnect the + * client and every other event. * - * @post Call \p reconnect_callback yourself, or call \ref mqtt_sync - * (which will trigger the call to \p reconnect_callback). + * @post Call \ref mqtt_sync (which will trigger the call to \p user_callback). * * @attention Only initialize an MQTT client once (i.e. don't call \ref mqtt_init or * \ref mqtt_init_reconnect more than once per client). * */ void mqtt_init_reconnect(struct mqtt_client *client, - void (*reconnect_callback)(struct mqtt_client *client, void** state), - void *reconnect_state, - void (*publish_response_callback)(void** state, struct mqtt_response_publish *publish)); + void *callback_state, + void (*callback)(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state)); /** * @brief Safely assign/reassign a socket and buffers to an new/existing client. diff --git a/src/mqtt.c b/src/mqtt.c index 96c709c..9f13621 100644 --- a/src/mqtt.c +++ b/src/mqtt.c @@ -37,8 +37,8 @@ enum MQTTErrors mqtt_sync(struct mqtt_client *client) { enum MQTTErrors err; int reconnecting = 0; MQTT_PAL_MUTEX_LOCK(&client->mutex); - if (client->error != MQTT_ERROR_RECONNECTING && client->error != MQTT_OK && client->reconnect_callback != NULL) { - client->reconnect_callback(client, &client->reconnect_state); + if (client->error != MQTT_ERROR_RECONNECTING && client->error != MQTT_OK && client->user_callback != NULL) { + client->user_callback(client, MQTT_EVENT_RECONNECT, NULL, &client->user_callback_state); /* unlocked during CONNECT */ } else { /* mqtt_reconnect will have queued the disconnect packet - that needs to be sent and then call reconnect */ @@ -55,20 +55,30 @@ enum MQTTErrors mqtt_sync(struct mqtt_client *client) { MQTT_PAL_MUTEX_LOCK(&client->mutex); err = client->inspector_callback(client); MQTT_PAL_MUTEX_UNLOCK(&client->mutex); - if (err != MQTT_OK) return err; + if (err != MQTT_OK) goto ERR; } /* Call receive */ - err = (enum MQTTErrors)__mqtt_recv(client); - if (err != MQTT_OK) return err; + err = (enum MQTTErrors)__mqtt_recv(client); + if (err != MQTT_OK) goto ERR; /* Call send */ err = (enum MQTTErrors)__mqtt_send(client); /* mqtt_reconnect will essentially be a disconnect if there is no callback */ - if (reconnecting && client->reconnect_callback != NULL) { + if (reconnecting && client->user_callback != NULL) { MQTT_PAL_MUTEX_LOCK(&client->mutex); - client->reconnect_callback(client, &client->reconnect_state); + client->user_callback(client, MQTT_EVENT_RECONNECT, NULL, &client->user_callback_state); + } + + ERR: + + if (err != MQTT_ERROR_RECONNECTING && err != MQTT_ERROR_CONNECT_CLIENT_ID_REFUSED && + err != MQTT_ERROR_CONNECTION_REFUSED && err != MQTT_OK && + client->user_callback != NULL) { + /* call timeout callback */ + union MQTTCallbackData data = {.error = &err}; + client->user_callback(client, MQTT_EVENT_ERROR, &data, &client->user_callback_state); } return err; @@ -106,7 +116,7 @@ enum MQTTErrors mqtt_init(struct mqtt_client *client, mqtt_pal_socket_handle sockfd, uint8_t *sendbuf, size_t sendbufsz, uint8_t *recvbuf, size_t recvbufsz, - void (*publish_response_callback)(void** state,struct mqtt_response_publish *publish)) + void (*callback)(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state)) { if (client == NULL || sendbuf == NULL || recvbuf == NULL) { return MQTT_ERROR_NULLPTR; @@ -130,21 +140,19 @@ enum MQTTErrors mqtt_init(struct mqtt_client *client, client->number_of_timeouts = 0; client->number_of_keep_alives = 0; client->typical_response_time = -1.0; - client->publish_response_callback = publish_response_callback; client->pid_lfsr = 0; client->send_offset = 0; client->inspector_callback = NULL; - client->reconnect_callback = NULL; - client->reconnect_state = NULL; + client->user_callback_state = NULL; + client->user_callback = callback; return MQTT_OK; } void mqtt_init_reconnect(struct mqtt_client *client, - void (*reconnect)(struct mqtt_client *, void**), - void *reconnect_state, - void (*publish_response_callback)(void** state, struct mqtt_response_publish *publish)) + void *callback_state, + void (*callback)(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state)) { /* initialize mutex */ MQTT_PAL_MUTEX_INIT(&client->mutex); @@ -163,12 +171,11 @@ void mqtt_init_reconnect(struct mqtt_client *client, client->number_of_timeouts = 0; client->number_of_keep_alives = 0; client->typical_response_time = -1.0; - client->publish_response_callback = publish_response_callback; client->send_offset = 0; client->inspector_callback = NULL; - client->reconnect_callback = reconnect; - client->reconnect_state = reconnect_state; + client->user_callback_state = callback_state; + client->user_callback = callback; } void mqtt_reinit(struct mqtt_client* client, @@ -523,6 +530,12 @@ ssize_t __mqtt_send(struct mqtt_client *client) resend = 1; client->number_of_timeouts += 1; client->send_offset = 0; + + if (client->user_callback != NULL) { + /* call timeout callback */ + union MQTTCallbackData data = {.queued_msg = msg}; + client->user_callback(client, MQTT_EVENT_PUBLISH_TIMEOUT, &data, &client->user_callback_state); + } } } @@ -590,13 +603,25 @@ ssize_t __mqtt_send(struct mqtt_client *client) switch (msg->control_type) { case MQTT_CONTROL_PUBACK: case MQTT_CONTROL_PUBCOMP: + msg->state = MQTT_QUEUED_COMPLETE; + break; case MQTT_CONTROL_DISCONNECT: msg->state = MQTT_QUEUED_COMPLETE; + if (client->user_callback != NULL) { + /* call disconnect callback */ + union MQTTCallbackData data = {.queued_msg = msg}; + client->user_callback(client, MQTT_EVENT_DISCONNECTED, &data, &client->user_callback_state); + } break; case MQTT_CONTROL_PUBLISH: inspected = ( MQTT_PUBLISH_QOS_MASK & (msg->start[0]) ) >> 1; /* qos */ if (inspected == 0) { msg->state = MQTT_QUEUED_COMPLETE; + if (client->user_callback != NULL) { + /* call publish callback */ + union MQTTCallbackData data = {.queued_msg = msg}; + client->user_callback(client, MQTT_EVENT_PUBLISH, &data, &client->user_callback_state); + } } else if (inspected == 1) { msg->state = MQTT_QUEUED_AWAITING_ACK; /*set DUP flag for subsequent sends [Spec MQTT-3.3.1-1] */ @@ -731,8 +756,18 @@ ssize_t __mqtt_recv(struct mqtt_client *client) client->error = MQTT_ERROR_CONNECTION_REFUSED; mqtt_recv_ret = MQTT_ERROR_CONNECTION_REFUSED; } + if (client->user_callback != NULL) { + /* call connection refused callback */ + union MQTTCallbackData data = {.queued_msg = msg}; + client->user_callback(client, MQTT_EVENT_CONNECTION_REFUSED, &data, &client->user_callback_state); + } break; } + if (client->user_callback != NULL) { + /* call connected callback */ + union MQTTCallbackData data = {.queued_msg = msg}; + client->user_callback(client, MQTT_EVENT_CONNECTED, &data, &client->user_callback_state); + } break; case MQTT_CONTROL_PUBLISH: /* stage response, none if qos==0, PUBACK if qos==1, PUBREC if qos==2 */ @@ -756,8 +791,11 @@ ssize_t __mqtt_recv(struct mqtt_client *client) break; } } - /* call publish callback */ - client->publish_response_callback(&client->publish_response_callback_state, &response.decoded.publish); + if (client->user_callback != NULL) { + /* call receive callback */ + union MQTTCallbackData data = {.received_msg = &response.decoded.publish}; + client->user_callback(client, MQTT_EVENT_RECEIVE, &data, &client->user_callback_state); + } break; case MQTT_CONTROL_PUBACK: /* release associated PUBLISH */ @@ -770,6 +808,11 @@ ssize_t __mqtt_recv(struct mqtt_client *client) msg->state = MQTT_QUEUED_COMPLETE; /* update response time */ client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent); + if (client->user_callback != NULL) { + /* call publish callback */ + union MQTTCallbackData data = {.queued_msg = msg}; + client->user_callback(client, MQTT_EVENT_PUBLISH, &data, &client->user_callback_state); + } break; case MQTT_CONTROL_PUBREC: /* check if this is a duplicate */ @@ -793,6 +836,11 @@ ssize_t __mqtt_recv(struct mqtt_client *client) mqtt_recv_ret = rv; break; } + if (client->user_callback != NULL) { + /* call publish callback */ + union MQTTCallbackData data = {.queued_msg = msg}; + client->user_callback(client, MQTT_EVENT_PUBLISH, &data, &client->user_callback_state); + } break; case MQTT_CONTROL_PUBREL: /* release associated PUBREC */ @@ -842,6 +890,11 @@ ssize_t __mqtt_recv(struct mqtt_client *client) mqtt_recv_ret = MQTT_ERROR_SUBSCRIBE_FAILED; break; } + if (client->user_callback != NULL) { + /* call subscribed callback */ + union MQTTCallbackData data = {.queued_msg = msg}; + client->user_callback(client, MQTT_EVENT_SUBSCRIBE, &data, &client->user_callback_state); + } break; case MQTT_CONTROL_UNSUBACK: /* release associated UNSUBSCRIBE */ @@ -854,6 +907,11 @@ ssize_t __mqtt_recv(struct mqtt_client *client) msg->state = MQTT_QUEUED_COMPLETE; /* update response time */ client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent); + if (client->user_callback != NULL) { + /* call unsubscribed callback */ + union MQTTCallbackData data = {.queued_msg = msg}; + client->user_callback(client, MQTT_EVENT_UNSUBSCRIBE, &data, &client->user_callback_state); + } break; case MQTT_CONTROL_PINGRESP: /* release associated PINGREQ */ @@ -866,6 +924,11 @@ ssize_t __mqtt_recv(struct mqtt_client *client) msg->state = MQTT_QUEUED_COMPLETE; /* update response time */ client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent); + if (client->user_callback != NULL) { + /* call ping callback */ + union MQTTCallbackData data = {.queued_msg = msg}; + client->user_callback(client, MQTT_EVENT_PING, &data, &client->user_callback_state); + } break; default: client->error = MQTT_ERROR_MALFORMED_RESPONSE; diff --git a/tests.c b/tests.c index e5233b5..0727111 100644 --- a/tests.c +++ b/tests.c @@ -612,7 +612,10 @@ static void TEST__utility__pid_lfsr(void **unused) { assert_true(period == 65535u); } -void publish_callback(void** state, struct mqtt_response_publish *publish) { +void publish_callback(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state) { + (void) client; + (void) event; + (void) data; /*char *name = (char*) malloc(publish->topic_name_size + 1); memcpy(name, publish->topic_name, publish->topic_name_size); name[publish->topic_name_size] = '\0'; @@ -621,7 +624,7 @@ void publish_callback(void** state, struct mqtt_response_publish *publish) { (const char*) (publish->application_message) ); free(name);*/ - **(int**)state += 1; + **(int**)user_state += 1; } static void TEST__api__connect_ping_disconnect(void **unused) { @@ -679,7 +682,7 @@ static void TEST__api__publish_subscribe__single(void **unused) { sockfd = open_nb_socket(addr, port); mqtt_init(&receiver, sockfd, sendmem2, sizeof(sendmem2), recvmem2, sizeof(recvmem2), publish_callback); - receiver.publish_response_callback_state = &state; + receiver.user_callback_state = &state; /* connect both */ assert_true(mqtt_connect(&sender, "liam-123", NULL, NULL, 0, NULL, NULL, 0, 30) > 0); @@ -742,7 +745,7 @@ static void TEST__api__publish_subscribe__multiple(void **unused) { sockfd = open_nb_socket(addr, port); mqtt_init(&receiver, sockfd, sendmem2, sizeof(sendmem2), recvmem2, sizeof(recvmem2), publish_callback); - receiver.publish_response_callback_state = &state; + receiver.user_callback_state = &state; /* connect both */ if ((rv = mqtt_connect(&sender, "liam-123", NULL, NULL, 0, NULL, NULL, MQTT_CONNECT_CLEAN_SESSION, 30)) <= 0) {