diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml index c7cda8855..0a0d853db 100644 --- a/.github/workflows/integration.yaml +++ b/.github/workflows/integration.yaml @@ -44,6 +44,7 @@ jobs: - name: Test debug run: make test + timeout-minutes: 15 env: BUILD_TYPE: Debug # Workaround for Windows as it seems the previous step is being ignored BUILD_TESTING: OFF # Workaround for Windows as it seems the previous step is being ignored diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 837e41199..fb780510d 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -45,6 +45,8 @@ if(UNIX) add_example(z_scout unix/c11/z_scout.c) add_example(z_ping unix/c11/z_ping.c) add_example(z_pong unix/c11/z_pong.c) + add_example(z_pub_thr unix/c11/z_pub_thr.c) + add_example(z_sub_thr unix/c11/z_sub_thr.c) endif() elseif(MSVC) add_example(z_put windows/z_put.c) diff --git a/examples/unix/c11/z_pub_thr.c b/examples/unix/c11/z_pub_thr.c new file mode 100644 index 000000000..d1379c43c --- /dev/null +++ b/examples/unix/c11/z_pub_thr.c @@ -0,0 +1,75 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#include +#include +#include +#include + +#include "zenoh-pico.h" + +#if Z_FEATURE_PUBLICATION == 1 +int main(int argc, char **argv) { + if (argc < 2) { + printf("USAGE:\n\tz_pub_thr []\n\n"); + exit(-1); + } + char *keyexpr = "test/thr"; + size_t len = atoi(argv[1]); + uint8_t *value = (uint8_t *)malloc(len); + memset(value, 1, len); + + // Set config + z_owned_config_t config = z_config_default(); + if (argc > 2) { + if (zp_config_insert(z_loan(config), Z_CONFIG_CONNECT_KEY, z_string_make(argv[2])) < 0) { + printf("Couldn't insert locator in config: %s\n", argv[2]); + exit(-1); + } + } + // Open session + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + exit(-1); + } + // Start read and lease tasks for zenoh-pico + if (zp_start_read_task(z_loan(s), NULL) < 0 || zp_start_lease_task(z_loan(s), NULL) < 0) { + printf("Unable to start read and lease tasks"); + exit(-1); + } + // Declare publisher + z_owned_publisher_t pub = z_declare_publisher(z_loan(s), z_keyexpr(keyexpr), NULL); + if (!z_check(pub)) { + printf("Unable to declare publisher for key expression!\n"); + exit(-1); + } + + // Send packets + while (1) { + z_publisher_put(z_loan(pub), (const uint8_t *)value, len, NULL); + } + // Clean up + z_undeclare_publisher(z_move(pub)); + zp_stop_read_task(z_loan(s)); + zp_stop_lease_task(z_loan(s)); + z_close(z_move(s)); + free(value); + exit(0); +} +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_PUBLICATION but this example requires it.\n"); + return -2; +} +#endif diff --git a/examples/unix/c11/z_sub_thr.c b/examples/unix/c11/z_sub_thr.c new file mode 100644 index 000000000..676d37d9c --- /dev/null +++ b/examples/unix/c11/z_sub_thr.c @@ -0,0 +1,121 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#include +#include +#include +#include +#include + +#include "zenoh-pico.h" + +#define PACKET_NB 1000000 + +typedef struct { + volatile unsigned long count; + volatile unsigned long finished_rounds; + z_clock_t start; + z_clock_t first_start; +} z_stats_t; + +#if Z_FEATURE_SUBSCRIPTION == 1 + +z_stats_t *z_stats_make(void) { + z_stats_t *stats = malloc(sizeof(z_stats_t)); + stats->count = 0; + stats->finished_rounds = 0; + stats->first_start.tv_nsec = 0; + return stats; +} + +void on_sample(const z_sample_t *sample, void *context) { + (void)sample; + z_stats_t *stats = (z_stats_t *)context; + stats->count++; + // Start set measurement + if (stats->count == 1) { + stats->start = z_clock_now(); + if (stats->first_start.tv_nsec == 0) { + stats->first_start = stats->start; + } + } else if (stats->count >= PACKET_NB) { + // Stop set measurement + stats->finished_rounds++; + unsigned long elapsed_ms = z_clock_elapsed_ms(&stats->start); + printf("Received %d msg in %lu ms (%.1f msg/s)\n", PACKET_NB, elapsed_ms, + (double)(PACKET_NB * 1000) / (double)elapsed_ms); + stats->count = 0; + } +} + +void drop_stats(void *context) { + z_stats_t *stats = (z_stats_t *)context; + unsigned long elapsed_ms = z_clock_elapsed_ms(&stats->first_start); + const unsigned long sent_messages = PACKET_NB * stats->finished_rounds + stats->count; + printf("Stats after unsubscribing: received %ld messages over %lu miliseconds (%.1f msg/s)\n", sent_messages, + elapsed_ms, (double)(sent_messages * 1000) / (double)elapsed_ms); + free(context); +} + +int main(int argc, char **argv) { + char *keyexpr = "test/thr"; + z_owned_config_t config = z_config_default(); + + // Set config + if (argc > 1) { + if (zp_config_insert(z_loan(config), Z_CONFIG_CONNECT_KEY, z_string_make(argv[1])) < 0) { + printf("Failed to insert locator in config: %s\n", argv[1]); + exit(-1); + } + } + // Open session + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + exit(-1); + } + // Start read and lease tasks for zenoh-pico + if (zp_start_read_task(z_loan(s), NULL) < 0 || zp_start_lease_task(z_loan(s), NULL) < 0) { + printf("Unable to start read and lease tasks"); + exit(-1); + } + // Declare Subscriber/resource + z_stats_t *context = z_stats_make(); + z_owned_closure_sample_t callback = z_closure(on_sample, drop_stats, (void *)context); + z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(callback), NULL); + if (!z_check(sub)) { + printf("Unable to create subscriber.\n"); + exit(-1); + } + // Listen until stopped + printf("Start listening.\n"); + char c = 0; + while (c != 'q') { + c = fgetc(stdin); + } + // Wait for everything to settle + printf("End of test\n"); + z_sleep_s(1); + // Clean up + z_undeclare_subscriber(z_move(sub)); + zp_stop_read_task(z_loan(s)); + zp_stop_lease_task(z_loan(s)); + z_close(z_move(s)); + exit(0); +} +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this example requires it.\n"); + return -2; +} +#endif diff --git a/tests/z_client_test.c b/tests/z_client_test.c index 22475a352..1058e5b31 100644 --- a/tests/z_client_test.c +++ b/tests/z_client_test.c @@ -25,6 +25,8 @@ #define MSG 1000 #define MSG_LEN 1024 +#define FRAGMENT_MSG_NB 100 +#define FRAGMENT_MSG_LEN 100000 #define QRY 100 #define QRY_CLT 10 #define SET 100 @@ -92,7 +94,7 @@ void data_handler(const z_sample_t *sample, void *arg) { printf(">> Received data: %s\t(%u/%u)\n", res, datas, total); z_owned_str_t k_str = z_keyexpr_to_string(sample->keyexpr); - assert(sample->payload.len == MSG_LEN); + assert((sample->payload.len == MSG_LEN) || (sample->payload.len == FRAGMENT_MSG_LEN)); assert(_z_str_eq(z_loan(k_str), res) == true); datas++; @@ -233,6 +235,37 @@ int main(int argc, char **argv) { z_sleep_s(SLEEP); + // Write fragment data from first session + if (is_reliable) { + z_free((uint8_t *)payload); + len = FRAGMENT_MSG_LEN; + payload = (uint8_t *)z_malloc(len); + memset(payload, 1, FRAGMENT_MSG_LEN); + + total = FRAGMENT_MSG_NB * SET; + for (unsigned int n = 0; n < FRAGMENT_MSG_NB; n++) { + for (unsigned int i = 0; i < SET; i++) { + z_put_options_t opt = z_put_options_default(); + opt.congestion_control = Z_CONGESTION_CONTROL_BLOCK; + z_put(z_loan(s1), z_loan(rids1[i]), (const uint8_t *)payload, len, &opt); + printf("Wrote fragment data from session 1: %u %zu b\t(%u/%u)\n", z_loan(rids1[i])._id, len, + n * SET + (i + 1), total); + } + } + // Wait to receive all the data + now = z_clock_now(); + while (datas < total) { + assert(z_clock_elapsed_s(&now) < TIMEOUT); + printf("Waiting for fragment datas... %u/%u\n", datas, total); + z_sleep_s(SLEEP); + } + if (is_reliable == true) { + assert(datas == total); + } + datas = 0; + z_sleep_s(SLEEP); + } + // Query data from first session total = QRY * SET; for (unsigned int n = 0; n < QRY; n++) { diff --git a/tests/z_perf_rx.c b/tests/z_perf_rx.c index 7a67e153a..391bcd231 100644 --- a/tests/z_perf_rx.c +++ b/tests/z_perf_rx.c @@ -60,21 +60,30 @@ void on_sample(const z_sample_t *sample, void *context) { int main(int argc, char **argv) { char *keyexpr = "test/thr"; - const char *mode = "client"; + const char *mode = NULL; char *llocator = NULL; + char *clocator = NULL; (void)argv; - // Check if peer mode + // Check if peer or client mode if (argc > 1) { mode = "peer"; llocator = "udp/224.0.0.224:7447#iface=lo"; + } else { + mode = "client"; + clocator = "tcp/127.0.0.1:7447"; } // Set config z_owned_config_t config = z_config_default(); - zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode)); + if (mode != NULL) { + zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode)); + } if (llocator != NULL) { zp_config_insert(z_loan(config), Z_CONFIG_LISTEN_KEY, z_string_make(llocator)); } + if (clocator != NULL) { + zp_config_insert(z_loan(config), Z_CONFIG_CONNECT_KEY, z_string_make(clocator)); + } // Open session z_owned_session_t s = z_open(z_move(config)); if (!z_check(s)) { diff --git a/tests/z_perf_tx.c b/tests/z_perf_tx.c index 454d72de2..eac90d159 100644 --- a/tests/z_perf_tx.c +++ b/tests/z_perf_tx.c @@ -26,10 +26,7 @@ int send_packets(unsigned long pkt_len, z_owned_publisher_t *pub, uint8_t *value z_clock_t test_start = z_clock_now(); unsigned long elapsed_us = 0; while (elapsed_us < TEST_DURATION_US) { - if (z_publisher_put(z_loan(*pub), (const uint8_t *)value, pkt_len, NULL) != 0) { - printf("Put failed for pkt len: %lu\n", pkt_len); - return -1; - } + z_publisher_put(z_loan(*pub), (const uint8_t *)value, pkt_len, NULL); elapsed_us = z_clock_elapsed_us(&test_start); } return 0; @@ -41,21 +38,30 @@ int main(int argc, char **argv) { uint8_t *value = (uint8_t *)malloc(len_array[0]); memset(value, 1, len_array[0]); char *keyexpr = "test/thr"; - const char *mode = "client"; + const char *mode = NULL; char *llocator = NULL; + char *clocator = NULL; (void)argv; - // Check if peer mode + // Check if peer or client mode if (argc > 1) { mode = "peer"; llocator = "udp/224.0.0.224:7447#iface=lo"; + } else { + mode = "client"; + clocator = "tcp/127.0.0.1:7447"; } // Set config z_owned_config_t config = z_config_default(); - zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode)); + if (mode != NULL) { + zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode)); + } if (llocator != NULL) { zp_config_insert(z_loan(config), Z_CONFIG_LISTEN_KEY, z_string_make(llocator)); } + if (clocator != NULL) { + zp_config_insert(z_loan(config), Z_CONFIG_CONNECT_KEY, z_string_make(clocator)); + } // Open session z_owned_session_t s = z_open(z_move(config)); if (!z_check(s)) {