diff --git a/.github/workflows/fsanitize-check.yml b/.github/workflows/fsanitize-check.yml index 073496907..fdba323c9 100644 --- a/.github/workflows/fsanitize-check.yml +++ b/.github/workflows/fsanitize-check.yml @@ -63,22 +63,29 @@ jobs: - name: make check run: make check + - name: configure without TLS + run: ./configure CC="gcc -fsanitize=address" --enable-all --disable-tls + - name: make + run: make + - name: make check + run: make check + - name: configure with SN Enabled - run: ./configure --enable-sn CC="gcc -fsanitize=address" + run: ./configure CC="gcc -fsanitize=address" --enable-sn - name: make run: make - name: make check run: make check - name: configure with Non-Block - run: ./configure --enable-nonblock CFLAGS="-DWOLFMQTT_TEST_NONBLOCK" CC="gcc -fsanitize=address" + run: ./configure CC="gcc -fsanitize=address" --enable-nonblock CFLAGS="-DWOLFMQTT_TEST_NONBLOCK" - name: make run: make - name: make check run: make check - name: configure with Non-Block and Multi-threading - run: ./configure --enable-mt --enable-nonblock CFLAGS="-DWOLFMQTT_TEST_NONBLOCK" CC="gcc -fsanitize=address" + run: ./configure CC="gcc -fsanitize=address" --enable-mt --enable-nonblock CFLAGS="-DWOLFMQTT_TEST_NONBLOCK" - name: make run: make - name: make check @@ -89,3 +96,4 @@ jobs: if: failure() || cancelled() run: | cat test-suite.log + cat scripts/*.log diff --git a/.github/workflows/macos-check.yml b/.github/workflows/macos-check.yml index b79b5af59..18c724a22 100644 --- a/.github/workflows/macos-check.yml +++ b/.github/workflows/macos-check.yml @@ -6,20 +6,24 @@ on: pull_request: branches: [ '*' ] +env: + WOLFMQTT_NO_EXTERNAL_BROKER_TESTS: 1 + jobs: build: runs-on: macos-latest timeout-minutes: 10 - env: - WOLFMQTT_NO_EXTERNAL_BROKER_TESTS: 1 steps: - uses: actions/checkout@master with: repository: wolfssl/wolfssl path: wolfssl - name: brew - run: brew install automake libtool md5sha1sum + run: | + brew install automake libtool md5sha1sum mosquitto + echo "/usr/local/sbin/" >> $GITHUB_PATH + echo "/usr/local/opt/mosquitto/sbin/" >> $GITHUB_PATH - name: wolfssl autogen working-directory: ./wolfssl @@ -45,6 +49,13 @@ jobs: - name: make check run: make check + - name: configure without TLS + run: ./configure --enable-all --disable-tls + - name: make + run: make + - name: make check + run: make check + - name: configure with SN Enabled run: ./configure --enable-sn - name: make @@ -71,3 +82,4 @@ jobs: if: failure() || cancelled() run: | cat test-suite.log + cat scripts/*.log diff --git a/.github/workflows/ubuntu-check.yml b/.github/workflows/ubuntu-check.yml index cd1ecf65b..5f6bef603 100644 --- a/.github/workflows/ubuntu-check.yml +++ b/.github/workflows/ubuntu-check.yml @@ -63,6 +63,15 @@ jobs: - name: wolfmqtt make check run: make check + - name: wolfmqtt configure without TLS + env: + WOLFMQTT_NO_EXTERNAL_BROKER_TESTS: 1 + run: ./configure --enable-all --disable-tls + - name: wolfmqtt make + run: make + - name: wolfmqtt make check + run: make check + - name: wolfmqtt configure with SN Enabled env: WOLFMQTT_NO_EXTERNAL_BROKER_TESTS: 1 @@ -104,3 +113,4 @@ jobs: if: failure() || cancelled() run: | cat test-suite.log + cat scripts/*.log diff --git a/examples/firmware/fwclient.c b/examples/firmware/fwclient.c index b28914ef7..351eaf4b4 100644 --- a/examples/firmware/fwclient.c +++ b/examples/firmware/fwclient.c @@ -36,18 +36,18 @@ /* The signature wrapper for this example was added in wolfSSL after 3.7.1 */ #if defined(LIBWOLFSSL_VERSION_HEX) && LIBWOLFSSL_VERSION_HEX > 0x03007001 \ && defined(HAVE_ECC) && !defined(NO_SIG_WRAPPER) - #undef ENABLE_FIRMWARE_EXAMPLE - #define ENABLE_FIRMWARE_EXAMPLE + #undef ENABLE_FIRMWARE_SIG + #define ENABLE_FIRMWARE_SIG #endif #endif -#if defined(ENABLE_FIRMWARE_EXAMPLE) - +#if defined(ENABLE_FIRMWARE_SIG) #include #include #include #include +#endif #include "fwclient.h" #include "firmware.h" @@ -109,10 +109,12 @@ static int fwfile_save(const char* filePath, byte* fileBuf, int fileLen) static int fw_message_process(MQTTCtx *mqttCtx, byte* buffer, word32 len) { - int rc; + int rc = 0; FirmwareHeader* header = (FirmwareHeader*)buffer; byte *sigBuf, *pubKeyBuf, *fwBuf; +#ifdef ENABLE_FIRMWARE_SIG ecc_key eccKey; +#endif word32 check_len = sizeof(FirmwareHeader) + header->sigLen + header->pubKeyLen + header->fwLen; @@ -129,6 +131,7 @@ static int fw_message_process(MQTTCtx *mqttCtx, byte* buffer, word32 len) fwBuf = (buffer + sizeof(FirmwareHeader) + header->sigLen + header->pubKeyLen); +#ifdef ENABLE_FIRMWARE_SIG /* Import the public key */ wc_ecc_init(&eccKey); rc = wc_ecc_import_x963(pubKeyBuf, header->pubKeyLen, &eccKey); @@ -141,17 +144,22 @@ static int fw_message_process(MQTTCtx *mqttCtx, byte* buffer, word32 len) &eccKey, sizeof(eccKey)); PRINTF("Firmware Signature Verification: %s (%d)", (rc == 0) ? "Pass" : "Fail", rc); - +#else + (void)pubKeyBuf; + (void)sigBuf; +#endif if (rc == 0) { /* TODO: Process firmware image */ /* For example, save to disk using topic name */ fwfile_save(mqttCtx->pub_file, fwBuf, header->fwLen); } +#ifdef ENABLE_FIRMWARE_SIG } else { PRINTF("ECC public key import failed! %d", rc); } wc_ecc_free(&eccKey); +#endif return rc; } @@ -484,89 +492,76 @@ int fwclient_test(MQTTCtx *mqttCtx) return rc; } -#endif /* ENABLE_FIRMWARE_EXAMPLE */ /* so overall tests can pull in test function */ - #ifdef USE_WINDOWS_API - #include /* for ctrl handler */ +#ifdef USE_WINDOWS_API + #include /* for ctrl handler */ - static BOOL CtrlHandler(DWORD fdwCtrlType) - { - if (fdwCtrlType == CTRL_C_EVENT) { - #if defined(ENABLE_FIRMWARE_EXAMPLE) - mStopRead = 1; - #endif - PRINTF("Received Ctrl+c"); - return TRUE; - } - return FALSE; + static BOOL CtrlHandler(DWORD fdwCtrlType) + { + if (fdwCtrlType == CTRL_C_EVENT) { + #if defined(ENABLE_FIRMWARE_SIG) + mStopRead = 1; + #endif + PRINTF("Received Ctrl+c"); + return TRUE; } - #elif HAVE_SIGNAL - #include - static void sig_handler(int signo) - { - if (signo == SIGINT) { - #if defined(ENABLE_FIRMWARE_EXAMPLE) - mStopRead = 1; - #endif - PRINTF("Received SIGINT"); - } + return FALSE; + } +#elif HAVE_SIGNAL + #include + static void sig_handler(int signo) + { + if (signo == SIGINT) { + #if defined(ENABLE_FIRMWARE_SIG) + mStopRead = 1; + #endif + PRINTF("Received SIGINT"); } - #endif + } +#endif #if defined(NO_MAIN_DRIVER) - int fwclient_main(int argc, char** argv) +int fwclient_main(int argc, char** argv) #else - int main(int argc, char** argv) +int main(int argc, char** argv) #endif - { - int rc; - #ifdef ENABLE_FIRMWARE_EXAMPLE - MQTTCtx mqttCtx; - - /* init defaults */ - mqtt_init_ctx(&mqttCtx); - mqttCtx.app_name = "fwclient"; - mqttCtx.client_id = mqtt_append_random(FIRMWARE_CLIIENT_ID, - (word32)XSTRLEN(FIRMWARE_CLIIENT_ID)); - mqttCtx.dynamicClientId = 1; - mqttCtx.topic_name = FIRMWARE_TOPIC_NAME; - mqttCtx.qos = FIRMWARE_MQTT_QOS; - mqttCtx.pub_file = FIRMWARE_DEF_SAVE_AS; - - /* parse arguments */ - rc = mqtt_parse_args(&mqttCtx, argc, argv); - if (rc != 0) { - return rc; - } - #endif - - #ifdef USE_WINDOWS_API - if (SetConsoleCtrlHandler((PHANDLER_ROUTINE)CtrlHandler, TRUE) == FALSE) { - PRINTF("Error setting Ctrl Handler! Error %d", (int)GetLastError()); - } - #elif HAVE_SIGNAL - if (signal(SIGINT, sig_handler) == SIG_ERR) { - PRINTF("Can't catch SIGINT"); - } - #endif - - #ifdef ENABLE_FIRMWARE_EXAMPLE - do { - rc = fwclient_test(&mqttCtx); - } while (!mStopRead && rc == MQTT_CODE_CONTINUE); +{ + int rc; + MQTTCtx mqttCtx; + + /* init defaults */ + mqtt_init_ctx(&mqttCtx); + mqttCtx.app_name = "fwclient"; + mqttCtx.client_id = mqtt_append_random(FIRMWARE_CLIIENT_ID, + (word32)XSTRLEN(FIRMWARE_CLIIENT_ID)); + mqttCtx.dynamicClientId = 1; + mqttCtx.topic_name = FIRMWARE_TOPIC_NAME; + mqttCtx.qos = FIRMWARE_MQTT_QOS; + mqttCtx.pub_file = FIRMWARE_DEF_SAVE_AS; + + /* parse arguments */ + rc = mqtt_parse_args(&mqttCtx, argc, argv); + if (rc != 0) { + return rc; + } - mqtt_free_ctx(&mqttCtx); - #else - (void)argc; - (void)argv; +#ifdef USE_WINDOWS_API + if (SetConsoleCtrlHandler((PHANDLER_ROUTINE)CtrlHandler, TRUE) == FALSE) { + PRINTF("Error setting Ctrl Handler! Error %d", (int)GetLastError()); + } +#elif HAVE_SIGNAL + if (signal(SIGINT, sig_handler) == SIG_ERR) { + PRINTF("Can't catch SIGINT"); + } +#endif - /* This example requires wolfSSL after 3.7.1 for signature wrapper */ - PRINTF("Example not compiled in!"); - rc = 0; /* return success, so make check passes with TLS disabled */ - #endif + do { + rc = fwclient_test(&mqttCtx); + } while (!mStopRead && rc == MQTT_CODE_CONTINUE); - return (rc == 0) ? 0 : EXIT_FAILURE; - } + mqtt_free_ctx(&mqttCtx); + return (rc == 0) ? 0 : EXIT_FAILURE; +} diff --git a/examples/firmware/fwpush.c b/examples/firmware/fwpush.c index c16bbb77e..7b2ee2ad6 100644 --- a/examples/firmware/fwpush.c +++ b/examples/firmware/fwpush.c @@ -36,18 +36,19 @@ /* The signature wrapper for this example was added in wolfSSL after 3.7.1 */ #if defined(LIBWOLFSSL_VERSION_HEX) && LIBWOLFSSL_VERSION_HEX > 0x03007001 \ && defined(HAVE_ECC) && !defined(NO_SIG_WRAPPER) - #undef ENABLE_FIRMWARE_EXAMPLE - #define ENABLE_FIRMWARE_EXAMPLE + #undef ENABLE_FIRMWARE_SIG + #define ENABLE_FIRMWARE_SIG #endif #endif -#if defined(ENABLE_FIRMWARE_EXAMPLE) +#ifdef ENABLE_FIRMWARE_SIG #include #include #include #include +#endif #include "fwpush.h" #include "firmware.h" @@ -143,10 +144,12 @@ static int fw_message_build(MQTTCtx *mqttCtx, const char* fwFile, int msgLen = 0, fwLen = 0; word32 keyLen = 0, sigLen = 0; FirmwareHeader *header; +#ifdef ENABLE_FIRMWARE_SIG ecc_key eccKey; WC_RNG rng; wc_InitRng(&rng); +#endif /* Verify file can be loaded */ rc = mqtt_file_load(fwFile, &fwBuf, &fwLen); @@ -157,6 +160,7 @@ static int fw_message_build(MQTTCtx *mqttCtx, const char* fwFile, } PRINTF("Firmware File %s is %d bytes", fwFile, fwLen); +#ifdef ENABLE_FIRMWARE_SIG /* Generate Key */ /* Note: Real implementation would use previously exchanged/signed key */ wc_ecc_init(&eccKey); @@ -192,11 +196,13 @@ static int fw_message_build(MQTTCtx *mqttCtx, const char* fwFile, rc = EXIT_FAILURE; goto exit; } +#endif /* Display lengths */ PRINTF("Firmware Message: Sig %d bytes, Key %d bytes, File %d bytes", sigLen, keyLen, fwLen); +#ifdef ENABLE_FIRMWARE_SIG /* Generate Signature */ rc = wc_SignatureGenerate( FIRMWARE_HASH_TYPE, FIRMWARE_SIG_TYPE, @@ -209,6 +215,7 @@ static int fw_message_build(MQTTCtx *mqttCtx, const char* fwFile, rc = EXIT_FAILURE; goto exit; } +#endif /* Assemble message */ msgLen = sizeof(FirmwareHeader) + sigLen + keyLen + fwLen; @@ -225,8 +232,10 @@ static int fw_message_build(MQTTCtx *mqttCtx, const char* fwFile, header->sigLen = sigLen; header->pubKeyLen = keyLen; header->fwLen = fwLen; - XMEMCPY(&msgBuf[sizeof(FirmwareHeader)], sigBuf, sigLen); - XMEMCPY(&msgBuf[sizeof(FirmwareHeader) + sigLen], keyBuf, keyLen); + if (sigLen > 0) + XMEMCPY(&msgBuf[sizeof(FirmwareHeader)], sigBuf, sigLen); + if (keyLen > 0) + XMEMCPY(&msgBuf[sizeof(FirmwareHeader) + sigLen], keyBuf, keyLen); rc = 0; @@ -246,8 +255,10 @@ static int fw_message_build(MQTTCtx *mqttCtx, const char* fwFile, if (sigBuf) WOLFMQTT_FREE(sigBuf); if (fwBuf) WOLFMQTT_FREE(fwBuf); +#ifdef ENABLE_FIRMWARE_SIG wc_ecc_free(&eccKey); wc_FreeRng(&rng); +#endif return rc; } @@ -272,7 +283,7 @@ int fwpush_test(MQTTCtx *mqttCtx) goto disconn; } - switch(mqttCtx->stat) + switch (mqttCtx->stat) { case WMQ_BEGIN: { @@ -522,87 +533,76 @@ int fwpush_test(MQTTCtx *mqttCtx) return rc; } -#endif /* ENABLE_FIRMWARE_EXAMPLE */ /* so overall tests can pull in test function */ -#ifndef NO_MAIN_DRIVER - #ifdef USE_WINDOWS_API - #include /* for ctrl handler */ +#ifdef USE_WINDOWS_API + #include /* for ctrl handler */ - static BOOL CtrlHandler(DWORD fdwCtrlType) - { - if (fdwCtrlType == CTRL_C_EVENT) { - #if defined(ENABLE_FIRMWARE_EXAMPLE) - mStopRead = 1; - #endif - PRINTF("Received Ctrl+c"); - return TRUE; - } - return FALSE; - } - #elif HAVE_SIGNAL - #include - static void sig_handler(int signo) - { - if (signo == SIGINT) { - #if defined(ENABLE_FIRMWARE_EXAMPLE) - mStopRead = 1; - #endif - PRINTF("Received SIGINT"); - } - } - #endif - - int main(int argc, char** argv) + static BOOL CtrlHandler(DWORD fdwCtrlType) { - int rc; - #ifdef ENABLE_FIRMWARE_EXAMPLE - MQTTCtx mqttCtx; - - /* init defaults */ - mqtt_init_ctx(&mqttCtx); - mqttCtx.app_name = "fwpush"; - mqttCtx.client_id = mqtt_append_random(FIRMWARE_PUSH_CLIENT_ID, - (word32)XSTRLEN(FIRMWARE_PUSH_CLIENT_ID)); - mqttCtx.dynamicClientId = 1; - mqttCtx.topic_name = FIRMWARE_TOPIC_NAME; - mqttCtx.qos = FIRMWARE_MQTT_QOS; - mqttCtx.pub_file = FIRMWARE_PUSH_DEF_FILE; - - /* parse arguments */ - rc = mqtt_parse_args(&mqttCtx, argc, argv); - if (rc != 0) { - return rc; + if (fdwCtrlType == CTRL_C_EVENT) { + #if defined(ENABLE_FIRMWARE_SIG) + mStopRead = 1; + #endif + PRINTF("Received Ctrl+c"); + return TRUE; } - #endif - - #ifdef USE_WINDOWS_API - if (SetConsoleCtrlHandler((PHANDLER_ROUTINE)CtrlHandler, TRUE) == FALSE) { - PRINTF("Error setting Ctrl Handler! Error %d", (int)GetLastError()); - } - #elif HAVE_SIGNAL - if (signal(SIGINT, sig_handler) == SIG_ERR) { - PRINTF("Can't catch SIGINT"); + return FALSE; + } +#elif HAVE_SIGNAL + #include + static void sig_handler(int signo) + { + if (signo == SIGINT) { + #if defined(ENABLE_FIRMWARE_SIG) + mStopRead = 1; + #endif + PRINTF("Received SIGINT"); } - #endif + } +#endif - #ifdef ENABLE_FIRMWARE_EXAMPLE - do { - rc = fwpush_test(&mqttCtx); - } while (!mStopRead && rc == MQTT_CODE_CONTINUE); +#if defined(NO_MAIN_DRIVER) +int fwpush_main(int argc, char** argv) +#else +int main(int argc, char** argv) +#endif +{ + int rc; + MQTTCtx mqttCtx; + + /* init defaults */ + mqtt_init_ctx(&mqttCtx); + mqttCtx.app_name = "fwpush"; + mqttCtx.client_id = mqtt_append_random(FIRMWARE_PUSH_CLIENT_ID, + (word32)XSTRLEN(FIRMWARE_PUSH_CLIENT_ID)); + mqttCtx.dynamicClientId = 1; + mqttCtx.topic_name = FIRMWARE_TOPIC_NAME; + mqttCtx.qos = FIRMWARE_MQTT_QOS; + mqttCtx.pub_file = FIRMWARE_PUSH_DEF_FILE; + + /* parse arguments */ + rc = mqtt_parse_args(&mqttCtx, argc, argv); + if (rc != 0) { + return rc; + } - mqtt_free_ctx(&mqttCtx); - #else - (void)argc; - (void)argv; +#ifdef USE_WINDOWS_API + if (SetConsoleCtrlHandler((PHANDLER_ROUTINE)CtrlHandler, TRUE) == FALSE) { + PRINTF("Error setting Ctrl Handler! Error %d", (int)GetLastError()); + } +#elif HAVE_SIGNAL + if (signal(SIGINT, sig_handler) == SIG_ERR) { + PRINTF("Can't catch SIGINT"); + } +#endif - /* This example requires wolfSSL after 3.7.1 for signature wrapper */ - PRINTF("Example not compiled in!"); - rc = 0; /* return success, so make check passes with TLS disabled */ - #endif + do { + rc = fwpush_test(&mqttCtx); + } while (!mStopRead && rc == MQTT_CODE_CONTINUE); - return (rc == 0) ? 0 : EXIT_FAILURE; - } + mqtt_free_ctx(&mqttCtx); -#endif /* NO_MAIN_DRIVER */ + return (rc == 0) ? 0 : EXIT_FAILURE; +} diff --git a/examples/firmware/fwpush.h b/examples/firmware/fwpush.h index 270b593bc..3230652de 100644 --- a/examples/firmware/fwpush.h +++ b/examples/firmware/fwpush.h @@ -38,4 +38,8 @@ typedef struct FwpushCBdata_s { /* Exposed functions */ int fwpush_test(MQTTCtx *mqttCtx); +#if defined(NO_MAIN_DRIVER) +int fwpush_main(int argc, char** argv); +#endif + #endif /* WOLFMQTT_FWPUSH_H */ diff --git a/examples/mqttexample.c b/examples/mqttexample.c index fdfa604a6..07d6bf374 100644 --- a/examples/mqttexample.c +++ b/examples/mqttexample.c @@ -150,6 +150,7 @@ static int mqtt_get_rand(byte* data, word32 len) for (i = 0; i Port to connect on, default: Normal %d, TLS %d", MQTT_DEFAULT_PORT, MQTT_SECURE_PORT); - PRINTF("-t Enable TLS"); + PRINTF("-t Enable TLS"); /* Note: this string is used in test + * scripts to detect TLS feature */ PRINTF("-A Load CA (validate peer)"); PRINTF("-K Use private key (for TLS mutual auth)"); PRINTF("-c Use certificate (for TLS mutual auth)"); diff --git a/examples/mqttnet.c b/examples/mqttnet.c index 6b6b4cf74..acd9d3340 100644 --- a/examples/mqttnet.c +++ b/examples/mqttnet.c @@ -800,7 +800,7 @@ static int NetRead_ex(void *context, byte* buf, int buf_len, if (do_read) { /* Try and read number of buf_len provided, - minus what's already been read */ + * minus what's already been read */ rc = (int)SOCK_RECV(sock->fd, &buf[bytes], buf_len - bytes, diff --git a/examples/multithread/multithread.c b/examples/multithread/multithread.c index b00365b95..4114d4549 100755 --- a/examples/multithread/multithread.c +++ b/examples/multithread/multithread.c @@ -103,13 +103,14 @@ static int mqtt_stop_get(void) return rc; } +#define MQTT_CODE_TEST_EXIT -200 static int check_response(MQTTCtx* mqttCtx, int rc, word32* startSec, int packet_type, word32 timeoutMs) { /* check for test mode */ - if (mqtt_stop_get()) { + if (mqtt_stop_get() && packet_type != MQTT_PACKET_TYPE_UNSUBSCRIBE) { PRINTF("MQTT Exiting Thread..."); - return MQTT_CODE_SUCCESS; + return MQTT_CODE_TEST_EXIT; } #ifdef WOLFMQTT_NONBLOCK @@ -387,6 +388,11 @@ static int multithread_test_finish(MQTTCtx *mqttCtx) PRINTF("MQTT Client Done: %d", mqttCtx->return_code); + if (mStopRead && mqttCtx->return_code == MQTT_CODE_TEST_EXIT) { + /* this is okay, we requested termination */ + mqttCtx->return_code = MQTT_CODE_SUCCESS; + } + return mqttCtx->return_code; } @@ -507,12 +513,17 @@ static void *waitMessage_task(void *param) } /* Try and read packet */ - rc = MqttClient_WaitMessage(&mqttCtx->client, cmd_timeout_ms); + rc = MqttClient_WaitMessage_ex(&mqttCtx->client, &mqttCtx->client.msg, + cmd_timeout_ms); if (mqttCtx->test_mode && rc == MQTT_CODE_ERROR_TIMEOUT) { rc = 0; } rc = check_response(mqttCtx, rc, &startSec, MQTT_PACKET_TYPE_ANY, cmd_timeout_ms); + if (rc != MQTT_CODE_SUCCESS && rc != MQTT_CODE_CONTINUE) { + MqttClient_CancelMessage(&mqttCtx->client, + (MqttObject*)&mqttCtx->client.msg); + } /* check return code */ if (rc == MQTT_CODE_CONTINUE) { @@ -540,6 +551,10 @@ static void *waitMessage_task(void *param) rc = MqttClient_Publish(&mqttCtx->client, &mqttCtx->publish); } while (rc == MQTT_CODE_CONTINUE); + if (rc != MQTT_CODE_SUCCESS) { + MqttClient_CancelMessage(&mqttCtx->client, + (MqttObject*)&mqttCtx->publish); + } PRINTF("MQTT Publish: Topic %s, %s (%d)", mqttCtx->publish.topic_name, MqttClient_ReturnCodeToString(rc), rc); @@ -548,8 +563,8 @@ static void *waitMessage_task(void *param) #endif else if (rc == MQTT_CODE_ERROR_TIMEOUT) { if (mqttCtx->test_mode) { - mqtt_stop_set(); /* timeout in test mode should exit */ + mqtt_stop_set(); PRINTF("MQTT Exiting timeout..."); break; } @@ -605,11 +620,11 @@ static void *publish_task(void *param) MqttClient_CancelMessage(&mqttCtx->client, (MqttObject*)&publish); } - wm_SemLock(&mtLock); PRINTF("MQTT Publish: Topic %s, %s (%d)", publish.topic_name, MqttClient_ReturnCodeToString(rc), rc); + wm_SemLock(&mtLock); mNumMsgsDone++; wm_SemUnlock(&mtLock); @@ -734,11 +749,11 @@ int multithread_test(MQTTCtx *mqttCtx) /* Join threads - wait for completion */ if (THREAD_JOIN(threadList, threadCount)) { #ifdef __GLIBC__ - /* %m is specific to glibc/uclibc/musl, and recently (2018) - * added to FreeBSD */ + /* "%m" is specific to glibc/uclibc/musl, and FreeBSD (as of 2018). + * Uses errno and not argument required */ PRINTF("THREAD_JOIN failed: %m"); #else - PRINTF("THREAD_JOIN failed: %d",errno); + PRINTF("THREAD_JOIN failed: %d", errno); #endif } diff --git a/scripts/awsiot.test b/scripts/awsiot.test index ce7e90472..ec65c8122 100755 --- a/scripts/awsiot.test +++ b/scripts/awsiot.test @@ -5,8 +5,15 @@ # Check for application [ ! -x ./examples/aws/awsiot ] && echo -e "\n\nAWS IoT MQTT Client doesn't exist" && exit 1 -if test -n "$WOLFMQTT_NO_EXTERNAL_BROKER_TESTS"; then - echo "WOLFMQTT_NO_EXTERNAL_BROKER_TESTS set, won't run" +# Check for TLS support +has_tls=no +./examples/aws/awsiot -? 2>&1 | grep -- 'Enable TLS' +if [ $? -eq 0 ]; then + has_tls=yes +fi + +if test -n "$WOLFMQTT_NO_EXTERNAL_BROKER_TESTS" && test $has_tls == yes; then + echo "WOLFMQTT_NO_EXTERNAL_BROKER_TESTS set or no TLS, won't run" else def_args="-T -C 2000" diff --git a/scripts/azureiothub.test b/scripts/azureiothub.test index 9616946a5..2c8a37d3d 100755 --- a/scripts/azureiothub.test +++ b/scripts/azureiothub.test @@ -5,8 +5,15 @@ # Check for application [ ! -x ./examples/azure/azureiothub ] && echo -e "\n\nAzureIotHub MQTT Client doesn't exist" && exit 1 -if test -n "$WOLFMQTT_NO_EXTERNAL_BROKER_TESTS"; then - echo "WOLFMQTT_NO_EXTERNAL_BROKER_TESTS set, won't run" +# Check for TLS support +has_tls=no +./examples/azure/azureiothub -? 2>&1 | grep -- 'Enable TLS' +if [ $? -eq 0 ]; then + has_tls=yes +fi + +if test -n "$WOLFMQTT_NO_EXTERNAL_BROKER_TESTS" && test $has_tls == yes; then + echo "WOLFMQTT_NO_EXTERNAL_BROKER_TESTS set or no TLS, won't run" else # Use short timeout here, since we can't get a publish response to complete test # So use the timeout and ping response to complete test diff --git a/scripts/client.test b/scripts/client.test index 482427790..e86752c26 100755 --- a/scripts/client.test +++ b/scripts/client.test @@ -19,36 +19,67 @@ do_cleanup() { fi } +generate_port() { # function to produce a random port number + if [[ "$OSTYPE" == "linux"* ]]; then + port=$(($(od -An -N2 /dev/urandom) % (65535-49152) + 49152)) + elif [[ "$OSTYPE" == "darwin"* ]]; then + port=$(($(od -An -N2 /dev/random) % (65535-49152) + 49152)) + else + echo "Unknown OS TYPE" + exit 1 + fi + echo -e "Using port $port" +} + + # Check for application [ ! -x ./examples/mqttclient/mqttclient ] && echo -e "\n\nMQTT Client doesn't exist" && exit 1 +# Check for TLS support +has_tls=no +./examples/mqttclient/mqttclient -? 2>&1 | grep -- 'Enable TLS' +if [ $? -eq 0 ]; then + has_tls=yes +fi + def_args="-T -C 2000" # Check for mosquitto if command -v mosquitto then - # bwrap only if using a local mosquitto instance - if [ "${AM_BWRAPPED-}" != "yes" ]; then - bwrap_path="$(command -v bwrap)" - if [ -n "$bwrap_path" ]; then - echo "Client test using bwrap" + bwrap_path="$(command -v bwrap)" + if [ -n "$bwrap_path" ]; then + # bwrap only if using a local mosquitto instance + if [ "${AM_BWRAPPED-}" != "yes" ]; then + echo "Using bwrap" export AM_BWRAPPED=yes exec "$bwrap_path" --unshare-net --dev-bind / / "$0" "$@" fi + unset AM_BWRAPPED + + broker_args="-c scripts/broker_test/mosquitto.conf" + port=11883 + else + # mosquitto broker custom port non-TLS only + has_tls=no + generate_port + broker_args="-p $port" fi - # Run mosquitto broker - mosquitto -c scripts/broker_test/mosquitto.conf & + mosquitto $broker_args & broker_pid=$! echo "Broker PID is $broker_pid" + sleep 0.1 + def_args="${def_args} -h localhost" tls_port_args="-p 18883" - port_args="-p 11883" + port_args="-p ${port}" mutual_auth_args="-c certs/client-cert.pem -K certs/client-key.pem" ecc_mutual_auth_args="-c certs/client-ecc-cert.pem -K certs/ecc-client-key.pem" fi -# Run with and without TLS and QoS 0-2 +echo -e "Base args: $def_args $port_args" +# Run without TLS and QoS 0-2 ./examples/mqttclient/mqttclient $def_args $port_args -q 0 $1 RESULT=$? [ $RESULT -ne 0 ] && echo -e "\n\nMQTT Client failed! TLS=Off, QoS=0" && do_cleanup "-1" @@ -61,29 +92,33 @@ RESULT=$? RESULT=$? [ $RESULT -ne 0 ] && echo -e "\n\nMQTT Client failed! TLS=Off, QoS=2" && do_cleanup "-1" -./examples/mqttclient/mqttclient $def_args $tls_port_args -t -q 0 $1 -RESULT=$? -[ $RESULT -ne 0 ] && echo -e "\n\nMQTT Client failed! TLS=On, QoS=0" && do_cleanup "-1" - -./examples/mqttclient/mqttclient $def_args $tls_port_args -t -q 1 $1 -RESULT=$? -[ $RESULT -ne 0 ] && echo -e "\n\nMQTT Client failed! TLS=On, QoS=1" && do_cleanup "-1" - -./examples/mqttclient/mqttclient $def_args $tls_port_args -t -q 2 $1 -RESULT=$? -[ $RESULT -ne 0 ] && echo -e "\n\nMQTT Client failed! TLS=On, QoS=2" && do_cleanup "-1" - -./examples/mqttclient/mqttclient $def_args $mutual_auth_args $tls_port_args -t -q 0 $1 -RESULT=$? -[ $RESULT -ne 0 ] && echo -e "\n\nMQTT Client failed! TLS=On, QoS=0, RSA mutual auth" && do_cleanup "-1" - -./examples/mqttclient/mqttclient $def_args $ecc_mutual_auth_args $tls_port_args -t -q 0 $1 -RESULT=$? -[ $RESULT -ne 0 ] && echo -e "\n\nMQTT Client failed! TLS=On, QoS=0, ECC mutual auth" && do_cleanup "-1" +if test $has_tls == yes +then + # Run with TLS and QoS 0-2 + ./examples/mqttclient/mqttclient $def_args $tls_port_args -t -q 0 $1 + RESULT=$? + [ $RESULT -ne 0 ] && echo -e "\n\nMQTT Client failed! TLS=On, QoS=0" && do_cleanup "-1" + + ./examples/mqttclient/mqttclient $def_args $tls_port_args -t -q 1 $1 + RESULT=$? + [ $RESULT -ne 0 ] && echo -e "\n\nMQTT Client failed! TLS=On, QoS=1" && do_cleanup "-1" + + ./examples/mqttclient/mqttclient $def_args $tls_port_args -t -q 2 $1 + RESULT=$? + [ $RESULT -ne 0 ] && echo -e "\n\nMQTT Client failed! TLS=On, QoS=2" && do_cleanup "-1" + + ./examples/mqttclient/mqttclient $def_args $mutual_auth_args $tls_port_args -t -q 0 $1 + RESULT=$? + [ $RESULT -ne 0 ] && echo -e "\n\nMQTT Client failed! TLS=On, QoS=0, RSA mutual auth" && do_cleanup "-1" + + ./examples/mqttclient/mqttclient $def_args $ecc_mutual_auth_args $tls_port_args -t -q 0 $1 + RESULT=$? + [ $RESULT -ne 0 ] && echo -e "\n\nMQTT Client failed! TLS=On, QoS=0, ECC mutual auth" && do_cleanup "-1" +fi # End broker do_cleanup "0" - + echo -e "\n\nMQTT Client Tests Passed" exit 0 diff --git a/scripts/firmware.test b/scripts/firmware.test index 9284a8e78..13d743b9e 100755 --- a/scripts/firmware.test +++ b/scripts/firmware.test @@ -6,10 +6,8 @@ no_pid=-1 broker_pid=$no_pid do_cleanup() { - if [ $ENABLE_MQTT_TLS -ne 1 ]; then - # Delete file - rm $fileout - fi + # Delete file + rm $fileout if [ $broker_pid != $no_pid ] then @@ -24,35 +22,75 @@ do_cleanup() { fi } +generate_port() { # function to produce a random port number + if [[ "$OSTYPE" == "linux"* ]]; then + port=$(($(od -An -N2 /dev/urandom) % (65535-49152) + 49152)) + elif [[ "$OSTYPE" == "darwin"* ]]; then + port=$(($(od -An -N2 /dev/random) % (65535-49152) + 49152)) + else + echo "Unknown OS TYPE" + exit 1 + fi + echo -e "Using port $port" +} + + # Check for application [ ! -x ./examples/firmware/fwpush ] && echo -e "\n\nMQTT Example fwpush doesn't exist" && exit 1 [ ! -x ./examples/firmware/fwclient ] && echo -e "\n\nMQTT Example fwclient doesn't exist" && exit 1 -def_args="-t -T -C 5000 -n wolfMQTT/example/firmware_$((RANDOM))" +# Check for TLS support +has_tls=no +./examples/mqttclient/mqttclient -? 2>&1 | grep -- 'Enable TLS' +if [ $? -eq 0 ]; then + has_tls=yes +fi + +def_args="-T -C 5000 -n wolfMQTT/example/firmware_$((RANDOM))" filein=./examples/publish.dat fileout=./examples/publish.dat.trs + # Check for mosquitto if command -v mosquitto then - # bwrap only if using a local mosquitto instance - if [ "${AM_BWRAPPED-}" != "yes" ]; then - bwrap_path="$(command -v bwrap)" - if [ -n "$bwrap_path" ]; then - echo "Firmware test using bwrap" + bwrap_path="$(command -v bwrap)" + if [ -n "$bwrap_path" ]; then + # bwrap only if using a local mosquitto instance + if [ "${AM_BWRAPPED-}" != "yes" ]; then + echo "Using bwrap" export AM_BWRAPPED=yes exec "$bwrap_path" --unshare-net --dev-bind / / "$0" "$@" fi + unset AM_BWRAPPED + + broker_args="-c scripts/broker_test/mosquitto.conf" + if test $has_tls == yes + then + port=18883 + else + port=11883 + fi + else + # mosquitto broker custom port non-TLS only + has_tls=no + generate_port + broker_args="-p $port" fi - # Run mosquitto broker - mosquitto -c scripts/broker_test/mosquitto.conf & + mosquitto $broker_args & broker_pid=$! echo "Broker PID is $broker_pid" - def_args="${def_args} -h localhost -p 18883" + sleep 0.1 + + def_args="${def_args} -h localhost -p ${port}" fi -grep -F -e 'ENABLE_MQTT_TLS' ./wolfmqtt/options.h -ENABLE_MQTT_TLS=$? +if test $has_tls == yes +then + def_args="${def_args} -t" +fi + +echo -e "Base args: $def_args" # Start firmware client ./examples/firmware/fwclient $def_args -f $fileout $1 & @@ -70,13 +108,11 @@ server_result=$? # give some time for the client complete sleep 0.5 -if [ $ENABLE_MQTT_TLS -ne 1 ]; then - # Compare files - echo "Comparing files" - md5sum -b $filein $fileout - compare_result=$? - [ $compare_result -ne 0 ] && echo -e "\n\nMQTT Example firmware compare failed!" && do_cleanup "-1" -fi +# Compare files +echo "Comparing files" +md5sum -b $filein $fileout +compare_result=$? +[ $compare_result -ne 0 ] && echo -e "\n\nMQTT Example firmware compare failed!" && do_cleanup "-1" # End broker do_cleanup "0" diff --git a/scripts/multithread.test b/scripts/multithread.test index 72f671df1..0f382e717 100755 --- a/scripts/multithread.test +++ b/scripts/multithread.test @@ -19,34 +19,64 @@ do_cleanup() { fi } +generate_port() { # function to produce a random port number + if [[ "$OSTYPE" == "linux"* ]]; then + port=$(($(od -An -N2 /dev/urandom) % (65535-49152) + 49152)) + elif [[ "$OSTYPE" == "darwin"* ]]; then + port=$(($(od -An -N2 /dev/random) % (65535-49152) + 49152)) + else + echo "Unknown OS TYPE" + exit 1 + fi + echo -e "Using port $port" +} + # Check for application [ ! -x ./examples/multithread/multithread ] && echo -e "\n\nMultithread Client doesn't exist" && exit 1 +# Check for TLS support +has_tls=no +./examples/multithread/multithread -? 2>&1 | grep -- 'Enable TLS' +if [ $? -eq 0 ]; then + has_tls=yes +fi + def_args="-T -C 2000" # Check for mosquitto if command -v mosquitto then - # bwrap only if using a local mosquitto instance - if [ "${AM_BWRAPPED-}" != "yes" ]; then - bwrap_path="$(command -v bwrap)" - if [ -n "$bwrap_path" ]; then - echo "multithread test using bwrap" + bwrap_path="$(command -v bwrap)" + if [ -n "$bwrap_path" ]; then + # bwrap only if using a local mosquitto instance + if [ "${AM_BWRAPPED-}" != "yes" ]; then + echo "Using bwrap" export AM_BWRAPPED=yes exec "$bwrap_path" --unshare-net --dev-bind / / "$0" "$@" fi + unset AM_BWRAPPED + + broker_args="-c scripts/broker_test/mosquitto.conf" + port=11883 + else + # mosquitto broker custom port non-TLS only + has_tls=no + generate_port + broker_args="-p $port" fi - # Run mosquitto broker - mosquitto -c scripts/broker_test/mosquitto.conf & + mosquitto $broker_args & broker_pid=$! echo "Broker PID is $broker_pid" + sleep 0.1 + def_args="${def_args} -h localhost" tls_port_args="-p 18883" - port_args="-p 11883" + port_args="-p ${port}" fi -# Run with and without TLS and QoS 0-2 +echo -e "Base args: $def_args $port_args" +# Run without TLS and QoS 0-2 ./examples/multithread/multithread $def_args $port_args -q 0 $1 RESULT=$? [ $RESULT -ne 0 ] && echo -e "\n\nMultithread Client failed! TLS=Off, QoS=0" && do_cleanup "-1" @@ -59,21 +89,25 @@ RESULT=$? RESULT=$? [ $RESULT -ne 0 ] && echo -e "\n\nMultithread Client failed! TLS=Off, QoS=2" && do_cleanup "-1" -./examples/multithread/multithread $def_args $tls_port_args -t -q 0 $1 -RESULT=$? -[ $RESULT -ne 0 ] && echo -e "\n\nMultithread Client failed! TLS=On, QoS=0" && do_cleanup "-1" - -./examples/multithread/multithread $def_args $tls_port_args -t -q 1 $1 -RESULT=$? -[ $RESULT -ne 0 ] && echo -e "\n\nMultithread Client failed! TLS=On, QoS=1" && do_cleanup "-1" - -./examples/multithread/multithread $def_args $tls_port_args -t -q 2 $1 -RESULT=$? -[ $RESULT -ne 0 ] && echo -e "\n\nMultithread Client failed! TLS=On, QoS=2" && do_cleanup "-1" +if test $has_tls == yes +then + # Run with TLS and QoS 0-2 + ./examples/multithread/multithread $def_args $tls_port_args -t -q 0 $1 + RESULT=$? + [ $RESULT -ne 0 ] && echo -e "\n\nMultithread Client failed! TLS=On, QoS=0" && do_cleanup "-1" + + ./examples/multithread/multithread $def_args $tls_port_args -t -q 1 $1 + RESULT=$? + [ $RESULT -ne 0 ] && echo -e "\n\nMultithread Client failed! TLS=On, QoS=1" && do_cleanup "-1" + + ./examples/multithread/multithread $def_args $tls_port_args -t -q 2 $1 + RESULT=$? + [ $RESULT -ne 0 ] && echo -e "\n\nMultithread Client failed! TLS=On, QoS=2" && do_cleanup "-1" +fi # End broker do_cleanup "0" - + echo -e "\n\nMultithread MQTT Client Tests Passed" exit 0 diff --git a/scripts/nbclient.test b/scripts/nbclient.test index 8d760d083..9c15f786b 100755 --- a/scripts/nbclient.test +++ b/scripts/nbclient.test @@ -19,9 +19,28 @@ do_cleanup() { fi } +generate_port() { # function to produce a random port number + if [[ "$OSTYPE" == "linux"* ]]; then + port=$(($(od -An -N2 /dev/urandom) % (65535-49152) + 49152)) + elif [[ "$OSTYPE" == "darwin"* ]]; then + port=$(($(od -An -N2 /dev/random) % (65535-49152) + 49152)) + else + echo "Unknown OS TYPE" + exit 1 + fi + echo -e "Using port $port" +} + # Check for application [ ! -x ./examples/nbclient/nbclient ] && echo -e "\n\nNon-blocking Client doesn't exist" && exit 1 +# Check for TLS support +has_tls=no +./examples/multithread/multithread -? 2>&1 | grep -- 'Enable TLS' +if [ $? -eq 0 ]; then + has_tls=yes +fi + # Use minimum of 2 seconds # The check timout will sometimes incorrectly trigger if 1 second is used def_args="-T -C 2000" @@ -29,26 +48,37 @@ def_args="-T -C 2000" # Check for mosquitto if command -v mosquitto then - # bwrap only if using a local mosquitto instance - if [ "${AM_BWRAPPED-}" != "yes" ]; then - bwrap_path="$(command -v bwrap)" - if [ -n "$bwrap_path" ]; then - echo "nbclient test using bwrap" + bwrap_path="$(command -v bwrap)" + if [ -n "$bwrap_path" ]; then + # bwrap only if using a local mosquitto instance + if [ "${AM_BWRAPPED-}" != "yes" ]; then + echo "Using bwrap" export AM_BWRAPPED=yes exec "$bwrap_path" --unshare-net --dev-bind / / "$0" "$@" fi + unset AM_BWRAPPED + + broker_args="-c scripts/broker_test/mosquitto.conf" + port=11883 + else + # mosquitto broker custom port non-TLS only + has_tls=no + generate_port + broker_args="-p $port" fi - # Run mosquitto broker - mosquitto -c scripts/broker_test/mosquitto.conf & + mosquitto $broker_args & broker_pid=$! echo "Broker PID is $broker_pid" + sleep 0.1 + def_args="${def_args} -h localhost" tls_port_args="-p 18883" - port_args="-p 11883" + port_args="-p ${port}" fi -# Run with and without TLS and QoS 0-2 +echo -e "Base args: $def_args $port_args" +# Run without TLS and QoS 0-2 ./examples/nbclient/nbclient $def_args $port_args -q 0 $1 RESULT=$? [ $RESULT -ne 0 ] && echo -e "\n\nNon-blocking Client failed! TLS=Off, QoS=0" && do_cleanup "-1" @@ -61,21 +91,25 @@ RESULT=$? RESULT=$? [ $RESULT -ne 0 ] && echo -e "\n\nNon-blocking Client failed! TLS=Off, QoS=2" && do_cleanup "-1" -./examples/nbclient/nbclient $def_args $tls_port_args -t -q 0 $1 -RESULT=$? -[ $RESULT -ne 0 ] && echo -e "\n\nNon-blocking Client failed! TLS=On, QoS=0" && do_cleanup "-1" - -./examples/nbclient/nbclient $def_args $tls_port_args -t -q 1 $1 -RESULT=$? -[ $RESULT -ne 0 ] && echo -e "\n\nNon-blocking Client failed! TLS=On, QoS=1" && do_cleanup "-1" - -./examples/nbclient/nbclient $def_args $tls_port_args -t -q 2 $1 -RESULT=$? -[ $RESULT -ne 0 ] && echo -e "\n\nNon-blocking Client failed! TLS=On, QoS=2" && do_cleanup "-1" +if test $has_tls == yes +then + # Run with TLS and QoS 0-2 + ./examples/nbclient/nbclient $def_args $tls_port_args -t -q 0 $1 + RESULT=$? + [ $RESULT -ne 0 ] && echo -e "\n\nNon-blocking Client failed! TLS=On, QoS=0" && do_cleanup "-1" + + ./examples/nbclient/nbclient $def_args $tls_port_args -t -q 1 $1 + RESULT=$? + [ $RESULT -ne 0 ] && echo -e "\n\nNon-blocking Client failed! TLS=On, QoS=1" && do_cleanup "-1" + + ./examples/nbclient/nbclient $def_args $tls_port_args -t -q 2 $1 + RESULT=$? + [ $RESULT -ne 0 ] && echo -e "\n\nNon-blocking Client failed! TLS=On, QoS=2" && do_cleanup "-1" +fi # End broker do_cleanup "0" - + echo -e "\n\nNon-blocking MQTT Client Tests Passed" exit 0 diff --git a/scripts/wiot.test b/scripts/wiot.test index 9d1e9493b..0d14e930a 100755 --- a/scripts/wiot.test +++ b/scripts/wiot.test @@ -7,14 +7,25 @@ # Check for application [ ! -x ./examples/wiot/wiot ] && echo -e "\n\nWatson IoT MQTT Client doesn't exist" && exit 1 +# Check for TLS support +has_tls=no +./examples/azure/azureiothub -? 2>&1 | grep -- 'Enable TLS' +if [ $? -eq 0 ]; then + has_tls=yes +fi + def_args="-T -C 2000" -# Run +if test -n "$WOLFMQTT_NO_EXTERNAL_BROKER_TESTS" && test $has_tls == yes; then + echo "WOLFMQTT_NO_EXTERNAL_BROKER_TESTS set or no TLS, won't run" +else + # Run -./examples/wiot/wiot $def_args $1 -RESULT=$? -[ $RESULT -ne 0 ] && echo -e "\n\nWatson IoT MQTT Client failed! TLS=On, QoS=0" && exit 1 + ./examples/wiot/wiot $def_args $1 + RESULT=$? + [ $RESULT -ne 0 ] && echo -e "\n\nWatson IoT MQTT Client failed! TLS=On, QoS=0" && exit 1 -echo -e "\n\nWatson IoT MQTT Client Tests Passed" + echo -e "\n\nWatson IoT MQTT Client Tests Passed" +fi exit 0 diff --git a/src/mqtt_client.c b/src/mqtt_client.c index de2bc6c57..f284b0207 100644 --- a/src/mqtt_client.c +++ b/src/mqtt_client.c @@ -31,6 +31,10 @@ /* forward declarations */ static int MqttClient_Publish_ReadPayload(MqttClient* client, MqttPublish* publish, int timeout_ms); +#if !defined(WOLFMQTT_MULTITHREAD) && !defined(WOLFMQTT_NONBLOCK) +static int MqttClient_CancelMessage(MqttClient *client, MqttObject* msg); +#endif + #ifdef WOLFMQTT_MULTITHREAD @@ -49,7 +53,7 @@ static int MqttClient_Publish_ReadPayload(MqttClient* client, int wm_SemInit(wm_Sem *s){ /* dispatch_release() fails hard, with Trace/BPT trap signal, if the * sem's internal count is less than the value passed in with - * dispatch_semaphore_create(). work around this by initing + * dispatch_semaphore_create(). work around this by initializing * with 0, then incrementing it afterwards. */ s->sem = dispatch_semaphore_create(0); @@ -157,8 +161,110 @@ static int MqttClient_Publish_ReadPayload(MqttClient* client, return 0; } +#endif /* MUTEX */ +#endif /* WOLFMQTT_MULTITHREAD */ + +static int MqttWriteStart(MqttClient* client, MqttMsgStat* stat) +{ + int rc = MQTT_CODE_SUCCESS; + +#ifdef WOLFMQTT_MULTITHREAD + #ifdef WOLFMQTT_DEBUG_CLIENT + if (stat->isWriteActive) { + MQTT_TRACE_MSG("Warning, send already locked!"); + rc = MQTT_CODE_ERROR_SYSTEM; + } + if (rc != 0) { + return rc; + } + #endif /* WOLFMQTT_DEBUG_CLIENT */ + + rc = wm_SemLock(&client->lockSend); +#endif /* WOLFMQTT_MULTITHREAD */ + if (rc == 0) { + stat->isWriteActive = 1; + MQTT_TRACE_MSG("lockSend"); + } + + (void)client; + + return rc; +} +static void MqttWriteStop(MqttClient* client, MqttMsgStat* stat) +{ +#ifdef WOLFMQTT_DEBUG_CLIENT + if (!stat->isWriteActive) { + MQTT_TRACE_MSG("Warning, send not locked!"); + return; + } #endif + /* reset write */ + XMEMSET(&client->write, 0, sizeof(client->write)); + + if (stat->isWriteActive) { + MQTT_TRACE_MSG("unlockSend"); + stat->isWriteActive = 0; + #ifdef WOLFMQTT_MULTITHREAD + wm_SemUnlock(&client->lockSend); + #endif + } +} + +static int MqttReadStart(MqttClient* client, MqttMsgStat* stat) +{ + int rc = MQTT_CODE_SUCCESS; + +#ifdef WOLFMQTT_MULTITHREAD + #ifdef WOLFMQTT_DEBUG_CLIENT + if (stat->isReadActive) { + MQTT_TRACE_MSG("Warning, recv already locked!"); + rc = MQTT_CODE_ERROR_SYSTEM; + } + /* detect if a write is already in progress */ + if (wm_SemLock(&client->lockClient) == 0) { + if (client->read.total > 0) { + MQTT_TRACE_MSG("Partial read in progress!"); + rc = MQTT_CODE_CONTINUE; /* can't read yet */ + } + wm_SemUnlock(&client->lockClient); + } + if (rc != 0) + return rc; + #endif /* WOLFMQTT_DEBUG_CLIENT */ + + rc = wm_SemLock(&client->lockRecv); +#endif /* WOLFMQTT_MULTITHREAD */ + if (rc == 0) { + stat->isReadActive = 1; + MQTT_TRACE_MSG("lockRecv"); + } + (void)client; + return rc; +} +static void MqttReadStop(MqttClient* client, MqttMsgStat* stat) +{ +#ifdef WOLFMQTT_DEBUG_CLIENT + if (!stat->isReadActive) { + MQTT_TRACE_MSG("Warning, recv not locked!"); + return; + } +#endif + + /* reset read */ + XMEMSET(&client->read, 0, sizeof(client->read)); + + if (stat->isReadActive) { + stat->isReadActive = 0; + MQTT_TRACE_MSG("unlockRecv"); + #ifdef WOLFMQTT_MULTITHREAD + wm_SemUnlock(&client->lockRecv); + #endif + } +} + +#ifdef WOLFMQTT_MULTITHREAD + /* These RespList functions assume caller has locked client->lockClient mutex */ int MqttClient_RespList_Add(MqttClient *client, MqttPacketType packet_type, word16 packet_id, MqttPendResp *newResp, @@ -205,7 +311,7 @@ int MqttClient_RespList_Add(MqttClient *client, client->lastPendResp->next = newResp; client->lastPendResp = newResp; } - return 0; + return MQTT_CODE_SUCCESS; } void MqttClient_RespList_Remove(MqttClient *client, MqttPendResp *rmResp) @@ -252,6 +358,7 @@ void MqttClient_RespList_Remove(MqttClient *client, MqttPendResp *rmResp) #endif } +/* return codes: 0=not found, 1=found */ int MqttClient_RespList_Find(MqttClient *client, MqttPacketType packet_type, word16 packet_id, MqttPendResp **retResp) { @@ -954,21 +1061,14 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, #ifdef WOLFMQTT_MULTITHREAD /* Check to see if packet type and id have already completed */ rc = MqttClient_CheckPendResp(client, wait_type, wait_packet_id); - if (rc != MQTT_CODE_ERROR_NOT_FOUND - && rc != MQTT_CODE_CONTINUE - ) { + if (rc != MQTT_CODE_ERROR_NOT_FOUND && rc != MQTT_CODE_CONTINUE) { return rc; } + #endif - /* Lock recv socket mutex */ - rc = wm_SemLock(&client->lockRecv); - if (rc != 0) { - PRINTF("MqttClient_WaitType: recv lock error!"); + if ((rc = MqttReadStart(client, mms_stat)) != 0) { return rc; } - mms_stat->isReadLocked = 1; - MQTT_TRACE_MSG("lockRecv"); - #endif /* reset the packet state used by MqttPacket_Read */ client->packet.stat = MQTT_PK_BEGIN; @@ -987,7 +1087,8 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, if (rc <= 0) { #ifdef WOLFMQTT_NONBLOCK if (rc == MQTT_CODE_CONTINUE && - client->packet.stat > MQTT_PK_BEGIN) { + (client->packet.stat > MQTT_PK_BEGIN || + client->read.total > 0)) { /* advance state, since we received some data */ mms_stat->read = MQTT_MSG_HEADER; } @@ -1037,7 +1138,8 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, #ifdef WOLFMQTT_DEBUG_CLIENT PRINTF("Using INCOMING packet_obj %p", use_packet_obj); #endif - if (packet_type == wait_type || wait_type == MQTT_PACKET_TYPE_ANY) { + if (packet_type == wait_type || + wait_type == MQTT_PACKET_TYPE_ANY) { /* Only stop waiting when matched or waiting for "any" */ waitMatchFound = 1; } @@ -1147,14 +1249,8 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, mms_stat->read = MQTT_MSG_BEGIN; } - #ifdef WOLFMQTT_MULTITHREAD - /* release read lock, done reading */ - if (mms_stat->isReadLocked) { - mms_stat->isReadLocked = 0; - MQTT_TRACE_MSG("unlockRecv"); - wm_SemUnlock(&client->lockRecv); - } - #endif + /* done reading */ + MqttReadStop(client, mms_stat); /* if error, leave */ if (rc != MQTT_CODE_SUCCESS) { @@ -1166,13 +1262,10 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, break; } - #ifdef WOLFMQTT_MULTITHREAD - /* Lock send socket mutex */ - rc = wm_SemLock(&client->lockSend); - if (rc != 0) break; - mms_stat->isWriteLocked = 1; - MQTT_TRACE_MSG("lockSend"); - #endif + /* Flag write active / lock mutex */ + if ((rc = MqttWriteStart(client, mms_stat)) != 0) { + break; + } /* setup ACK in shared context */ XMEMCPY(&client->packetAck, &resp, sizeof(MqttPublishResp)); @@ -1180,7 +1273,7 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, client->packetAck.protocol_level = client->protocol_level; #endif - mms_stat->write = MQTT_MSG_ACK; + mms_stat->ack = MQTT_MSG_ACK; break; } @@ -1200,7 +1293,7 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, } } /* switch (mms_stat->read) */ - switch (mms_stat->write) + switch (mms_stat->ack) { case MQTT_MSG_BEGIN: case MQTT_MSG_WAIT: @@ -1209,15 +1302,6 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, case MQTT_MSG_ACK: { - #ifdef WOLFMQTT_MULTITHREAD - if (!mms_stat->isWriteLocked) { - rc = wm_SemLock(&client->lockSend); - if (rc != 0) break; - mms_stat->isWriteLocked = 1; - MQTT_TRACE_MSG("lockSend"); - } - #endif - /* send ack */ rc = MqttEncode_PublishResp(client->tx_buf, client->tx_buf_len, client->packetAck.packet_type, &client->packetAck); @@ -1226,49 +1310,59 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, rc, MqttPacket_TypeDesc(client->packetAck.packet_type), client->packetAck.packet_type, client->packetAck.packet_id); #endif - if (rc > 0) { - client->write.len = rc; - - /* Send publish response packet */ - rc = MqttPacket_Write(client, client->tx_buf, - client->write.len); - #ifdef WOLFMQTT_NONBLOCK - if (rc == MQTT_CODE_CONTINUE) { - /* keep send mutex locked and return to caller */ - return rc; - } - #endif - if (rc == client->write.len) { - rc = 0; /* success */ - } + if (rc < 0) { + MqttWriteStop(client, mms_stat); + break; } - mms_stat->write = MQTT_MSG_BEGIN; - #ifdef WOLFMQTT_MULTITHREAD - MQTT_TRACE_MSG("unlockSend"); - mms_stat->isWriteLocked = 0; - wm_SemUnlock(&client->lockSend); + client->write.len = rc; + /* Note: static analyzer complains about set, but not used here. + * Keeping it to ensure no future issues with rc > 0 */ + rc = MQTT_CODE_SUCCESS; + (void)rc; /* inhibit clang-analyzer-deadcode.DeadStores */ + + mms_stat->ack = MQTT_MSG_HEADER; + } + FALL_THROUGH; + + case MQTT_MSG_HEADER: + { + int xfer = client->write.len; + + /* Send publish response packet */ + rc = MqttPacket_Write(client, client->tx_buf, xfer); + #ifdef WOLFMQTT_NONBLOCK + if (rc == MQTT_CODE_CONTINUE) { + /* keep send mutex locked and return to caller */ + /* must keep send locked */ + return rc; + } #endif + MqttWriteStop(client, mms_stat); + if (rc == xfer) { + rc = MQTT_CODE_SUCCESS; /* success */ + } + + mms_stat->ack = MQTT_MSG_BEGIN; /* reset write state */ break; } case MQTT_MSG_AUTH: - case MQTT_MSG_HEADER: case MQTT_MSG_PAYLOAD: case MQTT_MSG_PAYLOAD2: default: #ifdef WOLFMQTT_DEBUG_CLIENT - PRINTF("MqttClient_WaitType: Invalid write state %d!", - mms_stat->write); + PRINTF("MqttClient_WaitType: Invalid ack state %d!", + mms_stat->ack); #endif rc = MQTT_TRACE_ERROR(MQTT_CODE_ERROR_STAT); break; - } /* switch (mms_stat->write) */ + } /* switch (mms_stat->ack) */ #ifdef WOLFMQTT_DEBUG_CLIENT if (rc != MQTT_CODE_CONTINUE) { - PRINTF("MqttClient_WaitType: rc %d, state %d-%d", - rc, mms_stat->read, mms_stat->write); + PRINTF("MqttClient_WaitType: rc %d, state %d-%d-%d", + rc, mms_stat->read, mms_stat->write, mms_stat->ack); } #endif @@ -1284,18 +1378,20 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, } #endif -#ifdef WOLFMQTT_MULTITHREAD - if (mms_stat->isReadLocked) { - mms_stat->isReadLocked = 0; - MQTT_TRACE_MSG("unlockRecv"); - wm_SemUnlock(&client->lockRecv); - } -#endif + MqttReadStop(client, mms_stat); #ifdef WOLFMQTT_NONBLOCK #ifdef WOLFMQTT_DEBUG_CLIENT - client->lastRc = rc; + #ifdef WOLFMQTT_MULTITHREAD + if (wm_SemLock(&client->lockClient) == 0) + #endif + { + client->lastRc = rc; + #ifdef WOLFMQTT_MULTITHREAD + wm_SemUnlock(&client->lockClient); #endif + } + #endif /* WOLFMQTT_DEBUG_CLIENT */ if (rc == MQTT_CODE_CONTINUE) { return rc; } @@ -1439,13 +1535,10 @@ int MqttClient_Connect(MqttClient *client, MqttConnect *mc_connect) } if (mc_connect->stat.write == MQTT_MSG_BEGIN) { - #ifdef WOLFMQTT_MULTITHREAD - /* Lock send socket mutex */ - rc = wm_SemLock(&client->lockSend); - if (rc != 0) { + /* Flag write active / lock mutex */ + if ((rc = MqttWriteStart(client, &mc_connect->stat)) != 0) { return rc; } - #endif #ifdef WOLFMQTT_V5 /* Use specified protocol version if set */ @@ -1460,9 +1553,7 @@ int MqttClient_Connect(MqttClient *client, MqttConnect *mc_connect) MQTT_PACKET_TYPE_CONNECT, 0, 0); #endif if (rc <= 0) { - #ifdef WOLFMQTT_MULTITHREAD - wm_SemUnlock(&client->lockSend); - #endif + MqttWriteStop(client, &mc_connect->stat); return rc; } client->write.len = rc; @@ -1476,26 +1567,30 @@ int MqttClient_Connect(MqttClient *client, MqttConnect *mc_connect) wm_SemUnlock(&client->lockClient); } if (rc != 0) { - wm_SemUnlock(&client->lockSend); + MqttWriteStop(client, &mc_connect->stat); return rc; /* Error locking client */ } #endif + mc_connect->stat.write = MQTT_MSG_HEADER; + } + if (mc_connect->stat.write == MQTT_MSG_HEADER) { + int xfer = client->write.len; + /* Send connect packet */ - rc = MqttPacket_Write(client, client->tx_buf, client->write.len); - if (rc != client->write.len) { - #ifdef WOLFMQTT_MULTITHREAD - wm_SemUnlock(&client->lockSend); - if (wm_SemLock(&client->lockClient) == 0) { - MqttClient_RespList_Remove(client, &mc_connect->pendResp); - wm_SemUnlock(&client->lockClient); - } - #endif + rc = MqttPacket_Write(client, client->tx_buf, xfer); + #ifdef WOLFMQTT_NONBLOCK + if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) { + /* keep send locked and return early */ return rc; } - #ifdef WOLFMQTT_MULTITHREAD - wm_SemUnlock(&client->lockSend); #endif + MqttWriteStop(client, &mc_connect->stat); + if (rc != xfer) { + MqttClient_CancelMessage(client, (MqttObject*)mc_connect); + return rc; + } + #ifdef WOLFMQTT_V5 /* Enhanced authentication */ if (client->enable_eauth == 1) { @@ -1559,6 +1654,7 @@ int MqttClient_Connect(MqttClient *client, MqttConnect *mc_connect) #endif return rc; } + mc_connect->stat.write = MQTT_MSG_WAIT; } #endif /* WOLFMQTT_V5 */ @@ -1635,15 +1731,6 @@ static int MqttClient_Publish_ReadPayload(MqttClient* client, /* make sure there is something to read */ if (msg_len > 0) { - #if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK) - static int testNbAlt = 0; - if (!testNbAlt) { - testNbAlt = 1; - return MQTT_CODE_CONTINUE; - } - testNbAlt = 0; - #endif - rc = MqttSocket_Read(client, client->rx_buf, msg_len, timeout_ms); if (rc < 0) { @@ -1847,15 +1934,10 @@ static int MqttPublishMsg(MqttClient *client, MqttPublish *publish, { case MQTT_MSG_BEGIN: { - #ifdef WOLFMQTT_MULTITHREAD - /* Lock send socket mutex */ - rc = wm_SemLock(&client->lockSend); - if (rc != 0) { + /* Flag write active / lock mutex */ + if ((rc = MqttWriteStart(client, &publish->stat)) != 0) { return rc; } - publish->stat.isWriteLocked = 1; - MQTT_TRACE_MSG("lockSend"); - #endif /* Encode the publish packet */ rc = MqttEncode_Publish(client->tx_buf, client->tx_buf_len, @@ -1868,11 +1950,7 @@ static int MqttPublishMsg(MqttClient *client, MqttPublish *publish, publish->qos); #endif if (rc <= 0) { - #ifdef WOLFMQTT_MULTITHREAD - MQTT_TRACE_MSG("unlockSend"); - publish->stat.isWriteLocked = 0; - wm_SemUnlock(&client->lockSend); - #endif + MqttWriteStop(client, &publish->stat); return rc; } client->write.len = rc; @@ -1891,9 +1969,7 @@ static int MqttPublishMsg(MqttClient *client, MqttPublish *publish, wm_SemUnlock(&client->lockClient); } if (rc != 0) { - MQTT_TRACE_MSG("unlockSend"); - publish->stat.isWriteLocked = 0; - wm_SemUnlock(&client->lockSend); + MqttWriteStop(client, &publish->stat); return rc; /* Error locking client */ } } @@ -1905,30 +1981,24 @@ static int MqttPublishMsg(MqttClient *client, MqttPublish *publish, case MQTT_MSG_HEADER: { - /* Send packet */ - rc = MqttPacket_Write(client, client->tx_buf, client->write.len); + int xfer = client->write.len; + + /* Send publish packet */ + rc = MqttPacket_Write(client, client->tx_buf, xfer); #ifdef WOLFMQTT_NONBLOCK - if (rc == MQTT_CODE_CONTINUE) + if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) + /* keep send locked and return early */ return rc; #endif - if (rc < 0) { - #ifdef WOLFMQTT_MULTITHREAD - MQTT_TRACE_MSG("unlockSend"); - publish->stat.isWriteLocked = 0; - wm_SemUnlock(&client->lockSend); - #endif - #ifdef WOLFMQTT_MULTITHREAD - if (wm_SemLock(&client->lockClient) == 0) { - MqttClient_RespList_Remove(client, &publish->pendResp); - wm_SemUnlock(&client->lockClient); - } - #endif + client->write.len = 0; /* reset len, so publish chunk resets */ + + /* if failure or no data was written yet */ + if (rc != xfer) { + MqttWriteStop(client, &publish->stat); + MqttClient_CancelMessage(client, (MqttObject*)publish); return rc; } - /* reset client->write.len */ - client->write.len = 0; - /* advance state */ publish->stat.write = MQTT_MSG_PAYLOAD; } @@ -1941,19 +2011,9 @@ static int MqttPublishMsg(MqttClient *client, MqttPublish *publish, if (rc == MQTT_CODE_CONTINUE || rc == MQTT_CODE_PUB_CONTINUE) return rc; #endif - #ifdef WOLFMQTT_MULTITHREAD - MQTT_TRACE_MSG("unlockSend"); - publish->stat.isWriteLocked = 0; - wm_SemUnlock(&client->lockSend); - #endif - + MqttWriteStop(client, &publish->stat); if (rc < 0) { - #ifdef WOLFMQTT_MULTITHREAD - if (wm_SemLock(&client->lockClient) == 0) { - MqttClient_RespList_Remove(client, &publish->pendResp); - wm_SemUnlock(&client->lockClient); - } - #endif + MqttClient_CancelMessage(client, (MqttObject*)publish); break; } @@ -1976,9 +2036,16 @@ static int MqttPublishMsg(MqttClient *client, MqttPublish *publish, #ifdef WOLFMQTT_MULTITHREAD if (writeOnly) { - /* another thread will handle the wait type */ + /* another thread will handle response */ + /* check if response already received from other thread */ rc = MqttClient_CheckPendResp(client, resp_type, publish->packet_id); + #ifndef WOLFMQTT_NONBLOCK + if (rc == MQTT_CODE_CONTINUE) { + /* mark success, let other thread handle response */ + rc = MQTT_CODE_SUCCESS; + } + #endif } else #endif @@ -1989,6 +2056,7 @@ static int MqttPublishMsg(MqttClient *client, MqttPublish *publish, rc = MqttClient_WaitType(client, &publish->resp, resp_type, publish->packet_id, client->cmd_timeout_ms); } + #if defined(WOLFMQTT_NONBLOCK) || defined(WOLFMQTT_MULTITHREAD) if (rc == MQTT_CODE_CONTINUE) break; @@ -2066,13 +2134,10 @@ int MqttClient_Subscribe(MqttClient *client, MqttSubscribe *subscribe) #endif if (subscribe->stat.write == MQTT_MSG_BEGIN) { - #ifdef WOLFMQTT_MULTITHREAD - /* Lock send socket mutex */ - rc = wm_SemLock(&client->lockSend); - if (rc != 0) { + /* Flag write active / lock mutex */ + if ((rc = MqttWriteStart(client, &subscribe->stat)) != 0) { return rc; } - #endif /* Encode the subscribe packet */ rc = MqttEncode_Subscribe(client->tx_buf, client->tx_buf_len, @@ -2083,9 +2148,7 @@ int MqttClient_Subscribe(MqttClient *client, MqttSubscribe *subscribe) MQTT_PACKET_TYPE_SUBSCRIBE, subscribe->packet_id); #endif if (rc <= 0) { - #ifdef WOLFMQTT_MULTITHREAD - wm_SemUnlock(&client->lockSend); - #endif + MqttWriteStop(client, &subscribe->stat); return rc; } client->write.len = rc; @@ -2099,26 +2162,29 @@ int MqttClient_Subscribe(MqttClient *client, MqttSubscribe *subscribe) wm_SemUnlock(&client->lockClient); } if (rc != 0) { - wm_SemUnlock(&client->lockSend); + MqttWriteStop(client, &subscribe->stat); return rc; /* Error locking client */ } #endif + subscribe->stat.write = MQTT_MSG_HEADER; + } + if (subscribe->stat.write == MQTT_MSG_HEADER) { + int xfer = client->write.len; + /* Send subscribe packet */ - rc = MqttPacket_Write(client, client->tx_buf, client->write.len); - if (rc != client->write.len) { - #ifdef WOLFMQTT_MULTITHREAD - wm_SemUnlock(&client->lockSend); - if (wm_SemLock(&client->lockClient) == 0) { - MqttClient_RespList_Remove(client, &subscribe->pendResp); - wm_SemUnlock(&client->lockClient); - } - #endif + rc = MqttPacket_Write(client, client->tx_buf, xfer); + #ifdef WOLFMQTT_NONBLOCK + if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) { + /* keep send locked and return early */ return rc; } - #ifdef WOLFMQTT_MULTITHREAD - wm_SemUnlock(&client->lockSend); #endif + MqttWriteStop(client, &subscribe->stat); + if (rc != xfer) { + MqttClient_CancelMessage(client, (MqttObject*)subscribe); + return rc; + } subscribe->stat.write = MQTT_MSG_WAIT; } @@ -2168,13 +2234,10 @@ int MqttClient_Unsubscribe(MqttClient *client, MqttUnsubscribe *unsubscribe) #endif if (unsubscribe->stat.write == MQTT_MSG_BEGIN) { - #ifdef WOLFMQTT_MULTITHREAD - /* Lock send socket mutex */ - rc = wm_SemLock(&client->lockSend); - if (rc != 0) { + /* Flag write active / lock mutex */ + if ((rc = MqttWriteStart(client, &unsubscribe->stat)) != 0) { return rc; } - #endif /* Encode the subscribe packet */ rc = MqttEncode_Unsubscribe(client->tx_buf, client->tx_buf_len, @@ -2185,9 +2248,7 @@ int MqttClient_Unsubscribe(MqttClient *client, MqttUnsubscribe *unsubscribe) MQTT_PACKET_TYPE_UNSUBSCRIBE, unsubscribe->packet_id, 0); #endif if (rc <= 0) { - #ifdef WOLFMQTT_MULTITHREAD - wm_SemUnlock(&client->lockSend); - #endif + MqttWriteStop(client, &unsubscribe->stat); return rc; } client->write.len = rc; @@ -2202,26 +2263,29 @@ int MqttClient_Unsubscribe(MqttClient *client, MqttUnsubscribe *unsubscribe) wm_SemUnlock(&client->lockClient); } if (rc != 0) { - wm_SemUnlock(&client->lockSend); /* Error locking client */ + MqttWriteStop(client, &unsubscribe->stat); return rc; } #endif + unsubscribe->stat.write = MQTT_MSG_HEADER; + } + if (unsubscribe->stat.write == MQTT_MSG_HEADER) { + int xfer = client->write.len; + /* Send unsubscribe packet */ - rc = MqttPacket_Write(client, client->tx_buf, client->write.len); - if (rc != client->write.len) { - #ifdef WOLFMQTT_MULTITHREAD - wm_SemUnlock(&client->lockSend); - if (wm_SemLock(&client->lockClient) == 0) { - MqttClient_RespList_Remove(client, &unsubscribe->pendResp); - wm_SemUnlock(&client->lockClient); - } - #endif + rc = MqttPacket_Write(client, client->tx_buf, xfer); + #ifdef WOLFMQTT_NONBLOCK + if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) { + /* keep send locked and return early */ return rc; } - #ifdef WOLFMQTT_MULTITHREAD - wm_SemUnlock(&client->lockSend); #endif + MqttWriteStop(client, &unsubscribe->stat); + if (rc != xfer) { + MqttClient_CancelMessage(client, (MqttObject*)unsubscribe); + return rc; + } unsubscribe->stat.write = MQTT_MSG_WAIT; } @@ -2265,13 +2329,10 @@ int MqttClient_Ping_ex(MqttClient *client, MqttPing* ping) } if (ping->stat.write == MQTT_MSG_BEGIN) { - #ifdef WOLFMQTT_MULTITHREAD - /* Lock send socket mutex */ - rc = wm_SemLock(&client->lockSend); - if (rc != 0) { + /* Flag write active / lock mutex */ + if ((rc = MqttWriteStart(client, &ping->stat)) != 0) { return rc; } - #endif /* Encode the subscribe packet */ rc = MqttEncode_Ping(client->tx_buf, client->tx_buf_len, ping); @@ -2281,9 +2342,7 @@ int MqttClient_Ping_ex(MqttClient *client, MqttPing* ping) MQTT_PACKET_TYPE_PING_REQ, 0, 0); #endif if (rc <= 0) { - #ifdef WOLFMQTT_MULTITHREAD - wm_SemUnlock(&client->lockSend); - #endif + MqttWriteStop(client, &ping->stat); return rc; } client->write.len = rc; @@ -2297,26 +2356,30 @@ int MqttClient_Ping_ex(MqttClient *client, MqttPing* ping) wm_SemUnlock(&client->lockClient); } if (rc != 0) { - wm_SemUnlock(&client->lockSend); + MqttWriteStop(client, &ping->stat); return rc; /* Error locking client */ } #endif + ping->stat.write = MQTT_MSG_HEADER; + } + if (ping->stat.write == MQTT_MSG_HEADER) { + int xfer = client->write.len; + /* Send ping req packet */ - rc = MqttPacket_Write(client, client->tx_buf, client->write.len); - if (rc != client->write.len) { - #ifdef WOLFMQTT_MULTITHREAD - wm_SemUnlock(&client->lockSend); - if (wm_SemLock(&client->lockClient) == 0) { - MqttClient_RespList_Remove(client, &ping->pendResp); - wm_SemUnlock(&client->lockClient); - } - #endif + rc = MqttPacket_Write(client, client->tx_buf, xfer); + #ifdef WOLFMQTT_NONBLOCK + if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) { + /* keep send locked and return early */ return rc; } - #ifdef WOLFMQTT_MULTITHREAD - wm_SemUnlock(&client->lockSend); #endif + MqttWriteStop(client, &ping->stat); + if (rc != xfer) { + MqttClient_CancelMessage(client, (MqttObject*)ping); + return rc; + } + ping->stat.write = MQTT_MSG_WAIT; } @@ -2351,67 +2414,82 @@ int MqttClient_Disconnect(MqttClient *client) return MqttClient_Disconnect_ex(client, NULL); } -int MqttClient_Disconnect_ex(MqttClient *client, MqttDisconnect *disconnect) +int MqttClient_Disconnect_ex(MqttClient *client, MqttDisconnect *p_disconnect) { - int rc; + int rc, xfer; + MqttDisconnect *disconnect = p_disconnect, lcl_disconnect; /* Validate required arguments */ if (client == NULL) { return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG); } + if (disconnect == NULL) { + disconnect = &lcl_disconnect; + XMEMSET(disconnect, 0, sizeof(*disconnect)); + } -#ifdef WOLFMQTT_V5 - if (disconnect != NULL) { + if (disconnect->stat.write == MQTT_MSG_BEGIN) { + #ifdef WOLFMQTT_V5 /* Use specified protocol version if set */ disconnect->protocol_level = client->protocol_level; - } -#endif + #endif -#ifdef WOLFMQTT_MULTITHREAD - /* Lock send socket mutex */ - rc = wm_SemLock(&client->lockSend); - if (rc != 0) { - return rc; - } -#endif + /* Flag write active / lock mutex */ + if ((rc = MqttWriteStart(client, &disconnect->stat)) != 0) { + return rc; + } - /* Encode the disconnect packet */ - rc = MqttEncode_Disconnect(client->tx_buf, client->tx_buf_len, disconnect); -#ifdef WOLFMQTT_DEBUG_CLIENT - PRINTF("MqttClient_EncodePacket: Len %d, Type %s (%d), ID %d, QoS %d", - rc, MqttPacket_TypeDesc(MQTT_PACKET_TYPE_DISCONNECT), - MQTT_PACKET_TYPE_DISCONNECT, 0, 0); -#endif - if (rc <= 0) { - #ifdef WOLFMQTT_MULTITHREAD - wm_SemUnlock(&client->lockSend); + /* Encode the disconnect packet */ + rc = MqttEncode_Disconnect(client->tx_buf, client->tx_buf_len, + disconnect); + #ifdef WOLFMQTT_DEBUG_CLIENT + PRINTF("MqttClient_EncodePacket: Len %d, Type %s (%d), ID %d, QoS %d", + rc, MqttPacket_TypeDesc(MQTT_PACKET_TYPE_DISCONNECT), + MQTT_PACKET_TYPE_DISCONNECT, 0, 0); #endif - return rc; + if (rc <= 0) { + MqttWriteStop(client, &disconnect->stat); + return rc; + } + client->write.len = rc; + + disconnect->stat.write = MQTT_MSG_HEADER; } - client->write.len = rc; /* Send disconnect packet */ - rc = MqttPacket_Write(client, client->tx_buf, client->write.len); - if (rc != client->write.len) { - #ifdef WOLFMQTT_MULTITHREAD - wm_SemUnlock(&client->lockSend); - #endif + xfer = client->write.len; + rc = MqttPacket_Write(client, client->tx_buf, xfer); +#ifdef WOLFMQTT_NONBLOCK + /* if disconnect context avail allow partial write in non-blocking mode */ + if (p_disconnect != NULL && + rc == MQTT_CODE_CONTINUE && client->write.total > 0) { + /* keep send locked and return early */ return rc; } -#ifdef WOLFMQTT_MULTITHREAD - wm_SemUnlock(&client->lockSend); #endif - - rc = MQTT_CODE_SUCCESS; + MqttWriteStop(client, &disconnect->stat); + if (rc == xfer) { + rc = MQTT_CODE_SUCCESS; + } #if defined(WOLFMQTT_DISCONNECT_CB) && defined(WOLFMQTT_USE_CB_ON_DISCONNECT) - /* Trigger disconnect callback */ - if (client->disconnect_cb) + /* Trigger disconnect callback - for intentional disconnect + * This callback may occur on a network failure during an intentional + * disconnect if the transport/socket is not setup yet. */ + if (client->disconnect_cb + #ifdef WOLFMQTT_NONBLOCK + && rc != MQTT_CODE_CONTINUE + #endif + ) { client->disconnect_cb(client, rc, client->disconnect_ctx); + } #endif /* No response for MQTT disconnect packet */ + /* reset state */ + disconnect->stat.write = MQTT_MSG_BEGIN; + return rc; } @@ -2426,13 +2504,10 @@ int MqttClient_Auth(MqttClient *client, MqttAuth* auth) } if (auth->stat.write == MQTT_MSG_BEGIN) { - #ifdef WOLFMQTT_MULTITHREAD - /* Lock send socket mutex */ - rc = wm_SemLock(&client->lockSend); - if (rc != 0) { + /* Flag write active / lock mutex */ + if ((rc = MqttWriteStart(client, &auth->stat)) != 0) { return rc; } - #endif /* Encode the authentication packet */ rc = MqttEncode_Auth(client->tx_buf, client->tx_buf_len, auth); @@ -2442,9 +2517,7 @@ int MqttClient_Auth(MqttClient *client, MqttAuth* auth) MQTT_PACKET_TYPE_AUTH, 0, 0); #endif if (rc <= 0) { - #ifdef WOLFMQTT_MULTITHREAD - wm_SemUnlock(&client->lockSend); - #endif + MqttWriteStop(client, &auth->stat); return rc; } client->write.len = rc; @@ -2458,26 +2531,29 @@ int MqttClient_Auth(MqttClient *client, MqttAuth* auth) wm_SemUnlock(&client->lockClient); } if (rc != 0) { - wm_SemUnlock(&client->lockSend); + MqttWriteStop(client, &auth->stat); return rc; /* Error locking client */ } #endif + auth->stat.write = MQTT_MSG_HEADER; + } + if (auth->stat.write == MQTT_MSG_BEGIN) { + int xfer = client->write.len; + /* Send authentication packet */ - rc = MqttPacket_Write(client, client->tx_buf, client->write.len); - if (rc != client->write.len) { - #ifdef WOLFMQTT_MULTITHREAD - wm_SemUnlock(&client->lockSend); - if (wm_SemLock(&client->lockClient) == 0) { - MqttClient_RespList_Remove(client, &auth->pendResp); - wm_SemUnlock(&client->lockClient); - } - #endif + rc = MqttPacket_Write(client, client->tx_buf, xfer); + #ifdef WOLFMQTT_NONBLOCK + if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) { + /* keep send locked and return early */ return rc; } - #ifdef WOLFMQTT_MULTITHREAD - wm_SemUnlock(&client->lockSend); #endif + MqttWriteStop(client, &auth->stat); + if (rc != xfer) { + MqttClient_CancelMessage(client, (MqttObject*)auth); + return rc; + } auth->stat.write = MQTT_MSG_WAIT; } @@ -2528,20 +2604,21 @@ int MqttClient_WaitMessage(MqttClient *client, int timeout_ms) return MqttClient_WaitMessage_ex(client, &client->msg, timeout_ms); } -#if defined(WOLFMQTT_MULTITHREAD) || defined(WOLFMQTT_NONBLOCK) +#if !defined(WOLFMQTT_MULTITHREAD) && !defined(WOLFMQTT_NONBLOCK) +static +#endif int MqttClient_CancelMessage(MqttClient *client, MqttObject* msg) { int rc = MQTT_CODE_SUCCESS; + MqttMsgStat* mms_stat; #ifdef WOLFMQTT_MULTITHREAD MqttPendResp* tmpResp; - MqttMsgStat* mms_stat; #endif if (client == NULL || msg == NULL) { return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG); } -#ifdef WOLFMQTT_MULTITHREAD /* all packet type structures must have MqttMsgStat at top */ mms_stat = (MqttMsgStat*)msg; @@ -2549,7 +2626,12 @@ int MqttClient_CancelMessage(MqttClient *client, MqttObject* msg) PRINTF("Cancel Msg: %p", msg); #endif - /* Find pending response entry and remove */ + /* reset states */ + mms_stat->write = MQTT_MSG_BEGIN; + mms_stat->read = MQTT_MSG_BEGIN; + +#ifdef WOLFMQTT_MULTITHREAD + /* Remove any pending responses expected */ rc = wm_SemLock(&client->lockClient); if (rc != MQTT_CODE_SUCCESS) { return rc; @@ -2581,27 +2663,26 @@ int MqttClient_CancelMessage(MqttClient *client, MqttObject* msg) } } wm_SemUnlock(&client->lockClient); +#endif /* WOLFMQTT_MULTITHREAD */ - /* clear any locks */ - if (mms_stat->isReadLocked) { + /* cancel any active flags / locks */ + if (mms_stat->isReadActive) { #ifdef WOLFMQTT_DEBUG_CLIENT PRINTF("Cancel Read Lock"); #endif - mms_stat->isReadLocked = 0; - wm_SemUnlock(&client->lockRecv); + mms_stat->isReadActive = 0; + MqttReadStop(client, mms_stat); } - if (mms_stat->isWriteLocked) { + if (mms_stat->isWriteActive) { #ifdef WOLFMQTT_DEBUG_CLIENT PRINTF("Cancel Write Lock"); #endif - client->write.pos = 0; /* reset current write position */ - mms_stat->isWriteLocked = 0; - wm_SemUnlock(&client->lockSend); + mms_stat->isWriteActive = 0; + MqttWriteStop(client, mms_stat); } -#endif + return rc; } -#endif /* WOLFMQTT_MULTITHREAD || WOLFMQTT_NONBLOCK */ #ifdef WOLFMQTT_NONBLOCK static inline int IsMessageActive(MqttObject *msg) @@ -2664,9 +2745,19 @@ int MqttClient_NetDisconnect(MqttClient *client) /* Get client lock on to ensure no other threads are active */ wm_SemLock(&client->lockClient); +#ifdef WOLFMQTT_DEBUG_CLIENT + PRINTF("Net Disconnect: Removing pending responses"); +#endif for (tmpResp = client->firstPendResp; tmpResp != NULL; tmpResp = tmpResp->next) { + #ifdef WOLFMQTT_DEBUG_CLIENT + PRINTF("\tPendResp: %p (obj %p), Type %s (%d), ID %d, InProc %d, Done %d", + tmpResp, tmpResp->packet_obj, + MqttPacket_TypeDesc(tmpResp->packet_type), + tmpResp->packet_type, tmpResp->packet_id, + tmpResp->packetProcessing, tmpResp->packetDone); + #endif MqttClient_RespList_Remove(client, tmpResp); } wm_SemUnlock(&client->lockClient); diff --git a/src/mqtt_packet.c b/src/mqtt_packet.c index cafa8e842..1b9fd0008 100644 --- a/src/mqtt_packet.c +++ b/src/mqtt_packet.c @@ -1963,7 +1963,6 @@ int MqttPacket_Read(MqttClient *client, byte* rx_buf, int rx_buf_len, { case MQTT_PK_BEGIN: { - client->read.pos = 0; client->packet.header_len = MQTT_PACKET_HEADER_MIN_SIZE; client->packet.remain_len = 0; diff --git a/src/mqtt_sn_client.c b/src/mqtt_sn_client.c index 40b7c20c4..1144d6362 100644 --- a/src/mqtt_sn_client.c +++ b/src/mqtt_sn_client.c @@ -506,7 +506,7 @@ static int SN_Client_WaitType(MqttClient *client, void* packet_obj, PRINTF("SN_Client_WaitType recv lock error"); return rc; } - mms_stat->isReadLocked = 1; + mms_stat->isReadActive = 1; MQTT_TRACE_MSG("SN lockRecv"); #endif @@ -667,8 +667,8 @@ static int SN_Client_WaitType(MqttClient *client, void* packet_obj, mms_stat->read = MQTT_MSG_BEGIN; #ifdef WOLFMQTT_MULTITHREAD - if (mms_stat->isReadLocked) { - mms_stat->isReadLocked = 0; + if (mms_stat->isReadActive) { + mms_stat->isReadActive = 0; wm_SemUnlock(&client->lockRecv); } #endif diff --git a/src/mqtt_socket.c b/src/mqtt_socket.c index f8888d254..299577e80 100644 --- a/src/mqtt_socket.c +++ b/src/mqtt_socket.c @@ -41,6 +41,12 @@ #ifdef WOLFMQTT_NO_STDIO #undef WOLFMQTT_DEBUG_SOCKET #endif + +/* #define WOLFMQTT_TEST_NONBLOCK */ +#ifdef WOLFMQTT_TEST_NONBLOCK + #define WOLFMQTT_TEST_NONBLOCK_TIMES 1 +#endif + /* lwip */ #ifdef WOLFSSL_LWIP #undef read @@ -125,8 +131,8 @@ static int MqttSocket_WriteDo(MqttClient *client, const byte* buf, int buf_len, #if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK) static int testNbWriteAlt = 0; - if (!testNbWriteAlt) { - testNbWriteAlt = 1; + if (testNbWriteAlt < WOLFMQTT_TEST_NONBLOCK_TIMES) { + testNbWriteAlt++; return MQTT_CODE_CONTINUE; } testNbWriteAlt = 0; @@ -167,9 +173,8 @@ static int MqttSocket_WriteDo(MqttClient *client, const byte* buf, int buf_len, #if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK) static int testSmallerWrite = 0; if (!testSmallerWrite) { - if (buf_len > 100) { + if (buf_len > 1) buf_len /= 2; - } testSmallerWrite = 1; } else { @@ -211,6 +216,7 @@ int MqttSocket_Write(MqttClient *client, const byte* buf, int buf_len, buf_len - client->write.pos, timeout_ms); if (rc >= 0) { client->write.pos += rc; + client->write.total += rc; if (client->write.pos < buf_len) { rc = MQTT_CODE_CONTINUE; } @@ -227,6 +233,7 @@ int MqttSocket_Write(MqttClient *client, const byte* buf, int buf_len, break; } client->write.pos += rc; + client->write.total += rc; } while (client->write.pos < buf_len); #endif /* WOLFMQTT_NONBLOCK */ @@ -247,8 +254,8 @@ static int MqttSocket_ReadDo(MqttClient *client, byte* buf, int buf_len, #if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK) static int testNbReadAlt = 0; - if (!testNbReadAlt) { - testNbReadAlt = 1; + if (testNbReadAlt < WOLFMQTT_TEST_NONBLOCK_TIMES) { + testNbReadAlt++; return MQTT_CODE_CONTINUE; } testNbReadAlt = 0; @@ -292,9 +299,8 @@ static int MqttSocket_ReadDo(MqttClient *client, byte* buf, int buf_len, #if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK) static int testSmallerRead = 0; if (!testSmallerRead) { - if (buf_len > 100) { + if (buf_len > 1) buf_len /= 2; - } testSmallerRead = 1; } else { @@ -333,6 +339,7 @@ int MqttSocket_Read(MqttClient *client, byte* buf, int buf_len, int timeout_ms) buf_len - client->read.pos, timeout_ms); if (rc >= 0) { client->read.pos += rc; + client->read.total += rc; if (client->read.pos < buf_len) { rc = MQTT_CODE_CONTINUE; } @@ -349,6 +356,7 @@ int MqttSocket_Read(MqttClient *client, byte* buf, int buf_len, int timeout_ms) break; } client->read.pos += rc; + client->read.total += rc; } while (client->read.pos < buf_len); #endif /* WOLFMQTT_NONBLOCK */ diff --git a/wolfmqtt/mqtt_client.h b/wolfmqtt/mqtt_client.h index f381d2f8f..e8ad0299c 100644 --- a/wolfmqtt/mqtt_client.h +++ b/wolfmqtt/mqtt_client.h @@ -128,8 +128,9 @@ typedef struct _MqttPkRead { } MqttPkRead; typedef struct _MqttSk { - int pos; - int len; + int pos; /* position inside current buffer */ + int len; /* length of current segment being sent */ + int total; /* number bytes sent or received */ } MqttSk; #ifdef WOLFMQTT_DISCONNECT_CB diff --git a/wolfmqtt/mqtt_packet.h b/wolfmqtt/mqtt_packet.h index 4c4ba5d0c..efa2d46b5 100644 --- a/wolfmqtt/mqtt_packet.h +++ b/wolfmqtt/mqtt_packet.h @@ -291,11 +291,10 @@ typedef enum _MqttMsgState { typedef struct _MqttMsgStat { MqttMsgState read; MqttMsgState write; + MqttMsgState ack; -#ifdef WOLFMQTT_MULTITHREAD - byte isReadLocked:1; - byte isWriteLocked:1; -#endif + byte isReadActive:1; + byte isWriteActive:1; } MqttMsgStat; #ifdef WOLFMQTT_MULTITHREAD